博客 / 工程

ClickHouse 与 BigQuery:使用 ClickHouse 在 BigQuery 数据之上服务实时查询

author avatar
Dale McDiarmid
2023 年 2 月 27 日 - 33 分钟阅读

big_query_clickhouse.png

Google BigQuery 是一种无服务器云数据仓库,支持对 PB 级数据进行可扩展分析。BigQuery 完美集成到 GCP 生态系统中,已广泛应用于各种报告和批量分析用例。

越来越多的场景需要用户在高速吞吐量流式数据源之上获得亚秒级交互性能。为了满足这些需求,我们越来越多地看到用户将 ClickHouse 与 BigQuery 一起部署,以加速为面向客户的应用程序和内部分析提供支持的查询。因此,ClickHouse 和 BigQuery 可以作为构建现代分析堆栈的企业的互补技术。

在这篇文章中,我们探讨了在 BigQuery 和 ClickHouse 之间同步数据的选项。这包括批量移动数据以及使用 BigQuery 计划查询和 Google DataFlowApache Beam 的运行器)持续追加新数据。我们还将向您介绍使用 ClickHouse 进行的基本数据分析。

ClickHouse 和 BigQuery 用例比较

BigQuery 擅长在 PB 级数据的各种查询访问模式下提供可预测的性能。它共享计算、混洗数据和溢出到磁盘的能力意味着,即使是最复杂的深度分析查询也能在分析师和业务报告可接受的时间内得到服务。这些功能非常适合与经典数据仓库相关的低(或较低)每秒查询数 (QPS) 场景中的临时查询,尤其是在查询访问模式未知或高度可变的情况下。

相比之下,ClickHouse 针对需要以下功能的用例进行了优化:

  • 亚秒级分析查询,用于面向用户的应用程序,其中访问模式是已知且可预测的。这有时可能是 BigQuery 中数据的子集,甚至是针对一组重点分析查询的整个数据集。请注意,虽然 BigQuery 从不“慢”,但它通常以秒为单位提供性能,这使得构建动态实时应用程序更具挑战性。
  • 潜在的无限每秒查询数,随着应用程序使用量的增长。ClickHouse 旨在以高并发性服务查询,并且对并行查询的数量没有强制限制。
  • 支持高插入速率,同时仍能在最近的数据上实现低延迟并发查询,而数据仓库(如 BigQuery)传统上并未针对此进行优化。

外部或内部面向用户的应用程序通常需要这些属性,以呈现近乎实时的分析。

定价模型也可能是技术选择的一个因素。BigQuery 按扫描的数据量收费,这对于临时分析查询非常有效,但对于实时分析工作负载来说,可能会被视为成本高昂。相反,ClickHouse 是开源的,可以部署在您自己的基础设施上,也可以托管在 ClickHouse Cloud 中,后者仅根据消耗的计算和存储收费。请注意,其中一些定价挑战可以通过预留 BigQuery 插槽来解决,但这需要所有用户都无法实现的较高使用阈值。

设置和数据集

本博客文章中的示例使用 ClickHouse Cloud,它提供免费试用,允许完成我们涵盖的场景。我们使用 sql.clickhouse.com 上公开可用的云环境,它总共有 720 GB 内存和 180 个 vCPU。所有说明也与运行最新版本 (23.1) 的自管理 ClickHouse 部署兼容。

我们使用 BigQuery 的公共项目中提供的以太坊加密货币数据集作为我们的数据集。我们推迟到以后的博客文章中详细探讨此数据集,但建议阅读 Google 关于 如何构建此数据集的博客 以及关于查询此数据集和其他加密数据集的后续文章。阅读本博客文章不需要任何加密货币方面的经验,但对于那些感兴趣的人,以太坊简介提供了一个有用的概述。Google 记录了许多关于此数据集的优秀查询,我们稍后将在博客中引用,并且我们收集了等效的 ClickHouse 查询此处,并欢迎贡献。

总而言之,我们的数据集包含 4 个表。这是完整数据的子集,但足以满足大多数常见问题

  • 区块 (Blocks) - 区块是交易批次,其中包含链中前一个区块的哈希值。
  • 交易 (Transactions) - 交易是来自账户的加密签名指令。账户将发起交易以更新以太坊网络的状态,例如将 ETH 从一个账户转移到另一个账户。
  • 追踪 (Traces) - 内部交易,允许查询所有以太坊地址及其余额。
  • 合约 (Contracts) - “智能合约”只是一个在以太坊区块链上运行的程序。

对于想要绕过 BigQuery 来插入此数据集的用户,可以使用出色的 Ethereum ETL 工具生成该数据集,该工具已提交 PR 以支持 ClickHouse 作为目标。上面的表代表了一个子集,并解决了最常见的查询,同时提供了大量的数量。或者,此数据在 BigQuery 的公共项目中可用 - 用户只需根据扫描的数据量付费查询此数据,每月有 1TB 的免费额度。此数据由 Google 持续更新和维护,更新通常仅比实时区块链滞后 4 分钟。为了让用户能够重现示例,我们已在 sql.clickhouse.com 以及公共存储桶 gs://clickhouse_public_datasets/ethereum 中提供此数据以供查询。

假设

在选择将哪些数据存储在 ClickHouse 与 BigQuery 中时,我们通常会看到用户识别实时分析应用程序最常用的查询。对于流式分析数据,这通常构成基于时间维度的数据子集。

在本博客的其余部分中,我们做出以下假设

  • 数据持续生成并存储在 BigQuery 中,需要将新行持续流式传输到 ClickHouse。
  • 数据是仅追加且不可变的。没有选择性更新行的要求,但预计会删除较旧的数据,如下所述。
  • 数据上存在时间维度或递增的数字标识符,允许识别要复制到 ClickHouse 的新行。

以太坊区块链数据本身就满足这些属性。在我们的例子中,我们使用区块时间戳。下面我们将介绍如何将历史数据从 BigQuery 迁移到 ClickHouse,以及如何处理持续追加新数据。

BigQuery 和 ClickHouse 数据类型和架构之间的差异

在 ClickHouse 和 BigQuery 之间移动数据的用户会立即注意到,ClickHouse 在数值方面提供了更精细的精度。例如,BigQuery 提供数值类型 INT64、NUMERIC、BIGNUMERIC 和 FLOAT64。将这些与 ClickHouse 进行对比,后者为小数、浮点数整数提供多种精度。借助这些,ClickHouse 用户可以优化存储和内存开销,从而加快查询速度并降低资源消耗。下面我们映射每个 BigQuery 类型的等效 ClickHouse 类型

BigQuery ClickHouse
ARRAY Array(t)
NUMERIC Decimal(P, S)、Decimal32(S)、Decimal64(S)、Decimal128(S)
BIG NUMERIC Decimal256(S)
BOOL Bool
BYTES FixedString
DATE Date32 (范围较窄)
DATETIME DateTimeDateTime64(范围较窄,精度更高)
FLOAT64 Float64
GEOGRAPHY 地理数据类型
INT64 UInt8、UInt16、UInt32、UInt64、UInt128、UInt256、Int8、Int16、Int32、Int64、Int128、Int256
INTERVAL NA 支持作为表达式通过函数
JSON JSON
STRING String (bytes)
STRUCT TupleNested
TIME DateTime64
TIMESTAMP DateTime64

当有多种 ClickHouse 类型选项时,请考虑数据的实际范围并选择最低的必需类型。此外,请考虑使用适当的编解码器以进行进一步压缩。

可以使用以下查询检索 BigQuery 表的当前架构

SELECT table_name, ddl FROM `bigquery-public-data`.crypto_ethereum.INFORMATION_SCHEMA.TABLES
WHERE  table_name = 'blocks';

原始 BigQuery 架构可以在此处找到。使用上述查询的结果,我们可以根据每个列的已知范围创建具有适当类型的 ClickHouse 表。您可以运行额外的查询来识别数据范围和基数,例如

SELECT
 MAX(number) AS max_number, 
 MIN(number) AS min_number, 
 MAX(size) AS max_size, 
 MIN(size) AS min_size
FROM bigquery-public-data.crypto_ethereum.blocks

max_number    min_number    max_size    min_size
16547585    0    1501436    514

我们对这些架构进行了一些基本的优化,使用了适当的类型和编解码器以最大限度地减少存储,但将完整的分析留到以后专门介绍此数据集的博客中。区块的架构

CREATE TABLE ethereum.blocks
(
    `number` UInt32 CODEC(Delta(4), ZSTD(1)),
    `hash` String,
    `parent_hash` String,
    `nonce` String,
    `sha3_uncles` String,
    `logs_bloom` String,
    `transactions_root` String,
    `state_root` String,
    `receipts_root` String,
    `miner` String,
    `difficulty` Decimal(38, 0),
    `total_difficulty` Decimal(38, 0),
    `size` UInt32 CODEC(Delta(4), ZSTD(1)),
    `extra_data` String,
    `gas_limit` UInt32 CODEC(Delta(4), ZSTD(1)),
    `gas_used` UInt32 CODEC(Delta(4), ZSTD(1)),
    `timestamp` DateTime CODEC(Delta(4), ZSTD(1)),
    `transaction_count` UInt16,
    `base_fee_per_gas` UInt64
)
ENGINE = MergeTree
ORDER BY timestamp

其他表架构可以在此处找到。请注意,我们没有使我们的列可为空,尽管它们在原始 BigQuery 架构中是可为空的。对于大多数查询,无需区分默认值和 Null 值。通过使用默认值,我们避免了 Nullable 的额外 UInt8 列开销。我们还为这些表选择了主键,以优化预期的查询。最后,鉴于 BigQuery 基于物理大小或逻辑大小对数据存储收费,因此值得记录这些表的各自大小。此信息可以从 UI 中获得,方法是选择表并检查详细信息,或者使用简单的查询

SELECT
table_name, total_rows, round(total_physical_bytes / power(1024, 3),2) as total_physical_gb, round(total_logical_bytes /power(1024, 3),2)  as total_logical_gb
FROM
 `<project id>.region-<region>.INFORMATION_SCHEMA.TABLE_STORAGE` WHERE table_name IN ('transactions', 'contracts', 'blocks', 'traces') AND table_schema='crypto_ethereum'

请注意,此查询无法在 bigquery-public-data 项目上执行,并且需要将表复制到您自己的项目。

此表显示了 2023 年 2 月 1 日捕获的 BigQuery 中存储的数据集统计信息。BigQuery 的“物理”大小和“逻辑”大小大致相当于 ClickHouse 的“压缩”大小和“未压缩”大小。

table_name total_rows total_physical_gb total_logical_gb
transactions1,852,951,870 332.62 695.99

通过 Google Cloud Storage (GCS) 批量加载

big_query_gcs_clickhouse.png

BigQuery 支持将数据导出到 Google 的对象存储 (GCS)。在此示例中,将公共表 blockstracestransactionscontracts 导出到 GCS,然后将数据导入到 ClickHouse Cloud。我们使用 s3 表函数,因为 GCS 可与 Amazon Simple Storage Service (S3) 互操作。这种方法有许多优点:

在尝试以下示例之前,我们建议用户查看导出所需的权限位置建议,以最大限度地提高导出和导入性能。

将数据导出到 GCS

在我们的示例中,我们使用 BigQuery SQL 界面 - 请参阅 bq 等替代方案此处。下面我们使用 EXPORT DATA 语句将区块表导出到指定的 GCS 存储桶。虽然在下面的示例中,我们导出了整个数据集,但 SELECT 语句允许潜在地导出子集。

EXPORT DATA
  OPTIONS (
    uri = 'gs://clickhouse_public_datasets/ethereum/blocks/*.csv.gz',
    format = 'CSV',
    compression = 'GZIP',
    overwrite = true,
    header = true,
    field_delimiter = ',')
AS (
  SELECT *
  FROM bigquery-public-data.crypto_ethereum.blocks
  ORDER BY number ASC
);

我们导出到 CSV 并请求使用 GZIP 压缩文件。对于区块数据,这可能需要大约 1 分钟。我们的 uri 参数中也有一个 * 字符。这确保了输出被分片为多个文件,如果导出超过 1GB 的数据,则使用数字递增的后缀。

exporting_data_bigquery.gif

从 GCS 将数据导入到 ClickHouse

导出完成后,我们可以将此数据导入到 ClickHouse 表中。请注意,我们在运行以下 INSERT INTO 区块表之前预先创建了该表。

SET parallel_distributed_insert_select = 1

INSERT INTO blocks
SELECT number, hash, parent_hash, nonce, sha3_uncles, logs_bloom, transactions_root, state_root, receipts_root, miner, difficulty, total_difficulty, size, extra_data, gas_limit, gas_used, timestamp, transaction_count, base_fee_per_gas
FROM s3Cluster('default', 'https://storage.googleapis.com/clickhouse_public_datasets/ethereum/blocks/*.gz', 'CSVWithNames', 'timestamp DateTime, number Int64, hash String, parent_hash String, nonce String, sha3_uncles String, logs_bloom String, transactions_root String, state_root String, receipts_root String, miner String, difficulty Decimal(38, 0), total_difficulty Decimal(38, 0), size Int64, extra_data String, gas_limit Int64,gas_used Int64,transaction_count Int64,base_fee_per_gas Int64')

0 rows in set. Elapsed: 22.712 sec. Processed 16.54 million rows, 19.52 GB (728.29 thousand rows/s., 859.50 MB/s.)

我们使用 s3Cluster 函数,它是 s3 函数的分布式变体,允许在 ClickHouse Cloud 中使用完整的集群资源进行读取和写入。设置 parallel_distributed_insert_select=1 确保插入是并行的,并且数据从读取数据的同一节点插入到该节点(跳过写入时的启动器节点)。

在上面的示例中,由于存储桶是公共的,因此我们不提供身份验证密钥。如果用户为存储桶生成 HMAC 密钥(在服务或用户级别),则支持使用私有存储桶。Access keySecret 可以分别替换为 aws_access_key_idaws_secret_access_key。在下面的安全示例中,我们就地查询数据 - 这是临时分析的典型访问模式

SELECT max(number) AS max_block_number
FROM s3('https://storage.googleapis.com/clickhouse_public_datasets/ethereum/blocks/*.csv.gz', 'CSVWithNames')

┌─max_block_number─┐
│       16542640 │
└──────────────────┘

1 row in set. Elapsed: 15.926 sec. Processed 16.54 million rows, 148.88 MB (1.04 million rows/s., 9.35 MB/s.)

上面 CSV 的使用作为交换格式是次优的。Parquet 作为面向列的格式,代表了更好的交换格式,因为它本身是压缩的,并且 BigQuery 导出和 ClickHouse 查询速度更快。如果使用 Parquet,您将需要将 Nullable 列映射到非 Null 等效项,因为上面的架构选择(或在 ClickHouse 中使用 Nullable 列)。例如,下面我们使用 ifNull 函数将 base_fee_per_gas 的 null 值映射为 0。导出到 Parquet 时,可用的其他压缩算法可能会进一步改进该过程,但我们将此练习留给读者。

INSERT INTO blocks
SELECT number, hash, parent_hash, nonce, sha3_uncles, logs_bloom, transactions_root, state_root, receipts_root, miner, difficulty, total_difficulty, size, extra_data, gas_limit, gas_used, timestamp, transaction_count, ifNull(base_fee_per_gas, 0) AS base_fee_per_gas
FROM s3Cluster('default', 'https://storage.googleapis.com/bigquery_ethereum_export/blocks/*.parquet', 'GOOGR2DYAAX6RVODIMREAVPB', '+tNQdQQ0DCEItWQlJseXcidKSG6pOU65o1r0N17O')

我们为每个表重复了上述练习,并使用 Parquet 格式记录了下面的时间。使用此方法,我们能够在不到一小时的时间内将 4TB 数据从 BigQuery 传输到 ClickHouse!

行数导出的文件数据大小BigQuery 导出插槽时间ClickHouse 导入
blocks16,569,4897314.53GB23 秒37 分钟15.4 秒
transactions1,864,514,4145169957GB1 分钟 38 秒1 天 8 小时18 分钟 5 秒
traces6,325,819,30617,9852.896TB5 分钟 46 秒5 天 19 小时34 分钟 55 秒
contracts57,225,83735045.35GB16 秒1 小时 51 分钟39.4 秒
总计82.6 亿23,5773.982TB8 分钟 3 秒> 6 天 5 小时53 分钟 45 秒

ClickHouse 存储效率与 BigQuery 的比较

如下所示,ClickHouse 实现了大约 8 倍的压缩,将 BigQuery 的存储效率提高了高达 30%。

SELECT table,
formatReadableSize(sum(data_compressed_bytes)) AS compressed_size,
formatReadableSize(sum(data_uncompressed_bytes)) AS uncompressed_size,
round(sum(data_uncompressed_bytes) / sum(data_compressed_bytes), 2) AS ratio
FROM system.columns
WHERE database = 'ethereum'
GROUP BY table
ORDER BY sum(data_compressed_bytes) DESC

┌─table────────┬─compressed_size─┬─uncompressed_size─┬─ratio─┐
│ traces       │ 509.39 GiB      │ 3.85 TiB          │  7.74 │
│ transactions │ 228.52 GiB      │ 1.14 TiB          │  5.09 │
│ blocks       │ 5.37 GiB        │ 15.62 GiB         │  2.92 │
│ contracts    │ 2.98 GiB        │ 15.78 GiB         │  5.3  │
└──────────────┴─────────────────┴───────────────────┴───────┘

总结如下,我们可以看到 ClickHouse 比 BigQuery 实现了大约 30% 的更好压缩率。

BigQuery 逻辑大小BigQuery 物理大小BigQuery 压缩比ClickHouse 未压缩大小ClickHouse 压缩大小ClickHouse 压缩比
transactions695.99 GB332.62 GB2.091.14TiB228.52 GB5.09
blocks17.05 GB6.25 GB2.7315.62GB5.37 GB2.92
traces4212.71 GB738.45 GB5.73.85 TiB509.39 GB7.74
contracts51.33 GB3.54 GB14.515.78 GB2.98 GB5.3

使用计划查询

上述方法非常适合批量数据加载,这对于开发和实验非常有用。但这并不能解决我们的 BigQuery 表正在接收新数据的事实。因此,对于生产环境,我们需要另一种方法来处理持续追加新数据。

计划数据导出

一种方法是简单地使用 BigQuery 的计划查询功能计划定期导出。如果您可以接受数据插入到 ClickHouse 中的一些延迟,则此方法易于实施和维护。

在我们的示例中,我们将计划每小时导出一次。每小时,我们导出最近 60 分钟的数据。但是,我们偏移此窗口以允许将区块提交到区块链并出现在 BigQuery 中的延迟。通常这不超过 4 分钟,我们使用 15 分钟以确保安全。因此,每次运行导出时,我们都会导出从 now-75minsnow-15mins 的所有行。可视化如下所示

Markdown Image

为了确保我们的窗口查询不会遗漏任何数据,我们需要根据计划时间(通过名为 [run_time](https://cloud.google.com/bigquery/docs/scheduling-queries) 的变量提供)而不是执行时间(可能略有不同)来确定我们的间隔计算。

我们现在可以计划以下查询每小时运行一次,以导出以太坊区块数据。确保您计划的第一个导出作业在导入后至少 75 分钟开始,以避免重复!

DECLARE
 export_time_lower TIMESTAMP DEFAULT TIMESTAMP_SUB(@run_time, INTERVAL 75 MINUTE);
DECLARE
 export_time_upper TIMESTAMP DEFAULT TIMESTAMP_SUB(@run_time, INTERVAL 15 MINUTE);

EXPORT DATA
 OPTIONS ( uri = CONCAT('gs://clickhouse_public_datasets/ethereum/blocks/increment-', CAST(UNIX_SECONDS(TIMESTAMP_TRUNC(CURRENT_TIMESTAMP(), HOUR)) AS STRING), '*.parquet'),
   format = 'PARQUET', overwrite = true) AS (
 SELECT
   *
 FROM
   bigquery-public-data.crypto_ethereum.blocks
 WHERE
   timestamp > export_time_lower
   AND timestamp <= export_time_upper
 ORDER BY
   number ASC );

Markdown Image

此增量导出完成速度更快(在大多数情况下只需几分钟),因为 BigQuery 表按时间戳列分区,使这些筛选子句能够快速运行。

关于查询语法的一些详细信息

  • 上面的查询仅导出区块。其他表的等效查询非常相似,只是使用了列 block_timestamp 而不是 timestamp
  • 导出文件具有 increment- 前缀。这允许我们在导入期间仅定位增量文件(请参阅下文)。
  • 我们使用表达式 CAST(UNIX_SECONDS(TIMESTAMP_TRUNC(CURRENT_TIMESTAMP(), HOUR)) AS STRING) 将当前时间作为 epoch 秒数包含在文件名中。这允许我们在导入期间定位特定文件(请参阅下文)。

计划数据导入

在撰写本文时,ClickHouse 没有内置的计划导入方式(提案正在讨论中),但我们可以使用外部作业计划此导入。这可以通过多种方式实现 - 使用 lambda 函数、Cloud Run,甚至 dbt 中的增量物化 - 但为了演示的简单性,我们使用简单的 cron。

以下 bash 脚本可以在导出完成后由 cron 作业定期运行。此示例处理区块表,但可以轻松地适用于其他表。

#!/bin/bash

max_date=$(clickhouse-client --query "SELECT toInt64(toStartOfHour(toDateTime(max(block_timestamp))) + toIntervalHour(1)) AS next FROM ethereum.transactions");

clickhouse-client --query "INSERT INTO blocks SELECT number, hash, parent_hash, nonce, sha3_uncles, logs_bloom, transactions_root, state_root, receipts_root, miner, difficulty, total_difficulty, size, extra_data, gas_limit, gas_used, timestamp, transaction_count, ifNull(base_fee_per_gas, 0) AS base_fee_per_gas
FROM s3(''https://storage.googleapis.com/clickhouse_public_datasets/ethereum/blocks/increment-' || toString(${max_date}) || '-*.parquet')"

这里的第一个行标识 ClickHouse 中数据的当前最大时间。我们使用 toStartOfHour 函数将其舍入到小时,并在将值转换为 Int64 之前添加一小时。这为我们提供了 epoch 秒值。当附加我们的 increment- 前缀时,这标识了 cron 作业执行时当前小时的文件。下一行使用 s3 函数对该文件执行简单的导入到表中。

此方法的替代方法是导入所有时间戳大于 max_dateincrement-* 行,即 WHERE timestamp > ${max_date}。但是,这将需要我们扫描所有增量文件。虽然最初速度很快,但随着文件数量的增加,这会随着时间的推移而导致性能下降。

填补空白

如果我们执行批量导入,然后计划上述导入和导出查询,我们将在批量加载完成和增量加载开始之间的期间不可避免地存在“空白”。再次注意,计划增量导出在批量数据导入后 75 分钟开始的重要性,以避免重复。

此空白很容易通过以下步骤解决。我们为区块数据集概述了这些步骤,但对于使用 block_timestamp 列的其他表,该过程是相同的。请注意,我们在将增量数据加载到 ClickHouse 之前执行此操作。

  1. 通过查询 ClickHouse 中数据的最大时间戳来识别我们“空白”的下限。
SELECT max(timestamp)
FROM blocks
┌──────max(timestamp)─┐
│ 2023-02-02 13:34:11 │
└─────────────────────┘
  1. 假设上面的查询已计划并且已完成增量导入,请识别导出文件中的最小时间戳,即我们“空白”的上限。

SELECT min(timestamp)
FROM s3('https://storage.googleapis.com/bigquery_ethereum_export/blocks/increment-*.parquet')
┌─────────────min(timestamp)─┐
│ 2023-02-02 14:22:47.000000 │
└────────────────────────────┘
  1. 导出 (2) 中识别的时间范围之间的数据。使用允许轻松识别它们的前缀,例如 gap
EXPORT DATA
 OPTIONS ( uri = CONCAT('gs://bigquery_ethereum_export/blocks/gap-*.parquet'),
   format = 'PARQUET',
   overwrite = TRUE) AS (
 SELECT * FROM
   bigquery-public-data.crypto_ethereum.blocks
 WHERE
   timestamp > '2023-02-02 13:34:11' AND timestamp < '2023-02-02 14:22:47'
 ORDER BY
   number ASC );
  1. 使用正确的 gap- 前缀将这些文件导入到表中。
INSERT INTO blocks SELECT number, hash, parent_hash, nonce, sha3_uncles, logs_bloom, transactions_root, state_root, receipts_root, miner, difficulty, total_difficulty, size, extra_data, gas_limit, gas_used, timestamp, transaction_count, ifNull(base_fee_per_gas, 0) AS base_fee_per_gas
FROM s3('https://storage.googleapis.com/bigquery_ethereum_export/blocks/gap-*.parquet')

处理故障

如果导出失败,上述方法需要手动干预。导入作业更强大,永远不会多次导入同一文件。如果导入了最新文件,或者当前小时文件导出失败,则不执行任何操作。如果导出失败然后修复,则可以根据需要多次运行此脚本以填补丢失的小时并赶上进度。

在其当前形式中,早期的脚本意味着 ClickHouse 中的以太坊数据最多比区块链滞后 85 分钟 - 假设我们在增量导出完成后 10 分钟执行 cron。我们可以调整上述流程,将导出缩短到几分钟。这将需要我们调整上面的导入脚本,可能使用过滤器 - 此练习留给读者,或者考虑其他工具……

使用 Dataflow 在 BigQuery 和 ClickHouse 之间流式传输数据

Google Cloud Dataflow 是一项完全托管的服务(Runner),用于在 Google Cloud Platform 生态系统中执行 Apache Beam 管道。Apache Beam 是一个开源的统一编程模型(由 Google 开发),用于定义和执行数据处理管道,包括 ETL、批处理和流式(连续)处理。基于 Dataflow 模型论文,Dataflow 允许用户使用 PythonGoJava 开发管道,并将这些管道部署到 GCE 上运行。这些管道由 I/O 连接器组成,这些连接器连接到数据源并提供读取(源)和写入(接收器)操作,以及 转换操作,这些操作有助于数据处理。关键在于,此框架提供了允许这些管道及其操作以 并行方式执行,无论是批处理还是流式处理的原始组件。我们强烈建议有兴趣了解此方法的用户阅读 基本概念

虽然其他数据集可能需要更多的转换逻辑,但我们只想在 BigQuery 和 ClickHouse 之间流式传输数据。除了将行分组为批次(为了高效地插入 ClickHouse)之外,不需要其他转换。BigQuery 由 核心 I/O 连接器 支持,该连接器提供了一个 简单的源接口用于读取行。Apache Beam 通过一个 官方 IO 连接器 支持 ClickHouse,但仅适用于 Java SDK。目前,这仅提供接收器支持,并要求目标表存在。虽然 Python SDK 没有对 ClickHouse 的内置支持,但 Beam 提供了一种简单的方法,通过 ParDo 转换编写接收器连接器Apache Beam 对外部库的支持 使我们能够使用 ClickHouse Python 客户端来执行插入到 ClickHouse 的操作。最后,一个简单的 BatchElements 转换允许我们在插入 ClickHouse 之前生成分组行。我们在下面展示了一个基本实现的重要部分,以展示这种方法的潜力。完整示例请参见此处

clickhouse_data_flow.png

执行此管道以迁移 blocks 表需要运行如下所示的 Python 代码。这假设您已配置您的机器以使用 Google Dataflow 并具有所需的权限

python -m sync_clickhouse --target_table ethereum.blocks --clickhouse_host <clickhouse_host> --clickhouse_password <password> --region us-central1 --runner DataflowRunner --project <GCE project> --temp_location gs://<bucket> --requirements_file requirements.txt

请注意,还需要通过 requirements.txt 和一个可以缓存数据的 GCS 存储桶来提供依赖项,因为 BigQuery 连接器的工作方式是将数据导出到存储桶并将其用作中间存储 - 有点像我们早期的做法。GCE 控制台提供了流程的良好可视化效果。

dataflow_execution.png

上述方法有以下限制,我们将这些改进留给读者作为练习

  • 理想的解决方案是利用流式管道,随着更多数据添加到源,该管道将永远运行。但是,流式管道需要一个 无界源,并且由于 BigQuery 源是有界的,因此不能在流式管道中使用。相反,我们使用批处理管道,该管道基于快照运行直到完成,然后停止。这里最简单的解决方案是在管道启动时识别 ClickHouse 中的当前最大时间戳,并将其用作 BigQuery 的过滤器条件。然后可以轻松地使用 Cloud Scheduler 安排管道运行。
  • 我们的 ClickHouse 连接器必须将行结构化为二维数组,以便与 ClickHouse Python 客户端一起使用。这项工作也可以作为并行化的 ParDo 完成。
  • 我们使用 BatchElements 转换 进行批处理,固定大小为 10000k。其他数据集可能需要调整此大小。请注意,此转换也可以进行自适应批处理。
  • 我们通过 requirements.txt 文件提供依赖项。这是入门最简单的方法,但在生产环境中不建议使用。

关于连续数据加载的说明

实际上,对于这个特定的数据集,我们可以更轻松地实现连续数据加载方法,因为 Google 已通过其他方式使其可用。以太坊数据由 Google 发布在公共 Pub/Sub 主题上,可以在数据可用时立即使用。虽然我们仍然会使用以下技术将历史以太坊数据加载到 ClickHouse,但随后可能会编写 Google Dataflow 作业或使用 Vector,Vector 同时支持 pub/sub 作为源ClickHouse 作为接收器。可能存在解决此问题的其他方法,我们欢迎提出建议。

在 ClickHouse 中删除旧数据

对于大多数部署,ClickHouse 卓越的数据压缩意味着您可以长期以精细格式存储数据。对于我们特定的以太坊数据集,这可能不是特别有利,因为我们可能需要保留区块链的完整历史记录以用于许多查询,例如,计算账户余额。

但是,有一些简单且可扩展的方法可以删除旧数据,这些方法可能适用于其他数据集。例如,可以使用 TTL 功能 在行或列级别过期 ClickHouse 中的旧数据。通过 按日期对表进行分区,可以更有效地实现此目的,从而可以按设定的时间间隔高效地删除数据。为了举例说明,我们修改了 blocks 表的架构,使其按月分区。反过来,使用 TTL 功能有效地过期超过五年的行。ttl_only_drop_parts 设置确保仅当部件中的所有行都过期时才删除该部件。

CREATE TABLE blocks
(
...
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(timestamp)
ORDER BY timestamp
TTL timestamp + INTERVAL 60 MONTH DELETE
SETTINGS ttl_only_drop_parts=1

请注意,分区既可以 正面和负面地影响查询,并且应更多地被视为数据管理功能,而不是优化查询性能的工具。

在 ClickHouse 中运行查询

此数据集值得写一篇完整的博客来介绍可能的查询。用于将此数据加载到 BigQuery 的 etherium-etl 工具的作者 发布了 一系列精彩的博客,重点介绍了关于此数据集的见解。在以后的博客中,我们将介绍这些查询,并展示如何将它们转换为 ClickHouse 语法,以及如何显着加速其中一些查询。现在,我们展示一些更简单的查询以开始使用。

每日以太币供应量

最初的 BigQuery 查询记录为 Awesome BigQuery views 的一部分,并在 此处 讨论过,执行时间为 6 秒。优化的 ClickHouse 查询运行时间为 0.009 秒,在将 ClickHouse 与 BigQuery 进行比较时,这是一个巨大的差异。

ether_supply.png

ALTER TABLE traces ADD PROJECTION trace_type_projection (
                    SELECT trace_type,
                    toStartOfDay(block_timestamp) as date, sum(value) as value GROUP BY trace_type, date
                    )
ALTER TABLE traces MATERIALIZE PROJECTION trace_type_projection

WITH ether_emitted_by_date AS
    (
        SELECT
            date,
            sum(value) AS value
        FROM traces
        WHERE trace_type IN ('genesis', 'reward')
        GROUP BY toStartOfDay(block_timestamp) AS date
    )
SELECT
    date,
    sum(value) OVER (ORDER BY date ASC) / power(10, 18) AS supply
FROM ether_emitted_by_date

┌────────────────date─┬────────────supply─┐
│ 1970-01-01 00:00:0072009990.49948001 │
│ 2015-07-30 00:00:0072049301.59323001 │
│ 2015-07-31 00:00:0072085493.31198 │
│ 2015-08-01 00:00:0072113195.49948 │
│ 2015-08-02 00:00:0072141422.68698 │
...

3 rows in set. Elapsed: 0.009 sec. Processed 11.43 thousand rows, 509.00 KB (1.23 million rows/s., 54.70 MB/s.)

Markdown Image

请注意,此查询已使用 投影进行了优化 - 这是 ClickHouse 中可用于针对特定工作负载进行优化的众多工具之一。

一段时间内的平均以太币成本

从 Kaggle 上此数据集的 最受欢迎的笔记本 中提取。我们最初重写此查询以包含左反连接,尽管这 似乎不是必需的。因此,使用了更优化的查询版本

avg_ether_costs.png

SELECT
    SUM(value / POWER(10, 18)) AS sum_tx_ether,
    AVG(gas_price * (receipt_gas_used / POWER(10, 18))) AS avg_tx_gas_cost,
    toStartOfDay(block_timestamp) AS tx_date
FROM transactions
WHERE (receipt_status = 1) AND (value > 0) AND (block_timestamp > '2018-01-01') AND (block_timestamp <= '2018-12-31')
GROUP BY tx_date
ORDER BY tx_date ASC

┌───────sum_tx_ether─┬───────avg_tx_gas_cost─┬─────────────tx_date─┐
│  8246871.7668937680.00053703009548676442018-01-01 00:00:00 │
│ 13984780.9269497820.00058449798182618732018-01-02 00:00:00 │
│ 13975588.8507883140.00060507489157098392018-01-03 00:00:00 │
│ 20231765.9356602540.00070002563204667762018-01-04 00:00:00364 rows in set. Elapsed: 0.673 sec. Processed 250.90 million rows, 8.28 GB (373.01 million rows/s., 12.31 GB/s.)

Markdown Image

此查询由 BigQuery 市场 中列出的数据集推广,并且可以从 此处 加载。

most_popular_collectables.png

SELECT to_address, count() AS tx_count
FROM transactions
WHERE to_address IN (
    SELECT address
    FROM contracts
    WHERE is_erc721 = true
)
GROUP BY to_address
ORDER BY tx_count DESC
LIMIT 5

┌─to_address─────────────────────────────────┬─tx_count─┐
│ 0x06012c8cf97bead5deae237070f9587f8e7a266d4949539 │
│ 0x06a6a7af298129e3a2ab396c9c06f91d3c54aba8646405 │
│ 0xd73be539d6b2076bab83ca6ba62dfe189abc6bbe443184 │
│ 0x1a94fce7ef36bc90959e206ba569a12afbc91ca1181073 │
│ 0xf5b0a3efb8e8e4c201e2a935f110eaaf3ffecb8d148123 │
└────────────────────────────────────────────┴──────────┘

10 rows in set. Elapsed: 0.804 sec. Processed 374.39 million rows, 19.09 GB (465.46 million rows/s., 23.74 GB/s.)

所有这三个查询的延迟都通过将其执行卸载到 ClickHouse 而得到改善,改善程度各不相同。延迟的改善程度会有所不同,访问模式已知且侧重于对单个表进行分析的查询获益最多。上述查询是完美的示例,可以想象它们为应用程序提供支持。在这些情况下,可以利用 ClickHouse 的特定功能(例如主键和投影)来提供超过 10 倍的性能提升。

结论

在这篇博文中,我们探讨了如何将数据从 BigQuery 移动到 ClickHouse 进行分析,以及这两种技术如何互补。我们展示了许多加载数据并保持数据同步的方法,以及如何利用 ClickHouse 对此数据进行实时分析。在以后的文章中,我们将更详细地探讨此以太坊数据集。

与此同时,我们已在公共 ClickHouse 部署中提供此数据集以供探索 (sql.clickhouse.com),以及公共 GCS 存储桶 gs://clickhouse_public_datasets/ethereum。欢迎您通过 下载免费开源版本 的 ClickHouse 并自行部署或启动 ClickHouse Cloud 免费试用 来试用它。ClickHouse Cloud 是一项基于 ClickHouse 的完全托管的无服务器产品,您可以在其中轻松开始构建实时应用程序,而无需担心部署和管理基础设施。

资源

我们推荐以下关于以太坊和查询此数据集的资源。

分享这篇文章

订阅我们的新闻通讯

随时了解功能发布、产品路线图、支持和云产品!
正在加载表单...
关注我们
X imageSlack imageGitHub image
Telegram imageMeetup imageRss image
©2025ClickHouse, Inc. 总部位于加利福尼亚州湾区和荷兰阿姆斯特丹。