DoubleCloud 即将停止运营。使用有限时间免费迁移服务迁移到 ClickHouse。立即联系我们。 ->->

博客 / 工程

大规模 ClickHouse 数据加载加速 - 使大规模数据加载更具弹性

author avatar
Tom Schreiber
2023 年 11 月 1 日

large_data_loads-p3-01.png

这篇文章是“大规模 ClickHouse 数据加载加速”系列的一部分

简介

通常,尤其是在 从其他系统迁移 到 ClickHouse Cloud 时,必须先加载大量数据。从头开始加载数十亿甚至数万亿行数据可能很困难,因为这些数据加载需要一些时间。时间越长,网络故障等瞬态问题中断并停止数据加载的可能性就越高。我们将展示如何解决这些挑战并避免数据加载中断。

在本系列关于大规模 ClickHouse 数据加载加速的三篇博文中,我们将介绍第三部分,也是最后一部分,提供有关大规模数据加载的最佳实践,以实现弹性和效率。

为此,我们将简要介绍一个新的托管解决方案,用于将外部系统中的大数据量加载到 ClickHouse 中,并提供内置的弹性功能。然后,我们深入了解来自我们优秀的 支持 团队的 脚本,该团队帮助我们的一些 ClickHouse Cloud 客户成功迁移了包含数万亿行的数据集。如果您的外部数据源尚未得到我们内置的托管解决方案的支持,您可以暂时使用此脚本以增量方式可靠地加载大型数据集,时间跨度很长。

弹性数据加载

将外部系统中的数十亿甚至数万亿行数据从头开始加载到 ClickHouse 表中需要一些时间。例如,假设(任意)基本传输吞吐量为每秒 1000 万行,每小时可以加载 360 亿行,每天可以加载 8640 亿行。加载数万亿行需要几天时间。

这足够时间让事情暂时出错 - 例如,可能存在瞬态网络连接问题,导致数据加载中断并失败。如果没有使用状态化编排来管理数据传输,并提供内置的弹性和自动重试功能,用户通常会诉诸截断 ClickHouse 目标表,并从头开始重新启动整个数据加载。这会进一步增加加载数据所需的总时间,最重要的是,也可能再次失败,让您不高兴。或者,您可以尝试确定哪些数据已成功加载到 ClickHouse 目标表中,然后尝试仅加载缺少的子集。如果没有数据中的唯一序列,这可能很困难,需要人工干预和额外时间。

ClickPipes

ClickPipesClickHouse Cloud 中的一个完全托管的集成解决方案,提供对来自外部系统的持续、快速、弹性和可扩展数据摄取的内置支持。


clickpipes.gif


ClickPipes 于今年 9 月正式发布,目前支持多种 Apache Kafka 版本:OSS Apache KafkaConfluent CloudAWS MSK。其他事件管道和对象存储(S3、GCS 等)将 很快 得到支持,随着时间的推移,还将与其他数据库系统直接集成。

请注意,ClickPipes 会 自动重试 发生故障时的数据传输,并且目前 提供 最少一次语义。我们还为从 Kafka 加载数据但需要至少一次语义的用户提供 Kafka Connect 连接器

ClickLoad

如果您的外部数据源尚未得到 ClickPipes 的支持,或者您没有使用 ClickHouse Cloud,我们建议您使用此图中概述的方法:large_data_loads-p3-02.png ① 首先将您的数据导出到 对象存储 存储桶(例如,Amazon AWS S3、Google GCP Cloud Storage 或 Microsoft Azure Blob Storage)中,理想情况下,数据以 中等 大小的 Parquet 文件形式存储,大小在 50 到 150 MB 之间,但其他 格式 也适用。之所以提出这个建议,有几个原因

  1. 大多数当前的数据库系统和外部数据源都支持将非常大的数据量高效且可靠地导出到(相对便宜的)对象存储中。

  2. Parquet 已成为一种几乎无处不在的文件交换格式,它提供高效的 存储检索 功能。ClickHouse 拥有非常快的 内置 Parquet 支持功能。

  3. ClickHouse 服务器可以使用 高水平的并行性 处理和插入存储在对象存储中的文件中的数据,从而利用所有可用的 CPU 内核。

② 然后,您可以使用我们在下面详细介绍的 ClickLoad 脚本,该脚本通过使用我们的对象存储集成表函数之一(例如,s3GCSAzureBlobStorage)来编排可靠且弹性的数据传输到 ClickHouse 目标表中。

我们提供了一个使用 ClickLoad 的逐步示例 此处

核心方法

ClickLoad 的基本理念源于我们的 支持 团队,该团队指导我们的一些 ClickHouse Cloud 客户成功迁移了包含数万亿行数据的数据集。

ClickLoad 以经典的 分而治之 方式将要加载的文件数据分成可重复且可重试的任务。这些任务用于将数据从对象存储中增量加载到 ClickHouse 表中,如果发生故障,会自动重试。

为了轻松扩展,我们采用另一个博客中介绍的队列-工作器方法。无状态 ClickLoad 工作器通过从由任务表(由KeeperMap 表引擎支持)可靠地声明文件加载任务来协调数据加载。每个任务可能包含多个要处理的文件,KeeperMap 表保证每个任务(因此每个文件)只能分配给一个工作器。这使我们能够通过启动额外的工作器来并行化文件加载过程,从而提高整体的摄取吞吐量。

作为先决条件,任务表将填充有用于 ClickLoad 工作器的文件加载任务。


large_data_loads-p3-03.png


① 用户可以使用相应的对象存储命令行界面工具(例如 Amazon aws-cli、Microsoft azure-cli 或 Google gsutil)来创建一个包含要加载文件的对象存储 URL 的本地文件。

② 我们提供了说明以及一个单独的脚本queue_files.py,它可以分割步骤①中来自文件的条目,将其分成文件 URL,并将这些块加载到任务表中作为文件加载任务。

下图显示了任务表中的这些文件加载任务如何被 ClickLoad 工作器用来协调数据加载。


large_data_loads-p3-04.png


为了轻松重试文件加载(如果出现问题),每个 ClickLoad 工作器首先将所有文件数据加载到一个(每个工作器不同暂存表中:在①声明其下一个任务后,工作器会遍历任务的文件 URL 列表,并②(按顺序)指示 ClickHouse 服务器使用INSERT INTO SELECT FROM 查询(ClickHouse 服务器本身会拉取来自对象存储的文件数据)将任务中的每个文件从对象存储加载到暂存表中。这会以高度并行的方式插入文件数据,并在暂存表中创建数据块

假设当前任务的某个插入查询在暂存表中已经包含了一些数据(以块的形式)的状态之间被中断并失败。在这种情况下,工作器首先③指示 ClickHouse 服务器清空暂存表(删除所有块),然后②重试其当前的文件加载任务。成功完成任务后,工作器③使用特定的查询命令,指示 ClickHouse 服务器移动暂存表中所有分区的所有块到目标表。然后,工作器①声明并处理任务表中的下一个任务。

ClickHouse Cloud 中,所有数据都单独存储在共享对象存储中,与 ClickHouse 服务器分开。因此,移动块是一个轻量级操作,只更改块的元数据,不会物理地移动块。

请注意,每个工作器在启动时创建自己的暂存表,然后执行一个无限循环来检查未处理的任务,如果找不到新任务,则暂停。我们使用一个信号处理程序,它注册SIGINTCtrl+C)和SIGTERM(Unix 进程kill 信号)信号,用于在工作器关闭时清理(删除)工作器的暂存表。

此外,请注意,工作器会处理原子文件块,而不是单个文件,以减少对 Keeper 的争用(如果使用了复制表)。后者会导致(更多)大量的MOVE PARTITION 调用,这些调用由 Keeper 在复制集群中进行协调。此外,我们随机化文件块大小,以防止多个并行工作器运行时对 Keeper 造成争用。

暂存表确保加载的数据将完全存储一次。

ClickLoad 工作器使用的INSERT INTO SELECT FROM 查询可以将文件数据直接插入目标表中。但如果插入查询始终失败,为了在重试任务时不会导致数据重复,我们需要从之前失败的任务中删除所有数据。这种删除比将数据插入暂存表(可以简单地清空)要困难得多。

例如,仅仅删除目标表中的块是不可能的,因为最初插入的块会在后台自动合并(可能与之前成功插入的块合并)。

依靠自动插入去重通常也不可行,因为(1)插入线程极不可能重新创建完全相同的插入块,以及(2)随着大量运行的工作器,ClickHouse 中的每个表去重窗口的默认值可能不足。

最后,使用OPTIMIZE DEDUPLICATE 语句显式地对所有行进行去重将是(1)一个非常繁重且缓慢的操作,目标表越大,操作越慢,以及(2)可能会意外地对源数据文件中有意重复的行进行去重。

在重试插入之前,可靠地从目标表中删除失败的插入中的数据唯一的办法是使用ALTER TABLE DELETE轻量级删除轻量级更新语句。所有这些最终都会通过一个繁重的修改操作来实现,随着表的大小增加,该操作的成本也会增加。

相反,通过暂存表进行迂回,我们的工作器能够通过有效地删除或移动块(取决于任务是失败还是成功)来保证从加载的文件中获得的每一行在目标表中完全存储一次。

工作器是轻量级的。

ClickLoad 的工作器脚本只是协调数据加载,本身并不实际加载任何数据。相反,ClickHouse 服务器及其硬件资源被用于拉取来自对象存储的数据并将其写入 ClickHouse 表中。

请注意,运行 ClickLoad 需要一台单独的机器,该机器能够访问源对象存储桶和目标 ClickHouse 实例。由于工作器是轻量级的,一台中等大小的机器可以运行数百个并行工作器实例。

插入吞吐量可以轻松扩展。

多个工作器和多个 ClickHouse 服务器(在 ClickHouse Cloud 中使用负载均衡器)可以用来扩展摄取吞吐量。


large_data_loads-p3-05.png


来自所有工作器的INSERT INTO SELECT FROM 查询将被均匀地分配到可用的 ClickHouse 服务器,并在这些服务器上并行执行。请注意,每个工作器都有自己的暂存表

如果执行插入查询的 ClickHouse 服务器拥有足够的资源,那么将工作器数量增加一倍可以将摄取吞吐量增加一倍。

类似地,将 ClickHouse 服务器数量增加一倍可以将摄取吞吐量增加一倍。在我们的测试中,当加载一个包含 6000 亿多行的数据集(使用 100 个并行工作器)时,将 ClickHouse Cloud 服务中的 ClickHouse 服务器数量从 3 个增加到 6 个,摄取吞吐量恰好增加了一倍(从每秒 400 万行增加到每秒 800 万行)。

可以实现持续数据摄取。

如上所述,每个工作器都会执行一个无限循环来检查任务表中的未处理任务,如果找不到新任务,则暂停。这使得我们可以轻松实现持续数据摄取过程,方法是:如果在对象存储桶中检测到新文件,则将新的文件加载任务添加到任务表中。然后,正在运行的工作器会自动声明这些新计划的任务。我们在这里描述了一个具体的示例,但具体的实现留给读者。

支持任何分区键。

ClickLoad 工作程序的文件加载机制独立于目标表的任何 分区 方案。工作程序不会自行创建任何分区。我们也不要求每个加载的文件都属于特定的分区。相反,目标表可以拥有任何(或没有)自定义分区键,我们在暂存表中复制此键(暂存表是目标表的 DDL 级 克隆)。

在每个成功的 文件块传输 后,我们只需 移动 暂存表在从当前处理的文件中摄取数据期间自然创建的所有(属于)分区。这意味着总体而言,为目标表创建的分区数量与直接将所有数据(不使用我们的 ClickLoad 脚本)插入目标表时创建的分区数量完全相同。相同

您可以在 此处 找到更详细的说明。

完全支持投影和物化视图

可靠地将数万亿行数据加载到目标表是第一步。但是,投影物化视图 可以通过允许表具有 自动增量聚合 和具有 附加主键 的多行排序来 加速您的查询

在初始加载数万亿行数据 **之后** 在目标表上创建投影或物化视图将需要昂贵的投影 物化 或 ClickHouse 端表到表的重新加载数据以 触发 物化视图。这两者都需要很长时间,包括出现错误的风险。因此,最有效的选择是在初始数据加载 **之前** 创建投影和物化视图。ClickLoad 完全(并且透明地)支持这一点。

投影支持

我们的 ClickLoad 工作程序脚本 创建 的暂存表是目标表的完整 DDL 级克隆,包括所有定义的投影。由于投影的数据部分 存储 为投影主机表 部分目录 内的子目录,因此它们在每个文件加载任务后会自动从暂存表移动到目标表。

物化视图支持

下图显示了 ClickLoad 物化视图支持的基本逻辑


large_data_loads-p3-06.png


在上图中,目标表有两个连接的物化视图(MV-1MV-2),它们将在新的直接插入到目标表中时触发,并将数据(以 转换 的形式)存储在它们自己的目标表中。

我们的 ClickLoad 工作程序脚本通过自动创建暂存表来复制此行为,该暂存表不仅针对主目标表 创建,还针对所有物化视图 (mv) 目标表 创建。除了额外的暂存表,我们还自动 创建 物化视图触发器的克隆,但将它们配置为对暂存表上的插入做出反应,然后将其对应到相应的暂存 mv 目标表。

当 ① 数据插入到目标表的暂存表中(以及 ② 以部分的形式存储)时,此插入 ③ 会自动触发 mv 副本,④ 导致相应的插入到暂存 mv 的目标表中。如果整个插入成功,我们将 ⑤ 移动 所有部分(分区)从暂存表到其对应表。如果出现错误,例如,其中一个物化视图对当前数据存在问题,我们只需 删除 所有暂存表中的所有部分,然后 重试 插入。如果 最大重试次数 超过,我们将跳过(并记录)当前文件并继续下一个文件。通过这种机制,我们确保插入是原子的,并且数据在主目标表和所有连接的物化视图之间始终保持一致。

请注意,在步骤 ⑤ 中将部分移动到目标表不会触发任何连接的原始物化视图。

有关失败的物化视图的详细错误信息,请查看 query_views_log 系统表。

此外,如 上文所述,物化视图的目标表可以拥有任何(或没有)自定义分区键。我们的编排逻辑独立于此。

我们的 ClickLoad 工作程序脚本目前仅支持使用 TO target_table 子句 创建的物化视图,不支持 链接级联)物化视图。

ClickLoad 最适合中等大小的文件

工作程序的处理单元是整个文件。如果在加载文件时出现错误,我们将重新加载整个文件。因此,我们建议使用中等大小的文件,每个文件包含数百万而不是数万亿行,压缩后的尺寸约为 100 到 150 MB。这样可以确保有效的重试机制。

欢迎提交 PR

如上所述,我们的 ClickLoad 脚本的起源是帮助一些 ClickHouse Cloud 客户在支持交互过程中迁移大量数据。因此,该脚本目前依赖于云特定功能,例如 MOVE PARTITION SharedMergeTree 引擎的轻量级操作。该引擎还允许 轻松 扩展 ClickHouse 服务器的数量,以 提高 摄取吞吐量。我们还没有机会在其他设置上测试该脚本,但我们欢迎 贡献。原则上,它应该可以在其他设置上运行,只需进行少量调整。在分片集群中,必须在所有分片上运行 MOVE PARTITION 操作,例如,通过利用 ON CLUSTER 子句。此外,请注意,当使用 零拷贝复制 时,MOVE PARTITION 目前不能并发运行。我们希望该脚本可以作为有用的起点,我们欢迎在更多用例和场景中使用它,并共同改进它!

将来,我们希望在构建 ClickPipes 中从对象存储中摄取文件的支持时,考虑该脚本的机制。敬请关注更新!

总结

加载包含数万亿行的大型数据集可能是一个挑战。为了克服这个问题,ClickHouse Cloud 提供了 ClickPipes - 一种内置的托管集成解决方案,提供对弹性加载大型数据量的支持,该方案对中断具有鲁棒性,并具有自动重试功能。如果您的外部数据源尚不支持,我们探索了 ClickLoad 的机制 - 一个用于在很长一段时间内增量可靠地加载大型数据集的脚本。

这结束了我们关于加速大型数据加载的三部分博客系列。

分享此文章

订阅我们的时事通讯

及时了解功能发布、产品路线图、支持和云服务!
加载表单...
关注我们
Twitter imageSlack imageGitHub image
Telegram imageMeetup imageRss image