跳至主要内容

客户端 (V1)

用于通过其协议与数据库服务器通信的 Java 客户端库。当前实现仅支持HTTP 接口。该库提供自己的 API 来向服务器发送请求。

注意:此组件将很快弃用。

设置

<!-- https://mvnrepository.com/artifact/com.clickhouse/clickhouse-http-client -->
<dependency>
<groupId>com.clickhouse</groupId>
<artifactId>clickhouse-http-client</artifactId>
<version>0.6.5</version>
</dependency>

从版本0.5.0开始,驱动程序使用新的客户端 http 库,需要将其添加为依赖项。

<!-- https://mvnrepository.com/artifact/org.apache.httpcomponents.client5/httpclient5 -->
<dependency>
<groupId>org.apache.httpcomponents.client5</groupId>
<artifactId>httpclient5</artifactId>
<version>5.3.1</version>
</dependency>

初始化

连接 URL 格式:protocol://host[:port][/database][?param[=value][&param[=value]][#tag[,tag]],例如

连接到单个节点

ClickHouseNode server = ClickHouseNode.of("https://127.0.0.1:8123/default?compress=0");

连接到具有多个节点的集群

ClickHouseNodes servers = ClickHouseNodes.of(
"jdbc:ch:http://server1.domain,server2.domain,server3.domain/my_db"
+ "?load_balancing_policy=random&health_check_interval=5000&failover=2");

查询 API

try (ClickHouseClient client = ClickHouseClient.newInstance(ClickHouseProtocol.HTTP);
ClickHouseResponse response = client.read(servers)
.format(ClickHouseFormat.RowBinaryWithNamesAndTypes)
.query("select * from numbers limit :limit")
.params(1000)
.executeAndWait()) {
ClickHouseResponseSummary summary = response.getSummary();
long totalRows = summary.getTotalRowsToRead();
}

流式查询 API

try (ClickHouseClient client = ClickHouseClient.newInstance(ClickHouseProtocol.HTTP);
ClickHouseResponse response = client.read(servers)
.format(ClickHouseFormat.RowBinaryWithNamesAndTypes)
.query("select * from numbers limit :limit")
.params(1000)
.executeAndWait()) {
for (ClickHouseRecord r : response.records()) {
int num = r.getValue(0).asInteger();
// type conversion
String str = r.getValue(0).asString();
LocalDate date = r.getValue(0).asDate();
}
}

请参阅完整的代码示例,位于仓库中。

插入 API




try (ClickHouseClient client = ClickHouseClient.newInstance(ClickHouseProtocol.HTTP);
ClickHouseResponse response = client.read(servers).write()
.format(ClickHouseFormat.RowBinaryWithNamesAndTypes)
.query("insert into my_table select c2, c3 from input('c1 UInt8, c2 String, c3 Int32')")
.data(myInputStream) // `myInputStream` is source of data in RowBinary format
.executeAndWait()) {
ClickHouseResponseSummary summary = response.getSummary();
summary.getWrittenRows();
}

请参阅完整的代码示例,位于仓库中。

RowBinary 编码

RowBinary 格式在其页面上进行了描述。

有一个代码示例。

功能

压缩

客户端默认使用 LZ4 压缩,这需要此依赖项

<!-- https://mvnrepository.com/artifact/org.lz4/lz4-java -->
<dependency>
<groupId>org.lz4</groupId>
<artifactId>lz4-java</artifactId>
<version>1.8.0</version>
</dependency>

您可以通过在连接 URL 中设置compress_algorithm=gzip来选择使用 gzip。

或者,您可以通过几种方式禁用压缩。

  1. 通过在连接 URL 中设置compress=0禁用:https://127.0.0.1:8123/default?compress=0
  2. 通过客户端配置禁用
ClickHouseClient client = ClickHouseClient.builder()
.config(new ClickHouseConfig(Map.of(ClickHouseClientOption.COMPRESS, false)))
.nodeSelector(ClickHouseNodeSelector.of(ClickHouseProtocol.HTTP))
.build();

请参阅压缩文档,以了解有关不同压缩选项的更多信息。

多个查询

在一个工作线程中按顺序执行同一个会话中的多个查询。

CompletableFuture<List<ClickHouseResponseSummary>> future = ClickHouseClient.send(servers.apply(servers.getNodeSelector()),
"create database if not exists my_base",
"use my_base",
"create table if not exists test_table(s String) engine=Memory",
"insert into test_table values('1')('2')('3')",
"select * from test_table limit 1",
"truncate table test_table",
"drop table if exists test_table");
List<ClickHouseResponseSummary> results = future.get();

命名参数

您可以通过名称传递参数,而不是仅依赖于它们在参数列表中的位置。此功能可使用params函数获得。

try (ClickHouseClient client = ClickHouseClient.newInstance(ClickHouseProtocol.HTTP);
ClickHouseResponse response = client.read(servers)
.format(ClickHouseFormat.RowBinaryWithNamesAndTypes)
.query("select * from my_table where name=:name limit :limit")
.params("Ben", 1000)
.executeAndWait()) {
//...
}
}
参数

所有涉及String类型的params签名(StringString[]Map<String, String>)都假设传递的键是有效的 ClickHouse SQL 字符串。例如

try (ClickHouseClient client = ClickHouseClient.newInstance(ClickHouseProtocol.HTTP);
ClickHouseResponse response = client.read(servers)
.format(ClickHouseFormat.RowBinaryWithNamesAndTypes)
.query("select * from my_table where name=:name")
.params(Map.of("name","'Ben'"))
.executeAndWait()) {
//...
}
}

如果您不想手动将 String 对象解析为 ClickHouse SQL,可以使用位于com.clickhouse.data的辅助函数ClickHouseValues.convertToSqlExpression

try (ClickHouseClient client = ClickHouseClient.newInstance(ClickHouseProtocol.HTTP);
ClickHouseResponse response = client.read(servers)
.format(ClickHouseFormat.RowBinaryWithNamesAndTypes)
.query("select * from my_table where name=:name")
.params(Map.of("name", ClickHouseValues.convertToSqlExpression("Ben's")))
.executeAndWait()) {
//...
}
}

在上面的示例中,ClickHouseValues.convertToSqlExpression将转义内部单引号,并用有效的单引号将变量括起来。

其他类型,例如IntegerUUIDArrayEnum将在params内部自动转换。

节点发现

Java 客户端提供自动发现 ClickHouse 节点的能力。默认情况下禁用自动发现。要手动启用它,请将auto_discovery设置为true

properties.setProperty("auto_discovery", "true");

或在连接 URL 中

jdbc:ch://my-server/system?auto_discovery=true

如果启用了自动发现,则无需在连接 URL 中指定所有 ClickHouse 节点。URL 中指定的节点将被视为种子,Java 客户端将自动从系统表和/或 clickhouse-keeper 或 zookeeper 中发现更多节点。

以下选项负责自动发现配置

属性默认值描述
auto_discoveryfalse客户端是否应该从系统表和/或 clickhouse-keeper/zookeeper 中发现更多节点。
node_discovery_interval0节点发现间隔(毫秒),零或负值表示一次性发现。
node_discovery_limit100一次可以发现的节点最大数量;零或负值表示没有限制。

负载均衡

Java 客户端根据负载均衡策略选择要向其发送请求的 ClickHouse 节点。通常,负载均衡策略负责以下事项

  1. 从托管节点列表中获取节点。
  2. 管理节点的状态。
  3. 可选地安排节点发现的后台进程(如果启用了自动发现)并运行健康检查。

以下是配置负载均衡的选项列表

属性默认值描述
load_balancing_policy""负载均衡策略可以是以下之一
  • firstAlive - 请求发送到托管节点列表中的第一个正常节点
  • random - 请求发送到托管节点列表中的随机节点
  • roundRobin - 请求依次发送到托管节点列表中的每个节点。
  • 实现ClickHouseLoadBalancingPolicy的完整限定类名 - 自定义负载均衡策略
  • 如果未指定,则请求将发送到托管节点列表中的第一个节点
    load_balancing_tags""用于筛选节点的负载均衡标签。请求仅发送到具有指定标签的节点
    health_check_interval0健康检查间隔(毫秒),零或负值表示一次性检查。
    health_check_methodClickHouseHealthCheckMethod.SELECT_ONE健康检查方法。可以是以下之一
  • ClickHouseHealthCheckMethod.SELECT_ONE - 使用select 1查询进行检查
  • ClickHouseHealthCheckMethod.PING - 协议特定的检查,通常更快
  • node_check_interval0节点检查间隔(毫秒),负数被视为零。如果自上次检查以来已过指定时间,则会检查节点状态。
    health_check_intervalnode_check_interval之间的区别在于,health_check_interval选项安排后台作业,该作业检查节点列表(所有节点或故障节点)的状态,但node_check_interval指定自上次检查以来特定节点已过的时间
    check_all_nodesfalse是否对所有节点或仅对故障节点执行健康检查。

    故障转移和重试

    Java 客户端提供配置选项来设置失败查询的故障转移和重试行为

    属性默认值描述
    failover0请求可以发生故障转移的次数。零或负值表示不进行故障转移。故障转移将失败的请求发送到不同的节点(根据负载均衡策略)以从故障转移中恢复。
    retry0请求可以重试的次数。零或负值表示不进行重试。重试将请求发送到同一节点,并且仅当 ClickHouse 服务器返回NETWORK_ERROR错误代码时才会发送。
    repeat_on_session_locktrue会话锁定时是否重复执行,直到超时(根据session_timeoutconnect_timeout)。如果 ClickHouse 服务器返回SESSION_IS_LOCKED错误代码,则会重复失败的请求

    添加自定义 HTTP 标头

    如果我们想向请求添加自定义 HTTP 标头,Java 客户端支持 HTTP/S 传输层。我们应该使用custom_http_headers属性,并且标头需要用,分隔。标头键/值应该用=分隔

    Java 客户端支持

    options.put("custom_http_headers", "X-ClickHouse-Quota=test, X-ClickHouse-Test=test");

    JDBC 驱动程序

    properties.setProperty("custom_http_headers", "X-ClickHouse-Quota=test, X-ClickHouse-Test=test");