使用命名集合将 ClickHouse 与 Kafka 集成
简介
在本指南中,我们将探讨如何使用命名集合将 ClickHouse 连接到 Kafka。使用命名集合的配置文件有以下几个优点
- 集中化且更易于管理配置设置。
- 无需更改 SQL 表定义即可更改设置。
- 通过检查单个配置文件,可以更轻松地审查和排除配置故障。
本指南已在 Apache Kafka 3.4.1 和 ClickHouse 24.5.1 上进行过测试。
假设
本文档假设您已拥有:
- 一个正常运行的 Kafka 集群。
- 一个已设置并运行的 ClickHouse 集群。
- SQL 的基本知识以及对 ClickHouse 和 Kafka 配置的熟悉。
先决条件
确保创建命名集合的用户具有必要的访问权限
<access_management>1</access_management>
<named_collection_control>1</named_collection_control>
<show_named_collections>1</show_named_collections>
<show_named_collections_secrets>1</show_named_collections_secrets>
有关启用访问控制的更多详细信息,请参阅《用户管理指南》。
配置
将以下部分添加到您的 ClickHouse config.xml
文件中
<!-- Named collections for Kafka integration -->
<named_collections>
<cluster_1>
<!-- ClickHouse Kafka engine parameters -->
<kafka_broker_list>c1-kafka-1:9094,c1-kafka-2:9094,c1-kafka-3:9094</kafka_broker_list>
<kafka_topic_list>cluster_1_clickhouse_topic</kafka_topic_list>
<kafka_group_name>cluster_1_clickhouse_consumer</kafka_group_name>
<kafka_format>JSONEachRow</kafka_format>
<kafka_commit_every_batch>0</kafka_commit_every_batch>
<kafka_num_consumers>1</kafka_num_consumers>
<kafka_thread_per_consumer>1</kafka_thread_per_consumer>
<!-- Kafka extended configuration -->
<kafka>
<security_protocol>SASL_SSL</security_protocol>
<enable_ssl_certificate_verification>false</enable_ssl_certificate_verification>
<sasl_mechanism>PLAIN</sasl_mechanism>
<sasl_username>kafka-client</sasl_username>
<sasl_password>kafkapassword1</sasl_password>
<debug>all</debug>
<auto_offset_reset>latest</auto_offset_reset>
</kafka>
</cluster_1>
<cluster_2>
<!-- ClickHouse Kafka engine parameters -->
<kafka_broker_list>c2-kafka-1:29094,c2-kafka-2:29094,c2-kafka-3:29094</kafka_broker_list>
<kafka_topic_list>cluster_2_clickhouse_topic</kafka_topic_list>
<kafka_group_name>cluster_2_clickhouse_consumer</kafka_group_name>
<kafka_format>JSONEachRow</kafka_format>
<kafka_commit_every_batch>0</kafka_commit_every_batch>
<kafka_num_consumers>1</kafka_num_consumers>
<kafka_thread_per_consumer>1</kafka_thread_per_consumer>
<!-- Kafka extended configuration -->
<kafka>
<security_protocol>SASL_SSL</security_protocol>
<enable_ssl_certificate_verification>false</enable_ssl_certificate_verification>
<sasl_mechanism>PLAIN</sasl_mechanism>
<sasl_username>kafka-client</sasl_username>
<sasl_password>kafkapassword2</sasl_password>
<debug>all</debug>
<auto_offset_reset>latest</auto_offset_reset>
</kafka>
</cluster_2>
</named_collections>
配置说明
- 调整 Kafka 地址和相关配置以匹配您的 Kafka 集群设置。
- `<kafka>` 之前的部分包含 ClickHouse Kafka 引擎参数。有关完整参数列表,请参阅 Kafka 引擎参数。
- `<kafka>` 内的部分包含扩展的 Kafka 配置选项。有关更多选项,请参阅 librdkafka 配置。
- 本示例使用
SASL_SSL
安全协议和PLAIN
机制。根据您的 Kafka 集群配置调整这些设置。
创建表和数据库
在您的 ClickHouse 集群上创建必要的数据库和表。如果您的 ClickHouse 作为单节点运行,请省略 SQL 命令的集群部分,并使用任何其他引擎代替 ReplicatedMergeTree
。
创建数据库
CREATE DATABASE kafka_testing ON CLUSTER LAB_CLICKHOUSE_CLUSTER;
创建 Kafka 表
为第一个 Kafka 集群创建第一个 Kafka 表
CREATE TABLE kafka_testing.first_kafka_table ON CLUSTER LAB_CLICKHOUSE_CLUSTER
(
`id` UInt32,
`first_name` String,
`last_name` String
)
ENGINE = Kafka(cluster_1);
为第二个 Kafka 集群创建第二个 Kafka 表
CREATE TABLE kafka_testing.second_kafka_table ON CLUSTER STAGE_CLICKHOUSE_CLUSTER
(
`id` UInt32,
`first_name` String,
`last_name` String
)
ENGINE = Kafka(cluster_2);
创建复制表
为第一个 Kafka 表创建表
CREATE TABLE kafka_testing.first_replicated_table ON CLUSTER STAGE_CLICKHOUSE_CLUSTER
(
`id` UInt32,
`first_name` String,
`last_name` String
) ENGINE = ReplicatedMergeTree()
ORDER BY id;
为第二个 Kafka 表创建表
CREATE TABLE kafka_testing.second_replicated_table ON CLUSTER STAGE_CLICKHOUSE_CLUSTER
(
`id` UInt32,
`first_name` String,
`last_name` String
) ENGINE = ReplicatedMergeTree()
ORDER BY id;
创建物化视图
创建一个物化视图,以将数据从第一个 Kafka 表插入到第一个复制表中
CREATE MATERIALIZED VIEW kafka_testing.cluster_1_mv ON CLUSTER STAGE_CLICKHOUSE_CLUSTER TO first_replicated_table AS
SELECT
id,
first_name,
last_name
FROM first_kafka_table;
创建一个物化视图,以将数据从第二个 Kafka 表插入到第二个复制表中
CREATE MATERIALIZED VIEW kafka_testing.cluster_2_mv ON CLUSTER STAGE_CLICKHOUSE_CLUSTER TO second_replicated_table AS
SELECT
id,
first_name,
last_name
FROM second_kafka_table;
验证设置
现在您应该在 Kafka 集群上看到相关的消费者组
cluster_1_clickhouse_consumer
在cluster_1
上cluster_2_clickhouse_consumer
在cluster_2
上
在您的任何 ClickHouse 节点上运行以下查询,以查看两个表中的数据
SELECT * FROM first_replicated_table LIMIT 10;
SELECT * FROM second_replicated_table LIMIT 10;
注意
在本指南中,两个 Kafka 主题中摄取的数据是相同的。在您的情况下,它们可能会有所不同。您可以根据需要添加任意数量的 Kafka 集群。
示例输出
┌─id─┬─first_name─┬─last_name─┐
│ 0 │ FirstName0 │ LastName0 │
│ 1 │ FirstName1 │ LastName1 │
│ 2 │ FirstName2 │ LastName2 │
└────┴────────────┴───────────┘
至此,使用命名集合将 ClickHouse 与 Kafka 集成的设置已完成。通过将 Kafka 配置集中在 ClickHouse `config.xml` 文件中,您可以更轻松地管理和调整设置,从而确保集成更加简化和高效。