跳至主要内容
跳至主要内容

CDC 从 DynamoDB 到 ClickHouse

本页介绍如何使用 ClickPipes 设置从 DynamoDB 到 ClickHouse 的 CDC。 此集成包含 2 个组件

  1. 通过 S3 ClickPipes 进行初始快照
  2. 通过 Kinesis ClickPipes 进行实时更新

数据将被摄取到 ReplacingMergeTree 中。 此表引擎通常用于 CDC 场景,以允许应用更新操作。 更多关于此模式的信息可以在以下博客文章中找到

1. 设置 Kinesis 流

首先,您需要在 DynamoDB 表上启用 Kinesis 流,以实时捕获更改。 我们希望在创建快照之前执行此操作,以避免丢失任何数据。 请参阅 AWS 指南,网址位于 此处

2. 创建快照

接下来,我们将创建 DynamoDB 表的快照。 这可以通过 AWS 导出到 S3 来实现。 请参阅 AWS 指南,网址位于 此处您需要执行“完全导出”并使用 DynamoDB JSON 格式。

3. 将快照加载到 ClickHouse

创建必要的表

来自 DynamoDB 的快照数据如下所示

{
  "age": {
    "N": "26"
  },
  "first_name": {
    "S": "sally"
  },
  "id": {
    "S": "0A556908-F72B-4BE6-9048-9E60715358D4"
  }
}

请注意,数据采用嵌套格式。 在将其加载到 ClickHouse 之前,我们需要展平此数据。 这可以使用 ClickHouse 中的 JSONExtract 函数在物化视图中完成。

我们希望创建三个表

  1. 一个表用于存储来自 DynamoDB 的原始数据
  2. 一个表用于存储最终展平的数据(目标表)
  3. 一个物化视图用于展平数据

对于上面的示例 DynamoDB 数据,ClickHouse 表如下所示

/* Snapshot table */
CREATE TABLE IF NOT EXISTS "default"."snapshot"
(
    `item` String
)
ORDER BY tuple();

/* Table for final flattened data */
CREATE MATERIALIZED VIEW IF NOT EXISTS "default"."snapshot_mv" TO "default"."destination" AS
SELECT
    JSONExtractString(item, 'id', 'S') AS id,
    JSONExtractInt(item, 'age', 'N') AS age,
    JSONExtractString(item, 'first_name', 'S') AS first_name
FROM "default"."snapshot";

/* Table for final flattened data */
CREATE TABLE IF NOT EXISTS "default"."destination" (
    "id" String,
    "first_name" String,
    "age" Int8,
    "version" Int64
)
ENGINE ReplacingMergeTree("version")
ORDER BY id;

目标表有一些要求

  • 此表必须是 ReplacingMergeTree
  • 该表必须具有 version
    • 在后续步骤中,我们将 Kinesis 流中的 ApproximateCreationDateTime 字段映射到 version 列。
  • 表应使用分区键作为排序键(由 ORDER BY 指定)
    • 具有相同排序键的行将基于 version 列进行去重。

创建快照 ClickPipe

现在您可以创建一个 ClickPipe,将快照数据从 S3 加载到 ClickHouse。 请按照 S3 ClickPipe 指南 此处,但使用以下设置

  • 摄取路径:您需要找到 S3 中导出的 json 文件的路径。 路径如下所示
https://{bucket}.s3.amazonaws.com/{prefix}/AWSDynamoDB/{export-id}/data/*
  • 格式:JSONEachRow
  • :您的快照表(例如,上面的示例中的 default.snapshot

创建后,数据将开始填充到快照表和目标表中。 您无需等待快照加载完成即可继续下一步。

4. 创建 Kinesis ClickPipe

现在我们可以设置 Kinesis ClickPipe,以捕获来自 Kinesis 流的实时更改。 请按照 Kinesis ClickPipe 指南 此处,但使用以下设置

  • :步骤 1 中使用的 Kinesis 流
  • :您的目标表(例如,上面的示例中的 default.destination
  • 展平对象:true
  • 列映射:
    • ApproximateCreationDateTimeversion
    • 将其他字段映射到适当的目标列,如下所示

5. 清理(可选)

快照 ClickPipe 完成后,您可以删除快照表和物化视图。

DROP TABLE IF EXISTS "default"."snapshot";
DROP TABLE IF EXISTS "default"."snapshot_clickpipes_error";
DROP VIEW IF EXISTS "default"."snapshot_mv";
    © . This site is unofficial and not affiliated with ClickHouse, Inc.