跳至主要内容

ClickHouse Rust 客户端

连接到 ClickHouse 的官方 Rust 客户端,最初由 Paul Loyd 开发。客户端源代码可在 GitHub 仓库 中找到。

概述

  • 使用 serde 进行行编码/解码。
  • 支持 serde 属性:skip_serializingskip_deserializingrename
  • 通过 HTTP 传输使用 RowBinary 格式。
    • 计划切换到通过 TCP 使用 Native 格式。
  • 支持 TLS(通过 native-tlsrustls-tls 特性)。
  • 支持压缩和解压缩 (LZ4)。
  • 提供用于选择或插入数据、执行 DDL 和客户端批量处理的 API。
  • 提供方便的模拟对象用于单元测试。

安装

要使用该板条箱,请在您的 Cargo.toml 中添加以下内容

[dependencies]
clickhouse = "0.12.2"

[dev-dependencies]
clickhouse = { version = "0.12.2", features = ["test-util"] }

另请参阅:crates.io 页面

Cargo 特性

  • lz4(默认启用)— 启用 Compression::Lz4Compression::Lz4Hc(_) 变体。如果启用,则默认情况下对所有查询使用 Compression::Lz4,除了 WATCH 查询。
  • native-tls — 通过 hyper-tls 支持使用 HTTPS 协议的 URL,它链接到 OpenSSL。
  • rustls-tls — 通过 hyper-rustls 支持使用 HTTPS 协议的 URL,它不链接到 OpenSSL。
  • inserter — 启用 client.inserter()
  • test-util — 添加模拟对象。请参阅 示例。仅在 dev-dependencies 中使用它。
  • watch — 启用 client.watch 功能。有关详细信息,请参阅相应的章节。
  • uuid — 添加 serde::uuid 以与 uuid 板条箱一起使用。
  • time — 添加 serde::time 以与 time 板条箱一起使用。
信息

通过 HTTPS URL 连接到 ClickHouse 时,应启用 native-tlsrustls-tls 特性。如果两者都启用,则 rustls-tls 特性将优先。

ClickHouse 版本兼容性

该客户端与 ClickHouse 的 LTS 或更新版本以及 ClickHouse Cloud 兼容。

v22.6 之前的 ClickHouse 服务器在某些罕见情况下会 错误地处理 RowBinary。您可以使用 v0.11+ 并启用 wa-37420 特性来解决此问题。注意:此特性不应与更新的 ClickHouse 版本一起使用。

示例

我们的目标是通过客户端存储库中的 示例 涵盖客户端使用的各种场景。概述可在 示例自述文件 中找到。

如果示例或以下文档中存在不清楚或缺失的内容,请随时 联系我们

用法

注意

ch2rs 板条箱可用于根据 ClickHouse 生成行类型。

创建客户端实例

提示

重用创建的客户端或克隆它们以重用底层的 hyper 连接池。

use clickhouse::Client;

let client = Client::default()
// should include both protocol and port
.with_url("https://127.0.0.1:8123")
.with_user("name")
.with_password("123")
.with_database("test");

HTTPS 或 ClickHouse Cloud 连接

HTTPS 可与 rustls-tlsnative-tls Cargo 特性一起使用。

然后,像往常一样创建客户端。在此示例中,环境变量用于存储连接详细信息

信息

URL 应包含协议和端口,例如 https://instance.clickhouse.cloud:8443

fn read_env_var(key: &str) -> String {
env::var(key).unwrap_or_else(|_| panic!("{key} env variable should be set"))
}

let client = Client::default()
.with_url(read_env_var("CLICKHOUSE_URL"))
.with_user(read_env_var("CLICKHOUSE_USER"))
.with_password(read_env_var("CLICKHOUSE_PASSWORD"));

另请参阅

选择行

use serde::Deserialize;
use clickhouse::Row;
use clickhouse::sql::Identifier;

#[derive(Row, Deserialize)]
struct MyRow<'a> {
no: u32,
name: &'a str,
}

let table_name = "some";
let mut cursor = client
.query("SELECT ?fields FROM ? WHERE no BETWEEN ? AND ?")
.bind(Identifier(table_name))
.bind(500)
.bind(504)
.fetch::<MyRow<'_>>()?;

while let Some(row) = cursor.next().await? { .. }
  • 占位符 ?fields 将替换为 no, nameRow 的字段)。
  • 占位符 ? 将替换为后续 bind() 调用中的值。
  • 可以使用方便的 fetch_one::<Row>()fetch_all::<Row>() 方法分别获取第一行或所有行。
  • sql::Identifier 可用于绑定表名。

注意:由于整个响应是流式传输的,因此即使在生成某些行后,游标也可能返回错误。如果在您的用例中发生这种情况,您可以尝试 query(...).with_option("wait_end_of_query", "1") 以在服务器端启用响应缓冲。 更多详细信息buffer_size 选项也很有用。

危险

在选择行时谨慎使用 wait_end_of_query,因为它会导致服务器端更高的内存消耗,并且可能会降低整体性能。

插入行

use serde::Serialize;
use clickhouse::Row;

#[derive(Row, Serialize)]
struct MyRow {
no: u32,
name: String,
}

let mut insert = client.insert("some")?;
insert.write(&MyRow { no: 0, name: "foo".into() }).await?;
insert.write(&MyRow { no: 1, name: "bar".into() }).await?;
insert.end().await?;
  • 如果没有调用 end(),则 INSERT 将被中止。
  • 行将作为流逐步发送以分散网络负载。
  • 只有当所有行都适合同一个分区并且它们的数目小于 max_insert_block_size 时,ClickHouse 才会原子地插入批次。

异步插入(服务器端批量处理)

您可以使用 ClickHouse 异步插入 来避免传入数据的客户端批量处理。这可以通过简单地为 insert 方法(甚至为 Client 实例本身)提供 async_insert 选项来完成,以便它将影响所有 insert 调用。

let client = Client::default()
.with_url("https://127.0.0.1:8123")
.with_option("async_insert", "1")
.with_option("wait_for_async_insert", "0");

另请参阅

插入程序特性(客户端批量处理)

需要 inserter Cargo 特性。

let mut inserter = client.inserter("some")?
.with_timeouts(Some(Duration::from_secs(5)), Some(Duration::from_secs(20)))
.with_max_bytes(50_000_000)
.with_max_rows(750_000)
.with_period(Some(Duration::from_secs(15)));

inserter.write(&MyRow { no: 0, name: "foo".into() })?;
inserter.write(&MyRow { no: 1, name: "bar".into() })?;
let stats = inserter.commit().await?;
if stats.rows > 0 {
println!(
"{} bytes, {} rows, {} transactions have been inserted",
stats.bytes, stats.rows, stats.transactions,
);
}

// don't forget to finalize the inserter during the application shutdown
// and commit the remaining rows. `.end()` will provide stats as well.
inserter.end().await?;
  • 如果达到任何阈值(max_bytesmax_rowsperiod),则 Inserter 会在 commit() 中结束活动插入。
  • 可以通过使用 with_period_bias 偏差结束活动 INSERT 之间的时间间隔,以避免并行插入程序导致的负载峰值。
  • Inserter::time_left() 可用于检测当前周期何时结束。如果您的流很少发出项目,请再次调用 Inserter::commit() 以检查限制。
  • 时间阈值通过使用 quanta 板条箱来实现,以加快插入程序的速度。如果启用了 test-util,则不会使用它(因此,时间可以在自定义测试中由 tokio::time::advance() 管理)。
  • commit() 调用之间的所有行都将插入到同一个 INSERT 语句中。
危险

如果您想终止/完成插入,请不要忘记刷新

inserter.end().await?;

执行 DDL

对于单节点部署,只需像这样执行 DDL 即可

client.query("DROP TABLE IF EXISTS some").execute().await?;

然而,在使用负载均衡器或 ClickHouse Cloud 的集群部署中,建议使用wait_end_of_query选项等待 DDL 在所有副本上应用完成。可以这样操作

client
.query("DROP TABLE IF EXISTS some")
.with_option("wait_end_of_query", "1")
.execute()
.await?;

ClickHouse 设置

您可以使用with_option方法应用各种ClickHouse 设置。例如

let numbers = client
.query("SELECT number FROM system.numbers")
// This setting will be applied to this particular query only;
// it will override the global client setting.
.with_option("limit", "3")
.fetch_all::<u64>()
.await?;

除了query之外,它与insertinserter方法的工作方式类似;此外,可以在Client实例上调用相同的方法来为所有查询设置全局设置。

查询 ID

使用.with_option,您可以设置query_id选项以在 ClickHouse 查询日志中识别查询。

let numbers = client
.query("SELECT number FROM system.numbers LIMIT 1")
.with_option("query_id", "some-query-id")
.fetch_all::<u64>()
.await?;

除了query之外,它与insertinserter方法的工作方式类似。

危险

如果您手动设置query_id,请确保它是唯一的。UUID 是一个不错的选择。

另请参阅:客户端存储库中的query_id 示例

会话 ID

query_id类似,您可以设置session_id以在同一会话中执行语句。session_id可以在客户端级别全局设置,也可以在每个queryinsertinserter调用中设置。

let client = Client::default()
.with_url("https://127.0.0.1:8123")
.with_option("session_id", "my-session");
危险

对于集群部署,由于缺少“粘性会话”,您需要连接到特定的集群节点才能正确利用此功能,因为例如,轮询负载均衡器不能保证后续请求将由同一个 ClickHouse 节点处理。

另请参阅:客户端存储库中的session_id 示例

自定义 HTTP 标头

如果您正在使用代理身份验证或需要传递自定义标头,您可以这样做

let client = Client::default()
.with_url("https://127.0.0.1:8123")
.with_header("X-My-Header", "hello");

另请参阅:客户端存储库中的自定义 HTTP 标头示例

自定义 HTTP 客户端

这对于调整底层 HTTP 连接池设置可能很有用。

use hyper_util::client::legacy::connect::HttpConnector;
use hyper_util::client::legacy::Client as HyperClient;
use hyper_util::rt::TokioExecutor;

let connector = HttpConnector::new(); // or HttpsConnectorBuilder
let hyper_client = HyperClient::builder(TokioExecutor::new())
// For how long keep a particular idle socket alive on the client side (in milliseconds).
// It is supposed to be a fair bit less that the ClickHouse server KeepAlive timeout,
// which was by default 3 seconds for pre-23.11 versions, and 10 seconds after that.
.pool_idle_timeout(Duration::from_millis(2_500))
// Sets the maximum idle Keep-Alive connections allowed in the pool.
.pool_max_idle_per_host(4)
.build(connector);

let client = Client::with_http_client(hyper_client).with_url("https://127.0.0.1:8123");
危险

此示例依赖于旧版 Hyper API,并且将来可能会更改。

另请参阅:客户端存储库中的自定义 HTTP 客户端示例

数据类型

  • (U)Int(8|16|32|64|128)映射到/来自相应的(u|i)(8|16|32|64|128)类型或围绕它们的 newtypes。
  • (U)Int256 不直接支持,但有解决方法
  • Float(32|64)映射到/来自相应的f(32|64)或围绕它们的 newtypes。
  • Decimal(32|64|128)映射到/来自相应的i(32|64|128)或围绕它们的 newtypes。使用fixnum或其他有符号定点数字的实现更方便。
  • Boolean映射到/来自bool或围绕它的 newtypes。
  • String映射到/来自任何字符串或字节类型,例如&str&[u8]StringVec<u8>SmartString。也支持 Newtypes。要存储字节,请考虑使用serde_bytes,因为它更高效。
#[derive(Row, Debug, Serialize, Deserialize)]
struct MyRow<'a> {
str: &'a str,
string: String,
#[serde(with = "serde_bytes")]
bytes: Vec<u8>,
#[serde(with = "serde_bytes")]
byte_slice: &'a [u8],
}
  • FixedString(N)作为字节数组支持,例如[u8; N]
#[derive(Row, Debug, Serialize, Deserialize)]
struct MyRow {
fixed_str: [u8; 16], // FixedString(16)
}
use serde_repr::{Deserialize_repr, Serialize_repr};

#[derive(Row, Serialize, Deserialize)]
struct MyRow {
level: Level,
}

#[derive(Debug, Serialize_repr, Deserialize_repr)]
#[repr(u8)]
enum Level {
Debug = 1,
Info = 2,
Warn = 3,
Error = 4,
}
  • UUID通过使用serde::uuid映射到/来自uuid::Uuid。需要uuid功能。
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
#[serde(with = "clickhouse::serde::uuid")]
uuid: uuid::Uuid,
}
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
#[serde(with = "clickhouse::serde::ipv4")]
ipv4: std::net::Ipv4Addr,
}
  • Date映射到/来自u16或围绕它的 newtype,并表示自1970-01-01以来经过的天数。此外,通过使用serde::time::date支持time::Date,这需要time功能。
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
days: u16,
#[serde(with = "clickhouse::serde::time::date")]
date: Date,
}
  • Date32映射到/来自i32或围绕它的 newtype,并表示自1970-01-01以来经过的天数。此外,通过使用serde::time::date32支持time::Date,这需要time功能。
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
days: i32,
#[serde(with = "clickhouse::serde::time::date32")]
date: Date,
}
  • DateTime映射到/来自u32或围绕它的 newtype,并表示自 Unix 纪元以来经过的秒数。此外,通过使用serde::time::datetime支持time::OffsetDateTime,这需要time功能。
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
ts: u32,
#[serde(with = "clickhouse::serde::time::datetime")]
dt: OffsetDateTime,
}
  • DateTime64(_)映射到/来自i32或围绕它的 newtype,并表示自 Unix 纪元以来经过的时间。此外,通过使用serde::time::datetime64::*支持time::OffsetDateTime,这需要time功能。
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
ts: i64, // elapsed s/us/ms/ns depending on `DateTime64(X)`
#[serde(with = "clickhouse::serde::time::datetime64::secs")]
dt64s: OffsetDateTime, // `DateTime64(0)`
#[serde(with = "clickhouse::serde::time::datetime64::millis")]
dt64ms: OffsetDateTime, // `DateTime64(3)`
#[serde(with = "clickhouse::serde::time::datetime64::micros")]
dt64us: OffsetDateTime, // `DateTime64(6)`
#[serde(with = "clickhouse::serde::time::datetime64::nanos")]
dt64ns: OffsetDateTime, // `DateTime64(9)`
}
  • Tuple(A, B, ...)映射到/来自(A, B, ...)或围绕它的 newtype。
  • Array(_)映射到/来自任何切片,例如Vec<_>&[_]。也支持 Newtypes。
  • Map(K, V)的行为类似于Array((K, V))
  • LowCardinality(_)无缝支持。
  • Nullable(_)映射到/来自Option<_>。对于clickhouse::serde::*助手,添加::option
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
#[serde(with = "clickhouse::serde::ipv4::option")]
ipv4_opt: Option<Ipv4Addr>,
}
  • Nested通过提供多个带有重命名的数组来支持。
// CREATE TABLE test(items Nested(name String, count UInt32))
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
#[serde(rename = "items.name")]
items_name: Vec<String>,
#[serde(rename = "items.count")]
items_count: Vec<u32>,
}
  • 支持Geo类型。Point的行为类似于元组(f64, f64),其余类型只是点的切片。
type Point = (f64, f64);
type Ring = Vec<Point>;
type Polygon = Vec<Ring>;
type MultiPolygon = Vec<Polygon>;
type LineString = Vec<Point>;
type MultiLineString = Vec<LineString>;

#[derive(Row, Serialize, Deserialize)]
struct MyRow {
point: Point,
ring: Ring,
polygon: Polygon,
multi_polygon: MultiPolygon,
line_string: LineString,
multi_line_string: MultiLineString,
}
  • VariantDynamic、(新的)JSON数据类型尚不支持。

模拟

该板条箱提供了用于模拟 CH 服务器和测试 DDL、SELECTINSERTWATCH查询的实用程序。可以使用test-util功能启用此功能。仅将其用作开发依赖项。

请参阅示例

故障排除

CANNOT_READ_ALL_DATA

CANNOT_READ_ALL_DATA错误的最常见原因是应用程序端的行定义与 ClickHouse 中的行定义不匹配。

考虑以下表格

CREATE OR REPLACE TABLE event_log (id UInt32)
ENGINE = MergeTree
ORDER BY timestamp

然后,如果应用程序端定义了EventLog且类型不匹配,例如

#[derive(Debug, Serialize, Deserialize, Row)]
struct EventLog {
id: String, // <- should be u32 instead!
}

插入数据时,可能会发生以下错误

Error: BadResponse("Code: 33. DB::Exception: Cannot read all data. Bytes read: 5. Bytes expected: 23.: (at row 1)\n: While executing BinaryRowInputFormat. (CANNOT_READ_ALL_DATA)")

在此示例中,可以通过正确定义EventLog结构来解决此问题

#[derive(Debug, Serialize, Deserialize, Row)]
struct EventLog {
id: u32
}

已知限制

  • VariantDynamic、(新的)JSON数据类型尚不支持。
  • 服务器端参数绑定尚不支持;请参阅此问题以进行跟踪。

联系我们

如果您有任何疑问或需要帮助,请随时在社区 Slack或通过GitHub 问题与我们联系。