DoubleCloud 即将结束。迁移到 ClickHouse,享受限时免费迁移服务。立即联系我们 ->->

博客 / 工程

PostgreSQL 和 ClickHouse 的变更数据捕获 (CDC) - 第 2 部分

author avatar
Dale McDiarmid
2023 年 6 月 15 日

对于那些希望根据本文中的概念找到完整示例配置的人,请参见此处。感谢我们社区的Leart Beqiraj为我们提供此贡献。

简介

继续我们关于为 Postgresql 到 ClickHouse 构建变更捕获控制 (CDC) 管道的系列文章,本文重点介绍构建功能管道所需的步骤和配置。为此,我们使用加载到 Postgres 和 ClickHouse 的示例数据集。将 Postgres 视为事实来源,我们应用插入、更新和删除的混合工作负载。使用由 Debezium、ClickHouse Kafka Connect 和物化视图构成的 CDC 管道,我们可以在 ClickHouse 中的表上近乎实时地反映这些更改。

对于我们的示例,我们在 ClickHouse Cloud 集群中使用开发实例,并在 Postgres 中使用 AWS Aurora 实例。但是,这些示例应该可以在等效大小的自管理集群上复制。或者,立即开始您的 ClickHouse Cloud 集群,并获得 300 美元的积分。让我们来处理基础设施,开始查询吧!

示例数据集

对于我们的示例数据集,我们使用流行的英国房产价格数据集。该数据集规模适中(2800 万行),其架构易于理解。每行代表过去 20 年英国的一次房屋出售,其字段表示价格、日期和位置。可以在此处找到字段的完整描述。我们将此数据集加载到 Postgres 和 ClickHouse,然后对前者进行随机插入、更新和删除。这些更改应由 Debezium 捕获并用于近乎实时地更新 ClickHouse。

Postgres 架构和数据加载

Postgres 架构如下所示。请注意使用 serial id 字段作为主键。虽然主键不是必需的,但需要额外的 Postgres 配置才能使 Debezium 工作。

CREATE TABLE uk_price_paid                                                                                                                                                               (
   id serial,
   price INTEGER,
   date Date,
   postcode1 varchar(8),
   postcode2 varchar(3),
   type varchar(13),
   is_new SMALLINT,
   duration varchar(9),
   addr1 varchar(100),
   addr2 varchar(100),
   street varchar(60),
   locality varchar(35),
   town varchar(35),
   district varchar(40),
   county varchar(35),
   primary key(id)
);

我们将此数据集作为 Postgres 兼容的 SQL 分发,可供插入,从此处下载。加载数据需要一些简单的命令,假设psql 客户端已使用环境变量配置。

wget
https://datasets-documentation.s3.eu-west-3.amazonaws.com/uk-house-prices/postgres/uk_prices.sql.tar.gz
tar -xzvf uk_prices.sql.tar.gz
psql < uk_prices.sql

INSERT 0 10000
INSERT 0 10000
INSERT 0 10000
…

postgres=> SELECT count(*) FROM uk_price_paid;
  count
----------
 27734966
(1 row)

注意:我们使用的是具有 8 个内核的 Postgres 版本 14.7 的 AWS Aurora 实例。加载这些数据大约需要 10 分钟。

ClickHouse 架构

下面我们介绍了我们的文档中使用的架构的修改版本,使用 ReplacingMergeTree。

CREATE TABLE default.uk_price_paid
(
	`id` UInt64,
	`price` UInt32,
	`date` Date,
	`postcode1` LowCardinality(String),
	`postcode2` LowCardinality(String),
	`type` Enum8('other' = 0, 'terraced' = 1, 'semi-detached' = 2, 'detached' = 3, 'flat' = 4),
	`is_new` UInt8,
	`duration` Enum8('unknown' = 0, 'freehold' = 1, 'leasehold' = 2),
	`addr1` String,
	`addr2` String,
	`street` LowCardinality(String),
	`locality` LowCardinality(String),
	`town` LowCardinality(String),
	`district` LowCardinality(String),
	`county` LowCardinality(String),
	`version` UInt64,
	`deleted` UInt8
)
ENGINE = ReplacingMergeTree(version, deleted)
PRIMARY KEY (postcode1, postcode2, addr1, addr2)
ORDER BY (postcode1, postcode2, addr1, addr2, id)

虽然我们在上面将早期架构迁移到 ClickHouse 类型以进行优化,但 ClickHouse 也可以自动解释原始 Postgres 架构(serial 类型除外)。例如,以下 DDL 可用于创建使用 Postgres 类型的表——这些类型将自动转换为 ClickHouse 类型,如所示。请注意,我们手动删除主键并将 id 列的类型从 serial 转换为 Uint64。

CREATE TABLE default.uk_price_paid
(
    `id` UInt64,
    `price` INTEGER,
    `date` Date,
    `postcode1` varchar(8),
    `postcode2` varchar(3),
    `type` varchar(13),
    `is_new` SMALLINT,
    `duration` varchar(9),
    `addr1` varchar(100),
    `addr2` varchar(100),
    `street` varchar(60),
    `locality` varchar(35),
    `town` varchar(35),
    `district` varchar(40),
    `county` varchar(35),
    `version` UInt64,
    `deleted` UInt8
)
ENGINE = ReplacingMergeTree(version, deleted)
PRIMARY KEY (postcode1, postcode2, addr1, addr2)
ORDER BY (postcode1, postcode2, addr1, addr2, id)

SHOW CREATE TABLE default.uk_price_paid

CREATE TABLE default.uk_price_paid
(
    `id` UInt64,
    `price` Int32,
    `date` Date,
    `postcode1` String,
    `postcode2` String,
    `type` String,
    `is_new` Int16,
    `duration` String,
    `addr1` String,
    `addr2` String,
    `street` String,
    `locality` String,
    `town` String,
    `district` String,
    `county` String,
    `version` UInt64,
    `deleted` UInt8
)
ENGINE = ReplicatedReplacingMergeTree('/clickhouse/tables/{uuid}/{shard}', '{replica}', version, deleted)
PRIMARY KEY (postcode1, postcode2, addr1, addr2)
ORDER BY (postcode1, postcode2, addr1, addr2, id)
SETTINGS index_granularity = 8192

考虑到我们在之前的博客中探讨的有关 ReplacingMergeTree 引擎的概念,这里有一些重要的注意事项

  • 我们在表定义中使用 version 和 deleted 列。这两个都是可选的。例如,如果您不需要支持删除,只需在架构和引擎定义中省略该列即可。
  • 我们已经为 ORDER BY 子句选择了列,以便为查询访问模式进行优化。请注意 id 列是如何在最后指定的,因为我们预计不会在分析查询中使用这些列,但仍然需要它提供的唯一性属性——尤其是因为我们的地址不会超过街道级别。
  • 我们使用 PRIMARY KEY 子句指定主键,省略 id 列以节省内存,而不会影响我们通常的查询。

配置 CDC 管道

架构概述

我们在之前的博客中介绍的端到端架构如下所示。

Final CDC schema.png

此架构假设用户拥有一个具有Kafka Connect 框架的 Kafka 实例。对于我们的示例,我们假设用户正在使用 Confluent Cloud 来托管 Kafka,该服务会自动为事件创建一个合适的主题。但是,也支持自管理的 Kafka 安装。建议的架构将适用于任何使用 Debezium 生成的事件的摄取管道。有关安装 Debezium 的说明以及有关主题配置的注意事项,请参见此处

正如我们在上一篇文章中提到的,用户应确保更改事件按每个唯一 Postgres 行的顺序交付(如果要删除删除事件)。可以通过使用单个分区(Debezium 源默认值)或通过使用基于散列的分区来保证这一点,在后一种情况下,由于吞吐量要求需要多个分区。虽然后一种情况应该很少见,但这涉及确保特定 Postgres 行的所有更改事件通过对其主键进行散列发送到同一个分区。

Kafka 中的日志压缩可用于确保仅保留每行的最后一个事件,从而最大限度地减少 Kafka 存储大小。这确实需要在 Debezium 中省略 tombstone 事件,以便在发生删除时。为了简化我们的管道,我们禁用了这些事件。如果用户需要此高级功能,他们应该在其Kafka Sink 中删除这些事件。

配置 Postgresql

PostgreSQL 连接器可用于独立的 PostgreSQL 服务器或 PostgreSQL 服务器集群,但仅支持主服务器——Debezium 连接器无法连接到副本实例。

请注意以下内容来自 Debezium 文档

“如果主服务器出现故障或降级,连接器将停止。主服务器恢复后,您可以重新启动连接器。如果另一个 PostgreSQL 服务器已提升为主服务器,请在重新启动连接器之前调整连接器配置。”

确保 Postgres 实例已正确配置

  • 自管理配置详细信息在此

  • 对于云环境(例如 Amazon RDS),请参阅此处

正如我们在上一篇文章中提到的,我们假设使用输出插件pgoutput将数据从 WAL 内部表示转换为 Debezium 可以使用的格式。因此,我们下面的示例使用逻辑复制流模式pgoutput。这是 PostgreSQL 10+ 的内置功能。早期版本的用户可以探索使用decoderbufs插件(由 Debezium 社区维护)或wal2json。我们尚未测试这些配置。

我们建议用户阅读以下部分,了解用户安全性和配置

  • 设置基本权限 - 我们在下面的示例中使用postgres超级用户。不建议在生产环境中这样做。

  • 创建出版物的权限 - Debezium 从为表创建的出版物中流式传输 PostgreSQL 源表的更改事件。出版物包含从一个或多个表生成的过滤后的更改事件集。每个出版物中的数据根据出版物规范进行过滤。我们假设 Debezium 已配置为具有创建这些出版物的足够权限。默认情况下,postgres超级用户具有此操作的权限。但是,对于生产用例,我们建议用户自己创建这些出版物或最小化分配给用于创建出版物的连接器的 Debezium 用户的权限。

  • 权限以允许与 Debezium 连接器主机进行复制

配置复制标识

Debezium 发送的消息内容取决于您为源目标配置REPLICA IDENTITY的方式。REPLICA IDENTITY是 PostgreSQL 特定的表级设置,它决定逻辑解码插件在 UPDATE 和 DELETE 事件中可用的信息量。更具体地说,此设置控制在发生 UPDATE 或 DELETE 事件时,用于更新或删除操作的表列的先前值(如果有)可用的信息。

虽然支持四种不同的值,但我们建议根据您是否需要对删除操作的支持,进行以下操作:

  • DEFAULT - 默认情况下,更新和删除事件包含表的主键列的先前值(如果该表具有主键)。对于 UPDATE 事件,只有主键列的值发生变化时才会出现。如果表没有主键,连接器不会为该表发出 UPDATE 或 DELETE 事件。仅当以下情况适用时,才使用此值:
    • 您的 ClickHouse ORDER BY 子句仅包含 Postgres 主键列。这种情况不太可能发生,因为通常用户会在ORDER BY中添加列以优化聚合查询,这些查询不太可能成为 Postgres 中的主键。
    • 您不需要支持删除操作。注意:下面使用的配置不需要更新的先前列值。它们是删除操作所必需的,因为新状态为 null。
  • FULL - 更新和删除操作的已发出事件包含表中所有列的先前值。如果您需要支持删除操作,则需要这样做。

使用ALTER命令设置此设置。

ALTER TABLE uk_price_paid REPLICA IDENTITY FULL;

本文档的其余部分假设用户需要支持删除操作。如果不需要删除支持的情况下的步骤不同,我们将提供替代配置的参考。

准备 ClickHouse

初始数据加载

在处理更改事件流之前,我们需要预加载 ClickHouse 表以确保它与 Postgres 保持一致。这可以通过多种方式完成,包括但不限于:

  • 使用postgres 表函数直接从 Postgres 实例加载数据集,使用INSERT INTO SELECT。这提供了一种快速简便的加载数据集的方法,但需要我们暂停 Postgres 实例上的更改。这在生产环境中可能不可用。
  • 使用提供的 Postgres 导出文件进行下载。这可以转换为ClickHouse 支持的格式并进行加载。对于大多数大型数据集来说,这并不现实。
  • 配置 Debezium 在首次启动时执行一致快照。快照完成后,它将从快照创建的确切点开始继续流式传输更改。这使连接器能够从所有数据的完整视图开始,而不会遗漏快照创建过程中发生的更改。有关如何实现此过程的完整步骤,请参阅此处。此过程的结果是类似于插入行时发送的读事件流。虽然我们的物化视图管道可以处理这些事件,但该过程在吞吐量方面往往不是最优的。

为了速度和简单性,我们使用上述选项一。请注意,我们的INSERT INTO SELECT语句将版本和删除列分别设置为值 1 和 0。

INSERT INTO uk_price_paid SELECT
	id,
	price,
	date,
	postcode1,
	postcode2,
	type,
	is_new,
	duration,
	addr1,
	addr2,
	street,
	locality,
	town,
	district,
	county,
	1 AS version,
	0 AS deleted
FROM postgresql('<host>', '<database>', '<table>', '<user>', '<password>')

0 rows in set. Elapsed: 80.885 sec. Processed 27.73 million rows, 5.63 GB (342.89 thousand rows/s., 69.60 MB/s.)

没有进行任何调整,我们能够在 80 秒内加载所有 2800 万行。

物化视图

Debezium 使用嵌套 JSON 格式发送消息。如果配置正确(见下文),并且REPLICA IDENTITY设置为FULL,更改事件将包括行的列的之前值和之后值作为嵌套 JSON。这些消息的完整示例可以在此处找到,包括当REPLICA IDENTITY设置为DEFAULT且不需要删除支持时的情况。

例如,我们展示了下面的更新消息(REPLICA IDENTITY=Full)。

{
  "before": {
	"id": 50658675,
	"price": 227500,
	"date": 11905,
	"postcode1": "SP2",
	"postcode2": "7EN",
	"type": "detached",
	"is_new": 0,
	"duration": "freehold",
	"addr1": "31",
	"addr2": "",
	"street": "CHRISTIE MILLER ROAD",
	"locality": "SALISBURY",
	"town": "SALISBURY",
	"district": "SALISBURY",
	"county": "WILTSHIRE"
  },
  "after": {
	"id": 50658675,
	"price": 227500,
	"date": 11905,
	"postcode1": "SP2",
	"postcode2": "7EN",
	"type": "terraced",
	"is_new": 0,
	"duration": "freehold",
	"addr1": "31",
	"addr2": "",
	"street": "CHRISTIE MILLER ROAD",
	"locality": "SALISBURY",
	"town": "SALISBURY",
	"district": "SALISBURY",
	"county": "WILTSHIRE"
  },
  "source": {
	"version": "1.9.6.Final",
	"connector": "postgresql",
	"name": "postgres_server",
	"ts_ms": 1685378780355,
	"snapshot": "false",
	"db": "postgres",
	"sequence": "[\"247833040488\",\"247833042536\"]",
	"schema": "public",
	"table": "uk_price_paid",
	"txId": 106940,
	"lsn": 247833042536,
	"xmin": null
  },
  "op": "u",
  "ts_ms": 1685378780514,
  "transaction": null
}

此处的op字段表示操作,值udc分别表示更新、删除和插入操作。source.lsn字段提供我们的版本值。对于删除事件,after字段为 null。相反,对于插入事件,before字段为 null。

这种消息格式与 ClickHouse 中的目标表uk_price_paid不兼容。我们可以使用物化视图在插入时转换这些消息。我们将在下面展示此过程

CREATE MATERIALIZED VIEW uk_price_paid_mv TO uk_price_paid
(
   `id` Nullable(UInt64),
   `price` Nullable(UInt32),
   `date` Nullable(Date),
   `postcode1` Nullable(String),
   `postcode2` Nullable(String),
   `type` Nullable(Enum8('other'=0, 'terraced'=1, 'semi-detached'=2, 'detached'=3, 'flat'=4)),
   `is_new` Nullable(UInt8),
   `duration` Nullable(Enum8('unknown'=0, 'freehold'=1, 'leasehold'=2)),
   `addr1` Nullable(String),
   `addr2` Nullable(String),
   `street` Nullable(String),
   `locality` Nullable(String),
   `town` Nullable(String),
   `district` Nullable(String),
   `county` Nullable(String),
   `version` UInt64,
   `deleted` UInt8
) AS
SELECT
   if(op = 'd', before.id, after.id) AS id,
   if(op = 'd', before.price, after.price) AS price,
   if(op = 'd', toDate(before.date), toDate(after.date)) AS date,
   if(op = 'd', before.postcode1, after.postcode1) AS postcode1,
   if(op = 'd', before.postcode2, after.postcode2) AS postcode2,
   if(op = 'd', before.type, after.type) AS type,
   if(op = 'd', before.is_new, after.is_new) AS is_new,
   if(op = 'd', before.duration, after.duration) AS duration,
   if(op = 'd', before.addr1, after.addr1) AS addr1,
   if(op = 'd', before.addr2, after.addr2) AS addr2,
   if(op = 'd', before.street, after.street) AS street,
   if(op = 'd', before.locality, after.locality) AS locality,
   if(op = 'd', before.town, after.town) AS town,
   if(op = 'd', before.district, after.district) AS district,
   if(op = 'd', before.county, after.county) AS county,
   if(op = 'd', source.lsn, source.lsn) AS version,
   if(op = 'd', 1, 0) AS deleted
FROM default.uk_price_paid_changes
WHERE (op = 'c') OR (op = 'r') OR (op = 'u') OR (op = 'd')

注意,我们的物化视图在这里根据操作为每列选择适当的值。version基于source.lsn列,如果op列的值为d,则将deleted列设置为 1,否则设置为 0。op = r值允许我们也支持快照事件(如果需要)。如果不需要删除支持,可以简化此物化视图。

熟悉 ClickHouse 的读者会注意到,此视图将行插入到我们的uk_price_paid中,并从uk_price_paid_changes表中选择行。后者将从我们的 Kafka 接收器接收行插入。此表的架构必须与之前显示的 Debezium 消息一致。我们将在下面展示此表的架构

CREATE TABLE uk_price_paid_changes
(
	`before.id` Nullable(UInt64),
	`before.price` Nullable(UInt32),
	`before.date` Nullable(UInt32),
	`before.postcode1` Nullable(String),
	`before.postcode2` Nullable(String),
	`before.type` Nullable(Enum8('other'=0,'terraced'=1,'semi-detached'=2,'detached'=3,'flat'=4)),
	`before.is_new` Nullable(UInt8),
	`before.duration` Nullable(Enum8('unknown' = 0, 'freehold' = 1, 'leasehold' = 2)),
	`before.addr1` Nullable(String),
	`before.addr2` Nullable(String),
	`before.street` Nullable(String),
	`before.locality` Nullable(String),
	`before.town` Nullable(String),
	`before.district` Nullable(String),
	`before.county` Nullable(String),
	`after.id` Nullable(UInt64),
	`after.price` Nullable(UInt32),
	`after.date` Nullable(UInt32),
	`after.postcode1` Nullable(String),
	`after.postcode2` Nullable(String),
	`after.type` Nullable(Enum8('other'=0,'terraced'=1,'semi-detached'=2,'detached'=3,'flat'=4)),
	`after.is_new` Nullable(UInt8),
	`after.duration` Nullable(Enum8('unknown' = 0, 'freehold' = 1, 'leasehold' = 2)),
	`after.addr1` Nullable(String),
	`after.addr2` Nullable(String),
	`after.street` Nullable(String),
	`after.locality` Nullable(String),
	`after.town` Nullable(String),
	`after.district` Nullable(String),
	`after.county` Nullable(String),
	`op` LowCardinality(String),
	`ts_ms` UInt64,
	`source.sequence` String,
	`source.lsn` UInt64
)
ENGINE = MergeTree
ORDER BY tuple()

为了调试目的,我们在此表中使用MergeTree引擎。在生产环境中,这可能是一个Null引擎 - 更改将不会被持久化,但转换后的行仍将被发送到目标表uk_price_paid。如果不需要删除支持,并且REPLICA IDENTITY设置为DEFAULT,可以使用更简单的表

细心的读者会注意到我们的架构是扁平化的,来自 Debezium 发送的嵌套消息。我们将在 Debezium 连接器中执行此操作。从架构的角度来看,这更简单,并且允许我们配置Nullable值。涉及Tuple的替代方案更加复杂。

配置 Debezium

在 Kafka 连接框架中部署连接器需要以下设置。注意我们假设消息以没有架构的 JSON形式发送

  • value.converter - org.apache.kafka.connect.json.JsonConverter
  • key.converter - org.apache.kafka.connect.storage.StringConverter
  • key.converter.schemas.enable - false
  • value.converter.schemas.enable - false
  • decimal.format - 控制此转换器将以何种格式序列化十进制数。此值不区分大小写,可以是BASE64(默认)或NUMERIC。这应该设置为BASE64。有关十进制处理的更多详细信息,请参阅此处

此管道的必需配置设置(如数据库连接详细信息)的完整列表可以在此处找到。下面我们将重点介绍最影响消息格式的设置。重要:我们配置连接器以在每个表级别跟踪更改

  • plugin.name - 安装在 PostgreSQL 服务器上的 PostgreSQL 逻辑解码插件的名称。我们建议使用pgoutput
  • slot.name - 逻辑解码槽位名称。它必须在数据库和模式中是唯一的。如果只复制一个,请使用debezium
  • publication.name - 使用pgoutput时为流式传输更改而创建的 PostgreSQL 出版物名称。如果您已将 Postgres 用户配置为具有足够的权限,则将在启动时创建该名称。或者,也可以预先创建。可以使用默认值dbz_publication
  • table.include.list - 一个可选的逗号分隔的正则表达式列表,匹配您要捕获其更改的表的完全限定表标识符。确保格式为<schema_name>.<table_name>。在我们的示例中,我们使用public.uk_price_paid此参数中只能指定一个表。
  • tombstones.on.delete - 控制删除事件之后是否会跟随一个墓碑事件。设置为false。如果您有兴趣使用日志压缩,可以将其设置为true - 这需要您在 ClickHouse 接收器中删除这些墓碑。
  • publication.autocreate.mode - 设置为filtered。这将导致仅为table.include.list属性中的表创建出版物。更多详细信息在此
  • snapshot.mode - 我们为快照模式使用never - 因为我们使用postgres函数加载了初始数据。如果无法暂停 Postgres 实例上的更改,用户可以使用initial模式。
  • decimal.handling.mode - 指定连接器如何处理DECIMALNUMERIC 列的值。默认值为 precise,它将以二进制形式对这些值进行编码,即 java.math.BigDecimal。与上述 decimal.format 设置结合使用,这将导致这些值在 JSON 中以数字形式输出。用户可以根据所需精度进行调整。

以下设置将影响 Postgres 中的变化与其到达 ClickHouse 的时间之间的延迟。在所需的 SLA 和 ClickHouse 的有效批处理的背景下考虑这些 - 请参阅 其他注意事项

  • max.batch.size - 每批事件的最大大小。
  • max.queue.size - 将事件发送到 Kafka 之前的队列大小。允许反压。它应该大于批次大小。
  • poll.interval.ms - 指定连接器在开始处理一批事件之前应等待新更改事件出现的时间(毫秒)。默认为 500 毫秒。

Confluent 提供了 其他文档,适用于使用 Confluent Kafka 或 Cloud 部署的用户。可以在 Confluent Cloud 中配置 Debezium 连接器,如下所示。此连接器将在收到消息时自动创建 Kafka 主题。

debezium_configuration_cdc.gif

请注意,在上面我们已将 after.state.only 属性设置为 false。此设置似乎特定于 Confluent Cloud,并且必须设置为 false 才能确保提供行的先前值以及 LSN 编号。

我们还使用 Kafka Connect 的 SMT 功能来 扁平化消息 并将 Kafka 主题设置为 uk_price_paid_changes。这可以通过自管理通过配置实现。更多详细信息 [1][2]

在上面的示例中,我们假设目标主题只有一个分区 - 这是由 Debezium 自动创建的。如前所述,多个分区需要使用 基于哈希的路由 来确保相同 Postgres 行的事件被传递到同一个分区 - 从而确保下游的有序传递。这超出了本文的范围,需要进一步测试。

上面相关的 JSON 配置可以在 这里 找到,并且可以与官方文档中 步骤 一起使用。自管理安装说明可以在 这里 找到。

配置 Kafka Connect Sink

我们可以使用 ClickHouse Kafka Connect 接收器从 Kafka 读取更改事件消息并将其发送到 ClickHouse。这假设用户正在运行 Kafka Connect 框架。 许多 Kafka Connect 接收器 可以与 ClickHouse 一起使用,包括 Confluent HTTP 连接器。然而,对于我们的用例,我们选择使用官方的 ClickHouse 的 Kafka Connect 接收器。虽然 ReplacingMergeTree 的属性只需要至少一次语义,但这提供了 完全一次语义,现在可以使用 “自定义连接器” 功能在 Confluent Cloud 中部署。

重要的是,Kafka Connect 接收器保证每个分区内的消息按顺序传递。这是由以下方面保证的

  • Kafka Connect 框架只将 一个任务分配给任何给定分区 - 尽管一个任务可能从多个分区消费。
  • 在插入时,ClickHouse Kafka Connect 接收器在插入之前按主题和分区对行进行分组。在消费另一批之前,对一批的插入进行确认。

这允许主题的分区数量进行扩展,同时仍然满足我们对特定 Postgres 行的任何更改进行有序传递的要求,假设我们可以保证该行的任何事件都通过 哈希方式从 Debezium 连接器发送到同一个分区

我们在下面展示了在 Confluent Cloud 中配置 ClickHouse Kafka Connect 接收器。请注意,我们首先将连接器包上传到云端,使其可用。可以从 这里 下载。我们假设用户已将 Debezium 连接器配置为将数据发送到主题 uk_price_paid_changes

kafka_connect_config.gif

上面配置的 JSON 表示可以在 这里 找到。

除了基于 Kafka Connect 的方法之外,还存在其他方法,例如,用户可以使用 VectorKafka 表引擎 或 Confluent Cloud 提供的 HTTP 连接器

测试

为了确认我们的管道正常工作,我们提供了一个脚本,该脚本对 Postgres 表进行随机更改 - 添加、更新和删除行。具体来说,关于更新,此脚本会更改随机行的 typepriceis_new 列。完整的代码和依赖项可以在 这里 找到。

export PGDATABASE=<database>
export PGUSER=postgres
export PGPASSWORD=<password>
export PGHOST=<host>
export PGPORT=5432
pip3 install -r requirements.txt
python randomize.py --iterations 1 --weights "0.4,0.4,0.2" --delay 0.5

请注意 weights 参数和值 0.4,0.4,0.2 表示创建、更新和删除的比率。delay 参数设置每次操作之间的时间延迟(默认 0.5 秒)。iterations 设置对表进行的更改总数。在上面的示例中,我们修改了 1000 行。

脚本完成后,我们可以对 Postgres 和 ClickHouse 运行以下查询以确认一致性。显示的响应可能与您的值不同,因为更改是随机的。但是,来自两个数据库的值应该相同。为了简单起见,我们使用 FINAL

相同的行数

-- Postgres
postgres=> SELECT count(*) FROM uk_price_paid;
  count
----------
 27735027
(1 row)


-- ClickHouse
SELECT count()
FROM uk_price_paid
FINAL

┌──count()─┐
│ 27735027 │
└──────────┘

相同的价格统计信息

-- Postgres
postgres=> SELECT sum(price) FROM uk_price_paid;
     sum
---------------
5945061701495
(1 row)

-- ClickHouse
SELECT sum(price)
FROM uk_price_paid
FINAL

┌────sum(price)─┐
│ 5945061701495 │
└───────────────┘

相同的房价分布

postgres=> SELECT type, count(*) c FROM uk_price_paid GROUP BY type;
 	type  	|	c
---------------+---------
 detached  	| 6399743
 flat      	| 4981171
 other     	|  419212
 semi-detached | 7597039
 terraced  	| 8337862
(5 rows)


-- ClickHouse
SELECT
	type,
	count() AS c
FROM uk_price_paid
FINAL
GROUP BY type

┌─type──────────┬───────c─┐
│ other     	│  419212 │
│ terraced  	│ 8337862 │
│ semi-detached │ 7597039 │
│ detached  	│ 6399743 │
│ flat      	│ 4981171 │
└───────────────┴─────────┘

其他注意事项

在使用 Debezium 为 ClickHouse 和 Postgres 运行 CDC 管道时,还有一些其他注意事项

  • Debezium 连接器将尽可能地对行更改进行批处理,最大限度为 max.batch.size。这些批次每隔 poll.interval.ms(默认为 500 毫秒)形成一次。用户可以增加这些值以获得更大、更有效的批次,但代价是更高的端到端延迟。请记住,ClickHouse 倾向于将批次大小设置为 至少 1000,以避免常见的错误,例如 太多分区。对于低吞吐量环境(< 100 行/秒),这种批处理并不像 ClickHouse 能够跟上合并那样重要。但是,用户应该避免以高插入速率进行小批处理。

    批处理也可以在接收器端配置。目前,这在 ClickHouse Connect 接收器中没有明确支持,但可以通过 Kafka Connect 框架配置 - 请参阅设置 consumer.override.max.poll.records。或者,用户可以配置 ClickHouse 异步插入 并允许 ClickHouse 进行批处理。在这种模式下,插入可以以小批次发送到 ClickHouse,ClickHouse 会在刷新之前对行进行批处理。请注意,在刷新期间,行将不可搜索。因此,这种方法 **不会** 帮助提高端到端延迟。

  • 用户应该注意 WAL 磁盘使用情况 以及 heartbeat.interval.ms 在监视更新次数较多的数据库中更改次数很少的表时的重要性。

  • 上述方法 **目前不支持 Postgres 主键更改**。为了实现这一点,用户需要检测来自 Debezium 的 op=delete 消息,这些消息没有 beforeafter 字段。然后应该使用 id 来删除 ClickHouse 中的这些行 - 最好使用 轻量级删除。这需要自定义代码,而不是使用 Kafka 接收器将数据发送到 ClickHouse。

  • 如果表的 Primary key 发生变化,用户可能需要创建一个新的 ClickHouse 表,并将新列作为 ORDER BY 子句的一部分。请注意,这也需要 对 Debezium 连接器执行的进程

  • 逻辑解码,Debezium 连接器依赖于它,不支持 DDL 更改。这意味着连接器无法将 DDL 更改事件报告回消费者。

  • 逻辑解码复制槽仅在主服务器上受支持。当有一个 PostgreSQL 服务器集群时,连接器只能在活动的主服务器上运行。它不能在热备份或温备份副本上运行。如果主服务器发生故障或降级,连接器将停止。主服务器恢复后,您可以重新启动连接器。如果其他 PostgreSQL 服务器已被提升为主服务器,请在重新启动连接器之前调整连接器配置。

  • 虽然 Kafka 接收器可以安全地扩展以使用更多工作器(假设相同 Postgres 行的事件被哈希到同一个分区),但 Debezium 连接器只允许 单个任务。上面提出的解决方案使用每个表一个连接器,允许解决方案在表级别进行扩展。

  • 记录的方法假设每个表一个连接器实例。我们目前不支持一个连接器监视多个表 - 虽然这可以通过主题路由实现,即消息被路由到特定于表的主题。此配置尚未经过测试。

结论

在这篇博文中,我们探讨了如何构建 CDC 管道以将更改从 Postgres 实时复制到 ClickHouse。我们讨论了 ReplacingMergeTree 如何成为此设计的核心,以及用户如何优化表设计和使用 FINAL 操作符在查询时进行重复数据删除。除了提供构建管道的说明,包括如何配置 Debezium 之外,我们还讨论了希望构建生产解决方案的用户需要考虑的其他事项。虽然本文中的详细信息特定于 Postgres,但由于其 DBMS 独立的消息格式,它们可能适用于 Debezium 支持的所有源数据库。我们把它留给读者作为练习,以探索其他数据库,并让我们知道你的进展!

分享此文章

订阅我们的时事通讯

及时了解功能发布、产品路线图、支持和云服务!
正在加载表格...
关注我们
Twitter imageSlack imageGitHub image
Telegram imageMeetup imageRss image
©2024ClickHouse, Inc. 总部位于加利福尼亚州湾区和荷兰阿姆斯特丹。