From 070428cffc4e85846770529fc8fe4314f6118e5f Mon Sep 17 00:00:00 2001 From: Fei Wang Date: Sat, 10 Feb 2024 18:40:33 +0800 Subject: [PATCH] [KYUUBI #5952][1.8] Disconnect connections without running operations after engine maxlife time graceful period MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit # :mag: Description ## Issue References 🔗 We found that some Kyuubi connections (perhaps managed by a JDBC connection pool such as HikariCP) are always kept alive, and the engine cannot be terminated after exceeding its max lifetime. So, in this PR, I introduce a graceful period after the Spark engine's max lifetime; after the graceful period, the connections without running operations will be disconnected forcibly. Close #5952 ## Describe Your Solution 🔧 Please include a summary of the change and which issue is fixed. Please also include relevant motivation and context. List any dependencies that are required for this change. ## Types of changes :bookmark: - [ ] Bugfix (non-breaking change which fixes an issue) - [x] New feature (non-breaking change which adds functionality) - [ ] Breaking change (fix or feature that would cause existing functionality to change) ## Test Plan 🧪 #### Behavior Without This Pull Request :coffin: #### Behavior With This Pull Request :tada: #### Related Unit Tests --- # Checklist 📝 - [ ] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html) **Be nice. Be informative.** Closes #6040 from turboFei/close_on_nooperation_rebase. 
Closes #5952 0b05ddc0c [Fei Wang] comments 4329a85cf [Fei Wang] async stop b39c9b3a0 [Fei Wang] use short sleep 7c123f612 [Fei Wang] save 45ad3489d [Fei Wang] check no running operation Authored-by: Fei Wang Signed-off-by: Fei Wang (cherry picked from commit 8c3f471ae370d55c54b62ffb7d74096839c77c21) # :mag: Description ## Issue References 🔗 This pull request fixes # ## Describe Your Solution 🔧 Please include a summary of the change and which issue is fixed. Please also include relevant motivation and context. List any dependencies that are required for this change. ## Types of changes :bookmark: - [ ] Bugfix (non-breaking change which fixes an issue) - [ ] New feature (non-breaking change which adds functionality) - [ ] Breaking change (fix or feature that would cause existing functionality to change) ## Test Plan 🧪 #### Behavior Without This Pull Request :coffin: #### Behavior With This Pull Request :tada: #### Related Unit Tests --- # Checklist 📝 - [ ] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html) **Be nice. Be informative.** Closes #6054 from wForget/cherry_pick_6040_to_1_8. 
Closes #5952 0749d75eb [Fei Wang] [KYUUBI #5952] Disconnect connections without running operations after engine maxlife time graceful period Authored-by: Fei Wang Signed-off-by: Cheng Pan --- .../kyuubi/engine/spark/SparkSQLEngine.scala | 24 +++++++++++++++-- .../EtcdShareLevelSparkEngineSuite.scala | 8 +----- .../spark/ShareLevelSparkEngineTests.scala | 27 +++++++++++++++++++ .../ZookeeperShareLevelSparkEngineSuite.scala | 11 +------- .../org/apache/kyuubi/config/KyuubiConf.scala | 10 +++++++ .../kyuubi/ha/client/ServiceDiscovery.scala | 4 +-- .../ha/client/etcd/EtcdDiscoveryClient.scala | 8 +++++- 7 files changed, 70 insertions(+), 22 deletions(-) diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala index 6e323cfe784..bb996d37050 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala @@ -109,7 +109,7 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin info(s"Spark engine is de-registering from engine discovery space.") frontendServices.flatMap(_.discoveryService).foreach(_.stop()) while (backendService.sessionManager.getOpenSessionCount > 0) { - Thread.sleep(1000 * 60) + Thread.sleep(TimeUnit.SECONDS.toMillis(10)) } info(s"Spark engine has no open session now, terminating.") stop() @@ -150,8 +150,10 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin val maxLifetime = conf.get(ENGINE_SPARK_MAX_LIFETIME) val deregistered = new AtomicBoolean(false) if (maxLifetime > 0) { + val gracefulPeriod = conf.get(ENGINE_SPARK_MAX_LIFETIME_GRACEFUL_PERIOD) val checkTask: Runnable = () => { - if (!shutdown.get && System.currentTimeMillis() - getStartTime > maxLifetime) { + val elapsedTime = 
System.currentTimeMillis() - getStartTime + if (!shutdown.get && elapsedTime > maxLifetime) { if (deregistered.compareAndSet(false, true)) { info(s"Spark engine has been running for more than $maxLifetime ms," + s" deregistering from engine discovery space.") @@ -162,6 +164,24 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin info(s"Spark engine has been running for more than $maxLifetime ms" + s" and no open session now, terminating.") stop() + } else if (gracefulPeriod > 0 && elapsedTime > maxLifetime + gracefulPeriod) { + backendService.sessionManager.allSessions().foreach { session => + val operationCount = + backendService.sessionManager.operationManager.allOperations() + .filter(_.getSession == session) + .size + if (operationCount == 0) { + warn(s"Closing session ${session.handle.identifier} forcibly that has no" + + s" operation and has been running for more than $gracefulPeriod ms after engine" + + s" max lifetime.") + try { + backendService.sessionManager.closeSession(session.handle) + } catch { + case e: Throwable => + error(s"Error closing session ${session.handle.identifier}", e) + } + } + } } } } diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/EtcdShareLevelSparkEngineSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/EtcdShareLevelSparkEngineSuite.scala index 727b232e3f8..9b08c5b0c91 100644 --- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/EtcdShareLevelSparkEngineSuite.scala +++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/EtcdShareLevelSparkEngineSuite.scala @@ -17,19 +17,13 @@ package org.apache.kyuubi.engine.spark -import org.apache.kyuubi.config.KyuubiConf.{ENGINE_CHECK_INTERVAL, ENGINE_SHARE_LEVEL, ENGINE_SPARK_MAX_INITIAL_WAIT, ENGINE_SPARK_MAX_LIFETIME} import org.apache.kyuubi.engine.ShareLevel import org.apache.kyuubi.engine.ShareLevel.ShareLevel 
trait EtcdShareLevelSparkEngineSuite extends ShareLevelSparkEngineTests with WithEtcdCluster { override def withKyuubiConf: Map[String, String] = { - super.withKyuubiConf ++ - etcdConf ++ Map( - ENGINE_SHARE_LEVEL.key -> shareLevel.toString, - ENGINE_SPARK_MAX_LIFETIME.key -> "PT20s", - ENGINE_SPARK_MAX_INITIAL_WAIT.key -> "0", - ENGINE_CHECK_INTERVAL.key -> "PT5s") + super.withKyuubiConf ++ etcdConf } } diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/ShareLevelSparkEngineTests.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/ShareLevelSparkEngineTests.scala index c83139592f7..f2b1544b138 100644 --- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/ShareLevelSparkEngineTests.scala +++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/ShareLevelSparkEngineTests.scala @@ -22,6 +22,7 @@ import java.util.UUID import org.scalatest.concurrent.PatienceConfiguration.Timeout import org.scalatest.time.SpanSugar.convertIntToGrainOfTime +import org.apache.kyuubi.config.KyuubiConf.{ENGINE_CHECK_INTERVAL, ENGINE_SHARE_LEVEL, ENGINE_SPARK_MAX_INITIAL_WAIT, ENGINE_SPARK_MAX_LIFETIME, ENGINE_SPARK_MAX_LIFETIME_GRACEFUL_PERIOD} import org.apache.kyuubi.engine.ShareLevel import org.apache.kyuubi.engine.ShareLevel.ShareLevel import org.apache.kyuubi.operation.HiveJDBCTestHelper @@ -35,6 +36,13 @@ trait ShareLevelSparkEngineTests extends WithDiscoverySparkSQLEngine with HiveJDBCTestHelper { def shareLevel: ShareLevel + override def withKyuubiConf: Map[String, String] = super.withKyuubiConf ++ Map( + ENGINE_SHARE_LEVEL.key -> shareLevel.toString, + ENGINE_SPARK_MAX_LIFETIME.key -> "PT5s", + ENGINE_SPARK_MAX_INITIAL_WAIT.key -> "0", + ENGINE_CHECK_INTERVAL.key -> "PT2s", + ENGINE_SPARK_MAX_LIFETIME_GRACEFUL_PERIOD.key -> "100") + override protected def jdbcUrl: String = getJdbcUrl override val namespace: String = { // for test, we always use 
uuid as namespace @@ -76,4 +84,23 @@ trait ShareLevelSparkEngineTests } } } + + test("test spark engine max life-time with graceful period") { + withDiscoveryClient { discoveryClient => + assert(engine.getServiceState == ServiceState.STARTED) + assert(discoveryClient.pathExists(namespace)) + withJdbcStatement() { _ => + eventually(Timeout(30.seconds)) { + shareLevel match { + case ShareLevel.CONNECTION => + assert(engine.getServiceState == ServiceState.STOPPED) + assert(discoveryClient.pathNonExists(namespace)) + case _ => + assert(engine.getServiceState == ServiceState.STOPPED) + assert(discoveryClient.pathExists(namespace)) + } + } + } + } + } } diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/ZookeeperShareLevelSparkEngineSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/ZookeeperShareLevelSparkEngineSuite.scala index f24abb36c0e..15f3e26860d 100644 --- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/ZookeeperShareLevelSparkEngineSuite.scala +++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/ZookeeperShareLevelSparkEngineSuite.scala @@ -17,22 +17,13 @@ package org.apache.kyuubi.engine.spark -import org.apache.kyuubi.config.KyuubiConf.ENGINE_CHECK_INTERVAL -import org.apache.kyuubi.config.KyuubiConf.ENGINE_SHARE_LEVEL -import org.apache.kyuubi.config.KyuubiConf.ENGINE_SPARK_MAX_INITIAL_WAIT -import org.apache.kyuubi.config.KyuubiConf.ENGINE_SPARK_MAX_LIFETIME import org.apache.kyuubi.engine.ShareLevel import org.apache.kyuubi.engine.ShareLevel.ShareLevel trait ZookeeperShareLevelSparkEngineSuite extends ShareLevelSparkEngineTests with WithEmbeddedZookeeper { override def withKyuubiConf: Map[String, String] = { - super.withKyuubiConf ++ - zookeeperConf ++ Map( - ENGINE_SHARE_LEVEL.key -> shareLevel.toString, - ENGINE_SPARK_MAX_LIFETIME.key -> "PT20s", - ENGINE_SPARK_MAX_INITIAL_WAIT.key -> "0", - 
ENGINE_CHECK_INTERVAL.key -> "PT5s") + super.withKyuubiConf ++ zookeeperConf } } diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala index bd041425b74..8058c5a1cc5 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala @@ -1288,6 +1288,16 @@ object KyuubiConf { .timeConf .createWithDefault(0) + val ENGINE_SPARK_MAX_LIFETIME_GRACEFUL_PERIOD: ConfigEntry[Long] = + buildConf("kyuubi.session.engine.spark.max.lifetime.gracefulPeriod") + .doc("Graceful period for Spark engine to wait the connections disconnected after reaching" + + " the end of life. After the graceful period, all the connections without running" + + " operations will be forcibly disconnected. 0 or negative means always waiting the" + + " connections disconnected.") + .version("1.8.1") + .timeConf + .createWithDefault(0) + val ENGINE_SPARK_MAX_INITIAL_WAIT: ConfigEntry[Long] = buildConf("kyuubi.session.engine.spark.max.initial.wait") .doc("Max wait time for the initial connection to Spark engine. 
The engine will" + diff --git a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala index d532b3710ab..ff1033cd575 100644 --- a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala +++ b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala @@ -17,7 +17,7 @@ package org.apache.kyuubi.ha.client -import java.util.concurrent.CountDownLatch +import java.util.concurrent.{CountDownLatch, TimeUnit} import java.util.concurrent.atomic.AtomicBoolean import org.apache.kyuubi.Logging @@ -69,7 +69,7 @@ abstract class ServiceDiscovery( fe.serverable.stopGracefully() while (fe.be.sessionManager.getOpenSessionCount > 0) { info(s"${fe.be.sessionManager.getOpenSessionCount} connection(s) are active, delay shutdown") - Thread.sleep(1000 * 60) + Thread.sleep(TimeUnit.SECONDS.toMillis(10)) } isServerLost.set(isLost) gracefulShutdownLatch.countDown() diff --git a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/etcd/EtcdDiscoveryClient.scala b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/etcd/EtcdDiscoveryClient.scala index d979804f417..cead1a260e3 100644 --- a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/etcd/EtcdDiscoveryClient.scala +++ b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/etcd/EtcdDiscoveryClient.scala @@ -53,6 +53,7 @@ import org.apache.kyuubi.ha.client.DiscoveryPaths import org.apache.kyuubi.ha.client.ServiceDiscovery import org.apache.kyuubi.ha.client.ServiceNodeInfo import org.apache.kyuubi.ha.client.etcd.EtcdDiscoveryClient._ +import org.apache.kyuubi.util.ThreadUtils class EtcdDiscoveryClient(conf: KyuubiConf) extends DiscoveryClient { @@ -381,7 +382,12 @@ class EtcdDiscoveryClient(conf: KyuubiConf) extends DiscoveryClient { .filter(_.getEventType == WatchEvent.EventType.DELETE).foreach(_ => { warn(s"This Kyuubi instance ${instance} is now de-registered from" + s" ETCD. 
The server will be shut down after the last client session completes.") - serviceDiscovery.stopGracefully() + // for jetcd, the watcher event process might block the main thread, + // so start a new thread to do the de-register work as a workaround, + // see details in https://github.com/etcd-io/jetcd/issues/1089 + ThreadUtils.runInNewThread("deregister-watcher-thread", isDaemon = false) { + serviceDiscovery.stopGracefully() + } }) }