跳到主要内容
跳到主要内容
编辑此页面

Spark 连接器

此连接器利用 ClickHouse 特定的优化,例如高级分区和谓词下推,以提高查询性能和数据处理能力。该连接器基于 ClickHouse 的官方 JDBC 连接器,并管理其自己的目录。

在 Spark 3.0 之前,Spark 缺少内置目录概念,因此用户通常依赖于外部目录系统,例如 Hive Metastore 或 AWS Glue。使用这些外部解决方案,用户必须先手动注册其数据源表,然后才能在 Spark 中访问它们。但是,由于 Spark 3.0 引入了目录概念,Spark 现在可以通过注册目录插件来自动发现表。

Spark 默认目录是 spark_catalog,表由 {catalog name}.{database}.{table} 标识。借助新的目录功能,现在可以在单个 Spark 应用程序中添加和使用多个目录。

要求

  • Java 8 或 17
  • Scala 2.12 或 2.13
  • Apache Spark 3.3 或 3.4 或 3.5

兼容性矩阵

版本兼容的 Spark 版本ClickHouse JDBC 版本
主分支Spark 3.3、3.4、3.50.6.3
0.8.1Spark 3.3、3.4、3.50.6.3
0.8.0Spark 3.3、3.4、3.50.6.3
0.7.3Spark 3.3、3.40.4.6
0.6.0Spark 3.30.3.2-patch11
0.5.0Spark 3.2、3.30.3.2-patch11
0.4.0Spark 3.2、3.3不依赖于
0.3.0Spark 3.2、3.3不依赖于
0.2.1Spark 3.2不依赖于
0.1.2Spark 3.2不依赖于

安装与设置

为了将 ClickHouse 与 Spark 集成,有多种安装选项以适应不同的项目设置。您可以将 ClickHouse Spark 连接器作为依赖项直接添加到项目的构建文件(例如 Maven 的 pom.xml 或 SBT 的 build.sbt)中。或者,您可以将所需的 JAR 文件放在 $SPARK_HOME/jars/ 文件夹中,或者使用 spark-submit 命令中的 --jars 标志直接将其作为 Spark 选项传递。这两种方法都确保 ClickHouse 连接器在您的 Spark 环境中可用。

作为依赖项导入

<dependency>
<groupId>com.clickhouse.spark</groupId>
<artifactId>clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}</artifactId>
<version>{{ stable_version }}</version>
</dependency>
<dependency>
<groupId>com.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<classifier>all</classifier>
<version>{{ clickhouse_jdbc_version }}</version>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>

如果要使用 SNAPSHOT 版本,请添加以下仓库。

<repositories>
<repository>
<id>sonatype-oss-snapshots</id>
<name>Sonatype OSS Snapshots Repository</name>
<url>https://s01.oss.sonatype.org/content/repositories/snapshots</url>
</repository>
</repositories>

下载库

二进制 JAR 的名称模式是

clickhouse-spark-runtime-${spark_binary_version}_${scala_binary_version}-${version}.jar

您可以在 Maven Central Repository 中找到所有可用的已发布 JAR 文件,并在 Sonatype OSS Snapshots Repository 中找到所有每日构建的 SNAPSHOT JAR 文件。

信息

务必包含带有 "all" 分类器的 clickhouse-jdbc JAR,因为连接器依赖于 clickhouse-httpclickhouse-client —— 这两者都捆绑在 clickhouse-jdbc 中:all. 或者,如果您不想使用完整的 JDBC 包,您可以单独添加 clickhouse-client JARclickhouse-http

在任何情况下,请确保软件包版本根据兼容性矩阵兼容。

注册目录(必需)

为了访问您的 ClickHouse 表,您必须使用以下配置配置新的 Spark 目录

属性默认值必需
spark.sql.catalog.<catalog_name>com.clickhouse.spark.ClickHouseCatalogN/A
spark.sql.catalog.<catalog_name>.host<clickhouse_host>localhost
spark.sql.catalog.<catalog_name>.protocolhttphttp
spark.sql.catalog.<catalog_name>.http_port<clickhouse_port>8123
spark.sql.catalog.<catalog_name>.user<clickhouse_username>default
spark.sql.catalog.<catalog_name>.password<clickhouse_password>(空字符串)
spark.sql.catalog.<catalog_name>.database<database>default
spark.<catalog_name>.write.formajsonarrow

这些设置可以通过以下方式之一设置

  • 编辑/创建 spark-defaults.conf
  • 将配置传递给您的 spark-submit 命令(或您的 spark-shell/spark-sql CLI 命令)。
  • 在初始化上下文时添加配置。
信息

当使用 ClickHouse 集群时,您需要为每个实例设置唯一的目录名称。例如

spark.sql.catalog.clickhouse1                com.clickhouse.spark.ClickHouseCatalog
spark.sql.catalog.clickhouse1.host 10.0.0.1
spark.sql.catalog.clickhouse1.protocol https
spark.sql.catalog.clickhouse1.http_port 8443
spark.sql.catalog.clickhouse1.user default
spark.sql.catalog.clickhouse1.password
spark.sql.catalog.clickhouse1.database default
spark.sql.catalog.clickhouse1.option.ssl true

spark.sql.catalog.clickhouse2 com.clickhouse.spark.ClickHouseCatalog
spark.sql.catalog.clickhouse2.host 10.0.0.2
spark.sql.catalog.clickhouse2.protocol https
spark.sql.catalog.clickhouse2.http_port 8443
spark.sql.catalog.clickhouse2.user default
spark.sql.catalog.clickhouse2.password
spark.sql.catalog.clickhouse2.database default
spark.sql.catalog.clickhouse2.option.ssl true

这样,您就可以通过 clickhouse1.<ck_db>.<ck_table> 从 Spark SQL 访问 clickhouse1 表 <ck_db>.<ck_table>,并通过 clickhouse2.<ck_db>.<ck_table> 访问 clickhouse2 表 <ck_db>.<ck_table>

读取数据

public static void main(String[] args) {
// Create a Spark session
SparkSession spark = SparkSession.builder()
.appName("example")
.master("local[*]")
.config("spark.sql.catalog.clickhouse", "com.clickhouse.spark.ClickHouseCatalog")
.config("spark.sql.catalog.clickhouse.host", "127.0.0.1")
.config("spark.sql.catalog.clickhouse.protocol", "http")
.config("spark.sql.catalog.clickhouse.http_port", "8123")
.config("spark.sql.catalog.clickhouse.user", "default")
.config("spark.sql.catalog.clickhouse.password", "123456")
.config("spark.sql.catalog.clickhouse.database", "default")
.config("spark.clickhouse.write.format", "json")
.getOrCreate();

Dataset<Row> df = spark.sql("select * from clickhouse.default.example_table");

df.show();

spark.stop();
}

写入数据

 public static void main(String[] args) throws AnalysisException {

// Create a Spark session
SparkSession spark = SparkSession.builder()
.appName("example")
.master("local[*]")
.config("spark.sql.catalog.clickhouse", "com.clickhouse.spark.ClickHouseCatalog")
.config("spark.sql.catalog.clickhouse.host", "127.0.0.1")
.config("spark.sql.catalog.clickhouse.protocol", "http")
.config("spark.sql.catalog.clickhouse.http_port", "8123")
.config("spark.sql.catalog.clickhouse.user", "default")
.config("spark.sql.catalog.clickhouse.password", "123456")
.config("spark.sql.catalog.clickhouse.database", "default")
.config("spark.clickhouse.write.format", "json")
.getOrCreate();

// Define the schema for the DataFrame
StructType schema = new StructType(new StructField[]{
DataTypes.createStructField("id", DataTypes.IntegerType, false),
DataTypes.createStructField("name", DataTypes.StringType, false),
});


List<Row> data = Arrays.asList(
RowFactory.create(1, "Alice"),
RowFactory.create(2, "Bob")
);

// Create a DataFrame
Dataset<Row> df = spark.createDataFrame(data, schema);

df.writeTo("clickhouse.default.example_table").append();

spark.stop();
}

DDL 操作

您可以使用 Spark SQL 在 ClickHouse 实例上执行 DDL 操作,所有更改都会立即持久化到 ClickHouse 中。 Spark SQL 允许您像在 ClickHouse 中一样编写查询,因此您可以直接执行诸如 CREATE TABLE、TRUNCATE 等命令 - 无需修改,例如


use clickhouse;

CREATE TABLE test_db.tbl_sql (
create_time TIMESTAMP NOT NULL,
m INT NOT NULL COMMENT 'part key',
id BIGINT NOT NULL COMMENT 'sort key',
value STRING
) USING ClickHouse
PARTITIONED BY (m)
TBLPROPERTIES (
engine = 'MergeTree()',
order_by = 'id',
settings.index_granularity = 8192
);

以上示例演示了 Spark SQL 查询,您可以使用任何 API(Java、Scala、PySpark 或 shell)在应用程序中运行这些查询。

配置

以下是连接器中可调整的配置


默认值描述起始版本
spark.clickhouse.ignoreUnsupportedTransformfalseClickHouse 支持使用复杂表达式作为分片键或分区值,例如 cityHash64(col_1, col_2),而 Spark 现在不支持这些表达式。如果为 true,则忽略不支持的表达式,否则快速失败并抛出异常。注意,当启用 spark.clickhouse.write.distributed.convertLocal 时,忽略不支持的分片键可能会损坏数据。0.4.0
spark.clickhouse.read.compression.codeclz4用于解压缩读取数据的编解码器。支持的编解码器:none、lz4。0.5.0
spark.clickhouse.read.distributed.convertLocaltrue当读取 Distributed 表时,读取本地表而不是自身。如果为 true,则忽略 spark.clickhouse.read.distributed.useClusterNodes0.1.0
spark.clickhouse.read.fixedStringAsbinary将 ClickHouse FixedString 类型读取为指定的 Spark 数据类型。支持的类型:binary、string0.8.0
spark.clickhouse.read.formatjson用于读取的序列化格式。支持的格式:json、binary0.6.0
spark.clickhouse.read.runtimeFilter.enabledfalse启用用于读取的运行时过滤器。0.8.0
spark.clickhouse.read.splitByPartitionIdtrue如果为 true,则通过虚拟列 _partition_id 而不是分区值来构造输入分区过滤器。通过分区值组装 SQL 谓词存在已知错误。此功能需要 ClickHouse Server v21.6+0.4.0
spark.clickhouse.useNullableQuerySchemafalse如果为 true,则在创建表时,将查询模式的所有字段标记为可为空,以便在创建表时执行 CREATE/REPLACE TABLE ... AS SELECT ...。注意,此配置需要 SPARK-43390(在 Spark 3.5 中可用),如果没有此补丁,则始终表现为 true0.8.0
spark.clickhouse.write.batchSize10000每次写入 ClickHouse 的记录数。0.1.0
spark.clickhouse.write.compression.codeclz4用于压缩写入数据的编解码器。支持的编解码器:none、lz4。0.3.0
spark.clickhouse.write.distributed.convertLocalfalse当写入 Distributed 表时,写入本地表而不是自身。如果为 true,则忽略 spark.clickhouse.write.distributed.useClusterNodes0.1.0
spark.clickhouse.write.distributed.useClusterNodestrue当写入 Distributed 表时,写入集群的所有节点。0.1.0
spark.clickhouse.write.formatarrow用于写入的序列化格式。支持的格式:json、arrow0.4.0
spark.clickhouse.write.localSortByKeytrue如果为 true,则在写入之前按排序键进行本地排序。0.3.0
spark.clickhouse.write.localSortByPartitionspark.clickhouse.write.repartitionByPartition 的值如果为 true,则在写入之前按分区进行本地排序。如果未设置,则等于 spark.clickhouse.write.repartitionByPartition0.3.0
spark.clickhouse.write.maxRetry3对于单个批次写入失败并返回可重试代码的情况,我们将重试的最大次数。0.1.0
spark.clickhouse.write.repartitionByPartitiontrue是否在写入之前按 ClickHouse 分区键重新分区数据,以满足 ClickHouse 表的分布要求。0.3.0
spark.clickhouse.write.repartitionNum0在写入之前需要重新分区数据以满足 ClickHouse 表的分布,使用此配置来指定重新分区的数量,小于 1 的值表示没有要求。0.1.0
spark.clickhouse.write.repartitionStrictlyfalse如果为 true,Spark 将严格地在分区之间分配传入的记录,以满足在写入时传递记录到数据源表之前所需的分发。否则,Spark 可能会应用某些优化来加速查询,但会破坏分布要求。注意,此配置需要 SPARK-37523(在 Spark 3.4 中可用),如果没有此补丁,则始终表现为 true0.3.0
spark.clickhouse.write.retryInterval10 秒写入重试之间的间隔(秒)。0.1.0
spark.clickhouse.write.retryableErrorCodes241写入失败时 ClickHouse 服务器返回的可重试错误代码。0.1.0

支持的数据类型

本节概述了 Spark 和 ClickHouse 之间的数据类型映射。下表提供了从 ClickHouse 读取数据到 Spark 以及从 Spark 插入数据到 ClickHouse 时转换数据类型的快速参考。

从 ClickHouse 读取数据到 Spark

ClickHouse 数据类型Spark 数据类型支持是原始类型注释
NothingNullType
BoolBooleanType
UInt8, Int16ShortType
Int8ByteType
UInt16,Int32IntegerType
UInt32,Int64, UInt64LongType
Int128,UInt128, Int256, UInt256DecimalType(38, 0)
Float32FloatType
Float64DoubleType
String, JSON, UUID, Enum8, Enum16, IPv4, IPv6StringType
FixedStringBinaryType, StringType由配置 READ_FIXED_STRING_AS 控制
DecimalDecimalType精度和小数位数高达 Decimal128
Decimal32DecimalType(9, scale)
Decimal64DecimalType(18, scale)
Decimal128DecimalType(38, scale)
Date, Date32DateType
DateTime, DateTime32, DateTime64TimestampType
ArrayArrayType数组元素类型也会被转换
MapMapType键仅限于 StringType
IntervalYearYearMonthIntervalType(Year)
IntervalMonthYearMonthIntervalType(Month)
IntervalDay, IntervalHour, IntervalMinute, IntervalSecondDayTimeIntervalType使用特定的间隔类型
Object
Nested
Tuple
Point
Polygon
MultiPolygon
Ring
IntervalQuarter
IntervalWeek
Decimal256
AggregateFunction
SimpleAggregateFunction

将数据从 Spark 插入到 ClickHouse

Spark 数据类型ClickHouse 数据类型支持是原始类型注释
BooleanTypeUInt8
ByteTypeInt8
ShortTypeInt16
IntegerTypeInt32
LongTypeInt64
FloatTypeFloat32
DoubleTypeFloat64
StringTypeString
VarcharTypeString
CharTypeString
DecimalTypeDecimal(p, s)精度和小数位数高达 Decimal128
DateTypeDate
TimestampTypeDateTime
ArrayType(list、tuple 或 array)Array数组元素类型也会被转换
MapTypeMap键仅限于 StringType
Object
Nested

贡献与支持

如果您想为项目做贡献或报告任何问题,我们欢迎您的意见!访问我们的 GitHub 仓库 以打开问题、提出改进建议或提交拉取请求。欢迎贡献!开始之前,请查看仓库中的贡献指南。感谢您帮助改进我们的 ClickHouse Spark 连接器!