跳到主要内容
跳到主要内容
编辑此页

JDBC 连接器

注意

此连接器仅应在您的数据简单且由原始数据类型(例如 int)组成时使用。不支持 ClickHouse 特定的类型,例如 map。

在我们的示例中,我们使用 Confluent 发行版的 Kafka Connect。

下面我们描述一个简单的安装,从单个 Kafka 主题拉取消息并将行插入到 ClickHouse 表中。我们推荐 Confluent Cloud,它为那些没有 Kafka 环境的用户提供了慷慨的免费层。

请注意,JDBC 连接器需要模式(您不能将纯 JSON 或 CSV 与 JDBC 连接器一起使用)。虽然模式可以编码在每条消息中;但强烈建议使用 Confluent 模式注册表以避免相关的开销。提供的插入脚本会自动从消息中推断模式并将其插入到注册表中 - 因此,此脚本可以重复用于其他数据集。Kafka 的键假定为字符串。有关 Kafka 模式的更多详细信息,请点击此处

许可证

JDBC 连接器根据 Confluent Community License 分发。

步骤

收集您的连接详细信息

要使用 HTTP(S) 连接到 ClickHouse,您需要以下信息

  • 主机和端口:通常,使用 TLS 时端口为 8443,不使用 TLS 时端口为 8123。

  • 数据库名称:开箱即用,有一个名为 default 的数据库,请使用您要连接的数据库的名称。

  • 用户名和密码:开箱即用,用户名为 default。请使用适合您用例的用户名。

您的 ClickHouse Cloud 服务的详细信息可在 ClickHouse Cloud 控制台中找到。选择您要连接的服务,然后单击“连接

ClickHouse Cloud service connect button

选择 HTTPS,详细信息将在示例 curl 命令中提供。

ClickHouse Cloud HTTPS connection details

如果您使用的是自托管 ClickHouse,则连接详细信息由您的 ClickHouse 管理员设置。

1. 安装 Kafka Connect 和连接器

我们假设您已下载 Confluent 软件包并在本地安装了它。请按照此处的文档说明安装连接器的安装说明进行操作。

如果您使用 confluent-hub 安装方法,您的本地配置文件将被更新。

为了从 Kafka 向 ClickHouse 发送数据,我们使用连接器的 Sink 组件。

2. 下载并安装 JDBC 驱动程序

此处下载并安装 ClickHouse JDBC 驱动程序 clickhouse-jdbc-<version>-shaded.jar。按照此处的详细信息将其安装到 Kafka Connect 中。其他驱动程序可能有效,但尚未经过测试。

注意

常见问题:文档建议将 jar 复制到 share/java/kafka-connect-jdbc/。如果您在 Connect 查找驱动程序时遇到问题,请将驱动程序复制到 share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib/。或者修改 plugin.path 以包含驱动程序 - 请参见下文。

3. 准备配置

请按照这些说明设置与您的安装类型相关的 Connect,注意独立集群和分布式集群之间的差异。如果使用 Confluent Cloud,则分布式设置是相关的。

以下参数与将 JDBC 连接器与 ClickHouse 结合使用有关。完整的参数列表可以在此处找到

  • _connection.url_ - 这应采用 jdbc:clickhouse://&lt;clickhouse host>:&lt;clickhouse http port>/&lt;target database> 的形式
  • connection.user - 具有目标数据库写入权限的用户
  • table.name.format - 要插入数据的 ClickHouse 表。此表必须存在。
  • batch.size - 在单个批处理中发送的行数。确保此设置设置为适当的大数字。根据 ClickHouse 建议,值 1000 应被视为最小值。
  • tasks.max - JDBC Sink 连接器支持运行一个或多个任务。这可以用来提高性能。与批处理大小一起,这代表了您提高性能的主要手段。
  • value.converter.schemas.enable - 如果使用模式注册表,则设置为 false;如果您将模式嵌入到消息中,则设置为 true。
  • value.converter - 根据您的数据类型进行设置,例如对于 JSON,设置为 io.confluent.connect.json.JsonSchemaConverter
  • key.converter - 设置为 org.apache.kafka.connect.storage.StringConverter。我们使用字符串键。
  • pk.mode - 与 ClickHouse 无关。设置为 none。
  • auto.create - 不支持,必须为 false。
  • auto.evolve - 我们建议将此设置设为 false,尽管将来可能会支持它。
  • insert.mode - 设置为“insert”。当前不支持其他模式。
  • key.converter - 根据您的键的类型进行设置。
  • value.converter - 根据您的主题上的数据类型进行设置。此数据必须具有受支持的模式 - JSON、Avro 或 Protobuf 格式。

如果使用我们的示例数据集进行测试,请确保设置以下内容

  • value.converter.schemas.enable - 设置为 false,因为我们使用模式注册表。如果您将模式嵌入到每条消息中,则设置为 true。
  • key.converter - 设置为 "org.apache.kafka.connect.storage.StringConverter"。我们使用字符串键。
  • value.converter - 设置为 "io.confluent.connect.json.JsonSchemaConverter"。
  • value.converter.schema.registry.url - 设置为模式服务器 URL 以及通过参数 value.converter.schema.registry.basic.auth.user.info 获取的模式服务器凭据。

Github 示例数据的示例配置文件可以在此处找到,假设 Connect 在独立模式下运行,并且 Kafka 托管在 Confluent Cloud 中。

4. 创建 ClickHouse 表

确保已创建表,如果之前的示例中已存在该表,则删除它。下面显示了一个与缩减的 Github 数据集兼容的示例。注意,当前不支持任何 Array 或 Map 类型

CREATE TABLE github
(
file_time DateTime,
event_type Enum('CommitCommentEvent' = 1, 'CreateEvent' = 2, 'DeleteEvent' = 3, 'ForkEvent' = 4, 'GollumEvent' = 5, 'IssueCommentEvent' = 6, 'IssuesEvent' = 7, 'MemberEvent' = 8, 'PublicEvent' = 9, 'PullRequestEvent' = 10, 'PullRequestReviewCommentEvent' = 11, 'PushEvent' = 12, 'ReleaseEvent' = 13, 'SponsorshipEvent' = 14, 'WatchEvent' = 15, 'GistEvent' = 16, 'FollowEvent' = 17, 'DownloadEvent' = 18, 'PullRequestReviewEvent' = 19, 'ForkApplyEvent' = 20, 'Event' = 21, 'TeamAddEvent' = 22),
actor_login LowCardinality(String),
repo_name LowCardinality(String),
created_at DateTime,
updated_at DateTime,
action Enum('none' = 0, 'created' = 1, 'added' = 2, 'edited' = 3, 'deleted' = 4, 'opened' = 5, 'closed' = 6, 'reopened' = 7, 'assigned' = 8, 'unassigned' = 9, 'labeled' = 10, 'unlabeled' = 11, 'review_requested' = 12, 'review_request_removed' = 13, 'synchronize' = 14, 'started' = 15, 'published' = 16, 'update' = 17, 'create' = 18, 'fork' = 19, 'merged' = 20),
comment_id UInt64,
path String,
ref LowCardinality(String),
ref_type Enum('none' = 0, 'branch' = 1, 'tag' = 2, 'repository' = 3, 'unknown' = 4),
creator_user_login LowCardinality(String),
number UInt32,
title String,
state Enum('none' = 0, 'open' = 1, 'closed' = 2),
assignee LowCardinality(String),
closed_at DateTime,
merged_at DateTime,
merge_commit_sha String,
merged_by LowCardinality(String),
review_comments UInt32,
member_login LowCardinality(String)
) ENGINE = MergeTree ORDER BY (event_type, repo_name, created_at)

5. 启动 Kafka Connect

独立分布式模式下启动 Kafka Connect。

./bin/connect-standalone connect.properties.ini github-jdbc-sink.properties.ini

6. 向 Kafka 添加数据

使用提供的脚本和配置将消息插入到 Kafka 中。您需要修改 github.config 以包含您的 Kafka 凭据。该脚本当前配置为与 Confluent Cloud 一起使用。

python producer.py -c github.config

此脚本可用于将任何 ndjson 文件插入到 Kafka 主题中。这将尝试自动为您推断模式。提供的示例配置仅插入 1 万条消息 - 如果需要,请在此处修改。此配置还在插入到 Kafka 期间从数据集中删除任何不兼容的 Array 字段。

这是 JDBC 连接器将消息转换为 INSERT 语句所必需的。如果您使用的是自己的数据,请确保您要么在每条消息中插入模式(将 _value.converter.schemas.enable _设置为 true),要么确保您的客户端发布引用注册表中模式的消息。

Kafka Connect 应开始消费消息并将行插入到 ClickHouse 中。请注意,有关“[JDBC 兼容模式] 不支持事务。”的警告是预期的,可以忽略。

对目标表“Github”进行简单读取应确认数据插入。

SELECT count() FROM default.github;
| count\(\) |
| :--- |
| 10000 |