Skip to content

Commit

Permalink
Exited spark-submit process should not block batch submit queue
Browse files Browse the repository at this point in the history
  • Loading branch information
pan3793 committed Jan 30, 2024
1 parent 3f993f4 commit 05fcc75
Show file tree
Hide file tree
Showing 7 changed files with 18 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ class KyuubiOperationKubernetesClusterClusterModeSuite
Seq("_123", "spark_exec", "spark@", "a" * 238).foreach { invalid =>
conf.set(KUBERNETES_EXECUTOR_POD_NAME_PREFIX, invalid)
val builder = new SparkProcessBuilder("test", true, conf)
val e = intercept[KyuubiException](builder.validateConf)
val e = intercept[KyuubiException](builder.validateConf())
assert(e.getMessage === s"'$invalid' in spark.kubernetes.executor.podNamePrefix is" +
s" invalid. must conform https://kubernetes.io/docs/concepts/overview/" +
"working-with-objects/names/#dns-subdomain-names and the value length <= 237")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ private[kyuubi] class EngineRef(
acquiredPermit = true
val redactedCmd = builder.toString
info(s"Launching engine:\n$redactedCmd")
builder.validateConf
builder.validateConf()
val process = builder.start
var exitValue: Option[Int] = None
var lastApplicationInfo: Option[ApplicationInfo] = None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import java.nio.file.{Files, Path, Paths}

import scala.collection.JavaConverters._

import com.google.common.annotations.VisibleForTesting
import com.google.common.collect.EvictingQueue
import org.apache.commons.lang3.StringUtils.containsIgnoreCase

Expand Down Expand Up @@ -166,9 +165,8 @@ trait ProcBuilder {
// Visible for test
@volatile private[kyuubi] var logCaptureThreadReleased: Boolean = true
private var logCaptureThread: Thread = _
private var process: Process = _
@VisibleForTesting
@volatile private[kyuubi] var processLaunched: Boolean = _
@volatile private[kyuubi] var process: Process = _
@volatile private[kyuubi] var processLaunched: Boolean = false

private[kyuubi] lazy val engineLog: File = ProcBuilder.synchronized {
val engineLogTimeout = conf.get(KyuubiConf.ENGINE_LOG_TIMEOUT)
Expand Down Expand Up @@ -206,7 +204,7 @@ trait ProcBuilder {
file
}

def validateConf: Unit = {}
def validateConf(): Unit = {}

final def start: Process = synchronized {
process = processBuilder.start()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ class SparkProcessBuilder(
conf.getOption(KUBERNETES_NAMESPACE_KEY).orElse(defaultsConf.get(KUBERNETES_NAMESPACE_KEY))
}

override def validateConf: Unit = Validator.validateConf(conf)
override def validateConf(): Unit = Validator.validateConf(conf)

// For spark on kubernetes, spark pod using env SPARK_USER_NAME as current user
def setSparkUserName(userName: String, buffer: mutable.Buffer[String]): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ class BatchJobSubmission(
getOperationLog)
}

def startupProcessAlive: Boolean =
builder.processLaunched && Option(builder.process).exists(_.isAlive)

override def currentApplicationInfo(): Option[ApplicationInfo] = {
if (isTerminal(state) && _applicationInfo.map(_.state).exists(ApplicationState.isTerminated)) {
return _applicationInfo
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,13 @@ class KyuubiBatchService(
metadata.appState match {
// app that is not submitted to resource manager
case None | Some(ApplicationState.NOT_FOUND) => false
// app that is pending in resource manager
case Some(ApplicationState.PENDING) => false
// app that is pending in resource manager while the local startup
// process is alive. For example, in Spark YARN cluster mode, if set
// spark.yarn.submit.waitAppCompletion=false, the local spark-submit
// process exits immediately once Application goes ACCEPTED status,
// even no resource could be allocated for the AM container.
case Some(ApplicationState.PENDING) if batchSession.startupProcessAlive =>
false
// not sure, added for safe
case Some(ApplicationState.UNKNOWN) => false
case _ => true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ class KyuubiBatchSession(
batchArgs,
metadata)

def startupProcessAlive: Boolean = batchJobSubmissionOp.startupProcessAlive

private def waitMetadataRequestsRetryCompletion(): Unit = {
val batchId = batchJobSubmissionOp.batchId
sessionManager.getMetadataRequestsRetryRef(batchId).foreach {
Expand Down

0 comments on commit 05fcc75

Please sign in to comment.