跳转到主要内容
跳转到主要内容

连接 Apache NiFi 到 ClickHouse

Apache NiFi 是一款开源工作流管理软件,旨在自动化软件系统之间的数据流。它允许创建 ETL 数据管道,并附带 300 多个数据处理器。本分步教程展示了如何将 Apache NiFi 连接到 ClickHouse 作为源和目标,并加载示例数据集。

1. 收集您的连接详细信息

要使用 HTTP(S) 连接到 ClickHouse,您需要以下信息

  • HOST 和 PORT:通常,使用 TLS 时端口为 8443,不使用 TLS 时端口为 8123。

  • 数据库名称:开箱即用,有一个名为 default 的数据库,请使用您要连接的数据库的名称。

  • 用户名和密码:开箱即用,用户名是 default。请使用适合您用例的用户名。

您的 ClickHouse Cloud 服务的详细信息可在 ClickHouse Cloud 控制台中找到。选择您要连接的服务,然后单击连接

ClickHouse Cloud service connect button

选择 HTTPS,详细信息将在 curl 命令示例中提供。

ClickHouse Cloud HTTPS connection details

如果您使用的是自托管 ClickHouse,则连接详细信息由您的 ClickHouse 管理员设置。

2. 下载并运行 Apache NiFi

  1. 对于新安装,请从 https://nifi.apache.org/download.html 下载二进制文件,并通过运行 ./bin/nifi.sh start 启动

3. 下载 ClickHouse JDBC 驱动程序

  1. 访问 GitHub 上的 ClickHouse JDBC 驱动程序发布页面,查找最新的 JDBC 发布版本
  2. 在发布版本中,单击“显示所有 xx 个资产”,查找包含关键字“shaded”或“all”的 JAR 文件,例如 clickhouse-jdbc-0.5.0-all.jar
  3. 将 JAR 文件放在 Apache NiFi 可以访问的文件夹中,并记下绝对路径

4. 添加 DBCPConnectionPool 控制器服务并配置其属性

  1. 要在 Apache NiFi 中配置控制器服务,请单击“齿轮”按钮访问 NiFi 流配置页面

    NiFi Flow Configuration
  2. 选择“控制器服务”选项卡,然后单击右上角的 + 按钮添加新的控制器服务

    Add Controller Service
  3. 搜索 DBCPConnectionPool,然后单击“添加”按钮

    Search for `DBCPConnectionPool`
  4. 新添加的 DBCPConnectionPool 默认将处于“无效”状态。单击“齿轮”按钮开始配置

    NiFi Flow Configuration
  5. 在“属性”部分下,输入以下值

属性备注
数据库连接 URLjdbc:ch:https://HOSTNAME:8443/default?ssl=true相应地替换连接 URL 中的 HOSTNAME
数据库驱动程序类名com.clickhouse.jdbc.ClickHouseDriver
数据库驱动程序位置/etc/nifi/nifi-X.XX.X/lib/clickhouse-jdbc-0.X.X-patchXX-shaded.jarClickHouse JDBC 驱动程序 JAR 文件的绝对路径
数据库用户defaultClickHouse 用户名
密码passwordClickHouse 密码
  1. 在“设置”部分中,将控制器服务的名称更改为“ClickHouse JDBC”,以便于参考

    NiFi Flow Configuration
  2. 单击“闪电”按钮,然后单击“启用”按钮,激活 DBCPConnectionPool 控制器服务

    NiFi Flow Configuration
    NiFi Flow Configuration
  3. 检查“控制器服务”选项卡,确保控制器服务已启用

    NiFi Flow Configuration

5. 使用 ExecuteSQL 处理器从表中读取数据

  1. 添加 ExecuteSQL 处理器,以及适当的上游和下游处理器

    ​​`ExecuteSQL` processor
  2. ExecuteSQL 处理器的“属性”部分下,输入以下值

    属性备注
    数据库连接池服务ClickHouse JDBC选择为 ClickHouse 配置的控制器服务
    SQL select 查询SELECT * FROM system.metrics在此处输入您的查询
  3. 启动 ExecuteSQL 处理器

    `​​ExecuteSQL` processor
  4. 要确认查询已成功处理,请检查输出队列中的一个 FlowFile

    ​​`ExecuteSQL` processor
  5. 切换到“格式化”视图以查看输出 FlowFile 的结果

    `​​ExecuteSQL` processor

6. 使用 MergeRecordPutDatabaseRecord 处理器写入表

  1. 要在单个插入中写入多行,我们首先需要将多个记录合并为一个记录。这可以使用 MergeRecord 处理器完成

  2. MergeRecord 处理器的“属性”部分下,输入以下值

    属性备注
    记录读取器JSONTreeReader选择适当的记录读取器
    记录写入器JSONReadSetWriter选择适当的记录写入器
    最少记录数1000将其更改为更高的数字,以便将最少行数合并以形成单个记录。默认为 1 行
    最多记录数10000将其更改为高于“最少记录数”的数字。默认为 1,000 行
  3. 要确认多个记录已合并为一个,请检查 MergeRecord 处理器的输入和输出。请注意,输出是多个输入记录的数组

    输入

    ​`​ExecuteSQL` processor

    输出

    ​​`ExecuteSQL` processor
  4. PutDatabaseRecord 处理器的“属性”部分下,输入以下值

    属性备注
    记录读取器JSONTreeReader选择适当的记录读取器
    数据库类型通用保持默认
    语句类型INSERT
    数据库连接池服务ClickHouse JDBC选择 ClickHouse 控制器服务
    表名tbl在此处输入您的表名
    翻译字段名称false设置为“false”,以便插入的字段名称必须与列名匹配
    最大批次大小1000每次插入的最大行数。此值不应低于 MergeRecord 处理器中“最少记录数”的值
  5. 要确认每次插入包含多行,请检查表中的行数是否至少按 MergeRecord 中定义的“最少记录数”的值递增。

    `​​ExecuteSQL` processor
  6. 恭喜 - 您已使用 Apache NiFi 成功将数据加载到 ClickHouse 中!