Skip to content

Commit

Permalink
cleanup driver pod periodically
Browse files Browse the repository at this point in the history
  • Loading branch information
liaoyt committed Dec 7, 2023
1 parent 5b3a78d commit 75c2b68
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 6 deletions.
3 changes: 2 additions & 1 deletion docs/configuration/settings.md
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,8 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co
| kyuubi.kubernetes.master.address | <undefined> | The internal Kubernetes master (API server) address to be used for kyuubi. | string | 1.7.0 |
| kyuubi.kubernetes.namespace | default | The namespace that will be used for running the kyuubi pods and find engines. | string | 1.7.0 |
| kyuubi.kubernetes.namespace.allow.list || The allowed kubernetes namespace list, if it is empty, there is no kubernetes namespace limitation. | set | 1.8.0 |
| kyuubi.kubernetes.spark.cleanupTerminatedDriverPod | NONE | Kyuubi server will delete the spark driver pod after the application terminates for kyuubi.kubernetes.terminatedApplicationRetainPeriod. Available options are NONE, ALL, COMPLETED and default value is None which means none of the pod will be deleted | string | 1.8.1 |
| kyuubi.kubernetes.spark.cleanupTerminatedDriverPod.checkInterval | PT1M | Kyuubi server use guava cache as the cleanup trigger with time-based eviction, but the eviction would not happened until any get/put operation happened. This option schedule a daemon thread evict cache periodically. | duration | 1.8.1 |
| kyuubi.kubernetes.spark.cleanupTerminatedDriverPod.kind | NONE | Kyuubi server will delete the spark driver pod after the application terminates for kyuubi.kubernetes.terminatedApplicationRetainPeriod. Available options are NONE, ALL, COMPLETED and default value is None which means none of the pod will be deleted | string | 1.8.1 |
| kyuubi.kubernetes.spark.forciblyRewriteDriverPodName.enabled | false | Whether to forcibly rewrite Spark driver pod name with 'kyuubi-<uuid>-driver'. If disabled, Kyuubi will try to preserve the application name while satisfying K8s' pod name policy, but some vendors may have stricter pod name policies, thus the generated name may become illegal. | boolean | 1.8.1 |
| kyuubi.kubernetes.spark.forciblyRewriteExecutorPodNamePrefix.enabled | false | Whether to forcibly rewrite Spark executor pod name prefix with 'kyuubi-<uuid>'. If disabled, Kyuubi will try to preserve the application name while satisfying K8s' pod name policy, but some vendors may have stricter Pod name policies, thus the generated name may become illegal. | boolean | 1.8.1 |
| kyuubi.kubernetes.terminatedApplicationRetainPeriod | PT5M | The period for which the Kyuubi server retains application information after the application terminates. | duration | 1.7.1 |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1231,8 +1231,17 @@ object KyuubiConf {
.checkValue(_ > 0, "must be positive number")
.createWithDefault(Duration.ofMinutes(5).toMillis)

val KUBERNETES_SPARK_CLEANUP_TERMINATED_DRIVER_POD: ConfigEntry[String] =
buildConf("kyuubi.kubernetes.spark.cleanupTerminatedDriverPod")
val KUBERNETES_SPARK_CLEANUP_TERMINATED_DRIVER_POD_KIND_CHECK_INTERVAL: ConfigEntry[Long] =
buildConf("kyuubi.kubernetes.spark.cleanupTerminatedDriverPod.checkInterval")
.doc("Kyuubi server use guava cache as the cleanup trigger with time-based eviction, " +
"but the eviction would not happened until any get/put operation happened. " +
"This option schedule a daemon thread evict cache periodically.")
.version("1.8.1")
.timeConf
.createWithDefaultString("PT1M")

val KUBERNETES_SPARK_CLEANUP_TERMINATED_DRIVER_POD_KIND: ConfigEntry[String] =
buildConf("kyuubi.kubernetes.spark.cleanupTerminatedDriverPod.kind")
.doc("Kyuubi server will delete the spark driver pod after " +
s"the application terminates for ${KUBERNETES_TERMINATED_APPLICATION_RETAIN_PERIOD.key}. " +
"Available options are NONE, ALL, COMPLETED and " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.kyuubi.engine

import java.util.Locale
import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
import java.util.concurrent.{ConcurrentHashMap, ScheduledExecutorService, TimeUnit}

import scala.collection.JavaConverters._
import scala.util.control.NonFatal
Expand All @@ -34,7 +34,7 @@ import org.apache.kyuubi.config.KyuubiConf.{KubernetesApplicationStateSource, Ku
import org.apache.kyuubi.config.KyuubiConf.KubernetesApplicationStateSource.KubernetesApplicationStateSource
import org.apache.kyuubi.config.KyuubiConf.KubernetesCleanupDriverPodStrategy.{ALL, COMPLETED, NONE}
import org.apache.kyuubi.engine.ApplicationState.{isTerminated, ApplicationState, FAILED, FINISHED, NOT_FOUND, PENDING, RUNNING, UNKNOWN}
import org.apache.kyuubi.util.KubernetesUtils
import org.apache.kyuubi.util.{KubernetesUtils, ThreadUtils}

class KubernetesApplicationOperation extends ApplicationOperation with Logging {
import KubernetesApplicationOperation._
Expand Down Expand Up @@ -64,6 +64,8 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
// key is kyuubi_unique_key
private var cleanupTerminatedAppInfoTrigger: Cache[String, ApplicationState] = _

private var expireCleanUpTriggerCacheExecutor: ScheduledExecutorService = _

private def getOrCreateKubernetesClient(kubernetesInfo: KubernetesInfo): KubernetesClient = {
checkKubernetesInfo(kubernetesInfo)
kubernetesClients.computeIfAbsent(kubernetesInfo, kInfo => buildKubernetesClient(kInfo))
Expand Down Expand Up @@ -109,7 +111,9 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
// Defer cleaning terminated application information
val retainPeriod = conf.get(KyuubiConf.KUBERNETES_TERMINATED_APPLICATION_RETAIN_PERIOD)
val cleanupDriverPodStrategy = KubernetesCleanupDriverPodStrategy.withName(
conf.get(KyuubiConf.KUBERNETES_SPARK_CLEANUP_TERMINATED_DRIVER_POD))
conf.get(KyuubiConf.KUBERNETES_SPARK_CLEANUP_TERMINATED_DRIVER_POD_KIND))
val cleanupDriverPodCheckInterval = conf.get(
KyuubiConf.KUBERNETES_SPARK_CLEANUP_TERMINATED_DRIVER_POD_KIND_CHECK_INTERVAL)
cleanupTerminatedAppInfoTrigger = CacheBuilder.newBuilder()
.expireAfterWrite(retainPeriod, TimeUnit.MILLISECONDS)
.removalListener((notification: RemovalNotification[String, ApplicationState]) => {
Expand Down Expand Up @@ -147,6 +151,23 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
}
})
.build()
expireCleanUpTriggerCacheExecutor = ThreadUtils.newDaemonSingleThreadScheduledExecutor(
"pod-cleanup-trigger-thread")
expireCleanUpTriggerCacheExecutor.scheduleWithFixedDelay(
() => {
try {
cleanupTerminatedAppInfoTrigger.asMap().asScala.foreach {
case (key, _) =>
// do get to trigger cache eviction
cleanupTerminatedAppInfoTrigger.getIfPresent(key)
}
} catch {
case NonFatal(e) => error("Failed to evict clean up terminated app cache", e)
}
},
5,
cleanupDriverPodCheckInterval,
TimeUnit.MINUTES)
}

override def isSupported(appMgrInfo: ApplicationManagerInfo): Boolean = {
Expand Down

0 comments on commit 75c2b68

Please sign in to comment.