介绍
自从 2013 年发布 作为 Hadoop 的列式存储,Parquet 已成为几乎无处不在的文件交换格式,提供高效的存储和检索。这种采用使其成为更近期的 Data Lake 格式(例如 Apache Iceberg)的基础。在本博客系列中,我们将探讨如何在深入研究 Parquet 的细节之前,使用 ClickHouse 来读写此格式。对于经验丰富的 Parquet 用户,我们还将讨论在使用 ClickHouse 编写 Parquet 文件时,用户可以进行的一些优化,以最大限度地提高压缩率,以及一些使用并行处理来优化读取性能的最新进展。
在我们的示例中,我们使用了 英国房价 数据集。该数据集包含自 1995 年至撰写本文之时,英格兰和威尔士房地产交易的价格数据。我们将该数据以 Parquet 格式分发在公共 s3 存储桶 s3://datasets-documentation/uk-house-prices/parquet/
中。
ClickHouse 本地版
在我们的示例中,我们使用本地和 S3 托管的 Parquet 文件,并使用 ClickHouse 本地版 查询它们。ClickHouse 本地版是一个易于使用的 ClickHouse 版本,非常适合需要使用 SQL 对本地和远程文件进行快速处理的开发人员,而无需安装完整的数据库服务器。该版本专为使用笔记本电脑或工作站上的本地计算资源进行数据分析而设计和优化,用户可以使用 SQL 查询、过滤和转换几乎任何格式的数据文件,而无需编写任何 Python 代码。我们推荐您阅读这篇最近的博客文章,以了解此工具功能的概述。最重要的是,ClickHouse 本地版和 ClickHouse 服务器在 Parquet 读写方面共享相同的代码,因此任何解释都适用于两者。
什么是 Parquet?
有关 Apache Parquet 的 官方描述 对其设计和属性进行了很好的概括:“Apache Parquet 是一种开源的列式数据文件格式,旨在实现高效数据存储和检索”。
与 ClickHouse 的 MergeTree 格式 类似,数据以列式存储。这实际上意味着同一列的值存储在一起,与行式文件格式(例如 Avro)形成对比,在行式文件格式中,行数据是共同定位的。
这种数据方向以及对适合现代处理器管道的多种 编码技术 的支持,使得高压缩率和高效存储属性成为可能。列式存储还最大限度地减少了读取的数据量,因为对于诸如分组之类的分析查询,仅从存储中读取必要的列。与高压缩率和每列提供的内部统计信息(存储为元数据)相结合,Parquet 还承诺快速检索。
后一项属性在很大程度上取决于对元数据的充分利用、任何查询引擎的并行处理级别以及存储数据时做出的决策。我们将在下面讨论这些内容与 ClickHouse 的关系。
在深入研究 Parquet 的内部机制之前,我们将介绍 ClickHouse 如何支持读写这种格式。
使用 ClickHouse 查询 Parquet
在下面的示例中,我们假设我们的房价数据已导出到单个 house_prices.parquet
文件中,并且使用 ClickHouse 本地版进行查询,除非另有说明。
读取架构
可以使用 DESCRIBE 语句 和 文件表函数 来识别任何文件的架构。
DESCRIBE TABLE file('house_prices.parquet')
┌─name──────┬─type─────────────┬
│ price │ Nullable(UInt32) │
│ date │ Nullable(UInt16) │
│ postcode1 │ Nullable(String) │
│ postcode2 │ Nullable(String) │
│ type │ Nullable(Int8) │
│ is_new │ Nullable(UInt8) │
│ duration │ Nullable(Int8) │
│ addr1 │ Nullable(String) │
│ addr2 │ Nullable(String) │
│ street │ Nullable(String) │
│ locality │ Nullable(String) │
│ town │ Nullable(String) │
│ district │ Nullable(String) │
│ county │ Nullable(String) |
└───────────┴──────────────────┴
查询本地文件
上述文件表函数可以用作 SELECT
查询的输入,使我们能够对 Parquet 文件执行查询。下面我们计算了伦敦房产每年的平均价格。
SELECT
toYear(toDate(date)) AS year,
round(avg(price)) AS price,
bar(price, 0, 2000000, 100)
FROM file('house_prices.parquet')
WHERE town = 'LONDON'
GROUP BY year
ORDER BY year ASC
┌─year─┬───price─┬─bar(round(avg(price)), 0, 2000000, 100)──────────────┐
│ 1995 │ 109120 │ █████▍ │
│ 1996 │ 118672 │ █████▉ │
│ 1997 │ 136530 │ ██████▊ │
│ 1998 │ 153014 │ ███████▋ │
│ 1999 │ 180639 │ █████████ │
│ 2000 │ 215860 │ ██████████▊ │
│ 2001 │ 232998 │ ███████████▋ │
│ 2002 │ 263690 │ █████████████▏ │
│ 2003 │ 278423 │ █████████████▉ │
│ 2004 │ 304666 │ ███████████████▏ │
│ 2005 │ 322886 │ ████████████████▏ │
│ 2006 │ 356189 │ █████████████████▊ │
│ 2007 │ 404065 │ ████████████████████▏ │
│ 2008 │ 420741 │ █████████████████████ │
│ 2009 │ 427767 │ █████████████████████▍ │
│ 2010 │ 480329 │ ████████████████████████ │
│ 2011 │ 496293 │ ████████████████████████▊ │
│ 2012 │ 519473 │ █████████████████████████▉ │
│ 2013 │ 616182 │ ██████████████████████████████▊ │
│ 2014 │ 724107 │ ████████████████████████████████████▏ │
│ 2015 │ 792274 │ ███████████████████████████████████████▌ │
│ 2016 │ 843685 │ ██████████████████████████████████████████▏ │
│ 2017 │ 983673 │ █████████████████████████████████████████████████▏ │
│ 2018 │ 1016702 │ ██████████████████████████████████████████████████▊ │
│ 2019 │ 1041915 │ ████████████████████████████████████████████████████ │
│ 2020 │ 1060936 │ █████████████████████████████████████████████████████│
│ 2021 │ 968152 │ ████████████████████████████████████████████████▍ │
│ 2022 │ 967439 │ ████████████████████████████████████████████████▎ │
│ 2023 │ 830317 │ █████████████████████████████████████████▌ │
└──────┴─────────┴──────────────────────────────────────────────────────┘
29 rows in set. Elapsed: 0.625 sec. Processed 28.11 million rows, 750.65 MB (44.97 million rows/s., 1.20 GB/s.)
查询 S3 上的文件
虽然上述文件函数可以与 ClickHouse 服务器实例一起使用,但它要求文件存在于服务器文件系统上,位于配置的 user_files_path 目录下。在这种情况下,Parquet 文件更自然地从 S3 中读取。这是 Data Lake 用例中的常见需求,在这些用例中需要进行临时分析。在这种情况下,可以使用 s3 函数 替换上述文件函数,以进行类似于 AWS Athena 的查询。
SELECT
toYear(toDate(date)) AS year,
round(avg(price)) AS price,
bar(price, 0, 2000000, 100)
FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/uk-house-prices/parquet/house_prices_all.parquet')
WHERE town = 'LONDON'
GROUP BY year
ORDER BY year ASC
┌─year─┬───price─┬─bar(round(avg(price)), 0, 2000000, 100)───────────────┐
│ 1995 │ 109120 │ █████▍ │
│ 1996 │ 118672 │ █████▉ │
│ 1997 │ 136530 │ ██████▊ │
│ 1998 │ 153014 │ ███████▋ │
│ 1999 │ 180639 │ █████████ │
...
29 rows in set. Elapsed: 2.069 sec. Processed 28.11 million rows, 750.65 MB (13.59 million rows/s., 362.87 MB/s.)
查询多个文件
这两个函数都支持 glob 模式,允许选择文件的子集。正如我们将在后面的文章中讨论的那样,这除了跨文件进行查询之外,还有其他优势,主要是读取的并行化。下面我们将查询限制在所有带有年份后缀的 house_prices_
文件中 - 这假设我们每个年份都有一个文件(见下文)。
SELECT
toYear(toDate(date)) AS year,
round(avg(price)) AS price,
bar(price, 0, 2000000, 100)
FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/uk-house-prices/parquet/house_prices_{1..2}*.parquet')
WHERE town = 'LONDON'
GROUP BY year
ORDER BY year ASC
29 rows in set. Elapsed: 3.387 sec. Processed 28.11 million rows, 750.65 MB (8.30 million rows/s., 221.66 MB/s.)
用户还应该注意 s3Cluster 函数,该函数允许从集群中的多个节点并行处理文件 - 这对于 ClickHouse Cloud 用户来说尤其重要。这可以提供显著的性能优势,尤其是在需要读取许多文件的情况下(允许工作分配)。
使用 ClickHouse 编写 Parquet
将 ClickHouse 中的表数据写入 Parquet 文件可以采用几种方法。这里首选的选项通常取决于您是使用 ClickHouse 服务器还是 ClickHouse 本地版。在下面的示例中,我们假设 uk_price_paid
表已经填充了数据。有关加载此表的信息,请参见 此处。
编写本地文件
使用 INTO FUNCTION
子句,我们可以使用与读取相同的文件函数来编写 parquet。这最适合 ClickHouse 本地版,在 ClickHouse 本地版中,文件可以写入本地文件系统上的任何位置。ClickHouse 服务器会将这些文件写入由配置参数 user_files_path
指定的目录。
INSERT INTO FUNCTION file('house_prices.parquet') SELECT *
FROM uk_price_paid
0 rows in set. Elapsed: 12.490 sec. Processed 28.11 million rows, 1.32 GB (2.25 million rows/s., 105.97 MB/s.)
dalemcdiarmid@dales-mac houseprices % ls -lh house_prices.parquet
-rw-r----- 1 dalemcdiarmid staff 243M 17 Apr 16:59 house_prices.parquet
在大多数情况下,包括 ClickHouse Cloud,本地服务器文件系统是无法访问的。在这种情况下,用户可以通过 clickhouse-client
连接,并使用 INTO OUTFILE 子句将 parquet 文件写入客户端的文件系统。这里将根据文件扩展名自动检测所需的输出格式。
SELECT *
FROM uk_price_paid
INTO OUTFILE 'house_prices.parquet'
28113076 rows in set. Elapsed: 15.690 sec. Processed 28.11 million rows, 2.47 GB (1.79 million rows/s., 157.47 MB/s.)
clickhouse@clickhouse-mac ~ % ls -lh house_prices.parquet
-rw-r--r-- 1 dalemcdiarmid staff 291M 17 Apr 18:23 house_prices.parquet
或者,用户可以简单地发出 SELECT 查询,指定输出格式为 Parquet,并将结果重定向到文件。在这个示例中,我们从终端将 --query
参数传递给客户端。
clickhouse@clickhouse-mac ~ % ./clickhouse client --query "SELECT * FROM uk_price_paid FORMAT Parquet" > house_price.parquet
这两种方法最后产生的文件大小略大于我们之前使用文件函数方法产生的文件大小。我们将在本系列的第 2 部分解释原因,但目前建议用户尽可能使用早期的 `INSERT INTO FUNCTION` 方法,以获得更优化的文件大小。
将文件写入 S3
通常,客户端存储空间有限。在这种情况下,用户可能希望将文件写入对象存储,例如 S3 和 GCS。这两种存储方式都通过与用于读取相同的 s3 函数 支持。请注意,需要凭据 - 在以下示例中,我们将这些凭据作为函数参数传递,但 也支持 IAM 凭据。
INSERT INTO FUNCTION s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/uk-house-prices/parquet/house_prices_sample.parquet', '<aws_access_key_id>', '<aws_secret_access_key>') SELECT *
FROM uk_price_paid
LIMIT 1000
0 rows in set. Elapsed: 0.726 sec. Processed 2.00 thousand rows, 987.86 KB (2.75 thousand rows/s., 1.36 MB/s.)
写入多个文件
最后,通常希望限制单个 Parquet 文件的大小。为了帮助将数据写入文件,用户可以使用 `PARTITION BY` 子句与 `INTO FUNCTION` 子句一起使用。这接受任何 SQL 表达式来为结果集中的每一行创建分区 ID。这个 `partition_id` 可以反过来用于文件路径,以确保行被分配到不同的文件。在以下示例中,我们按年份进行分区。因此,属于同一年的房屋销售将被写入同一个文件。文件将以各自的年份为后缀,如所示。
INSERT INTO FUNCTION file('house_prices_{_partition_id}.parquet') PARTITION BY toYear(date) SELECT * FROM uk_price_paid
0 rows in set. Elapsed: 23.281 sec. Processed 28.11 million rows, 1.32 GB (1.21 million rows/s., 56.85 MB/s.)
clickhouse@clickhouse-mac houseprices % ls house_prices_*
house_prices_1995.parquet house_prices_2001.parquet house_prices_2007.parquet house_prices_2013.parquet house_prices_2019.parquet
house_prices_1996.parquet house_prices_2002.parquet house_prices_2008.parquet house_prices_2014.parquet house_prices_2020.parquet
house_prices_1997.parquet house_prices_2003.parquet house_prices_2009.parquet house_prices_2015.parquet house_prices_2021.parquet
house_prices_1998.parquet house_prices_2004.parquet house_prices_2010.parquet house_prices_2016.parquet house_prices_2022.parquet
house_prices_1999.parquet house_prices_2005.parquet house_prices_2011.parquet house_prices_2017.parquet house_prices_2023.parquet
house_prices_2000.parquet house_prices_2006.parquet house_prices_2012.parquet house_prices_2018.parquet
相同的方法可用于 s3 函数。
INSERT INTO FUNCTION s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/uk-house-prices/parquet/house_prices_sample_{_partition_id}.parquet', '<aws_access_key_id>', '<aws_secret_access_key>') PARTITION BY toYear(date) SELECT *
FROM uk_price_paid
LIMIT 1000
0 rows in set. Elapsed: 2.247 sec. Processed 2.00 thousand rows, 987.86 KB (889.92 rows/s., 439.56 KB/s.)
在撰写本文时,这个 `PARTITION BY` 子句 目前不支持 `INTO OUTFILE`。
将文件转换为 Parquet
结合上述方法,我们可以使用 ClickHouse Local 在不同格式之间转换文件。在下面的示例中,我们使用 ClickHouse Local 与文件函数来读取 CSV 格式的房屋价格数据集的本地副本(包含所有 2800 万行),然后将其写入 S3 作为 Parquet 格式。这些文件按年份对数据进行分区,如前所述。
INSERT INTO FUNCTION s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/uk-house-prices/parquet/house_prices_sample_{_partition_id}.parquet', '<aws_access_key_id>', '<aws_secret_access_key>') PARTITION BY toYear(date) SELECT *
FROM file('house_prices.csv')
0 rows in set. Elapsed: 223.864 sec. Processed 28.11 million rows, 5.87 GB (125.58 thousand rows/s., 26.24 MB/s.)
将 Parquet 文件插入 ClickHouse
所有以上示例都假设用户正在查询本地和 S3 托管文件,以便进行临时分析或将数据从 ClickHouse 迁移到 Parquet 格式以进行分发。虽然 Parquet 是一种与数据存储无关的格式,适用于文件分发,但它在查询效率方面不如 ClickHouse MergeTree 表,后者能够 利用索引 和特定于格式的优化。请考虑以下查询的性能,该查询使用本地 Parquet 文件和具有 推荐模式 的 MergeTree 表计算伦敦房产每年的平均价格(两者都在 Macbook Pro 2021 上运行)。
SELECT
toYear(toDate(date)) AS year,
round(avg(price)) AS price,
bar(price, 0, 2000000, 100)
FROM file('house_prices.parquet')
WHERE town = 'LONDON'
GROUP BY year
ORDER BY year ASC
29 rows in set. Elapsed: 0.625 sec. Processed 28.11 million rows, 750.65 MB (44.97 million rows/s., 1.20 GB/s.)
SELECT
toYear(toDate(date)) AS year,
round(avg(price)) AS price,
bar(price, 0, 2000000, 100)
FROM uk_price_paid
WHERE town = 'LONDON'
GROUP BY year
ORDER BY year ASC
29 rows in set. Elapsed: 0.022 sec.
这里的差异是巨大的,这证明了为什么对于需要实时性能的大型数据集,用户将 Parquet 文件加载到 ClickHouse 中。在下面,我们假设 `uk_price_paid` 表已预先创建。
从本地文件加载
可以使用 `INFILE` 子句从客户端机器加载文件。以下查询从 `clickhouse-client` 执行,并从本地客户端的文件系统读取数据。
INSERT INTO uk_price_paid FROM INFILE 'house_price.parquet' FORMAT Parquet
28113076 rows in set. Elapsed: 15.412 sec. Processed 28.11 million rows, 1.27 GB (1.82 million rows/s., 82.61 MB/s.)
这种方法还 支持 glob 模式,如果用户的数据分散在多个 Parquet 文件中。或者,可以使用 `--query` 参数将 Parquet 文件重新定向到 `clickhouse-client` 中。
clickhouse@clickhouse-mac ~ % ~/clickhouse client --query "INSERT INTO uk_price_paid FORMAT Parquet" < house_price.parquet
从 S3 加载
由于客户端存储空间通常有限,而且基于对象存储的数据湖的兴起,Parquet 文件通常驻留在 S3 或 GCS 上。同样,我们可以使用 s3 函数来读取这些文件,并将它们的数据插入到具有 INSERT INTO SELECT 子句的 MergeTree 表中。在以下示例中,我们使用 glob 模式来读取按年份分区的文件,并在三个节点的 ClickHouse Cloud 集群上执行此查询。
INSERT INTO uk_price_paid SELECT *
FROM s3('https://datasets-documentation.s3.eu-west-3.amazonaws.com/uk-house-prices/parquet/house_prices_{1..2}*.parquet')
0 rows in set. Elapsed: 12.028 sec. Processed 28.11 million rows, 4.64 GB (2.34 million rows/s., 385.96 MB/s.)
与读取类似,可以使用 s3Cluster 函数来加速此过程。为了确保插入和读取在集群中分布,必须启用设置 `parallel_distributed_insert_select`(否则,只有读取会分布,插入会发送到协调节点)。以下查询在与之前示例中使用的相同云集群上运行,显示了并行执行此工作的优势。
SET parallel_distributed_insert_select=1
INSERT INTO uk_price_paid SELECT *
FROM s3Cluster('default', 'https://datasets-documentation.s3.eu-west-3.amazonaws.com/uk-house-prices/parquet/house_prices_{1..2}*.parquet')
0 rows in set. Elapsed: 6.425 sec. Processed 28.11 million rows, 4.64 GB (4.38 million rows/s., 722.58 MB/s.)
结论
在本系列博客的第一部分中,我们介绍了 Parquet 格式,并展示了如何使用 ClickHouse 查询和写入此格式。在下一篇文章中,我们将更详细地介绍这种格式,进一步探索 ClickHouse 集成以及最近的性能改进和优化查询的提示。