DoubleCloud 即将停止运营。迁移到 ClickHouse 并享受限时免费迁移服务。立即联系我们 ->->

博客 / 工程

PostgreSQL 和 ClickHouse 的变更数据捕获 (CDC) - 第 2 部分

author avatar
Dale McDiarmid
2023 年 6 月 15 日

对于那些希望根据本文中的概念找到完整示例配置的人,请点击这里。感谢我们社区的Leart Beqiraj 的贡献。

简介

继续我们关于为 Postgresql 到 ClickHouse 构建变更捕获控制 (CDC) 管道的系列文章,本文重点介绍构建功能性管道所需的步骤和配置。为此,我们使用一个加载到 Postgres 和 ClickHouse 的示例数据集。将 Postgres 视为真相来源,我们应用插入、更新和删除的混合工作负载。使用由 Debezium、ClickHouse Kafka Connect 和物化视图构成的 CDC 管道,我们可以将这些更改近乎实时地反映到 ClickHouse 的表中。

在我们的示例中,我们使用 ClickHouse Cloud 集群中的开发实例和 AWS Aurora Postgres 实例。但是,这些示例应该在大小相同的自托管集群上可重现。或者,立即开始使用 ClickHouse Cloud 集群,并获得 300 美元的信用额度。让我们来处理基础设施,并开始查询!

示例数据集

在我们的示例数据集中,我们使用流行的英国房产价格数据集。它的大小适中(2800 万行),并且具有易于理解的架构。每行代表过去 20 年英国的房屋销售,包含表示价格、日期和位置的字段。字段的完整描述可以在此处找到。我们将此数据集加载到 Postgres 和 ClickHouse 中,然后对前者进行随机插入、更新和删除。这些更改应由 Debezium 捕获,并用于近乎实时地更新 ClickHouse。

Postgres 架构和数据加载

下面显示了 Postgres 架构。请注意使用序列 id 字段作为主键。虽然主键不是强制性的,但需要额外的 Postgres 配置才能使 Debezium 正常工作。

CREATE TABLE uk_price_paid                                                                                                                                                               (
   id serial,
   price INTEGER,
   date Date,
   postcode1 varchar(8),
   postcode2 varchar(3),
   type varchar(13),
   is_new SMALLINT,
   duration varchar(9),
   addr1 varchar(100),
   addr2 varchar(100),
   street varchar(60),
   locality varchar(35),
   town varchar(35),
   district varchar(40),
   county varchar(35),
   primary key(id)
);

我们以 Postgres 兼容 SQL 格式发布此数据集,以便进行插入,可以从此处下载。加载数据需要一些简单的命令,假设psql 客户端已使用环境变量配置。

wget
https://datasets-documentation.s3.eu-west-3.amazonaws.com/uk-house-prices/postgres/uk_prices.sql.tar.gz
tar -xzvf uk_prices.sql.tar.gz
psql < uk_prices.sql

INSERT 0 10000
INSERT 0 10000
INSERT 0 10000
…

postgres=> SELECT count(*) FROM uk_price_paid;
  count
----------
 27734966
(1 row)

注意:我们使用的是具有八个内核的 Postgres 版本 14.7 的 AWS Aurora 实例。加载这些数据大约需要 10 分钟。

ClickHouse 架构

下面我们展示了我们的文档中使用的架构的修改版本,使用 ReplacingMergeTree。

CREATE TABLE default.uk_price_paid
(
	`id` UInt64,
	`price` UInt32,
	`date` Date,
	`postcode1` LowCardinality(String),
	`postcode2` LowCardinality(String),
	`type` Enum8('other' = 0, 'terraced' = 1, 'semi-detached' = 2, 'detached' = 3, 'flat' = 4),
	`is_new` UInt8,
	`duration` Enum8('unknown' = 0, 'freehold' = 1, 'leasehold' = 2),
	`addr1` String,
	`addr2` String,
	`street` LowCardinality(String),
	`locality` LowCardinality(String),
	`town` LowCardinality(String),
	`district` LowCardinality(String),
	`county` LowCardinality(String),
	`version` UInt64,
	`deleted` UInt8
)
ENGINE = ReplacingMergeTree(version, deleted)
PRIMARY KEY (postcode1, postcode2, addr1, addr2)
ORDER BY (postcode1, postcode2, addr1, addr2, id)

虽然我们在上面将早期架构迁移到 ClickHouse 类型以进行优化,但 ClickHouse 也可以自动解释原始 Postgres 架构(serial 类型除外)。例如,以下 DDL 可用于使用 Postgres 类型创建表 - 这些类型将自动转换为 ClickHouse 类型,如所示。请注意,我们删除了主键,并将 serial 类型的 id 列手动转换为 Uint64。

CREATE TABLE default.uk_price_paid
(
    `id` UInt64,
    `price` INTEGER,
    `date` Date,
    `postcode1` varchar(8),
    `postcode2` varchar(3),
    `type` varchar(13),
    `is_new` SMALLINT,
    `duration` varchar(9),
    `addr1` varchar(100),
    `addr2` varchar(100),
    `street` varchar(60),
    `locality` varchar(35),
    `town` varchar(35),
    `district` varchar(40),
    `county` varchar(35),
    `version` UInt64,
    `deleted` UInt8
)
ENGINE = ReplacingMergeTree(version, deleted)
PRIMARY KEY (postcode1, postcode2, addr1, addr2)
ORDER BY (postcode1, postcode2, addr1, addr2, id)

SHOW CREATE TABLE default.uk_price_paid

CREATE TABLE default.uk_price_paid
(
    `id` UInt64,
    `price` Int32,
    `date` Date,
    `postcode1` String,
    `postcode2` String,
    `type` String,
    `is_new` Int16,
    `duration` String,
    `addr1` String,
    `addr2` String,
    `street` String,
    `locality` String,
    `town` String,
    `district` String,
    `county` String,
    `version` UInt64,
    `deleted` UInt8
)
ENGINE = ReplicatedReplacingMergeTree('/clickhouse/tables/{uuid}/{shard}', '{replica}', version, deleted)
PRIMARY KEY (postcode1, postcode2, addr1, addr2)
ORDER BY (postcode1, postcode2, addr1, addr2, id)
SETTINGS index_granularity = 8192

考虑到我们在之前博客中探讨的关于 ReplacingMergeTree 引擎的概念,这里有一些重要的注意事项。

  • 我们在表定义中使用 version 和 deleted 列。这两列都是可选的。例如,如果您不需要支持删除操作,只需在架构和引擎定义中省略该列。
  • 我们已经为 ORDER BY 子句选择了列,以优化查询访问模式。请注意,id 列是如何在最后指定的,因为我们不希望在分析查询中使用这些列,但仍然需要它提供的唯一性属性 - 尤其是因为我们的地址不会超出街道级别。
  • 我们使用 PRIMARY KEY 子句指定主键,省略 id 列以节省内存,而不会影响我们的常规查询。

配置 CDC 管道

架构概述

我们在之前的博客中介绍的端到端架构,如下所示。

Final CDC schema.png

此架构假设用户拥有一个具有Kafka Connect 框架的 Kafka 实例。在我们的示例中,我们假设用户使用 Confluent Cloud 托管 Kafka,该云会为事件自动创建适当的主题。但是,也支持自托管 Kafka 安装。建议的架构将适用于任何通过 Debezium 生成事件的摄取管道。有关安装 Debezium 的说明以及主题配置的注意事项,请点击这里

如我们在之前文章中所述,用户应确保更改事件按每个唯一 Postgres 行的顺序传递(如果要删除删除事件)。可以通过使用单个分区(Debezium 源的默认值)或使用基于哈希的分区来保证这一点,对于由于吞吐量要求而需要多个分区的情况,可以通过对主键进行哈希来保证这一点。虽然后者的情况应该很少见,但这需要确保特定 Postgres 行的所有更改事件都通过对其主键进行哈希发送到同一个分区。

Kafka 中的日志压缩可用于确保仅保留一行上的最后一个事件,从而最大限度地减少 Kafka 存储大小。这需要在删除操作发生时在 Debezium 中省略 tombstone 事件。为了简化我们的管道,我们禁用了这些事件。用户应该在Kafka Sink中删除这些事件,如果他们需要这个高级功能。

配置 Postgresql

PostgreSQL 连接器可用于独立的 PostgreSQL 服务器或 PostgreSQL 服务器集群,但仅支持在主服务器上使用 - Debezium 连接器无法连接到副本实例。

请注意 Debezium 文档中的以下内容

“如果主服务器故障或被降级,连接器将停止。主服务器恢复后,您可以重新启动连接器。如果另一个 PostgreSQL 服务器已被提升为主服务器,请在重新启动连接器之前调整连接器配置。”

确保 Postgres 实例配置正确

  • 自管理配置详细信息 在此

  • 对于基于云的环境(例如 Amazon RDS),请参见 此处

正如我们在上一篇文章中提到的,我们假设使用输出插件 pgoutput 将数据从 WAL 内部表示转换为 Debezium 可以使用的格式。因此,我们下面的示例使用逻辑复制流模式 pgoutput。这内置于 PostgreSQL 10+ 中。早期版本的用户可以探索使用由 Debezium 社区维护的 decoderbufs 插件,或者 wal2json。我们尚未测试这些配置。

我们建议用户阅读以下关于用户安全性和配置的部分

  • 设置基本权限 - 我们在下面的示例中使用 postgres 超级用户。不建议在生产环境中使用。

  • 创建出版物的权限 - Debezium 从为表创建的出版物中流式传输 PostgreSQL 源表的更改事件。出版物包含从一个或多个表生成的过滤后的更改事件集。每个出版物中的数据根据出版物规范进行过滤。我们假设 Debezium 配置了足够的权限来创建这些出版物。默认情况下,postgres 超级用户具有此操作的权限。但是,对于生产用例,我们建议用户自己创建这些出版物或 最小化 用于创建它们的连接器分配给 Debezium 用户的权限。

  • 权限 允许与 Debezium 连接器主机进行复制

配置副本标识

Debezium 发送的消息内容取决于您为源目标配置 REPLICA IDENTITY 的方式。REPLICA IDENTITY 是一个 PostgreSQL 特定的表级设置,它确定逻辑解码插件对 UPDATE 和 DELETE 事件可用信息的量。更准确地说,此设置控制在发生 UPDATE 或 DELETE 事件时,对于涉及的表列的先前值(如果有)将提供哪些信息。

虽然支持四种不同的值,但我们建议根据您是否需要支持删除操作来选择以下值:

  • DEFAULT - 默认行为是,如果表具有主键,则更新和删除事件将包含表的主键列的先前值。对于 UPDATE 事件,只有具有更改值的列的先前值存在。如果表没有主键,连接器不会为该表发出 UPDATE 或 DELETE 事件。仅当以下情况时才使用此值:
    • 您的 ClickHouse ORDER BY 子句只包含 Postgres 主键列。这不太可能,因为通常用户会将列添加到 ORDER BY 以优化聚合查询,这些查询不太可能成为 Postgres 中的主键。
    • 您不需要支持删除操作。注意:下面使用的配置不需要更新的先前列值。它们是删除操作所必需的,因为新状态为 null。
  • FULL - 更新和删除操作发出的事件将包含表中所有列的先前值。如果您需要支持删除操作,则需要此设置。

使用 ALTER 命令设置此设置。

ALTER TABLE uk_price_paid REPLICA IDENTITY FULL;

本文的其余部分假设用户需要支持删除操作。如果不需要删除支持,步骤会有所不同,我们将提供替代配置的参考。

准备 ClickHouse

初始数据加载

在处理更改事件流之前,我们需要预加载 ClickHouse 表以确保它与 Postgres 一致。这可以通过几种方式完成,包括但不限于:

  • 使用 postgres 表函数 使用 INSERT INTO SELECT 从我们的 Postgres 实例中直接加载数据集。这提供了一种快速简便的加载数据集的方法,但需要我们在 Postgres 实例上暂停更改。这可能在生产场景中不可用。
  • 使用提供的 Postgres 导出文件进行下载。这可以转换为 ClickHouse 支持的格式 并加载。对于大多数大型数据集来说,这是不现实的。
  • 配置 Debezium 在首次启动时执行一致快照。快照完成后,它将继续从快照创建的确切点开始流式传输更改。这使得连接器能够以所有数据的一致视图开始,而不会遗漏在拍摄快照时进行的更改。关于如何实现这一点的完整过程在 这里 详细介绍。此过程的结果是类似于插入行时发送的读取事件流。虽然这些可以由我们下面的物化视图管道处理,但该过程在吞吐量方面往往不理想。

为了速度和简单起见,我们使用上面的选项一。注意,我们的 INSERT INTO SELECT 语句将 version 和 deleted 列分别设置为 1 和 0 的值。

INSERT INTO uk_price_paid SELECT
	id,
	price,
	date,
	postcode1,
	postcode2,
	type,
	is_new,
	duration,
	addr1,
	addr2,
	street,
	locality,
	town,
	district,
	county,
	1 AS version,
	0 AS deleted
FROM postgresql('<host>', '<database>', '<table>', '<user>', '<password>')

0 rows in set. Elapsed: 80.885 sec. Processed 27.73 million rows, 5.63 GB (342.89 thousand rows/s., 69.60 MB/s.)

在没有任何调整的情况下,我们能够在 80 秒内加载所有 2800 万行。

物化视图

Debezium 使用嵌套 JSON 格式发送消息。如果配置正确(见下文),并且 REPLICA IDENTITY 设置为 FULL,则更改事件将包含行的列的 before 和 after 值作为嵌套 JSON。这些消息的完整示例可以在 这里 找到,包括如果 REPLICA IDENTITY 设置为 DEFAULT 时不需要删除支持的情况。

例如,我们下面显示一条更新消息(REPLICA IDENTITY=Full)。

{
  "before": {
	"id": 50658675,
	"price": 227500,
	"date": 11905,
	"postcode1": "SP2",
	"postcode2": "7EN",
	"type": "detached",
	"is_new": 0,
	"duration": "freehold",
	"addr1": "31",
	"addr2": "",
	"street": "CHRISTIE MILLER ROAD",
	"locality": "SALISBURY",
	"town": "SALISBURY",
	"district": "SALISBURY",
	"county": "WILTSHIRE"
  },
  "after": {
	"id": 50658675,
	"price": 227500,
	"date": 11905,
	"postcode1": "SP2",
	"postcode2": "7EN",
	"type": "terraced",
	"is_new": 0,
	"duration": "freehold",
	"addr1": "31",
	"addr2": "",
	"street": "CHRISTIE MILLER ROAD",
	"locality": "SALISBURY",
	"town": "SALISBURY",
	"district": "SALISBURY",
	"county": "WILTSHIRE"
  },
  "source": {
	"version": "1.9.6.Final",
	"connector": "postgresql",
	"name": "postgres_server",
	"ts_ms": 1685378780355,
	"snapshot": "false",
	"db": "postgres",
	"sequence": "[\"247833040488\",\"247833042536\"]",
	"schema": "public",
	"table": "uk_price_paid",
	"txId": 106940,
	"lsn": 247833042536,
	"xmin": null
  },
  "op": "u",
  "ts_ms": 1685378780514,
  "transaction": null
}

这里的 op 字段表示操作,值 udc 分别表示更新、删除和插入操作。source.lsn 字段提供我们的版本值。对于删除事件,after 字段为 null。相反,对于插入事件,before 字段为 null。

此消息格式与我们 ClickHouse 中的目标表 uk_price_paid 不兼容。我们可以在插入时使用物化视图来转换这些消息。我们在下面显示了这一点

CREATE MATERIALIZED VIEW uk_price_paid_mv TO uk_price_paid
(
   `id` Nullable(UInt64),
   `price` Nullable(UInt32),
   `date` Nullable(Date),
   `postcode1` Nullable(String),
   `postcode2` Nullable(String),
   `type` Nullable(Enum8('other'=0, 'terraced'=1, 'semi-detached'=2, 'detached'=3, 'flat'=4)),
   `is_new` Nullable(UInt8),
   `duration` Nullable(Enum8('unknown'=0, 'freehold'=1, 'leasehold'=2)),
   `addr1` Nullable(String),
   `addr2` Nullable(String),
   `street` Nullable(String),
   `locality` Nullable(String),
   `town` Nullable(String),
   `district` Nullable(String),
   `county` Nullable(String),
   `version` UInt64,
   `deleted` UInt8
) AS
SELECT
   if(op = 'd', before.id, after.id) AS id,
   if(op = 'd', before.price, after.price) AS price,
   if(op = 'd', toDate(before.date), toDate(after.date)) AS date,
   if(op = 'd', before.postcode1, after.postcode1) AS postcode1,
   if(op = 'd', before.postcode2, after.postcode2) AS postcode2,
   if(op = 'd', before.type, after.type) AS type,
   if(op = 'd', before.is_new, after.is_new) AS is_new,
   if(op = 'd', before.duration, after.duration) AS duration,
   if(op = 'd', before.addr1, after.addr1) AS addr1,
   if(op = 'd', before.addr2, after.addr2) AS addr2,
   if(op = 'd', before.street, after.street) AS street,
   if(op = 'd', before.locality, after.locality) AS locality,
   if(op = 'd', before.town, after.town) AS town,
   if(op = 'd', before.district, after.district) AS district,
   if(op = 'd', before.county, after.county) AS county,
   if(op = 'd', source.lsn, source.lsn) AS version,
   if(op = 'd', 1, 0) AS deleted
FROM default.uk_price_paid_changes
WHERE (op = 'c') OR (op = 'r') OR (op = 'u') OR (op = 'd')

注意,我们的物化视图根据操作为每个列选择适当的值。version 基于 source.lsn 列,如果 op 列具有 d 值,则将 deleted 列设置为 1,否则设置为 0。op = r 值允许我们也支持快照事件(如果需要)。如果不需要删除支持,这个物化视图可以 简化

熟悉 ClickHouse 的读者会注意到,这个视图将行插入到我们的 uk_price_paid 中,并从 uk_price_paid_changes 表中选择行。这个后一个表将从我们的 Kafka 接收行插入。这个表的模式必须与前面显示的 Debezium 消息一致。我们在下面显示了这个表的模式

CREATE TABLE uk_price_paid_changes
(
	`before.id` Nullable(UInt64),
	`before.price` Nullable(UInt32),
	`before.date` Nullable(UInt32),
	`before.postcode1` Nullable(String),
	`before.postcode2` Nullable(String),
	`before.type` Nullable(Enum8('other'=0,'terraced'=1,'semi-detached'=2,'detached'=3,'flat'=4)),
	`before.is_new` Nullable(UInt8),
	`before.duration` Nullable(Enum8('unknown' = 0, 'freehold' = 1, 'leasehold' = 2)),
	`before.addr1` Nullable(String),
	`before.addr2` Nullable(String),
	`before.street` Nullable(String),
	`before.locality` Nullable(String),
	`before.town` Nullable(String),
	`before.district` Nullable(String),
	`before.county` Nullable(String),
	`after.id` Nullable(UInt64),
	`after.price` Nullable(UInt32),
	`after.date` Nullable(UInt32),
	`after.postcode1` Nullable(String),
	`after.postcode2` Nullable(String),
	`after.type` Nullable(Enum8('other'=0,'terraced'=1,'semi-detached'=2,'detached'=3,'flat'=4)),
	`after.is_new` Nullable(UInt8),
	`after.duration` Nullable(Enum8('unknown' = 0, 'freehold' = 1, 'leasehold' = 2)),
	`after.addr1` Nullable(String),
	`after.addr2` Nullable(String),
	`after.street` Nullable(String),
	`after.locality` Nullable(String),
	`after.town` Nullable(String),
	`after.district` Nullable(String),
	`after.county` Nullable(String),
	`op` LowCardinality(String),
	`ts_ms` UInt64,
	`source.sequence` String,
	`source.lsn` UInt64
)
ENGINE = MergeTree
ORDER BY tuple()

为了调试,我们为这个表使用了 MergeTree 引擎。在生产环境中,这可以是 Null 引擎 - 更改将不会被持久化,但转换后的行仍将被发送到目标表 uk_price_paid。如果不需要删除支持,并且 REPLICA IDENTITY 设置为 DEFAULT,则可以使用 更简单的表

敏锐的读者会注意到我们的模式是扁平化的,而不是 Debezium 发送的嵌套消息。我们将在 Debezium 连接器中执行此操作。从模式的角度来看,这更简单,并且允许我们配置 Nullable 值。涉及 Tuple 的替代方法更加复杂。

配置 Debezium

在 Kafka 连接框架中部署连接器需要以下设置。注意,我们假设消息以 JSON 无模式 格式发送

  • value.converter - org.apache.kafka.connect.json.JsonConverter
  • key.converter - org.apache.kafka.connect.storage.StringConverter
  • key.converter.schemas.enable - false
  • value.converter.schemas.enable - false
  • decimal.format - 控制此转换器将以何种格式序列化十进制数。此值不区分大小写,可以是 BASE64(默认值)或 NUMERIC。此值应设置为 BASE64。有关十进制数处理的更多详细信息,请参见 此处

此管道的必需配置设置(如数据库连接详细信息)的完整列表可以在 这里 找到。下面我们重点介绍了对消息格式影响最大的那些设置。**重要**:我们配置连接器以在每个表级别跟踪更改

  • plugin.name - 安装在 PostgreSQL 服务器上的 PostgreSQL 逻辑解码插件的名称。我们建议使用 pgoutput
  • slot.name - 逻辑解码槽位名称。它必须对数据库和模式唯一。如果只复制一个,请使用 debezium
  • publication.name - 使用 pgoutput 时,为流式传输更改创建的 PostgreSQL 出版物的名称。如果您已配置 Postgres 用户具有 足够的权限,则将在启动时创建它。或者,也可以预先创建它。可以使用 dbz_publication 默认值。
  • table.include.list - 一个可选的、逗号分隔的正则表达式列表,用于匹配要捕获其更改的表的完全限定表标识符。确保格式为 <schema_name>.<table_name>。在我们的示例中,我们使用 public.uk_price_paid。**此参数中只能指定一个表。**
  • tombstones.on.delete - 控制是否在删除事件之后紧跟一个 tombstone 事件。设置为 false。如果对使用 日志压缩 感兴趣,可以将其设置为 true - 这需要您在 ClickHouse Sink 中删除这些 tombstone。
  • publication.autocreate.mode - 设置为 filtered。这会导致仅为属性 table.include.list 中的表创建出版物。更多详细信息 这里
  • snapshot.mode - 我们使用 never 作为快照模式 - 因为我们使用 postgres 函数加载了初始数据。如果无法暂停 Postgres 实例的更改,用户可以使用 initial 模式。
  • decimal.handling.mode - 指定连接器如何处理 DECIMALNUMERIC 列的值。默认值为 precise,它将以二进制形式编码这些值,即 java.math.BigDecimal。与上面的 decimal.format 设置结合使用,这将导致这些值在 JSON 中以数字形式输出。用户可能希望根据所需精度进行调整。

以下设置将影响 Postgres 中的更改与其到达 ClickHouse 的时间之间的延迟。在 ClickHouse 的必需 SLA 和高效批处理的背景下考虑这些因素 - 请参见 其他注意事项

  • max.batch.size - 每个事件批次的最大大小。
  • max.queue.size - 在将事件发送到 Kafka 之前,队列大小。允许背压。它应该大于批次大小。
  • poll.interval.ms - 指定连接器在开始处理一批事件之前等待新更改事件出现的毫秒数的正整数。默认为 500 毫秒。

Confluent 提供了 其他文档,用于那些使用 Confluent Kafka 或 Cloud 部署的人员。可以使用以下所示的方式在 Confluent Cloud 中配置 Debezium 连接器。此连接器将在接收到消息时自动创建一个 Kafka 主题。

debezium_configuration_cdc.gif

请注意,我们在上面将 after.state.only 属性设置为 false。此设置似乎特定于 Confluent Cloud,并且必须设置为 false,以确保提供行之前的价值以及 LSN 编号。

我们还利用 Kafka connect 的 SMT 功能 将消息扁平化,并将 Kafka 主题设置为 uk_price_paid_changes。这可以通过配置在自管理中实现。更多详细信息 [1][2]

在上面的示例中,我们假设我们的目标主题只有一个分区 - 这是由 Debezium 自动创建的。如前所述,多个分区需要使用 基于哈希的路由,以确保同一 Postgres 行的事件被发送到同一分区 - 从而确保下游的按顺序交付。这超出了本博文的范围,需要进一步测试。

上面配置的 JSON 配置文件可以在 这里找到,并可与官方文档中记录的 步骤一起使用。自管理安装说明可以在 这里找到。

配置 Kafka Connect 接收器

我们可以使用 ClickHouse Kafka Connect 接收器从 Kafka 读取更改事件消息并将其发送到 ClickHouse。这假设用户正在运行 Kafka Connect 框架。可以将 许多 Kafka Connect 接收器与 ClickHouse 一起使用,包括 Confluent HTTP Connector。但是,对于我们的用例,我们选择使用官方的 Kafka Connect Sink for ClickHouse。虽然 ReplacingMergeTree 的属性只需要至少一次语义,但这提供了 精确一次语义,并且现在可以使用“自定义连接器”产品在 Confluent Cloud 中部署。

重要的是,Kafka Connect 接收器保证消息按每个分区的顺序传递。这是由以下原因保证的

  • Kafka Connect 框架只为每个分区分配 一个任务 - 尽管一个任务可能从多个分区消费。
  • 在插入时,ClickHouse Kafka Connect 接收器在插入之前按主题和分区对行进行分组。在消费另一个批次之前,会确认一批的插入。

这允许主题的分区数量进行扩展,同时仍然满足我们对特定 Postgres 行的任何更改按顺序交付的要求,假设我们可以保证针对行的任何事件都通过 哈希从 Debezium 连接器发送到同一分区。

我们在下面显示了在 Confluent Cloud 中配置 ClickHouse Kafka Connect 接收器。注意,我们首先将连接器包上传以使其可用。可以从 这里下载。我们假设用户已配置 Debezium 连接器将数据发送到主题 uk_price_paid_changes

kafka_connect_config.gif

上面配置的 JSON 表示形式可以在 这里找到。

除了基于 Kafka Connect 的方法之外,还存在其他方法,例如,用户可以使用 VectorKafka 表引擎,或 Confluent Cloud 提供的 HTTP 连接器

测试

为了确认我们的管道正常工作,我们提供了一个脚本,该脚本对 Postgres 表进行随机更改 - 添加、更新和删除行。具体来说,关于更新,该脚本将为随机行更改 typepriceis_new 列。完整的代码和依赖项可以在 这里找到。

export PGDATABASE=<database>
export PGUSER=postgres
export PGPASSWORD=<password>
export PGHOST=<host>
export PGPORT=5432
pip3 install -r requirements.txt
python randomize.py --iterations 1 --weights "0.4,0.4,0.2" --delay 0.5

请注意 weights 参数和值 0.4,0.4,0.2 表示创建、更新和删除的比率。delay 参数设置每次操作之间的时间延迟(默认 0.5 秒)。iterations 设置对表进行更改的总次数。在上面的示例中,我们修改了 1000 行。

脚本完成后,我们可以对 Postgres 和 ClickHouse 运行以下查询以确认一致性。显示的响应可能与您的值不同,因为更改是随机的。但是,两个数据库中的值应该相同。为了简单起见,我们使用 FINAL

相同行数

-- Postgres
postgres=> SELECT count(*) FROM uk_price_paid;
  count
----------
 27735027
(1 row)


-- ClickHouse
SELECT count()
FROM uk_price_paid
FINAL

┌──count()─┐
│ 27735027 │
└──────────┘

相同价格统计信息

-- Postgres
postgres=> SELECT sum(price) FROM uk_price_paid;
     sum
---------------
5945061701495
(1 row)

-- ClickHouse
SELECT sum(price)
FROM uk_price_paid
FINAL

┌────sum(price)─┐
│ 5945061701495 │
└───────────────┘

相同房屋价格分布

postgres=> SELECT type, count(*) c FROM uk_price_paid GROUP BY type;
 	type  	|	c
---------------+---------
 detached  	| 6399743
 flat      	| 4981171
 other     	|  419212
 semi-detached | 7597039
 terraced  	| 8337862
(5 rows)


-- ClickHouse
SELECT
	type,
	count() AS c
FROM uk_price_paid
FINAL
GROUP BY type

┌─type──────────┬───────c─┐
│ other     	│  419212 │
│ terraced  	│ 8337862 │
│ semi-detached │ 7597039 │
│ detached  	│ 6399743 │
│ flat      	│ 4981171 │
└───────────────┴─────────┘

其他注意事项

在使用 Debezium 为 ClickHouse 和 Postgres 运行 CDC 管道时,还有一些其他注意事项

  • Debezium 连接器将尽可能地将行更改批处理,最大限度为 max.batch.size。这些批次在每个轮询间隔 poll.interval.ms(默认 500 毫秒)内形成。用户可以增加这些值以获得更大、更有效的批次,但代价是更高的端到端延迟。请记住,ClickHouse 倾向于使用 至少 1000 个批次,以避免诸如 太多部分之类的常见问题。对于低吞吐量环境(<100 行/秒),这种批处理并不像 ClickHouse 那样重要,因为它可能会跟上合并。但是,用户应避免以高插入速率使用小批次。

    批处理也可以在接收器端配置。目前,ClickHouse Connect 接收器不支持显式配置,但可以通过 Kafka connect 框架进行配置 - 请参见设置 consumer.override.max.poll.records。或者,用户可以配置 ClickHouse 异步插入,并允许 ClickHouse 进行批处理。在这种模式下,可以将插入作为小批次发送到 ClickHouse,ClickHouse 将在刷新之前对行进行批处理。请注意,在刷新过程中,行将不可搜索。因此,这种方法**不能**帮助解决端到端延迟。

  • 用户应该意识到 WAL 磁盘使用情况,以及在监视更新频繁的数据库中更改很少的表的 경우 heartbeat.interval.ms 的重要性。

  • 上述方法**目前不支持 Postgres 主键更改**。为了实现这一点,用户需要检测来自 Debezium 的 op=delete 消息,这些消息没有 beforeafter 字段。然后应该使用 id 在 ClickHouse 中删除这些行 - 最好使用 轻量级删除。这需要自定义代码,而不是使用 Kafka 接收器将数据发送到 ClickHouse。

  • 如果表的 Primary key 发生更改,用户可能需要创建一个新的 ClickHouse 表,其中新的列作为 ORDER BY 子句的一部分。请注意,这也需要 对 Debezium 连接器执行的过程

  • Debezium 连接器依赖的逻辑解码不支持 DDL 更改。这意味着连接器无法将 DDL 更改事件报告回消费者。

  • 逻辑解码复制槽仅在主服务器上受支持。当存在 PostgreSQL 服务器集群时,连接器只能在活动主服务器上运行。它不能在热或温备用副本上运行。如果主服务器出现故障或被降级,则连接器将停止。主服务器恢复后,可以重新启动连接器。如果另一个 PostgreSQL 服务器已提升为主服务器,请在重新启动连接器之前调整连接器配置。

  • 虽然 Kafka 接收器可以安全地扩展以使用更多工作器(假设同一 Postgres 行的事件已哈希到同一分区),但 Debezium 连接器只允许 一个任务。上面提出的解决方案使用每个表一个连接器,允许解决方案在表级别进行扩展。

  • 记录的方法假设每个表一个连接器实例。我们目前不支持监视多个表的连接器 - 尽管这可以通过主题路由来实现,即消息被路由到特定于表的主题。此配置尚未经过测试。

结论

在这篇博文中,我们探讨了如何构建 CDC 管道,以便以近乎实时的速度将更改从 Postgres 复制到 ClickHouse。我们讨论了 ReplacingMergeTree 如何是此设计的核心,以及用户如何优化表设计并使用 FINAL 运算符进行查询时去重。除了提供构建管道的说明(包括如何配置 Debezium)外,我们还讨论了想要构建生产解决方案的用户的其他注意事项。虽然这篇博文中的详细信息特定于 Postgres,但由于其独立于 DBMS 的消息格式,它们有可能应用于 Debezium 支持的所有源数据库。我们将此留作读者的练习,以便探索其他数据库,并让我们知道你的进展如何!

分享这篇文章

订阅我们的时事通讯

了解功能发布、产品路线图、支持和云服务方面的最新信息!
加载表单...
关注我们
Twitter imageSlack imageGitHub image
Telegram imageMeetup imageRss image
©2024ClickHouse, Inc. 总部位于加利福尼亚州湾区和荷兰阿姆斯特丹。