立即开始使用 ClickHouse Cloud,并获得 300 美元信用额度。要了解有关我们基于用量的折扣的更多信息,请联系我们或访问我们的定价页面。
ClickHouse Kafka Connector 简介
今年早些时候,我们介绍了新的官方开源 ClickHouse Kafka 连接器,该连接器基于 Kafka Connect 框架。此连接器旨在利用 ClickHouse 的新功能,特别是 Keeper Map 表引擎,以提供恰好一次的语义。我们很高兴地宣布,此连接器已在 Confluent Cloud 中经过测试,现在可通过“Confluent Cloud 自定义连接器”产品提供。
这篇博文演示了如何使用这种新的部署方法,通过 Confluent Cloud Kafka 主题,将以太坊加密货币事件从 Google Pub/Sub 可靠地流式传输到 ClickHouse Cloud。我们通过零代码和仅基于 UI 的配置实现了这一点。
我们的示例使用 ClickHouse Cloud 集群的开发实例。但是,这些示例应该可以在同等大小的自托管集群上重现。请注意,需要更大的实例来托管整个以太坊数据集 - 请参阅我们关于此主题的专门博文。或者,立即启动您的 ClickHouse Cloud 集群,并获得 300 美元的信用额度。让我们担心基础设施,让您专注于查询!
示例假设您拥有 Google Cloud 和 Confluent 帐户。这些示例使用数据的最小子集,Google Cloud 每月费用应低于 1 美元,Confluent 每天费用约为 6 美元。
Confluent Cloud 和自定义连接器
Apache Kafka 是一个无处不在的开源分布式事件流平台,数千家公司将其用于高性能数据管道、流分析、数据集成和关键任务应用程序。
除了维护和分发企业版 Kafka 之外,Confluent 还提供完全托管的云环境,用户可以在其中部署 Kafka。这包括 Kafka Connect 框架的托管产品,Kafka Connect 框架是 Kafka 的一个组件,它充当集中式数据中心,用于数据库、键值存储、搜索索引和文件系统之间的简单数据集成。从历史上看,用户只能在 Confluent Cloud 中部署官方支持的连接器,只需定义并行级别(任务),而无需担心管理相关的基础设施。
最近,Confluent 通过支持自定义连接器扩展了此产品。这允许用户通过简单地上传编译后的软件包并指定关键配置详细信息来部署任何基于 Java 构建在 Kafka connect 框架上的连接器。这些连接器反过来可以使用与 Confluent 拥有的官方连接器相同的托管产品进行部署。
在 ClickHouse,我们经常看到用户使用 Kafka 流式传输功能来处理和缓冲事件,然后在将它们插入 ClickHouse 以进行实时分析。因此,此产品对于 ClickHouse Cloud 用户和现有的 Confluent 客户来说令人兴奋,他们现在可以轻松地在其 Kafka 主题和 ClickHouse 实例之间流式传输数据,而无需担心管理基础设施,只需使用官方的 ClickHouse Kafka Connect 连接器即可。
Confluent 自定义连接器的完整文档以及当前支持的 AWS 区域可以在此处找到。
测试数据集
对于我们的测试数据集,我们使用了 Google 在公共项目中提供的以太坊加密货币数据集。
阅读这篇博文不需要任何加密货币的先验经验,但对于那些感兴趣的人来说,《以太坊简介》提供了一个有用的概述,以及 Google 关于如何构建此数据集的博客。
总而言之,我们的数据集由 4 个表组成。这是完整数据的子集,但足以满足大多数常见问题
- 区块 - 区块是交易批次,其中包含链中前一个区块的哈希值。大约 1700 万行,增长率为每天 7k。
- 交易 - 交易是来自帐户的加密签名指令。帐户将发起交易以更新以太坊网络的状态,例如,将 ETH 从一个帐户转移到另一个帐户。超过 20 亿行,每天增加约 100 万行。
- 追踪 - 允许查询所有以太坊地址及其余额的内部交易。超过 70 亿行,每天增加 500 万行。
- 合约 - “智能合约”只是在以太坊区块链上运行的程序。超过 6000 万行,每天增加约 5 万行。
从批量到流式传输
我们在之前的博文中详细探讨过此数据集,比较了 BigQuery 与 ClickHouse,其中我们提出了一种基于批处理的方法,以使此数据集在 ClickHouse 中保持最新。这是通过定期从公共 BigQuery 表(通过计划查询)导出数据到 GCS,以及将此数据导入 ClickHouse(通过简单的计划 cron 作业)来实现的。
虽然这足以满足我们当时的需求,并用于使我们的公共 sql.clickhouse.com 环境为我们的加密货币爱好者用户保持最新,但这在区块链和 BigQuery 上提供的数据与在 ClickHouse 中可查询的数据之间引入了令人不满意的延迟(约 30 分钟)。
幸运的是,Google 还将此数据集以几个公共 Pub/Sub 主题提供,提供了可以使用的事件流。从以太坊区块链延迟仅 4 分钟,此来源将使我们能够提供与 BigQuery 相媲美的服务。
我们需要一个强大的管道将这些公共 Pub/Sub 主题连接到 ClickHouse,并且希望最大限度地减少初始工作和任何未来的维护开销,因此使用我们新连接器的云托管方法似乎是理想的解决方案,Confluent 托管基础设施。除了减少数据在 ClickHouse 中可用的延迟之外,Kafka 还允许我们缓冲最多 N 天的数据,并在需要时为我们提供重放功能。
为了实现此架构,我们还需要一种可靠的方法将消息从 Pub/Sub 发送到 Kafka。Confluent 为此目的提供了一个源连接器,该连接器也可以零代码部署。将这些连接器组合在一起会产生以下简单的架构
几个简单的步骤
在我们的示例中,我们将使用以太坊 blocks
数据集。这代表了可用的以太坊数据集中最小的数据集,大约有 1700 万行,每天形成 7000 个新区块。但是,我们对其他以太坊数据集使用相同的方法,在需要时引用配置文件,包括最大的表 traces
。这包括超过 70 亿行,每天增加约 550 万行。
此数据非常适合 ClickHouse,不可变的区块链生成仅追加行。无论数据集如何,Pub/Sub 连接器和 ClickHouse Kafka Connector 的单个工作程序都足以满足此吞吐量。
创建 Pub/Sub 订阅
Google Pub/Sub 是一种异步且可扩展的消息传递服务,它将生成消息的服务与处理这些消息的服务解耦。与 Kafka 一样,发布者将消息生成到主题,而无需担心它们将如何在以后通过订阅被订阅者异步使用。
信用:https://cloud.google.com/pubsub/architecture
Google 将每个以太坊数据集作为公共主题提供,我们可以向其注册订阅。此主题上的每条消息都相当于 ClickHouse 中的一行。
Pub/Sub 中前 10GB 的吞吐量数据是免费的,之后 Google 收取约 40 美元/TB。每月约 120GB 的以太坊追踪数据,每月约为 4 美元。重要的是,一旦已确认的消息传递到 Kafka,我们就不会保留这些消息,并且在我们的订阅中仅保留七天的未确认消息(也是免费的)。
公共主题名称遵循以下命名,因此您可以轻松找到它们
projects/crypto-public-data/topics/crypto_{chain}.{table_name}
其中,
chain
可以是 ethereum
、bitcoin
、zcash
、litecoin
、dogecoin
或 dash
之一。为了我们的目的,我们使用 ethereum
。table_name
可以是 blocks
或 transactions
之一。此外,对于以太坊:支持 logs
、token_transfers
、traces
、contracts
和 tokens
。我们为 blocks
、transactions
、traces
和 contracts
提供配置,重点是在示例中使用的 blocks
。
假设您已安装 Google Cloud CLI,通过身份验证并选择了您拥有相关权限的项目,则可以使用以下命令创建 id 为 ethereum.blocks
的订阅
gcloud pubsub subscriptions create ethereum.blocks --topic=crypto_ethereum.blocks --topic-project=crypto-public-data --ack-deadline 60
Created subscription [projects/pmm-project-377716/subscriptions/ethereum.blocks].
用户可以使用以下命令确认消息正在传递
gcloud pubsub subscriptions pull ethereum.traces --format=json
[
{
"ackId": "FixdRkhRNxkIaFEOT14jPzUgKEUSBAgUBXx9dUJfdV1acGhRDRlyfWB9YggQUABCUi8KURkLb1xWdRVgDQGo4vPhXHgzBgtEVHheUhwIa1lUdQBWBTG5nJjfycfSPxh5a6TAyY87SOnenLpiZiw9XxJLLD5-IC1FQV5AEkwrGERJUytDCypYEU4EISE-MD5FU0RQBg",
"message": {
"attributes": {
"item_id": "block_0x91af5d194d1f6450597cfe84895692cba1615487528f9aeea81bb6caecf24049",
"item_timestamp": "2023-06-13T16:19:11Z"
},
"data": "eyJ0eXBlIjoiYmxvY2siLCJudW1iZXIiOjE3NDcyMjE1LCJoYXNoIjoiMHg5MWFmNWQxOTRkMWY2NDUwNTk3Y2ZlODQ4OTU2OTJjYmExNjE1NDg3NTI4ZjlhZWVhODFiYjZjYWVjZjI0MDQ5IiwicGFyZW50X2hhc2giOiIweGI0MGQ3YWExNzJmNWZmNjZhNGE5YzY1ZGE0ODY4NGI1MWVlMjI2NTU4NmJkYzZlN2RiYThkMjMzNzMzYWEzNjUiLCJub25jZSI6IjB4MDAwMDAwMDAwMDAwMDAwMCIsInNoYTNfdW5jbGVzIjoiMHgxZGNjNGRlOGRlYzc1ZDdhYWI4NWI1NjdiNmNjZDQxYWQzMTI0NTFiOTQ4YTc0MTNmMGExNDJmZDQwZDQ5MzQ3IiwibG9nc19ibG9vbSI6IjB4MDhmZjFiMjZlMGEwYTVlYzUxMGM5NDhhZWEwYzBmNjMxMWVmMTkzZmVjMWEzMmE1MjMwZGMwNWJjNDdjNGVjMDE0NDcxYjgyYzQ3ODMxMzUwZjVmNTMzODc5MWM0NTg0M2FkMThjMGNjODAzYmVhNTQ1MjA3ZWEyOTBhOTczMTEwZDY3NzAwOTE5MGNjODJkNGQzMzU2ZWUwMDg1MmJhZDIwODkwODY1MTI1MzljNDgwYzcyZmMwNDhjZTUxNDIxNWFiYWM2Y2YwYjAzYWY0MjI0NDEzODA3MTIwYzQ5Y2EzZmM4NWFiMDBjOTk2YzI2OWFjMTE3ZmM2MDVjMmYzYzBiOGE0NDQxMzgyNmE5NDEyNGVkODg5ZTg5NjNhZjI2NTEyYTE0MzFiYmIxZTI1ZjhkMjkzNzRhMDliNDUyYmY4YWIyMjYzZTljY2QyMDc1MTYwMzRjY2RiNDAxMjVkMmE2NDNkODIxMGYyODA0MGYwMDcwNjYzMjRjY2MwMDVjYzJjZWNjNmE2NzhhNGFjYjIwNDE4YzUzZDYyNzEyZTIxY2NjMGI2YTE2MmIwYTE0MTNlNTkxNDI4OTMyMjEwZjY4M2JhMDZjZDlkODg2Mjg4MTA3NTA2YjBjMDliYmM2NGY2OTE2ODliNTJiMTZmNDk0MDllNjE1NDgyOTdjNDEiLCJ0cmFuc2FjdGlvbnNfcm9vdCI6IjB4YTUyOTA4YWFkYWRhNzkwZWFjYjYyNzFlZTZhMTRiMjE1ZTVkZGJmZDFhNTcwZmUzNDk2MWIyYTE4YTliMDg4NyIsInN0YXRlX3Jvb3QiOiIweGE5OGM0YTMzNWYzYzE1ZTQ4NGE3YjY0MmMxZTMyMzNlMGFlM2ExZjFhZWYwZTYxZWRmMTFmYWRhNjU3YjZjMWYiLCJyZWNlaXB0c19yb290IjoiMHg5MGJkYzkwMTY3OTM5Yzg3MjQ4ZDgzYzQ3Nzk0OTJkNWYyNmY0NTlmZWU3ZGM2ZmJhNDAxMmU4NDA3YjliOTk1IiwibWluZXIiOiIweDM4OGM4MThjYThiOTI1MWIzOTMxMzFjMDhhNzM2YTY3Y2NiMTkyOTciLCJkaWZmaWN1bHR5IjowLCJ0b3RhbF9kaWZmaWN1bHR5Ijo1ODc1MDAwMzcxNjU5ODM2MDAwMDAwMCwic2l6ZSI6NTIxMzUsImV4dHJhX2RhdGEiOiIweDYyNjU2MTc2NjU3MjYyNzU2OTZjNjQyZTZmNzI2NyIsImdhc19saW1pdCI6MzAwMDAwMDAsImdhc191c2VkIjoxMjg3Njk1NCwidGltZXN0YW1wIjoxNjg2NjczMTUxLCJ0cmFuc2FjdGlvbl9jb3VudCI6MTg3LCJiYXNlX2ZlZV9wZXJfZ2FzIjozODA2NTQ1NTgwMywid2l0aGRyYXdhbHNfcm9vdCI6IjB4YzYyMjVmYzYwMmFkMDZiNDk5ZmE5OTI0MjNkNjg3MmFlODJkZDM5YTZkZmY1MmQwMWQyZWIyMzQyYmMwMmRiZiIsIndpdGhkcmF3YWxzIjpbXSwiaXRlbV9pZCI6ImJsb2NrXzB4OTFhZjVkMTk0ZDFmNjQ1MDU5N2NmZTg0ODk1NjkyY2JhMTYxNTQ4NzUyOGY5YWVlYTgxYmI2Y2FlY2YyNDA0OSIsIml0ZW1fdGltZXN0YW1wIjoiMjAyMy0wNi0xM1QxNjoxOToxMVoifQo=",
"messageId": "7917619753580778",
"publishTime": "2023-06-13T16:22:52.230Z"
}
}
]
请注意,我们的主要消息负载在此处以 base64 编码。我们可以使用一些 bash 确认负载结构。
gcloud pubsub subscriptions pull ethereum.blocks --format=json | jq -r '.[].message.data' | base64 -d | jq
{
"type": "block",
"number": 17472215,
"hash": "0x91af5d194d1f6450597cfe84895692cba1615487528f9aeea81bb6caecf24049",
"parent_hash": "0xb40d7aa172f5ff66a4a9c65da48684b51ee2265586bdc6e7dba8d233733aa365",
"nonce": "0x0000000000000000",
"sha3_uncles": "0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347",
"logs_bloom": "0x08ff1b26e0a0a5ec510c948aea0c0f6311ef193fec1a32a5230dc05bc47c4ec014471b82c47831350f5f5338791c45843ad18c0cc803bea545207ea290a973110d677009190cc82d4d3356ee00852bad2089086512539c480c72fc048ce514215abac6cf0b03af4224413807120c49ca3fc85ab00c996c269ac117fc605c2f3c0b8a44413826a94124ed889e8963af26512a1431bbb1e25f8d29374a09b452bf8ab2263e9ccd207516034ccdb40125d2a643d8210f28040f007066324ccc005cc2cecc6a678a4acb20418c53d62712e21ccc0b6a162b0a1413e591428932210f683ba06cd9d886288107506b0c09bbc64f691689b52b16f49409e61548297c41",
"transactions_root": "0xa52908aadada790eacb6271ee6a14b215e5ddbfd1a570fe34961b2a18a9b0887",
"state_root": "0xa98c4a335f3c15e484a7b642c1e3233e0ae3a1f1aef0e61edf11fada657b6c1f",
"receipts_root": "0x90bdc90167939c87248d83c4779492d5f26f459fee7dc6fba4012e8407b9b995",
"miner": "0x388c818ca8b9251b393131c08a736a67ccb19297",
"difficulty": 0,
"total_difficulty": 58750003716598360000000,
"size": 52135,
"extra_data": "0x6265617665726275696c642e6f7267",
"gas_limit": 30000000,
"gas_used": 12876954,
"timestamp": 1686673151,
"transaction_count": 187,
"base_fee_per_gas": 38065455803,
"withdrawals_root": "0xc6225fc602ad06b499fa992423d6872ae82dd39a6dff52d01d2eb2342bc02dbf",
"withdrawals": [
],
"item_id": "block_0x91af5d194d1f6450597cfe84895692cba1615487528f9aeea81bb6caecf24049",
"item_timestamp": "2023-06-13T16:19:11Z"
}
或者,用户可以通过 Google 控制台创建订阅并确认消息传递,如下所示。请注意,我们对订阅使用拉取策略(早期命令中的默认值),该策略与 Confluent Pub/Sub 连接器兼容。与推送策略相比,这更有效,并且最大限度地减少了延迟,同时不需要将 Confluent 端点暴露于公共互联网。
我们没有调整未确认消息的默认保留期 7 天(连接器确认的消息将立即删除),但确实修改了确认截止时间,将其设置为 60 秒(见下文)。
以上内容可以根据需要调整为其他以太坊数据类型,例如追踪。
在上面的示例中,我们将确认截止时间设置为 60 秒。在此期间后未确认的消息将重新发送。在测试 Pub/Sub 连接器时,我们发现默认值 10 秒会导致重复消息的传递,连接器无法在足够的时间内确认消息。增加此值符合 Confluent 的建议。更高的值可能会延迟失败消息的传递,但这不太可能影响我们的用例。对于希望调整此值的用户,过期的确认计数过高可能表示重复。可以从订阅的“运行状况”面板中查看此信息。如下所示,值 10 秒会导致我们的用例中出现重复项。
创建服务凭据
为了使外部服务能够从我们的 Pub/Sub 订阅中读取消息,我们需要创建一个服务帐户并授予其所需的订阅者角色。此步骤在此处记录,如下所示
或者,用户可以使用以下 gcloud
命令
# create service account
gcloud iam service-accounts create ethereum --display-name="ethereum"
#assign Pub/Sub Subscriber role
gcloud projects add-iam-policy-binding <project_id> --member="serviceAccount:ethereum@<project_id>.iam.gserviceaccount.com" --role="roles/pubsub.subscriber"
订阅的连接详细信息和上述凭据可以导出为单个密钥,以便与我们的 Confluent Cloud Pub/Sub Connector 共享,如下所示。
在 Confluent Cloud 中部署 Pub/Sub Connector
Kafka Connect Google Cloud Pub/Sub Source Connector 使用拉取策略从 Pub/Sub 主题读取消息,并将它们写入 Kafka 主题。消息的传递是异步的,即它们在 Pub/Sub 订阅上的可用性与传递到连接器是独立的。此连接器提供至少一次语义和无顺序保证。这些属性足以满足我们的用例 - 行可能会乱序出现,并且罕见的重复项不太可能影响以后的数据分析。
在 Confluent Cloud 中部署 Pub/Sub Connector 之前,请确保您具有以下详细信息
- 您的 Pub/Sub 订阅所在的 project Id
- Pub/Sub 中的原始主题 id - crypto_ethereum.blocks
- 订阅 id - **ethereum.blocks **(如果以上)
- 在上一步中获取的导出的服务密钥。
这些可以根据目标数据类型进行调整。在下面的示例中,我们使用这些详细信息创建一个 Pub/Sub 连接器实例,为其分配一个任务并创建一个 Kafka 主题 block_messages
。
创建后,我们可以导航到目标 block_messages
主题并对消息进行采样,如下所示。此源连接器生成 JSON 格式的消息,没有架构。
在将此数据发送到 ClickHouse 之前,我们需要确保此数据的格式正确并且目标表存在。
准备 ClickHouse
创建表
我们的 blocks 表建议的架构如下所示。
SET flatten_nested=0
CREATE TABLE ethereum.blocks
(
`number` UInt32 CODEC(Delta(4), ZSTD(1)),
`hash` String,
`parent_hash` String,
`nonce` String,
`sha3_uncles` String,
`logs_bloom` String,
`transactions_root` String,
`state_root` String,
`receipts_root` String,
`miner` String,
`difficulty` Decimal(38, 0),
`total_difficulty` Decimal(38, 0),
`size` UInt32 CODEC(Delta(4), ZSTD(1)),
`extra_data` String,
`gas_limit` UInt32 CODEC(Delta(4), ZSTD(1)),
`gas_used` UInt32 CODEC(Delta(4), ZSTD(1)),
`timestamp` DateTime CODEC(Delta(4), ZSTD(1)),
`transaction_count` UInt16,
`base_fee_per_gas` UInt64,
`withdrawals_root` String,
`withdrawals` Nested(index Int64, validator_index Int64, address String, amount String)
)
ENGINE = MergeTree
ORDER BY timestamp
其他数据类型的等效架构可以在此处找到
上面的架构与我们在早期博文中提出的架构略有不同,列 withdrawals_root
和 withdrawals
最近已添加到规范[1][2]。我们还将设置 flatten_nested
设置为 0
,以保留 withdrawals
列的 Nested 结构。这允许我们将此列作为嵌套的 JSON 结构插入。
下面我们展示了 Kafka 主题 block_messages
上保存的消息。
{
"MessageData": "{\"type\": \"block\", \"number\": 17477635, \"hash\": \"0x6c0e971090f48adfc04303b302e5f14895c104e9a60ec6126b96579194a2c14b\", \"parent_hash\": \"0xdf90825e84c50550be12143d998090883bb92deecbdb5bd84235023f8fcad9c5\", \"nonce\": \"0x0000000000000000\", \"sha3_uncles\": \"0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347\", \"logs_bloom\": \"0xc0210585c002844400c000f0a011e324186195244041200041031242148ec051808106400d3030600c384180050003c20a41078488403c3022880000147ea8a348110c0ad50001086800430b83d120e89010000024445881003000848023a010562881021302a60118003c19400c099206904e7008b104301214001042fc007608271140460524a0904e3142174c250174070921a90006686244236043b080468f90b161e00364a20a8b48b15cb205a50c44082cc100040b02812017008c016501800162115c06022539a2401c0ef20020680a00002c201c0010c8920210a0000895a11c0a1844860a00208045810604a0301013015ea86219aa98831e027440\", \"transactions_root\": \"0xefaf112480278167af853214fc55a7fcaff4a879c1e97357be77d95fd114c046\", \"state_root\": \"0x2b5fa13bcecb133578e5e6c328944a551ccb497275ba4263d5b52d9f88bab2e1\", \"receipts_root\": \"0x2ef3cb89ab21d6d861777087ee1791c5a6197d5b6e9f8e0228717b064ad60efe\", \"miner\": \"0xbaf6dc2e647aeb6f510f9e318856a1bcd66c5e19\", \"difficulty\": 0, \"total_difficulty\": 58750003716598352816469, \"size\": 38091, \"extra_data\": \"0x4d616465206f6e20746865206d6f6f6e20627920426c6f636b6e6174697665\", \"gas_limit\": 30000000, \"gas_used\": 6452970, \"timestamp\": 1686739043, \"transaction_count\": 83, \"base_fee_per_gas\": 14420785730, \"withdrawals_root\": \"0x71fbe84200d685e619f28a7f3aedfcacadf5f8bde9be2c20f9d146110f66e558\", \"withdrawals\": [{\"index\": 7083728, \"validator_index\": 649036, \"address\": \"0x07fac54a901409fe10e56c899be3dcf2471ae321\", \"amount\": 13657565}, {\"index\": 7083729, \"validator_index\": 649037, \"address\": \"0x07fac54a901409fe10e56c899be3dcf2471ae321\", \"amount\": 13749238}, {\"index\": 7083730, \"validator_index\": 649038, \"address\": \"0x07fac54a901409fe10e56c899be3dcf2471ae321\", \"amount\": 13636849}, {\"index\": 7083731, \"validator_index\": 649039, \"address\": \"0x07fac54a901409fe10e56c899be3dcf2471ae321\", \"amount\": 13700532}, {\"index\": 7083732, \"validator_index\": 649040, \"address\": \"0x07fac54a901409fe10e56c899be3dcf2471ae321\", \"amount\": 13647987}, {\"index\": 7083733, \"validator_index\": 649041, \"address\": \"0x07fac54a901409fe10e56c899be3dcf2471ae321\", \"amount\": 13712208}, {\"index\": 7083734, \"validator_index\": 649042, \"address\": \"0x07fac54a901409fe10e56c899be3dcf2471ae321\", \"amount\": 13748808}, {\"index\": 7083735, \"validator_index\": 649043, \"address\": \"0x07fac54a901409fe10e56c899be3dcf2471ae321\", \"amount\": 13720541}, {\"index\": 7083736, \"validator_index\": 649044, \"address\": \"0x07fac54a901409fe10e56c899be3dcf2471ae321\", \"amount\": 49166908}, {\"index\": 7083737, \"validator_index\": 649045, \"address\": \"0x07fac54a901409fe10e56c899be3dcf2471ae321\", \"amount\": 13747911}, {\"index\": 7083738, \"validator_index\": 649046, \"address\": \"0x07fac54a901409fe10e56c899be3dcf2471ae321\", \"amount\": 13701501}, {\"index\": 7083739, \"validator_index\": 649047, \"address\": \"0x07fac54a901409fe10e56c899be3dcf2471ae321\", \"amount\": 13695932}, {\"index\": 7083740, \"validator_index\": 649048, \"address\": \"0x07fac54a901409fe10e56c899be3dcf2471ae321\", \"amount\": 13708868}, {\"index\": 7083741, \"validator_index\": 649049, \"address\": \"0x07fac54a901409fe10e56c899be3dcf2471ae321\", \"amount\": 13676192}, {\"index\": 7083742, \"validator_index\": 649050, \"address\": \"0x07fac54a901409fe10e56c899be3dcf2471ae321\", \"amount\": 13594476}, {\"index\": 7083743, \"validator_index\": 649051, \"address\": \"0x07fac54a901409fe10e56c899be3dcf2471ae321\", \"amount\": 13716126}], \"item_id\": \"block_0x6c0e971090f48adfc04303b302e5f14895c104e9a60ec6126b96579194a2c14b\", \"item_timestamp\": \"2023-06-14T10:37:23Z\"}",
"AttributesMap": {
"item_id": "block_0x6c0e971090f48adfc04303b302e5f14895c104e9a60ec6126b96579194a2c14b",
"item_timestamp": "2023-06-13T18:44:47Z"
}
}
主体保存在 MessageData
字段中,格式为转义的 JSON 字符串。此格式与上述架构不兼容,需要在插入之前进行转换。为此,我们使用物化视图。
使用物化视图转换消息
ClickHouse 中的物化视图可用于在插入时转换行。视图在接收行块的表的插入时触发,对块执行 SELECT 操作,并将结果发送到目标表。
此方法依赖于具有兼容架构的表来接收我们的插入消息。此表 block_messages
如下所示。
CREATE TABLE ethereum.block_messages
(
`MessageData` String,
`AttributesMap` Tuple(item_id String, item_timestamp String)
)
ENGINE = Null
如图所示,此表使用 Null 表引擎。这确保原始原始插入的行不会被保留,因为除了初始调试之外,它们不需要。物化视图仍将在对此表的插入时触发,将转换后的消息转发到我们期望的目标表 blocks
。
物化视图 blocks_mv
在将行插入到表 block_messages
时触发,如下所示。上述视图的 SELECT 查询将这些行转换为表 ethereum.blocks
和先前提供的架构期望的格式。此目标表通过 TO
语法指定。
CREATE MATERIALIZED VIEW ethereum.blocks_mv TO ethereum.blocks
(
`number` UInt32,
`hash` String,
`parent_hash` String,
`nonce` String,
`sha3_uncles` String,
`logs_bloom` String,
`transactions_root` String,
`state_root` String,
`receipts_root` String,
`miner` String,
`difficulty` Decimal(38, 0),
`total_difficulty` Decimal(38, 0),
`size` UInt32,
`extra_data` String,
`gas_limit` UInt32,
`gas_used` UInt32,
`timestamp` DateTime,
`transaction_count` UInt16,
`base_fee_per_gas` UInt64,
`withdrawals_root` String,
`withdrawals` Nested(index UInt64, validator_index Int64, address String, amount UInt64)
) AS
SELECT
JSONExtract(MessageData, 'number', 'UInt32') AS number,
JSONExtractString(MessageData, 'hash') AS hash,
JSONExtractString(MessageData, 'parent_hash') AS parent_hash,
JSONExtractString(MessageData, 'nonce') AS nonce,
JSONExtractString(MessageData, 'sha3_uncles') AS sha3_uncles,
JSONExtractString(MessageData, 'logs_bloom') AS logs_bloom,
JSONExtractString(MessageData, 'transactions_root') AS transactions_root,
JSONExtractString(MessageData, 'state_root') AS state_root,
JSONExtractString(MessageData, 'receipts_root') AS receipts_root,
JSONExtractString(MessageData, 'miner') AS miner,
JSONExtract(MessageData, 'difficulty', 'Decimal(38, 0)') AS difficulty,
JSONExtract(MessageData, 'total_difficulty', 'Decimal(38, 0)') AS total_difficulty,
JSONExtract(MessageData, 'size', 'UInt32') AS size,
JSONExtractString(MessageData, 'extra_data') AS extra_data,
JSONExtract(MessageData, 'gas_limit', 'UInt32') AS gas_limit,
JSONExtract(MessageData, 'gas_used', 'UInt32') AS gas_used,
JSONExtract(MessageData, 'timestamp', 'UInt64') AS timestamp,
JSONExtract(MessageData, 'transaction_count', 'UInt16') AS transaction_count,
JSONExtract(MessageData, 'base_fee_per_gas', 'UInt64') AS base_fee_per_gas,
JSONExtract(MessageData, 'withdrawals_root', 'String') AS withdrawals_root,
JSONExtract(MessageData, 'withdrawals', 'Nested(index UInt64, validator_index Int64, address String, amount UInt64)') AS withdrawals
FROM ethereum.block_messages
SETTINGS allow_simdjson = 0
在这里,我们依赖 JSONExtract
系列函数从 JSON 字符串列 MessageData
中提取字段。我们使用适当的变体来确保值被正确转换,例如,JSONExtract(MessageData, 'difficulty', 'Decimal(38, 0)') AS difficulty
从 MessageData
列中提取字段 difficulty
,类型为 `Decimal(38, 0)",并将结果别名为“difficulty”。此 SELECT 的结果是一组与我们之前的表架构兼容的结果。
在上面,我们使用设置
allow_simdjson=0
。这是必要的,因为用于解析 JSON 的默认 simdjson 实现无法解析大于 UInt64 的整数。将此值设置为 0 启用 RapidJSON 解析,虽然性能不如 simdjson,但足以满足我们的需求。另一种visitParam
函数需要更复杂的表达式,并且对 JSON 字符串中存在的间距不太宽容。
为了测试此工作流程,我们可以将消息以 JSONEachRow
格式插入到表 block_messages
中,并确认目标表 blocks
已收到该消息。在下面的示例中,文件 eth.json
包含我们在 ndJSON 格式的早期 Kafka 消息。
clickhouse-client --query "INSERT INTO ethereum.block_messages FORMAT JSONEachRow" < eth.json
SELECT *
FROM ethereum.block
FORMAT Vertical
Row 1:
──────
number: 17477635
hash: 0x6c0e971090f48adfc04303b302e5f14895c104e9a60ec6126b96579194a2c14b
parent_hash: 0xdf90825e84c50550be12143d998090883bb92deecbdb5bd84235023f8fcad9c5
nonce: 0x0000000000000000
sha3_uncles: 0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347
logs_bloom: 0xc0210585c002844400c000f0a011e324186195244041200041031242148ec051808106400d3030600c384180050003c20a41078488403c3022880000147ea8a348110c0ad50001086800430b83d120e89010000024445881003000848023a010562881021302a60118003c19400c099206904e7008b104301214001042fc007608271140460524a0904e3142174c250174070921a90006686244236043b080468f90b161e00364a20a8b48b15cb205a50c44082cc100040b02812017008c016501800162115c06022539a2401c0ef20020680a00002c201c0010c8920210a0000895a11c0a1844860a00208045810604a0301013015ea86219aa98831e027440
transactions_root: 0xefaf112480278167af853214fc55a7fcaff4a879c1e97357be77d95fd114c046
state_root: 0x2b5fa13bcecb133578e5e6c328944a551ccb497275ba4263d5b52d9f88bab2e1
receipts_root: 0x2ef3cb89ab21d6d861777087ee1791c5a6197d5b6e9f8e0228717b064ad60efe
miner: 0xbaf6dc2e647aeb6f510f9e318856a1bcd66c5e19
difficulty: 0
total_difficulty: 58750003716598355985296
size: 38091
extra_data: 0x4d616465206f6e20746865206d6f6f6e20627920426c6f636b6e6174697665
gas_limit: 30000000
gas_used: 6452970
timestamp: 2023-06-14 10:37:23
transaction_count: 83
base_fee_per_gas: 14420785730
withdrawals_root: 0x71fbe84200d685e619f28a7f3aedfcacadf5f8bde9be2c20f9d146110f66e558
withdrawals: [(7083728,649036,'0x07fac54a901409fe10e56c899be3dcf2471ae321',13657565),(7083729,649037,'0x07fac54a901409fe10e56c899be3dcf2471ae321',13749238),(7083730,649038,'0x07fac54a901409fe10e56c899be3dcf2471ae321',13636849),(7083731,649039,'0x07fac54a901409fe10e56c899be3dcf2471ae321',13700532),(7083732,649040,'0x07fac54a901409fe10e56c899be3dcf2471ae321',13647987),(7083733,649041,'0x07fac54a901409fe10e56c899be3dcf2471ae321',13712208),(7083734,649042,'0x07fac54a901409fe10e56c899be3dcf2471ae321',13748808),(7083735,649043,'0x07fac54a901409fe10e56c899be3dcf2471ae321',13720541),(7083736,649044,'0x07fac54a901409fe10e56c899be3dcf2471ae321',49166908),(7083737,649045,'0x07fac54a901409fe10e56c899be3dcf2471ae321',13747911),(7083738,649046,'0x07fac54a901409fe10e56c899be3dcf2471ae321',13701501),(7083739,649047,'0x07fac54a901409fe10e56c899be3dcf2471ae321',13695932),(7083740,649048,'0x07fac54a901409fe10e56c899be3dcf2471ae321',13708868),(7083741,649049,'0x07fac54a901409fe10e56c899be3dcf2471ae321',13676192),(7083742,649050,'0x07fac54a901409fe10e56c899be3dcf2471ae321',13594476),(7083743,649051,'0x07fac54a901409fe10e56c899be3dcf2471ae321',13716126)]
1 row in set. Elapsed: 0.025 sec.
其他数据类型的等效物化视图可以在此处找到。
在 Confluent Cloud 中部署 ClickHouse Connector
在我们的消息发布到 Kafka 主题并且 ClickHouse 配置为接收和转换插入的情况下,我们可以部署 ClickHouse 连接器。Confluent Cloud Custom Connector 产品要求我们以 zip 格式上传连接器包。最新的软件包分发可以从此处下载。
我们在下面演示了此过程,并注意了重要的设置。Confluent 文档中提供了完整步骤列表。
Connector Class
-com.clickhouse.kafka.connect.ClickHouseSinkConnector
Connector type
- SinkSensitive properties
-password
。这将确保 ClickHouse 密码的条目在配置和日志中被屏蔽。
上传后,可以创建连接器的实例。我们在下面演示了这一点,并注意了用于此特定任务的 JSON 配置。
{
"database": "ethereum",
"exactlyOnce": "false",
"hostname": "<hostname>",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"password": "<password>",
"port": "8443",
"schemas.enable": "false",
"security.protocol": "SSL",
"ssl": "true",
"topics": "block_messages",
"username": "default",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false"
}
在撰写本文时,连接器要求主题和目标表同名。在我们之前的示例中,我们的接收表因此创建为 block_messages
- 将其调整为数据类型。在上面的示例中,我们指定 ethereum
数据库以与之前的配置保持一致,并禁用恰好一次的语义。虽然支持这一点,但这会产生额外开销,并且在我们的案例中是不必要的,因为 Pub/Sub 连接器最多提供至少一次的传递。最后,请注意对我们的主负载使用 JSON 转换器。完整的配置选项列表可以在此处找到。
测试
以太坊区块链上每分钟形成大约 4 到 5 个新区块,每个区块代表一行。在运行上述工作流程几分钟后,用户应该能够确认数据正在流动
SELECT
count(),
toStartOfMinute(timestamp) AS min
FROM ethereum.blocks_test
GROUP BY min
ORDER BY min DESC
LIMIT 10
┌─count()─┬─────────────────min─┐
│ 2 │ 2023-06-14 16:45:00 │
│ 2 │ 2023-06-14 16:44:00 │
│ 2 │ 2023-06-14 16:40:00 │
│ 4 │ 2023-06-14 16:39:00 │
│ 4 │ 2023-06-14 16:38:00 │
│ 5 │ 2023-06-14 16:37:00 │
│ 5 │ 2023-06-14 16:36:00 │
│ 5 │ 2023-06-14 16:35:00 │
│ 5 │ 2023-06-14 16:34:00 │
│ 3 │ 2023-06-14 16:33:00 │
└─────────┴─────────────────────┘
10 rows in set. Elapsed: 0.029 sec.
虽然这些数据量很小,但相同的部署架构维护了我们的公共 sql.clickhouse.com 环境中的追踪、合约和交易数据集。例如,我们建议您参考我们之前的博文和记录的示例,以获取查询和灵感。
提示和技巧
-
连接器利用 Kafka Connect 的基于任务的模型来扩展吞吐量。每个任务都是单线程的,并从一个或多个分区使用消息。没有两个任务可以从同一个分区使用。为了获得最佳吞吐量,用户应配置与分区一样多的任务。超过分区数不会带来进一步的好处,只会导致任务空闲。在这种情况下,进一步吞吐量扩展需要增加源主题中的分区数。我们最大的追踪主题需要 3 个分区。
-
如果使用自定义连接器产品在 Confluent Cloud 中部署连接器,则控制传递到每个任务的批处理大小的设置,在撰写本文时,用户无法修改。这些设置特定于 Kafka Connect 框架,而不是连接器本身。默认情况下,每个连接器任务从框架接收最多 500 行的批处理。在插入 ClickHouse 之前,连接器中不会发生批处理。这可能意味着与建议的大小相比,到 ClickHouse 的插入批处理可能很小。更重要的是,此默认值可能导致插入速率大于每秒 1 次。当扩展任务数以提高吞吐量时,上述情况可能会导致诸如“parts 过多”之类的问题。
为了解决这个问题,用户应在 ClickHouse 中启用异步插入。这将导致 ClickHouse 在插入到基础表之前使用基于磁盘的缓冲区对插入进行批处理。这确保插入大数据块,防止出现诸如“parts 过多”之类的问题。但是,此方法目前与连接器的恰好一次语义不兼容,恰好一次语义依赖于正常插入的去重属性(即,在窗口期内对相同数据的插入进行去重)。虽然最近发布的 ClickHouse 支持异步插入的去重,但此功能是实验性的,尚未在连接器的恰好一次模式下进行测试。
我们正在与 Confluent 合作,以公开相关的 Kafka Connect 设置,以传递更大的批处理。
- 在撰写本文时,Confluent 自定义连接器界面仅提供摘要日志。这些通常不足以诊断问题。幸运的是,所有连接器都将创建一个包含日志的主题。可以下载和检查此主题中的日志消息以进行调试。我们计划通过确保在摘要日志中显示相关的调试信息来改进此工作负载,从而最大限度地减少诊断问题所需的工作量。
结论
在这篇博文中,我们探讨了如何使用 Confluent Cloud 的新自定义连接器产品来构建零代码 Kafka 管道,用于将数据从 Google Pub/Sub 移动到 ClickHouse Cloud。对于我们的示例数据集,我们使用了 Google 为以太坊区块链提供的事件和管道,以近乎实时的速度维护 sql.clickhouse.com 中的公共数据集。
clickHouse-kafka-connect Sink 目前正在积极开发中,这要归功于用户在存储库或通过我们的社区 Slack 中报告的宝贵反馈。我们的计划是在未来几个月内继续提高此连接器在生产环境中的就绪度,目标是在今年第三季度发布此连接器的 GA 版本。如果您想加入这项工作,请随时与我们联系!与 ClickHouse 一样,我们欢迎任何贡献!
立即开始使用 ClickHouse Cloud,并获得 300 美元 क्रेडिट。在 30 天试用期结束时,您可以继续使用按需付费计划,或联系我们以了解有关我们基于用量的折扣的更多信息。访问我们的定价页面了解详情。