跳到主要内容
跳到主要内容
编辑此页

Spark JDBC

Spark 支持的最常用的数据源之一是 JDBC。在本节中,我们将详细介绍如何将 ClickHouse 官方 JDBC 连接器 与 Spark 一起使用。

读取数据

public static void main(String[] args) {
// Initialize Spark session
SparkSession spark = SparkSession.builder().appName("example").master("local").getOrCreate();

String jdbcURL = "jdbc:ch://127.0.0.1:8123/default";
String query = "select * from example_table where id > 2";


//---------------------------------------------------------------------------------------------------
// Load the table from ClickHouse using jdbc method
//---------------------------------------------------------------------------------------------------
Properties jdbcProperties = new Properties();
jdbcProperties.put("user", "default");
jdbcProperties.put("password", "123456");

Dataset<Row> df1 = spark.read().jdbc(jdbcURL, String.format("(%s)", query), jdbcProperties);

df1.show();

//---------------------------------------------------------------------------------------------------
// Load the table from ClickHouse using load method
//---------------------------------------------------------------------------------------------------
Dataset<Row> df2 = spark.read()
.format("jdbc")
.option("url", jdbcURL)
.option("user", "default")
.option("password", "123456")
.option("query", query)
.load();


df2.show();


// Stop the Spark session
spark.stop();
}

写入数据

 public static void main(String[] args) {
// Initialize Spark session
SparkSession spark = SparkSession.builder().appName("example").master("local").getOrCreate();

// JDBC connection details
String jdbcUrl = "jdbc:ch://127.0.0.1:8123/default";
Properties jdbcProperties = new Properties();
jdbcProperties.put("user", "default");
jdbcProperties.put("password", "123456");

// Create a sample DataFrame
StructType schema = new StructType(new StructField[]{
DataTypes.createStructField("id", DataTypes.IntegerType, false),
DataTypes.createStructField("name", DataTypes.StringType, false)
});

List<Row> rows = new ArrayList<Row>();
rows.add(RowFactory.create(1, "John"));
rows.add(RowFactory.create(2, "Doe"));


Dataset<Row> df = spark.createDataFrame(rows, schema);

//---------------------------------------------------------------------------------------------------
// Write the df to ClickHouse using the jdbc method
//---------------------------------------------------------------------------------------------------

df.write()
.mode(SaveMode.Append)
.jdbc(jdbcUrl, "example_table", jdbcProperties);

//---------------------------------------------------------------------------------------------------
// Write the df to ClickHouse using the save method
//---------------------------------------------------------------------------------------------------

df.write()
.format("jdbc")
.mode("append")
.option("url", jdbcUrl)
.option("dbtable", "example_table")
.option("user", "default")
.option("password", "123456")
.option("SaveMode", "append")
.save();


// Stop the Spark session
spark.stop();
}

并行性

当使用 Spark JDBC 时,Spark 使用单个分区读取数据。为了实现更高的并发性,您必须指定 partitionColumnlowerBoundupperBoundnumPartitions,这些参数描述了在从多个 worker 并行读取时如何对表进行分区。有关 JDBC 配置 的更多信息,请访问 Apache Spark 官方文档。

JDBC 限制

  • 截至今天,您只能使用 JDBC 将数据插入到现有表中(目前无法在 DF 插入时自动创建表,就像 Spark 对其他连接器所做的那样)。