Feat/add settings to write (#3)
* Support ClickHouse insert settings for table writes (ClickHouse#369)

- Enable custom ClickHouse insert settings when writing to tables.
- Add support for `spark.clickhouse.write.settings` configuration.
- Update documentation to describe usage of write settings.

Closes ClickHouse#369

* Reformat the code

Closes ClickHouse#369

---------

Co-authored-by: Mahdi Malverdi <[email protected]>
mahdimalverdi and Mahdi Malverdi committed Dec 21, 2024
1 parent 7a06a13 commit 11abbaf
Showing 10 changed files with 82 additions and 3 deletions.
1 change: 1 addition & 0 deletions docs/configurations/02_sql_configurations.md
@@ -38,4 +38,5 @@ spark.clickhouse.write.repartitionNum|0|Repartition data to meet the distributio
spark.clickhouse.write.repartitionStrictly|false|If `true`, Spark will strictly distribute incoming records across partitions to satisfy the required distribution before passing the records to the data source table on write. Otherwise, Spark may apply certain optimizations to speed up the query but break the distribution requirement. Note, this configuration requires SPARK-37523(available in Spark 3.4), w/o this patch, it always acts as `true`.|0.3.0
spark.clickhouse.write.retryInterval|10s|The interval in seconds between write retries.|0.1.0
spark.clickhouse.write.retryableErrorCodes|241|The retryable error codes returned by the ClickHouse server when a write fails.|0.1.0
spark.clickhouse.write.settings|<undefined>|Settings to apply when writing into ClickHouse, e.g. `final=1, max_execution_time=5`|0.9.0
<!--end-include-->
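
For context, a minimal sketch of how the new option might be used from Spark. The catalog name `clickhouse`, database `default`, and table `example_table` are illustrative assumptions, not part of this commit.

import org.apache.spark.sql.SparkSession

// Assumes a ClickHouse catalog named "clickhouse" is already configured for this
// session and that clickhouse.default.example_table exists.
val spark = SparkSession.builder()
  .appName("clickhouse-write-settings-example")
  // The new option: comma-separated ClickHouse settings applied on insert.
  .config("spark.clickhouse.write.settings", "final=1, max_execution_time=5")
  .getOrCreate()

spark.range(10)                                  // tiny demo dataset
  .withColumnRenamed("id", "value")              // column name is illustrative
  .writeTo("clickhouse.default.example_table")
  .append()
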
@@ -220,12 +220,27 @@ abstract class ClickHouseWriter(writeJob: WriteJobDescription)
val client = nodeClient(shardNum)
val data = serialize()
var writeTime = 0L

val settings = writeJob.writeOptions.settings
.getOrElse("")
.split(",")
.map(_.trim.split("=", 2))
.collect { case Array(key, value) => key -> value }
.toMap

Utils.retry[Unit, RetryableCHException](
writeJob.writeOptions.maxRetry,
writeJob.writeOptions.retryInterval
) {
var startWriteTime = System.currentTimeMillis
client.syncInsertOutputJSONEachRow(database, table, format, codec, new ByteArrayInputStream(data)) match {
client.syncInsertOutputJSONEachRow(
database,
table,
format,
codec,
new ByteArrayInputStream(data),
settings
) match {
case Right(_) =>
writeTime = System.currentTimeMillis - startWriteTime
_totalWriteTime.add(writeTime)
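
To make the effect of the settings parsing above concrete, a standalone sketch (not part of the diff) using the example string from the documentation:

// Parses comma-separated "key=value" pairs into a Map[String, String];
// entries without an "=" are silently dropped, and a missing option yields an empty Map.
val raw: Option[String] = Some("final=1, max_execution_time=5")

val settings: Map[String, String] = raw
  .getOrElse("")
  .split(",")
  .map(_.trim.split("=", 2))
  .collect { case Array(key, value) => key -> value }
  .toMap

println(settings) // Map(final -> 1, max_execution_time -> 5)
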
@@ -218,4 +218,12 @@ object ClickHouseSQLConf {
.transform(_.toLowerCase)
.createOptional

val WRITE_SETTINGS: OptionalConfigEntry[String] =
buildConf("spark.clickhouse.write.settings")
.doc("Settings when write into ClickHouse. e.g. `final=1, max_execution_time=5`")
.version("0.9.0")
.stringConf
.transform(_.toLowerCase)
.createOptional

}
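
Since the entry above is a plain optional string conf, it can presumably also be set on a live session rather than at build time; a hedged one-liner, where `spark` is an assumed existing SparkSession:

// Value string reuses the example from the .doc() text above.
spark.conf.set("spark.clickhouse.write.settings", "final=1, max_execution_time=5")
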
@@ -91,4 +91,7 @@ class WriteOptions(_options: JMap[String, String]) extends SparkOptions {

def format: String =
eval(WRITE_FORMAT.key, WRITE_FORMAT)

def settings: Option[String] =
eval(WRITE_SETTINGS.key, WRITE_SETTINGS)
}
@@ -246,12 +246,27 @@ abstract class ClickHouseWriter(writeJob: WriteJobDescription)
val client = nodeClient(shardNum)
val data = serialize()
var writeTime = 0L

val settings = writeJob.writeOptions.settings
.getOrElse("")
.split(",")
.map(_.trim.split("=", 2))
.collect { case Array(key, value) => key -> value }
.toMap

Utils.retry[Unit, RetryableCHException](
writeJob.writeOptions.maxRetry,
writeJob.writeOptions.retryInterval
) {
var startWriteTime = System.currentTimeMillis
client.syncInsertOutputJSONEachRow(database, table, format, codec, new ByteArrayInputStream(data)) match {
client.syncInsertOutputJSONEachRow(
database,
table,
format,
codec,
new ByteArrayInputStream(data),
settings
) match {
case Right(_) =>
writeTime = System.currentTimeMillis - startWriteTime
_totalWriteTime.add(writeTime)
@@ -218,4 +218,12 @@ object ClickHouseSQLConf {
.transform(_.toLowerCase)
.createOptional

val WRITE_SETTINGS: OptionalConfigEntry[String] =
buildConf("spark.clickhouse.write.settings")
.doc("Settings when write into ClickHouse. e.g. `final=1, max_execution_time=5`")
.version("0.9.0")
.stringConf
.transform(_.toLowerCase)
.createOptional

}
@@ -91,4 +91,7 @@ class WriteOptions(_options: JMap[String, String]) extends SparkOptions {

def format: String =
eval(WRITE_FORMAT.key, WRITE_FORMAT)

def settings: Option[String] =
eval(WRITE_SETTINGS.key, WRITE_SETTINGS)
}
@@ -248,12 +248,27 @@ abstract class ClickHouseWriter(writeJob: WriteJobDescription)
val client = nodeClient(shardNum)
val data = serialize()
var writeTime = 0L

val settings = writeJob.writeOptions.settings
.getOrElse("")
.split(",")
.map(_.trim.split("=", 2))
.collect { case Array(key, value) => key -> value }
.toMap

Utils.retry[Unit, RetryableCHException](
writeJob.writeOptions.maxRetry,
writeJob.writeOptions.retryInterval
) {
var startWriteTime = System.currentTimeMillis
client.syncInsertOutputJSONEachRow(database, table, format, codec, new ByteArrayInputStream(data)) match {
client.syncInsertOutputJSONEachRow(
database,
table,
format,
codec,
new ByteArrayInputStream(data),
settings
) match {
case Right(_) =>
writeTime = System.currentTimeMillis - startWriteTime
_totalWriteTime.add(writeTime)
@@ -218,4 +218,12 @@ object ClickHouseSQLConf {
.transform(_.toLowerCase)
.createOptional

val WRITE_SETTINGS: OptionalConfigEntry[String] =
buildConf("spark.clickhouse.write.settings")
.doc("Settings when write into ClickHouse. e.g. `final=1, max_execution_time=5`")
.version("0.9.0")
.stringConf
.transform(_.toLowerCase)
.createOptional

}
@@ -91,4 +91,7 @@ class WriteOptions(_options: JMap[String, String]) extends SparkOptions {

def format: String =
eval(WRITE_FORMAT.key, WRITE_FORMAT)

def settings: Option[String] =
eval(WRITE_SETTINGS.key, WRITE_SETTINGS)
}
