跳至主要内容

kafka-vector

使用 Vector 与 Kafka 和 ClickHouse

Vector 是一款与供应商无关的数据管道,能够从 Kafka 读取数据并将事件发送到 ClickHouse。

一个 入门 使用 Vector 与 ClickHouse 的指南侧重于日志用例和从文件读取事件。我们使用 Github 示例数据集,其中事件保存在 Kafka 主题上。

Vector 使用 通过推或拉模型检索数据。另一方面,接收器 提供事件的目标。因此,我们使用 Kafka 源和 ClickHouse 接收器。请注意,虽然 Kafka 作为接收器受支持,但 ClickHouse 源不可用。因此,Vector 不适合希望将数据从 ClickHouse 传输到 Kafka 的用户。

Vector 还支持数据的 转换。但这超出了本指南的范围。如果用户需要在数据集中使用此功能,请参考 Vector 文档。

请注意,ClickHouse 接收器的当前实现使用 HTTP 接口。ClickHouse 接收器目前不支持使用 JSON 架构。必须以纯 JSON 格式或字符串形式将数据发布到 Kafka。

许可证

Vector 在 MPL-2.0 许可证 下分发

收集您的连接详细信息

要使用 HTTP(S) 连接到 ClickHouse,您需要以下信息

  • HOST 和 PORT:通常,在使用 TLS 时端口为 8443,在不使用 TLS 时端口为 8123。

  • DATABASE NAME:默认情况下,存在一个名为 default 的数据库,请使用您要连接到的数据库的名称。

  • USERNAME 和 PASSWORD:默认情况下,用户名为 default。请使用适合您的用例的用户名。

您 ClickHouse Cloud 服务的详细信息可在 ClickHouse Cloud 控制台中找到。选择您要连接到的服务,然后单击 **连接**

ClickHouse Cloud service connect button

选择 **HTTPS**,详细信息将在示例 curl 命令中提供。

ClickHouse Cloud HTTPS connection details

如果您使用的是自托管的 ClickHouse,则连接详细信息由您的 ClickHouse 管理员设置。

步骤

  1. 创建 Kafka github 主题并插入 Github 数据集
cat /opt/data/github/github_all_columns.ndjson | kcat -b <host>:<port> -X security.protocol=sasl_ssl -X sasl.mechanisms=PLAIN -X sasl.username=<username> -X sasl.password=<password> -t github

此数据集包含 200,000 行,重点关注 ClickHouse/ClickHouse 存储库。

  1. 确保目标表已创建。下面我们使用默认数据库。

CREATE TABLE github
(
file_time DateTime,
event_type Enum('CommitCommentEvent' = 1, 'CreateEvent' = 2, 'DeleteEvent' = 3, 'ForkEvent' = 4,
'GollumEvent' = 5, 'IssueCommentEvent' = 6, 'IssuesEvent' = 7, 'MemberEvent' = 8, 'PublicEvent' = 9, 'PullRequestEvent' = 10, 'PullRequestReviewCommentEvent' = 11, 'PushEvent' = 12, 'ReleaseEvent' = 13, 'SponsorshipEvent' = 14, 'WatchEvent' = 15, 'GistEvent' = 16, 'FollowEvent' = 17, 'DownloadEvent' = 18, 'PullRequestReviewEvent' = 19, 'ForkApplyEvent' = 20, 'Event' = 21, 'TeamAddEvent' = 22),
actor_login LowCardinality(String),
repo_name LowCardinality(String),
created_at DateTime,
updated_at DateTime,
action Enum('none' = 0, 'created' = 1, 'added' = 2, 'edited' = 3, 'deleted' = 4, 'opened' = 5, 'closed' = 6, 'reopened' = 7, 'assigned' = 8, 'unassigned' = 9, 'labeled' = 10, 'unlabeled' = 11, 'review_requested' = 12, 'review_request_removed' = 13, 'synchronize' = 14, 'started' = 15, 'published' = 16, 'update' = 17, 'create' = 18, 'fork' = 19, 'merged' = 20),
comment_id UInt64,
path String,
ref LowCardinality(String),
ref_type Enum('none' = 0, 'branch' = 1, 'tag' = 2, 'repository' = 3, 'unknown' = 4),
creator_user_login LowCardinality(String),
number UInt32,
title String,
labels Array(LowCardinality(String)),
state Enum('none' = 0, 'open' = 1, 'closed' = 2),
assignee LowCardinality(String),
assignees Array(LowCardinality(String)),
closed_at DateTime,
merged_at DateTime,
merge_commit_sha String,
requested_reviewers Array(LowCardinality(String)),
merged_by LowCardinality(String),
review_comments UInt32,
member_login LowCardinality(String)
) ENGINE = MergeTree ORDER BY (event_type, repo_name, created_at);

  1. 下载并安装 Vector。创建一个 kafka.toml 配置文件,并修改 Kafka 和 ClickHouse 实例的值。
[sources.github]
type = "kafka"
auto_offset_reset = "smallest"
bootstrap_servers = "<kafka_host>:<kafka_port>"
group_id = "vector"
topics = [ "github" ]
tls.enabled = true
sasl.enabled = true
sasl.mechanism = "PLAIN"
sasl.username = "<username>"
sasl.password = "<password>"
decoding.codec = "json"

[sinks.clickhouse]
type = "clickhouse"
inputs = ["github"]
endpoint = "https://127.0.0.1:8123"
database = "default"
table = "github"
skip_unknown_fields = true
auth.strategy = "basic"
auth.user = "username"
auth.password = "password"
buffer.max_events = 10000
batch.timeout_secs = 1

关于此配置和 Vector 行为的一些重要注意事项

  • 此示例已在 Confluent Cloud 上进行了测试。因此,sasl.*ssl.enabled 安全选项可能不适用于自托管情况。
  • 配置参数 bootstrap_servers 不需要协议前缀,例如 pkc-2396y.us-east-1.aws.confluent.cloud:9092
  • 源参数 decoding.codec = "json" 确保将消息作为单个 JSON 对象传递到 ClickHouse 接收器。如果将消息处理为字符串并使用默认的 bytes 值,则消息的内容将附加到 message 字段。在大多数情况下,这将需要在 ClickHouse 中进行处理,如 Vector 入门 指南中所述。
  • Vector 向消息添加了许多字段。在我们的示例中,我们通过配置参数 skip_unknown_fields = true 在 ClickHouse 接收器中忽略这些字段。这将忽略目标表架构中不存在的字段。您可以随意调整架构以确保添加这些元字段,例如 offset
  • 请注意接收器如何通过 inputs 参数引用事件源。
  • 请注意,ClickHouse 接收器的行为如 此处 所述。为了获得最佳吞吐量,用户可能希望调整 buffer.max_eventsbatch.timeout_secsbatch.max_bytes 参数。根据 ClickHouse 的 建议,任何单个批次中的事件数应至少为 1000。对于统一的高吞吐量用例,用户可以增加参数 buffer.max_events。更可变的吞吐量可能需要更改参数 batch.timeout_secs
  • 参数 auto_offset_reset = "smallest" 强制 Kafka 源从主题的开头开始 - 因此确保我们消耗在步骤 (1) 中发布的消息。用户可能需要不同的行为。请参阅 此处 以了解详细信息。
  1. 启动 Vector
vector --config ./kafka.toml

默认情况下,在开始向 ClickHouse 插入数据之前,需要进行 健康检查。这将确保可以建立连接并读取架构。在遇到问题时,添加 VECTOR_LOG=debug 可能有助于获取更多日志。

  1. 确认数据的插入。
SELECT count() as count FROM github;
count
200000