简介
ClickHouse 是一款极速的关系型数据库管理系统,针对实时分析进行了优化。即使查询通常足够快,无需或只需少量调整,我们也相信不应该浪费任何毫秒!在这篇文章中,我们将为您提供一些关于如何加速 ClickHouse 查询并使其比以往任何时候都更快的方法,并使用一些实际示例。
ClickHouse 查询处理入门
ClickHouse 中的表旨在每秒接收数百万行的插入,并存储非常大的(数百 PB)数据量。
ClickHouse 中的快速查询速度通常是通过正确利用表的(稀疏)主键索引来实现的,以便大幅减少 ClickHouse 需要从磁盘读取的数据量,并防止在查询时对数据进行排序,这也可以在使用 LIMIT 子句时启用短路。
ClickHouse 主键索引的设计基于二分查找算法,该算法可以高效地(时间复杂度为 O(log2 n))找到目标值在已排序数组中的位置。
例如,考虑下图中的已排序数组,其中二分查找算法用于查找值 42。该算法将目标值 42 与数组的中间元素进行比较。如果它们不相等,则消除目标值不可能存在的半部分,并在剩余的半部分上继续搜索,再次获取中间元素与目标值进行比较,并重复此操作,直到找到目标值。
使用表的索引在 ClickHouse 表中查找行的工作方式相同。
基于该行顺序,主键索引(类似于上图中的已排序数组)存储表中每8192行的主键列值。
在下图中,我们假设一个表包含一个列“numbers”,该列也是主键列。
ClickHouse 主键索引不是索引单个行,而是索引块行(称为数据块)。
使用这样的主键索引,可以跳过 TB 甚至 PB 的数据,从而在几秒钟(或更短时间)内完成搜索。
下图显示了 ClickHouse 通常如何执行查询。
步骤 1:将涉及表的索引加载到主内存中。
步骤 2:通常通过对索引条目进行二分查找,ClickHouse 选择可能包含与查询 WHERE 子句匹配的行的数据块。
步骤 3:所选数据块并行流式传输到 ClickHouse 查询引擎以进行进一步处理,并将查询结果流式传输到调用方。
ClickHouse 中有三个主要的调整旋钮可以加快此查询执行工作流程。
ClickHouse 需要从磁盘流式传输到主内存的数据越少,查询的执行时间就越快。可以通过 (1) 正确利用主键索引和 (2) 预先计算聚合来最大程度地减少需要从磁盘流式传输的数据量。
可以通过 (3) 提高 ClickHouse 查询处理引擎内部使用的并行级别来加快数据的流式传输和实际处理速度。
(1) 正确利用主键索引
首先让我们看看如何确保充分利用主键索引以确保最佳查询性能。
利用索引最大程度地减少流式传输到 ClickHouse 的数据量
作为运行示例,我们使用来自英国房产价格支付教程的表,该表包含 2764 万行。此数据集在我们的play.clickhouse.com环境中可用。
我们运行一个查询,列出伦敦的三个最高价格的郡。
SELECT county, price FROM uk_price_paid WHERE town = 'LONDON' ORDER BY price DESC LIMIT 3 ┌─county─────────┬─────price─┐ │ GREATER LONDON │ 594300000 │ │ GREATER LONDON │ 569200000 │ │ GREATER LONDON │ 448500000 │ └────────────────┴───────────┘ 3 rows in set. Elapsed: 0.044 sec. Processed 27.64 million rows, 44.21 MB (634.60 million rows/s., 1.01 GB/s.)✎
ClickHouse 正在进行全表扫描!
因为无法正确地为查询利用表的索引。
我们检查表的索引。
SHOW CREATE TABLE uk_price_paid CREATE TABLE default.uk_price_paid ( `price` UInt32, `date` Date, `postcode1` LowCardinality(String), `postcode2` LowCardinality(String), `type` Enum8('other' = 0, 'terraced' = 1, 'semi-detached' = 2, 'detached' = 3, 'flat' = 4), `is_new` UInt8, `duration` Enum8('unknown' = 0, 'freehold' = 1, 'leasehold' = 2), `addr1` String, `addr2` String, `street` LowCardinality(String), `locality` LowCardinality(String), `town` LowCardinality(String), `district` LowCardinality(String), `county` LowCardinality(String), `category` UInt8 ) ENGINE = ReplicatedMergeTree('/clickhouse/tables/fb100991-4cae-4a92-995a-1ca11416879e/{shard}', '{replica}') ORDER BY (postcode1, postcode2, addr1, addr2) SETTINGS index_granularity = 8192 1 row in set. Elapsed: 0.001 sec.✎
表的 ORDER BY 子句确定数据在磁盘上的排序方式以及索引条目。ORDER BY 中的列是 postcode1、postcode2、addr1 和 addr2。
请注意,如果未显式定义(即通过 PRIMARY KEY 子句),ClickHouse 会将排序键(由 ORDER BY 子句定义)用作索引。
这对带有 town = 'LONDON'
谓词的查询没有帮助。
如果我们想显著加快筛选特定城镇行的示例查询的速度,则需要使用针对该查询优化的索引。
一种方法是创建第二个表,该表基于不同的索引具有不同的行顺序。
因为表只能在磁盘上具有一个物理顺序,所以我们需要将表数据复制到另一个表中。
我们创建了一个与原始表具有相同模式但具有不同索引的第二个表,并将数据复制到表之间。
CREATE TABLE uk_price_paid_oby_town_price ( price UInt32, date Date, postcode1 LowCardinality(String), postcode2 LowCardinality(String), type Enum8('terraced' = 1, 'semi-detached' = 2, 'detached' = 3, 'flat' = 4, 'other' = 0), is_new UInt8, duration Enum8('freehold' = 1, 'leasehold' = 2, 'unknown' = 0), addr1 String, addr2 String, street LowCardinality(String), locality LowCardinality(String), town LowCardinality(String), district LowCardinality(String), county LowCardinality(String), category UInt8 ) ENGINE = MergeTree ORDER BY (town, price); INSERT INTO uk_price_paid_oby_town_price SELECT * FROM uk_price_paid; 0 rows in set. Elapsed: 13.033 sec. Processed 52.50 million rows, 2.42 GB (4.03 million rows/s., 185.58 MB/s.)
我们在第二个表上运行查询。
SELECT county, price FROM uk_price_paid_oby_town_price WHERE town = 'LONDON' ORDER BY price DESC LIMIT 3 ┌─county─────────┬─────price─┐ │ GREATER LONDON │ 594300000 │ │ GREATER LONDON │ 569200000 │ │ GREATER LONDON │ 448500000 │ └────────────────┴───────────┘ 3 rows in set. Elapsed: 0.005 sec. Processed 81.92 thousand rows, 493.76 KB (15.08 million rows/s., 90.87 MB/s.)✎
好多了。
利用索引防止重新排序并启用短路
在我们之前的示例中,ClickHouse 不仅从磁盘流式传输的数据少得多,而且还应用了额外的优化。
查询是
- 筛选 town = ‘London’ 的行
- 按价格降序排列匹配的行
- 获取前 3 行
为此,通常 ClickHouse
- 使用索引选择可能包含“London”行的块并从磁盘流式传输这些行
- 在主内存中按价格对这些行排序
- 将前 3 行作为结果流式传输到调用方
但是,因为磁盘上的“London”行已经按价格排序(请参见表 DDL 的 ORDER BY 子句),所以 ClickHouse 可以跳过在主内存中进行排序。
并且 ClickHouse 可以进行短路。ClickHouse 需要做的就是按相反顺序从磁盘流式传输所选数据块,并且一旦流式传输了三个匹配的 (town = ‘London’) 行,查询就完成了。
这正是optimize_read_in_order
优化在这种情况下所做的 - 防止对行进行重新排序并启用短路。
此优化默认启用,当我们通过EXPLAIN检查查询的逻辑查询计划时,我们可以在计划的底部看到ReadType: InReverseOrder。
EXPLAIN actions = 1 SELECT county, price FROM uk_price_paid_oby_town_price WHERE town = 'LONDON' ORDER BY price DESC LIMIT 3 Expression (Projection) Actions: INPUT :: 0 -> price UInt32 : 0 INPUT :: 1 -> county LowCardinality(String) : 1 Positions: 1 0 Limit (preliminary LIMIT (without OFFSET)) Limit 3 Offset 0 Sorting (Sorting for ORDER BY) Prefix sort description: price DESC Result sort description: price DESC Limit 3 Expression (Before ORDER BY) Actions: INPUT :: 0 -> price UInt32 : 0 INPUT :: 1 -> county LowCardinality(String) : 1 Positions: 0 1 Filter (WHERE) Filter column: equals(town, 'LONDON') (removed) Actions: INPUT :: 0 -> price UInt32 : 0 INPUT : 1 -> town LowCardinality(String) : 1 INPUT :: 2 -> county LowCardinality(String) : 2 COLUMN Const(String) -> 'LONDON' String : 3 FUNCTION equals(town :: 1, 'LONDON' :: 3) -> equals(town, 'LONDON') LowCardinality(UInt8) : 4 Positions: 0 2 4 ReadFromMergeTree (default.uk_price_paid_oby_town_price) ReadType: InReverseOrder Parts: 6 Granules: 267 27 rows in set. Elapsed: 0.002 sec.✎
可以通过查询的SETTINGS子句禁用optimize_read_in_order
设置。这将导致ClickHouse从磁盘流式传输217万行,而不是81.92万行(仍然比完整表扫描好)。下图显示了查询处理步骤的差异。
使用具有不同行顺序的第二个表的一个剩余问题是保持表同步。在下面,我们将讨论解决此问题的实用方案。
(2)预计算聚合
我们执行一个聚合查询,列出平均支付价格最高的三个英国郡。
SELECT county, avg(price) FROM uk_price_paid GROUP BY county ORDER BY avg(price) DESC LIMIT 3 ┌─county──────────────────────────────┬─────────avg(price)─┐ │ WINDSOR AND MAIDENHEAD │ 383843.17329304793 │ │ BOURNEMOUTH, CHRISTCHURCH AND POOLE │ 383478.9135281004 │ │ GREATER LONDON │ 376911.4824869095 │ └─────────────────────────────────────┴────────────────────┘ 3 rows in set. Elapsed: 0.020 sec. Processed 26.25 million rows, 132.57 MB (1.33 billion rows/s., 6.69 GB/s.)✎
在这种情况下,ClickHouse正在执行完整表扫描,因为查询聚合所有现有的表行。因此,无法使用主索引来减少从磁盘流式传输的数据量。
但是,如果我们可以在单独的表中预计算所有现有的130个英国郡的聚合值,并在原始表数据发生变化时更新该单独的表,则可以大幅减少从磁盘流式传输的数据量(以牺牲额外的磁盘空间为代价)。
下图显示了这个想法。
ClickHouse有一个方便的新交钥匙功能实现了这个想法:投影!
对(1)和(2)使用投影
ClickHouse表可以有多个投影。
投影是一个额外的(隐藏的)表,它会自动与原始表保持同步。投影可以与原始表具有不同的行顺序(因此具有不同的主索引),还可以自动且增量地预计算聚合值。
当查询针对原始表时,ClickHouse会自动选择(通过对主键进行采样)一个可以生成相同正确结果但需要读取最少数据量的表。我们在这里可视化此概念。
投影有点类似于物化视图,物化视图也允许您拥有增量聚合和多个行顺序。但与物化视图不同,投影以原子方式更新并与主表保持一致,ClickHouse会在查询时自动选择最佳版本。
对于原始示例表uk_price_paid
,我们将创建(并填充)两个投影。
为了在我们的游乐场中保持整洁和简单,我们首先将表uk_price_paid
复制为uk_price_paid_with_projections
。
CREATE TABLE uk_price_paid_with_projections AS uk_price_paid; INSERT INTO uk_price_paid_with_projections SELECT * FROM uk_price_paid; 0 rows in set. Elapsed: 4.410 sec. Processed 52.50 million rows, 2.42 GB (11.90 million rows/s., 548.46 MB/s.)
我们创建并填充投影prj_oby_town_price
- 一个带有主索引的额外(隐藏的)表,按城镇和价格排序,以优化列出特定城镇中支付价格最高的郡的查询。
ALTER TABLE uk_price_paid_with_projections ADD PROJECTION prj_oby_town_price ( SELECT * ORDER BY town, price ); ALTER TABLE uk_price_paid_with_projections MATERIALIZE PROJECTION prj_oby_town_price SETTINGS mutations_sync = 1; 0 rows in set. Elapsed: 6.028 sec.
我们创建并填充投影prj_gby_county
- 一个额外(隐藏的)表,它增量地预计算所有现有130个英国郡的avg(price)聚合值。
ALTER TABLE uk_price_paid_with_projections ADD PROJECTION prj_gby_county ( SELECT county, avg(price) GROUP BY county ); ALTER TABLE uk_price_paid_with_projections MATERIALIZE PROJECTION prj_gby_county SETTINGS mutations_sync = 1; 0 rows in set. Elapsed: 0.123 sec.
请注意,如果在投影中使用了GROUP BY子句(如上面的prj_gby_county
),则(隐藏的)表的底层存储引擎变为AggregatingMergeTree,并且所有聚合函数都转换为AggregateFunction。这确保了正确的增量数据聚合。
另外,请注意SETTINGS子句强制同步执行。
这是主表uk_price_paid_with_projections
及其两个投影的可视化。
如果我们现在运行列出伦敦郡中支付价格最高的三个郡的查询,我们会看到性能的巨大差异。
SELECT county, price FROM uk_price_paid_with_projections WHERE town = 'LONDON' ORDER BY price DESC LIMIT 3 ┌─county─────────┬─────price─┐ │ GREATER LONDON │ 594300000 │ │ GREATER LONDON │ 569200000 │ │ GREATER LONDON │ 448500000 │ └────────────────┴───────────┘ 3 rows in set. Elapsed: 0.026 sec. Processed 2.17 million rows, 13.03 MB (83.14 million rows/s., 499.14 MB/s.)✎
同样,对于列出平均支付价格最高的三个英国郡的查询也是如此。
SELECT county, avg(price) FROM uk_price_paid_with_projections GROUP BY county ORDER BY avg(price) DESC LIMIT 3 ┌─county──────────────────────────────┬─────────avg(price)─┐ │ WINDSOR AND MAIDENHEAD │ 398334.9180566017 │ │ GREATER LONDON │ 396401.2740568222 │ │ BOURNEMOUTH, CHRISTCHURCH AND POOLE │ 387441.28323942184 │ └─────────────────────────────────────┴────────────────────┘ 3 rows in set. Elapsed: 0.007 sec.✎
请注意,这两个查询都针对原始表,并且在创建两个投影之前,这两个查询都导致了完整表扫描(所有2764万行都从磁盘流式传输)。
另外,请注意,列出伦敦郡中支付价格最高的三个郡的查询正在流式传输217万行。当我们直接使用针对此查询优化的第二个表时,仅流式传输了81.92万行。
造成差异的原因是,目前,上面提到的optimize_read_in_order
优化不支持投影。
我们检查system.query_log表以查看ClickHouse是否自动为上述两个查询使用了这两个投影(请参阅下面的投影列)。
SELECT tables, query, query_duration_ms::String || ' ms' AS query_duration, formatReadableQuantity(read_rows) AS read_rows, projections FROM clusterAllReplicas(default, system.query_log) WHERE (type = 'QueryFinish') AND (tables = ['default.uk_price_paid_with_projections']) ORDER BY initial_query_start_time DESC LIMIT 2 FORMAT Vertical Row 1: ────── tables: ['default.uk_price_paid_with_projections'] query: SELECT county, avg(price) FROM uk_price_paid GROUP BY county ORDER BY avg(price) DESC LIMIT 3 query_duration: 6 ms read_rows: 597.00 projections: ['default.uk_price_paid.prj_gby_county'] Row 2: ────── tables: ['default.uk_price_paid_with_projections'] query: SELECT county, price FROM uk_price_paid WHERE town = 'LONDON' ORDER BY price DESC LIMIT 3 query_duration: 25 ms read_rows: 2.17 million projections: ['default.prj_oby_town_price']
提高查询处理并行度
大多数分析查询都具有筛选、聚合和排序阶段。这些阶段中的每一个都可以独立地并行化,并且默认情况下会使用与CPU核心一样多的线程,从而利用机器的全部资源来执行查询(因此,在ClickHouse中,首选纵向扩展而不是横向扩展)。
发送到ClickHouse的查询会被转换为物理查询计划,称为查询管道,该管道由在从磁盘流式传输的数据上执行的查询处理阶段组成。
如第一段所述,为了执行查询,ClickHouse首先使用主索引选择可能包含与查询WHERE子句匹配的行的数据块。
所选的数据块被划分为n个独立的数据范围。
n取决于max_threads
设置,该设置默认设置为ClickHouse在其运行的机器上看到的CPU核心数量。
并行地,每个数据范围的一个线程将以流式方式逐块读取其范围中的行。查询管道中的大多数查询处理阶段都由n个线程以流式方式并行执行。
我们为具有4个CPU核心的ClickHouse节点可视化了这一点。
通过增加查询的max_threads
设置,数据处理的并行级别会增加。
我们可以通过EXPLAIN检查查询的查询管道。
EXPLAIN PIPELINE SELECT county, price FROM uk_price_paid WHERE town = 'LONDON' ORDER BY price DESC LIMIT 3 ┌─explain─────────────────────────────────┐ │ (Expression) │ │ ExpressionTransform │ │ (Limit) │ │ Limit │ │ (Sorting) │ │ MergingSortedTransform 4 → 1 │ │ MergeSortingTransform × 4 │ │ LimitsCheckingTransform × 4 │ │ PartialSortingTransform × 4 │ │ (Expression) │ │ ExpressionTransform × 4 │ │ (ReadFromMergeTree) │ │ MergeTreeThread × 4 0 → 1 │ └─────────────────────────────────────────┘ 13 rows in set. Elapsed: 0.002 sec.✎
可以从下到上读取计划,我们可以看到,4个并行线程用于从磁盘读取/流式传输所选数据块,并且查询管道中的大多数查询处理阶段都由4个线程并行执行。
4个线程是因为EXPLAIN查询是在具有4个CPU核心的ClickHouse节点上运行的,因此max_threads
默认设置为4。
SELECT * FROM system.settings WHERE name = 'max_threads' FORMAT Vertical Row 1: ────── name: max_threads value: 4 changed: 0 description: The maximum number of threads to execute the request. By default, it is determined automatically. min: ᴺᵁᴸᴸ max: ᴺᵁᴸᴸ readonly: 0 type: MaxThreads 1 row in set. Elapsed: 0.009 sec.
我们再次检查同一查询的查询管道,该查询现在有一个SETTINGS子句将max_threads
设置增加到20。
EXPLAIN PIPELINE SELECT county, price FROM uk_price_paid WHERE town = 'LONDON' ORDER BY price DESC LIMIT 3 SETTINGS max_threads = 20 ┌─explain──────────────────────────────────┐ │ (Expression) │ │ ExpressionTransform │ │ (Limit) │ │ Limit │ │ (Sorting) │ │ MergingSortedTransform 20 → 1 │ │ MergeSortingTransform × 20 │ │ LimitsCheckingTransform × 20 │ │ PartialSortingTransform × 20 │ │ (Expression) │ │ ExpressionTransform × 20 │ │ (ReadFromMergeTree) │ │ MergeTreeThread × 20 0 → 1 │ └──────────────────────────────────────────┘ 13 rows in set. Elapsed: 0.003 sec.
现在,20个并行线程用于从磁盘读取/流式传输所选数据块,并且查询管道中的大多数查询处理阶段都由20个线程并行执行。
我们使用4个线程运行查询(ClickHouse游乐场中max_threads
的默认设置)。
SELECT county, price FROM uk_price_paid WHERE town = 'LONDON' ORDER BY price DESC LIMIT 3 ┌─county─────────┬─────price─┐ │ GREATER LONDON │ 594300000 │ │ GREATER LONDON │ 569200000 │ │ GREATER LONDON │ 448500000 │ └────────────────┴───────────┘ 3 rows in set. Elapsed: 0.070 sec. Processed 27.64 million rows, 40.53 MB (393.86 million rows/s., 577.50 MB/s.)✎
我们使用20个线程运行查询。
SELECT county, price FROM uk_price_paid WHERE town = 'LONDON' ORDER BY price DESC LIMIT 3 SETTINGS max_threads = 20 ┌─county─────────┬─────price─┐ │ GREATER LONDON │ 594300000 │ │ GREATER LONDON │ 569200000 │ │ GREATER LONDON │ 448500000 │ └────────────────┴───────────┘ 3 rows in set. Elapsed: 0.036 sec. Processed 27.64 million rows, 40.00 MB (765.42 million rows/s., 1.11 GB/s.)
使用20个线程,查询运行速度大约快了一倍,但请注意,这会增加查询执行消耗的峰值内存量,因为更多数据将并行流式传输到ClickHouse中。
我们通过检查system.query_log表来检查这两个查询运行的内存消耗。
SELECT query, query_duration_ms::String || ' ms' as query_duration, formatReadableSize(memory_usage) as memory_usage, formatReadableQuantity(read_rows) AS read_rows, formatReadableSize(read_bytes) as read_data FROM clusterAllReplicas(default, system.query_log) WHERE type = 'QueryFinish' AND tables = ['default.uk_price_paid'] ORDER BY initial_query_start_time DESC LIMIT 2 FORMAT Vertical Row 1: ────── query: SELECT county, price FROM uk_price_paid WHERE town = 'LONDON' ORDER BY price DESC LIMIT 3 SETTINGS max_threads = 20 query_duration: 35 ms memory_usage: 49.49 MiB read_rows: 27.64 million read_data: 38.15 MiB Row 2: ────── query: SELECT county, price FROM uk_price_paid WHERE town = 'LONDON' ORDER BY price DESC LIMIT 3 query_duration: 69 ms memory_usage: 31.01 MiB read_rows: 27.64 million read_data: 38.65 MiB 2 rows in set. Elapsed: 0.026 sec. Processed 64.00 thousand rows, 5.98 MB (2.46 million rows/s., 230.17 MB/s.)
使用20个线程,查询消耗的峰值主内存比使用4个线程运行的相同查询多约40%。
请注意,使用较高的max_threads
设置,资源争用和上下文切换可能会成为瓶颈,因此增加max_threads
设置不会线性扩展。
总结
ClickHouse查询处理架构针对实时分析进行了优化,并带有使查询处理速度更快的默认设置。
您可以通过应用上面描述的技术来优化特定查询的性能。
- 选择包含您要筛选的字段的行顺序/主键。
- 为多个行顺序和增量聚合添加投影。
- 提高查询处理并行级别。
这将最大程度地减少流式传输到查询处理引擎中的数据量,并加快这些数据的流式传输和处理速度。