From a2689a3c42f604c714206797d68dacaf311b9fff Mon Sep 17 00:00:00 2001
From: himadripal
Date: Thu, 6 Feb 2025 10:16:28 -0800
Subject: [PATCH 1/5] enable decimal to decimal cast, pass CastOptions through,
 and use a regex to match the Arrow invalid argument error

---
 native/spark-expr/src/conversion_funcs/cast.rs  |  9 ++++++++-
 .../apache/comet/expressions/CometCast.scala    |  9 ++-------
 .../scala/org/apache/comet/CometCastSuite.scala | 17 +++++++++++++----
 3 files changed, 23 insertions(+), 12 deletions(-)

diff --git a/native/spark-expr/src/conversion_funcs/cast.rs b/native/spark-expr/src/conversion_funcs/cast.rs
index 22b09f6106..79bd863add 100644
--- a/native/spark-expr/src/conversion_funcs/cast.rs
+++ b/native/spark-expr/src/conversion_funcs/cast.rs
@@ -872,6 +872,13 @@ fn cast_array(
     let array = array_with_timezone(array, cast_options.timezone.clone(), Some(to_type))?;
     let from_type = array.data_type().clone();
 
+    let native_cast_options: CastOptions = CastOptions {
+        safe: !matches!(cast_options.eval_mode, EvalMode::Ansi), // take safe mode from the cast_options passed in
+        format_options: FormatOptions::new()
+            .with_timestamp_tz_format(TIMESTAMP_FORMAT)
+            .with_timestamp_format(TIMESTAMP_FORMAT),
+    };
+
     let array = match &from_type {
         Dictionary(key_type, value_type)
             if key_type.as_ref() == &Int32
@@ -963,7 +970,7 @@ fn cast_array(
             || is_datafusion_spark_compatible(from_type, to_type, cast_options.allow_incompat) =>
         {
             // use DataFusion cast only when we know that it is compatible with Spark
-            Ok(cast_with_options(&array, to_type, &CAST_OPTIONS)?)
+            Ok(cast_with_options(&array, to_type, &native_cast_options)?)
         }
         _ => {
             // we should never reach this code because the Scala code should be checking
diff --git a/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala b/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala
index 9aa7f7c8b4..e0e89b35fc 100644
--- a/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala
+++ b/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala
@@ -70,13 +70,8 @@ object CometCast {
           case _ => Unsupported
         }
-      case (from: DecimalType, to: DecimalType) =>
-        if (to.precision < from.precision) {
-          // https://github.com/apache/datafusion/issues/13492
-          Incompatible(Some("Casting to smaller precision is not supported"))
-        } else {
-          Compatible()
-        }
+      case (_: DecimalType, _: DecimalType) =>
+        Compatible()
       case (DataTypes.StringType, _) =>
         canCastFromString(toType, timeZoneId, evalMode)
       case (_, DataTypes.StringType) =>
diff --git a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
index e2b2ed55a1..25ed41369d 100644
--- a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
@@ -913,8 +913,8 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
     val values = Seq(BigDecimal("12345.6789"), BigDecimal("9876.5432"), BigDecimal("123.4567"))
     val df = withNulls(values)
       .toDF("b")
-      .withColumn("a", col("b").cast(DecimalType(6, 2)))
-    checkSparkAnswer(df)
+      .withColumn("a", col("b").cast(DecimalType(38, 28)))
+    castTest(df, DecimalType(6, 2))
   }
 
   test("cast between decimals with higher precision than source") {
@@ -1133,8 +1133,17 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
           .replace(".WITH_SUGGESTION] ", "]")
          .startsWith(cometMessage))
     } else if (CometSparkSessionExtensions.isSpark34Plus) {
-      // for Spark 3.4 we expect to reproduce the error message exactly
-      assert(cometMessage == sparkMessage)
+      // comet decimal conversion throws ArrowError(string) from arrow
+      if (cometMessage.contains("Invalid argument error")) {
+        val regex =
+          "\\[\\[?(NUMERIC_VALUE_OUT_OF_RANGE|Invalid argument error)\\]?:? .*? (\\d+(\\.\\d+)?) .*? Decimal\\(?(\\d+),?\\s?(\\d+)\\)?.*?\\]?"
+        assert(cometMessage.matches(regex) == sparkMessage.matches(regex))
+
+      } else {
+        // for Spark 3.4 we expect to reproduce the error message exactly
+        assert(cometMessage == sparkMessage)
+
+      }
     } else {
       // for Spark 3.3 we just need to strip the prefix from the Comet message
       // before comparing

From 99727b883b5f0231e36a1369f180eeb7a85dab59 Mon Sep 17 00:00:00 2001
From: himadripal
Date: Tue, 11 Feb 2025 14:15:37 -0800
Subject: [PATCH 2/5] make the hard-coded decimal precision error check common
 in tests for all Spark versions

---
 .../org/apache/comet/CometCastSuite.scala | 51 +++++++++----------
 1 file changed, 25 insertions(+), 26 deletions(-)

diff --git a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
index 25ed41369d..5eadb1c131 100644
--- a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
@@ -1126,36 +1126,35 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
     val cometMessage =
       if (cometException.getCause != null) cometException.getCause.getMessage
       else cometException.getMessage
-    if (CometSparkSessionExtensions.isSpark40Plus) {
-      // for Spark 4 we expect to sparkException carries the message
+    // comet decimal conversion throws ArrowError(string) from arrow - across spark versions the messages don't match.
+    if (sparkMessage.contains("NUMERIC_VALUE_OUT_OF_RANGE") && cometMessage.contains(
+        "Invalid argument error")) {
       assert(
-        sparkException.getMessage
-          .replace(".WITH_SUGGESTION] ", "]")
-          .startsWith(cometMessage))
-    } else if (CometSparkSessionExtensions.isSpark34Plus) {
-      // comet decimal conversion throws ArrowError(string) from arrow
-      if (cometMessage.contains("Invalid argument error")) {
-        val regex =
-          "\\[\\[?(NUMERIC_VALUE_OUT_OF_RANGE|Invalid argument error)\\]?:? .*? (\\d+(\\.\\d+)?) .*? Decimal\\(?(\\d+),?\\s?(\\d+)\\)?.*?\\]?"
-        assert(cometMessage.matches(regex) == sparkMessage.matches(regex))
-
-      } else {
+        sparkMessage.contains("cannot be represented as"),
+        cometMessage.contains("too large to store"))
+    } else {
+      if (CometSparkSessionExtensions.isSpark40Plus) {
+        // for Spark 4 we expect the sparkException to carry the message
+        assert(
+          sparkException.getMessage
+            .replace(".WITH_SUGGESTION] ", "]")
+            .startsWith(cometMessage))
+      } else if (CometSparkSessionExtensions.isSpark34Plus) {
         // for Spark 3.4 we expect to reproduce the error message exactly
         assert(cometMessage == sparkMessage)
-
-      }
-    } else {
-      // for Spark 3.3 we just need to strip the prefix from the Comet message
-      // before comparing
-      val cometMessageModified = cometMessage
-        .replace("[CAST_INVALID_INPUT] ", "")
-        .replace("[CAST_OVERFLOW] ", "")
-        .replace("[NUMERIC_VALUE_OUT_OF_RANGE] ", "")
-
-      if (sparkMessage.contains("cannot be represented as")) {
-        assert(cometMessage.contains("cannot be represented as"))
       } else {
-        assert(cometMessageModified == sparkMessage)
+        // for Spark 3.3 we just need to strip the prefix from the Comet message
+        // before comparing
+        val cometMessageModified = cometMessage
+          .replace("[CAST_INVALID_INPUT] ", "")
+          .replace("[CAST_OVERFLOW] ", "")
+          .replace("[NUMERIC_VALUE_OUT_OF_RANGE] ", "")
+
+        if (sparkMessage.contains("cannot be represented as")) {
+          assert(cometMessage.contains("cannot be represented as"))
+        } else {
+          assert(cometMessageModified == sparkMessage)
+        }
       }
     }
   }

From 42648f4965599902dca162a266bc9a6478cb8f6f Mon Sep 17 00:00:00 2001
From: himadripal
Date: Thu, 13 Feb 2025 16:57:55 -0800
Subject: [PATCH 3/5] address review comments and fix failing test

---
 docs/source/user-guide/compatibility.md   | 107 +++++++++---------
 .../org/apache/comet/CometCastSuite.scala |  30 ++---
 2 files changed, 71 insertions(+), 66 deletions(-)

diff --git a/docs/source/user-guide/compatibility.md b/docs/source/user-guide/compatibility.md
index d603c0b99d..9733425a48 100644
--- a/docs/source/user-guide/compatibility.md
+++ b/docs/source/user-guide/compatibility.md
@@ -73,76 +73,77 @@ Spark.
 The following cast operations are generally compatible with Spark except for the differences noted here.
| From Type | To Type | Notes |
|-|---------|-|
| boolean | byte    | |
| boolean | short   | |
| boolean | integer | |
| boolean | long    | |
| boolean | float   | |
| boolean | double  | |
| boolean | string  | |
| byte | boolean | |
| byte | short   | |
| byte | integer | |
| byte | long    | |
| byte | float   | |
| byte | double  | |
| byte | decimal | |
| byte | string  | |
| short | boolean | |
| short | byte    | |
| short | integer | |
| short | long    | |
| short | float   | |
| short | double  | |
| short | decimal | |
| short | string  | |
| integer | boolean | |
| integer | byte    | |
| integer | short   | |
| integer | long    | |
| integer | float   | |
| integer | double  | |
| integer | string  | |
| long | boolean | |
| long | byte    | |
| long | short   | |
| long | integer | |
| long | float   | |
| long | double  | |
| long | string  | |
| float | boolean | |
| float | byte    | |
| float | short   | |
| float | integer | |
| float | long    | |
| float | double  | |
| float | string  | There can be differences in precision. For example, the input "1.4E-45" will produce 1.0E-45 instead of 1.4E-45 |
| double | boolean | |
| double | byte    | |
| double | short   | |
| double | integer | |
| double | long    | |
| double | float   | |
| double | string  | There can be differences in precision. For example, the input "1.4E-45" will produce 1.0E-45 instead of 1.4E-45 |
| decimal | byte    | |
| decimal | short   | |
| decimal | integer | |
| decimal | long    | |
| decimal | float   | |
| decimal | double  | |
| decimal | string  | There can be formatting differences in some case due to Spark using scientific notation where Comet does not |
| decimal | decimal | |
| string | boolean | |
| string | byte    | |
| string | short   | |
| string | integer | |
| string | long    | |
| string | binary  | |
| string | date    | Only supports years between 262143 BC and 262142 AD |
| date | string  | |
| timestamp | long    | |
| timestamp | string  | |
| timestamp | date    | |

### Incompatible Casts
diff --git a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
index 5eadb1c131..43e39b61ba 100644
--- a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
@@ -25,12 +25,12 @@ import scala.util.Random
 import scala.util.matching.Regex
 
 import org.apache.hadoop.fs.Path
-import org.apache.spark.sql.{CometTestBase, DataFrame, SaveMode}
+import org.apache.spark.sql.{CometTestBase, DataFrame, Row, SaveMode}
 import org.apache.spark.sql.catalyst.expressions.Cast
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{DataType, DataTypes, DecimalType}
+import org.apache.spark.sql.types.{DataType, DataTypes, DecimalType, StructField, StructType}
 
 import org.apache.comet.expressions.{CometCast, CometEvalMode, Compatible}
 
@@ -909,11 +909,14 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
   }
 
   test("cast between decimals with different precision and scale") {
-    // cast between default Decimal(38, 18) to Decimal(6,2)
-    val values = Seq(BigDecimal("12345.6789"), BigDecimal("9876.5432"), BigDecimal("123.4567"))
-    val df = withNulls(values)
-      .toDF("b")
-      .withColumn("a", col("b").cast(DecimalType(38, 28)))
+    val rowData = Seq(
+      Row(BigDecimal("12345.6789")),
+      Row(BigDecimal("9876.5432")),
+      Row(BigDecimal("123.4567")))
+    val df = spark.createDataFrame(
+      spark.sparkContext.parallelize(rowData),
+      StructType(Seq(StructField("a", DataTypes.createDecimalType(10, 4)))))
+
     castTest(df, DecimalType(6, 2))
   }
 
@@ -1126,12 +1129,11 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
     val cometMessage =
       if (cometException.getCause != null) cometException.getCause.getMessage
       else cometException.getMessage
     // comet decimal conversion throws ArrowError(string) from arrow - across spark versions the messages don't match.
-    if (sparkMessage.contains("NUMERIC_VALUE_OUT_OF_RANGE") && cometMessage.contains(
-        "Invalid argument error")) {
+    if (sparkMessage.contains("cannot be represented as")) {
       assert(
-        sparkMessage.contains("cannot be represented as"),
-        cometMessage.contains("too large to store"))
+        cometMessage.contains("cannot be represented as") || cometMessage.contains(
+          "too large to store"))
     } else {
       if (CometSparkSessionExtensions.isSpark40Plus) {
         // for Spark 4 we expect the sparkException to carry the message
         assert(
           sparkException.getMessage
             .replace(".WITH_SUGGESTION] ", "]")
             .startsWith(cometMessage))
       } else if (CometSparkSessionExtensions.isSpark34Plus) {
         // for Spark 3.4 we expect to reproduce the error message exactly
         assert(cometMessage == sparkMessage)
       } else {
         // for Spark 3.3 we just need to strip the prefix from the Comet message
         // before comparing
         val cometMessageModified = cometMessage
           .replace("[CAST_INVALID_INPUT] ", "")
           .replace("[CAST_OVERFLOW] ", "")
           .replace("[NUMERIC_VALUE_OUT_OF_RANGE] ", "")
 
         if (sparkMessage.contains("cannot be represented as")) {
-          assert(cometMessage.contains("cannot be represented as"))
+          assert(
+            cometMessage.contains("cannot be represented as") || cometMessage.contains(
+              "too large to store"))
         } else {
           assert(cometMessageModified == sparkMessage)
         }
       }
     }
   }

From 4df3e683802c615511db84094a8e8d35a8a6ae2a Mon Sep 17 00:00:00 2001
From: himadripal
Date: Sun, 16 Feb 2025 07:18:43 -0800
Subject: [PATCH 4/5] address review comments

---
 docs/source/user-guide/compatibility.md   | 107 +++++++++----------
 .../org/apache/comet/CometCastSuite.scala |   4 +-
 2 files changed, 54 insertions(+), 57 deletions(-)

diff --git a/docs/source/user-guide/compatibility.md b/docs/source/user-guide/compatibility.md
index 9733425a48..d603c0b99d 100644
--- a/docs/source/user-guide/compatibility.md
+++ b/docs/source/user-guide/compatibility.md
@@ -73,77 +73,76 @@ Spark.
 The following cast operations are generally compatible with Spark except for the differences noted here.
 
| From Type | To Type | Notes |
|-|-|-|
| boolean | byte | |
| boolean | short | |
| boolean | integer | |
| boolean | long | |
| boolean | float | |
| boolean | double | |
| boolean | string | |
| byte | boolean | |
| byte | short | |
| byte | integer | |
| byte | long | |
| byte | float | |
| byte | double | |
| byte | decimal | |
| byte | string | |
| short | boolean | |
| short | byte | |
| short | integer | |
| short | long | |
| short | float | |
| short | double | |
| short | decimal | |
| short | string | |
| integer | boolean | |
| integer | byte | |
| integer | short | |
| integer | long | |
| integer | float | |
| integer | double | |
| integer | string | |
| long | boolean | |
| long | byte | |
| long | short | |
| long | integer | |
| long | float | |
| long | double | |
| long | string | |
| float | boolean | |
| float | byte | |
| float | short | |
| float | integer | |
| float | long | |
| float | double | |
| float | string | There can be differences in precision. For example, the input "1.4E-45" will produce 1.0E-45 instead of 1.4E-45 |
| double | boolean | |
| double | byte | |
| double | short | |
| double | integer | |
| double | long | |
| double | float | |
| double | string | There can be differences in precision. For example, the input "1.4E-45" will produce 1.0E-45 instead of 1.4E-45 |
| decimal | byte | |
| decimal | short | |
| decimal | integer | |
| decimal | long | |
| decimal | float | |
| decimal | double | |
| decimal | string | There can be formatting differences in some case due to Spark using scientific notation where Comet does not |
| string | boolean | |
| string | byte | |
| string | short | |
| string | integer | |
| string | long | |
| string | binary | |
| string | date | Only supports years between 262143 BC and 262142 AD |
| date | string | |
| timestamp | long | |
| timestamp | string | |
| timestamp | date | |

### Incompatible Casts
diff --git a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
index 43e39b61ba..befc39896c 100644
--- a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
@@ -1153,9 +1153,7 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
           .replace("[NUMERIC_VALUE_OUT_OF_RANGE] ", "")
 
         if (sparkMessage.contains("cannot be represented as")) {
-          assert(
-            cometMessage.contains("cannot be represented as") || cometMessage.contains(
-              "too large to store"))
+          assert(cometMessage.contains("too large to store"))
         } else {
           assert(cometMessageModified == sparkMessage)
         }

From 5cce0c436c68faae6e237bf44c32a058a856527b Mon Sep 17 00:00:00 2001
From: himadripal
Date: Fri, 21 Feb 2025 11:05:09 -0800
Subject: [PATCH 5/5] add special handling for decimal in doc generation; fix
 the assert, since double to decimal and a few other paths still use Spark and
 hence generate Spark error messages
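
As background for the assert changes in this series (an illustrative,
self-contained sketch, not part of the diffs; the message strings are made
up): Scala's two-argument `Predef.assert` treats its second argument as the
failure *message*, not as a second condition, so the form introduced in
PATCH 2 only ever asserted the Spark-side message:

```scala
object AssertPitfall extends App {
  // Illustrative message strings, not actual Spark/Comet output.
  val sparkMessage = "... cannot be represented as Decimal(6, 2) ..."
  val cometMessage = "Invalid argument error: ... too large to store ..."

  // Predef.assert(assertion: Boolean, message: => Any): only the first
  // argument is asserted; the second is just the message printed on
  // failure, so the Comet-side check here was silently skipped.
  assert(
    sparkMessage.contains("cannot be represented as"),
    cometMessage.contains("too large to store"))
}
```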
---
 docs/source/user-guide/compatibility.md                    | 1 +
 spark/src/main/scala/org/apache/comet/GenerateDocs.scala   | 3 ++-
 spark/src/test/scala/org/apache/comet/CometCastSuite.scala | 5 ++---
 3 files changed, 5 insertions(+), 4 deletions(-)

diff --git a/docs/source/user-guide/compatibility.md b/docs/source/user-guide/compatibility.md
index d603c0b99d..6d5f7d1de1 100644
--- a/docs/source/user-guide/compatibility.md
+++ b/docs/source/user-guide/compatibility.md
@@ -131,6 +131,7 @@ The following cast operations are generally compatible with Spark except for the
 | decimal | long | |
 | decimal | float | |
 | decimal | double | |
+| decimal | decimal | |
 | decimal | string | There can be formatting differences in some case due to Spark using scientific notation where Comet does not |
 | string | boolean | |
 | string | byte | |
diff --git a/spark/src/main/scala/org/apache/comet/GenerateDocs.scala b/spark/src/main/scala/org/apache/comet/GenerateDocs.scala
index fb86389fee..d962743688 100644
--- a/spark/src/main/scala/org/apache/comet/GenerateDocs.scala
+++ b/spark/src/main/scala/org/apache/comet/GenerateDocs.scala
@@ -69,7 +69,8 @@ object GenerateDocs {
       w.write("|-|-|-|\n".getBytes)
       for (fromType <- CometCast.supportedTypes) {
         for (toType <- CometCast.supportedTypes) {
-          if (Cast.canCast(fromType, toType) && fromType != toType) {
+          if (Cast.canCast(fromType, toType) && (fromType != toType || fromType.typeName
+              .contains("decimal"))) {
             val fromTypeName = fromType.typeName.replace("(10,2)", "")
             val toTypeName = toType.typeName.replace("(10,2)", "")
             CometCast.isSupported(fromType, toType, None, CometEvalMode.LEGACY) match {
diff --git a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
index befc39896c..49b517f24e 100644
--- a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
@@ -1131,9 +1131,8 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
       else cometException.getMessage
     // comet decimal conversion throws ArrowError(string) from arrow - across spark versions the messages don't match.
     if (sparkMessage.contains("cannot be represented as")) {
-      assert(
-        cometMessage.contains("cannot be represented as") || cometMessage.contains(
-          "too large to store"))
+      cometMessage.contains("cannot be represented as") || cometMessage.contains(
+        "too large to store")
     } else {
       if (CometSparkSessionExtensions.isSpark40Plus) {
         // for Spark 4 we expect the sparkException to carry the message
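
The behavior this series reconciles is Spark's two decimal-overflow modes
versus Arrow's. A minimal Spark-only sketch of the semantics the suite asserts
against (standalone and illustrative; the object name, session setup, and
values are assumptions, not part of the patches):

```scala
import org.apache.spark.sql.SparkSession

object DecimalOverflowDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("demo").getOrCreate()
    import spark.implicits._

    // 12345.6789 needs precision 9, so it cannot fit in DECIMAL(6,2)
    val df = Seq(BigDecimal("12345.6789")).toDF("a")

    // LEGACY mode: an overflowing value is silently replaced with null
    spark.conf.set("spark.sql.ansi.enabled", "false")
    df.selectExpr("CAST(a AS DECIMAL(6,2)) AS b").show()

    // ANSI mode: the same cast throws NUMERIC_VALUE_OUT_OF_RANGE, whose text
    // contains "cannot be represented as"; Comet's native path surfaces Arrow's
    // "Invalid argument error: ... too large to store ..." instead, which is
    // why the test suite accepts either message
    spark.conf.set("spark.sql.ansi.enabled", "true")
    try df.selectExpr("CAST(a AS DECIMAL(6,2)) AS b").collect()
    catch { case e: Exception => println(e.getMessage) }

    spark.stop()
  }
}
```

On the native side, the same two behaviors map onto the `safe` flag of the
`CastOptions` built in `cast_array` in PATCH 1: `safe: true` (non-ANSI) nulls
out overflowing values, while `safe: false` (ANSI) makes `cast_with_options`
return Arrow's invalid argument error.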