本博客的第 2 部分可以在PostgreSQL 和 ClickHouse 的变更数据捕获 (CDC) - 第 2 部分中找到。
目录
简介
在之前的帖子中,我们讨论了 OLTP 数据库(如 Postgres)和 OLAP 数据库(如 ClickHouse)之间的区别,以及用户可能希望将分析工作负载迁移到后者的原因。
本文提供了关于使用 Postgres 和 ClickHouse 实现变更数据捕获的入门指南。对于那些不熟悉的人来说,变更数据捕获 (CDC) 是指在两个数据库之间保持表同步的过程。本博客系列提出的解决方案仅使用 ClickHouse 的原生功能。除了 Debezium 和 Kafka 之外,不需要其他组件。
这篇初始帖子介绍了 Postgres 到 ClickHouse CDC 管道的概念和构建块。本系列的下一篇文章将组装这些概念并生成一个可工作的管道。虽然后一篇博客文章可能可以独立于本文阅读,但我们建议用户阅读本文以了解概念和约束,以避免可能出现的问题。
在我们的示例中,我们使用 ClickHouse Cloud 集群中的开发实例和 AWS Aurora Postgres 实例。然而,这些示例应该可以在同等大小的自托管集群上重现。或者,立即启动您的 ClickHouse Cloud 集群,并获得 300 美元的信用额度。让我们担心基础设施,开始查询吧!
方法
在数据是静态或不可变且仅受追加影响的情况下,CDC 可以很简单。通常,ClickHouse 原生函数(如 postgresql 函数)足以在实例之间移动数据,使用时间戳或单调递增的 ID 来定期识别哪些行需要从 OLTP 数据库读取并插入到 ClickHouse 中。
但是,对于包含更新和删除的更复杂工作负载的表,用户需要识别和跟踪其源 Postgres 数据库中的所有这些可能更改,以便近乎实时地将其应用于 ClickHouse。
这个问题的解决方案通常特定于源和目标,利用前者的功能来识别更改,并确保这些更改根据其最佳使用模式尽可能高效地反映在目标中。我们在下面描述了一些方法。
拉取式
使用拉取式变更数据捕获,目标数据库从源数据库拉取更改,通常使用记录到表列的更改。这减少了源数据库上的负载,并需要从目标端实现轮询方法。
在数据不可变且仅追加的情况下,可以考虑使用 cron 计划的 postgresql 函数在 Postgres 和 ClickHouse 之间移动数据,作为拉取式实现。
用户可以选择添加消息传递系统,以确保更改的交付是可靠的,即使目标系统不可用。由于更改是定期批量应用,因此这种方法主要存在源和目标中更改的可见性之间存在延迟的问题。此外,这种方法需要网络访问源数据库,源数据库无法控制何时以及如何拉取数据。这在生产关键系统中通常是不希望看到的,因为管理员对提供访问权限犹豫不决。
通过实时事件流推送
在推送式变更数据捕获管道中,源系统捕获表列的更改,并将这些更改发送到目标系统,在目标系统中,这些更改将应用于当前数据。在任何时间点,两个系统都应该几乎相同,更改几乎实时应用。这种方法假设更改的可靠交付是可能的 - 通过精确一次交付语义或通过至少一次交付以及目标系统适当处理重复项的能力。虽然不是绝对必要的,但这种方法通常会引入诸如 Kafka 之类的消息传递系统,以确保可靠的消息交付。
下面,我们提出一种推送式 CDC 方法,用于在近乎实时地在 Postgres 和 ClickHouse 之间移动更改。为了实现这一点,我们利用 Postgres 的原生功能,这些功能可以识别存储级别的更改,并将这些更改转换为可消费的更改流。
跟踪 PostgreSQL 中的更改
任何 CDC 解决方案都要求源数据库提供一种健壮且可靠的方法来跟踪特定表中行和列级别的更改。PostgreSQL 通过两个基本功能公开这些更改
- 预写式日志 (WAL) 是对数据库所做的所有更改的顺序日志。每当事务修改数据库时,更改首先写入 WAL,然后再应用于实际数据文件。此过程称为预写式日志记录。这为数据库事务提供持久性和崩溃恢复。它是 PostgreSQL 事务处理机制的关键组件。通过使用 WAL,PostgreSQL 确保更改在被视为已提交的事务之前持久存储在磁盘上。WAL 对于 PostgreSQL 中的各种复制技术也至关重要,这些技术使用 WAL 捕获主数据库上所做的更改,并将它们应用于副本,从而确保多个数据库实例之间的一致性。但是,在其本机格式中,此日志未针对外部进程进行优化,因此难以消费。
- 逻辑解码在 Postgres 中将 WAL 的内容解码为连贯且易于理解的格式,例如元组流或 SQL 语句。这需要使用复制槽,复制槽表示有序更改流,通常用于在客户端和服务器之间重放事件以进行正常的 Postgres 复制。至关重要的是,这些复制槽确保更改以它们应用于源数据库的顺序交付。此排序基于内部 LSN(日志序列号),LSN 本身是指向 WAL 日志中位置的指针。此过程对于崩溃是稳健的,使用检查点来推进位置。关键的是,任何消费者都必须能够潜在地处理由于检查点之间重启而交付的重复消息(Postgres 将返回到较早的检查点 LSN 位置)。
逻辑解码执行的解码过程以及复制槽中消息的后续格式可以通过插件控制。虽然存在其他选项,但自版本 10 以来,Postgres 包含了一个标准的逻辑解码插件 pgoutput
,它不需要安装额外的库,也用于内部复制。有了这个可用的更改消息流,我们现在需要一个能够读取它们并将它们发送到 ClickHouse 的系统。为此,我们将使用开源工具 Debezium。
Debezium
Debezium 是一组用于捕获数据库中更改的服务。Debezium 将数据库表中的所有行级更改记录为有序事件流,并将这些更改发送到下游系统以供消费。Debezium 支持连接器库,旨在生成与源 DBMS 无关的格式的消息,从而允许事件的消费者使用类似的逻辑,而无需考虑来源。虽然我们尚未测试其他数据库连接器(如 MySQL 和 MSSQL),但这应该意味着以下 ClickHouse 配置应该是可重用的。
此架构要求每个连接器利用每个源数据库中的适当更改捕获功能。在 Postgres 的情况下,连接器利用逻辑解码功能和通过复制槽公开的消息,如上所述。这与自定义 Java 代码相结合,该代码使用JDBC 驱动程序和流式复制协议来读取更改流并生成事件。在标准架构中,这些事件被发送到 Kafka,以便下游接收器消费。
总之,Debezium 将为 Postgres 中发生的每个插入、更新和删除操作生成一个行更改事件。然后,必须将这些事件应用于我们 ClickHouse 实例中的等效表,以确保我们的数据一致。请注意,此处表的映射是 1:1。在其最简单的形式中,每个表部署一个 Debezium 连接器,尽管多表配置是可能的。
Kafka 和队列的作用
消息传递系统通常在源系统和目标系统之间实现,以便可以缓冲和保留更改事件,直到它们被提交到目标。此队列减轻了 Postgres 预写式日志的压力,并将系统解耦。这避免了在 ClickHouse 不可用时(例如,由于网络连接问题)Postgres WAL 日志增长而无法回收(不会无限期持有,一旦不再需要事件就可以修剪)的潜在问题。
使用 Kafka Connect 框架开发,Debezium 本身支持 Apache Kafka,使其成为我们 CDC 解决方案的显而易见的消息传递系统选择 - 特别是当与 Kafka 与 ClickHouse 集成的丰富选项结合使用时。部署和配置 Kafka 超出了本博客的范围,但 Debezium 文档包括关于主题配置和命名的建议。
请注意,Debezium 还支持服务器模式,其中事件可以发送到任何消息传递系统,例如Kinesis 或 Google Pub Sub。虽然我们尚未测试这些架构,但如果 Debezium 和 Postgres 配置保持相同,并且生成的消息使用相同的格式,则用户应该能够使用相同的 ClickHouse 配置和 CDC 方法。
ReplacingMergeTree
上面,我们描述了 Debezium 如何通过 pgoutput
插件和复制槽利用 WAL 日志,从而能够从 Postgres 生成插入、更新和删除事件流。
有了这个可用的消息流,我们描述了如何将这些更改应用于 ClickHouse。由于 ClickHouse 尚未针对删除和更新工作负载进行优化,因此我们使用 ReplacingMergeTree 表引擎来有效地处理此更改流。作为其中的一部分,我们还讨论了使用此表引擎的一些重要注意事项,以及最近的进展以及此方法在何种条件下可以最佳地工作。
优化删除和更新
虽然诸如 Postgres 之类的 OLTP 数据库针对事务性更新和删除工作负载进行了优化,但 OLAP 数据库为此类操作提供了降低的保证,并针对批量插入的不可变数据进行了优化,以实现显着更快的分析查询。虽然 ClickHouse 通过突变提供更新操作,以及轻量级删除行的方法,但其面向列的结构意味着应谨慎安排这些操作。这些操作是异步处理的,使用单线程处理,并且(在更新的情况下)需要重写磁盘上的数据。因此,它们不应用于大量的小更改。
为了处理更新和删除行流,同时避免上述使用模式,我们可以使用 ClickHouse 表引擎ReplacingMergeTree。
此表引擎允许将更新操作应用于行,而无需使用低效的 ALTER 或 DELETE 语句,它允许用户插入同一行的多个副本,并将其中一个副本指定为最新版本。反过来,后台进程异步删除同一行的旧版本,通过使用不可变插入有效地模拟更新操作。
这依赖于表引擎识别重复行的能力。这是通过使用 ORDER BY
子句来确定唯一性来实现的,即,如果两行对于 ORDER BY
中指定的列具有相同的值,则它们被认为是重复项。版本列(在定义表时指定)允许在将两行识别为重复项时保留行的最新版本,即保留版本值最高的行。
此外,可以指定 deleted 列。它可以包含 0 或 1,其中值 1 表示应删除该行(及其重复项),否则使用零。
我们在下面的示例中说明此过程。此处,行由 A
列(表的 ORDER BY
)唯一标识。我们假设这些行已作为两个批次插入,从而在磁盘上形成两个数据部分。稍后,在异步后台进程中,这些部分合并在一起。在此过程中,会发生以下情况
- 由列
A
的值 1 标识的行既有版本 2 的更新行,也有版本 3 的删除行(以及deleted
列值为 1)。因此,将删除此键的所有行。 - 由列
A
的值 2 标识的行既有版本 2 的更新行。因此,将保留后一行,其price
列的值为 6。 - 由列
A
的值 3 标识的行既有版本 2 的更新行。因此,将保留后一行,其price
列的值为 3。
作为此合并过程的结果,我们有两行表示最终状态。
如所述,此重复删除过程在合并时发生,在后台异步进行,并且仅最终一致。或者,可以使用特殊的 FINAL 语法在查询时调用它,以确保结果准确 - 请参阅在 ClickHouse 中查询。
除了用于重复删除外,此表引擎的属性还可用于处理更新和删除工作负载。假设我们在 ClickHouse 中有以下简单表和行。请注意 ORDER BY
子句如何定义行唯一性,并设置为列 key
。我们还在引擎创建语句中定义了 version
和 deleted
列
CREATE TABLE test
(
`key` String,
`price` UInt64,
`version` UInt64,
`deleted` UInt8
)
ENGINE = ReplacingMergeTree(version, deleted)
ORDER BY key
{"version":1,"deleted":0,"key":"A","price":100}
假设我们需要更新此行的 price 列(因为它在 Postgres 中已更改)。为了实现这一点,我们插入以下行。请注意,键值是相同的,但版本值已递增。
{"version":2,"deleted":0,"key":"A","price":200}
如果我们稍后希望删除此行,我们将再次插入一个具有相同键值、更高版本以及 deleted 列值为 1 的重复项。
{"version":3,"deleted":1,"key":"A","price":200}
这种更新和删除方法允许我们显式地避免低效的 ALTER 和 DELETE 命令,而是通过插入不可变的行并允许 ClickHouse 异步协调更改来模拟这些操作。
删除的局限性
移除已删除的行
仅当表级设置 clean_deleted_rows
设置为 Always
时,才会在合并时删除已删除的行。默认情况下,此值设置为 Never
,这意味着永远不会删除行。截至 ClickHouse 版本 23.5,当值设置为 Always
时,此功能存在已知问题,导致可能删除错误的行。
因此,目前,我们建议使用值 Never
,如图所示 - 这将导致已删除的行累积,这在低容量下可能是可以接受的。要强制删除已删除的行,用户可以定期计划对表执行 OPTIMIZE FINAL CLEANUP
操作,即
OPTIMIZE TABLE uk_price_paid FINAL CLEANUP
这是一个 I/O 密集型操作,应在空闲期间谨慎安排。
因此,在上述问题得到解决之前,我们建议我们的 CDC 管道仅适用于删除数量较少到中等的表(少于 10%)。
仅按顺序交付
如果支持删除并依赖 OPTIMIZE FINAL CLEANUP
或 clean_deleted_rows=Always
(在上述问题解决后)来删除行,则必须按顺序交付每个 Postgres 行的更改。更具体地说,对于配置的 ORDER BY
列的一组不同值的行,必须按照它们在 Postgres 中出现的顺序插入。
如果未满足此约束,则如果更新发生在后台合并或计划的 OPTIMIZE FINAL CLEANUP
操作已执行删除操作之后,则可能会错误地保留行。考虑以下事件序列
如上所示,接收到唯一键值 A 的行的插入,然后是删除事件。OPTIMIZE FINAL CLEANUP
或后台合并发生,导致删除所有行。反过来,由于乱序插入,接收到版本低于先前删除的更新事件。此行被错误地保留。此问题可以简单地重现。请注意,如果 clean_deleted_rows=Never
(如当前建议的那样),则不会发生此问题,因为已删除的行会被保留。
为了解决这个问题,Debezium 默认情况下为其 Kafka 主题使用单个分区,从而确保按顺序交付(以及仅使用一个 Kafka 任务)。虽然这对于大多数工作负载来说已经足够了,但更高的吞吐量可能需要多个分区。
如果需要多个分区,用户可能希望探索主题分区路由,对 ORDER BY
列进行哈希处理,以确保同一行的所有更改事件都转到同一分区,从而保证按顺序交付。虽然这不能保证跨所有事件的按顺序交付,但它可以确保更改对于特定的 Postgres 行和一组 ORDER BY
值按顺序交付 - 足以避免上述竞争条件。注意:我们尚未测试此多主题配置。
或者,用户可以避免使用 OPTIMIZE FINAL CLEANUP
,而只是允许已删除的行累积,或确保谨慎执行它,例如,暂停 Postgres 上的更改,允许 Kafka 队列清除,在开始对 Postgres 进行更改之前执行删除清理。用户也可能能够对不再受更改影响的选择性分区发出 OPTIMIZE FINAL CLEANUP
。
行要求
因此,为了使我们的 Postgres 到 ClickHouse 变更数据捕获管道能够处理更新和删除,发送到 ClickHouse 的任何行都需要满足以下条件
- ClickHouse
ORDER BY
子句中列的值必须唯一标识 Postgres 中的一行。 - 当发送任何更改事件行时(无论是更新还是删除),它必须包含
ORDER BY
子句中列的相同值,这些值唯一标识该行。这些值不能更改,应被视为不可变的。ORDER BY
子句通常将包含 Postgres 中的主键列。用户还将希望包含与查询访问模式对齐的列,以优化查询性能。更新不能更改这些列 - 请参阅选择主键。 - 当发送更新的行时,它必须包含表中的所有列,并包含行的最终状态作为值,以及
deleted
列的值 0 和高于先前行的version
值。 - 当发送删除行时,它必须包含
ORDER BY 子句
的所有列,以及 deleted 列的值 1,以及高于先前列的version
列的值。 - 插入必须包含所有列值、
deleted
列的值 0 和version
编号,该编号将保证低于后续更新。
Postgres 行的更改事件可以独立发送到 ClickHouse。如果用户允许已删除的行累积,则特定行的更改事件也可以乱序发送。如果要删除删除事件,则对于任何特定的 Postgres 行,更改事件必须按顺序发生如上所述(但不是跨 Postgres 行)。在设计多线程/进程方法以消费来自 Kafka 的消息时,必须考虑这些约束。
以上所有操作都需要一个版本号,该版本号满足确保它单调递增并反映特定行的 Postgres 中事件顺序的必要属性。
在我们的下一篇文章中,我们将讨论如何转换 Debezium 的更改事件消息以满足 ReplacingMergeTree 的上述要求。
选择主键
上面,我们强调了一个重要的附加约束,该约束也必须在 ReplacingMergeTree
的情况下满足:ORDER BY
列的值在更改中唯一标识 Postgres 行。因此,Postgres 主键应包含在 Clickhouse ORDER BY
子句中。
ClickHouse 用户应该熟悉在表的 ORDER BY
子句中选择列以优化查询性能的做法。通常,应根据您的常用查询并按照基数递增的顺序选择这些列。重要的是,ReplacingMergeTree 施加了一个额外的约束 - 这些列必须是不可变的,即,此子句中只能添加在底层 Postgres 数据中不发生更改的列。虽然其他列可以更改,但这些列需要保持一致以用于唯一行标识。
对于分析型工作负载,Postgres 主键通常用途不大,因为用户很少执行点行查找(这并非 OLAP 数据库的优化方向,与 OLTP 数据库不同)。鉴于我们建议列按照基数递增的顺序排列,以及ORDER BY 中较早列出的列通常匹配速度更快,Postgres 主键应附加到 ORDER BY
的末尾(除非它具有分析价值)。如果 Postgres 中的多个列构成主键,则应将它们附加到 ORDER BY
,并考虑基数和查询值的可能性。用户可能还希望通过 MATERIALIZED 列使用值串联生成唯一主键。
考虑以下 英国房价数据集的架构。此处,id
列表示 Postgres 中的唯一主键(为示例目的添加到数据集)。列 postcode1
、postcode2
、addr1
、addr2
表示分析查询中常用的列。id
列因此附加到 ORDER BY
子句的末尾,并以这些列作为前导。
CREATE TABLE default.uk_price_paid
(
`id` UInt64,
`price` UInt32,
`date` Date,
`postcode1` LowCardinality(String),
`postcode2` LowCardinality(String),
`type` Enum8('other' = 0, 'terraced' = 1, 'semi-detached' = 2, 'detached' = 3, 'flat' = 4),
`is_new` UInt8,
`duration` Enum8('unknown' = 0, 'freehold' = 1, 'leasehold' = 2),
`addr1` String,
`addr2` String,
`street` LowCardinality(String),
`locality` LowCardinality(String),
`town` LowCardinality(String),
`district` LowCardinality(String),
`county` LowCardinality(String),
`version` UInt64,
`deleted` UInt8
)
ENGINE = ReplacingMergeTree(version, deleted)
PRIMARY KEY (postcode1, postcode2, addr1, addr2)
ORDER BY (postcode1, postcode2, addr1, addr2, id)
ORDER BY
子句配置磁盘上数据的顺序。默认情况下,如果未指定,PRIMARY KEY
子句也设置为相同的值。 此子句配置关联的稀疏主索引。ORDER BY
必须包含 PRIMARY KEY
作为前缀,因为后者假定数据已排序。
为了提高性能,此主索引保存在内存中,并使用使用标记的间接级别以最小化大小。在 ReplacingMergeTree 中使用 ORDER BY
键进行重复数据删除可能会导致其变得很长,从而增加内存使用量。如果这成为问题,用户可以直接指定 PRIMARY KEY
,限制加载到内存中的列,同时保留 ORDER BY
以最大化压缩并强制唯一性。
使用这种方法,可以从 PRIMARY KEY
中省略 Postgres 主键列 - 在不影响查询性能的情况下节省内存。我们也将此应用于上面的示例。
在 ClickHouse 中查询
在合并时,ReplacingMergeTree 使用 ORDER BY
值作为唯一标识符来识别重复行,并且仅保留最高版本,或者如果最新版本指示删除,则删除所有重复项(等待解决早前提到的问题_。然而,这仅提供最终的一致性 - 它不保证行将被去重,您不应依赖它。因此,由于查询中考虑了 update
和 delete
行,查询可能会产生不正确的答案。
为了获得正确的答案,用户需要使用查询时去重和删除移除来补充后台合并。这可以使用 FINAL 运算符来实现。考虑以下使用英国房价数据集的示例。
postgres=> select count(*) FROM uk_price_paid;
count
----------
27735104
(1 row)
postgres=> SELECT avg(price) FROM uk_price_paid;
avg
---------------------
214354.531780374791
(1 row)
– no FINAL, incorrect result
SELECT count()
FROM uk_price_paid
┌──count()─┐
│ 27735966 │
└──────────┘
– FINAL, correct result
SELECT count()
FROM uk_price_paid
FINAL
┌──count()─┐
│ 27735104 │
└──────────┘
– no FINAL, incorrect result
SELECT avg(price)
FROM uk_price_paid
┌─────────avg(price)─┐
│ 214353.94542901445 │
└────────────────────┘
– FINAL, correct result with some precision
SELECT avg(price)
FROM uk_price_paid
FINAL
┌────────avg(price)─┐
│ 214354.5317803748 │
└───────────────────┘
FINAL 性能
FINAL 运算符会对查询产生性能开销,尽管 22.6 版本中进行了改进,确保了去重步骤是多线程的。当查询未在主键列上进行过滤时,这将最为明显,导致读取更多数据并增加去重开销。如果用户使用 WHERE 条件在键列上进行过滤,则加载并传递用于去重的数据将减少。
如果 WHERE 条件未使用键列,则在使用 FINAL 时,ClickHouse 当前不使用 PREWHERE 优化。此优化旨在减少为非过滤列读取的行数。当查询运行时,首先使用表的主键识别读取所需的 granule。这会识别一组 granule,每个 granule 包含若干行(默认为 8192 行)。但是,并非这些 granule 内的所有行都将与主键上的过滤器子句匹配 - 因为 granule 可以包含一系列值,或者因为 WHERE 条件未使用主键。为了识别正确的行,并在可以读取 SELECT 列之前,因此需要额外的过滤。这是在使用 PREWHERE 的读取的第二阶段中执行的,PREWHERE 允许在 WHERE 子句中进一步过滤主键列和非主键列。ClickHouse 通常根据内部启发式方法将列移动到 PREWHERE 阶段。但是,当使用 FINAL 时,此优化当前未应用。有关 PREWHERE
的最新改进的更多详细信息,请访问此处。
为了模拟此优化,用户可以重写查询以使用子查询。例如,考虑以下查询,该查询查找伦敦的房产均价
–establish correct answer in postgres
postgres=> SELECT avg(price)
FROM uk_price_paid WHERE town = 'LONDON';
avg
---------------------
474799.921480528985
(1 row)
SELECT avg(price)
FROM uk_price_paid
WHERE town = 'LONDON'
┌─────────avg(price)─┐
│ 474797.35553246835 │
└────────────────────┘
1 row in set. Elapsed: 0.033 sec. Processed 27.74 million rows, 39.93 MB (835.45 million rows/s., 1.20 GB/s.)
EXPLAIN SYNTAX
SELECT avg(price)
FROM uk_price_paid
WHERE town = 'LONDON'
┌─explain──────────────────┐
│ SELECT avg(price) │
│ FROM uk_price_paid │
│ PREWHERE town = 'LONDON' │
└──────────────────────────┘
3 rows in set. Elapsed: 0.002 sec.
虽然在没有 FINAL 的情况下此结果不正确,但我们可以看到应用了 PREWHERE
优化以帮助实现 0.075 秒的执行时间。在下面使用 FINAL 返回了正确的答案,但性能降低了 20 多倍。
SELECT avg(price)
FROM uk_price_paid
FINAL
WHERE town = 'LONDON'
┌───────avg(price)─┐
│ 474799.921480529 │
└──────────────────┘
1 row in set. Elapsed: 0.725 sec. Processed 29.65 million rows, 1.41 GB (40.88 million rows/s., 1.94 GB/s.)
EXPLAIN SYNTAX
SELECT avg(price)
FROM uk_price_paid
FINAL
WHERE town = 'LONDON'
┌─explain───────────────┐
│ SELECT avg(price) │
│ FROM uk_price_paid │
│ FINAL │
│ WHERE town = 'LONDON' │
└───────────────────────┘
4 rows in set. Elapsed: 0.004 sec.
我们可以通过在子查询中返回主键并在外部查询上仅使用 FINAL 来部分模拟 PREWHERE。这没有达到早期(不准确)查询的相同性能,但为增加的复杂性提供了一些改进
SELECT avg(price)
FROM uk_price_paid
FINAL
WHERE ((postcode1, postcode2) IN (
SELECT
postcode1,
postcode2
FROM uk_price_paid
WHERE town = 'LONDON'
GROUP BY
postcode1,
postcode2
)) AND (town = 'LONDON')
┌───────avg(price)─┐
│ 474799.921480529 │
└──────────────────┘
1 row in set. Elapsed: 0.287 sec. Processed 31.55 million rows, 230.30 MB (109.88 million rows/s., 802.08 MB/s.)
当内部查询返回主键值的小子集时,此解决方法最有效。
利用分区
ClickHouse 中数据的合并发生在分区级别。当使用 ReplacingMergeTree 时,
我们建议用户根据 最佳实践对表进行分区,前提是用户可以确保**行的此分区键不会更改**。这将确保与同一行相关的更新将被发送到相同的 ClickHouse 分区。
假设情况如此,用户可以使用设置 do_not_merge_across_partitions_select_final=1
来提高 FINAL 查询性能。此设置使分区在使用 = FINAL 时被独立合并和处理。考虑下表,我们在表中按年份对数据进行分区,并计算多个分区的平均价格。
CREATE TABLE default.uk_price_paid_year
(
`id` UInt64,
`price` UInt32,
`date` Date,
`postcode1` LowCardinality(String),
`postcode2` LowCardinality(String),
`type` Enum8('other' = 0, 'terraced' = 1, 'semi-detached' = 2, 'detached' = 3, 'flat' = 4),
`is_new` UInt8,
`duration` Enum8('unknown' = 0, 'freehold' = 1, 'leasehold' = 2),
`addr1` String,
`addr2` String,
`street` LowCardinality(String),
`locality` LowCardinality(String),
`town` LowCardinality(String),
`district` LowCardinality(String),
`county` LowCardinality(String),
`version` UInt64,
`deleted` UInt8
)
ENGINE = ReplacingMergeTree( version, deleted) PRIMARY KEY (postcode1, postcode2, addr1, addr2)
ORDER BY (postcode1, postcode2, addr1, addr2, id) PARTITION BY toYear(date)
INSERT INTO default.uk_price_paid_year SELECT * FROM default.uk_price_paid
——query on original table
SELECT avg(price)
FROM uk_price_paid
FINAL
WHERE (toYear(date) >= 1990) AND (toYear(date) <= 2000)
┌────────avg(price)─┐
│ 85861.88784270117 │
└───────────────────┘
1 row in set. Elapsed: 0.702 sec. Processed 29.65 million rows, 1.44 GB (42.23 million rows/s., 2.05 GB/s.)
-– query on partitioned table
SELECT avg(price)
FROM uk_price_paid_year
FINAL
WHERE (toYear(date) >= 1990) AND (toYear(date) <= 2000)
┌────────avg(price)─┐
│ 85861.88784270117 │
└───────────────────┘
1 row in set. Elapsed: 0.492 sec. Processed 9.27 million rows, 443.09 MB (18.83 million rows/s., 900.24 MB/s.)
-— performance with do_not_merge_across_partitions_select_final = 1
SET do_not_merge_across_partitions_select_final = 1
SELECT avg(price)
FROM uk_price_paid_year
FINAL
WHERE (toYear(date) >= 1990) AND (toYear(date) <= 2000)
┌────────avg(price)─┐
│ 85861.88784270117 │
└───────────────────┘
1 row in set. Elapsed: 0.230 sec. Processed 7.62 million rows, 364.26 MB (33.12 million rows/s., 1.58 GB/s.)
如上所示,由于我们的查询目标是 10 个分区并减少了读取所需的数据,因此分区提高了性能。通过将查询时去重限制为独立分区,性能得到进一步提高。
结论
在这篇博文中,我们探讨了基于拉取的 CDC 管道的构建块,用于使用 Debezium 在 Postgres 和 ClickHouse 之间移动数据。这包括如何跟踪 Postgres 中的更改、Debezium 和 ReplacingMergeTree 的介绍。这些概念可以结合起来生成以下管道。
在本系列的下一篇文章中,我们将为测试数据集构建一个工作管道,重点介绍重要的配置选项,以确保各个组件正确交互。敬请期待!