From 0749d75eb17963b177917e23c6aa387d2ce797bd Mon Sep 17 00:00:00 2001 From: Fei Wang Date: Wed, 7 Feb 2024 10:16:46 +0800 Subject: [PATCH] [KYUUBI #5952] Disconnect connections without running operations after engine maxlife time graceful period MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit # :mag: Description ## Issue References ๐Ÿ”— We found that, some kyuubi connections(maybe managed by jdbc connection pool likes hikari) always keep alive, and the engine can not be terminated after exceeds the max life time. So, In this pr, I introduce a graceful period after spark engine max life time, after the graceful period, the connections without running operations will be disconnected forcibly. Close #5952 ## Describe Your Solution ๐Ÿ”ง Please include a summary of the change and which issue is fixed. Please also include relevant motivation and context. List any dependencies that are required for this change. ## Types of changes :bookmark: - [ ] Bugfix (non-breaking change which fixes an issue) - [x] New feature (non-breaking change which adds functionality) - [ ] Breaking change (fix or feature that would cause existing functionality to change) ## Test Plan ๐Ÿงช #### Behavior Without This Pull Request :coffin: #### Behavior With This Pull Request :tada: #### Related Unit Tests --- # Checklist ๐Ÿ“ - [ ] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html) **Be nice. Be informative.** Closes #6040 from turboFei/close_on_nooperation_rebase. Closes #5952 0b05ddc0c [Fei Wang] comments 4329a85cf [Fei Wang] async stop b39c9b3a0 [Fei Wang] use short sleep 7c123f612 [Fei Wang] save 45ad3489d [Fei Wang] check no running operation Authored-by: Fei Wang Signed-off-by: Fei Wang (cherry picked from commit 8c3f471ae370d55c54b62ffb7d74096839c77c21) --- docs/configuration/settings.md | 101 +++++++++--------- .../kyuubi/engine/spark/SparkSQLEngine.scala | 24 ++++- .../EtcdShareLevelSparkEngineSuite.scala | 8 +- .../spark/ShareLevelSparkEngineTests.scala | 27 +++++ .../ZookeeperShareLevelSparkEngineSuite.scala | 11 +- .../org/apache/kyuubi/config/KyuubiConf.scala | 10 ++ .../kyuubi/ha/client/ServiceDiscovery.scala | 4 +- .../ha/client/etcd/EtcdDiscoveryClient.scala | 8 +- 8 files changed, 121 insertions(+), 72 deletions(-) diff --git a/docs/configuration/settings.md b/docs/configuration/settings.md index 69d2203378c..4eecea1a0c2 100644 --- a/docs/configuration/settings.md +++ b/docs/configuration/settings.md @@ -414,56 +414,57 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co ### Session -| Key | Default | Meaning | Type | Since | -|------------------------------------------------------|-------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|-------| -| kyuubi.session.check.interval | PT5M | The check interval for session timeout. | duration | 1.0.0 | -| kyuubi.session.close.on.disconnect | true | Session will be closed when client disconnects from kyuubi gateway. Set this to false to have session outlive its parent connection. | boolean | 1.8.0 | -| kyuubi.session.conf.advisor | <undefined> | A config advisor plugin for Kyuubi Server. This plugin can provide some custom configs for different users or session configs and overwrite the session configs before opening a new session. This config value should be a subclass of `org.apache.kyuubi.plugin.SessionConfAdvisor` which has a zero-arg constructor. | string | 1.5.0 | -| kyuubi.session.conf.file.reload.interval | PT10M | When `FileSessionConfAdvisor` is used, this configuration defines the expired time of `$KYUUBI_CONF_DIR/kyuubi-session-.conf` in the cache. After exceeding this value, the file will be reloaded. | duration | 1.7.0 | -| kyuubi.session.conf.ignore.list || A comma-separated list of ignored keys. If the client connection contains any of them, the key and the corresponding value will be removed silently during engine bootstrap and connection setup. Note that this rule is for server-side protection defined via administrators to prevent some essential configs from tampering but will not forbid users to set dynamic configurations via SET syntax. | set | 1.2.0 | -| kyuubi.session.conf.profile | <undefined> | Specify a profile to load session-level configurations from `$KYUUBI_CONF_DIR/kyuubi-session-.conf`. This configuration will be ignored if the file does not exist. This configuration only takes effect when `kyuubi.session.conf.advisor` is set as `org.apache.kyuubi.session.FileSessionConfAdvisor`. | string | 1.7.0 | -| kyuubi.session.conf.restrict.list || A comma-separated list of restricted keys. If the client connection contains any of them, the connection will be rejected explicitly during engine bootstrap and connection setup. Note that this rule is for server-side protection defined via administrators to prevent some essential configs from tampering but will not forbid users to set dynamic configurations via SET syntax. | set | 1.2.0 | -| kyuubi.session.engine.alive.max.failures | 3 | The maximum number of failures allowed for the engine. | int | 1.8.1 | -| kyuubi.session.engine.alive.probe.enabled | false | Whether to enable the engine alive probe, it true, we will create a companion thrift client that keeps sending simple requests to check whether the engine is alive. | boolean | 1.6.0 | -| kyuubi.session.engine.alive.probe.interval | PT10S | The interval for engine alive probe. | duration | 1.6.0 | -| kyuubi.session.engine.alive.timeout | PT2M | The timeout for engine alive. If there is no alive probe success in the last timeout window, the engine will be marked as no-alive. | duration | 1.6.0 | -| kyuubi.session.engine.check.interval | PT1M | The check interval for engine timeout | duration | 1.0.0 | -| kyuubi.session.engine.flink.fetch.timeout | <undefined> | Result fetch timeout for Flink engine. If the timeout is reached, the result fetch would be stopped and the current fetched would be returned. If no data are fetched, a TimeoutException would be thrown. | duration | 1.8.0 | -| kyuubi.session.engine.flink.initialize.sql || The initialize sql for Flink session. It fallback to `kyuubi.engine.session.initialize.sql` | seq | 1.8.1 | -| kyuubi.session.engine.flink.main.resource | <undefined> | The package used to create Flink SQL engine remote job. If it is undefined, Kyuubi will use the default | string | 1.4.0 | -| kyuubi.session.engine.flink.max.rows | 1000000 | Max rows of Flink query results. For batch queries, rows exceeding the limit would be ignored. For streaming queries, the query would be canceled if the limit is reached. | int | 1.5.0 | -| kyuubi.session.engine.hive.main.resource | <undefined> | The package used to create Hive engine remote job. If it is undefined, Kyuubi will use the default | string | 1.6.0 | -| kyuubi.session.engine.idle.timeout | PT30M | engine timeout, the engine will self-terminate when it's not accessed for this duration. 0 or negative means not to self-terminate. | duration | 1.0.0 | -| kyuubi.session.engine.initialize.timeout | PT3M | Timeout for starting the background engine, e.g. SparkSQLEngine. | duration | 1.0.0 | -| kyuubi.session.engine.launch.async | true | When opening kyuubi session, whether to launch the backend engine asynchronously. When true, the Kyuubi server will set up the connection with the client without delay as the backend engine will be created asynchronously. | boolean | 1.4.0 | -| kyuubi.session.engine.log.timeout | PT24H | If we use Spark as the engine then the session submit log is the console output of spark-submit. We will retain the session submit log until over the config value. | duration | 1.1.0 | -| kyuubi.session.engine.login.timeout | PT15S | The timeout of creating the connection to remote sql query engine | duration | 1.0.0 | -| kyuubi.session.engine.open.max.attempts | 9 | The number of times an open engine will retry when encountering a special error. | int | 1.7.0 | -| kyuubi.session.engine.open.onFailure | RETRY | The behavior when opening engine failed:
  • RETRY: retry to open engine for kyuubi.session.engine.open.max.attempts times.
  • DEREGISTER_IMMEDIATELY: deregister the engine immediately.
  • DEREGISTER_AFTER_RETRY: deregister the engine after retry to open engine for kyuubi.session.engine.open.max.attempts times.
| string | 1.8.1 | -| kyuubi.session.engine.open.retry.wait | PT10S | How long to wait before retrying to open the engine after failure. | duration | 1.7.0 | -| kyuubi.session.engine.share.level | USER | (deprecated) - Using kyuubi.engine.share.level instead | string | 1.0.0 | -| kyuubi.session.engine.spark.initialize.sql || The initialize sql for Spark session. It fallback to `kyuubi.engine.session.initialize.sql` | seq | 1.8.1 | -| kyuubi.session.engine.spark.main.resource | <undefined> | The package used to create Spark SQL engine remote application. If it is undefined, Kyuubi will use the default | string | 1.0.0 | -| kyuubi.session.engine.spark.max.initial.wait | PT1M | Max wait time for the initial connection to Spark engine. The engine will self-terminate no new incoming connection is established within this time. This setting only applies at the CONNECTION share level. 0 or negative means not to self-terminate. | duration | 1.8.0 | -| kyuubi.session.engine.spark.max.lifetime | PT0S | Max lifetime for Spark engine, the engine will self-terminate when it reaches the end of life. 0 or negative means not to self-terminate. | duration | 1.6.0 | -| kyuubi.session.engine.spark.progress.timeFormat | yyyy-MM-dd HH:mm:ss.SSS | The time format of the progress bar | string | 1.6.0 | -| kyuubi.session.engine.spark.progress.update.interval | PT1S | Update period of progress bar. | duration | 1.6.0 | -| kyuubi.session.engine.spark.showProgress | false | When true, show the progress bar in the Spark's engine log. | boolean | 1.6.0 | -| kyuubi.session.engine.startup.destroy.timeout | PT5S | Engine startup process destroy wait time, if the process does not stop after this time, force destroy instead. This configuration only takes effect when `kyuubi.session.engine.startup.waitCompletion=false`. | duration | 1.8.0 | -| kyuubi.session.engine.startup.error.max.size | 8192 | During engine bootstrapping, if anderror occurs, using this config to limit the length of error message(characters). | int | 1.1.0 | -| kyuubi.session.engine.startup.maxLogLines | 10 | The maximum number of engine log lines when errors occur during the engine startup phase. Note that this config effects on client-side to help track engine startup issues. | int | 1.4.0 | -| kyuubi.session.engine.startup.waitCompletion | true | Whether to wait for completion after the engine starts. If false, the startup process will be destroyed after the engine is started. Note that only use it when the driver is not running locally, such as in yarn-cluster mode; Otherwise, the engine will be killed. | boolean | 1.5.0 | -| kyuubi.session.engine.trino.connection.catalog | <undefined> | The default catalog that Trino engine will connect to | string | 1.5.0 | -| kyuubi.session.engine.trino.connection.url | <undefined> | The server url that Trino engine will connect to | string | 1.5.0 | -| kyuubi.session.engine.trino.main.resource | <undefined> | The package used to create Trino engine remote job. If it is undefined, Kyuubi will use the default | string | 1.5.0 | -| kyuubi.session.engine.trino.showProgress | true | When true, show the progress bar and final info in the Trino engine log. | boolean | 1.6.0 | -| kyuubi.session.engine.trino.showProgress.debug | false | When true, show the progress debug info in the Trino engine log. | boolean | 1.6.0 | -| kyuubi.session.group.provider | hadoop | A group provider plugin for Kyuubi Server. This plugin can provide primary group and groups information for different users or session configs. This config value should be a subclass of `org.apache.kyuubi.plugin.GroupProvider` which has a zero-arg constructor. Kyuubi provides the following built-in implementations:
  • hadoop: delegate the user group mapping to hadoop UserGroupInformation.
  • | string | 1.7.0 | -| kyuubi.session.idle.timeout | PT6H | session idle timeout, it will be closed when it's not accessed for this duration | duration | 1.2.0 | -| kyuubi.session.local.dir.allow.list || The local dir list that are allowed to access by the kyuubi session application. End-users might set some parameters such as `spark.files` and it will upload some local files when launching the kyuubi engine, if the local dir allow list is defined, kyuubi will check whether the path to upload is in the allow list. Note that, if it is empty, there is no limitation for that. And please use absolute paths. | set | 1.6.0 | -| kyuubi.session.name | <undefined> | A human readable name of the session and we use empty string by default. This name will be recorded in the event. Note that, we only apply this value from session conf. | string | 1.4.0 | -| kyuubi.session.timeout | PT6H | (deprecated)session timeout, it will be closed when it's not accessed for this duration | duration | 1.0.0 | -| kyuubi.session.user.sign.enabled | false | Whether to verify the integrity of session user name on the engine side, e.g. Authz plugin in Spark. | boolean | 1.7.0 | +| Key | Default | Meaning | Type | Since | +|---------------------------------------------------------|-------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|-------| +| kyuubi.session.check.interval | PT5M | The check interval for session timeout. | duration | 1.0.0 | +| kyuubi.session.close.on.disconnect | true | Session will be closed when client disconnects from kyuubi gateway. Set this to false to have session outlive its parent connection. | boolean | 1.8.0 | +| kyuubi.session.conf.advisor | <undefined> | A config advisor plugin for Kyuubi Server. This plugin can provide some custom configs for different users or session configs and overwrite the session configs before opening a new session. This config value should be a subclass of `org.apache.kyuubi.plugin.SessionConfAdvisor` which has a zero-arg constructor. | string | 1.5.0 | +| kyuubi.session.conf.file.reload.interval | PT10M | When `FileSessionConfAdvisor` is used, this configuration defines the expired time of `$KYUUBI_CONF_DIR/kyuubi-session-.conf` in the cache. After exceeding this value, the file will be reloaded. | duration | 1.7.0 | +| kyuubi.session.conf.ignore.list || A comma-separated list of ignored keys. If the client connection contains any of them, the key and the corresponding value will be removed silently during engine bootstrap and connection setup. Note that this rule is for server-side protection defined via administrators to prevent some essential configs from tampering but will not forbid users to set dynamic configurations via SET syntax. | set | 1.2.0 | +| kyuubi.session.conf.profile | <undefined> | Specify a profile to load session-level configurations from `$KYUUBI_CONF_DIR/kyuubi-session-.conf`. This configuration will be ignored if the file does not exist. This configuration only takes effect when `kyuubi.session.conf.advisor` is set as `org.apache.kyuubi.session.FileSessionConfAdvisor`. | string | 1.7.0 | +| kyuubi.session.conf.restrict.list || A comma-separated list of restricted keys. If the client connection contains any of them, the connection will be rejected explicitly during engine bootstrap and connection setup. Note that this rule is for server-side protection defined via administrators to prevent some essential configs from tampering but will not forbid users to set dynamic configurations via SET syntax. | set | 1.2.0 | +| kyuubi.session.engine.alive.max.failures | 3 | The maximum number of failures allowed for the engine. | int | 1.8.1 | +| kyuubi.session.engine.alive.probe.enabled | false | Whether to enable the engine alive probe, it true, we will create a companion thrift client that keeps sending simple requests to check whether the engine is alive. | boolean | 1.6.0 | +| kyuubi.session.engine.alive.probe.interval | PT10S | The interval for engine alive probe. | duration | 1.6.0 | +| kyuubi.session.engine.alive.timeout | PT2M | The timeout for engine alive. If there is no alive probe success in the last timeout window, the engine will be marked as no-alive. | duration | 1.6.0 | +| kyuubi.session.engine.check.interval | PT1M | The check interval for engine timeout | duration | 1.0.0 | +| kyuubi.session.engine.flink.fetch.timeout | <undefined> | Result fetch timeout for Flink engine. If the timeout is reached, the result fetch would be stopped and the current fetched would be returned. If no data are fetched, a TimeoutException would be thrown. | duration | 1.8.0 | +| kyuubi.session.engine.flink.initialize.sql || The initialize sql for Flink session. It fallback to `kyuubi.engine.session.initialize.sql` | seq | 1.8.1 | +| kyuubi.session.engine.flink.main.resource | <undefined> | The package used to create Flink SQL engine remote job. If it is undefined, Kyuubi will use the default | string | 1.4.0 | +| kyuubi.session.engine.flink.max.rows | 1000000 | Max rows of Flink query results. For batch queries, rows exceeding the limit would be ignored. For streaming queries, the query would be canceled if the limit is reached. | int | 1.5.0 | +| kyuubi.session.engine.hive.main.resource | <undefined> | The package used to create Hive engine remote job. If it is undefined, Kyuubi will use the default | string | 1.6.0 | +| kyuubi.session.engine.idle.timeout | PT30M | engine timeout, the engine will self-terminate when it's not accessed for this duration. 0 or negative means not to self-terminate. | duration | 1.0.0 | +| kyuubi.session.engine.initialize.timeout | PT3M | Timeout for starting the background engine, e.g. SparkSQLEngine. | duration | 1.0.0 | +| kyuubi.session.engine.launch.async | true | When opening kyuubi session, whether to launch the backend engine asynchronously. When true, the Kyuubi server will set up the connection with the client without delay as the backend engine will be created asynchronously. | boolean | 1.4.0 | +| kyuubi.session.engine.log.timeout | PT24H | If we use Spark as the engine then the session submit log is the console output of spark-submit. We will retain the session submit log until over the config value. | duration | 1.1.0 | +| kyuubi.session.engine.login.timeout | PT15S | The timeout of creating the connection to remote sql query engine | duration | 1.0.0 | +| kyuubi.session.engine.open.max.attempts | 9 | The number of times an open engine will retry when encountering a special error. | int | 1.7.0 | +| kyuubi.session.engine.open.onFailure | RETRY | The behavior when opening engine failed:
    • RETRY: retry to open engine for kyuubi.session.engine.open.max.attempts times.
    • DEREGISTER_IMMEDIATELY: deregister the engine immediately.
    • DEREGISTER_AFTER_RETRY: deregister the engine after retry to open engine for kyuubi.session.engine.open.max.attempts times.
    | string | 1.8.1 | +| kyuubi.session.engine.open.retry.wait | PT10S | How long to wait before retrying to open the engine after failure. | duration | 1.7.0 | +| kyuubi.session.engine.share.level | USER | (deprecated) - Using kyuubi.engine.share.level instead | string | 1.0.0 | +| kyuubi.session.engine.spark.initialize.sql || The initialize sql for Spark session. It fallback to `kyuubi.engine.session.initialize.sql` | seq | 1.8.1 | +| kyuubi.session.engine.spark.main.resource | <undefined> | The package used to create Spark SQL engine remote application. If it is undefined, Kyuubi will use the default | string | 1.0.0 | +| kyuubi.session.engine.spark.max.initial.wait | PT1M | Max wait time for the initial connection to Spark engine. The engine will self-terminate no new incoming connection is established within this time. This setting only applies at the CONNECTION share level. 0 or negative means not to self-terminate. | duration | 1.8.0 | +| kyuubi.session.engine.spark.max.lifetime | PT0S | Max lifetime for Spark engine, the engine will self-terminate when it reaches the end of life. 0 or negative means not to self-terminate. | duration | 1.6.0 | +| kyuubi.session.engine.spark.max.lifetime.gracefulPeriod | PT0S | Graceful period for Spark engine to wait the connections disconnected after reaching the end of life. After the graceful period, all the connections without running operations will be forcibly disconnected. 0 or negative means always waiting the connections disconnected. | duration | 1.8.1 | +| kyuubi.session.engine.spark.progress.timeFormat | yyyy-MM-dd HH:mm:ss.SSS | The time format of the progress bar | string | 1.6.0 | +| kyuubi.session.engine.spark.progress.update.interval | PT1S | Update period of progress bar. | duration | 1.6.0 | +| kyuubi.session.engine.spark.showProgress | false | When true, show the progress bar in the Spark's engine log. | boolean | 1.6.0 | +| kyuubi.session.engine.startup.destroy.timeout | PT5S | Engine startup process destroy wait time, if the process does not stop after this time, force destroy instead. This configuration only takes effect when `kyuubi.session.engine.startup.waitCompletion=false`. | duration | 1.8.0 | +| kyuubi.session.engine.startup.error.max.size | 8192 | During engine bootstrapping, if anderror occurs, using this config to limit the length of error message(characters). | int | 1.1.0 | +| kyuubi.session.engine.startup.maxLogLines | 10 | The maximum number of engine log lines when errors occur during the engine startup phase. Note that this config effects on client-side to help track engine startup issues. | int | 1.4.0 | +| kyuubi.session.engine.startup.waitCompletion | true | Whether to wait for completion after the engine starts. If false, the startup process will be destroyed after the engine is started. Note that only use it when the driver is not running locally, such as in yarn-cluster mode; Otherwise, the engine will be killed. | boolean | 1.5.0 | +| kyuubi.session.engine.trino.connection.catalog | <undefined> | The default catalog that Trino engine will connect to | string | 1.5.0 | +| kyuubi.session.engine.trino.connection.url | <undefined> | The server url that Trino engine will connect to | string | 1.5.0 | +| kyuubi.session.engine.trino.main.resource | <undefined> | The package used to create Trino engine remote job. If it is undefined, Kyuubi will use the default | string | 1.5.0 | +| kyuubi.session.engine.trino.showProgress | true | When true, show the progress bar and final info in the Trino engine log. | boolean | 1.6.0 | +| kyuubi.session.engine.trino.showProgress.debug | false | When true, show the progress debug info in the Trino engine log. | boolean | 1.6.0 | +| kyuubi.session.group.provider | hadoop | A group provider plugin for Kyuubi Server. This plugin can provide primary group and groups information for different users or session configs. This config value should be a subclass of `org.apache.kyuubi.plugin.GroupProvider` which has a zero-arg constructor. Kyuubi provides the following built-in implementations:
  • hadoop: delegate the user group mapping to hadoop UserGroupInformation.
  • | string | 1.7.0 | +| kyuubi.session.idle.timeout | PT6H | session idle timeout, it will be closed when it's not accessed for this duration | duration | 1.2.0 | +| kyuubi.session.local.dir.allow.list || The local dir list that are allowed to access by the kyuubi session application. End-users might set some parameters such as `spark.files` and it will upload some local files when launching the kyuubi engine, if the local dir allow list is defined, kyuubi will check whether the path to upload is in the allow list. Note that, if it is empty, there is no limitation for that. And please use absolute paths. | set | 1.6.0 | +| kyuubi.session.name | <undefined> | A human readable name of the session and we use empty string by default. This name will be recorded in the event. Note that, we only apply this value from session conf. | string | 1.4.0 | +| kyuubi.session.timeout | PT6H | (deprecated)session timeout, it will be closed when it's not accessed for this duration | duration | 1.0.0 | +| kyuubi.session.user.sign.enabled | false | Whether to verify the integrity of session user name on the engine side, e.g. Authz plugin in Spark. | boolean | 1.7.0 | ### Spnego diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala index bf7be14b861..756c0496967 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala @@ -110,7 +110,7 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin info(s"Spark engine is de-registering from engine discovery space.") frontendServices.flatMap(_.discoveryService).foreach(_.stop()) while (backendService.sessionManager.getOpenSessionCount > 0) { - Thread.sleep(1000 * 60) + Thread.sleep(TimeUnit.SECONDS.toMillis(10)) } info(s"Spark engine has no open session now, terminating.") stop() @@ -151,8 +151,10 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin val maxLifetime = conf.get(ENGINE_SPARK_MAX_LIFETIME) val deregistered = new AtomicBoolean(false) if (maxLifetime > 0) { + val gracefulPeriod = conf.get(ENGINE_SPARK_MAX_LIFETIME_GRACEFUL_PERIOD) val checkTask: Runnable = () => { - if (!shutdown.get && System.currentTimeMillis() - getStartTime > maxLifetime) { + val elapsedTime = System.currentTimeMillis() - getStartTime + if (!shutdown.get && elapsedTime > maxLifetime) { if (deregistered.compareAndSet(false, true)) { info(s"Spark engine has been running for more than $maxLifetime ms," + s" deregistering from engine discovery space.") @@ -163,6 +165,24 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin info(s"Spark engine has been running for more than $maxLifetime ms" + s" and no open session now, terminating.") stop() + } else if (gracefulPeriod > 0 && elapsedTime > maxLifetime + gracefulPeriod) { + backendService.sessionManager.allSessions().foreach { session => + val operationCount = + backendService.sessionManager.operationManager.allOperations() + .filter(_.getSession == session) + .size + if (operationCount == 0) { + warn(s"Closing session ${session.handle.identifier} forcibly that has no" + + s" operation and has been running for more than $gracefulPeriod ms after engine" + + s" max lifetime.") + try { + backendService.sessionManager.closeSession(session.handle) + } catch { + case e: Throwable => + error(s"Error closing session ${session.handle.identifier}", e) + } + } + } } } } diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/EtcdShareLevelSparkEngineSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/EtcdShareLevelSparkEngineSuite.scala index 727b232e3f8..9b08c5b0c91 100644 --- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/EtcdShareLevelSparkEngineSuite.scala +++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/EtcdShareLevelSparkEngineSuite.scala @@ -17,19 +17,13 @@ package org.apache.kyuubi.engine.spark -import org.apache.kyuubi.config.KyuubiConf.{ENGINE_CHECK_INTERVAL, ENGINE_SHARE_LEVEL, ENGINE_SPARK_MAX_INITIAL_WAIT, ENGINE_SPARK_MAX_LIFETIME} import org.apache.kyuubi.engine.ShareLevel import org.apache.kyuubi.engine.ShareLevel.ShareLevel trait EtcdShareLevelSparkEngineSuite extends ShareLevelSparkEngineTests with WithEtcdCluster { override def withKyuubiConf: Map[String, String] = { - super.withKyuubiConf ++ - etcdConf ++ Map( - ENGINE_SHARE_LEVEL.key -> shareLevel.toString, - ENGINE_SPARK_MAX_LIFETIME.key -> "PT20s", - ENGINE_SPARK_MAX_INITIAL_WAIT.key -> "0", - ENGINE_CHECK_INTERVAL.key -> "PT5s") + super.withKyuubiConf ++ etcdConf } } diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/ShareLevelSparkEngineTests.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/ShareLevelSparkEngineTests.scala index c83139592f7..f2b1544b138 100644 --- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/ShareLevelSparkEngineTests.scala +++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/ShareLevelSparkEngineTests.scala @@ -22,6 +22,7 @@ import java.util.UUID import org.scalatest.concurrent.PatienceConfiguration.Timeout import org.scalatest.time.SpanSugar.convertIntToGrainOfTime +import org.apache.kyuubi.config.KyuubiConf.{ENGINE_CHECK_INTERVAL, ENGINE_SHARE_LEVEL, ENGINE_SPARK_MAX_INITIAL_WAIT, ENGINE_SPARK_MAX_LIFETIME, ENGINE_SPARK_MAX_LIFETIME_GRACEFUL_PERIOD} import org.apache.kyuubi.engine.ShareLevel import org.apache.kyuubi.engine.ShareLevel.ShareLevel import org.apache.kyuubi.operation.HiveJDBCTestHelper @@ -35,6 +36,13 @@ trait ShareLevelSparkEngineTests extends WithDiscoverySparkSQLEngine with HiveJDBCTestHelper { def shareLevel: ShareLevel + override def withKyuubiConf: Map[String, String] = super.withKyuubiConf ++ Map( + ENGINE_SHARE_LEVEL.key -> shareLevel.toString, + ENGINE_SPARK_MAX_LIFETIME.key -> "PT5s", + ENGINE_SPARK_MAX_INITIAL_WAIT.key -> "0", + ENGINE_CHECK_INTERVAL.key -> "PT2s", + ENGINE_SPARK_MAX_LIFETIME_GRACEFUL_PERIOD.key -> "100") + override protected def jdbcUrl: String = getJdbcUrl override val namespace: String = { // for test, we always use uuid as namespace @@ -76,4 +84,23 @@ trait ShareLevelSparkEngineTests } } } + + test("test spark engine max life-time with graceful period") { + withDiscoveryClient { discoveryClient => + assert(engine.getServiceState == ServiceState.STARTED) + assert(discoveryClient.pathExists(namespace)) + withJdbcStatement() { _ => + eventually(Timeout(30.seconds)) { + shareLevel match { + case ShareLevel.CONNECTION => + assert(engine.getServiceState == ServiceState.STOPPED) + assert(discoveryClient.pathNonExists(namespace)) + case _ => + assert(engine.getServiceState == ServiceState.STOPPED) + assert(discoveryClient.pathExists(namespace)) + } + } + } + } + } } diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/ZookeeperShareLevelSparkEngineSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/ZookeeperShareLevelSparkEngineSuite.scala index f24abb36c0e..15f3e26860d 100644 --- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/ZookeeperShareLevelSparkEngineSuite.scala +++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/ZookeeperShareLevelSparkEngineSuite.scala @@ -17,22 +17,13 @@ package org.apache.kyuubi.engine.spark -import org.apache.kyuubi.config.KyuubiConf.ENGINE_CHECK_INTERVAL -import org.apache.kyuubi.config.KyuubiConf.ENGINE_SHARE_LEVEL -import org.apache.kyuubi.config.KyuubiConf.ENGINE_SPARK_MAX_INITIAL_WAIT -import org.apache.kyuubi.config.KyuubiConf.ENGINE_SPARK_MAX_LIFETIME import org.apache.kyuubi.engine.ShareLevel import org.apache.kyuubi.engine.ShareLevel.ShareLevel trait ZookeeperShareLevelSparkEngineSuite extends ShareLevelSparkEngineTests with WithEmbeddedZookeeper { override def withKyuubiConf: Map[String, String] = { - super.withKyuubiConf ++ - zookeeperConf ++ Map( - ENGINE_SHARE_LEVEL.key -> shareLevel.toString, - ENGINE_SPARK_MAX_LIFETIME.key -> "PT20s", - ENGINE_SPARK_MAX_INITIAL_WAIT.key -> "0", - ENGINE_CHECK_INTERVAL.key -> "PT5s") + super.withKyuubiConf ++ zookeeperConf } } diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala index b22a5131fc5..46e6e999869 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala @@ -1336,6 +1336,16 @@ object KyuubiConf { .timeConf .createWithDefault(0) + val ENGINE_SPARK_MAX_LIFETIME_GRACEFUL_PERIOD: ConfigEntry[Long] = + buildConf("kyuubi.session.engine.spark.max.lifetime.gracefulPeriod") + .doc("Graceful period for Spark engine to wait the connections disconnected after reaching" + + " the end of life. After the graceful period, all the connections without running" + + " operations will be forcibly disconnected. 0 or negative means always waiting the" + + " connections disconnected.") + .version("1.8.1") + .timeConf + .createWithDefault(0) + val ENGINE_SPARK_MAX_INITIAL_WAIT: ConfigEntry[Long] = buildConf("kyuubi.session.engine.spark.max.initial.wait") .doc("Max wait time for the initial connection to Spark engine. The engine will" + diff --git a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala index a1b1466d122..c7eee15030f 100644 --- a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala +++ b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala @@ -17,7 +17,7 @@ package org.apache.kyuubi.ha.client -import java.util.concurrent.CountDownLatch +import java.util.concurrent.{CountDownLatch, TimeUnit} import java.util.concurrent.atomic.AtomicBoolean import org.apache.kyuubi.Logging @@ -68,7 +68,7 @@ abstract class ServiceDiscovery( def stopGracefully(isLost: Boolean = false): Unit = { while (fe.be.sessionManager.getOpenSessionCount > 0) { info(s"${fe.be.sessionManager.getOpenSessionCount} connection(s) are active, delay shutdown") - Thread.sleep(1000 * 60) + Thread.sleep(TimeUnit.SECONDS.toMillis(10)) } isServerLost.set(isLost) gracefulShutdownLatch.countDown() diff --git a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/etcd/EtcdDiscoveryClient.scala b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/etcd/EtcdDiscoveryClient.scala index 9ab4d9da7e4..a322ca8d88c 100644 --- a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/etcd/EtcdDiscoveryClient.scala +++ b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/etcd/EtcdDiscoveryClient.scala @@ -53,6 +53,7 @@ import org.apache.kyuubi.ha.client.DiscoveryPaths import org.apache.kyuubi.ha.client.ServiceDiscovery import org.apache.kyuubi.ha.client.ServiceNodeInfo import org.apache.kyuubi.ha.client.etcd.EtcdDiscoveryClient._ +import org.apache.kyuubi.util.ThreadUtils class EtcdDiscoveryClient(conf: KyuubiConf) extends DiscoveryClient { @@ -381,7 +382,12 @@ class EtcdDiscoveryClient(conf: KyuubiConf) extends DiscoveryClient { .filter(_.getEventType == WatchEvent.EventType.DELETE).foreach(_ => { warn(s"This Kyuubi instance ${instance} is now de-registered from" + s" ETCD. The server will be shut down after the last client session completes.") - serviceDiscovery.stopGracefully() + // for jetcd, the watcher event process might block the main thread, + // so start a new thread to do the de-register work as a workaround, + // see details in https://github.com/etcd-io/jetcd/issues/1089 + ThreadUtils.runInNewThread("deregister-watcher-thread", isDaemon = false) { + serviceDiscovery.stopGracefully() + } }) }