Skip to content

Commit

Permalink
[KYUUBI #5952] Disconnect connections without running operations afte…
Browse files Browse the repository at this point in the history
…r engine maxlife time graceful period

# 🔍 Description
## Issue References 🔗

We found that, some kyuubi connections(maybe managed by jdbc connection pool likes hikari) always keep alive, and the engine can not be terminated after exceeds the max life time.

So, In this pr, I introduce a graceful period after spark engine max life time, after the graceful period, the connections without running operations will be disconnected forcibly.

Close #5952

## Describe Your Solution 🔧

Please include a summary of the change and which issue is fixed. Please also include relevant motivation and context. List any dependencies that are required for this change.

## Types of changes 🔖

- [ ] Bugfix (non-breaking change which fixes an issue)
- [x] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)

## Test Plan 🧪

#### Behavior Without This Pull Request ⚰️

#### Behavior With This Pull Request 🎉

#### Related Unit Tests

---

# Checklist 📝

- [ ] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)

**Be nice. Be informative.**

Closes #6040 from turboFei/close_on_nooperation_rebase.

Closes #5952

0b05ddc [Fei Wang] comments
4329a85 [Fei Wang] async stop
b39c9b3 [Fei Wang] use short sleep
7c123f6 [Fei Wang] save
45ad348 [Fei Wang]  check no running operation

Authored-by: Fei Wang <[email protected]>
Signed-off-by: Fei Wang <[email protected]>

(cherry picked from commit 8c3f471)
  • Loading branch information
turboFei authored and wForget committed Feb 8, 2024
1 parent 5dca0e0 commit 0749d75
Show file tree
Hide file tree
Showing 8 changed files with 121 additions and 72 deletions.
101 changes: 51 additions & 50 deletions docs/configuration/settings.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin
info(s"Spark engine is de-registering from engine discovery space.")
frontendServices.flatMap(_.discoveryService).foreach(_.stop())
while (backendService.sessionManager.getOpenSessionCount > 0) {
Thread.sleep(1000 * 60)
Thread.sleep(TimeUnit.SECONDS.toMillis(10))
}
info(s"Spark engine has no open session now, terminating.")
stop()
Expand Down Expand Up @@ -151,8 +151,10 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin
val maxLifetime = conf.get(ENGINE_SPARK_MAX_LIFETIME)
val deregistered = new AtomicBoolean(false)
if (maxLifetime > 0) {
val gracefulPeriod = conf.get(ENGINE_SPARK_MAX_LIFETIME_GRACEFUL_PERIOD)
val checkTask: Runnable = () => {
if (!shutdown.get && System.currentTimeMillis() - getStartTime > maxLifetime) {
val elapsedTime = System.currentTimeMillis() - getStartTime
if (!shutdown.get && elapsedTime > maxLifetime) {
if (deregistered.compareAndSet(false, true)) {
info(s"Spark engine has been running for more than $maxLifetime ms," +
s" deregistering from engine discovery space.")
Expand All @@ -163,6 +165,24 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin
info(s"Spark engine has been running for more than $maxLifetime ms" +
s" and no open session now, terminating.")
stop()
} else if (gracefulPeriod > 0 && elapsedTime > maxLifetime + gracefulPeriod) {
backendService.sessionManager.allSessions().foreach { session =>
val operationCount =
backendService.sessionManager.operationManager.allOperations()
.filter(_.getSession == session)
.size
if (operationCount == 0) {
warn(s"Closing session ${session.handle.identifier} forcibly that has no" +
s" operation and has been running for more than $gracefulPeriod ms after engine" +
s" max lifetime.")
try {
backendService.sessionManager.closeSession(session.handle)
} catch {
case e: Throwable =>
error(s"Error closing session ${session.handle.identifier}", e)
}
}
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,13 @@

package org.apache.kyuubi.engine.spark

import org.apache.kyuubi.config.KyuubiConf.{ENGINE_CHECK_INTERVAL, ENGINE_SHARE_LEVEL, ENGINE_SPARK_MAX_INITIAL_WAIT, ENGINE_SPARK_MAX_LIFETIME}
import org.apache.kyuubi.engine.ShareLevel
import org.apache.kyuubi.engine.ShareLevel.ShareLevel

trait EtcdShareLevelSparkEngineSuite
extends ShareLevelSparkEngineTests with WithEtcdCluster {
override def withKyuubiConf: Map[String, String] = {
super.withKyuubiConf ++
etcdConf ++ Map(
ENGINE_SHARE_LEVEL.key -> shareLevel.toString,
ENGINE_SPARK_MAX_LIFETIME.key -> "PT20s",
ENGINE_SPARK_MAX_INITIAL_WAIT.key -> "0",
ENGINE_CHECK_INTERVAL.key -> "PT5s")
super.withKyuubiConf ++ etcdConf
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import java.util.UUID
import org.scalatest.concurrent.PatienceConfiguration.Timeout
import org.scalatest.time.SpanSugar.convertIntToGrainOfTime

import org.apache.kyuubi.config.KyuubiConf.{ENGINE_CHECK_INTERVAL, ENGINE_SHARE_LEVEL, ENGINE_SPARK_MAX_INITIAL_WAIT, ENGINE_SPARK_MAX_LIFETIME, ENGINE_SPARK_MAX_LIFETIME_GRACEFUL_PERIOD}
import org.apache.kyuubi.engine.ShareLevel
import org.apache.kyuubi.engine.ShareLevel.ShareLevel
import org.apache.kyuubi.operation.HiveJDBCTestHelper
Expand All @@ -35,6 +36,13 @@ trait ShareLevelSparkEngineTests
extends WithDiscoverySparkSQLEngine with HiveJDBCTestHelper {
def shareLevel: ShareLevel

override def withKyuubiConf: Map[String, String] = super.withKyuubiConf ++ Map(
ENGINE_SHARE_LEVEL.key -> shareLevel.toString,
ENGINE_SPARK_MAX_LIFETIME.key -> "PT5s",
ENGINE_SPARK_MAX_INITIAL_WAIT.key -> "0",
ENGINE_CHECK_INTERVAL.key -> "PT2s",
ENGINE_SPARK_MAX_LIFETIME_GRACEFUL_PERIOD.key -> "100")

override protected def jdbcUrl: String = getJdbcUrl
override val namespace: String = {
// for test, we always use uuid as namespace
Expand Down Expand Up @@ -76,4 +84,23 @@ trait ShareLevelSparkEngineTests
}
}
}

test("test spark engine max life-time with graceful period") {
withDiscoveryClient { discoveryClient =>
assert(engine.getServiceState == ServiceState.STARTED)
assert(discoveryClient.pathExists(namespace))
withJdbcStatement() { _ =>
eventually(Timeout(30.seconds)) {
shareLevel match {
case ShareLevel.CONNECTION =>
assert(engine.getServiceState == ServiceState.STOPPED)
assert(discoveryClient.pathNonExists(namespace))
case _ =>
assert(engine.getServiceState == ServiceState.STOPPED)
assert(discoveryClient.pathExists(namespace))
}
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,13 @@

package org.apache.kyuubi.engine.spark

import org.apache.kyuubi.config.KyuubiConf.ENGINE_CHECK_INTERVAL
import org.apache.kyuubi.config.KyuubiConf.ENGINE_SHARE_LEVEL
import org.apache.kyuubi.config.KyuubiConf.ENGINE_SPARK_MAX_INITIAL_WAIT
import org.apache.kyuubi.config.KyuubiConf.ENGINE_SPARK_MAX_LIFETIME
import org.apache.kyuubi.engine.ShareLevel
import org.apache.kyuubi.engine.ShareLevel.ShareLevel

trait ZookeeperShareLevelSparkEngineSuite
extends ShareLevelSparkEngineTests with WithEmbeddedZookeeper {
override def withKyuubiConf: Map[String, String] = {
super.withKyuubiConf ++
zookeeperConf ++ Map(
ENGINE_SHARE_LEVEL.key -> shareLevel.toString,
ENGINE_SPARK_MAX_LIFETIME.key -> "PT20s",
ENGINE_SPARK_MAX_INITIAL_WAIT.key -> "0",
ENGINE_CHECK_INTERVAL.key -> "PT5s")
super.withKyuubiConf ++ zookeeperConf
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1336,6 +1336,16 @@ object KyuubiConf {
.timeConf
.createWithDefault(0)

val ENGINE_SPARK_MAX_LIFETIME_GRACEFUL_PERIOD: ConfigEntry[Long] =
buildConf("kyuubi.session.engine.spark.max.lifetime.gracefulPeriod")
.doc("Graceful period for Spark engine to wait the connections disconnected after reaching" +
" the end of life. After the graceful period, all the connections without running" +
" operations will be forcibly disconnected. 0 or negative means always waiting the" +
" connections disconnected.")
.version("1.8.1")
.timeConf
.createWithDefault(0)

val ENGINE_SPARK_MAX_INITIAL_WAIT: ConfigEntry[Long] =
buildConf("kyuubi.session.engine.spark.max.initial.wait")
.doc("Max wait time for the initial connection to Spark engine. The engine will" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.kyuubi.ha.client

import java.util.concurrent.CountDownLatch
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean

import org.apache.kyuubi.Logging
Expand Down Expand Up @@ -68,7 +68,7 @@ abstract class ServiceDiscovery(
def stopGracefully(isLost: Boolean = false): Unit = {
while (fe.be.sessionManager.getOpenSessionCount > 0) {
info(s"${fe.be.sessionManager.getOpenSessionCount} connection(s) are active, delay shutdown")
Thread.sleep(1000 * 60)
Thread.sleep(TimeUnit.SECONDS.toMillis(10))
}
isServerLost.set(isLost)
gracefulShutdownLatch.countDown()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import org.apache.kyuubi.ha.client.DiscoveryPaths
import org.apache.kyuubi.ha.client.ServiceDiscovery
import org.apache.kyuubi.ha.client.ServiceNodeInfo
import org.apache.kyuubi.ha.client.etcd.EtcdDiscoveryClient._
import org.apache.kyuubi.util.ThreadUtils

class EtcdDiscoveryClient(conf: KyuubiConf) extends DiscoveryClient {

Expand Down Expand Up @@ -381,7 +382,12 @@ class EtcdDiscoveryClient(conf: KyuubiConf) extends DiscoveryClient {
.filter(_.getEventType == WatchEvent.EventType.DELETE).foreach(_ => {
warn(s"This Kyuubi instance ${instance} is now de-registered from" +
s" ETCD. The server will be shut down after the last client session completes.")
serviceDiscovery.stopGracefully()
// for jetcd, the watcher event process might block the main thread,
// so start a new thread to do the de-register work as a workaround,
// see details in https://github.com/etcd-io/jetcd/issues/1089
ThreadUtils.runInNewThread("deregister-watcher-thread", isDaemon = false) {
serviceDiscovery.stopGracefully()
}
})
}

Expand Down

0 comments on commit 0749d75

Please sign in to comment.