本博客的第 2 部分可以在以下链接找到:使用PostgreSQL和ClickHouse实现变更数据捕获(CDC) - 第2部分。
目录
引言
在之前的文章中,我们讨论了OLTP数据库(例如Postgres)和OLAP数据库(例如ClickHouse)之间的区别,以及用户为何可能希望将分析工作负载迁移到后者。
本文提供了一个使用Postgres和ClickHouse实现变更数据捕获的入门指南。对于不熟悉的人来说,变更数据捕获(CDC)是使两个数据库之间的表保持同步的过程。本博客系列提出的解决方案仅使用ClickHouse的原生功能。除了Debezium和Kafka之外,不需要任何其他组件。
这篇初始文章介绍了用于Postgres到ClickHouse CDC管道的一些概念和构建块。本系列的下一篇文章将整合这些概念并构建一个可运行的管道。虽然后面的博客文章可以独立于本文阅读,但我们建议用户阅读本文以了解相关概念和约束,避免出现潜在问题。
在我们的示例中,我们使用了ClickHouse Cloud集群中的一个开发实例和一个AWS Aurora Postgres实例。但是,这些示例应该可以在等效大小的自管理集群上复制。或者,立即启动您的ClickHouse Cloud集群,并获得 300 美元的信用额度。让我们为您处理基础设施,开始查询吧!
方法
在数据是静态或不可变且仅受追加操作影响的情况下,CDC 可以非常简单。通常,ClickHouse的原生函数(例如postgresql 函数)足以在实例之间移动数据,使用带时间戳或单调递增的ID定期识别需要从OLTP数据库读取并插入到ClickHouse中的行。
但是,对于包含更新和删除等更复杂工作负载的表,用户需要识别并跟踪源Postgres数据库中所有这些可能的变更,以便在接近实时的状态下将其应用于ClickHouse。
此问题的解决方案通常特定于源和目标,利用前者的功能来识别变更,并确保根据目标的最佳使用模式尽可能高效地反映在目标中。我们将在下面描述几种方法。
拉取型
在基于拉取的变更数据捕获中,目标数据库从源数据库拉取变更,通常使用记录到表列的变更。这减少了对源数据库的负载,并且需要在目标端实现轮询方法。
在数据不可变且仅追加的情况下,可以将一个定时执行的postgresql函数视为拉取实现,该函数在Postgres和ClickHouse之间移动数据。
用户可以选择添加消息系统以确保即使目标系统不可用也能可靠地交付变更。由于变更会定期分批应用,因此这种方法主要存在一个延迟,即源数据库中的变更可见性和目标数据库中的变更可见性之间存在延迟。此外,此方法需要访问源数据库的网络,并且源数据库无法控制何时以及如何拉取数据。这在生产关键系统中通常是不希望的,因为管理员不愿提供访问权限。
通过实时事件流推送
在基于推送的变更数据捕获管道中,源系统捕获表列的变更并将这些变更发送到目标系统,目标系统将这些变更应用到当前数据。在任何时间点,这两个系统都应该几乎完全相同,并且变更以接近实时的速度应用。这种方法假设可以可靠地交付变更 - 无论是通过恰好一次的交付语义,还是通过至少一次的交付以及目标系统能够适当地处理重复项。虽然不一定是必需的,但此方法通常会引入Kafka等消息系统以确保可靠的消息交付。
下面,我们提出了一种基于推送的CDC方法,用于在接近实时的状态下在Postgres和ClickHouse之间移动变更。为了实现这一点,我们利用了Postgres的原生功能,这些功能识别存储级别发生的变更并将这些变更转换为可消费的变更流。
跟踪PostgreSQL中的变更
任何CDC解决方案都需要源数据库提供一种强大且可靠的方法来跟踪特定表中行和列级别的变更。PostgreSQL通过两个基本功能公开这些变更
- 预写日志(WAL)是对数据库中所有变更进行的顺序记录。每当事务修改数据库时,变更首先写入WAL,然后再应用到实际的数据文件。此过程称为预写日志记录。这为数据库事务提供了持久性和崩溃恢复功能。它是PostgreSQL事务处理机制的关键组成部分。通过使用WAL,PostgreSQL确保在将事务视为已提交之前,变更会持久存储在磁盘上。WAL对于PostgreSQL中的各种复制技术也至关重要,这些技术使用WAL捕获在主数据库上进行的变更并将这些变更应用于副本,确保多个数据库实例之间的一致性。但是,在其原生格式中,此日志未针对外部进程进行优化,因此难以使用。
- 逻辑解码在Postgres中将WAL的内容解码为一种连贯且易于理解的格式,例如元组流或SQL语句流。这需要使用复制槽,复制槽表示有序变更流,通常用于在客户端和服务器之间重放事件以进行正常的Postgres复制。至关重要的是,这些复制槽确保变更按其应用于源数据库的顺序交付。此排序基于内部LSN(日志序列号),LSN本身是指向WAL日志中某个位置的指针。此过程对崩溃具有鲁棒性,使用检查点来推进位置。至关重要的是,任何使用者都必须能够潜在地处理由于检查点之间的重启而导致的重复消息(Postgres将返回到较早的检查点LSN位置)。
逻辑解码执行的解码过程以及复制槽中后续消息的格式可以通过插件进行控制。虽然存在其他选项,但从 10 版本开始,Postgres 就包含了一个标准的逻辑解码插件pgoutput
,它不需要安装任何额外的库,也用于内部复制。有了这个变更消息流,我们现在需要一个能够读取它们并将其发送到 ClickHouse 的系统。为此,我们将使用开源工具 Debezium。
Debezium
Debezium 是一套用于捕获数据库中变更的服务。Debezium 将数据库表中所有行级别的变更记录为有序的事件流,并将这些事件发送到下游系统供其使用。Debezium 支持一组连接器库,旨在生成独立于源 DBMS 的格式的消息,从而允许事件的使用者无论其来源如何都使用类似的逻辑。虽然我们尚未测试其他数据库连接器,例如 MySQL 和 MSSQL,但这意味着以下 ClickHouse 配置应该是可重用的。
此架构要求每个连接器利用每个源数据库中相应的变更捕获功能。在 Postgres 的情况下,连接器利用上面描述的逻辑解码功能和通过复制槽公开的消息。这与自定义 Java 代码相结合,该代码使用JDBC 驱动程序和流式复制协议读取变更流并生成事件。在标准架构中,这些事件将发送到 Kafka 供下游接收器使用。
总之,Debezium 将为 Postgres 中发生的每个插入、更新和删除操作生成一个行变更事件。然后必须将这些事件应用到 ClickHouse 实例中的等效表中,以确保我们的数据一致。请注意,这里的表映射是一对一的。在其最简单的形式中,每个表部署一个 Debezium 连接器,尽管也可能有多表配置。
Kafka和队列的作用
源系统和目标系统之间通常会实现一个消息系统,以便可以缓冲变更事件并将其保留,直到它们提交到目标位置。此队列减轻了 Postgres 的预写日志的压力,并解耦了系统。这避免了由于 ClickHouse 不可用(例如由于网络连接问题)而导致 Postgres WAL 日志增长的问题(它不会无限期地保存,并且一旦不再需要事件就可以被修剪)。
使用 Kafka Connect 框架开发,Debezium 本身支持 Apache Kafka,使其成为我们 CDC 解决方案的首选消息系统 - 特别是当与将 Kafka 与 ClickHouse 集成的丰富选项相结合时。部署和配置 Kafka 超出了本博文的范围,但 Debezium 文档包含关于主题配置和命名的建议。
请注意,Debezium 还支持服务器模式,其中事件可以发送到任何消息系统,例如Kinesis 或 Google Pub Sub。虽然我们尚未测试这些架构,但如果 Debezium 和 Postgres 配置保持相同,并且生成的消息使用相同的格式,用户应该能够使用相同的 ClickHouse 配置和方法进行 CDC。
ReplacingMergeTree
上面,我们描述了 Debezium 如何能够通过利用 WAL 日志(通过pgoutput
插件和复制槽)从 Postgres 生成插入、更新和删除事件流。
有了这个消息流,我们将描述如何将这些更改应用到 ClickHouse。由于 ClickHouse 尚未针对删除和更新工作负载进行优化,因此我们使用 ReplacingMergeTree 表引擎来高效地处理此更改流。作为此过程的一部分,我们还讨论了使用此表引擎的一些重要注意事项,以及最近的发展以及在这种方法下哪些条件将获得最佳效果。
优化删除和更新
虽然像 Postgres 这样的 OLTP 数据库针对事务更新和删除工作负载进行了优化,但 OLAP 数据库对此类操作提供的保证较少,并且针对批量插入的不可变数据进行了优化,从而有利于显著更快的分析查询。虽然 ClickHouse 提供了通过 mutation 的更新操作,以及一种轻量级删除行的方法,但其面向列的结构意味着应谨慎安排这些操作。这些操作是异步处理的,由单个线程处理,并且需要(在更新的情况下)将数据重写到磁盘上。因此,不应将它们用于大量的小型更改。
为了处理更新和删除行的流,同时避免上述使用模式,我们可以使用 ClickHouse 表引擎ReplacingMergeTree。
此表引擎允许将更新操作应用于行,而无需使用低效的 ALTER 或 DELETE 语句,因为它允许用户插入同一行的多个副本,并将其中一个指定为最新版本。反过来,后台进程会异步删除同一行的旧版本,通过使用不可变插入来有效地模拟更新操作。
这依赖于表引擎识别重复行。这是通过使用ORDER BY
子句来确定唯一性实现的,即,如果两行在ORDER BY
中指定的列具有相同的值,则认为它们是重复的。在定义表时指定的版本列允许在识别出两行是重复时保留行的最新版本,即保留版本值最高的行。
此外,可以指定一个已删除列。这可以包含 0 或 1,其中值为 1 表示应删除该行(及其重复项),否则使用零。
我们在下面的示例中说明了此过程。这里,行由列A
唯一标识(表的ORDER BY
)。我们假设这些行已作为两批插入,导致在磁盘上形成了两个数据部分。稍后,在异步后台进程期间,这些部分将合并在一起。在此过程中,将发生以下情况
- 由列
A
的值 1 标识的行同时具有更新行(版本 2)和删除行(版本 3 和deleted
列值为 1)。因此,将删除此键的所有行。 - 由列
A
的值 2 标识的行同时具有更新行(版本 2)。因此,将保留此后一行,其price
列的值为 6。 - 由列
A
的值 3 标识的行同时具有更新行(版本 2)。因此,将保留此后一行,其price
列的值为 3。
在此合并过程的结果中,我们有两行表示最终状态。
如上所述,此重复删除过程发生在合并时,在后台异步进行,并且仅最终一致。或者,它可以在查询时使用特殊的 FINAL 语法调用,以确保结果准确 - 请参阅在 ClickHouse 中查询。
除了对重复删除有用之外,此表引擎的属性还可用于处理更新和删除工作负载。假设我们在 ClickHouse 中有以下简单表和行。请注意,ORDER BY
子句如何定义行唯一性,并将其设置为列key
。我们还定义了作为引擎创建语句一部分的version
和deleted
列
CREATE TABLE test
(
`key` String,
`price` UInt64,
`version` UInt64,
`deleted` UInt8
)
ENGINE = ReplacingMergeTree(version, deleted)
ORDER BY key
{"version":1,"deleted":0,"key":"A","price":100}
假设我们需要更新此行的 price 列(因为它在 Postgres 中发生了更改)。为此,我们插入以下行。请注意,键值相同,但版本值已递增。
{"version":2,"deleted":0,"key":"A","price":200}
如果我们稍后希望删除此行,我们将再次插入具有相同键值、更高版本和已删除列值为 1 的重复项。
{"version":3,"deleted":1,"key":"A","price":200}
这种更新和删除方法允许我们明确避免低效的 ALTER 和 DELETE 命令,而是通过插入不可变行并允许 ClickHouse 异步协调更改来模拟这些操作。
删除的限制
移除已删除的行
如果表级设置clean_deleted_rows
设置为Always
,则仅在合并时删除已删除的行。默认情况下,此值设置为Never
,这意味着永远不会删除行。从 ClickHouse 23.5 版本开始,当此值设置为Always
时,此功能存在已知问题,可能导致删除错误的行。
因此,目前,我们建议使用显示的Never
值 - 这会导致已删除的行累积,这在低容量下可能是可以接受的。要强制删除已删除的行,用户可以定期安排对表的OPTIMIZE FINAL CLEANUP
操作,即
OPTIMIZE TABLE uk_price_paid FINAL CLEANUP
这是一个 I/O 密集型操作,应在空闲时段谨慎安排。
因此,在解决上述问题之前,**我们建议我们的 CDC 管道仅用于删除数量较少或中等(少于 10%)的表**。
仅按顺序交付
如果支持删除并依赖于OPTIMIZE FINAL CLEANUP
或clean_deleted_rows=Always
(在解决上述问题后)来删除行,则必须按顺序交付**每个 Postgres 行**的更改。更具体地说,必须按其在 Postgres 中发生的顺序插入配置的ORDER BY
列的唯一值的行的更改。
如果不满足此约束,则如果在后台合并或计划的OPTIMIZE FINAL CLEANUP
对删除进行操作后发生更新,则可能会错误地保留行。请考虑以下事件序列
如上所示,将接收行唯一键值 A 的插入,然后是删除事件。将发生OPTIMIZE FINAL CLEANUP
或后台合并,导致两行都被删除。然后将接收更新事件,其版本低于先前的删除,这是由于插入顺序错误造成的。此行被错误地保留。此问题可以轻松复制。请注意,如果clean_deleted_rows=Never
(如目前建议的那样),则不会发生此问题,因为已删除的行将被保留。
为了解决这个问题,Debezium 默认情况下为其 Kafka 主题使用单个分区,从而确保按顺序交付(以及仅使用一个 Kafka 任务)。虽然这足以满足大多数工作负载,但更高的吞吐量可能需要多个分区。
如果需要多个分区,用户可能希望探索主题分区路由,对ORDER BY
列进行哈希以确保同一行的所有更改事件都进入同一分区,从而保证按顺序交付。虽然这不能保证所有事件的按顺序交付,但它可以确保针对特定 Postgres 行和一组ORDER BY
值的更改按顺序交付 - 足以避免上述竞争条件。注意:我们尚未测试此多主题配置。
或者,用户可以避免使用OPTIMIZE FINAL CLEANUP
,只需允许已删除的行累积或确保谨慎执行它,例如,暂停 Postgres 上的更改,允许 Kafka 队列清空,在开始对 Postgres 进行更改之前执行删除清理。用户还可以对不再受更改影响的选择性分区发出OPTIMIZE FINAL CLEANUP
。
行要求
因此,为了使我们的 Postgres 到 ClickHouse 变更数据捕获管道能够处理更新和删除,发送到 ClickHouse 的任何行都需要满足以下条件
- ClickHouse 中
ORDER BY
子句中的列值必须唯一标识 Postgres 中的一行。 - 当发送任何更改事件行(无论是更新还是删除)时,它必须包含
ORDER BY
子句中列的相同值,这些值唯一标识该行。**这些值不能更改,应被视为不可变的。**ORDER BY
子句通常包含 Postgres 中的主键列。用户还希望包含与查询访问模式对齐的列以优化查询性能。更新不能更改这些列 - 请参阅选择主键。 - 当发送更新的行时,它必须包含表中所有列,并将行的最终状态作为值,以及
deleted
列的值为 0 和比先前行更高的version
值。 - 当发送删除行时,它必须包含
ORDER BY 子句
的所有列以及deleted
列的值为 1,以及比先前列更高的version
列值。 - 插入必须包含所有列值,
deleted
列的值为 0,以及一个version
数字,该数字保证低于后续更新。
Postgres 行的更改事件可以独立发送到 ClickHouse。如果用户允许删除的行累积,则特定行的更改事件也可以**无序**发送。如果要删除删除事件,则任何特定 Postgres 行的更改事件必须**按顺序**发生如上所述(但不是跨 Postgres 行)。在设计多线程/进程方法来使用 Kafka 中的消息时,必须考虑这些约束。
以上所有内容都需要一个版本号,该版本号满足以下必要属性:确保其单调递增并反映特定行在 Postgres 中事件的顺序。
在我们的下一篇文章中,我们将讨论如何转换 Debezium 的更改事件消息以满足 ReplacingMergeTree 的上述要求。
选择主键
上面,我们强调了一个重要的附加约束,在 ReplacingMergeTree
的情况下也必须满足:ORDER BY
的列值唯一标识跨更改的 Postgres 行。因此,Postgres 主键应包含在 Clickhouse ORDER BY
子句中。
ClickHouse 的用户将熟悉在表的 ORDER BY
子句中选择列以优化查询性能。通常,应根据您的频繁查询并按基数递增的顺序排列来选择这些列。重要的是,ReplacingMergeTree 强加了一个额外的约束 - 这些列必须是不可变的,即,仅向此子句添加在底层 Postgres 数据中**不会更改**的列。虽然其他列可以更改,但这些列需要保持一致以进行唯一行识别。
对于分析工作负载,Postgres 主键通常用处不大,因为用户很少执行点行查找(这与 OLAP 数据库不同,OLAP 数据库没有针对此进行优化,而 OLTP 数据库则有)。鉴于我们建议按基数递增的顺序对列进行排序,以及在ORDER BY
中较早列出的列上的匹配通常更快的事实,Postgres 主键应附加到 ORDER BY
的末尾(除非它具有分析价值)。如果多个列在 Postgres 中形成主键,则应将它们附加到 ORDER BY
,同时尊重基数和查询值的可能性。用户还可以希望通过 MATERIALIZED 列使用值的连接生成唯一的主键。
考虑以下 英国房产价格数据集 的架构。这里 id
列表示 Postgres 中唯一的主键(出于示例目的添加到数据集中)。列 postcode1
、postcode2
、addr1
、addr2
表示分析查询中常用的列。因此,id
列附加到这些列之前的 ORDER BY
子句的末尾。
CREATE TABLE default.uk_price_paid
(
`id` UInt64,
`price` UInt32,
`date` Date,
`postcode1` LowCardinality(String),
`postcode2` LowCardinality(String),
`type` Enum8('other' = 0, 'terraced' = 1, 'semi-detached' = 2, 'detached' = 3, 'flat' = 4),
`is_new` UInt8,
`duration` Enum8('unknown' = 0, 'freehold' = 1, 'leasehold' = 2),
`addr1` String,
`addr2` String,
`street` LowCardinality(String),
`locality` LowCardinality(String),
`town` LowCardinality(String),
`district` LowCardinality(String),
`county` LowCardinality(String),
`version` UInt64,
`deleted` UInt8
)
ENGINE = ReplacingMergeTree(version, deleted)
PRIMARY KEY (postcode1, postcode2, addr1, addr2)
ORDER BY (postcode1, postcode2, addr1, addr2, id)
ORDER BY
子句配置磁盘上数据的顺序。**默认情况下,如果未指定,则 PRIMARY KEY
子句也设置为相同的值。**此子句配置关联的稀疏主键索引。ORDER BY
必须包含 PRIMARY KEY
作为前缀,因为后者假设数据已排序。
为了提高性能,此主键存储在内存中,并使用使用标记进行间接寻址以最大程度地减少大小。在 ReplacingMergeTree 中使用 ORDER BY
键进行去重可能会导致其变长,从而增加内存使用量。如果这成为问题,用户可以直接指定 PRIMARY KEY
,限制加载到内存中的列,同时保留 ORDER BY
以最大程度地压缩并强制执行唯一性。
使用此方法,可以从 PRIMARY KEY
中省略 Postgres 主键列 - 节省内存而不会影响查询性能。我们也将其应用到上面的示例中。
在ClickHouse中查询
在合并时,ReplacingMergeTree 使用 ORDER BY
值作为唯一标识符来识别重复行,并仅保留最高版本或删除所有重复项(如果最新版本指示删除)(待解决之前提到的问题)。但是,这仅提供最终的一致性 - 它不能保证行将被去重,并且您不应依赖它。因此,由于查询中考虑了 update
和 delete
行,查询可能会产生不正确的结果。
为了获得正确的结果,用户需要将后台合并与查询时去重和删除删除相结合。这可以通过使用 FINAL 运算符来实现。考虑以下使用英国房产价格数据集的示例。
postgres=> select count(*) FROM uk_price_paid;
count
----------
27735104
(1 row)
postgres=> SELECT avg(price) FROM uk_price_paid;
avg
---------------------
214354.531780374791
(1 row)
– no FINAL, incorrect result
SELECT count()
FROM uk_price_paid
┌──count()─┐
│ 27735966 │
└──────────┘
– FINAL, correct result
SELECT count()
FROM uk_price_paid
FINAL
┌──count()─┐
│ 27735104 │
└──────────┘
– no FINAL, incorrect result
SELECT avg(price)
FROM uk_price_paid
┌─────────avg(price)─┐
│ 214353.94542901445 │
└────────────────────┘
– FINAL, correct result with some precision
SELECT avg(price)
FROM uk_price_paid
FINAL
┌────────avg(price)─┐
│ 214354.5317803748 │
└───────────────────┘
最终性能
FINAL 运算符将在查询上产生性能开销,尽管 22.6 中的改进确保了去重步骤是多线程的。当查询未过滤主键列时,这将最明显,导致读取更多数据并增加去重开销。如果用户使用 WHERE 条件过滤关键列,则加载和传递以进行去重的数据将减少。
如果 WHERE 条件未使用关键列,则 ClickHouse 目前在使用 FINAL 时不会利用 PREWHERE 优化。此优化旨在减少未过滤列的读取行数。当查询运行时,首先使用表的 主键识别读取所需的数据块。这识别出一组数据块,每个数据块包含一定数量的行(默认为 8192)。但是,这些数据块中的并非所有行都与主键上的筛选条件匹配 - 因为数据块可以包含一系列值或因为 WHERE 条件未使用主键。为了识别正确的行并在读取 SELECT 列之前,因此需要额外的过滤。这在使用 PREWHERE 的第二阶段读取中执行,它允许进一步过滤 WHERE 子句中的主键和非主键列。ClickHouse 通常根据内部启发式方法将列移动到 PREWHERE 阶段。但是,此优化目前不适用于使用 FINAL 的情况。有关 PREWHERE
最新改进的更多详细信息,请参见此处。
为了模拟此优化,用户可以重写查询以使用子查询。例如,考虑以下查找伦敦房产平均价格的查询
–establish correct answer in postgres
postgres=> SELECT avg(price)
FROM uk_price_paid WHERE town = 'LONDON';
avg
---------------------
474799.921480528985
(1 row)
SELECT avg(price)
FROM uk_price_paid
WHERE town = 'LONDON'
┌─────────avg(price)─┐
│ 474797.35553246835 │
└────────────────────┘
1 row in set. Elapsed: 0.033 sec. Processed 27.74 million rows, 39.93 MB (835.45 million rows/s., 1.20 GB/s.)
EXPLAIN SYNTAX
SELECT avg(price)
FROM uk_price_paid
WHERE town = 'LONDON'
┌─explain──────────────────┐
│ SELECT avg(price) │
│ FROM uk_price_paid │
│ PREWHERE town = 'LONDON' │
└──────────────────────────┘
3 rows in set. Elapsed: 0.002 sec.
虽然在没有 FINAL 的情况下此结果不正确,但我们可以看到 PREWHERE
优化已应用于帮助实现 0.075 秒的执行时间。在下面使用 FINAL 返回正确答案,但性能降低了 20 倍以上。
SELECT avg(price)
FROM uk_price_paid
FINAL
WHERE town = 'LONDON'
┌───────avg(price)─┐
│ 474799.921480529 │
└──────────────────┘
1 row in set. Elapsed: 0.725 sec. Processed 29.65 million rows, 1.41 GB (40.88 million rows/s., 1.94 GB/s.)
EXPLAIN SYNTAX
SELECT avg(price)
FROM uk_price_paid
FINAL
WHERE town = 'LONDON'
┌─explain───────────────┐
│ SELECT avg(price) │
│ FROM uk_price_paid │
│ FINAL │
│ WHERE town = 'LONDON' │
└───────────────────────┘
4 rows in set. Elapsed: 0.004 sec.
我们可以通过在子查询中返回主键并在外部查询中仅使用 FINAL 来部分模拟 PREWHERE,如所示。这无法达到早期(不准确)查询的相同性能,但为增加的复杂性提供了一些改进
SELECT avg(price)
FROM uk_price_paid
FINAL
WHERE ((postcode1, postcode2) IN (
SELECT
postcode1,
postcode2
FROM uk_price_paid
WHERE town = 'LONDON'
GROUP BY
postcode1,
postcode2
)) AND (town = 'LONDON')
┌───────avg(price)─┐
│ 474799.921480529 │
└──────────────────┘
1 row in set. Elapsed: 0.287 sec. Processed 31.55 million rows, 230.30 MB (109.88 million rows/s., 802.08 MB/s.)
当内部查询返回主键值的小子集时,此解决方法最有效。
利用分区
ClickHouse 中的数据合并发生在分区级别。使用 ReplacingMergeTree 时,
我们建议用户根据最佳实践对表进行分区,前提是用户可以确保此**分区键对行不会更改**。这将确保与同一行相关的更新将发送到同一 ClickHouse 分区。
假设这是这种情况,用户可以使用设置 do_not_merge_across_partitions_select_final=1
来提高 FINAL 查询性能。此设置导致在使用 = FINAL 时独立合并和处理分区。考虑以下表,我们按年份对数据进行分区并计算多个分区中的平均价格。
CREATE TABLE default.uk_price_paid_year
(
`id` UInt64,
`price` UInt32,
`date` Date,
`postcode1` LowCardinality(String),
`postcode2` LowCardinality(String),
`type` Enum8('other' = 0, 'terraced' = 1, 'semi-detached' = 2, 'detached' = 3, 'flat' = 4),
`is_new` UInt8,
`duration` Enum8('unknown' = 0, 'freehold' = 1, 'leasehold' = 2),
`addr1` String,
`addr2` String,
`street` LowCardinality(String),
`locality` LowCardinality(String),
`town` LowCardinality(String),
`district` LowCardinality(String),
`county` LowCardinality(String),
`version` UInt64,
`deleted` UInt8
)
ENGINE = ReplacingMergeTree( version, deleted) PRIMARY KEY (postcode1, postcode2, addr1, addr2)
ORDER BY (postcode1, postcode2, addr1, addr2, id) PARTITION BY toYear(date)
INSERT INTO default.uk_price_paid_year SELECT * FROM default.uk_price_paid
——query on original table
SELECT avg(price)
FROM uk_price_paid
FINAL
WHERE (toYear(date) >= 1990) AND (toYear(date) <= 2000)
┌────────avg(price)─┐
│ 85861.88784270117 │
└───────────────────┘
1 row in set. Elapsed: 0.702 sec. Processed 29.65 million rows, 1.44 GB (42.23 million rows/s., 2.05 GB/s.)
-– query on partitioned table
SELECT avg(price)
FROM uk_price_paid_year
FINAL
WHERE (toYear(date) >= 1990) AND (toYear(date) <= 2000)
┌────────avg(price)─┐
│ 85861.88784270117 │
└───────────────────┘
1 row in set. Elapsed: 0.492 sec. Processed 9.27 million rows, 443.09 MB (18.83 million rows/s., 900.24 MB/s.)
-— performance with do_not_merge_across_partitions_select_final = 1
SET do_not_merge_across_partitions_select_final = 1
SELECT avg(price)
FROM uk_price_paid_year
FINAL
WHERE (toYear(date) >= 1990) AND (toYear(date) <= 2000)
┌────────avg(price)─┐
│ 85861.88784270117 │
└───────────────────┘
1 row in set. Elapsed: 0.230 sec. Processed 7.62 million rows, 364.26 MB (33.12 million rows/s., 1.58 GB/s.)
如上所示,由于我们的查询针对 10 个分区并减少了读取所需的数据,因此性能得到了提高。通过将查询时去重限制在独立的分区中,性能得到了进一步提高。
结论
在这篇博文中,我们探讨了基于拉取的 CDC 管道用于在 Postgres 和 ClickHouse 之间使用 Debezium 移动数据的构建块。这包括介绍如何在 Postgres、Debezium 和 ReplacingMergeTree 中跟踪更改。这些概念可以结合起来产生以下管道。
在本系列的下一篇文章中,我们将为测试数据集构建一个工作管道,重点介绍重要的配置选项以确保各个组件正确交互。敬请期待!