跳至主要内容

使用 Kafka 表引擎

注意

ClickHouse Cloud 不支持 Kafka 表引擎。请考虑使用 ClickPipesKafka Connect

Kafka 到 ClickHouse

要使用 Kafka 表引擎,您应该大体熟悉 ClickHouse 物化视图

概述

最初,我们专注于最常见的用例:使用 Kafka 表引擎将数据从 Kafka 插入到 ClickHouse 中。

Kafka 表引擎允许 ClickHouse 直接从 Kafka 主题读取数据。虽然这对于查看主题上的消息很有用,但该引擎的设计只允许一次性检索,即当向表发出查询时,它会从队列中消费数据并在返回结果给调用方之前增加消费者偏移量。实际上,无法在不重置这些偏移量的情况下重新读取数据。

为了持久化从表引擎读取的这些数据,我们需要一种捕获数据并将其插入到另一个表中的方法。基于触发器的物化视图原生提供了此功能。物化视图在表引擎上启动读取操作,接收批量的文档。TO 子句确定数据的目标 - 通常是 Merge Tree 系列 的表。此过程如下图所示

Kafka table engine

步骤

1. 准备

如果您的目标主题上填充了数据,您可以调整以下内容以在您的数据集中使用。或者,此处提供了示例 Github 数据集。此数据集在下面的示例中使用,并使用简化的模式和部分行(具体来说,我们限制为关于 ClickHouse 存储库 的 Github 事件),与 此处提供的完整数据集相比,行数较少,以便简洁。这对于大多数 与数据集一起发布的查询 仍然足够。

2. 配置 ClickHouse

如果您连接到安全的 Kafka,则需要执行此步骤。这些设置无法通过 SQL DDL 命令传递,必须在 ClickHouse 的 config.xml 中配置。我们假设您正在连接到 SASL 安全的实例。这是与 Confluent Cloud 交互时的最简单方法。

<clickhouse>
<kafka>
<sasl_username>username</sasl_username>
<sasl_password>password</sasl_password>
<security_protocol>sasl_ssl</security_protocol>
<sasl_mechanisms>PLAIN</sasl_mechanisms>
</kafka>
</clickhouse>

将以上代码段放在 conf.d/ 目录下的新文件中,或将其合并到现有的配置文件中。有关可以配置的设置,请参阅 此处

我们还将创建一个名为 KafkaEngine 的数据库,在本教程中使用它

CREATE DATABASE KafkaEngine;

创建数据库后,您需要切换到它

USE KafkaEngine;
3. 创建目标表

准备您的目标表。在下面的示例中,我们使用简化的 GitHub 模式以简洁起见。请注意,虽然我们使用 MergeTree 表引擎,但此示例可以轻松地适用于 MergeTree 系列 的任何成员。

CREATE TABLE github
(
file_time DateTime,
event_type Enum('CommitCommentEvent' = 1, 'CreateEvent' = 2, 'DeleteEvent' = 3, 'ForkEvent' = 4, 'GollumEvent' = 5, 'IssueCommentEvent' = 6, 'IssuesEvent' = 7, 'MemberEvent' = 8, 'PublicEvent' = 9, 'PullRequestEvent' = 10, 'PullRequestReviewCommentEvent' = 11, 'PushEvent' = 12, 'ReleaseEvent' = 13, 'SponsorshipEvent' = 14, 'WatchEvent' = 15, 'GistEvent' = 16, 'FollowEvent' = 17, 'DownloadEvent' = 18, 'PullRequestReviewEvent' = 19, 'ForkApplyEvent' = 20, 'Event' = 21, 'TeamAddEvent' = 22),
actor_login LowCardinality(String),
repo_name LowCardinality(String),
created_at DateTime,
updated_at DateTime,
action Enum('none' = 0, 'created' = 1, 'added' = 2, 'edited' = 3, 'deleted' = 4, 'opened' = 5, 'closed' = 6, 'reopened' = 7, 'assigned' = 8, 'unassigned' = 9, 'labeled' = 10, 'unlabeled' = 11, 'review_requested' = 12, 'review_request_removed' = 13, 'synchronize' = 14, 'started' = 15, 'published' = 16, 'update' = 17, 'create' = 18, 'fork' = 19, 'merged' = 20),
comment_id UInt64,
path String,
ref LowCardinality(String),
ref_type Enum('none' = 0, 'branch' = 1, 'tag' = 2, 'repository' = 3, 'unknown' = 4),
creator_user_login LowCardinality(String),
number UInt32,
title String,
labels Array(LowCardinality(String)),
state Enum('none' = 0, 'open' = 1, 'closed' = 2),
assignee LowCardinality(String),
assignees Array(LowCardinality(String)),
closed_at DateTime,
merged_at DateTime,
merge_commit_sha String,
requested_reviewers Array(LowCardinality(String)),
merged_by LowCardinality(String),
review_comments UInt32,
member_login LowCardinality(String)
) ENGINE = MergeTree ORDER BY (event_type, repo_name, created_at)
4. 创建并填充主题

接下来,我们将创建一个主题。我们可以使用多种工具来执行此操作。如果我们在本地机器或 Docker 容器内运行 Kafka,则 RPK 非常有效。我们可以通过运行以下命令创建一个名为 github,具有 5 个分区的主题

rpk topic create -p 5 github --brokers <host>:<port>

如果我们在 Confluent Cloud 上运行 Kafka,我们可能更喜欢使用 Confluent CLI

confluent kafka topic create --if-not-exists github

现在我们需要使用一些数据填充此主题,我们将使用 kcat 来执行此操作。如果我们在本地运行 Kafka 且身份验证已禁用,则可以运行类似以下的命令

cat github_all_columns.ndjson | 
kcat -P \
-b <host>:<port> \
-t github

或者,如果我们的 Kafka 集群使用 SASL 进行身份验证,则可以运行以下命令

cat github_all_columns.ndjson | 
kcat -P \
-b <host>:<port> \
-t github
-X security.protocol=sasl_ssl \
-X sasl.mechanisms=PLAIN \
-X sasl.username=<username> \
-X sasl.password=<password> \

数据集包含 200,000 行,因此应该在几秒钟内完成导入。如果您想使用更大的数据集,请查看 ClickHouse/kafka-samples GitHub 存储库的 大型数据集部分

5. 创建 Kafka 表引擎

以下示例创建了一个与合并树表具有相同模式的表引擎。这不是严格要求的,因为您可以在目标表中使用别名或临时列。但是,设置非常重要 - 请注意使用 JSONEachRow 作为从 Kafka 主题消费 JSON 的数据类型。值 githubclickhouse 分别表示主题和消费者组名称。主题实际上可以是值的列表。

CREATE TABLE github_queue
(
file_time DateTime,
event_type Enum('CommitCommentEvent' = 1, 'CreateEvent' = 2, 'DeleteEvent' = 3, 'ForkEvent' = 4, 'GollumEvent' = 5, 'IssueCommentEvent' = 6, 'IssuesEvent' = 7, 'MemberEvent' = 8, 'PublicEvent' = 9, 'PullRequestEvent' = 10, 'PullRequestReviewCommentEvent' = 11, 'PushEvent' = 12, 'ReleaseEvent' = 13, 'SponsorshipEvent' = 14, 'WatchEvent' = 15, 'GistEvent' = 16, 'FollowEvent' = 17, 'DownloadEvent' = 18, 'PullRequestReviewEvent' = 19, 'ForkApplyEvent' = 20, 'Event' = 21, 'TeamAddEvent' = 22),
actor_login LowCardinality(String),
repo_name LowCardinality(String),
created_at DateTime,
updated_at DateTime,
action Enum('none' = 0, 'created' = 1, 'added' = 2, 'edited' = 3, 'deleted' = 4, 'opened' = 5, 'closed' = 6, 'reopened' = 7, 'assigned' = 8, 'unassigned' = 9, 'labeled' = 10, 'unlabeled' = 11, 'review_requested' = 12, 'review_request_removed' = 13, 'synchronize' = 14, 'started' = 15, 'published' = 16, 'update' = 17, 'create' = 18, 'fork' = 19, 'merged' = 20),
comment_id UInt64,
path String,
ref LowCardinality(String),
ref_type Enum('none' = 0, 'branch' = 1, 'tag' = 2, 'repository' = 3, 'unknown' = 4),
creator_user_login LowCardinality(String),
number UInt32,
title String,
labels Array(LowCardinality(String)),
state Enum('none' = 0, 'open' = 1, 'closed' = 2),
assignee LowCardinality(String),
assignees Array(LowCardinality(String)),
closed_at DateTime,
merged_at DateTime,
merge_commit_sha String,
requested_reviewers Array(LowCardinality(String)),
merged_by LowCardinality(String),
review_comments UInt32,
member_login LowCardinality(String)
)
ENGINE = Kafka('kafka_host:9092', 'github', 'clickhouse',
'JSONEachRow') settings kafka_thread_per_consumer = 0, kafka_num_consumers = 1;

我们在下面讨论引擎设置和性能调整。此时,对表 github_queue 进行简单的选择应该会读取一些行。请注意,这将使消费者偏移量向前移动,从而阻止在不进行 重置 的情况下重新读取这些行。请注意 limit 和必需参数 stream_like_engine_allow_direct_select.

6. 创建物化视图

物化视图将连接两个先前创建的表,从 Kafka 表引擎读取数据并将其插入到目标合并树表中。我们可以进行许多数据转换。我们将进行简单的读取和插入操作。使用 * 假设列名相同(区分大小写)。

CREATE MATERIALIZED VIEW github_mv TO github AS
SELECT *
FROM github_queue;

在创建时,物化视图连接到 Kafka 引擎并开始读取:将行插入到目标表中。此过程将无限期地继续,Kafka 中后续的消息插入将被消费。随意重新运行插入脚本以将更多消息插入 Kafka。

7. 确认行已插入

确认数据存在于目标表中

SELECT count() FROM github;

您应该看到 200,000 行

┌─count()─┐
│ 200000 │
└─────────┘

常见操作

停止和重新启动消息消费

要停止消息消费,您可以分离 Kafka 引擎表

DETACH TABLE github_queue;

这不会影响消费者组的偏移量。要重新启动消费并从以前的偏移量继续,请重新附加表。

ATTACH TABLE github_queue;
添加 Kafka 元数据

在将原始 Kafka 消息导入到 ClickHouse 后,跟踪其元数据可能很有用。例如,我们可能想知道我们消费了特定主题或分区的多少内容。为此,Kafka 表引擎公开了几个 虚拟列。可以通过修改我们的模式和物化视图的 select 语句,将这些列作为目标表中的列持久化。

首先,在向目标表添加列之前,我们执行上面描述的停止操作。

DETACH TABLE github_queue;

下面,我们添加信息列以识别行源自的源主题和分区。

ALTER TABLE github
ADD COLUMN topic String,
ADD COLUMN partition UInt64;

接下来,我们需要确保虚拟列按要求映射。虚拟列以_作为前缀。虚拟列的完整列表可以在此处找到。

要使用虚拟列更新我们的表,我们需要删除物化视图,重新附加 Kafka 引擎表,然后重新创建物化视图。

DROP VIEW github_mv;
ATTACH TABLE github_queue;
CREATE MATERIALIZED VIEW github_mv TO github AS
SELECT *, _topic as topic, _partition as partition
FROM github_queue;

新消费的行应该包含元数据。

SELECT actor_login, event_type, created_at, topic, partition 
FROM github
LIMIT 10;

结果如下所示

actor_loginevent_typecreated_attopicpartition
IgorMinarCommitCommentEvent2011-02-12 02:22:00github0
queeupCommitCommentEvent2011-02-12 02:23:23github0
IgorMinarCommitCommentEvent2011-02-12 02:23:24github0
IgorMinarCommitCommentEvent2011-02-12 02:24:50github0
IgorMinarCommitCommentEvent2011-02-12 02:25:20github0
dapiCommitCommentEvent2011-02-12 06:18:36github0
sourcerebelsCommitCommentEvent2011-02-12 06:34:10github0
jamierumbelowCommitCommentEvent2011-02-12 12:21:40github0
jpnCommitCommentEvent2011-02-12 12:24:31github0
OxoniumCommitCommentEvent2011-02-12 12:31:28github0
修改 Kafka 引擎设置

我们建议删除 Kafka 引擎表并使用新设置重新创建它。在此过程中无需修改物化视图 - Kafka 引擎表重新创建后,消息消费将恢复。

调试问题

诸如身份验证问题之类的错误不会在对 Kafka 引擎 DDL 的响应中报告。为了诊断问题,我们建议使用 ClickHouse 主日志文件 clickhouse-server.err.log。可以通过配置启用底层 Kafka 客户端库librdkafka 的进一步跟踪日志记录。

<kafka>
<debug>all</debug>
</kafka>
处理格式错误的消息

Kafka 通常用作数据的“转储地”。这会导致主题包含混合的消息格式和不一致的字段名称。避免这种情况并利用 Kafka 功能(如 Kafka Streams 或 ksqlDB)来确保消息在插入 Kafka 之前格式良好且一致。如果这些选项不可用,ClickHouse 有一些功能可以提供帮助。

  • 将消息字段视为字符串。如果需要,可以在物化视图语句中使用函数执行清理和转换。这不能作为生产解决方案,但可能有助于一次性导入。
  • 如果您正在从主题中使用 JSONEachRow 格式消费 JSON,请使用设置input_format_skip_unknown_fields。在写入数据时,默认情况下,如果输入数据包含目标表中不存在的列,ClickHouse 会抛出异常。但是,如果启用此选项,这些多余的列将被忽略。同样,这也不是生产级解决方案,可能会让其他人感到困惑。
  • 考虑设置 kafka_skip_broken_messages。这要求用户为格式错误的消息指定每个块的容错级别 - 在 kafka_max_block_size 的上下文中考虑。如果超过此容错级别(以绝对消息衡量),则通常的异常行为将恢复,其他消息将被跳过。
交付语义和重复项带来的挑战

Kafka 表引擎具有至少一次语义。在一些已知的罕见情况下可能会出现重复。例如,消息可以从 Kafka 中读取并成功插入到 ClickHouse 中。在提交新偏移量之前,与 Kafka 的连接断开。在这种情况下,需要重试该块。可以使用分布式表或 ReplicatedMergeTree 作为目标表来去重该块。虽然这降低了重复行的可能性,但它依赖于相同的块。Kafka 重新平衡等事件可能会使此假设无效,从而在极少数情况下导致重复。

基于仲裁的插入

在 ClickHouse 中需要更高交付保证的情况下,您可能需要基于仲裁的插入。这不能在物化视图或目标表上设置。但是,它可以为用户配置文件设置,例如

<profiles>
<default>
<insert_quorum>2</insert_quorum>
</default>
</profiles>

ClickHouse 到 Kafka

虽然是一个不太常见的用例,但 ClickHouse 数据也可以持久化到 Kafka 中。例如,我们将手动将行插入到 Kafka 表引擎中。这些数据将由同一个 Kafka 引擎读取,其物化视图将数据放入 Merge Tree 表中。最后,我们演示了物化视图在 Kafka 插入中的应用,以读取现有源表中的表。

步骤

我们的初始目标最好通过以下方式说明

Kafka table engine with inserts

我们假设您已在Kafka 到 ClickHouse的步骤下创建了表和视图,并且主题已完全消费。

1. 直接插入行

首先,确认目标表的行数。

SELECT count() FROM github;

您应该有 200,000 行

┌─count()─┐
│ 200000 │
└─────────┘

现在将 GitHub 目标表中的行插入回 Kafka 表引擎 github_queue 中。请注意我们如何使用 JSONEachRow 格式并将 select 限制为 100。

INSERT INTO github_queue SELECT * FROM github LIMIT 100 FORMAT JSONEachRow

重新计算 GitHub 中的行数以确认它已增加了 100。如上图所示,行已通过 Kafka 表引擎插入到 Kafka 中,然后由同一引擎重新读取并通过我们的物化视图插入到 GitHub 目标表中!

SELECT count() FROM github;

您应该看到 100 行新增行

┌─count()─┐
│ 200100 │
└─────────┘
2. 使用物化视图

我们可以利用物化视图在将文档插入表时将消息推送到 Kafka 引擎(和主题)。当行插入到 GitHub 表中时,会触发一个物化视图,这会导致将行插入回 Kafka 引擎和新的主题中。同样,这最好通过以下方式说明

Kafka table engine inserts with materialized view

创建一个新的 Kafka 主题 github_out 或等效主题。确保 Kafka 表引擎 github_out_queue 指向此主题。

CREATE TABLE github_out_queue
(
file_time DateTime,
event_type Enum('CommitCommentEvent' = 1, 'CreateEvent' = 2, 'DeleteEvent' = 3, 'ForkEvent' = 4, 'GollumEvent' = 5, 'IssueCommentEvent' = 6, 'IssuesEvent' = 7, 'MemberEvent' = 8, 'PublicEvent' = 9, 'PullRequestEvent' = 10, 'PullRequestReviewCommentEvent' = 11, 'PushEvent' = 12, 'ReleaseEvent' = 13, 'SponsorshipEvent' = 14, 'WatchEvent' = 15, 'GistEvent' = 16, 'FollowEvent' = 17, 'DownloadEvent' = 18, 'PullRequestReviewEvent' = 19, 'ForkApplyEvent' = 20, 'Event' = 21, 'TeamAddEvent' = 22),
actor_login LowCardinality(String),
repo_name LowCardinality(String),
created_at DateTime,
updated_at DateTime,
action Enum('none' = 0, 'created' = 1, 'added' = 2, 'edited' = 3, 'deleted' = 4, 'opened' = 5, 'closed' = 6, 'reopened' = 7, 'assigned' = 8, 'unassigned' = 9, 'labeled' = 10, 'unlabeled' = 11, 'review_requested' = 12, 'review_request_removed' = 13, 'synchronize' = 14, 'started' = 15, 'published' = 16, 'update' = 17, 'create' = 18, 'fork' = 19, 'merged' = 20),
comment_id UInt64,
path String,
ref LowCardinality(String),
ref_type Enum('none' = 0, 'branch' = 1, 'tag' = 2, 'repository' = 3, 'unknown' = 4),
creator_user_login LowCardinality(String),
number UInt32,
title String,
labels Array(LowCardinality(String)),
state Enum('none' = 0, 'open' = 1, 'closed' = 2),
assignee LowCardinality(String),
assignees Array(LowCardinality(String)),
closed_at DateTime,
merged_at DateTime,
merge_commit_sha String,
requested_reviewers Array(LowCardinality(String)),
merged_by LowCardinality(String),
review_comments UInt32,
member_login LowCardinality(String)
)
ENGINE = Kafka('host:port', 'github_out', 'clickhouse_out',
'JSONEachRow') settings kafka_thread_per_consumer = 0, kafka_num_consumers = 1;

现在创建一个新的物化视图 github_out_mv 以指向 GitHub 表,并在触发时将行插入到上述引擎中。因此,GitHub 表的添加将被推送到我们的新 Kafka 主题。

CREATE MATERIALIZED VIEW github_out_mv TO github_out_queue AS
SELECT file_time, event_type, actor_login, repo_name,
created_at, updated_at, action, comment_id, path,
ref, ref_type, creator_user_login, number, title,
labels, state, assignee, assignees, closed_at, merged_at,
merge_commit_sha, requested_reviewers, merged_by,
review_comments, member_login
FROM github
FORMAT JsonEachRow;

如果您插入到作为Kafka 到 ClickHouse一部分创建的原始 github 主题中,文档将神奇地出现在“github_clickhouse”主题中。使用原生 Kafka 工具确认这一点。例如,下面我们使用kcat为 Confluent Cloud 托管的主题插入 100 行到 github 主题中

head -n 10 github_all_columns.ndjson | 
kcat -P \
-b <host>:<port> \
-t github
-X security.protocol=sasl_ssl \
-X sasl.mechanisms=PLAIN \
-X sasl.username=<username> \
-X sasl.password=<password>

github_out 主题的读取应确认消息的传递。

kcat -C \
-b <host>:<port> \
-t github_out \
-X security.protocol=sasl_ssl \
-X sasl.mechanisms=PLAIN \
-X sasl.username=<username> \
-X sasl.password=<password> \
-e -q |
wc -l

虽然这是一个复杂的示例,但这说明了物化视图与 Kafka 引擎结合使用时的强大功能。

集群和性能

使用 ClickHouse 集群

通过 Kafka 消费者组,多个 ClickHouse 实例可以潜在地从同一个主题读取。每个消费者将被分配到一个主题分区,形成 1:1 映射。在使用 Kafka 表引擎扩展 ClickHouse 消费时,请考虑集群中消费者的总数不能超过主题上的分区数。因此,请提前确保为主题正确配置分区。

多个 ClickHouse 实例都可以配置为使用相同的消费者组 ID(在 Kafka 表引擎创建期间指定)从主题读取。因此,每个实例将从一个或多个分区读取,并将段插入到其本地目标表中。然后,可以将目标表配置为使用 ReplicatedMergeTree 来处理数据的重复。这种方法允许 Kafka 读取随着 ClickHouse 集群进行扩展,前提是 Kafka 分区足够。

Replicated Kafka table engine

调整性能

在寻求提高 Kafka 引擎表吞吐量性能时,请考虑以下事项

  • 性能将根据消息大小、格式和目标表类型而有所不同。单个表引擎上的 100k 行/秒应被视为可实现的。默认情况下,消息按块读取,由参数 kafka_max_block_size 控制。默认情况下,此值设置为max_insert_block_size,默认为 1,048,576。除非消息非常大,否则几乎总是应该增加此值。500k 到 1M 之间的值并不少见。测试并评估对吞吐量性能的影响。
  • 可以使用 kafka_num_consumers 增加表引擎的消费者数量。但是,默认情况下,插入将在单个线程中线性化,除非 kafka_thread_per_consumer 从默认值 1 更改。将其设置为 1 以确保并行执行刷新。请注意,使用 N 个消费者(以及 kafka_thread_per_consumer=1)创建 Kafka 引擎表在逻辑上等同于创建 N 个 Kafka 引擎,每个引擎都具有一个物化视图和 kafka_thread_per_consumer=0。
  • 增加消费者不是一项免费的操作。每个消费者都维护自己的缓冲区和线程,从而增加服务器的开销。注意消费者的开销,并首先在线性扩展您的集群,如果可能的话。
  • 如果 Kafka 消息的吞吐量可变且可以接受延迟,请考虑增加 stream_flush_interval_ms 以确保刷新更大的块。
  • background_message_broker_schedule_pool_size 设置执行后台任务的线程数。这些线程用于 Kafka 流。此设置在 ClickHouse 服务器启动时应用,不能在用户会话中更改,默认为 16。如果您在日志中看到超时,则可能需要增加此值。
  • 为了与 Kafka 通信,使用了 librdkafka 库,该库本身会创建线程。因此,大量 Kafka 表或消费者可能会导致大量上下文切换。将此负载分布到集群中,如果可能,只复制目标表,或者考虑使用表引擎从多个主题读取 - 支持值列表。多个物化视图可以从单个表读取,每个物化视图都过滤特定主题中的数据。

任何设置更改都应进行测试。我们建议监控 Kafka 消费者延迟以确保您已正确扩展。

其他设置

除了上面讨论的设置外,以下内容可能也值得关注

  • Kafka_max_wait_ms - 从 Kafka 读取消息之前重试的等待时间(毫秒)。在用户配置文件级别设置,默认为 5000。

所有设置来自底层 librdkafka 也可以放置在 ClickHouse 配置文件中的 kafka 元素内 - 设置名称应为 XML 元素,其中点用下划线替换,例如

<clickhouse>
<kafka>
<enable_ssl_certificate_verification>false</enable_ssl_certificate_verification>
</kafka>
</clickhouse>

这些是专家设置,我们建议您参考 Kafka 文档以获取详细解释。