跳至主要内容
跳至主要内容

ClickHouse 如何并行执行查询

ClickHouse 是 为速度而构建的。它以高度并行的方式执行查询,使用所有可用的 CPU 核心,将数据分布到处理通道,并且经常将硬件推向其极限。

本指南将介绍 ClickHouse 中查询并行性的工作原理,以及如何调整或监控它以提高大型工作负载的性能。

我们使用针对 uk_price_paid_simple 数据集的聚合查询来说明关键概念。

逐步说明:ClickHouse 如何并行化聚合查询

当 ClickHouse ① 运行带有表主键过滤器的聚合查询时,它 ② 将主索引加载到内存中以 ③ 识别需要处理的 granule,以及可以安全跳过的 granule

Index analysis

将工作分布到处理通道

然后,所选数据会 动态 地分布到 n 个并行 处理通道,这些通道以 为单位流式处理和处理数据,最终得到结果

4 parallel processing lanes


并行处理通道的数量 nmax_threads 设置控制,默认情况下与服务器上 ClickHouse 可用的单个 CPU 的核心(线程)数量相匹配。在上面的示例中,我们假设 4 个核心。

在具有 8 个核心的机器上,查询处理吞吐量大致会翻倍(但内存使用量也会相应增加),因为更多的通道并行处理数据

8 parallel processing lanes


高效的通道分布是最大化 CPU 利用率和减少总查询时间的关键。

处理分片表上的查询

当表数据分布在多个服务器上作为 分片 时,每个服务器并行处理其分片。在每个服务器内部,本地数据使用并行处理通道处理,就像上面描述的那样

Distributed lanes


最初接收查询的服务器会收集来自分片的所有子结果,并将它们组合成最终的全局结果。

将查询负载分布到分片上,可以实现并行性的水平扩展,尤其是在高吞吐量环境中。

ClickHouse Cloud 使用并行副本而不是分片

在 ClickHouse Cloud 中,通过 并行副本 实现相同的并行性,这些副本在无共享集群中类似于分片。每个 ClickHouse Cloud 副本(一个无状态计算节点)并行处理一部分数据,并为最终结果做出贡献,就像一个独立的分片一样。

监控查询并行性

使用这些工具来验证您的查询是否充分利用了可用的 CPU 资源,以及在未充分利用时进行诊断。

我们正在使用一台具有 59 个 CPU 核心的测试服务器运行此程序,这使得 ClickHouse 能够充分展示其查询并行性。

为了观察示例查询的执行方式,我们可以指示 ClickHouse 服务器在聚合查询期间返回所有 trace-level 日志条目。为了演示,我们删除了查询的谓词——否则,只有 3 个 granule 会被处理,这不足以让 ClickHouse 使用多个并行处理通道

SELECT
   max(price)
FROM
   uk.uk_price_paid_simple
SETTINGS send_logs_level='trace';
① <Debug> ...: 3609 marks to read from 3 ranges
② <Trace> ...: Spreading mark ranges among streams
② <Debug> ...: Reading approx. 29564928 rows with 59 streams

我们可以看到

  • ① ClickHouse 需要读取 3,609 个 granule(在 trace 日志中指示为 marks)跨越 3 个数据范围。
  • ② 在 59 个 CPU 核心的情况下,它将这项工作分布到 59 个并行处理流中——每个通道一个。

或者,我们可以使用 EXPLAIN 子句来检查 物理操作符计划——也称为“查询管道”——用于聚合查询

EXPLAIN PIPELINE
SELECT
   max(price)
FROM
   uk.uk_price_paid_simple;
    ┌─explain───────────────────────────────────────────────────────────────────────────┐
 1. │ (Expression)                                                                      │
 2. │ ExpressionTransform × 59                                                          │
 3. │   (Aggregating)                                                                   │
 4. │   Resize 59 → 59                                                                  │
 5. │     AggregatingTransform × 59                                                     │
 6. │       StrictResize 59 → 59                                                        │
 7. │         (Expression)                                                              │
 8. │         ExpressionTransform × 59                                                  │
 9. │           (ReadFromMergeTree)                                                     │
10. │           MergeTreeSelect(pool: PrefetchedReadPool, algorithm: Thread) × 59 0 → 1 │
    └───────────────────────────────────────────────────────────────────────────────────┘

注意:请从底部到顶部读取上面的操作符计划。每行代表物理执行计划中的一个阶段,从底部读取存储中的数据开始,到顶部结束最终处理步骤。标记为 × 59 的操作符在 59 个并行处理通道上并发执行,这些通道跨越非重叠的数据区域。这反映了 max_threads 的值,并说明了查询的每个阶段如何在 CPU 核心上并行化。

ClickHouse 的 嵌入式 Web UI(在 /play 端点可用)可以将上述物理计划渲染为图形可视化。在此示例中,我们将 max_threads 设置为 4 以保持可视化紧凑,仅显示 4 个并行处理通道

注意:请从左到右读取可视化。每行代表一个流式处理数据的并行处理通道,应用诸如过滤、聚合和最终处理阶段之类的转换。在此示例中,您可以看到四个并行通道对应于 max_threads = 4 设置。

处理通道之间的负载均衡

请注意,上述物理计划中的 Resize 操作符 重新分区和重新分配 数据块流到处理通道,以保持它们均匀利用。这种重新平衡对于数据范围在匹配查询谓词的行数方面有所不同时尤其重要,否则,某些通道可能会过载,而其他通道则处于空闲状态。通过重新分配工作,更快的通道可以有效地帮助更慢的通道,从而优化整体查询运行时间。

为什么 max_threads 并不总是被尊重

如上所述,并行处理通道的数量 nmax_threads 设置控制,默认情况下与服务器上 ClickHouse 可用的 CPU 核心数量相匹配

SELECT getSetting('max_threads');
   ┌─getSetting('max_threads')─┐
1. │                        59 │
   └───────────────────────────┘

但是,根据所选处理的数据量,max_threads 值可能会被忽略

EXPLAIN PIPELINE
SELECT
   max(price)
FROM
   uk.uk_price_paid_simple
WHERE town = 'LONDON';
...   
(ReadFromMergeTree)
MergeTreeSelect(pool: PrefetchedReadPool, algorithm: Thread) × 30

如上方的操作符计划摘录所示,即使 max_threads 设置为 59,ClickHouse 也仅使用 30 个并发流来扫描数据。

现在让我们运行查询

SELECT
   max(price)
FROM
   uk.uk_price_paid_simple
WHERE town = 'LONDON';
   ┌─max(price)─┐
1. │  594300000 │ -- 594.30 million
   └────────────┘
   
1 row in set. Elapsed: 0.013 sec. Processed 2.31 million rows, 13.66 MB (173.12 million rows/s., 1.02 GB/s.)
Peak memory usage: 27.24 MiB.   

如上图所示,查询处理了 231 万行数据,并读取了 13.66MB 的数据。这是因为,在索引分析阶段,ClickHouse 为处理选择了 282 个 granule,每个 granule 包含 8,192 行,总计约 231 万行

EXPLAIN indexes = 1
SELECT
   max(price)
FROM
   uk.uk_price_paid_simple
WHERE town = 'LONDON';
    ┌─explain───────────────────────────────────────────────┐
 1. │ Expression ((Project names + Projection))             │
 2. │   Aggregating                                         │
 3. │     Expression (Before GROUP BY)                      │
 4. │       Expression                                      │
 5. │         ReadFromMergeTree (uk.uk_price_paid_simple)   │
 6. │         Indexes:                                      │
 7. │           PrimaryKey                                  │
 8. │             Keys:                                     │
 9. │               town                                    │
10. │             Condition: (town in ['LONDON', 'LONDON']) │
11. │             Parts: 3/3                                │
12. │             Granules: 282/3609                        │
    └───────────────────────────────────────────────────────┘  

无论配置的 max_threads 值如何,ClickHouse 仅在有足够的数据证明其合理性时才分配额外的并行处理通道。“max”在 max_threads 中指的是上限,而不是使用的线程数的保证。

“足够的数据”的含义主要由两个设置决定,这些设置定义了每个处理通道应处理的最小行数(默认值为 163,840)和最小字节数(默认值为 2,097,152)

对于无共享集群

对于具有共享存储的集群(例如 ClickHouse Cloud)

此外,还有一个读取任务大小的硬下限,由以下控制

不要修改这些设置

我们不建议在生产环境中修改这些设置。它们仅显示在这里,是为了说明为什么 max_threads 并不总是确定实际的并行级别。

为了演示,让我们在覆盖这些设置以强制最大并发的情况下检查物理计划

EXPLAIN PIPELINE
SELECT
   max(price)
FROM
   uk.uk_price_paid_simple
WHERE town = 'LONDON'
SETTINGS
  max_threads = 59,
  merge_tree_min_read_task_size = 0,
  merge_tree_min_rows_for_concurrent_read_for_remote_filesystem = 0, 
  merge_tree_min_bytes_for_concurrent_read_for_remote_filesystem = 0;
...   
(ReadFromMergeTree)
MergeTreeSelect(pool: PrefetchedReadPool, algorithm: Thread) × 59

现在 ClickHouse 使用 59 个并发流来扫描数据,完全尊重配置的 max_threads

这表明对于小型数据集上的查询,ClickHouse 会故意限制并发性。仅将设置覆盖用于测试——不要在生产中使用——因为它们可能导致效率低下或资源争用。

关键要点

  • ClickHouse 使用与 max_threads 绑定的处理通道并行化查询。
  • 通道的实际数量取决于所选数据的规模。
  • 使用 EXPLAIN PIPELINE 和 trace 日志来分析通道使用情况。

在哪里可以找到更多信息

如果您想深入了解 ClickHouse 如何并行执行查询以及如何实现大规模高性能,请探索以下资源

    © . This site is unofficial and not affiliated with ClickHouse, Inc.