[KYUUBI #5952] Disconnect connections without running operations after engine maxlife time graceful period #6040

Closed
wants to merge 5 commits
103 changes: 52 additions & 51 deletions docs/configuration/settings.md

Large diffs are not rendered by default.

@@ -129,7 +129,7 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin
       info(s"Spark engine is de-registering from engine discovery space.")
       frontendServices.flatMap(_.discoveryService).foreach(_.stop())
       while (backendService.sessionManager.getOpenSessionCount > 0) {
-        Thread.sleep(1000 * 60)
+        Thread.sleep(TimeUnit.SECONDS.toMillis(5))
       }
       info(s"Spark engine has no open session now, terminating.")
       stop()
@@ -170,18 +170,41 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin
     val maxLifetime = conf.get(ENGINE_SPARK_MAX_LIFETIME)
     val deregistered = new AtomicBoolean(false)
     if (maxLifetime > 0) {
+      val gracefulPeriod = conf.get(ENGINE_SPARK_MAX_LIFETIME_GRACEFUL_PERIOD)
       val checkTask: Runnable = () => {
-        if (!shutdown.get && System.currentTimeMillis() - getStartTime > maxLifetime) {
+        val elapsedTime = System.currentTimeMillis() - getStartTime
+        if (!shutdown.get && elapsedTime > maxLifetime) {
           if (deregistered.compareAndSet(false, true)) {
-            info(s"Spark engine has been running for more than $maxLifetime ms," +
-              s" deregistering from engine discovery space.")
-            frontendServices.flatMap(_.discoveryService).foreach(_.stop())
+            ThreadUtils.runInNewThread("engine-de-register", isDaemon = false) {
+              // for ETCD, the de-registering process might be blocked, so deregister it async
+              info(s"Spark engine has been running for more than $maxLifetime ms," +
+                s" deregistering from engine discovery space.")
+              frontendServices.flatMap(_.discoveryService).foreach(_.stop())

Member Author commented:

09:46:29.180 vertx-blocked-thread-checker WARN BlockedThreadChecker: Thread Thread[vert.x-eventloop-thread-0,5,main] has been blocked for 19018 ms, time limit is 2000 ms
io.vertx.core.VertxException: Thread blocked
        at java.lang.Thread.sleep(Native Method)
        at org.apache.kyuubi.ha.client.ServiceDiscovery.stopGracefully(ServiceDiscovery.scala:71)
        at org.apache.kyuubi.ha.client.etcd.EtcdDiscoveryClient$DeRegisterWatcher.$anonfun$onNext$2(EtcdDiscoveryClient.scala:384)
        at org.apache.kyuubi.ha.client.etcd.EtcdDiscoveryClient$DeRegisterWatcher.$anonfun$onNext$2$adapted(EtcdDiscoveryClient.scala:381)
        at org.apache.kyuubi.ha.client.etcd.EtcdDiscoveryClient$DeRegisterWatcher$$Lambda$2964/1803489267.apply(Unknown Source)
        at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at org.apache.kyuubi.ha.client.etcd.EtcdDiscoveryClient$DeRegisterWatcher.onNext(EtcdDiscoveryClient.scala:381)
        at io.etcd.jetcd.impl.WatchImpl$WatcherImpl.onNext(WatchImpl.java:307)
        at io.etcd.jetcd.impl.WatchImpl$WatcherImpl$$Lambda$2892/1233911532.handle(Unknown Source)
        at io.vertx.grpc.stub.StreamObserverReadStream.onNext(StreamObserverReadStream.java:37)
        at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onMessage(ClientCalls.java:468)
        at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1MessagesAvailable.runInternal(ClientCallImpl.java:667)
        at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1MessagesAvailable.runInContext(ClientCallImpl.java:654)
        at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
        at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
        at io.vertx.grpc.VertxChannelBuilder.lambda$null$0(VertxChannelBuilder.java:305)
        at io.vertx.grpc.VertxChannelBuilder$$Lambda$2847/1899882869.handle(Unknown Source)
        at io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:276)

Member commented:

Why do we need to deregister it async? The thread reported as blocked by vertx-blocked-thread-checker does not seem to be the checker thread.

Member Author commented:

[image]

Member commented:

I see, discoveryService.stop() blocks the subsequent close session operation, causing the EngineServiceDiscovery.stopGracefully method to continue waiting.

Member Author commented:

I don't know. It doesn't happen every time.

Member commented:

Anyway, the current solution looks good to me.

Member commented:

The async deregister may introduce issues: say the deregistering takes 3s to complete and we stop the frontend/backend services in the meantime, then new incoming sessions will get errors.

Member (@pan3793, Feb 6, 2024) commented:

If only etcd has such an issue, let's add an internal configuration to switch the sync/async behavior. Guava's SameThreadExecutorService may help.
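
For illustration only, a minimal sketch of what such a switch could look like; the internal flag name and the helper are hypothetical and not something this PR adds:

    import java.util.concurrent.{Executor, Executors}

    import com.google.common.util.concurrent.MoreExecutors

    // Hypothetical sketch: choose where the de-registration runs based on an internal flag,
    // so ZooKeeper keeps the current synchronous behavior while etcd can go async.
    def deregisterExecutor(asyncDeregister: Boolean): Executor =
      if (asyncDeregister) {
        Executors.newSingleThreadExecutor() // run the deregistration off the calling thread
      } else {
        MoreExecutors.directExecutor() // Guava same-thread semantics: run on the calling thread
      }

    // Usage sketch (the flag name is made up):
    // deregisterExecutor(conf.get(ENGINE_DEREGISTER_ASYNC))
    //   .execute(() => frontendServices.flatMap(_.discoveryService).foreach(_.stop()))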

Member commented:

How about we execute stopGracefully asynchronously in DeRegisterWatcher#onNext?

+            }
           }

           if (backendService.sessionManager.getOpenSessionCount <= 0) {
             info(s"Spark engine has been running for more than $maxLifetime ms" +
               s" and no open session now, terminating.")
             stop()
+          } else if (gracefulPeriod > 0 && elapsedTime > maxLifetime + gracefulPeriod) {
+            backendService.sessionManager.allSessions().foreach { session =>
+              val operationCount =
+                backendService.sessionManager.operationManager.allOperations()
+                  .filter(_.getSession == session)
+                  .size
+              if (operationCount == 0) {
+                warn(s"Closing session ${session.handle.identifier} forcibly that has no" +
+                  s" operation and has been running for more than $gracefulPeriod ms after engine" +
+                  s" max lifetime.")
+                try {
+                  backendService.sessionManager.closeSession(session.handle)
+                } catch {
+                  case e: Throwable =>
+                    error(s"Error closing session ${session.handle.identifier}", e)
+                }
+              }
+            }
           }
         }
       }
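
As a reading aid, here is a minimal, self-contained sketch of the policy this hunk implements, reduced to a single decision per check interval; the names (EngineAction, nextAction) are illustrative only and not part of the PR:

    // Illustrative model of the lifetime checker's decision, not the actual Kyuubi code.
    sealed trait EngineAction
    case object KeepRunning extends EngineAction
    case object DeregisterOnly extends EngineAction
    case object Terminate extends EngineAction
    case object CloseIdleSessions extends EngineAction

    def nextAction(
        elapsedTime: Long,
        maxLifetime: Long,
        gracefulPeriod: Long,
        openSessions: Int,
        deregistered: Boolean): EngineAction = {
      if (elapsedTime <= maxLifetime) {
        KeepRunning
      } else if (!deregistered) {
        DeregisterOnly // stop the discovery service first so no new sessions are routed here
      } else if (openSessions <= 0) {
        Terminate // no open sessions left, safe to stop the engine
      } else if (gracefulPeriod > 0 && elapsedTime > maxLifetime + gracefulPeriod) {
        CloseIdleSessions // force-close sessions that have no running operations
      } else {
        KeepRunning // keep waiting for clients to disconnect on their own
      }
    }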

@@ -17,19 +17,13 @@

 package org.apache.kyuubi.engine.spark

-import org.apache.kyuubi.config.KyuubiConf.{ENGINE_CHECK_INTERVAL, ENGINE_SHARE_LEVEL, ENGINE_SPARK_MAX_INITIAL_WAIT, ENGINE_SPARK_MAX_LIFETIME}
 import org.apache.kyuubi.engine.ShareLevel
 import org.apache.kyuubi.engine.ShareLevel.ShareLevel

 trait EtcdShareLevelSparkEngineSuite
   extends ShareLevelSparkEngineTests with WithEtcdCluster {
   override def withKyuubiConf: Map[String, String] = {
-    super.withKyuubiConf ++
-      etcdConf ++ Map(
-        ENGINE_SHARE_LEVEL.key -> shareLevel.toString,
-        ENGINE_SPARK_MAX_LIFETIME.key -> "PT20s",
-        ENGINE_SPARK_MAX_INITIAL_WAIT.key -> "0",
-        ENGINE_CHECK_INTERVAL.key -> "PT5s")
+    super.withKyuubiConf ++ etcdConf
   }
 }

@@ -22,6 +22,7 @@ import java.util.UUID
 import org.scalatest.concurrent.PatienceConfiguration.Timeout
 import org.scalatest.time.SpanSugar.convertIntToGrainOfTime

+import org.apache.kyuubi.config.KyuubiConf.{ENGINE_CHECK_INTERVAL, ENGINE_SHARE_LEVEL, ENGINE_SPARK_MAX_INITIAL_WAIT, ENGINE_SPARK_MAX_LIFETIME, ENGINE_SPARK_MAX_LIFETIME_GRACEFUL_PERIOD}
 import org.apache.kyuubi.engine.ShareLevel
 import org.apache.kyuubi.engine.ShareLevel.ShareLevel
 import org.apache.kyuubi.operation.HiveJDBCTestHelper
@@ -35,6 +36,13 @@ trait ShareLevelSparkEngineTests
   extends WithDiscoverySparkSQLEngine with HiveJDBCTestHelper {
   def shareLevel: ShareLevel

+  override def withKyuubiConf: Map[String, String] = super.withKyuubiConf ++ Map(
+    ENGINE_SHARE_LEVEL.key -> shareLevel.toString,
+    ENGINE_SPARK_MAX_LIFETIME.key -> "PT5s",
+    ENGINE_SPARK_MAX_INITIAL_WAIT.key -> "0",
+    ENGINE_CHECK_INTERVAL.key -> "PT2s",
+    ENGINE_SPARK_MAX_LIFETIME_GRACEFUL_PERIOD.key -> "100")
+
   override protected def jdbcUrl: String = getJdbcUrl
   override val namespace: String = {
     // for test, we always use uuid as namespace
@@ -76,4 +84,23 @@
       }
     }
   }
+
+  test("test spark engine max life-time with graceful period") {
+    withDiscoveryClient { discoveryClient =>
+      assert(engine.getServiceState == ServiceState.STARTED)
+      assert(discoveryClient.pathExists(namespace))
+      withJdbcStatement() { _ =>
+        eventually(Timeout(30.seconds)) {
+          shareLevel match {
+            case ShareLevel.CONNECTION =>
+              assert(engine.getServiceState == ServiceState.STOPPED)
+              assert(discoveryClient.pathNonExists(namespace))
+            case _ =>
+              assert(engine.getServiceState == ServiceState.STOPPED)
+              assert(discoveryClient.pathExists(namespace))
+          }
+        }
+      }
+    }
+  }
 }
@@ -17,22 +17,13 @@

 package org.apache.kyuubi.engine.spark

-import org.apache.kyuubi.config.KyuubiConf.ENGINE_CHECK_INTERVAL
-import org.apache.kyuubi.config.KyuubiConf.ENGINE_SHARE_LEVEL
-import org.apache.kyuubi.config.KyuubiConf.ENGINE_SPARK_MAX_INITIAL_WAIT
-import org.apache.kyuubi.config.KyuubiConf.ENGINE_SPARK_MAX_LIFETIME
 import org.apache.kyuubi.engine.ShareLevel
 import org.apache.kyuubi.engine.ShareLevel.ShareLevel

 trait ZookeeperShareLevelSparkEngineSuite
   extends ShareLevelSparkEngineTests with WithEmbeddedZookeeper {
   override def withKyuubiConf: Map[String, String] = {
-    super.withKyuubiConf ++
-      zookeeperConf ++ Map(
-        ENGINE_SHARE_LEVEL.key -> shareLevel.toString,
-        ENGINE_SPARK_MAX_LIFETIME.key -> "PT20s",
-        ENGINE_SPARK_MAX_INITIAL_WAIT.key -> "0",
-        ENGINE_CHECK_INTERVAL.key -> "PT5s")
+    super.withKyuubiConf ++ zookeeperConf
   }
 }

@@ -1330,6 +1330,16 @@ object KyuubiConf {
       .timeConf
       .createWithDefault(0)

+  val ENGINE_SPARK_MAX_LIFETIME_GRACEFUL_PERIOD: ConfigEntry[Long] =
+    buildConf("kyuubi.session.engine.spark.max.lifetime.gracefulPeriod")
+      .doc("Graceful period for Spark engine to wait the connections disconnected after reaching" +
+        " the end of life. After the graceful period, all the connections without running" +
+        " operations will be forcibly disconnected. 0 or negative means always waiting the" +
+        " connections disconnected.")
+      .version("1.8.1")
+      .timeConf
+      .createWithDefault(0)
+
   val ENGINE_SPARK_MAX_INITIAL_WAIT: ConfigEntry[Long] =
     buildConf("kyuubi.session.engine.spark.max.initial.wait")
       .doc("Max wait time for the initial connection to Spark engine. The engine will" +
@@ -17,7 +17,7 @@

 package org.apache.kyuubi.ha.client

-import java.util.concurrent.CountDownLatch
+import java.util.concurrent.{CountDownLatch, TimeUnit}
 import java.util.concurrent.atomic.AtomicBoolean

 import org.apache.kyuubi.Logging
@@ -68,7 +68,7 @@ abstract class ServiceDiscovery(
   def stopGracefully(isLost: Boolean = false): Unit = {
     while (fe.be.sessionManager.getOpenSessionCount > 0) {
       info(s"${fe.be.sessionManager.getOpenSessionCount} connection(s) are active, delay shutdown")
-      Thread.sleep(1000 * 60)
+      Thread.sleep(TimeUnit.SECONDS.toMillis(5))

Member Author commented:

Shortened the sleep interval so the UT can pass.

     }
     isServerLost.set(isLost)
     gracefulShutdownLatch.countDown()