Spark: Support read with settings #367

Merged
Changes from 3 commits
1 change: 1 addition & 0 deletions docs/configurations/02_sql_configurations.md
@@ -20,6 +20,7 @@ spark.clickhouse.ignoreUnsupportedTransform|false|ClickHouse supports using comp
spark.clickhouse.read.compression.codec|lz4|The codec used to decompress data for reading. Supported codecs: none, lz4.|0.5.0
spark.clickhouse.read.distributed.convertLocal|true|When reading Distributed table, read local table instead of itself. If `true`, ignore `spark.clickhouse.read.distributed.useClusterNodes`.|0.1.0
spark.clickhouse.read.fixedStringAs|binary|Read ClickHouse FixedString type as the specified Spark data type. Supported types: binary, string|0.8.0
spark.clickhouse.read.settings|&lt;undefined&gt;|Settings when reading from ClickHouse. e.g. `final=1, max_execution_time=5`|0.9.0
spark.clickhouse.read.format|json|Serialize format for reading. Supported formats: json, binary|0.6.0
spark.clickhouse.read.runtimeFilter.enabled|false|Enable runtime filter for reading.|0.8.0
spark.clickhouse.read.splitByPartitionId|true|If `true`, construct input partition filter by virtual column `_partition_id`, instead of partition value. There are known bugs to assemble SQL predication by partition value. This feature requires ClickHouse Server v21.6+|0.4.0
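
For illustration, a minimal sketch of enabling the new option in a Spark session (the catalog name `clickhouse` and the table `db.events` are hypothetical placeholders, and the connector must already be configured on the classpath):

import org.apache.spark.sql.SparkSession

// Pass ClickHouse query-level settings through the new option.
val spark = SparkSession.builder()
  .appName("read-settings-demo")
  .config("spark.clickhouse.read.settings", "final=1, max_execution_time=5")
  .getOrCreate()

// Subsequent scans of ClickHouse tables append
// `SETTINGS final=1, max_execution_time=5` to the generated query.
spark.sql("SELECT * FROM clickhouse.db.events").show()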
@@ -36,6 +36,7 @@ abstract class ClickHouseReader[Record](

val readDistributedUseClusterNodes: Boolean = conf.getConf(READ_DISTRIBUTED_USE_CLUSTER_NODES)
val readDistributedConvertLocal: Boolean = conf.getConf(READ_DISTRIBUTED_CONVERT_LOCAL)
private val readSettings: Option[String] = conf.getConf(READ_SETTINGS)

val database: String = part.table.database
val table: String = part.table.name
@@ -60,6 +61,7 @@
|WHERE (${part.partFilterExpr}) AND (${scanJob.filtersExpr})
|${scanJob.groupByClause.getOrElse("")}
|${scanJob.limit.map(n => s"LIMIT $n").getOrElse("")}
|${readSettings.map(settings => s"SETTINGS $settings").getOrElse("")}
|""".stripMargin
}
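
To see how the optional clause folds into the generated statement, here is a standalone sketch of the same interpolation pattern (illustrative table and filter values, not project code):

// Standalone illustration of the Option-based SETTINGS interpolation.
val readSettings: Option[String] = Some("final=1, max_execution_time=5")
val scanQuery: String =
  s"""SELECT `id`, `value`
     |FROM `db`.`events`
     |WHERE (1 = 1) AND (`id` > 0)
     |${readSettings.map(settings => s"SETTINGS $settings").getOrElse("")}
     |""".stripMargin
// With Some(...), the query ends in `SETTINGS final=1, max_execution_time=5`;
// with None, the clause collapses to an empty line and the query is unchanged.
println(scanQuery)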

@@ -14,7 +14,7 @@

package org.apache.spark.sql.clickhouse

import org.apache.spark.internal.config.ConfigEntry
import org.apache.spark.internal.config.{ConfigEntry, OptionalConfigEntry}
import org.apache.spark.sql.internal.SQLConf._
import com.clickhouse.spark.exception.ClickHouseErrCode._

@@ -209,4 +209,13 @@ object ClickHouseSQLConf {
.stringConf
.transform(_.toLowerCase)
.createWithDefault("binary")

val READ_SETTINGS: OptionalConfigEntry[String] =
buildConf("spark.clickhouse.read.settings")
.doc("Settings when read from ClickHouse. e.g. `final=1, max_execution_time=5`")
.version("0.9.0")
.stringConf
.transform(_.toLowerCase)
Collaborator: why should we convert the input to lowercase?

@harryshi10 (Contributor, author), Nov 11, 2024: in case we'd like to log scanQuery, which keeps SQL keywords in uppercase and other text in lowercase.

.createOptional

}
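
The effect of the questioned `.transform(_.toLowerCase)` can be shown in isolation (illustrative values only):

// The transform normalizes the configured value before it is stored.
val raw = "FINAL=1, MAX_EXECUTION_TIME=5"
val stored = raw.toLowerCase // "final=1, max_execution_time=5"
// In a logged scanQuery, the generated SQL keywords (SELECT, WHERE, SETTINGS, ...)
// then stay uppercase while the user-supplied settings stay lowercase:
println(s"SETTINGS $stored") // SETTINGS final=1, max_execution_time=5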