Confluent HTTP Sink 连接器
HTTP Sink 连接器与数据类型无关,因此不需要 Kafka 模式,并且支持 ClickHouse 特定的数据类型,例如 Maps 和 Arrays。这种额外的灵活性会稍微增加配置的复杂性。
下面我们将介绍一个简单的安装过程,从单个 Kafka 主题中提取消息并将行插入到 ClickHouse 表中。
HTTP 连接器在 Confluent 企业许可证 下分发。
快速入门步骤
1. 收集连接详细信息
要使用 HTTP(S) 连接到 ClickHouse,您需要以下信息
主机和端口:通常,使用 TLS 时端口为 8443,不使用 TLS 时端口为 8123。
数据库名称:默认情况下,有一个名为
default
的数据库,请使用您要连接到的数据库的名称。用户名和密码:默认情况下,用户名为
default
。请使用适合您用例的用户名。
您的 ClickHouse Cloud 服务的详细信息可在 ClickHouse Cloud 控制台中找到。选择您要连接的服务,然后点击**连接**
选择**HTTPS**,详细信息可在 curl
命令示例中找到。
如果您使用的是自托管 ClickHouse,则连接详细信息由您的 ClickHouse 管理员设置。
2. 运行 Kafka Connect 和 HTTP Sink 连接器
您有两个选择
自托管:下载 Confluent 软件包并在本地安装。按照此处记录的安装连接器的安装说明进行操作 此处。如果您使用 confluent-hub 安装方法,您的本地配置文件将更新。
Confluent Cloud:对于那些使用 Confluent Cloud 托管 Kafka 的用户,可以使用 HTTP Sink 的完全托管版本。这要求您的 ClickHouse 环境可从 Confluent Cloud 访问。
以下示例使用 Confluent Cloud。
3. 在 ClickHouse 中创建目标表
在进行连接测试之前,让我们先在 ClickHouse Cloud 中创建一个测试表,此表将接收来自 Kafka 的数据
CREATE TABLE default.my_table
(
`side` String,
`quantity` Int32,
`symbol` String,
`price` Int32,
`account` String,
`userid` String
)
ORDER BY tuple()
4. 配置 HTTP Sink
创建一个 Kafka 主题和一个 HTTP Sink 连接器实例
配置 HTTP Sink 连接器
- 提供您创建的主题名称
- 身份验证
HTTP Url
- ClickHouse Cloud URL,其中指定了INSERT
查询<protocol>://<clickhouse_host>:<clickhouse_port>?query=INSERT%20INTO%20<database>.<table>%20FORMAT%20JSONEachRow
。**注意:**查询必须进行编码。Endpoint Authentication type
- BASICAuth username
- ClickHouse 用户名Auth password
- ClickHouse 密码
此 HTTP Url 易于出错。确保转义准确,以避免问题。
- 配置
Input Kafka record value format
取决于您的源数据,但在大多数情况下为 JSON 或 Avro。在以下设置中,我们假设为JSON
。- 在
高级配置
部分HTTP Request Method
- 设置为 POSTRequest Body Format
- jsonBatch batch size
- 根据 ClickHouse 的建议,将其设置为至少 1000。Batch json as array
- trueRetry on HTTP codes
- 400-500,但根据需要进行调整,例如,如果您在 ClickHouse 前面有一个 HTTP 代理,则此设置可能会更改。Maximum Reties
- 默认值 (10) 适用,但可以根据需要调整以实现更强大的重试。
5. 测试连接性
在 HTTP Sink 配置的主题中创建一条消息
并验证已创建的消息是否已写入您的 ClickHouse 实例。
故障排除
HTTP Sink 未批量处理消息
来自 Sink 文档
HTTP Sink 连接器不会为包含不同 Kafka 标头值的邮件批量处理请求。
- 验证您的 Kafka 记录是否具有相同的键。
- 当您向 HTTP API URL 添加参数时,每条记录都可能导致一个唯一的 URL。因此,在使用其他 URL 参数时,会禁用批量处理。
400 错误请求
CANNOT_PARSE_QUOTED_STRING
如果在将 JSON 对象插入 String
列时,HTTP Sink 出现以下消息
Code: 26. DB::ParsingException: Cannot parse JSON string: expected opening quote: (while reading the value of key key_name): While executing JSONEachRowRowInputFormat: (at row 1). (CANNOT_PARSE_QUOTED_STRING)
在 URL 中将 input_format_json_read_objects_as_strings=1
设置设置为编码字符串 SETTINGS%20input_format_json_read_objects_as_strings%3D1
加载 GitHub 数据集(可选)
请注意,此示例保留了 Github 数据集的数组字段。我们假设您在示例中有一个空的 github 主题,并使用 kcat 将消息插入 Kafka。
1. 准备配置
按照 这些说明 设置与您的安装类型相关的 Connect,注意独立集群和分布式集群之间的区别。如果使用 Confluent Cloud,则相关的设置是分布式设置。
最重要的参数是 http.api.url
。ClickHouse 的 HTTP 接口 要求您将 INSERT 语句作为 URL 中的参数进行编码。这必须包括格式(在本例中为 JSONEachRow
)和目标数据库。格式必须与 Kafka 数据一致,Kafka 数据将在 HTTP 有效负载中转换为字符串。这些参数必须进行 URL 编码。下面显示了 Github 数据集的此格式示例(假设您在本地运行 ClickHouse)
<protocol>://<clickhouse_host>:<clickhouse_port>?query=INSERT%20INTO%20<database>.<table>%20FORMAT%20JSONEachRow
https://127.0.0.1:8123?query=INSERT%20INTO%20default.github%20FORMAT%20JSONEachRow
以下其他参数与将 HTTP Sink 与 ClickHouse 一起使用相关。完整的参数列表可以在 此处 找到
request.method
- 设置为**POST**retry.on.status.codes
- 设置为 400-500 以在任何错误代码上重试。根据数据中预期的错误进行调整。request.body.format
- 在大多数情况下,这将是 JSON。auth.type
- 如果您使用 ClickHouse 进行安全保护,则将其设置为 BASIC。目前不支持其他 ClickHouse 兼容的身份验证机制。ssl.enabled
- 如果使用 SSL,则设置为 true。connection.user
- ClickHouse 的用户名。connection.password
- ClickHouse 的密码。batch.max.size
- 单次批量发送的行数。确保将其设置为适当的大数值。根据 ClickHouse 的 建议,应将 1000 视为最小值。tasks.max
- HTTP Sink 连接器支持运行一个或多个任务。这可以用来提高性能。与批量大小一起,这是提高性能的主要方法。key.converter
- 根据键的类型进行设置。value.converter
- 根据主题上数据的类型进行设置。此数据不需要模式。此处的格式必须与参数http.api.url
中指定的 FORMAT 保持一致。最简单的方法是使用 JSON 和 org.apache.kafka.connect.json.JsonConverter 转换器。也可以将值作为字符串处理,通过转换器 org.apache.kafka.connect.storage.StringConverter,但这需要用户在插入语句中使用函数提取值。如果使用 io.confluent.connect.avro.AvroConverter 转换器,ClickHouse 也支持 Avro 格式。
完整的设置列表,包括如何配置代理、重试和高级 SSL,可以 在此处 找到。
Github 示例数据的示例配置文件可以 在此处 找到,假设 Connect 以独立模式运行,Kafka 托管在 Confluent Cloud 中。
2. 创建 ClickHouse 表
确保表已创建。下面显示了一个使用标准 MergeTree 的最小 github 数据集示例。
CREATE TABLE github
(
file_time DateTime,
event_type Enum('CommitCommentEvent' = 1, 'CreateEvent' = 2, 'DeleteEvent' = 3, 'ForkEvent' = 4,'GollumEvent' = 5, 'IssueCommentEvent' = 6, 'IssuesEvent' = 7, 'MemberEvent' = 8, 'PublicEvent' = 9, 'PullRequestEvent' = 10, 'PullRequestReviewCommentEvent' = 11, 'PushEvent' = 12, 'ReleaseEvent' = 13, 'SponsorshipEvent' = 14, 'WatchEvent' = 15, 'GistEvent' = 16, 'FollowEvent' = 17, 'DownloadEvent' = 18, 'PullRequestReviewEvent' = 19, 'ForkApplyEvent' = 20, 'Event' = 21, 'TeamAddEvent' = 22),
actor_login LowCardinality(String),
repo_name LowCardinality(String),
created_at DateTime,
updated_at DateTime,
action Enum('none' = 0, 'created' = 1, 'added' = 2, 'edited' = 3, 'deleted' = 4, 'opened' = 5, 'closed' = 6, 'reopened' = 7, 'assigned' = 8, 'unassigned' = 9, 'labeled' = 10, 'unlabeled' = 11, 'review_requested' = 12, 'review_request_removed' = 13, 'synchronize' = 14, 'started' = 15, 'published' = 16, 'update' = 17, 'create' = 18, 'fork' = 19, 'merged' = 20),
comment_id UInt64,
path String,
ref LowCardinality(String),
ref_type Enum('none' = 0, 'branch' = 1, 'tag' = 2, 'repository' = 3, 'unknown' = 4),
creator_user_login LowCardinality(String),
number UInt32,
title String,
labels Array(LowCardinality(String)),
state Enum('none' = 0, 'open' = 1, 'closed' = 2),
assignee LowCardinality(String),
assignees Array(LowCardinality(String)),
closed_at DateTime,
merged_at DateTime,
merge_commit_sha String,
requested_reviewers Array(LowCardinality(String)),
merged_by LowCardinality(String),
review_comments UInt32,
member_login LowCardinality(String)
) ENGINE = MergeTree ORDER BY (event_type, repo_name, created_at)
3. 将数据添加到 Kafka
将消息插入 Kafka。下面我们使用 kcat 插入 10k 条消息。
head -n 10000 github_all_columns.ndjson | kcat -b <host>:<port> -X security.protocol=sasl_ssl -X sasl.mechanisms=PLAIN -X sasl.username=<username> -X sasl.password=<password> -t github
对目标表“Github”进行简单的读取应该可以确认数据的插入。
SELECT count() FROM default.github;
| count\(\) |
| :--- |
| 10000 |