[KYUUBI #6425] Fix tests in spark engine and kyuubi server modules with Spark 4.0

# 🔍 Description

This PR fixes tests in the Spark engine and Kyuubi server modules with Spark 4.0.

## Types of changes 🔖

- [ ] Bugfix (non-breaking change which fixes an issue)
- [x] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)

## Test Plan 🧪

Since Spark 4.0.0-preview1 is still in the voting phase, this PR does not add CI; the change was tested in #6407 with Spark 4.0.0-preview1 RC1.

---

# Checklist 📝

- [x] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)

**Be nice. Be informative.**

Closes #6425 from pan3793/spark-4.

Closes #6425

1019864 [Cheng Pan] Fix tests in spark engine and kyuubi server modules with Spark 4.0

Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
pan3793 committed May 27, 2024
1 parent c1b55d8 commit 1e08064
Showing 7 changed files with 38 additions and 25 deletions.
```diff
@@ -21,6 +21,7 @@ import org.apache.spark.sql.SparkSession
 
 import org.apache.kyuubi.{KyuubiFunSuite, Utils}
 import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.SPARK_ENGINE_RUNTIME_VERSION
 
 trait WithSparkSQLEngine extends KyuubiFunSuite {
   protected var spark: SparkSession = _
@@ -31,10 +32,10 @@ trait WithSparkSQLEngine extends KyuubiFunSuite {
 
   protected var connectionUrl: String = _
 
-  // Affected by such configuration' default value
-  // engine.initialize.sql='SHOW DATABASES'
-  // SPARK-35378
-  protected val initJobId: Int = 1
+  // Behavior is affected by the initialization SQL: 'SHOW DATABASES'
+  // SPARK-35378 (3.2.0) makes it trigger a job
+  // SPARK-43124 (4.0.0) makes it no longer trigger a job
+  protected val initJobId: Int = if (SPARK_ENGINE_RUNTIME_VERSION >= "4.0") 0 else 1
 
   override def beforeAll(): Unit = {
     startSparkEngine()
```
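For context, here is a minimal, self-contained sketch (not Kyuubi code) of the idea behind the version-gated `initJobId`: suites that assert on Spark job ids must offset them by however many jobs the engine's initialization SQL already triggered, and under Spark 4.0 `SHOW DATABASES` no longer triggers one. The `majorVersion` helper is hypothetical, for illustration only.

```scala
// Hypothetical sketch, not Kyuubi's implementation.
object InitJobIdSketch {
  // Parse the numeric major version; "4.0.0-preview1" yields 4.
  private def majorVersion(version: String): Int =
    version.split('.')(0).toInt

  // SPARK-43124 (4.0.0): 'SHOW DATABASES' no longer triggers a job,
  // so the first user query's job id starts at 0 instead of 1.
  def initJobId(sparkVersion: String): Int =
    if (majorVersion(sparkVersion) >= 4) 0 else 1

  def main(args: Array[String]): Unit = {
    assert(initJobId("3.5.1") == 1)
    assert(initJobId("4.0.0-preview1") == 0)
  }
}
```

The real guard, `SPARK_ENGINE_RUNTIME_VERSION >= "4.0"`, presumably compares parsed version components rather than raw strings, since plain lexicographic comparison would misorder versions such as "10.0" and "4.0".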
```diff
@@ -19,7 +19,7 @@ package org.apache.spark.kyuubi
 
 import java.util.UUID
 
-import org.apache.spark.SparkException
+import org.apache.spark.{SparkArithmeticException, SparkException}
 import org.apache.spark.sql.internal.SQLConf.ANSI_ENABLED
 import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
 
@@ -47,8 +47,10 @@ abstract class SparkSQLEngineDeregisterSuite
     assert(engine.frontendServices.head.discoveryService.get.getServiceState ===
       ServiceState.STARTED)
     (0 until maxJobFailures).foreach { _ =>
-      val e = intercept[SparkException](spark.sql(query).collect())
-      assert(e.getCause.isInstanceOf[ArithmeticException])
+      intercept[Exception](spark.sql(query).collect()) match {
+        case se: SparkException => assert(se.getCause.isInstanceOf[ArithmeticException])
+        case e => assert(e.isInstanceOf[SparkArithmeticException])
+      }
     }
     eventually(timeout(5.seconds), interval(1.second)) {
       assert(engine.frontendServices.head.discoveryService.get.getServiceState ===
@@ -113,12 +115,18 @@ class SparkSQLEngineDeregisterExceptionTTLSuite
     assert(engine.frontendServices.head.discoveryService.get.getServiceState ===
      ServiceState.STARTED)
 
-    intercept[SparkException](spark.sql(query).collect())
+    intercept[Exception](spark.sql(query).collect()) match {
+      case se: SparkException => assert(se.getCause.isInstanceOf[ArithmeticException])
+      case e => assert(e.isInstanceOf[SparkArithmeticException])
+    }
 
     Thread.sleep(deregisterExceptionTTL + 1000)
 
     (0 until maxJobFailures).foreach { _ =>
-      val e = intercept[SparkException](spark.sql(query).collect())
-      assert(e.getCause.isInstanceOf[ArithmeticException])
+      intercept[Exception](spark.sql(query).collect()) match {
+        case se: SparkException => assert(se.getCause.isInstanceOf[ArithmeticException])
+        case e => assert(e.isInstanceOf[SparkArithmeticException])
+      }
     }
     eventually(timeout(5.seconds), interval(1.second)) {
       assert(engine.frontendServices.head.discoveryService.get.getServiceState ===
```
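The pattern match above tolerates two failure shapes: on Spark 3.x the arithmetic error surfaces as the cause of a job-level `SparkException`, while Spark 4.0 may throw a `SparkArithmeticException` directly. A self-contained sketch of the same idea, using stand-in exception types rather than the Spark classes:

```scala
object ArithmeticFailureSketch extends App {
  // Stand-ins for illustration; the real types are Spark's
  // SparkException and SparkArithmeticException.
  class WrapperException(cause: Throwable) extends Exception("job aborted", cause)
  class DirectArithmeticException extends ArithmeticException("DIVIDE_BY_ZERO")

  // Accept either failure shape: a wrapper whose cause is arithmetic,
  // or a directly thrown arithmetic subclass.
  def assertArithmeticFailure(thunk: => Unit): Unit =
    try {
      thunk
      throw new AssertionError("expected the query to fail")
    } catch {
      case w: WrapperException =>
        assert(w.getCause.isInstanceOf[ArithmeticException])
      case _: ArithmeticException => () // direct throw, e.g. on Spark 4.0
    }

  // Both shapes pass:
  assertArithmeticFailure(throw new WrapperException(new ArithmeticException()))
  assertArithmeticFailure(throw new DirectArithmeticException)
}
```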
```diff
@@ -37,8 +37,8 @@ trait SparkQueryTests extends SparkDataTypeTests with HiveJDBCTestHelper {
     val sql = "select date_sub(date'2011-11-11', '1.2')"
     val errors = Set(
       "The second argument of 'date_sub' function needs to be an integer.",
-      // unquoted since Spark-3.4, see https://github.com/apache/spark/pull/36693
-      "The second argument of date_sub function needs to be an integer.")
+      "SECOND_FUNCTION_ARGUMENT_NOT_INTEGER",
+      "CAST_INVALID_INPUT")
 
     withJdbcStatement() { statement =>
       val e = intercept[SQLException] {
@@ -254,7 +254,7 @@ trait SparkQueryTests extends SparkDataTypeTests with HiveJDBCTestHelper {
       rs.next()
       // scala repl will return resX = YYYYY, and here we only check YYYYY
       val sparkVer = rs.getString(1).split("=")(1).trim
-      assert("\\d\\.\\d\\.\\d(-SNAPSHOT)?".r.pattern.matcher(sparkVer).matches())
+      assert("\\d\\.\\d\\.\\d.*".r.pattern.matcher(sparkVer).matches())
       assert(rs.getMetaData.getColumnName(1) === "output")
     }
   }
```
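The relaxed version regex is easy to sanity-check: Spark 4.0 preview builds report versions such as `4.0.0-preview1`, which the old pattern (three digits plus an optional `-SNAPSHOT` suffix) rejects. A standalone check:

```scala
object VersionRegexCheck extends App {
  val oldPattern = "\\d\\.\\d\\.\\d(-SNAPSHOT)?".r
  val newPattern = "\\d\\.\\d\\.\\d.*".r

  val preview = "4.0.0-preview1"
  assert(!oldPattern.pattern.matcher(preview).matches()) // old pattern rejects previews
  assert(newPattern.pattern.matcher(preview).matches()) // relaxed pattern accepts them
  assert(newPattern.pattern.matcher("3.5.1").matches()) // releases still match
}
```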
```diff
@@ -187,7 +187,7 @@ class PlanOnlyOperationSuite extends WithKyuubiServer with HiveJDBCTestHelper {
     }
   }
 
-  test("kyuubi #3214: Plan only mode with an incorrect value") {
+  test("KYUUBI #3214: Plan only mode with an incorrect value") {
     withSessionConf()(Map(KyuubiConf.OPERATION_PLAN_ONLY_MODE.key -> "parse"))(Map.empty) {
       withJdbcStatement() { statement =>
         statement.executeQuery(s"set ${KyuubiConf.OPERATION_PLAN_ONLY_MODE.key}=parser")
@@ -196,7 +196,11 @@
         statement.executeQuery(s"set ${KyuubiConf.OPERATION_PLAN_ONLY_MODE.key}=parse")
         val result = statement.executeQuery("select 1")
         assert(result.next())
-        assert(result.getString(1).contains("Project [unresolvedalias(1, None)]"))
+        val plan = result.getString(1)
+        assert {
+          plan.contains("Project [unresolvedalias(1, None)]") ||
+            plan.contains("Project [unresolvedalias(1)]")
+        }
       }
     }
   }
```
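The assertion now accepts both renderings because Spark 4.0 prints `unresolvedalias` without the second argument. A tolerant helper in the same spirit (hypothetical, for illustration):

```scala
// Hypothetical helper: accept either rendering of the parsed plan.
def assertParsedSelectOne(plan: String): Unit = {
  val candidates = Seq(
    "Project [unresolvedalias(1, None)]", // Spark 3.x rendering
    "Project [unresolvedalias(1)]") // Spark 4.0 rendering
  assert(candidates.exists(plan.contains), s"unexpected plan: $plan")
}
```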
```diff
@@ -202,8 +202,8 @@ abstract class BatchesResourceSuiteBase extends KyuubiFunSuite
 
       // check both kyuubi log and engine log
       assert(
-        logs.exists(_.contains("/bin/spark-submit")) &&
-          logs.exists(_.contains(s"SparkContext: Submitted application: $sparkBatchTestAppName")))
+        logs.exists(_.contains("bin/spark-submit")) &&
+          logs.exists(_.contains(s"Submitted application: $sparkBatchTestAppName")))
     }
 
     // invalid user name
```
```diff
@@ -252,8 +252,8 @@ class MySQLSparkQuerySuite extends WithKyuubiServer with MySQLJDBCTestHelper {
     val sql = "select date_sub(date'2011-11-11', '1.2')"
     val errors = Set(
       "The second argument of 'date_sub' function needs to be an integer.",
-      // unquoted since Spark-3.4, see https://github.com/apache/spark/pull/36693
-      "The second argument of date_sub function needs to be an integer.")
+      "SECOND_FUNCTION_ARGUMENT_NOT_INTEGER",
+      "CAST_INVALID_INPUT")
 
     withJdbcStatement() { statement =>
       val e = intercept[SQLException] {
```
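As in `SparkQueryTests` above, the expected-error set now spans phrasings from multiple Spark versions: the full sentence from Spark 3.x and the error-class names (`SECOND_FUNCTION_ARGUMENT_NOT_INTEGER`, `CAST_INVALID_INPUT`) that newer Sparks embed in the message. A minimal sketch of this match-any-fragment pattern; the sample messages are illustrative, not verbatim Spark output:

```scala
object DateSubErrorSketch extends App {
  // Accept any known per-version phrasing of the same failure.
  val expectedFragments = Set(
    "The second argument of 'date_sub' function needs to be an integer.",
    "SECOND_FUNCTION_ARGUMENT_NOT_INTEGER",
    "CAST_INVALID_INPUT")

  def isExpectedDateSubError(message: String): Boolean =
    expectedFragments.exists(message.contains)

  // Illustrative messages only:
  assert(isExpectedDateSubError(
    "The second argument of 'date_sub' function needs to be an integer."))
  assert(isExpectedDateSubError(
    "[CAST_INVALID_INPUT] The value '1.2' of the type STRING cannot be cast to INT"))
}
```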
```diff
@@ -258,8 +258,8 @@ class BatchCliSuite extends RestClientTestHelper with TestPrematureExit with Bat
       ldapUserPasswd,
       "--forward")
     result = testPrematureExitForControlCli(logArgs, "")
-    assert(result.contains(s"Submitted application: ${sparkBatchTestAppName}"))
-    assert(result.contains("ShutdownHookManager: Shutdown hook called"))
+    assert(result.contains(s"Submitted application: $sparkBatchTestAppName"))
+    assert(result.contains("Shutdown hook called"))
   }
 
   test("submit batch test") {
@@ -271,8 +271,8 @@ class BatchCliSuite extends RestClientTestHelper with TestPrematureExit with Bat
       "--password",
       ldapUserPasswd)
     val result = testPrematureExitForControlCli(submitArgs, "")
-    assert(result.contains(s"Submitted application: ${sparkBatchTestAppName}"))
-    assert(result.contains("ShutdownHookManager: Shutdown hook called"))
+    assert(result.contains(s"Submitted application: $sparkBatchTestAppName"))
+    assert(result.contains("Shutdown hook called"))
   }
 
   test("submit batch test with waitCompletion=false") {
@@ -288,8 +288,8 @@ class BatchCliSuite extends RestClientTestHelper with TestPrematureExit with Bat
       "--conf",
       s"${CtlConf.CTL_BATCH_LOG_QUERY_INTERVAL.key}=100")
     val result = testPrematureExitForControlCli(submitArgs, "")
-    assert(result.contains(s"/bin/spark-submit"))
-    assert(!result.contains("ShutdownHookManager: Shutdown hook called"))
+    assert(result.contains("bin/spark-submit"))
+    assert(!result.contains("Shutdown hook called"))
   }
 
   test("list batch test") {
```
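The loosened log assertions here (and in `BatchesResourceSuiteBase` above) follow one rule: match only the stable message fragment, never the `ClassName: ` logger prefix or an absolute path prefix, since console log layout and install paths vary across Spark versions and environments. A small illustration with assumed, non-verbatim log lines:

```scala
object LogFragmentCheck extends App {
  // Assumed sample lines, for illustration only.
  val withPrefix = "INFO SparkContext: Submitted application: kyuubi-batch-test"
  val withoutPrefix = "INFO Submitted application: kyuubi-batch-test"

  // A prefix-agnostic check survives either layout.
  for (line <- Seq(withPrefix, withoutPrefix)) {
    assert(line.contains("Submitted application: kyuubi-batch-test"))
  }
}
```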
