跳到主要内容
跳到主要内容
编辑此页面

从 DynamoDB 到 ClickHouse 的 CDC

实验性功能。了解更多。

本页介绍如何使用 ClickPipes 设置从 DynamoDB 到 ClickHouse 的 CDC。此集成包含 2 个组件

  1. 通过 S3 ClickPipes 进行初始快照
  2. 通过 Kinesis ClickPipes 进行实时更新

数据将摄取到 ReplacingMergeTree 中。此表引擎常用于 CDC 场景,以允许应用更新操作。有关此模式的更多信息,请参阅以下博客文章

1. 设置 Kinesis Stream

首先,您需要为您的 DynamoDB 表启用 Kinesis 流,以实时捕获更改。我们希望在创建快照之前执行此操作,以避免丢失任何数据。在此处找到 AWS 指南 here

DynamoDB Kinesis Stream

2. 创建快照

接下来,我们将创建 DynamoDB 表的快照。这可以通过 AWS 导出到 S3 来实现。在此处找到 AWS 指南 here您需要以 DynamoDB JSON 格式执行“完整导出”。

DynamoDB S3 Export

3. 将快照加载到 ClickHouse

创建必要的表

来自 DynamoDB 的快照数据将如下所示

{
"age": {
"N": "26"
},
"first_name": {
"S": "sally"
},
"id": {
"S": "0A556908-F72B-4BE6-9048-9E60715358D4"
}
}

请注意,数据采用嵌套格式。我们需要在将此数据加载到 ClickHouse 之前对其进行扁平化处理。这可以使用 ClickHouse 中的 JSONExtract 函数在物化视图中完成。

我们将要创建三个表

  1. 一个用于存储来自 DynamoDB 的原始数据的表
  2. 一个用于存储最终扁平化数据的表(目标表)
  3. 一个用于扁平化数据的物化视图

对于上面的示例 DynamoDB 数据,ClickHouse 表将如下所示

/* Snapshot table */
CREATE TABLE IF NOT EXISTS "default"."snapshot"
(
`item` String
)
ORDER BY tuple();

/* Table for final flattened data */
CREATE MATERIALIZED VIEW IF NOT EXISTS "default"."snapshot_mv" TO "default"."destination" AS
SELECT
JSONExtractString(item, 'id', 'S') AS id,
JSONExtractInt(item, 'age', 'N') AS age,
JSONExtractString(item, 'first_name', 'S') AS first_name
FROM "default"."snapshot";

/* Table for final flattened data */
CREATE TABLE IF NOT EXISTS "default"."destination" (
"id" String,
"first_name" String,
"age" Int8,
"version" Int64
)
ENGINE ReplacingMergeTree("version")
ORDER BY id;

目标表有一些要求

  • 此表必须是 ReplacingMergeTree
  • 该表必须具有 version
    • 在后续步骤中,我们将把来自 Kinesis 流的 ApproximateCreationDateTime 字段映射到 version 列。
  • 该表应使用分区键作为排序键(由 ORDER BY 指定)
    • 具有相同排序键的行将根据 version 列进行去重。

创建快照 ClickPipe

现在您可以创建一个 ClickPipe,以将快照数据从 S3 加载到 ClickHouse。请按照 S3 ClickPipe 指南 here,但使用以下设置

  • 摄取路径:您需要找到 S3 中导出的 json 文件的路径。路径将如下所示
https://{bucket}.s3.amazonaws.com/{prefix}/AWSDynamoDB/{export-id}/data/*
  • 格式:JSONEachRow
  • :您的快照表(例如,上面示例中的 default.snapshot

创建后,数据将开始填充到快照表和目标表中。您无需等待快照加载完成即可继续下一步。

4. 创建 Kinesis ClickPipe

现在我们可以设置 Kinesis ClickPipe 以捕获来自 Kinesis 流的实时更改。请按照 Kinesis ClickPipe 指南 here,但使用以下设置

  • :步骤 1 中使用的 Kinesis 流
  • :您的目标表(例如,上面示例中的 default.destination
  • 扁平化对象:true
  • 列映射:
    • ApproximateCreationDateTime: version
    • 将其他字段映射到相应的目标列,如下所示

DynamoDB Map Columns

5. 清理(可选)

快照 ClickPipe 完成后,您可以删除快照表和物化视图。

DROP TABLE IF EXISTS "default"."snapshot";
DROP TABLE IF EXISTS "default"."snapshot_clickpipes_error";
DROP VIEW IF EXISTS "default"."snapshot_mv";