Google BigQuery 是一款无服务器云数据仓库,可对 PB 级数据进行可扩展分析。BigQuery 与 GCP 生态系统很好地集成在一起,已应用于各种报告和批处理分析用例。
越来越多的场景需要在高吞吐量流式数据源之上实现亚秒级交互式性能。为了满足这些需求,我们越来越多地看到用户在 BigQuery 旁边部署 ClickHouse,以加快为面向客户的应用程序和内部分析提供支持的查询速度。因此,ClickHouse 和 BigQuery 作为补充技术,为构建现代分析堆栈的企业提供服务。
在这篇文章中,我们将探讨在 BigQuery 和 ClickHouse 之间同步数据的选择。这包括批量移动数据,以及使用 BigQuery 计划查询和Google DataFlow(Apache Beam 的运行器)连续追加新数据。我们还将向您介绍使用 ClickHouse 进行基本数据分析。
比较 ClickHouse 和 BigQuery 的用例
BigQuery 擅长在 PB 级数据上,针对各种查询访问模式提供可预测的性能。它能够共享计算、混洗数据和溢出到磁盘,意味着即使是最复杂的深度分析查询也能在可接受的时间内为分析师和业务报告提供服务。这些功能非常适合经典数据仓库中与较低查询每秒 (QPS) 场景相关的即席查询,尤其是在查询访问模式未知或变化很大时。
相比之下,ClickHouse 针对以下用例进行了优化:
- 亚秒级分析查询,适用于面向用户的应用程序,其中访问模式已知且可预测。这有时可能是 Big Query 中数据的子集,甚至可能是用于一组专注的分析查询的整个数据集。请注意,虽然 BigQuery 从未“慢”,但它通常在秒级内提供性能,这使得构建动态实时应用程序更具挑战性。
- 随着应用程序使用量的增长,可能不受限制的查询每秒。ClickHouse 旨在以高并发方式提供查询,并且不对并行查询的数量施加限制。
- 支持高插入率,同时仍能对最近数据实现低延迟并发查询,而传统上像 BigQuery 这样的数据仓库并未针对此进行优化。
这些属性通常需要在需要展现近实时分析的外部或内部应用程序中。
定价模式也可能是做出技术选择的一个因素。BigQuery 根据扫描的数据量收费,这非常适合即席分析查询,但对于实时分析工作负载来说,成本可能很高。相反,ClickHouse 是开源软件,可以在您自己的基础架构上部署,也可以托管在ClickHouse Cloud 上,后者只根据消耗的计算和存储收费。请注意,预留 BigQuery 时段 可以解决其中一些定价问题,但这需要高使用阈值,并非所有用户都能做到。
设置和数据集
本博文中的示例使用ClickHouse Cloud,它提供免费试用,可以完成我们介绍的场景。我们使用 sql.clickhouse.com 上的公共可用云环境,该环境总共拥有 720 GB 的内存和 180 个 vCPU。所有说明也与运行最新版本 (23.1) 的自托管 ClickHouse 部署兼容。
我们使用 BigQuery 公共项目 中提供的以太坊加密货币数据集作为我们的数据集。我们将探讨此数据集的详细信息推迟到以后的博文中,但建议阅读 Google 关于如何构建此数据集的博客文章以及关于查询此数据集和其他加密数据集的后续文章。阅读本博文不需要事先了解加密货币,但对于有兴趣的用户,以太坊简介 提供了有用的概述。Google 在此数据集之上记录了许多很棒的查询,我们在后面的博文中会提到这些查询,并且我们已经在这里整理了等效的 ClickHouse 查询,欢迎贡献。
总之,我们的数据集包含 4 个表。这是完整数据的子集,但足以满足大多数常见问题。
- 区块 - 区块是包含一系列交易的批次,其中包含链中前一个区块的哈希值。
- 交易 - 交易是从帐户发出的加密签名指令。帐户将发起交易以更新以太坊网络的状态,例如将 ETH 从一个帐户转移到另一个帐户。
- 跟踪 - 允许查询所有以太坊地址及其余额的内部交易。
- 合约 - “智能合约”仅仅是在以太坊区块链上运行的程序。
对于想要绕过 BigQuery 插入此数据集的用户,可以使用优秀的以太坊 ETL 工具,已提交 PR 支持ClickHouse 作为目标。上述表格代表了一个子集,并解决了最常见的查询,同时提供了相当大的数据量。或者,此数据在 BigQuery 公共项目中可用 - 用户只需根据扫描的数据量付费进行查询,每月免费提供 1 TB 数据。此数据由 Google不断更新和维护,更新通常仅落后于实时区块链 4 分钟。为了让用户能够重现示例,我们已将此数据提供到sql.clickhouse.com 上以供查询,并提供到公共存储桶 gs://clickhouse_public_datasets/ethereum
中。
假设
在选择将哪些数据存储在 ClickHouse 中,而哪些数据存储在 BigQuery 中时,我们通常看到用户识别最常用的查询,这些查询将由实时分析应用程序使用。对于流式分析数据,这通常构成基于时间维度的子集数据。
在本文的剩余部分中,我们做以下假设:
- 数据不断生成并存储在 BigQuery 中,需要将新行不断流式传输到 ClickHouse。
- 数据是追加型且不可变的。不需要选择性地更新行,虽然删除旧数据是预期的,并在下面描述。
- 数据中存在时间维度或递增的数字标识符,允许识别要复制到 ClickHouse 的新行。
以太坊区块链数据本质上满足这些属性。在我们的例子中,我们利用区块时间戳。下面我们将讨论将历史数据从 BigQuery 迁移到 ClickHouse,以及持续追加新数据的方法。
BigQuery 和 ClickHouse 数据类型和模式之间的差异
在 ClickHouse 和 BigQuery 之间移动数据的用户会立即注意到,ClickHouse 在数值精度方面提供了更细粒度的精度。例如,BigQuery 提供了数值类型 INT64、NUMERIC、BIGNUMERIC 和 FLOAT64。对比 ClickHouse,它为十进制、浮点数 和 整数 提供了多种精度。通过这些,ClickHouse 用户可以优化存储和内存开销,从而实现更快的查询和更低的资源消耗。下面我们将为每个 BigQuery 类型映射等效的 ClickHouse 类型
当 ClickHouse 类型有多个选项时,请考虑数据的实际范围并选择最低要求的类型。此外,请考虑利用 适当的编解码器 以进一步压缩。
可以使用以下查询检索 BigQuery 表的当前模式
SELECT table_name, ddl FROM `bigquery-public-data`.crypto_ethereum.INFORMATION_SCHEMA.TABLES
WHERE table_name = 'blocks';
原始 BigQuery 模式可以在 此处找到。使用上述查询的结果,我们可以根据每列的已知范围创建具有适当类型的 ClickHouse 表。您可以运行额外的查询以识别数据范围和基数,例如
SELECT
MAX(number) AS max_number,
MIN(number) AS min_number,
MAX(size) AS max_size,
MIN(size) AS min_size
FROM bigquery-public-data.crypto_ethereum.blocks
max_number min_number max_size min_size
16547585 0 1501436 514
我们对这些模式进行了基本优化,使用了适当的类型和编解码器来最小化存储,但将完整分析留待以后专门针对此数据集的博客。块的模式
CREATE TABLE ethereum.blocks
(
`number` UInt32 CODEC(Delta(4), ZSTD(1)),
`hash` String,
`parent_hash` String,
`nonce` String,
`sha3_uncles` String,
`logs_bloom` String,
`transactions_root` String,
`state_root` String,
`receipts_root` String,
`miner` String,
`difficulty` Decimal(38, 0),
`total_difficulty` Decimal(38, 0),
`size` UInt32 CODEC(Delta(4), ZSTD(1)),
`extra_data` String,
`gas_limit` UInt32 CODEC(Delta(4), ZSTD(1)),
`gas_used` UInt32 CODEC(Delta(4), ZSTD(1)),
`timestamp` DateTime CODEC(Delta(4), ZSTD(1)),
`transaction_count` UInt16,
`base_fee_per_gas` UInt64
)
ENGINE = MergeTree
ORDER BY timestamp
其他表模式可以在 此处找到。请注意,我们不会将列设置为可空,尽管它们在原始 BigQuery 模式中是可空的。对于大多数查询,无需区分默认值和空值。通过使用默认值,我们可以避免可空列的额外 UInt8 列开销。我们还为这些表选择了主键,以优化预期的查询。最后,鉴于 BigQuery 根据 物理大小或逻辑大小对数据存储收费,因此值得记录这些表的大小。此信息可以通过 UI (选择表并检查详细信息)或通过简单的查询获得
SELECT
table_name, total_rows, round(total_physical_bytes / power(1024, 3),2) as total_physical_gb, round(total_logical_bytes /power(1024, 3),2) as total_logical_gb
FROM
`<project id>.region-<region>.INFORMATION_SCHEMA.TABLE_STORAGE` WHERE table_name IN ('transactions', 'contracts', 'blocks', 'traces') AND table_schema='crypto_ethereum'
请注意,此查询无法在bigquery-public-data
项目上执行,需要将表复制到您自己的项目中。
此表显示了 2023 年 2 月 1 日捕获的 BigQuery 中存储的数据集统计信息。BigQuery 的“物理”大小和“逻辑”大小大致等同于 ClickHouse 的“压缩”大小和“未压缩”大小,分别。
通过 Google Cloud Storage (GCS) 批量加载
BigQuery 支持将数据导出到 Google 的对象存储 (GCS)。在本例中,将公共表blocks
、traces
、transactions
和contracts
导出到 GCS,然后将数据导入 ClickHouse Cloud。我们使用 s3 表函数,因为 GCS 与 Amazon 的简单存储服务 (S3)互操作。此方法有许多优点
- BigQuery 导出功能支持用于导出数据子集的过滤器。
- BigQuery 支持导出到 Parquet、Avro、JSON 和 CSV 格式以及几种 压缩类型 - ClickHouse 都支持。
- GCS 支持 对象生命周期管理,允许在指定时间段后删除已导出并导入 ClickHouse 的数据。
- Google 允许每天免费导出最多 50TB 数据到 GCS。用户只需为 GCS 存储付费。
- 导出会自动生成多个文件,每个文件最多包含 1GB 的表数据。这对 ClickHouse 来说很有益,因为它允许并行化导入。
在尝试以下示例之前,我们建议用户查看 导出所需的权限 和 位置建议,以最大限度地提高导出和导入性能。
将数据导出到 GCS
在我们的示例中,我们使用 BigQuery SQL 接口 - 请参阅其他方法,例如 此处的 bq
。下面我们将使用 EXPORT DATA 语句将 blocks 表导出到指定的 GCS 存储桶。虽然在下面的示例中,我们导出了整个数据集,但 SELECT 语句允许导出数据子集。
EXPORT DATA
OPTIONS (
uri = 'gs://clickhouse_public_datasets/ethereum/blocks/*.csv.gz',
format = 'CSV',
compression = 'GZIP',
overwrite = true,
header = true,
field_delimiter = ',')
AS (
SELECT *
FROM bigquery-public-data.crypto_ethereum.blocks
ORDER BY number ASC
);
我们导出到 CSV 并请求使用 GZIP 压缩文件。对于块数据,这大约需要 1 分钟。我们还在 uri
参数中有一个 *
字符。这确保输出 被分割成多个文件,并带有数值递增的后缀,如果导出超过 1GB 数据。
将数据从 GCS 导入 ClickHouse
导出完成后,我们可以将此数据导入 ClickHouse 表。请注意,我们会在运行以下 INSERT INTO blocks 表之前预先创建表。
SET parallel_distributed_insert_select = 1
INSERT INTO blocks
SELECT number, hash, parent_hash, nonce, sha3_uncles, logs_bloom, transactions_root, state_root, receipts_root, miner, difficulty, total_difficulty, size, extra_data, gas_limit, gas_used, timestamp, transaction_count, base_fee_per_gas
FROM s3Cluster('default', 'https://storage.googleapis.com/clickhouse_public_datasets/ethereum/blocks/*.gz', 'CSVWithNames', 'timestamp DateTime, number Int64, hash String, parent_hash String, nonce String, sha3_uncles String, logs_bloom String, transactions_root String, state_root String, receipts_root String, miner String, difficulty Decimal(38, 0), total_difficulty Decimal(38, 0), size Int64, extra_data String, gas_limit Int64,gas_used Int64,transaction_count Int64,base_fee_per_gas Int64')
0 rows in set. Elapsed: 22.712 sec. Processed 16.54 million rows, 19.52 GB (728.29 thousand rows/s., 859.50 MB/s.)
我们利用 s3Cluster 函数,它是 s3 函数的分布式变体,允许使用 ClickHouse Cloud 中的整个集群资源来进行读写。设置 parallel_distributed_insert_select=1
确保插入被并行化,并且数据被插入到读取它的同一个节点(跳过写入时的发起节点)。
在上面的示例中,我们没有提供身份验证密钥,因为存储桶是公开的。可以使用私有存储桶 支持,如果用户为存储桶生成了 HMAC 密钥 - 可以在服务级别或用户级别进行操作。Access key
和 Secret
可以分别替换为 aws_access_key_id
和 aws_secret_access_key
。在以下安全示例中,我们对数据进行就地查询 - 这是一种用于临时分析的典型访问模式
SELECT max(number) AS max_block_number
FROM s3('https://storage.googleapis.com/clickhouse_public_datasets/ethereum/blocks/*.csv.gz', 'CSVWithNames')
┌─max_block_number─┐
│ 16542640 │
└──────────────────┘
1 row in set. Elapsed: 15.926 sec. Processed 16.54 million rows, 148.88 MB (1.04 million rows/s., 9.35 MB/s.)
上面使用的 CSV 作为交换格式不是最优的。Parquet 作为一种面向列的格式,代表了一种更好的交换格式,因为它本质上是压缩的,并且 BigQuery 导出和 ClickHouse 查询的速度更快。如果使用 Parquet,您需要将可空列映射到非空等效列,因为上述模式选择(或者在 ClickHouse 中使用可空列)。例如,下面我们使用 ifNull
函数将 base_fee_per_gas
的空值映射为 0。其他压缩算法 在导出到 Parquet 时可用 可以进一步改进该过程,但我们将把此练习留给读者。
INSERT INTO blocks
SELECT number, hash, parent_hash, nonce, sha3_uncles, logs_bloom, transactions_root, state_root, receipts_root, miner, difficulty, total_difficulty, size, extra_data, gas_limit, gas_used, timestamp, transaction_count, ifNull(base_fee_per_gas, 0) AS base_fee_per_gas
FROM s3Cluster('default', 'https://storage.googleapis.com/bigquery_ethereum_export/blocks/*.parquet', 'GOOGR2DYAAX6RVODIMREAVPB', '+tNQdQQ0DCEItWQlJseXcidKSG6pOU65o1r0N17O')
我们对每个表重复了上述操作,使用 Parquet 格式记录了以下时间。使用这种方法,我们能够在不到一个小时的时间内将 4TB 数据从 BigQuery 迁移到 ClickHouse!
ClickHouse 存储效率与 BigQuery 的对比
如下所示,ClickHouse 实现了大约 8 倍的压缩,将 BigQuery 存储效率提高了 30%。
SELECT table,
formatReadableSize(sum(data_compressed_bytes)) AS compressed_size,
formatReadableSize(sum(data_uncompressed_bytes)) AS uncompressed_size,
round(sum(data_uncompressed_bytes) / sum(data_compressed_bytes), 2) AS ratio
FROM system.columns
WHERE database = 'ethereum'
GROUP BY table
ORDER BY sum(data_compressed_bytes) DESC
┌─table────────┬─compressed_size─┬─uncompressed_size─┬─ratio─┐
│ traces │ 509.39 GiB │ 3.85 TiB │ 7.74 │
│ transactions │ 228.52 GiB │ 1.14 TiB │ 5.09 │
│ blocks │ 5.37 GiB │ 15.62 GiB │ 2.92 │
│ contracts │ 2.98 GiB │ 15.78 GiB │ 5.3 │
└──────────────┴─────────────────┴───────────────────┴───────┘
下面总结一下,我们可以看到 ClickHouse 的压缩效率比 BigQuery 高约 30%。
使用计划查询
上述方法适用于批量数据加载,这对开发和实验很有用。但它没有解决 BigQuery 表正在接收新数据这一事实。因此,在生产环境中,我们需要另一种方法来持续处理追加的新数据。
计划数据导出
一种方法是简单地使用 BigQuery 的 计划查询功能来定期计划导出。如果您能够接受数据插入 ClickHouse 的一些延迟,这种方法很容易实施和维护。
在我们的示例中,我们将每小时计划一次导出。每小时,我们将导出过去 60 分钟的数据。但是,我们将此窗口偏移以允许块提交到区块链并在 BigQuery 中显示的延迟。通常,这不会超过 4 分钟,我们使用 15 分钟来确保安全。因此,每次运行导出时,我们都会导出从 now-75mins
到 now-15mins
的所有行。这在下面可视化
为了确保我们的窗口查询不会错过任何数据,我们需要将我们的时间间隔计算键入计划时间,该时间可以通过一个名为 [run_time](https://cloud.google.com/bigquery/docs/scheduling-queries) 的变量获得,而不是执行时间,执行时间可能略有不同。
我们现在可以计划以下查询,每小时运行一次,以导出以太坊块数据。**确保您安排的第一个导出作业至少在导入后 75 分钟开始,以避免重复!**
DECLARE
export_time_lower TIMESTAMP DEFAULT TIMESTAMP_SUB(@run_time, INTERVAL 75 MINUTE);
DECLARE
export_time_upper TIMESTAMP DEFAULT TIMESTAMP_SUB(@run_time, INTERVAL 15 MINUTE);
EXPORT DATA
OPTIONS ( uri = CONCAT('gs://clickhouse_public_datasets/ethereum/blocks/increment-', CAST(UNIX_SECONDS(TIMESTAMP_TRUNC(CURRENT_TIMESTAMP(), HOUR)) AS STRING), '*.parquet'),
format = 'PARQUET', overwrite = true) AS (
SELECT
*
FROM
bigquery-public-data.crypto_ethereum.blocks
WHERE
timestamp > export_time_lower
AND timestamp <= export_time_upper
ORDER BY
number ASC );
这种增量导出完成得快得多(大多数情况下只需几分钟),因为 BigQuery 表(按时间戳列分区)使这些过滤子句能够快速运行。
有关查询语法的几个详细信息
- 上面的查询只导出区块数据。其他表的等效查询非常相似,只是使用了 `block_timestamp` 列代替了 `timestamp` 列。
- 导出文件带有 `increment-` 前缀。这允许我们在导入时只针对增量文件(见下文)。
- 我们在文件名中包含当前时间作为时间戳秒,使用表达式 `CAST(UNIX_SECONDS(TIMESTAMP_TRUNC(CURRENT_TIMESTAMP(), HOUR)) AS STRING)`。这允许我们在导入时针对特定文件(见下文)。
调度数据导入
在撰写本文时,ClickHouse 还没有内置的调度导入功能(提案正在讨论中),但我们可以使用外部作业来调度此导入。这可以通过多种方式实现——使用 Lambda 函数、Cloud Run,甚至dbt 中的增量物化——但为了演示简单,我们使用简单的 cron。
以下 bash 脚本可以在导出完成后由 cron 作业定期运行。此示例处理 blocks 表,但可以轻松地适应其他表。
#!/bin/bash
max_date=$(clickhouse-client --query "SELECT toInt64(toStartOfHour(toDateTime(max(block_timestamp))) + toIntervalHour(1)) AS next FROM ethereum.transactions");
clickhouse-client --query "INSERT INTO blocks SELECT number, hash, parent_hash, nonce, sha3_uncles, logs_bloom, transactions_root, state_root, receipts_root, miner, difficulty, total_difficulty, size, extra_data, gas_limit, gas_used, timestamp, transaction_count, ifNull(base_fee_per_gas, 0) AS base_fee_per_gas
FROM s3(''https://storage.googleapis.com/clickhouse_public_datasets/ethereum/blocks/increment-' || toString(${max_date}) || '-*.parquet')"
这里的第一行识别 ClickHouse 中数据的当前最大时间。我们使用 toStartOfHour
函数将其四舍五入到小时,并在将值转换为 Int64 之前添加一小时。这将给我们一个时间戳秒值。当加上 `increment-` 前缀时,这将标识 cron 作业执行时当前小时的文件。下一行使用 s3 函数对该文件进行简单的导入到表中。
这种方法的另一种选择是导入所有包含时间戳大于 `max_date` 的 `increment-*` 行,即 `WHERE timestamp > ${max_date}`。但是,这将要求我们扫描所有增量文件。虽然最初速度很快,但随着文件数量的增加,这种方法的性能会随着时间推移而下降。
填补空白
如果我们执行批量导入,然后调度上面的导入和导出查询,我们将在批量加载完成和增量加载开始之间的这段时间内不可避免地存在数据“空白”。再次注意,调度增量导出在批量数据导入完成 75 分钟后开始的重要性,以避免重复。
此空白可以通过以下步骤轻松解决。我们为 blocks 数据集概述了这些步骤,但使用 `block_timestamp` 列,其他表的流程相同。请注意,我们是在将增量数据加载到 ClickHouse 之前进行此操作。
- 通过查询 ClickHouse 中数据的最大时间戳来识别“空白”的下限。
SELECT max(timestamp)
FROM blocks
┌──────max(timestamp)─┐
│ 2023-02-02 13:34:11 │
└─────────────────────┘
- 假设上面的查询已调度且增量导入已完成,请识别导出文件中最小的时间戳,即“空白”的上限。
SELECT min(timestamp)
FROM s3('https://storage.googleapis.com/bigquery_ethereum_export/blocks/increment-*.parquet')
┌─────────────min(timestamp)─┐
│ 2023-02-02 14:22:47.000000 │
└────────────────────────────┘
- 导出 (2) 中识别的时间范围内的数据。使用允许轻松识别的前缀,例如 `gap`。
EXPORT DATA
OPTIONS ( uri = CONCAT('gs://bigquery_ethereum_export/blocks/gap-*.parquet'),
format = 'PARQUET',
overwrite = TRUE) AS (
SELECT * FROM
bigquery-public-data.crypto_ethereum.blocks
WHERE
timestamp > '2023-02-02 13:34:11' AND timestamp < '2023-02-02 14:22:47'
ORDER BY
number ASC );
- 使用正确的 `gap-` 前缀将这些文件导入表中。
INSERT INTO blocks SELECT number, hash, parent_hash, nonce, sha3_uncles, logs_bloom, transactions_root, state_root, receipts_root, miner, difficulty, total_difficulty, size, extra_data, gas_limit, gas_used, timestamp, transaction_count, ifNull(base_fee_per_gas, 0) AS base_fee_per_gas
FROM s3('https://storage.googleapis.com/bigquery_ethereum_export/blocks/gap-*.parquet')
处理故障
如果导出失败,上述方法需要手动干预。导入作业更健壮,永远不会多次导入同一个文件。如果最新的文件被导入,或者当前小时的文件未能导出,则不执行任何操作。如果导出失败,然后被修复,则此脚本可以根据需要运行多次以填补缺失的小时并赶上进度。
以其当前形式,早期的脚本意味着 ClickHouse 中的以太坊数据最多落后区块链 85 分钟——假设我们在增量导出完成后 10 分钟执行 cron。我们可以调整上面的流程,将导出减少到分钟级别。这将要求我们调整上面的导入脚本,可能使用过滤器——留给读者练习,或者考虑其他工具…
使用 Dataflow 在 BigQuery 和 ClickHouse 之间流式传输数据
Google Cloud Dataflow 是一种完全托管的服务(一个 Runner),用于在 Google Cloud Platform 生态系统中执行 Apache Beam 管道。Apache Beam 是一种开源的统一编程模型(由 Google 开发),用于定义和执行数据处理管道,包括 ETL、批处理和流式(连续)处理。基于 Dataflow 模型论文,Dataflow 允许用户使用 Python、Go 或 Java 开发管道,并将这些管道部署到 GCE 以运行。这些管道由 I/O 连接器 组成,这些连接器连接到数据源并提供读(源)和写(接收器)操作,以及 转换操作,这些操作有助于数据处理。关键是此框架提供允许这些管道及其操作以 并行方式 执行的原语,无论是以批处理方式还是流式传输方式。我们强烈建议有兴趣使用这种方法的用户阅读 基本概念。
虽然其他数据集可能需要更多转换逻辑,但我们只想在 BigQuery 和 ClickHouse 之间流式传输数据。除了将行分组到批次(以实现有效的 ClickHouse 插入)之外,不需要其他转换。BigQuery 受 核心 I/O 连接器 支持,该连接器为 读取行提供简单的源接口。ClickHouse 通过 针对 Java SDK 的官方 IO 连接器 在 Apache Beam 中得到支持。目前,这仅提供接收器支持,并且要求目标表存在。虽然 python SDK 没有内置的 ClickHouse 支持,但 Beam 提供了一种简单的方法来编写 通过 ParDo 转换的接收器连接器。Apache Beam 对外部库的支持 允许我们使用 ClickHouse python 客户端执行插入 ClickHouse 的操作。最后,简单的 BatchElements 转换允许我们在插入 ClickHouse 之前生成分组行。我们展示了下面一个基本实现的重要部分,以展示这种方法的潜力。完整示例 在此处。
执行此管道以迁移 blocks 表将要求 python 代码按如下所示运行。这假设您已将机器配置为 使用 Google Dataflow 以及拥有所需的权限
python -m sync_clickhouse --target_table ethereum.blocks --clickhouse_host <clickhouse_host> --clickhouse_password <password> --region us-central1 --runner DataflowRunner --project <GCE project> --temp_location gs://<bucket> --requirements_file requirements.txt
请注意,还需要通过 requirements.txt 和数据可以被缓存的 GCS 存储桶提供依赖项,因为 BigQuery 连接器通过将数据导出到存储桶并使用它作为中间存储来工作——有点像我们之前的方法。GCE 控制台提供了一个很好的过程可视化。
上述方法存在以下限制,我们将这些改进留给读者练习。
- 理想的解决方案将使用一个永远运行的流式管道,以便随着更多数据添加到源中而进行处理。但是,流式管道需要一个 无界源,而由于 BigQuery 源是有界的,因此它不能在流式管道中使用。相反,我们使用一个批处理管道,该管道根据快照运行到完成,然后停止。这里最简单的解决方案是让管道在启动时识别 ClickHouse 中的当前最大时间戳,并将其用作 BigQuery 的筛选条件。然后可以轻松地使用 Cloud Scheduler 安排管道运行。
- 我们的 ClickHouse 连接器必须将行结构化为二维数组以供 ClickHouse python 客户端使用。此工作也可以作为并行化的 ParDo 完成。
- 我们使用 BatchElements 转换 进行批处理,其固定大小为 10000k。其他数据集可能需要调整此值。请注意,此转换也可以进行自适应批处理。
- 我们通过 requirements.txt 文件提供依赖项。这是入门最简单的方法,但 不建议在生产环境中使用。
关于连续数据加载的说明
我们针对此特定数据集实现连续数据加载的方法实际上可以通过其他方式更轻松地实现,因为 Google 已通过其他方式提供它。以太坊数据由 Google 发布在公共 Pub/Sub 主题上,数据可以在可用时被消费。虽然我们仍然会使用以下技术将历史以太坊数据加载到 ClickHouse,但可能会编写一个 Google Dataflow 作业,或者使用 Vector,它既支持 pub/sub 作为源,也支持 ClickHouse 作为接收器。可能存在解决此问题的其他方法,我们欢迎提出建议。
在 ClickHouse 中删除旧数据
对于大多数部署,ClickHouse 优越的数据压缩意味着您可以长期以细粒度格式存储数据。对于我们特定的以太坊数据集,这可能不是特别有用,因为我们可能需要为许多查询保留区块链的完整历史记录,例如计算账户余额。
但是,存在简单且可扩展的方法来删除可能适用于其他数据集的旧数据。例如,可以使用 TTL 功能 在行级别或列级别使 ClickHouse 中的旧数据过期。通过 按日期对表进行分区,可以提高效率,从而允许在设置的时间间隔内有效地删除数据。为了举例说明,我们修改了下面 `blocks` 表的模式以按月分区。反过来,使用 TTL 功能使五年以上的数据过期。设置 `ttl_only_drop_parts` 确保只有当其中所有行都过期时才删除一个分区。
CREATE TABLE blocks
(
...
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(timestamp)
ORDER BY timestamp
TTL timestamp + INTERVAL 60 MONTH DELETE
SETTINGS ttl_only_drop_parts=1
请注意,分区可能 对查询产生正面和负面影响,应该更多地被视为数据管理功能而不是优化查询性能的工具。
在 ClickHouse 中运行查询
此数据集值得一篇完整的博客来介绍可能的查询。用于将此数据加载到 BigQuery 的 etherium-etl 工具 的作者 发布了一份关于此数据集的见解的优秀博客列表。在后面的博客中,我们将介绍这些查询,并展示如何将它们转换为 ClickHouse 语法,以及如何显著提高某些查询的速度。现在,我们介绍一些更简单的查询,帮助您入门。
按天计算的以太币供应量
原始 BigQuery 查询,作为 Awesome BigQuery views 的一部分进行记录,并在 此处 进行讨论,执行时间为 6 秒。经过优化的 ClickHouse 查询运行时间为 0.009 秒,与 BigQuery 相比,差异巨大。
ALTER TABLE traces ADD PROJECTION trace_type_projection (
SELECT trace_type,
toStartOfDay(block_timestamp) as date, sum(value) as value GROUP BY trace_type, date
)
ALTER TABLE traces MATERIALIZE PROJECTION trace_type_projection
WITH ether_emitted_by_date AS
(
SELECT
date,
sum(value) AS value
FROM traces
WHERE trace_type IN ('genesis', 'reward')
GROUP BY toStartOfDay(block_timestamp) AS date
)
SELECT
date,
sum(value) OVER (ORDER BY date ASC) / power(10, 18) AS supply
FROM ether_emitted_by_date
┌────────────────date─┬────────────supply─┐
│ 1970-01-01 00:00:00 │ 72009990.49948001 │
│ 2015-07-30 00:00:00 │ 72049301.59323001 │
│ 2015-07-31 00:00:00 │ 72085493.31198 │
│ 2015-08-01 00:00:00 │ 72113195.49948 │
│ 2015-08-02 00:00:00 │ 72141422.68698 │
...
3 rows in set. Elapsed: 0.009 sec. Processed 11.43 thousand rows, 509.00 KB (1.23 million rows/s., 54.70 MB/s.)
请注意,此查询已使用投影进行优化 - 这是 ClickHouse 中众多用于针对特定工作负载进行优化的工具之一。
平均以太坊成本随时间变化
摘自 Kaggle 上此数据集最受欢迎的笔记本。我们最初重写了此查询以包含左反连接,尽管似乎不需要。因此使用了查询的优化版本。
SELECT
SUM(value / POWER(10, 18)) AS sum_tx_ether,
AVG(gas_price * (receipt_gas_used / POWER(10, 18))) AS avg_tx_gas_cost,
toStartOfDay(block_timestamp) AS tx_date
FROM transactions
WHERE (receipt_status = 1) AND (value > 0) AND (block_timestamp > '2018-01-01') AND (block_timestamp <= '2018-12-31')
GROUP BY tx_date
ORDER BY tx_date ASC
┌───────sum_tx_ether─┬───────avg_tx_gas_cost─┬─────────────tx_date─┐
│ 8246871.766893768 │ 0.0005370300954867644 │ 2018-01-01 00:00:00 │
│ 13984780.926949782 │ 0.0005844979818261873 │ 2018-01-02 00:00:00 │
│ 13975588.850788314 │ 0.0006050748915709839 │ 2018-01-03 00:00:00 │
│ 20231765.935660254 │ 0.0007000256320466776 │ 2018-01-04 00:00:00 │
364 rows in set. Elapsed: 0.673 sec. Processed 250.90 million rows, 8.28 GB (373.01 million rows/s., 12.31 GB/s.)
此查询由 BigQuery 市场中列出的数据集推广,可以在此加载。
SELECT to_address, count() AS tx_count
FROM transactions
WHERE to_address IN (
SELECT address
FROM contracts
WHERE is_erc721 = true
)
GROUP BY to_address
ORDER BY tx_count DESC
LIMIT 5
┌─to_address─────────────────────────────────┬─tx_count─┐
│ 0x06012c8cf97bead5deae237070f9587f8e7a266d │ 4949539 │
│ 0x06a6a7af298129e3a2ab396c9c06f91d3c54aba8 │ 646405 │
│ 0xd73be539d6b2076bab83ca6ba62dfe189abc6bbe │ 443184 │
│ 0x1a94fce7ef36bc90959e206ba569a12afbc91ca1 │ 181073 │
│ 0xf5b0a3efb8e8e4c201e2a935f110eaaf3ffecb8d │ 148123 │
└────────────────────────────────────────────┴──────────┘
10 rows in set. Elapsed: 0.804 sec. Processed 374.39 million rows, 19.09 GB (465.46 million rows/s., 23.74 GB/s.)
通过将这三个查询的执行卸载到 ClickHouse,它们的延迟都得到了一定程度的改善。延迟的改进将有所不同,访问模式已知且专注于单个表的分析的查询将受益最大。上面的查询是这方面的完美例子,可以想象它们为应用程序提供动力。在这些情况下,可以利用 ClickHouse 的特定功能,例如主键和投影,以实现超过 10 倍的性能改进。
结论
在这篇博文中,我们探讨了如何将数据从 BigQuery 移动到 ClickHouse 进行分析,以及这两种技术如何相互补充。我们展示了一些加载数据并保持数据同步的方法,以及如何在这些数据之上利用 ClickHouse 进行实时分析。在以后的文章中,我们将更详细地探讨这个以太坊数据集。
同时,我们在公共 ClickHouse 部署中提供了此数据集,供您探索 (sql.clickhouse.com),以及公共 GCS 存储桶 gs://clickhouse_public_datasets/ethereum
。欢迎您通过下载 ClickHouse 的免费开源版本并自行部署或启动ClickHouse Cloud 免费试用来尝试它。ClickHouse Cloud 是基于 ClickHouse 的完全托管的无服务器产品,您可以在其中轻松开始构建实时应用程序,而无需担心部署和管理基础设施。
资源
我们建议您参考以下有关以太坊和查询此数据集的资源。