跳到主要内容
跳到主要内容

Kafka

ClickHouse Cloud 中不支持
注意

建议 ClickHouse Cloud 用户使用 ClickPipes 将 Kafka 数据流式传输到 ClickHouse。这原生支持高性能插入,同时确保关注点分离,并能够独立扩展摄取和集群资源。

此引擎与 Apache Kafka 协同工作。

Kafka 允许您

  • 发布或订阅数据流。
  • 组织容错存储。
  • 在数据流可用时处理它们。

创建表

CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
name1 [type1] [ALIAS expr1],
name2 [type2] [ALIAS expr2],
...
) ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'host:port',
kafka_topic_list = 'topic1,topic2,...',
kafka_group_name = 'group_name',
kafka_format = 'data_format'[,]
[kafka_schema = '',]
[kafka_num_consumers = N,]
[kafka_max_block_size = 0,]
[kafka_skip_broken_messages = N,]
[kafka_commit_every_batch = 0,]
[kafka_client_id = '',]
[kafka_poll_timeout_ms = 0,]
[kafka_poll_max_batch_size = 0,]
[kafka_flush_interval_ms = 0,]
[kafka_thread_per_consumer = 0,]
[kafka_handle_error_mode = 'default',]
[kafka_commit_on_select = false,]
[kafka_max_rows_per_message = 1];

必需参数

  • kafka_broker_list — 以逗号分隔的代理列表(例如,localhost:9092)。
  • kafka_topic_list — Kafka 主题列表。
  • kafka_group_name — Kafka 消费者组。读取偏移量会为每个组单独跟踪。如果您不希望消息在集群中重复,请在所有地方使用相同的组名称。
  • kafka_format — 消息格式。使用与 SQL FORMAT 函数相同的表示法,例如 JSONEachRow。有关更多信息,请参阅“格式”部分。

可选参数

  • kafka_schema — 如果格式需要模式定义,则必须使用的参数。例如,Cap’n Proto 需要模式文件的路径和根 schema.capnp:Message 对象的名称。
  • kafka_num_consumers — 每个表的消费者数量。如果单个消费者的吞吐量不足,请指定更多消费者。消费者总数不应超过主题中的分区数,因为每个分区只能分配一个消费者,并且不得大于部署 ClickHouse 的服务器上的物理核心数。默认值:1
  • kafka_max_block_size — 轮询的最大批量大小(以消息为单位)。默认值:max_insert_block_size
  • kafka_skip_broken_messages — Kafka 消息解析器对每个块中与模式不兼容的消息的容忍度。如果 kafka_skip_broken_messages = N,则引擎将跳过 N 个无法解析的 Kafka 消息(一条消息等于一行数据)。默认值:0
  • kafka_commit_every_batch — 提交每个已消费和处理的批次,而不是在写入整个块后进行单次提交。默认值:0
  • kafka_client_id — 客户端标识符。默认情况下为空。
  • kafka_poll_timeout_ms — 从 Kafka 单次轮询的超时时间。默认值:stream_poll_timeout_ms
  • kafka_poll_max_batch_size — 单次 Kafka 轮询中要轮询的最大消息量。默认值:max_block_size
  • kafka_flush_interval_ms — 从 Kafka 刷新数据的超时时间。默认值:stream_flush_interval_ms
  • kafka_thread_per_consumer — 为每个消费者提供独立的线程。启用后,每个消费者都会独立并行地刷新数据(否则 — 来自多个消费者的行会被压缩以形成一个块)。默认值:0
  • kafka_handle_error_mode — 如何处理 Kafka 引擎的错误。可能的值:default(如果我们无法解析消息,将抛出异常),stream(异常消息和原始消息将保存在虚拟列 _error_raw_message 中)。
  • kafka_commit_on_select — 在执行 select 查询时提交消息。默认值:false
  • kafka_max_rows_per_message — 对于基于行的格式,在一条 Kafka 消息中写入的最大行数。默认值:1

示例

  CREATE TABLE queue (
timestamp UInt64,
level String,
message String
) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow');

SELECT * FROM queue LIMIT 5;

CREATE TABLE queue2 (
timestamp UInt64,
level String,
message String
) ENGINE = Kafka SETTINGS kafka_broker_list = 'localhost:9092',
kafka_topic_list = 'topic',
kafka_group_name = 'group1',
kafka_format = 'JSONEachRow',
kafka_num_consumers = 4;

CREATE TABLE queue3 (
timestamp UInt64,
level String,
message String
) ENGINE = Kafka('localhost:9092', 'topic', 'group1')
SETTINGS kafka_format = 'JSONEachRow',
kafka_num_consumers = 4;
创建表的已弃用方法
注意

在新项目​​中不要使用此方法。如果可能,请将旧项目切换到上述方法。

Kafka(kafka_broker_list, kafka_topic_list, kafka_group_name, kafka_format
[, kafka_row_delimiter, kafka_schema, kafka_num_consumers, kafka_max_block_size, kafka_skip_broken_messages, kafka_commit_every_batch, kafka_client_id, kafka_poll_timeout_ms, kafka_poll_max_batch_size, kafka_flush_interval_ms, kafka_thread_per_consumer, kafka_handle_error_mode, kafka_commit_on_select, kafka_max_rows_per_message]);
信息

Kafka 表引擎不支持具有默认值的列。如果您需要具有默认值的列,可以在物化视图级别添加它们(见下文)。

描述

传递的消息会自动跟踪,因此组中的每条消息仅计数一次。如果您想获取两次数据,请使用另一个组名称创建表的副本。

组是灵活的,并在集群上同步。例如,如果您在集群中有 10 个主题和 5 个表的副本,则每个副本获得 2 个主题。如果副本数量发生变化,主题将在副本之间自动重新分配。有关更多信息,请访问 http://kafka.apache.org/intro

SELECT 对于读取消息(调试除外)不是特别有用,因为每条消息只能读取一次。使用物化视图创建实时线程更为实用。为此

  1. 使用引擎创建 Kafka 消费者,并将其视为数据流。
  2. 创建具有所需结构的表。
  3. 创建一个物化视图,该视图转换来自引擎的数据并将其放入先前创建的表中。

MATERIALIZED VIEW 加入引擎时,它开始在后台收集数据。这使您可以持续接收来自 Kafka 的消息,并使用 SELECT 将其转换为所需格式。一个 Kafka 表可以拥有任意数量的物化视图,它们不直接从 Kafka 表读取数据,而是接收新记录(以块为单位),这样您就可以写入具有不同详细级别(带分组 - 聚合和不带分组)的多个表。

示例

  CREATE TABLE queue (
timestamp UInt64,
level String,
message String
) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow');

CREATE TABLE daily (
day Date,
level String,
total UInt64
) ENGINE = SummingMergeTree(day, (day, level), 8192);

CREATE MATERIALIZED VIEW consumer TO daily
AS SELECT toDate(toDateTime(timestamp)) AS day, level, count() as total
FROM queue GROUP BY day, level;

SELECT level, sum(total) FROM daily GROUP BY level;

为了提高性能,收到的消息被分组为 max_insert_block_size 大小的块。如果未在 stream_flush_interval_ms 毫秒内形成块,则无论块的完整性如何,数据都将刷新到表中。

要停止接收主题数据或更改转换逻辑,请分离物化视图

  DETACH TABLE consumer;
ATTACH TABLE consumer;

如果您想使用 ALTER 更改目标表,我们建议禁用物化视图,以避免目标表和视图中的数据之间存在差异。

配置

与 GraphiteMergeTree 类似,Kafka 引擎支持使用 ClickHouse 配置文件进行扩展配置。您可以使用两个配置键:全局配置(在 <kafka> 下)和主题级别配置(在 <kafka><kafka_topic> 下)。首先应用全局配置,然后应用主题级别配置(如果存在)。

  <kafka>
<!-- Global configuration options for all tables of Kafka engine type -->
<debug>cgrp</debug>
<statistics_interval_ms>3000</statistics_interval_ms>

<kafka_topic>
<name>logs</name>
<statistics_interval_ms>4000</statistics_interval_ms>
</kafka_topic>

<!-- Settings for consumer -->
<consumer>
<auto_offset_reset>smallest</auto_offset_reset>
<kafka_topic>
<name>logs</name>
<fetch_min_bytes>100000</fetch_min_bytes>
</kafka_topic>

<kafka_topic>
<name>stats</name>
<fetch_min_bytes>50000</fetch_min_bytes>
</kafka_topic>
</consumer>

<!-- Settings for producer -->
<producer>
<kafka_topic>
<name>logs</name>
<retry_backoff_ms>250</retry_backoff_ms>
</kafka_topic>

<kafka_topic>
<name>stats</name>
<retry_backoff_ms>400</retry_backoff_ms>
</kafka_topic>
</producer>
</kafka>

有关可能的配置选项列表,请参阅 librdkafka 配置参考。在 ClickHouse 配置中使用下划线 (_) 而不是点。例如,check.crcs=true 将是 <check_crcs>true</check_crcs>

Kerberos 支持

要处理支持 Kerberos 的 Kafka,请添加 security_protocol 子元素,其值为 sasl_plaintext。如果 Kerberos 票据授予票据已获取并由操作系统设施缓存,则已足够。ClickHouse 能够使用 keytab 文件维护 Kerberos 凭据。请考虑 sasl_kerberos_service_namesasl_kerberos_keytabsasl_kerberos_principal 子元素。

示例

  <!-- Kerberos-aware Kafka -->
<kafka>
<security_protocol>SASL_PLAINTEXT</security_protocol>
<sasl_kerberos_keytab>/home/kafkauser/kafkauser.keytab</sasl_kerberos_keytab>
<sasl_kerberos_principal>kafkauser/[email protected]</sasl_kerberos_principal>
</kafka>

虚拟列

  • _topic — Kafka 主题。数据类型:LowCardinality(String)
  • _key — 消息的键。数据类型:String
  • _offset — 消息的偏移量。数据类型:UInt64
  • _timestamp — 消息的时间戳。数据类型:Nullable(DateTime)
  • _timestamp_ms — 消息的时间戳(以毫秒为单位)。数据类型:Nullable(DateTime64(3))
  • _partition — Kafka 主题的分区。数据类型:UInt64
  • _headers.name — 消息标头键的数组。数据类型:Array(String)
  • _headers.value — 消息标头值的数组。数据类型:Array(String)

kafka_handle_error_mode='stream' 时的其他虚拟列

  • _raw_message - 无法成功解析的原始消息。数据类型:String
  • _error - 解析失败期间发生的异常消息。数据类型:String

注意:_raw_message_error 虚拟列仅在解析期间发生异常时填充,当消息成功解析时,它们始终为空。

数据格式支持

Kafka 引擎支持 ClickHouse 中支持的所有格式。一条 Kafka 消息中的行数取决于格式是基于行还是基于块

  • 对于基于行的格式,一条 Kafka 消息中的行数可以通过设置 kafka_max_rows_per_message 来控制。
  • 对于基于块的格式,我们无法将块分成更小的部分,但一个块中的行数可以通过常规设置 max_block_size 来控制。

在 ClickHouse Keeper 中存储已提交偏移量的引擎

实验性功能。 了解更多。

如果启用 allow_experimental_kafka_offsets_storage_in_keeper,则可以为 Kafka 表引擎指定两个更多设置

  • kafka_keeper_path 指定 ClickHouse Keeper 中表的路径
  • kafka_replica_name 指定 ClickHouse Keeper 中的副本名称

必须同时指定这两个设置,或者都不指定。当同时指定这两个设置时,将使用新的实验性 Kafka 引擎。新引擎不依赖于将已提交的偏移量存储在 Kafka 中,而是将它们存储在 ClickHouse Keeper 中。它仍然尝试将偏移量提交到 Kafka,但仅在创建表时才依赖于这些偏移量。在任何其他情况下(表重新启动或在某些错误后恢复),存储在 ClickHouse Keeper 中的偏移量将用作继续从 Kafka 消费消息的偏移量。除了已提交的偏移量之外,它还存储了上一批次中消费了多少消息,因此如果插入失败,将消费相同数量的消息,从而在必要时实现重复数据删除。

示例

CREATE TABLE experimental_kafka (key UInt64, value UInt64)
ENGINE = Kafka('localhost:19092', 'my-topic', 'my-consumer', 'JSONEachRow')
SETTINGS
kafka_keeper_path = '/clickhouse/{database}/experimental_kafka',
kafka_replica_name = 'r1'
SETTINGS allow_experimental_kafka_offsets_storage_in_keeper=1;

或者,可以像 ReplicatedMergeTree 一样利用 uuidreplica

CREATE TABLE experimental_kafka (key UInt64, value UInt64)
ENGINE = Kafka('localhost:19092', 'my-topic', 'my-consumer', 'JSONEachRow')
SETTINGS
kafka_keeper_path = '/clickhouse/{database}/{uuid}',
kafka_replica_name = '{replica}'
SETTINGS allow_experimental_kafka_offsets_storage_in_keeper=1;

已知限制

由于新引擎是实验性的,因此尚未准备好用于生产环境。该实现存在一些已知的限制

  • 最大的限制是该引擎不支持直接读取。使用物化视图从引擎读取和写入引擎工作,但直接读取不起作用。因此,所有直接 SELECT 查询都将失败。
  • 快速删除和重新创建表或为不同的引擎指定相同的 ClickHouse Keeper 路径可能会导致问题。作为最佳实践,您可以在 kafka_keeper_path 中使用 {uuid} 以避免路径冲突。
  • 为了进行可重复读取,不能从单个线程上的多个分区消费消息。另一方面,必须定期轮询 Kafka 消费者以保持其活动状态。由于这两个目标,我们决定仅在启用 kafka_thread_per_consumer 时才允许创建多个消费者,否则避免定期轮询消费者的问题太复杂。
  • 由新存储引擎创建的消费者不会显示在 system.kafka_consumers 表中。

另请参阅