简介
今天,我们欢迎我们的技术合作伙伴 Streamkap 的客座文章,Streamkap 是 ClickHouse 的开箱即用变更数据捕获 (CDC) 解决方案。本博文深入探讨了构建此类产品的细节和挑战。对于那些只想要 ClickHouse 的开箱即用 CDC 解决方案的用户,我们很高兴推荐 Streamkap 作为托管服务。
我们很高兴地宣布我们为 ClickHouse 推出新的连接器,用于将从 PostgreSQL、MySQL、SQL Server、Oracle 和 MongoDB 等数据库流式传输的 CDC 数据流式传输到 ClickHouse。
在发现其他解决方案无法达到我们所需的查询性能后,Streamkap 最近转向使用 ClickHouse 来实时处理所有日志和指标。在采用 ClickHouse 后,我们希望开始提供 ClickHouse CDC 集成,但发现现有的连接器存在问题,因此我们开始构建一个解决这些问题的全新连接器。
在本文中,我们假设您熟悉 ClickHouse 数据库和变更数据捕获 (CDC) 的概念,但如果不熟悉,您可以通过阅读 流式传输中的变更数据捕获 来了解更多信息。
我们将深入探讨为 ClickHouse 构建 CDC 解决方案的挑战以及我们如何解决这些挑战,讨论我们如何处理架构演变、数据一致性和快照。最后,我们将展示如何在保持高性能流式传输管道的同时实现所有这些目标。
技术
ClickHouse 是一个开源的列式数据库。列式结构意味着数据按列存储和检索,而不是按行存储。ClickHouse 由于能够摄取大量数据以及在写入时而不是在读取时物化数据,因此已成为构建实时应用程序的事实标准选择。这将导致查询速度显着提高,从而使 ClickHouse 适用于为实时应用程序提供服务。
Streamkap 是一个无服务器流式传输平台,它支持将实时变更数据捕获 (CDC) 摄取到 ClickHouse。在幕后,诸如 Kafka、Debezium 和 Flink 等技术与生产级连接器/管道相结合。
以下是 Streamkap 如何将数据从数据库流式传输到 ClickHouse 的概述。
挑战
当我们首次尝试将 CDC 数据流式传输到 ClickHouse 时,我们开始寻找可以使用的现有连接器。在审查了官方 ClickHouse Kafka Connect 连接器以及市场上的其他连接器后,我们很快意识到我们需要对它们进行大量修改才能支持不同的用例。意识到这些连接器需要进行广泛的修改,我们开始构建自己的解决方案。以下是一些关键要求,我们需要确保在将我们的解决方案投入生产之前解决这些要求。
数据类型
现有解决方案对数据类型的支持不佳
- 嵌套结构
- 嵌套数组,包含嵌套结构的数组
- 带微秒精度的 timestamps
- 带微秒精度的 Time
- 没有时间信息的日期(自纪元以来的天数)
- 作为普通字符串字段传输的 JSON
元数据
在处理 CDC 数据时,添加其他元数据列(例如时间戳和 CDC 记录类型)很有帮助。这允许更简单和更强大的摄取后转换以及诊断任何延迟问题。
插入/更新
在 Streamkap,我们看到希望使用插入或更新的客户人数相等。插入是追加模式,因此维护所有更改的历史记录,而更新只会导致最终数据可见(插入 + 更新)。虽然大多数公司习惯于在批处理 ETL 中使用此功能,但在与流式传输 ETL 结合使用时,它是一个新概念。了解有关 批处理与实时处理 的更多信息
架构演变
当源表发生更改时,我们需要更新目标表以处理此架构漂移,而不会导致管道中断。
半结构化数据
像 MongoDB/Elasticsearch 这样的源允许在复杂的嵌套记录结构中存在不一致,这些不一致需要在摄取管道中进行协调,然后再插入 ClickHouse。例如
- 日期/时间以数字表示(自纪元以来的秒数/毫秒数)的一些记录,以及以字符串形式表示(ISO 格式)的其他记录
- 在某些记录中是字符串,而在其他记录中是更复杂的嵌套结构的嵌套字段
- 深度嵌套的复杂半结构化数据通常需要在插入 ClickHouse 之前进行预处理,并映射到适当的类型,例如元组、嵌套。
我们的方法
现在让我们深入了解我们的连接器以及我们如何解决每个挑战。
数据类型
我们发现默认方法通常是将数据作为 JSON 插入 ClickHouse,然后在加载后转换数据。
我们已内置支持以下数据类型
Kafka Connect 数据类型 | ClickHouse 数据类型 |
---|---|
INT8 | Int8 |
INT16 | Int16 |
INT32 | Int32 |
INT64 | Int64 |
FLOAT32 | Float32 |
FLOAT64 | Float64 |
BOOLEAN | Bool |
BYTES | BLOB (String) |
STRING | String |
org.apache.kafka.connect.data.Decimal | DECIMAL(38, 0) |
org.apache.kafka.connect.data.Timestampio.debezium.time.ZonedTimestamp | DateTime64 |
org.apache.kafka.connect.data.Date | Date |
io.debezium.data.Json | String |
STRUCT | Tuple |
ARRAY | Array |
JSON 字段当前被摄取为字符串,allow_experimental_object_type=1
的使用目前正在测试中。
元数据
连接器在每次插入 ClickHouse 表时添加额外的键列,以便在加载后更好地进行分析和建模,以及支持更新。
以下元数据列将添加到每个 ClickHouse 表中
_streamkap_ts_ms
:CDC 事件时间戳_streamkap_deleted
:如果当前 CDC 事件是删除事件_streamkap_partition
:通过对源记录键字段应用一致性哈希获得的表示 Streamkap 内部分区号的小整数_streamkap_source_ts_ms
:更改事件在源数据库中发生的时间戳_streamkap_op
:CDC 事件操作类型(c 插入、u 更新、d 删除、r 快照、t 截断)
插入/更新
Streamkap 连接器支持两种将数据导入 ClickHouse 的模式:插入(追加)和更新。
更新模式是我们的连接器的默认模式,当 ClickHouse 表需要包含源数据的最新版本时使用。
插入(追加)模式
插入模式会导致每个更改都被跟踪并作为新行插入到 ClickHouse 中,而删除事件将使用元值 _streamkap_deleted
在 ClickHouse 中被标记为已删除。
这对大容量数据很有用,可以保持低延迟并维护更改历史记录。
例如,Streamkap 在收集我们的指标时使用插入模式,因为只插入不可变数据。
然后,我们在指标表之上使用 物化视图 在数据导入时创建多个用于时间序列分析的聚合。在此表上设置合适的 TTL,以便 ClickHouse 为我们处理删除操作,同时提供足够的历史数据以调查任何问题,或者如果出于某种原因我们必须重建物化视图。
要使用插入(追加)模式,可以使用 ClickHouse 引擎 MergeTree。
更新模式
更新是插入和更新的组合。如果在行的主键上匹配,则将覆盖该值。相反,如果不存在匹配项,则将插入该事件。
更新模式是使用 ClickHouse 的 ReplacingMergeTree 引擎实现的。
ReplacingMergeTree 引擎在定期后台合并期间根据排序键对数据进行去重,允许清理旧记录。此过程的异步性质意味着可能存在一个小窗口,在此窗口内您将保留视图中的旧记录。因此,查询必须使用 FINAL 修饰符以确保返回数据的最新版本,然后将在查询时执行任何剩余相同记录的去重。
使用基本类型的更新示例
这里以 JSON 格式显示了更新的输入记录。该键只有一个字段 id
,它是将在其上对行进行去重的主键。
{
"id": "123456hYCcEM62894000000000",
"str_col": "some-str-values-000000000",
"IntColumn": 123000,
"Int8": 0,
"InT16": 10,
"bool_col": true,
"double_col": 1.7976931348623157E308,
"json_col": "{\"a\": 0}",
"__deleted": false,
"created_at": 1707379532748,
"date_col": 19761,
"ts_tz": "2023-10-24T15:19:51Z",
"_streamkap_ts_ms": 1707379532748,
"binary_col": "AQIDBAU=",
"byte_buf": "AQIDBAU=",
"bigint_col": "E4f/////0tCeAA=="
}
结果表
SHOW CREATE TABLE streamkap_test_nominal_upsert
FORMAT Vertical
Query id: 1abf2898-69b3-4785-a849-65c3879493bb
Row 1:
──────
statement: CREATE TABLE streamkap.streamkap_test_nominal_upsert
(
`id` String COMMENT 'id',
`str_col` String COMMENT 'str_col',
`IntColumn` Int32 COMMENT 'IntColumn',
`Int8` Int8 COMMENT 'Int8',
`InT16` Int16 COMMENT 'InT16',
`bool_col` Bool COMMENT 'bool_col',
`double_col` Float64 COMMENT 'double_col',
`json_col` String COMMENT 'json_col',
`__deleted` Bool COMMENT '__deleted',
`created_at` DateTime64(3) COMMENT 'created_at',
`date_col` Date COMMENT 'date_col',
`ts_tz` DateTime64(3) COMMENT 'ts_tz',
`_streamkap_ts_ms` Int64 COMMENT '_streamkap_ts_ms',
`binary_col` String COMMENT 'binary_col',
`byte_buf` String COMMENT 'byte_buf',
`bigint_col` Decimal(38, 0) COMMENT 'bigint_col',
`_streamkap_partition` Int32 COMMENT '_streamkap_partition',
`_streamkap_deleted` UInt8 MATERIALIZED if(__deleted = true, 1, 0)
)
ENGINE = ReplicatedReplacingMergeTree('/clickhouse/tables/{uuid}/{shard}', '{replica}', _streamkap_ts_ms, _streamkap_deleted)
PARTITION BY _streamkap_partition
PRIMARY KEY id
ORDER BY id
SETTINGS index_granularity = 8192
示例数据
SELECT *
FROM streamkap_test_nominal_upsert
FORMAT Vertical
Row 1:
──────
id: 123456hYCcEM62894000000000
str_col: some-str-values-000000000
IntColumn: 123000
Int8: 0
InT16: 10
bool_col: true
double_col: 1.7976931348623157e308
json_col: {"a": 0}
__deleted: false
created_at: 2024-02-08 08:03:37.368
date_col: 2024-02-08
ts_tz: 2023-10-24 15:19:51.000
_streamkap_ts_ms: 1707379417368
binary_col:
byte_buf:
bigint_col: 92233720368547000000000
_streamkap_partition: 0
Row 2:
──────
id: 123456hYCcEM62894000000000
str_col: some-str-values-000000000
IntColumn: 123000
Int8: 0
InT16: 10
bool_col: true
double_col: 1.7976931348623157e308
json_col: {"a": 0}
__deleted: false
created_at: 2024-02-08 08:03:41.608
date_col: 2024-02-08
ts_tz: 2023-10-24 15:19:51.000
_streamkap_ts_ms: 1707379421608
binary_col: java.nio.HeapByteBuffer[pos=0 lim=5 cap=5]
byte_buf: java.nio.HeapByteBuffer[pos=0 lim=5 cap=5]
bigint_col: 92233720368547000000000
_streamkap_partition: 0
使用 FINAL
去重的 数据
SELECT *
FROM streamkap_test_nominal_upsert
FINAL
FORMAT Vertical
Row 1:
──────
id: 123456hYCcEM62894000000000
str_col: some-str-values-000000000
IntColumn: 123000
Int8: 0
InT16: 10
bool_col: true
double_col: 1.7976931348623157e308
json_col: {"a": 0}
__deleted: false
created_at: 2024-02-08 08:03:41.608
date_col: 2024-02-08
ts_tz: 2023-10-24 15:19:51.000
_streamkap_ts_ms: 1707379421608
binary_col: java.nio.HeapByteBuffer[pos=0 lim=5 cap=5]
byte_buf: java.nio.HeapByteBuffer[pos=0 lim=5 cap=5]
bigint_col: 92233720368547000000000
_streamkap_partition: 0
处理半结构化数据
嵌套数组 & 结构体
下面,我们提供了一些关于复杂结构如何自动映射到 ClickHouse 类型的示例。
为了支持包含结构体的数组,我们需要更改 Streamkap 在 ClickHouse 中的角色,将 flatten_nested 设置为 0。
ALTER ROLE STREAMKAP_ROLE SETTINGS flatten_nested = 0;
包含子数组的嵌套结构体字段
这里以 JSON 格式显示了输入记录,其中该键只有一个字段 id
。
{
"id": 1,
"obj": {
"nb": 123,
"str": "abc",
"sub_arr": [
{
"sub_nb": 789,
"sub_str": "mnp"
}
]
}
}
结果表。请注意 obj
列是如何映射到 Tuple(nb Int32, str String, sub_arr Array(Tuple(n Int32, s String)), sub_arr_str Array(String))
以处理复杂结构的。
SHOW CREATE TABLE chdb.streamkap_nested_struct_with_array
CREATE TABLE chdb.streamkap_nested_struct_with_array
(
`obj` Tuple(nb Int32, str String, sub_arr Array(Tuple(n Int32, s String)), sub_arr_str Array(String)) COMMENT 'obj',
`__deleted` Bool COMMENT '__deleted',
`_streamkap_ts_ms` Int64 COMMENT '_streamkap_ts_ms',
`_streamkap_partition` Int32 COMMENT '_streamkap_partition',
`id` Int32 COMMENT 'id',
`_streamkap_deleted` UInt8 MATERIALIZED if(__deleted = true, 1, 0)
)
ENGINE = ReplacingMergeTree(_streamkap_ts_ms, _streamkap_deleted)
PARTITION BY _streamkap_partition
PRIMARY KEY id
ORDER BY id
SETTINGS index_granularity = 8192
示例数据
SELECT *
FROM chdb.streamkap_nested_struct_with_array
LIMIT 1 format Vertical
obj: (123,'abc',[(789,'mnp')],['efg'])
__deleted: false
_streamkap_ts_ms: 1702519029407
_streamkap_partition: 0
id: 1
包含子结构体的嵌套数组字段
这里以 JSON 格式显示了输入记录,其中该键只有一个字段 id
。
{
"id": 1,
"arr": [
{
"nb": 123,
"str": "abc"
}
]
}
SHOW CREATE TABLE streamkap_nested_array_of_struct
CREATE TABLE streamkap.streamkap_nested_array_of_struct
(
`arr` Array(Tuple(nb Int32, str String)) COMMENT 'arr',
`__deleted` Bool COMMENT '__deleted',
`_streamkap_ts_ms` Int64 COMMENT '_streamkap_ts_ms',
`_streamkap_partition` Int32 COMMENT '_streamkap_partition',
`id` Int32 COMMENT 'id',
`_streamkap_deleted` UInt8 MATERIALIZED if(__deleted = true, 1, 0)
)
ENGINE = ReplacingMergeTree( _streamkap_ts_ms, _streamkap_deleted)
PARTITION BY _streamkap_partition
PRIMARY KEY id
ORDER BY id
SETTINGS index_granularity = 8192
示例数据
SELECT *
FROM streamkap_nested_array_of_struct
LIMIT 1 format Vertical
arr: [(123,'abc')]
__deleted: false
_streamkap_ts_ms: 1702529856885
_streamkap_partition: 0
id: 1
快照
快照是指将数据库中的现有数据加载到 ClickHouse 的过程。
我们有两种方法可以加载这些历史数据。
阻塞快照
阻塞快照的目的是捕获数据库表的整个当前状态,并将使用大型 select 语句来执行此操作。这些语句也可以并发运行,速度非常快。从效率的角度来看,阻塞快照可能会对系统资源产生更大的影响,尤其是在大型表的情况下,并且每个查询可能需要更长时间。
增量快照
增量快照的目标是效率,对系统资源的影响通常较低,特别适合非常大的表或希望同时进行快照和流式传输的情况。
数据一致性和传递保证
传递保证主要指的是故障场景,在这种场景中,未确认的 CDC 事件可能会被重放,导致重复的行插入到 ClickHouse 中。
Streamkap 为 ClickHouse 提供至少一次传递保证。
使用插入导入模式,可能会将一些重复的行插入到 ClickHouse 中。但是,通过在您的物化视图中添加去重代码,将不会有任何影响。
如前所述,对于更新导入模式,我们使用源记录键进行去重。强制执行一次且仅一次传递保证会带来性能损失,而没有任何额外的好处,因为相同的过程会处理重复的 CDC 事件,将一个记录的所有 CDC 事件合并到最终的记录状态中。
转换
Streamkap 支持管道中的转换,以便可以将数据预处理后发送到 ClickHouse。
这对半结构化数据、预处理和清理任务特别有用。这可能比在数据导入后处理数据效率更高。
在 ClickHouse 中自然会对清理后的结构化数据进行实时分析,通过将数据转换移至插入时间,查询性能将得到提升。
下面,我们展示了 Streamkap 执行的一些常见转换。
修复半结构化数据中的不一致
考虑修复不一致的半结构化日期字段。
"someDateField": {"$date": "2023-08-04T09:12:20.29Z"}
"someDateField": "2023-08-07T08:14:57.817325+00:00"
"someDateField": {"$date": {"$numberLong": 1702853448000}}
使用 Streamkap 转换,所有记录都可以转换为用于导入到 Clickhouse 的通用格式 DateTime64 列。
"someDateField": "yyyy-MM-dd HH:mm:ss.SSS"
拆分大型半结构化 JSON 文档
在文档数据库中,子实体可以建模为嵌套在父实体文档中的子数组。
{
"key": "abc1234",
"array": [
{
"id": "11111",
"someField": "aa-11"
},
{
"id": "22222",
"someField": "bb-22"
}
]
}
在 ClickHouse 中,将这些子实体表示为单独的行可能很有意义。使用 Streamkap 转换,可以将子实体记录拆分为单个记录。
{
"id": "11111",
"parentKey": "abc1234",
"someField": "aa-11"
}
{
"id": "22222",
"parentKey": "abc1234",
"someField": "bb-22"
}
架构演变
模式演化或漂移处理是指对目标表进行更改以反映上游更改的过程。
Streamkap 连接器在以下情况下会自动处理模式漂移。
- 添加列:将检测到一个新的字段,并且将在表中创建一个新列以接收新数据。
- 删除列:此列现在将被忽略,不会采取任何进一步的操作。
- 更改列类型:将在表中创建一个新的列,并使用后缀表示新类型。例如:
ColumnName_type
可以在管道的任何阶段添加额外的表。下面我们将展示一些模式演化的示例。
添加列
考虑以下模式演化之前的输入记录。
{
"id": "123456hYCcEM62894000000000",
"str_col": "some-str-values-000000000",
"IntColumn": 123000,
"Int8": 0,
"InT16": 10,
"bool_col": true,
"double_col": 1.7976931348623157E308,
"json_col": "{\"a\": 0}",
"binary_col": "AQIDBAU=",
"byte_buf": "AQIDBAU=",
"bigint_col": "E4f/////0tCeAA==",
"__deleted": false,
"created_at": 1702894985613,
"ts_tz": "2023-10-24T15:19:51Z",
"_streamkap_ts_ms": 1702894985613
}
在流式数据源的模式中添加了一个新列 new_double_col
。这会导致 ClickHouse 模式发生演化。
{
"id": "123456hYCcEM62894xxx",
"str_col": "some-str-values-000000000",
"IntColumn": 123000,
"Int8": 0,
"InT16": 10,
"bool_col": true,
"double_col": 1.7976931348623157E308,
"json_col": "{\"a\": 0}",
"binary_col": "AQIDBAU=",
"byte_buf": "AQIDBAU=",
"bigint_col": "E4f/////0tCeAA==",
"__deleted": false,
"created_at": 1702894985613,
"ts_tz": "2023-10-24T15:19:51Z",
"_streamkap_ts_ms": 1702894985613,
"new_double_col": 1.7976931348623157E308
}
ClickHouse 数据
SELECT
id,
new_double_col
FROM streamkap_test_nominal_add_new_column
ORDER BY _streamkap_ts_ms ASC
┌─id─────────────────────────┬─new_double_col─┐
│ 123456hYCcEM62894000000000 │ 0 │
└────────────────────────────┴────────────────┘
┌─id───────────────────┬─────────new_double_col─┐
│ 123456hYCcEM62894xxx │ 1.7976931348623157e308 │
└──────────────────────┴────────────────────────┘
将 Int 演化为 String
模式演化之前的输入记录。
{
"id": "123456hYCcEM62894000000000",
. . .
"IntColumn": 123000,
. . .
"_streamkap_ts_ms": 1702894492041
}
流式数据源模式演化后导入的新记录。
{
"id": "123456hYCcEM62894xxx",
. . .
"IntColumn": "new-str-value",
. . .
}
ClickHouse 数据,添加了新列 IntColumn_str
之后。
SELECT
id,
IntColumn,
IntColumn_str
FROM streamkap_test_nominal_evolve_int2string
ORDER BY _streamkap_ts_ms ASC
┌─id─────────────────────────┬─IntColumn─┬─IntColumn_str─┐
│ 123456hYCcEM62894000000000 │ 123000 │ │
└────────────────────────────┴───────────┴───────────────┘
┌─id───────────────────┬─IntColumn─┬─IntColumn_str─┐
│ 123456hYCcEM62894xxx │ 0 │ new-str-value │
└──────────────────────┴───────────┴───────────────┘
性能
以下 15 分钟的负载测试旨在展示不同批量大小的性能特征与延迟的关系。此外,我们还将评估 Streamkap ClickHouse 目标连接器的可扩展性。
ClickHouse 云实例详细信息:3 个节点,每个节点 32GiB,8 个 vCPU
输入记录格式包含基本类型、一个中等字符串(约 100 个字符)和一个大型字符串(约 1000 个字符)。
select * from streamkap_test_nominal_perf limit 1 format Vertical;
id: 123456hYCcEM62894000000001
str_col: some-str-values-000000001
IntColumn: 123001
Int8: 1
InT16: 10
bool_col: true
double_col: 1.7976931348623157e308
json_col: {"a": 1}
__deleted: false
created_at: 1970-01-01 00:00:19.751
ts_tz: 2023-10-24 15:19:51.000
_streamkap_ts_ms: 1706539233685
binary_col: java.nio.HeapByteBuffer[pos=0 lim=5 cap=5]
byte_buf: java.nio.HeapByteBuffer[pos=0 lim=5 cap=5]
bigint_col: 92233720368547000000001
medium_str: str-medium-000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001
large_str: str-large-000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001
_streamkap_partition: 0
当前测试的导入模式设置为 "upsert"。使用 "append" 时,吞吐量会略微好一些,因为不需要一些内存中的去重逻辑。
基线单分区
使用单个 Streamkap 任务和 Clickhouse 分区,并使用多个批量大小进行基准测试。
吞吐量
每个批量大小的延迟
通常,原始吞吐量是回填所必需的,延迟并不重要。在这种情况下,超过 100k 行的大型批量大小更合适。
通常,流式传输更改所需的吞吐量较低,可能需要更小的延迟。在这种情况下,较小的批量大小更合适。
这些是使用固定批量大小的人工测试,用于说明吞吐量和延迟之间的权衡。实际上,批量大小会随着内部队列大小的变化而变化。如果有许多记录在队列中等待,则批量大小会增加,因此吞吐量也会增加。
可扩展性
使用相同的批量大小进行测试:每个批量大小 100,000 条记录,并逐渐增加任务数量:1、2、4 和 8。我们可以看到,吞吐量随着任务数量的增加而大致线性扩展。
总结
这仅仅是我们与 ClickHouse 合作的开始,在接下来的几周内,我们将继续构建尽可能最好的集成,以处理更改数据捕获事件等等。
以下是一些我们希望获得社区反馈的领域,请投票选择您是否支持。
- 使用 allow_experimental_object_type=1
- 自动创建的物化视图,基于模板
- 跨多个表的流式 ACID 事务
- 单记录转换
- 多记录转换(拆分、联接、聚合)
- 一次且仅一次
希望这款连接器能让您像我们一样轻松享受 ClickHouse 的优势。
Streamkap 和 ClickHouse 都提供免费试用;您可以在 Streamkap.com 和 ClickHouse.com 上注册。