diff --git a/.github/docker/build_image.sh b/.github/docker/build_image.sh
index a126fb02..9a7a4439 100755
--- a/.github/docker/build_image.sh
+++ b/.github/docker/build_image.sh
@@ -33,8 +33,8 @@ cd ../..
 # Build docker image
 docker build \
---build-arg SPARK_URL=https://archive.apache.org/dist/spark/spark-3.4.0/spark-3.4.0-bin-hadoop3.tgz \
---build-arg SPARK_BINARY_NAME=spark-3.4.0-bin-hadoop3.tgz \
+--build-arg SPARK_URL=https://archive.apache.org/dist/spark/spark-3.3.2/spark-3.3.2-bin-hadoop3.tgz \
+--build-arg SPARK_BINARY_NAME=spark-3.3.2-bin-hadoop3.tgz \
 --build-arg JDBC_URL=https://repo1.maven.org/maven2/net/snowflake/snowflake-jdbc/${TEST_JDBC_VERSION}/$JDBC_JAR_NAME \
 --build-arg JDBC_BINARY_NAME=$JDBC_JAR_NAME \
 --build-arg SPARK_CONNECTOR_LOCATION=target/scala-${TEST_SCALA_VERSION}/$SPARK_CONNECTOR_JAR_NAME \
diff --git a/.github/workflows/ClusterTest.yml b/.github/workflows/ClusterTest.yml
index 656f3a65..c3618587 100644
--- a/.github/workflows/ClusterTest.yml
+++ b/.github/workflows/ClusterTest.yml
@@ -13,13 +13,13 @@ jobs:
     strategy:
       matrix:
         scala_version: [ '2.12.11' ]
-        spark_version: [ '3.4.0' ]
+        spark_version: [ '3.3.2' ]
         use_copy_unload: [ 'true' ]
         cloud_provider: [ 'gcp' ]
     env:
       SNOWFLAKE_TEST_CONFIG_SECRET: ${{ secrets.SNOWFLAKE_TEST_CONFIG_SECRET }}
-      TEST_SPARK_VERSION: '3.4'
-      DOCKER_IMAGE_TAG: 'snowflakedb/spark-base:3.4.0'
+      TEST_SPARK_VERSION: '3.3'
+      DOCKER_IMAGE_TAG: 'snowflakedb/spark-base:3.3.2'
       TEST_SCALA_VERSION: '2.12'
       TEST_COMPILE_SCALA_VERSION: '2.12.11'
       TEST_SPARK_CONNECTOR_VERSION: '2.15.0'
diff --git a/.github/workflows/IntegrationTest_2.12.yml b/.github/workflows/IntegrationTest_2.12.yml
index 80235ce8..3643e77c 100644
--- a/.github/workflows/IntegrationTest_2.12.yml
+++ b/.github/workflows/IntegrationTest_2.12.yml
@@ -13,7 +13,7 @@ jobs:
     strategy:
       matrix:
         scala_version: [ '2.12.11' ]
-        spark_version: [ '3.4.0' ]
+        spark_version: [ '3.3.2' ]
         use_copy_unload: [ 'true', 'false' ]
         cloud_provider: [ 'aws', 'azure' ]
         # run_query_in_async can be removed after async mode is stable
diff --git a/.github/workflows/IntegrationTest_2.13.yml b/.github/workflows/IntegrationTest_2.13.yml
index 99999310..ecffd544 100644
--- a/.github/workflows/IntegrationTest_2.13.yml
+++ b/.github/workflows/IntegrationTest_2.13.yml
@@ -13,7 +13,7 @@ jobs:
     strategy:
       matrix:
         scala_version: [ '2.13.9' ]
-        spark_version: [ '3.4.0' ]
+        spark_version: [ '3.3.2' ]
         use_copy_unload: [ 'true', 'false' ]
         cloud_provider: [ 'aws', 'azure' ]
         # run_query_in_async can be removed after async mode is stable
diff --git a/.github/workflows/IntegrationTest_gcp_2.12.yml b/.github/workflows/IntegrationTest_gcp_2.12.yml
index 674e0b8f..a128c304 100644
--- a/.github/workflows/IntegrationTest_gcp_2.12.yml
+++ b/.github/workflows/IntegrationTest_gcp_2.12.yml
@@ -13,7 +13,7 @@ jobs:
     strategy:
       matrix:
         scala_version: [ '2.12.11' ]
-        spark_version: [ '3.4.0' ]
+        spark_version: [ '3.3.2' ]
         use_copy_unload: [ 'false' ]
         cloud_provider: [ 'gcp' ]
         # run_query_in_async can be removed after async mode is stable
diff --git a/.github/workflows/IntegrationTest_gcp_2.13.yml b/.github/workflows/IntegrationTest_gcp_2.13.yml
index 4a0f2b64..668f44c8 100644
--- a/.github/workflows/IntegrationTest_gcp_2.13.yml
+++ b/.github/workflows/IntegrationTest_gcp_2.13.yml
@@ -13,7 +13,7 @@ jobs:
     strategy:
       matrix:
         scala_version: [ '2.13.9' ]
-        spark_version: [ '3.4.0' ]
+        spark_version: [ '3.3.2' ]
         use_copy_unload: [ 'false' ]
         cloud_provider: [ 'gcp' ]
         # run_query_in_async can be removed after async mode is stable
diff --git a/ClusterTest/build.sbt b/ClusterTest/build.sbt
index da670026..447fdad1 100644
--- a/ClusterTest/build.sbt
+++ b/ClusterTest/build.sbt
@@ -16,8 +16,8 @@ val sparkConnectorVersion = "2.15.0"
 val scalaVersionMajor = "2.12"
-val sparkVersionMajor = "3.4"
-val sparkVersion = s"${sparkVersionMajor}.0"
+val sparkVersionMajor = "3.3"
+val sparkVersion = s"${sparkVersionMajor}.2"
 val testSparkVersion = sys.props.get("spark.testVersion").getOrElse(sparkVersion)
 unmanagedJars in Compile += file(s"../target/scala-${scalaVersionMajor}/" +
diff --git a/build.sbt b/build.sbt
index 45fbc922..6b606963 100644
--- a/build.sbt
+++ b/build.sbt
@@ -16,8 +16,8 @@ import scala.util.Properties
-val sparkVersion = "3.4"
-val testSparkVersion = sys.props.get("spark.testVersion").getOrElse("3.4.0")
+val sparkVersion = "3.3"
+val testSparkVersion = sys.props.get("spark.testVersion").getOrElse("3.3.2")
 /*
  * Don't change the variable name "sparkConnectorVersion" because
@@ -41,7 +41,7 @@ lazy val root = project.withId("spark-snowflake").in(file("."))
   .settings(
     name := "spark-snowflake",
     organization := "net.snowflake",
-    version := s"${sparkConnectorVersion}-spark_3.4",
+    version := s"${sparkConnectorVersion}-spark_3.3",
     scalaVersion := sys.props.getOrElse("SPARK_SCALA_VERSION", default = "2.12.11"),
     // Spark 3.2 supports scala 2.12 and 2.13
     crossScalaVersions := Seq("2.12.11", "2.13.9"),
diff --git a/src/main/scala/net/snowflake/spark/snowflake/SnowflakeConnectorUtils.scala b/src/main/scala/net/snowflake/spark/snowflake/SnowflakeConnectorUtils.scala
index f1b50b53..90dd5e6f 100644
--- a/src/main/scala/net/snowflake/spark/snowflake/SnowflakeConnectorUtils.scala
+++ b/src/main/scala/net/snowflake/spark/snowflake/SnowflakeConnectorUtils.scala
@@ -34,7 +34,7 @@ object SnowflakeConnectorUtils {
    * Check Spark version, if Spark version matches SUPPORT_SPARK_VERSION enable PushDown,
    * otherwise disable it.
    */
-  val SUPPORT_SPARK_VERSION = "3.4"
+  val SUPPORT_SPARK_VERSION = "3.3"
   def checkVersionAndEnablePushdown(session: SparkSession): Boolean =
     if (session.version.startsWith(SUPPORT_SPARK_VERSION)) {
diff --git a/src/main/scala/net/snowflake/spark/snowflake/pushdowns/querygeneration/MiscStatement.scala b/src/main/scala/net/snowflake/spark/snowflake/pushdowns/querygeneration/MiscStatement.scala
index 30d3c50e..8439ab80 100644
--- a/src/main/scala/net/snowflake/spark/snowflake/pushdowns/querygeneration/MiscStatement.scala
+++ b/src/main/scala/net/snowflake/spark/snowflake/pushdowns/querygeneration/MiscStatement.scala
@@ -8,7 +8,6 @@ import net.snowflake.spark.snowflake.{
   SnowflakePushdownUnsupportedException,
   SnowflakeSQLStatement
 }
-import org.apache.spark.sql.catalyst.expressions.EvalMode.LEGACY
 import org.apache.spark.sql.catalyst.expressions.{
   Alias,
   Ascending,
@@ -31,6 +30,7 @@ import org.apache.spark.sql.catalyst.expressions.{
 }
 import org.apache.spark.sql.types.{Decimal, _}
 import org.apache.spark.unsafe.types.UTF8String
+import net.snowflake.spark.snowflake.Utils
 /** Extractors for everything else. */
 private[querygeneration] object MiscStatement {
@@ -54,7 +54,7 @@ private[querygeneration] object MiscStatement {
       // - New Type: evalMode: EvalMode.Value = EvalMode.fromSQLConf(SQLConf.get)
       //   Currently, there are 3 modes: LEGACY, ANSI, TRY
       //   support to pushdown, if the mode is LEGACY.
-      case Cast(child, t, _, evalMode) if evalMode == LEGACY =>
+      case Cast(child, t, _, ansiEnabled) if !ansiEnabled =>
         getCastType(t) match {
           case Some(cast) =>
             // For known unsupported data conversion, raise exception to break the
@@ -122,7 +122,7 @@ private[querygeneration] object MiscStatement {
       // Spark 3.4 introduce join hint. The join hint doesn't affect correctness.
      // So it can be ignored in the pushdown process
      // https://github.com/apache/spark/commit/0fa9c554fc0b3940a47c3d1c6a5a17ca9a8cee8e
-      case ScalarSubquery(subquery, _, _, joinCond, _) if joinCond.isEmpty =>
+      case ScalarSubquery(subquery, _, _, joinCond) if joinCond.isEmpty =>
         blockStatement(new QueryBuilder(subquery).statement)
       case UnscaledValue(child) =>
diff --git a/src/main/scala/net/snowflake/spark/snowflake/pushdowns/querygeneration/NumericStatement.scala b/src/main/scala/net/snowflake/spark/snowflake/pushdowns/querygeneration/NumericStatement.scala
index 86ace904..37be54d6 100644
--- a/src/main/scala/net/snowflake/spark/snowflake/pushdowns/querygeneration/NumericStatement.scala
+++ b/src/main/scala/net/snowflake/spark/snowflake/pushdowns/querygeneration/NumericStatement.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.{
   Pi,
   Pow,
   // PromotePrecision is removed from Spark 3.4
-  // PromotePrecision,
+  PromotePrecision,
   Rand,
   Round,
   Sin,
@@ -78,7 +78,7 @@ private[querygeneration] object NumericStatement {
       // PromotePrecision is removed from Spark 3.4
      // https://github.com/apache/spark/pull/36698
-      // case PromotePrecision(child) => convertStatement(child, fields)
+      case PromotePrecision(child) => convertStatement(child, fields)
       case CheckOverflow(child, t, _) =>
         MiscStatement.getCastType(t) match {
@@ -101,7 +101,7 @@ private[querygeneration] object NumericStatement {
       // Spark 3.4 adds a new argument: ansiEnabled
      // https://github.com/apache/spark/commit/42721120f3c7206a9fc22db5d0bb7cf40f0cacfd
      // The pushdown is supported for non-ANSI mode.
-      case Round(child, scale, ansiEnabled) if !ansiEnabled =>
+      case Round(child, scale) =>
         ConstantString("ROUND") + blockStatement(
           convertStatements(fields, child, scale)
         )
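The Cast and Round pattern changes above track a Catalyst API difference between the two Spark lines: on Spark 3.3.x, Cast carries a Boolean ansiEnabled flag, while Spark 3.4.x replaced it with an EvalMode value (LEGACY / ANSI / TRY). Below is a minimal, self-contained sketch of the two guards; Cast33, Cast34, EvalMode, and CastGuardSketch are hypothetical stand-ins for illustration only, not the real org.apache.spark.sql.catalyst.expressions types the connector matches on.

// Illustrative sketch only: simplified stand-ins for the Catalyst Cast expression
// as it looks on Spark 3.3.x vs 3.4.x. None of these names exist in Spark itself.
object CastGuardSketch {
  // Spark 3.4-style evaluation modes (the real enum lives in Catalyst).
  object EvalMode extends Enumeration { val LEGACY, ANSI, TRY = Value }

  // Stand-in for Cast on Spark 3.3.x: fourth field is a Boolean ANSI flag.
  final case class Cast33(child: String, dataType: String,
                          timeZoneId: Option[String], ansiEnabled: Boolean)

  // Stand-in for Cast on Spark 3.4.x: fourth field is an EvalMode value.
  final case class Cast34(child: String, dataType: String,
                          timeZoneId: Option[String], evalMode: EvalMode.Value)

  // Spark 3.3 guard used in this patch: push down only when ANSI mode is off.
  def pushable33(c: Cast33): Boolean = c match {
    case Cast33(_, _, _, ansiEnabled) if !ansiEnabled => true
    case _                                            => false
  }

  // Spark 3.4 guard that the patch reverts: push down only in LEGACY eval mode.
  def pushable34(c: Cast34): Boolean = c match {
    case Cast34(_, _, _, mode) if mode == EvalMode.LEGACY => true
    case _                                                => false
  }
}

The Round and ScalarSubquery hunks follow the same pattern: Spark 3.4 added an ansiEnabled argument to Round and a join-hint argument to ScalarSubquery, so the 3.3 build drops those extra fields from the extractors, and PromotePrecision (removed upstream in apache/spark#36698) is restored because it still exists in Spark 3.3.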