简介
Parquet 自 2013 年发布 作为 Hadoop 的列式存储以来,几乎已无处不在,成为一种文件交换格式,可提供高效的存储和检索。这种采用使其成为更新的数据湖格式的基础,例如 Apache Iceberg。在本博客系列中,我们将探讨如何在深入了解 Parquet 之前使用 ClickHouse 读取和写入此格式。对于更有经验的 Parquet 用户,我们还将讨论用户在使用 ClickHouse 编写 Parquet 文件时可以进行的一些优化,以最大限度地提高压缩率,以及最近为优化使用并行化的读取性能而进行的一些开发。
在我们的示例中,我们使用了 英国房价 数据集。其中包含自 1995 年至撰写本文时英格兰和威尔士房地产价格的数据。我们在公共 s3 存储桶 s3://datasets-documentation/uk-house-prices/parquet/
中以 Parquet 格式分发此数据集。
ClickHouse Local
在我们的示例中,我们使用本地和 S3 托管的 Parquet 文件,并使用 ClickHouse Local 查询这些文件。ClickHouse Local 是 ClickHouse 的一个易于使用的版本,非常适合需要使用 SQL 对本地和远程文件执行快速处理而无需安装完整数据库服务器的开发人员。专为使用笔记本电脑或工作站上的本地计算资源进行数据分析而设计和优化,用户只需使用 SQL 即可查询、筛选和转换几乎任何格式的数据文件,而无需编写任何 Python 代码。我们推荐这篇最近的博客文章,以概述此工具的功能。最重要的是,ClickHouse Local 和 ClickHouse Server 共享相同的 Parquet 读取和写入代码,因此任何解释都适用于两者。
什么是 Parquet?
Apache Parquet 的官方描述 对其设计和属性进行了出色的总结:“Apache Parquet 是一种开源、面向列的数据文件格式,专为高效数据存储和检索而设计”。
与 ClickHouse 的 MergeTree 格式 类似,数据以面向列的方式存储。这实际上意味着同一列的值存储在一起,这与面向行的文件格式(例如 Avro)相反,在面向行的文件格式中,行数据是并置的。
这种数据方向,以及对适用于现代处理器管道的多种 编码技术 的支持,实现了高压缩率和高效的存储属性。列方向还最大限度地减少了读取的数据量,因为对于分析查询(例如 group bys),仅从存储中读取必要的列。当与高压缩率和每列提供的内部统计信息(存储为元数据)相结合时,Parquet 还承诺快速检索。
后一个属性在很大程度上取决于元数据的充分利用、任何查询引擎中的并行化级别以及存储数据时做出的决策。我们将在下面结合 ClickHouse 讨论这些问题。
在深入了解 Parquet 的内部结构之前,我们将介绍 ClickHouse 如何支持此格式的写入和读取。
使用 ClickHouse 查询 Parquet
在下面的示例中,我们假设我们的房价数据已导出到单个 house_prices.parquet
文件,并且除非另有说明,否则使用 ClickHouse Local 进行查询。
读取模式
可以使用 DESCRIBE 语句 和 file 表函数 来实现识别任何文件的模式
DESCRIBE TABLE file('house_prices.parquet')
┌─name──────┬─type─────────────┬
│ price │ Nullable(UInt32) │
│ date │ Nullable(UInt16) │
│ postcode1 │ Nullable(String) │
│ postcode2 │ Nullable(String) │
│ type │ Nullable(Int8) │
│ is_new │ Nullable(UInt8) │
│ duration │ Nullable(Int8) │
│ addr1 │ Nullable(String) │
│ addr2 │ Nullable(String) │
│ street │ Nullable(String) │
│ locality │ Nullable(String) │
│ town │ Nullable(String) │
│ district │ Nullable(String) │
│ county │ Nullable(String) |
└───────────┴──────────────────┴
查询本地文件
上面的 file 表函数可以用作 SELECT
查询的输入,从而允许我们对 Parquet 文件执行查询。下面我们计算伦敦房产的年平均价格。
SELECT
toYear(toDate(date)) AS year,
round(avg(price)) AS price,
bar(price, 0, 2000000, 100)
FROM file('house_prices.parquet')
WHERE town = 'LONDON'
GROUP BY year
ORDER BY year ASC
┌─year─┬───price─┬─bar(round(avg(price)), 0, 2000000, 100)──────────────┐
│ 1995 │ 109120 │ █████▍ │
│ 1996 │ 118672 │ █████▉ │
│ 1997 │ 136530 │ ██████▊ │
│ 1998 │ 153014 │ ███████▋ │
│ 1999 │ 180639 │ █████████ │
│ 2000 │ 215860 │ ██████████▊ │
│ 2001 │ 232998 │ ███████████▋ │
│ 2002 │ 263690 │ █████████████▏ │
│ 2003 │ 278423 │ █████████████▉ │
│ 2004 │ 304666 │ ███████████████▏ │
│ 2005 │ 322886 │ ████████████████▏ │
│ 2006 │ 356189 │ █████████████████▊ │
│ 2007 │ 404065 │ ████████████████████▏ │
│ 2008 │ 420741 │ █████████████████████ │
│ 2009 │ 427767 │ █████████████████████▍ │
│ 2010 │ 480329 │ ████████████████████████ │
│ 2011 │ 496293 │ ████████████████████████▊ │
│ 2012 │ 519473 │ █████████████████████████▉ │
│ 2013 │ 616182 │ ██████████████████████████████▊ │
│ 2014 │ 724107 │ ████████████████████████████████████▏ │
│ 2015 │ 792274 │ ███████████████████████████████████████▌ │
│ 2016 │ 843685 │ ██████████████████████████████████████████▏ │
│ 2017 │ 983673 │ █████████████████████████████████████████████████▏ │
│ 2018 │ 1016702 │ ██████████████████████████████████████████████████▊ │
│ 2019 │ 1041915 │ ████████████████████████████████████████████████████ │
│ 2020 │ 1060936 │ █████████████████████████████████████████████████████│
│ 2021 │ 968152 │ ████████████████████████████████████████████████▍ │
│ 2022 │ 967439 │ ████████████████████████████████████████████████▎ │
│ 2023 │ 830317 │ █████████████████████████████████████████▌ │
└──────┴─────────┴──────────────────────────────────────────────────────┘
29 rows in set. Elapsed: 0.625 sec. Processed 28.11 million rows, 750.65 MB (44.97 million rows/s., 1.20 GB/s.)
查询 S3 上的文件
虽然上面的 file 函数可以与 ClickHouse 服务器实例一起使用,但它要求文件存在于服务器文件系统的配置 user_files_path 目录下。在这些情况下,更自然地从 S3 读取 Parquet 文件。这是数据湖用例中的常见要求,在这些用例中,需要即席分析。在这种情况下,上面的 file 函数可以用 s3 函数 替换,以进行类似 AWS Athena 的查询
SELECT
toYear(toDate(date)) AS year,
round(avg(price)) AS price,
bar(price, 0, 2000000, 100)
FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/uk-house-prices/parquet/house_prices_all.parquet')
WHERE town = 'LONDON'
GROUP BY year
ORDER BY year ASC
┌─year─┬───price─┬─bar(round(avg(price)), 0, 2000000, 100)───────────────┐
│ 1995 │ 109120 │ █████▍ │
│ 1996 │ 118672 │ █████▉ │
│ 1997 │ 136530 │ ██████▊ │
│ 1998 │ 153014 │ ███████▋ │
│ 1999 │ 180639 │ █████████ │
...
29 rows in set. Elapsed: 2.069 sec. Processed 28.11 million rows, 750.65 MB (13.59 million rows/s., 362.87 MB/s.)
查询多个文件
这两个函数都支持 glob 模式,允许选择文件子集。正如我们将在后面的文章中讨论的那样,这不仅提供了跨文件查询的优势,而且还提供了主要的读取并行化优势。下面我们将查询限制为所有带有年份后缀的 house_prices_
文件 - 这假定我们每个年份都有一个文件(见下文)。
SELECT
toYear(toDate(date)) AS year,
round(avg(price)) AS price,
bar(price, 0, 2000000, 100)
FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/uk-house-prices/parquet/house_prices_{1..2}*.parquet')
WHERE town = 'LONDON'
GROUP BY year
ORDER BY year ASC
29 rows in set. Elapsed: 3.387 sec. Processed 28.11 million rows, 750.65 MB (8.30 million rows/s., 221.66 MB/s.)
用户还应注意 s3Cluster 函数,该函数允许从集群中的许多节点并行处理文件 - 这与 ClickHouse Cloud 用户尤其相关。这可以提供显着的性能优势,尤其是在需要读取许多文件的情况下(允许分配工作)。
使用 ClickHouse 写入 Parquet
可以使用几种方法将 ClickHouse 中的表数据写入 Parquet 文件。此处首选的选项通常取决于您是否使用 ClickHouse Server 或 ClickHouse Local。在下面的示例中,我们假设 uk_price_paid
表已填充数据。有关加载此数据的详细信息,请参阅 此处。
写入本地文件
使用 INTO FUNCTION
子句,我们可以使用与读取相同的 file 函数写入 parquet。这最适合 ClickHouse Local,在 ClickHouse Local 中,文件可以写入本地文件系统上的任何位置。ClickHouse 服务器会将这些文件写入配置参数 user_files_path
指定的目录。
INSERT INTO FUNCTION file('house_prices.parquet') SELECT *
FROM uk_price_paid
0 rows in set. Elapsed: 12.490 sec. Processed 28.11 million rows, 1.32 GB (2.25 million rows/s., 105.97 MB/s.)
dalemcdiarmid@dales-mac houseprices % ls -lh house_prices.parquet
-rw-r----- 1 dalemcdiarmid staff 243M 17 Apr 16:59 house_prices.parquet
在大多数情况下(包括 ClickHouse Cloud),无法访问本地服务器文件系统。在这些情况下,用户可以通过 clickhouse-client
连接并使用 INTO OUTFILE 子句将 parquet 文件写入客户端的文件系统。此处将根据文件扩展名自动检测所需的输出格式。
SELECT *
FROM uk_price_paid
INTO OUTFILE 'house_prices.parquet'
28113076 rows in set. Elapsed: 15.690 sec. Processed 28.11 million rows, 2.47 GB (1.79 million rows/s., 157.47 MB/s.)
clickhouse@clickhouse-mac ~ % ls -lh house_prices.parquet
-rw-r--r-- 1 dalemcdiarmid staff 291M 17 Apr 18:23 house_prices.parquet
或者,用户可以简单地发出 SELECT 查询,将输出格式指定为 Parquet,并将结果重定向到文件。在此示例中,我们从终端将 --query
参数传递给客户端。
clickhouse@clickhouse-mac ~ % ./clickhouse client --query "SELECT * FROM uk_price_paid FORMAT Parquet" > house_price.parquet
最后两种方法生成的文件比我们之前的 file 函数方法略大。我们将在本系列的第 2 部分中解释原因,但目前建议用户在可能的情况下使用早期的 INSERT INTO FUNCTION
方法,以获得更佳的文件大小。
将文件写入 S3
通常,客户端存储空间有限。在这些情况下,用户可能希望将文件写入对象存储,例如 S3 和 GCS。这两者都通过与读取时使用的相同的 s3 函数 支持。请注意,将需要凭据 - 在下面的示例中,我们将这些凭据作为函数参数传递,但 也支持 IAM 凭据。
INSERT INTO FUNCTION s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/uk-house-prices/parquet/house_prices_sample.parquet', '<aws_access_key_id>', '<aws_secret_access_key>') SELECT *
FROM uk_price_paid
LIMIT 1000
0 rows in set. Elapsed: 0.726 sec. Processed 2.00 thousand rows, 987.86 KB (2.75 thousand rows/s., 1.36 MB/s.)
写入多个文件
最后,通常需要限制任何单个 Parquet 文件的大小。为了帮助写入文件,用户可以将 PARTITION BY 子句与 INTO FUNCTION
子句一起使用。这接受任何 SQL 表达式来为结果集中的每一行创建分区 ID。反过来,此 parition_id
可用于文件路径,以确保将行分配给不同的文件。在下面的示例中,我们按年份分区。因此,属于同一年份的房屋销售将写入同一文件。文件将带有各自年份的后缀,如图所示。
INSERT INTO FUNCTION file('house_prices_{_partition_id}.parquet') PARTITION BY toYear(date) SELECT * FROM uk_price_paid
0 rows in set. Elapsed: 23.281 sec. Processed 28.11 million rows, 1.32 GB (1.21 million rows/s., 56.85 MB/s.)
clickhouse@clickhouse-mac houseprices % ls house_prices_*
house_prices_1995.parquet house_prices_2001.parquet house_prices_2007.parquet house_prices_2013.parquet house_prices_2019.parquet
house_prices_1996.parquet house_prices_2002.parquet house_prices_2008.parquet house_prices_2014.parquet house_prices_2020.parquet
house_prices_1997.parquet house_prices_2003.parquet house_prices_2009.parquet house_prices_2015.parquet house_prices_2021.parquet
house_prices_1998.parquet house_prices_2004.parquet house_prices_2010.parquet house_prices_2016.parquet house_prices_2022.parquet
house_prices_1999.parquet house_prices_2005.parquet house_prices_2011.parquet house_prices_2017.parquet house_prices_2023.parquet
house_prices_2000.parquet house_prices_2006.parquet house_prices_2012.parquet house_prices_2018.parquet
此相同方法可以与 s3 函数一起使用。
INSERT INTO FUNCTION s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/uk-house-prices/parquet/house_prices_sample_{_partition_id}.parquet', '<aws_access_key_id>', '<aws_secret_access_key>') PARTITION BY toYear(date) SELECT *
FROM uk_price_paid
LIMIT 1000
0 rows in set. Elapsed: 2.247 sec. Processed 2.00 thousand rows, 987.86 KB (889.92 rows/s., 439.56 KB/s.)
在撰写本文时,PARTITION BY
子句 当前不支持 INTO OUTFILE
。
将文件转换为 Parquet
结合以上内容,我们可以使用 ClickHouse Local 在格式之间转换文件。在下面的示例中,我们使用 ClickHouse Local 和 file 函数读取本地副本(CSV 格式的房价数据集,包含所有 2800 万行),然后再将其作为 Parquet 写入 S3。这些文件按年份分区,如前所示。
INSERT INTO FUNCTION s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/uk-house-prices/parquet/house_prices_sample_{_partition_id}.parquet', '<aws_access_key_id>', '<aws_secret_access_key>') PARTITION BY toYear(date) SELECT *
FROM file('house_prices.csv')
0 rows in set. Elapsed: 223.864 sec. Processed 28.11 million rows, 5.87 GB (125.58 thousand rows/s., 26.24 MB/s.)
将 Parquet 文件插入 ClickHouse
所有之前的示例都假设用户正在查询本地和 S3 托管的文件以进行即席分析,或将数据从 ClickHouse 迁移到 Parquet 以进行分发。虽然 Parquet 是一种与数据存储无关的文件分发格式,但对于查询而言,它不如 ClickHouse MergeTree 表高效,后者能够利用索引和特定于格式的优化。考虑以下查询的性能,该查询使用本地 Parquet 文件和带有 推荐模式 的 MergeTree 表计算伦敦房产的年平均价格(两者都在 Macbook Pro 2021 上运行)
SELECT
toYear(toDate(date)) AS year,
round(avg(price)) AS price,
bar(price, 0, 2000000, 100)
FROM file('house_prices.parquet')
WHERE town = 'LONDON'
GROUP BY year
ORDER BY year ASC
29 rows in set. Elapsed: 0.625 sec. Processed 28.11 million rows, 750.65 MB (44.97 million rows/s., 1.20 GB/s.)
SELECT
toYear(toDate(date)) AS year,
round(avg(price)) AS price,
bar(price, 0, 2000000, 100)
FROM uk_price_paid
WHERE town = 'LONDON'
GROUP BY year
ORDER BY year ASC
29 rows in set. Elapsed: 0.022 sec.
此处的差异是巨大的,并证明了为什么对于需要实时性能的大型数据集,用户会将 Parquet 文件加载到 ClickHouse 中。下面我们假设 uk_price_paid
表已预先创建。
从本地文件加载
可以使用 INFILE
子句从客户端计算机加载文件。以下查询从 clickhouse-client
执行,并从本地客户端的文件系统中读取数据。
INSERT INTO uk_price_paid FROM INFILE 'house_price.parquet' FORMAT Parquet
28113076 rows in set. Elapsed: 15.412 sec. Processed 28.11 million rows, 1.27 GB (1.82 million rows/s., 82.61 MB/s.)
如果用户的数据分布在多个 Parquet 文件中,此方法也支持 glob 模式。或者,可以使用 --query
参数将 Parquet 文件重定向到 clickhouse-client
中
clickhouse@clickhouse-mac ~ % ~/clickhouse client --query "INSERT INTO uk_price_paid FORMAT Parquet" < house_price.parquet
从 S3 加载
由于客户端存储通常有限,并且基于对象存储的数据湖兴起,Parquet 文件通常驻留在 S3 或 GCS 上。同样,我们可以使用 s3 函数读取这些文件,使用 INSERT INTO SELECT 子句将其数据插入到 MergeTree 表中。在下面的示例中,我们使用 glob 模式读取按年份分区的文件,并在一个三节点 ClickHouse Cloud 集群上执行此查询。
INSERT INTO uk_price_paid SELECT *
FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/uk-house-prices/parquet/house_prices_{1..2}*.parquet')
0 rows in set. Elapsed: 12.028 sec. Processed 28.11 million rows, 4.64 GB (2.34 million rows/s., 385.96 MB/s.)
与读取类似,可以使用 s3Cluster 函数加速此过程。为了确保插入和读取分布在整个集群中,必须启用设置 parallel_distributed_insert_select
(否则,仅会分布读取,并且插入将发送到协调节点)。以下查询在与上一个示例中使用的同一 Cloud 集群上运行,显示了并行化此工作的好处。
SET parallel_distributed_insert_select=1
INSERT INTO uk_price_paid SELECT *
FROM s3Cluster('default', 'https://datasets-documentation.s3.eu-west-3.amazonaws.com/uk-house-prices/parquet/house_prices_{1..2}*.parquet')
0 rows in set. Elapsed: 6.425 sec. Processed 28.11 million rows, 4.64 GB (4.38 million rows/s., 722.58 MB/s.)
结论
在本博客系列的第一部分中,我们介绍了 Parquet 格式,并展示了如何使用 ClickHouse 查询和写入此格式。在下一篇文章中,我们将更详细地深入了解该格式,进一步探讨 ClickHouse 集成和最新的性能改进,以及优化查询的技巧。