跳至主要内容

Confluent HTTP Sink 连接器

HTTP Sink 连接器与数据类型无关,因此不需要 Kafka 模式,并且支持 ClickHouse 特定的数据类型,例如 Maps 和 Arrays。这种额外的灵活性会稍微增加配置的复杂性。

下面我们将介绍一个简单的安装过程,从单个 Kafka 主题中提取消息并将行插入到 ClickHouse 表中。

注意

HTTP 连接器在 Confluent 企业许可证 下分发。

快速入门步骤

1. 收集连接详细信息

要使用 HTTP(S) 连接到 ClickHouse,您需要以下信息

  • 主机和端口:通常,使用 TLS 时端口为 8443,不使用 TLS 时端口为 8123。

  • 数据库名称:默认情况下,有一个名为 default 的数据库,请使用您要连接到的数据库的名称。

  • 用户名和密码:默认情况下,用户名为 default。请使用适合您用例的用户名。

您的 ClickHouse Cloud 服务的详细信息可在 ClickHouse Cloud 控制台中找到。选择您要连接的服务,然后点击**连接**

ClickHouse Cloud service connect button

选择**HTTPS**,详细信息可在 curl 命令示例中找到。

ClickHouse Cloud HTTPS connection details

如果您使用的是自托管 ClickHouse,则连接详细信息由您的 ClickHouse 管理员设置。

2. 运行 Kafka Connect 和 HTTP Sink 连接器

您有两个选择

  • 自托管:下载 Confluent 软件包并在本地安装。按照此处记录的安装连接器的安装说明进行操作 此处。如果您使用 confluent-hub 安装方法,您的本地配置文件将更新。

  • Confluent Cloud:对于那些使用 Confluent Cloud 托管 Kafka 的用户,可以使用 HTTP Sink 的完全托管版本。这要求您的 ClickHouse 环境可从 Confluent Cloud 访问。

注意

以下示例使用 Confluent Cloud。

3. 在 ClickHouse 中创建目标表

在进行连接测试之前,让我们先在 ClickHouse Cloud 中创建一个测试表,此表将接收来自 Kafka 的数据

CREATE TABLE default.my_table
(
`side` String,
`quantity` Int32,
`symbol` String,
`price` Int32,
`account` String,
`userid` String
)
ORDER BY tuple()

4. 配置 HTTP Sink

创建一个 Kafka 主题和一个 HTTP Sink 连接器实例

Create HTTP Sink

配置 HTTP Sink 连接器

  • 提供您创建的主题名称
  • 身份验证
    • HTTP Url - ClickHouse Cloud URL,其中指定了 INSERT 查询 <protocol>://<clickhouse_host>:<clickhouse_port>?query=INSERT%20INTO%20<database>.<table>%20FORMAT%20JSONEachRow。**注意:**查询必须进行编码。
    • Endpoint Authentication type - BASIC
    • Auth username - ClickHouse 用户名
    • Auth password - ClickHouse 密码
注意

此 HTTP Url 易于出错。确保转义准确,以避免问题。

Auth options for Confluent HTTP Sink
  • 配置
    • Input Kafka record value format取决于您的源数据,但在大多数情况下为 JSON 或 Avro。在以下设置中,我们假设为 JSON
    • 高级配置 部分
      • HTTP Request Method - 设置为 POST
      • Request Body Format - json
      • Batch batch size - 根据 ClickHouse 的建议,将其设置为至少 1000
      • Batch json as array - true
      • Retry on HTTP codes - 400-500,但根据需要进行调整,例如,如果您在 ClickHouse 前面有一个 HTTP 代理,则此设置可能会更改。
      • Maximum Reties - 默认值 (10) 适用,但可以根据需要调整以实现更强大的重试。
Advanced options for Confluent HTTP Sink

5. 测试连接性

在 HTTP Sink 配置的主题中创建一条消息

Create a message in the topic

并验证已创建的消息是否已写入您的 ClickHouse 实例。

故障排除

HTTP Sink 未批量处理消息

来自 Sink 文档

HTTP Sink 连接器不会为包含不同 Kafka 标头值的邮件批量处理请求。

  1. 验证您的 Kafka 记录是否具有相同的键。
  2. 当您向 HTTP API URL 添加参数时,每条记录都可能导致一个唯一的 URL。因此,在使用其他 URL 参数时,会禁用批量处理。

400 错误请求

CANNOT_PARSE_QUOTED_STRING

如果在将 JSON 对象插入 String 列时,HTTP Sink 出现以下消息

Code: 26. DB::ParsingException: Cannot parse JSON string: expected opening quote: (while reading the value of key key_name): While executing JSONEachRowRowInputFormat: (at row 1). (CANNOT_PARSE_QUOTED_STRING)

在 URL 中将 input_format_json_read_objects_as_strings=1 设置设置为编码字符串 SETTINGS%20input_format_json_read_objects_as_strings%3D1

加载 GitHub 数据集(可选)

请注意,此示例保留了 Github 数据集的数组字段。我们假设您在示例中有一个空的 github 主题,并使用 kcat 将消息插入 Kafka。

1. 准备配置

按照 这些说明 设置与您的安装类型相关的 Connect,注意独立集群和分布式集群之间的区别。如果使用 Confluent Cloud,则相关的设置是分布式设置。

最重要的参数是 http.api.url。ClickHouse 的 HTTP 接口 要求您将 INSERT 语句作为 URL 中的参数进行编码。这必须包括格式(在本例中为 JSONEachRow)和目标数据库。格式必须与 Kafka 数据一致,Kafka 数据将在 HTTP 有效负载中转换为字符串。这些参数必须进行 URL 编码。下面显示了 Github 数据集的此格式示例(假设您在本地运行 ClickHouse)

<protocol>://<clickhouse_host>:<clickhouse_port>?query=INSERT%20INTO%20<database>.<table>%20FORMAT%20JSONEachRow

https://127.0.0.1:8123?query=INSERT%20INTO%20default.github%20FORMAT%20JSONEachRow

以下其他参数与将 HTTP Sink 与 ClickHouse 一起使用相关。完整的参数列表可以在 此处 找到

  • request.method - 设置为**POST**
  • retry.on.status.codes - 设置为 400-500 以在任何错误代码上重试。根据数据中预期的错误进行调整。
  • request.body.format - 在大多数情况下,这将是 JSON。
  • auth.type - 如果您使用 ClickHouse 进行安全保护,则将其设置为 BASIC。目前不支持其他 ClickHouse 兼容的身份验证机制。
  • ssl.enabled - 如果使用 SSL,则设置为 true。
  • connection.user - ClickHouse 的用户名。
  • connection.password - ClickHouse 的密码。
  • batch.max.size - 单次批量发送的行数。确保将其设置为适当的大数值。根据 ClickHouse 的 建议,应将 1000 视为最小值。
  • tasks.max - HTTP Sink 连接器支持运行一个或多个任务。这可以用来提高性能。与批量大小一起,这是提高性能的主要方法。
  • key.converter - 根据键的类型进行设置。
  • value.converter - 根据主题上数据的类型进行设置。此数据不需要模式。此处的格式必须与参数 http.api.url 中指定的 FORMAT 保持一致。最简单的方法是使用 JSON 和 org.apache.kafka.connect.json.JsonConverter 转换器。也可以将值作为字符串处理,通过转换器 org.apache.kafka.connect.storage.StringConverter,但这需要用户在插入语句中使用函数提取值。如果使用 io.confluent.connect.avro.AvroConverter 转换器,ClickHouse 也支持 Avro 格式

完整的设置列表,包括如何配置代理、重试和高级 SSL,可以 在此处 找到。

Github 示例数据的示例配置文件可以 在此处 找到,假设 Connect 以独立模式运行,Kafka 托管在 Confluent Cloud 中。

2. 创建 ClickHouse 表

确保表已创建。下面显示了一个使用标准 MergeTree 的最小 github 数据集示例。

CREATE TABLE github
(
file_time DateTime,
event_type Enum('CommitCommentEvent' = 1, 'CreateEvent' = 2, 'DeleteEvent' = 3, 'ForkEvent' = 4,'GollumEvent' = 5, 'IssueCommentEvent' = 6, 'IssuesEvent' = 7, 'MemberEvent' = 8, 'PublicEvent' = 9, 'PullRequestEvent' = 10, 'PullRequestReviewCommentEvent' = 11, 'PushEvent' = 12, 'ReleaseEvent' = 13, 'SponsorshipEvent' = 14, 'WatchEvent' = 15, 'GistEvent' = 16, 'FollowEvent' = 17, 'DownloadEvent' = 18, 'PullRequestReviewEvent' = 19, 'ForkApplyEvent' = 20, 'Event' = 21, 'TeamAddEvent' = 22),
actor_login LowCardinality(String),
repo_name LowCardinality(String),
created_at DateTime,
updated_at DateTime,
action Enum('none' = 0, 'created' = 1, 'added' = 2, 'edited' = 3, 'deleted' = 4, 'opened' = 5, 'closed' = 6, 'reopened' = 7, 'assigned' = 8, 'unassigned' = 9, 'labeled' = 10, 'unlabeled' = 11, 'review_requested' = 12, 'review_request_removed' = 13, 'synchronize' = 14, 'started' = 15, 'published' = 16, 'update' = 17, 'create' = 18, 'fork' = 19, 'merged' = 20),
comment_id UInt64,
path String,
ref LowCardinality(String),
ref_type Enum('none' = 0, 'branch' = 1, 'tag' = 2, 'repository' = 3, 'unknown' = 4),
creator_user_login LowCardinality(String),
number UInt32,
title String,
labels Array(LowCardinality(String)),
state Enum('none' = 0, 'open' = 1, 'closed' = 2),
assignee LowCardinality(String),
assignees Array(LowCardinality(String)),
closed_at DateTime,
merged_at DateTime,
merge_commit_sha String,
requested_reviewers Array(LowCardinality(String)),
merged_by LowCardinality(String),
review_comments UInt32,
member_login LowCardinality(String)
) ENGINE = MergeTree ORDER BY (event_type, repo_name, created_at)

3. 将数据添加到 Kafka

将消息插入 Kafka。下面我们使用 kcat 插入 10k 条消息。

head -n 10000 github_all_columns.ndjson | kcat -b <host>:<port> -X security.protocol=sasl_ssl -X sasl.mechanisms=PLAIN -X sasl.username=<username>  -X sasl.password=<password> -t github

对目标表“Github”进行简单的读取应该可以确认数据的插入。

SELECT count() FROM default.github;

| count\(\) |
| :--- |
| 10000 |