DoubleCloud 即将停止服务。使用 ClickHouse 并享受限时免费迁移服务。立即联系我们 ->->

博客 / 工程

ClickHouse 和万亿行挑战

author avatar
Dale McDiarmid
2024 年 3 月 5 日

上个月,我们对 来自 Decodable 的 Gunnar Morling 发起的非常成功的 10 亿行挑战赛做出了正式回应。此挑战要求用户编写一个 Java 程序,从包含 10 亿个测量的文本文件中计算每个城市的最低、平均和最高温度。我们对这一挑战的回应显然使用了 ClickHouse,尽管 ClickHouse 不是针对此问题的专门解决方案,但在规则规定的确切硬件配置下,其运行时间约为 19 秒,表现非常出色。

任何有经验的 ClickHouse 用户都知道,对于 ClickHouse 来说,10 亿行数据量非常小,用户经常对万亿级数据集执行聚合操作。因此,当 Dask 最近将挑战扩展到 1 万亿行 时,我们的好奇心被激发了。

在这篇博文中,**我们提交了我们对查询 1 万亿行数据集的尝试 - 在不到 3 分钟的时间内完成了查询,成本仅为 0.56 美元!**虽然我们最初的解决方案使用了中等硬件,但这个更大的挑战在笔记本电脑或相当于小型工作站的机器上执行起来有点棘手。

相反,我们选择使用 AWS 中的竞价型实例,同时旨在在成本和性能之间找到折衷方案。由于 AWS 采用了很大程度上线性的定价模型,因此这是一项简单的任务:我们只需在 AWS 中识别出能够最大程度地缩短查询执行时间的最佳性价比实例即可。使用 Pulumi 脚本,我们可以启动一个 ClickHouse 集群,运行查询,然后关闭资源 - 所有这些只需花费 0.56 美元!

对于那些好奇的人来说,如果将这些数据加载到 MergeTree 中,则可以在 16 秒内回答查询!但这会产生加载时间,我们认为这是一种作弊行为 :)

数据集

Dask 提供的数据结构与最初的挑战一致,包含城市和温度列。正如他们在最初的博文中所讨论的那样,如此规模的数据需要一种提供比 10 亿行挑战赛中使用的 CSV 更好的压缩格式。因此,他们以 Parquet 格式提供数据,存储在以下请求者付费存储桶中,即,下载此数据的用户需要提供 AWS 访问密钥并承担传输费用。

s3://coiled-datasets-rp/1trc

但是,只要确保我们的 ClickHouse 集群部署在与该存储桶相同的区域(us-east-1)中,我们就可以避免数据传输费用,同时优化网络带宽和延迟。

完整数据集由 2.4TiB 的 Parquet 文件组成,以 10 万个文件提供,每个文件包含 1000 万行,大小约为 23-24MiB。

就地查询

可以将经常访问的数据集加载到 ClickHouse 等分析型数据库中以进行快速查询。但对于不经常使用的数据集,有时将其保留在 S3 等“湖仓”中并能够对其进行就地临时分析查询会很有帮助。在 AWS 生态系统中,用户可能熟悉 Amazon Athena 等技术,它允许使用标准 SQL 直接分析 Amazon S3 中的数据。

能够同时查询湖仓并作为实时分析型数据库运行的技术可以被认为是 实时数据仓库

ClickHouse 提供了这两种功能。为了满足我们挑战的要求,我们只需要执行一次此查询。正如我们在 10 亿行挑战赛结果 中提到的,虽然数据加载到 ClickHouse 后最终查询会更快,但实际的加载时间会抵消任何好处。因此,我们利用 ClickHouse 的 s3 函数 来“就地”查询数据。

SELECT
    count() as num_rows,
    uniqExact(_file) as num_files
FROM s3('https://coiled-datasets-rp.s3.us-east-1.amazonaws.com/1trc/measurements-*.parquet', 'AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY', headers('x-amz-request-payer' = 'requester'))

┌──────num_rows─┬─num_files─┐
│ 1000000000000100000 │
└───────────────┴───────────┘

作为一个简单的示例,我们可以通过一个简单的查询确认行数和文件总数。

要访问请求者付费存储桶,必须在任何请求中传递一个标头 'x-amz-request-payer' = 'requester'。这可以通过将参数 headers('x-amz-request-payer' = 'requester') 传递给 s3 函数在上述调用中实现。

一个简单的查询

我们之前针对 10 亿行挑战赛的查询包含一些优化,以最大程度地减少字符串格式化并确保数据以正确的格式返回。由于 Parquet 格式,我们不需要进行任何字符串解析。因此,我们的查询是一个简单的 GROUP BY

最大程度地降低 AWS 成本

为了将查询的成本降至最低,我们可以利用 AWS 的竞价型实例。这些实例类型允许您使用备用 EC2 计算资源,您可以通过 AWS API 对其进行竞价(设置最低竞价)。虽然这些实例可能会被中断并被 AWS 重新回收(例如,如果竞价价格超过了出价),但它们的价格最多可以降低 90%。对于当前的竞价价格,我们推荐 Vantage 的 此资源

与按需定价不同,这些竞价价格对于每种实例类型而言并不那么线性。例如,在撰写本文时,具有四个 vCPU 的 c7g.xlarge 每小时价格为 0.0693 美元(比按需价格便宜 50% 以上),这比具有八个内核的 c7g.2xlarge 的价格 0.1157 美元便宜一半以上。EC2 备用容量的供求长期趋势最终决定了价格。

接下来的挑战是最小化任何实例处于活动状态的时间。我们需要

  1. 获取我们的实例并等待它们可用 - 通常大约 30 秒
  2. 安装 ClickHouse 并配置以形成集群
  3. 运行查询
  4. 立即停止实例。

这是需要执行操作的简化版本,没有考虑配置支持的 AWS 资源的需要,包括 VPC、子网、互联网网关和适当的安全组,以便实例可以相互通信。

为了执行 AWS 编排工作并配置 ClickHouse 集群,我们选择了 Pulumi,主要是因为它允许以 Python 编写基础设施供应代码,使其非常易读,并且允许所有工作在一个工具中完成。我们通过 动态资源提供程序 对其进行了扩展,以支持在集群准备就绪后对其进行查询。这使我们能够在一个脚本中完成所有上述工作,此处提供

要复现这些测试,用户只需克隆上述仓库并修改如下所示的 Pulumi 堆栈配置文件,在运行./run.sh之前指定区域、可用区、实例数量及其各自的类型。

config:
 aws:region: us-east-1
 1trc:aws_zone: us-east-1d
 1trc:instance_type: "c7g.4xlarge"
 1trc:number_instances: 16
 # this must exist as a key-pair in AWS
 1trc:key_name: "<key-pair-name>"
 # change as needed
 1trc:cluster_password: "clickhouse_admin"
 # AMD ami (us-east-1)
 1trc:ami: "ami-05d47d29a4c2d19e1"
 # Intel AMI (us-east-1)
 # 1trc:ami: "ami-0c7217cdde317cfec"
 # query to run on the cluster
 1trc:query: "SELECT 1"

此脚本将调用Pulumi up。这会导致实例被供应和配置,并在最后一步执行定义的查询。查询完成后,将发出Pulumi down以拆除基础设施,并报告最终时间。

我们必须强调,此脚本绝不能替代用于生产 ClickHouse 集群的正确供应代码。考虑到实例的短暂性(少于 5 分钟),我们做出了一些妥协,但这在生产集群中将受到严重限制。具体来说

  • 我们没有供应任何存储,因为我们查询 S3 中的原位数据。
  • 虽然访问权限仅限于运行 Pulumi 代码的 IP,但集群通过端口 8213 在 HTTP 上公开,因此我们可以运行所需的查询。在集群存活期间,这代表着最小的风险,但在生产环境中不建议缺少 SSL。我们的实例间通信网络设置也过于宽松。
  • 集群的配置极其有限,没有考虑修改、扩展或升级集群的需要。我们的 ClickHouse 配置也极简且不可自定义。
  • 未进行任何测试,也没有考虑可扩展性。
  • 我们没有处理 AWS 上使用竞价型实例时始终可能发生的潜在抢占中断。我们还假设了一组同构的实例类型(这违反了AWS 最佳实践),因为这使得推理和测试更容易。

尽管有限,但上述脚本确实允许我们以低成本启动 ClickHouse 集群,运行查询并立即销毁基础设施。此短暂集群部署可用于运行那些笔记本电脑在计算上可能无法触及的查询。例如,要运行 32 个c7g.4xlarge(16 核,32GiB 内存)实例 5 分钟,我们将支付大约0.2441 * (5/60) * 32 = $0.65 - 使用此数量的资源,ClickHouse 可以查询 S3 中大量数据!

选择 AWS 实例类型

在执行任何测试和查询调优之前,我们希望确定一种实例类型,主要目的是避免一组详尽的测试,这将非常耗时。我们基于简单的推理,旨在在不花费大量资源寻找完美的硬件配置的情况下产生有竞争力的结果。

作为简单的聚合查询,其中站点具有相对较低的基数(1000 个),我们怀疑此查询将受 CPU 或网络限制。除了在 AWS 中比 Intel/AMD 等效产品更便宜外,ARM 处理器在我们的公开基准测试(请参阅 c7g)和测试。例如,如果我们考虑最新系列的 c7 实例(对于 4xlarge 大小(16 个 vCPU))

Markdown Image

此处的 c7g 代表我们最具成本效益的选择,尤其是在我们不需要 c7gd 提供的额外存储时。c7gn 虽然具有相同的 ARM 处理器,但确实提供了额外的网络带宽,这在非常大的实例大小上可能需要,以避免其成为瓶颈。但是,额外的网络带宽需要付出代价——竞价型价格的 1.5 倍!

因此,我们关于实例类型的决定归结为 c7g 或 c7gn。如果我们横向扩展并且不使用网络可能成为瓶颈的较大实例,那么从经济角度来看,c7g 最有意义。

我们还考虑了略便宜的c6g 实例。根据我们的经验,这些实例更难获得并且网络接口速度较慢,但它们可能值得在详尽的评估过程中考虑。

作为经验丰富的 ClickHouse 用户,我们非常有信心能够垂直扩展性能。因此,我们选择特定 c7g 实例的标准归结为每个 vCPU 的价格和可用性。对 c7g 的快速分析表明,较大的实例类型通常在竞价型市场上更具价格优势。

Markdown Image

实例类型每小时平均价格(美元)vCPU每个 vCPU 的成本
c7g.xlarge0.069640.0174
c7g.2xlarge0.11680.0145
c7g.4xlarge0.2436160.015225
c7g.8xlarge0.522320.0163125
c7g.12xlarge0.7134480.0148625
c7g.16xlarge0.928640.0145

这些价格代表平均值。价格因可用区而异。对于我们的最终测试,我们使用当前最便宜的可用区。这可以通过AWS 控制台确定。

虽然c7g.16xlarge代表了最具成本效益的选择,但在测试期间我们难以获得大量此类实例——尽管它们似乎具有类似的可用性。我们发现c7g.12xlarge更容易供应,并且价格差异很小。

优化查询设置

选择 c7g.12xlarge 作为我们的实例类型后,我们希望确保查询设置是最佳的,以最大程度地减少查询运行时间。对于我们的性能测试,我们使用单个实例并使用模式measurements-1*.parquet查询数据子集。如下所示,这大约针对 1110 亿行,并提供 454 秒的初始响应时间。

SELECT station, min(measure), max(measure), round(avg(measure), 2)
FROM s3('https://coiled-datasets-rp.s3.us-east-1.amazonaws.com/1trc/measurements-1*.parquet', 'AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY', headers('x-amz-request-payer' = 'requester'))
GROUP BY station
ORDER BY station ASC
FORMAT `Null`

0 rows in set. Elapsed: 454.770 sec. Processed 111.11 billion rows, 279.18 GB (244.32 million rows/s., 613.88 MB/s.)
Peak memory usage: 476.72 MiB.

上面使用FORMAT Null,以便不打印结果,并且我们可以轻松识别实际的查询时间。由于响应很小,我们认为与完整的查询处理相比,此开销可以忽略不计。

您可以立即看到,在大约 615 MB/s 时,我们没有受到网络限制(实例支持 22.5Gbps)。我们使用ClickHouse 仪表盘评估了集群的资源利用率。这里有三个明显的可视化效果:查询使用的 CPU 内核数量、来自 S3 的读取性能以及 S3 读取等待时间。

Markdown Image

此查询使用的 CPU 内核数量约为 14-15 个(与 ClickHouse 客户端在执行期间报告的一致),远小于可用的 48 个。在进行任何线程调整之前,我们花了一些时间评估为什么会这样。S3 读取等待时间表明我们发出了过多的读取请求(通过查询跟踪证实)。快速浏览我们的文档说明

“对于 s3 函数和表,并行下载由值max_download_threadsmax_download_buffer_size确定。只有当文件大小大于所有线程组合的总缓冲区大小时,文件才会并行下载。这仅在版本 > 22.3.1 上可用。”

parquet 文件大约 25MB,max_download_buffer_size默认设置为 10MiB。考虑到此查询的简单性(它只是读取并执行一个简单的聚合),我们知道可以安全地将此缓冲区大小增加到 50 MB(max_download_buffer_size=52428800),目的是确保每个文件由单个线程下载。这将减少每个线程花费在进行 S3 调用的时间,从而降低 S3 等待时间。

将缓冲区大小增加到 50MiB

Markdown Image

我们可以看到,这将我们的 S3 读取等待时间减少了约 20%,同时将我们的读取吞吐量提高到超过 920 MB/秒。随着等待时间的减少,我们更有效地利用了线程,将 CPU 利用率提高到大约 23 个内核。所有这些都有助于将我们的执行时间从 486 秒减少到 303 秒。

0 rows in set. Elapsed: 303.462 sec. Processed 111.11 billion rows, 279.18 GB (366.14 million rows/s., 919.97 MB/s.)
Peak memory usage: 534.87 MiB.

在确认进一步增加缓冲区大小没有带来任何好处(正如预期的那样),并且鉴于在 920MB/s 时,我们的实例肯定没有受到网络限制,我们考虑了如何进一步提高 CPU 利用率。

默认情况下,此查询的大多数阶段将在每个节点上以 48 个线程运行,这已通过EXPLAIN PIPELINE确认

EXPLAIN PIPELINE
SELECT
	station,
	min(measure),
	max(measure),
	round(avg(measure), 2)
FROM s3('https://coiled-datasets-rp.s3.us-east-1.amazonaws.com/1trc/measurements-1*.parquet', 'AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY', headers('x-amz-request-payer' = 'requester'))
GROUP BY station
ORDER BY station ASC

┌─explain─────────────────────────────────────┐
│ (Expression)                            	  │
│ ExpressionTransform                     	  │
│   (Sorting)                             	  │
│   MergingSortedTransform 481         	  │
│ 	MergeSortingTransform × 48          	  │
│   	LimitsCheckingTransform × 48      	  │
│     	PartialSortingTransform × 48    	  │
│       	(Expression)                  	  │
│       	ExpressionTransform × 48      	  │
│         	(Aggregating)               	  │
│         	Resize 4848              	  │
│           	AggregatingTransform × 48 	  │
│             	StrictResize 4848    	  │
│               	(Expression)          	  │
│               	ExpressionTransform × 48  │
│                 	(ReadFromStorageS3Step)   │
│                 	S3 × 48 01       	  │
└─────────────────────────────────────────────┘

17 rows in set. Elapsed: 0.305 sec.

这默认为每个节点的 vCPU 数量,并且对于大多数查询来说是一个合理的默认值。在我们的例子中,鉴于任务的读取密集型特性,增加它是有意义的。

理想情况下,我们希望能够只增加此测试中的下载线程数。目前,ClickHouse 中没有此设置,但我们正在考虑

虽然我们没有进行详尽的测试,但一些简单的测试表明将max_threads增加到128的好处。

max_threads_vs_latency.png

以上代表3次执行的平均值。

这也有了不错的改进,将总运行时间缩短至138秒。

0 rows in set. Elapsed: 138.820 sec. Processed 111.11 billion rows, 279.18 GB (800.39 million rows/s., 2.01 GB/s.)
Peak memory usage: 1.36 GiB.

回到我们的高级仪表板,我们可以看到这将我们的读取吞吐量提高到约2 GiB/s(仍然不受网络限制),并利用了48个内核中的46个。将max_threads进一步增加到超过128会导致CPU争用和CPU等待时间,从而降低整体吞吐量。

optimal_single_node_1trillion.png

此时,我们很满意我们受CPU限制,并且不认为进一步调整会带来明显的收益;我们的最终查询时间为138秒,或者大约2分钟即可查询所有1110亿行。

单个实例显然不足以让我们在数据集大小几乎是其10倍的情况下获得合理的运行时间。但是,我们确信单个c7g.12xlarge不会受网络限制,并且使用合理的查询设置,我们可以水平扩展并解决我们更大的1万亿行问题。

当我们在集群中运行查询时,这些相同的设置可以传递到每个节点,以及要处理的数据子集,从而优化每个节点的性能。

使用集群

要查询集群并使用所有节点,我们必须稍微更改一下查询,使用s3Cluster而不是s3函数。这使我们能够使用集群中的所有节点来处理查询。

与仅在接收节点上执行查询的标准s3函数不同,此函数①首先执行存储桶列表。协调节点使用此列表将②要处理的文件发送到集群中的每个节点,从而允许并行化③工作。然后,协调节点④收集集群中每个节点的中间结果,以⑤提供最终响应。我们在下面对此进行了可视化。

s3cluster.png

我们的最终查询

SELECT station,
	min(measure),
	max(measure),
	round(avg(measure), 2)
FROM s3Cluster('default', 'https://coiled-datasets-rp.s3.us-east-1.amazonaws.com/1trc/measurements-*.parquet', 'AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY', headers('x-amz-request-payer' = 'requester'))
GROUP BY station
ORDER BY station ASC
FORMAT `Null`
SETTINGS max_download_buffer_size = 52428800, max_threads = 110

在上面的示例中,与所有ClickHouse Cloud集群一样,我们将集群命名为“default”。

最终时间和成本

虽然查询时间应随着我们部署的实例数量线性减少,从而保持成本不变,但我们也受到可以实际配置的实例数量的限制。AWS 通常建议使用混合实例类型和EC2扩展组,而不是我们的同构扩展方法。我们当前的脚本还将我们限制在单个可用区(尽管这是一个明显的改进,但它使代码变得稍微复杂了一些)。一些测试表明可以可靠地获取8个实例,这为我们提供了总共384个vCPU。因此,我们的最终配置为

config:
  aws:region: us-east-1
  1trc:aws_zone: us-east-1b
  1trc:instance_type: "c7g.12xlarge"
  1trc:number_instances: 8
  1trc:key_name: "dalem"
  1trc:cluster_password: "a_super_password"
  # AMD ami (us-east-1)
  1trc:ami: "ami-05d47d29a4c2d19e1"
  1trc:query: "SELECT station, min(measure), max(measure), round(avg(measure), 2) FROM s3Cluster('default','https://coiled-datasets-rp.s3.us-east-1.amazonaws.com/1trc/measurements-*.parquet', 'AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY', headers('x-amz-request-payer' = 'requester')) GROUP BY station ORDER BY station ASC SETTINGS max_download_buffer_size = 52428800, max_threads=128"

请注意,我们使用us-east-1b作为我们的可用区。在撰写本文时,这是最便宜的区域,每个实例每小时0.7162美元(比按需实例节省58.84%)。

使用我们的最佳设置,我们可以运行我们的运行脚本。

./run.sh

(venv) dalemcdiarmid@PY aws-starter % ./run.sh
Updating (dev)

View in Browser (Ctrl+O): https://app.pulumi.com/clickhouse/aws-starter/dev/updates/88

 	Type                           	Name                         	Status          	Info
 +   ├─ aws:ec2:Instance            	1trc_node_4                  	created (17s)
 +   ├─ aws:ec2:Instance            	1trc_node_2                  	created (17s)
 +   ├─ aws:ec2:Instance            	1trc_node_7                  	created (18s)

…

 +   └─ pulumi-python:dynamic:Resource  1trc-clickhouse-query        	created (181s)

Diagnostics:
  pulumi:pulumi:Stack (aws-starter-dev):
	info: checking cluster is ready...
	info: cluster is ready!
	info: running query...
	info: query took 178.94s ← query time!

Resources:
	+ 80 created

Duration: 5m10s ← startup time + query time!

…

Destroying (dev)

View in Browser (Ctrl+O): https://app.pulumi.com/clickhouse/aws-starter/dev/updates/89

 	Type                   	Name                   	Status
 	pulumi:pulumi:Stack    	aws-starter-dev
 -   ├─ command:remote:Command  restart_node_3_clickhouse  deleting (26s)...
 -   ├─ command:remote:Command  restart_node_2_clickhouse  deleting (26s)...
 -   ├─ command:remote:Command  restart_node_4_clickhouse  deleting (26s)...
 -   ├─ command:remote:Command  restart_node_5_clickhouse  deleting (26s)...

…

Resources:
	- 80 deleted

Duration: 2m58s ← time to remove resources

The resources in the stack have been deleted, but the history and configuration associated with the stack are still maintained.
If you want to remove the stack completely, run `pulumi stack rm dev`.
Total time: 499 seconds

从输出中,我们可以看到我们的查询在180秒或3分钟内完成。但是,我们的集群大约需要2分钟10秒才能配置并可供查询,然后在终止之前再花费3分钟。因此,我们的总运行时间为499秒。

我们可以用此来计算我们的最终成本。

每个实例的成本:0.7162美元

实例数量:8

活动时间:499秒

成本:(499/3600) * 8 * 0.7162 = 0.79美元

这仅代表成本估计。有几个因素可能导致发生的成本不同——主要是所有实例并非在全部499秒内都处于活动状态。通过订阅Spot 实例数据馈送,可以获得准确的成本。这会将与竞价型实例相关的成本发送到S3存储桶以供以后检索。检查我们的测试运行的统计信息,我们可以看到每个实例的实际发生成本。

Markdown Image

将这些加起来得到我们查询1万亿行的总价格。

(0.0708400000+0.0700529676+0.0704461296+0.0702499028+0.0702499028+0.0696590972+0.0706430648+0.0706430648) = $0.56

最终想法

值得注意的是,虽然更多实例会缩短查询时间,但它们的配置需要更长时间,可能会导致更多成本。因此,在水平扩展之前进行垂直扩展是有意义的,因为这占主导地位运行时间。但是,如果您正在运行多个查询,则此时间会被抵消,因为这是一次性成本!

作为一名声称自己不是Pulumi专家的用户,我也怀疑配置(和终止代码)可以得到改进,以最大程度地减少启动和关闭时间。我们一如既往地欢迎贡献!

在开发这种方法时,我们没有严格遵守AWS最佳实践关于竞价型实例。此代码的改进版本可以使用异构实例类型,甚至利用EC2自动扩展组允许用户只需指定所需的内核数量!预安装ClickHouse的AMI可以进一步减少启动时间,而AWS区域的灵活性可能会更容易扩展。

敏锐的读者会注意到我们没有为我们的竞价型实例配置出价。我们只是接受了当前小时的竞价价格。如果您确实想节省几美分,则有空间进一步降低此价格!

使用MergeTree

最后,我们很好奇这些数据在MergeTree表引擎中查询的速度有多快。虽然将数据加载到表中也需要一些时间(在300核ClickHouse Cloud集群上为385秒),但使用完整扫描查询生成的表仅需16.5秒,并且比查询parquet文件的任何结果快得多(相同的集群在48秒内完成查询)!请注意,这样的集群可以通过ClickHouse Cloud API轻松部署,用于运行查询,然后立即终止。感谢Alexey Milovidov提供这些时间数据。

结论

扩展十亿行挑战,我们展示了ClickHouse如何允许在不到3分钟的时间内以0.56美元的价格查询更大的1万亿数据集!!

我们欢迎您提出建议或替代方案,以更快、更经济高效地查询此数据集。

立即开始使用ClickHouse Cloud并获得300美元的积分。在您的30天试用期结束时,继续使用按需付费计划,或联系我们以了解更多关于我们基于容量的折扣信息。访问我们的定价页面了解详情。

分享此文章

订阅我们的新闻通讯

随时了解功能发布、产品路线图、支持和云产品!
加载表单…
关注我们
Twitter imageSlack imageGitHub image
Telegram imageMeetup imageRss image