将 Kafka 与 ClickHouse Cloud 集成
先决条件
您已熟悉 ClickPipes 简介。
创建您的第一个 Kafka ClickPipe
- 访问您的 ClickHouse Cloud 服务的 SQL 控制台。
- 选择左侧菜单上的
数据源
按钮,然后单击“设置 ClickPipe”
- 选择您的数据源。
- 填写表格,提供 ClickPipe 的名称、描述(可选)、您的凭据和其他连接详细信息。
- 配置模式注册表。Avro 流需要有效的模式,JSON 可选。此模式将用于解析 AvroConfluent 或验证所选主题上的 JSON 消息。
- 无法解析的 Avro 消息或验证失败的 JSON 消息将生成错误。
- 模式注册表的“根”路径。例如,Confluent Cloud 模式注册表 URL 只是一个没有路径的 HTTPS URL,例如
https://test-kk999.us-east-2.aws.confluent.cloud
。如果仅指定根路径,则用于确定步骤 4 中列名和类型的模式将由嵌入在采样的 Kafka 消息中的 ID 确定。 - 路径
/schemas/ids/[ID]
,通过数字模式 ID 指向模式文档。使用模式 ID 的完整 URL 将是https://registry.example.com/schemas/ids/1000
- 路径
/subjects/[subject_name]
,通过主题名称指向模式文档。或者,可以通过在 URL 后附加/versions/[version]
来引用特定版本(否则 ClickPipes 将检索最新版本)。使用模式主题的完整 URL 将是https://registry.example.com/subjects/events
或https://registry/example.com/subjects/events/versions/4
请注意,在所有情况下,如果消息中嵌入的模式 ID 指示,ClickPipes 将自动从注册表中检索更新或不同的模式。如果消息在没有嵌入模式 ID 的情况下写入,则必须指定特定的模式 ID 或主题才能解析所有消息。
- 选择您的主题,UI 将显示来自该主题的示例文档。
- 在下一步中,您可以选择是将数据摄取到新的 ClickHouse 表中还是重用现有表。按照屏幕上的说明修改您的表名、模式和设置。您可以在顶部的示例表中看到更改的实时预览。
您还可以使用提供的控件自定义高级设置
- 或者,您可以决定将数据摄取到现有的 ClickHouse 表中。在这种情况下,UI 将允许您将源中的字段映射到所选目标表中的 ClickHouse 字段。
- 最后,您可以为内部 ClickPipes 用户配置权限。
权限: ClickPipes 将创建一个专用用户,用于将数据写入目标表。您可以使用自定义角色或预定义角色之一为此内部用户选择角色
完全访问权限
:具有对集群的完全访问权限。如果您将物化视图或字典与目标表一起使用,这可能很有用。仅目标表
:仅具有对目标表的INSERT
权限。
- 通过单击“完成设置”,系统将注册您的 ClickPipe,您将能够在摘要表中看到它。
摘要表提供控件,用于显示来自 ClickHouse 中源表或目标表的示例数据
以及用于删除 ClickPipe 和显示摄取作业摘要的控件。
- 恭喜您! 您已成功设置您的第一个 ClickPipe。如果这是一个流式 ClickPipe,它将持续运行,从您的远程数据源实时摄取数据。
支持的数据源
名称 | 徽标 | 类型 | 状态 | 描述 |
---|---|---|---|---|
Apache Kafka | 流式 | 稳定 | 配置 ClickPipes 并开始将流式数据从 Apache Kafka 摄取到 ClickHouse Cloud 中。 | |
Confluent Cloud | 流式 | 稳定 | 通过我们的直接集成,释放 Confluent 和 ClickHouse Cloud 的组合力量。 | |
Redpanda | 流式 | 稳定 | 配置 ClickPipes 并开始将流式数据从 Redpanda 摄取到 ClickHouse Cloud 中。 | |
AWS MSK | 流式 | 稳定 | 配置 ClickPipes 并开始将流式数据从 AWS MSK 摄取到 ClickHouse Cloud 中。 | |
Azure Event Hubs | 流式 | 稳定 | 配置 ClickPipes 并开始将流式数据从 Azure Event Hubs 摄取到 ClickHouse Cloud 中。 | |
WarpStream | 流式 | 稳定 | 配置 ClickPipes 并开始将流式数据从 WarpStream 摄取到 ClickHouse Cloud 中。 |
更多连接器将添加到 ClickPipes,您可以通过联系我们了解更多信息。
支持的数据格式
支持的格式有
支持的数据类型
以下 ClickHouse 数据类型目前在 ClickPipes 中受支持
- 基本数值类型 - [U]Int8/16/32/64 和 Float32/64
- 大整数类型 - [U]Int128/256
- Decimal 类型
- Boolean
- String
- FixedString
- Date, Date32
- DateTime, DateTime64(仅限 UTC 时区)
- Enum8/Enum16
- UUID
- IPv4
- IPv6
- 所有 ClickHouse LowCardinality 类型
- Map,键和值使用上述任何类型(包括 Nullables)
- Tuple 和 Array,元素使用上述任何类型(包括 Nullables,仅限一级深度)
Avro
支持的 Avro 数据类型
ClickPipes 支持所有 Avro 原始类型和复杂类型,以及除 time-millis
、time-micros
、local-timestamp-millis
、local_timestamp-micros
和 duration
之外的所有 Avro 逻辑类型。Avro record
类型转换为 Tuple,array
类型转换为 Array,map
转换为 Map(仅限字符串键)。通常,此处列出的转换可用。我们建议对 Avro 数值类型使用精确的类型匹配,因为 ClickPipes 不检查类型转换时的溢出或精度损失。
Nullable 类型和 Avro Unions
Avro 中的 Nullable 类型通过使用 (T, null)
或 (null, T)
的 Union 模式定义,其中 T 是基本 Avro 类型。在模式推断期间,此类 union 将映射到 ClickHouse “Nullable”列。请注意,ClickHouse 不支持 Nullable(Array)
、Nullable(Map)
或 Nullable(Tuple)
类型。这些类型的 Avro null union 将映射到非 nullable 版本(Avro Record 类型映射到 ClickHouse 命名 Tuple)。这些类型的 Avro “nulls”将作为以下内容插入
- 空 Array 用于 null Avro array
- 空 Map 用于 null Avro Map
- 一个命名 Tuple,其中所有默认值/零值用于 null Avro Record
ClickPipes 目前不支持包含其他 Avro Union 的模式(这可能会在新的 ClickHouse Variant 和 JSON 数据类型成熟后发生变化)。如果 Avro 模式包含“非 null”union,则当尝试计算 Avro 模式和 Clickhouse 列类型之间的映射时,ClickPipes 将生成错误。
Avro 模式管理
ClickPipes 使用嵌入在每个消息/事件中的模式 ID,从配置的模式注册表中动态检索和应用 Avro 模式。模式更新会被自动检测和处理。
目前,ClickPipes 仅与使用 Confluent 模式注册表 API 的模式注册表兼容。除了 Confluent Kafka 和 Cloud 之外,这还包括 Redpanda、AWS MSK 和 Upstash 模式注册表。ClickPipes 目前与 AWS Glue 模式注册表或 Azure 模式注册表不兼容(即将推出)。
以下规则应用于检索到的 Avro 模式和 ClickHouse 目标表之间的映射
- 如果 Avro 模式包含 ClickHouse 目标映射中未包含的字段,则会忽略该字段。
- 如果 Avro 模式缺少 ClickHouse 目标映射中定义的字段,则 ClickHouse 列将填充“零”值,例如 0 或空字符串。请注意,DEFAULT 表达式当前未针对 ClickPipes 插入进行评估(这是暂时的限制,等待 ClickHouse 服务器默认处理的更新)。
- 如果 Avro 模式字段和 ClickHouse 列不兼容,则该行/消息的插入将失败,并且失败将记录在 ClickPipes 错误表中。请注意,支持几种隐式转换(例如数值类型之间),但并非全部(例如,Avro
record
字段无法插入到Int32
ClickHouse 列中)。
Kafka 虚拟列
以下虚拟列受 Kafka 兼容流式数据源支持。创建新的目标表时,可以使用 添加列
按钮添加虚拟列。
名称 | 描述 | 推荐数据类型 |
---|---|---|
_key | Kafka 消息键 | String |
_timestamp | Kafka 时间戳(毫秒精度) | DateTime64(3) |
_partition | Kafka 分区 | Int32 |
_offset | Kafka 偏移量 | Int64 |
_topic | Kafka 主题 | String |
_header_keys | 记录标头中键的并行数组 | Array(String) |
_header_values | 记录标头中标头的并行数组 | Array(String) |
_raw_message | 完整 Kafka 消息 | String |
请注意,_raw_message 列仅建议用于 JSON 数据。对于只需要 JSON 字符串的使用案例(例如使用 ClickHouse JsonExtract*
函数填充下游物化视图),删除所有“非虚拟”列可能会提高 ClickPipes 性能。
限制
- 不支持 DEFAULT。
交付语义
ClickPipes for Kafka 提供 至少一次
交付语义(作为最常用的方法之一)。我们很乐意听到您对交付语义的反馈,请使用联系表单。如果您需要精确一次语义,我们建议使用我们的官方 clickhouse-kafka-connect
sink。
身份验证
对于 Apache Kafka 协议数据源,ClickPipes 支持 SASL/PLAIN 身份验证与 TLS 加密,以及 SASL/SCRAM-SHA-256
和 SASL/SCRAM-SHA-512
。根据流式源(Redpanda、MSK 等),将启用所有或部分这些身份验证机制,具体取决于兼容性。如果您的身份验证需求不同,请向我们提供反馈。
IAM
MSK ClickPipe 的 IAM 身份验证是一项 Beta 功能。
ClickPipes 支持以下 AWS MSK 身份验证
- SASL/SCRAM-SHA-512 身份验证
- IAM 凭据或基于角色的访问 身份验证
当使用 IAM 身份验证连接到 MSK 代理时,IAM 角色必须具有必要的权限。以下是 Apache Kafka API for MSK 所需的 IAM 策略示例
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"kafka-cluster:Connect"
],
"Resource": [
"arn:aws:kafka:us-west-2:12345678912:cluster/clickpipes-testing-brokers/b194d5ae-5013-4b5b-ad27-3ca9f56299c9-10"
]
},
{
"Effect": "Allow",
"Action": [
"kafka-cluster:DescribeTopic",
"kafka-cluster:ReadData"
],
"Resource": [
"arn:aws:kafka:us-west-2:12345678912:topic/clickpipes-testing-brokers/*"
]
},
{
"Effect": "Allow",
"Action": [
"kafka-cluster:AlterGroup",
"kafka-cluster:DescribeGroup"
],
"Resource": [
"arn:aws:kafka:us-east-1:12345678912:group/clickpipes-testing-brokers/*"
]
}
]
}
配置信任关系
如果您正在使用 IAM 角色 ARN 向 MSK 进行身份验证,则需要在您的 ClickHouse Cloud 实例之间添加信任关系,以便可以承担该角色。
基于角色的访问仅适用于部署到 AWS 的 ClickHouse Cloud 实例。
{
"Version": "2012-10-17",
"Statement": [
...
{
"Effect": "Allow",
"Principal": {
"AWS": "arn:aws:iam::12345678912:role/CH-S3-your-clickhouse-cloud-role"
},
"Action": "sts:AssumeRole"
},
]
}
自定义证书
ClickPipes for Kafka 支持为具有 SASL 和公共 SSL/TLS 证书的 Kafka 代理上传自定义证书。您可以在 ClickPipe 设置的 SSL 证书部分上传您的证书。
请注意,虽然我们支持上传单个 SSL 证书以及 Kafka 的 SASL,但目前不支持具有相互 TLS (mTLS) 的 SSL。
性能
批处理
ClickPipes 以批处理方式将数据插入 ClickHouse。这是为了避免在数据库中创建太多部分,这可能会导致集群中的性能问题。
当满足以下条件之一时,将插入批次
- 批次大小已达到最大大小(100,000 行或 20MB)
- 批次已打开最长时间(5 秒)
延迟
延迟(定义为 Kafka 消息生成和消息在 ClickHouse 中可用之间的时间)将取决于多种因素(即代理延迟、网络延迟、消息大小/格式)。上面部分中描述的批处理也会影响延迟。我们始终建议使用典型负载测试您的特定用例,以确定预期的延迟。
ClickPipes 不提供有关延迟的任何保证。如果您有特定的低延迟要求,请联系我们。
扩展
ClickPipes for Kafka 设计为可水平扩展。默认情况下,我们创建一个包含 1 个消费者的消费者组。可以使用 ClickPipe 详细信息视图中的扩展控件更改此设置。
F.A.Q
通用
-
ClickPipes for Kafka 是如何工作的?
ClickPipes 使用专用架构运行 Kafka Consumer API,以从指定主题读取数据,然后将数据插入特定 ClickHouse Cloud 服务上的 ClickHouse 表中。
-
ClickPipes 和 ClickHouse Kafka 表引擎之间有什么区别?
Kafka 表引擎是 ClickHouse 的核心功能,它实现了一种“拉取模型”,其中 ClickHouse 服务器本身连接到 Kafka,拉取事件,然后将它们本地写入。
ClickPipes 是一项独立于 ClickHouse 服务的云服务,它连接到 Kafka(或其他数据源)并将事件推送到关联的 ClickHouse Cloud 服务。这种解耦架构允许卓越的操作灵活性、清晰的责任分离、可扩展的摄取、优雅的故障管理、可扩展性等等。
-
使用 ClickPipes for Kafka 有哪些要求?
为了使用 ClickPipes for Kafka,您需要一个正在运行的 Kafka 代理和一个启用了 ClickPipes 的 ClickHouse Cloud 服务。您还需要确保 ClickHouse Cloud 可以访问您的 Kafka 代理。这可以通过在 Kafka 端允许远程连接、在 Kafka 设置中将 ClickHouse Cloud 出口 IP 地址列入白名单来实现。
-
ClickPipes for Kafka 是否支持 AWS PrivateLink?
支持 AWS PrivateLink。请联系我们了解更多信息。
-
我可以使用 ClickPipes for Kafka 将数据写入 Kafka 主题吗?
否,ClickPipes for Kafka 旨在从 Kafka 主题读取数据,而不是向其写入数据。要将数据写入 Kafka 主题,您需要使用专用的 Kafka 生产者。
-
ClickPipes 是否支持多个代理?
是的,如果代理是同一仲裁的一部分,则可以将它们配置在一起,并用
,
分隔。
Upstash
-
ClickPipes 是否支持 Upstash?
是的。Upstash Kafka 产品于 2024 年 9 月 11 日进入 6 个月的弃用期。现有客户可以使用 ClickPipes 和他们现有的 Upstash Kafka 代理,方法是在 ClickPipes 用户界面上使用通用 Kafka 磁贴。在弃用通知之前,现有的 Upstash Kafka ClickPipes 不受影响。当弃用期结束后,ClickPipe 将停止运行。
-
ClickPipes 是否支持 Upstash 模式注册表?
否。ClickPipes 与 Upstash Kafka 模式注册表不兼容。
-
ClickPipes 是否支持 Upstash QStash Workflow?
否。除非 QStash Workflow 中引入了 Kafka 兼容表面,否则它将无法与 Kafka ClickPipes 一起使用。
Azure EventHubs
-
Azure Event Hubs ClickPipe 在没有 Kafka 表面的情况下是否可以工作?
否。ClickPipes 要求 Azure Event Hubs 启用 Kafka 表面。Kafka 协议仅在其标准、高级和专用 SKU 定价层中受支持。
-
Azure 模式注册表是否与 ClickPipes 兼容
否。ClickPipes 目前与 Event Hubs 模式注册表不兼容。
-
我的策略需要哪些权限才能从 Azure Event Hubs 消费?
要列出主题和消费事件,提供给 ClickPipes 的共享访问策略至少需要“侦听”声明。
-
为什么我的 Event Hubs 没有返回任何数据?
如果您的 ClickHouse 实例与您的 Event Hubs 部署位于不同的区域或大洲,则在加入 ClickPipes 时可能会遇到超时,并且在从 Event Hub 消费数据时可能会遇到更高的延迟。最佳实践是将您的 ClickHouse Cloud 部署和 Azure Event Hubs 部署放置在彼此靠近的云区域中,以避免不利的性能。
-
我是否应该包含 Azure Event Hubs 的端口号?
是的。ClickPipes 希望您包含 Kafka 表面的端口号,应为
:9093
。 -
ClickPipes IP 对于 Azure Event Hubs 仍然相关吗?
是的。如果您限制对 Event Hubs 实例的流量,请添加记录的静态 NAT IP。
-
连接字符串是用于 Event Hub,还是用于 Event Hub 命名空间?
两者都可以工作,但是,我们建议在命名空间级别使用共享访问策略,以从多个 Event Hubs 检索示例。