Skip to content

Commit

Permalink
save
Browse files Browse the repository at this point in the history
  • Loading branch information
turboFei committed Nov 16, 2023
1 parent df47e1c commit 4a3779f
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 18 deletions.
2 changes: 1 addition & 1 deletion docs/configuration/settings.md
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co

| Key | Default | Meaning | Type | Since |
|----------------------------------------------------------------------|-------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|-------|
| kyuubi.kubernetes.application.state.container | <undefined> | The container name to retrieve the application state from. | string | 1.8.1 |
| kyuubi.kubernetes.application.state.containers || A comma-separated list of container names to retrieve the application state from. | set | 1.8.1 |
| kyuubi.kubernetes.application.state.fromContainer | false | If set to true then the application state will be retrieved from the container instead of the pod. | boolean | 1.8.1 |
| kyuubi.kubernetes.authenticate.caCertFile | <undefined> | Path to the CA cert file for connecting to the Kubernetes API server over TLS from the kyuubi. Specify this as a path as opposed to a URI (i.e. do not provide a scheme) | string | 1.7.0 |
| kyuubi.kubernetes.authenticate.clientCertFile | <undefined> | Path to the client cert file for connecting to the Kubernetes API server over TLS from the kyuubi. Specify this as a path as opposed to a URI (i.e. do not provide a scheme) | string | 1.7.0 |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1239,12 +1239,13 @@ object KyuubiConf {
.booleanConf
.createWithDefault(false)

val KUBERNETES_APPLICATION_STATE_CONTAINER: OptionalConfigEntry[String] =
buildConf("kyuubi.kubernetes.application.state.container")
.doc("The container name to retrieve the application state from.")
val KUBERNETES_APPLICATION_STATE_CONTAINERS: ConfigEntry[Set[String]] =
buildConf("kyuubi.kubernetes.application.state.containers")
.doc("A comma-separated list of container names to retrieve the application state from.")
.version("1.8.1")
.stringConf
.createOptional
.toSet()
.createWithDefault(Set.empty)

// ///////////////////////////////////////////////////////////////////////////////////////////////
// SQL Engine Configuration //
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ object KubernetesApplicationAuditLogger extends Logging {
kubernetesInfo: KubernetesInfo,
pod: Pod,
appStateFromContainer: Boolean,
appStateContainer: Option[String]): Unit = {
appStateContainer: Set[String]): Unit = {
val sb = AUDIT_BUFFER.get()
sb.setLength(0)
sb.append(s"label=${pod.getMetadata.getLabels.get(LABEL_KYUUBI_UNIQUE_KEY)}").append("\t")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
kyuubiConf.get(KyuubiConf.KUBERNETES_NAMESPACE_ALLOW_LIST)
private def appStateFromContainer: Boolean =
kyuubiConf.get(KyuubiConf.KUBERNETES_APPLICATION_STATE_FROM_CONTAINER)
private def appStateContainer: Option[String] =
kyuubiConf.get(KyuubiConf.KUBERNETES_APPLICATION_STATE_CONTAINER)
private def appStateContainers: Set[String] =
kyuubiConf.get(KyuubiConf.KUBERNETES_APPLICATION_STATE_CONTAINERS)

// key is kyuubi_unique_key
private val appInfoStore: ConcurrentHashMap[String, ApplicationInfo] =
Expand Down Expand Up @@ -225,22 +225,22 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
kubernetesInfo,
pod,
appStateFromContainer,
appStateContainer)
appStateContainers)
}
}

override def onUpdate(oldPod: Pod, newPod: Pod): Unit = {
if (isSparkEnginePod(newPod)) {
updateApplicationState(newPod)
val appState = toApplicationState(newPod, appStateFromContainer, appStateContainer)
val appState = toApplicationState(newPod, appStateFromContainer, appStateContainers)
if (isTerminated(appState)) {
markApplicationTerminated(newPod)
}
KubernetesApplicationAuditLogger.audit(
kubernetesInfo,
newPod,
appStateFromContainer,
appStateContainer)
appStateContainers)
}
}

Expand All @@ -252,7 +252,7 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
kubernetesInfo,
pod,
appStateFromContainer,
appStateContainer)
appStateContainers)
}
}
}
Expand All @@ -264,7 +264,7 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {

private def updateApplicationState(pod: Pod): Unit = {
val (appState, appError) =
toApplicationStateAndError(pod, appStateFromContainer, appStateContainer)
toApplicationStateAndError(pod, appStateFromContainer, appStateContainers)
debug(s"Driver Informer changes pod: ${pod.getMetadata.getName} to state: $appState")
appInfoStore.put(
pod.getMetadata.getLabels.get(LABEL_KYUUBI_UNIQUE_KEY),
Expand All @@ -280,7 +280,7 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
if (cleanupTerminatedAppInfoTrigger.getIfPresent(key) == null) {
cleanupTerminatedAppInfoTrigger.put(
key,
toApplicationState(pod, appStateFromContainer, appStateContainer))
toApplicationState(pod, appStateFromContainer, appStateContainers))
}
}
}
Expand All @@ -296,17 +296,18 @@ object KubernetesApplicationOperation extends Logging {
def toApplicationState(
pod: Pod,
appStateFromContainer: Boolean,
appStateContainer: Option[String]): ApplicationState = {
appStateContainer: Set[String]): ApplicationState = {
toApplicationStateAndError(pod, appStateFromContainer, appStateContainer)._1
}

def toApplicationStateAndError(
pod: Pod,
appStateFromContainer: Boolean,
appStateContainer: Option[String]): (ApplicationState, Option[String]) = {
appStateContainer: Set[String]): (ApplicationState, Option[String]) = {
val containerToBuildAppState = (appStateFromContainer, appStateContainer) match {
case (true, Some(container)) =>
pod.getStatus.getContainerStatuses.asScala.find(_.getName == container).map(_.getState)
case (true, containers) if containers.nonEmpty =>
pod.getStatus.getContainerStatuses.asScala.find(s => containers.contains(s.getName)).map(
_.getState)
case _ => None
}
val applicationState = containerToBuildAppState.map(containerStateToApplicationState)
Expand Down

0 comments on commit 4a3779f

Please sign in to comment.