diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala index 8a8f59ffe99..263f1177263 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala @@ -24,7 +24,6 @@ import java.nio.file.{Files, Path, Paths} import scala.collection.JavaConverters._ -import com.google.common.annotations.VisibleForTesting import com.google.common.collect.EvictingQueue import org.apache.commons.lang3.StringUtils.containsIgnoreCase @@ -166,8 +165,7 @@ trait ProcBuilder { // Visible for test @volatile private[kyuubi] var logCaptureThreadReleased: Boolean = true private var logCaptureThread: Thread = _ - private var process: Process = _ - @VisibleForTesting + @volatile private[kyuubi] var process: Process = _ @volatile private[kyuubi] var processLaunched: Boolean = _ private[kyuubi] lazy val engineLog: File = ProcBuilder.synchronized { diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala index 276fe344600..c29065f192d 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala @@ -100,6 +100,9 @@ class BatchJobSubmission( getOperationLog) } + def startupProcessAlive: Boolean = + builder.processLaunched && Option(builder.process).exists(_.isAlive) + override def currentApplicationInfo(): Option[ApplicationInfo] = { if (isTerminal(state) && _applicationInfo.map(_.state).exists(ApplicationState.isTerminated)) { return _applicationInfo diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala index 2bfbbce2ab7..e2736267f7e 100644 --- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala @@ -94,8 +94,13 @@ class KyuubiBatchService( metadata.appState match { // app that is not submitted to resource manager case None | Some(ApplicationState.NOT_FOUND) => false - // app that is pending in resource manager - case Some(ApplicationState.PENDING) => false + // app that is pending in resource manager while the local startup + // process is alive. For example, in Spark YARN cluster mode, when + // spark.yarn.submit.waitAppCompletion=false is set, the local spark-submit + // process exits immediately once the application reaches ACCEPTED state, + // even if no resources can be allocated for the AM container. + case Some(ApplicationState.PENDING) if batchSession.startupProcessAlive => + false // not sure, added for safe case Some(ApplicationState.UNKNOWN) => false case _ => true diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala index 531bbc3af87..4ac84c1d0dc 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala @@ -111,6 +111,8 @@ class KyuubiBatchSession( batchArgs, metadata) + def startupProcessAlive: Boolean = batchJobSubmissionOp.startupProcessAlive + private def waitMetadataRequestsRetryCompletion(): Unit = { val batchId = batchJobSubmissionOp.batchId sessionManager.getMetadataRequestsRetryRef(batchId).foreach {