DoubleCloud 即将结束。利用限时免费迁移服务迁移到 ClickHouse。立即联系我们 ->->

博客 / 工程

ClickHouse 和十亿行挑战

author avatar
戴尔·麦克迪亚米德
2024 年 1 月 23 日

1_billion_row_challenge_clickhouse.png

本月初,来自 Decodable 的 Gunnar Morling 发起了一项 1 月份的挑战,引起了广泛关注——编写一个 Java 程序,从 10 亿行文本文件中检索温度测量值,并计算每个气象站的最小温度、平均温度和最大温度。虽然我们离 Java 专家还差得很远,但作为一个既热爱大数据又喜欢速度测试的公司,我们想出一个针对这项挑战的 ClickHouse 官方解决方案!

虽然最初的挑战是在 Java 中进行的,但 Gunnar 在 Github 讨论中开设了一个 "展示与讲述",以允许更广泛的技术贡献。我们还要感谢我们的社区,他们 也对这项挑战做出了回应

遵循规则

在应对这项挑战时,我们试图保持 最初挑战的精神。因此,我们在最终提交中包含了任何处理时间或数据加载时间。例如,只提供查询的响应时间(数据加载到表中后,不考虑插入时间),感觉有点像……嗯,作弊 :)。

Gunnar 在 Hetzner AX161 上执行测试,将执行限制为 8 个核心。尽管我很想专门为一个互联网挑战采购一台专用裸机服务器,但最后决定这样做可能有点过分。为了尽可能进行比较,我们的示例使用 Hetzer 虚拟实例(专用 CPU),带有 CCX33,具有 8 个核心和 32GB 内存。尽管是虚拟实例,但这些实例使用的是更新的 AMD EPYC-Milan 处理器,采用 Zen3 架构——比 Hetzner AX161 提供的 AMD 的 EPYC-Rome 7502P 处理器更新。

生成(或直接下载)数据

用户可以按照 最初的说明生成 10 亿行数据集。这需要 Java 21 并需要一些命令。

在编写这篇博客时,我发现 sdkman,它简化了 Java 的安装过程,对于那些没有现有安装程序的用户来说非常有用。

然而,生成 13GB 的 measurements.txt 文件速度非常慢,如下所示

# clone and build generation tool. Output omitted.
git clone [email protected]:gunnarmorling/1brc.git
./mvnw clean verify
./create_measurements.sh 1000000000

Created file with 1,000,000,000 measurements in 395955 ms

出于对 ClickHouse Local 在生成此文件方面的速度的兴趣,我们探索了 源代码。气象站列表及其平均温度被编译到代码中,随机点通过对具有 10 的平均值和方差 的高斯分布进行采样来生成。将原始气象站数据提取到 CSV 并将其托管在 s3 上,允许我们使用 INSERT INTO FUNCTION FILE 来复制此逻辑。请注意,在使用随机函数对这些结果进行采样之前,我们使用 s3 函数将我们的气象站读入 CTE。

INSERT INTO FUNCTION file('measurements.csv', CustomSeparated)
WITH (
	SELECT groupArray((station, avg)) FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/1brc/stations.csv')
) AS averages
SELECT
    	averages[floor(randUniform(1, length(averages)))::Int64].1 as city,
    	round(averages[floor(randUniform(1, length(averages)))::Int64].2 + (10 * SQRT(-2 * LOG(randCanonical(1))) * COS(2 * PI() * randCanonical(2))), 2) as temperature
FROM numbers(1_000_000_000) 
SETTINGS format_custom_field_delimiter=';', format_custom_escaping_rule='Raw'

0 rows in set. Elapsed: 57.856 sec. Processed 1.00 billion rows, 8.00 GB (17.28 million rows/s., 138.27 MB/s.)
Peak memory usage: 36.73 MiB.

以 6.8 倍的速度,这似乎值得分享!

经验丰富的 ClickHouse 用户可能会在这里使用 randNormal 函数。不幸的是,这需要平均值和方差目前是常数。因此,我们使用 randCanonical 函数并使用它来使用 Muller 变换 对高斯分布进行采样。

或者,用户可以直接从 这里 下载我们生成文件的 gzip 压缩版本 :)。

仅 ClickHouse Local

虽然许多用户熟悉 ClickHouse 作为部署在服务器上的实时数据仓库,但它也可以用作本地二进制文件 "Clickhouse Local",用于对文件进行临时数据分析。自从我们 博客在一年多前介绍了这种用例 以来,这已成为 ClickHouse 的一个越来越流行的应用。

ClickHouse Local 具有控制台模式(可以通过运行 clickhouse local 访问),可以从中创建表并提供交互式查询反馈,或者具有命令行界面,旨在与脚本和外部工具集成。我们使用后者来对我们的 measurements.txt 进行采样。设置 format_csv_delimiter=';" 允许指定 CSV 文件分隔符。

clickhouse local --query "SELECT city, temperature FROM file('measurements.txt', CSV, 'city String, temperature DECIMAL(8,1)') LIMIT 5 SETTINGS format_csv_delimiter=';'"
Mexicali    44.8
Hat Yai    29.4
Villahermosa    27.1
Fresno    31.7
Ouahigouya    29.3

计算每个城市的温度的最小值、最大值和平均值需要一个简单的 GROUP BY 查询。我们使用 -t 来确保包含时间信息。挑战要求以特定格式输出

{Abha=-23.0/18.0/59.2, Abidjan=-16.2/26.0/67.3, Abéché=-10.0/29.4/69.0, Accra=-10.1/26.4/66.4, Addis Ababa=-23.7/16.0/67.0, Adelaide=-27.8/17.3/58.5, ...} 

为了实现这一点,我们可以使用 CustomSeparated 输出格式和 format 函数。这避免了使用任何函数(例如 groupArray)的需要,这些函数将行折叠为单行。下面,我们使用 ClickHouse Local 的控制台模式。

SELECT format('{}={}/{}/{}', city, min(temperature), round(avg(temperature), 2), max(temperature))
FROM file('measurements.txt', CSV, 'city String, temperature DECIMAL(8,1)')
GROUP BY city
ORDER BY city ASC
FORMAT CustomSeparated
SETTINGS 
  format_custom_result_before_delimiter = '{', 
  format_custom_result_after_delimiter = '}', 
  format_custom_row_between_delimiter = ', ', 
  format_custom_row_after_delimiter = '', 
  format_csv_delimiter = ';'

{Abha=-34.6/18/70.3, Abidjan=-22.8/25.99/73.5, Abéché=-25.3/29.4/80.1, Accra=-25.6/26.4/76.8, Addis Ababa=-38.3/16/67, Adelaide=-33.4/17.31/65.5, …}

413 rows in set. Elapsed: 27.671 sec. Processed 1.00 billion rows, 13.79 GB (36.14 million rows/s., 498.46 MB/s.)
Peak memory usage: 47.46 MiB.

27.6s 代表我们的基线。这与 Java 基线相比,在相同的硬件上完成需要近 3 分钟。

./calculate_average_baseline.sh

real    2m59.364s
user    2m57.511s
sys    0m3.372s

提高性能

我们可以通过观察我们的 CSV 文件没有使用值转义来提高上述性能。因此,CSV 读取器是不必要的——我们可以简单地将每行作为字符串读取,并使用分隔符 ; 访问相关的子字符串。

SELECT format('{}={}/{}/{}', city, min(temperature), round(avg(temperature), 2), max(temperature))
FROM
(
	SELECT
    	substringIndex(line, ';', 1) AS city,
   	substringIndex(line, ';', -1)::Decimal(8, 1) AS temperature
	FROM file('measurements.txt', LineAsString)
)
GROUP BY city
ORDER BY city ASC FORMAT CustomSeparated
SETTINGS 
  format_custom_result_before_delimiter = '{', 
  format_custom_result_after_delimiter = '}', 
  format_custom_row_between_delimiter = ', ', 
  format_custom_row_after_delimiter = '', 
  format_csv_delimiter = ';'

413 rows in set. Elapsed: 19.907 sec. Processed 1.00 billion rows, 13.79 GB (50.23 million rows/s., 692.86 MB/s.)
Peak memory usage: 132.20 MiB.

这将我们的执行时间缩短到 20 秒以下!

测试替代方法

我们的 ClickHouse Local 方法对文件执行完整的线性扫描。这里的一种替代方法可能是先将文件加载到表中,然后再对文件运行查询。也许不出所料,这没有带来任何真正的性能优势,因为查询实际上是对数据进行了第二次扫描。因此,总的加载和查询时间超过了 19 秒。

CREATE TABLE weather
(
	`city` String,
	`temperature` Decimal(8, 1)
)
ENGINE = Memory

INSERT INTO weather SELECT
	city,
	temperature
FROM
(
	SELECT
    	splitByChar(';', line) AS vals,
    	vals[1] AS city,
    	CAST(vals[2], 'Decimal(8, 1)') AS temperature
	FROM file('measurements.txt', LineAsString)
)

0 rows in set. Elapsed: 21.219 sec. Processed 1.00 billion rows, 13.79 GB (47.13 million rows/s., 650.03 MB/s.)
Peak memory usage: 26.16 GiB.

SELECT
	city,
	min(temperature),
	avg(temperature),
	max(temperature)
FROM weather
GROUP BY city
ORDER BY city ASC
SETTINGS max_threads = 8
413 rows in set. Elapsed: 2.997 sec. Processed 970.54 million rows, 20.34 GB (323.82 million rows/s., 6.79 GB/s.)
Peak memory usage: 484.27 KiB.

请注意,我们在这里使用的是 Memory 表而不是经典的 MergeTree。鉴于数据集适合内存,查询不包含任何过滤器(因此不从 MergeTree 的稀疏索引中受益),我们可以使用这种引擎类型避免 I/O。

上述方法具有明显的优势,允许用户在数据加载到表中后对数据发出任意查询。

最后,如果我们的目标查询计算最小值、最大值和平均值的性能不够好,我们可以使用 物化视图 将这项工作移至插入时。在这种情况下,物化视图 weather_mv 会在数据插入时计算我们的统计信息。更准确地说,我们之前的聚合查询会在插入的数据块上执行,并将结果(实际上是 聚合状态)通过 AggregatingMergeTree 表引擎发送到目标表 "weather_results"。对该表的查询将利用预先计算的结果,从而显著提高执行速度。

作为优化,接收我们数据的 weather 表可以使用 Null 引擎。这会导致行被丢弃,从而节省内存。

CREATE TABLE weather
(
    `city` String,
    `temperature` Decimal(8, 1)
)
ENGINE = Null

CREATE TABLE weather_results(
	city String,
	max AggregateFunction(max, Decimal(8, 1)),
	min AggregateFunction(min, Decimal(8, 1)),
	avg AggregateFunction(avg, Decimal(8, 1))
) ENGINE = AggregatingMergeTree ORDER BY tuple()

CREATE MATERIALIZED VIEW weather_mv TO weather_results
AS SELECT city, maxState(temperature) as max, minState(temperature) as min, avgState(temperature) as avg
FROM weather
GROUP BY city

INSERT INTO weather SELECT
	city,
	temperature
FROM
(
	SELECT
    	splitByChar(';', line) AS vals,
    	vals[1] AS city,
    	CAST(vals[2], 'Decimal(8, 1)') AS temperature
	FROM file('measurements.txt', LineAsString)
)

0 rows in set. Elapsed: 26.569 sec. Processed 2.00 billion rows, 34.75 GB (75.27 million rows/s., 1.31 GB/s.)

我们随后对 weather_results 的查询需要使用 merge- 函数来组合我们的聚合状态。

SELECT format('{}={}/{}/{}', city, minMerge(min), round(avgMerge(avg), 2), maxMerge(max))
FROM weather_results
GROUP BY city
ORDER BY city ASC
FORMAT CustomSeparated
SETTINGS format_custom_result_before_delimiter = '{', format_custom_result_after_delimiter = '}', format_custom_row_between_delimiter = ', ', format_custom_row_after_delimiter = '', format_csv_delimiter = ';'

413 rows in set. Elapsed: 0.014 sec.

这给我们提供了标称执行时间,这已经在 其他实验中报告。然而,当结合我们 26 秒的加载时间时,我们仍然无法击败简单的 ClickHouse 本地查询。

结论

我们已经正式化了我们对十亿行挑战的回应。我们已经证明,ClickHouse 本地在与挑战规则相似的硬件上,可以在大约 19 秒内解决问题。虽然它在竞争中不敌专门的解决方案,但它只需要几行 SQL 代码。我们想借此机会感谢 Gunnar 为此挑战投入的思考和时间。

立即开始使用 ClickHouse Cloud 并获得 300 美元的积分。在 30 天试用期结束后,您可以继续使用按需付费计划,或者 联系我们 了解有关我们基于容量的折扣的更多信息。访问我们的 定价页面 了解更多详情。

分享此文章

订阅我们的通讯

随时了解功能发布、产品路线图、支持和云服务!
正在加载表单...
关注我们
Twitter imageSlack imageGitHub image
Telegram imageMeetup imageRss image