简介
今天,我们欢迎来自我们的技术合作伙伴 Streamkap 的客座文章,Streamkap 是一个用于 ClickHouse 的开箱即用变更数据捕获 (CDC) 解决方案。这篇博客深入探讨了构建此类产品的细节和挑战。对于那些只想为 ClickHouse 获得可用的开箱即用 CDC 解决方案的读者,我们很高兴推荐 Streamkap 作为托管服务。
我们很高兴宣布我们新的 ClickHouse 连接器,用于将 CDC 数据从 PostgreSQL、MySQL、SQL Server、Oracle 和 MongoDB 等数据库流式传输到 ClickHouse。
Streamkap 最近切换到 ClickHouse,以实时处理我们所有的日志和指标,此前我们发现其他解决方案未能达到我们要求的查询性能。在我们自己采用 ClickHouse 后,我们希望开始提供 ClickHouse CDC 集成,但发现现有连接器存在问题,因此我们着手构建一个新的连接器来解决这些问题。
在这篇文章中,我们假设您熟悉 ClickHouse 数据库和变更数据捕获 (CDC) 的概念,如果您不熟悉,可以通过阅读 流式 ETL 中的变更数据捕获来了解更多信息。
我们将深入探讨为 ClickHouse 构建 CDC 解决方案的挑战以及我们如何应对这些挑战,讨论我们如何处理模式演变、数据一致性和快照。最后,我们将展示如何在保持高性能流式管道的同时实现这一切。
技术
ClickHouse 是一个开源的面向列的数据库。面向列的结构意味着数据按列而不是按行存储和检索。ClickHouse 已成为构建实时应用程序的事实标准选择,因为它能够摄取大量数据并在写入时而不是在读取时物化数据。这大大加快了查询速度,使 ClickHouse 适用于服务实时应用程序。
Streamkap 是一个无服务器流式平台,支持将实时变更数据捕获 (CDC) 摄取到 ClickHouse 中。在底层,Kafka、Debezium、Flink 等技术与生产级连接器/管道相结合。
以下是 Streamkap 如何从数据库流式传输到 ClickHouse 的概述。
挑战
当我们最初希望将 CDC 数据流式传输到 ClickHouse 时,我们开始寻找可以使用的现有连接器。在审查了官方的 ClickHouse Kafka Connect 连接器以及市场上的其他连接器后,我们很快意识到我们需要对它们进行大量修改才能支持各种用例。意识到这些连接器需要进行大量修改,我们着手构建我们自己的解决方案。以下是我们需要确保解决的一些关键要求,然后我们才能将我们的解决方案投入生产。
数据类型
现有解决方案对数据类型的支持不佳
- 嵌套结构体
- 嵌套数组,包含嵌套结构体的数组
- 具有微秒精度的 Timestamp
- 具有微秒精度的时间
- 没有时间信息的日期(自 epoch 以来的天数)
- JSON 作为纯字符串字段传输
元数据
在处理 CDC 数据时,添加额外的元数据列(例如时间戳和 CDC 记录类型)非常有用。这允许更简单、更强大的摄取后转换,以及诊断任何延迟问题。
插入/更新插入
在 Streamkap,我们看到希望使用插入或更新插入的客户数量均分。插入是仅追加模式,因此维护所有更改的历史记录,而更新插入仅显示最终数据(插入 + 更新)。虽然大多数公司都习惯了批量 ETL 的这种能力,但当与流式 ETL 结合使用时,这是一个新概念。在 批量处理与实时流处理中了解更多信息
模式演变
当源表更改时,我们需要更新目标表以处理此模式漂移,并且不会导致管道中断。
半结构化数据
MongoDB/Elasticsearch 等源允许复杂嵌套记录结构中存在不一致性,这些不一致性需要在摄取管道插入 ClickHouse 之前进行协调。例如
- 在某些记录中,Date/Time 表示为数字(自 epoch 以来的秒/毫秒),而在其他记录中表示为字符串(ISO 格式)
- 在某些记录中,嵌套字段是字符串,而在其他记录中是更复杂的嵌套结构体
- 深度嵌套的复杂半结构化数据通常需要在插入 ClickHouse 之前进行预处理,并映射到适当的类型,例如 Tuple、Nested。
我们的方法
现在让我们深入了解我们的连接器以及我们如何应对每个挑战。
数据类型
我们发现默认方法通常只是将数据作为 JSON 插入 ClickHouse,然后在加载后转换数据。
我们内置了对以下数据类型的支持
Kafka Connect 数据类型 | ClickHouse 数据类型 |
---|---|
INT8 | Int8 |
INT16 | Int16 |
INT32 | Int32 |
INT64 | Int64 |
FLOAT32 | Float32 |
FLOAT64 | Float64 |
BOOLEAN | Bool |
BYTES | BLOB (String) |
STRING | String |
org.apache.kafka.connect.data.Decimal | DECIMAL(38, 0) |
org.apache.kafka.connect.data.Timestampio.debezium.time.ZonedTimestamp | DateTime64 |
org.apache.kafka.connect.data.Date | Date |
io.debezium.data.Json | String |
STRUCT | Tuple |
ARRAY | Array |
JSON 字段目前作为字符串摄取,allow_experimental_object_type=1
的使用目前正在测试中。
元数据
连接器为每次插入 ClickHouse 表添加额外的键列,以便更好地进行加载后分析和建模,并支持更新插入。
以下元数据列被添加到每个 ClickHouse 表中
_streamkap_ts_ms
:CDC 事件时间戳_streamkap_deleted
:如果当前的 CDC 事件是删除事件_streamkap_partition
:smallint,表示通过对源记录键字段应用一致哈希获得的内部 Streamkap 分区号_streamkap_source_ts_ms
:更改事件在源数据库中发生的时间戳_streamkap_op
:CDC 事件操作类型(c 插入,u 更新,d 删除,r 快照,t 截断)
插入/更新插入
Streamkap 连接器支持两种数据摄取到 ClickHouse 的模式:插入(追加)和 更新插入。
更新插入模式是我们连接器的默认模式,当 ClickHouse 表需要包含源数据的最新版本时使用。
插入(追加)模式
插入模式会导致每个更改都被跟踪并作为新行插入 ClickHouse 中,而删除事件将在 ClickHouse 中使用元值 _streamkap_deleted
标记为已删除。
这对于较大的数据量很有用,可以保持低延迟并维护更改历史记录。
例如,Streamkap 在收集我们的指标时使用插入模式,因为只插入不可变数据。
然后我们使用指标表之上的 物化视图,以便在摄取时为时间序列分析创建多个聚合。在此表上设置合适的 TTL,以便 ClickHouse 为我们处理删除操作,同时提供足够的历史数据来调查任何问题,或者如果我们必须出于某种原因重建物化视图。
要使用插入(追加)模式,请使用 ClickHouse 引擎 MergeTree。
更新插入模式
更新插入是插入和更新的组合。如果在行的主键上找到匹配项,则该值将被覆盖。相反,如果没有匹配项,则将插入该事件。
更新插入模式是使用 ClickHouse 的 ReplacingMergeTree 引擎实现的。
ReplacingMergeTree 引擎在基于排序键的定期后台合并期间对数据进行重复数据删除,从而可以清理旧记录。此过程的异步性质意味着可能会有一个小窗口,您在该窗口中仍然保留视图中的旧记录。因此,查询必须使用 FINAL 修饰符来确保返回数据的最新版本,然后这将对查询时剩余的任何相同记录执行重复数据删除。
具有基本类型的更新插入示例
此处以 JSON 格式显示了更新插入的输入记录。键只有一个字段 id
,它是将对其行进行重复数据删除的主键
{
"id": "123456hYCcEM62894000000000",
"str_col": "some-str-values-000000000",
"IntColumn": 123000,
"Int8": 0,
"InT16": 10,
"bool_col": true,
"double_col": 1.7976931348623157E308,
"json_col": "{\"a\": 0}",
"__deleted": false,
"created_at": 1707379532748,
"date_col": 19761,
"ts_tz": "2023-10-24T15:19:51Z",
"_streamkap_ts_ms": 1707379532748,
"binary_col": "AQIDBAU=",
"byte_buf": "AQIDBAU=",
"bigint_col": "E4f/////0tCeAA=="
}
生成的表
SHOW CREATE TABLE streamkap_test_nominal_upsert
FORMAT Vertical
Query id: 1abf2898-69b3-4785-a849-65c3879493bb
Row 1:
──────
statement: CREATE TABLE streamkap.streamkap_test_nominal_upsert
(
`id` String COMMENT 'id',
`str_col` String COMMENT 'str_col',
`IntColumn` Int32 COMMENT 'IntColumn',
`Int8` Int8 COMMENT 'Int8',
`InT16` Int16 COMMENT 'InT16',
`bool_col` Bool COMMENT 'bool_col',
`double_col` Float64 COMMENT 'double_col',
`json_col` String COMMENT 'json_col',
`__deleted` Bool COMMENT '__deleted',
`created_at` DateTime64(3) COMMENT 'created_at',
`date_col` Date COMMENT 'date_col',
`ts_tz` DateTime64(3) COMMENT 'ts_tz',
`_streamkap_ts_ms` Int64 COMMENT '_streamkap_ts_ms',
`binary_col` String COMMENT 'binary_col',
`byte_buf` String COMMENT 'byte_buf',
`bigint_col` Decimal(38, 0) COMMENT 'bigint_col',
`_streamkap_partition` Int32 COMMENT '_streamkap_partition',
`_streamkap_deleted` UInt8 MATERIALIZED if(__deleted = true, 1, 0)
)
ENGINE = ReplicatedReplacingMergeTree('/clickhouse/tables/{uuid}/{shard}', '{replica}', _streamkap_ts_ms, _streamkap_deleted)
PARTITION BY _streamkap_partition
PRIMARY KEY id
ORDER BY id
SETTINGS index_granularity = 8192
示例数据
SELECT *
FROM streamkap_test_nominal_upsert
FORMAT Vertical
Row 1:
──────
id: 123456hYCcEM62894000000000
str_col: some-str-values-000000000
IntColumn: 123000
Int8: 0
InT16: 10
bool_col: true
double_col: 1.7976931348623157e308
json_col: {"a": 0}
__deleted: false
created_at: 2024-02-08 08:03:37.368
date_col: 2024-02-08
ts_tz: 2023-10-24 15:19:51.000
_streamkap_ts_ms: 1707379417368
binary_col:
byte_buf:
bigint_col: 92233720368547000000000
_streamkap_partition: 0
Row 2:
──────
id: 123456hYCcEM62894000000000
str_col: some-str-values-000000000
IntColumn: 123000
Int8: 0
InT16: 10
bool_col: true
double_col: 1.7976931348623157e308
json_col: {"a": 0}
__deleted: false
created_at: 2024-02-08 08:03:41.608
date_col: 2024-02-08
ts_tz: 2023-10-24 15:19:51.000
_streamkap_ts_ms: 1707379421608
binary_col: java.nio.HeapByteBuffer[pos=0 lim=5 cap=5]
byte_buf: java.nio.HeapByteBuffer[pos=0 lim=5 cap=5]
bigint_col: 92233720368547000000000
_streamkap_partition: 0
使用 FINAL
的重复数据删除数据
SELECT *
FROM streamkap_test_nominal_upsert
FINAL
FORMAT Vertical
Row 1:
──────
id: 123456hYCcEM62894000000000
str_col: some-str-values-000000000
IntColumn: 123000
Int8: 0
InT16: 10
bool_col: true
double_col: 1.7976931348623157e308
json_col: {"a": 0}
__deleted: false
created_at: 2024-02-08 08:03:41.608
date_col: 2024-02-08
ts_tz: 2023-10-24 15:19:51.000
_streamkap_ts_ms: 1707379421608
binary_col: java.nio.HeapByteBuffer[pos=0 lim=5 cap=5]
byte_buf: java.nio.HeapByteBuffer[pos=0 lim=5 cap=5]
bigint_col: 92233720368547000000000
_streamkap_partition: 0
处理半结构化数据
嵌套数组和结构体
下面,我们提供了一些示例,说明如何将复杂结构自动映射到 ClickHouse 类型。
为了支持包含结构体的数组,我们需要更改 Streamkap 在 ClickHouse 中的角色,将 flatten_nested 设置为 0
ALTER ROLE STREAMKAP_ROLE SETTINGS flatten_nested = 0;
包含子数组的嵌套结构体字段
此处以 JSON 格式显示了输入记录,其中键只有一个字段 id
{
"id": 1,
"obj": {
"nb": 123,
"str": "abc",
"sub_arr": [
{
"sub_nb": 789,
"sub_str": "mnp"
}
]
}
}
生成的表。请注意 obj
列是如何映射到 Tuple(nb Int32, str String, sub_arr Array(Tuple(n Int32, s String)), sub_arr_str Array(String))
以处理复杂结构的
SHOW CREATE TABLE chdb.streamkap_nested_struct_with_array
CREATE TABLE chdb.streamkap_nested_struct_with_array
(
`obj` Tuple(nb Int32, str String, sub_arr Array(Tuple(n Int32, s String)), sub_arr_str Array(String)) COMMENT 'obj',
`__deleted` Bool COMMENT '__deleted',
`_streamkap_ts_ms` Int64 COMMENT '_streamkap_ts_ms',
`_streamkap_partition` Int32 COMMENT '_streamkap_partition',
`id` Int32 COMMENT 'id',
`_streamkap_deleted` UInt8 MATERIALIZED if(__deleted = true, 1, 0)
)
ENGINE = ReplacingMergeTree(_streamkap_ts_ms, _streamkap_deleted)
PARTITION BY _streamkap_partition
PRIMARY KEY id
ORDER BY id
SETTINGS index_granularity = 8192
示例数据
SELECT *
FROM chdb.streamkap_nested_struct_with_array
LIMIT 1 format Vertical
obj: (123,'abc',[(789,'mnp')],['efg'])
__deleted: false
_streamkap_ts_ms: 1702519029407
_streamkap_partition: 0
id: 1
包含子结构体的嵌套数组字段
此处以 JSON 格式显示了输入记录,其中键只有一个字段 id
{
"id": 1,
"arr": [
{
"nb": 123,
"str": "abc"
}
]
}
SHOW CREATE TABLE streamkap_nested_array_of_struct
CREATE TABLE streamkap.streamkap_nested_array_of_struct
(
`arr` Array(Tuple(nb Int32, str String)) COMMENT 'arr',
`__deleted` Bool COMMENT '__deleted',
`_streamkap_ts_ms` Int64 COMMENT '_streamkap_ts_ms',
`_streamkap_partition` Int32 COMMENT '_streamkap_partition',
`id` Int32 COMMENT 'id',
`_streamkap_deleted` UInt8 MATERIALIZED if(__deleted = true, 1, 0)
)
ENGINE = ReplacingMergeTree( _streamkap_ts_ms, _streamkap_deleted)
PARTITION BY _streamkap_partition
PRIMARY KEY id
ORDER BY id
SETTINGS index_granularity = 8192
示例数据
SELECT *
FROM streamkap_nested_array_of_struct
LIMIT 1 format Vertical
arr: [(123,'abc')]
__deleted: false
_streamkap_ts_ms: 1702529856885
_streamkap_partition: 0
id: 1
快照
快照是指将现有数据从数据库加载到 ClickHouse 的过程。
我们有两种方法可以加载此历史数据。
阻塞快照
阻塞快照的目的是捕获数据库表的整个当前状态,并将使用大型 select 语句来执行此操作。这些也可以并发运行,并且速度非常快。效率方面,阻塞快照可能对系统资源产生更高的影响,特别是对于大型表,并且每个查询可能需要更长的时间。
增量快照
增量快照旨在提高效率,通常对系统资源的影响较低,并且特别适合于非常大的表或希望同时进行快照和流式传输的情况。
数据一致性和交付保证
交付保证主要指的是失败场景,在这些场景中,未确认的 CDC 事件可能会被重播,从而导致重复行插入 ClickHouse。
Streamkap 为 ClickHouse 提供 至少一次 交付保证。
使用插入摄取模式,可能会将一些重复行插入 ClickHouse。但是,通过在您的物化视图中添加重复数据删除代码,将不会有任何影响。
如前所述,对于更新插入摄取模式,我们使用源记录键执行重复数据删除。强制执行 精确一次 交付保证会增加性能损失,而没有任何额外的好处,因为相同的过程会处理重复的 CDC 事件,将一个记录的所有 CDC 事件合并到最终记录状态中。
转换
Streamkap 支持管道中的转换,以便可以将预处理的数据发送到 ClickHouse。
这对于半结构化数据、预处理和清理任务尤其有用。这可能比在摄取后处理数据效率更高。
在 ClickHouse 中自然地对清理后的结构化数据进行实时分析,查询性能得益于将数据转换移至插入时间。
下面,我们介绍 Streamkap 执行的一些常见转换。
修复半结构化数据中的不一致性
考虑修复不一致的半结构化日期字段
"someDateField": {"$date": "2023-08-04T09:12:20.29Z"}
"someDateField": "2023-08-07T08:14:57.817325+00:00"
"someDateField": {"$date": {"$numberLong": 1702853448000}}
使用 Streamkap 转换,所有记录都可以转换为通用格式,以便摄取到 Clickhouse DateTime64 列中
"someDateField": "yyyy-MM-dd HH:mm:ss.SSS"
拆分大型半结构化 JSON 文档
对于文档数据库,子实体可以建模为嵌套在父实体文档中的子数组
{
"key": "abc1234",
"array": [
{
"id": "11111",
"someField": "aa-11"
},
{
"id": "22222",
"someField": "bb-22"
}
]
}
在 ClickHouse 中,将这些子实体表示为单独的行可能更有意义。使用 Streamkap 转换,可以将子实体记录拆分为单独的记录
{
"id": "11111",
"parentKey": "abc1234",
"someField": "aa-11"
}
{
"id": "22222",
"parentKey": "abc1234",
"someField": "bb-22"
}
模式演变
模式演变或漂移处理是更改目标表以反映上游更改的过程。
Streamkap 连接器自动处理以下场景中的模式漂移。
- 附加列: 将检测到附加字段,并且将在表中创建一个新列以接收新数据。
- 删除列: 此列现在将被忽略,并且不会采取进一步的操作。
- 更改列类型:将在表中创建一个附加列,使用后缀表示新类型。例如
ColumnName_type
可以在任何阶段将其他表添加到管道中。我们在下面展示了一些模式演变的示例。
添加列
考虑模式演变之前的以下输入记录
{
"id": "123456hYCcEM62894000000000",
"str_col": "some-str-values-000000000",
"IntColumn": 123000,
"Int8": 0,
"InT16": 10,
"bool_col": true,
"double_col": 1.7976931348623157E308,
"json_col": "{\"a\": 0}",
"binary_col": "AQIDBAU=",
"byte_buf": "AQIDBAU=",
"bigint_col": "E4f/////0tCeAA==",
"__deleted": false,
"created_at": 1702894985613,
"ts_tz": "2023-10-24T15:19:51Z",
"_streamkap_ts_ms": 1702894985613
}
新列 new_double_col
被添加到上游模式中。这会导致 ClickHouse 模式演变
{
"id": "123456hYCcEM62894xxx",
"str_col": "some-str-values-000000000",
"IntColumn": 123000,
"Int8": 0,
"InT16": 10,
"bool_col": true,
"double_col": 1.7976931348623157E308,
"json_col": "{\"a\": 0}",
"binary_col": "AQIDBAU=",
"byte_buf": "AQIDBAU=",
"bigint_col": "E4f/////0tCeAA==",
"__deleted": false,
"created_at": 1702894985613,
"ts_tz": "2023-10-24T15:19:51Z",
"_streamkap_ts_ms": 1702894985613,
"new_double_col": 1.7976931348623157E308
}
ClickHouse 数据
SELECT
id,
new_double_col
FROM streamkap_test_nominal_add_new_column
ORDER BY _streamkap_ts_ms ASC
┌─id─────────────────────────┬─new_double_col─┐
│ 123456hYCcEM62894000000000 │ 0 │
└────────────────────────────┴────────────────┘
┌─id───────────────────┬─────────new_double_col─┐
│ 123456hYCcEM62894xxx │ 1.7976931348623157e308 │
└──────────────────────┴────────────────────────┘
将 Int 演变为 String
模式演变之前的输入记录
{
"id": "123456hYCcEM62894000000000",
. . .
"IntColumn": 123000,
. . .
"_streamkap_ts_ms": 1702894492041
}
在上游模式演变后摄取的新记录
{
"id": "123456hYCcEM62894xxx",
. . .
"IntColumn": "new-str-value",
. . .
}
ClickHouse 数据,在添加新列 IntColumn_str
后
SELECT
id,
IntColumn,
IntColumn_str
FROM streamkap_test_nominal_evolve_int2string
ORDER BY _streamkap_ts_ms ASC
┌─id─────────────────────────┬─IntColumn─┬─IntColumn_str─┐
│ 123456hYCcEM62894000000000 │ 123000 │ │
└────────────────────────────┴───────────┴───────────────┘
┌─id───────────────────┬─IntColumn─┬─IntColumn_str─┐
│ 123456hYCcEM62894xxx │ 0 │ new-str-value │
└──────────────────────┴───────────┴───────────────┘
性能
以下 15 分钟的负载测试旨在显示各种批量大小与延迟相关的性能特征。此外,我们将评估 Streamkap ClickHouse 目标连接器的可扩展性。
ClickHouse Cloud 实例详细信息:3 个节点,每个节点 32GiB,具有 8 个 vCPU
输入记录格式包含基本类型、一个中等字符串(约 100 个字符)和一个大字符串(约 1000 个字符)
select * from streamkap_test_nominal_perf limit 1 format Vertical;
id: 123456hYCcEM62894000000001
str_col: some-str-values-000000001
IntColumn: 123001
Int8: 1
InT16: 10
bool_col: true
double_col: 1.7976931348623157e308
json_col: {"a": 1}
__deleted: false
created_at: 1970-01-01 00:00:19.751
ts_tz: 2023-10-24 15:19:51.000
_streamkap_ts_ms: 1706539233685
binary_col: java.nio.HeapByteBuffer[pos=0 lim=5 cap=5]
byte_buf: java.nio.HeapByteBuffer[pos=0 lim=5 cap=5]
bigint_col: 92233720368547000000001
medium_str: str-medium-000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001
large_str: str-large-000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001
_streamkap_partition: 0
当前测试的摄取模式设置为“更新插入”。当使用“追加”时,吞吐量会稍微好一些,因为不需要一些内存中的重复数据删除逻辑。
基线单分区
使用单个 Streamkap 任务和 Clickhouse 分区以及多个批量大小进行基准测试。
吞吐量
每个批量大小的延迟
通常,回填需要原始吞吐量,而延迟不是问题。在这种情况下,超过 10 万行的大批量大小将更合适。
通常,流式传输更改的吞吐量要求较低,并且可能需要更小的延迟。在这种情况下,较小的批量大小更合适。
这些是具有固定批量大小的人工测试,旨在例证吞吐量和延迟之间的权衡。在实践中,批量大小随内部队列大小而变化。如果队列中有许多记录在等待,则批量大小将增大,因此吞吐量将增大。
可扩展性
使用相同的批量大小进行测试:每个批量大小 100,000 条记录,并逐渐增加任务数:1、2、4 和 8。我们可以看到吞吐量与任务数大致呈线性关系扩展。
总结
这只是我们与 ClickHouse 合作的开始,在未来几周内,我们将继续构建尽可能最佳的集成,以处理变更数据捕获事件及其他事件。
以下是我们希望获得社区投票支持的一些领域
- 使用 allow_experimental_object_type=1
- 自动创建物化视图,基于模板
- 跨多个表流式传输 ACID 事务
- 单条记录转换
- 多条记录转换(拆分、连接、聚合)
- 精确一次
希望此连接器能让您像我们一样更轻松地享受 ClickHouse 的好处。
Streamkap 和 ClickHouse 都提供免费试用版;您可以在 Streamkap.com 和 ClickHouse.com 注册。