将 Apache Spark 与 ClickHouse 集成
Apache Spark Apache Spark™ 是一个多语言引擎,用于在单节点机器或集群上执行数据工程、数据科学和机器学习。
有两种主要方法可以连接 Apache Spark 和 ClickHouse
- Spark 连接器 - Spark 连接器实现了
DataSourceV2
并拥有自己的目录管理。截至今天,这是集成 ClickHouse 和 Spark 的推荐方法。 - Spark JDBC - 使用 JDBC 数据源 集成 Spark 和 ClickHouse。
Spark 连接器
此连接器利用 ClickHouse 特定的优化,例如高级分区和谓词下推,以提高查询性能和数据处理能力。该连接器基于 ClickHouse 的官方 JDBC 连接器,并管理其自己的目录。
要求
- Java 8 或 17
- Scala 2.12 或 2.13
- Apache Spark 3.3 或 3.4 或 3.5
兼容性矩阵
版本 | 兼容的 Spark 版本 | ClickHouse JDBC 版本 |
---|---|---|
主分支 | Spark 3.3、3.4、3.5 | 0.6.3 |
0.8.0 | Spark 3.3、3.4、3.5 | 0.6.3 |
0.7.3 | Spark 3.3、3.4 | 0.4.6 |
0.6.0 | Spark 3.3 | 0.3.2-patch11 |
0.5.0 | Spark 3.2、3.3 | 0.3.2-patch11 |
0.4.0 | Spark 3.2、3.3 | 不依赖 |
0.3.0 | Spark 3.2、3.3 | 不依赖 |
0.2.1 | Spark 3.2 | 不依赖 |
0.1.2 | Spark 3.2 | 不依赖 |
下载库
二进制 JAR 的名称模式为
clickhouse-spark-runtime-${spark_binary_version}_${scala_binary_version}-${version}.jar
您可以在 Maven 中央存储库 中找到所有可用的已发布 JAR,以及在 Sonatype OSS 快照存储库 中找到所有每日构建的 SNAPSHOT JAR。
作为依赖项导入
Gradle
dependencies {
implementation("com.clickhouse.spark:clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}:{{ stable_version }}")
implementation("com.clickhouse:clickhouse-jdbc:{{ clickhouse_jdbc_version }}:all") { transitive = false }
}
如果您想使用 SNAPSHOT 版本,请添加以下存储库
repositries {
maven { url = "https://s01.oss.sonatype.org/content/repositories/snapshots" }
}
Maven
<dependency>
<groupId>com.clickhouse.spark</groupId>
<artifactId>clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}</artifactId>
<version>{{ stable_version }}</version>
</dependency>
<dependency>
<groupId>com.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<classifier>all</classifier>
<version>{{ clickhouse_jdbc_version }}</version>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
如果您想使用 SNAPSHOT 版本,请添加以下存储库。
<repositories>
<repository>
<id>sonatype-oss-snapshots</id>
<name>Sonatype OSS Snapshots Repository</name>
<url>https://s01.oss.sonatype.org/content/repositories/snapshots</url>
</repository>
</repositories>
使用 Spark SQL
注意:对于仅限 SQL 的用例,建议在生产环境中使用 Apache Kyuubi。
启动 Spark SQL CLI
$SPARK_HOME/bin/spark-sql \
--conf spark.sql.catalog.clickhouse=com.clickhouse.spark.ClickHouseCatalog \
--conf spark.sql.catalog.clickhouse.host=${CLICKHOUSE_HOST:-127.0.0.1} \
--conf spark.sql.catalog.clickhouse.protocol=http \
--conf spark.sql.catalog.clickhouse.http_port=${CLICKHOUSE_HTTP_PORT:-8123} \
--conf spark.sql.catalog.clickhouse.user=${CLICKHOUSE_USER:-default} \
--conf spark.sql.catalog.clickhouse.password=${CLICKHOUSE_PASSWORD:-} \
--conf spark.sql.catalog.clickhouse.database=default \
--jars /path/clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}:{{ stable_version }}.jar,/path/clickhouse-jdbc-{{ clickhouse_jdbc_version }}-all.jar
以下参数
--jars /path/clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}:{{ stable_version }}.jar,/path/clickhouse-jdbc-{{ clickhouse_jdbc_version }}-all.jar
可以替换为
--repositories https://{maven-cental-mirror or private-nexus-repo} \
--packages com.clickhouse.spark:clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}:{{ stable_version }},com.clickhouse:clickhouse-jdbc:{{ clickhouse_jdbc_version }}:all
以避免将 JAR 复制到 Spark 客户端节点。
操作
基本操作,例如创建数据库、创建表、写入表、读取表等。
spark-sql> use clickhouse;
Time taken: 0.016 seconds
spark-sql> create database if not exists test_db;
Time taken: 0.022 seconds
spark-sql> show databases;
default
system
test_db
Time taken: 0.289 seconds, Fetched 3 row(s)
spark-sql> CREATE TABLE test_db.tbl_sql (
> create_time TIMESTAMP NOT NULL,
> m INT NOT NULL COMMENT 'part key',
> id BIGINT NOT NULL COMMENT 'sort key',
> value STRING
> ) USING ClickHouse
> PARTITIONED BY (m)
> TBLPROPERTIES (
> engine = 'MergeTree()',
> order_by = 'id',
> settings.index_granularity = 8192
> );
Time taken: 0.242 seconds
spark-sql> insert into test_db.tbl_sql values
> (timestamp'2021-01-01 10:10:10', 1, 1L, '1'),
> (timestamp'2022-02-02 10:10:10', 2, 2L, '2')
> as tabl(create_time, m, id, value);
Time taken: 0.276 seconds
spark-sql> select * from test_db.tbl_sql;
2021-01-01 10:10:10 1 1 1
2022-02-02 10:10:10 2 2 2
Time taken: 0.116 seconds, Fetched 2 row(s)
spark-sql> insert into test_db.tbl_sql select * from test_db.tbl_sql;
Time taken: 1.028 seconds
spark-sql> insert into test_db.tbl_sql select * from test_db.tbl_sql;
Time taken: 0.462 seconds
spark-sql> select count(*) from test_db.tbl_sql;
6
Time taken: 1.421 seconds, Fetched 1 row(s)
spark-sql> select * from test_db.tbl_sql;
2021-01-01 10:10:10 1 1 1
2021-01-01 10:10:10 1 1 1
2021-01-01 10:10:10 1 1 1
2022-02-02 10:10:10 2 2 2
2022-02-02 10:10:10 2 2 2
2022-02-02 10:10:10 2 2 2
Time taken: 0.123 seconds, Fetched 6 row(s)
spark-sql> delete from test_db.tbl_sql where id = 1;
Time taken: 0.129 seconds
spark-sql> select * from test_db.tbl_sql;
2022-02-02 10:10:10 2 2 2
2022-02-02 10:10:10 2 2 2
2022-02-02 10:10:10 2 2 2
Time taken: 0.101 seconds, Fetched 3 row(s)
使用 Spark Shell
启动 Spark Shell
$SPARK_HOME/bin/spark-shell \
--conf spark.sql.catalog.clickhouse=com.clickhouse.spark.ClickHouseCatalog \
--conf spark.sql.catalog.clickhouse.host=${CLICKHOUSE_HOST:-127.0.0.1} \
--conf spark.sql.catalog.clickhouse.protocol=http \
--conf spark.sql.catalog.clickhouse.http_port=${CLICKHOUSE_HTTP_PORT:-8123} \
--conf spark.sql.catalog.clickhouse.user=${CLICKHOUSE_USER:-default} \
--conf spark.sql.catalog.clickhouse.password=${CLICKHOUSE_PASSWORD:-} \
--conf spark.sql.catalog.clickhouse.database=default \
--jars /path/clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}:{{ stable_version }}.jar,/path/clickhouse-jdbc-{{ clickhouse_jdbc_version }}-all.jar
以下参数
--jars /path/clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}:{{ stable_version }}.jar,/path/clickhouse-jdbc-{{ clickhouse_jdbc_version }}-all.jar
可以替换为
--repositories https://{maven-cental-mirror or private-nexus-repo} \
--packages com.clickhouse.spark:clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}:{{ stable_version }},com.clickhouse:clickhouse-jdbc:{{ clickhouse_jdbc_version }}:all
以避免将 JAR 复制到 Spark 客户端节点。
操作
基本操作,例如创建数据库、创建表、写入表、读取表等。
scala> spark.sql("use clickhouse")
res0: org.apache.spark.sql.DataFrame = []
scala> spark.sql("create database test_db")
res1: org.apache.spark.sql.DataFrame = []
scala> spark.sql("show databases").show
+---------+
|namespace|
+---------+
| default|
| system|
| test_db|
+---------+
scala> spark.sql("""
| CREATE TABLE test_db.tbl (
| create_time TIMESTAMP NOT NULL,
| m INT NOT NULL COMMENT 'part key',
| id BIGINT NOT NULL COMMENT 'sort key',
| value STRING
| ) USING ClickHouse
| PARTITIONED BY (m)
| TBLPROPERTIES (
| engine = 'MergeTree()',
| order_by = 'id',
| settings.index_granularity = 8192
| )
| """)
res2: org.apache.spark.sql.DataFrame = []
scala> :paste
// Entering paste mode (ctrl-D to finish)
spark.createDataFrame(Seq(
("2021-01-01 10:10:10", 1L, "1"),
("2022-02-02 10:10:10", 2L, "2")
)).toDF("create_time", "id", "value")
.withColumn("create_time", to_timestamp($"create_time"))
.withColumn("m", month($"create_time"))
.select($"create_time", $"m", $"id", $"value")
.writeTo("test_db.tbl")
.append
// Exiting paste mode, now interpreting.
scala> spark.table("test_db.tbl").show
+-------------------+---+---+-----+
| create_time| m| id|value|
+-------------------+---+---+-----+
|2021-01-01 10:10:10| 1| 1| 1|
|2022-02-02 10:10:10| 2| 2| 2|
+-------------------+---+---+-----+
scala> spark.sql("DELETE FROM test_db.tbl WHERE id=1")
res3: org.apache.spark.sql.DataFrame = []
scala> spark.table("test_db.tbl").show
+-------------------+---+---+-----+
| create_time| m| id|value|
+-------------------+---+---+-----+
|2022-02-02 10:10:10| 2| 2| 2|
+-------------------+---+---+-----+
执行 ClickHouse 原生 SQL。
scala> val options = Map(
| "host" -> "clickhouse",
| "protocol" -> "http",
| "http_port" -> "8123",
| "user" -> "default",
| "password" -> ""
| )
scala> val sql = """
| |CREATE TABLE test_db.person (
| | id Int64,
| | name String,
| | age Nullable(Int32)
| |)
| |ENGINE = MergeTree()
| |ORDER BY id
| """.stripMargin
scala> spark.executeCommand("com.clickhouse.spark.ClickHouseCommandRunner", sql, options)
scala> spark.sql("show tables in clickhouse_s1r1.test_db").show
+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
| test_db| person| false|
+---------+---------+-----------+
scala> spark.table("clickhouse_s1r1.test_db.person").printSchema
root
|-- id: long (nullable = false)
|-- name: string (nullable = false)
|-- age: integer (nullable = true)
支持的数据类型
本节概述了 Spark 和 ClickHouse 之间数据类型的映射。下表提供了在从 ClickHouse 读取数据到 Spark 以及在将数据从 Spark 插入到 ClickHouse 时转换数据类型的快速参考。
从 ClickHouse 读取数据到 Spark
ClickHouse 数据类型 | Spark 数据类型 | 支持 | 是否为基本类型 | 备注 |
---|---|---|---|---|
无 | NullType | ✅ | 是 | |
Bool | BooleanType | ✅ | 是 | |
UInt8 、Int16 | ShortType | ✅ | 是 | |
Int8 | ByteType | ✅ | 是 | |
UInt16 、Int32 | IntegerType | ✅ | 是 | |
UInt32 、Int64 、UInt64 | LongType | ✅ | 是 | |
Int128 、UInt128 、Int256 、UInt256 | DecimalType(38, 0) | ✅ | 是 | |
Float32 | FloatType | ✅ | 是 | |
Float64 | DoubleType | ✅ | 是 | |
String 、JSON 、UUID 、Enum8 、Enum16 、IPv4 、IPv6 | StringType | ✅ | 是 | |
FixedString | BinaryType 、StringType | ✅ | 是 | 由配置 READ_FIXED_STRING_AS 控制 |
Decimal | DecimalType | ✅ | 是 | 精度和比例最高可达 Decimal128 |
Decimal32 | DecimalType(9, scale) | ✅ | 是 | |
Decimal64 | DecimalType(18, scale) | ✅ | 是 | |
Decimal128 | DecimalType(38, scale) | ✅ | 是 | |
Date 、Date32 | DateType | ✅ | 是 | |
DateTime 、DateTime32 、DateTime64 | TimestampType | ✅ | 是 | |
Array | ArrayType | ✅ | 否 | 数组元素类型也会被转换 |
Map | MapType | ✅ | 否 | 键限于 StringType |
IntervalYear | YearMonthIntervalType(Year) | ✅ | 是 | |
IntervalMonth | YearMonthIntervalType(Month) | ✅ | 是 | |
IntervalDay 、IntervalHour 、IntervalMinute 、IntervalSecond | DayTimeIntervalType | ✅ | 否 | 使用特定的间隔类型 |
Object | ❌ | |||
Nested | ❌ | |||
Tuple | ❌ | |||
Point | ❌ | |||
Polygon | ❌ | |||
MultiPolygon | ❌ | |||
Ring | ❌ | |||
IntervalQuarter | ❌ | |||
IntervalWeek | ❌ | |||
Decimal256 | ❌ | |||
AggregateFunction | ❌ | |||
SimpleAggregateFunction | ❌ |
将数据从 Spark 插入到 ClickHouse
Spark 数据类型 | ClickHouse 数据类型 | 支持 | 是否为基本类型 | 备注 |
---|---|---|---|---|
BooleanType | UInt8 | ✅ | 是 | |
ByteType | Int8 | ✅ | 是 | |
ShortType | Int16 | ✅ | 是 | |
IntegerType | Int32 | ✅ | 是 | |
LongType | Int64 | ✅ | 是 | |
FloatType | Float32 | ✅ | 是 | |
DoubleType | Float64 | ✅ | 是 | |
StringType | String | ✅ | 是 | |
VarcharType | String | ✅ | 是 | |
CharType | String | ✅ | 是 | |
DecimalType | Decimal(p, s) | ✅ | 是 | 精度和比例最高可达 Decimal128 |
DateType | Date | ✅ | 是 | |
TimestampType | DateTime | ✅ | 是 | |
ArrayType (列表、元组或数组) | Array | ✅ | 否 | 数组元素类型也会被转换 |
MapType | Map | ✅ | 否 | 键限于 StringType |
Object | ❌ | |||
Nested | ❌ |
Spark JDBC
Spark 支持的常用数据源之一是 JDBC。在本节中,我们将详细介绍如何将 ClickHouse 官方 JDBC 连接器 与 Spark 一起使用。
读取数据
public static void main(String[] args) {
// Initialize Spark session
SparkSession spark = SparkSession.builder().appName("example").master("local").getOrCreate();
// JDBC connection details
String jdbcUrl = "jdbc:ch://127.0.0.1:8123/default";
Properties jdbcProperties = new Properties();
jdbcProperties.put("user", "default");
jdbcProperties.put("password", "123456");
// Load the table from ClickHouse
Dataset<Row> df = spark.read().jdbc(jdbcUrl, "example_table", jdbcProperties);
// Show the DataFrame
df.show();
// Stop the Spark session
spark.stop();
}
写入数据
截至今天,您只能使用 JDBC 将数据插入到现有表中。
public static void main(String[] args) {
// Initialize Spark session
SparkSession spark = SparkSession.builder().appName("example").master("local").getOrCreate();
// JDBC connection details
String jdbcUrl = "jdbc:ch://127.0.0.1:8123/default";
Properties jdbcProperties = new Properties();
jdbcProperties.put("user", "default");
jdbcProperties.put("password", "******");
// Create a sample DataFrame
StructType schema = new StructType(new StructField[]{
DataTypes.createStructField("id", DataTypes.IntegerType, false),
DataTypes.createStructField("name", DataTypes.StringType, false)
});
List<Row> rows = new ArrayList<Row>();
rows.add(RowFactory.create(1, "John"));
rows.add(RowFactory.create(2, "Doe"));
Dataset<Row> df = spark.createDataFrame(rows, schema);
df.write()
.mode(SaveMode.Append)
.jdbc(jdbcUrl, "my_table", jdbcProperties);
// Show the DataFrame
df.show();
// Stop the Spark session
spark.stop();
}
使用 Spark JDBC 时,Spark 使用单个分区读取数据。为了实现更高的并发性,您必须指定 partitionColumn
、lowerBound
、upperBound
和 numPartitions
,这些描述了如何在并行读取来自多个工作程序时对表进行分区。有关 JDBC 配置 的更多信息,请访问 Apache Spark 的官方文档。