博客 / 工程

ClickHouse 内幕:Join 原理 - 完全排序合并 Join,部分合并 Join - MergingSortedTransform

author avatar
Tom Schreiber
2023 年 5 月 24 日 - 41 分钟阅读

立即开始使用 ClickHouse Cloud,并获得 300 美元信用额度。要了解有关我们基于用量的折扣的更多信息,请联系我们或访问我们的定价页面

header.png

此博文是系列文章的一部分

在我们的上一篇文章中,我们开始了对为 ClickHouse 开发的 6 种不同 Join 算法的探索。提醒一下:这些算法决定了 Join 查询的计划和执行方式。ClickHouse 可以配置为自适应地选择并在运行时动态更改要使用的 Join 算法。具体取决于资源可用性和使用情况。但是,ClickHouse 也允许用户指定所需的 Join 算法。此图表根据它们的相对内存消耗和执行时间概述了这些算法

algorithms.png

在我们的上一篇文章中,我们详细描述了比较了上面图表中基于内存中哈希表的三种 ClickHouse Join 算法

提醒一下:Hash Join并行 Hash Join 速度很快,但受内存限制。来自右侧表的 Join 数据需要适合内存。Grace Hash Join 是一种非内存限制版本,它将数据临时溢出到磁盘,而无需对数据进行任何排序,因此克服了其他将数据溢出到磁盘并需要事先对数据进行排序的 Join 算法的一些性能挑战。这使我们进入了这篇文章的主题。

我们将在本文中继续探索 ClickHouse Join 算法,并描述上面图表中基于外部排序的两种算法

这两种算法都不受内存限制,并且使用一种 Join 策略,该策略要求在识别 Join 匹配项之前,首先按 Join 键的顺序对 Join 数据进行排序。

对于完全排序合并 Join,然后通过交错线性扫描和合并来自两个表的已排序行块流来 Join 两个表中的行: full_sorting_merge_abstract.png

对于部分合并 Join,通过将左侧表的每个已排序行块与右侧表的已排序行块合并来 Join 两个表中的行: partial_merge_abstract.png

完全排序合并 Join 可以利用一个或两个表的物理行顺序,从而可以跳过排序。在这种情况下,Join 性能可以与上面图表中的哈希 Join 算法相媲美,同时通常需要明显更少的内存。否则,完全排序合并 Join 需要在识别 Join 匹配项之前完全排序表的行。排序可以在内存中(如果数据适合)或外部磁盘上进行。

部分合并 Join 经过优化,可在 Join 大型表时最大限度地减少内存使用量。右表始终首先通过外部排序完全排序。为了最大限度地减少在识别 Join 匹配项时在内存中处理的数据量,在磁盘上创建了特殊的索引结构。左表始终在内存中按块排序。但是,如果左表的物理行顺序与 Join 键排序顺序匹配,则内存中的 Join 匹配项识别会更有效率。

我们将在下一篇文章中完成对 ClickHouse Join 算法的探索,在其中我们将描述上面图表中 ClickHouse 最快的 Join 算法

测试设置

我们正在使用与上一篇文章中介绍的相同的两个表和 ClickHouse Cloud 服务实例。

对于所有示例查询运行,我们都使用 max_threads 的默认设置。执行查询的节点具有 30 个 CPU 核心,因此默认的 max_threads 设置为 30。对于所有 查询管道 可视化,为了保持它们的简洁性和可读性,我们使用设置 max_threads = 2 人为地限制 ClickHouse 查询管道中使用的并行度级别。

现在让我们继续探索 ClickHouse Join 算法。

完全排序合并 Join

描述

完全排序合并 Join 算法是集成到 ClickHouse 查询管道中的经典 排序-合并 Join

ClickHouse 版本的排序-合并 Join 提供了几个性能优化。

  • 可以在任何排序和合并操作之前,通过彼此的 Join 键过滤 Join 表,以最大限度地减少已处理的数据量。
  • 如果一个或两个表的物理行顺序与 Join 键排序顺序匹配,则将跳过相应表的排序阶段。

我们稍后将详细讨论这些优化。

下图显示了未应用任何优化的完全排序合并 Join 算法的一般版本

full_sorting_merge_1.png

① 来自右表的全部数据通过 2 个流阶段(因为 max_threads = 2)并行地按块流式传输到内存中。两个并行排序阶段按 Join 键列值对每个流式块中的行进行排序。这些已排序的块通过两个并行溢出阶段溢出到 临时存储

② 与 ① 并发,来自左表的全部数据通过 2 个线程( max_threads = 2)并行地按块流式传输,并且与 ① 类似,每个块都经过排序并溢出到磁盘。

③ 每个表使用一个流,从磁盘读取已排序的块并进行合并排序,并通过合并(交错扫描)两个已排序的流来识别 Join 匹配项。

支持的 Join 类型

INNER、LEFT、RIGHT 和 FULL Join 类型,适用于 ALL 和 ANY 严格性,均受支持

示例

为了首先演示未应用任何优化的完全排序合并 Join 算法的一般版本,我们使用一个 Join 查询,该查询查找所有名字在电影中用作角色名的演员。使用 max_rows_in_set_to_optimize_join=0 设置,我们禁用在 Join 之前通过彼此的 Join 键过滤 Join 表的优化

SELECT *
FROM actors AS a
JOIN roles AS r ON a.first_name = r.role
FORMAT `Null`
SETTINGS join_algorithm = 'full_sorting_merge', max_rows_in_set_to_optimize_join = 0;

0 rows in set. Elapsed: 11.559 sec. Processed 101.00 million rows, 3.67 GB (8.74 million rows/s., 317.15 MB/s.)

通常,我们可以查询 query_log 系统表,以便检查上次查询运行的运行时统计信息。请注意,我们使用 一些 来自 ProfileEvents 列的键,以便检查 Join 处理期间外部排序溢出到磁盘的数据量

SELECT
    query,
    formatReadableTimeDelta(query_duration_ms / 1000) AS query_duration,
    formatReadableSize(memory_usage) AS memory_usage,
    formatReadableQuantity(read_rows) AS read_rows,
    formatReadableSize(read_bytes) AS read_data,
    formatReadableSize(ProfileEvents['ExternalProcessingUncompressedBytesTotal']) AS data_spilled_to_disk_uncompressed,
    formatReadableSize(ProfileEvents['ExternalProcessingCompressedBytesTotal']) AS data_spilled_to_disk_compressed
FROM clusterAllReplicas(default, system.query_log)
WHERE (type = 'QueryFinish') AND hasAll(tables, ['imdb_large.actors', 'imdb_large.roles'])
ORDER BY initial_query_start_time DESC
LIMIT 1
FORMAT Vertical;


Row 1:
──────
query:                             SELECT *
                                   FROM actors AS a
                                   JOIN roles AS r ON a.first_name = r.role
                                   FORMAT `Null`
                                   SETTINGS join_algorithm = 'full_sorting_merge',
                                   max_rows_in_set_to_optimize_join = 0;
query_duration:                    11 seconds
memory_usage:                      4.71 GiB
read_rows:                         101.00 million
read_data:                         3.41 GiB
data_spilled_to_disk_uncompressed: 0.00 B
data_spilled_to_disk_compressed:   0.00 B

我们可以看到 ClickHouse 没有将任何数据溢出到磁盘,并且完全在内存中处理了 Join,峰值使用量为 4.71 GiB。

执行上述查询的 ClickHouse 节点具有 120 GiB 的可用主内存

SELECT formatReadableSize(getSetting('max_memory_usage'));


┌─formatReadableSize(getSetting('max_memory_usage'))─┐
│ 120.00 GiB                                         │
└────────────────────────────────────────────────────┘

ClickHouse 配置为在要排序的数据量超过可用主内存的一半时使用外部排序

SELECT formatReadableSize(getSetting('max_bytes_before_external_sort'));


┌─formatReadableSize(getSetting('max_bytes_before_external_sort'))─┐
│ 60.00 GiB                                                        │
└──────────────────────────────────────────────────────────────────┘

我们通过使用查询的 SETTINGS 子句将 max_bytes_before_external_sort 设置为较低的阈值,来触发上述 Join 示例查询的外部排序

SELECT *
FROM actors AS a
JOIN roles AS r ON a.first_name = r.role
FORMAT `Null`
SETTINGS join_algorithm = 'full_sorting_merge', max_rows_in_set_to_optimize_join = 0, max_bytes_before_external_sort = '100M';


0 rows in set. Elapsed: 12.267 sec. Processed 132.92 million rows, 4.82 GB (10.84 million rows/s., 393.25 MB/s.)

我们检查最后两个 Join 示例的运行时统计信息

SELECT
    query,
    formatReadableTimeDelta(query_duration_ms / 1000) AS query_duration,
    formatReadableSize(memory_usage) AS memory_usage,
    formatReadableQuantity(read_rows) AS read_rows,
    formatReadableSize(read_bytes) AS read_data,
    formatReadableSize(ProfileEvents['ExternalProcessingUncompressedBytesTotal']) AS data_spilled_to_disk_uncompressed,
    formatReadableSize(ProfileEvents['ExternalProcessingCompressedBytesTotal']) AS data_spilled_to_disk_compressed
FROM clusterAllReplicas(default, system.query_log)
WHERE (type = 'QueryFinish') AND hasAll(tables, ['imdb_large.actors', 'imdb_large.roles'])
ORDER BY initial_query_start_time DESC
LIMIT 2
FORMAT Vertical;


Row 1:
──────
query:                             SELECT *
                                   FROM actors AS a
                                   JOIN roles AS r ON a.first_name = r.role
                                   FORMAT `Null`
                                   SETTINGS join_algorithm = 'full_sorting_merge',
                                   max_rows_in_set_to_optimize_join = 0,
                                   max_bytes_before_external_sort = '100M'
query_duration:                    12 seconds
memory_usage:                      3.49 GiB
read_rows:                         132.92 million
read_data:                         4.49 GiB
data_spilled_to_disk_uncompressed: 1.79 GiB
data_spilled_to_disk_compressed:   866.36 MiB

Row 2:
──────
query:                             SELECT *
                                   FROM actors AS a
                                   JOIN roles AS r ON a.first_name = r.role
                                   FORMAT `Null`
                                   SETTINGS join_algorithm = 'full_sorting_merge',
                                   max_rows_in_set_to_optimize_join = 0
query_duration:                    11 seconds
memory_usage:                      4.71 GiB
read_rows:                         101.00 million
read_data:                         3.41 GiB
data_spilled_to_disk_uncompressed: 0.00 B
data_spilled_to_disk_compressed:   0.00 B

我们可以看到,对于使用较低的 max_bytes_before_external_sort 设置的查询运行,使用的内存更少,并且数据溢出到磁盘,表明使用了外部排序。

请注意,此查询的 read_rows 指标目前对于具有外部处理的管道而言并不精确。

查询管道和跟踪日志

与本博客系列的上一部分中所做的一样,我们可以通过使用 ClickHouse 命令行客户端(快速安装说明在此处)来内省示例 Join 查询(max_threads 设置为 2)的 ClickHouse 查询管道。我们使用 EXPLAIN 语句来打印以 DOT 图形描述语言描述的查询管道图,并使用 Graphviz dot 以 pdf 格式渲染图形

clickhouse client --host ekyyw56ard.us-west-2.aws.clickhouse.cloud --secure --port 9440 --password <PASSWORD> --database=imdb_large --query "
EXPLAIN pipeline graph=1, compact=0
SELECT *
FROM actors AS a
JOIN roles AS r ON a.first_name = r.role
SETTINGS max_threads = 2, join_algorithm = 'full_sorting_merge', max_rows_in_set_to_optimize_join = 0
;" | dot -Tpdf > pipeline.pdf

我们使用与上面的抽象图相同的带圈数字注释了管道,稍微简化了主要阶段的名称,并添加了两个 Join 表,以便对齐两个图: full_sorting_merge_2.png

我们看到查询管道与我们上面的抽象版本匹配。

请注意,如果块数据排序的峰值内存体积保持在外部排序的配置阈值以下,则溢出阶段将被忽略,并且已排序的块将立即进行合并排序和 Join。

另请注意,块数据排序的峰值内存体积仅与两个 Join 表中的数据总量略有相关,并且更多地取决于查询管道中配置并行度级别。通常,数据在 ClickHouse 中流式处理:数据以并行和按块的方式流式传输到(内存中)查询引擎。流式数据块由特定的查询管道阶段按顺序和并行处理,以便一旦某些表示(部分)查询结果的块可用,它们就会从内存中流出并返回给查询的发送者。

为了观察外部排序和数据溢出到磁盘,我们分别内省两个示例 Join 查询运行的实际执行情况,方法是要求 ClickHouse 在执行期间将跟踪级别日志发送到 ClickHouse 命令行客户端。

首先,我们获取具有较低的外部排序阈值的查询运行的跟踪日志

clickhouse client --host ea3kq2u4fm.eu-west-1.aws.clickhouse.cloud --secure --password <PASSWORD> --database=imdb_large --send_logs_level='trace' --query "
SELECT *
FROM actors AS a
JOIN roles AS r ON a.first_name = r.role
FORMAT `Null`
SETTINGS join_algorithm = 'full_sorting_merge', max_rows_in_set_to_optimize_join = 0, max_bytes_before_external_sort = '100M';"

    ...
... imdb_large.actors ... : Reading approx. 1000000 rows with 6 streams
    ...
... imdb_large.roles ... : Reading approx. 100000000 rows with 30 streams
    ...
... MergeSortingTransform: ... writing part of data into temporary file …
    ... 
... MergingSortedTransform: Merge sorted … blocks, … rows in … sec., … rows/sec., … MiB/sec
    ... 
... MergeJoinAlgorithm: Finished processing in … seconds, left: 16 blocks, 1000000 rows; right: 1529 blocks, 100000000 rows, max blocks loaded to memory: 3
    ...

在我们分析上面的跟踪日志条目之前,快速提醒一下,我们对所有示例查询运行使用 max_threads 的默认设置。该设置控制并行度查询管道中的级别。执行查询的节点具有 30 个 CPU 核心,因此默认的 max_threads 设置为 30。对于所有查询管道可视化,为了保持它们的简洁性和可读性,我们使用设置 max_threads = 2 人为地限制 ClickHouse 查询管道中使用的并行度级别。

我们可以看到如何使用 6 个和 30 个并行流将来自两个表的数据按块流式传输到查询引擎中。因为 max_threads 设置为 30。请注意,对于包含 100 万行的 actors 表,仅使用了 6 个并行流,而不是 30 个。原因是设置 merge_tree_min_rows_for_concurrent_read_for_remote_filesystem(对于云,对于 OSS,设置是 merge_tree_min_rows_for_concurrent_read)。此设置配置单个查询执行线程应至少读取/处理的最小行数。默认值为 163,840 行。并且 100 万行 / 163,840 行 = 6 个线程。对于包含 1 亿行的 roles 表,结果将为 610 个线程,这高于我们配置的最大线程数 30。

此外,我们看到了 MergeSortingTransform 管道阶段(其名称在上面的图中简化为“spill”)的条目,表明(已排序块的)数据已溢出到磁盘上的临时存储。MergingSortedTransform 阶段(上图中的“merge sort”)的条目总结了从临时存储读取已排序块后的合并排序。

最终的 MergeJoinAlgorithm 条目总结了 Join 处理:来自左表的 100 万行以块为单位(通过 6 个并行流)以 16 个块(每个块约 62500 行 - 接近 默认块大小)的形式流式传输。来自右表的 1 亿行以块为单位(通过 30 个并行流)以 1529 个块(每个块约 65400 行)的形式流式传输。在流式处理期间,在 merge join 阶段,内存中同时最多有 3 个块,其中包含具有相同 Join 键的行。这些行的 笛卡尔积是示例查询中 INNER Join 的 ALL 严格性所必需的。这在内存中完成。

接下来,我们获取没有降低外部排序阈值的查询运行的跟踪日志

clickhouse client --host ea3kq2u4fm.eu-west-1.aws.clickhouse.cloud --secure --password <PASSWORD> --database=imdb_large --send_logs_level='trace' --query "
SELECT *
FROM actors AS a
JOIN roles AS r ON a.first_name = r.role
FORMAT `Null`
SETTINGS join_algorithm = 'full_sorting_merge', max_rows_in_set_to_optimize_join = 0;"

    ...
... imdb_large.actors ... : Reading approx. 1000000 rows with 6 streams
    ...
... imdb_large.roles ... : Reading approx. 100000000 rows with 30 streams
    ...
... MergingSortedTransform: Merge sorted … blocks, … rows in … sec., … rows/sec., … MiB/sec
    ... 
... MergeJoinAlgorithm: Finished processing in … seconds, left: 16 blocks, 1000000 rows; right: 1529 blocks, 100000000 rows, max blocks loaded to memory: 3
    ...

日志条目看起来与之前运行的降低外部排序阈值的日志条目相似。只是缺少 MergeSortingTransform(“spill”)阶段,因为块数据排序的峰值内存体积保持在外部排序的默认阈值以下。因此,溢出阶段被忽略,并且已排序的块立即进行了合并排序和 Join。

扩展

上一篇文章中,我们解释了 max_threads 设置控制查询管道中的并行度级别。为了便于阅读,我们使用设置 max_threads=2 人为地限制查询管道的可视化并行度级别。

现在,我们内省 max_threads 设置为 4 的完全排序合并 Join 查询的查询管道

clickhouse client --host ekyyw56ard.us-west-2.aws.clickhouse.cloud --secure --port 9440 --password <PASSWORD> --database=imdb_large --query "
EXPLAIN pipeline graph=1, compact=0
SELECT *
FROM actors AS a
JOIN roles AS r ON a.first_name = r.role
SETTINGS max_threads = 4, join_algorithm = 'full_sorting_merge', max_rows_in_set_to_optimize_join = 0
;" | dot -Tpdf > pipeline.pdf

full_sorting_merge_3.png

现在,每个表使用四个并行流、排序和溢出阶段。这加快了数据块的(外部)排序。每个表的合并排序阶段和最终的合并 Join 阶段需要保持单线程才能正常工作。但是,ClickHouse 为排序-合并 Join 提供了一些额外的性能优化。我们接下来将讨论这些内容。

优化

在 Join 之前,通过彼此的 Join 键值过滤表

可以在任何排序和合并操作之前,通过彼此的 Join 键过滤 Join 表,以便最大限度地减少需要排序和合并的数据量。为此,如果可能(见下文),ClickHouse 构建一个内存中集合,其中包含右表的(唯一)Join 键列值,并使用此集合来过滤掉左表中所有可能没有 Join 匹配项的行。反之亦然。如果一个表比另一个表小得多,并且表的唯一 Join 键列值适合内存,则此方法特别有效。

Hash Join 在这种情况下也会表现良好。但是,完全排序合并 Join 对左表和右表都以相同的方式工作,并且在两个表都大于可用内存的情况下,它将自动回退到外部排序。此优化试图将 Hash Join 性能引入到完全排序合并 Join,以用于此特定用例。max_rows_in_set_to_optimize_join 设置控制此优化。将其设置为 0 将禁用它。默认值为 100,000。此值指定两个表集合在一起的最大允许大小(以条目数计)。这意味着,如果两个集合在一起都低于阈值,则优化将应用于两个表。如果两个集合在一起都高于阈值,则仍然可能其中一个集合低于阈值,并且优化将仅应用于一个表。正如我们将在下面的跟踪日志中看到的那样,ClickHouse 将按顺序尝试为两个表构建集合,并在超出限制时还原并跳过构建集合。

我们的示例 Join 查询分别通过 first_namerole 列 Join 两个表

SELECT *
FROM actors AS a
JOIN roles AS r ON a.first_name = r.role
FORMAT `Null`
SETTINGS join_algorithm = 'full_sorting_merge';

我们检查(较小的)左表中唯一 Join 键列值的数量

SELECT countDistinct(first_name)
FROM actors;


┌─uniqExact(first_name)─┐
│                109993 │
└───────────────────────┘

我们检查(较大的)右表中唯一 Join 键列值的数量

SELECT countDistinct(role)
FROM roles;


┌─uniqExact(role)─┐
│          999999 │
└─────────────────┘

使用 max_rows_in_set_to_optimize_join 设置的默认值 100,000,优化将不会应用于任何表。

为了演示,我们使用 max_rows_in_set_to_optimize_join 的默认值执行示例查询

SELECT *
FROM actors AS a
JOIN roles AS r ON a.first_name = r.role
FORMAT `Null`
SETTINGS join_algorithm = 'full_sorting_merge';


0 rows in set. Elapsed: 11.602 sec. Processed 101.00 million rows, 3.67 GB (8.71 million rows/s., 315.97 MB/s.)

现在,我们使用 max_rows_in_set_to_optimize_join 设置为 200,000 执行示例查询。请注意,此限制对于为两个表构建集合仍然太低。但是,它允许为较小的左表构建集合,这是此优化的主要思想,即如果一个表比另一个表小得多,并且表的唯一 Join 键列值适合内存,则此方法特别有效

SELECT *
FROM actors AS a
JOIN roles AS r ON a.first_name = r.role
FORMAT `Null`
SETTINGS join_algorithm = 'full_sorting_merge', max_rows_in_set_to_optimize_join = 200_000;


0 rows in set. Elapsed: 2.156 sec. Processed 101.00 million rows, 3.67 GB (46.84 million rows/s., 1.70 GB/s.)

我们已经可以看到更快的执行时间。让我们检查两个查询运行的运行时统计信息,以便查看更多详细信息

SELECT
    query,
    formatReadableTimeDelta(query_duration_ms / 1000) AS query_duration,
    formatReadableSize(memory_usage) AS memory_usage,
    formatReadableQuantity(read_rows) AS read_rows,
    formatReadableSize(read_bytes) AS read_data,
    formatReadableSize(ProfileEvents['ExternalProcessingUncompressedBytesTotal']) AS data_spilled_to_disk_uncompressed,
    formatReadableSize(ProfileEvents['ExternalProcessingCompressedBytesTotal']) AS data_spilled_to_disk_compressed
FROM clusterAllReplicas(default, system.query_log)
WHERE (type = 'QueryFinish') AND hasAll(tables, ['imdb_large.actors', 'imdb_large.roles'])
ORDER BY initial_query_start_time DESC
LIMIT 2
FORMAT Vertical;


Row 1:
──────
query:                             SELECT *
                                   FROM actors AS a
                                   JOIN roles AS r ON a.first_name = r.role
                                   FORMAT `Null`
                                   SETTINGS join_algorithm = 'full_sorting_merge',
                                   max_rows_in_set_to_optimize_join = 200_000;
query_duration:                    2 seconds
memory_usage:                      793.30 MiB
read_rows:                         101.00 million
read_data:                         3.41 GiB
data_spilled_to_disk_uncompressed: 0.00 B
data_spilled_to_disk_compressed:   0.00 B


Row 2:
──────
query:                             SELECT *
                                   FROM actors AS a
                                   JOIN roles AS r ON a.first_name = r.role
                                   FORMAT `Null`
                                   SETTINGS join_algorithm = 'full_sorting_merge';
query_duration:                    11 seconds
memory_usage:                      4.71 GiB
read_rows:                         101.00 million
read_data:                         3.41 GiB
data_spilled_to_disk_uncompressed: 0.00 B
data_spilled_to_disk_compressed:   0.00 B

我们可以看到预过滤器优化的效果:执行时间快 5 倍,峰值内存消耗减少约 6 倍。

现在,我们使用启用的优化来内省查询管道

clickhouse client --host ekyyw56ard.us-west-2.aws.clickhouse.cloud --secure --port 9440 --password <PASSWORD> --database=imdb_large --query "
EXPLAIN pipeline graph=1, compact=0
SELECT *
FROM actors AS a
JOIN roles AS r ON a.first_name = r.role
SETTINGS max_threads = 2, join_algorithm = 'full_sorting_merge';" | dot -Tpdf > pipeline.pdf

full_sorting_merge_4.png

与未应用任何优化的完全排序合并 Join 算法的一般版本的管道相比,我们可以看到额外的阶段(在上图中以蓝色和绿色着色)。这些阶段负责在 Join 之前通过彼此的 Join 键值过滤两个表

如果可能,则使用两个并行的蓝色 CreatingSetsOnTheFlyTransform 阶段来构建一个内存中集合,其中包含右表的(唯一)Join 键列值。

然后,此集合由两个(因为 max_threads 设置为 2)并行蓝色 FilterBySetOnTheFlyTransform 阶段使用,用于过滤掉左表中所有可能没有 Join 匹配项的行。

如果可能,则使用两个并行的绿色 CreatingSetsOnTheFlyTransform 阶段来构建一个内存中集合,其中包含左表的(唯一)Join 键列值。

然后,此集合由两个并行绿色 FilterBySetOnTheFlyTransform 阶段使用,用于过滤掉右表中所有可能没有 Join 匹配项的行。

在从 Join 键列完全构建这些集合之前,并行地流式传输包含所有必需列的行块,绕过过滤器优化,以便按其 Join 键对每个块中的行进行排序,并(可能)将它们溢出到磁盘。过滤器仅在集合准备就绪时才开始工作。这就是为什么还有两个 ReadHeadBalancedProcessor 阶段。这些阶段确保从一开始就从两个表流式传输数据(在集合准备就绪之前),其大小与它们的总大小成比例,以防止在小表可用于过滤大表之前,主要处理大表的数据的情况。

为了内省这些附加阶段的执行情况,我们检查查询执行的跟踪日志

clickhouse client --host ea3kq2u4fm.eu-west-1.aws.clickhouse.cloud --secure --password <PASSWORD> --database=imdb_large --send_logs_level='trace' --query "
SELECT *
FROM actors AS a
JOIN roles AS r ON a.first_name = r.role
FORMAT `Null`
SETTINGS join_algorithm = 'full_sorting_merge', max_rows_in_set_to_optimize_join = 200_000;"


    ...
... imdb_large.actors ... : Reading approx. 1000000 rows with 6 streams
    ...
... imdb_large.roles ... : Reading approx. 100000000 rows with 30 streams
    ...
... CreatingSetsOnTheFlyTransform: Create set and filter Right joined stream: set limit exceeded, give up building set, after reading 577468 rows and using 96.00 MiB
    ...
... CreatingSetsOnTheFlyTransform: Create set and filter Left joined stream: finish building set for [first_name] with 109993 rows, set size is 6.00 MiB
    ...
... FilterBySetOnTheFlyTransform: Finished create set and filter right joined stream by [role]: consumed 3334144 rows in total, 573440 rows bypassed, result 642125 rows, 80.74% filtered
... FilterBySetOnTheFlyTransform: Finished create set and filter right joined stream by [role]: consumed 3334144 rows in total, 573440 rows bypassed, result 642125 rows, 80.74% filtered
    ... 
... MergingSortedTransform: Merge sorted … blocks, … rows in … sec., … rows/sec., … MiB/sec
    ... 
... MergeJoinAlgorithm: Finished processing in 3.140038835 seconds, left: 16 blocks, 1000000 rows; right: 207 blocks, 13480274 rows, max blocks loaded to memory: 3
    ...

我们看到如何使用 6 个和 30 个并行流将来自两个表的数据流式传输到查询引擎中。

接下来,我们看到了 CreatingSetsOnTheFlyTransform 阶段的条目,表明包含右表的(唯一)Join 键列值的内存中集合无法构建,因为条目数将超过 max_rows_in_set_to_optimize_join 设置的配置阈值 200000。

另一个 CreatingSetsOnTheFlyTransform 阶段的条目显示,包含左表的(唯一)Join 键列值的集合可以成功构建。此集合用于过滤来自右表的行,如 30 个 FilterBySetOnTheFlyTransform 阶段的条目所示(我们仅显示前两个并省略其余部分)。30,因为 ClickHouse 使用 30 个并行流阶段从右表流式传输行,并使用 30 个并行 FilterBySetOnTheFlyTransform 阶段来过滤 30 个流。

利用物理行顺序

如果一个或两个连接表的物理行顺序与连接键排序顺序匹配,则将跳过相应表的全排序合并连接算法的排序阶段。

我们可以通过检查连接查询的查询管道来验证这一点,该连接查询使用与两个表的排序键匹配的连接键。首先,我们检查两个连接表的排序键

SELECT
    name AS table,
    sorting_key
FROM system.tables
WHERE database = 'imdb_large';


┌─table───────┬─sorting_key───────────────────────┐
│ actors      │ id, first_name, last_name, gender │
│ roles       │ actor_id, movie_id                │
└─────────────┴───────────────────────────────────┘

我们使用一个连接查询,通过连接 actors 表的 id 列和 roles 表的 actor_id 列来查找每个演员的所有角色。这些连接键是表排序键的前缀,允许 ClickHouse 通过按照行在磁盘上的存储顺序读取两个表中的行,从而跳过全排序合并算法的排序阶段。

我们检查此查询的查询管道

clickhouse client --host ekyyw56ard.us-west-2.aws.clickhouse.cloud --secure --port 9440 --password <PASSWORD> --database=imdb_large --query "
EXPLAIN pipeline graph=1, compact=0
SELECT *
FROM actors AS a
JOIN roles AS r ON a.id = r.actor_id
SETTINGS max_threads = 2, join_algorithm = 'full_sorting_merge', max_rows_in_set_to_optimize_join = 0, max_bytes_before_external_sort = '100M';" | dot -Tpdf > pipeline.pdf

full_sorting_merge_5.png

我们看到查询管道 ① ② 以每个表两个并行流阶段开始(因为 max_threads 设置为 2),这些阶段按顺序将行块式地从两个表流式传输到查询引擎中。

请注意,排序和溢出阶段是如何缺失的。已经排序的块按表进行合并排序,并且 ③ 通过合并(交错扫描)两个排序流来识别连接匹配项。

我们运行缺少排序和溢出阶段的查询。请注意,此按顺序读取优化目前仅在禁用 max_rows_in_set_to_optimize_join 设置的情况下应用。有一个待处理的 PR,如果 ClickHouse 可以按顺序读取数据,则会自动禁用该设置。ClickHouse 不支持按顺序优化和预过滤同时进行。使用上面提到的 PR,将优先考虑按顺序优化

SELECT *
FROM actors AS a
JOIN roles AS r ON a.id = r.actor_id
FORMAT `Null`
SETTINGS join_algorithm = 'full_sorting_merge', max_rows_in_set_to_optimize_join = 0;

0 rows in set. Elapsed: 7.280 sec. Processed 101.00 million rows, 3.67 GB (13.87 million rows/s., 503.56 MB/s.)

为了比较,我们运行相同的查询,但不禁用 max_rows_in_set_to_optimize_join 设置来强制排序

SELECT *
FROM actors AS a
JOIN roles AS r ON a.id = r.actor_id
FORMAT `Null`
SETTINGS join_algorithm = 'full_sorting_merge';

0 rows in set. Elapsed: 7.542 sec. Processed 101.00 million rows, 3.67 GB (13.39 million rows/s., 486.09 MB/s.)

为了进一步比较,我们运行相同的查询,但不禁用 max_rows_in_set_to_optimize_join 设置并降低 max_bytes_before_external_sort 值来强制外部排序

SELECT *
FROM actors AS a
INNER JOIN roles AS r ON a.id = r.actor_id
FORMAT `Null`
SETTINGS join_algorithm = 'full_sorting_merge', max_bytes_before_external_sort = '100M';

0 rows in set. Elapsed: 8.332 sec. Processed 139.35 million rows, 5.06 GB (16.72 million rows/s., 606.93 MB/s.)

我们检查最后三个查询运行的运行时统计信息

SELECT
    query,
    formatReadableTimeDelta(query_duration_ms / 1000) AS query_duration,
    formatReadableSize(memory_usage) AS memory_usage,
    formatReadableQuantity(read_rows) AS read_rows,
    formatReadableSize(read_bytes) AS read_data,
    formatReadableSize(ProfileEvents['ExternalProcessingUncompressedBytesTotal']) AS data_spilled_to_disk_uncompressed,
    formatReadableSize(ProfileEvents['ExternalProcessingCompressedBytesTotal']) AS data_spilled_to_disk_compressed
FROM clusterAllReplicas(default, system.query_log)
WHERE (type = 'QueryFinish') AND hasAll(tables, ['imdb_large.actors', 'imdb_large.roles'])
ORDER BY initial_query_start_time DESC
LIMIT 3
FORMAT Vertical;


Row 1:
──────
query:                             SELECT *
                                   FROM actors AS a
                                   INNER JOIN roles AS r ON a.id = r.actor_id
                                   FORMAT `Null`
                                   SETTINGS join_algorithm = 'full_sorting_merge',                                                                                                          
                                   max_bytes_before_external_sort = '100M';
query_duration:                    8 seconds
memory_usage:                      3.56 GiB
read_rows:                         139.35 million
read_data:                         4.71 GiB
data_spilled_to_disk_uncompressed: 1.62 GiB
data_spilled_to_disk_compressed:   1.09 GiB


Row 2:
──────
query:                             SELECT *
                                   FROM actors AS a
                                   JOIN roles AS r ON a.id = r.actor_id
                                   FORMAT `Null`
                                   SETTINGS join_algorithm = 'full_sorting_merge';
query_duration:                    7 seconds
memory_usage:                      5.07 GiB
read_rows:                         101.00 million
read_data:                         3.41 GiB
data_spilled_to_disk_uncompressed: 0.00 B
data_spilled_to_disk_compressed:   0.00 B


Row 3:
──────
query:                             SELECT *
                                   FROM actors AS a
                                   JOIN roles AS r ON a.id = r.actor_id
                                   FORMAT `Null`
                                   SETTINGS join_algorithm = 'full_sorting_merge',                                    
                                   max_rows_in_set_to_optimize_join = 0;
query_duration:                    7 seconds
memory_usage:                      497.88 MiB
read_rows:                         101.00 million
read_data:                         3.41 GiB
data_spilled_to_disk_uncompressed: 0.00 B
data_spilled_to_disk_compressed:   0.00 B

来自第 3 行的查询运行,其中跳过了排序和溢出阶段,具有最快的执行时间和非常低的内存使用率。因为来自两个表的数据都以块式和按顺序流式传输通过查询引擎,所以内存中同时只有少量数据块,并且只需要合并并流式输出到查询的发送者。

我们可以看到,对于来自第 2 行的强制排序的查询运行,排序发生在内存中,因为没有数据溢出到磁盘。此查询运行使用的内存量是来自第 3 行的无排序运行的 10 倍。

对于来自第 1 行的强制外部排序的查询运行,执行时间最慢,但与来自第 2 行的强制内存排序的查询运行相比,内存使用率更低。

当只有一个表的物理行顺序与连接键排序顺序匹配时,也会应用按顺序流式传输优化。我们可以通过检查连接查询的查询管道来演示这一点,其中左表通过与表的磁盘物理行顺序匹配的列连接,但右表并非如此

clickhouse client --host ekyyw56ard.us-west-2.aws.clickhouse.cloud --secure --port 9440 --password <PASSWORD> --database=imdb_large --query "
EXPLAIN pipeline graph=1, compact=0
SELECT *
FROM actors AS a
JOIN roles AS r ON a.id = r.movie_id
SETTINGS max_threads = 2, join_algorithm = 'full_sorting_merge', max_rows_in_set_to_optimize_join = 0;" | dot -Tpdf > pipeline.pdf

full_sorting_merge_6.png

来自左表的行由两个流按顺序并行流式传输到查询引擎中。这些已排序流的排序和溢出阶段缺失。相反,右表的阶段指示了排序和(可能)溢出。

部分合并 Join

描述

部分合并连接是集成到 ClickHouse 查询管道中的 排序-合并连接 的变体。经典的排序-合并连接首先按连接键对两个连接表进行完全排序,然后合并排序后的结果。ClickHouse 部分合并连接经过优化,可在连接大表时最大限度地减少内存使用,并且仅通过外部排序首先对右表进行完全排序。为了最大限度地减少内存中处理的数据量,它在磁盘上创建了 min-max 索引。左表始终按块和内存中排序。但是,如果左表的物理行顺序与连接键排序顺序匹配,则内存中连接匹配项的识别效率更高。

此图草绘了 ClickHouse 如何实现部分合并连接的详细信息

partial_merge_1.png

查询管道看起来与 ClickHouse 哈希连接算法的管道非常相似。这不是巧合。部分合并连接正在重用哈希连接管道,因为与哈希连接一样,它也具有构建和扫描阶段。请记住,哈希连接首先从右表构建哈希表,然后扫描左表。类似地,部分合并连接首先构建右表的排序版本,然后扫描左表

① 首先,来自右表的所有数据都通过 2 个流(因为 max_threads = 2块式并行流式传输到内存中。通过填充阶段,每个流式传输块中的行按连接键列值排序,并与每个排序块的 min-max 索引一起溢出到 临时存储。min-max 索引为每个排序块存储该块包含的最小和最大连接键值。这些 min-max 索引在步骤 ② 中用于最大限度地减少识别连接匹配项时内存中处理的数据量。

② 然后,来自左表的所有数据都通过 2 个流 (max_threads = 2) 块式并行流式传输。每个块都按连接键即时排序,然后 ③ 与来自右表的磁盘上的排序块进行匹配。min-max 索引用于仅从磁盘加载可能包含连接匹配项的右表块。

这种连接处理策略非常节省内存。与连接表的大小和物理行顺序无关。在上面的步骤 ① 中,在写入临时存储之前,只有来自右表的少量块同时流式传输到内存中。在步骤 ② 中,只有来自左表的少量块同时流式传输到内存中。在步骤 ① 中创建的 min-max 索引有助于最大限度地减少从临时存储加载到内存中以识别连接匹配项的右表块的数量。

请注意,如果左表的物理行顺序与连接键排序顺序匹配,则这种基于 min-max 索引跳过不匹配的右表块的方法最有效。

但是,当左表的数据块具有连接键值的一般分布时,使用部分合并连接算法的成本最高。因为如果左表的每个块都包含连接键值的一般分布的较大子集,那么来自右表的排序块的 min-max 索引就无济于事,并且实际上在两个表的块之间创建了笛卡尔积:对于左表的每个块,都会从磁盘加载来自右表的大量排序块。

支持的 Join 类型

支持具有 ALL 严格性的 INNER、LEFT、RIGHT、FULL 连接类型以及具有 ANY 和 SEMI 严格性的 INNER、LEFT 连接类型

示例

我们使用部分合并算法运行我们的示例连接查询(使用作为连接表排序键前缀的连接键,以受益于上述基于 min-max 索引的性能优化)

SELECT *
FROM actors AS a
INNER JOIN roles AS r ON a.id = r.actor_id
FORMAT `Null`
SETTINGS join_algorithm = 'partial_merge';

0 rows in set. Elapsed: 33.796 sec. Processed 101.00 million rows, 3.67 GB (2.99 million rows/s., 108.47 MB/s.)

现在我们运行相同的查询,但左表的磁盘物理顺序不同。我们创建了按非连接键列排序的 actors 表的副本。这意味着行是随机连接键顺序。如上所述,这是部分合并连接执行时间的最坏情况

SELECT *
FROM actors_unsorted AS a
INNER JOIN roles AS r ON a.id = r.actor_id
FORMAT `Null`
SETTINGS join_algorithm = 'partial_merge';


0 rows in set. Elapsed: 44.872 sec. Processed 101.00 million rows, 3.67 GB (2.25 million rows/s., 81.70 MB/s.)

与之前的运行相比,执行时间慢了 36%。

为了进一步比较,我们使用全排序合并算法运行相同的查询。为了与部分合并算法进行公平比较,我们强制外部排序。通过禁用全排序合并算法的“按顺序流式传输优化”(不将 max_rows_in_set_to_optimize_join 设置为 0)。并通过降低 max_bytes_before_external_sort

SELECT *
FROM actors AS a
INNER JOIN roles AS r ON a.id = r.actor_id
FORMAT `Null`
SETTINGS join_algorithm = 'full_sorting_merge', max_bytes_before_external_sort = '100M';

0 rows in set. Elapsed: 7.381 sec. Processed 139.35 million rows, 5.06 GB (18.88 million rows/s., 685.11 MB/s.)

我们检查最后三个查询运行的运行时统计信息

SELECT
    query,
    formatReadableTimeDelta(query_duration_ms / 1000) AS query_duration,
    formatReadableSize(memory_usage) AS memory_usage,
    formatReadableQuantity(read_rows) AS read_rows,
    formatReadableSize(read_bytes) AS read_data,
    formatReadableSize(ProfileEvents['ExternalProcessingUncompressedBytesTotal']) AS data_spilled_to_disk_uncompressed,
    formatReadableSize(ProfileEvents['ExternalProcessingCompressedBytesTotal']) AS data_spilled_to_disk_compressed
FROM clusterAllReplicas(default, system.query_log)
WHERE (type = 'QueryFinish') AND (hasAll(tables, ['imdb_large.actors', 'imdb_large.roles']) OR hasAll(tables, ['imdb_large.actors_unsorted', 'imdb_large.roles']))
ORDER BY initial_query_start_time DESC
LIMIT 3
FORMAT Vertical;


Row 1:
──────
query:                             SELECT *
                                   FROM actors AS a
                                   INNER JOIN roles AS r ON a.id = r.actor_id
                                   FORMAT `Null`
                                   SETTINGS join_algorithm = 'full_sorting_merge',                                                                       
                                   max_bytes_before_external_sort = '100M';
query_duration:                    7 seconds
memory_usage:                      3.54 GiB
read_rows:                         139.35 million
read_data:                         4.71 GiB
data_spilled_to_disk_uncompressed: 1.62 GiB
data_spilled_to_disk_compressed:   1.09 GiB

Row 2:
──────
query:                             SELECT *
                                   FROM actors_unsorted AS a
                                   INNER JOIN roles AS r ON a.id = r.actor_id
                                   FORMAT `Null`
                                   SETTINGS join_algorithm = 'partial_merge';
query_duration:                    44 seconds
memory_usage:                      2.20 GiB
read_rows:                         101.00 million
read_data:                         3.41 GiB
data_spilled_to_disk_uncompressed: 5.27 GiB
data_spilled_to_disk_compressed:   3.52 GiB

Row 3:
──────
query:                             SELECT *
                                   FROM actors AS a
                                   INNER JOIN roles AS r ON a.id = r.actor_id
                                   FORMAT `Null`
                                   SETTINGS join_algorithm = 'partial_merge';
query_duration:                    33 seconds
memory_usage:                      2.21 GiB
read_rows:                         101.00 million
read_data:                         3.41 GiB
data_spilled_to_disk_uncompressed: 5.27 GiB
data_spilled_to_disk_compressed:   3.52 GiB

我们可以看到在第 2 行和第 3 行中,部分合并连接的两次运行具有相同数量的已用内存和溢出到磁盘的数据。但是,如上文详细解释的那样,对于第 3 行中左表的物理行顺序与连接键顺序匹配的运行,执行速度更快。

即使在(人为强制)对连接表进行完全外部排序的情况下,第 1 行中全排序合并连接的执行速度也几乎是第 3 行中部分合并连接的执行速度的 5 倍。部分合并连接确实像其设计的那样使用了更少的内存

查询管道和跟踪日志

我们检查 max_threads 设置为 2 的部分合并连接示例的查询管道

clickhouse client --host ekyyw56ard.us-west-2.aws.clickhouse.cloud --secure --port 9440 --password <PASSWORD> --database=imdb_large --query "
EXPLAIN pipeline graph=1, compact=0
SELECT *
FROM actors AS a
JOIN roles AS r ON a.first_name = r.role
SETTINGS max_threads = 2, join_algorithm = 'partial_merge';" | dot -Tpdf > pipeline.pdf

圆圈数字、主要阶段的略微简化名称和添加的两个连接表用于与上面的抽象图对齐

partial_merge_2.png

实际查询管道反映了我们上面的抽象版本。如前所述,部分排序合并连接正在重用 哈希连接管道,因为与哈希连接一样,它也具有构建和扫描阶段:部分合并连接首先构建右表的排序版本,然后扫描左表。

由于上述管道重用,来自右表的块的排序以及与来自左表的块的排序合并在管道中没有直接可见。

为了检查这些阶段的执行情况,我们检查查询执行的跟踪日志

clickhouse client --host ea3kq2u4fm.eu-west-1.aws.clickhouse.cloud --secure --password <PASSWORD> --database=imdb_large --send_logs_level='trace' --query "
SELECT *
FROM actors AS a
JOIN roles AS r ON a.first_name = r.role
FORMAT `Null`
SETTINGS join_algorithm = 'partial_merge';"


    ...
... imdb_large.actors ... : Reading approx. 1000000 rows with 6 streams
    ...
... imdb_large.roles ... : Reading approx. 100000000 rows with 30 streams 
    ...
... MergingSortedTransform: Merge sorted 1528 blocks, 100000000 rows …
    ...

我们可以看到如何使用 6 个和 30 个并行流将来自两个表的数据块式流式传输到查询引擎中。

MergingSortedTransform 条目总结了连接处理:来自右表的 1528 个数据块(包含 1 亿行)已排序,稍后与来自左表的块进行合并连接。请注意,1528 个数据块(1 亿行)相当于每个块约 65445 行,这对应于默认块大小

总结

这篇博文详细描述和比较了两种基于外部排序的 ClickHouse 连接算法。

全排序合并连接不受内存限制,并且基于内存或外部排序,并且可以利用连接表的物理行顺序并跳过排序阶段。在这种情况下,连接性能可以与 哈希连接算法相媲美,同时通常需要明显更少的内存。

部分合并连接经过优化,可在连接大表时最大限度地减少内存使用,并且始终首先通过外部排序对右表进行完全排序。左表也始终按块在内存中排序。如果左表的物理行顺序与连接键排序顺序匹配,则连接匹配过程的运行效率更高。

此图表总结并比较了本文部分连接查询运行的内存使用率和执行时间。我们始终在具有 30 个 CPU 核心(因此 max_threads 设置为 30)的节点上运行相同的连接查询,连接相同的数据,并将较大的表放在右侧

comparison.png

① 在此运行中,全排序合并连接跳过了排序和溢出阶段,因为两个连接表的物理行顺序都与连接键排序顺序匹配。从而实现了最快的执行时间和显著最低的内存使用率。 ② 通过对两个连接表进行内存排序,全排序合并连接具有最高的内存消耗,而 ③ 使用外部排序代替内存排序,内存消耗降低了,但牺牲了执行速度的降低。 ④ 部分合并连接始终通过外部排序对右表的数据进行排序。我们看到,从所有使用外部排序的连接查询运行来看,此算法的内存使用率最低。这正是此算法优化的目标,但代价是执行速度相对较低。左表数据也始终按块和内存中排序。但是 ⑤ 我们可以看到,如果左表的物理行顺序与连接键顺序不匹配,则执行速度最差。

在我们的下一篇文章中,我们将介绍 ClickHouse 最快的连接算法:直接连接。

敬请期待!

立即开始使用 ClickHouse Cloud,并获得 300 美元的信用额度。在 30 天试用期结束时,继续使用按需付费计划,或联系我们以了解有关我们基于用量的折扣的更多信息。访问我们的定价页面了解详情。

分享这篇文章

订阅我们的新闻资讯

随时了解功能发布、产品路线图、支持和云产品!
正在加载表单...
关注我们
X imageSlack imageGitHub image
Telegram imageMeetup imageRss image
©2025ClickHouse, Inc. 总部位于加利福尼亚州湾区和荷兰阿姆斯特丹。