跳至主要内容

JDBC 连接器

注意

此连接器仅应在您的数据很简单且由基本数据类型(例如 int)组成的情况下使用。不支持 ClickHouse 特定的类型,例如映射。

在我们的示例中,我们使用 Confluent 分发的 Kafka Connect。

以下是简单的安装说明,从单个 Kafka 主题中提取消息并将行插入 ClickHouse 表。我们推荐使用 Confluent Cloud,它为那些没有 Kafka 环境的用户提供慷慨的免费层。

请注意,JDBC 连接器需要模式(您不能将纯 JSON 或 CSV 与 JDBC 连接器一起使用)。虽然模式可以在每条消息中编码,但我们强烈建议使用 Confluent 模式注册表以避免相关开销。假定 Kafka 的键为字符串。有关 Kafka 模式 的更多详细信息,请点击这里

许可证

JDBC 连接器在Confluent 社区许可证下分发

步骤

收集您的连接详细信息

要使用 HTTP(S) 连接到 ClickHouse,您需要以下信息

  • 主机和端口:通常,在使用 TLS 时端口为 8443,在不使用 TLS 时端口为 8123。

  • 数据库名称:默认情况下,有一个名为 default 的数据库,请使用您要连接到的数据库的名称。

  • 用户名和密码:默认情况下,用户名为 default。请使用适合您的用例的用户名。

您的 ClickHouse Cloud 服务的详细信息可在 ClickHouse Cloud 控制台中找到。选择您要连接到的服务,然后单击 **连接**

ClickHouse Cloud service connect button

选择 **HTTPS**,详细信息将在示例 curl 命令中提供。

ClickHouse Cloud HTTPS connection details

如果您使用的是自托管 ClickHouse,则连接详细信息由您的 ClickHouse 管理员设置。

1. 安装 Kafka Connect 和连接器

我们假设您已下载 Confluent 包并将其安装在本地。按照此处记录的说明安装连接器。

如果您使用 confluent-hub 安装方法,您的本地配置文件将更新。

要将数据从 Kafka 发送到 ClickHouse,我们使用连接器的 Sink 组件。

2. 下载并安装 JDBC 驱动程序

此处下载并安装 ClickHouse JDBC 驱动程序 clickhouse-jdbc-<version>-shaded.jar。按照此处说明安装到 Kafka Connect 中。其他驱动程序可能可以使用,但尚未经过测试。

注意

常见问题:文档建议将 jar 复制到 share/java/kafka-connect-jdbc/。如果您在 Connect 查找驱动程序时遇到问题,请将驱动程序复制到 share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib/。或者修改 plugin.path 以包含驱动程序 - 如下所示。

3. 准备配置

按照这些说明设置与您的安装类型相关的 Connect,注意独立集群和分布式集群之间的区别。如果使用 Confluent Cloud,则分布式设置适用。

以下参数与将 JDBC 连接器与 ClickHouse 一起使用相关。完整的参数列表可以在这里找到

  • _connection.url_ - 这应该采用以下形式:jdbc:clickhouse://&lt;clickhouse host>:&lt;clickhouse http port>/&lt;target database>
  • connection.user - 具有目标数据库写入权限的用户
  • table.name.format- 要插入数据的 ClickHouse 表。这必须存在。
  • batch.size - 要在单个批次中发送的行数。确保将其设置为适当的大值。根据 ClickHouse 的建议,1000 的值应被视为最小值。
  • tasks.max - JDBC Sink 连接器支持运行一个或多个任务。这可用于提高性能。与批次大小一起,它代表您提高性能的主要方法。
  • value.converter.schemas.enable - 如果使用模式注册表,则设置为 false,如果将模式嵌入消息中,则设置为 true。
  • value.converter - 根据您的数据类型设置,例如对于 JSON,则为“io.confluent.connect.json.JsonSchemaConverter”。
  • key.converter - 设置为“org.apache.kafka.connect.storage.StringConverter”。我们使用字符串键。
  • pk.mode - 与 ClickHouse 不相关。设置为 none。
  • auto.create - 不支持,必须为 false。
  • auto.evolve - 我们建议将此设置设置为 false,尽管它可能在将来得到支持。
  • insert.mode - 设置为“insert”。其他模式目前不支持。
  • key.converter - 根据您的键类型设置。
  • value.converter - 根据您主题上的数据类型设置。此数据必须具有受支持的模式 - JSON、Avro 或 Protobuf 格式。

如果您使用我们的示例数据集进行测试,请确保以下设置已设置

  • value.converter.schemas.enable - 设置为 false,因为我们使用模式注册表。如果将模式嵌入每条消息中,则设置为 true。
  • key.converter - 设置为“org.apache.kafka.connect.storage.StringConverter”。我们使用字符串键。
  • value.converter - 设置“io.confluent.connect.json.JsonSchemaConverter”。
  • value.converter.schema.registry.url - 设置为模式服务器 URL,以及通过参数 value.converter.schema.registry.basic.auth.user.info 设置模式服务器的凭据。

Github 示例数据的示例配置文件可以在这里找到,假设 Connect 以独立模式运行,Kafka 托管在 Confluent Cloud 中。

4. 创建 ClickHouse 表

确保表已创建,如果表已存在(来自之前的示例),请将其删除。与缩减的 Github 数据集兼容的示例如下所示。请注意没有当前不支持的任何数组或映射类型

CREATE TABLE github
(
file_time DateTime,
event_type Enum('CommitCommentEvent' = 1, 'CreateEvent' = 2, 'DeleteEvent' = 3, 'ForkEvent' = 4, 'GollumEvent' = 5, 'IssueCommentEvent' = 6, 'IssuesEvent' = 7, 'MemberEvent' = 8, 'PublicEvent' = 9, 'PullRequestEvent' = 10, 'PullRequestReviewCommentEvent' = 11, 'PushEvent' = 12, 'ReleaseEvent' = 13, 'SponsorshipEvent' = 14, 'WatchEvent' = 15, 'GistEvent' = 16, 'FollowEvent' = 17, 'DownloadEvent' = 18, 'PullRequestReviewEvent' = 19, 'ForkApplyEvent' = 20, 'Event' = 21, 'TeamAddEvent' = 22),
actor_login LowCardinality(String),
repo_name LowCardinality(String),
created_at DateTime,
updated_at DateTime,
action Enum('none' = 0, 'created' = 1, 'added' = 2, 'edited' = 3, 'deleted' = 4, 'opened' = 5, 'closed' = 6, 'reopened' = 7, 'assigned' = 8, 'unassigned' = 9, 'labeled' = 10, 'unlabeled' = 11, 'review_requested' = 12, 'review_request_removed' = 13, 'synchronize' = 14, 'started' = 15, 'published' = 16, 'update' = 17, 'create' = 18, 'fork' = 19, 'merged' = 20),
comment_id UInt64,
path String,
ref LowCardinality(String),
ref_type Enum('none' = 0, 'branch' = 1, 'tag' = 2, 'repository' = 3, 'unknown' = 4),
creator_user_login LowCardinality(String),
number UInt32,
title String,
state Enum('none' = 0, 'open' = 1, 'closed' = 2),
assignee LowCardinality(String),
closed_at DateTime,
merged_at DateTime,
merge_commit_sha String,
merged_by LowCardinality(String),
review_comments UInt32,
member_login LowCardinality(String)
) ENGINE = MergeTree ORDER BY (event_type, repo_name, created_at)

5. 启动 Kafka Connect

独立分布式模式启动 Kafka Connect。

./bin/connect-standalone connect.properties.ini github-jdbc-sink.properties.ini

6. 将数据添加到 Kafka

使用提供的脚本和配置将消息插入 Kafka。您需要修改 github.config 以包含您的 Kafka 凭据。该脚本当前配置为与 Confluent Cloud 一起使用。

python producer.py -c github.config

此脚本可用于将任何 ndjson 文件插入 Kafka 主题。这将尝试自动为您推断一个模式。提供的示例配置仅插入 10k 条消息 -在此处修改(如果需要)。此配置还在插入 Kafka 时从数据集中删除了所有不兼容的数组字段。

这是 JDBC 连接器将消息转换为 INSERT 语句所必需的。如果您使用的是自己的数据,请确保您在每条消息中插入一个模式(将 _value.converter.schemas.enable _设置为 true)或确保您的客户端发布引用模式注册表中的模式的消息。

Kafka Connect 应该开始消费消息并将行插入 ClickHouse。请注意,关于“[JDBC 兼容模式]不支持事务。”的警告是可以预期到的,可以忽略。

对目标表“Github”进行简单的读取应该可以确认数据插入。

SELECT count() FROM default.github;
| count\(\) |
| :--- |
| 10000 |