DoubleCloud 即将停止运营。利用限时免费迁移服务,迁移到 ClickHouse。立即联系我们 ->->

博客 / 工程

ClickHouse 连接器内部原理 - 完全排序合并连接、部分合并连接 - MergingSortedTransform

author avatar
Tom Schreiber
2023年5月24日

立即开始使用 ClickHouse Cloud 并获得 300 美元的信用额度。要了解有关我们基于用量的折扣的更多信息,请联系我们或访问我们的定价页面

header.png

此博文是系列文章的一部分

在我们的上一篇文章中,我们开始探索为 ClickHouse 开发的 6 种不同的连接算法。提醒一下:这些算法决定了连接查询的计划和执行方式。ClickHouse 可以配置为自适应地选择并在运行时动态更改要使用的连接算法。具体取决于资源可用性和使用情况。但是,ClickHouse 也允许用户指定他们自己想要的连接算法。此图表根据其相对内存消耗和执行时间概述了这些算法

algorithms.png

在我们的上一篇文章中,我们详细描述并比较了上面图表中基于内存中哈希表的三种 ClickHouse 连接算法

提醒一下:**哈希连接**和**并行哈希连接**速度很快,但受内存限制。来自右侧表的连接数据需要放入内存。**Grace 哈希连接**是一个不受内存限制的版本,它将数据临时溢出到磁盘,而无需对数据进行任何排序,因此克服了其他将数据溢出到磁盘并需要预先排序数据的连接算法的一些性能挑战。这让我们来到了这篇文章。

我们将在本文中继续探索 ClickHouse 连接算法,并描述上面图表中基于外部排序的两种算法

这两种算法都不受内存限制,并使用一种连接策略,该策略要求在识别连接匹配之前先按连接键对连接数据进行排序。

使用**完全排序合并连接**,然后通过交错线性扫描和合并来自两个表的排序行块流来连接两个表的行:full_sorting_merge_abstract.png

使用**部分合并连接**,通过将来自左侧表的每个排序行块与来自右侧表的排序行块合并来连接两个表的行:partial_merge_abstract.png

完全排序合并连接可以利用一个或两个表的物理行顺序,从而跳过排序。在这种情况下,连接性能可以与上面图表中的哈希连接算法相媲美,同时通常需要更少的内存。否则,完全排序合并连接需要在识别连接匹配之前完全对表的行进行排序。排序可以在内存中进行(如果数据适合)或在磁盘上进行。

部分合并连接针对在连接大型表时最大程度地减少内存使用进行了优化。右侧表始终首先通过外部排序进行完全排序。为了最大程度地减少在识别连接匹配时内存中处理的数据量,会在磁盘上创建特殊的索引结构。左侧表始终在内存中逐块排序。但是,如果左侧表的物理行顺序与连接键排序顺序匹配,则内存中连接匹配的识别效率更高。

我们将在下一篇文章中完成对 ClickHouse 连接算法的探索,我们将描述上面图表中 ClickHouse 最快的连接算法

测试设置

我们正在使用我们在上一篇文章中介绍的相同的两个表和ClickHouse Cloud服务实例。

对于所有示例查询运行,我们使用max_threads的默认设置。执行查询的节点具有 30 个 CPU 内核,因此默认的max_threads设置为 30。对于所有查询管道可视化,为了使其简洁易读,我们使用设置max_threads = 2人为地限制了 ClickHouse 查询管道中使用的并行级别。

现在让我们继续探索 ClickHouse 连接算法。

完全排序合并连接

描述

完全排序合并连接算法是集成到 ClickHouse 查询管道中的经典排序合并连接

ClickHouse 版本的排序合并连接提供了多种性能优化。

  • 在任何排序和合并操作之前,可以通过彼此的连接键过滤连接表,以最大程度地减少处理的数据量。
  • 如果一个或两个表的物理行顺序与连接键排序顺序匹配,则将跳过相应表(s)的排序阶段。

我们稍后将详细讨论这些优化。

下图显示了未应用任何优化的完全排序合并连接算法的通用版本

full_sorting_merge_1.png

① 所有来自右侧表的数据由 2 个流阶段(因为max_threads = 2)并行地以块方式流式传输到内存。两个并行排序阶段按连接键列值对每个流式传输的块中的行进行排序。这些排序的块由两个并行溢出阶段溢出到临时存储

② 与 ① 同时,所有来自左侧表的数据由 2 个线程(max_threads = 2)并行地以块方式流式传输,类似于 ①,每个块都进行排序并溢出到磁盘。

③ 每个表使用一个数据流,从磁盘读取排序后的数据块并进行归并排序,通过合并(交错扫描)两个排序的数据流来识别连接匹配。

支持的连接类型

支持 INNER、LEFT、RIGHT 和 FULL 连接类型,适用于 ALL 和 ANY 严格性支持

示例

为了首先演示完整排序合并连接算法的通用版本(没有任何优化应用),我们使用一个连接查询来查找所有其名字用作电影中角色名称的演员。使用 `max_rows_in_set_to_optimize_join=0` 设置,我们禁用在连接之前根据彼此的连接键过滤连接表的优化。

SELECT *
FROM actors AS a
JOIN roles AS r ON a.first_name = r.role
FORMAT `Null`
SETTINGS join_algorithm = 'full_sorting_merge', max_rows_in_set_to_optimize_join = 0;

0 rows in set. Elapsed: 11.559 sec. Processed 101.00 million rows, 3.67 GB (8.74 million rows/s., 317.15 MB/s.)

像往常一样,我们可以查询 query_log 系统表以检查上次查询运行的运行时统计信息。请注意,我们使用 一些 来自 ProfileEvents 列的键来检查在连接处理期间外部排序溢出到磁盘的数据量。

SELECT
    query,
    formatReadableTimeDelta(query_duration_ms / 1000) AS query_duration,
    formatReadableSize(memory_usage) AS memory_usage,
    formatReadableQuantity(read_rows) AS read_rows,
    formatReadableSize(read_bytes) AS read_data,
    formatReadableSize(ProfileEvents['ExternalProcessingUncompressedBytesTotal']) AS data_spilled_to_disk_uncompressed,
    formatReadableSize(ProfileEvents['ExternalProcessingCompressedBytesTotal']) AS data_spilled_to_disk_compressed
FROM clusterAllReplicas(default, system.query_log)
WHERE (type = 'QueryFinish') AND hasAll(tables, ['imdb_large.actors', 'imdb_large.roles'])
ORDER BY initial_query_start_time DESC
LIMIT 1
FORMAT Vertical;


Row 1:
──────
query:                             SELECT *
                                   FROM actors AS a
                                   JOIN roles AS r ON a.first_name = r.role
                                   FORMAT `Null`
                                   SETTINGS join_algorithm = 'full_sorting_merge',
                                   max_rows_in_set_to_optimize_join = 0;
query_duration:                    11 seconds
memory_usage:                      4.71 GiB
read_rows:                         101.00 million
read_data:                         3.41 GiB
data_spilled_to_disk_uncompressed: 0.00 B
data_spilled_to_disk_compressed:   0.00 B

我们可以看到 ClickHouse 没有将任何数据溢出到磁盘,并且完全在内存中处理了连接,峰值使用量为 4.71 GiB。

执行上述查询的 ClickHouse 节点有 120 GiB 的主内存可用。

SELECT formatReadableSize(getSetting('max_memory_usage'));


┌─formatReadableSize(getSetting('max_memory_usage'))─┐
│ 120.00 GiB                                         │
└────────────────────────────────────────────────────┘

并且 ClickHouse 配置 为在要排序的数据量超过可用主内存的一半时使用外部排序。

SELECT formatReadableSize(getSetting('max_bytes_before_external_sort'));


┌─formatReadableSize(getSetting('max_bytes_before_external_sort'))─┐
│ 60.00 GiB                                                        │
└──────────────────────────────────────────────────────────────────┘

我们通过将 `max_bytes_before_external_sort` 设置为较低的阈值(使用查询的 SETTINGS 子句)来触发上述连接示例查询的外部排序。

SELECT *
FROM actors AS a
JOIN roles AS r ON a.first_name = r.role
FORMAT `Null`
SETTINGS join_algorithm = 'full_sorting_merge', max_rows_in_set_to_optimize_join = 0, max_bytes_before_external_sort = '100M';


0 rows in set. Elapsed: 12.267 sec. Processed 132.92 million rows, 4.82 GB (10.84 million rows/s., 393.25 MB/s.)

我们检查最后两个连接示例的运行时统计信息。

SELECT
    query,
    formatReadableTimeDelta(query_duration_ms / 1000) AS query_duration,
    formatReadableSize(memory_usage) AS memory_usage,
    formatReadableQuantity(read_rows) AS read_rows,
    formatReadableSize(read_bytes) AS read_data,
    formatReadableSize(ProfileEvents['ExternalProcessingUncompressedBytesTotal']) AS data_spilled_to_disk_uncompressed,
    formatReadableSize(ProfileEvents['ExternalProcessingCompressedBytesTotal']) AS data_spilled_to_disk_compressed
FROM clusterAllReplicas(default, system.query_log)
WHERE (type = 'QueryFinish') AND hasAll(tables, ['imdb_large.actors', 'imdb_large.roles'])
ORDER BY initial_query_start_time DESC
LIMIT 2
FORMAT Vertical;


Row 1:
──────
query:                             SELECT *
                                   FROM actors AS a
                                   JOIN roles AS r ON a.first_name = r.role
                                   FORMAT `Null`
                                   SETTINGS join_algorithm = 'full_sorting_merge',
                                   max_rows_in_set_to_optimize_join = 0,
                                   max_bytes_before_external_sort = '100M'
query_duration:                    12 seconds
memory_usage:                      3.49 GiB
read_rows:                         132.92 million
read_data:                         4.49 GiB
data_spilled_to_disk_uncompressed: 1.79 GiB
data_spilled_to_disk_compressed:   866.36 MiB

Row 2:
──────
query:                             SELECT *
                                   FROM actors AS a
                                   JOIN roles AS r ON a.first_name = r.role
                                   FORMAT `Null`
                                   SETTINGS join_algorithm = 'full_sorting_merge',
                                   max_rows_in_set_to_optimize_join = 0
query_duration:                    11 seconds
memory_usage:                      4.71 GiB
read_rows:                         101.00 million
read_data:                         3.41 GiB
data_spilled_to_disk_uncompressed: 0.00 B
data_spilled_to_disk_compressed:   0.00 B

我们可以看到,对于使用降低的 `max_bytes_before_external_sort` 设置运行的查询,使用的内存更少,并且数据溢出到磁盘,这表明使用了外部排序。

请注意,对于具有外部处理的管道,此查询的 `read_rows` 指标目前不精确。

查询管道和跟踪日志

如本博客系列的 上一部分 所述,我们可以使用 ClickHouse 命令行客户端 检查示例连接查询(max_threads 设置为 2)的 ClickHouse 查询管道(快速安装说明在此处)。我们使用 EXPLAIN 语句打印用 DOT 图描述语言描述的查询管道的图形,并使用 Graphviz dot 将图形渲染为 pdf 格式。

clickhouse client --host ekyyw56ard.us-west-2.aws.clickhouse.cloud --secure --port 9440 --password <PASSWORD> --database=imdb_large --query "
EXPLAIN pipeline graph=1, compact=0
SELECT *
FROM actors AS a
JOIN roles AS r ON a.first_name = r.role
SETTINGS max_threads = 2, join_algorithm = 'full_sorting_merge', max_rows_in_set_to_optimize_join = 0
;" | dot -Tpdf > pipeline.pdf

我们在管道上添加了与上图抽象图中使用的相同圆圈编号,稍微简化了主要阶段的名称,并添加了两个连接的表以使两个图对齐:full_sorting_merge_2.png

我们看到查询管道与我们上面的抽象版本相匹配。

请注意,如果要排序的数据块在内存中的峰值体积保持在低于 配置 的外部排序阈值,则会忽略溢出阶段,并且排序后的数据块会立即进行归并排序和连接。

此外,请注意,要排序的数据块在内存中的峰值体积仅与两个连接表中的数据总量略相关,并且更多地取决于查询管道中 配置 的并行级别 并行性。一般来说,数据在 ClickHouse 中是 流式处理 的:数据以并行和分块的方式流入(内存中的)查询引擎。流式传输的数据块由特定的查询管道阶段依次并行处理,以便一旦某些表示查询结果(部分)的数据块可用,它们就会从内存中流出并返回到查询的发送方。

为了分别观察外部排序和数据溢出到磁盘,我们通过要求 ClickHouse 在执行期间将跟踪级日志发送到 ClickHouse 命令行客户端来检查两个示例连接查询运行的实际执行情况。

首先,我们获取使用降低的外部排序阈值运行的查询的跟踪日志。

clickhouse client --host ea3kq2u4fm.eu-west-1.aws.clickhouse.cloud --secure --password <PASSWORD> --database=imdb_large --send_logs_level='trace' --query "
SELECT *
FROM actors AS a
JOIN roles AS r ON a.first_name = r.role
FORMAT `Null`
SETTINGS join_algorithm = 'full_sorting_merge', max_rows_in_set_to_optimize_join = 0, max_bytes_before_external_sort = '100M';"

    ...
... imdb_large.actors ... : Reading approx. 1000000 rows with 6 streams
    ...
... imdb_large.roles ... : Reading approx. 100000000 rows with 30 streams
    ...
... MergeSortingTransform: ... writing part of data into temporary file …
    ... 
... MergingSortedTransform: Merge sorted … blocks, … rows in … sec., … rows/sec., … MiB/sec
    ... 
... MergeJoinAlgorithm: Finished processing in … seconds, left: 16 blocks, 1000000 rows; right: 1529 blocks, 100000000 rows, max blocks loaded to memory: 3
    ...

在我们分析上述跟踪日志条目之前,快速提醒一下,我们对所有示例查询运行都使用 max_threads 的默认设置。该设置控制 并行性查询管道 中的级别。执行查询的节点有 30 个 CPU 内核,因此默认的 `max_threads` 设置为 30。为了使所有查询管道可视化保持简洁易读,我们使用设置 `max_threads = 2` 人为地限制了 ClickHouse 查询管道中使用的并行级别。

我们可以看到如何使用 6 个和 30 个并行数据流将来自两个表的数据分块流入查询引擎。因为 `max_threads` 设置为 30。请注意,对于包含 100 万行的 `actors` 表,仅使用了 6 个并行数据流,而不是 30 个。原因是设置 merge_tree_min_rows_for_concurrent_read_for_remote_filesystem(对于云,对于 OSS,设置是 merge_tree_min_rows_for_concurrent_read)。此设置配置单个查询执行线程至少应读取/处理的行数的最小值。默认值为 163,840 行。而 100 万行 / 163,840 行 = 6 个线程。对于包含 1 亿行的 `roles` 表,结果将是 610 个线程,这超过了我们配置的最大线程数 30。

此外,我们看到了 MergeSortingTransform 管道阶段的条目(其名称在上图中简化为“溢出”),表明数据(排序后的数据块)溢出到磁盘上的临时存储。MergingSortedTransform 阶段(上图中的“归并排序”)的条目总结了在从临时存储读取排序后的数据块后对其进行的归并排序。

最后的 MergeJoinAlgorithm 条目总结了连接处理:来自左侧表的 100 万行以 16 个块的形式(每个块约 62500 行 - 接近 默认块大小)分块(通过 6 个并行数据流)流式传输。来自右侧表的 1 亿行以 1529 个块的形式(每个块约 65400 行)分块(通过 30 个并行数据流)流式传输。在流式处理过程中,在 `merge join` 阶段,最多有 3 个具有相同连接键的行所在的块同时位于内存中。对于来自我们示例查询的 INNER 连接的 ALL 严格性,需要这些行的 笛卡尔积。这在内存中完成。

接下来,我们获取没有降低外部排序阈值运行的查询的跟踪日志。

clickhouse client --host ea3kq2u4fm.eu-west-1.aws.clickhouse.cloud --secure --password <PASSWORD> --database=imdb_large --send_logs_level='trace' --query "
SELECT *
FROM actors AS a
JOIN roles AS r ON a.first_name = r.role
FORMAT `Null`
SETTINGS join_algorithm = 'full_sorting_merge', max_rows_in_set_to_optimize_join = 0;"

    ...
... imdb_large.actors ... : Reading approx. 1000000 rows with 6 streams
    ...
... imdb_large.roles ... : Reading approx. 100000000 rows with 30 streams
    ...
... MergingSortedTransform: Merge sorted … blocks, … rows in … sec., … rows/sec., … MiB/sec
    ... 
... MergeJoinAlgorithm: Finished processing in … seconds, left: 16 blocks, 1000000 rows; right: 1529 blocks, 100000000 rows, max blocks loaded to memory: 3
    ...

日志条目看起来类似于之前使用降低的外部排序阈值运行的日志条目。除了缺少 MergeSortingTransform(“溢出”)阶段,因为要排序的数据块在内存中的峰值体积保持在外部排序的默认阈值以下。因此,溢出阶段被忽略,并且排序后的数据块立即进行归并排序和连接。

扩展

上一篇文章 中,我们解释了 `max_threads` 设置控制查询管道中的并行级别。为了易读性,我们使用设置 `max_threads=2` 人为地限制了查询管道可视化中的并行级别。

现在我们检查 `max_threads` 设置为 4 的完整排序合并连接查询的查询管道。

clickhouse client --host ekyyw56ard.us-west-2.aws.clickhouse.cloud --secure --port 9440 --password <PASSWORD> --database=imdb_large --query "
EXPLAIN pipeline graph=1, compact=0
SELECT *
FROM actors AS a
JOIN roles AS r ON a.first_name = r.role
SETTINGS max_threads = 4, join_algorithm = 'full_sorting_merge', max_rows_in_set_to_optimize_join = 0
;" | dot -Tpdf > pipeline.pdf

full_sorting_merge_3.png

现在每个表使用四个并行数据流、排序和溢出阶段。这加快了数据块的(外部)排序。但是,每个表的归并排序阶段和最终的合并连接阶段需要保持单线程才能正常工作。但是,ClickHouse 为排序合并连接提供了一些额外的性能优化。我们接下来将讨论这些优化。

优化

在连接之前根据彼此的连接键值过滤表

可以在任何排序和合并操作之前根据彼此的连接键过滤连接的表,以最大程度地减少需要排序和合并的数据量。为此,如果可能(见下文),ClickHouse 将构建一个包含右侧表(唯一)连接键列值的内存中 集合,并使用此集合过滤掉左侧表中不可能有连接匹配的所有行。反之亦然。如果一个表远小于另一个表并且表的唯一连接键列值适合内存,则此方法特别有效。

哈希连接 在这种情况下也会表现良好。但是完整排序合并连接对左右两个表的作用方式相同,并且在两个表都大于可用内存的情况下,它将自动回退到外部排序。此优化试图将哈希连接性能引入此特定用例的完整排序合并连接。 max_rows_in_set_to_optimize_join 设置控制优化。将其设置为 0 将禁用它。默认值为 100,000。此值指定两个表集合的总最大允许大小(以条目为单位)。这意味着如果两个集合的总和保持在阈值以下,则优化将应用于两个表。如果两个集合的总和高于阈值,则仍然可能其中一个集合低于阈值,并且优化将仅应用于一个表。正如我们将在下面的跟踪日志中看到的那样,ClickHouse 将依次尝试为两个表构建集合,并在超过限制时恢复并跳过构建集合。

我们的示例连接查询分别通过 `first_name` 和 `role` 列连接两个表。

SELECT *
FROM actors AS a
JOIN roles AS r ON a.first_name = r.role
FORMAT `Null`
SETTINGS join_algorithm = 'full_sorting_merge';

我们检查(较小)左侧表中唯一连接键列值的数量。

SELECT countDistinct(first_name)
FROM actors;


┌─uniqExact(first_name)─┐
│                109993 │
└───────────────────────┘

我们检查(较大)右侧表中唯一连接键列值的数量。

SELECT countDistinct(role)
FROM roles;


┌─uniqExact(role)─┐
│          999999 │
└─────────────────┘

使用 `max_rows_in_set_to_optimize_join` 设置的默认值 100,000,优化将不会应用于任何一个表。

为了演示,我们使用 `max_rows_in_set_to_optimize_join` 的默认值执行示例查询。

SELECT *
FROM actors AS a
JOIN roles AS r ON a.first_name = r.role
FORMAT `Null`
SETTINGS join_algorithm = 'full_sorting_merge';


0 rows in set. Elapsed: 11.602 sec. Processed 101.00 million rows, 3.67 GB (8.71 million rows/s., 315.97 MB/s.)

现在我们使用 `max_rows_in_set_to_optimize_join` 设置为 `200,000` 执行示例查询。请注意,此限制对于为两个表构建集合仍然太低。但它允许为较小的左侧表构建集合,这是此优化的主要思想,即如果一个表远小于另一个表并且表的唯一连接键列值适合内存,则此方法特别有效。

SELECT *
FROM actors AS a
JOIN roles AS r ON a.first_name = r.role
FORMAT `Null`
SETTINGS join_algorithm = 'full_sorting_merge', max_rows_in_set_to_optimize_join = 200_000;


0 rows in set. Elapsed: 2.156 sec. Processed 101.00 million rows, 3.67 GB (46.84 million rows/s., 1.70 GB/s.)

我们已经可以看到执行时间快了很多。让我们检查两个查询运行的运行时统计信息以查看更多详细信息。

SELECT
    query,
    formatReadableTimeDelta(query_duration_ms / 1000) AS query_duration,
    formatReadableSize(memory_usage) AS memory_usage,
    formatReadableQuantity(read_rows) AS read_rows,
    formatReadableSize(read_bytes) AS read_data,
    formatReadableSize(ProfileEvents['ExternalProcessingUncompressedBytesTotal']) AS data_spilled_to_disk_uncompressed,
    formatReadableSize(ProfileEvents['ExternalProcessingCompressedBytesTotal']) AS data_spilled_to_disk_compressed
FROM clusterAllReplicas(default, system.query_log)
WHERE (type = 'QueryFinish') AND hasAll(tables, ['imdb_large.actors', 'imdb_large.roles'])
ORDER BY initial_query_start_time DESC
LIMIT 2
FORMAT Vertical;


Row 1:
──────
query:                             SELECT *
                                   FROM actors AS a
                                   JOIN roles AS r ON a.first_name = r.role
                                   FORMAT `Null`
                                   SETTINGS join_algorithm = 'full_sorting_merge',
                                   max_rows_in_set_to_optimize_join = 200_000;
query_duration:                    2 seconds
memory_usage:                      793.30 MiB
read_rows:                         101.00 million
read_data:                         3.41 GiB
data_spilled_to_disk_uncompressed: 0.00 B
data_spilled_to_disk_compressed:   0.00 B


Row 2:
──────
query:                             SELECT *
                                   FROM actors AS a
                                   JOIN roles AS r ON a.first_name = r.role
                                   FORMAT `Null`
                                   SETTINGS join_algorithm = 'full_sorting_merge';
query_duration:                    11 seconds
memory_usage:                      4.71 GiB
read_rows:                         101.00 million
read_data:                         3.41 GiB
data_spilled_to_disk_uncompressed: 0.00 B
data_spilled_to_disk_compressed:   0.00 B

我们可以看到预过滤优化的效果:执行时间快了 5 倍,峰值内存消耗减少了约 6 倍。

现在我们检查启用了优化的查询管道。

clickhouse client --host ekyyw56ard.us-west-2.aws.clickhouse.cloud --secure --port 9440 --password <PASSWORD> --database=imdb_large --query "
EXPLAIN pipeline graph=1, compact=0
SELECT *
FROM actors AS a
JOIN roles AS r ON a.first_name = r.role
SETTINGS max_threads = 2, join_algorithm = 'full_sorting_merge';" | dot -Tpdf > pipeline.pdf

full_sorting_merge_4.png

与没有任何优化应用的完整排序合并连接算法的通用版本管道相比,我们可以看到其他阶段(上图中以蓝色和绿色显示)。这些阶段负责在连接之前根据彼此的连接键值过滤两个表。

使用两个并行的蓝色CreatingSetsOnTheFlyTransform阶段(如果可行)来构建一个包含右表连接键列值的(唯一)内存集合。

然后,这个集合被两个(因为max_threads设置为2)并行的蓝色FilterBySetOnTheFlyTransform阶段使用,用于过滤掉左表中不可能存在连接匹配的所有行。

使用两个并行的绿色CreatingSetsOnTheFlyTransform阶段(如果可行)来构建一个包含左表连接键列值的(唯一)内存集合。

然后,这个集合被两个并行的绿色FilterBySetOnTheFlyTransform阶段使用,用于过滤掉右表中不可能存在连接匹配的所有行。

在这些集合从连接键列完全构建之前,包含所有所需列的行以并行块的形式进行流式传输,绕过过滤器优化,以便在每个块中按其连接键对行进行排序,并(可能)将其溢出到磁盘。过滤器只有在集合准备就绪后才会开始工作。这就是为什么还有两个ReadHeadBalancedProcessor阶段。这些阶段确保数据从一开始(在集合准备就绪之前)就以与它们总大小成比例的方式从两个表流式传输,以防止一个大型表的数据在小型表能够用于过滤它之前就被大部分处理的情况。

为了深入了解这些附加阶段的执行情况,我们检查查询执行的跟踪日志。

clickhouse client --host ea3kq2u4fm.eu-west-1.aws.clickhouse.cloud --secure --password <PASSWORD> --database=imdb_large --send_logs_level='trace' --query "
SELECT *
FROM actors AS a
JOIN roles AS r ON a.first_name = r.role
FORMAT `Null`
SETTINGS join_algorithm = 'full_sorting_merge', max_rows_in_set_to_optimize_join = 200_000;"


    ...
... imdb_large.actors ... : Reading approx. 1000000 rows with 6 streams
    ...
... imdb_large.roles ... : Reading approx. 100000000 rows with 30 streams
    ...
... CreatingSetsOnTheFlyTransform: Create set and filter Right joined stream: set limit exceeded, give up building set, after reading 577468 rows and using 96.00 MiB
    ...
... CreatingSetsOnTheFlyTransform: Create set and filter Left joined stream: finish building set for [first_name] with 109993 rows, set size is 6.00 MiB
    ...
... FilterBySetOnTheFlyTransform: Finished create set and filter right joined stream by [role]: consumed 3334144 rows in total, 573440 rows bypassed, result 642125 rows, 80.74% filtered
... FilterBySetOnTheFlyTransform: Finished create set and filter right joined stream by [role]: consumed 3334144 rows in total, 573440 rows bypassed, result 642125 rows, 80.74% filtered
    ... 
... MergingSortedTransform: Merge sorted … blocks, … rows in … sec., … rows/sec., … MiB/sec
    ... 
... MergeJoinAlgorithm: Finished processing in 3.140038835 seconds, left: 16 blocks, 1000000 rows; right: 207 blocks, 13480274 rows, max blocks loaded to memory: 3
    ...

我们看到如何使用6个和30个并行流将数据从两个表流式传输到查询引擎中。

接下来,我们看到一个CreatingSetsOnTheFlyTransform阶段的条目,指示包含右表连接键列值的内存集合无法构建,因为条目数量将超过为max_rows_in_set_to_optimize_join设置配置的200000阈值。

另一个CreatingSetsOnTheFlyTransform阶段的条目显示,包含左表连接键列值的集合已成功构建。该集合用于过滤右表中的行,如FilterBySetOnTheFlyTransform阶段的30个条目(我们只显示前两个并省略其余部分)所示。30是因为ClickHouse使用30个并行流阶段从右表流式传输行,并使用30个并行的FilterBySetOnTheFlyTransform阶段来过滤这30个流。

利用物理行顺序

如果一个或两个连接表的物理行顺序与连接键排序顺序匹配,则完整排序合并连接算法的排序阶段将跳过相应的表。

我们可以通过检查使用与两个表排序键匹配的连接键的连接查询的查询管道来验证这一点。首先,我们检查两个连接表的排序键。

SELECT
    name AS table,
    sorting_key
FROM system.tables
WHERE database = 'imdb_large';


┌─table───────┬─sorting_key───────────────────────┐
│ actors      │ id, first_name, last_name, gender │
│ roles       │ actor_id, movie_id                │
└─────────────┴───────────────────────────────────┘

我们使用一个连接查询,通过id(用于actors表)和actor_id(用于roles表)连接两个示例表,查找每个演员的所有角色。这些连接键是表排序键的前缀,允许ClickHouse通过按磁盘上存储的顺序读取两个表中的行来跳过完整排序合并算法的排序阶段。

我们检查此查询的查询管道。

clickhouse client --host ekyyw56ard.us-west-2.aws.clickhouse.cloud --secure --port 9440 --password <PASSWORD> --database=imdb_large --query "
EXPLAIN pipeline graph=1, compact=0
SELECT *
FROM actors AS a
JOIN roles AS r ON a.id = r.actor_id
SETTINGS max_threads = 2, join_algorithm = 'full_sorting_merge', max_rows_in_set_to_optimize_join = 0, max_bytes_before_external_sort = '100M';" | dot -Tpdf > pipeline.pdf

full_sorting_merge_5.png

我们看到查询管道①②以每个表两个并行流阶段(因为max_threads设置为2)开头,这些阶段将行从两个表按顺序分块流式传输到查询引擎中。

请注意排序和溢出阶段是如何缺失的。已排序的块按表合并排序,并且③通过合并(交错扫描)两个已排序的流来识别连接匹配。

我们运行排序和溢出阶段缺失的查询。请注意,此按顺序读取优化当前仅在禁用max_rows_in_set_to_optimize_join设置时应用。有一个待处理的PR,如果ClickHouse可以按顺序读取数据,则会自动禁用该设置。ClickHouse不支持按顺序优化和预过滤同时进行。使用上面提到的PR,将优先考虑按顺序优化。

SELECT *
FROM actors AS a
JOIN roles AS r ON a.id = r.actor_id
FORMAT `Null`
SETTINGS join_algorithm = 'full_sorting_merge', max_rows_in_set_to_optimize_join = 0;

0 rows in set. Elapsed: 7.280 sec. Processed 101.00 million rows, 3.67 GB (13.87 million rows/s., 503.56 MB/s.)

为了进行比较,我们运行相同的查询,通过不禁用max_rows_in_set_to_optimize_join设置来强制排序。

SELECT *
FROM actors AS a
JOIN roles AS r ON a.id = r.actor_id
FORMAT `Null`
SETTINGS join_algorithm = 'full_sorting_merge';

0 rows in set. Elapsed: 7.542 sec. Processed 101.00 million rows, 3.67 GB (13.39 million rows/s., 486.09 MB/s.)

为了进一步比较,我们运行相同的查询,通过不禁用max_rows_in_set_to_optimize_join设置并降低max_bytes_before_external_sort值来强制外部排序。

SELECT *
FROM actors AS a
INNER JOIN roles AS r ON a.id = r.actor_id
FORMAT `Null`
SETTINGS join_algorithm = 'full_sorting_merge', max_bytes_before_external_sort = '100M';

0 rows in set. Elapsed: 8.332 sec. Processed 139.35 million rows, 5.06 GB (16.72 million rows/s., 606.93 MB/s.)

我们检查最后三次查询运行的运行时统计信息。

SELECT
    query,
    formatReadableTimeDelta(query_duration_ms / 1000) AS query_duration,
    formatReadableSize(memory_usage) AS memory_usage,
    formatReadableQuantity(read_rows) AS read_rows,
    formatReadableSize(read_bytes) AS read_data,
    formatReadableSize(ProfileEvents['ExternalProcessingUncompressedBytesTotal']) AS data_spilled_to_disk_uncompressed,
    formatReadableSize(ProfileEvents['ExternalProcessingCompressedBytesTotal']) AS data_spilled_to_disk_compressed
FROM clusterAllReplicas(default, system.query_log)
WHERE (type = 'QueryFinish') AND hasAll(tables, ['imdb_large.actors', 'imdb_large.roles'])
ORDER BY initial_query_start_time DESC
LIMIT 3
FORMAT Vertical;


Row 1:
──────
query:                             SELECT *
                                   FROM actors AS a
                                   INNER JOIN roles AS r ON a.id = r.actor_id
                                   FORMAT `Null`
                                   SETTINGS join_algorithm = 'full_sorting_merge',                                                                                                          
                                   max_bytes_before_external_sort = '100M';
query_duration:                    8 seconds
memory_usage:                      3.56 GiB
read_rows:                         139.35 million
read_data:                         4.71 GiB
data_spilled_to_disk_uncompressed: 1.62 GiB
data_spilled_to_disk_compressed:   1.09 GiB


Row 2:
──────
query:                             SELECT *
                                   FROM actors AS a
                                   JOIN roles AS r ON a.id = r.actor_id
                                   FORMAT `Null`
                                   SETTINGS join_algorithm = 'full_sorting_merge';
query_duration:                    7 seconds
memory_usage:                      5.07 GiB
read_rows:                         101.00 million
read_data:                         3.41 GiB
data_spilled_to_disk_uncompressed: 0.00 B
data_spilled_to_disk_compressed:   0.00 B


Row 3:
──────
query:                             SELECT *
                                   FROM actors AS a
                                   JOIN roles AS r ON a.id = r.actor_id
                                   FORMAT `Null`
                                   SETTINGS join_algorithm = 'full_sorting_merge',                                    
                                   max_rows_in_set_to_optimize_join = 0;
query_duration:                    7 seconds
memory_usage:                      497.88 MiB
read_rows:                         101.00 million
read_data:                         3.41 GiB
data_spilled_to_disk_uncompressed: 0.00 B
data_spilled_to_disk_compressed:   0.00 B

第3行中查询运行(其中跳过了排序和溢出阶段)具有最快的执行时间和非常低的内存使用率。因为来自两个表的数据以块为单位并按顺序流经查询引擎,所以同时只有少量数据块在内存中,并且只需要合并并将流式传输到查询的发送方。

我们可以看到,对于第2行中强制排序的查询运行,排序是在内存中进行的,因为没有数据溢出到磁盘。此查询运行使用的内存是第3行中不进行排序的运行的10倍。

对于第1行中强制进行外部排序的查询运行,执行时间最慢,但与第2行中强制进行内存排序的查询运行相比,内存使用率较低。

当且仅当其中一个表的物理行顺序与连接键排序顺序匹配时,也会应用按顺序流式传输优化。我们可以通过检查连接查询的查询管道来演示这一点,其中左表通过与表在磁盘上的物理行顺序匹配的列进行连接,但右表并非如此。

clickhouse client --host ekyyw56ard.us-west-2.aws.clickhouse.cloud --secure --port 9440 --password <PASSWORD> --database=imdb_large --query "
EXPLAIN pipeline graph=1, compact=0
SELECT *
FROM actors AS a
JOIN roles AS r ON a.id = r.movie_id
SETTINGS max_threads = 2, join_algorithm = 'full_sorting_merge', max_rows_in_set_to_optimize_join = 0;" | dot -Tpdf > pipeline.pdf

full_sorting_merge_6.png

左表中的行由两个流按顺序并行流式传输到查询引擎中。这些已排序流的排序和溢出阶段缺失。相反,右表的阶段指示排序和(可能)溢出。

部分合并连接

描述

部分合并连接是集成到ClickHouse查询管道中的排序合并连接的变体。经典的排序合并连接首先按连接键完全对两个连接表进行排序,然后合并排序结果。ClickHouse部分合并连接针对连接大型表时最小化内存使用进行了优化,并且仅通过外部排序首先完全对右表进行排序。为了最大程度地减少内存中处理的数据量,它在磁盘上创建了最小-最大索引。左表始终按块进行排序,并在内存中排序。但是,如果左表的物理行顺序与连接键排序顺序匹配,则在内存中识别连接匹配将更有效。

此图概述了ClickHouse如何实现部分合并连接的详细信息。

partial_merge_1.png

查询管道看起来非常类似于ClickHouse哈希连接算法的管道。这并非巧合。部分合并连接正在重用哈希连接管道,因为与哈希连接一样,它具有构建和扫描阶段。请记住,哈希连接首先从右表构建哈希表,然后扫描左表。类似地,部分合并连接首先构建右表的排序版本,然后扫描左表。

① 首先,所有来自右表的数据都按方式由2个流(因为max_threads = 2)并行流式传输到内存中。通过填充阶段,每个流式传输块中的行按连接键列值排序,并与每个排序块的最小-最大索引一起溢出到临时存储。最小-最大索引存储每个排序块包含的最小和最大连接键值。这些最小-最大索引在步骤②中用于最小化识别连接匹配时内存中处理的数据量。

② 然后,所有来自左表的数据都按块方式由2个流(max_threads = 2)并行流式传输。每个块在运行时按连接键排序,然后③与来自右表的磁盘上的排序块匹配。最小-最大索引用于仅加载磁盘上可能包含连接匹配的右表块。

这种连接处理策略非常节省内存。无论连接表的尺寸和物理行顺序如何。在上面的步骤①中,只有来自右表的少量块在写入临时存储之前同时流经内存。在步骤②中,只有来自左表的少量块在同一时间流经内存。步骤①中创建的最小-最大索引有助于最大程度地减少从临时存储加载到内存中的右表块的数量,以识别连接匹配。

请注意,如果左表的物理行顺序与连接键排序顺序匹配,则这种基于最小-最大索引的非匹配右表块跳过将最有效。

但是,当左表的数据块具有连接键值的某种一般分布时,使用部分合并连接算法的成本最高。因为如果左表的每个块都包含一般分布的连接键值的大子集,则右表的排序块的最小-最大索引将不起作用,并且实际上会在两个表的块之间创建笛卡尔积:对于左表的每个块,都会从磁盘加载右表的一大组排序块。

支持的连接类型

具有ALL严格度的INNER、LEFT、RIGHT、FULL连接类型,以及具有ANY和SEMI严格度的INNER、LEFT连接类型都受支持

示例

我们运行我们的示例连接查询(使用连接键,这些键是连接表排序键的前缀,以从上述基于最小-最大索引的性能优化中获益),并使用部分合并算法。

SELECT *
FROM actors AS a
INNER JOIN roles AS r ON a.id = r.actor_id
FORMAT `Null`
SETTINGS join_algorithm = 'partial_merge';

0 rows in set. Elapsed: 33.796 sec. Processed 101.00 million rows, 3.67 GB (2.99 million rows/s., 108.47 MB/s.)

现在,我们运行相同的查询,但使用一个在磁盘上具有不同物理顺序的左表。我们创建了按非连接键列排序的actors表的副本。这意味着行按随机连接键顺序排列。如上所述,这是部分合并连接执行时间的最坏情况。

SELECT *
FROM actors_unsorted AS a
INNER JOIN roles AS r ON a.id = r.actor_id
FORMAT `Null`
SETTINGS join_algorithm = 'partial_merge';


0 rows in set. Elapsed: 44.872 sec. Processed 101.00 million rows, 3.67 GB (2.25 million rows/s., 81.70 MB/s.)

执行时间比之前的运行慢了36%。

为了进一步比较,我们使用完整排序合并算法运行相同的查询。为了与部分合并算法进行公平比较,我们强制进行外部排序。通过禁用完整排序合并算法的“按顺序流式传输优化”(通过不将max_rows_in_set_to_optimize_join设置为0)。并降低max_bytes_before_external_sort值。

SELECT *
FROM actors AS a
INNER JOIN roles AS r ON a.id = r.actor_id
FORMAT `Null`
SETTINGS join_algorithm = 'full_sorting_merge', max_bytes_before_external_sort = '100M';

0 rows in set. Elapsed: 7.381 sec. Processed 139.35 million rows, 5.06 GB (18.88 million rows/s., 685.11 MB/s.)

我们检查最后三次查询运行的运行时统计信息。

SELECT
    query,
    formatReadableTimeDelta(query_duration_ms / 1000) AS query_duration,
    formatReadableSize(memory_usage) AS memory_usage,
    formatReadableQuantity(read_rows) AS read_rows,
    formatReadableSize(read_bytes) AS read_data,
    formatReadableSize(ProfileEvents['ExternalProcessingUncompressedBytesTotal']) AS data_spilled_to_disk_uncompressed,
    formatReadableSize(ProfileEvents['ExternalProcessingCompressedBytesTotal']) AS data_spilled_to_disk_compressed
FROM clusterAllReplicas(default, system.query_log)
WHERE (type = 'QueryFinish') AND (hasAll(tables, ['imdb_large.actors', 'imdb_large.roles']) OR hasAll(tables, ['imdb_large.actors_unsorted', 'imdb_large.roles']))
ORDER BY initial_query_start_time DESC
LIMIT 3
FORMAT Vertical;


Row 1:
──────
query:                             SELECT *
                                   FROM actors AS a
                                   INNER JOIN roles AS r ON a.id = r.actor_id
                                   FORMAT `Null`
                                   SETTINGS join_algorithm = 'full_sorting_merge',                                                                       
                                   max_bytes_before_external_sort = '100M';
query_duration:                    7 seconds
memory_usage:                      3.54 GiB
read_rows:                         139.35 million
read_data:                         4.71 GiB
data_spilled_to_disk_uncompressed: 1.62 GiB
data_spilled_to_disk_compressed:   1.09 GiB

Row 2:
──────
query:                             SELECT *
                                   FROM actors_unsorted AS a
                                   INNER JOIN roles AS r ON a.id = r.actor_id
                                   FORMAT `Null`
                                   SETTINGS join_algorithm = 'partial_merge';
query_duration:                    44 seconds
memory_usage:                      2.20 GiB
read_rows:                         101.00 million
read_data:                         3.41 GiB
data_spilled_to_disk_uncompressed: 5.27 GiB
data_spilled_to_disk_compressed:   3.52 GiB

Row 3:
──────
query:                             SELECT *
                                   FROM actors AS a
                                   INNER JOIN roles AS r ON a.id = r.actor_id
                                   FORMAT `Null`
                                   SETTINGS join_algorithm = 'partial_merge';
query_duration:                    33 seconds
memory_usage:                      2.21 GiB
read_rows:                         101.00 million
read_data:                         3.41 GiB
data_spilled_to_disk_uncompressed: 5.27 GiB
data_spilled_to_disk_compressed:   3.52 GiB

从第 2 行和第 3 行可以看出,部分合并连接的两次运行使用了相同数量的内存和磁盘溢出数据。但是,如上所述,当左表物理行顺序与连接键顺序匹配时(第 3 行),运行速度更快。

即使对连接表进行了(人为强制的)完全外部排序,第 1 行中完全排序合并连接的执行速度也几乎是第 3 行中部分合并连接执行速度的 5 倍。不过,正如其设计目标一样,部分合并连接使用了更少的内存。

查询管道和跟踪日志

我们检查了将max_threads设置为 2 的部分合并连接示例的查询管道。

clickhouse client --host ekyyw56ard.us-west-2.aws.clickhouse.cloud --secure --port 9440 --password <PASSWORD> --database=imdb_large --query "
EXPLAIN pipeline graph=1, compact=0
SELECT *
FROM actors AS a
JOIN roles AS r ON a.first_name = r.role
SETTINGS max_threads = 2, join_algorithm = 'partial_merge';" | dot -Tpdf > pipeline.pdf

圆圈数字、主要阶段的简化名称以及添加的两个连接表用于与上方的抽象图保持一致。

partial_merge_2.png

实际的查询管道反映了我们上面给出的抽象版本。如前所述,部分排序合并连接正在重用哈希连接管道,因为与哈希连接一样,它也具有构建和扫描阶段:部分合并连接首先构建右表的排序版本,然后扫描左表。

由于上述管道重用,右表块的排序以及与左表块的排序合并过程在管道中并不直接可见。

为了检查这些阶段的执行情况,我们检查查询执行的跟踪日志。

clickhouse client --host ea3kq2u4fm.eu-west-1.aws.clickhouse.cloud --secure --password <PASSWORD> --database=imdb_large --send_logs_level='trace' --query "
SELECT *
FROM actors AS a
JOIN roles AS r ON a.first_name = r.role
FORMAT `Null`
SETTINGS join_algorithm = 'partial_merge';"


    ...
... imdb_large.actors ... : Reading approx. 1000000 rows with 6 streams
    ...
... imdb_large.roles ... : Reading approx. 100000000 rows with 30 streams 
    ...
... MergingSortedTransform: Merge sorted 1528 blocks, 100000000 rows …
    ...

我们可以看到,如何使用 6 个和 30 个并行流将来自两个表的数据块式地流式传输到查询引擎中。

一个 MergingSortedTransform 条目总结了连接处理过程:来自右表的 1528 个数据块(包含 1 亿行)被排序,随后与来自左表的块进行合并连接。请注意,1 亿行 1528 个数据块相当于每个块大约 65445 行,这对应于默认块大小

总结

这篇博文详细描述并比较了 ClickHouse 中两种基于外部排序的连接算法。

**完全排序合并连接**不受内存限制,基于内存或外部排序,并且可以利用连接表的物理行顺序并跳过排序阶段。在这种情况下,连接性能可以与哈希连接算法相媲美,同时通常需要更少的内存。

**部分合并连接**针对连接大型表时最小化内存使用进行了优化,并且始终首先通过外部排序对右表进行完全排序。左表也始终按块进行内存排序。如果左表的物理行顺序与连接键排序顺序匹配,则连接匹配过程将更有效率。

此图表总结并比较了本文中一些连接查询运行的内存使用情况和执行时间。我们始终运行相同的连接查询,连接相同的数据,并将较大的表放在右侧,在一台具有 30 个 CPU 内核(因此max_threads设置为 30)的节点上运行。

comparison.png

① 在此运行中,完全排序合并连接跳过了排序和溢出阶段,因为两个连接表的物理行顺序与连接键排序顺序匹配。从而导致最快的执行时间和显著最低的内存使用量。② 当两个连接表都进行内存排序时,完全排序合并连接的内存消耗最高,而③ 使用外部排序代替内存排序则降低了内存消耗,但牺牲了执行速度。④ 部分合并连接始终通过外部排序对右表的数据进行排序。我们可以看到,在所有使用外部排序的连接查询运行中,该算法的内存使用量最低。这是该算法在牺牲相对较低的执行速度的情况下进行优化的目标。左表数据也始终按块进行内存排序。但是,⑤ 我们可以看到,如果左表的物理行顺序与连接键顺序不匹配,则执行速度最慢。

在我们的下一篇文章中,我们将介绍 ClickHouse 最快的连接算法:直接连接。

敬请期待!

立即开始使用 ClickHouse Cloud 并获得 300 美元的信用额度。在 30 天试用期结束后,您可以继续使用按需付费计划,或联系我们以了解有关我们基于容量的折扣的更多信息。请访问我们的定价页面以获取详细信息。

分享此文章

订阅我们的时事通讯

随时了解功能发布、产品路线图、支持和云产品信息!
加载表单...
关注我们
Twitter imageSlack imageGitHub image
Telegram imageMeetup imageRss image