diff --git a/docs/configurations/02_sql_configurations.md b/docs/configurations/02_sql_configurations.md index b22ed0f4..85753678 100644 --- a/docs/configurations/02_sql_configurations.md +++ b/docs/configurations/02_sql_configurations.md @@ -20,6 +20,7 @@ spark.clickhouse.ignoreUnsupportedTransform|false|ClickHouse supports using comp spark.clickhouse.read.compression.codec|lz4|The codec used to decompress data for reading. Supported codecs: none, lz4.|0.5.0 spark.clickhouse.read.distributed.convertLocal|true|When reading Distributed table, read local table instead of itself. If `true`, ignore `spark.clickhouse.read.distributed.useClusterNodes`.|0.1.0 spark.clickhouse.read.fixedStringAs|binary|Read ClickHouse FixedString type as the specified Spark data type. Supported types: binary, string|0.8.0 +spark.clickhouse.read.withSettings|""|Query-level settings when reading from ClickHouse. e.g. `final=1, max_execution_time=5`|0.9.0 spark.clickhouse.read.format|json|Serialize format for reading. Supported formats: json, binary|0.6.0 spark.clickhouse.read.runtimeFilter.enabled|false|Enable runtime filter for reading.|0.8.0 spark.clickhouse.read.splitByPartitionId|true|If `true`, construct input partition filter by virtual column `_partition_id`, instead of partition value. There are known bugs to assemble SQL predication by partition value. This feature requires ClickHouse Server v21.6+|0.4.0 diff --git a/spark-3.3/clickhouse-spark/src/main/scala/com/clickhouse/spark/read/ClickHouseReader.scala b/spark-3.3/clickhouse-spark/src/main/scala/com/clickhouse/spark/read/ClickHouseReader.scala index 5068a8c3..0212c94a 100644 --- a/spark-3.3/clickhouse-spark/src/main/scala/com/clickhouse/spark/read/ClickHouseReader.scala +++ b/spark-3.3/clickhouse-spark/src/main/scala/com/clickhouse/spark/read/ClickHouseReader.scala @@ -36,6 +36,7 @@ abstract class ClickHouseReader[Record]( val readDistributedUseClusterNodes: Boolean = conf.getConf(READ_DISTRIBUTED_USE_CLUSTER_NODES) val readDistributedConvertLocal: Boolean = conf.getConf(READ_DISTRIBUTED_CONVERT_LOCAL) + private val readWithSettings: String = conf.getConf(READ_WITH_SETTINGS) val database: String = part.table.database val table: String = part.table.name @@ -60,6 +61,7 @@ abstract class ClickHouseReader[Record]( |WHERE (${part.partFilterExpr}) AND (${scanJob.filtersExpr}) |${scanJob.groupByClause.getOrElse("")} |${scanJob.limit.map(n => s"LIMIT $n").getOrElse("")} + |${if (readWithSettings.nonEmpty) s"SETTINGS $readWithSettings" else ""} |""".stripMargin } diff --git a/spark-3.3/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ClickHouseSQLConf.scala b/spark-3.3/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ClickHouseSQLConf.scala index ce5c28d3..dc6e7ed8 100644 --- a/spark-3.3/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ClickHouseSQLConf.scala +++ b/spark-3.3/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ClickHouseSQLConf.scala @@ -209,4 +209,13 @@ object ClickHouseSQLConf { .stringConf .transform(_.toLowerCase) .createWithDefault("binary") + + val READ_WITH_SETTINGS: ConfigEntry[String] = + buildConf("spark.clickhouse.read.withSettings") + .doc("Query-level settings when reading from ClickHouse. e.g. `final=1, max_execution_time=5`") + .version("0.9.0") + .stringConf + .transform(_.toLowerCase) + .createWithDefault("") + } diff --git a/spark-3.4/clickhouse-spark/src/main/scala/com/clickhouse/spark/read/ClickHouseReader.scala b/spark-3.4/clickhouse-spark/src/main/scala/com/clickhouse/spark/read/ClickHouseReader.scala index 5068a8c3..0212c94a 100644 --- a/spark-3.4/clickhouse-spark/src/main/scala/com/clickhouse/spark/read/ClickHouseReader.scala +++ b/spark-3.4/clickhouse-spark/src/main/scala/com/clickhouse/spark/read/ClickHouseReader.scala @@ -36,6 +36,7 @@ abstract class ClickHouseReader[Record]( val readDistributedUseClusterNodes: Boolean = conf.getConf(READ_DISTRIBUTED_USE_CLUSTER_NODES) val readDistributedConvertLocal: Boolean = conf.getConf(READ_DISTRIBUTED_CONVERT_LOCAL) + private val readWithSettings: String = conf.getConf(READ_WITH_SETTINGS) val database: String = part.table.database val table: String = part.table.name @@ -60,6 +61,7 @@ abstract class ClickHouseReader[Record]( |WHERE (${part.partFilterExpr}) AND (${scanJob.filtersExpr}) |${scanJob.groupByClause.getOrElse("")} |${scanJob.limit.map(n => s"LIMIT $n").getOrElse("")} + |${if (readWithSettings.nonEmpty) s"SETTINGS $readWithSettings" else ""} |""".stripMargin } diff --git a/spark-3.4/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ClickHouseSQLConf.scala b/spark-3.4/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ClickHouseSQLConf.scala index ce5c28d3..dc6e7ed8 100644 --- a/spark-3.4/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ClickHouseSQLConf.scala +++ b/spark-3.4/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ClickHouseSQLConf.scala @@ -209,4 +209,13 @@ object ClickHouseSQLConf { .stringConf .transform(_.toLowerCase) .createWithDefault("binary") + + val READ_WITH_SETTINGS: ConfigEntry[String] = + buildConf("spark.clickhouse.read.withSettings") + .doc("Query-level settings when reading from ClickHouse. e.g. `final=1, max_execution_time=5`") + .version("0.9.0") + .stringConf + .transform(_.toLowerCase) + .createWithDefault("") + } diff --git a/spark-3.5/clickhouse-spark/src/main/scala/com/clickhouse/spark/read/ClickHouseReader.scala b/spark-3.5/clickhouse-spark/src/main/scala/com/clickhouse/spark/read/ClickHouseReader.scala index c19ddbaa..d1837ba9 100644 --- a/spark-3.5/clickhouse-spark/src/main/scala/com/clickhouse/spark/read/ClickHouseReader.scala +++ b/spark-3.5/clickhouse-spark/src/main/scala/com/clickhouse/spark/read/ClickHouseReader.scala @@ -36,6 +36,7 @@ abstract class ClickHouseReader[Record]( val readDistributedUseClusterNodes: Boolean = conf.getConf(READ_DISTRIBUTED_USE_CLUSTER_NODES) val readDistributedConvertLocal: Boolean = conf.getConf(READ_DISTRIBUTED_CONVERT_LOCAL) + private val readWithSettings: String = conf.getConf(READ_WITH_SETTINGS) val database: String = part.table.database val table: String = part.table.name @@ -60,6 +61,7 @@ abstract class ClickHouseReader[Record]( |WHERE (${part.partFilterExpr}) AND (${scanJob.filtersExpr}) |${scanJob.groupByClause.getOrElse("")} |${scanJob.limit.map(n => s"LIMIT $n").getOrElse("")} + |${if (readWithSettings.nonEmpty) s"SETTINGS $readWithSettings" else ""} |""".stripMargin } diff --git a/spark-3.5/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ClickHouseSQLConf.scala b/spark-3.5/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ClickHouseSQLConf.scala index ce5c28d3..dc6e7ed8 100644 --- a/spark-3.5/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ClickHouseSQLConf.scala +++ b/spark-3.5/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ClickHouseSQLConf.scala @@ -209,4 +209,13 @@ object ClickHouseSQLConf { .stringConf .transform(_.toLowerCase) .createWithDefault("binary") + + val READ_WITH_SETTINGS: ConfigEntry[String] = + buildConf("spark.clickhouse.read.withSettings") + .doc("Query-level settings when reading from ClickHouse. e.g. `final=1, max_execution_time=5`") + .version("0.9.0") + .stringConf + .transform(_.toLowerCase) + .createWithDefault("") + }