跳至主要内容

Java 客户端 (V2)

Java 客户端库,用于通过其协议与数据库服务器通信。当前实现仅支持 HTTP 接口。该库提供自己的 API 来发送请求到服务器。该库还提供工具来处理不同的二进制数据格式(RowBinary 和 Native)。

设置

<dependency>
<groupId>com.clickhouse</groupId>
<artifactId>client-v2</artifactId>
<version>0.6.5</version>
</dependency>

初始化

Client 对象通过 com.clickhouse.client.api.Client.Builder#build() 初始化。每个客户端都有自己的上下文,它们之间不共享任何对象。Builder 具有用于方便设置的配置方法。

示例

 Client client = new Client.Builder()
.addEndpoint("https://clickhouse-cloud-instance:8443/")
.setUsername(user)
.setPassword(password)
.build();

ClientAutoCloseable,当不再需要时应关闭。

配置

所有设置由实例方法(又称配置方法)定义,这些方法使每个值的范围和上下文清晰。主要配置参数在一个范围内定义(客户端或操作),并且不会相互覆盖。

配置在客户端创建期间定义。参见 com.clickhouse.client.api.Client.Builder

通用定义

ClickHouseFormat

支持格式 的枚举。它包含 ClickHouse 支持的所有格式。

  • raw - 用户应转码原始数据
  • full - 客户端可以自行转码数据,并接受原始数据流
  • - - ClickHouse 不支持该格式的操作

此客户端版本支持

格式输入输出
TabSeparatedrawraw
TabSeparatedRawrawraw
TabSeparatedWithNamesrawraw
TabSeparatedWithNamesAndTypesrawraw
TabSeparatedRawWithNamesrawraw
TabSeparatedRawWithNamesAndTypesrawraw
Templaterawraw
TemplateIgnoreSpacesraw-
CSVrawraw
CSVWithNamesrawraw
CSVWithNamesAndTypesrawraw
CustomSeparatedrawraw
CustomSeparatedWithNamesrawraw
CustomSeparatedWithNamesAndTypesrawraw
SQLInsert-raw
Valuesrawraw
Vertical-raw
JSONrawraw
JSONAsStringraw-
JSONAsObjectraw-
JSONStringsrawraw
JSONColumnsrawraw
JSONColumnsWithMetadatarawraw
JSONCompactrawraw
JSONCompactStrings-raw
JSONCompactColumnsrawraw
JSONEachRowrawraw
PrettyJSONEachRow-raw
JSONEachRowWithProgress-raw
JSONStringsEachRowrawraw
JSONStringsEachRowWithProgress-raw
JSONCompactEachRowrawraw
JSONCompactEachRowWithNamesrawraw
JSONCompactEachRowWithNamesAndTypesrawraw
JSONCompactStringsEachRowrawraw
JSONCompactStringsEachRowWithNamesrawraw
JSONCompactStringsEachRowWithNamesAndTypesrawraw
JSONObjectEachRowrawraw
BSONEachRowrawraw
TSKVrawraw
Pretty-raw
PrettyNoEscapes-raw
PrettyMonoBlock-raw
PrettyNoEscapesMonoBlock-raw
PrettyCompact-raw
PrettyCompactNoEscapes-raw
PrettyCompactMonoBlock-raw
PrettyCompactNoEscapesMonoBlock-raw
PrettySpace-raw
PrettySpaceNoEscapes-raw
PrettySpaceMonoBlock-raw
PrettySpaceNoEscapesMonoBlock-raw
Prometheus-raw
Protobufrawraw
ProtobufSinglerawraw
ProtobufListrawraw
Avrorawraw
AvroConfluentraw-
Parquetrawraw
ParquetMetadataraw-
Arrowrawraw
ArrowStreamrawraw
ORCrawraw
Oneraw-
Npyrawraw
RowBinaryfullfull
RowBinaryWithNamesfullfull
RowBinaryWithNamesAndTypesfullfull
RowBinaryWithDefaultsfull-
Nativefullraw
Null-raw
XML-raw
CapnProtorawraw
LineAsStringrawraw
Regexpraw-
RawBLOBrawraw
MsgPackrawraw
MySQLDumpraw-
DWARFraw-
Markdown-raw
Formraw-

插入 API

insert(String tableName, InputStream data, ClickHouseFormat format)

以指定格式的字节流 InputStream 形式接受数据。预计 dataformat 编码。

签名

CompletableFuture<InsertResponse> insert(String tableName, InputStream data, ClickHouseFormat format, InsertSettings settings)
CompletableFuture<InsertResponse> insert(String tableName, InputStream data, ClickHouseFormat format)

参数

tableName - 目标表名。

data - 编码数据的输入流。

format - 数据编码的格式。

settings - 请求设置。

返回值

InsertResponse 类型的 Future - 操作结果和附加信息,如服务器端指标。

示例

try (InputStream dataStream = getDataStream()) {
try (InsertResponse response = client.insert(TABLE_NAME, dataStream, ClickHouseFormat.JSONEachRow,
insertSettings).get(3, TimeUnit.SECONDS)) {

log.info("Insert finished: {} rows written", response.getMetrics().getMetric(ServerMetrics.NUM_ROWS_WRITTEN).getLong());
} catch (Exception e) {
log.error("Failed to write JSONEachRow data", e);
throw new RuntimeException(e);
}
}

insert(String tableName, List<?> data, InsertSettings settings)

向数据库发送写入请求。对象列表将转换为有效格式,然后发送到服务器。列表项目的类应使用 register(Class, TableSchema) 方法预先注册。

签名

client.insert(String tableName, List<?> data, InsertSettings settings)
client.insert(String tableName, List<?> data)

参数

tableName - 目标表的名称。

data - 集合 DTO(数据传输对象)对象。

settings - 请求设置。

返回值

InsertResponse 类型的 Future - 操作结果和附加信息,如服务器端指标。

示例

// Important step (done once) - register class to pre-compile object serializer according to the table schema. 
client.register(ArticleViewEvent.class, client.getTableSchema(TABLE_NAME));


List<ArticleViewEvent> events = loadBatch();

try (InsertResponse response = client.insert(TABLE_NAME, events).get()) {
// handle response, then it will be closed and connection that served request will be released.
}

InsertSettings

插入操作的配置选项。

配置方法

setQueryId(String queryId)
设置将分配给操作的查询 ID
setDeduplicationToken(String token)
设置去重令牌。此令牌将发送到服务器,可用于识别查询。
waitEndOfQuery(Boolean waitEndOfQuery)
请求服务器在发送响应之前等待查询结束。
setInputStreamCopyBufferSize(int size)
复制缓冲区大小。该缓冲区在写入操作期间用于将数据从用户提供的输入流复制到输出流。

InsertResponse

保存插入操作结果的响应对象。仅当客户端从服务器收到响应时才可用。

注意

此对象应尽快关闭以释放连接,因为在完全读取之前响应中的所有数据,该连接无法重新使用。

OperationMetrics getMetrics()
返回包含操作指标的对象
String getQueryId()
返回应用程序为操作分配的查询 ID(通过操作设置或服务器)。

查询 API

query(String sqlQuery)

按原样发送 sqlQuery。响应格式由查询设置确定。QueryResponse 将保存对响应流的引用,该流应由支持格式的阅读器使用。

签名

CompletableFuture<QueryResponse> query(String sqlQuery, QuerySettings settings)
CompletableFuture<QueryResponse> query(String sqlQuery)

参数

sqlQuery - 单个 SQL 语句。该查询按原样发送到服务器。

settings - 请求设置。

返回值

QueryResponse 类型的 Future - 结果数据集和附加信息,如服务器端指标。响应对象应在使用数据集后关闭。

示例

final String sql = "select * from " + TABLE_NAME + " where title <> '' limit 10";

// Default format is RowBinaryWithNamesAndTypesFormatReader so reader have all information about columns
try (QueryResponse response = client.query(sql).get(3, TimeUnit.SECONDS);) {

// Create a reader to access the data in a convenient way
ClickHouseBinaryFormatReader reader = client.newBinaryFormatReader(response);

while (reader.hasNext()) {
reader.next(); // Read the next record from stream and parse it

// get values
double id = reader.getDouble("id");
String title = reader.getString("title");
String url = reader.getString("url");

// collecting data
}
} catch (Exception e) {
log.error("Failed to read data", e);
}

// put business logic outside of the reading block to release http connection asap.

query(String sqlQuery, Map<String, Object> queryParams, QuerySettings settings)

按原样发送 sqlQuery。此外,还会发送查询参数,以便服务器可以编译 SQL 表达式。

签名

CompletableFuture<QueryResponse> query(String sqlQuery, Map<String, Object> queryParams, QuerySettings settings)

参数

sqlQuery - 包含占位符 {} 的 sql 表达式。

queryParams - 用于在服务器上完成 sql 表达式的变量映射。

settings - 请求设置。

返回值

QueryResponse 类型的 Future - 结果数据集和附加信息,如服务器端指标。响应对象应在使用数据集后关闭。

示例


// define parameters. They will be sent to the server along with the request.
Map<String, Object> queryParams = new HashMap<>();
queryParams.put("param1", 2);

try (QueryResponse queryResponse =
client.query("SELECT * FROM " + table + " WHERE col1 >= {param1:UInt32}", queryParams, new QuerySettings()).get()) {

// Create a reader to access the data in a convenient way
ClickHouseBinaryFormatReader reader = client.newBinaryFormatReader(response);

while (reader.hasNext()) {
reader.next(); // Read the next record from stream and parse it

// reading data
}

} catch (Exception e) {
log.error("Failed to read data", e);
}

queryAll(String sqlQuery)

RowBinaryWithNamesAndTypes 格式查询数据。将结果作为集合返回。读取性能与阅读器相同,但需要更多内存来保存整个数据集。

签名

List<GenericRecord> queryAll(String sqlQuery)

参数

sqlQuery - 用于从服务器查询数据的 sql 表达式。

返回值

GenericRecord 对象列表表示的完整数据集,这些对象以行式提供对结果数据的访问。

示例

try {
log.info("Reading whole table and process record by record");
final String sql = "select * from " + TABLE_NAME + " where title <> ''";

// Read whole result set and process it record by record
client.queryAll(sql).forEach(row -> {
double id = row.getDouble("id");
String title = row.getString("title");
String url = row.getString("url");

log.info("id: {}, title: {}, url: {}", id, title, url);
});
} catch (Exception e) {
log.error("Failed to read data", e);
}

QuerySettings

查询操作的配置选项。

配置方法

setQueryId(String queryId)
设置将分配给操作的查询 ID
setFormat(ClickHouseFormat format)
设置响应格式。有关完整列表,请参见 `RowBinaryWithNamesAndTypes`。
setMaxExecutionTime(Integer maxExecutionTime)
设置服务器上的操作执行时间。不会影响读取超时。
waitEndOfQuery(Boolean waitEndOfQuery)
请求服务器在发送响应之前等待查询结束。
setUseServerTimeZone(Boolean useServerTimeZone)
服务器时区(参见客户端配置)将用于解析操作结果中的日期/时间类型。默认值为 `false`
setUseTimeZone(String timeZone)
请求服务器使用 `timeZone` 进行时间转换。参见 session_timezone

QueryResponse

保存查询执行结果的响应对象。仅当客户端从服务器收到响应时才可用。

注意

此对象应尽快关闭以释放连接,因为在完全读取之前响应中的所有数据,该连接无法重新使用。

ClickHouseFormat getFormat()
返回响应中数据的编码格式。
InputStream getInputStream()
返回指定格式的未压缩数据字节流。
OperationMetrics getMetrics()
返回包含操作指标的对象
String getQueryId()
返回应用程序为操作分配的查询 ID(通过操作设置或服务器)。
TimeZone getTimeZone()
返回用于处理响应中 Date/DateTime 类型的时区。

示例

通用 API

getTableSchema(String table)

获取表 table 的模式。

签名

TableSchema getTableSchema(String table)
TableSchema getTableSchema(String table, String database)

参数

table - 要获取模式数据的表名。

database - 目标表所在的数据库。

返回值

返回包含表列列表的 TableSchema 对象。

getTableSchemaFromQuery(String sql)

从 SQL 语句中获取模式。

签名

TableSchema getTableSchemaFromQuery(String sql)

参数

sql - 要返回其模式的 "SELECT" SQL 语句。

返回值

返回一个 TableSchema 对象,其列与 sql 表达式匹配。

TableSchema

register(Class<?> clazz, TableSchema schema)

编译用于使用 schema 写入/读取数据的 Java 类 SerDe 层。此方法将为 getter/setter 对和相应列创建序列化器和反序列化器。通过从方法名中提取列名来查找列匹配项。例如,getFirstName 将用于列 first_namefirstname

签名

void register(Class<?> clazz, TableSchema schema)

参数

clazz - 表示用于读取/写入数据的 POJO 的类。

schema - 用于与 POJO 属性匹配的数据模式。

示例

client.register(ArticleViewEvent.class, client.getTableSchema(TABLE_NAME));

使用示例

完整的示例代码存储在仓库中的 'example` 文件夹