此博客文章是为您的 ClickHouse 大型数据加载提速
系列的一部分
简介
通常,尤其是在从其他系统迁移到 ClickHouse Cloud 时,必须首先加载大量数据。从头开始加载数十亿或数万亿行数据可能具有挑战性,因为此类数据加载需要一些时间。时间越长,网络故障等瞬时问题可能中断并停止数据加载的可能性就越高。我们将展示如何应对这些挑战并避免数据加载中断。
在我们关于为您的 ClickHouse 大型数据加载提速的三部分博客系列的第三部分也是最后一部分中,我们将为您提供我们用于弹性高效大型数据加载的最佳实践。
为此,我们将简要介绍一种新的托管解决方案,用于将外部系统中的大量数据加载到 ClickHouse 中,并具有内置的弹性。然后,我们将深入了解来自我们出色的支持团队的脚本的内部机制,该团队帮助我们的一些ClickHouse Cloud客户成功迁移了包含数万亿行的数据集。如果您的外部数据源尚不支持我们的内置托管解决方案,您可以暂时使用此脚本,以便长期增量且可靠地加载大型数据集。
弹性数据加载
从外部系统从头开始将数十亿甚至数万亿行数据加载到 ClickHouse 表中需要一些时间。例如,假设(任意)基本传输吞吐量为每秒 1000 万行,则每小时可以加载 360 亿行,每天可以加载 8640 亿行。加载数万亿行将需要数天时间。
这段时间足以让事情暂时出错 - 例如,可能会出现瞬时网络连接问题,导致数据加载中断并失败。如果没有使用有状态编排数据传输的托管解决方案,并且内置了弹性和自动重试,用户通常会求助于截断 ClickHouse 目标表并从头开始重新启动整个数据加载。这进一步增加了加载数据所需的总时间,最重要的是,也可能再次失败,让您感到不满意。或者,您可以尝试确定哪些数据已成功摄取到 ClickHouse 目标表中,然后尝试仅加载缺失的子集。如果没有数据中的唯一序列,这可能会很棘手,并且需要手动干预和额外时间。
ClickPipes
ClickPipes是ClickHouse Cloud中完全托管的集成解决方案,为从外部系统进行持续、快速、弹性且可扩展的数据摄取提供内置支持
ClickPipes 仅在今年 9 月正式发布,目前支持多种风格的 Apache Kafka:OSS Apache Kafka、Confluent Cloud 和 AWS MSK。其他事件管道和对象存储(S3、GCS 等)将很快得到支持,并且随着时间的推移,将直接与其他数据库系统集成。
请注意,ClickPipes 将在发生故障时自动重试数据传输,并且目前提供至少一次语义。我们还为需要精确一次语义的 Kafka 用户提供了一个 Kafka Connect 连接器。
ClickLoad
如果您的外部数据源尚不支持 ClickPipes,或者您未使用 ClickHouse Cloud,我们建议采用此图表中概述的方法: ① 首先将您的数据导出到对象存储桶中(例如 Amazon AWS S3、Google GCP Cloud Storage 或 Microsoft Azure Blob Storage),理想情况下采用适度大小的 Parquet 文件,大小介于 50 到 150MB 之间,但其他格式也适用。提出此建议有以下几个原因
-
当前大多数数据库系统和外部数据源都支持将非常大的数据量高效可靠地导出到(相对便宜的)对象存储中。
-
Parquet 已成为几乎无处不在的文件交换格式,它提供了高效的存储和检索。ClickHouse 具有惊人的快速内置 Parquet 支持。
-
ClickHouse 服务器可以使用高水平的并行性处理和插入存储在对象存储中的文件中的数据,从而利用所有可用的 CPU 内核。
② 然后,您可以使用我们的 ClickLoad 脚本,如下文更详细的描述,该脚本通过利用我们的对象存储集成表函数之一(例如 s3、GCS 或 AzureBlobStorage)来协调可靠且有弹性的数据传输到您的 ClickHouse 目标表中。
我们在此处提供了使用 ClickLoad 的分步示例。
核心方法
ClickLoad 的基本思想源于我们的支持团队指导我们的一些 ClickHouse Cloud 客户成功迁移了包含数万亿行的数据。
以经典的 分而治之 方式,ClickLoad 将要加载的整体文件数据拆分为可重复且可重试的任务。这些任务用于将对象存储中的数据增量加载到 ClickHouse 表中,并在发生故障时自动重试。
为了易于扩展,我们采用了另一个博客中介绍的队列-工作器方法。无状态 ClickLoad 工作器通过从任务表可靠地声明文件加载任务来协调数据加载,该任务表由 KeeperMap 表引擎支持。每个任务可能包含要处理的多个文件,KeeperMap 表保证每个任务(以及因此的文件)只能分配给一个工作器。这使我们能够通过启动额外的工作器,增加整体摄取吞吐量,从而并行化文件加载过程。
作为先决条件,任务表填充了 ClickLoad 工作器的文件加载任务
① 用户可以利用相应的对象存储命令行界面工具(例如 Amazon aws-cli、Microsoft azure-cli 或 Google gsutil)来创建一个本地文件,其中包含要加载文件的对象存储 URL。
② 我们提供说明以及一个单独的脚本 queue_files.py,该脚本将来自步骤 ① 的文件中的条目拆分为文件 URL 的块,并将这些块作为文件加载任务加载到任务表中。
下图显示了 ClickLoad 工作器如何使用任务表中的这些文件加载任务来协调数据加载
为了在出现问题时易于重试文件加载,每个 ClickLoad 工作器首先将所有文件数据加载到(每个工作器不同的)暂存表
中:在 ① 声明其下一个任务后,工作器 迭代任务的文件 URL 列表,并 ②(按顺序)指示 ClickHouse 服务器通过使用 INSERT INTO SELECT FROM 查询 将任务中的每个文件加载到 暂存表
中(其中 ClickHouse 服务器本身拉取对象存储中的文件数据)。这以高水平的并行性插入文件数据,并在暂存表中创建数据部件。
假设当前任务的某个插入查询被中断并在暂存表已包含部分数据的状态下失败。在这种情况下,工作器首先 ③ 指示 ClickHouse 服务器截断暂存表(删除所有部件),然后 ② 从头开始重试其当前文件加载任务。成功完成任务后,工作器 ③ 使用特定的查询和命令,使 ClickHouse 服务器将暂存表中的所有部件(来自所有分区)移动到目标表。然后,工作器 ① 从任务表中声明并处理下一个任务。
在 ClickHouse Cloud 中,所有数据都与 共享的 ClickHouse 服务器分开存储在共享对象存储中。因此,移动部件是一种轻量级操作,仅更改部件的元数据,但不会物理移动部件。
请注意,每个工作器在启动时创建其自己的暂存表,然后执行无限循环检查未处理的任务,如果未找到新任务,则使用休眠间隔。我们使用为 SIGINT
(Ctrl+C
) 和 SIGTERM
(Unix 进程 kill signal
) 信号注册的信号处理程序,以便在工作器关闭时清理(删除)工作器的暂存表。
另请注意,工作器处理原子文件块而不是单个文件,以减少在使用复制表时 Keeper 上的争用。后者会创建(更多)大量的 MOVE PARTITION 调用,这些调用由复制集群中的 Keeper 协调。此外,我们随机化文件块大小,以防止多个并行工作器运行时出现 Keeper 争用。
暂存表确保加载的数据将精确存储一次
ClickLoad 工作器使用的 INSERT INTO SELECT FROM
查询可以将文件数据直接插入到目标表中。但是,当插入查询不可避免地失败时,为了避免在我们重试任务时导致数据重复,我们需要删除先前失败的任务中的所有数据。当数据插入到暂存表中时,这种删除比插入到暂存表中要困难得多,暂存表可以简单地截断。
例如,仅删除目标表中的部件是不可能的,因为最初插入的部件会在后台自动合并(可能与先前成功插入的部件合并)。
通常也不可能依赖自动插入去重,因为 (1) 插入线程极不可能重新创建完全相同的插入块,并且 (2) 在运行的工作器数量较多的情况下,ClickHouse 中的默认按表去重窗口可能不足。
最后,使用 OPTIMIZE DEDUPLICATE 语句显式地对所有行进行去重将 (1) 是一项非常繁重且缓慢的操作,目标表越大,操作越慢,并且 (2) 可能会意外地对源数据文件中有意相同的行进行去重。
在重试插入之前,从目标表中可靠地删除失败插入的数据的唯一方法是使用 ALTER TABLE DELETE、轻量级删除或 轻量级更新 语句。所有这些最终都通过繁重的 mutation 操作来实现,随着表大小的增加,mutation 操作变得更加昂贵。
相反,通过暂存表的绕行允许我们的工作器保证来自加载文件的每一行都精确地存储在目标表中一次,方法是根据任务是失败还是成功,有效地删除或移动部件。
工作器是轻量级的
ClickLoad 的工作器脚本仅协调数据加载,但实际上本身并不加载任何数据。相反,ClickHouse 服务器及其硬件资源用于从对象存储拉取数据并将其写入 ClickHouse 表。
请注意,运行 ClickLoad 需要一台单独的机器,该机器具有对源对象存储桶和目标 ClickHouse 实例的网络访问权限。由于工作器的轻量级工作方式,一台适中大小的机器可以运行 100 多个并行工作器实例。
插入吞吐量可以轻松扩展
多个工作器和多个 ClickHouse 服务器(在 ClickHouse Cloud 中,前面有一个负载均衡器)可用于扩展摄取吞吐量
所有工作器的 INSERT INTO SELECT FROM
查询都均匀地分布到可用的 ClickHouse 服务器数量,然后由这些服务器并行执行。请注意,每个工作器都有其自己的暂存表。
在 ClickHouse 服务器执行插入查询时有足够资源的情况下,将工作器数量增加一倍可以将摄取吞吐量增加一倍。
同样,将 ClickHouse 服务器的数量增加一倍可以将摄取吞吐量增加一倍。在我们的测试中,当加载 6000 亿行以上的数据集(使用 100 个并行工作器)时,将 ClickHouse Cloud 服务中的 ClickHouse 服务器数量从 3 个增加到 6 个,摄取吞吐量正好翻了一番(从每秒 400 万行增加到每秒 800 万行)。
可以实现连续数据摄取
如上所述,每个工作器都执行一个无限循环,以检查任务表中是否有未处理的任务,如果未找到新任务,则使用休眠间隔。如果在对象存储桶中检测到新文件,通过将新的文件加载任务添加到任务表中,这可以轻松实现连续数据摄取过程。正在运行的工作器将自动声明这些新的计划任务。我们在此处描述了一个具体的示例,但将此实现留给读者
支持任何分区键
ClickLoad 工作器的文件加载机制独立于目标表的任何分区方案。工作器本身不会创建任何分区。我们也不要求每个加载的文件都属于特定分区。相反,目标表可以具有任何(或没有)自定义分区键,我们在暂存表中复制该分区键(暂存表是目标表的 DDL 级克隆)。
在每个成功的文件块传输之后,我们只是移动暂存表中当前处理的文件的数据摄取期间自然创建的所有分区(所属的部件)。这意味着,总体而言,为目标表创建的分区数与我们(不使用 ClickLoad 脚本)直接将所有数据插入到目标表时创建的分区数完全相同。
您可以在此处找到更详细的说明。
完全支持投影和物化视图
将数万亿行数据可靠地加载到目标表中是良好的第一步。但是,投影和物化视图可以通过允许表具有自动增量聚合和具有附加主索引的多行顺序来为您的查询提速。
在初始加载数万亿行之后在目标表上创建投影或物化视图将需要昂贵的投影物化或 ClickHouse 端的表到表数据重载,以便触发物化视图。这两者都将再次花费很长时间,包括出现问题的风险。因此,最有效的选择是在初始数据加载之前创建投影和物化视图。ClickLoad 完全(且透明地)支持这一点。
投影支持
我们的 ClickLoad 工作器脚本创建的暂存表是目标表的完整 DDL 级克隆,包括所有定义的投影。由于投影的数据部件存储为投影宿主表的部件目录中的子目录,因此它们在每个文件加载任务后会自动从暂存表移动到目标表。
物化视图支持
下图显示了 ClickLoad 物化视图支持的基本逻辑
在上图中,目标表有两个连接的物化视图(MV-1
和 MV-2
),它们将在直接插入到目标表时触发,并将数据(以转换的形式)存储在其自己的目标表中。
我们的 ClickLoad 工作器脚本通过自动创建暂存表来复制此行为,该暂存表不仅用于主目标表,而且还用于所有物化视图 (mv) 目标表。。除了额外的暂存表外,我们还自动创建物化视图触发器的克隆,但将它们配置为对暂存表上的插入做出反应,然后以其相应的目标暂存表为目标。
当 ① 数据插入到目标表的暂存表中(并且 ② 以部件的形式存储)时,此插入 ③ 会自动触发 mv 副本,④ 从而导致相应地插入到暂存 mv 的目标表中。如果整个插入成功,我们 ⑤ 将所有部件(分区)从暂存表移动到其对应项中。如果出现问题,例如,某个物化视图在当前数据中存在问题,我们只需从所有暂存表中删除所有部件,并重试插入。如果超出最大重试次数,我们将跳过(并记录)当前文件,然后继续处理下一个文件。通过这种机制,我们确保插入是原子的,并且主目标表和所有连接的物化视图之间的数据始终保持一致。
请注意,在步骤 ⑤ 中将部件移动到目标表中不会触发任何连接的原始物化视图。
有关失败的物化视图的详细错误信息,请参见 query_views_log 系统表。
此外,如上文所述,物化视图的目标表可以具有任何(或没有)自定义分区键。我们的编排逻辑与此无关。
我们的 ClickLoad 工作器脚本目前仅支持使用 TO target_table
子句创建的物化视图,并且不支持链式(级联
)物化视图。
ClickLoad 最适用于适度的文件大小
工作器的处理单元是一个完整的文件。如果在加载文件时出现问题,我们将重新加载整个文件。因此,我们建议使用大小适中的文件,每个文件包含数百万行而不是数万亿行,压缩后的大小约为 100 到 150 MB。这确保了高效的重试机制。
欢迎提交 PR
正如上面提到的,我们的 ClickLoad 脚本的初衷是在支持互动期间帮助一些 ClickHouse Cloud 客户迁移他们的大量数据。因此,该脚本目前依赖于云特定的功能,例如 MOVE PARTITION
,对于 SharedMergeTree 引擎来说,这是一个轻量级操作。此引擎还允许 轻松 扩展 ClickHouse 服务器的数量,以 提高 摄取吞吐量。我们还没有机会在其他设置上测试该脚本,但我们欢迎 贡献。原则上,它应该可以通过最少的调整在其他设置上工作。MOVE PARTITION 操作 必须在分片集群中的所有分片上运行,例如,通过使用 ON CLUSTER 子句。另请注意,当使用 零拷贝复制 时,MOVE PARTITION 当前无法并发运行。我们希望该脚本可以作为一个有用的起点,我们欢迎在更多用例和场景中使用它,并协作改进!
未来,我们希望在为 ClickPipes 构建从对象存储摄取文件的支持时,会考虑此脚本的机制。请继续关注更新!
总结
加载包含数万亿行的大型数据集可能是一个挑战。为了克服这一挑战,ClickHouse Cloud 提供了 ClickPipes - 一个内置的托管集成解决方案,具有弹性加载大型数据卷的支持,能够抵抗中断并自动重试。如果您的外部数据源尚不受支持,我们探索了 ClickLoad 的机制 - 一个用于在较长时间内以增量和可靠方式加载大型数据集的脚本。
这结束了我们关于增强大型数据加载的三部分博客系列。