集成 Apache Beam 和 ClickHouse
Apache Beam 是一个开源的、统一的编程模型,使开发人员能够定义和执行批处理和流(连续)数据处理管道。Apache Beam 的灵活性在于其支持广泛的数据处理场景的能力,从 ETL(提取、转换、加载)操作到复杂事件处理和实时分析。此集成利用 ClickHouse 的官方 JDBC 连接器 作为底层插入层。
集成包
集成 Apache Beam 和 ClickHouse 所需的集成包在 Apache Beam I/O 连接器 下维护和开发 - 它是许多流行的数据存储系统和数据库的集成捆绑包。org.apache.beam.sdk.io.clickhouse.ClickHouseIO
实现在 Apache Beam 仓库 中。
Apache Beam ClickHouse 包的设置
包安装
将以下依赖项添加到您的包管理框架中
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-clickhouse</artifactId>
<version>${beam.version}</version>
</dependency>
推荐的 Beam 版本
建议从 Apache Beam 版本 2.59.0
开始使用 ClickHouseIO
连接器。 早期版本可能无法完全支持连接器的功能。
可以在 官方 Maven 仓库 中找到这些构件。
代码示例
以下示例读取名为 input.csv
的 CSV 文件作为 PCollection
,将其转换为 Row 对象(使用定义的模式),并使用 ClickHouseIO
将其插入到本地 ClickHouse 实例中
package org.example;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.clickhouse.ClickHouseIO;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.joda.time.DateTime;
public class Main {
public static void main(String[] args) {
// Create a Pipeline object.
Pipeline p = Pipeline.create();
Schema SCHEMA =
Schema.builder()
.addField(Schema.Field.of("name", Schema.FieldType.STRING).withNullable(true))
.addField(Schema.Field.of("age", Schema.FieldType.INT16).withNullable(true))
.addField(Schema.Field.of("insertion_time", Schema.FieldType.DATETIME).withNullable(false))
.build();
// Apply transforms to the pipeline.
PCollection<String> lines = p.apply("ReadLines", TextIO.read().from("src/main/resources/input.csv"));
PCollection<Row> rows = lines.apply("ConvertToRow", ParDo.of(new DoFn<String, Row>() {
@ProcessElement
public void processElement(@Element String line, OutputReceiver<Row> out) {
String[] values = line.split(",");
Row row = Row.withSchema(SCHEMA)
.addValues(values[0], Short.parseShort(values[1]), DateTime.now())
.build();
out.output(row);
}
})).setRowSchema(SCHEMA);
rows.apply("Write to ClickHouse",
ClickHouseIO.write("jdbc:clickhouse://127.0.0.1:8123/default?user=default&password=******", "test_table"));
// Run the pipeline.
p.run().waitUntilFinish();
}
}
支持的数据类型
ClickHouse | Apache Beam | 是否支持 | 说明 |
---|---|---|---|
TableSchema.TypeName.FLOAT32 | Schema.TypeName#FLOAT | ✅ | |
TableSchema.TypeName.FLOAT64 | Schema.TypeName#DOUBLE | ✅ | |
TableSchema.TypeName.INT8 | Schema.TypeName#BYTE | ✅ | |
TableSchema.TypeName.INT16 | Schema.TypeName#INT16 | ✅ | |
TableSchema.TypeName.INT32 | Schema.TypeName#INT32 | ✅ | |
TableSchema.TypeName.INT64 | Schema.TypeName#INT64 | ✅ | |
TableSchema.TypeName.STRING | Schema.TypeName#STRING | ✅ | |
TableSchema.TypeName.UINT8 | Schema.TypeName#INT16 | ✅ | |
TableSchema.TypeName.UINT16 | Schema.TypeName#INT32 | ✅ | |
TableSchema.TypeName.UINT32 | Schema.TypeName#INT64 | ✅ | |
TableSchema.TypeName.UINT64 | Schema.TypeName#INT64 | ✅ | |
TableSchema.TypeName.DATE | Schema.TypeName#DATETIME | ✅ | |
TableSchema.TypeName.DATETIME | Schema.TypeName#DATETIME | ✅ | |
TableSchema.TypeName.ARRAY | Schema.TypeName#ARRAY | ✅ | |
TableSchema.TypeName.ENUM8 | Schema.TypeName#STRING | ✅ | |
TableSchema.TypeName.ENUM16 | Schema.TypeName#STRING | ✅ | |
TableSchema.TypeName.BOOL | Schema.TypeName#BOOLEAN | ✅ | |
TableSchema.TypeName.TUPLE | Schema.TypeName#ROW | ✅ | |
TableSchema.TypeName.FIXEDSTRING | FixedBytes | ✅ | FixedBytes 是一个 LogicalType ,表示固定长度的字节数组,位于 org.apache.beam.sdk.schemas.logicaltypes |
Schema.TypeName#DECIMAL | ❌ | ||
Schema.TypeName#MAP | ❌ |
限制
使用连接器时,请考虑以下限制
- 截至今天,仅支持 Sink 操作。 连接器不支持 Source 操作。
- 当插入到
ReplicatedMergeTree
或构建在ReplicatedMergeTree
之上的Distributed
表中时,ClickHouse 会执行重复数据删除。如果没有复制,如果插入失败然后成功重试,则插入到常规 MergeTree 中可能会导致重复。但是,每个块都是原子性插入的,并且可以使用ClickHouseIO.Write.withMaxInsertBlockSize(long)
配置块大小。重复数据删除是通过使用插入块的校验和来实现的。有关重复数据删除的更多信息,请访问 重复数据删除 和 重复数据删除插入配置。 - 连接器不执行任何 DDL 语句; 因此,目标表必须在插入之前存在。
相关内容
ClickHouseIO
类 文档。- 示例的
Github
仓库 clickhouse-beam-connector。