简介
ClickHouse 是一个极速的关系型数据库管理系统,针对实时分析进行了优化。即使查询通常已经足够快,几乎不需要调整,但我们坚信不应浪费任何毫秒!在这篇文章中,我们将为您提供一些关于如何加速 ClickHouse 查询,使其比以往更快的技巧,并提供一些实际示例。
ClickHouse 查询处理快速入门
ClickHouse 中的表被设计为每秒接收数百万行插入,并存储非常大的(数百 PB)数据量。
ClickHouse 中实现快速查询速度通常是通过正确利用表的(稀疏)主索引,以便大幅限制 ClickHouse 需要从磁盘读取的数据量,并防止在查询时对数据进行重新排序,这也可以在使用 LIMIT 子句时启用短路。
ClickHouse 设计的主索引基于二分搜索算法,该算法可以高效地(时间复杂度为 O(log2 n))查找排序数组中目标值的位置。
例如,考虑下图中的排序数组,其中二分搜索算法用于查找值 42。该算法将目标值 42 与数组的中间元素进行比较。如果它们不相等,则消除目标不可能存在的半部分,并在剩余的半部分继续搜索,再次取中间元素与目标值进行比较,并重复此过程,直到找到目标值
在 ClickHouse 表中使用表的主索引查找行的方式相同。
基于该行顺序,主索引(它是一个像上图中的排序数组)存储表中每8192 行的主键列值。
在下图中,我们假设一个表有一个名为“numbers”的列,它也是主键列
ClickHouse 主索引不是索引单个行,而是索引行块(所谓的粒度)。
使用这样的主索引,可以跳过 TB 甚至 PB 级的数据,从而在(亚)秒级内完成搜索。
下图显示了 ClickHouse 通常如何执行查询
步骤 1:将相关表的主索引加载到主内存中。
步骤 2:通常,通过对索引条目进行二分搜索,ClickHouse 选择可能包含与查询的 WHERE 子句匹配的行的行块。
步骤 3:所选的行块并行流式传输到 ClickHouse 查询引擎以进行进一步处理,查询结果流式传输到调用者。
有三个主要的调整旋钮可以加速 ClickHouse 中的此查询执行工作流程
ClickHouse 需要从磁盘流式传输到主内存的数据越少,查询的执行时间就越快。从磁盘流式传输所需的数据量可以通过(1)正确利用主索引和(2)预计算聚合来最小化。
数据流式传输和实际处理可以通过(3)增加 ClickHouse 查询处理引擎内部使用的并行度来加速。
(1)正确利用主索引
让我们首先看看如何确保充分利用主索引,以确保最佳查询性能。
利用索引最小化要流式传输到 ClickHouse 的数据量
作为一个运行示例,我们使用来自我们的 英国房价支付教程 的表,其中包含 2764 万行。此数据集可在我们的 sql.clickhouse.com 环境中使用。
我们运行一个查询,列出伦敦县中支付价格最高的三项
SELECT county, price FROM uk_price_paid WHERE town = 'LONDON' ORDER BY price DESC LIMIT 3 ┌─county─────────┬─────price─┐ │ GREATER LONDON │ 594300000 │ │ GREATER LONDON │ 569200000 │ │ GREATER LONDON │ 448500000 │ └────────────────┴───────────┘ 3 rows in set. Elapsed: 0.044 sec. Processed 27.64 million rows, 44.21 MB (634.60 million rows/s., 1.01 GB/s.)✎
ClickHouse 正在执行全表扫描!
因为表的主索引无法正确用于查询。
我们检查表的主键
SHOW CREATE TABLE uk_price_paid CREATE TABLE default.uk_price_paid ( `price` UInt32, `date` Date, `postcode1` LowCardinality(String), `postcode2` LowCardinality(String), `type` Enum8('other' = 0, 'terraced' = 1, 'semi-detached' = 2, 'detached' = 3, 'flat' = 4), `is_new` UInt8, `duration` Enum8('unknown' = 0, 'freehold' = 1, 'leasehold' = 2), `addr1` String, `addr2` String, `street` LowCardinality(String), `locality` LowCardinality(String), `town` LowCardinality(String), `district` LowCardinality(String), `county` LowCardinality(String), `category` UInt8 ) ENGINE = ReplicatedMergeTree('/clickhouse/tables/fb100991-4cae-4a92-995a-1ca11416879e/{shard}', '{replica}') ORDER BY (postcode1, postcode2, addr1, addr2) SETTINGS index_granularity = 8192 1 row in set. Elapsed: 0.001 sec.✎
表的 ORDER BY 子句决定了数据在磁盘上的排序方式以及主索引条目。ORDER BY 中的列是 postcode1、postcode2、addr1 和 addr2。
请注意,如果未明确定义排序键(由 ORDER BY 子句定义),即通过 PRIMARY KEY 子句,则 ClickHouse 将排序键用作主键。
这对带有 town = 'LONDON'
谓词的查询没有帮助。
如果我们想显着加速我们的示例查询(该查询过滤特定城镇的行),那么我们需要使用针对该查询优化的主索引。
一个选项是创建第二个表,该表具有基于不同主键的不同行顺序。
由于一个表在磁盘上只能有一个物理顺序,因此我们需要将表数据复制到另一个表中。
我们创建第二个表,其架构与原始表相同,但主键不同,并在表之间复制数据
CREATE TABLE uk_price_paid_oby_town_price ( price UInt32, date Date, postcode1 LowCardinality(String), postcode2 LowCardinality(String), type Enum8('terraced' = 1, 'semi-detached' = 2, 'detached' = 3, 'flat' = 4, 'other' = 0), is_new UInt8, duration Enum8('freehold' = 1, 'leasehold' = 2, 'unknown' = 0), addr1 String, addr2 String, street LowCardinality(String), locality LowCardinality(String), town LowCardinality(String), district LowCardinality(String), county LowCardinality(String), category UInt8 ) ENGINE = MergeTree ORDER BY (town, price); INSERT INTO uk_price_paid_oby_town_price SELECT * FROM uk_price_paid; 0 rows in set. Elapsed: 13.033 sec. Processed 52.50 million rows, 2.42 GB (4.03 million rows/s., 185.58 MB/s.)
我们在第二个表上运行查询
SELECT county, price FROM uk_price_paid_oby_town_price WHERE town = 'LONDON' ORDER BY price DESC LIMIT 3 ┌─county─────────┬─────price─┐ │ GREATER LONDON │ 594300000 │ │ GREATER LONDON │ 569200000 │ │ GREATER LONDON │ 448500000 │ └────────────────┴───────────┘ 3 rows in set. Elapsed: 0.005 sec. Processed 81.92 thousand rows, 493.76 KB (15.08 million rows/s., 90.87 MB/s.)✎
好多了。
利用索引防止重新排序并启用短路
在我们之前的示例中,ClickHouse 不仅从磁盘流式传输的数据少得多,而且还应用了额外的优化。
查询是
- 过滤 town = ‘London’ 的行
- 按价格降序对匹配的行进行排序
- 获取前 3 行
为此,通常,ClickHouse
- 使用主索引选择可能包含“London”行的块,并将这些行从磁盘流式传输
- 在主内存中按价格对这些行进行排序
- 将前 3 行作为结果流式传输到调用者
但是,由于磁盘上的“London”行已经按价格排序存储(请参阅表 DDL 的 ORDER BY 子句),因此 ClickHouse 可以直接跳过主内存中的重新排序。
并且 ClickHouse 可以进行短路。ClickHouse 所要做的就是以相反的顺序从磁盘流式传输选定的行块,一旦流式传输了三个匹配的(town = ‘London’)行,查询就完成了。
这正是 optimize_read_in_order
优化在这种情况下所做的 - 防止行重新排序并启用短路。
此优化默认启用,当我们通过 EXPLAIN 检查查询的逻辑查询计划时,我们可以在计划底部看到 ReadType: InReverseOrder
EXPLAIN actions = 1 SELECT county, price FROM uk_price_paid_oby_town_price WHERE town = 'LONDON' ORDER BY price DESC LIMIT 3 Expression (Projection) Actions: INPUT :: 0 -> price UInt32 : 0 INPUT :: 1 -> county LowCardinality(String) : 1 Positions: 1 0 Limit (preliminary LIMIT (without OFFSET)) Limit 3 Offset 0 Sorting (Sorting for ORDER BY) Prefix sort description: price DESC Result sort description: price DESC Limit 3 Expression (Before ORDER BY) Actions: INPUT :: 0 -> price UInt32 : 0 INPUT :: 1 -> county LowCardinality(String) : 1 Positions: 0 1 Filter (WHERE) Filter column: equals(town, 'LONDON') (removed) Actions: INPUT :: 0 -> price UInt32 : 0 INPUT : 1 -> town LowCardinality(String) : 1 INPUT :: 2 -> county LowCardinality(String) : 2 COLUMN Const(String) -> 'LONDON' String : 3 FUNCTION equals(town :: 1, 'LONDON' :: 3) -> equals(town, 'LONDON') LowCardinality(UInt8) : 4 Positions: 0 2 4 ReadFromMergeTree (default.uk_price_paid_oby_town_price) ReadType: InReverseOrder Parts: 6 Granules: 267 27 rows in set. Elapsed: 0.002 sec.✎
可以通过查询的 SETTINGS 子句禁用 optimize_read_in_order
设置。这将导致 ClickHouse 从磁盘流式传输 217 万行,而不是 81920 行(仍然比全表扫描好)。下图可视化了查询处理步骤的差异
使用具有不同行顺序的第二个表的剩余问题是保持表同步。在下文,我们将讨论针对此问题的便捷解决方案。
(2)预计算聚合
我们执行一个聚合查询,列出英国各县中平均支付价格最高的前三名
SELECT county, avg(price) FROM uk_price_paid GROUP BY county ORDER BY avg(price) DESC LIMIT 3 ┌─county──────────────────────────────┬─────────avg(price)─┐ │ WINDSOR AND MAIDENHEAD │ 383843.17329304793 │ │ BOURNEMOUTH, CHRISTCHURCH AND POOLE │ 383478.9135281004 │ │ GREATER LONDON │ 376911.4824869095 │ └─────────────────────────────────────┴────────────────────┘ 3 rows in set. Elapsed: 0.020 sec. Processed 26.25 million rows, 132.57 MB (1.33 billion rows/s., 6.69 GB/s.)✎
在这种情况下,ClickHouse 正在执行全表扫描,因为查询聚合了所有现有表行。因此,主索引不能用于减少从磁盘流式传输的数据量。
但是,如果我们可以在单独的表中预计算所有现有 130 个英国县的聚合值,并在原始表数据发生更改时更新该单独的表(通过牺牲额外的磁盘空间),我们可以大幅减少从磁盘流式传输的数据量。
下图可视化了这个想法
ClickHouse 有一个便捷的新交钥匙功能来实现这个想法:Projections(投影)!
使用投影来实现(1)和(2)
一个 ClickHouse 表可以有(多个)投影。
投影是一个额外的(隐藏)表,它自动与原始表保持同步。投影可以具有与原始表不同的行顺序(因此主索引也不同),以及自动且增量地预计算聚合值。
当查询以原始表为目标时,ClickHouse 会自动选择(通过对主键进行采样)一个可以生成相同正确结果的表,但需要读取最少量的数据。我们在此处可视化此概念
投影在某种程度上类似于物化视图,物化视图也允许您进行增量聚合和多个行顺序。但与物化视图不同,投影会原子地更新,并在查询时由 ClickHouse 自动选择最佳版本,从而与主表保持一致。
对于原始示例表 uk_price_paid
,我们将创建(并填充)两个投影。
为了在我们的 playground 中保持整洁和简单,我们首先将表 uk_price_paid
复制为 uk_price_paid_with_projections
CREATE TABLE uk_price_paid_with_projections AS uk_price_paid; INSERT INTO uk_price_paid_with_projections SELECT * FROM uk_price_paid; 0 rows in set. Elapsed: 4.410 sec. Processed 52.50 million rows, 2.42 GB (11.90 million rows/s., 548.46 MB/s.)
我们创建并填充投影 prj_oby_town_price
– 一个额外的(隐藏)表,其主索引按城镇和价格排序,以优化查询,该查询列出特定城镇中支付价格最高的县
ALTER TABLE uk_price_paid_with_projections ADD PROJECTION prj_oby_town_price ( SELECT * ORDER BY town, price ); ALTER TABLE uk_price_paid_with_projections MATERIALIZE PROJECTION prj_oby_town_price SETTINGS mutations_sync = 1; 0 rows in set. Elapsed: 6.028 sec.
我们创建并填充投影 prj_gby_county
– 一个额外的(隐藏)表,该表增量预计算所有现有 130 个英国县的 avg(price) 聚合值
ALTER TABLE uk_price_paid_with_projections ADD PROJECTION prj_gby_county ( SELECT county, avg(price) GROUP BY county ); ALTER TABLE uk_price_paid_with_projections MATERIALIZE PROJECTION prj_gby_county SETTINGS mutations_sync = 1; 0 rows in set. Elapsed: 0.123 sec.
请注意,如果在投影中使用了 GROUP BY 子句(如上面的 prj_gby_county
中),则(隐藏)表的底层存储引擎变为 AggregatingMergeTree,并且所有聚合函数都转换为 AggregateFunction。这确保了正确的增量数据聚合。
另外,请注意 强制同步执行的 SETTINGS 子句。
这是主表 uk_price_paid_with_projections
及其两个投影的可视化
如果我们现在运行查询,列出伦敦县中支付价格最高的三项,我们会看到性能上的巨大差异
SELECT county, price FROM uk_price_paid_with_projections WHERE town = 'LONDON' ORDER BY price DESC LIMIT 3 ┌─county─────────┬─────price─┐ │ GREATER LONDON │ 594300000 │ │ GREATER LONDON │ 569200000 │ │ GREATER LONDON │ 448500000 │ └────────────────┴───────────┘ 3 rows in set. Elapsed: 0.026 sec. Processed 2.17 million rows, 13.03 MB (83.14 million rows/s., 499.14 MB/s.)✎
同样,对于查询,列出英国各县中平均支付价格最高的前三名
SELECT county, avg(price) FROM uk_price_paid_with_projections GROUP BY county ORDER BY avg(price) DESC LIMIT 3 ┌─county──────────────────────────────┬─────────avg(price)─┐ │ WINDSOR AND MAIDENHEAD │ 398334.9180566017 │ │ GREATER LONDON │ 396401.2740568222 │ │ BOURNEMOUTH, CHRISTCHURCH AND POOLE │ 387441.28323942184 │ └─────────────────────────────────────┴────────────────────┘ 3 rows in set. Elapsed: 0.007 sec.✎
请注意,这两个查询都以原始表为目标,并且在创建两个投影之前,这两个查询都导致了全表扫描(从磁盘流式传输了所有 2764 万行)。
另请注意,列出伦敦县中支付价格最高的三项的查询正在流式传输 217 万行。当我们直接使用针对此查询优化的第二个表时,仅从磁盘流式传输了 81920 行。
造成差异的原因是,目前,上述 optimize_read_in_order
优化不支持投影。
我们检查 system.query_log 表,以查看 ClickHouse 是否自动将两个投影用于上面的两个查询(请参阅下面的 projections 列)
SELECT tables, query, query_duration_ms::String || ' ms' AS query_duration, formatReadableQuantity(read_rows) AS read_rows, projections FROM clusterAllReplicas(default, system.query_log) WHERE (type = 'QueryFinish') AND (tables = ['default.uk_price_paid_with_projections']) ORDER BY initial_query_start_time DESC LIMIT 2 FORMAT Vertical Row 1: ────── tables: ['default.uk_price_paid_with_projections'] query: SELECT county, avg(price) FROM uk_price_paid GROUP BY county ORDER BY avg(price) DESC LIMIT 3 query_duration: 6 ms read_rows: 597.00 projections: ['default.uk_price_paid.prj_gby_county'] Row 2: ────── tables: ['default.uk_price_paid_with_projections'] query: SELECT county, price FROM uk_price_paid WHERE town = 'LONDON' ORDER BY price DESC LIMIT 3 query_duration: 25 ms read_rows: 2.17 million projections: ['default.prj_oby_town_price']
提高查询处理并行度
大多数分析查询都有过滤、聚合和排序阶段。这些阶段中的每一个都可以独立并行化,并且默认情况下将使用与 CPU 核心数一样多的线程,从而为查询充分利用机器资源(因此,在 ClickHouse 中,纵向扩展优于横向扩展)。
发送到 ClickHouse 的查询被转换为物理查询计划,称为查询管道,该管道由在从磁盘流式传输的数据上执行的查询处理阶段组成。
如第一段所述,为了执行查询,ClickHouse 首先使用主索引选择可能包含与查询的 WHERE 子句匹配的行的行块。
所选的行块被划分为 n 个单独的数据范围。
n 取决于 max_threads
设置,该设置默认设置为 ClickHouse 在其运行的机器上看到的 CPU 核心数。
并行地,每个数据范围一个线程将以流式方式从其范围块中逐块读取行。查询管道中的大多数查询处理阶段都由 n 个线程以流式方式并行执行。
我们针对具有 4 个 CPU 核心的 ClickHouse 节点对此进行可视化
通过增加查询的 max_threads
设置,可以提高数据处理的并行度。
我们可以通过 EXPLAIN 检查查询的查询管道
EXPLAIN PIPELINE SELECT county, price FROM uk_price_paid WHERE town = 'LONDON' ORDER BY price DESC LIMIT 3 ┌─explain─────────────────────────────────┐ │ (Expression) │ │ ExpressionTransform │ │ (Limit) │ │ Limit │ │ (Sorting) │ │ MergingSortedTransform 4 → 1 │ │ MergeSortingTransform × 4 │ │ LimitsCheckingTransform × 4 │ │ PartialSortingTransform × 4 │ │ (Expression) │ │ ExpressionTransform × 4 │ │ (ReadFromMergeTree) │ │ MergeTreeThread × 4 0 → 1 │ └─────────────────────────────────────────┘ 13 rows in set. Elapsed: 0.002 sec.✎
可以从下到上读取计划,我们可以看到 4 个并行线程用于从磁盘读取/流式传输选定的行块,并且查询管道中的大多数查询处理阶段都由 4 个线程并行执行。
4 个线程是因为 EXPLAIN 查询在具有 4 个 CPU 核心的 ClickHouse 节点上运行,因此默认情况下 max_threads
设置为 4
SELECT * FROM system.settings WHERE name = 'max_threads' FORMAT Vertical Row 1: ────── name: max_threads value: 4 changed: 0 description: The maximum number of threads to execute the request. By default, it is determined automatically. min: ᴺᵁᴸᴸ max: ᴺᵁᴸᴸ readonly: 0 type: MaxThreads 1 row in set. Elapsed: 0.009 sec.
我们再次检查同一查询的查询管道,该查询现在具有 SETTINGS 子句,将 max_threads
设置增加到 20
EXPLAIN PIPELINE SELECT county, price FROM uk_price_paid WHERE town = 'LONDON' ORDER BY price DESC LIMIT 3 SETTINGS max_threads = 20 ┌─explain──────────────────────────────────┐ │ (Expression) │ │ ExpressionTransform │ │ (Limit) │ │ Limit │ │ (Sorting) │ │ MergingSortedTransform 20 → 1 │ │ MergeSortingTransform × 20 │ │ LimitsCheckingTransform × 20 │ │ PartialSortingTransform × 20 │ │ (Expression) │ │ ExpressionTransform × 20 │ │ (ReadFromMergeTree) │ │ MergeTreeThread × 20 0 → 1 │ └──────────────────────────────────────────┘ 13 rows in set. Elapsed: 0.003 sec.
现在,20 个并行线程用于从磁盘读取/流式传输选定的行块,并且查询管道中的大多数查询处理阶段都由 20 个线程并行执行。
我们使用 4 个线程运行查询(ClickHouse playground 上的 max_threads
的默认设置)
SELECT county, price FROM uk_price_paid WHERE town = 'LONDON' ORDER BY price DESC LIMIT 3 ┌─county─────────┬─────price─┐ │ GREATER LONDON │ 594300000 │ │ GREATER LONDON │ 569200000 │ │ GREATER LONDON │ 448500000 │ └────────────────┴───────────┘ 3 rows in set. Elapsed: 0.070 sec. Processed 27.64 million rows, 40.53 MB (393.86 million rows/s., 577.50 MB/s.)✎
我们使用 20 个线程运行查询
SELECT county, price FROM uk_price_paid WHERE town = 'LONDON' ORDER BY price DESC LIMIT 3 SETTINGS max_threads = 20 ┌─county─────────┬─────price─┐ │ GREATER LONDON │ 594300000 │ │ GREATER LONDON │ 569200000 │ │ GREATER LONDON │ 448500000 │ └────────────────┴───────────┘ 3 rows in set. Elapsed: 0.036 sec. Processed 27.64 million rows, 40.00 MB (765.42 million rows/s., 1.11 GB/s.)
使用 20 个线程,查询运行速度大约快一倍,但请注意,这会增加查询执行将消耗的峰值内存量,因为更多数据将并行流式传输到 ClickHouse 中。
我们通过检查 system.query_log 表来检查两个查询运行的内存消耗
SELECT query, query_duration_ms::String || ' ms' as query_duration, formatReadableSize(memory_usage) as memory_usage, formatReadableQuantity(read_rows) AS read_rows, formatReadableSize(read_bytes) as read_data FROM clusterAllReplicas(default, system.query_log) WHERE type = 'QueryFinish' AND tables = ['default.uk_price_paid'] ORDER BY initial_query_start_time DESC LIMIT 2 FORMAT Vertical Row 1: ────── query: SELECT county, price FROM uk_price_paid WHERE town = 'LONDON' ORDER BY price DESC LIMIT 3 SETTINGS max_threads = 20 query_duration: 35 ms memory_usage: 49.49 MiB read_rows: 27.64 million read_data: 38.15 MiB Row 2: ────── query: SELECT county, price FROM uk_price_paid WHERE town = 'LONDON' ORDER BY price DESC LIMIT 3 query_duration: 69 ms memory_usage: 31.01 MiB read_rows: 27.64 million read_data: 38.65 MiB 2 rows in set. Elapsed: 0.026 sec. Processed 64.00 thousand rows, 5.98 MB (2.46 million rows/s., 230.17 MB/s.)
使用 20 个线程,查询消耗的峰值主内存比使用 4 个线程运行的同一查询多约 40%。
请注意,在高 max_threads
设置下,资源争用和上下文切换可能会成为瓶颈,因此增加 max_threads 设置不会线性扩展。
总结
ClickHouse 查询处理架构针对实时分析进行了优化,并附带使查询处理快速的默认设置。
您可以通过应用上述技术来优化特定查询的性能
- 选择包含您要过滤的字段的行顺序/主键
- 为多个行顺序和增量聚合添加投影
- 提高查询处理并行度
这将最大限度地减少从磁盘流式传输到查询处理引擎的数据量,并加快该数据的流式传输和处理速度。