跳至主要内容

将 Apache Spark 与 ClickHouse 集成

Apache Spark Apache Spark™ 是一个多语言引擎,用于在单节点机器或集群上执行数据工程、数据科学和机器学习。

有两种主要方法可以连接 Apache Spark 和 ClickHouse

  1. Spark 连接器 - Spark 连接器实现了 DataSourceV2 并拥有自己的目录管理。截至今天,这是集成 ClickHouse 和 Spark 的推荐方法。
  2. Spark JDBC - 使用 JDBC 数据源 集成 Spark 和 ClickHouse。

Spark 连接器

此连接器利用 ClickHouse 特定的优化,例如高级分区和谓词下推,以提高查询性能和数据处理能力。该连接器基于 ClickHouse 的官方 JDBC 连接器,并管理其自己的目录。

要求

  • Java 8 或 17
  • Scala 2.12 或 2.13
  • Apache Spark 3.3 或 3.4 或 3.5

兼容性矩阵

版本兼容的 Spark 版本ClickHouse JDBC 版本
主分支Spark 3.3、3.4、3.50.6.3
0.8.0Spark 3.3、3.4、3.50.6.3
0.7.3Spark 3.3、3.40.4.6
0.6.0Spark 3.30.3.2-patch11
0.5.0Spark 3.2、3.30.3.2-patch11
0.4.0Spark 3.2、3.3不依赖
0.3.0Spark 3.2、3.3不依赖
0.2.1Spark 3.2不依赖
0.1.2Spark 3.2不依赖

下载库

二进制 JAR 的名称模式为

clickhouse-spark-runtime-${spark_binary_version}_${scala_binary_version}-${version}.jar

您可以在 Maven 中央存储库 中找到所有可用的已发布 JAR,以及在 Sonatype OSS 快照存储库 中找到所有每日构建的 SNAPSHOT JAR。

作为依赖项导入

Gradle

dependencies {
implementation("com.clickhouse.spark:clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}:{{ stable_version }}")
implementation("com.clickhouse:clickhouse-jdbc:{{ clickhouse_jdbc_version }}:all") { transitive = false }
}

如果您想使用 SNAPSHOT 版本,请添加以下存储库

repositries {
maven { url = "https://s01.oss.sonatype.org/content/repositories/snapshots" }
}

Maven

<dependency>
<groupId>com.clickhouse.spark</groupId>
<artifactId>clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}</artifactId>
<version>{{ stable_version }}</version>
</dependency>
<dependency>
<groupId>com.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<classifier>all</classifier>
<version>{{ clickhouse_jdbc_version }}</version>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>

如果您想使用 SNAPSHOT 版本,请添加以下存储库。

<repositories>
<repository>
<id>sonatype-oss-snapshots</id>
<name>Sonatype OSS Snapshots Repository</name>
<url>https://s01.oss.sonatype.org/content/repositories/snapshots</url>
</repository>
</repositories>

使用 Spark SQL

注意:对于仅限 SQL 的用例,建议在生产环境中使用 Apache Kyuubi

启动 Spark SQL CLI

$SPARK_HOME/bin/spark-sql \
--conf spark.sql.catalog.clickhouse=com.clickhouse.spark.ClickHouseCatalog \
--conf spark.sql.catalog.clickhouse.host=${CLICKHOUSE_HOST:-127.0.0.1} \
--conf spark.sql.catalog.clickhouse.protocol=http \
--conf spark.sql.catalog.clickhouse.http_port=${CLICKHOUSE_HTTP_PORT:-8123} \
--conf spark.sql.catalog.clickhouse.user=${CLICKHOUSE_USER:-default} \
--conf spark.sql.catalog.clickhouse.password=${CLICKHOUSE_PASSWORD:-} \
--conf spark.sql.catalog.clickhouse.database=default \
--jars /path/clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}:{{ stable_version }}.jar,/path/clickhouse-jdbc-{{ clickhouse_jdbc_version }}-all.jar

以下参数

  --jars /path/clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}:{{ stable_version }}.jar,/path/clickhouse-jdbc-{{ clickhouse_jdbc_version }}-all.jar

可以替换为

  --repositories https://{maven-cental-mirror or private-nexus-repo} \
--packages com.clickhouse.spark:clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}:{{ stable_version }},com.clickhouse:clickhouse-jdbc:{{ clickhouse_jdbc_version }}:all

以避免将 JAR 复制到 Spark 客户端节点。

操作

基本操作,例如创建数据库、创建表、写入表、读取表等。

spark-sql> use clickhouse;
Time taken: 0.016 seconds

spark-sql> create database if not exists test_db;
Time taken: 0.022 seconds

spark-sql> show databases;
default
system
test_db
Time taken: 0.289 seconds, Fetched 3 row(s)

spark-sql> CREATE TABLE test_db.tbl_sql (
> create_time TIMESTAMP NOT NULL,
> m INT NOT NULL COMMENT 'part key',
> id BIGINT NOT NULL COMMENT 'sort key',
> value STRING
> ) USING ClickHouse
> PARTITIONED BY (m)
> TBLPROPERTIES (
> engine = 'MergeTree()',
> order_by = 'id',
> settings.index_granularity = 8192
> );
Time taken: 0.242 seconds

spark-sql> insert into test_db.tbl_sql values
> (timestamp'2021-01-01 10:10:10', 1, 1L, '1'),
> (timestamp'2022-02-02 10:10:10', 2, 2L, '2')
> as tabl(create_time, m, id, value);
Time taken: 0.276 seconds

spark-sql> select * from test_db.tbl_sql;
2021-01-01 10:10:10 1 1 1
2022-02-02 10:10:10 2 2 2
Time taken: 0.116 seconds, Fetched 2 row(s)

spark-sql> insert into test_db.tbl_sql select * from test_db.tbl_sql;
Time taken: 1.028 seconds

spark-sql> insert into test_db.tbl_sql select * from test_db.tbl_sql;
Time taken: 0.462 seconds

spark-sql> select count(*) from test_db.tbl_sql;
6
Time taken: 1.421 seconds, Fetched 1 row(s)

spark-sql> select * from test_db.tbl_sql;
2021-01-01 10:10:10 1 1 1
2021-01-01 10:10:10 1 1 1
2021-01-01 10:10:10 1 1 1
2022-02-02 10:10:10 2 2 2
2022-02-02 10:10:10 2 2 2
2022-02-02 10:10:10 2 2 2
Time taken: 0.123 seconds, Fetched 6 row(s)

spark-sql> delete from test_db.tbl_sql where id = 1;
Time taken: 0.129 seconds

spark-sql> select * from test_db.tbl_sql;
2022-02-02 10:10:10 2 2 2
2022-02-02 10:10:10 2 2 2
2022-02-02 10:10:10 2 2 2
Time taken: 0.101 seconds, Fetched 3 row(s)

使用 Spark Shell

启动 Spark Shell

$SPARK_HOME/bin/spark-shell \
--conf spark.sql.catalog.clickhouse=com.clickhouse.spark.ClickHouseCatalog \
--conf spark.sql.catalog.clickhouse.host=${CLICKHOUSE_HOST:-127.0.0.1} \
--conf spark.sql.catalog.clickhouse.protocol=http \
--conf spark.sql.catalog.clickhouse.http_port=${CLICKHOUSE_HTTP_PORT:-8123} \
--conf spark.sql.catalog.clickhouse.user=${CLICKHOUSE_USER:-default} \
--conf spark.sql.catalog.clickhouse.password=${CLICKHOUSE_PASSWORD:-} \
--conf spark.sql.catalog.clickhouse.database=default \
--jars /path/clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}:{{ stable_version }}.jar,/path/clickhouse-jdbc-{{ clickhouse_jdbc_version }}-all.jar

以下参数

  --jars /path/clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}:{{ stable_version }}.jar,/path/clickhouse-jdbc-{{ clickhouse_jdbc_version }}-all.jar

可以替换为

  --repositories https://{maven-cental-mirror or private-nexus-repo} \
--packages com.clickhouse.spark:clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}:{{ stable_version }},com.clickhouse:clickhouse-jdbc:{{ clickhouse_jdbc_version }}:all

以避免将 JAR 复制到 Spark 客户端节点。

操作

基本操作,例如创建数据库、创建表、写入表、读取表等。

scala> spark.sql("use clickhouse")
res0: org.apache.spark.sql.DataFrame = []

scala> spark.sql("create database test_db")
res1: org.apache.spark.sql.DataFrame = []

scala> spark.sql("show databases").show
+---------+
|namespace|
+---------+
| default|
| system|
| test_db|
+---------+

scala> spark.sql("""
| CREATE TABLE test_db.tbl (
| create_time TIMESTAMP NOT NULL,
| m INT NOT NULL COMMENT 'part key',
| id BIGINT NOT NULL COMMENT 'sort key',
| value STRING
| ) USING ClickHouse
| PARTITIONED BY (m)
| TBLPROPERTIES (
| engine = 'MergeTree()',
| order_by = 'id',
| settings.index_granularity = 8192
| )
| """)
res2: org.apache.spark.sql.DataFrame = []

scala> :paste
// Entering paste mode (ctrl-D to finish)

spark.createDataFrame(Seq(
("2021-01-01 10:10:10", 1L, "1"),
("2022-02-02 10:10:10", 2L, "2")
)).toDF("create_time", "id", "value")
.withColumn("create_time", to_timestamp($"create_time"))
.withColumn("m", month($"create_time"))
.select($"create_time", $"m", $"id", $"value")
.writeTo("test_db.tbl")
.append

// Exiting paste mode, now interpreting.

scala> spark.table("test_db.tbl").show
+-------------------+---+---+-----+
| create_time| m| id|value|
+-------------------+---+---+-----+
|2021-01-01 10:10:10| 1| 1| 1|
|2022-02-02 10:10:10| 2| 2| 2|
+-------------------+---+---+-----+

scala> spark.sql("DELETE FROM test_db.tbl WHERE id=1")
res3: org.apache.spark.sql.DataFrame = []

scala> spark.table("test_db.tbl").show
+-------------------+---+---+-----+
| create_time| m| id|value|
+-------------------+---+---+-----+
|2022-02-02 10:10:10| 2| 2| 2|
+-------------------+---+---+-----+

执行 ClickHouse 原生 SQL。

scala> val options = Map(
| "host" -> "clickhouse",
| "protocol" -> "http",
| "http_port" -> "8123",
| "user" -> "default",
| "password" -> ""
| )

scala> val sql = """
| |CREATE TABLE test_db.person (
| | id Int64,
| | name String,
| | age Nullable(Int32)
| |)
| |ENGINE = MergeTree()
| |ORDER BY id
| """.stripMargin

scala> spark.executeCommand("com.clickhouse.spark.ClickHouseCommandRunner", sql, options)

scala> spark.sql("show tables in clickhouse_s1r1.test_db").show
+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
| test_db| person| false|
+---------+---------+-----------+

scala> spark.table("clickhouse_s1r1.test_db.person").printSchema
root
|-- id: long (nullable = false)
|-- name: string (nullable = false)
|-- age: integer (nullable = true)

支持的数据类型

本节概述了 Spark 和 ClickHouse 之间数据类型的映射。下表提供了在从 ClickHouse 读取数据到 Spark 以及在将数据从 Spark 插入到 ClickHouse 时转换数据类型的快速参考。

从 ClickHouse 读取数据到 Spark

ClickHouse 数据类型Spark 数据类型支持是否为基本类型备注
NullType
BoolBooleanType
UInt8Int16ShortType
Int8ByteType
UInt16Int32IntegerType
UInt32Int64UInt64LongType
Int128UInt128Int256UInt256DecimalType(38, 0)
Float32FloatType
Float64DoubleType
StringJSONUUIDEnum8Enum16IPv4IPv6StringType
FixedStringBinaryTypeStringType由配置 READ_FIXED_STRING_AS 控制
DecimalDecimalType精度和比例最高可达 Decimal128
Decimal32DecimalType(9, scale)
Decimal64DecimalType(18, scale)
Decimal128DecimalType(38, scale)
DateDate32DateType
DateTimeDateTime32DateTime64TimestampType
ArrayArrayType数组元素类型也会被转换
MapMapType键限于 StringType
IntervalYearYearMonthIntervalType(Year)
IntervalMonthYearMonthIntervalType(Month)
IntervalDayIntervalHourIntervalMinuteIntervalSecondDayTimeIntervalType使用特定的间隔类型
Object
Nested
Tuple
Point
Polygon
MultiPolygon
Ring
IntervalQuarter
IntervalWeek
Decimal256
AggregateFunction
SimpleAggregateFunction

将数据从 Spark 插入到 ClickHouse

Spark 数据类型ClickHouse 数据类型支持是否为基本类型备注
BooleanTypeUInt8
ByteTypeInt8
ShortTypeInt16
IntegerTypeInt32
LongTypeInt64
FloatTypeFloat32
DoubleTypeFloat64
StringTypeString
VarcharTypeString
CharTypeString
DecimalTypeDecimal(p, s)精度和比例最高可达 Decimal128
DateTypeDate
TimestampTypeDateTime
ArrayType(列表、元组或数组)Array数组元素类型也会被转换
MapTypeMap键限于 StringType
Object
Nested

Spark JDBC

Spark 支持的常用数据源之一是 JDBC。在本节中,我们将详细介绍如何将 ClickHouse 官方 JDBC 连接器 与 Spark 一起使用。

读取数据

public static void main(String[] args) {
// Initialize Spark session
SparkSession spark = SparkSession.builder().appName("example").master("local").getOrCreate();

// JDBC connection details
String jdbcUrl = "jdbc:ch://127.0.0.1:8123/default";
Properties jdbcProperties = new Properties();
jdbcProperties.put("user", "default");
jdbcProperties.put("password", "123456");

// Load the table from ClickHouse
Dataset<Row> df = spark.read().jdbc(jdbcUrl, "example_table", jdbcProperties);

// Show the DataFrame
df.show();

// Stop the Spark session
spark.stop();
}

写入数据

信息

截至今天,您只能使用 JDBC 将数据插入到现有表中。

    public static void main(String[] args) {
// Initialize Spark session
SparkSession spark = SparkSession.builder().appName("example").master("local").getOrCreate();

// JDBC connection details
String jdbcUrl = "jdbc:ch://127.0.0.1:8123/default";
Properties jdbcProperties = new Properties();
jdbcProperties.put("user", "default");
jdbcProperties.put("password", "******");
// Create a sample DataFrame
StructType schema = new StructType(new StructField[]{
DataTypes.createStructField("id", DataTypes.IntegerType, false),
DataTypes.createStructField("name", DataTypes.StringType, false)
});

List<Row> rows = new ArrayList<Row>();
rows.add(RowFactory.create(1, "John"));
rows.add(RowFactory.create(2, "Doe"));

Dataset<Row> df = spark.createDataFrame(rows, schema);

df.write()
.mode(SaveMode.Append)
.jdbc(jdbcUrl, "my_table", jdbcProperties);
// Show the DataFrame
df.show();

// Stop the Spark session
spark.stop();
}
信息

使用 Spark JDBC 时,Spark 使用单个分区读取数据。为了实现更高的并发性,您必须指定 partitionColumnlowerBoundupperBoundnumPartitions,这些描述了如何在并行读取来自多个工作程序时对表进行分区。有关 JDBC 配置 的更多信息,请访问 Apache Spark 的官方文档。