跳至主要内容

Kafka

此引擎与 Apache Kafka 一起使用。

Kafka 允许您

  • 发布或订阅数据流。
  • 组织容错存储。
  • 在数据可用时处理流。

创建表

CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
name1 [type1] [ALIAS expr1],
name2 [type2] [ALIAS expr2],
...
) ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'host:port',
kafka_topic_list = 'topic1,topic2,...',
kafka_group_name = 'group_name',
kafka_format = 'data_format'[,]
[kafka_schema = '',]
[kafka_num_consumers = N,]
[kafka_max_block_size = 0,]
[kafka_skip_broken_messages = N,]
[kafka_commit_every_batch = 0,]
[kafka_client_id = '',]
[kafka_poll_timeout_ms = 0,]
[kafka_poll_max_batch_size = 0,]
[kafka_flush_interval_ms = 0,]
[kafka_thread_per_consumer = 0,]
[kafka_handle_error_mode = 'default',]
[kafka_commit_on_select = false,]
[kafka_max_rows_per_message = 1];

必需参数

  • kafka_broker_list — 以逗号分隔的代理列表(例如,localhost:9092)。
  • kafka_topic_list — Kafka 主题列表。
  • kafka_group_name — Kafka 消费者的一个组。每个组的读取偏移量都会单独跟踪。如果您不希望消息在集群中重复,请在所有地方使用相同的组名。
  • kafka_format — 消息格式。使用与 SQL FORMAT 函数相同的表示法,例如 JSONEachRow。有关更多信息,请参见 格式 部分。

可选参数

  • kafka_schema — 如果格式需要架构定义,则必须使用此参数。例如,Cap’n Proto 需要架构文件路径和根 schema.capnp:Message 对象的名称。
  • kafka_num_consumers — 每个表的消费者数量。如果一个消费者的吞吐量不足,请指定更多消费者。消费者的总数不应超过主题中的分区数,因为每个分区只能分配一个消费者,并且不得大于部署 ClickHouse 的服务器上的物理核心数。默认值:1
  • kafka_max_block_size — 轮询的最大批次大小(以消息计)。默认值:max_insert_block_size
  • kafka_skip_broken_messages — Kafka 消息解析器对每个块中与架构不兼容的消息的容忍度。如果 kafka_skip_broken_messages = N,则引擎将跳过N个无法解析的 Kafka 消息(一条消息等于一行数据)。默认值:0
  • kafka_commit_every_batch — 提交每个已消费和处理的批次,而不是在写入整个块后进行单次提交。默认值:0
  • kafka_client_id — 客户端标识符。默认为空。
  • kafka_poll_timeout_ms — 从 Kafka 单次轮询的超时时间。默认值:stream_poll_timeout_ms
  • kafka_poll_max_batch_size — 在 Kafka 单次轮询中轮询的最大消息量。默认值:max_block_size
  • kafka_flush_interval_ms — 从 Kafka 刷新数据的超时时间。默认值:stream_flush_interval_ms
  • kafka_thread_per_consumer — 为每个消费者提供独立的线程。启用后,每个消费者都会独立并行刷新数据(否则,来自多个消费者的行会被压缩以形成一个块)。默认值:0
  • kafka_handle_error_mode — 如何处理 Kafka 引擎的错误。可能的值:default(如果我们无法解析消息,则会抛出异常),stream(异常消息和原始消息将保存在虚拟列 _error_raw_message 中)。
  • kafka_commit_on_select — 在执行 select 查询时提交消息。默认值:false
  • kafka_max_rows_per_message — 基于行的格式在一则 Kafka 消息中写入的最大行数。默认值:1

示例

  CREATE TABLE queue (
timestamp UInt64,
level String,
message String
) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow');

SELECT * FROM queue LIMIT 5;

CREATE TABLE queue2 (
timestamp UInt64,
level String,
message String
) ENGINE = Kafka SETTINGS kafka_broker_list = 'localhost:9092',
kafka_topic_list = 'topic',
kafka_group_name = 'group1',
kafka_format = 'JSONEachRow',
kafka_num_consumers = 4;

CREATE TABLE queue3 (
timestamp UInt64,
level String,
message String
) ENGINE = Kafka('localhost:9092', 'topic', 'group1')
SETTINGS kafka_format = 'JSONEachRow',
kafka_num_consumers = 4;
创建表的已弃用方法
注意

在新项目中不要使用此方法。如果可能,请将旧项目切换到上面描述的方法。

Kafka(kafka_broker_list, kafka_topic_list, kafka_group_name, kafka_format
[, kafka_row_delimiter, kafka_schema, kafka_num_consumers, kafka_max_block_size, kafka_skip_broken_messages, kafka_commit_every_batch, kafka_client_id, kafka_poll_timeout_ms, kafka_poll_max_batch_size, kafka_flush_interval_ms, kafka_thread_per_consumer, kafka_handle_error_mode, kafka_commit_on_select, kafka_max_rows_per_message]);
信息

Kafka 表引擎不支持带有 默认值 的列。如果您需要带有默认值的列,可以在物化视图级别添加它们(请参见下文)。

描述

传递的消息会自动跟踪,因此每个组中的消息只会被计算一次。如果您想获取数据两次,则使用另一个组名创建表的副本。

组是灵活的,并在集群中同步。例如,如果您在集群中拥有 10 个主题和 5 个表的副本,则每个副本将获得 2 个主题。如果副本的数量发生变化,主题将自动重新分布到副本之间。有关此内容的更多信息,请访问 http://kafka.apache.org/intro

SELECT 对读取消息(调试除外)并不是特别有用,因为每条消息只能读取一次。使用物化视图创建实时线程更实用。为此

  1. 使用引擎创建 Kafka 消费者,并将其视为数据流。
  2. 创建具有所需结构的表。
  3. 创建一个物化视图,将数据从引擎转换并放入先前创建的表中。

MATERIALIZED VIEW 连接引擎时,它会在后台开始收集数据。这允许您持续接收来自 Kafka 的消息,并使用SELECT 将其转换为所需的格式。一个 kafka 表可以拥有任意数量的物化视图,它们不会直接从 kafka 表读取数据,而是接收新的记录(以块为单位),这样您就可以写入多个具有不同详细级别的表(带有分组 - 聚合和不带分组)。

示例

  CREATE TABLE queue (
timestamp UInt64,
level String,
message String
) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow');

CREATE TABLE daily (
day Date,
level String,
total UInt64
) ENGINE = SummingMergeTree(day, (day, level), 8192);

CREATE MATERIALIZED VIEW consumer TO daily
AS SELECT toDate(toDateTime(timestamp)) AS day, level, count() as total
FROM queue GROUP BY day, level;

SELECT level, sum(total) FROM daily GROUP BY level;

为了提高性能,接收到的消息将分组到大小为 max_insert_block_size 的块中。如果在 stream_flush_interval_ms 毫秒内未形成块,则无论块是否完整,数据都将刷新到表中。

要停止接收主题数据或更改转换逻辑,请分离物化视图

  DETACH TABLE consumer;
ATTACH TABLE consumer;

如果您想使用ALTER 更改目标表,我们建议禁用物化视图,以避免目标表与视图中的数据之间出现差异。

配置

与 GraphiteMergeTree 类似,Kafka 引擎支持使用 ClickHouse 配置文件进行扩展配置。您可以使用两个配置键:全局(在<kafka> 下面)和主题级别(在<kafka><kafka_topic> 下面)。首先应用全局配置,然后应用主题级别配置(如果存在)。

  <kafka>
<!-- Global configuration options for all tables of Kafka engine type -->
<debug>cgrp</debug>
<statistics_interval_ms>3000</statistics_interval_ms>

<kafka_topic>
<name>logs</name>
<statistics_interval_ms>4000</statistics_interval_ms>
</kafka_topic>

<!-- Settings for consumer -->
<consumer>
<auto_offset_reset>smallest</auto_offset_reset>
<kafka_topic>
<name>logs</name>
<fetch_min_bytes>100000</fetch_min_bytes>
</kafka_topic>

<kafka_topic>
<name>stats</name>
<fetch_min_bytes>50000</fetch_min_bytes>
</kafka_topic>
</consumer>

<!-- Settings for producer -->
<producer>
<kafka_topic>
<name>logs</name>
<retry_backoff_ms>250</retry_backoff_ms>
</kafka_topic>

<kafka_topic>
<name>stats</name>
<retry_backoff_ms>400</retry_backoff_ms>
</kafka_topic>
</producer>
</kafka>

有关可能的配置选项列表,请参见 librdkafka 配置参考。在 ClickHouse 配置中使用下划线 (_) 代替点。例如,check.crcs=true 将变为 <check_crcs>true</check_crcs>

Kerberos 支持

要处理 Kerberos 感知的 Kafka,请添加具有 sasl_plaintext 值的 security_protocol 子元素。如果 Kerberos 票据授予票据已由操作系统工具获取并缓存,则就足够了。ClickHouse 能够使用密钥文件维护 Kerberos 凭据。请考虑 sasl_kerberos_service_namesasl_kerberos_keytabsasl_kerberos_principal 子元素。

示例

  <!-- Kerberos-aware Kafka -->
<kafka>
<security_protocol>SASL_PLAINTEXT</security_protocol>
<sasl_kerberos_keytab>/home/kafkauser/kafkauser.keytab</sasl_kerberos_keytab>
<sasl_kerberos_principal>kafkauser/[email protected]</sasl_kerberos_principal>
</kafka>

虚拟列

  • _topic — Kafka 主题。数据类型:LowCardinality(String)
  • _key — 消息的键。数据类型:String
  • _offset — 消息的偏移量。数据类型:UInt64
  • _timestamp — 消息的时间戳。数据类型:Nullable(DateTime)
  • _timestamp_ms — 消息以毫秒为单位的时间戳。数据类型:Nullable(DateTime64(3))
  • _partition — Kafka 主题的分区。数据类型:UInt64
  • _headers.name — 消息头键的数组。数据类型:Array(String)
  • _headers.value — 消息头值的数组。数据类型:Array(String)

kafka_handle_error_mode='stream' 时的其他虚拟列

  • _raw_message - 无法成功解析的原始消息。数据类型:String
  • _error - 解析失败期间发生的异常消息。数据类型:String

注意:只有在解析期间发生异常的情况下才会填充 _raw_message_error 虚拟列,当消息成功解析时,它们始终为空。

数据格式支持

Kafka 引擎支持 ClickHouse 支持的所有 格式。一条 Kafka 消息中的行数取决于格式是基于行还是基于块。

  • 对于基于行的格式,可以通过设置 kafka_max_rows_per_message 来控制一条 Kafka 消息中的行数。
  • 对于基于块的格式,我们无法将块划分为更小的部分,但可以通过通用设置 max_block_size 来控制一个块中的行数。

在 ClickHouse Keeper 中存储已提交偏移量的实验引擎

如果启用了 allow_experimental_kafka_offsets_storage_in_keeper,则可以为 Kafka 表引擎指定另外两个设置。

  • kafka_keeper_path 指定 ClickHouse Keeper 中表的路径。
  • kafka_replica_name 指定 ClickHouse Keeper 中的副本名称。

这两个设置必须同时指定或都不指定。当两者都指定时,将使用一个新的实验性 Kafka 引擎。新引擎不依赖于在 Kafka 中存储已提交的偏移量,而是将它们存储在 ClickHouse Keeper 中。它仍然尝试将偏移量提交到 Kafka,但它仅在创建表时依赖这些偏移量。在任何其他情况下(表重新启动或在某些错误后恢复),存储在 ClickHouse Keeper 中的偏移量将用作继续消费消息的偏移量。除了已提交的偏移量外,它还存储上次批次中消费了多少条消息,因此如果插入失败,将消费相同数量的消息,从而在必要时启用重复数据删除。

示例

CREATE TABLE experimental_kafka (key UInt64, value UInt64)
ENGINE = Kafka('localhost:19092', 'my-topic', 'my-consumer', 'JSONEachRow')
SETTINGS
kafka_keeper_path = '/clickhouse/{database}/experimental_kafka',
kafka_replica_name = 'r1'
SETTINGS allow_experimental_kafka_offsets_storage_in_keeper=1;

或者类似于 ReplicatedMergeTree 利用 uuidreplica 宏。

CREATE TABLE experimental_kafka (key UInt64, value UInt64)
ENGINE = Kafka('localhost:19092', 'my-topic', 'my-consumer', 'JSONEachRow')
SETTINGS
kafka_keeper_path = '/clickhouse/{database}/{uuid}',
kafka_replica_name = '{replica}'
SETTINGS allow_experimental_kafka_offsets_storage_in_keeper=1;

已知限制

由于新引擎是实验性的,因此尚未准备好用于生产环境。实现有一些已知的限制。

  • 最大的限制是引擎不支持直接读取。使用物化视图从引擎读取和写入引擎可以工作,但直接读取不行。因此,所有直接 SELECT 查询都将失败。
  • 快速删除和重新创建表或为不同的引擎指定相同的 ClickHouse Keeper 路径可能会导致问题。最佳实践是在 kafka_keeper_path 中使用 {uuid} 来避免路径冲突。
  • 为了进行可重复读取,不能在单个线程上从多个分区消费消息。另一方面,必须定期轮询 Kafka 消费者以保持其存活。由于这两个目标,我们决定仅在启用 kafka_thread_per_consumer 时才允许创建多个消费者,否则避免定期轮询消费者的问题过于复杂。
  • 新存储引擎创建的消费者不会显示在 system.kafka_consumers 表中。

另请参阅