diff --git a/docs/en/integrations/data-ingestion/apache-spark/index.md b/docs/en/integrations/data-ingestion/apache-spark/index.md index 5e85156e8d0..87ede66b68c 100644 --- a/docs/en/integrations/data-ingestion/apache-spark/index.md +++ b/docs/en/integrations/data-ingestion/apache-spark/index.md @@ -1,10 +1,11 @@ --- -sidebar_label: Apache Spark +sidebar_label: Integrating Apache Spark with ClickHouse sidebar_position: 1 -slug: /en/integrations/apache-spark/ +slug: /en/integrations/apache-spark description: Introduction to Apache Spark with ClickHouse keywords: [ clickhouse, apache, spark, migrating, data ] --- + import Tabs from '@theme/Tabs'; import TabItem from '@theme/TabItem'; import TOCInline from '@theme/TOCInline'; @@ -18,757 +19,12 @@ science, and machine learning on single-node machines or clusters. There are two main ways to connect Apache Spark and ClickHouse: -1. [Spark Connector](#spark-connector) - the Spark connector implements the `DataSourceV2` and has its own Catalog +1. [Spark Connector](./apache-spark/spark-native-connector) - the Spark connector implements the `DataSourceV2` and has its own Catalog management. As of today, this is the recommended way to integrate ClickHouse and Spark. -2. [Spark JDBC](#spark-jdbc) - Integrate Spark and ClickHouse +2. [Spark JDBC](./apache-spark/spark-jdbc) - Integrate Spark and ClickHouse using a [JDBC data source](https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html).

+Both solutions have been successfully tested and are fully compatible with various APIs, including Java, Scala, PySpark, and SparkSQL. - - -## Spark Connector - -This connector leverages ClickHouse-specific optimizations, such as advanced partitioning and predicate pushdown, to -improve query performance and data handling. -The connector is based on [ClickHouse's official JDBC connector](https://github.com/ClickHouse/clickhouse-java), and -manages its own catalog. - -### Requirements - -- Java 8 or 17 -- Scala 2.12 or 2.13 -- Apache Spark 3.3 or 3.4 or 3.5 - -### Compatibility Matrix - -| Version | Compatible Spark Versions | ClickHouse JDBC version | -|---------|---------------------------|-------------------------| -| main | Spark 3.3, 3.4, 3.5 | 0.6.3 | -| 0.8.0 | Spark 3.3, 3.4, 3.5 | 0.6.3 | -| 0.7.3 | Spark 3.3, 3.4 | 0.4.6 | -| 0.6.0 | Spark 3.3 | 0.3.2-patch11 | -| 0.5.0 | Spark 3.2, 3.3 | 0.3.2-patch11 | -| 0.4.0 | Spark 3.2, 3.3 | Not depend on | -| 0.3.0 | Spark 3.2, 3.3 | Not depend on | -| 0.2.1 | Spark 3.2 | Not depend on | -| 0.1.2 | Spark 3.2 | Not depend on | - -### Download the library - -The name pattern of the binary JAR is: - -``` -clickhouse-spark-runtime-${spark_binary_version}_${scala_binary_version}-${version}.jar -``` - -You can find all available released JARs -in the [Maven Central Repository](https://repo1.maven.org/maven2/com/clickhouse/spark/) -and all daily build SNAPSHOT JARs -in the [Sonatype OSS Snapshots Repository](https://s01.oss.sonatype.org/content/repositories/snapshots/com/clickhouse/). - -### Import as a dependency - -#### Gradle - -``` -dependencies { - implementation("com.clickhouse.spark:clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}:{{ stable_version }}") - implementation("com.clickhouse:clickhouse-jdbc:{{ clickhouse_jdbc_version }}:all") { transitive = false } -} -``` - -Add the following repository if you want to use the SNAPSHOT version: - -``` -repositries { - maven { url = "https://s01.oss.sonatype.org/content/repositories/snapshots" } -} -``` - -#### Maven - -``` - - com.clickhouse.spark - clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }} - {{ stable_version }} - - - com.clickhouse - clickhouse-jdbc - all - {{ clickhouse_jdbc_version }} - - - * - * - - - -``` - -Add the following repository if you want to use SNAPSHOT version. - -``` - - - sonatype-oss-snapshots - Sonatype OSS Snapshots Repository - https://s01.oss.sonatype.org/content/repositories/snapshots - - -``` - -### Play with Spark SQL - -Note: For SQL-only use cases, [Apache Kyuubi](https://github.com/apache/kyuubi) is recommended -for production. 
- -#### Launch Spark SQL CLI - -```shell -$SPARK_HOME/bin/spark-sql \ - --conf spark.sql.catalog.clickhouse=com.clickhouse.spark.ClickHouseCatalog \ - --conf spark.sql.catalog.clickhouse.host=${CLICKHOUSE_HOST:-127.0.0.1} \ - --conf spark.sql.catalog.clickhouse.protocol=http \ - --conf spark.sql.catalog.clickhouse.http_port=${CLICKHOUSE_HTTP_PORT:-8123} \ - --conf spark.sql.catalog.clickhouse.user=${CLICKHOUSE_USER:-default} \ - --conf spark.sql.catalog.clickhouse.password=${CLICKHOUSE_PASSWORD:-} \ - --conf spark.sql.catalog.clickhouse.database=default \ - --jars /path/clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}:{{ stable_version }}.jar,/path/clickhouse-jdbc-{{ clickhouse_jdbc_version }}-all.jar -``` - -The following argument - -``` - --jars /path/clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}:{{ stable_version }}.jar,/path/clickhouse-jdbc-{{ clickhouse_jdbc_version }}-all.jar -``` - -can be replaced by - -``` - --repositories https://{maven-cental-mirror or private-nexus-repo} \ - --packages com.clickhouse.spark:clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}:{{ stable_version }},com.clickhouse:clickhouse-jdbc:{{ clickhouse_jdbc_version }}:all -``` - -to avoid copying the JAR to your Spark client node. - -### Operations - -Basic operations, e.g. create database, create table, write table, read table, etc. - -``` -spark-sql> use clickhouse; -Time taken: 0.016 seconds - -spark-sql> create database if not exists test_db; -Time taken: 0.022 seconds - -spark-sql> show databases; -default -system -test_db -Time taken: 0.289 seconds, Fetched 3 row(s) - -spark-sql> CREATE TABLE test_db.tbl_sql ( - > create_time TIMESTAMP NOT NULL, - > m INT NOT NULL COMMENT 'part key', - > id BIGINT NOT NULL COMMENT 'sort key', - > value STRING - > ) USING ClickHouse - > PARTITIONED BY (m) - > TBLPROPERTIES ( - > engine = 'MergeTree()', - > order_by = 'id', - > settings.index_granularity = 8192 - > ); -Time taken: 0.242 seconds - -spark-sql> insert into test_db.tbl_sql values - > (timestamp'2021-01-01 10:10:10', 1, 1L, '1'), - > (timestamp'2022-02-02 10:10:10', 2, 2L, '2') - > as tabl(create_time, m, id, value); -Time taken: 0.276 seconds - -spark-sql> select * from test_db.tbl_sql; -2021-01-01 10:10:10 1 1 1 -2022-02-02 10:10:10 2 2 2 -Time taken: 0.116 seconds, Fetched 2 row(s) - -spark-sql> insert into test_db.tbl_sql select * from test_db.tbl_sql; -Time taken: 1.028 seconds - -spark-sql> insert into test_db.tbl_sql select * from test_db.tbl_sql; -Time taken: 0.462 seconds - -spark-sql> select count(*) from test_db.tbl_sql; -6 -Time taken: 1.421 seconds, Fetched 1 row(s) - -spark-sql> select * from test_db.tbl_sql; -2021-01-01 10:10:10 1 1 1 -2021-01-01 10:10:10 1 1 1 -2021-01-01 10:10:10 1 1 1 -2022-02-02 10:10:10 2 2 2 -2022-02-02 10:10:10 2 2 2 -2022-02-02 10:10:10 2 2 2 -Time taken: 0.123 seconds, Fetched 6 row(s) - -spark-sql> delete from test_db.tbl_sql where id = 1; -Time taken: 0.129 seconds - -spark-sql> select * from test_db.tbl_sql; -2022-02-02 10:10:10 2 2 2 -2022-02-02 10:10:10 2 2 2 -2022-02-02 10:10:10 2 2 2 -Time taken: 0.101 seconds, Fetched 3 row(s) -``` - -### Play with Spark Shell - -#### Launch Spark Shell - -```shell -$SPARK_HOME/bin/spark-shell \ - --conf spark.sql.catalog.clickhouse=com.clickhouse.spark.ClickHouseCatalog \ - --conf spark.sql.catalog.clickhouse.host=${CLICKHOUSE_HOST:-127.0.0.1} \ - --conf spark.sql.catalog.clickhouse.protocol=http \ - --conf 
spark.sql.catalog.clickhouse.http_port=${CLICKHOUSE_HTTP_PORT:-8123} \ - --conf spark.sql.catalog.clickhouse.user=${CLICKHOUSE_USER:-default} \ - --conf spark.sql.catalog.clickhouse.password=${CLICKHOUSE_PASSWORD:-} \ - --conf spark.sql.catalog.clickhouse.database=default \ - --jars /path/clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}:{{ stable_version }}.jar,/path/clickhouse-jdbc-{{ clickhouse_jdbc_version }}-all.jar -``` - -The following argument - -``` - --jars /path/clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}:{{ stable_version }}.jar,/path/clickhouse-jdbc-{{ clickhouse_jdbc_version }}-all.jar -``` - -can be replaced by - -``` - --repositories https://{maven-cental-mirror or private-nexus-repo} \ - --packages com.clickhouse.spark:clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}:{{ stable_version }},com.clickhouse:clickhouse-jdbc:{{ clickhouse_jdbc_version }}:all -``` - -to avoid copying the JAR to your Spark client node. - -#### Operations - -Basic operations, e.g. create database, create table, write table, read table, etc. - -``` -scala> spark.sql("use clickhouse") -res0: org.apache.spark.sql.DataFrame = [] - -scala> spark.sql("create database test_db") -res1: org.apache.spark.sql.DataFrame = [] - -scala> spark.sql("show databases").show -+---------+ -|namespace| -+---------+ -| default| -| system| -| test_db| -+---------+ - -scala> spark.sql(""" - | CREATE TABLE test_db.tbl ( - | create_time TIMESTAMP NOT NULL, - | m INT NOT NULL COMMENT 'part key', - | id BIGINT NOT NULL COMMENT 'sort key', - | value STRING - | ) USING ClickHouse - | PARTITIONED BY (m) - | TBLPROPERTIES ( - | engine = 'MergeTree()', - | order_by = 'id', - | settings.index_granularity = 8192 - | ) - | """) -res2: org.apache.spark.sql.DataFrame = [] - -scala> :paste -// Entering paste mode (ctrl-D to finish) - -spark.createDataFrame(Seq( - ("2021-01-01 10:10:10", 1L, "1"), - ("2022-02-02 10:10:10", 2L, "2") -)).toDF("create_time", "id", "value") - .withColumn("create_time", to_timestamp($"create_time")) - .withColumn("m", month($"create_time")) - .select($"create_time", $"m", $"id", $"value") - .writeTo("test_db.tbl") - .append - -// Exiting paste mode, now interpreting. - -scala> spark.table("test_db.tbl").show -+-------------------+---+---+-----+ -| create_time| m| id|value| -+-------------------+---+---+-----+ -|2021-01-01 10:10:10| 1| 1| 1| -|2022-02-02 10:10:10| 2| 2| 2| -+-------------------+---+---+-----+ - -scala> spark.sql("DELETE FROM test_db.tbl WHERE id=1") -res3: org.apache.spark.sql.DataFrame = [] - -scala> spark.table("test_db.tbl").show -+-------------------+---+---+-----+ -| create_time| m| id|value| -+-------------------+---+---+-----+ -|2022-02-02 10:10:10| 2| 2| 2| -+-------------------+---+---+-----+ -``` - -Execute ClickHouse native SQL. 
- -``` -scala> val options = Map( - | "host" -> "clickhouse", - | "protocol" -> "http", - | "http_port" -> "8123", - | "user" -> "default", - | "password" -> "" - | ) - -scala> val sql = """ - | |CREATE TABLE test_db.person ( - | | id Int64, - | | name String, - | | age Nullable(Int32) - | |) - | |ENGINE = MergeTree() - | |ORDER BY id - | """.stripMargin - -scala> spark.executeCommand("com.clickhouse.spark.ClickHouseCommandRunner", sql, options) - -scala> spark.sql("show tables in clickhouse_s1r1.test_db").show -+---------+---------+-----------+ -|namespace|tableName|isTemporary| -+---------+---------+-----------+ -| test_db| person| false| -+---------+---------+-----------+ - -scala> spark.table("clickhouse_s1r1.test_db.person").printSchema -root - |-- id: long (nullable = false) - |-- name: string (nullable = false) - |-- age: integer (nullable = true) -``` - -### Supported Data Types - -This section outlines the mapping of data types between Spark and ClickHouse. The tables below provide quick references -for converting data types when reading from ClickHouse into Spark and when inserting data from Spark into ClickHouse. - -#### Reading data from ClickHouse into Spark - -| ClickHouse Data Type | Spark Data Type | Supported | Is Primitive | Notes | -|-------------------------------------------------------------------|--------------------------------|-----------|--------------|----------------------------------------------------| -| `Nothing` | `NullType` | ✅ | Yes | | -| `Bool` | `BooleanType` | ✅ | Yes | | -| `UInt8`, `Int16` | `ShortType` | ✅ | Yes | | -| `Int8` | `ByteType` | ✅ | Yes | | -| `UInt16`,`Int32` | `IntegerType` | ✅ | Yes | | -| `UInt32`,`Int64`, `UInt64` | `LongType` | ✅ | Yes | | -| `Int128`,`UInt128`, `Int256`, `UInt256` | `DecimalType(38, 0)` | ✅ | Yes | | -| `Float32` | `FloatType` | ✅ | Yes | | -| `Float64` | `DoubleType` | ✅ | Yes | | -| `String`, `JSON`, `UUID`, `Enum8`, `Enum16`, `IPv4`, `IPv6` | `StringType` | ✅ | Yes | | -| `FixedString` | `BinaryType`, `StringType` | ✅ | Yes | Controlled by configuration `READ_FIXED_STRING_AS` | -| `Decimal` | `DecimalType` | ✅ | Yes | Precision and scale up to `Decimal128` | -| `Decimal32` | `DecimalType(9, scale)` | ✅ | Yes | | -| `Decimal64` | `DecimalType(18, scale)` | ✅ | Yes | | -| `Decimal128` | `DecimalType(38, scale)` | ✅ | Yes | | -| `Date`, `Date32` | `DateType` | ✅ | Yes | | -| `DateTime`, `DateTime32`, `DateTime64` | `TimestampType` | ✅ | Yes | | -| `Array` | `ArrayType` | ✅ | No | Array element type is also converted | -| `Map` | `MapType` | ✅ | No | Keys are limited to `StringType` | -| `IntervalYear` | `YearMonthIntervalType(Year)` | ✅ | Yes | | -| `IntervalMonth` | `YearMonthIntervalType(Month)` | ✅ | Yes | | -| `IntervalDay`, `IntervalHour`, `IntervalMinute`, `IntervalSecond` | `DayTimeIntervalType` | ✅ | No | Specific interval type is used | -| `Object` | | ❌ | | | -| `Nested` | | ❌ | | | -| `Tuple` | | ❌ | | | -| `Point` | | ❌ | | | -| `Polygon` | | ❌ | | | -| `MultiPolygon` | | ❌ | | | -| `Ring` | | ❌ | | | -| `IntervalQuarter` | | ❌ | | | -| `IntervalWeek` | | ❌ | | | -| `Decimal256` | | ❌ | | | -| `AggregateFunction` | | ❌ | | | -| `SimpleAggregateFunction` | | ❌ | | | - -#### Inserting data from Spark into ClickHouse - -| Spark Data Type | ClickHouse Data Type | Supported | Is Primitive | Notes | -|-------------------------------------|----------------------|-----------|--------------|----------------------------------------| -| `BooleanType` | `UInt8` | ✅ | Yes | | -| `ByteType` | `Int8` | ✅ | Yes | | -| 
`ShortType` | `Int16` | ✅ | Yes | | -| `IntegerType` | `Int32` | ✅ | Yes | | -| `LongType` | `Int64` | ✅ | Yes | | -| `FloatType` | `Float32` | ✅ | Yes | | -| `DoubleType` | `Float64` | ✅ | Yes | | -| `StringType` | `String` | ✅ | Yes | | -| `VarcharType` | `String` | ✅ | Yes | | -| `CharType` | `String` | ✅ | Yes | | -| `DecimalType` | `Decimal(p, s)` | ✅ | Yes | Precision and scale up to `Decimal128` | -| `DateType` | `Date` | ✅ | Yes | | -| `TimestampType` | `DateTime` | ✅ | Yes | | -| `ArrayType` (list, tuple, or array) | `Array` | ✅ | No | Array element type is also converted | -| `MapType` | `Map` | ✅ | No | Keys are limited to `StringType` | -| `Object` | | ❌ | | | -| `Nested` | | ❌ | | | - -## Spark JDBC - -One of the most used data sources supported by Spark is JDBC. -In this section, we will provide details on how to -use the [ClickHouse official JDBC connector](https://github.com/ClickHouse/clickhouse-java) with Spark. - - -### Read data - - - - -```java -public static void main(String[] args) { - // Initialize Spark session - SparkSession spark = SparkSession.builder().appName("example").master("local").getOrCreate(); - - String jdbcURL = "jdbc:ch://localhost:8123/default"; - String query = "select * from example_table where id > 2"; - - - //--------------------------------------------------------------------------------------------------- - // Load the table from ClickHouse using jdbc method - //--------------------------------------------------------------------------------------------------- - Properties jdbcProperties = new Properties(); - jdbcProperties.put("user", "default"); - jdbcProperties.put("password", "123456"); - - Dataset df1 = spark.read().jdbc(jdbcURL, String.format("(%s)", query), jdbcProperties); - - df1.show(); - - //--------------------------------------------------------------------------------------------------- - // Load the table from ClickHouse using load method - //--------------------------------------------------------------------------------------------------- - Dataset df2 = spark.read() - .format("jdbc") - .option("url", jdbcURL) - .option("user", "default") - .option("password", "123456") - .option("query", query) - .load(); - - - df2.show(); - - - // Stop the Spark session - spark.stop(); - } -``` - - - -```java -object ReadData extends App { - // Initialize Spark session - val spark: SparkSession = SparkSession.builder.appName("example").master("local").getOrCreate - - val jdbcURL = "jdbc:ch://localhost:8123/default" - val query: String = "select * from example_table where id > 2" - - - //--------------------------------------------------------------------------------------------------- - // Load the table from ClickHouse using jdbc method - //--------------------------------------------------------------------------------------------------- - val connectionProperties = new Properties() - connectionProperties.put("user", "default") - connectionProperties.put("password", "123456") - - val df1: Dataset[Row] = spark.read. 
- jdbc(jdbcURL, s"($query)", connectionProperties) - - df1.show() - //--------------------------------------------------------------------------------------------------- - // Load the table from ClickHouse using load method - //--------------------------------------------------------------------------------------------------- - val df2: Dataset[Row] = spark.read - .format("jdbc") - .option("url", jdbcURL) - .option("user", "default") - .option("password", "123456") - .option("query", query) - .load() - - df2.show() - - - - // Stop the Spark session// Stop the Spark session - spark.stop() - -} -``` - -Find more details on the [`s3` table function page](/docs/en/sql-reference/table-functions/s3.md). - - - - -```python -from pyspark.sql import SparkSession - -jar_files = [ - "jars/clickhouse-jdbc-X.X.X-SNAPSHOT-all.jar" -] - -# Initialize Spark session with JARs -spark = SparkSession.builder \ - .appName("example") \ - .master("local") \ - .config("spark.jars", ",".join(jar_files)) \ - .getOrCreate() - -url = "jdbc:ch://localhost:8123/default" -user = "your_user" -password = "your_password" -query = "select * from example_table where id > 2" -driver = "com.clickhouse.jdbc.ClickHouseDriver" - -df = (spark.read - .format('jdbc') - .option('driver', driver) - .option('url', url) - .option('user', user) - .option('password', password).option( - 'query', query).load()) - -df.show() - -``` - - - - -```sql - CREATE TEMPORARY VIEW jdbcTable - USING org.apache.spark.sql.jdbc - OPTIONS ( - url "jdbc:ch://localhost:8123/default", - dbtable "schema.tablename", - user "username", - password "password", - driver "com.clickhouse.jdbc.ClickHouseDriver" - ); - - SELECT * FROM jdbcTable; -``` - - - - -### Write data - - - - - -```java - public static void main(String[] args) { - // Initialize Spark session - SparkSession spark = SparkSession.builder().appName("example").master("local").getOrCreate(); - - // JDBC connection details - String jdbcUrl = "jdbc:ch://localhost:8123/default"; - Properties jdbcProperties = new Properties(); - jdbcProperties.put("user", "default"); - jdbcProperties.put("password", "123456"); - - // Create a sample DataFrame - StructType schema = new StructType(new StructField[]{ - DataTypes.createStructField("id", DataTypes.IntegerType, false), - DataTypes.createStructField("name", DataTypes.StringType, false) - }); - - List rows = new ArrayList(); - rows.add(RowFactory.create(1, "John")); - rows.add(RowFactory.create(2, "Doe")); - - - Dataset df = spark.createDataFrame(rows, schema); - - //--------------------------------------------------------------------------------------------------- - // Write the df to ClickHouse using the jdbc method - //--------------------------------------------------------------------------------------------------- - - df.write() - .mode(SaveMode.Append) - .jdbc(jdbcUrl, "example_table", jdbcProperties); - - //--------------------------------------------------------------------------------------------------- - // Write the df to ClickHouse using the save method - //--------------------------------------------------------------------------------------------------- - - df.write() - .format("jdbc") - .mode("append") - .option("url", jdbcUrl) - .option("dbtable", "example_table") - .option("user", "default") - .option("password", "123456") - .option("SaveMode", "append") - .save(); - - - // Stop the Spark session - spark.stop(); - } -``` - - - -```java -object WriteData extends App { - - val spark: SparkSession = 
SparkSession.builder.appName("example").master("local").getOrCreate - - // JDBC connection details - val jdbcUrl: String = "jdbc:ch://localhost:8123/default" - val jdbcProperties: Properties = new Properties - jdbcProperties.put("user", "default") - jdbcProperties.put("password", "123456") - - // Create a sample DataFrame - - - val rows = Seq(Row(1, "John"), Row(2, "Doe")) - - val schema = List( - StructField("id", DataTypes.IntegerType, nullable = false), - StructField("name", StringType, nullable = true) - ) - - val df: DataFrame = spark.createDataFrame( - spark.sparkContext.parallelize(rows), - StructType(schema) - ) - - //---------------------------------------------------------------------------------------------------//--------------------------------------------------------------------------------------------------- - // Write the df to ClickHouse using the jdbc method// Write the df to ClickHouse using the jdbc method - //---------------------------------------------------------------------------------------------------//--------------------------------------------------------------------------------------------------- - - df.write - .mode(SaveMode.Append) - .jdbc(jdbcUrl, "example_table", jdbcProperties) - - //---------------------------------------------------------------------------------------------------//--------------------------------------------------------------------------------------------------- - // Write the df to ClickHouse using the save method// Write the df to ClickHouse using the save method - //---------------------------------------------------------------------------------------------------//--------------------------------------------------------------------------------------------------- - - df.write - .format("jdbc") - .mode("append") - .option("url", jdbcUrl) - .option("dbtable", "example_table") - .option("user", "default") - .option("password", "123456") - .option("SaveMode", "append") - .save() - - - // Stop the Spark session// Stop the Spark session - spark.stop() - -} -``` - -Find more details on the [`s3` table function page](/docs/en/sql-reference/table-functions/s3.md). 
- - - - -```python -from pyspark.sql import SparkSession -from pyspark.sql import Row - -jar_files = [ - "jars/clickhouse-jdbc-X.X.X-SNAPSHOT-all.jar" -] - -# Initialize Spark session with JARs -spark = SparkSession.builder \ - .appName("example") \ - .master("local") \ - .config("spark.jars", ",".join(jar_files)) \ - .getOrCreate() - -# Create DataFrame -data = [Row(id=11, name="John"), Row(id=12, name="Doe")] -df = spark.createDataFrame(data) - -url = "jdbc:ch://localhost:8123/default" -user = "your_user" -password = "your_password" -driver = "com.clickhouse.jdbc.ClickHouseDriver" - -# Write DataFrame to ClickHouse -df.write \ - .format("jdbc") \ - .option("driver", driver) \ - .option("url", url) \ - .option("user", user) \ - .option("password", password) \ - .option("dbtable", "example_table") \ - .mode("append") \ - .save() - - -``` - - - - -```sql - CREATE TEMPORARY VIEW jdbcTable - USING org.apache.spark.sql.jdbc - OPTIONS ( - url "jdbc:ch://localhost:8123/default", - dbtable "schema.tablename", - user "username", - password "password", - driver "com.clickhouse.jdbc.ClickHouseDriver" - ); - -- resultTable could be created with df.createTempView or with SparkSQL - INSERT INTO TABLE jdbcTable - SELECT * FROM resultTable; - -``` - - - - - -### JDBC Limitations -* As of today, you can insert data using JDBC only into existing tables (currently there is no way to auto create the table on DF insertion, as Spark does with other connectors). - - - -:::important -When using Spark JDBC, Spark reads the data using a single partition. To achieve higher concurrency, you must specify `partitionColumn`, `lowerBound`, `upperBound`, and `numPartitions`, which describe how to partition the table when reading in parallel from multiple workers. -Please visit Apache Spark's official documentation for more information on [JDBC configurations](https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html#data-source-option). -::: \ No newline at end of file diff --git a/docs/en/integrations/data-ingestion/apache-spark/spark-jdbc.md b/docs/en/integrations/data-ingestion/apache-spark/spark-jdbc.md new file mode 100644 index 00000000000..0262a711fdb --- /dev/null +++ b/docs/en/integrations/data-ingestion/apache-spark/spark-jdbc.md @@ -0,0 +1,354 @@ +--- +sidebar_label: Spark JDBC +sidebar_position: 3 +slug: /en/integrations/apache-spark/spark-jdbc +description: Introduction to Apache Spark with ClickHouse +keywords: [ clickhouse, apache, spark, jdbc, migrating, data ] +--- +import Tabs from '@theme/Tabs'; +import TabItem from '@theme/TabItem'; +import TOCInline from '@theme/TOCInline'; + +# Spark JDBC +One of the most used data sources supported by Spark is JDBC. +In this section, we will provide details on how to +use the [ClickHouse official JDBC connector](https://clickhouse.com/docs/en/integrations/java/jdbc-driver) with Spark. 
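+
+The examples below assume that the ClickHouse JDBC driver is already available to your Spark application, either as a
+local JAR (as shown in the PySpark snippets later on this page) or resolved from a repository when the session starts.
+As a rough sketch of the latter approach (the Maven coordinates and version here are illustrative, so align them with
+the driver release you actually use):
+
+```python
+from pyspark.sql import SparkSession
+
+# Resolve the ClickHouse JDBC driver and its dependencies from Maven Central at session start-up.
+# The version below is only an example; pick the release that matches your environment.
+spark = (SparkSession.builder
+         .appName("clickhouse-jdbc-example")
+         .master("local[*]")
+         .config("spark.jars.packages", "com.clickhouse:clickhouse-jdbc:0.6.3")
+         .getOrCreate())
+```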
+ + + + +## Read data + + + + +```java +public static void main(String[] args) { + // Initialize Spark session + SparkSession spark = SparkSession.builder().appName("example").master("local").getOrCreate(); + + String jdbcURL = "jdbc:ch://localhost:8123/default"; + String query = "select * from example_table where id > 2"; + + + //--------------------------------------------------------------------------------------------------- + // Load the table from ClickHouse using jdbc method + //--------------------------------------------------------------------------------------------------- + Properties jdbcProperties = new Properties(); + jdbcProperties.put("user", "default"); + jdbcProperties.put("password", "123456"); + + Dataset df1 = spark.read().jdbc(jdbcURL, String.format("(%s)", query), jdbcProperties); + + df1.show(); + + //--------------------------------------------------------------------------------------------------- + // Load the table from ClickHouse using load method + //--------------------------------------------------------------------------------------------------- + Dataset df2 = spark.read() + .format("jdbc") + .option("url", jdbcURL) + .option("user", "default") + .option("password", "123456") + .option("query", query) + .load(); + + + df2.show(); + + + // Stop the Spark session + spark.stop(); + } +``` + + + + +```java +object ReadData extends App { + // Initialize Spark session + val spark: SparkSession = SparkSession.builder.appName("example").master("local").getOrCreate + + val jdbcURL = "jdbc:ch://localhost:8123/default" + val query: String = "select * from example_table where id > 2" + + + //--------------------------------------------------------------------------------------------------- + // Load the table from ClickHouse using jdbc method + //--------------------------------------------------------------------------------------------------- + val connectionProperties = new Properties() + connectionProperties.put("user", "default") + connectionProperties.put("password", "123456") + + val df1: Dataset[Row] = spark.read. 
+ jdbc(jdbcURL, s"($query)", connectionProperties) + + df1.show() + //--------------------------------------------------------------------------------------------------- + // Load the table from ClickHouse using load method + //--------------------------------------------------------------------------------------------------- + val df2: Dataset[Row] = spark.read + .format("jdbc") + .option("url", jdbcURL) + .option("user", "default") + .option("password", "123456") + .option("query", query) + .load() + + df2.show() + + + + // Stop the Spark session// Stop the Spark session + spark.stop() + +} +``` + + + + +```python +from pyspark.sql import SparkSession + +jar_files = [ + "jars/clickhouse-jdbc-X.X.X-SNAPSHOT-all.jar" +] + +# Initialize Spark session with JARs +spark = SparkSession.builder \ + .appName("example") \ + .master("local") \ + .config("spark.jars", ",".join(jar_files)) \ + .getOrCreate() + +url = "jdbc:ch://localhost:8123/default" +user = "your_user" +password = "your_password" +query = "select * from example_table where id > 2" +driver = "com.clickhouse.jdbc.ClickHouseDriver" + +df = (spark.read + .format('jdbc') + .option('driver', driver) + .option('url', url) + .option('user', user) + .option('password', password).option( + 'query', query).load()) + +df.show() + +``` + + + + +```sql + CREATE TEMPORARY VIEW jdbcTable + USING org.apache.spark.sql.jdbc + OPTIONS ( + url "jdbc:ch://localhost:8123/default", + dbtable "schema.tablename", + user "username", + password "password", + driver "com.clickhouse.jdbc.ClickHouseDriver" + ); + + SELECT * FROM jdbcTable; +``` + + + + +## Write data + + + + +```java + public static void main(String[] args) { + // Initialize Spark session + SparkSession spark = SparkSession.builder().appName("example").master("local").getOrCreate(); + + // JDBC connection details + String jdbcUrl = "jdbc:ch://localhost:8123/default"; + Properties jdbcProperties = new Properties(); + jdbcProperties.put("user", "default"); + jdbcProperties.put("password", "123456"); + + // Create a sample DataFrame + StructType schema = new StructType(new StructField[]{ + DataTypes.createStructField("id", DataTypes.IntegerType, false), + DataTypes.createStructField("name", DataTypes.StringType, false) + }); + + List rows = new ArrayList(); + rows.add(RowFactory.create(1, "John")); + rows.add(RowFactory.create(2, "Doe")); + + + Dataset df = spark.createDataFrame(rows, schema); + + //--------------------------------------------------------------------------------------------------- + // Write the df to ClickHouse using the jdbc method + //--------------------------------------------------------------------------------------------------- + + df.write() + .mode(SaveMode.Append) + .jdbc(jdbcUrl, "example_table", jdbcProperties); + + //--------------------------------------------------------------------------------------------------- + // Write the df to ClickHouse using the save method + //--------------------------------------------------------------------------------------------------- + + df.write() + .format("jdbc") + .mode("append") + .option("url", jdbcUrl) + .option("dbtable", "example_table") + .option("user", "default") + .option("password", "123456") + .option("SaveMode", "append") + .save(); + + + // Stop the Spark session + spark.stop(); + } +``` + + + + +```java +object WriteData extends App { + + val spark: SparkSession = SparkSession.builder.appName("example").master("local").getOrCreate + + // JDBC connection details + val jdbcUrl: String = 
"jdbc:ch://localhost:8123/default" + val jdbcProperties: Properties = new Properties + jdbcProperties.put("user", "default") + jdbcProperties.put("password", "123456") + + // Create a sample DataFrame + + + val rows = Seq(Row(1, "John"), Row(2, "Doe")) + + val schema = List( + StructField("id", DataTypes.IntegerType, nullable = false), + StructField("name", StringType, nullable = true) + ) + + val df: DataFrame = spark.createDataFrame( + spark.sparkContext.parallelize(rows), + StructType(schema) + ) + + //---------------------------------------------------------------------------------------------------//--------------------------------------------------------------------------------------------------- + // Write the df to ClickHouse using the jdbc method// Write the df to ClickHouse using the jdbc method + //---------------------------------------------------------------------------------------------------//--------------------------------------------------------------------------------------------------- + + df.write + .mode(SaveMode.Append) + .jdbc(jdbcUrl, "example_table", jdbcProperties) + + //---------------------------------------------------------------------------------------------------//--------------------------------------------------------------------------------------------------- + // Write the df to ClickHouse using the save method// Write the df to ClickHouse using the save method + //---------------------------------------------------------------------------------------------------//--------------------------------------------------------------------------------------------------- + + df.write + .format("jdbc") + .mode("append") + .option("url", jdbcUrl) + .option("dbtable", "example_table") + .option("user", "default") + .option("password", "123456") + .option("SaveMode", "append") + .save() + + + // Stop the Spark session// Stop the Spark session + spark.stop() + +} +``` + + + + +```python +from pyspark.sql import SparkSession +from pyspark.sql import Row + +jar_files = [ + "jars/clickhouse-jdbc-X.X.X-SNAPSHOT-all.jar" +] + +# Initialize Spark session with JARs +spark = SparkSession.builder \ + .appName("example") \ + .master("local") \ + .config("spark.jars", ",".join(jar_files)) \ + .getOrCreate() + +# Create DataFrame +data = [Row(id=11, name="John"), Row(id=12, name="Doe")] +df = spark.createDataFrame(data) + +url = "jdbc:ch://localhost:8123/default" +user = "your_user" +password = "your_password" +driver = "com.clickhouse.jdbc.ClickHouseDriver" + +# Write DataFrame to ClickHouse +df.write \ + .format("jdbc") \ + .option("driver", driver) \ + .option("url", url) \ + .option("user", user) \ + .option("password", password) \ + .option("dbtable", "example_table") \ + .mode("append") \ + .save() + + +``` + + + + +```sql + CREATE TEMPORARY VIEW jdbcTable + USING org.apache.spark.sql.jdbc + OPTIONS ( + url "jdbc:ch://localhost:8123/default", + dbtable "schema.tablename", + user "username", + password "password", + driver "com.clickhouse.jdbc.ClickHouseDriver" + ); + -- resultTable could be created with df.createTempView or with SparkSQL + INSERT INTO TABLE jdbcTable + SELECT * FROM resultTable; + +``` + + + + + +## Parallelism + +When using Spark JDBC, Spark reads the data using a single partition. To achieve higher concurrency, you must specify +`partitionColumn`, `lowerBound`, `upperBound`, and `numPartitions`, which describe how to partition the table when +reading in parallel from multiple workers. 
+Please visit Apache Spark's official documentation for more information +on [JDBC configurations](https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html#data-source-option). + +## JDBC Limitations + +* As of today, you can insert data using JDBC only into existing tables (currently there is no way to auto create the + table on DF insertion, as Spark does with other connectors). diff --git a/docs/en/integrations/data-ingestion/apache-spark/spark-native-connector.md b/docs/en/integrations/data-ingestion/apache-spark/spark-native-connector.md new file mode 100644 index 00000000000..127b522ce21 --- /dev/null +++ b/docs/en/integrations/data-ingestion/apache-spark/spark-native-connector.md @@ -0,0 +1,569 @@ +--- +sidebar_label: Spark Native Connector +sidebar_position: 2 +slug: /en/integrations/apache-spark/spark-native-connector +description: Introduction to Apache Spark with ClickHouse +keywords: [ clickhouse, apache, spark, migrating, data ] +--- + +import Tabs from '@theme/Tabs'; +import TabItem from '@theme/TabItem'; +import TOCInline from '@theme/TOCInline'; + +# Spark Connector + +This connector leverages ClickHouse-specific optimizations, such as advanced partitioning and predicate pushdown, to +improve query performance and data handling. +The connector is based on [ClickHouse's official JDBC connector](https://github.com/ClickHouse/clickhouse-java), and +manages its own catalog. + +Before Spark 3.0, Spark lacked a built-in catalog concept, so users typically relied on external catalog systems such as +Hive Metastore or AWS Glue. +With these external solutions, users had to register their data source tables manually before accessing them in Spark. +However, since Spark 3.0 introduced the catalog concept, Spark can now automatically discover tables by registering +catalog plugins. + +Spark default catalog is `spark_catalog`, and tables are identified by `{catalog name}.{database}.{table}`. With the new +catalog feature, it is now possible to add and work with multiple catalogs in a single Spark application. + + + +## Requirements + +- Java 8 or 17 +- Scala 2.12 or 2.13 +- Apache Spark 3.3 or 3.4 or 3.5 + +## Compatibility Matrix + +| Version | Compatible Spark Versions | ClickHouse JDBC version | +|---------|---------------------------|-------------------------| +| main | Spark 3.3, 3.4, 3.5 | 0.6.3 | +| 0.8.1 | Spark 3.3, 3.4, 3.5 | 0.6.3 | +| 0.8.0 | Spark 3.3, 3.4, 3.5 | 0.6.3 | +| 0.7.3 | Spark 3.3, 3.4 | 0.4.6 | +| 0.6.0 | Spark 3.3 | 0.3.2-patch11 | +| 0.5.0 | Spark 3.2, 3.3 | 0.3.2-patch11 | +| 0.4.0 | Spark 3.2, 3.3 | Not depend on | +| 0.3.0 | Spark 3.2, 3.3 | Not depend on | +| 0.2.1 | Spark 3.2 | Not depend on | +| 0.1.2 | Spark 3.2 | Not depend on | + +## Installation & Setup + +For integrating ClickHouse with Spark, there are multiple installation options to suit different project setups. +You can add the ClickHouse Spark connector as a dependency directly in your project’s build file (such as in `pom.xml` +for Maven or `build.sbt` for SBT). +Alternatively, you can put the required JAR files in your `$SPARK_HOME/jars/` folder, or pass them directly as a Spark +option using the `--jars` flag in the `spark-submit` command. +Both approaches ensure the ClickHouse connector is available in your Spark environment. 
+
+### Import as a Dependency
+
+
+
+```xml
+<dependency>
+  <groupId>com.clickhouse.spark</groupId>
+  <artifactId>clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}</artifactId>
+  <version>{{ stable_version }}</version>
+</dependency>
+<dependency>
+  <groupId>com.clickhouse</groupId>
+  <artifactId>clickhouse-jdbc</artifactId>
+  <classifier>all</classifier>
+  <version>{{ clickhouse_jdbc_version }}</version>
+  <exclusions>
+    <exclusion>
+      <groupId>*</groupId>
+      <artifactId>*</artifactId>
+    </exclusion>
+  </exclusions>
+</dependency>
+```
+
+Add the following repository if you want to use the SNAPSHOT version:
+
+```xml
+<repositories>
+  <repository>
+    <id>sonatype-oss-snapshots</id>
+    <name>Sonatype OSS Snapshots Repository</name>
+    <url>https://s01.oss.sonatype.org/content/repositories/snapshots</url>
+  </repository>
+</repositories>
+```
+
+
+
+```gradle
+dependencies {
+    implementation("com.clickhouse.spark:clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}:{{ stable_version }}")
+    implementation("com.clickhouse:clickhouse-jdbc:{{ clickhouse_jdbc_version }}:all") { transitive = false }
+}
+```
+
+Add the following repository if you want to use the SNAPSHOT version:
+
+```gradle
+repositories {
+    maven { url = "https://s01.oss.sonatype.org/content/repositories/snapshots" }
+}
+```
+
+
+
+```sbt
+libraryDependencies += "com.clickhouse" % "clickhouse-jdbc" % "{{ clickhouse_jdbc_version }}" classifier "all"
+libraryDependencies += "com.clickhouse.spark" % "clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}" % "{{ stable_version }}"
+```
+
+
+
+When working with Spark's shell options (Spark SQL CLI, Spark Shell CLI, or the `spark-submit` command), the dependencies can be
+registered by passing the required JARs:
+
+```text
+$SPARK_HOME/bin/spark-sql \
+  --jars /path/clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}-{{ stable_version }}.jar,/path/clickhouse-jdbc-{{ clickhouse_jdbc_version }}-all.jar
+```
+
+If you want to avoid copying the JARs to your Spark client node, you can use the following instead:
+
+```text
+  --repositories https://{maven-central-mirror or private-nexus-repo} \
+  --packages com.clickhouse.spark:clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}:{{ stable_version }},com.clickhouse:clickhouse-jdbc:{{ clickhouse_jdbc_version }}:all
+```
+
+Note: For SQL-only use cases, [Apache Kyuubi](https://github.com/apache/kyuubi) is recommended
+for production.
+
+
+
+### Download The Library
+
+The name pattern of the binary JAR is:
+
+```
+clickhouse-spark-runtime-${spark_binary_version}_${scala_binary_version}-${version}.jar
+```
+
+You can find all available released JARs
+in the [Maven Central Repository](https://repo1.maven.org/maven2/com/clickhouse/spark/)
+and all daily build SNAPSHOT JARs
+in the [Sonatype OSS Snapshots Repository](https://s01.oss.sonatype.org/content/repositories/snapshots/com/clickhouse/).
+
+:::important
+It's essential to include the [clickhouse-jdbc JAR](https://mvnrepository.com/artifact/com.clickhouse/clickhouse-jdbc)
+with the "all" classifier,
+as the connector relies on [clickhouse-http](https://mvnrepository.com/artifact/com.clickhouse/clickhouse-http-client)
+and [clickhouse-client](https://mvnrepository.com/artifact/com.clickhouse/clickhouse-client), both of which are bundled
+in clickhouse-jdbc:all.
+Alternatively, you can add the [clickhouse-client JAR](https://mvnrepository.com/artifact/com.clickhouse/clickhouse-client)
+and [clickhouse-http](https://mvnrepository.com/artifact/com.clickhouse/clickhouse-http-client) individually if you
+prefer not to use the full JDBC package.
+
+In any case, ensure that the package versions are compatible according to the [Compatibility Matrix](#compatibility-matrix).
+:::
+
+## Register The Catalog (required)
+
+In order to access your ClickHouse tables, you must configure a new Spark catalog with the following configs:
+
+| Property                                      | Value                                    | Default Value  | Required |
+|-----------------------------------------------|------------------------------------------|----------------|----------|
+| `spark.sql.catalog.<catalog_name>`            | `com.clickhouse.spark.ClickHouseCatalog` | N/A            | Yes      |
+| `spark.sql.catalog.<catalog_name>.host`       | `<clickhouse_host>`                      | `localhost`    | No       |
+| `spark.sql.catalog.<catalog_name>.protocol`   | `http`                                   | `http`         | No       |
+| `spark.sql.catalog.<catalog_name>.http_port`  | `<clickhouse_http_port>`                 | `8123`         | No       |
+| `spark.sql.catalog.<catalog_name>.user`       | `<clickhouse_user>`                      | `default`      | No       |
+| `spark.sql.catalog.<catalog_name>.password`   | `<clickhouse_password>`                  | (empty string) | No       |
+| `spark.sql.catalog.<catalog_name>.database`   | `<clickhouse_database>`                  | `default`      | No       |
+| `spark.<catalog_name>.write.format`           | `json`                                   | `arrow`        | No       |
+
+These settings can be set via one of the following:
+
+* Edit/Create `spark-defaults.conf`.
+* Pass the configuration to your `spark-submit` command (or to your `spark-shell`/`spark-sql` CLI commands).
+* Add the configuration when initializing your Spark session.
+
+:::important
+When working with a ClickHouse cluster, you need to set a unique catalog name for each instance.
+For example:
+
+```text
+spark.sql.catalog.clickhouse1                com.clickhouse.spark.ClickHouseCatalog
+spark.sql.catalog.clickhouse1.host           10.0.0.1
+spark.sql.catalog.clickhouse1.protocol       https
+spark.sql.catalog.clickhouse1.http_port      8443
+spark.sql.catalog.clickhouse1.user           default
+spark.sql.catalog.clickhouse1.password
+spark.sql.catalog.clickhouse1.database       default
+spark.sql.catalog.clickhouse1.option.ssl     true
+
+spark.sql.catalog.clickhouse2                com.clickhouse.spark.ClickHouseCatalog
+spark.sql.catalog.clickhouse2.host           10.0.0.2
+spark.sql.catalog.clickhouse2.protocol       https
+spark.sql.catalog.clickhouse2.http_port      8443
+spark.sql.catalog.clickhouse2.user           default
+spark.sql.catalog.clickhouse2.password
+spark.sql.catalog.clickhouse2.database       default
+spark.sql.catalog.clickhouse2.option.ssl     true
+```
+
+That way, you would be able to access the clickhouse1 table `<ck_db>.<ck_table>` from Spark SQL by
+`clickhouse1.<ck_db>.<ck_table>`, and access the clickhouse2 table `<ck_db>.<ck_table>` by `clickhouse2.<ck_db>.<ck_table>`.
+ +::: + +## Read Data + + + + +```java +public static void main(String[] args) { + // Create a Spark session + SparkSession spark = SparkSession.builder() + .appName("example") + .master("local[*]") + .config("spark.sql.catalog.clickhouse", "com.clickhouse.spark.ClickHouseCatalog") + .config("spark.sql.catalog.clickhouse.host", "127.0.0.1") + .config("spark.sql.catalog.clickhouse.protocol", "http") + .config("spark.sql.catalog.clickhouse.http_port", "8123") + .config("spark.sql.catalog.clickhouse.user", "default") + .config("spark.sql.catalog.clickhouse.password", "123456") + .config("spark.sql.catalog.clickhouse.database", "default") + .config("spark.clickhouse.write.format", "json") + .getOrCreate(); + + Dataset df = spark.sql("select * from clickhouse.default.example_table"); + + df.show(); + + spark.stop(); + } +``` + + + + +```java +object NativeSparkRead extends App { + val spark = SparkSession.builder + .appName("example") + .master("local[*]") + .config("spark.sql.catalog.clickhouse", "com.clickhouse.spark.ClickHouseCatalog") + .config("spark.sql.catalog.clickhouse.host", "127.0.0.1") + .config("spark.sql.catalog.clickhouse.protocol", "http") + .config("spark.sql.catalog.clickhouse.http_port", "8123") + .config("spark.sql.catalog.clickhouse.user", "default") + .config("spark.sql.catalog.clickhouse.password", "123456") + .config("spark.sql.catalog.clickhouse.database", "default") + .config("spark.clickhouse.write.format", "json") + .getOrCreate + + val df = spark.sql("select * from clickhouse.default.example_table") + + df.show() + + spark.stop() +} +``` + + + + + + +```python +from pyspark.sql import SparkSession + +packages = [ + "com.clickhouse.spark:clickhouse-spark-runtime-3.4_2.12:0.8.0", + "com.clickhouse:clickhouse-client:0.7.0", + "com.clickhouse:clickhouse-http-client:0.7.0", + "org.apache.httpcomponents.client5:httpclient5:5.2.1" + +] + +spark = (SparkSession.builder + .config("spark.jars.packages", ",".join(packages)) + .getOrCreate()) + +spark.conf.set("spark.sql.catalog.clickhouse", "com.clickhouse.spark.ClickHouseCatalog") +spark.conf.set("spark.sql.catalog.clickhouse.host", "127.0.0.1") +spark.conf.set("spark.sql.catalog.clickhouse.protocol", "http") +spark.conf.set("spark.sql.catalog.clickhouse.http_port", "8123") +spark.conf.set("spark.sql.catalog.clickhouse.user", "default") +spark.conf.set("spark.sql.catalog.clickhouse.password", "123456") +spark.conf.set("spark.sql.catalog.clickhouse.database", "default") +spark.conf.set("spark.clickhouse.write.format", "json") + +df = spark.sql("select * from clickhouse.default.example_table") +df.show() + +``` + + + + +```sql + CREATE TEMPORARY VIEW jdbcTable + USING org.apache.spark.sql.jdbc + OPTIONS ( + url "jdbc:ch://localhost:8123/default", + dbtable "schema.tablename", + user "username", + password "password", + driver "com.clickhouse.jdbc.ClickHouseDriver" + ); + + SELECT * FROM jdbcTable; +``` + + + + +## Write Data + + + + +```java + public static void main(String[] args) throws AnalysisException { + + // Create a Spark session + SparkSession spark = SparkSession.builder() + .appName("example") + .master("local[*]") + .config("spark.sql.catalog.clickhouse", "com.clickhouse.spark.ClickHouseCatalog") + .config("spark.sql.catalog.clickhouse.host", "127.0.0.1") + .config("spark.sql.catalog.clickhouse.protocol", "http") + .config("spark.sql.catalog.clickhouse.http_port", "8123") + .config("spark.sql.catalog.clickhouse.user", "default") + .config("spark.sql.catalog.clickhouse.password", "123456") + 
.config("spark.sql.catalog.clickhouse.database", "default") + .config("spark.clickhouse.write.format", "json") + .getOrCreate(); + + // Define the schema for the DataFrame + StructType schema = new StructType(new StructField[]{ + DataTypes.createStructField("id", DataTypes.IntegerType, false), + DataTypes.createStructField("name", DataTypes.StringType, false), + }); + + + List data = Arrays.asList( + RowFactory.create(1, "Alice"), + RowFactory.create(2, "Bob") + ); + + // Create a DataFrame + Dataset df = spark.createDataFrame(data, schema); + + df.writeTo("clickhouse.default.example_table").append(); + + spark.stop(); + } +``` + + + + +```java +object NativeSparkWrite extends App { + // Create a Spark session + val spark: SparkSession = SparkSession.builder + .appName("example") + .master("local[*]") + .config("spark.sql.catalog.clickhouse", "com.clickhouse.spark.ClickHouseCatalog") + .config("spark.sql.catalog.clickhouse.host", "127.0.0.1") + .config("spark.sql.catalog.clickhouse.protocol", "http") + .config("spark.sql.catalog.clickhouse.http_port", "8123") + .config("spark.sql.catalog.clickhouse.user", "default") + .config("spark.sql.catalog.clickhouse.password", "123456") + .config("spark.sql.catalog.clickhouse.database", "default") + .config("spark.clickhouse.write.format", "json") + .getOrCreate + + // Define the schema for the DataFrame + val rows = Seq(Row(1, "John"), Row(2, "Doe")) + + val schema = List( + StructField("id", DataTypes.IntegerType, nullable = false), + StructField("name", StringType, nullable = true) + ) + // Create the df + val df: DataFrame = spark.createDataFrame( + spark.sparkContext.parallelize(rows), + StructType(schema) + ) + + df.writeTo("clickhouse.default.example_table").append() + + spark.stop() +} +``` + + + + + +```python +from pyspark.sql import SparkSession +from pyspark.sql import Row + +# Feel free to use any other packages combination satesfying the compatability martix provided above. +packages = [ + "com.clickhouse.spark:clickhouse-spark-runtime-3.4_2.12:0.8.0", + "com.clickhouse:clickhouse-client:0.7.0", + "com.clickhouse:clickhouse-http-client:0.7.0", + "org.apache.httpcomponents.client5:httpclient5:5.2.1" + +] + +spark = (SparkSession.builder + .config("spark.jars.packages", ",".join(packages)) + .getOrCreate()) + +spark.conf.set("spark.sql.catalog.clickhouse", "com.clickhouse.spark.ClickHouseCatalog") +spark.conf.set("spark.sql.catalog.clickhouse.host", "127.0.0.1") +spark.conf.set("spark.sql.catalog.clickhouse.protocol", "http") +spark.conf.set("spark.sql.catalog.clickhouse.http_port", "8123") +spark.conf.set("spark.sql.catalog.clickhouse.user", "default") +spark.conf.set("spark.sql.catalog.clickhouse.password", "123456") +spark.conf.set("spark.sql.catalog.clickhouse.database", "default") +spark.conf.set("spark.clickhouse.write.format", "json") + +# Create DataFrame +data = [Row(id=11, name="John"), Row(id=12, name="Doe")] +df = spark.createDataFrame(data) + +# Write DataFrame to ClickHouse +df.writeTo("clickhouse.default.example_table").append() + + + +``` + + + + +```sql + -- resultTalbe is the Spark intermediate df we want to insert into clickhouse.default.example_table + INSERT INTO TABLE clickhouse.default.example_table + SELECT * FROM resultTable; + +``` + + + + +## DDL Operations + +You can perform DDL operations on your ClickHouse instance using SparkSQL, with all changes immediately persisted in +ClickHouse. 
+SparkSQL allows you to write queries exactly as you would in ClickHouse, +so you can directly execute commands such as CREATE TABLE, TRUNCATE, and more - without modification, for instance: + +```sql + +use clickhouse; + +CREATE TABLE test_db.tbl_sql ( + create_time TIMESTAMP NOT NULL, + m INT NOT NULL COMMENT 'part key', + id BIGINT NOT NULL COMMENT 'sort key', + value STRING +) USING ClickHouse +PARTITIONED BY (m) +TBLPROPERTIES ( + engine = 'MergeTree()', + order_by = 'id', + settings.index_granularity = 8192 +); +``` + +The above examples demonstrate SparkSQL queries, which you can run within your application using any API—Java, Scala, +PySpark, or shell. + +## Supported Data Types + +This section outlines the mapping of data types between Spark and ClickHouse. The tables below provide quick references +for converting data types when reading from ClickHouse into Spark and when inserting data from Spark into ClickHouse. + +### Reading data from ClickHouse into Spark + +| ClickHouse Data Type | Spark Data Type | Supported | Is Primitive | Notes | +|-------------------------------------------------------------------|--------------------------------|-----------|--------------|----------------------------------------------------| +| `Nothing` | `NullType` | ✅ | Yes | | +| `Bool` | `BooleanType` | ✅ | Yes | | +| `UInt8`, `Int16` | `ShortType` | ✅ | Yes | | +| `Int8` | `ByteType` | ✅ | Yes | | +| `UInt16`,`Int32` | `IntegerType` | ✅ | Yes | | +| `UInt32`,`Int64`, `UInt64` | `LongType` | ✅ | Yes | | +| `Int128`,`UInt128`, `Int256`, `UInt256` | `DecimalType(38, 0)` | ✅ | Yes | | +| `Float32` | `FloatType` | ✅ | Yes | | +| `Float64` | `DoubleType` | ✅ | Yes | | +| `String`, `JSON`, `UUID`, `Enum8`, `Enum16`, `IPv4`, `IPv6` | `StringType` | ✅ | Yes | | +| `FixedString` | `BinaryType`, `StringType` | ✅ | Yes | Controlled by configuration `READ_FIXED_STRING_AS` | +| `Decimal` | `DecimalType` | ✅ | Yes | Precision and scale up to `Decimal128` | +| `Decimal32` | `DecimalType(9, scale)` | ✅ | Yes | | +| `Decimal64` | `DecimalType(18, scale)` | ✅ | Yes | | +| `Decimal128` | `DecimalType(38, scale)` | ✅ | Yes | | +| `Date`, `Date32` | `DateType` | ✅ | Yes | | +| `DateTime`, `DateTime32`, `DateTime64` | `TimestampType` | ✅ | Yes | | +| `Array` | `ArrayType` | ✅ | No | Array element type is also converted | +| `Map` | `MapType` | ✅ | No | Keys are limited to `StringType` | +| `IntervalYear` | `YearMonthIntervalType(Year)` | ✅ | Yes | | +| `IntervalMonth` | `YearMonthIntervalType(Month)` | ✅ | Yes | | +| `IntervalDay`, `IntervalHour`, `IntervalMinute`, `IntervalSecond` | `DayTimeIntervalType` | ✅ | No | Specific interval type is used | +| `Object` | | ❌ | | | +| `Nested` | | ❌ | | | +| `Tuple` | | ❌ | | | +| `Point` | | ❌ | | | +| `Polygon` | | ❌ | | | +| `MultiPolygon` | | ❌ | | | +| `Ring` | | ❌ | | | +| `IntervalQuarter` | | ❌ | | | +| `IntervalWeek` | | ❌ | | | +| `Decimal256` | | ❌ | | | +| `AggregateFunction` | | ❌ | | | +| `SimpleAggregateFunction` | | ❌ | | | + +### Inserting data from Spark into ClickHouse + +| Spark Data Type | ClickHouse Data Type | Supported | Is Primitive | Notes | +|-------------------------------------|----------------------|-----------|--------------|----------------------------------------| +| `BooleanType` | `UInt8` | ✅ | Yes | | +| `ByteType` | `Int8` | ✅ | Yes | | +| `ShortType` | `Int16` | ✅ | Yes | | +| `IntegerType` | `Int32` | ✅ | Yes | | +| `LongType` | `Int64` | ✅ | Yes | | +| `FloatType` | `Float32` | ✅ | Yes | | +| `DoubleType` | `Float64` | ✅ | Yes | | +| 
`StringType` | `String` | ✅ | Yes | | +| `VarcharType` | `String` | ✅ | Yes | | +| `CharType` | `String` | ✅ | Yes | | +| `DecimalType` | `Decimal(p, s)` | ✅ | Yes | Precision and scale up to `Decimal128` | +| `DateType` | `Date` | ✅ | Yes | | +| `TimestampType` | `DateTime` | ✅ | Yes | | +| `ArrayType` (list, tuple, or array) | `Array` | ✅ | No | Array element type is also converted | +| `MapType` | `Map` | ✅ | No | Keys are limited to `StringType` | +| `Object` | | ❌ | | | +| `Nested` | | ❌ | | | + +## Contributing and Support + +If you'd like to contribute to the project or report any issues, we welcome your input! +Visit our [GitHub repository](https://github.com/ClickHouse/spark-clickhouse-connector) to open an issue, suggest +improvements, or submit a pull request. +Contributions are welcome! Please check the contribution guidelines in the repository before starting. +Thank you for helping improve our ClickHouse Spark connector! \ No newline at end of file diff --git a/sidebars.js b/sidebars.js index 4ae4809f7c1..eb7802c0aca 100644 --- a/sidebars.js +++ b/sidebars.js @@ -578,7 +578,18 @@ const sidebars = { }, "en/integrations/data-ingestion/etl-tools/dbt/index", "en/integrations/data-ingestion/etl-tools/fivetran/index", - "en/integrations/data-ingestion/apache-spark/index", + { + type: "category", + label: "Apache Spark", + className: "top-nav-item", + collapsed: true, + collapsible: true, + items: [ + "en/integrations/data-ingestion/apache-spark/index", + "en/integrations/data-ingestion/apache-spark/spark-native-connector", + "en/integrations/data-ingestion/apache-spark/spark-jdbc", + ], + }, "en/integrations/data-ingestion/aws-glue/index", "en/integrations/data-ingestion/insert-local-files", "en/integrations/data-ingestion/dbms/jdbc-with-clickhouse",