本月早些时候,来自 Decodable 的 Gunnar Morling 在一月份发起了一项挑战,该挑战引起了广泛关注 - 编写一个 Java 程序,用于从包含 10 亿行文本的文件中检索温度测量值,并计算每个气象站的最低温度、平均温度和最高温度。虽然我们远非 Java 专家,但作为一家既热爱大数据又喜欢速度测试的公司,我们认为应该对这项挑战做出官方的 ClickHouse 回应!
虽然最初的挑战仍然是 Java,但 Gunnar 已经以 Github 讨论的形式开辟了一个 “展示与交流”,以允许更广泛的技术贡献。我们还要感谢我们的社区,他们也对挑战做出了回应。
遵循规则
在回应这项挑战时,我们试图保持 原始挑战的精神。因此,我们已将任何处理时间或数据加载时间都包含在最终提交中。例如,仅提供数据加载到表中的查询响应时间,而不考虑插入时间,感觉有点像……作弊 :)
Gunnar 正在 Hetzner AX161 上执行测试,将执行限制为 8 个核心。尽管我很想仅仅为了互联网挑战而采购一台专用裸机服务器,但最终决定这可能有点过分了。为了尽可能具有可比性,我们的示例使用了具有 8 个核心和 32GB 内存的 Hetzer 虚拟实例(专用 CPU)CCX33。虽然是虚拟实例,但这些实例使用了更新的 AMD EPYC-Milan 处理器,采用 Zen3 架构 - 比 Hetzner AX161 提供的 AMD EPYC-Rome 7502P 处理器更新。
生成(或只是下载)数据
用户可以按照原始说明生成 10 亿行数据集。这需要 Java 21,并且需要一些命令。
在撰写这篇博客时,我发现了 sdkman,它简化了为那些没有现有安装的用户安装 Java 的过程。
但是,生成 13GB 的 measurements.txt 文件非常慢,如下所示
# clone and build generation tool. Output omitted.
git clone [email protected]:gunnarmorling/1brc.git
./mvnw clean verify
./create_measurements.sh 1000000000
Created file with 1,000,000,000 measurements in 395955 ms
出于对 ClickHouse Local 在生成此文件方面的速度比较的好奇,我们探索了 源代码。站点列表及其平均温度被编译到代码中,随机点是通过对均值和方差为 10 的高斯分布进行采样而产生的。将原始站点数据提取到 CSV 并将其托管在 s3 上,使我们能够使用 INSERT INTO FUNCTION FILE
复制此逻辑。请注意,在用随机函数对这些结果进行采样之前,使用了 s3 函数将我们的站点读入 CTE。
INSERT INTO FUNCTION file('measurements.csv', CustomSeparated)
WITH (
SELECT groupArray((station, avg)) FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/1brc/stations.csv')
) AS averages
SELECT
averages[floor(randUniform(1, length(averages)))::Int64].1 as city,
round(averages[floor(randUniform(1, length(averages)))::Int64].2 + (10 * SQRT(-2 * LOG(randCanonical(1))) * COS(2 * PI() * randCanonical(2))), 2) as temperature
FROM numbers(1_000_000_000)
SETTINGS format_custom_field_delimiter=';', format_custom_escaping_rule='Raw'
0 rows in set. Elapsed: 57.856 sec. Processed 1.00 billion rows, 8.00 GB (17.28 million rows/s., 138.27 MB/s.)
Peak memory usage: 36.73 MiB.
速度提高了 6.8 倍,这似乎值得分享!
经验丰富的 ClickHouse 用户可能会在此处使用 randNormal 函数。遗憾的是,这目前需要均值和方差为常数。因此,我们使用 randCanonical 函数,并使用它通过 Muller 变换 对高斯分布进行采样。
或者,用户可以直接从 此处 下载我们生成的文件的 gzip 压缩版本 :)
仅限 ClickHouse Local
虽然许多用户熟悉作为部署在服务器上的实时数据仓库的 ClickHouse,但它也可以用作本地二进制文件“Clickhouse Local”,用于查询文件以进行临时数据分析。自从我们的 博客在一年多前描述了此用例以来,这已成为 ClickHouse 越来越流行的应用。
ClickHouse Local 具有控制台模式(通过运行 clickhouse local
访问),可以从中创建表并提供交互式查询反馈,或者命令行界面,旨在与脚本和外部工具集成。我们使用后者来采样我们的 measurements.txt。设置 `format_csv_delimiter=';" 允许指定 CSV 文件分隔符。
clickhouse local --query "SELECT city, temperature FROM file('measurements.txt', CSV, 'city String, temperature DECIMAL(8,1)') LIMIT 5 SETTINGS format_csv_delimiter=';'"
Mexicali 44.8
Hat Yai 29.4
Villahermosa 27.1
Fresno 31.7
Ouahigouya 29.3
计算每个城市的温度的最小值、最大值和平均值需要一个简单的 GROUP BY
查询。我们利用 -t
来确保包含计时信息。挑战要求输出采用特定格式
{Abha=-23.0/18.0/59.2, Abidjan=-16.2/26.0/67.3, Abéché=-10.0/29.4/69.0, Accra=-10.1/26.4/66.4, Addis Ababa=-23.7/16.0/67.0, Adelaide=-27.8/17.3/58.5, ...}
为了实现这一点,我们可以使用 CustomSeparated 输出格式和 format 函数。这避免了使用任何函数(例如 groupArray)的需要,groupArray 会将行折叠为单行。下面,我们使用 ClickHouse Local 的控制台模式。
SELECT format('{}={}/{}/{}', city, min(temperature), round(avg(temperature), 2), max(temperature))
FROM file('measurements.txt', CSV, 'city String, temperature DECIMAL(8,1)')
GROUP BY city
ORDER BY city ASC
FORMAT CustomSeparated
SETTINGS
format_custom_result_before_delimiter = '{',
format_custom_result_after_delimiter = '}',
format_custom_row_between_delimiter = ', ',
format_custom_row_after_delimiter = '',
format_csv_delimiter = ';'
{Abha=-34.6/18/70.3, Abidjan=-22.8/25.99/73.5, Abéché=-25.3/29.4/80.1, Accra=-25.6/26.4/76.8, Addis Ababa=-38.3/16/67, Adelaide=-33.4/17.31/65.5, …}
413 rows in set. Elapsed: 27.671 sec. Processed 1.00 billion rows, 13.79 GB (36.14 million rows/s., 498.46 MB/s.)
Peak memory usage: 47.46 MiB.
27.6 秒代表我们的基线。这与 Java 基线相比,Java 基线在相同的硬件上需要近 3 分钟才能完成。
./calculate_average_baseline.sh
real 2m59.364s
user 2m57.511s
sys 0m3.372s
提高性能
我们可以通过观察到我们的 CSV 文件不使用值转义来提高上述性能。因此,CSV 读取器是不必要的 - 我们可以简单地将每一行读取为字符串,并使用分隔符 ;
访问相关的子字符串。
SELECT format('{}={}/{}/{}', city, min(temperature), round(avg(temperature), 2), max(temperature))
FROM
(
SELECT
substringIndex(line, ';', 1) AS city,
substringIndex(line, ';', -1)::Decimal(8, 1) AS temperature
FROM file('measurements.txt', LineAsString)
)
GROUP BY city
ORDER BY city ASC FORMAT CustomSeparated
SETTINGS
format_custom_result_before_delimiter = '{',
format_custom_result_after_delimiter = '}',
format_custom_row_between_delimiter = ', ',
format_custom_row_after_delimiter = '',
format_csv_delimiter = ';'
413 rows in set. Elapsed: 19.907 sec. Processed 1.00 billion rows, 13.79 GB (50.23 million rows/s., 692.86 MB/s.)
Peak memory usage: 132.20 MiB.
这使我们的执行时间减少到 20 秒以下!
测试替代方法
我们的 ClickHouse Local 方法对文件执行完整的线性扫描。此处的一种替代方法可能是先将文件加载到表中,然后再对文件运行查询。也许不足为奇,这没有提供真正的性能优势,因为查询实际上对数据执行了第二次扫描。因此,总加载和查询时间超过 19 秒。
CREATE TABLE weather
(
`city` String,
`temperature` Decimal(8, 1)
)
ENGINE = Memory
INSERT INTO weather SELECT
city,
temperature
FROM
(
SELECT
splitByChar(';', line) AS vals,
vals[1] AS city,
CAST(vals[2], 'Decimal(8, 1)') AS temperature
FROM file('measurements.txt', LineAsString)
)
0 rows in set. Elapsed: 21.219 sec. Processed 1.00 billion rows, 13.79 GB (47.13 million rows/s., 650.03 MB/s.)
Peak memory usage: 26.16 GiB.
SELECT
city,
min(temperature),
avg(temperature),
max(temperature)
FROM weather
GROUP BY city
ORDER BY city ASC
SETTINGS max_threads = 8
413 rows in set. Elapsed: 2.997 sec. Processed 970.54 million rows, 20.34 GB (323.82 million rows/s., 6.79 GB/s.)
Peak memory usage: 484.27 KiB.
上述方法具有明显的优势,即允许用户在将数据加载到表中后对数据发出任意查询。请注意,此处我们在经典的 MergeTree 上使用了 Memory 表。鉴于数据集适合内存,并且查询不包含过滤器(因此没有从 MergeTree 的稀疏索引中获益),我们可以使用此引擎类型避免 I/O。
最后,如果我们计算最小值、最大值和平均值的目标查询性能不足,我们可以使用物化视图将此工作转移到插入时。在这种情况下,物化视图 weather_mv
在数据插入时计算我们的统计信息。更具体地说,我们之前的聚合查询在数据插入时对数据块执行,并将结果(实际上是聚合状态)使用 AggregatingMergeTree 表引擎发送到目标表“weather_results”。对此表的查询将利用结果已预先计算的事实,从而显着加快执行时间。
作为优化,接收我们数据的 weather
表可以使用 Null 引擎。这将导致行被丢弃,从而节省内存。
CREATE TABLE weather
(
`city` String,
`temperature` Decimal(8, 1)
)
ENGINE = Null
CREATE TABLE weather_results(
city String,
max AggregateFunction(max, Decimal(8, 1)),
min AggregateFunction(min, Decimal(8, 1)),
avg AggregateFunction(avg, Decimal(8, 1))
) ENGINE = AggregatingMergeTree ORDER BY tuple()
CREATE MATERIALIZED VIEW weather_mv TO weather_results
AS SELECT city, maxState(temperature) as max, minState(temperature) as min, avgState(temperature) as avg
FROM weather
GROUP BY city
INSERT INTO weather SELECT
city,
temperature
FROM
(
SELECT
splitByChar(';', line) AS vals,
vals[1] AS city,
CAST(vals[2], 'Decimal(8, 1)') AS temperature
FROM file('measurements.txt', LineAsString)
)
0 rows in set. Elapsed: 26.569 sec. Processed 2.00 billion rows, 34.75 GB (75.27 million rows/s., 1.31 GB/s.)
我们随后对 weather_results
的查询需要 merge-
函数来合并我们的聚合状态。
SELECT format('{}={}/{}/{}', city, minMerge(min), round(avgMerge(avg), 2), maxMerge(max))
FROM weather_results
GROUP BY city
ORDER BY city ASC
FORMAT CustomSeparated
SETTINGS format_custom_result_before_delimiter = '{', format_custom_result_after_delimiter = '}', format_custom_row_between_delimiter = ', ', format_custom_row_after_delimiter = '', format_csv_delimiter = ';'
413 rows in set. Elapsed: 0.014 sec.
这为我们提供了名义执行时间,其他实验已报告了该时间。但是,当与我们的 26 秒加载时间结合使用时,我们仍然未能击败我们简单的 ClickHouse Local 查询。
结论
我们已经正式回应了十亿行挑战。我们已经证明,ClickHouse Local 在与挑战规则相当的硬件上大约在 19 秒内解决了问题。虽然它不与专门的解决方案竞争,但它只需要几行 SQL 代码。我们要借此机会感谢 Gunnar 为这项挑战投入的思想和时间。