[KYUUBI #5756] Introduce specified initialized SQL to every engine
# 🔍 Description
## Issue References 🔗

This pull request fixes #5756

## Describe Your Solution 🔧

This change introduces engine-specific initialization SQL configurations so that bootstrap statements can be scoped to a single engine type instead of applying to every engine. Four new configuration entries are added in 1.8.1: `kyuubi.engine.flink.initialize.sql`, `kyuubi.session.engine.flink.initialize.sql`, `kyuubi.engine.spark.initialize.sql`, and `kyuubi.session.engine.spark.initialize.sql`. Each falls back to the corresponding generic entry (`kyuubi.engine.initialize.sql` or `kyuubi.engine.session.initialize.sql`) when unset, so existing deployments keep their current behavior, and the Flink and Spark engines are updated to read the new engine-specific keys.
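
A minimal usage sketch (illustration only, not part of this patch), mirroring how the updated test suites set these keys programmatically; the SQL statements are placeholders:

```scala
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.{ENGINE_SESSION_SPARK_INITIALIZE_SQL, ENGINE_SPARK_INITIALIZE_SQL}

// Build a KyuubiConf that scopes initialization SQL to the Spark engine only.
val conf = KyuubiConf()
  // Runs once when the Spark engine bootstraps.
  .set(ENGINE_SPARK_INITIALIZE_SQL.key, "SHOW DATABASES")
  // Runs for each new session opened on the Spark engine.
  .set(ENGINE_SESSION_SPARK_INITIALIZE_SQL.key, "USE default")
```

As with other Kyuubi configurations, these keys can also be set in `kyuubi-defaults.conf` or passed as session configs.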

## Types of changes 🔖

- [ ] Bugfix (non-breaking change which fixes an issue)
- [x] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)

## Test Plan 🧪

#### Behavior Without This Pull Request ⚰️

The generic `kyuubi.engine.initialize.sql` and `kyuubi.engine.session.initialize.sql` apply to every engine type; initialization SQL cannot be scoped to a single engine.

#### Behavior With This Pull Request 🎉

The Flink and Spark engines read their engine-specific keys first and fall back to the generic ones when the specific keys are unset, so per-engine initialization SQL can be configured without affecting other engines.

#### Related Unit Tests

`FlinkEngineInitializeSuite`, `SparkEngineSuites`, `SingleSessionSuite`, and `InitializeSQLSuite` are updated to cover the new configuration entries.

---

# Checklists
## 📝 Author Self Checklist

- [x] My code follows the [style guidelines](https://kyuubi.readthedocs.io/en/master/contributing/code/style.html) of this project
- [x] I have performed a self-review
- [x] I have commented my code, particularly in hard-to-understand areas
- [x] I have made corresponding changes to the documentation
- [x] My changes generate no new warnings
- [x] I have added tests that prove my fix is effective or that my feature works
- [x] New and existing unit tests pass locally with my changes
- [x] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)

## 📝 Committer Pre-Merge Checklist

- [x] Pull request title is okay.
- [x] No license issues.
- [x] Milestone correctly set?
- [x] Test coverage is ok
- [x] Assignees are selected.
- [x] Minimum number of approvals
- [x] No changes are requested

**Be nice. Be informative.**

Closes #5821 from hadoopkandy/KYUUBI-5756.

Closes #5756

046fe2a [kandy01.wang] [KYUUBI #5756] Introduce specified initialized SQL to every engine

Authored-by: kandy01.wang <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
hadoopkandy authored and pan3793 committed Dec 7, 2023
1 parent 5b6a729 commit f3f643a
Showing 10 changed files with 46 additions and 13 deletions.
4 changes: 4 additions & 0 deletions docs/configuration/settings.md

Large diffs are not rendered by default.

@@ -32,7 +32,7 @@ import org.apache.flink.table.gateway.service.context.DefaultContext
 import org.apache.kyuubi.{Logging, Utils}
 import org.apache.kyuubi.Utils.{addShutdownHook, currentUser, FLINK_ENGINE_SHUTDOWN_PRIORITY}
 import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.config.KyuubiConf.ENGINE_INITIALIZE_SQL
+import org.apache.kyuubi.config.KyuubiConf.ENGINE_FLINK_INITIALIZE_SQL
 import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_ENGINE_NAME, KYUUBI_SESSION_USER_KEY}
 import org.apache.kyuubi.engine.flink.FlinkSQLEngine.{countDownLatch, currentEngine}
 import org.apache.kyuubi.service.Serverable
@@ -139,7 +139,7 @@ object FlinkSQLEngine extends Logging {
       tableEnv.executeSql("select 'kyuubi'").await()
     }

-    kyuubiConf.get(ENGINE_INITIALIZE_SQL).foreach { stmt =>
+    kyuubiConf.get(ENGINE_FLINK_INITIALIZE_SQL).foreach { stmt =>
       tableEnv.executeSql(stmt).await()
     }

@@ -65,12 +65,12 @@ class FlinkSessionImpl(
   override def open(): Unit = {
     val executor = fSession.createExecutor(Configuration.fromMap(fSession.getSessionConfig))

-    sessionManager.getConf.get(ENGINE_SESSION_INITIALIZE_SQL).foreach { sql =>
+    sessionManager.getConf.get(ENGINE_SESSION_FLINK_INITIALIZE_SQL).foreach { sql =>
       try {
         executor.executeStatement(OperationHandle.create, sql)
       } catch {
         case NonFatal(e) =>
-          throw KyuubiSQLException(s"execute ${ENGINE_SESSION_INITIALIZE_SQL.key} $sql ", e)
+          throw KyuubiSQLException(s"execute ${ENGINE_SESSION_FLINK_INITIALIZE_SQL.key} $sql ", e)
       }
     }

@@ -53,8 +53,8 @@ class FlinkEngineInitializeSuite extends HiveJDBCTestHelper
     ENGINE_TYPE.key -> "FLINK_SQL",
     ENGINE_SHARE_LEVEL.key -> shareLevel,
     OPERATION_PLAN_ONLY_MODE.key -> NoneMode.name,
-    ENGINE_INITIALIZE_SQL.key -> ENGINE_INITIALIZE_SQL_VALUE,
-    ENGINE_SESSION_INITIALIZE_SQL.key -> ENGINE_SESSION_INITIALIZE_SQL_VALUE,
+    ENGINE_FLINK_INITIALIZE_SQL.key -> ENGINE_INITIALIZE_SQL_VALUE,
+    ENGINE_SESSION_FLINK_INITIALIZE_SQL.key -> ENGINE_SESSION_INITIALIZE_SQL_VALUE,
     KYUUBI_SESSION_USER_KEY -> "kandy")
   }

@@ -290,7 +290,8 @@ object SparkSQLEngine extends Logging {

     KyuubiSparkUtil.initializeSparkSession(
       session,
-      kyuubiConf.get(ENGINE_INITIALIZE_SQL) ++ kyuubiConf.get(ENGINE_SESSION_INITIALIZE_SQL))
+      kyuubiConf.get(ENGINE_SPARK_INITIALIZE_SQL) ++ kyuubiConf.get(
+        ENGINE_SESSION_SPARK_INITIALIZE_SQL))
     session.sparkContext.setLocalProperty(KYUUBI_ENGINE_URL, KyuubiSparkUtil.engineUrl)
     session
   }

@@ -130,7 +130,9 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession)

   private def newSparkSession(rootSparkSession: SparkSession): SparkSession = {
     val newSparkSession = rootSparkSession.newSession()
-    KyuubiSparkUtil.initializeSparkSession(newSparkSession, conf.get(ENGINE_SESSION_INITIALIZE_SQL))
+    KyuubiSparkUtil.initializeSparkSession(
+      newSparkSession,
+      conf.get(ENGINE_SESSION_SPARK_INITIALIZE_SQL))
     newSparkSession
   }

@@ -104,7 +104,7 @@ class SparkEngineSuites extends KyuubiFunSuite {
     withSystemProperty(Map(
       s"spark.$KYUUBI_ENGINE_SUBMIT_TIME_KEY" -> String.valueOf(submitTime),
       s"spark.${ENGINE_INIT_TIMEOUT.key}" -> String.valueOf(timeout),
-      s"spark.${ENGINE_INITIALIZE_SQL.key}" ->
+      s"spark.${ENGINE_SPARK_INITIALIZE_SQL.key}" ->
         "select 1 where java_method('java.lang.Thread', 'sleep', 60000L) is null")) {
       SparkSQLEngine.setupConf()
       SparkSQLEngine.currentEngine = None

@@ -28,7 +28,7 @@ class SingleSessionSuite extends WithSparkSQLEngine with HiveJDBCTestHelper {
     ENGINE_SHARE_LEVEL.key -> "SERVER",
     ENGINE_SINGLE_SPARK_SESSION.key -> "true",
     (
-      ENGINE_SESSION_INITIALIZE_SQL.key,
+      ENGINE_SESSION_SPARK_INITIALIZE_SQL.key,
       "CREATE DATABASE IF NOT EXISTS INIT_DB_SOLO;" +
         "CREATE TABLE IF NOT EXISTS INIT_DB_SOLO.test(a int) USING CSV;" +
         "INSERT INTO INIT_DB_SOLO.test VALUES (2);"))

@@ -2140,6 +2140,13 @@ object KyuubiConf {
       .toSequence(";")
       .createWithDefault(Nil)

+  val ENGINE_SESSION_FLINK_INITIALIZE_SQL: ConfigEntry[Seq[String]] =
+    buildConf("kyuubi.session.engine.flink.initialize.sql")
+      .doc("The initialize sql for Flink session. " +
+        "It fallback to `kyuubi.engine.session.initialize.sql`")
+      .version("1.8.1")
+      .fallbackConf(ENGINE_SESSION_INITIALIZE_SQL)
+
   val ENGINE_DEREGISTER_EXCEPTION_CLASSES: ConfigEntry[Set[String]] =
     buildConf("kyuubi.engine.deregister.exception.classes")
       .doc("A comma-separated list of exception classes. If there is any exception thrown," +
@@ -2583,6 +2590,13 @@ object KyuubiConf {
       .stringConf
       .createWithDefault("yyyy-MM-dd HH:mm:ss.SSS")

+  val ENGINE_SESSION_SPARK_INITIALIZE_SQL: ConfigEntry[Seq[String]] =
+    buildConf("kyuubi.session.engine.spark.initialize.sql")
+      .doc("The initialize sql for Spark session. " +
+        "It fallback to `kyuubi.engine.session.initialize.sql`")
+      .version("1.8.1")
+      .fallbackConf(ENGINE_SESSION_INITIALIZE_SQL)
+
   val ENGINE_TRINO_MEMORY: ConfigEntry[String] =
     buildConf("kyuubi.engine.trino.memory")
       .doc("The heap memory for the Trino query engine")
@@ -2657,6 +2671,12 @@ object KyuubiConf {
       .stringConf
       .createOptional

+  val ENGINE_FLINK_INITIALIZE_SQL: ConfigEntry[Seq[String]] =
+    buildConf("kyuubi.engine.flink.initialize.sql")
+      .doc("The initialize sql for Flink engine. It fallback to `kyuubi.engine.initialize.sql`.")
+      .version("1.8.1")
+      .fallbackConf(ENGINE_INITIALIZE_SQL)
+
   val SERVER_LIMIT_CONNECTIONS_PER_USER: OptionalConfigEntry[Int] =
     buildConf("kyuubi.server.limit.connections.per.user")
       .doc("Maximum kyuubi server connections per user." +
@@ -3154,6 +3174,12 @@ object KyuubiConf {
       .toSequence()
       .createWithDefault(Seq("spark.driver.memory", "spark.executor.memory"))

+  val ENGINE_SPARK_INITIALIZE_SQL: ConfigEntry[Seq[String]] =
+    buildConf("kyuubi.engine.spark.initialize.sql")
+      .doc("The initialize sql for Spark engine. It fallback to `kyuubi.engine.initialize.sql`.")
+      .version("1.8.1")
+      .fallbackConf(ENGINE_INITIALIZE_SQL)
+
   val ENGINE_HIVE_EVENT_LOGGERS: ConfigEntry[Seq[String]] =
     buildConf("kyuubi.engine.hive.event.loggers")
       .doc("A comma-separated list of engine history loggers, where engine/session/operation etc" +

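
For context, a hedged sketch of the fallback behavior declared above, assuming `.fallbackConf` resolves to the parent entry's value when the engine-specific key is unset (as other fallback entries in `KyuubiConf` do); the statements are placeholders:

```scala
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.{ENGINE_FLINK_INITIALIZE_SQL, ENGINE_INITIALIZE_SQL}

val conf = KyuubiConf()
  .set(ENGINE_INITIALIZE_SQL.key, "SHOW DATABASES")

// kyuubi.engine.flink.initialize.sql is unset, so the generic value is used.
assert(conf.get(ENGINE_FLINK_INITIALIZE_SQL) == Seq("SHOW DATABASES"))

// Setting the Flink-specific key overrides the generic one for Flink engines only.
conf.set(ENGINE_FLINK_INITIALIZE_SQL.key, "SHOW TABLES")
assert(conf.get(ENGINE_FLINK_INITIALIZE_SQL) == Seq("SHOW TABLES"))
```
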
@@ -19,19 +19,19 @@ package org.apache.kyuubi.engine.spark

 import org.apache.kyuubi.WithKyuubiServer
 import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.config.KyuubiConf.{ENGINE_INITIALIZE_SQL, ENGINE_SESSION_INITIALIZE_SQL}
+import org.apache.kyuubi.config.KyuubiConf.{ENGINE_SESSION_SPARK_INITIALIZE_SQL, ENGINE_SPARK_INITIALIZE_SQL}
 import org.apache.kyuubi.operation.HiveJDBCTestHelper

 class InitializeSQLSuite extends WithKyuubiServer with HiveJDBCTestHelper {
   override protected val conf: KyuubiConf = {
     KyuubiConf()
       .set(
-        ENGINE_INITIALIZE_SQL.key,
+        ENGINE_SPARK_INITIALIZE_SQL.key,
         "CREATE DATABASE IF NOT EXISTS INIT_DB;" +
           "CREATE TABLE IF NOT EXISTS INIT_DB.test(a int) USING CSV;" +
           "INSERT OVERWRITE TABLE INIT_DB.test VALUES (1);")
       .set(
-        ENGINE_SESSION_INITIALIZE_SQL.key,
+        ENGINE_SESSION_SPARK_INITIALIZE_SQL.key,
         "CREATE DATABASE IF NOT EXISTS INIT_DB;" +
           "CREATE TABLE IF NOT EXISTS INIT_DB.test(a int) USING CSV;" +
           "INSERT INTO INIT_DB.test VALUES (2);")

