diff --git a/docs/configurations/02_sql_configurations.md b/docs/configurations/02_sql_configurations.md
index 294d72c7..060b8310 100644
--- a/docs/configurations/02_sql_configurations.md
+++ b/docs/configurations/02_sql_configurations.md
@@ -16,6 +16,7 @@ license: |
 
 |Key | Default | Description | Since
 |--- | ------- | ----------- | -----
+spark.clickhouse.fixedStringReadAs|binary|read ClickHouse FixedString type as the specified Spark data type. Supported formats: binary, string|0.8.1
 spark.clickhouse.ignoreUnsupportedTransform|false|ClickHouse supports using complex expressions as sharding keys or partition values, e.g. `cityHash64(col_1, col_2)`, and those can not be supported by Spark now. If `true`, ignore the unsupported expressions, otherwise fail fast w/ an exception. Note, when `spark.clickhouse.write.distributed.convertLocal` is enabled, ignore unsupported sharding keys may corrupt the data.|0.4.0
 spark.clickhouse.read.compression.codec|lz4|The codec used to decompress data for reading. Supported codecs: none, lz4.|0.5.0
 spark.clickhouse.read.distributed.convertLocal|true|When reading Distributed table, read local table instead of itself. If `true`, ignore `spark.clickhouse.read.distributed.useClusterNodes`.|0.1.0
diff --git a/spark-3.4/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ClickHouseSQLConf.scala b/spark-3.4/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ClickHouseSQLConf.scala
index c61218af..0d47f7de 100644
--- a/spark-3.4/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ClickHouseSQLConf.scala
+++ b/spark-3.4/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ClickHouseSQLConf.scala
@@ -201,4 +201,12 @@ object ClickHouseSQLConf {
       .version("0.8.0")
       .booleanConf
       .createWithDefault(false)
+
+  val FIXED_STRING_READ_AS: ConfigEntry[String] =
+    buildConf("spark.clickhouse.fixedStringReadAs")
+      .doc("read ClickHouse FixedString type as the specified Spark data type. Supported formats: binary, string")
+      .version("0.8.1")
+      .stringConf
+      .transform(_.toLowerCase)
+      .createWithDefault("binary")
 }
diff --git a/spark-3.4/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/SchemaUtils.scala b/spark-3.4/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/SchemaUtils.scala
index 54d88c75..d72f885f 100644
--- a/spark-3.4/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/SchemaUtils.scala
+++ b/spark-3.4/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/SchemaUtils.scala
@@ -18,15 +18,22 @@
 import com.clickhouse.data.ClickHouseDataType._
 import com.clickhouse.data.{ClickHouseColumn, ClickHouseDataType}
 import org.apache.spark.sql.types._
 import xenon.clickhouse.exception.CHClientException
+import org.apache.spark.sql.catalyst.SQLConfHelper
+import org.apache.spark.sql.clickhouse.ClickHouseSQLConf.FIXED_STRING_READ_AS
 
-object SchemaUtils {
+object SchemaUtils extends SQLConfHelper {
   def fromClickHouseType(chColumn: ClickHouseColumn): (DataType, Boolean) = {
     val catalystType = chColumn.getDataType match {
       case Nothing => NullType
       case Bool => BooleanType
       case String | JSON | UUID | Enum8 | Enum16 | IPv4 | IPv6 => StringType
-      case FixedString => BinaryType
+      case FixedString =>
+        conf.getConf(FIXED_STRING_READ_AS) match {
+          case "binary" => BinaryType
+          case "string" => StringType
+          case unsupported => throw CHClientException(s"Unsupported fixed string read format mapping: $unsupported")
+        }
       case Int8 => ByteType
       case UInt8 | Int16 => ShortType
       case UInt16 | Int32 => IntegerType
diff --git a/spark-3.5/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ClickHouseSQLConf.scala b/spark-3.5/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ClickHouseSQLConf.scala
index c61218af..0d47f7de 100644
--- a/spark-3.5/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ClickHouseSQLConf.scala
+++ b/spark-3.5/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ClickHouseSQLConf.scala
@@ -201,4 +201,12 @@ object ClickHouseSQLConf {
       .version("0.8.0")
       .booleanConf
       .createWithDefault(false)
+
+  val FIXED_STRING_READ_AS: ConfigEntry[String] =
+    buildConf("spark.clickhouse.fixedStringReadAs")
+      .doc("read ClickHouse FixedString type as the specified Spark data type. Supported formats: binary, string")
+      .version("0.8.1")
+      .stringConf
+      .transform(_.toLowerCase)
+      .createWithDefault("binary")
 }
diff --git a/spark-3.5/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/SchemaUtils.scala b/spark-3.5/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/SchemaUtils.scala
index 54d88c75..d72f885f 100644
--- a/spark-3.5/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/SchemaUtils.scala
+++ b/spark-3.5/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/SchemaUtils.scala
@@ -18,15 +18,22 @@
 import com.clickhouse.data.ClickHouseDataType._
 import com.clickhouse.data.{ClickHouseColumn, ClickHouseDataType}
 import org.apache.spark.sql.types._
 import xenon.clickhouse.exception.CHClientException
+import org.apache.spark.sql.catalyst.SQLConfHelper
+import org.apache.spark.sql.clickhouse.ClickHouseSQLConf.FIXED_STRING_READ_AS
 
-object SchemaUtils {
+object SchemaUtils extends SQLConfHelper {
   def fromClickHouseType(chColumn: ClickHouseColumn): (DataType, Boolean) = {
     val catalystType = chColumn.getDataType match {
       case Nothing => NullType
       case Bool => BooleanType
       case String | JSON | UUID | Enum8 | Enum16 | IPv4 | IPv6 => StringType
-      case FixedString => BinaryType
+      case FixedString =>
+        conf.getConf(FIXED_STRING_READ_AS) match {
+          case "binary" => BinaryType
+          case "string" => StringType
+          case unsupported => throw CHClientException(s"Unsupported fixed string read format mapping: $unsupported")
+        }
       case Int8 => ByteType
       case UInt8 | Int16 => ShortType
       case UInt16 | Int32 => IntegerType
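
For reference, a minimal sketch of how the new setting is expected to behave, not part of the patch. The config key, `FIXED_STRING_READ_AS`, and `SchemaUtils.fromClickHouseType` come from this change; the demo object name, the `device_id` column, and the `FixedString(16)` type are made-up, and it assumes `ClickHouseColumn.of(name, typeExpr)` from the ClickHouse Java client and `SQLConf.get` (which is what `SQLConfHelper.conf` resolves to) are available on the classpath.

```scala
import com.clickhouse.data.ClickHouseColumn
import org.apache.spark.sql.clickhouse.ClickHouseSQLConf.FIXED_STRING_READ_AS
import org.apache.spark.sql.clickhouse.SchemaUtils
import org.apache.spark.sql.internal.SQLConf

object FixedStringReadAsDemo {
  def main(args: Array[String]): Unit = {
    // A FixedString(16) column, described the way ClickHouse reports it.
    val col = ClickHouseColumn.of("device_id", "FixedString(16)")

    // Default mapping: FixedString is read as Spark BinaryType.
    println(SchemaUtils.fromClickHouseType(col)._1) // BinaryType

    // Opt in to reading FixedString as Spark StringType; the value is
    // lower-cased by the conf's transform before it is matched.
    SQLConf.get.setConf(FIXED_STRING_READ_AS, "string")
    println(SchemaUtils.fromClickHouseType(col)._1) // StringType

    // Any other value fails fast with CHClientException when the schema is resolved.
  }
}
```

In a live session the same switch would typically be flipped via `spark.conf.set("spark.clickhouse.fixedStringReadAs", "string")` (or `SET` in Spark SQL) before reading a table that contains FixedString columns.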