博客 / 用户案例

Streamkap:ClickHouse 的开箱即用 CDC 解决方案

Favicon
Streamkap
2024 年 2 月 22 日 - 19 分钟阅读

简介

今天,我们欢迎来自我们的技术合作伙伴 Streamkap 的客座文章,Streamkap 是一个用于 ClickHouse 的开箱即用变更数据捕获 (CDC) 解决方案。这篇博客深入探讨了构建此类产品的细节和挑战。对于那些只想为 ClickHouse 获得可用的开箱即用 CDC 解决方案的读者,我们很高兴推荐 Streamkap 作为托管服务。

我们很高兴宣布我们新的 ClickHouse 连接器,用于将 CDC 数据从 PostgreSQL、MySQL、SQL Server、Oracle 和 MongoDB 等数据库流式传输到 ClickHouse

Streamkap 最近切换到 ClickHouse,以实时处理我们所有的日志和指标,此前我们发现其他解决方案未能达到我们要求的查询性能。在我们自己采用 ClickHouse 后,我们希望开始提供 ClickHouse CDC 集成,但发现现有连接器存在问题,因此我们着手构建一个新的连接器来解决这些问题。

在这篇文章中,我们假设您熟悉 ClickHouse 数据库和变更数据捕获 (CDC) 的概念,如果您不熟悉,可以通过阅读 流式 ETL 中的变更数据捕获来了解更多信息。

我们将深入探讨为 ClickHouse 构建 CDC 解决方案的挑战以及我们如何应对这些挑战,讨论我们如何处理模式演变、数据一致性和快照。最后,我们将展示如何在保持高性能流式管道的同时实现这一切。

技术

ClickHouse 是一个开源的面向列的数据库。面向列的结构意味着数据按列而不是按行存储和检索。ClickHouse 已成为构建实时应用程序的事实标准选择,因为它能够摄取大量数据并在写入时而不是在读取时物化数据。这大大加快了查询速度,使 ClickHouse 适用于服务实时应用程序。

Streamkap 是一个无服务器流式平台,支持将实时变更数据捕获 (CDC) 摄取到 ClickHouse 中。在底层,KafkaDebeziumFlink 等技术与生产级连接器/管道相结合。

以下是 Streamkap 如何从数据库流式传输到 ClickHouse 的概述。

streamkap_architecture.png

挑战

当我们最初希望将 CDC 数据流式传输到 ClickHouse 时,我们开始寻找可以使用的现有连接器。在审查了官方的 ClickHouse Kafka Connect 连接器以及市场上的其他连接器后,我们很快意识到我们需要对它们进行大量修改才能支持各种用例。意识到这些连接器需要进行大量修改,我们着手构建我们自己的解决方案。以下是我们需要确保解决的一些关键要求,然后我们才能将我们的解决方案投入生产。

数据类型

现有解决方案对数据类型的支持不佳

  • 嵌套结构体
  • 嵌套数组,包含嵌套结构体的数组
  • 具有微秒精度的 Timestamp
  • 具有微秒精度的时间
  • 没有时间信息的日期(自 epoch 以来的天数)
  • JSON 作为纯字符串字段传输

元数据

在处理 CDC 数据时,添加额外的元数据列(例如时间戳和 CDC 记录类型)非常有用。这允许更简单、更强大的摄取后转换,以及诊断任何延迟问题。

插入/更新插入

在 Streamkap,我们看到希望使用插入或更新插入的客户数量均分。插入是仅追加模式,因此维护所有更改的历史记录,而更新插入仅显示最终数据(插入 + 更新)。虽然大多数公司都习惯了批量 ETL 的这种能力,但当与流式 ETL 结合使用时,这是一个新概念。在 批量处理与实时流处理中了解更多信息

模式演变

当源表更改时,我们需要更新目标表以处理此模式漂移,并且不会导致管道中断。

半结构化数据

MongoDB/Elasticsearch 等源允许复杂嵌套记录结构中存在不一致性,这些不一致性需要在摄取管道插入 ClickHouse 之前进行协调。例如

  • 在某些记录中,Date/Time 表示为数字(自 epoch 以来的秒/毫秒),而在其他记录中表示为字符串(ISO 格式)
  • 在某些记录中,嵌套字段是字符串,而在其他记录中是更复杂的嵌套结构体
  • 深度嵌套的复杂半结构化数据通常需要在插入 ClickHouse 之前进行预处理,并映射到适当的类型,例如 Tuple、Nested。

我们的方法

现在让我们深入了解我们的连接器以及我们如何应对每个挑战。

数据类型

我们发现默认方法通常只是将数据作为 JSON 插入 ClickHouse,然后在加载后转换数据。

我们内置了对以下数据类型的支持

Kafka Connect 数据类型ClickHouse 数据类型
INT8Int8
INT16Int16
INT32Int32
INT64Int64
FLOAT32Float32
FLOAT64Float64
BOOLEANBool
BYTESBLOB (String)
STRINGString
org.apache.kafka.connect.data.DecimalDECIMAL(38, 0)
org.apache.kafka.connect.data.Timestampio.debezium.time.ZonedTimestampDateTime64
org.apache.kafka.connect.data.DateDate
io.debezium.data.JsonString
STRUCTTuple
ARRAYArray

JSON 字段目前作为字符串摄取,allow_experimental_object_type=1 的使用目前正在测试中。

元数据

连接器为每次插入 ClickHouse 表添加额外的键列,以便更好地进行加载后分析和建模,并支持更新插入。

以下元数据列被添加到每个 ClickHouse 表中

  • _streamkap_ts_ms:CDC 事件时间戳
  • _streamkap_deleted:如果当前的 CDC 事件是删除事件
  • _streamkap_partition:smallint,表示通过对源记录键字段应用一致哈希获得的内部 Streamkap 分区号
  • _streamkap_source_ts_ms:更改事件在源数据库中发生的时间戳
  • _streamkap_op:CDC 事件操作类型(c 插入,u 更新,d 删除,r 快照,t 截断)

插入/更新插入

Streamkap 连接器支持两种数据摄取到 ClickHouse 的模式:插入(追加)和 更新插入

更新插入模式是我们连接器的默认模式,当 ClickHouse 表需要包含源数据的最新版本时使用。

插入(追加)模式

插入模式会导致每个更改都被跟踪并作为新行插入 ClickHouse 中,而删除事件将在 ClickHouse 中使用元值 _streamkap_deleted 标记为已删除。

这对于较大的数据量很有用,可以保持低延迟并维护更改历史记录。

例如,Streamkap 在收集我们的指标时使用插入模式,因为只插入不可变数据。

然后我们使用指标表之上的 物化视图,以便在摄取时为时间序列分析创建多个聚合。在此表上设置合适的 TTL,以便 ClickHouse 为我们处理删除操作,同时提供足够的历史数据来调查任何问题,或者如果我们必须出于某种原因重建物化视图。

要使用插入(追加)模式,请使用 ClickHouse 引擎 MergeTree

appends_streamkap.png

更新插入模式

更新插入是插入和更新的组合。如果在行的主键上找到匹配项,则该值将被覆盖。相反,如果没有匹配项,则将插入该事件。

更新插入模式是使用 ClickHouse 的 ReplacingMergeTree 引擎实现的。

ReplacingMergeTree 引擎在基于排序键的定期后台合并期间对数据进行重复数据删除,从而可以清理旧记录。此过程的异步性质意味着可能会有一个小窗口,您在该窗口中仍然保留视图中的旧记录。因此,查询必须使用 FINAL 修饰符来确保返回数据的最新版本,然后这将对查询时剩余的任何相同记录执行重复数据删除。

具有基本类型的更新插入示例

此处以 JSON 格式显示了更新插入的输入记录。键只有一个字段 id,它是将对其行进行重复数据删除的主键

{
    "id": "123456hYCcEM62894000000000",
    "str_col": "some-str-values-000000000",
    "IntColumn": 123000,
    "Int8": 0,
    "InT16": 10,
    "bool_col": true,
    "double_col": 1.7976931348623157E308,
    "json_col": "{\"a\": 0}",
    "__deleted": false,
    "created_at": 1707379532748,
    "date_col": 19761,
    "ts_tz": "2023-10-24T15:19:51Z",
    "_streamkap_ts_ms": 1707379532748,
    "binary_col": "AQIDBAU=",
    "byte_buf": "AQIDBAU=",
    "bigint_col": "E4f/////0tCeAA=="
}

生成的表

SHOW CREATE TABLE streamkap_test_nominal_upsert
FORMAT Vertical

Query id: 1abf2898-69b3-4785-a849-65c3879493bb

Row 1:
──────
statement: CREATE TABLE streamkap.streamkap_test_nominal_upsert
(
    `id` String COMMENT 'id',
    `str_col` String COMMENT 'str_col',
    `IntColumn` Int32 COMMENT 'IntColumn',
    `Int8` Int8 COMMENT 'Int8',
    `InT16` Int16 COMMENT 'InT16',
    `bool_col` Bool COMMENT 'bool_col',
    `double_col` Float64 COMMENT 'double_col',
    `json_col` String COMMENT 'json_col',
    `__deleted` Bool COMMENT '__deleted',
    `created_at` DateTime64(3) COMMENT 'created_at',
    `date_col` Date COMMENT 'date_col',
    `ts_tz` DateTime64(3) COMMENT 'ts_tz',
    `_streamkap_ts_ms` Int64 COMMENT '_streamkap_ts_ms',
    `binary_col` String COMMENT 'binary_col',
    `byte_buf` String COMMENT 'byte_buf',
    `bigint_col` Decimal(38, 0) COMMENT 'bigint_col',
    `_streamkap_partition` Int32 COMMENT '_streamkap_partition',
    `_streamkap_deleted` UInt8 MATERIALIZED if(__deleted = true, 1, 0)
)
ENGINE = ReplicatedReplacingMergeTree('/clickhouse/tables/{uuid}/{shard}', '{replica}', _streamkap_ts_ms, _streamkap_deleted)
PARTITION BY _streamkap_partition
PRIMARY KEY id
ORDER BY id
SETTINGS index_granularity = 8192

示例数据

SELECT *
FROM streamkap_test_nominal_upsert
FORMAT Vertical

Row 1:
──────
id:                   123456hYCcEM62894000000000
str_col:              some-str-values-000000000
IntColumn:            123000
Int8:                 0
InT16:                10
bool_col:             true
double_col:           1.7976931348623157e308
json_col:             {"a": 0}
__deleted:            false
created_at:           2024-02-08 08:03:37.368
date_col:             2024-02-08
ts_tz:                2023-10-24 15:19:51.000
_streamkap_ts_ms:     1707379417368
binary_col:
byte_buf:
bigint_col:           92233720368547000000000
_streamkap_partition: 0

Row 2:
──────
id:                   123456hYCcEM62894000000000
str_col:              some-str-values-000000000
IntColumn:            123000
Int8:                 0
InT16:                10
bool_col:             true
double_col:           1.7976931348623157e308
json_col:             {"a": 0}
__deleted:            false
created_at:           2024-02-08 08:03:41.608
date_col:             2024-02-08
ts_tz:                2023-10-24 15:19:51.000
_streamkap_ts_ms:     1707379421608
binary_col:           java.nio.HeapByteBuffer[pos=0 lim=5 cap=5]
byte_buf:             java.nio.HeapByteBuffer[pos=0 lim=5 cap=5]
bigint_col:           92233720368547000000000
_streamkap_partition: 0

使用 FINAL 的重复数据删除数据

SELECT *
FROM streamkap_test_nominal_upsert
FINAL
FORMAT Vertical

Row 1:
──────
id:                   123456hYCcEM62894000000000
str_col:              some-str-values-000000000
IntColumn:            123000
Int8:                 0
InT16:                10
bool_col:             true
double_col:           1.7976931348623157e308
json_col:             {"a": 0}
__deleted:            false
created_at:           2024-02-08 08:03:41.608
date_col:             2024-02-08
ts_tz:                2023-10-24 15:19:51.000
_streamkap_ts_ms:     1707379421608
binary_col:           java.nio.HeapByteBuffer[pos=0 lim=5 cap=5]
byte_buf:             java.nio.HeapByteBuffer[pos=0 lim=5 cap=5]
bigint_col:           92233720368547000000000
_streamkap_partition: 0

处理半结构化数据

嵌套数组和结构体

下面,我们提供了一些示例,说明如何将复杂结构自动映射到 ClickHouse 类型。

为了支持包含结构体的数组,我们需要更改 Streamkap 在 ClickHouse 中的角色,将 flatten_nested 设置为 0

ALTER ROLE STREAMKAP_ROLE SETTINGS flatten_nested = 0;

包含子数组的嵌套结构体字段

此处以 JSON 格式显示了输入记录,其中键只有一个字段 id

{
	"id": 1,
	"obj": {
		"nb": 123,
		"str": "abc",
		"sub_arr": [
			{
				"sub_nb": 789,
				"sub_str": "mnp"
			}
		]	
	}
}

生成的表。请注意 obj 列是如何映射到 Tuple(nb Int32, str String, sub_arr Array(Tuple(n Int32, s String)), sub_arr_str Array(String)) 以处理复杂结构的

SHOW CREATE TABLE chdb.streamkap_nested_struct_with_array

CREATE TABLE chdb.streamkap_nested_struct_with_array
(
    `obj` Tuple(nb Int32, str String, sub_arr Array(Tuple(n Int32, s String)), sub_arr_str Array(String)) COMMENT 'obj',
    `__deleted` Bool COMMENT '__deleted',
    `_streamkap_ts_ms` Int64 COMMENT '_streamkap_ts_ms',
    `_streamkap_partition` Int32 COMMENT '_streamkap_partition',
    `id` Int32 COMMENT 'id',
    `_streamkap_deleted` UInt8 MATERIALIZED if(__deleted = true, 1, 0)
)
ENGINE = ReplacingMergeTree(_streamkap_ts_ms, _streamkap_deleted)
PARTITION BY _streamkap_partition
PRIMARY KEY id
ORDER BY id
SETTINGS index_granularity = 8192 

示例数据

SELECT *
FROM chdb.streamkap_nested_struct_with_array
LIMIT 1 format Vertical

obj:                               (123,'abc',[(789,'mnp')],['efg']) 
__deleted:                         false     
_streamkap_ts_ms:                  1702519029407 
_streamkap_partition:              0 
id:                                1 

包含子结构体的嵌套数组字段

此处以 JSON 格式显示了输入记录,其中键只有一个字段 id

{
    "id": 1,
    "arr": [
        {
            "nb": 123,
            "str": "abc"
        }
    ]
}
SHOW CREATE TABLE streamkap_nested_array_of_struct

CREATE TABLE streamkap.streamkap_nested_array_of_struct
(
    `arr` Array(Tuple(nb Int32, str String)) COMMENT 'arr',
    `__deleted` Bool COMMENT '__deleted',
    `_streamkap_ts_ms` Int64 COMMENT '_streamkap_ts_ms',
    `_streamkap_partition` Int32 COMMENT '_streamkap_partition',
    `id` Int32 COMMENT 'id',
    `_streamkap_deleted` UInt8 MATERIALIZED if(__deleted = true, 1, 0)
)
ENGINE = ReplacingMergeTree( _streamkap_ts_ms, _streamkap_deleted)
PARTITION BY _streamkap_partition
PRIMARY KEY id
ORDER BY id
SETTINGS index_granularity = 8192

示例数据

SELECT *
FROM streamkap_nested_array_of_struct
LIMIT 1 format Vertical

arr:                            [(123,'abc')]
__deleted:                      false
_streamkap_ts_ms:               1702529856885
_streamkap_partition:           0
id:                             1

快照

快照是指将现有数据从数据库加载到 ClickHouse 的过程。

我们有两种方法可以加载此历史数据。

阻塞快照

阻塞快照的目的是捕获数据库表的整个当前状态,并将使用大型 select 语句来执行此操作。这些也可以并发运行,并且速度非常快。效率方面,阻塞快照可能对系统资源产生更高的影响,特别是对于大型表,并且每个查询可能需要更长的时间。

增量快照

增量快照旨在提高效率,通常对系统资源的影响较低,并且特别适合于非常大的表或希望同时进行快照和流式传输的情况。

数据一致性和交付保证

交付保证主要指的是失败场景,在这些场景中,未确认的 CDC 事件可能会被重播,从而导致重复行插入 ClickHouse。

Streamkap 为 ClickHouse 提供 至少一次 交付保证。

使用插入摄取模式,可能会将一些重复行插入 ClickHouse。但是,通过在您的物化视图中添加重复数据删除代码,将不会有任何影响。

如前所述,对于更新插入摄取模式,我们使用源记录键执行重复数据删除。强制执行 精确一次 交付保证会增加性能损失,而没有任何额外的好处,因为相同的过程会处理重复的 CDC 事件,将一个记录的所有 CDC 事件合并到最终记录状态中。

转换

Streamkap 支持管道中的转换,以便可以将预处理的数据发送到 ClickHouse。

这对于半结构化数据、预处理和清理任务尤其有用。这可能比在摄取后处理数据效率更高。

在 ClickHouse 中自然地对清理后的结构化数据进行实时分析,查询性能得益于将数据转换移至插入时间。

下面,我们介绍 Streamkap 执行的一些常见转换。

修复半结构化数据中的不一致性

考虑修复不一致的半结构化日期字段

"someDateField": {"$date": "2023-08-04T09:12:20.29Z"}
"someDateField": "2023-08-07T08:14:57.817325+00:00"
"someDateField": {"$date": {"$numberLong": 1702853448000}}

使用 Streamkap 转换,所有记录都可以转换为通用格式,以便摄取到 Clickhouse DateTime64 列中

"someDateField": "yyyy-MM-dd HH:mm:ss.SSS"

拆分大型半结构化 JSON 文档

对于文档数据库,子实体可以建模为嵌套在父实体文档中的子数组

{
    "key": "abc1234",
    "array": [
        {
            "id": "11111",
            "someField": "aa-11"
        },
        {
            "id": "22222",
            "someField": "bb-22"
        }
    ]
}

在 ClickHouse 中,将这些子实体表示为单独的行可能更有意义。使用 Streamkap 转换,可以将子实体记录拆分为单独的记录

{
    "id": "11111",
    "parentKey": "abc1234",
    "someField": "aa-11"
}

{
    "id": "22222",
    "parentKey": "abc1234",
    "someField": "bb-22"
}

模式演变

模式演变或漂移处理是更改目标表以反映上游更改的过程。

Streamkap 连接器自动处理以下场景中的模式漂移。

  • 附加列: 将检测到附加字段,并且将在表中创建一个新列以接收新数据。
  • 删除列: 此列现在将被忽略,并且不会采取进一步的操作。
  • 更改列类型:将在表中创建一个附加列,使用后缀表示新类型。例如 ColumnName_type

可以在任何阶段将其他表添加到管道中。我们在下面展示了一些模式演变的示例。

添加列

考虑模式演变之前的以下输入记录

{
    "id": "123456hYCcEM62894000000000",
    "str_col": "some-str-values-000000000",
    "IntColumn": 123000,
    "Int8": 0,
    "InT16": 10,
    "bool_col": true,
    "double_col": 1.7976931348623157E308,
    "json_col": "{\"a\": 0}",
    "binary_col": "AQIDBAU=",
    "byte_buf": "AQIDBAU=",
    "bigint_col": "E4f/////0tCeAA==",
    "__deleted": false,
    "created_at": 1702894985613,
    "ts_tz": "2023-10-24T15:19:51Z",
    "_streamkap_ts_ms": 1702894985613
}

新列 new_double_col 被添加到上游模式中。这会导致 ClickHouse 模式演变

{
    "id": "123456hYCcEM62894xxx",
    "str_col": "some-str-values-000000000",
    "IntColumn": 123000,
    "Int8": 0,
    "InT16": 10,
    "bool_col": true,
    "double_col": 1.7976931348623157E308,
    "json_col": "{\"a\": 0}",
    "binary_col": "AQIDBAU=",
    "byte_buf": "AQIDBAU=",
    "bigint_col": "E4f/////0tCeAA==",
    "__deleted": false,
    "created_at": 1702894985613,
    "ts_tz": "2023-10-24T15:19:51Z",
    "_streamkap_ts_ms": 1702894985613,
    "new_double_col": 1.7976931348623157E308
}

ClickHouse 数据

SELECT
    id,
    new_double_col
FROM streamkap_test_nominal_add_new_column
ORDER BY _streamkap_ts_ms ASC

┌─id─────────────────────────┬─new_double_col─┐
│ 123456hYCcEM62894000000000 │              0 │
└────────────────────────────┴────────────────┘
┌─id───────────────────┬─────────new_double_col─┐
│ 123456hYCcEM62894xxx │ 1.7976931348623157e308 │
└──────────────────────┴────────────────────────┘

将 Int 演变为 String

模式演变之前的输入记录

{
    "id": "123456hYCcEM62894000000000",
. . . 
    "IntColumn": 123000,
. . . 
    "_streamkap_ts_ms": 1702894492041
}

在上游模式演变后摄取的新记录

{
    "id": "123456hYCcEM62894xxx",
. . .
    "IntColumn": "new-str-value",
. . .
}

ClickHouse 数据,在添加新列 IntColumn_str

SELECT
    id,
    IntColumn,
    IntColumn_str
FROM streamkap_test_nominal_evolve_int2string
ORDER BY _streamkap_ts_ms ASC


┌─id─────────────────────────┬─IntColumn─┬─IntColumn_str─┐
│ 123456hYCcEM62894000000000 │    123000 │               │
└────────────────────────────┴───────────┴───────────────┘
┌─id───────────────────┬─IntColumn─┬─IntColumn_str─┐
│ 123456hYCcEM62894xxx │         0new-str-value │
└──────────────────────┴───────────┴───────────────┘

性能

以下 15 分钟的负载测试旨在显示各种批量大小与延迟相关的性能特征。此外,我们将评估 Streamkap ClickHouse 目标连接器的可扩展性。

ClickHouse Cloud 实例详细信息:3 个节点,每个节点 32GiB,具有 8 个 vCPU

输入记录格式包含基本类型、一个中等字符串(约 100 个字符)和一个大字符串(约 1000 个字符)

select * from streamkap_test_nominal_perf limit 1 format Vertical;

id:                   123456hYCcEM62894000000001
str_col:              some-str-values-000000001
IntColumn:            123001
Int8:                 1
InT16:                10
bool_col:             true
double_col:           1.7976931348623157e308
json_col:             {"a": 1}
__deleted:            false
created_at:           1970-01-01 00:00:19.751
ts_tz:                2023-10-24 15:19:51.000
_streamkap_ts_ms:     1706539233685
binary_col:           java.nio.HeapByteBuffer[pos=0 lim=5 cap=5]
byte_buf:             java.nio.HeapByteBuffer[pos=0 lim=5 cap=5]
bigint_col:           92233720368547000000001
medium_str:           str-medium-000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001
large_str:            str-large-000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001x000000001
_streamkap_partition: 0

当前测试的摄取模式设置为“更新插入”。当使用“追加”时,吞吐量会稍微好一些,因为不需要一些内存中的重复数据删除逻辑。

基线单分区

使用单个 Streamkap 任务和 Clickhouse 分区以及多个批量大小进行基准测试。

吞吐量

streamkap_throughput.png

每个批量大小的延迟

latency_streamkap.png

通常,回填需要原始吞吐量,而延迟不是问题。在这种情况下,超过 10 万行的大批量大小将更合适。

通常,流式传输更改的吞吐量要求较低,并且可能需要更小的延迟。在这种情况下,较小的批量大小更合适。

这些是具有固定批量大小的人工测试,旨在例证吞吐量和延迟之间的权衡。在实践中,批量大小随内部队列大小而变化。如果队列中有许多记录在等待,则批量大小将增大,因此吞吐量将增大。

可扩展性

使用相同的批量大小进行测试:每个批量大小 100,000 条记录,并逐渐增加任务数:1、2、4 和 8。我们可以看到吞吐量与任务数大致呈线性关系扩展。

streamkap_scalability.png

总结

这只是我们与 ClickHouse 合作的开始,在未来几周内,我们将继续构建尽可能最佳的集成,以处理变更数据捕获事件及其他事件。

以下是我们希望获得社区投票支持的一些领域

  • 使用 allow_experimental_object_type=1
  • 自动创建物化视图,基于模板
  • 跨多个表流式传输 ACID 事务
  • 单条记录转换
  • 多条记录转换(拆分、连接、聚合)
  • 精确一次

希望此连接器能让您像我们一样更轻松地享受 ClickHouse 的好处。

Streamkap 和 ClickHouse 都提供免费试用版;您可以在 Streamkap.comClickHouse.com 注册。

分享这篇文章

订阅我们的新闻通讯

随时了解功能发布、产品路线图、支持和云产品!
正在加载表单...
关注我们
X imageSlack imageGitHub image
Telegram imageMeetup imageRss image
©2025ClickHouse, Inc. 总部位于加利福尼亚州湾区和荷兰阿姆斯特丹。