DoubleCloud 即将停止服务。迁移到 ClickHouse,享受限时免费迁移服务。立即联系我们 ->->

博客 / 工程

使用 ClickHouse、Confluent Cloud 和 ClickPipes 进行实时事件流处理

author avatar
Dale McDiarmid
2023年7月20日

简介

在最近的一篇博文中,我们探讨了一个使用官方 ClickHouse Kafka 连接器(使用 Confluent Cloud 的自定义连接器产品部署)的实时流式处理用例。为此,我们使用了 Google 发布并在公共发布/订阅上提供的以太坊区块链数据集。

在这篇博文中,我们将使用 ClickPipes(最近发布的 ClickHouse Cloud 全托管数据摄取服务)简化此架构。这使我们能够降低架构复杂性并克服之前的一些限制。

ClickPipes

ClickPipes 是 ClickHouse Cloud 的一项原生功能,允许用户连接到远程 Kafka 代理并立即开始将数据摄取到他们的 ClickHouse 服务中。这释放了 ClickHouse Cloud 的全部潜力,并使用户能够利用近实时数据进行洞察和分析。虽然目前仅支持 Kafka,但我们计划扩展支持的数据源和系统列表,将 ClickPipes 变成 ClickHouse Cloud 的完整连接平台。有关更多详细信息,我们建议您查看我们的公告博文

数据集

我们使用 Google 在一个公共项目中提供的以太坊加密货币数据集作为我们的测试数据集。

阅读这篇博文不需要事先了解加密货币,但对于感兴趣的人来说,以太坊简介 提供了一个有用的概述,以及 Google 关于如何构建此数据集 的博文。

简单提醒一下,我们的数据集包含 4 个表。这是完整数据的子集,但足以满足大多数常见问题

  • **区块 (Blocks)** - 区块是一批交易,包含链中前一个区块的哈希值。大约 1700 万行,每天增长 7k 行。
  • **交易 (Transactions)** - 交易是从账户发出的加密签名指令。账户将发起交易以更新以太坊网络的状态,例如,将 ETH 从一个账户转移到另一个账户。超过 20 亿行,每天新增约 100 万行。
  • **跟踪 (Traces)** - 允许查询所有以太坊地址及其余额的内部交易。超过 70 亿行,每天新增约 500 万行。
  • **合约 (Contracts)** - “智能合约”只是在以太坊区块链上运行的程序。超过 6000 万行,每天新增约 50k 行。

架构演变

在之前的博文中,我们详细探讨了此数据集,将BigQuery 与 ClickHouse 进行比较,其中我们提出了一个基于批处理的方法来保持此数据集在 ClickHouse 中的最新状态。这是通过定期从公共 BigQuery 表(通过计划查询)导出数据 到 GCS 并将其导入 ClickHouse(通过简单的计划 cron 作业)来实现的。

Scheduled Export.png

虽然这在当时足以满足我们的需求,并且用于使我们的公共sql.clickhouse.com 环境对于我们的加密爱好者用户保持最新状态,但它在数据在区块链和 BigQuery 上可用以及在 ClickHouse 中可查询之间引入了令人不满意的延迟(大约 30 分钟)。

幸运的是,Google 还将此数据集提供在几个公共 Pub/Sub 主题 中,提供了可供消费的事件流。从以太坊区块链开始,此数据源的延迟仅为 4 分钟,这将使我们能够提供与 BigQuery 相当的服务。

我们需要一个强大的管道将这些公共 Pub/Sub 主题连接到 ClickHouse,并且希望最大限度地减少初始工作和任何未来的维护开销,使用我们的 Kafka Connect 连接器的云托管方法似乎是使用 Confluent 托管基础设施的理想解决方案。除了减少数据在 ClickHouse 中可用之前的延迟外,Kafka 还允许我们缓冲长达 N 天的数据,并在需要时提供重放功能。

为了实现此架构,我们还需要一种可靠的方法将消息从 Pub/Sub 发送到 Kafka。Confluent 为此目的提供了一个源连接器,该连接器也可以零代码部署。结合这些连接器会产生以下简单的架构

Pub Sub Export(1).png

有关实施此架构的更多详细信息,请参阅我们的上一篇博文

虽然此架构足以满足我们的需求,但它确实要求用户对所有组件使用 Confluent Cloud。虽然这对我们特定的问题没有问题,但对于使用自管理 Kafka 或 MSK 与 ClickHouse Cloud 的用户来说,ClickPipes 提供了一个更简单的解决方案。

借助 ClickPipes,我们可以进一步简化此架构。与其部署 Kafka 连接器将数据发送到 ClickHouse Cloud,我们可以简单地从 Kafka 主题中提取数据。如下所示

Pub Sub Export simplified(1).png

介绍 ClickPipes

我们的消息由 Pub/Sub 连接器以 JSON 格式传递到我们的 Kafka 主题,每个数据类型一个主题。有关配置这些内容的更多详细信息,请参阅此处

管道回顾

对于我们的新架构,我们为每个 Kafka 主题创建一个 ClickPipe,将数据发送到与之前相同的接收表。提醒一下,这些假设数据以名为 MessageData 的列中作为 JSON 字符串传递。例如,对于区块数据

CREATE TABLE default.block_messages
(
	`MessageData` String,
	`AttributesMap` Map(String, String)
)
ENGINE = MergeTree
ORDER BY tuple()

SELECT *
FROM default.block_messages
LIMIT 1
FORMAT PrettyJSONEachRow
{
    "MessageData": "{\"type\": \"block\", \"number\": 17635706, \"hash\": \"0x44886d4a33deea1b76564ac6068357ee7e167a4e2b625d47e0bd048e7592bdee\", \"parent_hash\": \"0xbabfb51d4d645081c6fb28eccebf27543de094e4bb8e31d1b884a72a0a948f9b\", \"nonce\": \"0x0000000000000000\", \"sha3_uncles\": \"0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347\", \"logs_bloom\": \"0x3061030f6188c36a303ebbb18652922694812ca9ee4b60cd10d996afc69c6c6fca611f8dd95032c057023aaf6488090d4213d28ade017aa623368462192f28f6648ac12d6618d8a9488bc46fc00985a291d600c817686c4202a4ac65956b1e25b8606480873fd9032bdcd5a04a3c1cd5a0c14c5714c0d594390455f2087ab2152f06875646c21da32253c35031024227e319a7a3998080a8c424737fd097c06ebed1837b61a8a6725a190ac099a56e0215564c1876ea669bb96a8874228c2a34cb5e340ff9a896ce002a8e47983c12c680e1132a97e954112860b71388c6ac40c9ff369205b292680a6674f47334140906ab1a0ad9488e5620397883a3ac74a5\", \"transactions_root\": \"0xe61ff16a082b53be8893e64224b0840de8f4ba246b7f2e1021227496750ce37d\", \"state_root\": \"0xdcb4abc3f10a51bb1691f5aa6b94d841360d454e44c848e04040e00d492c7a93\", \"receipts_root\": \"0x6376166e9180aa437e67b27c7119ceef073a99bcdbbbca00e5322c92661d7f4f\", \"miner\": \"0x4675c7e5baafbffbca748158becba61ef3b0a263\", \"difficulty\": 0, \"total_difficulty\": 58750003716598352816469, \"size\": 84025, \"extra_data\": \"0x6265617665726275696c642e6f7267\", \"gas_limit\": 30000000, \"gas_used\": 14672410, \"timestamp\": 1688657471, \"transaction_count\": 168, \"base_fee_per_gas\": 36613178634, \"withdrawals_root\": \"0xf213df7058a0ee7055c61c1703db0d27cfbec98a930ec0b46ae60f228aec3f16\", \"withdrawals\": [{\"index\": 9612515, \"validator_index\": 147393, \"address\": \"0x5363aedb6dcd082c77642d5bf663eabe916031f7\", \"amount\": 14340397}, {\"index\": 9612516, \"validator_index\": 147394, \"address\": \"0xbf85eb89b26f48aed0b4c28cf1281381e72bdec1\", \"amount\": 14348823}, {\"index\": 9612517, \"validator_index\": 147395, \"address\": \"0x5363aedb6dcd082c77642d5bf663eabe916031f7\", \"amount\": 14370108}, {\"index\": 9612518, \"validator_index\": 147396, \"address\": \"0x5363aedb6dcd082c77642d5bf663eabe916031f7\", \"amount\": 14389895}, {\"index\": 9612519, \"validator_index\": 147397, \"address\": \"0x5363aedb6dcd082c77642d5bf663eabe916031f7\", \"amount\": 14290983}, {\"index\": 9612520, \"validator_index\": 147398, \"address\": \"0x5363aedb6dcd082c77642d5bf663eabe916031f7\", \"amount\": 14378411}, {\"index\": 9612521, \"validator_index\": 147399, \"address\": \"0x5363aedb6dcd082c77642d5bf663eabe916031f7\", \"amount\": 14297547}, {\"index\": 9612522, \"validator_index\": 147400, \"address\": \"0x5363aedb6dcd082c77642d5bf663eabe916031f7\", \"amount\": 14272411}, {\"index\": 9612523, \"validator_index\": 147401, \"address\": \"0x5363aedb6dcd082c77642d5bf663eabe916031f7\", \"amount\": 50164441}, {\"index\": 9612524, \"validator_index\": 147402, \"address\": \"0x5363aedb6dcd082c77642d5bf663eabe916031f7\", \"amount\": 14340502}, {\"index\": 9612525, \"validator_index\": 147403, \"address\": \"0x5363aedb6dcd082c77642d5bf663eabe916031f7\", \"amount\": 14330852}, {\"index\": 9612526, \"validator_index\": 147404, \"address\": \"0x5363aedb6dcd082c77642d5bf663eabe916031f7\", \"amount\": 14398952}, {\"index\": 9612527, \"validator_index\": 147405, \"address\": \"0x5363aedb6dcd082c77642d5bf663eabe916031f7\", \"amount\": 14297302}, {\"index\": 9612528, \"validator_index\": 147406, \"address\": \"0x5363aedb6dcd082c77642d5bf663eabe916031f7\", \"amount\": 14292279}, {\"index\": 9612529, \"validator_index\": 147407, \"address\": \"0xbf85eb89b26f48aed0b4c28cf1281381e72bdec1\", \"amount\": 14275314}, {\"index\": 9612530, \"validator_index\": 147409, \"address\": \"0x5363aedb6dcd082c77642d5bf663eabe916031f7\", \"amount\": 14297649}], \"item_id\": \"block_0x44886d4a33deea1b76564ac6068357ee7e167a4e2b625d47e0bd048e7592bdee\", \"item_timestamp\": \"2023-07-06T15:31:11Z\"}",
    "AttributesMap": {
        "item_id": "block_0x44886d4a33deea1b76564ac6068357ee7e167a4e2b625d47e0bd048e7592bdee",
        "item_timestamp": "2023-07-06T15:31:11Z"
    }
}

为了将此数据提取到我们的最终表中,我们使用了一个物化视图,该视图利用了JSONExtract 函数系列

CREATE TABLE blocks
(
	`number` UInt32 CODEC(Delta(4), ZSTD(1)),
	`hash` String,
	`parent_hash` String,
	`nonce` String,
	`sha3_uncles` String,
	`logs_bloom` String,
	`transactions_root` String,
	`state_root` String,
	`receipts_root` String,
	`miner` String,
	`difficulty` Decimal(38, 0),
	`total_difficulty` Decimal(38, 0),
	`size` UInt32 CODEC(Delta(4), ZSTD(1)),
	`extra_data` String,
	`gas_limit` UInt32 CODEC(Delta(4), ZSTD(1)),
	`gas_used` UInt32 CODEC(Delta(4), ZSTD(1)),
	`timestamp` DateTime CODEC(Delta(4), ZSTD(1)),
	`transaction_count` UInt16,
	`base_fee_per_gas` UInt64,
	`withdrawals_root` String,
	`withdrawals.index` Array(UInt64),
	`withdrawals.validator_index` Array(Int64),
	`withdrawals.address` Array(String),
	`withdrawals.amount` Array(UInt64)
)
ENGINE = MergeTree
ORDER BY timestamp

此视图在将行插入到表 block_messages 时执行,转换数据并将结果插入到最终的 blocks 表中。我们的目标区块表架构

CREATE TABLE blocks
(
	`number` UInt32 CODEC(Delta(4), ZSTD(1)),
	`hash` String,
	`parent_hash` String,
	`nonce` String,
	`sha3_uncles` String,
	`logs_bloom` String,
	`transactions_root` String,
	`state_root` String,
	`receipts_root` String,
	`miner` String,
	`difficulty` Decimal(38, 0),
	`total_difficulty` Decimal(38, 0),
	`size` UInt32 CODEC(Delta(4), ZSTD(1)),
	`extra_data` String,
	`gas_limit` UInt32 CODEC(Delta(4), ZSTD(1)),
	`gas_used` UInt32 CODEC(Delta(4), ZSTD(1)),
	`timestamp` DateTime CODEC(Delta(4), ZSTD(1)),
	`transaction_count` UInt16,
	`base_fee_per_gas` UInt64,
	`withdrawals_root` String,
	`withdrawals.index` Array(UInt64),
	`withdrawals.validator_index` Array(Int64),
	`withdrawals.address` Array(String),
	`withdrawals.amount` Array(UInt64)
)
ENGINE = MergeTree
ORDER BY timestamp

此过程也用于我们的数据类型、跟踪、交易和合约。

mv_flow.png

配置 ClickPipes

配置 ClickPipes 很简单。首先,我们需要从 Confluent 集群中获取 API 密钥和端点。请注意我们如何在下面记录 bootstrap 服务器地址。

Markdown Image

现在,我们可以使用这些已记录的凭据和端点从 Cloud Console 创建 ClickPipe。下面我们为区块数据集创建了一个 ClickPipe,它从 Confluent Cloud 中的 block_messages Kafka 主题中消费消息,以插入到同名的表中。在选择 Confluent Cloud 作为源时,请注意以下提示,我们希望在将来的迭代中对其进行改进

  • 使用 API 密钥和密钥作为登录名和密码
  • 确保将消费者组设置为唯一的字符串。在将来的版本中,我们可能会预填充此设置。
  • 使用之前复制的 bootstrap 端点作为服务器地址。

Markdown Image

如所示,我们将 Kafka 主题映射到现有的表 - 在我们的案例中,该表已预先创建。用户也可以根据需要创建新表,并将消息字段映射到不同名称的列。

结论

在这篇博文中,我们演示了 ClickHouse Cloud 的新功能 ClickPipes,以及如何使用它简化先前博文中介绍的流式架构。ClickPipes 是 ClickHouse Cloud 的原生功能,目前处于私人预览阶段。有兴趣试用 ClickPipes 的用户可以在此处加入我们的候补名单

您可以按照文档中的说明创建您的第一个 ClickPipes。更多详细信息请参阅

立即开始使用 ClickHouse Cloud 并获得 300 美元的信用额度。在您的 30 天试用期结束时,您可以继续使用按需付费计划,或联系我们以了解有关我们基于用量的折扣的更多信息。请访问我们的定价页面了解详细信息。

分享此文章

订阅我们的时事通讯

随时了解功能发布、产品路线图、支持和云产品信息!
正在加载表单…
关注我们
Twitter imageSlack imageGitHub image
Telegram imageMeetup imageRss image