ClickHouse Rust 客户端
用于连接 ClickHouse 的官方 Rust 客户端,最初由 Paul Loyd 开发。客户端源代码可在 GitHub 仓库 中找到。
概述
- 使用
serde
进行行编码/解码。 - 支持
serde
属性:skip_serializing
、skip_deserializing
、rename
。 - 通过 HTTP 传输使用
RowBinary
格式。- 计划切换到通过 TCP 的
Native
格式。
- 计划切换到通过 TCP 的
- 支持 TLS(通过
native-tls
和rustls-tls
功能)。 - 支持压缩和解压缩 (LZ4)。
- 提供用于选择或插入数据、执行 DDL 以及客户端批处理的 API。
- 为单元测试提供方便的 mocks。
安装
要使用该 crate,请将以下内容添加到您的 Cargo.toml
[dependencies]
clickhouse = "0.12.2"
[dev-dependencies]
clickhouse = { version = "0.12.2", features = ["test-util"] }
另请参阅:crates.io 页面。
Cargo 功能
lz4
(默认启用)— 启用Compression::Lz4
和Compression::Lz4Hc(_)
变体。如果启用,则默认情况下Compression::Lz4
用于除WATCH
之外的所有查询。native-tls
— 通过hyper-tls
支持具有HTTPS
模式的 URL,它链接到 OpenSSL。rustls-tls
— 通过hyper-rustls
支持具有HTTPS
模式的 URL,它不链接到 OpenSSL。inserter
— 启用client.inserter()
。test-util
— 添加 mocks。请参阅示例。仅在dev-dependencies
中使用它。watch
— 启用client.watch
功能。请参阅相应的部分以了解详细信息。uuid
— 添加serde::uuid
以使用 uuid crate。time
— 添加serde::time
以使用 time crate。
当通过 HTTPS
url 连接到 ClickHouse 时,应启用 native-tls
或 rustls-tls
功能。如果两者都启用,则 rustls-tls
功能将优先。
ClickHouse 版本兼容性
该客户端与 LTS 或更新版本的 ClickHouse 以及 ClickHouse Cloud 兼容。
低于 v22.6 的 ClickHouse 服务器在某些罕见情况下会错误地处理 RowBinary。您可以使用 v0.11+ 并启用 wa-37420
功能来解决此问题。注意:此功能不应与较新的 ClickHouse 版本一起使用。
示例
我们的目标是通过客户端仓库中的示例来涵盖客户端使用的各种场景。概述可在示例 README 中找到。
如果示例或以下文档中有什么不清楚或遗漏的内容,请随时联系我们。
用法
ch2rs crate 可用于从 ClickHouse 生成行类型。
创建客户端实例
重用已创建的客户端或克隆它们,以便重用底层 hyper 连接池。
use clickhouse::Client;
let client = Client::default()
// should include both protocol and port
.with_url("https://127.0.0.1:8123")
.with_user("name")
.with_password("123")
.with_database("test");
HTTPS 或 ClickHouse Cloud 连接
HTTPS 适用于 rustls-tls
或 native-tls
cargo 功能。
然后,像往常一样创建客户端。在此示例中,环境变量用于存储连接详细信息
URL 应同时包含协议和端口,例如 https://instance.clickhouse.cloud:8443
。
fn read_env_var(key: &str) -> String {
env::var(key).unwrap_or_else(|_| panic!("{key} env variable should be set"))
}
let client = Client::default()
.with_url(read_env_var("CLICKHOUSE_URL"))
.with_user(read_env_var("CLICKHOUSE_USER"))
.with_password(read_env_var("CLICKHOUSE_PASSWORD"));
另请参阅
- 客户端仓库中的HTTPS 与 ClickHouse Cloud 示例。这也应适用于本地 HTTPS 连接。
选择行
use serde::Deserialize;
use clickhouse::Row;
use clickhouse::sql::Identifier;
#[derive(Row, Deserialize)]
struct MyRow<'a> {
no: u32,
name: &'a str,
}
let table_name = "some";
let mut cursor = client
.query("SELECT ?fields FROM ? WHERE no BETWEEN ? AND ?")
.bind(Identifier(table_name))
.bind(500)
.bind(504)
.fetch::<MyRow<'_>>()?;
while let Some(row) = cursor.next().await? { .. }
- 占位符
?fields
被no, name
(Row
的字段)替换。 - 占位符
?
被以下bind()
调用中的值替换。 - 方便的
fetch_one::<Row>()
和fetch_all::<Row>()
方法可用于分别获取第一行或所有行。 sql::Identifier
可用于绑定表名。
注意:由于整个响应是流式传输的,因此即使在生成某些行之后,cursors 也可能返回错误。如果您的用例中发生这种情况,您可以尝试使用 query(...).with_option("wait_end_of_query", "1")
以在服务器端启用响应缓冲。更多详细信息。 buffer_size
选项也可能很有用。
选择行时请谨慎使用 wait_end_of_query
,因为它会导致服务器端更高的内存消耗,并可能降低整体性能。
插入行
use serde::Serialize;
use clickhouse::Row;
#[derive(Row, Serialize)]
struct MyRow {
no: u32,
name: String,
}
let mut insert = client.insert("some")?;
insert.write(&MyRow { no: 0, name: "foo".into() }).await?;
insert.write(&MyRow { no: 1, name: "bar".into() }).await?;
insert.end().await?;
- 如果未调用
end()
,则INSERT
将中止。 - 行作为流逐渐发送,以分散网络负载。
- 仅当所有行都适合同一分区且其数量小于
max_insert_block_size
时,ClickHouse 才会原子地插入批次。
异步插入(服务器端批处理)
您可以使用 ClickHouse 异步插入 来避免客户端批处理传入数据。这可以通过简单地将 async_insert
选项提供给 insert
方法(甚至提供给 Client
实例本身,以便它将影响所有 insert
调用)来完成。
let client = Client::default()
.with_url("https://127.0.0.1:8123")
.with_option("async_insert", "1")
.with_option("wait_for_async_insert", "0");
另请参阅
- 客户端仓库中的异步插入示例。
Inserter 功能(客户端批处理)
需要 inserter
cargo 功能。
let mut inserter = client.inserter("some")?
.with_timeouts(Some(Duration::from_secs(5)), Some(Duration::from_secs(20)))
.with_max_bytes(50_000_000)
.with_max_rows(750_000)
.with_period(Some(Duration::from_secs(15)));
inserter.write(&MyRow { no: 0, name: "foo".into() })?;
inserter.write(&MyRow { no: 1, name: "bar".into() })?;
let stats = inserter.commit().await?;
if stats.rows > 0 {
println!(
"{} bytes, {} rows, {} transactions have been inserted",
stats.bytes, stats.rows, stats.transactions,
);
}
// don't forget to finalize the inserter during the application shutdown
// and commit the remaining rows. `.end()` will provide stats as well.
inserter.end().await?;
- 如果达到任何阈值(
max_bytes
、max_rows
、period
),Inserter
将在commit()
中结束活动插入。 - 可以通过使用
with_period_bias
偏置结束活动INSERT
之间的间隔,以避免并行 inserter 引起的负载峰值。 Inserter::time_left()
可用于检测当前周期何时结束。如果您的流很少发出项目,请再次调用Inserter::commit()
以检查限制。- 通过使用 quanta crate 来加速
inserter
的时间阈值实现。如果启用了test-util
,则不使用(因此,时间可以由自定义测试中的tokio::time::advance()
管理)。 commit()
调用之间的所有行都插入到同一个INSERT
语句中。
如果您想终止/完成插入,请不要忘记刷新
inserter.end().await?;
执行 DDL
对于单节点部署,像这样执行 DDL 就足够了
client.query("DROP TABLE IF EXISTS some").execute().await?;
但是,在具有负载均衡器或 ClickHouse Cloud 的集群部署中,建议等待 DDL 应用于所有副本,使用 wait_end_of_query
选项。可以像这样完成
client
.query("DROP TABLE IF EXISTS some")
.with_option("wait_end_of_query", "1")
.execute()
.await?;
ClickHouse 设置
您可以使用 with_option
方法应用各种 ClickHouse 设置。例如
let numbers = client
.query("SELECT number FROM system.numbers")
// This setting will be applied to this particular query only;
// it will override the global client setting.
.with_option("limit", "3")
.fetch_all::<u64>()
.await?;
除了 query
之外,它与 insert
和 inserter
方法类似地工作;此外,可以在 Client
实例上调用相同的方法来为所有查询设置全局设置。
查询 ID
使用 .with_option
,您可以设置 query_id
选项以在 ClickHouse 查询日志中标识查询。
let numbers = client
.query("SELECT number FROM system.numbers LIMIT 1")
.with_option("query_id", "some-query-id")
.fetch_all::<u64>()
.await?;
除了 query
之外,它与 insert
和 inserter
方法类似地工作。
如果您手动设置 query_id
,请确保它是唯一的。 UUID 是一个不错的选择。
另请参阅:客户端仓库中的query_id 示例。
会话 ID
与 query_id
类似,您可以设置 session_id
以在同一会话中执行语句。 session_id
可以在客户端级别全局设置,也可以在每个 query
、insert
或 inserter
调用中设置。
let client = Client::default()
.with_url("https://127.0.0.1:8123")
.with_option("session_id", "my-session");
对于集群部署,由于缺少“粘性会话”,您需要连接到特定的集群节点才能正确使用此功能,例如,轮询负载均衡器将不保证后续请求将由同一 ClickHouse 节点处理。
另请参阅:客户端仓库中的session_id 示例。
自定义 HTTP 标头
如果您使用代理身份验证或需要传递自定义标头,您可以这样做
let client = Client::default()
.with_url("https://127.0.0.1:8123")
.with_header("X-My-Header", "hello");
另请参阅:客户端仓库中的自定义 HTTP 标头示例。
自定义 HTTP 客户端
这对于调整底层 HTTP 连接池设置可能很有用。
use hyper_util::client::legacy::connect::HttpConnector;
use hyper_util::client::legacy::Client as HyperClient;
use hyper_util::rt::TokioExecutor;
let connector = HttpConnector::new(); // or HttpsConnectorBuilder
let hyper_client = HyperClient::builder(TokioExecutor::new())
// For how long keep a particular idle socket alive on the client side (in milliseconds).
// It is supposed to be a fair bit less that the ClickHouse server KeepAlive timeout,
// which was by default 3 seconds for pre-23.11 versions, and 10 seconds after that.
.pool_idle_timeout(Duration::from_millis(2_500))
// Sets the maximum idle Keep-Alive connections allowed in the pool.
.pool_max_idle_per_host(4)
.build(connector);
let client = Client::with_http_client(hyper_client).with_url("https://127.0.0.1:8123");
此示例依赖于旧版 Hyper API,并且将来可能会发生更改。
另请参阅:客户端仓库中的自定义 HTTP 客户端示例。
数据类型
(U)Int(8|16|32|64|128)
映射到/从相应的(u|i)(8|16|32|64|128)
类型或围绕它们的新类型。(U)Int256
不直接支持,但有一个解决方法。Float(32|64)
映射到/从相应的f(32|64)
或围绕它们的新类型。Decimal(32|64|128)
映射到/从相应的i(32|64|128)
或围绕它们的新类型。使用fixnum
或其他有符号定点数的实现会更方便。Boolean
映射到/从bool
或围绕它的新类型。String
映射到/从任何字符串或字节类型,例如&str
、&[u8]
、String
、Vec<u8>
或SmartString
。也支持新类型。要存储字节,请考虑使用serde_bytes
,因为它更高效。
#[derive(Row, Debug, Serialize, Deserialize)]
struct MyRow<'a> {
str: &'a str,
string: String,
#[serde(with = "serde_bytes")]
bytes: Vec<u8>,
#[serde(with = "serde_bytes")]
byte_slice: &'a [u8],
}
FixedString(N)
作为字节数组支持,例如[u8; N]
。
#[derive(Row, Debug, Serialize, Deserialize)]
struct MyRow {
fixed_str: [u8; 16], // FixedString(16)
}
Enum(8|16)
使用serde_repr
支持。
use serde_repr::{Deserialize_repr, Serialize_repr};
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
level: Level,
}
#[derive(Debug, Serialize_repr, Deserialize_repr)]
#[repr(u8)]
enum Level {
Debug = 1,
Info = 2,
Warn = 3,
Error = 4,
}
UUID
通过使用serde::uuid
映射到/从uuid::Uuid
。需要uuid
功能。
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
#[serde(with = "clickhouse::serde::uuid")]
uuid: uuid::Uuid,
}
IPv6
映射到/从std::net::Ipv6Addr
。IPv4
通过使用serde::ipv4
映射到/从std::net::Ipv4Addr
。
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
#[serde(with = "clickhouse::serde::ipv4")]
ipv4: std::net::Ipv4Addr,
}
Date
映射到/从u16
或围绕它的新类型,并表示自1970-01-01
以来经过的天数。此外,time::Date
通过使用serde::time::date
支持,这需要time
功能。
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
days: u16,
#[serde(with = "clickhouse::serde::time::date")]
date: Date,
}
Date32
映射到/从i32
或围绕它的新类型,并表示自1970-01-01
以来经过的天数。此外,time::Date
通过使用serde::time::date32
支持,这需要time
功能。
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
days: i32,
#[serde(with = "clickhouse::serde::time::date32")]
date: Date,
}
DateTime
映射到/从u32
或围绕它的新类型,并表示自 UNIX 纪元以来经过的秒数。此外,time::OffsetDateTime
通过使用serde::time::datetime
支持,这需要time
功能。
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
ts: u32,
#[serde(with = "clickhouse::serde::time::datetime")]
dt: OffsetDateTime,
}
DateTime64(_)
映射到/从i32
或围绕它的新类型,并表示自 UNIX 纪元以来经过的时间。此外,time::OffsetDateTime
通过使用serde::time::datetime64::*
支持,这需要time
功能。
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
ts: i64, // elapsed s/us/ms/ns depending on `DateTime64(X)`
#[serde(with = "clickhouse::serde::time::datetime64::secs")]
dt64s: OffsetDateTime, // `DateTime64(0)`
#[serde(with = "clickhouse::serde::time::datetime64::millis")]
dt64ms: OffsetDateTime, // `DateTime64(3)`
#[serde(with = "clickhouse::serde::time::datetime64::micros")]
dt64us: OffsetDateTime, // `DateTime64(6)`
#[serde(with = "clickhouse::serde::time::datetime64::nanos")]
dt64ns: OffsetDateTime, // `DateTime64(9)`
}
Tuple(A, B, ...)
映射到/从(A, B, ...)
或围绕它的新类型。Array(_)
映射到/从任何切片,例如Vec<_>
、&[_]
。也支持新类型。Map(K, V)
的行为类似于Array((K, V))
。- 无缝支持
LowCardinality(_)
。 Nullable(_)
映射到/从Option<_>
。对于clickhouse::serde::*
助手,添加::option
。
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
#[serde(with = "clickhouse::serde::ipv4::option")]
ipv4_opt: Option<Ipv4Addr>,
}
- 通过提供具有重命名的多个数组来支持
Nested
。
// CREATE TABLE test(items Nested(name String, count UInt32))
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
#[serde(rename = "items.name")]
items_name: Vec<String>,
#[serde(rename = "items.count")]
items_count: Vec<u32>,
}
- 支持
Geo
类型。Point
的行为类似于元组(f64, f64)
,其余类型只是点的切片。
type Point = (f64, f64);
type Ring = Vec<Point>;
type Polygon = Vec<Ring>;
type MultiPolygon = Vec<Polygon>;
type LineString = Vec<Point>;
type MultiLineString = Vec<LineString>;
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
point: Point,
ring: Ring,
polygon: Polygon,
multi_polygon: MultiPolygon,
line_string: LineString,
multi_line_string: MultiLineString,
}
- 尚不支持
Variant
、Dynamic
、(新的)JSON
数据类型。
Mocking
该 crate 提供了用于 mocking CH 服务器和测试 DDL、SELECT
、INSERT
和 WATCH
查询的实用程序。该功能可以通过 test-util
功能启用。仅将其用作 dev-dependency。
请参阅示例。
故障排除
CANNOT_READ_ALL_DATA
CANNOT_READ_ALL_DATA
错误最常见的原因是应用程序端的行定义与 ClickHouse 中的行定义不匹配。
考虑以下表
CREATE OR REPLACE TABLE event_log (id UInt32)
ENGINE = MergeTree
ORDER BY timestamp
然后,如果在应用程序端定义 EventLog
时类型不匹配,例如
#[derive(Debug, Serialize, Deserialize, Row)]
struct EventLog {
id: String, // <- should be u32 instead!
}
插入数据时,可能会发生以下错误
Error: BadResponse("Code: 33. DB::Exception: Cannot read all data. Bytes read: 5. Bytes expected: 23.: (at row 1)\n: While executing BinaryRowInputFormat. (CANNOT_READ_ALL_DATA)")
在此示例中,通过 EventLog
结构的正确定义来修复此问题
#[derive(Debug, Serialize, Deserialize, Row)]
struct EventLog {
id: u32
}
已知限制
- 尚不支持
Variant
、Dynamic
、(新的)JSON
数据类型。 - 尚不支持服务器端参数绑定;有关跟踪,请参阅此 issue。
联系我们
如果您有任何问题或需要帮助,请随时通过 Community Slack 或通过 GitHub issues 联系我们。