博客 / 工程

ClickHouse Join 内幕 - 哈希 Join、并行哈希 Join、Grace 哈希 Join

author avatar
Tom Schreiber
2023 年 4 月 20 日 - 28 分钟阅读

立即开始使用 ClickHouse Cloud,并获得 300 美元额度。要了解更多关于我们基于量的折扣信息,请联系我们或访问我们的定价页面

join_algorithms.png

这篇博客文章是系列文章的一部分

在我们之前的文章中,我们回顾了 ClickHouse 中可用的 SQL JOIN 类型。提醒一下:ClickHouse 完全支持 SQL Join。

在这篇文章中,我们将开始探索 ClickHouse 中 join 执行的内部原理,以便您可以为应用程序使用的查询优化 join。在这里,您将看到 ClickHouse 如何将这些经典 join 算法集成到其查询管道中,以便尽可能快地执行 join 类型。

查询管道

ClickHouse 被设计为快速。ClickHouse 中的查询以高度并行的方式处理,利用当前服务器上所有可用的必要资源,并且在许多情况下,将硬件利用到其理论极限。服务器拥有的 CPU 核心和主内存越多,查询从并行执行中获得的性能提升就越大。

查询管道决定了每个查询执行阶段的并行度。

下图显示了 ClickHouse 查询管道如何在具有 4 个 CPU 核心的服务器上处理查询: query_pipeline.png 查询的表数据动态地分布在 4 个独立的并行流阶段之间,这些阶段流式传输数据到 ClickHouse 中。由于服务器有 4 个 CPU 核心,因此查询管道中的大多数查询处理阶段都由 4 个线程并行执行。

使用的线程数取决于 max_threads 设置,默认情况下,该设置设置为 ClickHouse 在其运行的机器上看到的 CPU 核心数。

对于所有查询(包括 join),查询管道确保表数据以高度并行和可扩展的方式处理。

内幕 Join 算法

为了确保资源的最大利用率,ClickHouse 开发了 6 种不同的 join 算法。这些算法决定了 join 查询的计划和执行方式。ClickHouse 可以配置为自适应地选择并在运行时动态更改要使用的 join 算法。取决于资源可用性和使用情况。但是,ClickHouse 也允许用户指定所需的 join 算法。此图表根据它们的相对内存消耗和执行时间概述了这些算法: algorithms.png

这篇博客文章将详细描述和比较上图中基于内存哈希表的三个 ClickHouse join 算法

在这篇文章中,我们将探讨 哈希 join 算法如何快速且最通用。并行哈希 join 算法对于大型右侧表可能更快,但需要更多内存。哈希 join 和并行哈希 join 都是内存受限的。而 Grace 哈希 join 是一个非内存受限的版本,它将数据临时溢出到磁盘。Grace 哈希 join 不需要对数据进行任何排序,因此克服了其他将数据溢出到磁盘的 join 算法(如(部分)合并 join 算法)的一些性能挑战(我们将在第二部分中介绍)。

下一篇文章中,我们将看看上图中基于外部排序的两种算法

我们将最好的留在最后,并在另一篇文章中结束我们对 ClickHouse join 算法的探索,我们将在其中描述上图中 ClickHouse 最快的 join 算法

测试数据和资源

对于所有示例查询,我们将使用我们在上一篇文章中介绍的规范化 IMDB 数据集中的两个表: schema.png

为了有足够大的数据进行测试,我们在一个新的数据库 imdb_large 中生成了这些表的大型版本。

此查询列出了示例表中的行数和未压缩数据量

SELECT table, formatReadableQuantity(sum(rows)) AS rows, formatReadableSize(sum(data_uncompressed_bytes)) AS data_uncompressed FROM system.parts WHERE (database = 'imdb_large') AND active GROUP BY table ORDER BY table ASC; ┌─table──┬─rows───────────┬─data_uncompressed─┐ │ actors │ 1.00 million │ 21.81 MiB │ │ roles │ 100.00 million │ 2.63 GiB │ └────────┴────────────────┴───────────────────┘

对于所有可视化,为了保持简洁和可读性,我们人为地限制了 ClickHouse 查询管道中使用的并行级别,设置 max_threads = 2

但是,对于所有示例查询运行,我们都使用 max_threads 的默认设置。

如上所述,默认情况下,max_threads 设置为 ClickHouse 在其运行的机器上看到的 CPU 核心数。这些示例使用 ClickHouse Cloud 服务,其中单个节点有 30 个 CPU 核心

SELECT getSetting('max_threads'); ┌─getSetting('max_threads')─┐ │ 30 │ └───────────────────────────┘

现在让我们开始探索 ClickHouse join 算法。我们从最通用的算法开始,即哈希 join 算法。

哈希 join

描述

内存哈希表可以每秒处理 2.5 亿个完全随机的请求(如果它适合 CPU 缓存,则超过 10 亿个)。这种非常快速的查找能力使内存哈希表成为 ClickHouse 中实现 join 的自然通用选择,当无法或不可行利用表排序时。

哈希 join 算法是 ClickHouse 中可用 join 实现中最通用的算法。我们在下面说明了集成到 ClickHouse 查询管道中的 哈希 join 算法

hash.png 我们可以看到

① 来自右侧表的所有数据都流式传输(由于 max_threads = 2,因此由 2 个线程并行)到内存中,然后 ClickHouse 使用此数据填充内存哈希表。

② 来自左侧表的数据流式传输(由于 max_threads = 2,因此由 2 个线程并行),并且 ③ 通过在哈希表中查找来进行 join。

请注意,由于 ClickHouse 获取右侧表并在 RAM 中为其创建哈希表,因此将较小的表放在 JOIN 的右侧更节省内存。我们将在下面演示这一点。

另请注意,哈希表是 ClickHouse 中的关键数据结构。基于每个特定查询,特别是对于 join 查询,基于 join 键列类型和 join 严格性,ClickHouse 会自动选择 30 多种变体之一。

支持的 join 类型

支持所有 join 类型严格性 设置。此外,目前,只有哈希 join 支持在 ON 子句中与 OR 组合的多个 join 键。

对于想要更深入了解的读者,源代码包含关于哈希 join 算法如何实现这些类型和设置的非常详细的描述

示例

我们通过两次查询运行演示哈希 join 算法。

右侧表较小

SELECT * FROM roles AS r JOIN actors AS a ON r.actor_id = a.id FORMAT `Null` SETTINGS join_algorithm = 'hash'; 0 rows in set. Elapsed: 0.817 sec. Processed 101.00 million rows, 3.67 GB (123.57 million rows/s., 4.49 GB/s.)

右侧表较大

SELECT * FROM actors AS a JOIN roles AS r ON a.id = r.actor_id FORMAT `Null` SETTINGS join_algorithm = 'hash'; 0 rows in set. Elapsed: 5.063 sec. Processed 101.00 million rows, 3.67 GB (19.95 million rows/s., 724.03 MB/s.)

我们可以查询 query_log 系统表,以检查最后两次查询运行的运行时统计信息

SELECT query, formatReadableTimeDelta(query_duration_ms / 1000) AS query_duration, formatReadableSize(memory_usage) AS memory_usage, formatReadableQuantity(read_rows) AS read_rows, formatReadableSize(read_bytes) AS read_data FROM clusterAllReplicas(default, system.query_log) WHERE (type = 'QueryFinish') AND hasAll(tables, ['imdb_large.actors', 'imdb_large.roles']) ORDER BY initial_query_start_time DESC LIMIT 2 FORMAT Vertical; Row 1: ────── query: SELECT * FROM actors AS a JOIN roles AS r ON a.id = r.actor_id FORMAT `Null` SETTINGS join_algorithm = 'hash' query_duration: 5 seconds memory_usage: 8.95 GiB read_rows: 101.00 million read_data: 3.41 GiB Row 2: ────── query: SELECT * FROM roles AS r JOIN actors AS a ON r.actor_id = a.id FORMAT `Null` SETTINGS join_algorithm = 'hash' query_duration: 0 seconds memory_usage: 716.44 MiB read_rows: 101.00 million read_data: 3.41 GiB

正如预期的那样,右侧使用较小的 actors 表的 join 查询比右侧使用较大的 roles 表的 join 查询消耗的内存少得多。

请注意,指示的峰值内存使用量 8.95 GiB 和 716.44 MiB 大于两次查询运行的各自右侧表未压缩大小 2.63 GiB 和 21.81 MiB。原因是哈希表大小最初是选择的,并且根据 join 键列的类型和特定的内部哈希表缓冲区大小的倍数动态增加memory_usage 指标计算为哈希表保留的总内存,尽管它可能没有完全填满。

对于两个查询的执行,ClickHouse 读取的总行数(和数据)相同:来自 roles 表的 1 亿行 + 来自 actors 表的 100 万行。但是,右侧使用较大的 roles 表的 join 查询速度慢了五倍。这是因为默认的哈希 join 对于将右表的行插入哈希表不是线程安全的。因此,哈希表的填充阶段在单个线程中运行。我们可以通过内省实际的查询管道来仔细检查这一点。

查询管道

我们可以通过使用 ClickHouse 命令行客户端(快速安装说明在此)来内省哈希 join 查询的 ClickHouse 查询管道。我们使用 EXPLAIN 语句来打印以 DOT 图形描述语言描述的查询管道的图形,并使用 Graphviz dot 以 pdf 格式渲染图形

./clickhouse client --host ekyyw56ard.us-west-2.aws.clickhouse.cloud --secure --port 9440 --password --database=imdb_large --query " EXPLAIN pipeline graph=1, compact=0 SELECT * FROM actors a JOIN roles r ON a.id = r.actor_id SETTINGS max_threads = 2, join_algorithm = 'hash';" | dot -Tpdf > pipeline.pdf

我们使用与上面的抽象图相同的带圈数字注释了管道,稍微简化了主要阶段的名称,并添加了两个 join 表,以便对齐两个图表

hash_pipeline.png

我们可以看到查询管道 ① 以两个并行流阶段开始(因为 max_threads 设置为 2),用于从右侧表流式传输数据,然后是用于填充哈希表的单个填充阶段。另外两个并行流阶段 ② 和两个并行 join 阶段 ③ 用于从左侧表流式传输和 join 数据。

如上所述,默认的哈希 join 算法对于将右侧表的行插入哈希表不是线程安全的。因此,上面的管道中使用了一个调整大小阶段,用于将从右侧表流式传输数据的两个线程减少到单线程填充阶段。这可能会成为查询运行时的瓶颈。如果右侧表很大 - 请参阅我们上面的两个查询运行,其中右侧 join 具有大型 roles 表的查询速度慢了五倍。

但是, ClickHouse 版本 22.7 以来,对于大型表,通过使用并行哈希算法,可以显着加快从右侧表构建哈希表的过程。

并行哈希 join

描述

并行哈希 join 算法是哈希 join 的一种变体,它拆分输入数据以并发构建多个哈希表,从而以更高的内存开销为代价来加速 join。我们在下面草绘了此算法的 查询管道

parallel_hash.png

上图显示

① 来自右表的所有数据都流式传输(由于 max_threads = 2,因此由 2 个线程并行)到内存中。数据是块状流式传输的。通过将哈希函数应用于每行的 join 键,将来自每个流式块的行拆分为 2 个 bucket(max_threads = 2)。我们在上图中用橙色和蓝色颜色草绘了这一点。并行地,每个 bucket 使用单个线程填充一个内存哈希表。请注意,用于将行拆分为 bucket 的哈希函数与哈希表内部使用的哈希函数不同。

② 来自左表的数据流式传输(由于 max_threads = 2,因此由 2 个线程并行),并且步骤 ① 中的相同“bucket 哈希函数”应用于每行的 join 键,以确定相应的哈希表,并且行 ③ 通过在相应的哈希表中查找来进行 join。

请注意,max_threads 设置决定了并发哈希表的数量。稍后我们将通过内省具体的查询管道来演示这一点。

支持的 join 类型

INNER 和 LEFT join 类型和除 ASOF 之外的所有 严格性 设置都支持

示例

我们现在将比较同一查询的哈希算法和并行哈希算法的运行时和峰值内存消耗。

右侧表较大的哈希 join

SELECT * FROM actors AS a JOIN roles AS r ON a.id = r.actor_id FORMAT `Null` SETTINGS join_algorithm = 'hash'; 0 rows in set. Elapsed: 5.385 sec. Processed 101.00 million rows, 3.67 GB (18.76 million rows/s., 680.77 MB/s.)

右侧表较大的并行哈希 join

SELECT * FROM actors AS a JOIN roles AS r ON a.id = r.actor_id FORMAT `Null` SETTINGS join_algorithm = 'parallel_hash'; 0 rows in set. Elapsed: 2.639 sec. Processed 101.00 million rows, 3.67 GB (38.28 million rows/s., 1.39 GB/s.)

我们检查最后两次查询运行的运行时统计信息

SELECT query, formatReadableTimeDelta(query_duration_ms / 1000) AS query_duration, formatReadableSize(memory_usage) AS memory_usage, formatReadableQuantity(read_rows) AS read_rows, formatReadableSize(read_bytes) AS read_data FROM clusterAllReplicas(default, system.query_log) WHERE (type = 'QueryFinish') AND hasAll(tables, ['imdb_large.actors', 'imdb_large.roles']) ORDER BY initial_query_start_time DESC LIMIT 2 FORMAT Vertical; Row 1: ────── query: SELECT * FROM actors AS a JOIN roles AS r ON a.id = r.actor_id FORMAT `Null` SETTINGS join_algorithm = 'parallel_hash' query_duration: 2 seconds memory_usage: 18.29 GiB read_rows: 101.00 million read_data: 3.41 GiB Row 2: ────── query: SELECT * FROM actors AS a JOIN roles AS r ON a.id = r.actor_id FORMAT `Null` SETTINGS join_algorithm = 'hash' query_duration: 5 seconds memory_usage: 8.86 GiB read_rows: 101.00 million read_data: 3.41 GiB

并行哈希 join 比标准哈希 join 大约快 100%,但峰值内存消耗是后者的两倍多,尽管对于两个查询,读取的行数和数据量以及右侧表的大小都相同。

内存消耗高得多的原因是查询在具有 30 个 CPU 核心的节点上运行,因此 max_threads 设置为 30。这意味着,正如我们将在下面演示的那样,使用了 30 个并发哈希表。如前所述,每个哈希表的大小最初是选择的,并根据 join 键列的类型和特定的内部哈希表缓冲区大小的倍数动态增加。哈希表很可能没有完全填满,但 memory_usage 指标计算为哈希表保留的总内存。

查询管道

我们提到 max_threads 设置决定了并发哈希表的数量。我们可以通过内省具体的查询管道来验证这一点。

首先,我们内省 max_threads 设置为 2 的并行哈希 join 查询的查询管道

./clickhouse client --host ekyyw56ard.us-west-2.aws.clickhouse.cloud --secure --port 9440 --password --database=imdb_large --query " EXPLAIN pipeline graph=1, compact=0 SELECT * FROM actors a JOIN roles r ON a.id = r.actor_id SETTINGS max_threads = 2, join_algorithm = 'parallel_hash';" | dot -Tpdf > pipeline.pdf

与往常一样,我们使用与上面的抽象图相同的带圈数字注释了管道,稍微简化了主要阶段的名称,并添加了两个 join 表,以便对齐两个图表

parallel_hash_pipeline_1.png

我们可以看到存在两个并发填充阶段,用于并行地使用来自右侧表的数据填充两个哈希表。此外,使用两个并发 join 阶段来 join(以哈希表查找的形式)来自左侧表的数据。

请注意,上面的查询管道中使用了调整大小阶段,用于定义所有填充阶段和所有 join 阶段之间的显式连接:所有 join 阶段都应等待所有填充阶段完成。

接下来,我们内省 max_threads 设置为 4 的并行哈希 join 查询的查询管道

./clickhouse client --host ekyyw56ard.us-west-2.aws.clickhouse.cloud --secure --port 9440 --password --database=imdb_large --query " EXPLAIN pipeline graph=1, compact=0 SELECT * FROM actors a JOIN roles r ON a.id = r.actor_id SETTINGS max_threads = 4, join_algorithm = 'parallel_hash';" | dot -Tpdf > pipeline.pdf

parallel_hash_pipeline_2.png

现在,使用四个并发填充阶段来并行地使用来自右侧表的数据填充四个哈希表。并且使用四个并发 join 阶段来 join 来自左侧表的数据。

原始 PR 中的测量表明,加速几乎与并行度呈线性相关。

Grace 哈希 join

描述

上面描述的哈希 join 和并行哈希 join 算法都很快,但都受内存限制。如果右侧表不适合主内存,ClickHouse 将引发 OOM 异常。在这种情况下,ClickHouse 用户可以牺牲性能并使用(部分)合并算法(在下一篇文章中描述),该算法在合并表数据之前(部分地)将表数据排序到外部存储中。

幸运的是,ClickHouse 22.12 引入了另一种名为“grace hash”的 join 算法,该算法不受内存限制,但基于哈希表,因此不需要对数据进行任何排序。这克服了(部分)合并算法的一些性能挑战。

该算法利用两阶段方法来 join 数据。我们的实现与经典算法描述略有不同,以便适应我们的 查询管道。下图显示了第一阶段

grace_hash_1.png

① 来自右表的所有数据都以块状流式传输(由于 max_threads = 2,因此由 2 个线程并行)到内存中。通过将哈希函数应用于每行的 join 键,将来自每个流式块的行拆分为 3 个 bucket(由于 grace_hash_join_initial_buckets = 3)。我们在上图中用橙色、蓝色和绿色颜色草绘了这一点。内存哈希表填充了来自第一个(橙色)bucket 的行。来自 right_table 的其他两个(绿色和蓝色)bucket 的 join 通过将它们保存到临时存储来延迟。

请注意,如果内存哈希表增长超过内存限制(由 max_bytes_in_join 设置),ClickHouse 会动态增加 bucket 的数量,并重新计算每行的分配 bucket。任何不属于当前 bucket 的行都会被刷新并重新分配。

另请注意,ClickHouse 向上舍入 grace_hash_join_initial_buckets 的设置值到最接近的 2 的幂。因此,由于 3 向上舍入为 4,因此使用了 4 个初始 bucket。我们在图表中使用 3 个 bucket 是为了便于阅读,并且与 4 个 bucket 的内部工作方式没有本质区别。

② 来自左表的数据由 2 个线程并行流式传输(max_threads = 2),并且步骤 ① 中的相同“bucket 哈希函数”应用于每行的 join 键,以确定相应的 bucket。对应于第一个 bucket 的行 ③ 进行 join(因为相应的哈希表在内存中)。其他 bucket 的 join 通过将它们保存到临时存储来延迟。

步骤 ① 和 ② 中的关键是“bucket 哈希函数”将始终如一地将值分配给同一个 bucket,从而有效地对数据进行分区,并通过分解来解决问题。

在第二阶段,ClickHouse 处理磁盘上的剩余 bucket。剩余的 bucket 顺序处理。以下两个图表草绘了这一点。第一个图表显示了蓝色 bucket 如何首先被处理。第二个图表显示了最终绿色 bucket 的处理。

grace_hash_2.png

grace_hash_3.png

① ClickHouse 从右侧表数据为每个 bucket 构建哈希表。

同样,如果 ClickHouse 耗尽内存,它会动态增加 bucket 的数量。

② 一旦从右侧表 bucket 构建了哈希表,ClickHouse 就会流式传输来自相应左侧表 bucket 的数据,并 ③ 完成此对的 join。

请注意,在此阶段,由于某些行在动态增加 bucket 数量之前已保存到临时存储,因此可能存在一些属于当前 bucket 之外的其他 bucket 的行。在这种情况下,ClickHouse 会将它们保存到新的实际 bucket 中并进一步处理它们。

此过程为所有剩余的 bucket 重复进行。

支持的 join 类型

INNER 和 LEFT join 类型和除 ASOF 之外的所有 严格性 设置都支持

示例

下面我们比较了使用哈希 join 和 grace 哈希 join 算法运行的相同 join 查询的运行时和峰值内存消耗。

右侧表较大的哈希 join

SELECT * FROM actors AS a JOIN roles AS r ON a.id = r.actor_id FORMAT `Null` SETTINGS join_algorithm = 'hash'; 0 rows in set. Elapsed: 5.038 sec. Processed 101.00 million rows, 3.67 GB (20.05 million rows/s., 727.61 MB/s.)

右侧表较大的 Grace 哈希 join

SELECT * FROM actors AS a JOIN roles AS r ON a.id = r.actor_id FORMAT `Null` SETTINGS join_algorithm = 'grace_hash', grace_hash_join_initial_buckets = 3; 0 rows in set. Elapsed: 13.117 sec. Processed 101.00 million rows, 3.67 GB (7.70 million rows/s., 279.48 MB/s.)

我们获取最后两次查询运行的运行时统计信息

SELECT query, formatReadableTimeDelta(query_duration_ms / 1000) AS query_duration, formatReadableSize(memory_usage) AS memory_usage, formatReadableQuantity(read_rows) AS read_rows, formatReadableSize(read_bytes) AS read_data FROM clusterAllReplicas(default, system.query_log) WHERE (type = 'QueryFinish') AND hasAll(tables, ['imdb_large.actors', 'imdb_large.roles']) ORDER BY initial_query_start_time DESC LIMIT 2 FORMAT Vertical; Row 1: ────── query: SELECT * FROM actors AS a JOIN roles AS r ON a.id = r.actor_id FORMAT `Null` SETTINGS join_algorithm = 'grace_hash', grace_hash_join_initial_buckets = 3 query_duration: 13 seconds memory_usage: 3.72 GiB read_rows: 101.00 million read_data: 3.41 GiB Row 2: ────── query: SELECT * FROM actors AS a JOIN roles AS r ON a.id = r.actor_id FORMAT `Null` SETTINGS join_algorithm = 'hash' query_duration: 5 seconds memory_usage: 8.96 GiB read_rows: 101.00 million read_data: 3.41 GiB

正如预期的那样,哈希 join 更快。但是,grace 哈希 join 仅消耗了一半的峰值主内存。

可以通过增加 grace_hash_join_initial_buckets 设置来进一步减少 grace 哈希 join 的主内存消耗。我们通过使用值 8 重新运行查询来演示这一点,用于 grace_hash_join_initial_buckets 设置

SELECT * FROM actors AS a JOIN roles AS r ON a.id = r.actor_id FORMAT `Null` SETTINGS join_algorithm = 'grace_hash', grace_hash_join_initial_buckets = 8; 0 rows in set. Elapsed: 16.366 sec. Processed 101.00 million rows, 3.67 GB (6.17 million rows/s., 224.00 MB/s.)

让我们检查最后两次查询运行的运行时统计信息

SELECT query, formatReadableTimeDelta(query_duration_ms / 1000) AS query_duration, formatReadableSize(memory_usage) AS memory_usage, formatReadableQuantity(read_rows) AS read_rows, formatReadableSize(read_bytes) AS read_data FROM clusterAllReplicas(default, system.query_log) WHERE (type = 'QueryFinish') AND hasAll(tables, ['imdb_large.actors', 'imdb_large.roles']) ORDER BY initial_query_start_time DESC LIMIT 2 FORMAT Vertical; Row 1: ────── query: SELECT * FROM actors AS a JOIN roles AS r ON a.id = r.actor_id FORMAT `Null` SETTINGS join_algorithm = 'grace_hash', grace_hash_join_initial_buckets = 8 query_duration: 16 seconds memory_usage: 2.10 GiB read_rows: 101.00 million read_data: 3.41 GiB Row 2: ────── query: SELECT * FROM actors AS a JOIN roles AS r ON a.id = r.actor_id FORMAT `Null` SETTINGS join_algorithm = 'grace_hash', grace_hash_join_initial_buckets = 3 query_duration: 13 seconds memory_usage: 3.72 GiB read_rows: 101.00 million read_data: 3.41 GiB

使用 8 个初始 bucket 运行 grace 哈希 join 比使用 3 个初始 bucket 运行的 grace 哈希 join 大约减少了 70% 的主内存消耗。为了牺牲更高的执行时间,可以通过增加 bucket 数量来相当线性地减少内存消耗。

请注意,如前所述并在下面演示的那样,ClickHouse 始终将 grace_hash_join_initial_buckets 的设置值向上舍入到最接近的 2 的幂。因此,grace_hash_join_initial_buckets 设置为 3 的查询运行实际上使用了 4 个初始 bucket。

查询管道

我们内省 max_threads 设置为 2 且 grace_hash_join_initial_buckets 设置为 3 的 grace 哈希 join 查询的查询管道

./clickhouse client --host ekyyw56ard.us-west-2.aws.clickhouse.cloud --secure --port 9440 --password --database=imdb_large --query " EXPLAIN pipeline graph=1, compact=0 SELECT * FROM actors AS a JOIN roles AS r ON a.id = r.actor_id FORMAT `Null` SETTINGS max_threads = 2, join_algorithm = 'grace_hash', grace_hash_join_initial_buckets = 3';" | dot -Tpdf > pipeline.pdf

添加的带圈数字和稍微简化的主要阶段名称以及添加的两个 join 表用于与上面的抽象图对齐

grace_hash_pipeline.png

我们看到 ① 使用两个并行流阶段(max_threads=2),来自右侧表的数据流式传输到内存中。我们还看到使用了两个并行填充阶段来填充内存哈希表。另外两个并行流阶段 ② 和两个并行 join 阶段 ③ 用于从左侧表流式传输和 join 数据。延迟阶段表示某些 join 阶段被推迟。

但是,我们在查询管道中看不到 bucket 的数量,因为 bucket 的创建是动态的,并且取决于内存压力,ClickHouse 会根据需要动态增加数量。所有 bucket 都在 Delayed…Transform 阶段处理。

为了内省创建和处理的 bucket 的数量,我们需要内省 grace 哈希 join 查询的实际执行情况,方法是要求 ClickHouse 在执行期间将跟踪级别的日志发送到 ClickHouse 命令行客户端。

我们使用 max_threads 设置为 2 和 grace_hash_join_initial_buckets 值为 3 执行 grace 哈希 join 查询(请注意 send_logs_level='trace' 设置)

./clickhouse client --host ea3kq2u4fm.eu-west-1.aws.clickhouse.cloud --secure --password --database=imdb_large --send_logs_level='trace' --query " SELECT * FROM actors AS a JOIN roles AS r ON a.id = r.actor_id FORMAT Null SETTINGS max_threads = 2, join_algorithm = 'grace_hash', grace_hash_join_initial_buckets = 3;" ... ... GraceHashJoin: Initialize 4 buckets ... GraceHashJoin: Joining file bucket 0 ... ... imdb_large.actors ...: Reading approx. 1000000 rows with 2 streams ... ... imdb_large.roles ...: Reading approx. 100000000 rows with 2 streams ... ... GraceHashJoin: Joining file bucket 1 ... GraceHashJoin: Loaded bucket 1 with 250000(/25000823) rows ... ... GraceHashJoin: Joining file bucket 2 ... GraceHashJoin: Loaded bucket 2 with 250000(/24996460) rows ... ... GraceHashJoin: Joining file bucket 3 ... GraceHashJoin: Loaded bucket 3 with 250000(/25000742) rows ... ... GraceHashJoin: Finished loading all 4 buckets ...

我们现在可以看到创建了 4 个(而不是 3 个)初始 bucket。因为,如前所述,ClickHouse 始终将 grace_hash_join_initial_buckets 的设置值向上舍入到最接近的 2 的幂。我们还看到如何为每个表使用 2 个并行流阶段来读取表的行。两个表的第一个对应的 bucket(上面跟踪日志消息中的 bucket 0)立即被 join。

其他 3 个 bucket 被写入磁盘,稍后顺序加载以进行 join。我们看到来自两个表的 100 万行和 1 亿行被均匀拆分 - 每个 bucket 分别为 25 万行和约 2500 万行。

为了进行比较,我们使用 max_threads 设置为 4 和 grace_hash_join_initial_buckets 值为 8 执行 grace 哈希 join 查询

./clickhouse client --host ea3kq2u4fm.eu-west-1.aws.clickhouse.cloud --secure --password --database=imdb_large --send_logs_level='trace' --query " SELECT * FROM actors AS a JOIN roles AS r ON a.id = r.actor_id FORMAT Null SETTINGS max_threads = 4, join_algorithm = 'grace_hash', grace_hash_join_initial_buckets = 8;" ... ... GraceHashJoin: Initialize 8 buckets ... GraceHashJoin: Joining file bucket 0 ... ... imdb_large.actors ...: Reading approx. 1000000 rows with 4 streams ... ... imdb_large.roles ...: Reading approx. 100000000 rows with 4 streams ... ... GraceHashJoin: Joining file bucket 1 ... GraceHashJoin: Loaded bucket 1 with 125000(/12502068) rows ... ... GraceHashJoin: Joining file bucket 2 ... GraceHashJoin: Loaded bucket 2 with 125000(/12498406) rows ... ... GraceHashJoin: Joining file bucket 3 ... GraceHashJoin: Loaded bucket 3 with 125000(/12502699) rows ... ... GraceHashJoin: Joining file bucket 4 ... GraceHashJoin: Loaded bucket 4 with 125000(/12498074) rows ... ... GraceHashJoin: Joining file bucket 5 ... GraceHashJoin: Loaded bucket 5 with 125000(/12498755) rows ... ... GraceHashJoin: Joining file bucket 6 ... GraceHashJoin: Loaded bucket 6 with 125000(/12498054) rows ... ... GraceHashJoin: Joining file bucket 7 ... GraceHashJoin: Loaded bucket 7 with 125000(/12498043) rows ... ... GraceHashJoin: Finished loading all 8 buckets ...

我们可以看到创建了 8 个初始 bucket,并且为每个表使用了 4 个并行流阶段来读取表的行。

总结

这篇博客文章详细描述和比较了 3 种基于内存哈希表的 ClickHouse join 算法。

哈希 join 算法速度快且是最通用的算法,支持所有 join 类型和严格性设置,但内存哈希表的创建是单线程的,如果右侧表非常大,则可能成为瓶颈。

并行哈希 join 算法对于大型右侧表可以通过并发构建多个哈希表来更快,但需要更多内存。

Grace 哈希 join 算法是一个非内存受限的版本,它将输入数据拆分为 bucket,其中一些 bucket 在内存中顺序处理之前被卸载到磁盘。

下图总结了来自本文的所有 join 查询运行(max_threads 设置为 30,右侧表较大)的内存消耗和执行时间

summary.png

在本系列的下一部分中,我们将探索 ClickHouse 中可用的其余 3 种 join 算法

  • 完全排序合并 join
  • 部分合并 join
  • 直接 Join

敬请期待!

立即开始使用 ClickHouse Cloud,并获得 300 美元信用额度。在 30 天试用期结束时,您可以继续使用按需付费计划,或 联系我们 以了解有关批量折扣的更多信息。访问我们的 定价页面 了解详情。

分享此文章

订阅我们的新闻通讯

随时了解功能发布、产品路线图、支持和云产品信息!
正在加载表单...
关注我们
X imageSlack imageGitHub image
Telegram imageMeetup imageRss image
©2025ClickHouse, Inc. 总部位于加利福尼亚州湾区和荷兰阿姆斯特丹。