
allow read with settings
huashi-st committed Nov 8, 2024
1 parent 35088ca commit ccd6d3f
Showing 7 changed files with 34 additions and 0 deletions.
1 change: 1 addition & 0 deletions docs/configurations/02_sql_configurations.md
@@ -20,6 +20,7 @@ spark.clickhouse.ignoreUnsupportedTransform|false|ClickHouse supports using comp
spark.clickhouse.read.compression.codec|lz4|The codec used to decompress data for reading. Supported codecs: none, lz4.|0.5.0
spark.clickhouse.read.distributed.convertLocal|true|When reading Distributed table, read local table instead of itself. If `true`, ignore `spark.clickhouse.read.distributed.useClusterNodes`.|0.1.0
spark.clickhouse.read.fixedStringAs|binary|Read ClickHouse FixedString type as the specified Spark data type. Supported types: binary, string|0.8.0
+spark.clickhouse.read.withSettings|""|Query-level settings when reading from ClickHouse. e.g. `final=1, max_execution_time=5`|0.9.0
spark.clickhouse.read.format|json|Serialize format for reading. Supported formats: json, binary|0.6.0
spark.clickhouse.read.runtimeFilter.enabled|false|Enable runtime filter for reading.|0.8.0
spark.clickhouse.read.splitByPartitionId|true|If `true`, construct input partition filter by virtual column `_partition_id`, instead of partition value. There are known bugs to assemble SQL predication by partition value. This feature requires ClickHouse Server v21.6+|0.4.0
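
For context on the new `spark.clickhouse.read.withSettings` entry above, a minimal usage sketch follows. The catalog class name and connection properties are assumptions (check the connector docs for your version), and `db.events` is a hypothetical table; none of this appears in the commit itself.

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch, assuming a local ClickHouse and the catalog class name
// used by recent connector releases; adjust both for your deployment.
val spark = SparkSession.builder()
  .master("local[*]")
  .config("spark.sql.catalog.clickhouse", "com.clickhouse.spark.ClickHouseCatalog")
  .config("spark.sql.catalog.clickhouse.host", "127.0.0.1")
  .config("spark.sql.catalog.clickhouse.protocol", "http")
  .config("spark.sql.catalog.clickhouse.http_port", "8123")
  // The new knob: appended to generated SELECTs as a SETTINGS clause.
  .config("spark.clickhouse.read.withSettings", "final=1, max_execution_time=5")
  .getOrCreate()

// Reads through the connector now carry `SETTINGS final=1, max_execution_time=5`.
spark.sql("SELECT * FROM clickhouse.db.events LIMIT 10").show()
```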
@@ -36,6 +36,7 @@ abstract class ClickHouseReader[Record](

val readDistributedUseClusterNodes: Boolean = conf.getConf(READ_DISTRIBUTED_USE_CLUSTER_NODES)
val readDistributedConvertLocal: Boolean = conf.getConf(READ_DISTRIBUTED_CONVERT_LOCAL)
+private val readWithSettings: String = conf.getConf(READ_WITH_SETTINGS)

val database: String = part.table.database
val table: String = part.table.name
@@ -60,6 +61,7 @@
|WHERE (${part.partFilterExpr}) AND (${scanJob.filtersExpr})
|${scanJob.groupByClause.getOrElse("")}
|${scanJob.limit.map(n => s"LIMIT $n").getOrElse("")}
+|${if (readWithSettings.nonEmpty) s"SETTINGS $readWithSettings" else ""}
|""".stripMargin
}
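
To make the effect of the new interpolation concrete, here is a standalone sketch of the query assembly; the parameter names mirror the reader's fields but are stubbed, and this is not the connector's actual code path.

```scala
// Standalone sketch of the SETTINGS interpolation above; inputs are stubbed.
def scanQuery(database: String, table: String, filterExpr: String, readWithSettings: String): String =
  s"""SELECT *
     |FROM `$database`.`$table`
     |WHERE $filterExpr
     |${if (readWithSettings.nonEmpty) s"SETTINGS $readWithSettings" else ""}
     |""".stripMargin

// The default "" preserves prior behavior (no SETTINGS clause, only an
// empty line); a non-empty value appends the clause verbatim.
println(scanQuery("db", "events", "1 = 1", "final=1, max_execution_time=5"))
```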

@@ -209,4 +209,13 @@ object ClickHouseSQLConf {
.stringConf
.transform(_.toLowerCase)
.createWithDefault("binary")

+val READ_WITH_SETTINGS: ConfigEntry[String] =
+buildConf("spark.clickhouse.read.withSettings")
+.doc("Query-level settings when reading from ClickHouse. e.g. `final=1, max_execution_time=5`")
+.version("0.9.0")
+.stringConf
+.transform(_.toLowerCase)
+.createWithDefault("")

}
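
One detail of the builder above worth noting: `.transform(_.toLowerCase)` lowercases the entire user-supplied string before the reader interpolates it. ClickHouse setting names are lowercase, so typical values like `final=1` pass through unchanged, but a string-valued setting would be lowercased as well. A small sketch of the implied value pipeline:

```scala
// Sketch of the value pipeline implied by the ConfigEntry above:
// lowercase transform, empty-string default (i.e. no SETTINGS clause).
def resolveWithSettings(raw: Option[String]): String =
  raw.map(_.toLowerCase).getOrElse("")

assert(resolveWithSettings(None) == "")
assert(resolveWithSettings(Some("FINAL=1, MAX_EXECUTION_TIME=5")) == "final=1, max_execution_time=5")
```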
@@ -36,6 +36,7 @@ abstract class ClickHouseReader[Record](

val readDistributedUseClusterNodes: Boolean = conf.getConf(READ_DISTRIBUTED_USE_CLUSTER_NODES)
val readDistributedConvertLocal: Boolean = conf.getConf(READ_DISTRIBUTED_CONVERT_LOCAL)
+private val readWithSettings: String = conf.getConf(READ_WITH_SETTINGS)

val database: String = part.table.database
val table: String = part.table.name
@@ -60,6 +61,7 @@
|WHERE (${part.partFilterExpr}) AND (${scanJob.filtersExpr})
|${scanJob.groupByClause.getOrElse("")}
|${scanJob.limit.map(n => s"LIMIT $n").getOrElse("")}
+|${if (readWithSettings.nonEmpty) s"SETTINGS $readWithSettings" else ""}
|""".stripMargin
}

@@ -209,4 +209,13 @@ object ClickHouseSQLConf {
.stringConf
.transform(_.toLowerCase)
.createWithDefault("binary")

+val READ_WITH_SETTINGS: ConfigEntry[String] =
+buildConf("spark.clickhouse.read.withSettings")
+.doc("Query-level settings when reading from ClickHouse. e.g. `final=1, max_execution_time=5`")
+.version("0.9.0")
+.stringConf
+.transform(_.toLowerCase)
+.createWithDefault("")

}
@@ -36,6 +36,7 @@ abstract class ClickHouseReader[Record](

val readDistributedUseClusterNodes: Boolean = conf.getConf(READ_DISTRIBUTED_USE_CLUSTER_NODES)
val readDistributedConvertLocal: Boolean = conf.getConf(READ_DISTRIBUTED_CONVERT_LOCAL)
+private val readWithSettings: String = conf.getConf(READ_WITH_SETTINGS)

val database: String = part.table.database
val table: String = part.table.name
@@ -60,6 +61,7 @@
|WHERE (${part.partFilterExpr}) AND (${scanJob.filtersExpr})
|${scanJob.groupByClause.getOrElse("")}
|${scanJob.limit.map(n => s"LIMIT $n").getOrElse("")}
+|${if (readWithSettings.nonEmpty) s"SETTINGS $readWithSettings" else ""}
|""".stripMargin
}

@@ -209,4 +209,13 @@ object ClickHouseSQLConf {
.stringConf
.transform(_.toLowerCase)
.createWithDefault("binary")

+val READ_WITH_SETTINGS: ConfigEntry[String] =
+buildConf("spark.clickhouse.read.withSettings")
+.doc("Query-level settings when reading from ClickHouse. e.g. `final=1, max_execution_time=5`")
+.version("0.9.0")
+.stringConf
+.transform(_.toLowerCase)
+.createWithDefault("")

}
