Skip to content

Commit

Permalink
app error
Browse files Browse the repository at this point in the history
  • Loading branch information
turboFei committed Nov 16, 2023
1 parent 732f478 commit 0faa2af
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package org.apache.kyuubi.engine
import io.fabric8.kubernetes.api.model.Pod

import org.apache.kyuubi.Logging
import org.apache.kyuubi.engine.KubernetesApplicationOperation.{toApplicationState, LABEL_KYUUBI_UNIQUE_KEY, SPARK_APP_ID_LABEL}
import org.apache.kyuubi.engine.KubernetesApplicationOperation.{toApplicationStateAndError, LABEL_KYUUBI_UNIQUE_KEY, SPARK_APP_ID_LABEL}

object KubernetesApplicationAuditLogger extends Logging {
final private val AUDIT_BUFFER = new ThreadLocal[StringBuilder]() {
Expand All @@ -39,7 +39,10 @@ object KubernetesApplicationAuditLogger extends Logging {
sb.append(s"namespace=${kubernetesInfo.namespace.orNull}").append("\t")
sb.append(s"pod=${pod.getMetadata.getName}").append("\t")
sb.append(s"appId=${pod.getMetadata.getLabels.get(SPARK_APP_ID_LABEL)}").append("\t")
sb.append(s"appState=${toApplicationState(pod, appStateFromContainer, appStateContainer)}")
val (appState, appError) =
toApplicationStateAndError(pod, appStateFromContainer, appStateContainer)
sb.append(s"appState=$appState").append("\t")
sb.append(s"appError=${appError.orNull}")
info(sb.toString())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@ import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
import scala.collection.JavaConverters._

import com.google.common.cache.{Cache, CacheBuilder, RemovalNotification}
import io.fabric8.kubernetes.api.model.{ContainerState, ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, Pod}
import io.fabric8.kubernetes.api.model.{ContainerState, ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, Pod}
import io.fabric8.kubernetes.client.KubernetesClient
import io.fabric8.kubernetes.client.informers.{ResourceEventHandler, SharedIndexInformer}

import org.apache.kyuubi.{KyuubiException, Logging, Utils}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.engine.ApplicationState.{isTerminated, ApplicationState, FAILED, FINISHED, NOT_FOUND, PENDING, RUNNING, UNKNOWN}
import org.apache.kyuubi.engine.KubernetesApplicationOperation.{toApplicationState, toLabel, LABEL_KYUUBI_UNIQUE_KEY, SPARK_APP_ID_LABEL}
import org.apache.kyuubi.engine.KubernetesApplicationOperation.{toApplicationState, toApplicationStateAndError, toLabel, LABEL_KYUUBI_UNIQUE_KEY, SPARK_APP_ID_LABEL}
import org.apache.kyuubi.util.KubernetesUtils

class KubernetesApplicationOperation extends ApplicationOperation with Logging {
Expand Down Expand Up @@ -263,15 +263,16 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
}

private def updateApplicationState(pod: Pod): Unit = {
val appState = toApplicationState(pod, appStateFromContainer, appStateContainer)
val (appState, appError) =
toApplicationStateAndError(pod, appStateFromContainer, appStateContainer)
debug(s"Driver Informer changes pod: ${pod.getMetadata.getName} to state: $appState")
appInfoStore.put(
pod.getMetadata.getLabels.get(LABEL_KYUUBI_UNIQUE_KEY),
ApplicationInfo(
id = pod.getMetadata.getLabels.get(SPARK_APP_ID_LABEL),
name = pod.getMetadata.getName,
state = appState,
error = Option(pod.getStatus.getReason)))
error = appError))
}

private def markApplicationTerminated(pod: Pod): Unit = synchronized {
Expand All @@ -296,13 +297,23 @@ object KubernetesApplicationOperation extends Logging {
pod: Pod,
appStateFromContainer: Boolean,
appStateContainer: Option[String]): ApplicationState = {
toApplicationStateAndError(pod, appStateFromContainer, appStateContainer)._1
}

def toApplicationStateAndError(
pod: Pod,
appStateFromContainer: Boolean,
appStateContainer: Option[String]): (ApplicationState, Option[String]) = {
val containerToBuildAppState = (appStateFromContainer, appStateContainer) match {
case (true, Some(container)) =>
pod.getStatus.getContainerStatuses.asScala.find(_.getName == container).map(_.getState)
case _ => None
}
containerToBuildAppState.map(containerStateToApplicationState)
val applicationState = containerToBuildAppState.map(containerStateToApplicationState)
.getOrElse(podStateToApplicationState(pod.getStatus.getPhase))
val applicationError = containerToBuildAppState.map(containerStateToApplicationError)
.getOrElse(Option(pod.getStatus.getReason))
applicationState -> applicationError
}

def containerStateToApplicationState(containerState: ContainerState): ApplicationState = {
Expand All @@ -322,8 +333,18 @@ object KubernetesApplicationOperation extends Logging {
}
}

def containerStateToApplicationError(containerState: ContainerState): Option[String] = {
// https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/#container-states
Option(containerState.getWaiting)
.orElse(Option(containerState.getRunning))
.orElse(Option(containerState.getTerminated)) match {
case Some(waiting: ContainerStateWaiting) => Option(waiting.getReason)
case Some(terminated: ContainerStateTerminated) => Option(terminated.getReason)
case _ => None
}
}

def podStateToApplicationState(podState: String): ApplicationState = podState match {
// https://github.com/kubernetes/kubernetes/blob/master/pkg/apis/core/types.go#L2396
// https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/#pod-phase
case "Pending" => PENDING
case "Running" => RUNNING
Expand Down

0 comments on commit 0faa2af

Please sign in to comment.