将 Apache NiFi 连接到 ClickHouse
Apache NiFi 是一款开源工作流管理软件,旨在自动化软件系统之间的数据流。它允许创建 ETL 数据管道,并附带 300 多个数据处理器。本分步教程展示了如何将 Apache NiFi 连接到 ClickHouse 作为源和目标,以及如何加载示例数据集。1. 收集您的连接详细信息
要使用 HTTP(S) 连接到 ClickHouse,您需要以下信息
HOST 和 PORT:通常,使用 TLS 时端口为 8443,不使用 TLS 时端口为 8123。
DATABASE NAME:默认情况下,有一个名为
default
的数据库,请使用您要连接到的数据库的名称。USERNAME 和 PASSWORD:默认情况下,用户名为
default
。请使用适合您的使用场景的用户名。
您的 ClickHouse Cloud 服务的详细信息可在 ClickHouse Cloud 控制台中找到。选择您要连接的服务,然后点击 连接
选择 HTTPS,详细信息将在示例 curl
命令中提供。
如果您使用的是自管理 ClickHouse,连接详细信息由您的 ClickHouse 管理员设置。
2. 下载并运行 Apache NiFi
- 对于新的安装,请从 https://nifi.apache.org/download.html 下载二进制文件,并通过运行
./bin/nifi.sh start
启动。
3. 下载 ClickHouse JDBC 驱动程序
- 访问 GitHub 上的 ClickHouse JDBC 驱动程序发行页面,并查找最新的 JDBC 发行版本
- 在发行版本中,点击“显示所有 xx 资产”,并查找包含“shaded”或“all”关键字的 JAR 文件,例如
clickhouse-jdbc-0.5.0-all.jar
- 将 JAR 文件放在 Apache NiFi 可以访问的文件夹中,并记下绝对路径
4. 添加 DBCPConnectionPool 控制器服务并配置其属性
要在 Apache NiFi 中配置控制器服务,请点击“齿轮”按钮访问 NiFi 流程配置页面
选择“控制器服务”选项卡,然后点击右上角的
+
按钮添加新的控制器服务搜索
DBCPConnectionPool
,然后点击“添加”按钮默认情况下,新添加的 DBCPConnectionPool 将处于无效状态。点击“齿轮”按钮开始配置
在“属性”部分中,输入以下值
属性 值 备注 数据库连接 URL jdbc:ch:https://HOSTNAME:8443/default?ssl=true 根据情况替换连接 URL 中的 HOSTNAME 数据库驱动程序类名 com.clickhouse.jdbc.ClickHouseDriver 数据库驱动程序位置 /etc/nifi/nifi-X.XX.X/lib/clickhouse-jdbc-0.X.X-patchXX-shaded.jar ClickHouse JDBC 驱动程序 JAR 文件的绝对路径 数据库用户 default ClickHouse 用户名 密码 password ClickHouse 密码 在“设置”部分中,将控制器服务的名称更改为“ClickHouse JDBC”,以便于参考
通过点击“闪电”按钮,然后点击“启用”按钮来激活 DBCPConnectionPool 控制器服务
检查“控制器服务”选项卡,并确保控制器服务已启用
5. 使用 ExecuteSQL 处理器从表中读取数据
添加 ExecuteSQL 处理器,以及适当的上游和下游处理器
在 ExecuteSQL 处理器的“属性”部分中,输入以下值
属性 值 备注 数据库连接池服务 ClickHouse JDBC 选择为 ClickHouse 配置的控制器服务 SQL select 查询 SELECT * FROM system.metrics 在此输入您的查询 启动 ExecuteSQL 处理器
要确认查询已成功处理,请检查输出队列中的某个 FlowFile
切换视图到“格式化”,以查看输出 FlowFile 的结果
6. 使用 MergeRecord 和 PutDatabaseRecord 处理器写入表
要将多行写入单个插入操作,我们需要首先将多个记录合并为单个记录。这可以通过使用 MergeRecord 处理器来完成
在 MergeRecord 处理器的“属性”部分中,输入以下值
属性 值 备注 记录读取器 JSONTreeReader 选择合适的记录读取器 记录写入器 JSONReadSetWriter 选择合适的记录写入器 最小记录数 1000 将其更改为更大的数字,以便合并的最小行数形成单个记录。默认设置为 1 行 最大记录数 10000 将其更改为大于“最小记录数”的数字。默认设置为 1,000 行 要确认多个记录已合并为一个,请检查 MergeRecord 处理器的输入和输出。请注意,输出是多个输入记录的数组
输入
输出
在 PutDatabaseRecord 处理器的“属性”部分中,输入以下值
属性 值 备注 记录读取器 JSONTreeReader 选择合适的记录读取器 数据库类型 通用 保持默认值 语句类型 INSERT 数据库连接池服务 ClickHouse JDBC 选择 ClickHouse 控制器服务 表名 tbl 在此输入您的表名 翻译字段名称 false 设置为“false”,以便插入的字段名称必须与列名匹配 最大批次大小 1000 每次插入操作的最大行数。此值不应低于 MergeRecord 处理器中定义的“最小记录数”的值 要确认每次插入操作都包含多行,请检查表中的行数是否至少按 MergeRecord 中定义的“最小记录数”的值递增。
恭喜 - 您已成功使用 Apache Nifi 将数据加载到 ClickHouse 中!