Skip to content

Commit

Permalink
Feat/add settings to write (#2)
Browse files Browse the repository at this point in the history
* Update configurtion file (ClickHouse#372)

* Support ClickHouse insert settings for table writes (ClickHouse#369)

- Enable custom ClickHouse insert settings when writing to tables.
- Add support for `spark.clickhouse.write.settings.*` configuration.
- Ensure settings are applied to the `INSERT INTO ... SETTINGS ...` SQL command.
- Update documentation to describe usage of write settings.
- Add unit tests to validate behavior.

Closes ClickHouse#369

* Support ClickHouse insert settings for table writes (ClickHouse#369)

- Enable custom ClickHouse insert settings when writing to tables.
- Add support for `spark.clickhouse.write.settings` configuration.
- Update documentation to describe usage of write settings.

Closes ClickHouse#369

* reformat the codes

Closes ClickHouse#369

---------

Co-authored-by: Mark Zitnik <[email protected]>
Co-authored-by: Mahdi Malverdi <[email protected]>
  • Loading branch information
3 people authored Dec 21, 2024
1 parent 7a06a13 commit 48647a1
Show file tree
Hide file tree
Showing 10 changed files with 83 additions and 4 deletions.
3 changes: 2 additions & 1 deletion docs/configurations/02_sql_configurations.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ spark.clickhouse.ignoreUnsupportedTransform|false|ClickHouse supports using comp
spark.clickhouse.read.compression.codec|lz4|The codec used to decompress data for reading. Supported codecs: none, lz4.|0.5.0
spark.clickhouse.read.distributed.convertLocal|true|When reading Distributed table, read local table instead of itself. If `true`, ignore `spark.clickhouse.read.distributed.useClusterNodes`.|0.1.0
spark.clickhouse.read.fixedStringAs|binary|Read ClickHouse FixedString type as the specified Spark data type. Supported types: binary, string|0.8.0
spark.clickhouse.read.settings|None|Settings when read from ClickHouse. e.g. `final=1, max_execution_time=5`|0.9.0
spark.clickhouse.read.format|json|Serialize format for reading. Supported formats: json, binary|0.6.0
spark.clickhouse.read.runtimeFilter.enabled|false|Enable runtime filter for reading.|0.8.0
spark.clickhouse.read.settings|<undefined>|Settings when read from ClickHouse. e.g. `final=1, max_execution_time=5`|0.9.0
spark.clickhouse.read.splitByPartitionId|true|If `true`, construct input partition filter by virtual column `_partition_id`, instead of partition value. There are known bugs to assemble SQL predication by partition value. This feature requires ClickHouse Server v21.6+|0.4.0
spark.clickhouse.useNullableQuerySchema|false|If `true`, mark all the fields of the query schema as nullable when executing `CREATE/REPLACE TABLE ... AS SELECT ...` on creating the table. Note, this configuration requires SPARK-43390(available in Spark 3.5), w/o this patch, it always acts as `true`.|0.8.0
spark.clickhouse.write.batchSize|10000|The number of records per batch on writing to ClickHouse.|0.1.0
Expand All @@ -38,4 +38,5 @@ spark.clickhouse.write.repartitionNum|0|Repartition data to meet the distributio
spark.clickhouse.write.repartitionStrictly|false|If `true`, Spark will strictly distribute incoming records across partitions to satisfy the required distribution before passing the records to the data source table on write. Otherwise, Spark may apply certain optimizations to speed up the query but break the distribution requirement. Note, this configuration requires SPARK-37523(available in Spark 3.4), w/o this patch, it always acts as `true`.|0.3.0
spark.clickhouse.write.retryInterval|10s|The interval in seconds between write retry.|0.1.0
spark.clickhouse.write.retryableErrorCodes|241|The retryable error codes returned by ClickHouse server when write failing.|0.1.0
spark.clickhouse.write.settings|<undefined>|Settings when write into ClickHouse. e.g. `final=1, max_execution_time=5`|0.9.0
<!--end-include-->
Original file line number Diff line number Diff line change
Expand Up @@ -220,12 +220,27 @@ abstract class ClickHouseWriter(writeJob: WriteJobDescription)
val client = nodeClient(shardNum)
val data = serialize()
var writeTime = 0L

val settings = writeJob.writeOptions.settings
.getOrElse("")
.split(",")
.map(_.trim.split("=", 2))
.collect { case Array(key, value) => key -> value }
.toMap

Utils.retry[Unit, RetryableCHException](
writeJob.writeOptions.maxRetry,
writeJob.writeOptions.retryInterval
) {
var startWriteTime = System.currentTimeMillis
client.syncInsertOutputJSONEachRow(database, table, format, codec, new ByteArrayInputStream(data)) match {
client.syncInsertOutputJSONEachRow(
database,
table,
format,
codec,
new ByteArrayInputStream(data),
settings
) match {
case Right(_) =>
writeTime = System.currentTimeMillis - startWriteTime
_totalWriteTime.add(writeTime)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,4 +218,12 @@ object ClickHouseSQLConf {
.transform(_.toLowerCase)
.createOptional

val WRITE_SETTINGS: OptionalConfigEntry[String] =
buildConf("spark.clickhouse.write.settings")
.doc("Settings when write into ClickHouse. e.g. `final=1, max_execution_time=5`")
.version("0.9.0")
.stringConf
.transform(_.toLowerCase)
.createOptional

}
Original file line number Diff line number Diff line change
Expand Up @@ -91,4 +91,7 @@ class WriteOptions(_options: JMap[String, String]) extends SparkOptions {

def format: String =
eval(WRITE_FORMAT.key, WRITE_FORMAT)

def settings: Option[String] =
eval(WRITE_SETTINGS.key, WRITE_SETTINGS)
}
Original file line number Diff line number Diff line change
Expand Up @@ -246,12 +246,27 @@ abstract class ClickHouseWriter(writeJob: WriteJobDescription)
val client = nodeClient(shardNum)
val data = serialize()
var writeTime = 0L

val settings = writeJob.writeOptions.settings
.getOrElse("")
.split(",")
.map(_.trim.split("=", 2))
.collect { case Array(key, value) => key -> value }
.toMap

Utils.retry[Unit, RetryableCHException](
writeJob.writeOptions.maxRetry,
writeJob.writeOptions.retryInterval
) {
var startWriteTime = System.currentTimeMillis
client.syncInsertOutputJSONEachRow(database, table, format, codec, new ByteArrayInputStream(data)) match {
client.syncInsertOutputJSONEachRow(
database,
table,
format,
codec,
new ByteArrayInputStream(data),
settings
) match {
case Right(_) =>
writeTime = System.currentTimeMillis - startWriteTime
_totalWriteTime.add(writeTime)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,4 +218,12 @@ object ClickHouseSQLConf {
.transform(_.toLowerCase)
.createOptional

val WRITE_SETTINGS: OptionalConfigEntry[String] =
buildConf("spark.clickhouse.write.settings")
.doc("Settings when write into ClickHouse. e.g. `final=1, max_execution_time=5`")
.version("0.9.0")
.stringConf
.transform(_.toLowerCase)
.createOptional

}
Original file line number Diff line number Diff line change
Expand Up @@ -91,4 +91,7 @@ class WriteOptions(_options: JMap[String, String]) extends SparkOptions {

def format: String =
eval(WRITE_FORMAT.key, WRITE_FORMAT)

def settings: Option[String] =
eval(WRITE_SETTINGS.key, WRITE_SETTINGS)
}
Original file line number Diff line number Diff line change
Expand Up @@ -248,12 +248,27 @@ abstract class ClickHouseWriter(writeJob: WriteJobDescription)
val client = nodeClient(shardNum)
val data = serialize()
var writeTime = 0L

val settings = writeJob.writeOptions.settings
.getOrElse("")
.split(",")
.map(_.trim.split("=", 2))
.collect { case Array(key, value) => key -> value }
.toMap

Utils.retry[Unit, RetryableCHException](
writeJob.writeOptions.maxRetry,
writeJob.writeOptions.retryInterval
) {
var startWriteTime = System.currentTimeMillis
client.syncInsertOutputJSONEachRow(database, table, format, codec, new ByteArrayInputStream(data)) match {
client.syncInsertOutputJSONEachRow(
database,
table,
format,
codec,
new ByteArrayInputStream(data),
settings
) match {
case Right(_) =>
writeTime = System.currentTimeMillis - startWriteTime
_totalWriteTime.add(writeTime)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,4 +218,12 @@ object ClickHouseSQLConf {
.transform(_.toLowerCase)
.createOptional

val WRITE_SETTINGS: OptionalConfigEntry[String] =
buildConf("spark.clickhouse.write.settings")
.doc("Settings when write into ClickHouse. e.g. `final=1, max_execution_time=5`")
.version("0.9.0")
.stringConf
.transform(_.toLowerCase)
.createOptional

}
Original file line number Diff line number Diff line change
Expand Up @@ -91,4 +91,7 @@ class WriteOptions(_options: JMap[String, String]) extends SparkOptions {

def format: String =
eval(WRITE_FORMAT.key, WRITE_FORMAT)

def settings: Option[String] =
eval(WRITE_SETTINGS.key, WRITE_SETTINGS)
}

0 comments on commit 48647a1

Please sign in to comment.