简介
在最近的一篇文章中,我们探讨了一个实时流处理用例,该用例使用官方 ClickHouse Kafka 连接器,并使用 Confluent Cloud 的自定义连接器产品进行部署。为此,我们使用了 Google 发布并在公共 pub/sub 上提供的以太坊区块链数据集。
在本文中,我们使用 ClickPipes 简化了此架构 - ClickPipes 是最近发布的适用于 ClickHouse Cloud 的完全托管的摄取服务。这使我们能够降低架构复杂性并克服以前的一些限制。
ClickPipes
ClickPipes 是 ClickHouse Cloud 的原生功能,使用户能够连接到远程 Kafka broker 并立即开始将数据摄取到其 ClickHouse 服务中。这释放了 ClickHouse Cloud 的全部潜力,并使用户能够利用近乎实时的数据进行洞察和分析。虽然目前仅支持 Kafka,但我们计划扩展支持的数据源和系统列表,将 ClickPipes 转变为 ClickHouse Cloud 的成熟连接平台。有关更多详细信息,我们建议您阅读我们的公告帖子。
数据集
我们使用 Google 在公共项目中提供的以太坊加密货币数据集作为我们的测试数据集。
阅读此博客文章不需要任何加密货币方面的经验,但对于那些感兴趣的人,以太坊简介提供了一个有用的概述,以及 Google 关于如何构建此数据集的博客。
作为快速提醒,我们的数据集包含 4 个表。这是完整数据集的子集,但足以回答大多数常见问题
- 区块 - 区块是交易批次,其中包含链中前一个区块的哈希值。大约 1700 万行,每天增长 7 千行。
- 交易 - 交易是来自账户的加密签名指令。账户将发起交易以更新以太坊网络的状态,例如,将 ETH 从一个账户转移到另一个账户。超过 20 亿笔交易,每天新增约 100 万笔。
- 追踪 - 内部交易,允许查询所有以太坊地址及其余额。超过 70 亿条追踪记录,每天新增 500 万条。
- 合约 - “智能合约” 只是一个在以太坊区块链上运行的程序。超过 6000 万个合约,每天新增约 5 万个。
架构演变
在之前的博客文章中,我们详细探讨了此数据集,将BigQuery 与 ClickHouse 进行了比较,并提出了基于批处理的方法,以使此数据集在 ClickHouse 中保持最新。这是通过定期将数据导出从公共 BigQuery 表(通过计划查询)到 GCS,并通过计划 cron 作业将其导入到 ClickHouse 中来实现的。
虽然这在当时足以满足我们的需求,并且用于为我们的加密货币爱好者用户保持我们的公共 sql.clickhouse.com 环境的最新状态,但这在区块链和 BigQuery 上可用的数据与在 ClickHouse 中可查询的数据之间引入了令人不满意的延迟(约 30 分钟)。
幸运的是,Google 还将此数据集发布在多个公共 Pub/Sub 主题中,提供了可以使用的事件流。从以太坊区块链算起仅延迟 4 分钟,此数据源将使我们能够提供与 BigQuery 相媲美的服务。
为了构建一个强大的管道,将这些公共 Pub/Sub 主题连接到 ClickHouse,并希望最大限度地减少初始工作和未来的维护开销,使用我们的 Kafka Connect 连接器的云托管方法似乎是理想的解决方案,Confluent 托管基础设施。除了减少数据在 ClickHouse 中可用之前的延迟外,Kafka 还允许我们缓冲最多 N 天的数据,并在需要时提供重放功能。
为了实现此架构,我们还需要一种可靠的方式将消息从 Pub/Sub 发送到 Kafka。Confluent 为此目的提供了一个源连接器,该连接器也可以零代码部署。结合这些连接器可产生以下简单架构
有关实施此架构的更多详细信息,请参阅我们之前的博客文章。
虽然此架构足以满足我们的需求,但它确实要求用户对所有组件使用 Confluent Cloud。虽然这对我们的特定问题来说不是问题,但对于将自管理 Kafka 或 MSK 与 ClickHouse Cloud 结合使用的用户,ClickPipes 提供了一个更简单的解决方案。
借助 ClickPipes,我们可以进一步简化此架构。我们无需部署 Kafka 连接器来将数据发送到 ClickHouse Cloud,只需从 Kafka 主题中提取数据即可。如下图所示
介绍 ClickPipes
我们的消息由 Pub/Sub 连接器以 JSON 格式传递到我们的 Kafka 主题,每种数据类型一个主题。有关配置这些的更多详细信息,请在此处找到。
管道回顾
对于我们的新架构,我们为每个 Kafka 主题创建一个 ClickPipe,将数据发送到与之前相同的接收表。提醒一下,这些假设数据以 JSON 字符串的形式在 MessageData
列中传递。例如,对于 Blocks 数据
CREATE TABLE default.block_messages
(
`MessageData` String,
`AttributesMap` Map(String, String)
)
ENGINE = MergeTree
ORDER BY tuple()
SELECT *
FROM default.block_messages
LIMIT 1
FORMAT PrettyJSONEachRow
{
"MessageData": "{\"type\": \"block\", \"number\": 17635706, \"hash\": \"0x44886d4a33deea1b76564ac6068357ee7e167a4e2b625d47e0bd048e7592bdee\", \"parent_hash\": \"0xbabfb51d4d645081c6fb28eccebf27543de094e4bb8e31d1b884a72a0a948f9b\", \"nonce\": \"0x0000000000000000\", \"sha3_uncles\": \"0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347\", \"logs_bloom\": \"0x3061030f6188c36a303ebbb18652922694812ca9ee4b60cd10d996afc69c6c6fca611f8dd95032c057023aaf6488090d4213d28ade017aa623368462192f28f6648ac12d6618d8a9488bc46fc00985a291d600c817686c4202a4ac65956b1e25b8606480873fd9032bdcd5a04a3c1cd5a0c14c5714c0d594390455f2087ab2152f06875646c21da32253c35031024227e319a7a3998080a8c424737fd097c06ebed1837b61a8a6725a190ac099a56e0215564c1876ea669bb96a8874228c2a34cb5e340ff9a896ce002a8e47983c12c680e1132a97e954112860b71388c6ac40c9ff369205b292680a6674f47334140906ab1a0ad9488e5620397883a3ac74a5\", \"transactions_root\": \"0xe61ff16a082b53be8893e64224b0840de8f4ba246b7f2e1021227496750ce37d\", \"state_root\": \"0xdcb4abc3f10a51bb1691f5aa6b94d841360d454e44c848e04040e00d492c7a93\", \"receipts_root\": \"0x6376166e9180aa437e67b27c7119ceef073a99bcdbbbca00e5322c92661d7f4f\", \"miner\": \"0x4675c7e5baafbffbca748158becba61ef3b0a263\", \"difficulty\": 0, \"total_difficulty\": 58750003716598352816469, \"size\": 84025, \"extra_data\": \"0x6265617665726275696c642e6f7267\", \"gas_limit\": 30000000, \"gas_used\": 14672410, \"timestamp\": 1688657471, \"transaction_count\": 168, \"base_fee_per_gas\": 36613178634, \"withdrawals_root\": \"0xf213df7058a0ee7055c61c1703db0d27cfbec98a930ec0b46ae60f228aec3f16\", \"withdrawals\": [{\"index\": 9612515, \"validator_index\": 147393, \"address\": \"0x5363aedb6dcd082c77642d5bf663eabe916031f7\", \"amount\": 14340397}, {\"index\": 9612516, \"validator_index\": 147394, \"address\": \"0xbf85eb89b26f48aed0b4c28cf1281381e72bdec1\", \"amount\": 14348823}, {\"index\": 9612517, \"validator_index\": 147395, \"address\": \"0x5363aedb6dcd082c77642d5bf663eabe916031f7\", \"amount\": 14370108}, {\"index\": 9612518, \"validator_index\": 147396, \"address\": \"0x5363aedb6dcd082c77642d5bf663eabe916031f7\", \"amount\": 14389895}, {\"index\": 9612519, \"validator_index\": 147397, \"address\": \"0x5363aedb6dcd082c77642d5bf663eabe916031f7\", \"amount\": 14290983}, {\"index\": 9612520, \"validator_index\": 147398, \"address\": \"0x5363aedb6dcd082c77642d5bf663eabe916031f7\", \"amount\": 14378411}, {\"index\": 9612521, \"validator_index\": 147399, \"address\": \"0x5363aedb6dcd082c77642d5bf663eabe916031f7\", \"amount\": 14297547}, {\"index\": 9612522, \"validator_index\": 147400, \"address\": \"0x5363aedb6dcd082c77642d5bf663eabe916031f7\", \"amount\": 14272411}, {\"index\": 9612523, \"validator_index\": 147401, \"address\": \"0x5363aedb6dcd082c77642d5bf663eabe916031f7\", \"amount\": 50164441}, {\"index\": 9612524, \"validator_index\": 147402, \"address\": \"0x5363aedb6dcd082c77642d5bf663eabe916031f7\", \"amount\": 14340502}, {\"index\": 9612525, \"validator_index\": 147403, \"address\": \"0x5363aedb6dcd082c77642d5bf663eabe916031f7\", \"amount\": 14330852}, {\"index\": 9612526, \"validator_index\": 147404, \"address\": \"0x5363aedb6dcd082c77642d5bf663eabe916031f7\", \"amount\": 14398952}, {\"index\": 9612527, \"validator_index\": 147405, \"address\": \"0x5363aedb6dcd082c77642d5bf663eabe916031f7\", \"amount\": 14297302}, {\"index\": 9612528, \"validator_index\": 147406, \"address\": \"0x5363aedb6dcd082c77642d5bf663eabe916031f7\", \"amount\": 14292279}, {\"index\": 9612529, \"validator_index\": 147407, \"address\": \"0xbf85eb89b26f48aed0b4c28cf1281381e72bdec1\", \"amount\": 14275314}, {\"index\": 9612530, \"validator_index\": 147409, \"address\": \"0x5363aedb6dcd082c77642d5bf663eabe916031f7\", \"amount\": 14297649}], \"item_id\": \"block_0x44886d4a33deea1b76564ac6068357ee7e167a4e2b625d47e0bd048e7592bdee\", \"item_timestamp\": \"2023-07-06T15:31:11Z\"}",
"AttributesMap": {
"item_id": "block_0x44886d4a33deea1b76564ac6068357ee7e167a4e2b625d47e0bd048e7592bdee",
"item_timestamp": "2023-07-06T15:31:11Z"
}
}
为了将此数据提取到我们的最终表中,我们使用了一个物化视图,该视图利用了 JSONExtract 函数系列。
CREATE TABLE blocks
(
`number` UInt32 CODEC(Delta(4), ZSTD(1)),
`hash` String,
`parent_hash` String,
`nonce` String,
`sha3_uncles` String,
`logs_bloom` String,
`transactions_root` String,
`state_root` String,
`receipts_root` String,
`miner` String,
`difficulty` Decimal(38, 0),
`total_difficulty` Decimal(38, 0),
`size` UInt32 CODEC(Delta(4), ZSTD(1)),
`extra_data` String,
`gas_limit` UInt32 CODEC(Delta(4), ZSTD(1)),
`gas_used` UInt32 CODEC(Delta(4), ZSTD(1)),
`timestamp` DateTime CODEC(Delta(4), ZSTD(1)),
`transaction_count` UInt16,
`base_fee_per_gas` UInt64,
`withdrawals_root` String,
`withdrawals.index` Array(UInt64),
`withdrawals.validator_index` Array(Int64),
`withdrawals.address` Array(String),
`withdrawals.amount` Array(UInt64)
)
ENGINE = MergeTree
ORDER BY timestamp
此视图在行插入到 block_messages
表中时执行,转换数据并将结果插入到最终的 blocks
表中。我们的目标 blocks 表架构
CREATE TABLE blocks
(
`number` UInt32 CODEC(Delta(4), ZSTD(1)),
`hash` String,
`parent_hash` String,
`nonce` String,
`sha3_uncles` String,
`logs_bloom` String,
`transactions_root` String,
`state_root` String,
`receipts_root` String,
`miner` String,
`difficulty` Decimal(38, 0),
`total_difficulty` Decimal(38, 0),
`size` UInt32 CODEC(Delta(4), ZSTD(1)),
`extra_data` String,
`gas_limit` UInt32 CODEC(Delta(4), ZSTD(1)),
`gas_used` UInt32 CODEC(Delta(4), ZSTD(1)),
`timestamp` DateTime CODEC(Delta(4), ZSTD(1)),
`transaction_count` UInt16,
`base_fee_per_gas` UInt64,
`withdrawals_root` String,
`withdrawals.index` Array(UInt64),
`withdrawals.validator_index` Array(Int64),
`withdrawals.address` Array(String),
`withdrawals.amount` Array(UInt64)
)
ENGINE = MergeTree
ORDER BY timestamp
相同的过程用于我们的数据类型、追踪、交易和合约。
配置 ClickPipes
配置 ClickPipes 很简单。首先,我们需要从我们的 Confluent 集群获取 API 密钥和端点。请注意我们如何在下面记录引导服务器地址。
现在,我们可以使用记录的这些凭据和端点从云控制台创建 ClickPipe。下面我们为 blocks 数据集创建一个 ClickPipe,从 Confluent Cloud 中的 block_messages
Kafka 主题消费消息,以插入到同名的表中。选择 Confluent Cloud 作为源时,请注意以下提示,我们希望在未来的迭代中改进这些提示
- 使用 API 密钥和密钥作为登录名和密码
- 确保将消费者组设置为唯一的字符串。在未来的版本中,我们可能会预先填充此设置。
- 使用之前复制的引导端点作为服务器地址。
如图所示,我们将 Kafka 主题映射到现有表 - 后者在我们的例子中已预先创建。用户也可以根据需要创建新表,将消息字段映射到不同名称的列(如果需要)。
结论
在这篇博客文章中,我们演示了新的 ClickHouse Cloud 功能 ClickPipes,以及如何使用它来简化之前一篇博客文章中介绍的流处理架构。ClickPipes 是 ClickHouse Cloud 的原生功能,目前处于私有预览阶段。有兴趣试用 ClickPipes 的用户可以在此处加入我们的候补名单。
您可以按照文档中的说明创建您的第一个 ClickPipes。更多详情请参阅