DoubleCloud 即将停止运营。迁移到 ClickHouse,享受限时免费迁移服务。立即联系我们 ->->

博客 / 工程

宣布推出新的 ClickHouse Kafka 连接器

author avatar
ClickHouse 团队
2023年1月12日

立即开始使用 ClickHouse Cloud 并获得 300 美元的信用额度。要了解有关我们基于用量的折扣的更多信息,请联系我们或访问我们的价格页面

Kafka-Connector.png

作为一家植根于开源的企业,我们相信赋予用户协作和尝试新功能的机会。本着这种精神,我们宣布开源 ClickHouse 的 Kafka Connect Sink 的 Beta 版发布,并邀请我们的社区开始测试并提供有关设计和可能改进方面的反馈。下面我们将讨论开发此连接器的理由以及我们如何建议解决 **精确一次** 传输语义的问题。

Apache Kafka 是一个无处不在的开源分布式事件流平台,数千家公司将其用于高性能数据管道、流式分析、数据集成和关键任务应用程序。

ClickHouse 和 Kafka 是互补的,用户需要将基于 Kafka 的数据插入到 ClickHouse 中以进行大规模分析。此问题已存在现有的解决方案,因此让我们深入探讨一下我们为什么构建另一个连接器。

为什么要构建另一个解决方案?

在探索当前的 ClickHouse-Kafka 环境之前,让我们提醒自己精确一次传输究竟意味着什么,这与其他选项有何关系,以及何时可能适合使用每个选项。

传输语义

Apache Kafka 支持三种消息传输语义,此处按实现复杂性升序排列

  • **最多一次**: 消息仅传递一次或根本不传递。
    此方案通过将消息开销降至最低,优先考虑性能和吞吐量而不是数据一致性。例如,它可能适用于物联网领域的大规模日志和指标收集部署。在这种情况下,即使某些事件丢失,仍然可以从统计上显著的观察结果中得出一些结论。
  • **至少一次**: 消息可以传递一次或多次,并保证永远不会丢失。这种方法代表了一种有趣的折衷方案,其中对重复项的容忍度降低了操作复杂性。然后,数据存储需要在查询层或通过提供重复数据删除功能来进行补偿,以确保重复项不会影响业务结论。这是迄今为止许多用例(如可观察性)中最常见的方法。
  • **精确一次**: 消息将始终仅传递一次。这种方法对于诸如财务分析之类的关键业务应用程序至关重要,在这些应用程序中,准确性不能受到影响,并且接收系统无法进行重复数据删除。由于需要跟踪许多移动组件以便能够从部分状态恢复摄取,因此它会带来很大的操作开销。

ClickHouse - Kafka 环境

ClickHouse 已支持 多种技术 来实现从 Kafka 中摄取数据,每种技术都有其各自的优缺点。

Kafka 表引擎 提供了 ClickHouse 的原生集成,可用于将数据从 Kafka 插入到 ClickHouse,反之亦然。作为另一个表引擎,其架构简单性吸引了入门用户,因为不需要额外的组件。但是,它也存在一些缺点。调试错误和检查当前行为可能具有挑战性,尽管我们计划进行改进。此外,它会给 ClickHouse 集群带来额外的负载,并且要求用户在常规插入和查询负载的上下文中考虑这一点。因此,在架构上,我们经常看到用户希望将这些任务分开。最重要的是,作为一种拉取架构,它要求您的 ClickHouse 集群与 Kafka 之间具有双向连接 - 如果存在网络隔离(例如,ClickHouse 位于云端,而 Kafka 是自管理的),您可能不愿出于合规性和安全原因允许双向连接。

对于那些需要推送架构和/或希望分离架构关注点的人来说,VectorKafka Connect 也是现有的选项。

Vector 使用其 Kafka 输入ClickHouse 输出,是一个极好的解决方案,但它是一个专注于可观察性的工具,并不适用于所有用例。

Kafka Connect 是 Apache Kafka 的一个免费开源组件,充当数据库、键值存储、搜索索引和文件系统之间简单数据集成的集中式数据中心。此框架支持两种类型的连接器:接收器(从 Kafka 到目标)和源(从源到 Kafka)。对于 ClickHouse,HTTP 和 JDBC 连接器可用于与 ClickHouse 集成。这些连接器再次带来了挑战。JDBC 连接器 既是接收器又是源,并且在 社区许可证 下分发。但是,目前不支持 ClickHouse 方言,这意味着它仅适用于基本的 ClickHouse 类型,例如 Int32。相反,HTTP 接收器 通过 JSON 和 ClickHouse HTTP 接口支持所有类型,但它是商业许可的。

最重要的是,以上所有方法充其量只能提供至少一次传输。

我们的方法

最终,我们希望为用户提供一个支持所有 **ClickHouse 类型** 和 **精确一次传输** 语义的 **基于推送** 的连接器。不幸的是,至少一次传输通常是在实现 Kafka 连接器时的基本设计决策。虽然我们可以通过增强现有连接器(例如,向 JDBC 接收器添加方言支持)来解决一些早期的挑战,但这并不能解决精确一次的要求。

面对上述所有挑战,我们决定构建一个新的连接器。我们需要决定是创建一个单独的组件还是使用现有的框架。鉴于 Kafka Connect 框架的普及及其在 MSKConfluent Cloud 中的支持,这两者在我们用户中越来越受欢迎,因此我们决定为 Kafka Connect 框架构建一个新的连接器,并为 ClickHouse Cloud 提供一流的支持。

clickhouse-kafka-connect.png

在我们所有的需求中,实现精确一次语义(exactly-once delivery)提出了最大的挑战。在我们讨论如何实现这一目标之前,让我们先回顾一下至少一次语义(at-least-once semantics)是如何有时会导致重复数据的。这些原因通常与连接器特定,但通常可以归纳为两种模式,这两种模式都与如何从 Kafka 中消费消息以及跟踪消息队列中当前位置(偏移量)的方法有关。

  • 一个 消费者 处理 Kafka 消息,将其发送到 ClickHouse 并提交偏移量,然后崩溃并丢失其内存中的偏移量。在这种情况下,Kafka 已配置为 自动提交偏移量(默认设置),但尚未有机会执行从消费者接收到的偏移量的提交操作(这是周期性的)。消费者重新启动,因此它会从上次提交的偏移量(它已经消费过)开始传递消息。
  • 消费者使用提交 API(禁用自动提交)并负责在 Kafka 中提交偏移量。它处理一条 Kafka 消息,将其发送到 ClickHouse,但在向 Kafka 提交偏移量之前崩溃。重新启动后,Kafka 从上次偏移量传递消息,导致重复的消息发送到 ClickHouse。

请注意,这些情况假设偏移量在 Kafka 中进行跟踪。确切的原因通常取决于集成、偏移量提交策略使用的 API。更多阅读信息请访问 此处

一个 解决此问题的常见方法 是在目标数据存储中管理您的偏移量。这可能涉及多种方法,通常取决于目标数据存储的属性。例如,如果数据存储提供 ACID 事务,则连接器可以将偏移量与消息一起提交。在没有 ACID 事务的情况下,使用不同存储来存储偏移量的两阶段提交可能是可行的。通常,这些方法会带来开销并降低吞吐量。

高级设计

在考虑解决方案时,我们希望找到一个依赖项最少、架构简单且利用 ClickHouse 现有功能的方案。我们的完整设计可以在这里找到 此处。随着我们增加测试并转向普遍可用性,我们欢迎您的反馈。

简而言之,我们实现精确一次传递的设计依赖于利用 ClickHouse 的插入去重功能,确保我们始终使用状态机为插入制定一致的批次,并使用 Kafka Connect API 开发连接器,以便在发生故障时始终接收重复数据。这种方法通过保证重复记录的去重来增强 Kafka Connect 的至少一次语义,从而实现精确一次传递。

kafka-connect-clickhouse-architecture.png

使用 ClickHouse Keeper 和新的表引擎

ClickHouse Keeper 为与 ClickHouse 集群协调系统相关联的数据提供强一致性存储,并且对于允许 ClickHouse 作为分布式系统发挥作用至关重要。这支持诸如 数据复制分布式 DDL 查询执行、领导者选举和服务发现等服务。ClickHouse Keeper 与 ZooKeeper 兼容,ZooKeeper 是 ClickHouse 中用于此功能的旧组件。与 Zookeeper 一样,ClickHouse Keeper 支持 写入的线性化读取的顺序一致性。但是,它比 Zookeeper 具有明显的优势,主要是压缩日志、轻量级安装、更低的内存消耗(没有 JVM),甚至可以选择读取的线性化。这些特性非常适合在需要高度一致性存储时持久化少量数据。

我们提出的连接器设计要求连接器在具有顺序一致性和线性化写入的强一致性存储中存储状态。最初,我们考虑过 ClickHouse,但很快因为几个原因而放弃了它。首先,ClickHouse 默认情况下不是强一致性的,仅提供最终一致性复制。但是,通过仔细配置,您可以确保 插入的线性化SELECT 的顺序一致性,用于 复制表。但是,这种 ClickHouse 配置会增加大量的插入延迟,主要是由于与 ClickHouse Keeper 通信以协调写入和后续数据复制而导致的。这种设计实际上添加了一个冗余组件和不必要的开销——ClickHouse 表存储。鉴于我们只需要存储最少的状态,因此直接使用 ClickHouse Keeper 似乎是解决这些需求的完美解决方案。

这种方法的挑战在于此组件通常不会在集群中公开。例如,它在 ClickHouse Cloud 中未公开,并且应仔细控制其访问和使用,以免影响集群操作和稳定性。与 ClickHouse 核心团队合作,我们决定通过表引擎(KeeperMap 引擎)以受控的方式公开 ClickHouse Keeper(在需要线性化插入和顺序一致性的情况下)。这为我们提供了一种集成且轻量级的方法来存储我们的状态。

请注意,您可以在没有 KeeperMap 的情况下使用内存模式测试连接器。这仅用于测试,并且在发生故障时不保证精确一次传递。

下一步是什么?

在未来几个月里,我们计划在各种故障场景下对连接器进行广泛测试。一旦我们对设计和实现充满信心,并从您(我们的用户)那里收集到反馈,连接器将普遍可用。

尽管处于测试阶段,但连接器已经功能丰富,支持 大多数 ClickHouse 类型(包括数组和映射),并允许带或不带架构插入数据。在没有架构的情况下,数据在插入之前先转换为 JSON。对于具有架构的数据,我们使用 RowBinary 格式以获得最佳性能。这两种方法都使用 ClickHouse HTTP 接口,并且我们持续针对 ClickHouse Cloud 进行测试。

最后,除了某些 当前限制 之外,我们还计划添加 对删除的支持,允许 使用 Redis 作为状态存储 以及 JSON 类型

结论

在这篇博文中,我们探讨了我们如何以及为什么为 ClickHouse 构建新的 Kafka 连接器。我们解释了我们提出的实现精确一次传递语义的方法,以及它是如何克服现有解决方案的局限性的。尝试新的连接器,我们欢迎您的反馈!

立即开始使用 ClickHouse Cloud 并获得 300 美元的积分。在 30 天试用期结束时,继续使用按需付费计划,或 联系我们 了解有关我们基于容量的折扣的更多信息。访问我们的 定价页面 以获取详细信息。

分享此帖子

订阅我们的新闻通讯

随时了解功能发布、产品路线图、支持和云产品信息!
正在加载表单...
关注我们
Twitter imageSlack imageGitHub image
Telegram imageMeetup imageRss image