CDC 从 DynamoDB 到 ClickHouse
本页介绍如何使用 ClickPipes 设置从 DynamoDB 到 ClickHouse 的 CDC。 此集成包含 2 个组件
- 通过 S3 ClickPipes 进行初始快照
- 通过 Kinesis ClickPipes 进行实时更新
数据将被摄取到 ReplacingMergeTree 中。 此表引擎通常用于 CDC 场景,以允许应用更新操作。 更多关于此模式的信息可以在以下博客文章中找到
- 使用 PostgreSQL 和 ClickHouse 进行变更数据捕获 (CDC) - 第 1 部分
- 使用 PostgreSQL 和 ClickHouse 进行变更数据捕获 (CDC) - 第 2 部分
1. 设置 Kinesis 流
首先,您需要在 DynamoDB 表上启用 Kinesis 流,以实时捕获更改。 我们希望在创建快照之前执行此操作,以避免丢失任何数据。 请参阅 AWS 指南,网址位于 此处。
2. 创建快照
接下来,我们将创建 DynamoDB 表的快照。 这可以通过 AWS 导出到 S3 来实现。 请参阅 AWS 指南,网址位于 此处。 您需要执行“完全导出”并使用 DynamoDB JSON 格式。
3. 将快照加载到 ClickHouse
创建必要的表
来自 DynamoDB 的快照数据如下所示
请注意,数据采用嵌套格式。 在将其加载到 ClickHouse 之前,我们需要展平此数据。 这可以使用 ClickHouse 中的 JSONExtract 函数在物化视图中完成。
我们希望创建三个表
- 一个表用于存储来自 DynamoDB 的原始数据
- 一个表用于存储最终展平的数据(目标表)
- 一个物化视图用于展平数据
对于上面的示例 DynamoDB 数据,ClickHouse 表如下所示
目标表有一些要求
- 此表必须是
ReplacingMergeTree表 - 该表必须具有
version列- 在后续步骤中,我们将 Kinesis 流中的
ApproximateCreationDateTime字段映射到version列。
- 在后续步骤中,我们将 Kinesis 流中的
- 表应使用分区键作为排序键(由
ORDER BY指定)- 具有相同排序键的行将基于
version列进行去重。
- 具有相同排序键的行将基于
创建快照 ClickPipe
现在您可以创建一个 ClickPipe,将快照数据从 S3 加载到 ClickHouse。 请按照 S3 ClickPipe 指南 此处,但使用以下设置
- 摄取路径:您需要找到 S3 中导出的 json 文件的路径。 路径如下所示
- 格式:JSONEachRow
- 表:您的快照表(例如,上面的示例中的
default.snapshot)
创建后,数据将开始填充到快照表和目标表中。 您无需等待快照加载完成即可继续下一步。
4. 创建 Kinesis ClickPipe
现在我们可以设置 Kinesis ClickPipe,以捕获来自 Kinesis 流的实时更改。 请按照 Kinesis ClickPipe 指南 此处,但使用以下设置
- 流:步骤 1 中使用的 Kinesis 流
- 表:您的目标表(例如,上面的示例中的
default.destination) - 展平对象:true
- 列映射:
ApproximateCreationDateTime:version- 将其他字段映射到适当的目标列,如下所示
5. 清理(可选)
快照 ClickPipe 完成后,您可以删除快照表和物化视图。