跳过至主要内容

将 Apache NiFi 连接到 ClickHouse

Apache NiFi 是一款开源工作流管理软件,旨在自动化软件系统之间的数据流。它允许创建 ETL 数据管道,并附带 300 多个数据处理器。本分步教程展示了如何将 Apache NiFi 连接到 ClickHouse 作为源和目标,以及如何加载示例数据集。

1. 收集您的连接详细信息

要使用 HTTP(S) 连接到 ClickHouse,您需要以下信息

  • HOST 和 PORT:通常,使用 TLS 时端口为 8443,不使用 TLS 时端口为 8123。

  • DATABASE NAME:默认情况下,有一个名为 default 的数据库,请使用您要连接到的数据库的名称。

  • USERNAME 和 PASSWORD:默认情况下,用户名为 default。请使用适合您的使用场景的用户名。

您的 ClickHouse Cloud 服务的详细信息可在 ClickHouse Cloud 控制台中找到。选择您要连接的服务,然后点击 连接

ClickHouse Cloud service connect button

选择 HTTPS,详细信息将在示例 curl 命令中提供。

ClickHouse Cloud HTTPS connection details

如果您使用的是自管理 ClickHouse,连接详细信息由您的 ClickHouse 管理员设置。

2. 下载并运行 Apache NiFi

  1. 对于新的安装,请从 https://nifi.apache.org/download.html 下载二进制文件,并通过运行 ./bin/nifi.sh start 启动。

3. 下载 ClickHouse JDBC 驱动程序

  1. 访问 GitHub 上的 ClickHouse JDBC 驱动程序发行页面,并查找最新的 JDBC 发行版本
  2. 在发行版本中,点击“显示所有 xx 资产”,并查找包含“shaded”或“all”关键字的 JAR 文件,例如 clickhouse-jdbc-0.5.0-all.jar
  3. 将 JAR 文件放在 Apache NiFi 可以访问的文件夹中,并记下绝对路径

4. 添加 DBCPConnectionPool 控制器服务并配置其属性

  1. 要在 Apache NiFi 中配置控制器服务,请点击“齿轮”按钮访问 NiFi 流程配置页面

    Nifi Flow Configuration
  2. 选择“控制器服务”选项卡,然后点击右上角的 + 按钮添加新的控制器服务

    Add Controller Service
  3. 搜索 DBCPConnectionPool,然后点击“添加”按钮

    Search for DBCPConnectionPool
  4. 默认情况下,新添加的 DBCPConnectionPool 将处于无效状态。点击“齿轮”按钮开始配置

    Nifi Flow Configuration
  5. 在“属性”部分中,输入以下值

    属性备注
    数据库连接 URLjdbc:ch:https://HOSTNAME:8443/default?ssl=true根据情况替换连接 URL 中的 HOSTNAME
    数据库驱动程序类名com.clickhouse.jdbc.ClickHouseDriver
    数据库驱动程序位置/etc/nifi/nifi-X.XX.X/lib/clickhouse-jdbc-0.X.X-patchXX-shaded.jarClickHouse JDBC 驱动程序 JAR 文件的绝对路径
    数据库用户defaultClickHouse 用户名
    密码passwordClickHouse 密码
  6. 在“设置”部分中,将控制器服务的名称更改为“ClickHouse JDBC”,以便于参考

    Nifi Flow Configuration
  7. 通过点击“闪电”按钮,然后点击“启用”按钮来激活 DBCPConnectionPool 控制器服务

    Nifi Flow Configuration
    Nifi Flow Configuration
  8. 检查“控制器服务”选项卡,并确保控制器服务已启用

    Nifi Flow Configuration

5. 使用 ExecuteSQL 处理器从表中读取数据

  1. 添加 ​​ExecuteSQL 处理器,以及适当的上游和下游处理器

    ​​ExecuteSQL processor
  2. 在 ​​ExecuteSQL 处理器的“属性”部分中,输入以下值

    属性备注
    数据库连接池服务ClickHouse JDBC选择为 ClickHouse 配置的控制器服务
    SQL select 查询SELECT * FROM system.metrics在此输入您的查询
  3. 启动 ​​ExecuteSQL 处理器

    ​​ExecuteSQL processor
  4. 要确认查询已成功处理,请检查输出队列中的某个 FlowFile

    ​​ExecuteSQL processor
  5. 切换视图到“格式化”,以查看输出 FlowFile 的结果

    ​​ExecuteSQL processor

6. 使用 MergeRecord 和 PutDatabaseRecord 处理器写入表

  1. 要将多行写入单个插入操作,我们需要首先将多个记录合并为单个记录。这可以通过使用 MergeRecord 处理器来完成

  2. 在 MergeRecord 处理器的“属性”部分中,输入以下值

    属性备注
    记录读取器JSONTreeReader选择合适的记录读取器
    记录写入器JSONReadSetWriter选择合适的记录写入器
    最小记录数1000将其更改为更大的数字,以便合并的最小行数形成单个记录。默认设置为 1 行
    最大记录数10000将其更改为大于“最小记录数”的数字。默认设置为 1,000 行
  3. 要确认多个记录已合并为一个,请检查 MergeRecord 处理器的输入和输出。请注意,输出是多个输入记录的数组

    输入

    ​​ExecuteSQL processor

    输出

    ​​ExecuteSQL processor
  4. 在 PutDatabaseRecord 处理器的“属性”部分中,输入以下值

    属性备注
    记录读取器JSONTreeReader选择合适的记录读取器
    数据库类型通用保持默认值
    语句类型INSERT
    数据库连接池服务ClickHouse JDBC选择 ClickHouse 控制器服务
    表名tbl在此输入您的表名
    翻译字段名称false设置为“false”,以便插入的字段名称必须与列名匹配
    最大批次大小1000每次插入操作的最大行数。此值不应低于 MergeRecord 处理器中定义的“最小记录数”的值
  5. 要确认每次插入操作都包含多行,请检查表中的行数是否至少按 MergeRecord 中定义的“最小记录数”的值递增。

    ​​ExecuteSQL processor
  6. 恭喜 - 您已成功使用 Apache Nifi 将数据加载到 ClickHouse 中!