From 48647a11c1822bc689c52385dad7f666c0d4fe1e Mon Sep 17 00:00:00 2001
From: Mahdi Malverdi
Date: Sat, 21 Dec 2024 16:29:02 +0330
Subject: [PATCH] Feat/add settings to write (#2)

* Update configuration file (#372)

* Support ClickHouse insert settings for table writes (#369)

- Enable custom ClickHouse insert settings when writing to tables.
- Add support for `spark.clickhouse.write.settings.*` configuration.
- Ensure settings are applied to the `INSERT INTO ... SETTINGS ...` SQL command.
- Update documentation to describe usage of write settings.
- Add unit tests to validate behavior.

Closes #369

* Support ClickHouse insert settings for table writes (#369)

- Enable custom ClickHouse insert settings when writing to tables.
- Add support for `spark.clickhouse.write.settings` configuration.
- Update documentation to describe usage of write settings.

Closes #369

* Reformat the code

Closes #369

---------

Co-authored-by: Mark Zitnik
Co-authored-by: Mahdi Malverdi
---
 docs/configurations/02_sql_configurations.md |  3 ++-
 .../spark/write/ClickHouseWriter.scala       | 17 ++++++++++++++++-
 .../sql/clickhouse/ClickHouseSQLConf.scala   |  8 ++++++++
 .../spark/sql/clickhouse/SparkOptions.scala  |  3 +++
 .../spark/write/ClickHouseWriter.scala       | 17 ++++++++++++++++-
 .../sql/clickhouse/ClickHouseSQLConf.scala   |  8 ++++++++
 .../spark/sql/clickhouse/SparkOptions.scala  |  3 +++
 .../spark/write/ClickHouseWriter.scala       | 17 ++++++++++++++++-
 .../sql/clickhouse/ClickHouseSQLConf.scala   |  8 ++++++++
 .../spark/sql/clickhouse/SparkOptions.scala  |  3 +++
 10 files changed, 83 insertions(+), 4 deletions(-)

diff --git a/docs/configurations/02_sql_configurations.md b/docs/configurations/02_sql_configurations.md
index 3328cd21..85b26718 100644
--- a/docs/configurations/02_sql_configurations.md
+++ b/docs/configurations/02_sql_configurations.md
@@ -20,9 +20,9 @@ spark.clickhouse.ignoreUnsupportedTransform|false|ClickHouse supports using comp
 spark.clickhouse.read.compression.codec|lz4|The codec used to decompress data for reading. Supported codecs: none, lz4.|0.5.0
 spark.clickhouse.read.distributed.convertLocal|true|When reading Distributed table, read local table instead of itself. If `true`, ignore `spark.clickhouse.read.distributed.useClusterNodes`.|0.1.0
 spark.clickhouse.read.fixedStringAs|binary|Read ClickHouse FixedString type as the specified Spark data type. Supported types: binary, string|0.8.0
-spark.clickhouse.read.settings|None|Settings when read from ClickHouse. e.g. `final=1, max_execution_time=5`|0.9.0
 spark.clickhouse.read.format|json|Serialize format for reading. Supported formats: json, binary|0.6.0
 spark.clickhouse.read.runtimeFilter.enabled|false|Enable runtime filter for reading.|0.8.0
+spark.clickhouse.read.settings||Settings when read from ClickHouse. e.g. `final=1, max_execution_time=5`|0.9.0
 spark.clickhouse.read.splitByPartitionId|true|If `true`, construct input partition filter by virtual column `_partition_id`, instead of partition value. There are known bugs to assemble SQL predication by partition value. This feature requires ClickHouse Server v21.6+|0.4.0
 spark.clickhouse.useNullableQuerySchema|false|If `true`, mark all the fields of the query schema as nullable when executing `CREATE/REPLACE TABLE ... AS SELECT ...` on creating the table. Note, this configuration requires SPARK-43390(available in Spark 3.5), w/o this patch, it always acts as `true`.|0.8.0
 spark.clickhouse.write.batchSize|10000|The number of records per batch on writing to ClickHouse.|0.1.0
@@ -38,4 +38,5 @@ spark.clickhouse.write.repartitionNum|0|Repartition data to meet the distributio
 spark.clickhouse.write.repartitionStrictly|false|If `true`, Spark will strictly distribute incoming records across partitions to satisfy the required distribution before passing the records to the data source table on write. Otherwise, Spark may apply certain optimizations to speed up the query but break the distribution requirement. Note, this configuration requires SPARK-37523(available in Spark 3.4), w/o this patch, it always acts as `true`.|0.3.0
 spark.clickhouse.write.retryInterval|10s|The interval in seconds between write retry.|0.1.0
 spark.clickhouse.write.retryableErrorCodes|241|The retryable error codes returned by ClickHouse server when write failing.|0.1.0
+spark.clickhouse.write.settings||Settings when writing to ClickHouse, e.g. `final=1, max_execution_time=5`|0.9.0
diff --git a/spark-3.3/clickhouse-spark/src/main/scala/com/clickhouse/spark/write/ClickHouseWriter.scala b/spark-3.3/clickhouse-spark/src/main/scala/com/clickhouse/spark/write/ClickHouseWriter.scala
index 13953a2a..6383c1f1 100644
--- a/spark-3.3/clickhouse-spark/src/main/scala/com/clickhouse/spark/write/ClickHouseWriter.scala
+++ b/spark-3.3/clickhouse-spark/src/main/scala/com/clickhouse/spark/write/ClickHouseWriter.scala
@@ -220,12 +220,27 @@ abstract class ClickHouseWriter(writeJob: WriteJobDescription)
     val client = nodeClient(shardNum)
     val data = serialize()
     var writeTime = 0L
+
+    val settings = writeJob.writeOptions.settings
+      .getOrElse("")
+      .split(",")
+      .map(_.trim.split("=", 2))
+      .collect { case Array(key, value) => key -> value }
+      .toMap
+
     Utils.retry[Unit, RetryableCHException](
       writeJob.writeOptions.maxRetry,
       writeJob.writeOptions.retryInterval
     ) {
       var startWriteTime = System.currentTimeMillis
-      client.syncInsertOutputJSONEachRow(database, table, format, codec, new ByteArrayInputStream(data)) match {
+      client.syncInsertOutputJSONEachRow(
+        database,
+        table,
+        format,
+        codec,
+        new ByteArrayInputStream(data),
+        settings
+      ) match {
         case Right(_) =>
           writeTime = System.currentTimeMillis - startWriteTime
           _totalWriteTime.add(writeTime)
diff --git a/spark-3.3/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ClickHouseSQLConf.scala b/spark-3.3/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ClickHouseSQLConf.scala
index 39e2bc4a..a794d56f 100644
--- a/spark-3.3/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ClickHouseSQLConf.scala
+++ b/spark-3.3/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ClickHouseSQLConf.scala
@@ -218,4 +218,12 @@ object ClickHouseSQLConf {
       .transform(_.toLowerCase)
       .createOptional
 
+  val WRITE_SETTINGS: OptionalConfigEntry[String] =
+    buildConf("spark.clickhouse.write.settings")
+      .doc("Settings when writing to ClickHouse, e.g. `final=1, max_execution_time=5`")
`final=1, max_execution_time=5`") + .version("0.9.0") + .stringConf + .transform(_.toLowerCase) + .createOptional + } diff --git a/spark-3.3/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/SparkOptions.scala b/spark-3.3/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/SparkOptions.scala index b473d7db..9ceff1eb 100644 --- a/spark-3.3/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/SparkOptions.scala +++ b/spark-3.3/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/SparkOptions.scala @@ -91,4 +91,7 @@ class WriteOptions(_options: JMap[String, String]) extends SparkOptions { def format: String = eval(WRITE_FORMAT.key, WRITE_FORMAT) + + def settings: Option[String] = + eval(WRITE_SETTINGS.key, WRITE_SETTINGS) } diff --git a/spark-3.4/clickhouse-spark/src/main/scala/com/clickhouse/spark/write/ClickHouseWriter.scala b/spark-3.4/clickhouse-spark/src/main/scala/com/clickhouse/spark/write/ClickHouseWriter.scala index bedd827c..c3d7d106 100644 --- a/spark-3.4/clickhouse-spark/src/main/scala/com/clickhouse/spark/write/ClickHouseWriter.scala +++ b/spark-3.4/clickhouse-spark/src/main/scala/com/clickhouse/spark/write/ClickHouseWriter.scala @@ -246,12 +246,27 @@ abstract class ClickHouseWriter(writeJob: WriteJobDescription) val client = nodeClient(shardNum) val data = serialize() var writeTime = 0L + + val settings = writeJob.writeOptions.settings + .getOrElse("") + .split(",") + .map(_.trim.split("=", 2)) + .collect { case Array(key, value) => key -> value } + .toMap + Utils.retry[Unit, RetryableCHException]( writeJob.writeOptions.maxRetry, writeJob.writeOptions.retryInterval ) { var startWriteTime = System.currentTimeMillis - client.syncInsertOutputJSONEachRow(database, table, format, codec, new ByteArrayInputStream(data)) match { + client.syncInsertOutputJSONEachRow( + database, + table, + format, + codec, + new ByteArrayInputStream(data), + settings + ) match { case Right(_) => writeTime = System.currentTimeMillis - startWriteTime _totalWriteTime.add(writeTime) diff --git a/spark-3.4/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ClickHouseSQLConf.scala b/spark-3.4/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ClickHouseSQLConf.scala index 39e2bc4a..a794d56f 100644 --- a/spark-3.4/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ClickHouseSQLConf.scala +++ b/spark-3.4/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ClickHouseSQLConf.scala @@ -218,4 +218,12 @@ object ClickHouseSQLConf { .transform(_.toLowerCase) .createOptional + val WRITE_SETTINGS: OptionalConfigEntry[String] = + buildConf("spark.clickhouse.write.settings") + .doc("Settings when write into ClickHouse. e.g. 
`final=1, max_execution_time=5`") + .version("0.9.0") + .stringConf + .transform(_.toLowerCase) + .createOptional + } diff --git a/spark-3.4/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/SparkOptions.scala b/spark-3.4/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/SparkOptions.scala index b473d7db..9ceff1eb 100644 --- a/spark-3.4/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/SparkOptions.scala +++ b/spark-3.4/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/SparkOptions.scala @@ -91,4 +91,7 @@ class WriteOptions(_options: JMap[String, String]) extends SparkOptions { def format: String = eval(WRITE_FORMAT.key, WRITE_FORMAT) + + def settings: Option[String] = + eval(WRITE_SETTINGS.key, WRITE_SETTINGS) } diff --git a/spark-3.5/clickhouse-spark/src/main/scala/com/clickhouse/spark/write/ClickHouseWriter.scala b/spark-3.5/clickhouse-spark/src/main/scala/com/clickhouse/spark/write/ClickHouseWriter.scala index 6f9b267b..e0c8a622 100644 --- a/spark-3.5/clickhouse-spark/src/main/scala/com/clickhouse/spark/write/ClickHouseWriter.scala +++ b/spark-3.5/clickhouse-spark/src/main/scala/com/clickhouse/spark/write/ClickHouseWriter.scala @@ -248,12 +248,27 @@ abstract class ClickHouseWriter(writeJob: WriteJobDescription) val client = nodeClient(shardNum) val data = serialize() var writeTime = 0L + + val settings = writeJob.writeOptions.settings + .getOrElse("") + .split(",") + .map(_.trim.split("=", 2)) + .collect { case Array(key, value) => key -> value } + .toMap + Utils.retry[Unit, RetryableCHException]( writeJob.writeOptions.maxRetry, writeJob.writeOptions.retryInterval ) { var startWriteTime = System.currentTimeMillis - client.syncInsertOutputJSONEachRow(database, table, format, codec, new ByteArrayInputStream(data)) match { + client.syncInsertOutputJSONEachRow( + database, + table, + format, + codec, + new ByteArrayInputStream(data), + settings + ) match { case Right(_) => writeTime = System.currentTimeMillis - startWriteTime _totalWriteTime.add(writeTime) diff --git a/spark-3.5/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ClickHouseSQLConf.scala b/spark-3.5/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ClickHouseSQLConf.scala index 39e2bc4a..a794d56f 100644 --- a/spark-3.5/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ClickHouseSQLConf.scala +++ b/spark-3.5/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ClickHouseSQLConf.scala @@ -218,4 +218,12 @@ object ClickHouseSQLConf { .transform(_.toLowerCase) .createOptional + val WRITE_SETTINGS: OptionalConfigEntry[String] = + buildConf("spark.clickhouse.write.settings") + .doc("Settings when write into ClickHouse. e.g. `final=1, max_execution_time=5`") + .version("0.9.0") + .stringConf + .transform(_.toLowerCase) + .createOptional + } diff --git a/spark-3.5/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/SparkOptions.scala b/spark-3.5/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/SparkOptions.scala index b473d7db..9ceff1eb 100644 --- a/spark-3.5/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/SparkOptions.scala +++ b/spark-3.5/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/SparkOptions.scala @@ -91,4 +91,7 @@ class WriteOptions(_options: JMap[String, String]) extends SparkOptions { def format: String = eval(WRITE_FORMAT.key, WRITE_FORMAT) + + def settings: Option[String] = + eval(WRITE_SETTINGS.key, WRITE_SETTINGS) }