diff --git a/docs/configurations/02_sql_configurations.md b/docs/configurations/02_sql_configurations.md
index b22ed0f4..3328cd21 100644
--- a/docs/configurations/02_sql_configurations.md
+++ b/docs/configurations/02_sql_configurations.md
@@ -20,6 +20,7 @@ spark.clickhouse.ignoreUnsupportedTransform|false|ClickHouse supports using comp
 spark.clickhouse.read.compression.codec|lz4|The codec used to decompress data for reading. Supported codecs: none, lz4.|0.5.0
 spark.clickhouse.read.distributed.convertLocal|true|When reading Distributed table, read local table instead of itself. If `true`, ignore `spark.clickhouse.read.distributed.useClusterNodes`.|0.1.0
 spark.clickhouse.read.fixedStringAs|binary|Read ClickHouse FixedString type as the specified Spark data type. Supported types: binary, string|0.8.0
 spark.clickhouse.read.format|json|Serialize format for reading. Supported formats: json, binary|0.6.0
 spark.clickhouse.read.runtimeFilter.enabled|false|Enable runtime filter for reading.|0.8.0
+spark.clickhouse.read.settings|None|Settings to apply when reading from ClickHouse, e.g. `final=1, max_execution_time=5`|0.9.0
 spark.clickhouse.read.splitByPartitionId|true|If `true`, construct input partition filter by virtual column `_partition_id`, instead of partition value. There are known bugs to assemble SQL predication by partition value. This feature requires ClickHouse Server v21.6+|0.4.0

diff --git a/spark-3.3/clickhouse-spark/src/main/scala/com/clickhouse/spark/read/ClickHouseReader.scala b/spark-3.3/clickhouse-spark/src/main/scala/com/clickhouse/spark/read/ClickHouseReader.scala
index 5068a8c3..4c6d71f9 100644
--- a/spark-3.3/clickhouse-spark/src/main/scala/com/clickhouse/spark/read/ClickHouseReader.scala
+++ b/spark-3.3/clickhouse-spark/src/main/scala/com/clickhouse/spark/read/ClickHouseReader.scala
@@ -36,6 +36,7 @@ abstract class ClickHouseReader[Record](
 
   val readDistributedUseClusterNodes: Boolean = conf.getConf(READ_DISTRIBUTED_USE_CLUSTER_NODES)
   val readDistributedConvertLocal: Boolean = conf.getConf(READ_DISTRIBUTED_CONVERT_LOCAL)
+  private val readSettings: Option[String] = conf.getConf(READ_SETTINGS)
 
   val database: String = part.table.database
   val table: String = part.table.name
@@ -60,6 +61,7 @@ abstract class ClickHouseReader[Record](
        |WHERE (${part.partFilterExpr}) AND (${scanJob.filtersExpr})
        |${scanJob.groupByClause.getOrElse("")}
        |${scanJob.limit.map(n => s"LIMIT $n").getOrElse("")}
+       |${readSettings.map(settings => s"SETTINGS $settings").getOrElse("")}
        |""".stripMargin
 }

diff --git a/spark-3.3/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ClickHouseSQLConf.scala b/spark-3.3/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ClickHouseSQLConf.scala
index ce5c28d3..39e2bc4a 100644
--- a/spark-3.3/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ClickHouseSQLConf.scala
+++ b/spark-3.3/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ClickHouseSQLConf.scala
@@ -14,7 +14,7 @@
 
 package org.apache.spark.sql.clickhouse
 
-import org.apache.spark.internal.config.ConfigEntry
+import org.apache.spark.internal.config.{ConfigEntry, OptionalConfigEntry}
 import org.apache.spark.sql.internal.SQLConf._
 
 import com.clickhouse.spark.exception.ClickHouseErrCode._
@@ -209,4 +209,13 @@ object ClickHouseSQLConf {
       .stringConf
       .transform(_.toLowerCase)
       .createWithDefault("binary")
+
+  val READ_SETTINGS: OptionalConfigEntry[String] =
+    buildConf("spark.clickhouse.read.settings")
+      .doc("Settings to apply when reading from ClickHouse, e.g. `final=1, max_execution_time=5`")
+      .version("0.9.0")
+      .stringConf
+      .transform(_.toLowerCase)
+      .createOptional
+
 }
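For context, a minimal usage sketch of the new conf. The catalog name `clickhouse` and the table `default.events` are hypothetical placeholders, and the catalog registration itself (catalog class, host, port) is assumed to be configured separately; only `spark.clickhouse.read.settings` comes from this patch:

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical session setup: assumes a ClickHouse catalog named "clickhouse"
// is already registered for the session (connection options omitted here).
val spark = SparkSession.builder()
  .master("local[*]")
  .config("spark.clickhouse.read.settings", "final=1, max_execution_time=5")
  .getOrCreate()

// With the conf set, every SELECT the reader generates now ends with
//   SETTINGS final=1, max_execution_time=5
spark.sql("SELECT * FROM clickhouse.default.events").show()
```

Because the entry is `createOptional`, leaving the conf unset means `readSettings.map(...).getOrElse("")` folds to an empty string, so the generated SQL is unchanged for existing users.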
diff --git a/spark-3.4/clickhouse-spark/src/main/scala/com/clickhouse/spark/read/ClickHouseReader.scala b/spark-3.4/clickhouse-spark/src/main/scala/com/clickhouse/spark/read/ClickHouseReader.scala
index 5068a8c3..4c6d71f9 100644
--- a/spark-3.4/clickhouse-spark/src/main/scala/com/clickhouse/spark/read/ClickHouseReader.scala
+++ b/spark-3.4/clickhouse-spark/src/main/scala/com/clickhouse/spark/read/ClickHouseReader.scala
@@ -36,6 +36,7 @@ abstract class ClickHouseReader[Record](
 
   val readDistributedUseClusterNodes: Boolean = conf.getConf(READ_DISTRIBUTED_USE_CLUSTER_NODES)
   val readDistributedConvertLocal: Boolean = conf.getConf(READ_DISTRIBUTED_CONVERT_LOCAL)
+  private val readSettings: Option[String] = conf.getConf(READ_SETTINGS)
 
   val database: String = part.table.database
   val table: String = part.table.name
@@ -60,6 +61,7 @@ abstract class ClickHouseReader[Record](
        |WHERE (${part.partFilterExpr}) AND (${scanJob.filtersExpr})
        |${scanJob.groupByClause.getOrElse("")}
        |${scanJob.limit.map(n => s"LIMIT $n").getOrElse("")}
+       |${readSettings.map(settings => s"SETTINGS $settings").getOrElse("")}
        |""".stripMargin
 }

diff --git a/spark-3.4/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ClickHouseSQLConf.scala b/spark-3.4/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ClickHouseSQLConf.scala
index ce5c28d3..39e2bc4a 100644
--- a/spark-3.4/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ClickHouseSQLConf.scala
+++ b/spark-3.4/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ClickHouseSQLConf.scala
@@ -14,7 +14,7 @@
 
 package org.apache.spark.sql.clickhouse
 
-import org.apache.spark.internal.config.ConfigEntry
+import org.apache.spark.internal.config.{ConfigEntry, OptionalConfigEntry}
 import org.apache.spark.sql.internal.SQLConf._
 
 import com.clickhouse.spark.exception.ClickHouseErrCode._
@@ -209,4 +209,13 @@ object ClickHouseSQLConf {
       .stringConf
       .transform(_.toLowerCase)
       .createWithDefault("binary")
+
+  val READ_SETTINGS: OptionalConfigEntry[String] =
+    buildConf("spark.clickhouse.read.settings")
+      .doc("Settings to apply when reading from ClickHouse, e.g. `final=1, max_execution_time=5`")
+      .version("0.9.0")
+      .stringConf
+      .transform(_.toLowerCase)
+      .createOptional
+
 }
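One behavioral note on the conf entry duplicated across the three modules: the value passes through `.transform(_.toLowerCase)` before it reaches the reader, so the whole settings string is lowercased. A tiny illustration (values hypothetical):

```scala
// Mirrors the conf entry's .transform(_.toLowerCase): the entire
// user-supplied string is lowercased, setting names and values alike.
val normalized = "FINAL=1, MAX_EXECUTION_TIME=5".toLowerCase
// normalized == "final=1, max_execution_time=5"
```

Numeric values and lowercase setting names are unaffected, but a quoted string value (e.g. a `log_comment`) would also be lowercased.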
diff --git a/spark-3.5/clickhouse-spark/src/main/scala/com/clickhouse/spark/read/ClickHouseReader.scala b/spark-3.5/clickhouse-spark/src/main/scala/com/clickhouse/spark/read/ClickHouseReader.scala
index c19ddbaa..18f76870 100644
--- a/spark-3.5/clickhouse-spark/src/main/scala/com/clickhouse/spark/read/ClickHouseReader.scala
+++ b/spark-3.5/clickhouse-spark/src/main/scala/com/clickhouse/spark/read/ClickHouseReader.scala
@@ -36,6 +36,7 @@ abstract class ClickHouseReader[Record](
 
   val readDistributedUseClusterNodes: Boolean = conf.getConf(READ_DISTRIBUTED_USE_CLUSTER_NODES)
   val readDistributedConvertLocal: Boolean = conf.getConf(READ_DISTRIBUTED_CONVERT_LOCAL)
+  private val readSettings: Option[String] = conf.getConf(READ_SETTINGS)
 
   val database: String = part.table.database
   val table: String = part.table.name
@@ -60,6 +61,7 @@ abstract class ClickHouseReader[Record](
        |WHERE (${part.partFilterExpr}) AND (${scanJob.filtersExpr})
        |${scanJob.groupByClause.getOrElse("")}
        |${scanJob.limit.map(n => s"LIMIT $n").getOrElse("")}
+       |${readSettings.map(settings => s"SETTINGS $settings").getOrElse("")}
        |""".stripMargin
 }

diff --git a/spark-3.5/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ClickHouseSQLConf.scala b/spark-3.5/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ClickHouseSQLConf.scala
index ce5c28d3..39e2bc4a 100644
--- a/spark-3.5/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ClickHouseSQLConf.scala
+++ b/spark-3.5/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ClickHouseSQLConf.scala
@@ -14,7 +14,7 @@
 
 package org.apache.spark.sql.clickhouse
 
-import org.apache.spark.internal.config.ConfigEntry
+import org.apache.spark.internal.config.{ConfigEntry, OptionalConfigEntry}
 import org.apache.spark.sql.internal.SQLConf._
 
 import com.clickhouse.spark.exception.ClickHouseErrCode._
@@ -209,4 +209,13 @@ object ClickHouseSQLConf {
       .stringConf
       .transform(_.toLowerCase)
       .createWithDefault("binary")
+
+  val READ_SETTINGS: OptionalConfigEntry[String] =
+    buildConf("spark.clickhouse.read.settings")
+      .doc("Settings to apply when reading from ClickHouse, e.g. `final=1, max_execution_time=5`")
+      .version("0.9.0")
+      .stringConf
+      .transform(_.toLowerCase)
+      .createOptional
+
 }
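The SETTINGS clause itself is a plain `Option` fold, identical in all three readers; a standalone sketch of that behavior:

```scala
// Standalone sketch of the clause construction used by the readers above.
def settingsClause(readSettings: Option[String]): String =
  readSettings.map(settings => s"SETTINGS $settings").getOrElse("")

settingsClause(Some("final=1, max_execution_time=5")) // "SETTINGS final=1, max_execution_time=5"
settingsClause(None)                                  // ""  (no SETTINGS clause emitted)
```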