2000 年代中期 Apache Hadoop 的出现标志着大数据时代的开始,从根本上改变了组织存储和处理海量数据集的方式。这种转变产生了对专用文件格式的需求,这些格式旨在高效地处理大规模的数据分析和跨系统数据传输。
两种著名的文件格式从这种演变中脱颖而出:Apache Avro (2009) 和 Apache Parquet (2013)。虽然这两种格式在大数据生态系统中都发挥着重要作用,但每种格式都为不同的用例带来了独特的优势。
在本文中,我们将探讨 Avro 和 Parquet 之间的主要区别,帮助您了解何时以及为何使用每种格式。我们将检查它们的优势、局限性和理想的应用场景,以帮助您为数据架构做出明智的决策。
什么是 Avro?
Apache Avro 是一个面向行的数据序列化框架,它源于 Apache Hadoop 项目。它提供了一种紧凑、快速且高效的方式来处理数据序列化,使其对于数据流和基于消息的系统特别有价值。
Avro 的主要特点
- 基于模式的序列化系统
- 丰富的模式定义和演化能力
- 紧凑的二进制数据格式
- 与模式注册表的集成
- 内置数据压缩支持
Avro 的突出特点之一是其复杂的模式管理。虽然模式在数据写入和读取期间必须存在,但组织可以利用模式注册表来优化此过程。系统无需在每次数据交换时都传输完整的模式,而只需引用模式 ID,从而显着减少开销并提高性能。
Avro 的架构和用例
- 主要用于流数据场景
- 与 Apache Kafka 强集成,用于消息发布
- 针对数据线上传输进行了优化
- 通常处理较小的数据单元,通常每个文件一个记录
- 非常适合实时数据处理和消息系统
该格式因其高效的序列化能力和模式演化功能而在流数据社区中声名鹊起。它的设计使其特别适合数据结构可能需要演化而不会破坏现有消费者的系统。
我应该何时使用 Avro?
Apache Avro 在数据传输场景中表现出色,尤其是在流数据管道和基于消息的系统中。它的设计使其非常适合多种数据流和消息系统。
与用于数据传输的 JSON 相比,它具有以下优点
- 模式强制执行
- 严格的模式定义确保数据一致性
- 生产者和消费者之间清晰的合同
- 减少数据质量问题和运行时错误
- 模式演化
- 支持向前和向后兼容性
- 允许系统演化而不会破坏现有应用程序
- 清晰地处理随时间推移的模式更改
- 性能优势
- 比 JSON 更紧凑的二进制格式
- 减少网络带宽使用
- 更快的序列化和反序列化
- 更低的存储要求
通过选择 Avro 而不是基于文本的格式(如 JSON),组织可以确保更好的数据治理、更高的性能以及跨系统更可靠的数据集成。模式验证和高效的二进制序列化的结合使 Avro 在企业级数据操作中特别有价值,在企业级数据操作中,数据一致性和性能至关重要。
在 Python 中使用 Avro
让我们看看如何在 Python 中使用 Avro。fastavro 是一个用于执行此操作的流行库。我们将使用 faker 库创建一个 100 万行的 Avro 文件
1pip install fastavro faker
1import fastavro 2from faker import Faker 3 4fake = Faker() 5 6schema = { 7 "type": "record", 8 "name": "User", 9 "fields": [ 10 {"name": "name", "type": "string"}, 11 {"name": "age", "type": "int"}, 12 {"name": "email", "type": "string"} 13 ] 14} 15 16records = [ 17 { 18 "name": fake.name(), 19 "age": fake.random_int(min=18, max=80), 20 "email": fake.email() 21 } 22 for _ in range(1_000_000) 23] 24 25with open('users.avro', 'wb') as out: 26 fastavro.writer(out, schema, records)
从该文件读取时,我们需要首先安装 cramjam 库
1pip install cramjam
1with open('users.avro', 'rb') as file:
2 reader = fastavro.reader(file)
3 for record in reader:
4 print(record)
{'key': '_HYtg3FZj:77pUB1!x:?0', 'number': 55093572, 'value1': 70, 'value2': 199, 'value3': 10.167093690048159}
{'key': 'lSRL3Tk_1ZcY*F-3#\\4?', 'number': 55093573, 'value1': 29, 'value2': 193, 'value3': 27.204761505913336}
^C{'key': '~_=66N0);]JxeF>P74j', 'number': 55093577, 'value1': 93, 'value2': 198, 'value3': 5.050937769552736}
{'key': '\'<;2zu_>c"', 'number': 55093578, 'value1': 94, 'value2': 189, 'value3': 6.70526603589251}
{'key': '*Jl0ur=rJRDNe', 'number': 55093579, 'value1': 74, 'value2': 200, 'value3': 26.631595740252777}
{'key': "gX:DBO.'g+0g", 'number': 55093580, 'value1': 88, 'value2': 210, 'value3': 2.3440855043518796}
什么是 Parquet?
Apache Parquet 是一种列式存储格式,已成为数据湖存储的行业标准。这种复杂的格式通过优化大规模数据的存储和查询方式,彻底改变了大数据分析。
Parquet 的主要特点
- 面向列的存储格式
- 针对大规模数据处理进行了优化
- 通常部署在云对象存储上(Amazon S3、Google Cloud Storage)
- 现代数据湖实现的基础
- Apache Iceberg 和其他表格式的底层格式
Parquet 的突出特点之一是其智能数据组织。该格式使用行组、列块和元数据过滤来有效查询大型数据集。凭借针对特定数据类型量身定制的复杂压缩算法,Parquet 显着降低了存储成本,同时保持了高查询性能。
Parquet 的架构和用例
- 数据湖存储的首选,尤其是在云平台(S3、GCS)上
- 现代表格式(如 Apache Iceberg)的基础存储层
- 针对分析查询和大数据处理进行了优化
- 非常适合数据仓库和商业智能应用程序
- 通常与查询引擎(如 Apache Spark、Presto 和 Athena)一起使用
该格式在数据湖运动期间因其高效处理大规模分析工作负载的能力而声名鹊起。它的设计使其特别适合查询性能和存储优化是关键要求的系统。
我应该何时使用 Parquet?
当处理可以分解为不同字段的大规模分析数据时,Apache Parquet 是理想的选择。它在以下几个关键场景中表现出色
数据存储和分析
- 大规模数据仓库和数据湖
- 长期数据存储(“静态数据”)
- 商业智能和分析处理
- 需要列特定访问的复杂查询
ETL 和批处理
- ETL 工作流中记录的批处理
- 将多个记录聚合到单个 Parquet 文件中
- Kafka Connect 用于基于时间或记录计数的流数据聚合
该格式的列式结构和压缩功能使其对于需要经济高效地存储和分析大型数据集的组织特别有效。
在 Python 中使用 Parquet
在 Python 中使用 Parquet 文件格式时,我们可以使用流行的 pandas 库以及 fastparquet 或 pyarrow。我们可以编写以下代码来创建一个包含 100 万行的文件
1pip install pandas fastparquet
1import pandas as pd 2from faker import Faker 3 4fake = Faker() 5 6records = [ 7 { 8 "name": fake.name(), 9 "age": fake.random_int(min=18, max=80), 10 "email": fake.email() 11 } 12 for _ in range(1_000_000) 13] 14 15df = pd.DataFrame(records)df.to_parquet('users.parquet', index=False)
然后查询文件
1import pandas as pd 2 3df = pd.read_parquet('users.parquet') 4print(df.head())
key number value1 value2 value3
0 g1.NHQ(0%-JrY*}jxbCS 46752066 39 196 19.823986
1 jIY(eL!_Hu 46752067 37 213 12.839577
2 x}0I__1$MKvIQ+V@z00`*ol; 46752068 23 214 10.135543
3 4Yv))Dt 46752069 58 194 5.582311
4 H7AP#bi_}4{p7\^O5`^Cpu: 46752070 34 183 5.432857
将数据从 ClickHouse 导出到 Avro 和 Parquet
接下来,我们将从 ClickHouse 生成 Avro 和 Parquet 文件。我们将首先创建一个名为 events
的表
1CREATE TABLE events (
2 key String,
3 number UInt64,
4 value1 Int32,
5 value2 UInt64,
6 value3 Float64
7)
8ENGINE = MergeTree
9ORDER BY number;
现在,我们将 1 亿条记录摄取到此表中
1INSERT INTO events
2SELECT
3 randomPrintableASCII(randUniform(5, 25)) AS key,
4 number,
5 randUniform(18, 100)::Int AS value1,
6 randPoisson(200) AS value2,
7 randExponential(1/10) AS value3
8FROM numbers(100_000_000);
Ok.
0 rows in set. Elapsed: 14.013 sec. Processed 100.00 million rows, 800.00 MB (7.14 million rows/s., 57.09 MB/s.)
Peak memory usage: 91.82 MiB.
我们可以使用 INTO OUTFILE
子句来输出数据,因此让我们首先将其输出到 Avro 格式的文件中
1SELECT *
2FROM events
3INTO OUTFILE 'users.avro' TRUNCATE
4FORMAT Avro;
100000000 rows in set. Elapsed: 16.757 sec. Processed 99.61 million rows, 5.13 GB (5.94 million rows/s., 306.12 MB/s.)
Peak memory usage: 29.83 MiB.
现在对 Parquet 执行相同的操作
1SELECT *
2FROM events
3INTO OUTFILE 'users.parquet' TRUNCATE
4FORMAT Parquet;
100000000 rows in set. Elapsed: 3.161 sec. Processed 94.68 million rows, 4.88 GB (29.95 million rows/s., 1.54 GB/s.)
Peak memory usage: 1.07 GiB.
使用 ClickHouse 查询 Avro 和 Parquet
ClickHouse 支持直接查询 Avro 和 Parquet 文件。让我们首先查询 Avro 版本的数据
1SELECT
2 count(),
3 quantiles(0.5, 0.9, 0.99)(value1) AS quantV1,
4 quantiles(0.5, 0.9, 0.99)(value2) AS quantV2,
5 arrayMap(x -> round(x, 2), quantiles(0.5, 0.9, 0.99)(value3)) AS quantV3
6FROM file('users.avro');
┌───count()─┬─quantV1────┬─quantV2───────┬─quantV3────────────┐
1. │ 100000000 │ [58,91,99] │ [200,219,233] │ [6.85,22.46,46.01] │
└───────────┴────────────┴───────────────┴────────────────────┘
1 row in set. Elapsed: 20.318 sec. Processed 99.68 million rows, 3.09 GB (4.91 million rows/s., 152.03 MB/s.)
Peak memory usage: 8.75 MiB.
现在,让我们对 Parquet 版本的数据执行相同的操作
1SELECT
2 count(),
3 quantiles(0.5, 0.9, 0.99)(value1) AS quantV1,
4 quantiles(0.5, 0.9, 0.99)(value2) AS quantV2,
5 arrayMap(x -> round(x, 2), quantiles(0.5, 0.9, 0.99)(value3)) AS quantV3
6FROM file('users.parquet');
┌───count()─┬─quantV1────┬─quantV2───────┬─quantV3────────────┐
1. │ 100000000 │ [58,92,99] │ [200,218,234] │ [6.96,22.54,44.25] │
└───────────┴────────────┴───────────────┴────────────────────┘
1 row in set. Elapsed: 0.482 sec. Processed 85.27 million rows, 2.09 GB (177.09 million rows/s., 4.34 GB/s.)
Peak memory usage: 231.83 MiB.