本博文的第2部分可在 使用PostgreSQL和ClickHouse实现变更数据捕获(CDC) - 第2部分 找到。
目录
简介
在之前的文章中,我们讨论了OLTP数据库(如Postgres)和OLAP数据库(如ClickHouse)之间的区别,以及用户为何可能希望将分析工作负载迁移到后者。
本文提供了一个使用Postgres和ClickHouse实现变更数据捕获的入门指南。对于不熟悉的人来说,变更数据捕获(CDC)是指在两个数据库之间保持表同步的过程。本博客系列提出的解决方案仅使用ClickHouse的原生功能。除了Debezium和Kafka之外,不需要其他任何组件。
本文首先介绍了Postgres到ClickHouse CDC管道中的概念和构建块。本系列的下一篇文章将整合这些概念并构建一个可工作的管道。虽然后面的博文可以独立于本文阅读,但我们建议用户阅读本文以了解相关概念和约束,以避免潜在的问题。
在我们的示例中,我们使用了ClickHouse Cloud集群中的开发实例和AWS Aurora上的Postgres实例。但是,这些示例应该可以在等效规模的自托管集群上重现。或者,立即开始使用您的ClickHouse Cloud集群,并获得300美元的信用额度。让我们为您处理基础设施,并开始查询吧!
方法
如果数据是静态的或不可变的,并且只允许追加操作,那么CDC可能会很简单。通常,ClickHouse的原生函数(例如postgresql 函数)足以在实例之间移动数据,使用带时间戳或单调递增ID来定期识别需要从OLTP数据库读取并插入到ClickHouse中的行。
但是,对于包含更新和删除操作的更复杂工作负载的表,用户需要在源Postgres数据库中识别和跟踪所有这些可能的变更,以便在接近实时的状态下将它们应用到ClickHouse中。
解决此问题的方案通常特定于源和目标,利用前者的功能来识别变更,并确保根据其最佳使用模式在目标中尽可能高效地反映这些变更。我们在下面描述了几种方法。
拉取式
在基于拉取的变更数据捕获中,目标数据库从源数据库拉取变更,通常使用记录到表列的变更。这减少了源数据库的负载,并需要在目标数据库中实现轮询方法。
在数据不可变且仅允许追加的情况下,可以考虑使用计划的postgresql函数在Postgres和ClickHouse之间移动数据作为拉取实现。
用户可以选择添加消息系统,以确保即使目标系统不可用也能可靠地传递变更。由于变更会定期批量处理和应用,因此这种方法的主要缺点是源数据库和目标数据库之间变更可见性的延迟。此外,此方法需要访问源数据库的网络,而源数据库无法控制何时以及如何拉取数据。这在生产关键系统中通常是不可取的,因为管理员不愿提供访问权限。
通过实时事件流推送
在基于推送的变更数据捕获管道中,源系统捕获表列的变更并将其发送到目标系统,在那里将应用于当前数据。在任何时间点,这两个系统都应该几乎相同,并且变更几乎实时地应用。此方法假设可以可靠地传递变更 - 无论是通过精确一次传递语义,还是通过至少一次传递以及目标系统能够适当地处理重复项。虽然不一定是必需的,但此方法通常会引入Kafka等消息系统以确保消息可靠传递。
下面,我们提出了一种基于推送的CDC方法,用于在接近实时的状态下在Postgres和ClickHouse之间移动变更。为此,我们利用了Postgres的原生功能,这些功能可以在存储级别识别发生的变更,并将这些变更转换为可消费的变更流。
跟踪PostgreSQL中的变更
任何CDC解决方案都需要源数据库提供一种强大而可靠的方法来跟踪特定表的行和列级别的变更。PostgreSQL通过两个基本功能公开这些变更
- 预写日志(WAL)是数据库中所有变更的顺序日志。每当事务修改数据库时,变更首先写入WAL,然后再应用到实际的数据文件。此过程称为预写日志记录。这为数据库事务提供了持久性和崩溃恢复。它是PostgreSQL事务处理机制的关键组成部分。通过使用WAL,PostgreSQL确保在将事务视为已提交之前,变更持久存储在磁盘上。WAL对于PostgreSQL中的各种复制技术也至关重要,这些技术使用WAL捕获主数据库上的变更并将其应用于副本,从而确保多个数据库实例之间的一致性。但是,在其原生格式中,此日志未针对外部进程进行优化,因此难以使用。
- 逻辑解码在Postgres中将WAL的内容解码为一种连贯且易于理解的格式,例如元组流或SQL语句流。这需要使用复制槽,复制槽表示有序变更流,通常用于在客户端和服务器之间重放事件以进行正常的Postgres复制。至关重要的是,这些复制槽确保变更按其应用于源数据库的顺序传递。此排序基于内部LSN(日志序列号),LSN本身是指向WAL日志中位置的指针。此过程对崩溃具有鲁棒性,使用检查点来推进位置。至关重要的是,任何使用者都必须能够潜在地处理由于检查点之间的重新启动而传递的重复消息(Postgres将返回到先前的检查点LSN位置)。
逻辑解码执行的解码过程以及复制槽中消息的后续格式可以通过插件进行控制。虽然存在其他选项,但从 10 版开始,Postgres 就包含了一个标准的逻辑解码插件pgoutput
,该插件不需要安装任何额外的库,并且也用于内部复制。有了这个变更消息流,我们现在需要一个能够读取它们并将它们发送到 ClickHouse 的系统。为此,我们将使用开源工具 Debezium。
Debezium
Debezium 是一套用于捕获数据库中更改的服务。Debezium 将数据库表中的所有行级更改记录为有序的事件流,并将这些事件发送到下游系统以供使用。Debezium 支持一个连接器库,旨在以独立于源 DBMS 的格式生成消息,从而允许事件使用者无论来源如何都能使用类似的逻辑。虽然我们尚未测试其他数据库连接器(例如 MySQL 和 MSSQL),但这意味着以下 ClickHouse 配置应该是可重用的。
此架构要求每个连接器利用每个源数据库中相应的更改捕获功能。在 Postgres 的情况下,连接器利用上面描述的逻辑解码功能和通过复制槽公开的消息。这与自定义 Java 代码相结合,该代码使用JDBC 驱动程序和流式复制协议读取更改流并生成事件。在标准架构中,这些事件被发送到 Kafka,供下游接收器使用。
总之,Debezium 将为 Postgres 中发生的每个插入、更新和删除操作生成一个行更改事件。然后,必须将这些事件应用于 ClickHouse 实例中的等效表,以确保我们的数据一致。请注意,此处的表映射为 1:1。在最简单的形式中,每个表部署一个 Debezium 连接器,尽管也可能有多表配置。
Kafka和队列的作用
源系统和目标系统之间通常会实现一个消息系统,以便可以缓冲更改事件并保留它们,直到它们被提交到目标位置。此队列减轻了 Postgres 写前日志的压力并解耦了系统。这避免了由于 ClickHouse 不可访问(例如由于网络连接问题)而导致 Postgres WAL 日志增长而产生的潜在问题,因为 WAL 日志无法回收(它不会无限期保留,并且一旦不再需要事件就可以被修剪)。
使用 Kafka Connect 框架开发,Debezium 本身支持 Apache Kafka,使其成为我们 CDC 解决方案的首选消息系统,尤其是在与 ClickHouse 集成的丰富选项相结合时。部署和配置 Kafka 超出了本博文的范围,但 Debezium 文档中包含有关主题配置和命名的建议。
请注意,Debezium 还支持服务器模式,其中事件可以发送到任何消息系统,例如Kinesis 或 Google Pub Sub。虽然我们尚未测试这些架构,但只要 Debezium 和 Postgres 配置保持相同,并且生成的消息使用相同的格式,用户应该能够使用相同的 ClickHouse 配置和方法来进行 CDC。
ReplacingMergeTree
上面,我们描述了 Debezium 如何能够通过利用 WAL 日志(通过pgoutput
插件和复制槽)从 Postgres 生成插入、更新和删除事件流。
有了这个消息流,我们将描述如何将这些更改应用于 ClickHouse。由于 ClickHouse 尚未针对删除和更新工作负载进行优化,因此我们使用 ReplacingMergeTree 表引擎来有效地处理此更改流。作为此过程的一部分,我们还将讨论使用此表引擎的一些重要注意事项,以及最近的开发以及此方法在何种情况下能够最佳地工作。
优化删除和更新
虽然像 Postgres 这样的 OLTP 数据库针对事务更新和删除工作负载进行了优化,但 OLAP 数据库对此类操作提供的保证减少,并针对批量插入的不可变数据进行了优化,从而有利于显著更快的分析查询。虽然 ClickHouse 提供了通过 mutation 进行更新操作,以及轻量级删除行的方法,但其面向列的结构意味着应谨慎安排这些操作。这些操作是异步处理的,由单个线程处理,并且需要(在更新的情况下)将数据重写到磁盘上。因此,不应将它们用于大量的小型更改。
为了处理更新和删除行的流,同时避免上述使用模式,我们可以使用 ClickHouse 表引擎ReplacingMergeTree。
此表引擎允许将更新操作应用于行,而无需使用低效的 ALTER 或 DELETE 语句,方法是允许用户插入同一行的多个副本并将其中一个指定为最新版本。反过来,后台进程异步删除同一行的旧版本,通过使用不可变插入来有效地模拟更新操作。
这依赖于表引擎识别重复行。这是通过使用ORDER BY
子句来确定唯一性来实现的,即,如果两行在ORDER BY
中指定的列中具有相同的值,则它们被视为重复项。在定义表时指定的版本列允许在识别出两行重复时保留行的最新版本,即保留版本值最高的行。
此外,可以指定一个 deleted 列。这可以包含 0 或 1,其中值为 1 表示应删除该行(及其重复项),否则使用零。
我们在下面的示例中说明了此过程。这里,行由A
列(表的ORDER BY
)唯一标识。我们假设这些行已作为两个批次插入,导致在磁盘上形成了两个数据分区。稍后,在异步后台过程中,这些分区将合并在一起。在此过程中,将发生以下情况
- 由
A
列的值 1 标识的行既有版本为 2 的更新行,又有版本为 3 的删除行(以及deleted
列值为 1)。因此,此键的所有行都将被删除。 - 由
A
列的值 2 标识的行既有版本为 2 的更新行。因此,此后一行(price
列的值为 6)将被保留。 - 由
A
列的值 3 标识的行既有版本为 2 的更新行。因此,此后一行(price
列的值为 3)将被保留。
由于此合并过程,我们有两行表示最终状态。
如上所述,此重复项删除过程在合并时异步在后台发生,并且仅最终一致。或者,它可以在查询时使用特殊的 FINAL 语法调用,以确保结果准确 - 请参阅在 ClickHouse 中查询。
除了用于重复项删除之外,此表引擎的属性还可用于处理更新和删除工作负载。假设我们在 ClickHouse 中有以下简单的表和行。请注意ORDER BY
子句如何定义行唯一性并将其设置为key
列。我们还定义了作为引擎创建语句一部分的version
和deleted
列
CREATE TABLE test
(
`key` String,
`price` UInt64,
`version` UInt64,
`deleted` UInt8
)
ENGINE = ReplacingMergeTree(version, deleted)
ORDER BY key
{"version":1,"deleted":0,"key":"A","price":100}
假设我们需要更新此行的 price 列(因为它在 Postgres 中已更改)。为此,我们插入以下行。请注意键值是相同的,但版本值已递增。
{"version":2,"deleted":0,"key":"A","price":200}
如果我们稍后希望删除此行,我们将再次插入一个具有相同键值、更高版本以及 deleted 列值为 1 的副本。
{"version":3,"deleted":1,"key":"A","price":200}
这种更新和删除方法允许我们明确避免低效的 ALTER 和 DELETE 命令,而是通过插入不可变的行并允许 ClickHouse 异步协调更改来模拟这些操作。
删除操作的限制
删除已删除的行
如果表级设置clean_deleted_rows
设置为Always
,则仅在合并时删除已删除的行。默认情况下,此值设置为Never
,这意味着永远不会删除行。从 ClickHouse 23.5 版开始,当此值设置为Always
时,此功能存在一个已知问题,会导致错误的行被删除。
因此,目前,我们建议使用显示的Never
值 - 这将导致已删除的行累积,这在低容量时可能是可以接受的。为了强制删除已删除的行,用户可以定期安排对表执行OPTIMIZE FINAL CLEANUP
操作,即
OPTIMIZE TABLE uk_price_paid FINAL CLEANUP
这是一个 I/O 密集型操作,应在空闲期间谨慎安排。
因此,在解决上述问题之前,**我们建议我们的 CDC 管道仅用于删除次数少到中等(少于 10%)的表**。
仅按顺序传递
如果支持删除并依赖于OPTIMIZE FINAL CLEANUP
或clean_deleted_rows=Always
(在解决上述问题后)来删除行,则必须按顺序传递**每个 Postgres 行**的更改。更具体地说,必须按配置的ORDER BY
列的唯一值集的顺序插入行,该顺序与它们在 Postgres 中发生的顺序一致。
如果不满足此约束,则如果在后台合并或计划的OPTIMIZE FINAL CLEANUP
对删除行进行操作后发生更新,则行可能会被错误地保留。请考虑以下事件序列
如上所示,接收了行唯一键值 A 的插入事件,然后是删除事件。发生OPTIMIZE FINAL CLEANUP
或后台合并,导致两行都被删除。由于插入顺序错误,更新事件依次接收到的版本低于先前的删除事件。此行被错误地保留。此问题可以简单地复制。请注意,如果clean_deleted_rows=Never
(如当前建议的那样),则不会发生此问题,因为已删除的行将被保留。
为了解决这个问题,Debezium 默认情况下为其 Kafka 主题使用单个分区,从而确保按顺序传递(以及仅使用一个 Kafka 任务)。虽然这足以满足大多数工作负载,但更高的吞吐量可能需要多个分区。
如果需要多个分区,用户可能希望探索主题分区路由,对ORDER BY
列进行哈希以确保同一行的所有更改事件都转到同一分区,从而保证按顺序传递。虽然这不能保证所有事件的按顺序传递,但它确保了特定 Postgres 行和ORDER BY
值的更改按顺序传递 - 足以避免上述竞争条件。注意:我们尚未测试此多主题配置。
或者,用户可以避免使用OPTIMIZE FINAL CLEANUP
,只需允许已删除的行累积或确保谨慎执行它,例如,暂停 Postgres 上的更改,允许 Kafka 队列清空,在开始对 Postgres 进行更改之前执行删除清理。用户也可以对不再受更改影响的选择性分区发出OPTIMIZE FINAL CLEANUP
。
行要求
因此,为了使我们的 Postgres 到 ClickHouse 的变更数据捕获管道能够处理更新和删除操作,发送到 ClickHouse 的任何行都需要满足以下条件
- ClickHouse
ORDER BY
子句中列的值必须唯一地标识 Postgres 中的一行。 - 发送任何更改事件行时,无论是更新还是删除,它都必须包含
ORDER BY
子句中列的相同值,这些值唯一地标识该行。**这些值不能更改,应视为不可变的。**ORDER BY
子句通常包含 Postgres 中的主键列。用户还希望包含与查询访问模式对齐的列以优化查询性能。更新不能更改这些列 - 请参阅选择主键。 - 发送更新的行时,它必须包含表中所有列,并将行的最终状态作为值,以及
deleted
列的值为 0 和比前一行更高的version
值。 - 发送删除行时,它必须包含
ORDER BY
子句的所有列以及deleted
列的值为 1,以及比以前列更高的version
列值。 - 插入必须包含所有列值,
deleted
列的值为 0,以及一个version
数字,该数字将保证低于后续更新。
Postgres 行的更改事件可以独立发送到 ClickHouse。如果用户允许已删除的行累积,则特定行的更改事件也可以**乱序**发送。如果要删除删除事件,则任何特定 Postgres 行的更改事件都必须**按顺序**发生如上所述(但不是跨 Postgres 行)。在设计多线程/进程方法来使用 Kafka 中的消息时,必须考虑这些约束。
以上所有操作都需要一个版本号,该版本号满足所需的属性,以确保其单调递增并反映特定行在 Postgres 中事件的顺序。
在我们的下一篇文章中,我们将讨论如何转换 Debezium 的更改事件消息以满足 ReplacingMergeTree 的上述要求。
选择主键
上面,我们强调了一个重要的附加约束,在使用ReplacingMergeTree
的情况下也必须满足:ORDER BY
的列的值在更改中唯一地标识 Postgres 行。因此,Postgres 主键应包含在 Clickhouse ORDER BY
子句中。
ClickHouse 用户将熟悉在表的ORDER BY
子句中选择列以优化查询性能。通常,应根据您的频繁查询并按基数递增的顺序列出选择这些列。重要的是,ReplacingMergeTree 强加了一个额外的约束 - 这些列必须是不可变的,即,仅向此子句中添加在底层 Postgres 数据中**不会更改**的列。虽然其他列可以更改,但这些列需要保持一致以进行唯一行标识。
对于分析工作负载,Postgres 主键通常没有什么用处,因为用户很少执行点行查找(OLAP 数据库没有针对此进行优化,不像 OLTP 数据库)。鉴于我们建议按基数递增的顺序对列进行排序,以及在ORDER BY
中较早列出的列上的匹配通常会更快的事实,Postgres 主键应附加到ORDER BY
的末尾(除非它具有分析价值)。如果 Postgres 中的多个列形成主键,则应将它们附加到ORDER BY
,并遵守基数和查询值的可能性。用户还可以希望通过 MATERIALIZED 列使用值的串联生成唯一主键。
考虑以下英国房产价格数据集的架构。此处,id
列表示 Postgres 中的唯一主键(出于示例目的添加到数据集中)。postcode1
、postcode2
、addr1
、addr2
列表示分析查询中常用的列。因此,id
列附加到这些列之前的ORDER BY
子句的末尾。
CREATE TABLE default.uk_price_paid
(
`id` UInt64,
`price` UInt32,
`date` Date,
`postcode1` LowCardinality(String),
`postcode2` LowCardinality(String),
`type` Enum8('other' = 0, 'terraced' = 1, 'semi-detached' = 2, 'detached' = 3, 'flat' = 4),
`is_new` UInt8,
`duration` Enum8('unknown' = 0, 'freehold' = 1, 'leasehold' = 2),
`addr1` String,
`addr2` String,
`street` LowCardinality(String),
`locality` LowCardinality(String),
`town` LowCardinality(String),
`district` LowCardinality(String),
`county` LowCardinality(String),
`version` UInt64,
`deleted` UInt8
)
ENGINE = ReplacingMergeTree(version, deleted)
PRIMARY KEY (postcode1, postcode2, addr1, addr2)
ORDER BY (postcode1, postcode2, addr1, addr2, id)
ORDER BY
子句配置磁盘上数据的顺序。**默认情况下,如果未指定,PRIMARY KEY
子句也设置为相同的值。**此子句配置关联的稀疏主键索引。ORDER BY
必须包含PRIMARY KEY
作为前缀,因为后者假设数据已排序。
为了提高性能,此主键存储在内存中,并使用使用标记的间接级别以最大程度地减少大小。在 ReplacingMergeTree 中使用ORDER BY
密钥进行去重可能会导致此密钥变长,从而增加内存使用量。如果这成为问题,用户可以直接指定PRIMARY KEY
,限制加载到内存中的列,同时保留ORDER BY
以最大程度地压缩并强制执行唯一性。
使用此方法,可以从PRIMARY KEY
中省略 Postgres 主键列 - 节省内存而不影响查询性能。我们也将其应用于上面的示例。
在ClickHouse中查询
在合并时,ReplacingMergeTree 使用ORDER BY
值作为唯一标识符来识别重复行,并且仅保留最高版本或删除所有重复项(如果最新版本指示删除)(待解决较早的已记录的问题)。但是,这仅提供最终的正确性 - 它不保证行将被去重,并且您不应依赖它。因此,由于查询中考虑了update
和delete
行,查询可能会产生不正确的答案。
为了获得正确的答案,用户需要用查询时去重和删除删除来补充后台合并。这可以通过使用 FINAL 运算符来实现。考虑以下使用英国房产价格数据集的示例。
postgres=> select count(*) FROM uk_price_paid;
count
----------
27735104
(1 row)
postgres=> SELECT avg(price) FROM uk_price_paid;
avg
---------------------
214354.531780374791
(1 row)
– no FINAL, incorrect result
SELECT count()
FROM uk_price_paid
┌──count()─┐
│ 27735966 │
└──────────┘
– FINAL, correct result
SELECT count()
FROM uk_price_paid
FINAL
┌──count()─┐
│ 27735104 │
└──────────┘
– no FINAL, incorrect result
SELECT avg(price)
FROM uk_price_paid
┌─────────avg(price)─┐
│ 214353.94542901445 │
└────────────────────┘
– FINAL, correct result with some precision
SELECT avg(price)
FROM uk_price_paid
FINAL
┌────────avg(price)─┐
│ 214354.5317803748 │
└───────────────────┘
最终性能
FINAL 运算符将在查询上产生性能开销,尽管 22.6 中的改进确保了去重步骤是多线程的。当查询未根据主键列进行过滤时,这将最明显,导致读取更多数据并增加去重开销。如果用户使用 WHERE 条件根据关键列进行过滤,则加载并传递以进行去重的数据将减少。
如果 WHERE 条件不使用关键列,则 ClickHouse 当前在使用 FINAL 时不会利用 PREWHERE 优化。此优化旨在减少读取未过滤列的行。当查询运行时,首先使用表的 PRIMARY KEY 识别读取所需的数据块。这会识别出一组数据块,每个数据块包含一定数量的行(默认为 8192)。但是,这些数据块中的并非所有行都与主键上的筛选条件匹配 - 因为数据块可以包含一系列值,或者因为 WHERE 条件未使用主键。因此,为了识别正确的行并在读取 SELECT 列之前,需要额外的筛选。这在读取的第二阶段使用 PREWHERE 执行,这允许进一步筛选 WHERE 子句中的主键和非主键列。ClickHouse 通常会根据内部启发式算法将列移动到 PREWHERE 阶段。但是,此优化当前未应用于使用 FINAL 时。有关PREWHERE
最新改进的更多详细信息在此处。
为了模拟此优化,用户可以重写查询以使用子查询。例如,考虑以下查找伦敦房产平均价格的查询
–establish correct answer in postgres
postgres=> SELECT avg(price)
FROM uk_price_paid WHERE town = 'LONDON';
avg
---------------------
474799.921480528985
(1 row)
SELECT avg(price)
FROM uk_price_paid
WHERE town = 'LONDON'
┌─────────avg(price)─┐
│ 474797.35553246835 │
└────────────────────┘
1 row in set. Elapsed: 0.033 sec. Processed 27.74 million rows, 39.93 MB (835.45 million rows/s., 1.20 GB/s.)
EXPLAIN SYNTAX
SELECT avg(price)
FROM uk_price_paid
WHERE town = 'LONDON'
┌─explain──────────────────┐
│ SELECT avg(price) │
│ FROM uk_price_paid │
│ PREWHERE town = 'LONDON' │
└──────────────────────────┘
3 rows in set. Elapsed: 0.002 sec.
虽然在没有 FINAL 的情况下此结果不正确,但我们可以看到应用了PREWHERE
优化以帮助实现 0.075 秒的执行时间。在下面使用 FINAL 返回正确答案,但性能降低了 20 倍以上。
SELECT avg(price)
FROM uk_price_paid
FINAL
WHERE town = 'LONDON'
┌───────avg(price)─┐
│ 474799.921480529 │
└──────────────────┘
1 row in set. Elapsed: 0.725 sec. Processed 29.65 million rows, 1.41 GB (40.88 million rows/s., 1.94 GB/s.)
EXPLAIN SYNTAX
SELECT avg(price)
FROM uk_price_paid
FINAL
WHERE town = 'LONDON'
┌─explain───────────────┐
│ SELECT avg(price) │
│ FROM uk_price_paid │
│ FINAL │
│ WHERE town = 'LONDON' │
└───────────────────────┘
4 rows in set. Elapsed: 0.004 sec.
我们可以通过在子查询中返回主键并在外部查询上仅使用 FINAL 来部分模拟 PREWHERE,如所示。这无法达到早期(不准确)查询的相同性能,但为增加的复杂性提供了一些改进
SELECT avg(price)
FROM uk_price_paid
FINAL
WHERE ((postcode1, postcode2) IN (
SELECT
postcode1,
postcode2
FROM uk_price_paid
WHERE town = 'LONDON'
GROUP BY
postcode1,
postcode2
)) AND (town = 'LONDON')
┌───────avg(price)─┐
│ 474799.921480529 │
└──────────────────┘
1 row in set. Elapsed: 0.287 sec. Processed 31.55 million rows, 230.30 MB (109.88 million rows/s., 802.08 MB/s.)
当内部查询返回主键值的一个小子集时,此解决方法最有效。
利用分区
ClickHouse 中的数据合并发生在分区级别。使用 ReplacingMergeTree 时,
我们建议用户根据最佳实践对表进行分区,前提是用户可以确保此**分区键对于行不会更改**。这将确保与同一行相关的更新将发送到同一 ClickHouse 分区。
假设情况如此,用户可以使用设置do_not_merge_across_partitions_select_final=1
来提高 FINAL 查询性能。此设置导致在使用 = FINAL 时独立合并和处理分区。考虑以下表格,我们按年份对数据进行分区并计算多个分区的平均价格。
CREATE TABLE default.uk_price_paid_year
(
`id` UInt64,
`price` UInt32,
`date` Date,
`postcode1` LowCardinality(String),
`postcode2` LowCardinality(String),
`type` Enum8('other' = 0, 'terraced' = 1, 'semi-detached' = 2, 'detached' = 3, 'flat' = 4),
`is_new` UInt8,
`duration` Enum8('unknown' = 0, 'freehold' = 1, 'leasehold' = 2),
`addr1` String,
`addr2` String,
`street` LowCardinality(String),
`locality` LowCardinality(String),
`town` LowCardinality(String),
`district` LowCardinality(String),
`county` LowCardinality(String),
`version` UInt64,
`deleted` UInt8
)
ENGINE = ReplacingMergeTree( version, deleted) PRIMARY KEY (postcode1, postcode2, addr1, addr2)
ORDER BY (postcode1, postcode2, addr1, addr2, id) PARTITION BY toYear(date)
INSERT INTO default.uk_price_paid_year SELECT * FROM default.uk_price_paid
——query on original table
SELECT avg(price)
FROM uk_price_paid
FINAL
WHERE (toYear(date) >= 1990) AND (toYear(date) <= 2000)
┌────────avg(price)─┐
│ 85861.88784270117 │
└───────────────────┘
1 row in set. Elapsed: 0.702 sec. Processed 29.65 million rows, 1.44 GB (42.23 million rows/s., 2.05 GB/s.)
-– query on partitioned table
SELECT avg(price)
FROM uk_price_paid_year
FINAL
WHERE (toYear(date) >= 1990) AND (toYear(date) <= 2000)
┌────────avg(price)─┐
│ 85861.88784270117 │
└───────────────────┘
1 row in set. Elapsed: 0.492 sec. Processed 9.27 million rows, 443.09 MB (18.83 million rows/s., 900.24 MB/s.)
-— performance with do_not_merge_across_partitions_select_final = 1
SET do_not_merge_across_partitions_select_final = 1
SELECT avg(price)
FROM uk_price_paid_year
FINAL
WHERE (toYear(date) >= 1990) AND (toYear(date) <= 2000)
┌────────avg(price)─┐
│ 85861.88784270117 │
└───────────────────┘
1 row in set. Elapsed: 0.230 sec. Processed 7.62 million rows, 364.26 MB (33.12 million rows/s., 1.58 GB/s.)
如上所示,由于我们的查询针对 10 个分区并减少了读取所需的数据,因此性能有所提高。通过将查询时去重限制在各个分区进一步提高了性能。
结论
在这篇博文中,我们探讨了使用 Debezium 在 Postgres 和 ClickHouse 之间移动数据的基于拉取的 CDC 管道的构建块。这包括介绍如何在 Postgres、Debezium 和 ReplacingMergeTree 中跟踪更改。这些概念可以组合起来生成以下管道。
在本系列的下一篇文章中,我们将为测试数据集构建一个工作管道,突出显示重要的配置选项以确保各个组件正确交互。敬请期待!