目录
对于那些正在寻找基于本文概念的完整示例配置的人,请参阅此处。感谢我们社区的 Leart Beqiraj 的贡献。
简介
继续我们关于为 Postgresql 构建变更捕获控制 (CDC) 管道到 ClickHouse 的系列文章,这篇文章重点介绍构建功能性管道所需的步骤和配置。为此,我们使用加载到 Postgres 和 ClickHouse 中的示例数据集。考虑到 Postgres 是真理来源,我们应用了插入、更新和删除的混合工作负载。使用由 Debezium、ClickHouse Kafka Connect 和物化视图构建的 CDC 管道,我们可以在 ClickHouse 的表中近乎实时地反映这些更改。
在我们的示例中,我们使用 ClickHouse Cloud 集群中的开发实例和 Postgres 的 AWS Aurora 实例。但是,这些示例应该可以在同等大小的自托管集群上重现。或者,立即启动您的 ClickHouse Cloud 集群,并获得 300 美元的信用额度。让我们来担心基础设施,开始查询吧!
示例数据集
对于我们的示例数据集,我们使用流行的英国房产价格数据集。它规模适中(2800 万行),模式易于理解。每一行代表过去 20 年在英国的一次房屋销售,字段代表价格、日期和位置。字段的完整描述可以在此处找到。我们将把此数据集加载到 Postgres 和 ClickHouse 中,然后再对前者进行随机插入、更新和删除。这些更改应由 Debezium 捕获,并用于近乎实时地更新 ClickHouse。
Postgres 模式和数据加载
Postgres 模式如下所示。请注意使用 serial id
字段作为主键。虽然主键不是强制性的,但 Debezium 工作需要额外的 Postgres 配置。
CREATE TABLE uk_price_paid (
id serial,
price INTEGER,
date Date,
postcode1 varchar(8),
postcode2 varchar(3),
type varchar(13),
is_new SMALLINT,
duration varchar(9),
addr1 varchar(100),
addr2 varchar(100),
street varchar(60),
locality varchar(35),
town varchar(35),
district varchar(40),
county varchar(35),
primary key(id)
);
我们将此数据集作为与 Postgres 兼容的 SQL 分发,可从此处下载以进行插入。加载数据需要一些简单的命令,假设 psql 客户端已使用环境变量进行配置
wget
https://datasets-documentation.s3.eu-west-3.amazonaws.com/uk-house-prices/postgres/uk_prices.sql.tar.gz
tar -xzvf uk_prices.sql.tar.gz
psql < uk_prices.sql
INSERT 0 10000
INSERT 0 10000
INSERT 0 10000
…
postgres=> SELECT count(*) FROM uk_price_paid;
count
----------
27734966
(1 row)
注意:我们正在使用具有八个内核的 Postgres 版本 14.7 的 AWS Aurora 实例。此数据大约需要 10 分钟才能加载。
ClickHouse 模式
下面我们展示了我们的文档中使用的模式的修改版本,使用了 ReplacingMergeTree。
CREATE TABLE default.uk_price_paid
(
`id` UInt64,
`price` UInt32,
`date` Date,
`postcode1` LowCardinality(String),
`postcode2` LowCardinality(String),
`type` Enum8('other' = 0, 'terraced' = 1, 'semi-detached' = 2, 'detached' = 3, 'flat' = 4),
`is_new` UInt8,
`duration` Enum8('unknown' = 0, 'freehold' = 1, 'leasehold' = 2),
`addr1` String,
`addr2` String,
`street` LowCardinality(String),
`locality` LowCardinality(String),
`town` LowCardinality(String),
`district` LowCardinality(String),
`county` LowCardinality(String),
`version` UInt64,
`deleted` UInt8
)
ENGINE = ReplacingMergeTree(version, deleted)
PRIMARY KEY (postcode1, postcode2, addr1, addr2)
ORDER BY (postcode1, postcode2, addr1, addr2, id)
虽然上面,我们为了优化目的将早期模式迁移到 ClickHouse 类型,但原始 Postgres 模式也可以由 ClickHouse 自动解释(除了 serial
类型)。例如,下面的 DDL 可用于创建使用 Postgres 类型的表 - 这些类型将自动转换为 ClickHouse 类型,如下所示。请注意,我们删除了主键并将 id
列的类型 serial
手动转换为 Uint64。
CREATE TABLE default.uk_price_paid
(
`id` UInt64,
`price` INTEGER,
`date` Date,
`postcode1` varchar(8),
`postcode2` varchar(3),
`type` varchar(13),
`is_new` SMALLINT,
`duration` varchar(9),
`addr1` varchar(100),
`addr2` varchar(100),
`street` varchar(60),
`locality` varchar(35),
`town` varchar(35),
`district` varchar(40),
`county` varchar(35),
`version` UInt64,
`deleted` UInt8
)
ENGINE = ReplacingMergeTree(version, deleted)
PRIMARY KEY (postcode1, postcode2, addr1, addr2)
ORDER BY (postcode1, postcode2, addr1, addr2, id)
SHOW CREATE TABLE default.uk_price_paid
CREATE TABLE default.uk_price_paid
(
`id` UInt64,
`price` Int32,
`date` Date,
`postcode1` String,
`postcode2` String,
`type` String,
`is_new` Int16,
`duration` String,
`addr1` String,
`addr2` String,
`street` String,
`locality` String,
`town` String,
`district` String,
`county` String,
`version` UInt64,
`deleted` UInt8
)
ENGINE = ReplicatedReplacingMergeTree('/clickhouse/tables/{uuid}/{shard}', '{replica}', version, deleted)
PRIMARY KEY (postcode1, postcode2, addr1, addr2)
ORDER BY (postcode1, postcode2, addr1, addr2, id)
SETTINGS index_granularity = 8192
考虑到我们之前的博客中探讨的关于 ReplacingMergeTree 引擎的概念,这里有一些重要的注意事项
- 我们在表定义中使用了 version 和 deleted 列。这两者都是可选的。例如,如果您不需要支持删除,只需在模式和引擎定义中省略该列即可。
- 我们为
ORDER BY
子句选择了列,以优化查询访问模式。请注意id
列是如何最后指定的,因为我们不希望在分析查询中使用这些列,但仍然需要它提供的唯一性属性 - 特别是当我们的地址不超过街道级别时。 - 我们使用
PRIMARY KEY
子句指定主索引,省略id
列以节省内存,而对我们的常用查询没有影响。
配置 CDC 管道
架构概述
我们在之前的博客中介绍的端到端架构如下所示。
此架构假设用户拥有 Kafka 实例和 Kafka Connect 框架。对于我们的示例,我们假设用户正在使用 Confluent Cloud 托管 Kafka,它会自动为事件创建适当的主题。但是,也支持自托管 Kafka 安装。建议的模式将适用于任何写入 Debezium 生成的事件的摄取管道。有关安装 Debezium 的说明以及主题配置的注意事项,请参见此处。
正如我们之前的文章中指出的那样,用户应确保更改事件按顺序传递给每个唯一的 Postgres 行(如果要删除删除事件)。这可以通过使用单个分区(Debezium 源默认设置)或在由于吞吐量要求而需要多个分区的情况下使用基于哈希的分区来保证。虽然后一种情况应该很少见,但这涉及到确保特定 Postgres 行的所有更改事件都通过哈希其主键发送到同一分区。
Kafka 中的日志压缩可用于确保仅保留行的最后一个事件,从而最大限度地减少 Kafka 存储大小。这需要省略 tombstone 事件在发生删除时在 Debezium 中。为了简化我们的管道,我们禁用了这些。如果用户需要此高级功能,则应在其Kafka Sink 中删除这些 tombstone 事件。
配置 Postgresql
PostgreSQL 连接器可以与独立的 PostgreSQL 服务器或 PostgreSQL 服务器集群一起使用,但仅在主服务器上受支持 - Debezium 连接器无法连接到副本实例。
请注意 Debezium 文档中的以下内容
“如果主服务器发生故障或被降级,连接器将停止。在主服务器恢复后,您可以重新启动连接器。如果不同的 PostgreSQL 服务器已升级为主服务器,请在重新启动连接器之前调整连接器配置。”
确保 Postgres 实例已正确配置
正如我们之前的文章中提到的,我们假设使用输出插件 pgoutput
将数据从 WAL 内部表示形式转换为 Debezium 可以使用的格式。因此,我们下面的示例使用逻辑复制流模式 pgoutput
。这是内置于 PostgreSQL 10+ 中的。早期版本的用户可以探索使用 decoderbufs 插件(由 Debezium 社区维护)或 wal2json。我们尚未测试这些配置。
我们建议用户阅读以下关于安全性和用户配置的部分
-
设置基本权限 - 在下面的示例中,我们使用
postgres
超级用户。不建议在生产部署中使用此方法。 -
创建发布权限 - Debezium 从为表创建的发布中流式传输 PostgreSQL 源表的更改事件。发布包含从一个或多个表生成的已过滤更改事件集。每个发布中的数据都根据发布规范进行过滤。我们假设 Debezium 配置了足够的权限来创建这些发布。默认情况下,
postgres
超级用户有权执行此操作。但是,对于生产用例,我们建议用户自己创建这些发布,或者最小化分配给用于创建它们的连接器的 Debezium 用户的权限。 -
权限以允许与 Debezium 连接器主机进行复制
配置副本身份
Debezium 发送的消息内容取决于您如何为源目标配置 REPLICA IDENTITY
。REPLICA IDENTITY
是 PostgreSQL 特有的表级设置,它确定逻辑解码插件可用于 UPDATE 和 DELETE 事件的信息量。更具体地说,此设置控制在发生 UPDATE 或 DELETE 事件时,表列的先前值(如果有)可用的信息。
虽然支持四个不同的值,但我们根据您是否需要删除支持来推荐以下值
DEFAULT
- 默认行为是,如果表具有主键,则更新和删除事件包含表的主键列的先前值。对于 UPDATE 事件,仅存在具有更改值的主键列。如果表没有主键,则连接器不会为该表发出 UPDATE 或 DELETE 事件。仅在以下情况下使用此值- 您的 ClickHouse
ORDER BY
子句仅包含 Postgres 主键列。这不太可能,因为通常用户会将列添加到ORDER BY
以优化聚合查询,而这些聚合查询不太可能是 Postgres 中的主键。 - 您不需要删除支持。注意:下面使用的配置不需要更新的先前列值。删除需要它们,因为新状态为空。
- 您的 ClickHouse
FULL
- 更新和删除操作的已发出事件包含表中所有列的先前值。如果您需要支持删除操作,则需要此设置。
使用 ALTER
命令设置此设置。
ALTER TABLE uk_price_paid REPLICA IDENTITY FULL;
这篇文章的其余部分假设用户需要支持删除。如果不需要删除支持的情况下的步骤有所不同,我们将提供替代配置的参考。
准备 ClickHouse
初始数据加载
在处理更改事件流之前,我们需要预加载我们的 ClickHouse 表,以确保它与 Postgres 一致。这可以通过几种方式完成,包括但不限于
- 使用postgres 表函数使用
INSERT INTO SELECT
直接从我们的 Postgres 实例加载数据集。这提供了一种快速简便的方式来加载我们的数据集,但需要我们暂停 Postgres 实例上的更改。这在生产场景中可能不可用。 - 使用为下载提供的 Postgres 导出。这可以转换为 ClickHouse 支持的格式并加载。这对于大多数大型数据集是不现实的。
- 配置 Debezium 以在首次启动时执行一致的快照。快照完成后,它将继续从创建快照的确切时间点开始流式传输更改。这允许连接器以所有数据的一致视图开始,而不会遗漏在快照拍摄期间所做的任何更改。有关如何实现此目的的完整过程,请参见此处。此过程的结果是类似于在插入行时发送的读取事件流。虽然这些事件可以由我们下面的物化视图管道处理,但就吞吐量而言,该过程往往不是最佳的。
为了速度和简单性,我们使用上面的选项一。请注意,我们的 INSERT INTO SELECT
语句将 version 和 deleted 列设置为值 1 和 0。
INSERT INTO uk_price_paid SELECT
id,
price,
date,
postcode1,
postcode2,
type,
is_new,
duration,
addr1,
addr2,
street,
locality,
town,
district,
county,
1 AS version,
0 AS deleted
FROM postgresql('<host>', '<database>', '<table>', '<user>', '<password>')
0 rows in set. Elapsed: 80.885 sec. Processed 27.73 million rows, 5.63 GB (342.89 thousand rows/s., 69.60 MB/s.)
在没有调优的情况下,我们能够在 80 秒内加载所有 2800 万行。
物化视图
Debezium 使用嵌套 JSON 格式发送消息。如果配置得当(见下文),并将 REPLICA IDENTITY
设置为 FULL
,则更改事件将包括行的列的前后值作为嵌套 JSON。这些消息的完整示例可以在此处找到,包括当不需要删除支持时 REPLICA IDENTITY
设置为 DEFAULT
的情况。
作为一个示例,我们在下面展示了一个更新消息(REPLICA IDENTITY=Full
)。
{
"before": {
"id": 50658675,
"price": 227500,
"date": 11905,
"postcode1": "SP2",
"postcode2": "7EN",
"type": "detached",
"is_new": 0,
"duration": "freehold",
"addr1": "31",
"addr2": "",
"street": "CHRISTIE MILLER ROAD",
"locality": "SALISBURY",
"town": "SALISBURY",
"district": "SALISBURY",
"county": "WILTSHIRE"
},
"after": {
"id": 50658675,
"price": 227500,
"date": 11905,
"postcode1": "SP2",
"postcode2": "7EN",
"type": "terraced",
"is_new": 0,
"duration": "freehold",
"addr1": "31",
"addr2": "",
"street": "CHRISTIE MILLER ROAD",
"locality": "SALISBURY",
"town": "SALISBURY",
"district": "SALISBURY",
"county": "WILTSHIRE"
},
"source": {
"version": "1.9.6.Final",
"connector": "postgresql",
"name": "postgres_server",
"ts_ms": 1685378780355,
"snapshot": "false",
"db": "postgres",
"sequence": "[\"247833040488\",\"247833042536\"]",
"schema": "public",
"table": "uk_price_paid",
"txId": 106940,
"lsn": 247833042536,
"xmin": null
},
"op": "u",
"ts_ms": 1685378780514,
"transaction": null
}
此处的 op
字段指示操作,值 u
、d
和 c
分别指示更新、删除和插入操作。source.lsn
字段提供我们的版本值。对于删除事件,after
字段为空。相反,对于插入事件,before
字段为空。
此消息格式与我们在 ClickHouse 中的目标表 uk_price_paid
不兼容。我们可以使用物化视图在插入时转换这些消息。我们在下面展示了这一点
CREATE MATERIALIZED VIEW uk_price_paid_mv TO uk_price_paid
(
`id` Nullable(UInt64),
`price` Nullable(UInt32),
`date` Nullable(Date),
`postcode1` Nullable(String),
`postcode2` Nullable(String),
`type` Nullable(Enum8('other'=0, 'terraced'=1, 'semi-detached'=2, 'detached'=3, 'flat'=4)),
`is_new` Nullable(UInt8),
`duration` Nullable(Enum8('unknown'=0, 'freehold'=1, 'leasehold'=2)),
`addr1` Nullable(String),
`addr2` Nullable(String),
`street` Nullable(String),
`locality` Nullable(String),
`town` Nullable(String),
`district` Nullable(String),
`county` Nullable(String),
`version` UInt64,
`deleted` UInt8
) AS
SELECT
if(op = 'd', before.id, after.id) AS id,
if(op = 'd', before.price, after.price) AS price,
if(op = 'd', toDate(before.date), toDate(after.date)) AS date,
if(op = 'd', before.postcode1, after.postcode1) AS postcode1,
if(op = 'd', before.postcode2, after.postcode2) AS postcode2,
if(op = 'd', before.type, after.type) AS type,
if(op = 'd', before.is_new, after.is_new) AS is_new,
if(op = 'd', before.duration, after.duration) AS duration,
if(op = 'd', before.addr1, after.addr1) AS addr1,
if(op = 'd', before.addr2, after.addr2) AS addr2,
if(op = 'd', before.street, after.street) AS street,
if(op = 'd', before.locality, after.locality) AS locality,
if(op = 'd', before.town, after.town) AS town,
if(op = 'd', before.district, after.district) AS district,
if(op = 'd', before.county, after.county) AS county,
if(op = 'd', source.lsn, source.lsn) AS version,
if(op = 'd', 1, 0) AS deleted
FROM default.uk_price_paid_changes
WHERE (op = 'c') OR (op = 'r') OR (op = 'u') OR (op = 'd')
请注意,我们的物化视图在此处根据操作为每一列选择适当的值。version
基于 source.lsn
列,如果 op
列的值为 d
,则我们将 deleted
列设置为 1,否则设置为 0。op = r
值允许我们在需要时也支持快照事件。如果不需要删除支持,则可以简化此物化视图。
熟悉 ClickHouse 的读者会注意到,此视图将行插入到我们的 uk_price_paid
中,并从 uk_price_paid_changes
表中选择行。后一个表将接收来自我们的 Kafka sink 的行插入。此表的模式必须与前面显示的 Debezium 消息对齐。我们在下面展示了此表的模式
CREATE TABLE uk_price_paid_changes
(
`before.id` Nullable(UInt64),
`before.price` Nullable(UInt32),
`before.date` Nullable(UInt32),
`before.postcode1` Nullable(String),
`before.postcode2` Nullable(String),
`before.type` Nullable(Enum8('other'=0,'terraced'=1,'semi-detached'=2,'detached'=3,'flat'=4)),
`before.is_new` Nullable(UInt8),
`before.duration` Nullable(Enum8('unknown' = 0, 'freehold' = 1, 'leasehold' = 2)),
`before.addr1` Nullable(String),
`before.addr2` Nullable(String),
`before.street` Nullable(String),
`before.locality` Nullable(String),
`before.town` Nullable(String),
`before.district` Nullable(String),
`before.county` Nullable(String),
`after.id` Nullable(UInt64),
`after.price` Nullable(UInt32),
`after.date` Nullable(UInt32),
`after.postcode1` Nullable(String),
`after.postcode2` Nullable(String),
`after.type` Nullable(Enum8('other'=0,'terraced'=1,'semi-detached'=2,'detached'=3,'flat'=4)),
`after.is_new` Nullable(UInt8),
`after.duration` Nullable(Enum8('unknown' = 0, 'freehold' = 1, 'leasehold' = 2)),
`after.addr1` Nullable(String),
`after.addr2` Nullable(String),
`after.street` Nullable(String),
`after.locality` Nullable(String),
`after.town` Nullable(String),
`after.district` Nullable(String),
`after.county` Nullable(String),
`op` LowCardinality(String),
`ts_ms` UInt64,
`source.sequence` String,
`source.lsn` UInt64
)
ENGINE = MergeTree
ORDER BY tuple()
为了调试目的,我们为此表使用了 MergeTree
引擎。在生产场景中,这可以是 Null 引擎 - 更改将不会持久化,但转换后的行仍将发送到目标表 uk_price_paid
。如果不需要删除支持,并且 REPLICA IDENTITY
设置为 DEFAULT
,则可以使用更简单的表。
细心的读者会注意到我们的模式是从 Debezium 发送的嵌套消息中展平的。我们将在我们的 Debezium 连接器中执行此操作。从模式的角度来看,这更简单,并且允许我们配置 Nullable
值。涉及 Tuple
的替代方案更复杂。
配置 Debezium
在 Kafka Connect 框架中部署连接器需要以下设置。请注意,我们假设消息以没有模式的 JSON 形式发送
value.converter
-org.apache.kafka.connect.json.JsonConverter
key.converter
-org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable
-false
value.converter.schemas.enable
-false
decimal.format
- 控制此转换器将以哪种格式序列化小数。此值不区分大小写,可以是BASE64
(默认)或NUMERIC
。应将其设置为BASE64
。有关 Decimal 处理的更多详细信息,请参见此处。
此管道所需配置设置的完整列表,例如数据库连接详细信息,可以在此处找到。下面我们重点介绍那些对消息格式影响最大的设置。重要提示:我们将连接器配置为在每个表级别跟踪更改
- plugin.name - PostgreSQL 服务器上安装的 PostgreSQL 逻辑解码插件的名称。我们推荐
pgoutput
。 - slot.name - 逻辑解码槽名称。它必须对数据库和模式是唯一的。如果仅复制一个,请使用
debezium
。 - publication.name - 使用
pgoutput
时为流式传输更改而创建的 PostgreSQL 发布名称。如果您已将 Postgres 用户配置为具有足够的权限,则将在启动时创建此发布。或者,可以预先创建它。可以使用dbz_publication
默认值。 - table.include.list - 一个可选的、逗号分隔的正则表达式列表,用于匹配您要捕获其更改的表的完全限定表标识符。确保格式为
<schema_name>.<table_name>
。对于我们的示例,我们使用public.uk_price_paid
。此参数中只能指定一个表。 - tombstones.on.delete - 控制删除事件之后是否跟随 tombstone 事件。设置为
false
。如果您有兴趣使用日志压缩,则可以将其设置为true
- 这需要您在 ClickHouse Sink 中删除这些 tombstone。 - publication.autocreate.mode - 设置为
filtered
。这将导致仅为属性table.include.list
中的表创建发布。更多详细信息,请参见此处。 - snapshot.mode - 我们对快照模式使用
never
- 因为我们使用postgres
函数加载了初始数据。如果用户无法暂停对其 Postgres 实例的更改,则可以使用initial
模式。 - decimal.handling.mode - 指定连接器应如何处理
DECIMAL
和NUMERIC
列的值。默认值precise
会将这些值编码为二进制形式,即java.math.BigDecimal
。与上面的decimal.format
设置结合使用,这将导致这些值在 JSON 中以数字形式输出。用户可能希望根据所需的精度进行调整。
以下设置将影响 Postgres 中的更改与其到达 ClickHouse 的时间之间的延迟。在所需 SLA 和到 ClickHouse 的高效批处理的上下文中考虑这些设置 - 请参见其他注意事项。
- max.batch.size - 每个事件批次的最大大小。
- max.queue.size - 在将事件发送到 Kafka 之前的队列大小。允许反压。它应该大于批次大小。
- poll.interval.ms - 正整数值,指定连接器在开始处理一批事件之前,应等待新的变更事件出现多少毫秒。默认为 500 毫秒。
Confluent 为那些使用 Confluent Kafka 或 Cloud 进行部署的用户提供了额外的文档。Debezium 连接器可以在 Confluent Cloud 中配置,如下所示。此连接器将在收到消息时自动创建 Kafka 主题。
请注意,我们在上面将 after.state.only
属性设置为 false
。此设置似乎特定于 Confluent Cloud,并且必须设置为 false
,以确保提供行的先前值以及 LSN 号。
我们还使用 Kafka connect 的 SMT 功能来展平消息,并将 Kafka 主题设置为 uk_price_paid_changes
。这可以通过配置在自管理中实现。更多详情请参阅 [1][2]。
在上面的示例中,我们假设目标主题使用单个分区 - 这是由 Debezium 自动创建的。正如之前讨论的,多个分区需要使用基于哈希的路由,以确保同一 Postgres 行的事件被传递到同一分区 - 从而确保下游的顺序交付。这超出了本博客的范围,需要进一步测试。
上述相关的 JSON 配置可以在此处找到,并可以与官方文档 步骤 一起使用。自管理安装说明可以在此处找到。
配置 Kafka Connect Sink
我们可以使用 ClickHouse Kafka Connect Sink 从 Kafka 读取变更事件消息,并将它们发送到 ClickHouse。这假设用户正在运行 Kafka Connect 框架。许多 Kafka Connect Sink 可以与 ClickHouse 一起使用,包括 Confluent HTTP Connector。然而,对于我们的用例,我们选择使用官方的 ClickHouse Kafka Connect Sink。虽然 ReplacingMergeTree 的属性仅需要至少一次语义,但这提供了 精确一次语义,并且现在可以使用 “自定义连接器” 产品在 Confluent Cloud 中部署。
重要的是,Kafka Connect sink 保证消息按分区顺序交付。这是由以下原因保证的:
- Kafka Connect 框架仅为任何给定分区分配 一个任务 - 尽管一个任务可能从多个分区消费。
- 在插入时,ClickHouse Kafka Connect sink 在插入之前按主题和分区对行进行分组。一个批次的插入在另一个批次被消费之前被确认。
这允许扩展主题的分区数量,同时仍然满足我们对于与特定 Postgres 行相关的任何更改的顺序交付的要求,前提是我们能够保证来自 Debezium 连接器的任何行的事件都通过哈希发送到同一分区。
我们在下面展示了在 Confluent Cloud 中配置 ClickHouse Kafka Connect Sink。请注意我们如何首先上传连接器包使其可用。可以从此处下载。我们假设用户已配置 Debezium 连接器以将数据发送到主题 uk_price_paid_changes
。
上述配置的 JSON 表示可以在此处找到。
存在基于 Kafka Connect 方法的替代方案,例如,用户可以使用 Vector、Kafka 表引擎或 Confluent Cloud 提供的 HTTP 连接器。
测试
为了确认我们的管道工作正常,我们提供了一个脚本,该脚本对 Postgres 表进行随机更改 - 添加、更新和删除行。具体而言,关于更新,此脚本更改随机行的 type
、price
和 is_new
列。完整的代码和依赖项可以在此处找到。
export PGDATABASE=<database>
export PGUSER=postgres
export PGPASSWORD=<password>
export PGHOST=<host>
export PGPORT=5432
pip3 install -r requirements.txt
python randomize.py --iterations 1 --weights "0.4,0.4,0.2" --delay 0.5
请注意,weights
参数和值 0.4,0.4,0.2
表示创建、更新和删除的比率。delay
参数设置每次操作之间的时间延迟(默认为 0.5 秒)。iterations
设置要对表进行的更改总数。在上面的示例中,我们修改了 1000 行。
脚本完成后,我们可以对 Postgres 和 ClickHouse 运行以下查询以确认一致性。显示的响应可能与您的值不同,因为更改是随机的。但是,来自两个数据库的值应该相同。为了简单起见,我们使用 FINAL
。
相同的行数
-- Postgres
postgres=> SELECT count(*) FROM uk_price_paid;
count
----------
27735027
(1 row)
-- ClickHouse
SELECT count()
FROM uk_price_paid
FINAL
┌──count()─┐
│ 27735027 │
└──────────┘
相同的价格统计信息
-- Postgres
postgres=> SELECT sum(price) FROM uk_price_paid;
sum
---------------
5945061701495
(1 row)
-- ClickHouse
SELECT sum(price)
FROM uk_price_paid
FINAL
┌────sum(price)─┐
│ 5945061701495 │
└───────────────┘
相同的房价分布
postgres=> SELECT type, count(*) c FROM uk_price_paid GROUP BY type;
type | c
---------------+---------
detached | 6399743
flat | 4981171
other | 419212
semi-detached | 7597039
terraced | 8337862
(5 rows)
-- ClickHouse
SELECT
type,
count() AS c
FROM uk_price_paid
FINAL
GROUP BY type
┌─type──────────┬───────c─┐
│ other │ 419212 │
│ terraced │ 8337862 │
│ semi-detached │ 7597039 │
│ detached │ 6399743 │
│ flat │ 4981171 │
└───────────────┴─────────┘
其他注意事项
当使用 Debezium for ClickHouse 和 Postgres 运行 CDC 管道时,还有其他一些注意事项:
-
Debezium 连接器将在可能的情况下批处理行更改,最大批处理大小为
max.batch.size
。这些批次在每个轮询间隔poll.interval.ms
(默认为 500 毫秒)形成。用户可以增加这些值以获得更大、更高效的批次,但会牺牲更高的端到端延迟。请记住,ClickHouse 偏好至少 1000 个批次,以避免常见问题,例如 parts 过多。对于低吞吐量环境(<100 行/秒),这种批处理并不像 ClickHouse 可能会跟上合并那样关键。但是,用户应避免以高插入速率进行小批次处理。批处理也可以在 Sink 端配置。目前 ClickHouse Connect Sink 中未显式支持此功能,但可以通过 Kafka connect 框架进行配置 - 请参阅设置
consumer.override.max.poll.records
。或者,用户可以配置 ClickHouse 异步插入,并允许 ClickHouse 进行批处理。在此模式下,插入可以作为小批次发送到 ClickHouse,ClickHouse 将在刷新之前对行进行批处理。请注意,在刷新时,行将不可搜索。因此,这种方法**无助于**端到端延迟。 -
在监视具有许多更新的数据库中更改很少的表的情况下,用户应注意 WAL 磁盘使用量 以及
heartbeat.interval.ms
的重要性。 -
上述方法目前不支持 Postgres 主键更改。为了实现这一点,用户需要检测来自 Debezium 的
op=delete
消息,这些消息没有before
或after
字段。然后应使用id
在 ClickHouse 中删除这些行 - 最好使用 轻量级删除。这需要自定义代码,而不是使用 Kafka sink 将数据发送到 ClickHouse。 -
如果表的主键发生更改,用户可能需要创建一个新的 ClickHouse 表,并将新列作为
ORDER BY
子句的一部分。请注意,这也需要为 Debezium 连接器执行一个过程。 -
Debezium 连接器所依赖的逻辑解码不支持 DDL 更改。这意味着连接器无法将 DDL 更改事件报告回消费者。
-
逻辑解码复制槽仅在主服务器上受支持。当存在 PostgreSQL 服务器集群时,连接器只能在活动主服务器上运行。它不能在热备或温备副本上运行。如果主服务器发生故障或降级,则连接器停止。主服务器恢复后,您可以重新启动连接器。如果已将不同的 PostgreSQL 服务器提升为主服务器,请在重新启动连接器之前调整连接器配置。
-
虽然 Kafka Sink 可以安全地扩展以使用更多 worker(假设同一 Postgres 行的事件被哈希到同一分区),但 Debezium 连接器仅允许单个任务。上面提出的解决方案为每个表使用一个连接器,从而允许在表级别扩展解决方案。
-
文档化的方法假设每个表一个连接器实例。我们目前不支持一个连接器监控多个表 - 尽管这可以通过主题路由来实现,即消息被路由到特定于表的主题。此配置尚未经过测试。
结论
在本博客文章中,我们探讨了如何构建 CDC 管道,以近实时地将更改从 Postgres 复制到 ClickHouse。我们讨论了 ReplacingMergeTree 如何成为此设计的基础,以及用户如何优化表设计并使用 FINAL 运算符进行查询时去重。除了提供构建管道的说明(包括如何配置 Debezium)外,我们还讨论了希望构建生产解决方案的用户的其他注意事项。虽然本博客文章中的详细信息特定于 Postgres,但由于 Debezium 的 DBMS 独立消息格式,它们可能适用于 Debezium 支持的所有源数据库。我们将其作为练习留给读者,以探索其他数据库,并告知我们您的进展如何!