Skip to content

Commit

Permalink
[SPARK-49666][SQL] Add feature flag for trim collation feature
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
Introduces a new specifier for trim collations (both leading and trailing trimming). These are initial changes so that the trim specifier is recognized and placed under a feature flag (all code paths are blocked).

### Why are the changes needed?
Support for trailing space trimming is one of the features requested by users.

### Does this PR introduce _any_ user-facing change?
This is guarded by feature flag.

### How was this patch tested?
Added tests to CollationSuite, SqlConfSuite and QueryCompilationErrorSuite.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes apache#48222 from jovanpavl-db/trim-collation-feature-initial-support.

Authored-by: Jovan Pavlovic <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
  • Loading branch information
jovanpavl-db authored and cloud-fan committed Sep 30, 2024
1 parent d85e7bc commit c54c017
Show file tree
Hide file tree
Showing 11 changed files with 381 additions and 104 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -369,9 +369,8 @@ class CollationFactorySuite extends AnyFunSuite with Matchers { // scalastyle:ig
1 << 15, // UTF8_BINARY mandatory zero bit 15 breach.
1 << 16, // UTF8_BINARY mandatory zero bit 16 breach.
1 << 17, // UTF8_BINARY mandatory zero bit 17 breach.
1 << 18, // UTF8_BINARY mandatory zero bit 18 breach.
1 << 19, // UTF8_BINARY mandatory zero bit 19 breach.
1 << 20, // UTF8_BINARY mandatory zero bit 20 breach.
1 << 21, // UTF8_BINARY mandatory zero bit 21 breach.
1 << 23, // UTF8_BINARY mandatory zero bit 23 breach.
1 << 24, // UTF8_BINARY mandatory zero bit 24 breach.
1 << 25, // UTF8_BINARY mandatory zero bit 25 breach.
Expand All @@ -382,8 +381,6 @@ class CollationFactorySuite extends AnyFunSuite with Matchers { // scalastyle:ig
(1 << 29) | (1 << 13), // ICU mandatory zero bit 13 breach.
(1 << 29) | (1 << 14), // ICU mandatory zero bit 14 breach.
(1 << 29) | (1 << 15), // ICU mandatory zero bit 15 breach.
(1 << 29) | (1 << 18), // ICU mandatory zero bit 18 breach.
(1 << 29) | (1 << 19), // ICU mandatory zero bit 19 breach.
(1 << 29) | (1 << 20), // ICU mandatory zero bit 20 breach.
(1 << 29) | (1 << 21), // ICU mandatory zero bit 21 breach.
(1 << 29) | (1 << 22), // ICU mandatory zero bit 22 breach.
Expand Down
10 changes: 5 additions & 5 deletions common/utils/src/main/resources/error/error-conditions.json
Original file line number Diff line number Diff line change
Expand Up @@ -4886,11 +4886,6 @@
"Catalog <catalogName> does not support <operation>."
]
},
"COLLATION" : {
"message" : [
"Collation is not yet supported."
]
},
"COMBINATION_QUERY_RESULT_CLAUSES" : {
"message" : [
"Combination of ORDER BY/SORT BY/DISTRIBUTE BY/CLUSTER BY."
Expand Down Expand Up @@ -5117,6 +5112,11 @@
"message" : [
"TRANSFORM with SERDE is only supported in hive mode."
]
},
"TRIM_COLLATION" : {
"message" : [
"TRIM specifier in the collation."
]
}
},
"sqlState" : "0A000"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ object CollateExpressionBuilder extends ExpressionBuilder {
if (evalCollation == null) {
throw QueryCompilationErrors.unexpectedNullError("collation", collationExpr)
} else {
if (!SQLConf.get.trimCollationEnabled &&
evalCollation.toString.toUpperCase().contains("TRIM")) {
throw QueryCompilationErrors.trimCollationNotEnabledError()
}
Collate(e, evalCollation.toString)
}
case (_: StringType, false) => throw QueryCompilationErrors.nonFoldableArgumentError(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2557,6 +2557,10 @@ class AstBuilder extends DataTypeAstBuilder
}

/**
 * Returns the text of the identifier in a COLLATE clause.
 *
 * Throws the UNSUPPORTED_FEATURE.TRIM_COLLATION error when the collation name contains
 * "TRIM" (compared case-insensitively) while the trim collation flag is disabled.
 */
override def visitCollateClause(ctx: CollateClauseContext): String = withOrigin(ctx) {
  val collationName = ctx.collationName.getText
  // Upper-case with Locale.ROOT: with a Turkish default locale "trim" becomes "TRİM"
  // (dotted capital I), which would not match "TRIM" and would let the name bypass the flag.
  val upperCasedName = collationName.toUpperCase(java.util.Locale.ROOT)
  if (!SQLConf.get.trimCollationEnabled && upperCasedName.contains("TRIM")) {
    throw QueryCompilationErrors.trimCollationNotEnabledError()
  }
  ctx.identifier.getText
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,13 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
)
}

/**
 * Builds the exception for the UNSUPPORTED_FEATURE.TRIM_COLLATION error condition.
 * The condition takes no message parameters.
 */
def trimCollationNotEnabledError(): Throwable =
  new AnalysisException(
    errorClass = "UNSUPPORTED_FEATURE.TRIM_COLLATION",
    messageParameters = Map.empty)

def unresolvedUsingColForJoinError(
colName: String, suggestion: String, side: String): Throwable = {
new AnalysisException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -759,6 +759,18 @@ object SQLConf {
.checkValue(_ > 0, "The initial number of partitions must be positive.")
.createOptional

// Internal feature flag for trim collations. Its default is the value of Utils.isTesting.
lazy val TRIM_COLLATION_ENABLED =
  buildConf("spark.sql.collation.trim.enabled")
    .internal()
    .doc(
      // Each fragment ends with a space so the concatenated sentence keeps its word breaks.
      "Trim collation feature is under development and its use should be done under this " +
      "feature flag. Trim collation trims leading, trailing or both spaces depending on " +
      "specifier (LTRIM, RTRIM, TRIM)."
    )
    .version("4.0.0")
    .booleanConf
    .createWithDefault(Utils.isTesting)

val DEFAULT_COLLATION =
buildConf(SqlApiConfHelper.DEFAULT_COLLATION)
.doc("Sets default collation to use for string literals, parameter markers or the string" +
Expand Down Expand Up @@ -5482,6 +5494,8 @@ class SQLConf extends Serializable with Logging with SqlApiConf {
}
}

/**
 * Current value of the TRIM_COLLATION_ENABLED flag
 * (`spark.sql.collation.trim.enabled`).
 */
def trimCollationEnabled: Boolean = getConf(TRIM_COLLATION_ENABLED)

override def defaultStringType: StringType = {
if (getConf(DEFAULT_COLLATION).toUpperCase(Locale.ROOT) == "UTF8_BINARY") {
StringType
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,10 @@ class SparkSqlAstBuilder extends AstBuilder {
* }}}
*/
override def visitSetCollation(ctx: SetCollationContext): LogicalPlan = withOrigin(ctx) {
  val collationName = ctx.collationName.getText
  // Upper-case with Locale.ROOT, as done for the stored value below: with a Turkish default
  // locale "trim" becomes "TRİM" (dotted capital I), which would not match "TRIM" and would
  // let a trim collation through while the feature flag is disabled.
  val upperCasedName = collationName.toUpperCase(Locale.ROOT)
  if (!SQLConf.get.trimCollationEnabled && upperCasedName.contains("TRIM")) {
    throw QueryCompilationErrors.trimCollationNotEnabledError()
  }
  // The command sets the default-collation config to the upper-cased identifier text.
  val key = SQLConf.DEFAULT_COLLATION.key
  SetCommand(Some(key -> Some(ctx.identifier.getText.toUpperCase(Locale.ROOT))))
}
Expand Down
56 changes: 46 additions & 10 deletions sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -44,27 +44,57 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper {
private val allFileBasedDataSources = collationPreservingSources ++ collationNonPreservingSources

test("collate returns proper type") {
Seq("utf8_binary", "utf8_lcase", "unicode", "unicode_ci").foreach { collationName =>
Seq(
"utf8_binary",
"utf8_lcase",
"unicode",
"unicode_ci",
"unicode_ltrim_ci",
"utf8_lcase_trim",
"utf8_binary_rtrim"
).foreach { collationName =>
checkAnswer(sql(s"select 'aaa' collate $collationName"), Row("aaa"))
val collationId = CollationFactory.collationNameToId(collationName)
assert(sql(s"select 'aaa' collate $collationName").schema(0).dataType
== StringType(collationId))
assert(
sql(s"select 'aaa' collate $collationName").schema(0).dataType
== StringType(collationId)
)
}
}

test("collation name is case insensitive") {
Seq("uTf8_BiNaRy", "utf8_lcase", "uNicOde", "UNICODE_ci").foreach { collationName =>
Seq(
"uTf8_BiNaRy",
"utf8_lcase",
"uNicOde",
"UNICODE_ci",
"uNiCoDE_ltRIm_cI",
"UtF8_lCaSE_tRIM",
"utf8_biNAry_RtRiM"
).foreach { collationName =>
checkAnswer(sql(s"select 'aaa' collate $collationName"), Row("aaa"))
val collationId = CollationFactory.collationNameToId(collationName)
assert(sql(s"select 'aaa' collate $collationName").schema(0).dataType
== StringType(collationId))
assert(
sql(s"select 'aaa' collate $collationName").schema(0).dataType
== StringType(collationId)
)
}
}

test("collation expression returns name of collation") {
Seq("utf8_binary", "utf8_lcase", "unicode", "unicode_ci").foreach { collationName =>
Seq(
"utf8_binary",
"utf8_lcase",
"unicode",
"unicode_ci",
"unicode_ci_ltrim",
"utf8_lcase_trim",
"utf8_binary_rtrim"
).foreach { collationName =>
checkAnswer(
sql(s"select collation('aaa' collate $collationName)"), Row(collationName.toUpperCase()))
sql(s"select collation('aaa' collate $collationName)"),
Row(collationName.toUpperCase())
)
}
}

Expand All @@ -77,9 +107,15 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper {

test("collate function syntax with default collation set") {
withSQLConf(SqlApiConf.DEFAULT_COLLATION -> "UTF8_LCASE") {
assert(sql(s"select collate('aaa', 'utf8_lcase')").schema(0).dataType ==
StringType("UTF8_LCASE"))
assert(
sql(s"select collate('aaa', 'utf8_lcase')").schema(0).dataType ==
StringType("UTF8_LCASE")
)
assert(sql(s"select collate('aaa', 'UNICODE')").schema(0).dataType == StringType("UNICODE"))
assert(
sql(s"select collate('aaa', 'UNICODE_TRIM')").schema(0).dataType ==
StringType("UNICODE_TRIM")
)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -868,6 +868,39 @@ class QueryCompilationErrorsSuite
"inputTypes" -> "[\"INT\", \"STRING\", \"STRING\"]"))
}

test("SPARK-49666: the trim collation feature is off without collate builder call") {
  // Statements that name a trim collation through the COLLATE clause, not collate().
  val statements = Seq(
    "CREATE TABLE t(col STRING COLLATE EN_TRIM_CI) USING parquet",
    "CREATE TABLE t(col STRING COLLATE UTF8_LCASE_TRIM) USING parquet",
    "SELECT 'aaa' COLLATE UNICODE_LTRIM_CI")

  withSQLConf(SQLConf.TRIM_COLLATION_ENABLED.key -> "false") {
    for (statement <- statements) {
      // With the flag off, each statement must fail with the trim-collation error.
      val thrown = intercept[AnalysisException](sql(statement))
      checkError(
        exception = thrown,
        condition = "UNSUPPORTED_FEATURE.TRIM_COLLATION"
      )
    }
  }
}

test("SPARK-49666: the trim collation feature is off with collate builder call") {
  // Every query starts with "SELECT "; the expected error context is the text after it.
  val selectPrefixLength = "SELECT ".length
  val queries = Seq(
    "SELECT collate('aaa', 'UNICODE_TRIM')",
    "SELECT collate('aaa', 'UTF8_BINARY_TRIM')",
    "SELECT collate('aaa', 'EN_AI_RTRIM')")

  withSQLConf(SQLConf.TRIM_COLLATION_ENABLED.key -> "false") {
    queries.foreach { query =>
      val thrown = intercept[AnalysisException](sql(query))
      // The context spans from just after "SELECT " to the last character of the query.
      val expectedContext = ExpectedContext(
        fragment = query.substring(selectPrefixLength),
        start = selectPrefixLength,
        stop = query.length - 1)
      checkError(
        exception = thrown,
        condition = "UNSUPPORTED_FEATURE.TRIM_COLLATION",
        parameters = Map.empty,
        context = expectedContext
      )
    }
  }
}

test("UNSUPPORTED_CALL: call the unsupported method update()") {
checkError(
exception = intercept[SparkUnsupportedOperationException] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,13 @@ class SQLConfSuite extends QueryTest with SharedSparkSession {
"confName" -> "spark.sql.session.collation.default",
"proposals" -> "UNICODE"
))

withSQLConf(SQLConf.TRIM_COLLATION_ENABLED.key -> "false") {
checkError(
exception = intercept[AnalysisException](sql(s"SET COLLATION UNICODE_CI_TRIM")),
condition = "UNSUPPORTED_FEATURE.TRIM_COLLATION"
)
}
}

test("SPARK-43028: config not found error") {
Expand Down

0 comments on commit c54c017

Please sign in to comment.