diff --git a/docs/configuration/settings.md b/docs/configuration/settings.md index b3a9ff337db..9ef97866195 100644 --- a/docs/configuration/settings.md +++ b/docs/configuration/settings.md @@ -434,57 +434,58 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co ### Session -| Key | Default | Meaning | Type | Since | -|------------------------------------------------------|-------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|-------| -| kyuubi.session.check.interval | PT5M | The check interval for session timeout. | duration | 1.0.0 | -| kyuubi.session.close.on.disconnect | true | Session will be closed when client disconnects from kyuubi gateway. Set this to false to have session outlive its parent connection. | boolean | 1.8.0 | -| kyuubi.session.conf.advisor | <undefined> | A config advisor plugin for Kyuubi Server. This plugin can provide a list of custom configs for different users or session configs and overwrite the session configs before opening a new session. This config value should be a subclass of `org.apache.kyuubi.plugin.SessionConfAdvisor` which has a zero-arg constructor. | seq | 1.5.0 | -| kyuubi.session.conf.file.reload.interval | PT10M | When `FileSessionConfAdvisor` is used, this configuration defines the expired time of `$KYUUBI_CONF_DIR/kyuubi-session-.conf` in the cache. After exceeding this value, the file will be reloaded. | duration | 1.7.0 | -| kyuubi.session.conf.ignore.list || A comma-separated list of ignored keys. If the client connection contains any of them, the key and the corresponding value will be removed silently during engine bootstrap and connection setup. Note that this rule is for server-side protection defined via administrators to prevent some essential configs from tampering but will not forbid users to set dynamic configurations via SET syntax. | set | 1.2.0 | -| kyuubi.session.conf.profile | <undefined> | Specify a profile to load session-level configurations from `$KYUUBI_CONF_DIR/kyuubi-session-.conf`. This configuration will be ignored if the file does not exist. This configuration only takes effect when `kyuubi.session.conf.advisor` is set as `org.apache.kyuubi.session.FileSessionConfAdvisor`. | string | 1.7.0 | -| kyuubi.session.conf.restrict.list || A comma-separated list of restricted keys. If the client connection contains any of them, the connection will be rejected explicitly during engine bootstrap and connection setup. Note that this rule is for server-side protection defined via administrators to prevent some essential configs from tampering but will not forbid users to set dynamic configurations via SET syntax. | set | 1.2.0 | -| kyuubi.session.engine.alive.max.failures | 3 | The maximum number of failures allowed for the engine. | int | 1.8.1 | -| kyuubi.session.engine.alive.probe.enabled | false | Whether to enable the engine alive probe, it true, we will create a companion thrift client that keeps sending simple requests to check whether the engine is alive. | boolean | 1.6.0 | -| kyuubi.session.engine.alive.probe.interval | PT10S | The interval for engine alive probe. | duration | 1.6.0 | -| kyuubi.session.engine.alive.timeout | PT2M | The timeout for engine alive. If there is no alive probe success in the last timeout window, the engine will be marked as no-alive. | duration | 1.6.0 | -| kyuubi.session.engine.check.interval | PT1M | The check interval for engine timeout | duration | 1.0.0 | -| kyuubi.session.engine.flink.fetch.timeout | <undefined> | Result fetch timeout for Flink engine. If the timeout is reached, the result fetch would be stopped and the current fetched would be returned. If no data are fetched, a TimeoutException would be thrown. | duration | 1.8.0 | -| kyuubi.session.engine.flink.initialize.sql || The initialize sql for Flink session. It fallback to `kyuubi.engine.session.initialize.sql` | seq | 1.8.1 | -| kyuubi.session.engine.flink.main.resource | <undefined> | The package used to create Flink SQL engine remote job. If it is undefined, Kyuubi will use the default | string | 1.4.0 | -| kyuubi.session.engine.flink.max.rows | 1000000 | Max rows of Flink query results. For batch queries, rows exceeding the limit would be ignored. For streaming queries, the query would be canceled if the limit is reached. | int | 1.5.0 | -| kyuubi.session.engine.hive.main.resource | <undefined> | The package used to create Hive engine remote job. If it is undefined, Kyuubi will use the default | string | 1.6.0 | -| kyuubi.session.engine.idle.timeout | PT30M | engine timeout, the engine will self-terminate when it's not accessed for this duration. 0 or negative means not to self-terminate. | duration | 1.0.0 | -| kyuubi.session.engine.initialize.timeout | PT3M | Timeout for starting the background engine, e.g. SparkSQLEngine. | duration | 1.0.0 | -| kyuubi.session.engine.launch.async | true | When opening kyuubi session, whether to launch the backend engine asynchronously. When true, the Kyuubi server will set up the connection with the client without delay as the backend engine will be created asynchronously. | boolean | 1.4.0 | -| kyuubi.session.engine.log.timeout | PT24H | If we use Spark as the engine then the session submit log is the console output of spark-submit. We will retain the session submit log until over the config value. | duration | 1.1.0 | -| kyuubi.session.engine.login.timeout | PT15S | The timeout of creating the connection to remote sql query engine | duration | 1.0.0 | -| kyuubi.session.engine.open.max.attempts | 9 | The number of times an open engine will retry when encountering a special error. | int | 1.7.0 | -| kyuubi.session.engine.open.onFailure | RETRY | The behavior when opening engine failed: | string | 1.8.1 | -| kyuubi.session.engine.open.retry.wait | PT10S | How long to wait before retrying to open the engine after failure. | duration | 1.7.0 | -| kyuubi.session.engine.share.level | USER | (deprecated) - Using kyuubi.engine.share.level instead | string | 1.0.0 | -| kyuubi.session.engine.spark.initialize.sql || The initialize sql for Spark session. It fallback to `kyuubi.engine.session.initialize.sql` | seq | 1.8.1 | -| kyuubi.session.engine.spark.main.resource | <undefined> | The package used to create Spark SQL engine remote application. If it is undefined, Kyuubi will use the default | string | 1.0.0 | -| kyuubi.session.engine.spark.max.initial.wait | PT1M | Max wait time for the initial connection to Spark engine. The engine will self-terminate no new incoming connection is established within this time. This setting only applies at the CONNECTION share level. 0 or negative means not to self-terminate. | duration | 1.8.0 | -| kyuubi.session.engine.spark.max.lifetime | PT0S | Max lifetime for Spark engine, the engine will self-terminate when it reaches the end of life. 0 or negative means not to self-terminate. | duration | 1.6.0 | -| kyuubi.session.engine.spark.progress.timeFormat | yyyy-MM-dd HH:mm:ss.SSS | The time format of the progress bar | string | 1.6.0 | -| kyuubi.session.engine.spark.progress.update.interval | PT1S | Update period of progress bar. | duration | 1.6.0 | -| kyuubi.session.engine.spark.showProgress | false | When true, show the progress bar in the Spark's engine log. | boolean | 1.6.0 | -| kyuubi.session.engine.startup.destroy.timeout | PT5S | Engine startup process destroy wait time, if the process does not stop after this time, force destroy instead. This configuration only takes effect when `kyuubi.session.engine.startup.waitCompletion=false`. | duration | 1.8.0 | -| kyuubi.session.engine.startup.error.max.size | 8192 | During engine bootstrapping, if anderror occurs, using this config to limit the length of error message(characters). | int | 1.1.0 | -| kyuubi.session.engine.startup.maxLogLines | 10 | The maximum number of engine log lines when errors occur during the engine startup phase. Note that this config effects on client-side to help track engine startup issues. | int | 1.4.0 | -| kyuubi.session.engine.startup.waitCompletion | true | Whether to wait for completion after the engine starts. If false, the startup process will be destroyed after the engine is started. Note that only use it when the driver is not running locally, such as in yarn-cluster mode; Otherwise, the engine will be killed. | boolean | 1.5.0 | -| kyuubi.session.engine.trino.connection.catalog | <undefined> | The default catalog that Trino engine will connect to | string | 1.5.0 | -| kyuubi.session.engine.trino.connection.url | <undefined> | The server url that Trino engine will connect to | string | 1.5.0 | -| kyuubi.session.engine.trino.main.resource | <undefined> | The package used to create Trino engine remote job. If it is undefined, Kyuubi will use the default | string | 1.5.0 | -| kyuubi.session.engine.trino.showProgress | true | When true, show the progress bar and final info in the Trino engine log. | boolean | 1.6.0 | -| kyuubi.session.engine.trino.showProgress.debug | false | When true, show the progress debug info in the Trino engine log. | boolean | 1.6.0 | -| kyuubi.session.group.provider | hadoop | A group provider plugin for Kyuubi Server. This plugin can provide primary group and groups information for different users or session configs. This config value should be a subclass of `org.apache.kyuubi.plugin.GroupProvider` which has a zero-arg constructor. Kyuubi provides the following built-in implementations:
  • hadoop: delegate the user group mapping to hadoop UserGroupInformation.
  • | string | 1.7.0 | -| kyuubi.session.idle.timeout | PT6H | session idle timeout, it will be closed when it's not accessed for this duration | duration | 1.2.0 | -| kyuubi.session.local.dir.allow.list || The local dir list that are allowed to access by the kyuubi session application. End-users might set some parameters such as `spark.files` and it will upload some local files when launching the kyuubi engine, if the local dir allow list is defined, kyuubi will check whether the path to upload is in the allow list. Note that, if it is empty, there is no limitation for that. And please use absolute paths. | set | 1.6.0 | -| kyuubi.session.name | <undefined> | A human readable name of the session and we use empty string by default. This name will be recorded in the event. Note that, we only apply this value from session conf. | string | 1.4.0 | -| kyuubi.session.proxy.user | <undefined> | An alternative to hive.server2.proxy.user. The current behavior is consistent with hive.server2.proxy.user and now only takes effect in RESTFul API. When both parameters are set, kyuubi.session.proxy.user takes precedence. | string | 1.9.0 | -| kyuubi.session.timeout | PT6H | (deprecated)session timeout, it will be closed when it's not accessed for this duration | duration | 1.0.0 | -| kyuubi.session.user.sign.enabled | false | Whether to verify the integrity of session user name on the engine side, e.g. Authz plugin in Spark. | boolean | 1.7.0 | +| Key | Default | Meaning | Type | Since | +|---------------------------------------------------------|-------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|-------| +| kyuubi.session.check.interval | PT5M | The check interval for session timeout. | duration | 1.0.0 | +| kyuubi.session.close.on.disconnect | true | Session will be closed when client disconnects from kyuubi gateway. Set this to false to have session outlive its parent connection. | boolean | 1.8.0 | +| kyuubi.session.conf.advisor | <undefined> | A config advisor plugin for Kyuubi Server. This plugin can provide a list of custom configs for different users or session configs and overwrite the session configs before opening a new session. This config value should be a subclass of `org.apache.kyuubi.plugin.SessionConfAdvisor` which has a zero-arg constructor. | seq | 1.5.0 | +| kyuubi.session.conf.file.reload.interval | PT10M | When `FileSessionConfAdvisor` is used, this configuration defines the expired time of `$KYUUBI_CONF_DIR/kyuubi-session-.conf` in the cache. After exceeding this value, the file will be reloaded. | duration | 1.7.0 | +| kyuubi.session.conf.ignore.list || A comma-separated list of ignored keys. If the client connection contains any of them, the key and the corresponding value will be removed silently during engine bootstrap and connection setup. Note that this rule is for server-side protection defined via administrators to prevent some essential configs from tampering but will not forbid users to set dynamic configurations via SET syntax. | set | 1.2.0 | +| kyuubi.session.conf.profile | <undefined> | Specify a profile to load session-level configurations from `$KYUUBI_CONF_DIR/kyuubi-session-.conf`. This configuration will be ignored if the file does not exist. This configuration only takes effect when `kyuubi.session.conf.advisor` is set as `org.apache.kyuubi.session.FileSessionConfAdvisor`. | string | 1.7.0 | +| kyuubi.session.conf.restrict.list || A comma-separated list of restricted keys. If the client connection contains any of them, the connection will be rejected explicitly during engine bootstrap and connection setup. Note that this rule is for server-side protection defined via administrators to prevent some essential configs from tampering but will not forbid users to set dynamic configurations via SET syntax. | set | 1.2.0 | +| kyuubi.session.engine.alive.max.failures | 3 | The maximum number of failures allowed for the engine. | int | 1.8.1 | +| kyuubi.session.engine.alive.probe.enabled | false | Whether to enable the engine alive probe, it true, we will create a companion thrift client that keeps sending simple requests to check whether the engine is alive. | boolean | 1.6.0 | +| kyuubi.session.engine.alive.probe.interval | PT10S | The interval for engine alive probe. | duration | 1.6.0 | +| kyuubi.session.engine.alive.timeout | PT2M | The timeout for engine alive. If there is no alive probe success in the last timeout window, the engine will be marked as no-alive. | duration | 1.6.0 | +| kyuubi.session.engine.check.interval | PT1M | The check interval for engine timeout | duration | 1.0.0 | +| kyuubi.session.engine.flink.fetch.timeout | <undefined> | Result fetch timeout for Flink engine. If the timeout is reached, the result fetch would be stopped and the current fetched would be returned. If no data are fetched, a TimeoutException would be thrown. | duration | 1.8.0 | +| kyuubi.session.engine.flink.initialize.sql || The initialize sql for Flink session. It fallback to `kyuubi.engine.session.initialize.sql` | seq | 1.8.1 | +| kyuubi.session.engine.flink.main.resource | <undefined> | The package used to create Flink SQL engine remote job. If it is undefined, Kyuubi will use the default | string | 1.4.0 | +| kyuubi.session.engine.flink.max.rows | 1000000 | Max rows of Flink query results. For batch queries, rows exceeding the limit would be ignored. For streaming queries, the query would be canceled if the limit is reached. | int | 1.5.0 | +| kyuubi.session.engine.hive.main.resource | <undefined> | The package used to create Hive engine remote job. If it is undefined, Kyuubi will use the default | string | 1.6.0 | +| kyuubi.session.engine.idle.timeout | PT30M | engine timeout, the engine will self-terminate when it's not accessed for this duration. 0 or negative means not to self-terminate. | duration | 1.0.0 | +| kyuubi.session.engine.initialize.timeout | PT3M | Timeout for starting the background engine, e.g. SparkSQLEngine. | duration | 1.0.0 | +| kyuubi.session.engine.launch.async | true | When opening kyuubi session, whether to launch the backend engine asynchronously. When true, the Kyuubi server will set up the connection with the client without delay as the backend engine will be created asynchronously. | boolean | 1.4.0 | +| kyuubi.session.engine.log.timeout | PT24H | If we use Spark as the engine then the session submit log is the console output of spark-submit. We will retain the session submit log until over the config value. | duration | 1.1.0 | +| kyuubi.session.engine.login.timeout | PT15S | The timeout of creating the connection to remote sql query engine | duration | 1.0.0 | +| kyuubi.session.engine.open.max.attempts | 9 | The number of times an open engine will retry when encountering a special error. | int | 1.7.0 | +| kyuubi.session.engine.open.onFailure | RETRY | The behavior when opening engine failed:
    • RETRY: retry to open engine for kyuubi.session.engine.open.max.attempts times.
    • DEREGISTER_IMMEDIATELY: deregister the engine immediately.
    • DEREGISTER_AFTER_RETRY: deregister the engine after retry to open engine for kyuubi.session.engine.open.max.attempts times.
    | string | 1.8.1 | +| kyuubi.session.engine.open.retry.wait | PT10S | How long to wait before retrying to open the engine after failure. | duration | 1.7.0 | +| kyuubi.session.engine.share.level | USER | (deprecated) - Using kyuubi.engine.share.level instead | string | 1.0.0 | +| kyuubi.session.engine.spark.initialize.sql || The initialize sql for Spark session. It fallback to `kyuubi.engine.session.initialize.sql` | seq | 1.8.1 | +| kyuubi.session.engine.spark.main.resource | <undefined> | The package used to create Spark SQL engine remote application. If it is undefined, Kyuubi will use the default | string | 1.0.0 | +| kyuubi.session.engine.spark.max.initial.wait | PT1M | Max wait time for the initial connection to Spark engine. The engine will self-terminate no new incoming connection is established within this time. This setting only applies at the CONNECTION share level. 0 or negative means not to self-terminate. | duration | 1.8.0 | +| kyuubi.session.engine.spark.max.lifetime | PT0S | Max lifetime for Spark engine, the engine will self-terminate when it reaches the end of life. 0 or negative means not to self-terminate. | duration | 1.6.0 | +| kyuubi.session.engine.spark.max.lifetime.gracefulPeriod | PT0S | Graceful period for Spark engine to wait the connections disconnected after reaching the end of life. After the graceful period, all the connections without running operations will be forcibly disconnected. 0 or negative means always waiting the connections disconnected. | duration | 1.8.1 | +| kyuubi.session.engine.spark.progress.timeFormat | yyyy-MM-dd HH:mm:ss.SSS | The time format of the progress bar | string | 1.6.0 | +| kyuubi.session.engine.spark.progress.update.interval | PT1S | Update period of progress bar. | duration | 1.6.0 | +| kyuubi.session.engine.spark.showProgress | false | When true, show the progress bar in the Spark's engine log. | boolean | 1.6.0 | +| kyuubi.session.engine.startup.destroy.timeout | PT5S | Engine startup process destroy wait time, if the process does not stop after this time, force destroy instead. This configuration only takes effect when `kyuubi.session.engine.startup.waitCompletion=false`. | duration | 1.8.0 | +| kyuubi.session.engine.startup.error.max.size | 8192 | During engine bootstrapping, if anderror occurs, using this config to limit the length of error message(characters). | int | 1.1.0 | +| kyuubi.session.engine.startup.maxLogLines | 10 | The maximum number of engine log lines when errors occur during the engine startup phase. Note that this config effects on client-side to help track engine startup issues. | int | 1.4.0 | +| kyuubi.session.engine.startup.waitCompletion | true | Whether to wait for completion after the engine starts. If false, the startup process will be destroyed after the engine is started. Note that only use it when the driver is not running locally, such as in yarn-cluster mode; Otherwise, the engine will be killed. | boolean | 1.5.0 | +| kyuubi.session.engine.trino.connection.catalog | <undefined> | The default catalog that Trino engine will connect to | string | 1.5.0 | +| kyuubi.session.engine.trino.connection.url | <undefined> | The server url that Trino engine will connect to | string | 1.5.0 | +| kyuubi.session.engine.trino.main.resource | <undefined> | The package used to create Trino engine remote job. If it is undefined, Kyuubi will use the default | string | 1.5.0 | +| kyuubi.session.engine.trino.showProgress | true | When true, show the progress bar and final info in the Trino engine log. | boolean | 1.6.0 | +| kyuubi.session.engine.trino.showProgress.debug | false | When true, show the progress debug info in the Trino engine log. | boolean | 1.6.0 | +| kyuubi.session.group.provider | hadoop | A group provider plugin for Kyuubi Server. This plugin can provide primary group and groups information for different users or session configs. This config value should be a subclass of `org.apache.kyuubi.plugin.GroupProvider` which has a zero-arg constructor. Kyuubi provides the following built-in implementations:
  • hadoop: delegate the user group mapping to hadoop UserGroupInformation.
  • | string | 1.7.0 | +| kyuubi.session.idle.timeout | PT6H | session idle timeout, it will be closed when it's not accessed for this duration | duration | 1.2.0 | +| kyuubi.session.local.dir.allow.list || The local dir list that are allowed to access by the kyuubi session application. End-users might set some parameters such as `spark.files` and it will upload some local files when launching the kyuubi engine, if the local dir allow list is defined, kyuubi will check whether the path to upload is in the allow list. Note that, if it is empty, there is no limitation for that. And please use absolute paths. | set | 1.6.0 | +| kyuubi.session.name | <undefined> | A human readable name of the session and we use empty string by default. This name will be recorded in the event. Note that, we only apply this value from session conf. | string | 1.4.0 | +| kyuubi.session.proxy.user | <undefined> | An alternative to hive.server2.proxy.user. The current behavior is consistent with hive.server2.proxy.user and now only takes effect in RESTFul API. When both parameters are set, kyuubi.session.proxy.user takes precedence. | string | 1.9.0 | +| kyuubi.session.timeout | PT6H | (deprecated)session timeout, it will be closed when it's not accessed for this duration | duration | 1.0.0 | +| kyuubi.session.user.sign.enabled | false | Whether to verify the integrity of session user name on the engine side, e.g. Authz plugin in Spark. | boolean | 1.7.0 | ### Spnego diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala index d2e738366f8..d1331cd0284 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala @@ -129,7 +129,7 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin info(s"Spark engine is de-registering from engine discovery space.") frontendServices.flatMap(_.discoveryService).foreach(_.stop()) while (backendService.sessionManager.getOpenSessionCount > 0) { - Thread.sleep(1000 * 60) + Thread.sleep(TimeUnit.SECONDS.toMillis(10)) } info(s"Spark engine has no open session now, terminating.") stop() @@ -170,8 +170,10 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin val maxLifetime = conf.get(ENGINE_SPARK_MAX_LIFETIME) val deregistered = new AtomicBoolean(false) if (maxLifetime > 0) { + val gracefulPeriod = conf.get(ENGINE_SPARK_MAX_LIFETIME_GRACEFUL_PERIOD) val checkTask: Runnable = () => { - if (!shutdown.get && System.currentTimeMillis() - getStartTime > maxLifetime) { + val elapsedTime = System.currentTimeMillis() - getStartTime + if (!shutdown.get && elapsedTime > maxLifetime) { if (deregistered.compareAndSet(false, true)) { info(s"Spark engine has been running for more than $maxLifetime ms," + s" deregistering from engine discovery space.") @@ -182,6 +184,24 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin info(s"Spark engine has been running for more than $maxLifetime ms" + s" and no open session now, terminating.") stop() + } else if (gracefulPeriod > 0 && elapsedTime > maxLifetime + gracefulPeriod) { + backendService.sessionManager.allSessions().foreach { session => + val operationCount = + backendService.sessionManager.operationManager.allOperations() + .filter(_.getSession == session) + .size + if (operationCount == 0) { + warn(s"Closing session ${session.handle.identifier} forcibly that has no" + + s" operation and has been running for more than $gracefulPeriod ms after engine" + + s" max lifetime.") + try { + backendService.sessionManager.closeSession(session.handle) + } catch { + case e: Throwable => + error(s"Error closing session ${session.handle.identifier}", e) + } + } + } } } } diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/EtcdShareLevelSparkEngineSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/EtcdShareLevelSparkEngineSuite.scala index 727b232e3f8..9b08c5b0c91 100644 --- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/EtcdShareLevelSparkEngineSuite.scala +++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/EtcdShareLevelSparkEngineSuite.scala @@ -17,19 +17,13 @@ package org.apache.kyuubi.engine.spark -import org.apache.kyuubi.config.KyuubiConf.{ENGINE_CHECK_INTERVAL, ENGINE_SHARE_LEVEL, ENGINE_SPARK_MAX_INITIAL_WAIT, ENGINE_SPARK_MAX_LIFETIME} import org.apache.kyuubi.engine.ShareLevel import org.apache.kyuubi.engine.ShareLevel.ShareLevel trait EtcdShareLevelSparkEngineSuite extends ShareLevelSparkEngineTests with WithEtcdCluster { override def withKyuubiConf: Map[String, String] = { - super.withKyuubiConf ++ - etcdConf ++ Map( - ENGINE_SHARE_LEVEL.key -> shareLevel.toString, - ENGINE_SPARK_MAX_LIFETIME.key -> "PT20s", - ENGINE_SPARK_MAX_INITIAL_WAIT.key -> "0", - ENGINE_CHECK_INTERVAL.key -> "PT5s") + super.withKyuubiConf ++ etcdConf } } diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/ShareLevelSparkEngineTests.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/ShareLevelSparkEngineTests.scala index c83139592f7..f2b1544b138 100644 --- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/ShareLevelSparkEngineTests.scala +++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/ShareLevelSparkEngineTests.scala @@ -22,6 +22,7 @@ import java.util.UUID import org.scalatest.concurrent.PatienceConfiguration.Timeout import org.scalatest.time.SpanSugar.convertIntToGrainOfTime +import org.apache.kyuubi.config.KyuubiConf.{ENGINE_CHECK_INTERVAL, ENGINE_SHARE_LEVEL, ENGINE_SPARK_MAX_INITIAL_WAIT, ENGINE_SPARK_MAX_LIFETIME, ENGINE_SPARK_MAX_LIFETIME_GRACEFUL_PERIOD} import org.apache.kyuubi.engine.ShareLevel import org.apache.kyuubi.engine.ShareLevel.ShareLevel import org.apache.kyuubi.operation.HiveJDBCTestHelper @@ -35,6 +36,13 @@ trait ShareLevelSparkEngineTests extends WithDiscoverySparkSQLEngine with HiveJDBCTestHelper { def shareLevel: ShareLevel + override def withKyuubiConf: Map[String, String] = super.withKyuubiConf ++ Map( + ENGINE_SHARE_LEVEL.key -> shareLevel.toString, + ENGINE_SPARK_MAX_LIFETIME.key -> "PT5s", + ENGINE_SPARK_MAX_INITIAL_WAIT.key -> "0", + ENGINE_CHECK_INTERVAL.key -> "PT2s", + ENGINE_SPARK_MAX_LIFETIME_GRACEFUL_PERIOD.key -> "100") + override protected def jdbcUrl: String = getJdbcUrl override val namespace: String = { // for test, we always use uuid as namespace @@ -76,4 +84,23 @@ trait ShareLevelSparkEngineTests } } } + + test("test spark engine max life-time with graceful period") { + withDiscoveryClient { discoveryClient => + assert(engine.getServiceState == ServiceState.STARTED) + assert(discoveryClient.pathExists(namespace)) + withJdbcStatement() { _ => + eventually(Timeout(30.seconds)) { + shareLevel match { + case ShareLevel.CONNECTION => + assert(engine.getServiceState == ServiceState.STOPPED) + assert(discoveryClient.pathNonExists(namespace)) + case _ => + assert(engine.getServiceState == ServiceState.STOPPED) + assert(discoveryClient.pathExists(namespace)) + } + } + } + } + } } diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/ZookeeperShareLevelSparkEngineSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/ZookeeperShareLevelSparkEngineSuite.scala index f24abb36c0e..15f3e26860d 100644 --- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/ZookeeperShareLevelSparkEngineSuite.scala +++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/ZookeeperShareLevelSparkEngineSuite.scala @@ -17,22 +17,13 @@ package org.apache.kyuubi.engine.spark -import org.apache.kyuubi.config.KyuubiConf.ENGINE_CHECK_INTERVAL -import org.apache.kyuubi.config.KyuubiConf.ENGINE_SHARE_LEVEL -import org.apache.kyuubi.config.KyuubiConf.ENGINE_SPARK_MAX_INITIAL_WAIT -import org.apache.kyuubi.config.KyuubiConf.ENGINE_SPARK_MAX_LIFETIME import org.apache.kyuubi.engine.ShareLevel import org.apache.kyuubi.engine.ShareLevel.ShareLevel trait ZookeeperShareLevelSparkEngineSuite extends ShareLevelSparkEngineTests with WithEmbeddedZookeeper { override def withKyuubiConf: Map[String, String] = { - super.withKyuubiConf ++ - zookeeperConf ++ Map( - ENGINE_SHARE_LEVEL.key -> shareLevel.toString, - ENGINE_SPARK_MAX_LIFETIME.key -> "PT20s", - ENGINE_SPARK_MAX_INITIAL_WAIT.key -> "0", - ENGINE_CHECK_INTERVAL.key -> "PT5s") + super.withKyuubiConf ++ zookeeperConf } } diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala index 9f09e9bb636..caeecf9f0ad 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala @@ -1330,6 +1330,16 @@ object KyuubiConf { .timeConf .createWithDefault(0) + val ENGINE_SPARK_MAX_LIFETIME_GRACEFUL_PERIOD: ConfigEntry[Long] = + buildConf("kyuubi.session.engine.spark.max.lifetime.gracefulPeriod") + .doc("Graceful period for Spark engine to wait the connections disconnected after reaching" + + " the end of life. After the graceful period, all the connections without running" + + " operations will be forcibly disconnected. 0 or negative means always waiting the" + + " connections disconnected.") + .version("1.8.1") + .timeConf + .createWithDefault(0) + val ENGINE_SPARK_MAX_INITIAL_WAIT: ConfigEntry[Long] = buildConf("kyuubi.session.engine.spark.max.initial.wait") .doc("Max wait time for the initial connection to Spark engine. The engine will" + diff --git a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala index a1b1466d122..c7eee15030f 100644 --- a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala +++ b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala @@ -17,7 +17,7 @@ package org.apache.kyuubi.ha.client -import java.util.concurrent.CountDownLatch +import java.util.concurrent.{CountDownLatch, TimeUnit} import java.util.concurrent.atomic.AtomicBoolean import org.apache.kyuubi.Logging @@ -68,7 +68,7 @@ abstract class ServiceDiscovery( def stopGracefully(isLost: Boolean = false): Unit = { while (fe.be.sessionManager.getOpenSessionCount > 0) { info(s"${fe.be.sessionManager.getOpenSessionCount} connection(s) are active, delay shutdown") - Thread.sleep(1000 * 60) + Thread.sleep(TimeUnit.SECONDS.toMillis(10)) } isServerLost.set(isLost) gracefulShutdownLatch.countDown() diff --git a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/etcd/EtcdDiscoveryClient.scala b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/etcd/EtcdDiscoveryClient.scala index 7edc7e8a310..2f27f7b8096 100644 --- a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/etcd/EtcdDiscoveryClient.scala +++ b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/etcd/EtcdDiscoveryClient.scala @@ -53,6 +53,7 @@ import org.apache.kyuubi.ha.client.DiscoveryPaths import org.apache.kyuubi.ha.client.ServiceDiscovery import org.apache.kyuubi.ha.client.ServiceNodeInfo import org.apache.kyuubi.ha.client.etcd.EtcdDiscoveryClient._ +import org.apache.kyuubi.util.ThreadUtils class EtcdDiscoveryClient(conf: KyuubiConf) extends DiscoveryClient { @@ -381,7 +382,12 @@ class EtcdDiscoveryClient(conf: KyuubiConf) extends DiscoveryClient { .filter(_.getEventType == WatchEvent.EventType.DELETE).foreach(_ => { warn(s"This Kyuubi instance ${instance} is now de-registered from" + s" ETCD. The server will be shut down after the last client session completes.") - serviceDiscovery.stopGracefully() + // for jetcd, the watcher event process might block the main thread, + // so start a new thread to do the de-register work as a workaround, + // see details in https://github.com/etcd-io/jetcd/issues/1089 + ThreadUtils.runInNewThread("deregister-watcher-thread", isDaemon = false) { + serviceDiscovery.stopGracefully() + } }) }