从 DynamoDB 到 ClickHouse 的 CDC
本页介绍如何使用 ClickPipes 从 DynamoDB 到 ClickHouse 设置 CDC。此集成包含 2 个组件
- 通过 S3 ClickPipes 进行初始快照
- 通过 Kinesis ClickPipes 进行实时更新
数据将被导入到 ReplacingMergeTree
中。此表引擎通常用于 CDC 场景,以允许应用更新操作。有关此模式的更多信息,请参阅以下博文文章
- 使用 PostgreSQL 和 ClickHouse 进行变更数据捕获 (CDC) - 第 1 部分
- 使用 PostgreSQL 和 ClickHouse 进行变更数据捕获 (CDC) - 第 2 部分
1. 设置 Kinesis 流
首先,您需要在 DynamoDB 表上启用 Kinesis 流,以实时捕获更改。我们在创建快照之前执行此操作,以避免丢失任何数据。请查找位于 此处 的 AWS 指南。
2. 创建快照
接下来,我们将创建 DynamoDB 表的快照。这可以通过 AWS 导出到 S3 来实现。请查找位于 此处 的 AWS 指南。**您需要在 DynamoDB JSON 格式中进行“完全导出”。**
3. 将快照加载到 ClickHouse
创建必要的表
来自 DynamoDB 的快照数据将类似于此
{
"age": {
"N": "26"
},
"first_name": {
"S": "sally"
},
"id": {
"S": "0A556908-F72B-4BE6-9048-9E60715358D4"
}
}
注意数据采用嵌套格式。在将其加载到 ClickHouse 之前,我们需要扁平化此数据。这可以通过在物化视图中使用 ClickHouse 中的 JSONExtract
函数来完成。
我们需要创建三个表
- 一个用于存储来自 DynamoDB 的原始数据的表
- 一个用于存储最终扁平化数据的表(目标表)
- 一个用于扁平化数据的物化视图
对于上面的示例 DynamoDB 数据,ClickHouse 表将如下所示
/* Snapshot table */
CREATE TABLE IF NOT EXISTS "default"."snapshot"
(
`item` String
)
ORDER BY tuple();
/* Table for final flattened data */
CREATE MATERIALIZED VIEW IF NOT EXISTS "default"."snapshot_mv" TO "default"."destination" AS
SELECT
JSONExtractString(item, 'id', 'S') AS id,
JSONExtractInt(item, 'age', 'N') AS age,
JSONExtractString(item, 'first_name', 'S') AS first_name
FROM "default"."snapshot";
/* Table for final flattened data */
CREATE TABLE IF NOT EXISTS "default"."destination" (
"id" String,
"first_name" String,
"age" Int8,
"version" Int64
)
ENGINE ReplacingMergeTree("version")
ORDER BY id;
目标表有一些要求
- 此表必须是
ReplacingMergeTree
表 - 该表必须包含一个
version
列- 在后面的步骤中,我们将 Kinesis 流中的
ApproximateCreationDateTime
字段映射到version
列。
- 在后面的步骤中,我们将 Kinesis 流中的
- 该表应使用分区键作为排序键(由
ORDER BY
指定)- 具有相同排序键的行将根据
version
列进行去重。
- 具有相同排序键的行将根据
创建快照 ClickPipe
现在,您可以创建 ClickPipe 将快照数据从 S3 加载到 ClickHouse。请遵循 S3 ClickPipe 指南 此处,但使用以下设置
- 导入路径:您需要找到 S3 中导出的 json 文件的路径。该路径将类似于以下内容
https://{bucket}.s3.amazonaws.com/{prefix}/AWSDynamoDB/{export-id}/data/*
- 格式:JSONEachRow
- 表:您的快照表(例如,上面的示例中的
default.snapshot
)
创建完成后,数据将开始填充到快照表和目标表中。您无需等待快照加载完成,即可继续执行下一步。
4. 创建 Kinesis ClickPipe
现在,我们可以设置 Kinesis ClickPipe 来捕获来自 Kinesis 流的实时更改。请遵循 Kinesis ClickPipe 指南 此处,但使用以下设置
- 流:步骤 1 中使用的 Kinesis 流
- 表:您的目标表(例如,上面的示例中的
default.destination
) - 扁平化对象:true
- 列映射:
ApproximateCreationDateTime
:version
- 将其他字段映射到如下所示的相应目标列
5. 清理(可选)
快照 ClickPipe 完成后,您可以删除快照表和物化视图。
DROP TABLE IF EXISTS "default"."snapshot";
DROP TABLE IF EXISTS "default"."snapshot_clickpipes_error";
DROP VIEW IF EXISTS "default"."snapshot_mv";