跳到主要内容
跳到主要内容
编辑此页

kafka-vector

将 Vector 与 Kafka 和 ClickHouse 结合使用

Vector 是一个供应商中立的数据管道,能够从 Kafka 读取数据并将事件发送到 ClickHouse。

Vector 与 ClickHouse 的入门指南侧重于日志使用场景和从文件读取事件。我们使用Github 示例数据集,其中事件保存在 Kafka 主题中。

Vector 利用通过推送或拉取模型检索数据。Sink 同时为事件提供目标。因此,我们使用 Kafka 源和 ClickHouse sink。请注意,虽然 Kafka 支持作为 Sink,但 ClickHouse 源不可用。因此,Vector 不适合希望将数据从 ClickHouse 传输到 Kafka 的用户。

Vector 还支持数据的转换。这超出了本指南的范围。如果用户需要在其数据集上执行此操作,请参阅 Vector 文档。

请注意,ClickHouse sink 的当前实现使用 HTTP 接口。ClickHouse sink 目前不支持使用 JSON 模式。数据必须以纯 JSON 格式或字符串形式发布到 Kafka。

许可证

Vector 在 MPL-2.0 许可证下分发

收集您的连接详细信息

要使用 HTTP(S) 连接到 ClickHouse,您需要以下信息

  • HOST 和 PORT:通常,使用 TLS 时端口为 8443,不使用 TLS 时端口为 8123。

  • 数据库名称:开箱即用,有一个名为 default 的数据库,使用您要连接的数据库的名称。

  • 用户名和密码:开箱即用,用户名是 default。使用适合您用例的用户名。

您的 ClickHouse Cloud 服务的详细信息在 ClickHouse Cloud 控制台中可用。选择您要连接的服务,然后单击连接

ClickHouse Cloud service connect button

选择 HTTPS,详细信息在示例 curl 命令中可用。

ClickHouse Cloud HTTPS connection details

如果您使用自托管 ClickHouse,则连接详细信息由您的 ClickHouse 管理员设置。

步骤

  1. 创建 Kafka github 主题并插入 Github 数据集
cat /opt/data/github/github_all_columns.ndjson | kcat -b <host>:<port> -X security.protocol=sasl_ssl -X sasl.mechanisms=PLAIN -X sasl.username=<username> -X sasl.password=<password> -t github

此数据集包含 200,000 行,重点关注 ClickHouse/ClickHouse 存储库。

  1. 确保已创建目标表。下面我们使用默认数据库。

CREATE TABLE github
(
file_time DateTime,
event_type Enum('CommitCommentEvent' = 1, 'CreateEvent' = 2, 'DeleteEvent' = 3, 'ForkEvent' = 4,
'GollumEvent' = 5, 'IssueCommentEvent' = 6, 'IssuesEvent' = 7, 'MemberEvent' = 8, 'PublicEvent' = 9, 'PullRequestEvent' = 10, 'PullRequestReviewCommentEvent' = 11, 'PushEvent' = 12, 'ReleaseEvent' = 13, 'SponsorshipEvent' = 14, 'WatchEvent' = 15, 'GistEvent' = 16, 'FollowEvent' = 17, 'DownloadEvent' = 18, 'PullRequestReviewEvent' = 19, 'ForkApplyEvent' = 20, 'Event' = 21, 'TeamAddEvent' = 22),
actor_login LowCardinality(String),
repo_name LowCardinality(String),
created_at DateTime,
updated_at DateTime,
action Enum('none' = 0, 'created' = 1, 'added' = 2, 'edited' = 3, 'deleted' = 4, 'opened' = 5, 'closed' = 6, 'reopened' = 7, 'assigned' = 8, 'unassigned' = 9, 'labeled' = 10, 'unlabeled' = 11, 'review_requested' = 12, 'review_request_removed' = 13, 'synchronize' = 14, 'started' = 15, 'published' = 16, 'update' = 17, 'create' = 18, 'fork' = 19, 'merged' = 20),
comment_id UInt64,
path String,
ref LowCardinality(String),
ref_type Enum('none' = 0, 'branch' = 1, 'tag' = 2, 'repository' = 3, 'unknown' = 4),
creator_user_login LowCardinality(String),
number UInt32,
title String,
labels Array(LowCardinality(String)),
state Enum('none' = 0, 'open' = 1, 'closed' = 2),
assignee LowCardinality(String),
assignees Array(LowCardinality(String)),
closed_at DateTime,
merged_at DateTime,
merge_commit_sha String,
requested_reviewers Array(LowCardinality(String)),
merged_by LowCardinality(String),
review_comments UInt32,
member_login LowCardinality(String)
) ENGINE = MergeTree ORDER BY (event_type, repo_name, created_at);

  1. 下载并安装 Vector。创建一个 kafka.toml 配置文件,并修改 Kafka 和 ClickHouse 实例的值。
[sources.github]
type = "kafka"
auto_offset_reset = "smallest"
bootstrap_servers = "<kafka_host>:<kafka_port>"
group_id = "vector"
topics = [ "github" ]
tls.enabled = true
sasl.enabled = true
sasl.mechanism = "PLAIN"
sasl.username = "<username>"
sasl.password = "<password>"
decoding.codec = "json"

[sinks.clickhouse]
type = "clickhouse"
inputs = ["github"]
endpoint = "https://127.0.0.1:8123"
database = "default"
table = "github"
skip_unknown_fields = true
auth.strategy = "basic"
auth.user = "username"
auth.password = "password"
buffer.max_events = 10000
batch.timeout_secs = 1

关于此配置和 Vector 行为的一些重要说明

  • 此示例已针对 Confluent Cloud 进行过测试。因此,sasl.*ssl.enabled 安全选项可能不适用于自托管情况。
  • 配置参数 bootstrap_servers 不需要协议前缀,例如 pkc-2396y.us-east-1.aws.confluent.cloud:9092
  • 源参数 decoding.codec = "json" 确保消息作为单个 JSON 对象传递到 ClickHouse sink。如果将消息作为字符串处理并使用默认的 bytes 值,则消息的内容将附加到字段 message。在大多数情况下,这将需要在 ClickHouse 中进行处理,如Vector 入门指南中所述。
  • Vector 向消息添加了许多字段。在我们的示例中,我们通过配置参数 skip_unknown_fields = true 在 ClickHouse sink 中忽略这些字段。这将忽略不属于目标表架构的字段。您可以随意调整架构以确保添加诸如 offset 之类的元字段。
  • 请注意 sink 如何通过参数 inputs 引用事件源。
  • 请注意此处描述的 ClickHouse sink 的行为。为了获得最佳吞吐量,用户可能希望调整 buffer.max_eventsbatch.timeout_secsbatch.max_bytes 参数。根据 ClickHouse 建议,对于任何单个批次中的事件数,应考虑的最小值为 1000。对于统一的高吞吐量用例,用户可以增加参数 buffer.max_events。更多可变的吞吐量可能需要更改参数 batch.timeout_secs
  • 参数 auto_offset_reset = "smallest" 强制 Kafka 源从主题的开头开始 - 从而确保我们消费在步骤 (1) 中发布的消息。用户可能需要不同的行为。有关更多详细信息,请参阅此处
  1. 启动 Vector
vector --config ./kafka.toml

默认情况下,在开始插入到 ClickHouse 之前,需要进行健康检查。这确保可以建立连接并读取架构。在前面添加 VECTOR_LOG=debug 以获得更多日志记录,这在您遇到问题时可能会有所帮助。

  1. 确认数据插入。
SELECT count() as count FROM github;
计数
200000