Kafka
此引擎与 Apache Kafka 一起使用。
Kafka 允许您
- 发布或订阅数据流。
- 组织容错存储。
- 在数据可用时处理流。
创建表
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
name1 [type1] [ALIAS expr1],
name2 [type2] [ALIAS expr2],
...
) ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'host:port',
kafka_topic_list = 'topic1,topic2,...',
kafka_group_name = 'group_name',
kafka_format = 'data_format'[,]
[kafka_schema = '',]
[kafka_num_consumers = N,]
[kafka_max_block_size = 0,]
[kafka_skip_broken_messages = N,]
[kafka_commit_every_batch = 0,]
[kafka_client_id = '',]
[kafka_poll_timeout_ms = 0,]
[kafka_poll_max_batch_size = 0,]
[kafka_flush_interval_ms = 0,]
[kafka_thread_per_consumer = 0,]
[kafka_handle_error_mode = 'default',]
[kafka_commit_on_select = false,]
[kafka_max_rows_per_message = 1];
必需参数
kafka_broker_list
— 以逗号分隔的代理列表(例如,localhost:9092
)。kafka_topic_list
— Kafka 主题列表。kafka_group_name
— Kafka 消费者的一个组。每个组的读取偏移量都会单独跟踪。如果您不希望消息在集群中重复,请在所有地方使用相同的组名。kafka_format
— 消息格式。使用与 SQLFORMAT
函数相同的表示法,例如JSONEachRow
。有关更多信息,请参见 格式 部分。
可选参数
kafka_schema
— 如果格式需要架构定义,则必须使用此参数。例如,Cap’n Proto 需要架构文件路径和根schema.capnp:Message
对象的名称。kafka_num_consumers
— 每个表的消费者数量。如果一个消费者的吞吐量不足,请指定更多消费者。消费者的总数不应超过主题中的分区数,因为每个分区只能分配一个消费者,并且不得大于部署 ClickHouse 的服务器上的物理核心数。默认值:1
。kafka_max_block_size
— 轮询的最大批次大小(以消息计)。默认值:max_insert_block_size。kafka_skip_broken_messages
— Kafka 消息解析器对每个块中与架构不兼容的消息的容忍度。如果kafka_skip_broken_messages = N
,则引擎将跳过N个无法解析的 Kafka 消息(一条消息等于一行数据)。默认值:0
。kafka_commit_every_batch
— 提交每个已消费和处理的批次,而不是在写入整个块后进行单次提交。默认值:0
。kafka_client_id
— 客户端标识符。默认为空。kafka_poll_timeout_ms
— 从 Kafka 单次轮询的超时时间。默认值:stream_poll_timeout_ms。kafka_poll_max_batch_size
— 在 Kafka 单次轮询中轮询的最大消息量。默认值:max_block_size。kafka_flush_interval_ms
— 从 Kafka 刷新数据的超时时间。默认值:stream_flush_interval_ms。kafka_thread_per_consumer
— 为每个消费者提供独立的线程。启用后,每个消费者都会独立并行刷新数据(否则,来自多个消费者的行会被压缩以形成一个块)。默认值:0
。kafka_handle_error_mode
— 如何处理 Kafka 引擎的错误。可能的值:default(如果我们无法解析消息,则会抛出异常),stream(异常消息和原始消息将保存在虚拟列_error
和_raw_message
中)。kafka_commit_on_select
— 在执行 select 查询时提交消息。默认值:false
。kafka_max_rows_per_message
— 基于行的格式在一则 Kafka 消息中写入的最大行数。默认值:1
。
示例
CREATE TABLE queue (
timestamp UInt64,
level String,
message String
) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow');
SELECT * FROM queue LIMIT 5;
CREATE TABLE queue2 (
timestamp UInt64,
level String,
message String
) ENGINE = Kafka SETTINGS kafka_broker_list = 'localhost:9092',
kafka_topic_list = 'topic',
kafka_group_name = 'group1',
kafka_format = 'JSONEachRow',
kafka_num_consumers = 4;
CREATE TABLE queue3 (
timestamp UInt64,
level String,
message String
) ENGINE = Kafka('localhost:9092', 'topic', 'group1')
SETTINGS kafka_format = 'JSONEachRow',
kafka_num_consumers = 4;
创建表的已弃用方法
在新项目中不要使用此方法。如果可能,请将旧项目切换到上面描述的方法。
Kafka(kafka_broker_list, kafka_topic_list, kafka_group_name, kafka_format
[, kafka_row_delimiter, kafka_schema, kafka_num_consumers, kafka_max_block_size, kafka_skip_broken_messages, kafka_commit_every_batch, kafka_client_id, kafka_poll_timeout_ms, kafka_poll_max_batch_size, kafka_flush_interval_ms, kafka_thread_per_consumer, kafka_handle_error_mode, kafka_commit_on_select, kafka_max_rows_per_message]);
Kafka 表引擎不支持带有 默认值 的列。如果您需要带有默认值的列,可以在物化视图级别添加它们(请参见下文)。
描述
传递的消息会自动跟踪,因此每个组中的消息只会被计算一次。如果您想获取数据两次,则使用另一个组名创建表的副本。
组是灵活的,并在集群中同步。例如,如果您在集群中拥有 10 个主题和 5 个表的副本,则每个副本将获得 2 个主题。如果副本的数量发生变化,主题将自动重新分布到副本之间。有关此内容的更多信息,请访问 http://kafka.apache.org/intro。
SELECT
对读取消息(调试除外)并不是特别有用,因为每条消息只能读取一次。使用物化视图创建实时线程更实用。为此
- 使用引擎创建 Kafka 消费者,并将其视为数据流。
- 创建具有所需结构的表。
- 创建一个物化视图,将数据从引擎转换并放入先前创建的表中。
当MATERIALIZED VIEW
连接引擎时,它会在后台开始收集数据。这允许您持续接收来自 Kafka 的消息,并使用SELECT
将其转换为所需的格式。一个 kafka 表可以拥有任意数量的物化视图,它们不会直接从 kafka 表读取数据,而是接收新的记录(以块为单位),这样您就可以写入多个具有不同详细级别的表(带有分组 - 聚合和不带分组)。
示例
CREATE TABLE queue (
timestamp UInt64,
level String,
message String
) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow');
CREATE TABLE daily (
day Date,
level String,
total UInt64
) ENGINE = SummingMergeTree(day, (day, level), 8192);
CREATE MATERIALIZED VIEW consumer TO daily
AS SELECT toDate(toDateTime(timestamp)) AS day, level, count() as total
FROM queue GROUP BY day, level;
SELECT level, sum(total) FROM daily GROUP BY level;
为了提高性能,接收到的消息将分组到大小为 max_insert_block_size 的块中。如果在 stream_flush_interval_ms 毫秒内未形成块,则无论块是否完整,数据都将刷新到表中。
要停止接收主题数据或更改转换逻辑,请分离物化视图
DETACH TABLE consumer;
ATTACH TABLE consumer;
如果您想使用ALTER
更改目标表,我们建议禁用物化视图,以避免目标表与视图中的数据之间出现差异。
配置
与 GraphiteMergeTree 类似,Kafka 引擎支持使用 ClickHouse 配置文件进行扩展配置。您可以使用两个配置键:全局(在<kafka>
下面)和主题级别(在<kafka><kafka_topic>
下面)。首先应用全局配置,然后应用主题级别配置(如果存在)。
<kafka>
<!-- Global configuration options for all tables of Kafka engine type -->
<debug>cgrp</debug>
<statistics_interval_ms>3000</statistics_interval_ms>
<kafka_topic>
<name>logs</name>
<statistics_interval_ms>4000</statistics_interval_ms>
</kafka_topic>
<!-- Settings for consumer -->
<consumer>
<auto_offset_reset>smallest</auto_offset_reset>
<kafka_topic>
<name>logs</name>
<fetch_min_bytes>100000</fetch_min_bytes>
</kafka_topic>
<kafka_topic>
<name>stats</name>
<fetch_min_bytes>50000</fetch_min_bytes>
</kafka_topic>
</consumer>
<!-- Settings for producer -->
<producer>
<kafka_topic>
<name>logs</name>
<retry_backoff_ms>250</retry_backoff_ms>
</kafka_topic>
<kafka_topic>
<name>stats</name>
<retry_backoff_ms>400</retry_backoff_ms>
</kafka_topic>
</producer>
</kafka>
有关可能的配置选项列表,请参见 librdkafka 配置参考。在 ClickHouse 配置中使用下划线 (_
) 代替点。例如,check.crcs=true
将变为 <check_crcs>true</check_crcs>
。
Kerberos 支持
要处理 Kerberos 感知的 Kafka,请添加具有 sasl_plaintext
值的 security_protocol
子元素。如果 Kerberos 票据授予票据已由操作系统工具获取并缓存,则就足够了。ClickHouse 能够使用密钥文件维护 Kerberos 凭据。请考虑 sasl_kerberos_service_name
、sasl_kerberos_keytab
和 sasl_kerberos_principal
子元素。
示例
<!-- Kerberos-aware Kafka -->
<kafka>
<security_protocol>SASL_PLAINTEXT</security_protocol>
<sasl_kerberos_keytab>/home/kafkauser/kafkauser.keytab</sasl_kerberos_keytab>
<sasl_kerberos_principal>kafkauser/[email protected]</sasl_kerberos_principal>
</kafka>
虚拟列
_topic
— Kafka 主题。数据类型:LowCardinality(String)
。_key
— 消息的键。数据类型:String
。_offset
— 消息的偏移量。数据类型:UInt64
。_timestamp
— 消息的时间戳。数据类型:Nullable(DateTime)
。_timestamp_ms
— 消息以毫秒为单位的时间戳。数据类型:Nullable(DateTime64(3))
。_partition
— Kafka 主题的分区。数据类型:UInt64
。_headers.name
— 消息头键的数组。数据类型:Array(String)
。_headers.value
— 消息头值的数组。数据类型:Array(String)
。
当 kafka_handle_error_mode='stream'
时的其他虚拟列
_raw_message
- 无法成功解析的原始消息。数据类型:String
。_error
- 解析失败期间发生的异常消息。数据类型:String
。
注意:只有在解析期间发生异常的情况下才会填充 _raw_message
和 _error
虚拟列,当消息成功解析时,它们始终为空。
数据格式支持
Kafka 引擎支持 ClickHouse 支持的所有 格式。一条 Kafka 消息中的行数取决于格式是基于行还是基于块。
- 对于基于行的格式,可以通过设置
kafka_max_rows_per_message
来控制一条 Kafka 消息中的行数。 - 对于基于块的格式,我们无法将块划分为更小的部分,但可以通过通用设置 max_block_size 来控制一个块中的行数。
在 ClickHouse Keeper 中存储已提交偏移量的实验引擎
如果启用了 allow_experimental_kafka_offsets_storage_in_keeper
,则可以为 Kafka 表引擎指定另外两个设置。
kafka_keeper_path
指定 ClickHouse Keeper 中表的路径。kafka_replica_name
指定 ClickHouse Keeper 中的副本名称。
这两个设置必须同时指定或都不指定。当两者都指定时,将使用一个新的实验性 Kafka 引擎。新引擎不依赖于在 Kafka 中存储已提交的偏移量,而是将它们存储在 ClickHouse Keeper 中。它仍然尝试将偏移量提交到 Kafka,但它仅在创建表时依赖这些偏移量。在任何其他情况下(表重新启动或在某些错误后恢复),存储在 ClickHouse Keeper 中的偏移量将用作继续消费消息的偏移量。除了已提交的偏移量外,它还存储上次批次中消费了多少条消息,因此如果插入失败,将消费相同数量的消息,从而在必要时启用重复数据删除。
示例
CREATE TABLE experimental_kafka (key UInt64, value UInt64)
ENGINE = Kafka('localhost:19092', 'my-topic', 'my-consumer', 'JSONEachRow')
SETTINGS
kafka_keeper_path = '/clickhouse/{database}/experimental_kafka',
kafka_replica_name = 'r1'
SETTINGS allow_experimental_kafka_offsets_storage_in_keeper=1;
或者类似于 ReplicatedMergeTree 利用 uuid
和 replica
宏。
CREATE TABLE experimental_kafka (key UInt64, value UInt64)
ENGINE = Kafka('localhost:19092', 'my-topic', 'my-consumer', 'JSONEachRow')
SETTINGS
kafka_keeper_path = '/clickhouse/{database}/{uuid}',
kafka_replica_name = '{replica}'
SETTINGS allow_experimental_kafka_offsets_storage_in_keeper=1;
已知限制
由于新引擎是实验性的,因此尚未准备好用于生产环境。实现有一些已知的限制。
- 最大的限制是引擎不支持直接读取。使用物化视图从引擎读取和写入引擎可以工作,但直接读取不行。因此,所有直接
SELECT
查询都将失败。 - 快速删除和重新创建表或为不同的引擎指定相同的 ClickHouse Keeper 路径可能会导致问题。最佳实践是在
kafka_keeper_path
中使用{uuid}
来避免路径冲突。 - 为了进行可重复读取,不能在单个线程上从多个分区消费消息。另一方面,必须定期轮询 Kafka 消费者以保持其存活。由于这两个目标,我们决定仅在启用
kafka_thread_per_consumer
时才允许创建多个消费者,否则避免定期轮询消费者的问题过于复杂。 - 新存储引擎创建的消费者不会显示在
system.kafka_consumers
表中。
另请参阅