跳到主要内容
跳到主要内容

客户端 (0.7.x 及更早版本)

Java 客户端库,用于通过其协议与数据库服务器通信。当前实现仅支持 HTTP 接口。该库提供自己的 API 来向服务器发送请求。

弃用

此库即将被弃用。对于新项目,请使用最新的 Java 客户端

设置

<!-- https://mvnrepository.com/artifact/com.clickhouse/clickhouse-http-client -->
<dependency>
<groupId>com.clickhouse</groupId>
<artifactId>clickhouse-http-client</artifactId>
<version>0.7.2</version>
</dependency>

0.5.0 版本起,驱动程序使用了一个新的客户端 http 库,需要将其添加为依赖项。

<!-- https://mvnrepository.com/artifact/org.apache.httpcomponents.client5/httpclient5 -->
<dependency>
<groupId>org.apache.httpcomponents.client5</groupId>
<artifactId>httpclient5</artifactId>
<version>5.3.1</version>
</dependency>

初始化

连接 URL 格式:protocol://host[:port][/database][?param[=value][&param[=value]][#tag[,tag]],例如

连接到单个节点

ClickHouseNode server = ClickHouseNode.of("https://:8123/default?compress=0");

连接到具有多个节点的集群

ClickHouseNodes servers = ClickHouseNodes.of(
"jdbc:ch:http://server1.domain,server2.domain,server3.domain/my_db"
+ "?load_balancing_policy=random&health_check_interval=5000&failover=2");

查询 API

try (ClickHouseClient client = ClickHouseClient.newInstance(ClickHouseProtocol.HTTP);
ClickHouseResponse response = client.read(servers)
.format(ClickHouseFormat.RowBinaryWithNamesAndTypes)
.query("select * from numbers limit :limit")
.params(1000)
.executeAndWait()) {
ClickHouseResponseSummary summary = response.getSummary();
long totalRows = summary.getTotalRowsToRead();
}

流式查询 API

try (ClickHouseClient client = ClickHouseClient.newInstance(ClickHouseProtocol.HTTP);
ClickHouseResponse response = client.read(servers)
.format(ClickHouseFormat.RowBinaryWithNamesAndTypes)
.query("select * from numbers limit :limit")
.params(1000)
.executeAndWait()) {
for (ClickHouseRecord r : response.records()) {
int num = r.getValue(0).asInteger();
// type conversion
String str = r.getValue(0).asString();
LocalDate date = r.getValue(0).asDate();
}
}

请参阅 repo 中的完整代码示例

插入 API




try (ClickHouseClient client = ClickHouseClient.newInstance(ClickHouseProtocol.HTTP);
ClickHouseResponse response = client.read(servers).write()
.format(ClickHouseFormat.RowBinaryWithNamesAndTypes)
.query("insert into my_table select c2, c3 from input('c1 UInt8, c2 String, c3 Int32')")
.data(myInputStream) // `myInputStream` is source of data in RowBinary format
.executeAndWait()) {
ClickHouseResponseSummary summary = response.getSummary();
summary.getWrittenRows();
}

请参阅 repo 中的完整代码示例

RowBinary 编码

RowBinary 格式在其 页面 上进行了描述。

这里有一个 代码 示例。

功能

压缩

客户端默认使用 LZ4 压缩,这需要以下依赖项

<!-- https://mvnrepository.com/artifact/org.lz4/lz4-java -->
<dependency>
<groupId>org.lz4</groupId>
<artifactId>lz4-java</artifactId>
<version>1.8.0</version>
</dependency>

您可以选择使用 gzip,方法是在连接 URL 中设置 compress_algorithm=gzip

或者,您可以通过几种方式禁用压缩。

  1. 通过在连接 URL 中设置 compress=0 禁用:https://:8123/default?compress=0
  2. 通过客户端配置禁用
ClickHouseClient client = ClickHouseClient.builder()
.config(new ClickHouseConfig(Map.of(ClickHouseClientOption.COMPRESS, false)))
.nodeSelector(ClickHouseNodeSelector.of(ClickHouseProtocol.HTTP))
.build();

请参阅压缩文档以了解有关不同压缩选项的更多信息。

多个查询

在同一会话中,在一个工作线程中依次执行多个查询

CompletableFuture<List<ClickHouseResponseSummary>> future = ClickHouseClient.send(servers.apply(servers.getNodeSelector()),
"create database if not exists my_base",
"use my_base",
"create table if not exists test_table(s String) engine=Memory",
"insert into test_table values('1')('2')('3')",
"select * from test_table limit 1",
"truncate table test_table",
"drop table if exists test_table");
List<ClickHouseResponseSummary> results = future.get();

命名参数

您可以按名称传递参数,而不是仅仅依赖它们在参数列表中的位置。此功能可通过 params 函数使用。

try (ClickHouseClient client = ClickHouseClient.newInstance(ClickHouseProtocol.HTTP);
ClickHouseResponse response = client.read(servers)
.format(ClickHouseFormat.RowBinaryWithNamesAndTypes)
.query("select * from my_table where name=:name limit :limit")
.params("Ben", 1000)
.executeAndWait()) {
//...
}
}
参数

所有涉及 String 类型(StringString[]Map<String, String>)的 params 签名都假定传递的键是有效的 ClickHouse SQL 字符串。例如

try (ClickHouseClient client = ClickHouseClient.newInstance(ClickHouseProtocol.HTTP);
ClickHouseResponse response = client.read(servers)
.format(ClickHouseFormat.RowBinaryWithNamesAndTypes)
.query("select * from my_table where name=:name")
.params(Map.of("name","'Ben'"))
.executeAndWait()) {
//...
}
}

如果您不想手动将 String 对象解析为 ClickHouse SQL,可以使用位于 com.clickhouse.data 的辅助函数 ClickHouseValues.convertToSqlExpression

try (ClickHouseClient client = ClickHouseClient.newInstance(ClickHouseProtocol.HTTP);
ClickHouseResponse response = client.read(servers)
.format(ClickHouseFormat.RowBinaryWithNamesAndTypes)
.query("select * from my_table where name=:name")
.params(Map.of("name", ClickHouseValues.convertToSqlExpression("Ben's")))
.executeAndWait()) {
//...
}
}

在上面的示例中,ClickHouseValues.convertToSqlExpression 将转义内部单引号,并用有效的单引号包围变量。

其他类型,例如 IntegerUUIDArrayEnum 将在 params 内部自动转换。

节点发现

Java 客户端提供自动发现 ClickHouse 节点的能力。默认情况下,自动发现处于禁用状态。要手动启用它,请将 auto_discovery 设置为 true

properties.setProperty("auto_discovery", "true");

或在连接 URL 中

jdbc:ch://my-server/system?auto_discovery=true

如果启用了自动发现,则无需在连接 URL 中指定所有 ClickHouse 节点。URL 中指定的节点将被视为种子节点,Java 客户端将自动从系统表和/或 clickhouse-keeper 或 zookeeper 中发现更多节点。

以下选项负责自动发现配置

属性默认值描述
auto_discoveryfalse客户端是否应从系统表和/或 clickhouse-keeper/zookeeper 中发现更多节点。
node_discovery_interval0节点发现间隔,以毫秒为单位,零值或负值表示一次性发现。
node_discovery_limit100一次可以发现的最大节点数;零值或负值表示无限制。

负载均衡

Java 客户端根据负载均衡策略选择一个 ClickHouse 节点来发送请求。通常,负载均衡策略负责以下事项

  1. 从托管节点列表中获取一个节点。
  2. 管理节点的状态。
  3. 可选地,调度后台进程进行节点发现(如果启用了自动发现)并运行健康检查。

以下是配置负载均衡的选项列表

属性默认值描述
load_balancing_policy""负载均衡策略可以是以下之一
  • firstAlive - 请求发送到托管节点列表中的第一个健康节点
  • random - 请求发送到托管节点列表中的一个随机节点
  • roundRobin - 请求依次发送到托管节点列表中的每个节点。
  • 实现 ClickHouseLoadBalancingPolicy 的完整类名 - 自定义负载均衡策略
  • 如果未指定,则请求将发送到托管节点列表中的第一个节点
    load_balancing_tags""用于过滤节点的负载均衡标签。请求仅发送到具有指定标签的节点
    health_check_interval0健康检查间隔,以毫秒为单位,零值或负值表示一次性。
    health_check_methodClickHouseHealthCheckMethod.SELECT_ONE健康检查方法。可以是以下之一
  • ClickHouseHealthCheckMethod.SELECT_ONE - 使用 select 1 查询进行检查
  • ClickHouseHealthCheckMethod.PING - 协议特定的检查,通常更快
  • node_check_interval0节点检查间隔,以毫秒为单位,负数被视为零。如果自上次检查以来经过了指定的时间量,则检查节点状态。
    health_check_intervalnode_check_interval 之间的区别在于,health_check_interval 选项调度后台作业,该作业检查节点列表(全部或故障节点)的状态,而 node_check_interval 指定自上次检查特定节点以来经过的时间量
    check_all_nodesfalse是否对所有节点或仅对故障节点执行健康检查。

    故障转移和重试

    Java 客户端提供配置选项,用于为失败的查询设置故障转移和重试行为

    属性默认值描述
    failover0请求可以发生故障转移的最大次数。零值或负值表示不进行故障转移。故障转移将失败的请求发送到不同的节点(根据负载均衡策略),以便从故障中恢复。
    retry0请求可以发生重试的最大次数。零值或负值表示不进行重试。重试将请求发送到同一节点,并且仅当 ClickHouse 服务器返回 NETWORK_ERROR 错误代码时才进行重试
    repeat_on_session_locktrue当会话被锁定时,是否重复执行直到超时(根据 session_timeoutconnect_timeout)。如果 ClickHouse 服务器返回 SESSION_IS_LOCKED 错误代码,则会重复失败的请求

    添加自定义 http 标头

    如果我们要向请求添加自定义 HTTP 标头,Java 客户端支持 HTTP/S 传输层。我们应该使用 custom_http_headers 属性,标头需要用 , 分隔。标头键/值应使用 = 分隔

    Java 客户端支持

    options.put("custom_http_headers", "X-ClickHouse-Quota=test, X-ClickHouse-Test=test");

    JDBC 驱动程序

    properties.setProperty("custom_http_headers", "X-ClickHouse-Quota=test, X-ClickHouse-Test=test");