目录
对于那些正在寻找基于本文概念的完整示例配置的人,请参阅此处。感谢我们社区的 Leart Beqiraj 的贡献。
简介
继续我们关于为 Postgresql 构建变更捕获控制 (CDC) 管道到 ClickHouse 的系列文章,这篇文章重点介绍构建功能性管道所需的步骤和配置。为此,我们使用加载到 Postgres 和 ClickHouse 中的示例数据集。考虑到 Postgres 是真理来源,我们应用了插入、更新和删除的混合工作负载。使用由 Debezium、ClickHouse Kafka Connect 和物化视图构建的 CDC 管道,我们可以在近乎实时的将这些更改反映在 ClickHouse 的表中。
在我们的示例中,我们使用了 ClickHouse Cloud 集群中的开发实例和 AWS Aurora Postgres 实例。但是,这些示例应该可以在同等大小的自管理集群上重现。或者,立即启动您的 ClickHouse Cloud 集群,并获得 300 美元的信用额度。让我们担心基础设施,开始查询!
示例数据集
对于我们的示例数据集,我们使用了流行的英国房产价格数据集。它的大小适中(2800 万行),并且模式易于理解。每一行代表过去 20 年在英国的房屋销售,字段表示价格、日期和位置。有关字段的完整描述,请访问此处。我们将此数据集加载到 Postgres 和 ClickHouse 中,然后再对前者进行随机插入、更新和删除。这些更改应由 Debezium 捕获,并用于近乎实时地更新 ClickHouse。
Postgres 模式和数据加载
Postgres 模式如下所示。请注意使用 serial id
字段作为主键。虽然主键不是强制性的,但 Debezium 工作需要额外的 Postgres 配置。
CREATE TABLE uk_price_paid (
id serial,
price INTEGER,
date Date,
postcode1 varchar(8),
postcode2 varchar(3),
type varchar(13),
is_new SMALLINT,
duration varchar(9),
addr1 varchar(100),
addr2 varchar(100),
street varchar(60),
locality varchar(35),
town varchar(35),
district varchar(40),
county varchar(35),
primary key(id)
);
我们将此数据集作为与 Postgres 兼容的 SQL 分发,可随时插入,可从此处下载。加载数据需要几个简单的命令,假设 psql 客户端已使用环境变量配置
wget
https://datasets-documentation.s3.eu-west-3.amazonaws.com/uk-house-prices/postgres/uk_prices.sql.tar.gz
tar -xzvf uk_prices.sql.tar.gz
psql < uk_prices.sql
INSERT 0 10000
INSERT 0 10000
INSERT 0 10000
…
postgres=> SELECT count(*) FROM uk_price_paid;
count
----------
27734966
(1 row)
注意:我们正在使用 AWS Aurora Postgres 14.7 版本实例,具有八个核心。此数据大约需要 10 分钟才能加载。
ClickHouse 模式
下面我们展示了 我们的文档中使用的模式的修改版本,使用了 ReplacingMergeTree。
CREATE TABLE default.uk_price_paid
(
`id` UInt64,
`price` UInt32,
`date` Date,
`postcode1` LowCardinality(String),
`postcode2` LowCardinality(String),
`type` Enum8('other' = 0, 'terraced' = 1, 'semi-detached' = 2, 'detached' = 3, 'flat' = 4),
`is_new` UInt8,
`duration` Enum8('unknown' = 0, 'freehold' = 1, 'leasehold' = 2),
`addr1` String,
`addr2` String,
`street` LowCardinality(String),
`locality` LowCardinality(String),
`town` LowCardinality(String),
`district` LowCardinality(String),
`county` LowCardinality(String),
`version` UInt64,
`deleted` UInt8
)
ENGINE = ReplacingMergeTree(version, deleted)
PRIMARY KEY (postcode1, postcode2, addr1, addr2)
ORDER BY (postcode1, postcode2, addr1, addr2, id)
虽然上面我们将较早的模式迁移到 ClickHouse 类型以进行优化,但原始 Postgres 模式也可以由 ClickHouse 自动解释(除了 serial
类型)。例如,下面的 DDL 可用于创建使用 Postgres 类型的表 - 这些类型将自动转换为 ClickHouse 类型,如下所示。请注意,我们删除了主键并将 id
列的类型 serial
手动转换为 Uint64。
CREATE TABLE default.uk_price_paid
(
`id` UInt64,
`price` INTEGER,
`date` Date,
`postcode1` varchar(8),
`postcode2` varchar(3),
`type` varchar(13),
`is_new` SMALLINT,
`duration` varchar(9),
`addr1` varchar(100),
`addr2` varchar(100),
`street` varchar(60),
`locality` varchar(35),
`town` varchar(35),
`district` varchar(40),
`county` varchar(35),
`version` UInt64,
`deleted` UInt8
)
ENGINE = ReplacingMergeTree(version, deleted)
PRIMARY KEY (postcode1, postcode2, addr1, addr2)
ORDER BY (postcode1, postcode2, addr1, addr2, id)
SHOW CREATE TABLE default.uk_price_paid
CREATE TABLE default.uk_price_paid
(
`id` UInt64,
`price` Int32,
`date` Date,
`postcode1` String,
`postcode2` String,
`type` String,
`is_new` Int16,
`duration` String,
`addr1` String,
`addr2` String,
`street` String,
`locality` String,
`town` String,
`district` String,
`county` String,
`version` UInt64,
`deleted` UInt8
)
ENGINE = ReplicatedReplacingMergeTree('/clickhouse/tables/{uuid}/{shard}', '{replica}', version, deleted)
PRIMARY KEY (postcode1, postcode2, addr1, addr2)
ORDER BY (postcode1, postcode2, addr1, addr2, id)
SETTINGS index_granularity = 8192
考虑到我们在之前的博客中探讨的关于 ReplacingMergeTree 引擎的概念,这里有一些重要的注意事项
- 我们在表定义中使用了 version 和 deleted 列。这两个都是可选的。例如,如果您不需要支持删除,只需在模式和引擎定义中省略该列即可。
- 我们为
ORDER BY
子句选择了列,以优化查询访问模式。请注意id
列是如何最后指定的,因为我们不希望这些列用于分析查询,但仍然需要它提供的唯一性属性 - 特别是当我们的地址不超过街道级别时。 - 我们使用
PRIMARY KEY
子句指定主索引,省略id
列以节省内存,而不会影响我们常用的查询。
配置 CDC 管道
架构概述
我们在 之前的博客中介绍的端到端架构如下所示。
此架构假定用户拥有 Kafka 实例和 Kafka Connect 框架。在我们的示例中,我们假设用户正在使用 Confluent Cloud 来托管 Kafka,后者会自动为事件创建适当的主题。但是,也支持自管理的 Kafka 安装。提议的模式将适用于任何写入由 Debezium 生成的事件的摄取管道。有关安装 Debezium 的说明以及主题配置的注意事项,请访问此处。
正如我们在之前的文章中提到的,用户应确保按顺序交付每个唯一 Postgres 行的更改事件(如果要删除删除事件)。可以通过使用单个分区(Debezium 源默认)或在由于吞吐量要求而需要多个分区的情况下使用基于哈希的分区来保证这一点。虽然后一种情况应该很少见,但这涉及到通过哈希其主键来确保特定 Postgres 行的所有更改事件都发送到同一分区。
Kafka 中的日志压缩可用于确保仅保留行的最后一个事件,从而最大限度地减少 Kafka 存储大小。这需要 tombstone 事件在发生删除时在 Debezium 中省略。为了简化我们的管道,我们禁用了这些。如果用户需要此高级功能,则应在他们的 Kafka Sink 中删除这些。
配置 Postgresql
PostgreSQL 连接器可以与独立的 PostgreSQL 服务器或 PostgreSQL 服务器集群一起使用,但仅在主服务器上受支持 - Debezium 连接器无法连接到副本实例。
请注意 Debezium 文档中的以下内容
“如果主服务器发生故障或被降级,连接器将停止。在主服务器恢复后,您可以重新启动连接器。如果另一个 PostgreSQL 服务器已提升为主服务器,请在重新启动连接器之前调整连接器配置。”
确保 Postgres 实例已正确配置
正如我们在之前的文章中提到的,我们假设使用输出插件 pgoutput
将数据从 WAL 内部表示形式转换为 Debezium 可以使用的格式。因此,我们下面的示例使用逻辑复制流模式 pgoutput
。这是内置在 PostgreSQL 10+ 中的。早期版本的用户可以探索使用 decoderbufs 插件(由 Debezium 社区维护)或 wal2json。我们尚未测试这些配置。
我们建议用户阅读以下关于用户安全和配置的部分
-
设置基本权限 - 在下面的示例中,我们使用
postgres
超级用户。不建议在生产部署中使用。 -
创建发布所需的权限 - Debezium 从为表创建的发布中流式传输 PostgreSQL 源表的更改事件。发布包含从一个或多个表生成的已过滤更改事件集。每个发布中的数据都根据发布规范进行过滤。我们假设 Debezium 配置了足够的权限来创建这些发布。默认情况下,
postgres
超级用户有权执行此操作。但是,对于生产用例,我们建议用户自己创建这些发布,或者最小化分配给用于创建它们的连接器的 Debezium 用户的权限。 -
权限以允许与 Debezium 连接器主机进行复制
配置副本标识
Debezium 发送的消息内容取决于您如何为源目标配置 REPLICA IDENTITY
。REPLICA IDENTITY
是 PostgreSQL 特有的表级设置,它确定逻辑解码插件可用于 UPDATE 和 DELETE 事件的信息量。更具体地说,此设置控制在发生 UPDATE 或 DELETE 事件时,表列的先前值(如果有)的信息的可用性。
虽然支持四个不同的值,但我们根据您是否需要删除支持来推荐以下值
DEFAULT
- 默认行为是,如果表具有主键,则更新和删除事件包含表主键列的先前值。对于 UPDATE 事件,仅存在具有更改值的主键列。如果表没有主键,则连接器不会为该表发出 UPDATE 或 DELETE 事件。仅在以下情况下使用此值- 您的 ClickHouse
ORDER BY
子句仅包含 Postgres 主键列。这不太可能,因为通常用户会将列添加到ORDER BY
以优化聚合查询,而聚合查询不太可能是 Postgres 中的主键。 - 您不需要删除支持。注意:下面使用的配置不需要更新的先前列值。删除需要它们,因为新状态为空。
- 您的 ClickHouse
FULL
- 更新和删除操作发出的事件包含表中所有列的先前值。如果您需要支持删除操作,则需要此项。
使用 ALTER
命令设置此设置。
ALTER TABLE uk_price_paid REPLICA IDENTITY FULL;
本文的其余部分假设用户需要支持删除。如果不需要删除支持的情况下的步骤有所不同,我们将提供替代配置的参考。
准备 ClickHouse
初始数据加载
在处理更改事件流之前,我们需要预加载 ClickHouse 表以确保其与 Postgres 一致。这可以通过几种方式完成,包括但不限于
- 使用 postgres 表函数使用
INSERT INTO SELECT
直接从我们的 Postgres 实例加载数据集。这提供了一种快速简便的方式来加载我们的数据集,但要求我们暂停 Postgres 实例上的更改。这在生产场景中可能不可用。 - 使用为下载提供的 Postgres 导出。这可以转换为 ClickHouse 支持的格式并加载。对于大多数大型数据集来说,这是不现实的。
- 配置 Debezium 以在首次启动时执行一致的快照。快照完成后,它会继续从创建快照的确切时间点开始流式传输更改。这允许连接器以所有数据的一致视图开始,并且不会遗漏在拍摄快照时所做的任何更改。有关如何实现此目的的完整过程,请访问此处。此过程的结果是类似于在插入行时发送的读取事件流。虽然这些可以通过我们下面的物化视图管道处理,但就吞吐量而言,该过程往往不是最优的。
为了速度和简单性,我们使用上面的选项一。请注意,我们的 INSERT INTO SELECT
语句将 version 和 deleted 列分别设置为值 1 和 0。
INSERT INTO uk_price_paid SELECT
id,
price,
date,
postcode1,
postcode2,
type,
is_new,
duration,
addr1,
addr2,
street,
locality,
town,
district,
county,
1 AS version,
0 AS deleted
FROM postgresql('<host>', '<database>', '<table>', '<user>', '<password>')
0 rows in set. Elapsed: 80.885 sec. Processed 27.73 million rows, 5.63 GB (342.89 thousand rows/s., 69.60 MB/s.)
在没有任何调整的情况下,我们能够在 80 秒内加载所有 2800 万行。
物化视图
Debezium 使用嵌套的 JSON 格式发送消息。如果配置正确(见下文),并将 REPLICA IDENTITY
设置为 FULL
,则更改事件将包括行的列的前后值,作为嵌套的 JSON。这些消息的完整示例可以在此处找到,包括当不需要删除支持时,REPLICA IDENTITY
设置为 DEFAULT
的情况。
例如,我们显示以下更新消息 (REPLICA IDENTITY=Full
)。
{
"before": {
"id": 50658675,
"price": 227500,
"date": 11905,
"postcode1": "SP2",
"postcode2": "7EN",
"type": "detached",
"is_new": 0,
"duration": "freehold",
"addr1": "31",
"addr2": "",
"street": "CHRISTIE MILLER ROAD",
"locality": "SALISBURY",
"town": "SALISBURY",
"district": "SALISBURY",
"county": "WILTSHIRE"
},
"after": {
"id": 50658675,
"price": 227500,
"date": 11905,
"postcode1": "SP2",
"postcode2": "7EN",
"type": "terraced",
"is_new": 0,
"duration": "freehold",
"addr1": "31",
"addr2": "",
"street": "CHRISTIE MILLER ROAD",
"locality": "SALISBURY",
"town": "SALISBURY",
"district": "SALISBURY",
"county": "WILTSHIRE"
},
"source": {
"version": "1.9.6.Final",
"connector": "postgresql",
"name": "postgres_server",
"ts_ms": 1685378780355,
"snapshot": "false",
"db": "postgres",
"sequence": "[\"247833040488\",\"247833042536\"]",
"schema": "public",
"table": "uk_price_paid",
"txId": 106940,
"lsn": 247833042536,
"xmin": null
},
"op": "u",
"ts_ms": 1685378780514,
"transaction": null
}
此处的 op
字段指示操作,值 u
、d
和 c
分别表示更新、删除和插入操作。source.lsn
字段提供我们的版本值。对于删除事件,after
字段为空。相反,对于插入事件,before
字段为空。
此消息格式与我们在 ClickHouse 中的目标表 uk_price_paid
不兼容。我们可以使用物化视图在插入时转换这些消息。我们在下面展示这一点
CREATE MATERIALIZED VIEW uk_price_paid_mv TO uk_price_paid
(
`id` Nullable(UInt64),
`price` Nullable(UInt32),
`date` Nullable(Date),
`postcode1` Nullable(String),
`postcode2` Nullable(String),
`type` Nullable(Enum8('other'=0, 'terraced'=1, 'semi-detached'=2, 'detached'=3, 'flat'=4)),
`is_new` Nullable(UInt8),
`duration` Nullable(Enum8('unknown'=0, 'freehold'=1, 'leasehold'=2)),
`addr1` Nullable(String),
`addr2` Nullable(String),
`street` Nullable(String),
`locality` Nullable(String),
`town` Nullable(String),
`district` Nullable(String),
`county` Nullable(String),
`version` UInt64,
`deleted` UInt8
) AS
SELECT
if(op = 'd', before.id, after.id) AS id,
if(op = 'd', before.price, after.price) AS price,
if(op = 'd', toDate(before.date), toDate(after.date)) AS date,
if(op = 'd', before.postcode1, after.postcode1) AS postcode1,
if(op = 'd', before.postcode2, after.postcode2) AS postcode2,
if(op = 'd', before.type, after.type) AS type,
if(op = 'd', before.is_new, after.is_new) AS is_new,
if(op = 'd', before.duration, after.duration) AS duration,
if(op = 'd', before.addr1, after.addr1) AS addr1,
if(op = 'd', before.addr2, after.addr2) AS addr2,
if(op = 'd', before.street, after.street) AS street,
if(op = 'd', before.locality, after.locality) AS locality,
if(op = 'd', before.town, after.town) AS town,
if(op = 'd', before.district, after.district) AS district,
if(op = 'd', before.county, after.county) AS county,
if(op = 'd', source.lsn, source.lsn) AS version,
if(op = 'd', 1, 0) AS deleted
FROM default.uk_price_paid_changes
WHERE (op = 'c') OR (op = 'r') OR (op = 'u') OR (op = 'd')
请注意,我们的物化视图在此处根据操作为每一列选择适当的值。version
基于 source.lsn
列,如果 op
列具有 d
值,则我们将 deleted
列设置为 1,否则设置为 0。op = r
值允许我们在需要时也支持快照事件。如果不需要删除支持,则可以简化此物化视图。
熟悉 ClickHouse 的读者会注意到,此视图将行插入到我们的 uk_price_paid
中,并从 uk_price_paid_changes
表中选择行。后一个表将接收来自我们的 Kafka sink 的行插入。此表的模式必须与前面显示的 Debezium 消息对齐。我们在下面显示此表的模式
CREATE TABLE uk_price_paid_changes
(
`before.id` Nullable(UInt64),
`before.price` Nullable(UInt32),
`before.date` Nullable(UInt32),
`before.postcode1` Nullable(String),
`before.postcode2` Nullable(String),
`before.type` Nullable(Enum8('other'=0,'terraced'=1,'semi-detached'=2,'detached'=3,'flat'=4)),
`before.is_new` Nullable(UInt8),
`before.duration` Nullable(Enum8('unknown' = 0, 'freehold' = 1, 'leasehold' = 2)),
`before.addr1` Nullable(String),
`before.addr2` Nullable(String),
`before.street` Nullable(String),
`before.locality` Nullable(String),
`before.town` Nullable(String),
`before.district` Nullable(String),
`before.county` Nullable(String),
`after.id` Nullable(UInt64),
`after.price` Nullable(UInt32),
`after.date` Nullable(UInt32),
`after.postcode1` Nullable(String),
`after.postcode2` Nullable(String),
`after.type` Nullable(Enum8('other'=0,'terraced'=1,'semi-detached'=2,'detached'=3,'flat'=4)),
`after.is_new` Nullable(UInt8),
`after.duration` Nullable(Enum8('unknown' = 0, 'freehold' = 1, 'leasehold' = 2)),
`after.addr1` Nullable(String),
`after.addr2` Nullable(String),
`after.street` Nullable(String),
`after.locality` Nullable(String),
`after.town` Nullable(String),
`after.district` Nullable(String),
`after.county` Nullable(String),
`op` LowCardinality(String),
`ts_ms` UInt64,
`source.sequence` String,
`source.lsn` UInt64
)
ENGINE = MergeTree
ORDER BY tuple()
出于调试目的,我们为此表使用了 MergeTree
引擎。在生产场景中,这可能是 Null 引擎 - 更改将不会被持久化,但转换后的行仍将发送到目标表 uk_price_paid
。如果不需要删除支持,并且 REPLICA IDENTITY
设置为 DEFAULT
,则可以使用更简单的表。
精明的读者会注意到我们的模式已从 Debezium 发送的嵌套消息中扁平化。我们将在 Debezium 连接器中执行此操作。从模式的角度来看,这更简单,并允许我们配置 Nullable
值。涉及 Tuple
的替代方案更复杂。
配置 Debezium
在 Kafka connect 框架中部署连接器需要以下设置。请注意,我们假设消息以 没有模式的 JSON 形式发送
value.converter
-org.apache.kafka.connect.json.JsonConverter
key.converter
-org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable
-false
value.converter.schemas.enable
-false
decimal.format
- 控制此转换器将以哪种格式序列化十进制数。此值不区分大小写,可以是BASE64
(默认)或NUMERIC
。应将其设置为BASE64
。有关十进制处理的更多详细信息,请访问此处。
此管道所需的完整配置设置列表,例如数据库连接详细信息,可以在此处找到。下面我们重点介绍那些对消息格式影响最大的设置。 重要提示:我们将连接器配置为跟踪每个表级别的更改
- plugin.name - PostgreSQL 服务器上安装的 PostgreSQL 逻辑解码插件的名称。我们建议使用
pgoutput
。 - slot.name - 逻辑解码槽名称。它对于数据库和模式必须是唯一的。如果仅复制一个,请使用
debezium
。 - publication.name - 使用
pgoutput
时为流式传输更改而创建的 PostgreSQL 发布的名称。如果您已将 Postgres 用户配置为具有足够的权限,则将在启动时创建此发布。或者,可以预先创建它。可以使用dbz_publication
默认值。 - table.include.list - 可选的逗号分隔的正则表达式列表,用于匹配您要捕获其更改的表的完全限定表标识符。确保格式为
<schema_name>.<table_name>
。对于我们的示例,我们使用public.uk_price_paid
。 此参数中只能指定一个表。 - tombstones.on.delete - 控制删除事件之后是否跟随 tombstone 事件。设置为
false
。如果您有兴趣使用 日志压缩,则可以将其设置为true
- 这需要您在 ClickHouse Sink 中删除这些 tombstone。 - publication.autocreate.mode - 设置为
filtered
。这将导致仅为属性table.include.list
中的表创建发布。更多详细信息请访问此处。 - snapshot.mode - 我们为快照模式使用
never
- 因为我们使用postgres
函数加载了初始数据。如果用户无法暂停对 Postgres 实例的更改,则可以使用initial
模式。 - decimal.handling.mode - 指定连接器应如何处理
DECIMAL
和NUMERIC
列的值。默认值precise
将以二进制形式(即java.math.BigDecimal
)编码这些值。与上面的decimal.format
设置结合使用,这将导致这些值在 JSON 中以数字形式输出。用户可能希望根据所需的精度进行调整。
以下设置将影响 Postgres 中的更改与其到达 ClickHouse 的时间之间的延迟。在所需 SLA 和 ClickHouse 的高效批处理的上下文中考虑这些 - 请参阅其他注意事项。
- max.batch.size - 每个事件批次的最大大小。
- max.queue.size - 在将事件发送到 Kafka 之前的队列大小。允许反压。它应该大于批次大小。
- poll.interval.ms - 正整数值,指定连接器在开始处理一批事件之前应等待新更改事件出现多少毫秒。默认为 500 毫秒。
Confluent 为那些使用 Confluent Kafka 或 Cloud 部署的用户提供了其他文档。可以在 Confluent Cloud 中配置 Debezium 连接器,如下所示。此连接器将在收到消息时自动创建 Kafka 主题。
请注意,上面我们将 after.state.only
属性设置为 false
。此设置似乎特定于 Confluent Cloud,必须设置为 false
,以确保提供行的先前值以及 LSN 编号。
我们还使用 Kafka connect 的 SMT 功能来扁平化消息并将 Kafka 主题设置为 uk_price_paid_changes
。这可以通过自管理配置来实现。更多详细信息请访问[1][2]。
在上面的示例中,我们假设目标主题只有一个分区 - 这是由 Debezium 自动创建的。如前所述,多个分区需要使用基于哈希的路由,以确保将同一 Postgres 行的事件传递到同一分区 - 从而确保下游的按顺序传递。这超出了本博客的范围,需要进一步测试。
上面相关的 JSON 配置可以在此处找到,并且可以与官方文档记录的步骤一起使用。自管理安装说明可以在此处找到。
配置 Kafka Connect Sink
我们可以使用 ClickHouse Kafka Connect sink 从 Kafka 读取更改事件消息并将它们发送到 ClickHouse。这假设用户正在运行 Kafka Connect 框架。许多 Kafka Connect Sink 可以与 ClickHouse 一起使用,包括 Confluent HTTP Connector。但是,对于我们的用例,我们选择使用官方的ClickHouse 的 Kafka Connect Sink。虽然 ReplacingMergeTree 的属性只需要至少一次语义,但这提供了精确一次语义,现在可以使用 “自定义连接器” 产品在 Confluent Cloud 中部署。
重要的是,Kafka Connect sink 保证消息按分区顺序交付。这是由以下因素保证的
- Kafka Connect 框架仅将一项任务分配给任何给定的分区 - 尽管一项任务可能会从多个分区中使用。
- 在插入时,ClickHouse Kafka Connect sink 在插入之前按主题和分区对行进行分组。在消耗另一个批次之前,会确认批次的插入。
这允许扩展主题的分区数,同时仍然满足我们对与特定 Postgres 行相关的任何更改的按顺序交付的要求,假设我们可以保证来自 Debezium 连接器的任何行的事件都通过哈希发送到同一分区。
下面展示了如何在 Confluent Cloud 中配置 ClickHouse Kafka Connect Sink。请注意,我们首先上传连接器包使其可用。可以从此处下载。我们假设用户已配置 Debezium 连接器以将数据发送到主题 uk_price_paid_changes
。
上述配置的 JSON 表示形式可以在此处找到。
除了基于 Kafka Connect 的方法之外,还存在其他替代方案,例如,用户可以使用 Vector、Kafka 表引擎,或 Confluent Cloud 提供的 HTTP 连接器。
测试
为了确认我们的管道正常工作,我们提供了一个脚本,该脚本对 Postgres 表进行随机更改 - 添加、更新和删除行。具体而言,关于更新,此脚本更改随机行的 type
、price
和 is_new
列。完整的代码和依赖项可以在此处找到。
export PGDATABASE=<database>
export PGUSER=postgres
export PGPASSWORD=<password>
export PGHOST=<host>
export PGPORT=5432
pip3 install -r requirements.txt
python randomize.py --iterations 1 --weights "0.4,0.4,0.2" --delay 0.5
请注意,weights
参数和值 0.4,0.4,0.2
表示创建、更新和删除的比率。delay
参数设置每次操作之间的时间延迟(默认为 0.5 秒)。iterations
设置要对表进行的更改总数。在上面的示例中,我们修改了 1000 行。
脚本完成后,我们可以针对 Postgres 和 ClickHouse 运行以下查询,以确认一致性。显示的响应可能与您的值不同,因为更改是随机的。但是,来自两个数据库的值应相同。为了简单起见,我们使用 FINAL
。
相同的行数
-- Postgres
postgres=> SELECT count(*) FROM uk_price_paid;
count
----------
27735027
(1 row)
-- ClickHouse
SELECT count()
FROM uk_price_paid
FINAL
┌──count()─┐
│ 27735027 │
└──────────┘
相同的价格统计信息
-- Postgres
postgres=> SELECT sum(price) FROM uk_price_paid;
sum
---------------
5945061701495
(1 row)
-- ClickHouse
SELECT sum(price)
FROM uk_price_paid
FINAL
┌────sum(price)─┐
│ 5945061701495 │
└───────────────┘
相同的房价分布
postgres=> SELECT type, count(*) c FROM uk_price_paid GROUP BY type;
type | c
---------------+---------
detached | 6399743
flat | 4981171
other | 419212
semi-detached | 7597039
terraced | 8337862
(5 rows)
-- ClickHouse
SELECT
type,
count() AS c
FROM uk_price_paid
FINAL
GROUP BY type
┌─type──────────┬───────c─┐
│ other │ 419212 │
│ terraced │ 8337862 │
│ semi-detached │ 7597039 │
│ detached │ 6399743 │
│ flat │ 4981171 │
└───────────────┴─────────┘
其他注意事项
使用 Debezium for ClickHouse 和 Postgres 运行 CDC 管道时,还有其他一些注意事项
-
Debezium 连接器将在可能的情况下批量处理行更改,最大批量大小为
max.batch.size
。这些批次在每个轮询间隔poll.interval.ms
(默认为 500 毫秒)形成。用户可以增加这些值以获得更大、更高效的批次,但会以更高的端到端延迟为代价。请记住,ClickHouse 更喜欢 至少 1000 个批次,以避免常见问题,例如零件过多。对于低吞吐量环境(每秒 <100 行),这种批处理并不像 ClickHouse 跟上合并那样关键。但是,用户应避免以高插入速率进行小批量处理。批处理也可以在 Sink 端配置。ClickHouse Connect Sink 目前不支持显式配置批处理,但可以通过 Kafka connect 框架进行配置 - 请参阅设置
consumer.override.max.poll.records
。或者,用户可以配置 ClickHouse 异步插入并允许 ClickHouse 进行批处理。在这种模式下,可以将小批量插入发送到 ClickHouse,ClickHouse 将在刷新之前批量处理行。请注意,在刷新时,行将不可搜索。因此,这种方法**无助于**缩短端到端延迟。 -
用户应注意 WAL 磁盘使用量 以及
heartbeat.interval.ms
在监视更改很少的表但在更新频繁的数据库中的重要性。 -
上述方法**目前不支持 Postgres 主键更改**。要实现这一点,用户需要检测来自 Debezium 的
op=delete
消息,这些消息没有before
或after
字段。然后应使用id
在 ClickHouse 中删除这些行 - 最好使用 轻量级删除。这需要自定义代码,而不是使用 Kafka sink 将数据发送到 ClickHouse。 -
如果表的主键发生更改,用户可能需要创建一个新的 ClickHouse 表,并将新列作为
ORDER BY
子句的一部分。请注意,这也需要对 Debezium 连接器执行 进程。 -
Debezium 连接器依赖的逻辑解码不支持 DDL 更改。这意味着连接器无法将 DDL 更改事件报告回消费者。
-
逻辑解码复制槽仅在主服务器上受支持。当存在 PostgreSQL 服务器集群时,连接器只能在活动的主服务器上运行。它不能在热备或温备副本上运行。如果主服务器发生故障或降级,连接器将停止。主服务器恢复后,您可以重新启动连接器。如果不同的 PostgreSQL 服务器已升级为主服务器,请在重新启动连接器之前调整连接器配置。
-
虽然 Kafka Sink 可以安全地扩展以使用更多工作线程(假设同一 Postgres 行的事件哈希到同一分区),但 Debezium 连接器仅允许 单个任务。上面提出的解决方案为每个表使用一个连接器,从而允许在表级别扩展解决方案。
-
记录的方法假定每个表有一个连接器实例。我们目前不支持监控多个表的连接器 - 尽管这可以通过主题路由来实现,即消息被路由到特定于表的主题。此配置尚未经过测试。
结论
在这篇博文中,我们探讨了如何构建 CDC 管道,以近乎实时地将更改从 Postgres 复制到 ClickHouse。我们讨论了 ReplacingMergeTree 对于此设计的基础作用,以及用户如何优化表设计并使用 FINAL 运算符进行查询时重复数据删除。除了提供构建管道的说明(包括如何配置 Debezium)外,我们还讨论了希望构建生产解决方案的用户的其他注意事项。虽然这篇博文中的详细信息特定于 Postgres,但由于 Debezium 与 DBMS 无关的消息格式,它们也可能适用于 Debezium 支持的所有源数据库。我们将其留给读者作为练习,以探索其他数据库,并告知我们您的进展!