跳至主要内容

将 Kafka 集成到 ClickHouse Cloud

先决条件

您已熟悉 ClickPipes 简介

创建您的第一个 Kafka ClickPipe

  1. 访问 ClickHouse Cloud 服务的 SQL 控制台。

    ClickPipes service

  2. 选择左侧菜单中的“数据源”按钮,然后单击“设置 ClickPipe”。

    Select imports

  3. 选择您的数据源。

    Select data source type

  4. 填写表单,为您的 ClickPipe 提供名称、说明(可选)、凭据和其他连接详细信息。

    Fill out connection details

  5. 配置模式注册表。Avro 流需要有效的模式,JSON 可选。此模式将用于解析 AvroConfluent 或验证所选主题上的 JSON 消息。

  • 无法解析的 Avro 消息或验证失败的 JSON 消息将生成错误。
  • 模式注册表的“根”路径。例如,Confluent Cloud 模式注册表 URL 只是一个没有路径的 HTTPS url,例如 https://test-kk999.us-east-2.aws.confluent.cloud 如果仅指定根路径,则用于确定步骤 4 中列名和类型的模式将由采样 Kafka 消息中嵌入的 ID 确定。
  • 路径 /schemas/ids/[ID] 到模式文档,通过数字模式 ID。使用模式 ID 的完整 url 将是 https://registry.example.com/schemas/ids/1000
  • 路径 /subjects/[subject_name] 到模式文档,通过主题名称。可以选择通过将 /versions/[version] 附加到 url 来引用特定版本(否则 ClickPipes 将检索最新版本)。使用模式主题的完整 url 将是 https://registry.example.com/subjects/eventshttps://registry/example.com/subjects/events/versions/4

请注意,在所有情况下,如果消息中嵌入的模式 ID 指示,ClickPipes 将自动从注册表中检索更新的或不同的模式。如果消息在没有嵌入模式 ID 的情况下写入,则必须指定特定的模式 ID 或主题才能解析所有消息。

  1. 选择您的主题,UI 将显示来自该主题的示例文档。

    Set data format and topic

  2. 在下一步中,您可以选择是要将数据导入新的 ClickHouse 表还是重用现有的表。按照屏幕中的说明修改您的表名、模式和设置。您可以在顶部的示例表中查看更改的实时预览。

    Set table, schema, and settings

    您还可以使用提供的控件自定义高级设置。

    Set advanced controls

  3. 或者,您可以决定将数据导入现有的 ClickHouse 表。在这种情况下,UI 将允许您将源中的字段映射到所选目标表中的 ClickHouse 字段。

    Use and existing table

  4. 最后,您可以为内部 clickpipes 用户配置权限。

    **权限:**ClickPipes 将创建一个专用于将数据写入目标表的专用用户。您可以使用自定义角色或预定义角色之一为该内部用户选择角色。

    • 完全访问权限:对集群拥有完全访问权限。如果您在目标表中使用物化视图或字典,这可能很有用。
    • 仅目标表:仅对目标表拥有 INSERT 权限。

    permissions

  5. 通过单击“完成设置”,系统将注册您的 ClickPipe,您将能够在汇总表中看到它。

    Success notice

    Remove notice

    汇总表提供控件以显示来自 ClickHouse 中源表或目标表的示例数据。

    View destination

    以及用于删除 ClickPipe 并显示摄取作业摘要的控件。

    View overview

  6. **恭喜!**您已成功设置了第一个 ClickPipe。如果这是一个流式 ClickPipe,它将持续运行,实时从远程数据源摄取数据。

支持的数据源

名称徽标类型状态描述
Apache Kafka流式稳定配置 ClickPipes 并开始将来自 Apache Kafka 的流式数据摄取到 ClickHouse Cloud 中。
Confluent Cloud流式稳定通过我们的直接集成释放 Confluent 和 ClickHouse Cloud 的强大功能。
RedpandaRedpanda logo流式稳定配置 ClickPipes 并开始将来自 RedPanda 的流式数据摄取到 ClickHouse Cloud 中。
AWS MSK流式稳定配置 ClickPipes 并开始将来自 AWS MSK 的流式数据摄取到 ClickHouse Cloud 中。
Azure 事件中心流式稳定配置 ClickPipes 并开始将来自 Azure 事件中心的流式数据摄取到 ClickHouse Cloud 中。
WarpStream流式稳定配置 ClickPipes 并开始将来自 WarpStream 的流式数据摄取到 ClickHouse Cloud 中。

将添加更多连接器到 ClickPipes,您可以通过 联系我们 了解更多信息。

支持的数据格式

支持的格式为

支持的数据类型

ClickPipes 目前支持以下 ClickHouse 数据类型

  • 基本数字类型 -[U]Int8/16/32/64 和 Float32/64
  • 大整数类型 -[U]Int128/256
  • 十进制类型
  • 布尔值
  • 字符串
  • FixedString
  • 日期,Date32
  • DateTime,DateTime64(仅限 UTC 时区)
  • Enum8/Enum16
  • UUID
  • IPv4
  • IPv6
  • 所有 ClickHouse LowCardinality 类型
  • 使用上述任何类型(包括可为空类型)的键和值的映射
  • 使用上述任何类型(包括可为空类型,仅限一层深度)的元组和数组

Avro

支持的 Avro 数据类型

ClickPipes 支持所有 Avro 基本类型和复杂类型,以及所有 Avro 逻辑类型,除了 time-millistime-microslocal-timestamp-millislocal_timestamp-microsduration。Avro record 类型转换为元组,array 类型转换为数组,map 转换为映射(仅限字符串键)。通常,列出的转换 此处 可用。我们建议对 Avro 数字类型使用精确类型匹配,因为 ClickPipes 不会检查类型转换时的溢出或精度损失。

可为空类型和 Avro 并集

Avro 中的可为空类型通过使用 (T, null)(null, T) 的并集模式来定义,其中 T 是基本 Avro 类型。在模式推断期间,此类并集将映射到 ClickHouse“可为空”列。请注意,ClickHouse 不支持 Nullable(Array)Nullable(Map)Nullable(Tuple) 类型。这些类型的 Avro 空并集将映射到不可为空版本(Avro 记录类型映射到 ClickHouse 命名元组)。这些类型的 Avro“空值”将插入为

  • 对于空 Avro 数组,则为空数组
  • 对于空 Avro 映射,则为空映射
  • 对于空 Avro 记录,则为所有默认值/零值的命名元组

ClickPipes 目前不支持包含其他 Avro 并集的模式(这可能会随着新的 ClickHouse 变体和 JSON 数据类型的成熟而在将来发生变化)。如果 Avro 模式包含“非空”并集,则 ClickPipes 在尝试计算 Avro 模式和 Clickhouse 列类型之间的映射时会生成错误。

Avro 模式管理

ClickPipes 使用嵌入在每个消息/事件中的模式 ID 从配置的模式注册表动态检索和应用 Avro 模式。模式更新会自动检测和处理。

目前,ClickPipes 仅与使用 Confluent 模式注册表 API 的模式注册表兼容。除了 Confluent Kafka 和 Cloud 之外,这还包括 RedPanda、AWS MSK 和 Upstash 模式注册表。ClickPipes 目前与 AWS Glue 模式注册表或 Azure 模式注册表不兼容(即将推出)。

以下规则适用于检索到的 Avro 模式和 ClickHouse 目标表之间的映射

  • 如果 Avro 模式包含 ClickHouse 目标映射中未包含的字段,则忽略该字段。
  • 如果 Avro 模式缺少 ClickHouse 目标映射中定义的字段,则 ClickHouse 列将填充“零”值,例如 0 或空字符串。请注意,目前 ClickPipes 插入操作不会评估DEFAULT 表达式(这是 ClickHouse 服务器默认处理的临时限制,等待更新)。
  • 如果 Avro 模式字段和 ClickHouse 列不兼容,则该行/消息的插入将失败,并且错误将记录在 ClickPipes 错误表中。请注意,支持一些隐式转换(例如数值类型之间的转换),但并非所有转换都支持(例如,无法将 Avro 的 record 字段插入到 ClickHouse 的 Int32 列中)。

Kafka 虚拟列

以下虚拟列适用于 Kafka 兼容的流数据源。创建新的目标表时,可以使用“添加列”按钮添加虚拟列。

名称描述推荐的数据类型
_keyKafka 消息键字符串
_timestampKafka 时间戳(毫秒精度)DateTime64(3)
_partitionKafka 分区Int32
_offsetKafka 偏移量Int64
_topicKafka 主题字符串
_header_keys记录头中键的并行数组Array(String)
_header_values记录头中头的并行数组Array(String)
_raw_message完整的 Kafka 消息字符串

请注意,_raw_message 列仅推荐用于 JSON 数据。对于仅需要 JSON 字符串的用例(例如,使用 ClickHouse 的 JsonExtract* 函数填充下游物化视图),删除所有“非虚拟”列可能会提高 ClickPipes 的性能。

限制

交付语义

ClickPipes for Kafka 提供了 至少一次 的交付语义(作为最常用的方法之一)。我们非常乐意听取您关于交付语义的反馈,请访问 联系表单。如果您需要恰好一次的语义,我们建议您使用我们官方的 clickhouse-kafka-connect 接收器。

身份验证

对于 Apache Kafka 协议数据源,ClickPipes 支持使用 TLS 加密的 SASL/PLAIN 身份验证,以及 SASL/SCRAM-SHA-256SASL/SCRAM-SHA-512。根据流源(Redpanda、MSK 等),将启用所有或这些身份验证机制的子集,具体取决于兼容性。如果您有不同的身份验证需求,请 提供反馈

IAM

AWS MSK 身份验证目前仅支持 SASL/SCRAM-SHA-512 身份验证。

自定义证书

ClickPipes for Kafka 支持为使用 SASL 和公共 SSL/TLS 证书的 Kafka 代理上传自定义证书。您可以在 ClickPipe 设置的 SSL 证书部分上传证书。

注意

请注意,虽然我们支持与 Kafka 的 SASL 一起上传单个 SSL 证书,但目前不支持使用双向 TLS (mTLS) 的 SSL。

性能

批处理

ClickPipes 将数据分批插入 ClickHouse。这样做是为了避免在数据库中创建太多分区,因为这会导致集群性能问题。

满足以下条件之一时将插入批次

  • 批次大小达到最大值(100,000 行或 20MB)
  • 批次已打开最长时间(5 秒)

延迟

延迟(定义为 Kafka 消息生成到消息在 ClickHouse 中可用之间的时间)将取决于许多因素(例如代理延迟、网络延迟、消息大小/格式)。上面部分中描述的批处理也将影响延迟。我们始终建议使用典型负载测试您的特定用例,以确定预期延迟。

ClickPipes 不提供任何关于延迟的保证。如果您有特定的低延迟要求,请联系我们

扩展

ClickPipes for Kafka 旨在水平扩展。默认情况下,我们创建一个包含 2 个消费者的消费者组。可以通过联系我们来增加此数量。

常见问题解答

一般

  • ClickPipes for Kafka 如何工作?

    ClickPipes 使用专用的架构运行 Kafka Consumer API,从指定的主题读取数据,然后将数据插入到特定 ClickHouse Cloud 服务上的 ClickHouse 表中。

  • ClickPipes 和 ClickHouse Kafka 表引擎有什么区别?

    Kafka 表引擎是 ClickHouse 的核心功能,它实现了“拉取模型”,其中 ClickHouse 服务器本身连接到 Kafka,拉取事件,然后在本地写入。

    ClickPipes 是一项独立于 ClickHouse 服务运行的单独云服务,它连接到 Kafka(或其他数据源)并将事件推送到关联的 ClickHouse Cloud 服务。这种解耦架构提供了卓越的操作灵活性、明确的分工、可扩展的摄取、优雅的故障管理、可扩展性等等。

  • 使用 ClickPipes for Kafka 需要满足哪些要求?

    要使用 ClickPipes for Kafka,您需要一个正在运行的 Kafka 代理和一个启用了 ClickPipes 的 ClickHouse Cloud 服务。您还需要确保 ClickHouse Cloud 可以访问您的 Kafka 代理。这可以通过在 Kafka 端允许远程连接,在 Kafka 设置中将ClickHouse Cloud 出站 IP 地址列入白名单来实现。

  • ClickPipes for Kafka 支持 AWS PrivateLink 吗?

    支持 AWS PrivateLink。请联系我们以获取更多信息。

  • 我可以使用 ClickPipes for Kafka 将数据写入 Kafka 主题吗?

    不可以,ClickPipes for Kafka 旨在从 Kafka 主题读取数据,而不是向 Kafka 主题写入数据。要将数据写入 Kafka 主题,您需要使用专用的 Kafka 生产者。

  • ClickPipes 支持多个代理吗?

    是的,如果代理属于同一个仲裁组,则可以使用逗号 (,) 将它们一起配置。

Upstash

  • ClickPipes 支持 Upstash 吗?

    是的。Upstash 的 Kafka 产品于 2024 年 9 月 11 日开始进入为期 6 个月的弃用阶段。现有客户可以使用 ClickPipes 继续使用其现有的 Upstash Kafka 代理,方法是在 ClickPipes 用户界面上使用通用 Kafka 磁贴。现有 Upstash Kafka ClickPipes 在弃用通知发布之前不受影响。当弃用期限结束时,ClickPipe 将停止运行。

  • ClickPipes 支持 Upstash 模式注册表吗?

    不支持。ClickPipes 与 Upstash Kafka 模式注册表不兼容。

  • ClickPipes 支持 Upstash 的 QStash 工作流吗?

    不支持。除非在 QStash 工作流中引入了 Kafka 兼容的界面,否则它将无法与 Kafka ClickPipes 一起使用。

Azure EventHubs

  • Azure Event Hubs ClickPipe 是否可以在没有 Kafka 接口的情况下工作?

    不可以。ClickPipes 要求 Azure Event Hubs 启用了 Kafka 接口。仅 Standard、Premium 和 Dedicated SKU 定价层支持 Kafka 协议。

  • Azure 模式注册表是否与 ClickPipes 一起使用?

    不支持。ClickPipes 目前与 Event Hubs 模式注册表不兼容。

  • 我的策略需要哪些权限才能从 Azure Event Hubs 中消费数据?

    要列出主题并消费事件,提供给 ClickPipes 的共享访问策略至少需要“Listen”声明。

  • 为什么我的 Event Hubs 没有返回任何数据?

    如果您的 ClickHouse 实例与您的 Event Hubs 部署位于不同的区域或大陆,您可能会在加入 ClickPipes 时遇到超时,以及在从 Event Hub 消费数据时遇到更高的延迟。建议将 ClickHouse Cloud 部署和 Azure Event Hubs 部署都放置在彼此靠近的云区域中,以避免出现不利的性能。

  • 我是否应该为 Azure Event Hubs 包含端口号?

    是的。ClickPipes 期望您为 Kafka 接口包含端口号,该端口号应为 :9093

  • ClickPipes IP 是否仍然与 Azure Event Hubs 相关?

    是的。如果您限制对 Event Hubs 实例的流量,请添加记录的静态 NAT IP 地址

  • 连接字符串是针对 Event Hub 还是 Event Hub 命名空间?

    两者都可以,但是我们建议使用命名空间级别的共享访问策略来从多个 Event Hubs 中检索示例。