diff --git a/.nojekyll b/.nojekyll
new file mode 100644
index 00000000..e69de29b
diff --git a/404.html b/404.html
new file mode 100644
index 00000000..fceaa194
--- /dev/null
+++ b/404.html
@@ -0,0 +1,785 @@
+Put clickhouse-spark-runtime-3.3_2.12-0.7.2.jar and clickhouse-jdbc-0.4.5-all.jar into $SPARK_HOME/jars/, then you don't need to bundle the jar into your Spark application, and --jars is not required when using spark-shell or spark-sql (again, for SQL-only use cases, Apache Kyuubi is recommended for Production).
Persist catalog configurations into $SPARK_HOME/conf/spark-defaults.conf, then the --conf options are not required when using spark-shell or spark-sql.
spark.sql.catalog.ck_01=xenon.clickhouse.ClickHouseCatalog
+spark.sql.catalog.ck_01.host=10.0.0.1
+spark.sql.catalog.ck_01.protocol=http
+spark.sql.catalog.ck_01.http_port=8123
+spark.sql.catalog.ck_01.user=app
+spark.sql.catalog.ck_01.password=pwd
+spark.sql.catalog.ck_01.database=default
+
+spark.sql.catalog.ck_02=xenon.clickhouse.ClickHouseCatalog
+spark.sql.catalog.ck_02.host=10.0.0.2
+spark.sql.catalog.ck_02.protocol=http
+spark.sql.catalog.ck_02.http_port=8123
+spark.sql.catalog.ck_02.user=app
+spark.sql.catalog.ck_02.password=pwd
+spark.sql.catalog.ck_02.database=default
+
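As a quick sanity check, here is a hypothetical session (the table name events is a placeholder, not part of the original docs) showing how the two catalogs registered above can be addressed from plain Spark SQL, including copying data between the two instances:

-- hypothetical table, shown only to illustrate <catalog>.<database>.<table> addressing
SELECT count(*) FROM ck_01.default.events;

-- copy rows from the instance behind ck_01 to the instance behind ck_02, entirely via Spark SQL
INSERT INTO ck_02.default.events SELECT * FROM ck_01.default.events;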
+
+
+
+
+
+
+ Suppose you have one ClickHouse instance installed on 10.0.0.1 that exposes HTTP on port 8123.
Edit $SPARK_HOME/conf/spark-defaults.conf.
########################################
+## register a catalog named "clickhouse"
+########################################
+spark.sql.catalog.clickhouse xenon.clickhouse.ClickHouseCatalog
+
+################################################
+## basic configurations for "clickhouse" catalog
+################################################
+spark.sql.catalog.clickhouse.host 10.0.0.1
+spark.sql.catalog.clickhouse.protocol http
+spark.sql.catalog.clickhouse.http_port 8123
+spark.sql.catalog.clickhouse.user default
+spark.sql.catalog.clickhouse.password
+spark.sql.catalog.clickhouse.database default
+
+###############################################################
+## custom options of clickhouse-client for "clickhouse" catalog
+###############################################################
+spark.sql.catalog.clickhouse.option.async false
+spark.sql.catalog.clickhouse.option.client_name spark
+
+Then you can access ClickHouse table <ck_db>.<ck_table> from Spark SQL by using clickhouse.<ck_db>.<ck_table>.
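For example, with a hypothetical database test_db and table events on that instance (names are placeholders used only for illustration):

-- switch the current catalog and browse it, as in the quick start
USE clickhouse;
SHOW DATABASES;

-- read a hypothetical table through the catalog
SELECT * FROM clickhouse.test_db.events LIMIT 10;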
For a ClickHouse cluster, give a unique catalog name to each instance.
+Suppose you have two ClickHouse instances: one named clickhouse1, installed on 10.0.0.1 and exposing HTTP on port 8123, and another named clickhouse2, installed on 10.0.0.2 and exposing HTTP on port 8123.
Edit $SPARK_HOME/conf/spark-defaults.conf.
spark.sql.catalog.clickhouse1 xenon.clickhouse.ClickHouseCatalog
+spark.sql.catalog.clickhouse1.host 10.0.0.1
+spark.sql.catalog.clickhouse1.protocol http
+spark.sql.catalog.clickhouse1.http_port 8123
+spark.sql.catalog.clickhouse1.user default
+spark.sql.catalog.clickhouse1.password
+spark.sql.catalog.clickhouse1.database default
+spark.sql.catalog.clickhouse1.option.async false
+
+spark.sql.catalog.clickhouse2 xenon.clickhouse.ClickHouseCatalog
+spark.sql.catalog.clickhouse2.host 10.0.0.2
+spark.sql.catalog.clickhouse2.protocol http
+spark.sql.catalog.clickhouse2.http_port 8123
+spark.sql.catalog.clickhouse2.user default
+spark.sql.catalog.clickhouse2.password
+spark.sql.catalog.clickhouse2.database default
+spark.sql.catalog.clickhouse2.option.async false
+
+Then you can access clickhouse1 table <ck_db>.<ck_table> from Spark SQL by clickhouse1.<ck_db>.<ck_table>, and access clickhouse2 table <ck_db>.<ck_table> by clickhouse2.<ck_db>.<ck_table>.
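Because both catalogs live in the same Spark session, you can even join tables across the two ClickHouse instances. A sketch with hypothetical table names:

-- federated query across two ClickHouse instances registered as separate catalogs
SELECT o.id, o.amount, u.name
FROM clickhouse1.test_db.orders o
JOIN clickhouse2.test_db.users u ON o.user_id = u.id;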
Key | Default | Description | Since
---|---|---|---
spark.clickhouse.ignoreUnsupportedTransform | false | ClickHouse supports using complex expressions as sharding keys or partition values, e.g. cityHash64(col_1, col_2), which are not supported by Spark now. If true, ignore the unsupported expressions, otherwise fail fast with an exception. Note, when spark.clickhouse.write.distributed.convertLocal is enabled, ignoring unsupported sharding keys may corrupt the data. | 0.4.0
spark.clickhouse.read.compression.codec | lz4 | The codec used to decompress data for reading. Supported codecs: none, lz4. | 0.5.0
spark.clickhouse.read.distributed.convertLocal | true | When reading a Distributed table, read the local table instead of itself. If true, ignore spark.clickhouse.read.distributed.useClusterNodes. | 0.1.0
spark.clickhouse.read.format | json | Serialize format for reading. Supported formats: json, binary. | 0.6.0
spark.clickhouse.read.runtimeFilter.enabled | false | Enable runtime filter for reading. | 0.8.0
spark.clickhouse.read.splitByPartitionId | true | If true, construct the input partition filter by the virtual column _partition_id instead of the partition value. There are known bugs when assembling SQL predicates by partition value. This feature requires ClickHouse Server v21.6+. | 0.4.0
spark.clickhouse.useNullableQuerySchema | false | If true, mark all the fields of the query schema as nullable when executing CREATE/REPLACE TABLE ... AS SELECT ... on creating the table. Note, this configuration requires SPARK-43390 (available in Spark 3.5); without this patch, it always acts as true. | 0.8.0
spark.clickhouse.write.batchSize | 10000 | The number of records per batch on writing to ClickHouse. | 0.1.0
spark.clickhouse.write.compression.codec | lz4 | The codec used to compress data for writing. Supported codecs: none, lz4. | 0.3.0
spark.clickhouse.write.distributed.convertLocal | false | When writing a Distributed table, write the local table instead of itself. If true, ignore spark.clickhouse.write.distributed.useClusterNodes. | 0.1.0
spark.clickhouse.write.distributed.useClusterNodes | true | Write to all nodes of the cluster when writing a Distributed table. | 0.1.0
spark.clickhouse.write.format | arrow | Serialize format for writing. Supported formats: json, arrow. | 0.4.0
spark.clickhouse.write.localSortByKey | true | If true, do a local sort by sort keys before writing. | 0.3.0
spark.clickhouse.write.localSortByPartition | | If true, do a local sort by partition before writing. If not set, it equals spark.clickhouse.write.repartitionByPartition. | 0.3.0
spark.clickhouse.write.maxRetry | 3 | The maximum number of times we will retry a single batch write that failed with retryable codes. | 0.1.0
spark.clickhouse.write.repartitionByPartition | true | Whether to repartition data by ClickHouse partition keys to meet the distribution of the ClickHouse table before writing. | 0.3.0
spark.clickhouse.write.repartitionNum | 0 | Repartitioning data to meet the distribution of the ClickHouse table is required before writing; use this conf to specify the repartition number. A value less than 1 means no requirement. | 0.1.0
spark.clickhouse.write.repartitionStrictly | false | If true, Spark will strictly distribute incoming records across partitions to satisfy the required distribution before passing the records to the data source table on write. Otherwise, Spark may apply certain optimizations to speed up the query but break the distribution requirement. Note, this configuration requires SPARK-37523 (available in Spark 3.4); without this patch, it always acts as true. | 0.3.0
spark.clickhouse.write.retryInterval | 10s | The interval in seconds between write retries. | 0.1.0
spark.clickhouse.write.retryableErrorCodes | 241 | The retryable error codes returned by the ClickHouse server when a write fails. | 0.1.0
Suppose you have one ClickHouse instance installed on 10.0.0.1 that exposes HTTP on port 8123.
Edit $SPARK_HOME/conf/spark-defaults.conf.
########################################
+## register a catalog named "clickhouse"
+########################################
+spark.sql.catalog.clickhouse xenon.clickhouse.ClickHouseCatalog
+
+################################################
+## basic configurations for "clickhouse" catalog
+################################################
+spark.sql.catalog.clickhouse.host 10.0.0.1
+spark.sql.catalog.clickhouse.protocol http
+spark.sql.catalog.clickhouse.http_port 8123
+spark.sql.catalog.clickhouse.user default
+spark.sql.catalog.clickhouse.password
+spark.sql.catalog.clickhouse.database default
+
+###############################################################
+## custom options of clickhouse-client for "clickhouse" catalog
+###############################################################
+spark.sql.catalog.clickhouse.option.async false
+spark.sql.catalog.clickhouse.option.client_name spark
+
+Then you can access ClickHouse table <ck_db>.<ck_table> from Spark SQL by using clickhouse.<ck_db>.<ck_table>.
For a ClickHouse cluster, give a unique catalog name to each instance.
+Suppose you have two ClickHouse instances: one named clickhouse1, installed on 10.0.0.1 and exposing HTTP on port 8123, and another named clickhouse2, installed on 10.0.0.2 and exposing HTTP on port 8123.
Edit $SPARK_HOME/conf/spark-defaults.conf.
spark.sql.catalog.clickhouse1 xenon.clickhouse.ClickHouseCatalog
+spark.sql.catalog.clickhouse1.host 10.0.0.1
+spark.sql.catalog.clickhouse1.protocol http
+spark.sql.catalog.clickhouse1.http_port 8123
+spark.sql.catalog.clickhouse1.user default
+spark.sql.catalog.clickhouse1.password
+spark.sql.catalog.clickhouse1.database default
+spark.sql.catalog.clickhouse1.option.async false
+
+spark.sql.catalog.clickhouse2 xenon.clickhouse.ClickHouseCatalog
+spark.sql.catalog.clickhouse2.host 10.0.0.2
+spark.sql.catalog.clickhouse2.protocol http
+spark.sql.catalog.clickhouse2.http_port 8123
+spark.sql.catalog.clickhouse2.user default
+spark.sql.catalog.clickhouse2.password
+spark.sql.catalog.clickhouse2.database default
+spark.sql.catalog.clickhouse2.option.async false
+
+Then you can access clickhouse1 table <ck_db>.<ck_table> from Spark SQL by clickhouse1.<ck_db>.<ck_table>, and access clickhouse2 table <ck_db>.<ck_table> by clickhouse2.<ck_db>.<ck_table>.
SQL configurations can be overwritten by SET <key>=<value> at runtime.
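For example, to bump the write batch size for the current session only (the value below is arbitrary, chosen for illustration):

SET spark.clickhouse.write.batchSize=20000;
-- show the effective value of the property
SET spark.clickhouse.write.batchSize;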
Key | Default | Description | Since
---|---|---|---
spark.clickhouse.ignoreUnsupportedTransform | false | ClickHouse supports using complex expressions as sharding keys or partition values, e.g. cityHash64(col_1, col_2), which are not supported by Spark now. If true, ignore the unsupported expressions, otherwise fail fast with an exception. Note, when spark.clickhouse.write.distributed.convertLocal is enabled, ignoring unsupported sharding keys may corrupt the data. | 0.4.0
spark.clickhouse.read.compression.codec | lz4 | The codec used to decompress data for reading. Supported codecs: none, lz4. | 0.5.0
spark.clickhouse.read.distributed.convertLocal | true | When reading a Distributed table, read the local table instead of itself. If true, ignore spark.clickhouse.read.distributed.useClusterNodes. | 0.1.0
spark.clickhouse.read.format | json | Serialize format for reading. Supported formats: json, binary. | 0.6.0
spark.clickhouse.read.runtimeFilter.enabled | false | Enable runtime filter for reading. | 0.8.0
spark.clickhouse.read.splitByPartitionId | true | If true, construct the input partition filter by the virtual column _partition_id instead of the partition value. There are known bugs when assembling SQL predicates by partition value. This feature requires ClickHouse Server v21.6+. | 0.4.0
spark.clickhouse.useNullableQuerySchema | false | If true, mark all the fields of the query schema as nullable when executing CREATE/REPLACE TABLE ... AS SELECT ... on creating the table. Note, this configuration requires SPARK-43390 (available in Spark 3.5); without this patch, it always acts as true. | 0.8.0
spark.clickhouse.write.batchSize | 10000 | The number of records per batch on writing to ClickHouse. | 0.1.0
spark.clickhouse.write.compression.codec | lz4 | The codec used to compress data for writing. Supported codecs: none, lz4. | 0.3.0
spark.clickhouse.write.distributed.convertLocal | false | When writing a Distributed table, write the local table instead of itself. If true, ignore spark.clickhouse.write.distributed.useClusterNodes. | 0.1.0
spark.clickhouse.write.distributed.useClusterNodes | true | Write to all nodes of the cluster when writing a Distributed table. | 0.1.0
spark.clickhouse.write.format | arrow | Serialize format for writing. Supported formats: json, arrow. | 0.4.0
spark.clickhouse.write.localSortByKey | true | If true, do a local sort by sort keys before writing. | 0.3.0
spark.clickhouse.write.localSortByPartition | | If true, do a local sort by partition before writing. If not set, it equals spark.clickhouse.write.repartitionByPartition. | 0.3.0
spark.clickhouse.write.maxRetry | 3 | The maximum number of times we will retry a single batch write that failed with retryable codes. | 0.1.0
spark.clickhouse.write.repartitionByPartition | true | Whether to repartition data by ClickHouse partition keys to meet the distribution of the ClickHouse table before writing. | 0.3.0
spark.clickhouse.write.repartitionNum | 0 | Repartitioning data to meet the distribution of the ClickHouse table is required before writing; use this conf to specify the repartition number. A value less than 1 means no requirement. | 0.1.0
spark.clickhouse.write.repartitionStrictly | false | If true, Spark will strictly distribute incoming records across partitions to satisfy the required distribution before passing the records to the data source table on write. Otherwise, Spark may apply certain optimizations to speed up the query but break the distribution requirement. Note, this configuration requires SPARK-37523 (available in Spark 3.4); without this patch, it always acts as true. | 0.3.0
spark.clickhouse.write.retryInterval | 10s | The interval in seconds between write retries. | 0.1.0
spark.clickhouse.write.retryableErrorCodes | 241 | The retryable error codes returned by the ClickHouse server when a write fails. | 0.1.0
Check out the source code from GitHub:
+git clone https://github.com/housepower/spark-clickhouse-connector.git
+
+Build w/o test
+./gradlew clean build -x test
+
+Go to spark-3.3/clickhouse-spark-runtime/build/libs/ to find the output jar clickhouse-spark-runtime-3.3_2.12-0.8.0-SNAPSHOT.jar.
The project leverages Testcontainers and Docker Compose for integration tests. You should install Docker and Docker Compose before running tests, and check the Testcontainers documentation for more details if you'd like to run tests with a remote Docker daemon.
+Run all tests:
+./gradlew clean test
Run a single test:
+./gradlew test --tests=ConvertDistToLocalWriteSuite
Test against a custom ClickHouse image:
+CLICKHOUSE_IMAGE=custom-org/clickhouse-server:custom-tag ./gradlew test
Follow the official Python documentation to install.
Setup pyenv on macOS (optional)
+Optionally, it is recommended to manage Python environments with pyenv.
+Install from Homebrew
+brew install pyenv pyenv-virtualenv
+
+Setup in ~/.zshrc
eval "$(pyenv init -)"
+eval "$(pyenv virtualenv-init -)"
+
+Install virtualenv
pyenv install 3.9.13
+pyenv virtualenv 3.9.13 scc
+
+Localize virtualenv
pyenv local scc
+
+pip install -r requirements.txt
+
+mkdocs serve
+
+Open http://127.0.0.1:8000/ in browser.
+ + + + + + +Tip
+Internal Release means deploying to a private Nexus Repository. Please make sure you are granted access to your company's private Nexus Repository.
+Configure Gradle in ~/.gradle/gradle.properties.
mavenUser=xxx
+mavenPassword=xxx
+mavenReleasesRepo=xxx
+mavenSnapshotsRepo=xxx
+
+Modify version in version.txt and docker/.env-dev
Publish to Maven Repository using ./gradlew publish
Notice
+Public Release means deploying to Maven Central. Only core team members are granted permission to deploy to the Public Repository.
+Note
+Most of the steps for a public release are done by the GitHub workflow.
+The daily snapshot release is managed by the Publish Snapshot workflow, which is scheduled to deploy at midnight every day.
+Cut the release branch branch-0.3;
Modify version.txt and docker/.env-dev, e.g. from 0.3.0-SNAPSHOT to 0.3.0;
Push the tag v0.3.0, it will trigger the Publish Release workflow;
Modify version.txt and docker/.env-dev, e.g. from 0.3.0 to 0.3.1-SNAPSHOT;
Modify version.txt and docker/.env-dev, e.g. from 0.3.0-SNAPSHOT to 0.4.0-SNAPSHOT;
Just omit step 1 and step 7 from the feature release.
Spark ClickHouse Connector is a high-performance connector built on top of Spark DataSource V2.
For old versions, please refer to the compatibility matrix.
Version | Compatible Spark Versions | ClickHouse JDBC version
---|---|---
0.8.0 | Spark 3.3, 3.4 | 0.4.6
0.7.2 | Spark 3.3, 3.4 | 0.4.6
0.6.0 | Spark 3.3 | 0.3.2-patch11
0.5.0 | Spark 3.2, 3.3 | 0.3.2-patch11
0.4.0 | Spark 3.2, 3.3 | Not depend on
0.3.0 | Spark 3.2, 3.3 | Not depend on
0.2.1 | Spark 3.2 | Not depend on
0.1.2 | Spark 3.2 | Not depend on
One important end-user-facing feature of DataSource V2 is support for multiple catalogs.
+In the early stage of Spark, it did not have a catalog concept. Usually, users rely on Hive Metastore or Glue to manage table metadata, hence they must register external DataSource tables in the centralized metastore before accessing them.
In the centralized metastore model, a table is identified by <database>.<table>.
For example, register a MySQL table into the metastore, then access it using Spark SQL.
+CREATE TABLE <db>.<tbl>
+USING org.apache.spark.sql.jdbc
+OPTIONS (
+ url "jdbc:mysql://<mysql_host>:<mysql_port>",
+ dbtable "<mysql_db>.<mysql_tbl>",
+ user "<mysql_username>",
+ password "<mysql_password>"
+);
+
+SELECT * FROM <db>.<tbl>;
+INSERT INTO <db>.<tbl> SELECT ...
+
+Things changed in DataSource V2: starting from Spark 3.0, the catalog concept was introduced to allow Spark to discover tables automatically by registering catalog plugins.
The default catalog has a fixed name, spark_catalog, and typically a table is identified by <catalog>.<database>.<table>.
For example, we can register a PostgreSQL database as a Spark catalog named pg, and access it using Spark SQL.
# spark-defaults.conf
+spark.sql.catalog.pg=org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog
+spark.sql.catalog.pg.url=jdbc:postgresql://<pg_host>:<pg_host>/<pg_db>
+spark.sql.catalog.pg.driver=org.postgresql.Driver
+spark.sql.catalog.pg.user=<pg_username>
+spark.sql.catalog.pg.password=<pg_password>
+
+SELECT * FROM pg.<db>.<tbl>;
+INSERT INTO pg.<db>.<tbl> SELECT ...
+
+
+
+
+
+
+
Spark supports pushing down the processing of queries, or parts of queries, into the connected data source. This means that a specific predicate, aggregation function, or other operation could be passed through to ClickHouse for processing.
+The results of this push down can include the following benefits:
+Improved overall query performance
+Reduced network traffic between Spark and ClickHouse
+Reduced load on ClickHouse
+These benefits often result in significant cost reduction.
+The connector implements most push down interfaces defined by DataSource V2, such as SupportsPushDownLimit
,
+SupportsPushDownFilters
, SupportsPushDownAggregates
, SupportsPushDownRequiredColumns
.
The below example shows how SupportsPushDownAggregates
and SupportsPushDownRequiredColumns
work.
Sort merge join is a general solution for two large table inner join, it requires two table shuffle by join key first, +then do local sort by join key in each data partition, finally do stream-stream like look up to get the final result.
+In some cases, the tables store collocated by join keys, w/ +Storage-Partitioned Join(or V2 Bucket Join), Spark could leverage +the existing ClickHouse table layout to eliminate the expensive shuffle and sort operations.
+ + + + + + + + +As we know, the ClickHouse MergeTree
is a LSM-like format, it's not optimized for frequent and random record insertion,
+batch append operation is recommended for large amount of data ingestion.
So, to achieve better performance, we should re-organize the DataFrame
to fit ClickHouse data layout before inserting.
SPARK-23889 allows data source connector to expose sorting and
+clustering requirements of DataFrame
before writing. By default, for Distributed
table, this connector requires the
+DataFrame
clustered by [sharding keys, partition keys]
and sorted by [sharding keys, partition keys, ordering keys]
;
+for normal *MergeTree
table, this connector requires the DataFrame
sorted by [partition keys, ordering keys]
and
+sorted by [partition keys, ordering keys]
.
Warning
+Limitation: Spark does NOT support expressions in sharding keys and partition keys w/o +SPARK-39607.
+In some cases, the strict data distribution requirements may lead small parallelism and data skew, and finally cause +bad performance. SPARK-37523(requires Spark 3.4+) is introduced to +allow relaxing the data distribution requirements to overcome those shortages.
+Also, you can consider disabling some configurations like
+spark.clickhouse.write.repartitionByPartition
to avoid such performance degradation.
In high level, Spark ClickHouse Connector is a connector build on top of Spark DataSource V2 and +ClickHouse HTTP protocol.
+ + + + + + + +The name pattern of binary jar is
+clickhouse-spark-runtime-${spark_binary_version}_${scala_binary_version}-${version}.jar
+
+you can find all available released jars under Maven Central Repository +and all daily build SNAPSHOT jars under Sonatype OSS Snapshots Repository.
+dependencies {
+ implementation("com.github.housepower:clickhouse-spark-runtime-3.3_2.12:0.7.2")
+ implementation("com.clickhouse:clickhouse-jdbc:0.4.5:all") { transitive = false }
+}
+
+Add the following repository if you want to use SNAPSHOT version.
+repositries {
+ maven { url = "https://oss.sonatype.org/content/repositories/snapshots" }
+}
+
+<dependency>
+ <groupId>com.github.housepower</groupId>
+ <artifactId>clickhouse-spark-runtime-3.3_2.12</artifactId>
+ <version>0.7.2</version>
+</dependency>
+<dependency>
+ <groupId>com.clickhouse</groupId>
+ <artifactId>clickhouse-jdbc</artifactId>
+ <classifier>all</classifier>
+ <version>0.4.5</version>
+ <exclusions>
+ <exclusion>
+ <groupId>*</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+</dependency>
+
+Add the following repository if you want to use SNAPSHOT version.
+<repositories>
+ <repository>
+ <id>sonatype-oss-snapshots</id>
+ <name>Sonatype OSS Snapshots Repository</name>
+ <url>https://oss.sonatype.org/content/repositories/snapshots</url>
+ </repository>
+</repositories>
+
+
+
+
+
+
+
+ Note: For SQL-only use cases, Apache Kyuubi is recommended +for Production.
+$SPARK_HOME/bin/spark-sql \
+ --conf spark.sql.catalog.clickhouse=xenon.clickhouse.ClickHouseCatalog \
+ --conf spark.sql.catalog.clickhouse.host=${CLICKHOUSE_HOST:-127.0.0.1} \
+ --conf spark.sql.catalog.clickhouse.protocol=http \
+ --conf spark.sql.catalog.clickhouse.http_port=${CLICKHOUSE_HTTP_PORT:-8123} \
+ --conf spark.sql.catalog.clickhouse.user=${CLICKHOUSE_USER:-default} \
+ --conf spark.sql.catalog.clickhouse.password=${CLICKHOUSE_PASSWORD:-} \
+ --conf spark.sql.catalog.clickhouse.database=default \
+ --jars /path/clickhouse-spark-runtime-3.3_2.12-0.7.2.jar,/path/clickhouse-jdbc-0.4.5-all.jar
+
+The following argument
+ --jars /path/clickhouse-spark-runtime-3.3_2.12-0.7.2.jar,/path/clickhouse-jdbc-0.4.5-all.jar
+
+can be replaced by
+ --repositories https://{maven-central-mirror or private-nexus-repo} \
+ --packages com.github.housepower:clickhouse-spark-runtime-3.3_2.12:0.7.2,com.clickhouse:clickhouse-jdbc:0.4.5:all
+
+to avoid copying the jar to your Spark client node.
+Basic operations, e.g. create database, create table, write table, read table, etc.
+spark-sql> use clickhouse;
+Time taken: 0.016 seconds
+
+spark-sql> create database if not exists test_db;
+Time taken: 0.022 seconds
+
+spark-sql> show databases;
+default
+system
+test_db
+Time taken: 0.289 seconds, Fetched 3 row(s)
+
+spark-sql> CREATE TABLE test_db.tbl_sql (
+ > create_time TIMESTAMP NOT NULL,
+ > m INT NOT NULL COMMENT 'part key',
+ > id BIGINT NOT NULL COMMENT 'sort key',
+ > value STRING
+ > ) USING ClickHouse
+ > PARTITIONED BY (m)
+ > TBLPROPERTIES (
+ > engine = 'MergeTree()',
+ > order_by = 'id',
+ > settings.index_granularity = 8192
+ > );
+Time taken: 0.242 seconds
+
+spark-sql> insert into test_db.tbl_sql values
+ > (timestamp'2021-01-01 10:10:10', 1, 1L, '1'),
+ > (timestamp'2022-02-02 10:10:10', 2, 2L, '2')
+ > as tabl(create_time, m, id, value);
+Time taken: 0.276 seconds
+
+spark-sql> select * from test_db.tbl_sql;
+2021-01-01 10:10:10 1 1 1
+2022-02-02 10:10:10 2 2 2
+Time taken: 0.116 seconds, Fetched 2 row(s)
+
+spark-sql> insert into test_db.tbl_sql select * from test_db.tbl_sql;
+Time taken: 1.028 seconds
+
+spark-sql> insert into test_db.tbl_sql select * from test_db.tbl_sql;
+Time taken: 0.462 seconds
+
+spark-sql> select count(*) from test_db.tbl_sql;
+6
+Time taken: 1.421 seconds, Fetched 1 row(s)
+
+spark-sql> select * from test_db.tbl_sql;
+2021-01-01 10:10:10 1 1 1
+2021-01-01 10:10:10 1 1 1
+2021-01-01 10:10:10 1 1 1
+2022-02-02 10:10:10 2 2 2
+2022-02-02 10:10:10 2 2 2
+2022-02-02 10:10:10 2 2 2
+Time taken: 0.123 seconds, Fetched 6 row(s)
+
+spark-sql> delete from test_db.tbl_sql where id = 1;
+Time taken: 0.129 seconds
+
+spark-sql> select * from test_db.tbl_sql;
+2022-02-02 10:10:10 2 2 2
+2022-02-02 10:10:10 2 2 2
+2022-02-02 10:10:10 2 2 2
+Time taken: 0.101 seconds, Fetched 3 row(s)
+
+
+
+
+
+
+
+ $SPARK_HOME/bin/spark-shell \
+ --conf spark.sql.catalog.clickhouse=xenon.clickhouse.ClickHouseCatalog \
+ --conf spark.sql.catalog.clickhouse.host=${CLICKHOUSE_HOST:-127.0.0.1} \
+ --conf spark.sql.catalog.clickhouse.protocol=http \
+ --conf spark.sql.catalog.clickhouse.http_port=${CLICKHOUSE_HTTP_PORT:-8123} \
+ --conf spark.sql.catalog.clickhouse.user=${CLICKHOUSE_USER:-default} \
+ --conf spark.sql.catalog.clickhouse.password=${CLICKHOUSE_PASSWORD:-} \
+ --conf spark.sql.catalog.clickhouse.database=default \
+ --jars /path/clickhouse-spark-runtime-3.3_2.12-0.7.2.jar,/path/clickhouse-jdbc-0.4.5-all.jar
+
+The following argument
+ --jars /path/clickhouse-spark-runtime-3.3_2.12-0.7.2.jar,/path/clickhouse-jdbc-0.4.5-all.jar
+
+can be replaced by
+ --repositories https://{maven-central-mirror or private-nexus-repo} \
+ --packages com.github.housepower:clickhouse-spark-runtime-3.3_2.12:0.7.2,com.clickhouse:clickhouse-jdbc:0.4.5:all
+
+to avoid copying the jar to your Spark client node.
+Basic operations, e.g. create database, create table, write table, read table, etc.
+scala> spark.sql("use clickhouse")
+res0: org.apache.spark.sql.DataFrame = []
+
+scala> spark.sql("create database test_db")
+res1: org.apache.spark.sql.DataFrame = []
+
+scala> spark.sql("show databases").show
++---------+
+|namespace|
++---------+
+| default|
+| system|
+| test_db|
++---------+
+
+scala> spark.sql("""
+ | CREATE TABLE test_db.tbl (
+ | create_time TIMESTAMP NOT NULL,
+ | m INT NOT NULL COMMENT 'part key',
+ | id BIGINT NOT NULL COMMENT 'sort key',
+ | value STRING
+ | ) USING ClickHouse
+ | PARTITIONED BY (m)
+ | TBLPROPERTIES (
+ | engine = 'MergeTree()',
+ | order_by = 'id',
+ | settings.index_granularity = 8192
+ | )
+ | """)
+res2: org.apache.spark.sql.DataFrame = []
+
+scala> :paste
+// Entering paste mode (ctrl-D to finish)
+
+spark.createDataFrame(Seq(
+ ("2021-01-01 10:10:10", 1L, "1"),
+ ("2022-02-02 10:10:10", 2L, "2")
+)).toDF("create_time", "id", "value")
+ .withColumn("create_time", to_timestamp($"create_time"))
+ .withColumn("m", month($"create_time"))
+ .select($"create_time", $"m", $"id", $"value")
+ .writeTo("test_db.tbl")
+ .append
+
+// Exiting paste mode, now interpreting.
+
+scala> spark.table("test_db.tbl").show
++-------------------+---+---+-----+
+| create_time| m| id|value|
++-------------------+---+---+-----+
+|2021-01-01 10:10:10| 1| 1| 1|
+|2022-02-02 10:10:10| 2| 2| 2|
++-------------------+---+---+-----+
+
+scala> spark.sql("DELETE FROM test_db.tbl WHERE id=1")
+res3: org.apache.spark.sql.DataFrame = []
+
+scala> spark.table("test_db.tbl").show
++-------------------+---+---+-----+
+| create_time| m| id|value|
++-------------------+---+---+-----+
+|2022-02-02 10:10:10| 2| 2| 2|
++-------------------+---+---+-----+
+
+Execute ClickHouse native SQL.
+scala> val options = Map(
+ | "host" -> "clickhouse",
+ | "protocol" -> "http",
+ | "http_port" -> "8123",
+ | "user" -> "default",
+ | "password" -> ""
+ | )
+
+scala> val sql = """
+ | |CREATE TABLE test_db.person (
+ | | id Int64,
+ | | name String,
+ | | age Nullable(Int32)
+ | |)
+ | |ENGINE = MergeTree()
+ | |ORDER BY id
+ | """.stripMargin
+
+scala> spark.executeCommand("xenon.clickhouse.ClickHouseCommandRunner", sql, options)
+
+scala> spark.sql("show tables in clickhouse_s1r1.test_db").show
++---------+---------+-----------+
+|namespace|tableName|isTemporary|
++---------+---------+-----------+
+| test_db| person| false|
++---------+---------+-----------+
+
+scala> spark.table("clickhouse_s1r1.test_db.person").printSchema
+root
+ |-- id: long (nullable = false)
+ |-- name: string (nullable = false)
+ |-- age: integer (nullable = true)
+
+
+
+
+
+
+
"},{"location":"#requirements","title":"Requirements","text":"For old versions, please refer the compatible matrix.
Version Compatible Spark Versions ClickHouse JDBC version 0.8.0 Spark 3.3, 3.4 0.4.6 0.7.2 Spark 3.3, 3.4 0.4.6 0.6.0 Spark 3.3 0.3.2-patch11 0.5.0 Spark 3.2, 3.3 0.3.2-patch11 0.4.0 Spark 3.2, 3.3 Not depend on 0.3.0 Spark 3.2, 3.3 Not depend on 0.2.1 Spark 3.2 Not depend on 0.1.2 Spark 3.2 Not depend on"},{"location":"best_practices/","title":"TODO","text":""},{"location":"best_practices/01_deployment/","title":"Deployment","text":""},{"location":"best_practices/01_deployment/#jar","title":"Jar","text":"Put clickhouse-spark-runtime-3.3_2.12-0.7.2.jar
and clickhouse-jdbc-0.4.5-all.jar
into $SPARK_HOME/jars/
, then you don't need to bundle the jar into your Spark application, and --jar
is not required when using spark-shell
or spark-sql
(again, for SQL-only use cases, Apache Kyuubi is recommended for Production).
Persist catalog configurations into $SPARK_HOME/conf/spark-defaults.conf
, then --conf
s are not required when using spark-shell
or spark-sql
.
spark.sql.catalog.ck_01=xenon.clickhouse.ClickHouseCatalog\nspark.sql.catalog.ck_01.host=10.0.0.1\nspark.sql.catalog.ck_01.protocol=http\nspark.sql.catalog.ck_01.http_port=8123\nspark.sql.catalog.ck_01.user=app\nspark.sql.catalog.ck_01.password=pwd\nspark.sql.catalog.ck_01.database=default\n\nspark.sql.catalog.ck_02=xenon.clickhouse.ClickHouseCatalog\nspark.sql.catalog.ck_02.host=10.0.0.2\nspark.sql.catalog.ck_02.protocol=http\nspark.sql.catalog.ck_02.http_port=8123\nspark.sql.catalog.ck_02.user=app\nspark.sql.catalog.ck_02.password=pwd\nspark.sql.catalog.ck_02.database=default\n
"},{"location":"configurations/","title":"Configurations","text":""},{"location":"configurations/#catalog-configurations","title":"Catalog Configurations","text":""},{"location":"configurations/#single-instance","title":"Single Instance","text":"Suppose you have one ClickHouse instance which installed on 10.0.0.1
and exposes HTTP on 8123
.
Edit $SPARK_HOME/conf/spark-defaults.conf
.
########################################\n## register a catalog named \"clickhouse\"\n########################################\nspark.sql.catalog.clickhouse xenon.clickhouse.ClickHouseCatalog\n\n################################################\n## basic configurations for \"clickhouse\" catalog\n################################################\nspark.sql.catalog.clickhouse.host 10.0.0.1\nspark.sql.catalog.clickhouse.protocol http\nspark.sql.catalog.clickhouse.http_port 8123\nspark.sql.catalog.clickhouse.user default\nspark.sql.catalog.clickhouse.password\nspark.sql.catalog.clickhouse.database default\n\n###############################################################\n## custom options of clickhouse-client for \"clickhouse\" catalog\n###############################################################\nspark.sql.catalog.clickhouse.option.async false\nspark.sql.catalog.clickhouse.option.client_name spark\n
Then you can access ClickHouse table <ck_db>.<ck_table>
from Spark SQL by using clickhouse.<ck_db>.<ck_table>
.
For ClickHouse cluster, give an unique catalog name for each instances.
Suppose you have two ClickHouse instances, one installed on 10.0.0.1
and exposes HTTP on port 8123
named clickhouse1, and another installed on 10.0.0.2
and exposes HTTP on port 8123
named clickhouse2.
Edit $SPARK_HOME/conf/spark-defaults.conf
.
spark.sql.catalog.clickhouse1 xenon.clickhouse.ClickHouseCatalog\nspark.sql.catalog.clickhouse1.host 10.0.0.1\nspark.sql.catalog.clickhouse1.protocol http\nspark.sql.catalog.clickhouse1.http_port 8123\nspark.sql.catalog.clickhouse1.user default\nspark.sql.catalog.clickhouse1.password\nspark.sql.catalog.clickhouse1.database default\nspark.sql.catalog.clickhouse1.option.async false\n\nspark.sql.catalog.clickhouse2 xenon.clickhouse.ClickHouseCatalog\nspark.sql.catalog.clickhouse2.host 10.0.0.2\nspark.sql.catalog.clickhouse2.protocol http\nspark.sql.catalog.clickhouse2.http_port 8123\nspark.sql.catalog.clickhouse2.user default\nspark.sql.catalog.clickhouse2.password\nspark.sql.catalog.clickhouse2.database default\nspark.sql.catalog.clickhouse2.option.async false\n
Then you can access clickhouse1 table <ck_db>.<ck_table>
from Spark SQL by clickhouse1.<ck_db>.<ck_table>
, and access clickhouse2 table <ck_db>.<ck_table>
by clickhouse2.<ck_db>.<ck_table>
.
SQL Configurations could be overwritten by SET <key>=<value>
in runtime.
cityHash64(col_1, col_2)
, and those can not be supported by Spark now. If true
, ignore the unsupported expressions, otherwise fail fast w/ an exception. Note, when spark.clickhouse.write.distributed.convertLocal
is enabled, ignore unsupported sharding keys may corrupt the data. 0.4.0 spark.clickhouse.read.compression.codec lz4 The codec used to decompress data for reading. Supported codecs: none, lz4. 0.5.0 spark.clickhouse.read.distributed.convertLocal true When reading Distributed table, read local table instead of itself. If true
, ignore spark.clickhouse.read.distributed.useClusterNodes
. 0.1.0 spark.clickhouse.read.format json Serialize format for reading. Supported formats: json, binary 0.6.0 spark.clickhouse.read.runtimeFilter.enabled false Enable runtime filter for reading. 0.8.0 spark.clickhouse.read.splitByPartitionId true If true
, construct input partition filter by virtual column _partition_id
, instead of partition value. There are known bugs to assemble SQL predication by partition value. This feature requires ClickHouse Server v21.6+ 0.4.0 spark.clickhouse.useNullableQuerySchema false If true
, mark all the fields of the query schema as nullable when executing CREATE/REPLACE TABLE ... AS SELECT ...
on creating the table. Note, this configuration requires SPARK-43390(available in Spark 3.5), w/o this patch, it always acts as true
. 0.8.0 spark.clickhouse.write.batchSize 10000 The number of records per batch on writing to ClickHouse. 0.1.0 spark.clickhouse.write.compression.codec lz4 The codec used to compress data for writing. Supported codecs: none, lz4. 0.3.0 spark.clickhouse.write.distributed.convertLocal false When writing Distributed table, write local table instead of itself. If true
, ignore spark.clickhouse.write.distributed.useClusterNodes
. 0.1.0 spark.clickhouse.write.distributed.useClusterNodes true Write to all nodes of cluster when writing Distributed table. 0.1.0 spark.clickhouse.write.format arrow Serialize format for writing. Supported formats: json, arrow 0.4.0 spark.clickhouse.write.localSortByKey true If true
, do local sort by sort keys before writing. 0.3.0 spark.clickhouse.write.localSortByPartition If true
, do local sort by partition before writing. If not set, it equals to spark.clickhouse.write.repartitionByPartition
. 0.3.0 spark.clickhouse.write.maxRetry 3 The maximum number of write we will retry for a single batch write failed with retryable codes. 0.1.0 spark.clickhouse.write.repartitionByPartition true Whether to repartition data by ClickHouse partition keys to meet the distributions of ClickHouse table before writing. 0.3.0 spark.clickhouse.write.repartitionNum 0 Repartition data to meet the distributions of ClickHouse table is required before writing, use this conf to specific the repartition number, value less than 1 mean no requirement. 0.1.0 spark.clickhouse.write.repartitionStrictly false If true
, Spark will strictly distribute incoming records across partitions to satisfy the required distribution before passing the records to the data source table on write. Otherwise, Spark may apply certain optimizations to speed up the query but break the distribution requirement. Note, this configuration requires SPARK-37523(available in Spark 3.4), w/o this patch, it always acts as true
. 0.3.0 spark.clickhouse.write.retryInterval 10s The interval in seconds between write retry. 0.1.0 spark.clickhouse.write.retryableErrorCodes 241 The retryable error codes returned by ClickHouse server when write failing. 0.1.0"},{"location":"configurations/01_catalog_configurations/","title":"01 catalog configurations","text":""},{"location":"configurations/01_catalog_configurations/#single-instance","title":"Single Instance","text":"Suppose you have one ClickHouse instance which installed on 10.0.0.1
and exposes HTTP on 8123
.
Edit $SPARK_HOME/conf/spark-defaults.conf
.
########################################\n## register a catalog named \"clickhouse\"\n########################################\nspark.sql.catalog.clickhouse xenon.clickhouse.ClickHouseCatalog\n\n################################################\n## basic configurations for \"clickhouse\" catalog\n################################################\nspark.sql.catalog.clickhouse.host 10.0.0.1\nspark.sql.catalog.clickhouse.protocol http\nspark.sql.catalog.clickhouse.http_port 8123\nspark.sql.catalog.clickhouse.user default\nspark.sql.catalog.clickhouse.password\nspark.sql.catalog.clickhouse.database default\n\n###############################################################\n## custom options of clickhouse-client for \"clickhouse\" catalog\n###############################################################\nspark.sql.catalog.clickhouse.option.async false\nspark.sql.catalog.clickhouse.option.client_name spark\n
Then you can access ClickHouse table <ck_db>.<ck_table>
from Spark SQL by using clickhouse.<ck_db>.<ck_table>
.
For ClickHouse cluster, give an unique catalog name for each instances.
Suppose you have two ClickHouse instances, one installed on 10.0.0.1
and exposes HTTP on port 8123
named clickhouse1, and another installed on 10.0.0.2
and exposes HTTP on port 8123
named clickhouse2.
Edit $SPARK_HOME/conf/spark-defaults.conf
.
spark.sql.catalog.clickhouse1 xenon.clickhouse.ClickHouseCatalog\nspark.sql.catalog.clickhouse1.host 10.0.0.1\nspark.sql.catalog.clickhouse1.protocol http\nspark.sql.catalog.clickhouse1.http_port 8123\nspark.sql.catalog.clickhouse1.user default\nspark.sql.catalog.clickhouse1.password\nspark.sql.catalog.clickhouse1.database default\nspark.sql.catalog.clickhouse1.option.async false\n\nspark.sql.catalog.clickhouse2 xenon.clickhouse.ClickHouseCatalog\nspark.sql.catalog.clickhouse2.host 10.0.0.2\nspark.sql.catalog.clickhouse2.protocol http\nspark.sql.catalog.clickhouse2.http_port 8123\nspark.sql.catalog.clickhouse2.user default\nspark.sql.catalog.clickhouse2.password\nspark.sql.catalog.clickhouse2.database default\nspark.sql.catalog.clickhouse2.option.async false\n
Then you can access clickhouse1 table <ck_db>.<ck_table>
from Spark SQL by clickhouse1.<ck_db>.<ck_table>
, and access clickhouse2 table <ck_db>.<ck_table>
by clickhouse2.<ck_db>.<ck_table>
.
cityHash64(col_1, col_2)
, and those can not be supported by Spark now. If true
, ignore the unsupported expressions, otherwise fail fast w/ an exception. Note, when spark.clickhouse.write.distributed.convertLocal
is enabled, ignore unsupported sharding keys may corrupt the data. 0.4.0 spark.clickhouse.read.compression.codec lz4 The codec used to decompress data for reading. Supported codecs: none, lz4. 0.5.0 spark.clickhouse.read.distributed.convertLocal true When reading Distributed table, read local table instead of itself. If true
, ignore spark.clickhouse.read.distributed.useClusterNodes
. 0.1.0 spark.clickhouse.read.format json Serialize format for reading. Supported formats: json, binary 0.6.0 spark.clickhouse.read.runtimeFilter.enabled false Enable runtime filter for reading. 0.8.0 spark.clickhouse.read.splitByPartitionId true If true
, construct input partition filter by virtual column _partition_id
, instead of partition value. There are known bugs to assemble SQL predication by partition value. This feature requires ClickHouse Server v21.6+ 0.4.0 spark.clickhouse.useNullableQuerySchema false If true
, mark all the fields of the query schema as nullable when executing CREATE/REPLACE TABLE ... AS SELECT ...
on creating the table. Note, this configuration requires SPARK-43390(available in Spark 3.5), w/o this patch, it always acts as true
. 0.8.0 spark.clickhouse.write.batchSize 10000 The number of records per batch on writing to ClickHouse. 0.1.0 spark.clickhouse.write.compression.codec lz4 The codec used to compress data for writing. Supported codecs: none, lz4. 0.3.0 spark.clickhouse.write.distributed.convertLocal false When writing Distributed table, write local table instead of itself. If true
, ignore spark.clickhouse.write.distributed.useClusterNodes
. 0.1.0 spark.clickhouse.write.distributed.useClusterNodes true Write to all nodes of cluster when writing Distributed table. 0.1.0 spark.clickhouse.write.format arrow Serialize format for writing. Supported formats: json, arrow 0.4.0 spark.clickhouse.write.localSortByKey true If true
, do local sort by sort keys before writing. 0.3.0 spark.clickhouse.write.localSortByPartition If true
, do local sort by partition before writing. If not set, it equals to spark.clickhouse.write.repartitionByPartition
. 0.3.0 spark.clickhouse.write.maxRetry 3 The maximum number of write we will retry for a single batch write failed with retryable codes. 0.1.0 spark.clickhouse.write.repartitionByPartition true Whether to repartition data by ClickHouse partition keys to meet the distributions of ClickHouse table before writing. 0.3.0 spark.clickhouse.write.repartitionNum 0 Repartition data to meet the distributions of ClickHouse table is required before writing, use this conf to specific the repartition number, value less than 1 mean no requirement. 0.1.0 spark.clickhouse.write.repartitionStrictly false If true
, Spark will strictly distribute incoming records across partitions to satisfy the required distribution before passing the records to the data source table on write. Otherwise, Spark may apply certain optimizations to speed up the query but break the distribution requirement. Note, this configuration requires SPARK-37523(available in Spark 3.4), w/o this patch, it always acts as true
. 0.3.0 spark.clickhouse.write.retryInterval 10s The interval in seconds between write retry. 0.1.0 spark.clickhouse.write.retryableErrorCodes 241 The retryable error codes returned by ClickHouse server when write failing. 0.1.0"},{"location":"developers/","title":"TODO","text":""},{"location":"developers/01_build_and_test/","title":"Build and Test","text":""},{"location":"developers/01_build_and_test/#build","title":"Build","text":"Check out source code from GitHub
git checkout https://github.com/housepower/spark-clickhouse-connector.git\n
Build w/o test
./gradlew clean build -x test\n
Go to spark-3.3/clickhouse-spark-runtime/build/libs/
to find the output jar clickhouse-spark-runtime-3.3_2.12-0.8.0-SNAPSHOT.jar
.
The project leverage Testcontainers and Docker Compose to do integration tests, you should install Docker and Docker Compose before running test, and check more details on Testcontainers document if you'd like to run test with remote Docker daemon.
Run all test
./gradlew clean test
Run single test
./gradlew test --tests=ConvertDistToLocalWriteSuite
Test against custom ClickHouse image
CLICKHOUSE_IMAGE=custom-org/clickhouse-server:custom-tag ./gradlew test
Follow the Python official document to install.
"},{"location":"developers/02_docs_and_website/#setup-pyenv-on-macos-optional","title":"Setuppyenv
on macOS (optional)","text":"Optionally, recommend to manage Python environments by pyenv.
Install from Homebrew
brew install pyenv pyenv-virtualenv\n
Setup in ~/.zshrc
eval \"$(pyenv init -)\"\neval \"$(pyenv virtualenv-init -)\"\n
Install virtualenv
pyenv install 3.9.13\npyenv virtualenv 3.9.13 scc\n
Localize virtualenv
pyenv local scc\n
"},{"location":"developers/02_docs_and_website/#install-dependencies","title":"Install dependencies","text":"pip install -r requirements.txt\n
"},{"location":"developers/02_docs_and_website/#preview-website","title":"Preview website","text":"mkdocs serve\n
Open http://127.0.0.1:8000/ in browser.
"},{"location":"developers/03_private_release/","title":"Private Release","text":"Tip
Internal Release means deploying to private Nexus Repository. Please make sure you are granted to access your company private Nexus Repository.
"},{"location":"developers/03_private_release/#repository-and-authentication","title":"Repository and Authentication","text":"Configure Gradle in ~/.gradle/gradle.properties
.
mavenUser=xxx\nmavenPassword=xxx\nmavenReleasesRepo=xxx\nmavenSnapshotsRepo=xxx\n
"},{"location":"developers/03_private_release/#upgrade-version","title":"Upgrade Version","text":"Modify version in version.txt
and docker/.env-dev
Publish to Maven Repository using ./gradlew publish
Notice
Public Release means deploying to Maven Central. Only core team members are granted to deploy into Public Repository.
Note
Most of the steps for a public release are done by the GitHub workflow.
"},{"location":"developers/04_public_release/#snapshot-release","title":"Snapshot Release","text":"The daily snapshot release is managed by Publish Snapshot workflow, it is scheduled to be deployed at midnight every day.
"},{"location":"developers/04_public_release/#feature-release","title":"Feature Release","text":"branch-0.3
;version.txt
and docker/.env-dev
, e.g. from 0.3.0-SNAPSHOT
to 0.3.0
;v0.3.0
, it will trigger the Publish Release workflow; version.txt
and docker/.env-dev
, e.g. from 0.3.0
to 0.3.1-SNAPSHOT
;version.txt
and docker/.env-dev
, e.g. from 0.3.0-SNAPSHOT
to 0.4.0-SNAPSHOT
;Just emit step 1 and step 7 from feature release.
"},{"location":"internals/","title":"Overview Design","text":"In high level, Spark ClickHouse Connector is a connector build on top of Spark DataSource V2 and ClickHouse HTTP protocol.
"},{"location":"internals/01_catalog/","title":"Catalog Management","text":"One important end user facing feature of DataSource V2 is supporting of multi-catalogs.
In the early stage of Spark, it does not have catalog concept, usually, user uses Hive Metastore or Glue to manage table metadata, hence user must register external DataSource tables in the centralized metastore before accessing.
In the centralized metastore model, a table is identified by <database>.<table>
.
For example, register a MySQL table into metastore, then access it using Spark SQL.
CREATE TABLE <db>.<tbl>\nUSING org.apache.spark.sql.jdbc\nOPTIONS (\n url \"jdbc:mysql://<mysql_host>:<mysql_port>\",\n dbtable \"<mysql_db>.<mysql_tbl>\",\n user \"<mysql_username>\",\n password \"<mysql_password>\"\n);\n
SELECT * FROM <db>.<tbl>;\nINSERT INTO <db>.<tbl> SELECT ...\n
Things changed in DataSource V2, starting from Spark 3.0, catalog concept is introduced to allow Spark to discover tables automatically by registering catalog plugins.
The default catalog has a fixed name spark_catalog
, and typically, a table is identified by <catalog>.<database>.<table>
.
For example, we can register a PostgreSQL database as Spark catalog named pg
, and access it using Spark SQL.
# spark-defaults.conf\nspark.sql.catalog.pg=org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog\nspark.sql.catalog.pg.url=jdbc:postgresql://<pg_host>:<pg_host>/<pg_db>\nspark.sql.catalog.pg.driver=org.postgresql.Driver\nspark.sql.catalog.pg.user=<pg_username>\nspark.sql.catalog.pg.password=<pg_password>\n
SELECT * FROM pg.<db>.<tbl>;\nINSERT INTO pg.<db>.<tbl> SELECT ...\n
"},{"location":"internals/02_read/","title":"How reading of the connector works?","text":""},{"location":"internals/02_read/#push-down","title":"Push Down","text":"Spark supports push down the processing of queries, or parts of queries, into the connected data source. This means that a specific predicate, aggregation function, or other operation, could be passed through to ClickHouse for processing.
The results of this push down can include the following benefits:
Improved overall query performance
Reduced network traffic between Spark and ClickHouse
Reduced load on ClickHouse
These benefits often result in significant cost reduction.
The connector implements most push down interfaces defined by DataSource V2, such as SupportsPushDownLimit
, SupportsPushDownFilters
, SupportsPushDownAggregates
, SupportsPushDownRequiredColumns
.
The below example shows how SupportsPushDownAggregates
and SupportsPushDownRequiredColumns
work.
Push Down disabled
Push Down enabled"},{"location":"internals/02_read/#bucket-join","title":"Bucket Join","text":"
Sort merge join is a general solution for two large table inner join, it requires two table shuffle by join key first, then do local sort by join key in each data partition, finally do stream-stream like look up to get the final result.
In some cases, the tables store collocated by join keys, w/ Storage-Partitioned Join(or V2 Bucket Join), Spark could leverage the existing ClickHouse table layout to eliminate the expensive shuffle and sort operations.
Sort Merge Join
Bucket Join"},{"location":"internals/03_write/","title":"How writing of the connector works?","text":"
As we know, the ClickHouse MergeTree
is a LSM-like format, it's not optimized for frequent and random record insertion, batch append operation is recommended for large amount of data ingestion.
So, to achieve better performance, we should re-organize the DataFrame
to fit ClickHouse data layout before inserting.
SPARK-23889 allows data source connector to expose sorting and clustering requirements of DataFrame
before writing. By default, for Distributed
table, this connector requires the DataFrame
clustered by [sharding keys, partition keys]
and sorted by [sharding keys, partition keys, ordering keys]
; for normal *MergeTree
table, this connector requires the DataFrame
sorted by [partition keys, ordering keys]
and sorted by [partition keys, ordering keys]
.
Warning
Limitation: Spark does NOT support expressions in sharding keys and partition keys w/o SPARK-39607.
In some cases, the strict data distribution requirements may lead small parallelism and data skew, and finally cause bad performance. SPARK-37523(requires Spark 3.4+) is introduced to allow relaxing the data distribution requirements to overcome those shortages.
Also, you can consider disabling some configurations like spark.clickhouse.write.repartitionByPartition
to avoid such performance degradation.
The name pattern of binary jar is
clickhouse-spark-runtime-${spark_binary_version}_${scala_binary_version}-${version}.jar\n
you can find all available released jars under Maven Central Repository and all daily build SNAPSHOT jars under Sonatype OSS Snapshots Repository.
"},{"location":"quick_start/01_get_the_library/#import-as-dependency","title":"Import as Dependency","text":""},{"location":"quick_start/01_get_the_library/#gradle","title":"Gradle","text":"dependencies {\n implementation(\"com.github.housepower:clickhouse-spark-runtime-3.3_2.12:0.7.2\")\n implementation(\"com.clickhouse:clickhouse-jdbc:0.4.5:all\") { transitive = false }\n}\n
Add the following repository if you want to use SNAPSHOT version.
repositries {\n maven { url = \"https://oss.sonatype.org/content/repositories/snapshots\" }\n}\n
"},{"location":"quick_start/01_get_the_library/#maven","title":"Maven","text":"<dependency>\n <groupId>com.github.housepower</groupId>\n <artifactId>clickhouse-spark-runtime-3.3_2.12</artifactId>\n <version>0.7.2</version>\n</dependency>\n<dependency>\n <groupId>com.clickhouse</groupId>\n <artifactId>clickhouse-jdbc</artifactId>\n <classifier>all</classifier>\n <version>0.4.5</version>\n <exclusions>\n <exclusion>\n <groupId>*</groupId>\n <artifactId>*</artifactId>\n </exclusion>\n </exclusions>\n</dependency>\n
Add the following repository if you want to use SNAPSHOT version.
<repositories>\n <repository>\n <id>sonatype-oss-snapshots</id>\n <name>Sonatype OSS Snapshots Repository</name>\n <url>https://oss.sonatype.org/content/repositories/snapshots</url>\n </repository>\n</repositories>\n
"},{"location":"quick_start/02_play_with_spark_sql/","title":"Play with Spark SQL","text":"Note: For SQL-only use cases, Apache Kyuubi is recommended for Production.
"},{"location":"quick_start/02_play_with_spark_sql/#launch-spark-sql-cli","title":"Launch Spark SQL CLI","text":"$SPARK_HOME/bin/spark-sql \\\n --conf spark.sql.catalog.clickhouse=xenon.clickhouse.ClickHouseCatalog \\\n --conf spark.sql.catalog.clickhouse.host=${CLICKHOUSE_HOST:-127.0.0.1} \\\n --conf spark.sql.catalog.clickhouse.protocol=http \\\n --conf spark.sql.catalog.clickhouse.http_port=${CLICKHOUSE_HTTP_PORT:-8123} \\\n --conf spark.sql.catalog.clickhouse.user=${CLICKHOUSE_USER:-default} \\\n --conf spark.sql.catalog.clickhouse.password=${CLICKHOUSE_PASSWORD:-} \\\n --conf spark.sql.catalog.clickhouse.database=default \\\n --jars /path/clickhouse-spark-runtime-3.3_2.12:0.7.2.jar,/path/clickhouse-jdbc-0.4.5-all.jar\n
The following argument
--jars /path/clickhouse-spark-runtime-3.3_2.12:0.7.2.jar,/path/clickhouse-jdbc-0.4.5-all.jar\n
can be replaced by
--repositories https://{maven-cental-mirror or private-nexus-repo} \\\n --packages com.github.housepower:clickhouse-spark-runtime-3.3_2.12:0.7.2,com.clickhouse:clickhouse-jdbc:0.4.5:all\n
to avoid copying jar to your Spark client node.
"},{"location":"quick_start/02_play_with_spark_sql/#operations","title":"Operations","text":"Basic operations, e.g. create database, create table, write table, read table, etc.
spark-sql> use clickhouse;\nTime taken: 0.016 seconds\n\nspark-sql> create database if not exists test_db;\nTime taken: 0.022 seconds\n\nspark-sql> show databases;\ndefault\nsystem\ntest_db\nTime taken: 0.289 seconds, Fetched 3 row(s)\n\nspark-sql> CREATE TABLE test_db.tbl_sql (\n > create_time TIMESTAMP NOT NULL,\n > m INT NOT NULL COMMENT 'part key',\n > id BIGINT NOT NULL COMMENT 'sort key',\n > value STRING\n > ) USING ClickHouse\n > PARTITIONED BY (m)\n > TBLPROPERTIES (\n > engine = 'MergeTree()',\n > order_by = 'id',\n > settings.index_granularity = 8192\n > );\nTime taken: 0.242 seconds\n\nspark-sql> insert into test_db.tbl_sql values\n > (timestamp'2021-01-01 10:10:10', 1, 1L, '1'),\n > (timestamp'2022-02-02 10:10:10', 2, 2L, '2')\n > as tabl(create_time, m, id, value);\nTime taken: 0.276 seconds\n\nspark-sql> select * from test_db.tbl_sql;\n2021-01-01 10:10:10 1 1 1\n2022-02-02 10:10:10 2 2 2\nTime taken: 0.116 seconds, Fetched 2 row(s)\n\nspark-sql> insert into test_db.tbl_sql select * from test_db.tbl_sql;\nTime taken: 1.028 seconds\n\nspark-sql> insert into test_db.tbl_sql select * from test_db.tbl_sql;\nTime taken: 0.462 seconds\n\nspark-sql> select count(*) from test_db.tbl_sql;\n6\nTime taken: 1.421 seconds, Fetched 1 row(s)\n\nspark-sql> select * from test_db.tbl_sql;\n2021-01-01 10:10:10 1 1 1\n2021-01-01 10:10:10 1 1 1\n2021-01-01 10:10:10 1 1 1\n2022-02-02 10:10:10 2 2 2\n2022-02-02 10:10:10 2 2 2\n2022-02-02 10:10:10 2 2 2\nTime taken: 0.123 seconds, Fetched 6 row(s)\n\nspark-sql> delete from test_db.tbl_sql where id = 1;\nTime taken: 0.129 seconds\n\nspark-sql> select * from test_db.tbl_sql;\n2022-02-02 10:10:10 2 2 2\n2022-02-02 10:10:10 2 2 2\n2022-02-02 10:10:10 2 2 2\nTime taken: 0.101 seconds, Fetched 3 row(s)\n
"},{"location":"quick_start/03_play_with_spark_shell/","title":"Play with Spark Shell","text":""},{"location":"quick_start/03_play_with_spark_shell/#launch-spark-shell","title":"Launch Spark Shell","text":"$SPARK_HOME/bin/spark-shell \\\n --conf spark.sql.catalog.clickhouse=xenon.clickhouse.ClickHouseCatalog \\\n --conf spark.sql.catalog.clickhouse.host=${CLICKHOUSE_HOST:-127.0.0.1} \\\n --conf spark.sql.catalog.clickhouse.protocol=http \\\n --conf spark.sql.catalog.clickhouse.http_port=${CLICKHOUSE_HTTP_PORT:-8123} \\\n --conf spark.sql.catalog.clickhouse.user=${CLICKHOUSE_USER:-default} \\\n --conf spark.sql.catalog.clickhouse.password=${CLICKHOUSE_PASSWORD:-} \\\n --conf spark.sql.catalog.clickhouse.database=default \\\n --jars /path/clickhouse-spark-runtime-3.3_2.12:0.7.2.jar,/path/clickhouse-jdbc-0.4.5-all.jar\n
The following argument
--jars /path/clickhouse-spark-runtime-3.3_2.12:0.7.2.jar,/path/clickhouse-jdbc-0.4.5-all.jar\n
can be replaced by
--repositories https://{maven-cental-mirror or private-nexus-repo} \\\n --packages com.github.housepower:clickhouse-spark-runtime-3.3_2.12:0.7.2,com.clickhouse:clickhouse-jdbc:0.4.5:all\n
to avoid copying jar to your Spark client node.
"},{"location":"quick_start/03_play_with_spark_shell/#operations","title":"Operations","text":"Basic operations, e.g. create database, create table, write table, read table, etc.
scala> spark.sql(\"use clickhouse\")\nres0: org.apache.spark.sql.DataFrame = []\n\nscala> spark.sql(\"create database test_db\")\nres1: org.apache.spark.sql.DataFrame = []\n\nscala> spark.sql(\"show databases\").show\n+---------+\n|namespace|\n+---------+\n| default|\n| system|\n| test_db|\n+---------+\n\nscala> spark.sql(\"\"\"\n | CREATE TABLE test_db.tbl (\n | create_time TIMESTAMP NOT NULL,\n | m INT NOT NULL COMMENT 'part key',\n | id BIGINT NOT NULL COMMENT 'sort key',\n | value STRING\n | ) USING ClickHouse\n | PARTITIONED BY (m)\n | TBLPROPERTIES (\n | engine = 'MergeTree()',\n | order_by = 'id',\n | settings.index_granularity = 8192\n | )\n | \"\"\")\nres2: org.apache.spark.sql.DataFrame = []\n\nscala> :paste\n// Entering paste mode (ctrl-D to finish)\n\nspark.createDataFrame(Seq(\n (\"2021-01-01 10:10:10\", 1L, \"1\"),\n (\"2022-02-02 10:10:10\", 2L, \"2\")\n)).toDF(\"create_time\", \"id\", \"value\")\n .withColumn(\"create_time\", to_timestamp($\"create_time\"))\n .withColumn(\"m\", month($\"create_time\"))\n .select($\"create_time\", $\"m\", $\"id\", $\"value\")\n .writeTo(\"test_db.tbl\")\n .append\n\n// Exiting paste mode, now interpreting.\n\nscala> spark.table(\"test_db.tbl\").show\n+-------------------+---+---+-----+\n| create_time| m| id|value|\n+-------------------+---+---+-----+\n|2021-01-01 10:10:10| 1| 1| 1|\n|2022-02-02 10:10:10| 2| 2| 2|\n+-------------------+---+---+-----+\n\nscala> spark.sql(\"DELETE FROM test_db.tbl WHERE id=1\")\nres3: org.apache.spark.sql.DataFrame = []\n\nscala> spark.table(\"test_db.tbl\").show\n+-------------------+---+---+-----+\n| create_time| m| id|value|\n+-------------------+---+---+-----+\n|2022-02-02 10:10:10| 2| 2| 2|\n+-------------------+---+---+-----+\n
Execute ClickHouse native SQL.
scala> val options = Map(\n | \"host\" -> \"clickhouse\",\n | \"protocol\" -> \"http\",\n | \"http_port\" -> \"8123\",\n | \"user\" -> \"default\",\n | \"password\" -> \"\"\n | )\n\nscala> val sql = \"\"\"\n | |CREATE TABLE test_db.person (\n | | id Int64,\n | | name String,\n | | age Nullable(Int32)\n | |)\n | |ENGINE = MergeTree()\n | |ORDER BY id\n | \"\"\".stripMargin\n\nscala> spark.executeCommand(\"xenon.clickhouse.ClickHouseCommandRunner\", sql, options) \n\nscala> spark.sql(\"show tables in clickhouse_s1r1.test_db\").show\n+---------+---------+-----------+\n|namespace|tableName|isTemporary|\n+---------+---------+-----------+\n| test_db| person| false|\n+---------+---------+-----------+\n\nscala> spark.table(\"clickhouse_s1r1.test_db.person\").printSchema\nroot\n |-- id: long (nullable = false)\n |-- name: string (nullable = false)\n |-- age: integer (nullable = true)\n
"}]}
\ No newline at end of file
diff --git a/sitemap.xml b/sitemap.xml
new file mode 100644
index 00000000..a1c67ef1
--- /dev/null
+++ b/sitemap.xml
@@ -0,0 +1,93 @@
+
+