跳到主要内容
跳到主要内容
编辑此页

将 Kafka 与 ClickHouse Cloud 集成

先决条件

您已熟悉 ClickPipes 简介

创建您的第一个 Kafka ClickPipe

  1. 访问您的 ClickHouse Cloud 服务的 SQL 控制台。

ClickPipes service

  1. 选择左侧菜单上的 数据源 按钮,然后单击“设置 ClickPipe”

Select imports

  1. 选择您的数据源。

Select data source type

  1. 填写表格,提供 ClickPipe 的名称、描述(可选)、您的凭据和其他连接详细信息。

Fill out connection details

  1. 配置模式注册表。Avro 流需要有效的模式,JSON 可选。此模式将用于解析 AvroConfluent 或验证所选主题上的 JSON 消息。
  • 无法解析的 Avro 消息或验证失败的 JSON 消息将生成错误。
  • 模式注册表的“根”路径。例如,Confluent Cloud 模式注册表 URL 只是一个没有路径的 HTTPS URL,例如 https://test-kk999.us-east-2.aws.confluent.cloud。如果仅指定根路径,则用于确定步骤 4 中列名和类型的模式将由嵌入在采样的 Kafka 消息中的 ID 确定。
  • 路径 /schemas/ids/[ID],通过数字模式 ID 指向模式文档。使用模式 ID 的完整 URL 将是 https://registry.example.com/schemas/ids/1000
  • 路径 /subjects/[subject_name],通过主题名称指向模式文档。或者,可以通过在 URL 后附加 /versions/[version] 来引用特定版本(否则 ClickPipes 将检索最新版本)。使用模式主题的完整 URL 将是 https://registry.example.com/subjects/eventshttps://registry/example.com/subjects/events/versions/4

请注意,在所有情况下,如果消息中嵌入的模式 ID 指示,ClickPipes 将自动从注册表中检索更新或不同的模式。如果消息在没有嵌入模式 ID 的情况下写入,则必须指定特定的模式 ID 或主题才能解析所有消息。

  1. 选择您的主题,UI 将显示来自该主题的示例文档。

Set data format and topic

  1. 在下一步中,您可以选择是将数据摄取到新的 ClickHouse 表中还是重用现有表。按照屏幕上的说明修改您的表名、模式和设置。您可以在顶部的示例表中看到更改的实时预览。

Set table, schema, and settings

您还可以使用提供的控件自定义高级设置

Set advanced controls

  1. 或者,您可以决定将数据摄取到现有的 ClickHouse 表中。在这种情况下,UI 将允许您将源中的字段映射到所选目标表中的 ClickHouse 字段。

Use and existing table

  1. 最后,您可以为内部 ClickPipes 用户配置权限。

权限: ClickPipes 将创建一个专用用户,用于将数据写入目标表。您可以使用自定义角色或预定义角色之一为此内部用户选择角色

  • 完全访问权限:具有对集群的完全访问权限。如果您将物化视图或字典与目标表一起使用,这可能很有用。
  • 仅目标表:仅具有对目标表的 INSERT 权限。

permissions

  1. 通过单击“完成设置”,系统将注册您的 ClickPipe,您将能够在摘要表中看到它。

Success notice

Remove notice

摘要表提供控件,用于显示来自 ClickHouse 中源表或目标表的示例数据

View destination

以及用于删除 ClickPipe 和显示摄取作业摘要的控件。

View overview

  1. 恭喜您! 您已成功设置您的第一个 ClickPipe。如果这是一个流式 ClickPipe,它将持续运行,从您的远程数据源实时摄取数据。

支持的数据源

名称徽标类型状态描述
Apache Kafka流式稳定配置 ClickPipes 并开始将流式数据从 Apache Kafka 摄取到 ClickHouse Cloud 中。
Confluent Cloud流式稳定通过我们的直接集成,释放 Confluent 和 ClickHouse Cloud 的组合力量。
RedpandaRedpanda logo流式稳定配置 ClickPipes 并开始将流式数据从 Redpanda 摄取到 ClickHouse Cloud 中。
AWS MSK流式稳定配置 ClickPipes 并开始将流式数据从 AWS MSK 摄取到 ClickHouse Cloud 中。
Azure Event Hubs流式稳定配置 ClickPipes 并开始将流式数据从 Azure Event Hubs 摄取到 ClickHouse Cloud 中。
WarpStream流式稳定配置 ClickPipes 并开始将流式数据从 WarpStream 摄取到 ClickHouse Cloud 中。

更多连接器将添加到 ClickPipes,您可以通过联系我们了解更多信息。

支持的数据格式

支持的格式有

支持的数据类型

以下 ClickHouse 数据类型目前在 ClickPipes 中受支持

  • 基本数值类型 - [U]Int8/16/32/64 和 Float32/64
  • 大整数类型 - [U]Int128/256
  • Decimal 类型
  • Boolean
  • String
  • FixedString
  • Date, Date32
  • DateTime, DateTime64(仅限 UTC 时区)
  • Enum8/Enum16
  • UUID
  • IPv4
  • IPv6
  • 所有 ClickHouse LowCardinality 类型
  • Map,键和值使用上述任何类型(包括 Nullables)
  • Tuple 和 Array,元素使用上述任何类型(包括 Nullables,仅限一级深度)

Avro

支持的 Avro 数据类型

ClickPipes 支持所有 Avro 原始类型和复杂类型,以及除 time-millistime-microslocal-timestamp-millislocal_timestamp-microsduration 之外的所有 Avro 逻辑类型。Avro record 类型转换为 Tuple,array 类型转换为 Array,map 转换为 Map(仅限字符串键)。通常,此处列出的转换可用。我们建议对 Avro 数值类型使用精确的类型匹配,因为 ClickPipes 不检查类型转换时的溢出或精度损失。

Nullable 类型和 Avro Unions

Avro 中的 Nullable 类型通过使用 (T, null)(null, T) 的 Union 模式定义,其中 T 是基本 Avro 类型。在模式推断期间,此类 union 将映射到 ClickHouse “Nullable”列。请注意,ClickHouse 不支持 Nullable(Array)Nullable(Map)Nullable(Tuple) 类型。这些类型的 Avro null union 将映射到非 nullable 版本(Avro Record 类型映射到 ClickHouse 命名 Tuple)。这些类型的 Avro “nulls”将作为以下内容插入

  • 空 Array 用于 null Avro array
  • 空 Map 用于 null Avro Map
  • 一个命名 Tuple,其中所有默认值/零值用于 null Avro Record

ClickPipes 目前不支持包含其他 Avro Union 的模式(这可能会在新的 ClickHouse Variant 和 JSON 数据类型成熟后发生变化)。如果 Avro 模式包含“非 null”union,则当尝试计算 Avro 模式和 Clickhouse 列类型之间的映射时,ClickPipes 将生成错误。

Avro 模式管理

ClickPipes 使用嵌入在每个消息/事件中的模式 ID,从配置的模式注册表中动态检索和应用 Avro 模式。模式更新会被自动检测和处理。

目前,ClickPipes 仅与使用 Confluent 模式注册表 API 的模式注册表兼容。除了 Confluent Kafka 和 Cloud 之外,这还包括 Redpanda、AWS MSK 和 Upstash 模式注册表。ClickPipes 目前与 AWS Glue 模式注册表或 Azure 模式注册表不兼容(即将推出)。

以下规则应用于检索到的 Avro 模式和 ClickHouse 目标表之间的映射

  • 如果 Avro 模式包含 ClickHouse 目标映射中未包含的字段,则会忽略该字段。
  • 如果 Avro 模式缺少 ClickHouse 目标映射中定义的字段,则 ClickHouse 列将填充“零”值,例如 0 或空字符串。请注意,DEFAULT 表达式当前未针对 ClickPipes 插入进行评估(这是暂时的限制,等待 ClickHouse 服务器默认处理的更新)。
  • 如果 Avro 模式字段和 ClickHouse 列不兼容,则该行/消息的插入将失败,并且失败将记录在 ClickPipes 错误表中。请注意,支持几种隐式转换(例如数值类型之间),但并非全部(例如,Avro record 字段无法插入到 Int32 ClickHouse 列中)。

Kafka 虚拟列

以下虚拟列受 Kafka 兼容流式数据源支持。创建新的目标表时,可以使用 添加列 按钮添加虚拟列。

名称描述推荐数据类型
_keyKafka 消息键String
_timestampKafka 时间戳(毫秒精度)DateTime64(3)
_partitionKafka 分区Int32
_offsetKafka 偏移量Int64
_topicKafka 主题String
_header_keys记录标头中键的并行数组Array(String)
_header_values记录标头中标头的并行数组Array(String)
_raw_message完整 Kafka 消息String

请注意,_raw_message 列仅建议用于 JSON 数据。对于只需要 JSON 字符串的使用案例(例如使用 ClickHouse JsonExtract* 函数填充下游物化视图),删除所有“非虚拟”列可能会提高 ClickPipes 性能。

限制

交付语义

ClickPipes for Kafka 提供 至少一次 交付语义(作为最常用的方法之一)。我们很乐意听到您对交付语义的反馈,请使用联系表单。如果您需要精确一次语义,我们建议使用我们的官方 clickhouse-kafka-connect sink。

身份验证

对于 Apache Kafka 协议数据源,ClickPipes 支持 SASL/PLAIN 身份验证与 TLS 加密,以及 SASL/SCRAM-SHA-256SASL/SCRAM-SHA-512。根据流式源(Redpanda、MSK 等),将启用所有或部分这些身份验证机制,具体取决于兼容性。如果您的身份验证需求不同,请向我们提供反馈

IAM

信息

MSK ClickPipe 的 IAM 身份验证是一项 Beta 功能。

ClickPipes 支持以下 AWS MSK 身份验证

当使用 IAM 身份验证连接到 MSK 代理时,IAM 角色必须具有必要的权限。以下是 Apache Kafka API for MSK 所需的 IAM 策略示例

{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"kafka-cluster:Connect"
],
"Resource": [
"arn:aws:kafka:us-west-2:12345678912:cluster/clickpipes-testing-brokers/b194d5ae-5013-4b5b-ad27-3ca9f56299c9-10"
]
},
{
"Effect": "Allow",
"Action": [
"kafka-cluster:DescribeTopic",
"kafka-cluster:ReadData"
],
"Resource": [
"arn:aws:kafka:us-west-2:12345678912:topic/clickpipes-testing-brokers/*"
]
},
{
"Effect": "Allow",
"Action": [
"kafka-cluster:AlterGroup",
"kafka-cluster:DescribeGroup"
],
"Resource": [
"arn:aws:kafka:us-east-1:12345678912:group/clickpipes-testing-brokers/*"
]
}
]
}

配置信任关系

如果您正在使用 IAM 角色 ARN 向 MSK 进行身份验证,则需要在您的 ClickHouse Cloud 实例之间添加信任关系,以便可以承担该角色。

注意

基于角色的访问仅适用于部署到 AWS 的 ClickHouse Cloud 实例。

{
"Version": "2012-10-17",
"Statement": [
...
{
"Effect": "Allow",
"Principal": {
"AWS": "arn:aws:iam::12345678912:role/CH-S3-your-clickhouse-cloud-role"
},
"Action": "sts:AssumeRole"
},
]
}

自定义证书

ClickPipes for Kafka 支持为具有 SASL 和公共 SSL/TLS 证书的 Kafka 代理上传自定义证书。您可以在 ClickPipe 设置的 SSL 证书部分上传您的证书。

注意

请注意,虽然我们支持上传单个 SSL 证书以及 Kafka 的 SASL,但目前不支持具有相互 TLS (mTLS) 的 SSL。

性能

批处理

ClickPipes 以批处理方式将数据插入 ClickHouse。这是为了避免在数据库中创建太多部分,这可能会导致集群中的性能问题。

当满足以下条件之一时,将插入批次

  • 批次大小已达到最大大小(100,000 行或 20MB)
  • 批次已打开最长时间(5 秒)

延迟

延迟(定义为 Kafka 消息生成和消息在 ClickHouse 中可用之间的时间)将取决于多种因素(即代理延迟、网络延迟、消息大小/格式)。上面部分中描述的批处理也会影响延迟。我们始终建议使用典型负载测试您的特定用例,以确定预期的延迟。

ClickPipes 不提供有关延迟的任何保证。如果您有特定的低延迟要求,请联系我们

扩展

ClickPipes for Kafka 设计为可水平扩展。默认情况下,我们创建一个包含 1 个消费者的消费者组。可以使用 ClickPipe 详细信息视图中的扩展控件更改此设置。

F.A.Q

通用

  • ClickPipes for Kafka 是如何工作的?

    ClickPipes 使用专用架构运行 Kafka Consumer API,以从指定主题读取数据,然后将数据插入特定 ClickHouse Cloud 服务上的 ClickHouse 表中。

  • ClickPipes 和 ClickHouse Kafka 表引擎之间有什么区别?

    Kafka 表引擎是 ClickHouse 的核心功能,它实现了一种“拉取模型”,其中 ClickHouse 服务器本身连接到 Kafka,拉取事件,然后将它们本地写入。

    ClickPipes 是一项独立于 ClickHouse 服务的云服务,它连接到 Kafka(或其他数据源)并将事件推送到关联的 ClickHouse Cloud 服务。这种解耦架构允许卓越的操作灵活性、清晰的责任分离、可扩展的摄取、优雅的故障管理、可扩展性等等。

  • 使用 ClickPipes for Kafka 有哪些要求?

    为了使用 ClickPipes for Kafka,您需要一个正在运行的 Kafka 代理和一个启用了 ClickPipes 的 ClickHouse Cloud 服务。您还需要确保 ClickHouse Cloud 可以访问您的 Kafka 代理。这可以通过在 Kafka 端允许远程连接、在 Kafka 设置中将 ClickHouse Cloud 出口 IP 地址列入白名单来实现。

  • ClickPipes for Kafka 是否支持 AWS PrivateLink?

    支持 AWS PrivateLink。请联系我们了解更多信息。

  • 我可以使用 ClickPipes for Kafka 将数据写入 Kafka 主题吗?

    否,ClickPipes for Kafka 旨在从 Kafka 主题读取数据,而不是向其写入数据。要将数据写入 Kafka 主题,您需要使用专用的 Kafka 生产者。

  • ClickPipes 是否支持多个代理?

    是的,如果代理是同一仲裁的一部分,则可以将它们配置在一起,并用 , 分隔。

Upstash

  • ClickPipes 是否支持 Upstash?

    是的。Upstash Kafka 产品于 2024 年 9 月 11 日进入 6 个月的弃用期。现有客户可以使用 ClickPipes 和他们现有的 Upstash Kafka 代理,方法是在 ClickPipes 用户界面上使用通用 Kafka 磁贴。在弃用通知之前,现有的 Upstash Kafka ClickPipes 不受影响。当弃用期结束后,ClickPipe 将停止运行。

  • ClickPipes 是否支持 Upstash 模式注册表?

    否。ClickPipes 与 Upstash Kafka 模式注册表不兼容。

  • ClickPipes 是否支持 Upstash QStash Workflow?

    否。除非 QStash Workflow 中引入了 Kafka 兼容表面,否则它将无法与 Kafka ClickPipes 一起使用。

Azure EventHubs

  • Azure Event Hubs ClickPipe 在没有 Kafka 表面的情况下是否可以工作?

    否。ClickPipes 要求 Azure Event Hubs 启用 Kafka 表面。Kafka 协议仅在其标准、高级和专用 SKU 定价层中受支持。

  • Azure 模式注册表是否与 ClickPipes 兼容

    否。ClickPipes 目前与 Event Hubs 模式注册表不兼容。

  • 我的策略需要哪些权限才能从 Azure Event Hubs 消费?

    要列出主题和消费事件,提供给 ClickPipes 的共享访问策略至少需要“侦听”声明。

  • 为什么我的 Event Hubs 没有返回任何数据?

如果您的 ClickHouse 实例与您的 Event Hubs 部署位于不同的区域或大洲,则在加入 ClickPipes 时可能会遇到超时,并且在从 Event Hub 消费数据时可能会遇到更高的延迟。最佳实践是将您的 ClickHouse Cloud 部署和 Azure Event Hubs 部署放置在彼此靠近的云区域中,以避免不利的性能。

  • 我是否应该包含 Azure Event Hubs 的端口号?

    是的。ClickPipes 希望您包含 Kafka 表面的端口号,应为 :9093

  • ClickPipes IP 对于 Azure Event Hubs 仍然相关吗?

    是的。如果您限制对 Event Hubs 实例的流量,请添加记录的静态 NAT IP

  • 连接字符串是用于 Event Hub,还是用于 Event Hub 命名空间?

    两者都可以工作,但是,我们建议在命名空间级别使用共享访问策略,以从多个 Event Hubs 检索示例。