DoubleCloud 即将结束服务。迁移到 ClickHouse,享受限时免费迁移服务。立即联系我们 ->->

博客 / 工程

使用 ClickHouse、Kafka Connect 和 Confluent Cloud 进行实时事件流

author avatar
Dale McDiarmid
2023 年 6 月 22 日

立即开始使用 ClickHouse Cloud 并获得 300 美元的积分。要详细了解我们的按使用量付费折扣,请联系我们或访问我们的定价页面

ClickHouse Kafka 连接器简介

今年早些时候,我们介绍了基于 Kafka Connect 框架的新官方开源 ClickHouse Kafka 连接器。该连接器旨在使用 ClickHouse 的新功能,特别是 Keeper Map 表引擎,以提供精确一次语义。我们很高兴地宣布该连接器已在 Confluent Cloud 中经过测试,现在可通过“Confluent Cloud 自定义连接器”服务获取。

这篇博文演示了如何使用这种新的部署方法,通过 Confluent Cloud Kafka 主题,从 Google Pub/Sub 向 ClickHouse Cloud 可靠地流式传输以太坊加密货币事件。我们无需编写任何代码,仅通过基于 UI 的配置即可实现这一点。

在我们的示例中,我们使用的是 ClickHouse Cloud 集群的开发实例。但是,这些示例应该可以在等效大小的自托管集群上复制。请注意,托管整个以太坊数据集需要更大的实例——请参阅我们关于此主题的专门的博客文章。或者,立即开始使用 ClickHouse Cloud 集群,并获得 300 美元的积分。让我们为您处理基础设施,您可以尽情查询!

示例假设您拥有 Google Cloud 和 Confluent 账户。这些示例使用数据的最小子集,每月 Google Cloud 成本应该不到 1 美元,Confluent 每天约 6 美元。

Confluent Cloud 和自定义连接器

Apache Kafka 是一个普遍存在的开源分布式事件流平台,数千家公司使用它来构建高性能数据管道、流式分析、数据集成和关键任务应用程序。

除了维护和分发 Kafka 的企业版之外,Confluent 还提供了一个完全托管的云环境,用户可以在其中部署 Kafka。这包括 Kafka Connect 框架的托管服务,Kafka 的一个组件,充当数据库、键值存储、搜索索引和文件系统之间简单数据集成的集中式数据中心。历史上,用户只能在 Confluent Cloud 中部署官方支持的连接器,只需定义并行级别(任务),而无需担心管理相关的基础设施。

最近,Confluent 通过支持自定义连接器扩展了此服务。这允许用户通过简单地上传已编译的包并指定关键配置详细信息,来部署基于 Kafka connect 框架构建的任何基于 Java 的连接器。这些连接器可以反过来使用与 Confluent 拥有的官方连接器相同的托管服务进行部署。

在 ClickHouse,我们经常看到用户使用 Kafka 的流式传输功能,在将事件插入 ClickHouse 进行实时分析之前对事件进行处理和缓冲。因此,此服务对于 ClickHouse Cloud 用户和现有 Confluent 客户来说非常令人兴奋,他们现在可以使用官方的 ClickHouse Kafka Connect 连接器轻松地在 Kafka 主题和 ClickHouse 实例之间流式传输数据,而无需担心管理基础设施。

有关 Confluent 自定义连接器和当前支持的 AWS 区域的完整文档,请访问此处。

测试数据集

对于我们的测试数据集,我们使用 Google 在一个公共项目中提供的以太坊加密货币数据集。

阅读这篇文章不需要具备任何有关加密货币的先验知识,但对于感兴趣的读者,以太坊简介提供了有用的概述,以及 Google 关于如何构建此数据集的博客。

总之,我们的数据集包含 4 个表。这是完整数据的子集,但足以满足大多数常见问题

  • 区块 - 区块是一批交易,其中包含链中先前区块的哈希值。约 1700 万行,每天增长 7000 行。
  • 交易 - 交易是来自账户的加密签名指令。账户将启动交易以更新以太坊网络的状态,例如,将 ETH 从一个账户转移到另一个账户。超过 20 亿,每天新增约 100 万。
  • 跟踪 - 允许查询所有以太坊地址及其余额的内部交易。超过 70 亿,每天新增 500 万。
  • 合约 - “智能合约”只是一个在以太坊区块链上运行的程序。超过 6000 万,每天新增约 5 万。

从批处理到流式传输

我们之前在将 BigQuery 与 ClickHouse 进行比较的博文中详细探讨过这个数据集,其中我们提出了一种基于批处理的方法来保持 ClickHouse 中的这个数据集更新。这是通过定期从公共 BigQuery 表中导出数据(通过计划查询)到 GCS 以及将这些数据导入 ClickHouse(通过简单的计划 cron 作业)来实现的。

Scheduled Export.png

虽然这在当时足以满足我们的需求,并且用于保持我们面向加密货币爱好者的用户的公共sql.clickhouse.com 环境更新,但它在数据在区块链和 BigQuery 上可用与它在 ClickHouse 中可查询之间引入了不令人满意的延迟(约 30 分钟)。

幸运的是,Google 还将此数据集提供在几个公共 Pub/Sub 主题中,提供可供消费的事件流。由于与以太坊区块链的延迟仅为 4 分钟,因此该数据源将使我们能够为 BigQuery 提供类似的服务。

我们需要一个可靠的管道来将这些公共 Pub/Sub 主题连接到 ClickHouse,并且希望最大限度地减少初始工作和任何未来的维护开销,使用我们新的连接器,将基础设施托管在云中似乎是使用 Confluent 托管基础设施的理想解决方案。除了减少数据在 ClickHouse 中可用之前的延迟之外,Kafka 还允许我们缓冲多达 N 天的数据,并在需要时提供重播功能。

要实现这种架构,我们还需要一种可靠的方法,将消息从 Pub/Sub 发送到 Kafka。Confluent 为此目的提供了一个源连接器,它也可以在无需编写代码的情况下进行部署。将这些连接器结合起来会产生以下简单的架构

Pub Sub Export(1).png

一些简单的步骤

在我们的示例中,我们将使用以太坊 blocks 数据集。它代表了所有可用以太坊数据集中最小的一个,大约有 1700 万行,每天形成 7000 个新区块。但是,我们对其他以太坊数据集使用相同的方法,在需要时引用配置文件,包括最大的表 traces。它包含超过 70 亿行,每天新增约 550 万行。

这些数据非常适合 ClickHouse,因为不可变的区块链会生成只追加的行。无论数据集如何,对于 Pub/Sub 连接器和 ClickHouse Kafka 连接器来说,一个工作线程就足以满足这种吞吐量。

创建 Pub/Sub 订阅

Google Pub/Sub 是一种异步且可扩展的消息传递服务,它将生成消息的服务与处理这些消息的服务解耦。与 Kafka 类似,**发布者**将消息发布到主题,而无需担心这些消息将如何被后续处理,并将异步地通过**订阅**由**订阅者**使用。

google-pub-sub.png

来源:https://cloud.google.com/pubsub/architecture

Google 将每个以太坊数据集作为公共主题提供,我们可以注册订阅以访问这些主题。该主题上的每条消息相当于 ClickHouse 中的一行。

Pub/Sub 中的前 10GB 吞吐量数据 是免费的,之后 Google 收取约 40 美元/TB 的费用。每月约 120GB 的以太坊跟踪数据,费用约为 4 美元/月。重要的是,我们不会在消息传递到 Kafka 后保留已确认的消息,并且只保留我们订阅中七天的 未确认消息(也是免费的)

公共主题名称遵循以下命名规则,因此您可以轻松找到它们

projects/crypto-public-data/topics/crypto_{chain}.{table_name}

其中:

chain 可以是 ethereumbitcoinzcashlitecoindogecoindash 之一。出于我们的目的,我们使用 ethereumtable_name 可以是 blockstransactions 之一。此外,对于以太坊:logstoken_transferstracescontractstokens 也受支持。我们提供 blockstransactionstracescontracts 的配置,示例重点介绍 blocks

假设您已 安装 Google Cloud CLI完成身份验证 并选择一个您具有 相关权限 的项目,可以使用以下命令创建 ID 为 ethereum.blocks 的订阅

gcloud pubsub subscriptions create ethereum.blocks --topic=crypto_ethereum.blocks --topic-project=crypto-public-data --ack-deadline 60

Created subscription [projects/pmm-project-377716/subscriptions/ethereum.blocks].

用户可以使用以下命令确认消息是否正在传递

gcloud pubsub subscriptions pull ethereum.traces --format=json
[
  {
    "ackId": "FixdRkhRNxkIaFEOT14jPzUgKEUSBAgUBXx9dUJfdV1acGhRDRlyfWB9YggQUABCUi8KURkLb1xWdRVgDQGo4vPhXHgzBgtEVHheUhwIa1lUdQBWBTG5nJjfycfSPxh5a6TAyY87SOnenLpiZiw9XxJLLD5-IC1FQV5AEkwrGERJUytDCypYEU4EISE-MD5FU0RQBg",
    "message": {
      "attributes": {
        "item_id": "block_0x91af5d194d1f6450597cfe84895692cba1615487528f9aeea81bb6caecf24049",
        "item_timestamp": "2023-06-13T16:19:11Z"
      },
      "data": "eyJ0eXBlIjoiYmxvY2siLCJudW1iZXIiOjE3NDcyMjE1LCJoYXNoIjoiMHg5MWFmNWQxOTRkMWY2NDUwNTk3Y2ZlODQ4OTU2OTJjYmExNjE1NDg3NTI4ZjlhZWVhODFiYjZjYWVjZjI0MDQ5IiwicGFyZW50X2hhc2giOiIweGI0MGQ3YWExNzJmNWZmNjZhNGE5YzY1ZGE0ODY4NGI1MWVlMjI2NTU4NmJkYzZlN2RiYThkMjMzNzMzYWEzNjUiLCJub25jZSI6IjB4MDAwMDAwMDAwMDAwMDAwMCIsInNoYTNfdW5jbGVzIjoiMHgxZGNjNGRlOGRlYzc1ZDdhYWI4NWI1NjdiNmNjZDQxYWQzMTI0NTFiOTQ4YTc0MTNmMGExNDJmZDQwZDQ5MzQ3IiwibG9nc19ibG9vbSI6IjB4MDhmZjFiMjZlMGEwYTVlYzUxMGM5NDhhZWEwYzBmNjMxMWVmMTkzZmVjMWEzMmE1MjMwZGMwNWJjNDdjNGVjMDE0NDcxYjgyYzQ3ODMxMzUwZjVmNTMzODc5MWM0NTg0M2FkMThjMGNjODAzYmVhNTQ1MjA3ZWEyOTBhOTczMTEwZDY3NzAwOTE5MGNjODJkNGQzMzU2ZWUwMDg1MmJhZDIwODkwODY1MTI1MzljNDgwYzcyZmMwNDhjZTUxNDIxNWFiYWM2Y2YwYjAzYWY0MjI0NDEzODA3MTIwYzQ5Y2EzZmM4NWFiMDBjOTk2YzI2OWFjMTE3ZmM2MDVjMmYzYzBiOGE0NDQxMzgyNmE5NDEyNGVkODg5ZTg5NjNhZjI2NTEyYTE0MzFiYmIxZTI1ZjhkMjkzNzRhMDliNDUyYmY4YWIyMjYzZTljY2QyMDc1MTYwMzRjY2RiNDAxMjVkMmE2NDNkODIxMGYyODA0MGYwMDcwNjYzMjRjY2MwMDVjYzJjZWNjNmE2NzhhNGFjYjIwNDE4YzUzZDYyNzEyZTIxY2NjMGI2YTE2MmIwYTE0MTNlNTkxNDI4OTMyMjEwZjY4M2JhMDZjZDlkODg2Mjg4MTA3NTA2YjBjMDliYmM2NGY2OTE2ODliNTJiMTZmNDk0MDllNjE1NDgyOTdjNDEiLCJ0cmFuc2FjdGlvbnNfcm9vdCI6IjB4YTUyOTA4YWFkYWRhNzkwZWFjYjYyNzFlZTZhMTRiMjE1ZTVkZGJmZDFhNTcwZmUzNDk2MWIyYTE4YTliMDg4NyIsInN0YXRlX3Jvb3QiOiIweGE5OGM0YTMzNWYzYzE1ZTQ4NGE3YjY0MmMxZTMyMzNlMGFlM2ExZjFhZWYwZTYxZWRmMTFmYWRhNjU3YjZjMWYiLCJyZWNlaXB0c19yb290IjoiMHg5MGJkYzkwMTY3OTM5Yzg3MjQ4ZDgzYzQ3Nzk0OTJkNWYyNmY0NTlmZWU3ZGM2ZmJhNDAxMmU4NDA3YjliOTk1IiwibWluZXIiOiIweDM4OGM4MThjYThiOTI1MWIzOTMxMzFjMDhhNzM2YTY3Y2NiMTkyOTciLCJkaWZmaWN1bHR5IjowLCJ0b3RhbF9kaWZmaWN1bHR5Ijo1ODc1MDAwMzcxNjU5ODM2MDAwMDAwMCwic2l6ZSI6NTIxMzUsImV4dHJhX2RhdGEiOiIweDYyNjU2MTc2NjU3MjYyNzU2OTZjNjQyZTZmNzI2NyIsImdhc19saW1pdCI6MzAwMDAwMDAsImdhc191c2VkIjoxMjg3Njk1NCwidGltZXN0YW1wIjoxNjg2NjczMTUxLCJ0cmFuc2FjdGlvbl9jb3VudCI6MTg3LCJiYXNlX2ZlZV9wZXJfZ2FzIjozODA2NTQ1NTgwMywid2l0aGRyYXdhbHNfcm9vdCI6IjB4YzYyMjVmYzYwMmFkMDZiNDk5ZmE5OTI0MjNkNjg3MmFlODJkZDM5YTZkZmY1MmQwMWQyZWIyMzQyYmMwMmRiZiIsIndpdGhkcmF3YWxzIjpbXSwiaXRlbV9pZCI6ImJsb2NrXzB4OTFhZjVkMTk0ZDFmNjQ1MDU5N2NmZTg0ODk1NjkyY2JhMTYxNTQ4NzUyOGY5YWVlYTgxYmI2Y2FlY2YyNDA0OSIsIml0ZW1fdGltZXN0YW1wIjoiMjAyMy0wNi0xM1QxNjoxOToxMVoifQo=",
      "messageId": "7917619753580778",
      "publishTime": "2023-06-13T16:22:52.230Z"
    }
  }
]

请注意,这里我们的主要消息有效负载是 Base64 编码的。我们可以使用一些 Bash 代码确认有效负载结构。

gcloud pubsub subscriptions pull ethereum.blocks --format=json | jq -r '.[].message.data' | base64 -d | jq
{
  "type": "block",
  "number": 17472215,
  "hash": "0x91af5d194d1f6450597cfe84895692cba1615487528f9aeea81bb6caecf24049",
  "parent_hash": "0xb40d7aa172f5ff66a4a9c65da48684b51ee2265586bdc6e7dba8d233733aa365",
  "nonce": "0x0000000000000000",
  "sha3_uncles": "0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347",
  "logs_bloom": "0x08ff1b26e0a0a5ec510c948aea0c0f6311ef193fec1a32a5230dc05bc47c4ec014471b82c47831350f5f5338791c45843ad18c0cc803bea545207ea290a973110d677009190cc82d4d3356ee00852bad2089086512539c480c72fc048ce514215abac6cf0b03af4224413807120c49ca3fc85ab00c996c269ac117fc605c2f3c0b8a44413826a94124ed889e8963af26512a1431bbb1e25f8d29374a09b452bf8ab2263e9ccd207516034ccdb40125d2a643d8210f28040f007066324ccc005cc2cecc6a678a4acb20418c53d62712e21ccc0b6a162b0a1413e591428932210f683ba06cd9d886288107506b0c09bbc64f691689b52b16f49409e61548297c41",
  "transactions_root": "0xa52908aadada790eacb6271ee6a14b215e5ddbfd1a570fe34961b2a18a9b0887",
  "state_root": "0xa98c4a335f3c15e484a7b642c1e3233e0ae3a1f1aef0e61edf11fada657b6c1f",
  "receipts_root": "0x90bdc90167939c87248d83c4779492d5f26f459fee7dc6fba4012e8407b9b995",
  "miner": "0x388c818ca8b9251b393131c08a736a67ccb19297",
  "difficulty": 0,
  "total_difficulty": 58750003716598360000000,
  "size": 52135,
  "extra_data": "0x6265617665726275696c642e6f7267",
  "gas_limit": 30000000,
  "gas_used": 12876954,
  "timestamp": 1686673151,
  "transaction_count": 187,
  "base_fee_per_gas": 38065455803,
  "withdrawals_root": "0xc6225fc602ad06b499fa992423d6872ae82dd39a6dff52d01d2eb2342bc02dbf",
  "withdrawals": [
  ],
  "item_id": "block_0x91af5d194d1f6450597cfe84895692cba1615487528f9aeea81bb6caecf24049",
  "item_timestamp": "2023-06-13T16:19:11Z"
}

或者,用户可以通过 Google 控制台创建订阅并确认消息传递,如下所示。请注意,我们对订阅使用的是 拉取策略(之前命令中的默认值),与 Confluent Pub/Sub 连接器兼容。与 推送策略 相比,这 更高效,可以最大程度地减少延迟,同时无需将 Confluent 端点公开到公共互联网。

我们不会调整 默认的七天保留期,用于未确认的消息(连接器确认的消息会立即删除),但会修改确认截止时间,将其设置为 60 秒(见下文)。

create_subscription.gif

可以根据需要将上述内容调整到其他以太坊数据类型,例如跟踪数据。

在上述示例中,我们将确认截止时间设置为 60 秒。超过此时间未确认的消息将被重新发送。在测试 Pub/Sub 连接器期间,我们发现默认值 10 秒会导致重复消息的传递,连接器无法及时确认消息。增加此值 符合 Confluent 的建议。较高的值可能会延迟失败消息的传递,但这不太可能影响我们的用例。对于希望调整此值的使用者,大量的已过期确认数可能表明存在重复消息。可以从订阅的“健康状况”面板中查看此信息。如下所示,在我们的用例中,10 秒的值会导致重复消息。

expired_acks.png

创建服务凭据

为了使外部服务能够从我们的 Pub/Sub 订阅中读取消息,我们需要创建一个服务帐号,并授予其所需的 订阅者角色。这些步骤已在此处记录,并在下面显示

create_service_account.gif

或者,用户可以使用以下 gcloud 命令

# create service account
gcloud iam service-accounts create ethereum --display-name="ethereum"

#assign Pub/Sub Subscriber role
gcloud projects add-iam-policy-binding <project_id> --member="serviceAccount:ethereum@<project_id>.iam.gserviceaccount.com" --role="roles/pubsub.subscriber"

订阅的连接详细信息和上面的凭据可以导出为一个密钥,用于与我们的 Confluent Cloud Pub/Sub 连接器共享,如下所示。

create_service_key.gif

在 Confluent Cloud 中部署 Pub/Sub 连接器

Kafka Connect Google Cloud Pub/Sub 源连接器使用 拉取策略 从 Pub/Sub 主题读取消息,并将它们写入 Kafka 主题。消息的传递是异步的,即消息在 Pub/Sub 订阅中的可用性和传递到连接器是独立的。此连接器提供 至少一次 语义和 无顺序保证。这些属性足以满足我们的用例——行可以乱序出现,而罕见的重复不太可能影响后续数据分析。

在 Confluent Cloud 中部署 Pub/Sub 连接器之前,请确保您拥有以下详细信息

  • 包含 Pub/Sub 订阅的项目 ID
  • Pub/Sub 中的原始主题 ID - **crypto_ethereum.blocks**
  • 订阅 ID - **ethereum.blocks**(如果上述内容适用)
  • 在最后一步中获得导出的服务密钥。

这些内容可以适应目标数据类型。在下面的示例中,我们使用这些详细信息创建 Pub/Sub 连接器实例,为其分配一个任务,并创建一个 Kafka 主题 block_messages

create_pub_sub_connector.gif

创建后,我们可以导航到目标 block_messages 主题并对消息进行采样,如下所示。此源连接器以 JSON 格式生成消息,没有模式。

sample_messages.gif

在将此数据发送到 ClickHouse 之前,我们需要确保此数据格式正确,并且目标表存在。

准备 ClickHouse

创建表

下面显示了我们为块表提出的模式。

SET flatten_nested=0
CREATE TABLE ethereum.blocks
(
	`number` UInt32 CODEC(Delta(4), ZSTD(1)),
	`hash` String,
	`parent_hash` String,
	`nonce` String,
	`sha3_uncles` String,
	`logs_bloom` String,
	`transactions_root` String,
	`state_root` String,
	`receipts_root` String,
	`miner` String,
	`difficulty` Decimal(38, 0),
	`total_difficulty` Decimal(38, 0),
	`size` UInt32 CODEC(Delta(4), ZSTD(1)),
	`extra_data` String,
	`gas_limit` UInt32 CODEC(Delta(4), ZSTD(1)),
	`gas_used` UInt32 CODEC(Delta(4), ZSTD(1)),
	`timestamp` DateTime CODEC(Delta(4), ZSTD(1)),
	`transaction_count` UInt16,
	`base_fee_per_gas` UInt64,
         `withdrawals_root` String,
         `withdrawals` Nested(index Int64, validator_index Int64, address String, amount String) 
)
ENGINE = MergeTree
ORDER BY timestamp

其他数据类型的等效模式可以 在这里 找到

上面的模式与我们在 早期博文中 提出的模式略有不同,其中最近在规范中 [1][2] 添加了 withdrawals_rootwithdrawals 列。我们还将 flatten_nested 设置设置为 0 以保留 withdrawals 列的嵌套结构。这样,我们可以将此列插入为嵌套 JSON 结构。

下面显示了 Kafka 主题 block_messages 中的消息。

{
  "MessageData": "{\"type\": \"block\", \"number\": 17477635, \"hash\": \"0x6c0e971090f48adfc04303b302e5f14895c104e9a60ec6126b96579194a2c14b\", \"parent_hash\": \"0xdf90825e84c50550be12143d998090883bb92deecbdb5bd84235023f8fcad9c5\", \"nonce\": \"0x0000000000000000\", \"sha3_uncles\": \"0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347\", \"logs_bloom\": \"0xc0210585c002844400c000f0a011e324186195244041200041031242148ec051808106400d3030600c384180050003c20a41078488403c3022880000147ea8a348110c0ad50001086800430b83d120e89010000024445881003000848023a010562881021302a60118003c19400c099206904e7008b104301214001042fc007608271140460524a0904e3142174c250174070921a90006686244236043b080468f90b161e00364a20a8b48b15cb205a50c44082cc100040b02812017008c016501800162115c06022539a2401c0ef20020680a00002c201c0010c8920210a0000895a11c0a1844860a00208045810604a0301013015ea86219aa98831e027440\", \"transactions_root\": \"0xefaf112480278167af853214fc55a7fcaff4a879c1e97357be77d95fd114c046\", \"state_root\": \"0x2b5fa13bcecb133578e5e6c328944a551ccb497275ba4263d5b52d9f88bab2e1\", \"receipts_root\": \"0x2ef3cb89ab21d6d861777087ee1791c5a6197d5b6e9f8e0228717b064ad60efe\", \"miner\": \"0xbaf6dc2e647aeb6f510f9e318856a1bcd66c5e19\", \"difficulty\": 0, \"total_difficulty\": 58750003716598352816469, \"size\": 38091, \"extra_data\": \"0x4d616465206f6e20746865206d6f6f6e20627920426c6f636b6e6174697665\", \"gas_limit\": 30000000, \"gas_used\": 6452970, \"timestamp\": 1686739043, \"transaction_count\": 83, \"base_fee_per_gas\": 14420785730, \"withdrawals_root\": \"0x71fbe84200d685e619f28a7f3aedfcacadf5f8bde9be2c20f9d146110f66e558\", \"withdrawals\": [{\"index\": 7083728, \"validator_index\": 649036, \"address\": \"0x07fac54a901409fe10e56c899be3dcf2471ae321\", \"amount\": 13657565}, {\"index\": 7083729, \"validator_index\": 649037, \"address\": \"0x07fac54a901409fe10e56c899be3dcf2471ae321\", \"amount\": 13749238}, {\"index\": 7083730, \"validator_index\": 649038, \"address\": \"0x07fac54a901409fe10e56c899be3dcf2471ae321\", \"amount\": 13636849}, {\"index\": 7083731, \"validator_index\": 649039, \"address\": \"0x07fac54a901409fe10e56c899be3dcf2471ae321\", \"amount\": 13700532}, {\"index\": 7083732, \"validator_index\": 649040, \"address\": \"0x07fac54a901409fe10e56c899be3dcf2471ae321\", \"amount\": 13647987}, {\"index\": 7083733, \"validator_index\": 649041, \"address\": \"0x07fac54a901409fe10e56c899be3dcf2471ae321\", \"amount\": 13712208}, {\"index\": 7083734, \"validator_index\": 649042, \"address\": \"0x07fac54a901409fe10e56c899be3dcf2471ae321\", \"amount\": 13748808}, {\"index\": 7083735, \"validator_index\": 649043, \"address\": \"0x07fac54a901409fe10e56c899be3dcf2471ae321\", \"amount\": 13720541}, {\"index\": 7083736, \"validator_index\": 649044, \"address\": \"0x07fac54a901409fe10e56c899be3dcf2471ae321\", \"amount\": 49166908}, {\"index\": 7083737, \"validator_index\": 649045, \"address\": \"0x07fac54a901409fe10e56c899be3dcf2471ae321\", \"amount\": 13747911}, {\"index\": 7083738, \"validator_index\": 649046, \"address\": \"0x07fac54a901409fe10e56c899be3dcf2471ae321\", \"amount\": 13701501}, {\"index\": 7083739, \"validator_index\": 649047, \"address\": \"0x07fac54a901409fe10e56c899be3dcf2471ae321\", \"amount\": 13695932}, {\"index\": 7083740, \"validator_index\": 649048, \"address\": \"0x07fac54a901409fe10e56c899be3dcf2471ae321\", \"amount\": 13708868}, {\"index\": 7083741, \"validator_index\": 649049, \"address\": \"0x07fac54a901409fe10e56c899be3dcf2471ae321\", \"amount\": 13676192}, {\"index\": 7083742, \"validator_index\": 649050, \"address\": \"0x07fac54a901409fe10e56c899be3dcf2471ae321\", \"amount\": 13594476}, {\"index\": 7083743, \"validator_index\": 649051, \"address\": \"0x07fac54a901409fe10e56c899be3dcf2471ae321\", \"amount\": 13716126}], \"item_id\": \"block_0x6c0e971090f48adfc04303b302e5f14895c104e9a60ec6126b96579194a2c14b\", \"item_timestamp\": \"2023-06-14T10:37:23Z\"}",
  "AttributesMap": {
	"item_id": "block_0x6c0e971090f48adfc04303b302e5f14895c104e9a60ec6126b96579194a2c14b",
	"item_timestamp": "2023-06-13T18:44:47Z"
  }
}

主体位于 MessageData 字段中,是一个转义的 JSON 字符串。这种格式与上面的模式不兼容,需要在插入之前进行转换。为此,我们使用物化视图。

使用物化视图转换消息

ClickHouse 中的物化视图可用于在插入时转换行。视图在接收行块的表进行插入时触发,对该块执行 SELECT 操作,并将结果发送到目标表。

mv.png

此方法依赖于具有兼容模式的表来接收我们的插入消息。此表 block_messages 如下所示。

CREATE TABLE ethereum.block_messages
(
    `MessageData` String,
    `AttributesMap` Tuple(item_id String, item_timestamp String)
)
ENGINE = Null

如您所见,此表使用的是 空表引擎。这样可以确保原始的原始插入行不会被保留,因为它们在初始调试后不再需要。物化视图仍将在对该表进行插入时触发,将转换后的消息转发到我们所需的 target 表 blocks

物化视图 blocks_mv 在将行插入到表 block_messages 时触发,如下所示。上面的视图的 SELECT 查询将这些行转换为表 ethereum.blocks 和前面介绍的模式所期望的格式。目标表通过 TO 语法指定。

CREATE MATERIALIZED VIEW ethereum.blocks_mv TO ethereum.blocks
(
	`number` UInt32,
	`hash` String,
	`parent_hash` String,
	`nonce` String,
	`sha3_uncles` String,
	`logs_bloom` String,
	`transactions_root` String,
	`state_root` String,
	`receipts_root` String,
	`miner` String,
	`difficulty` Decimal(38, 0),
	`total_difficulty` Decimal(38, 0),
	`size` UInt32,
	`extra_data` String,
	`gas_limit` UInt32,
	`gas_used` UInt32,
	`timestamp` DateTime,
	`transaction_count` UInt16,
	`base_fee_per_gas` UInt64,
	`withdrawals_root` String,
	`withdrawals` Nested(index UInt64, validator_index Int64, address String, amount UInt64)
) AS
SELECT
JSONExtract(MessageData, 'number', 'UInt32') AS number,
JSONExtractString(MessageData, 'hash') AS hash,
JSONExtractString(MessageData, 'parent_hash') AS parent_hash,
JSONExtractString(MessageData, 'nonce') AS nonce,
JSONExtractString(MessageData, 'sha3_uncles') AS sha3_uncles,
JSONExtractString(MessageData, 'logs_bloom') AS logs_bloom,
JSONExtractString(MessageData, 'transactions_root') AS transactions_root,
JSONExtractString(MessageData, 'state_root') AS state_root,
JSONExtractString(MessageData, 'receipts_root') AS receipts_root,
JSONExtractString(MessageData, 'miner') AS miner,
JSONExtract(MessageData, 'difficulty', 'Decimal(38, 0)') AS difficulty,
JSONExtract(MessageData, 'total_difficulty', 'Decimal(38, 0)') AS total_difficulty,
JSONExtract(MessageData, 'size', 'UInt32') AS size,
JSONExtractString(MessageData, 'extra_data') AS extra_data,
JSONExtract(MessageData, 'gas_limit', 'UInt32') AS gas_limit,
JSONExtract(MessageData, 'gas_used', 'UInt32') AS gas_used,
JSONExtract(MessageData, 'timestamp', 'UInt64') AS timestamp,
JSONExtract(MessageData, 'transaction_count', 'UInt16') AS transaction_count,
JSONExtract(MessageData, 'base_fee_per_gas', 'UInt64') AS base_fee_per_gas,
JSONExtract(MessageData, 'withdrawals_root', 'String') AS withdrawals_root,
JSONExtract(MessageData, 'withdrawals', 'Nested(index UInt64, validator_index Int64, address String, amount UInt64)') AS withdrawals
FROM ethereum.block_messages
SETTINGS allow_simdjson = 0

这里,我们依赖于 JSONExtract 函数族从 JSON 字符串列 MessageData 中提取字段。我们使用适当的变体来确保值正确转换,例如 JSONExtract(MessageData, 'difficulty', 'Decimal(38, 0)') AS difficultyMessageData 列中提取字段 difficulty 作为 Decimal(38, 0),并将结果作为 "difficulty" 别名。此 SELECT 的结果是一组与我们之前表模式兼容的结果。

在上面,我们使用设置 allow_simdjson=0。这是必需的,因为用于解析 JSON 的默认 simdjson 实现无法解析大于 UInt64 的整数。将此值设置为 0 将启用 RapidJSON 解析,尽管性能不如 simdjson,但足以满足我们的需求。另一个选择 visitParam 函数需要更复杂的表达式,并且对我们的 JSON 字符串中存在的空格的容忍度更低。

为了测试此工作流,我们可以将消息插入到表 block_messages 中作为 JSONEachRow,并确认它是否已由目标表 blocks 接收。在下面的示例中,文件 eth.json 包含我们 之前以 ndJSON 格式提供的 Kafka 消息

clickhouse-client --query "INSERT INTO ethereum.block_messages FORMAT JSONEachRow" < eth.json
SELECT *
FROM ethereum.block
FORMAT Vertical

Row 1:
──────
number:            17477635
hash:              0x6c0e971090f48adfc04303b302e5f14895c104e9a60ec6126b96579194a2c14b
parent_hash:       0xdf90825e84c50550be12143d998090883bb92deecbdb5bd84235023f8fcad9c5
nonce:             0x0000000000000000
sha3_uncles:       0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347
logs_bloom:        0xc0210585c002844400c000f0a011e324186195244041200041031242148ec051808106400d3030600c384180050003c20a41078488403c3022880000147ea8a348110c0ad50001086800430b83d120e89010000024445881003000848023a010562881021302a60118003c19400c099206904e7008b104301214001042fc007608271140460524a0904e3142174c250174070921a90006686244236043b080468f90b161e00364a20a8b48b15cb205a50c44082cc100040b02812017008c016501800162115c06022539a2401c0ef20020680a00002c201c0010c8920210a0000895a11c0a1844860a00208045810604a0301013015ea86219aa98831e027440
transactions_root: 0xefaf112480278167af853214fc55a7fcaff4a879c1e97357be77d95fd114c046
state_root:        0x2b5fa13bcecb133578e5e6c328944a551ccb497275ba4263d5b52d9f88bab2e1
receipts_root:     0x2ef3cb89ab21d6d861777087ee1791c5a6197d5b6e9f8e0228717b064ad60efe
miner:             0xbaf6dc2e647aeb6f510f9e318856a1bcd66c5e19
difficulty:        0
total_difficulty:  58750003716598355985296
size:              38091
extra_data:        0x4d616465206f6e20746865206d6f6f6e20627920426c6f636b6e6174697665
gas_limit:         30000000
gas_used:          6452970
timestamp:         2023-06-14 10:37:23
transaction_count: 83
base_fee_per_gas:  14420785730
withdrawals_root:  0x71fbe84200d685e619f28a7f3aedfcacadf5f8bde9be2c20f9d146110f66e558
withdrawals:       [(7083728,649036,'0x07fac54a901409fe10e56c899be3dcf2471ae321',13657565),(7083729,649037,'0x07fac54a901409fe10e56c899be3dcf2471ae321',13749238),(7083730,649038,'0x07fac54a901409fe10e56c899be3dcf2471ae321',13636849),(7083731,649039,'0x07fac54a901409fe10e56c899be3dcf2471ae321',13700532),(7083732,649040,'0x07fac54a901409fe10e56c899be3dcf2471ae321',13647987),(7083733,649041,'0x07fac54a901409fe10e56c899be3dcf2471ae321',13712208),(7083734,649042,'0x07fac54a901409fe10e56c899be3dcf2471ae321',13748808),(7083735,649043,'0x07fac54a901409fe10e56c899be3dcf2471ae321',13720541),(7083736,649044,'0x07fac54a901409fe10e56c899be3dcf2471ae321',49166908),(7083737,649045,'0x07fac54a901409fe10e56c899be3dcf2471ae321',13747911),(7083738,649046,'0x07fac54a901409fe10e56c899be3dcf2471ae321',13701501),(7083739,649047,'0x07fac54a901409fe10e56c899be3dcf2471ae321',13695932),(7083740,649048,'0x07fac54a901409fe10e56c899be3dcf2471ae321',13708868),(7083741,649049,'0x07fac54a901409fe10e56c899be3dcf2471ae321',13676192),(7083742,649050,'0x07fac54a901409fe10e56c899be3dcf2471ae321',13594476),(7083743,649051,'0x07fac54a901409fe10e56c899be3dcf2471ae321',13716126)]

1 row in set. Elapsed: 0.025 sec.

其他数据类型的等效物化视图可以 在这里 找到。

在 Confluent Cloud 中部署 ClickHouse 连接器

将消息发布到 Kafka 主题并配置 ClickHouse 接收和转换插入操作后,我们可以部署 ClickHouse 连接器。Confluent Cloud 自定义连接器 产品要求我们以 zip 格式上传连接器包。最新的软件包发行版可以从 这里 下载。

我们在下面演示了此过程,并注意了重要的设置。完整的步骤列表可在 Confluent 文档 中找到。

upload_connector.gif

  • Connector Class - com.clickhouse.kafka.connect.ClickHouseSinkConnector
  • Connector type - Sink
  • Sensitive properties - password。这将确保 ClickHouse 密码的条目在配置和日志中被屏蔽。

上传后,可以创建连接器实例。我们在下面演示了这一点,并注意了用于此特定任务的 JSON 配置。

create_clickhouse_connector.gif

{
  "database": "ethereum",
  "exactlyOnce": "false",
  "hostname": "<hostname>",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "password": "<password>",
  "port": "8443",
  "schemas.enable": "false",
  "security.protocol": "SSL",
  "ssl": "true",
  "topics": "block_messages",
  "username": "default",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": "false"
}

截至撰写本文时,连接器要求主题和目标表的名称相同。在我们之前的示例中,我们的接收表因此创建为 block_messages——请将其调整为数据类型。在上面的示例中,我们指定 ethereum 数据库与之前的配置保持一致,并禁用恰好一次语义。虽然该功能受支持,但这会造成 额外的开销,在我们的用例中没有必要,因为 Pub/Sub 连接器最多提供至少一次传递。最后,请注意使用 JSON 转换器处理我们的主要有效负载。完整的配置选项列表可以 在这里 找到。

测试

以太坊区块链大约每分钟形成 4 到 5 个新区块,每个区块代表一行。在运行上述工作流几分钟后,用户应该能够确认数据正在流动

SELECT
    count(),
    toStartOfMinute(timestamp) AS min
FROM ethereum.blocks_test
GROUP BY min
ORDER BY min DESC
LIMIT 10

┌─count()─┬─────────────────min─┐
│       22023-06-14 16:45:00 │
│       22023-06-14 16:44:00 │
│       22023-06-14 16:40:00 │
│       42023-06-14 16:39:00 │
│       42023-06-14 16:38:00 │
│       52023-06-14 16:37:00 │
│       52023-06-14 16:36:00 │
│       52023-06-14 16:35:00 │
│       52023-06-14 16:34:00 │
│       32023-06-14 16:33:00 │
└─────────┴─────────────────────┘

10 rows in set. Elapsed: 0.029 sec.

尽管这些数据量很小,但相同的部署架构会维护我们公共 sql.clickhouse.com 环境中的跟踪、合约和交易数据集。例如,我们建议您参考我们之前 博文已记录的示例,以获得查询和灵感。

提示和技巧

  • 该连接器利用 Kafka Connect 的 基于任务 模型来扩展吞吐量。每个任务都是单线程的,并从一个或多个分区消费消息。没有两个任务可以从同一个分区消费。为了获得最佳吞吐量,用户应该配置与分区数量相同的任务。超过分区数量不会带来任何额外的好处,只会导致任务空闲。在这种情况下,要进一步扩展吞吐量,需要增加源主题的分区数量。我们用于跟踪的最大主题需要 3 个分区。

  • 如果使用自定义连接器产品在 Confluent Cloud 中部署连接器,则在撰写本文时,用户无法修改控制每个任务交付的批处理大小的设置。这些设置是 Kafka Connect 框架特有的,与连接器本身无关。默认情况下,每个连接器任务从框架接收最多 500 行的批处理。在插入 ClickHouse 之前,连接器不会进行任何批处理。这意味着插入 ClickHouse 的批处理可能比 推荐的大小 小。更重要的是,这个默认值会导致每秒的插入速率大于 1。当为了吞吐量而扩展任务数量时,上述情况会导致诸如“过多部分”之类的错误。

    为了解决这个问题,用户应该在 ClickHouse 中启用 异步插入。这将导致 ClickHouse 在插入底层表之前,使用基于磁盘的缓冲区对插入进行批处理。这确保了插入大块数据,防止出现诸如“过多部分”之类的错误。但是,这种方法目前与连接器的精确一次语义不兼容,该语义 依赖于普通插入的去重属性(即,在窗口期间对相同数据的插入进行去重)。虽然 ClickHouse 的最新版本支持异步插入的去重,但此功能处于实验阶段,尚未在连接器的精确一次模式下进行测试。

我们正在与 Confluent 合作,以公开相关的 Kafka Connect 设置,以便交付更大的批处理。

  • 在撰写本文时,Confluent 自定义连接器界面仅提供摘要日志。这些日志通常不足以诊断问题。幸运的是,所有连接器都会创建一个包含日志的主题。可以下载并检查该主题中的日志消息,以用于调试目的。我们计划通过 确保相关的调试信息在摘要日志中显示 来改进此工作负载,从而最大程度地减少诊断问题所需的工作量。

结论

在这篇博文中,我们探讨了如何使用 Confluent Cloud 的新自定义连接器产品来构建一个无代码 Kafka 管道,用于将数据从 Google Pub/Sub 移动到 ClickHouse Cloud。对于我们的示例数据集,我们使用了 Google 为以太坊区块链提供的事件,以及用于将公共数据集实时维护在 sql.clickhouse.com 中的管道。

clickHouse-kafka-connect 接收器目前正在积极开发中,这得益于用户在 存储库 或通过我们的 社区 Slack 中提供的宝贵反馈。我们的计划是在未来几个月内继续改进此连接器的生产就绪性,目标是在今年第三季度发布连接器的 GA 版本。如果您想加入这项工作,请随时与我们联系!正如 ClickHouse 一直以来的那样,任何贡献都是受欢迎的!

立即开始使用 ClickHouse Cloud 并获得 300 美元的积分。在 30 天试用期结束后,您可以继续使用按使用付费计划,或者 联系我们 了解有关我们基于容量的折扣的更多信息。请访问我们的 定价页面 以了解详细信息。

分享此帖子

订阅我们的时事通讯

了解最新功能发布、产品路线图、支持和云产品!
正在加载表单...
关注我们
Twitter imageSlack imageGitHub image
Telegram imageMeetup imageRss image