diff --git a/docs/configuration/settings.md b/docs/configuration/settings.md index 651829b4f..a3c0ca0df 100644 --- a/docs/configuration/settings.md +++ b/docs/configuration/settings.md @@ -326,7 +326,8 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co | kyuubi.kubernetes.master.address | <undefined> | The internal Kubernetes master (API server) address to be used for kyuubi. | string | 1.7.0 | | kyuubi.kubernetes.namespace | default | The namespace that will be used for running the kyuubi pods and find engines. | string | 1.7.0 | | kyuubi.kubernetes.namespace.allow.list || The allowed kubernetes namespace list, if it is empty, there is no kubernetes namespace limitation. | set | 1.8.0 | -| kyuubi.kubernetes.spark.cleanupTerminatedDriverPod | NONE | Kyuubi server will delete the spark driver pod after the application terminates for kyuubi.kubernetes.terminatedApplicationRetainPeriod. Available options are NONE, ALL, COMPLETED and default value is None which means none of the pod will be deleted | string | 1.8.1 | +| kyuubi.kubernetes.spark.cleanupTerminatedDriverPod.checkInterval | PT1M | Kyuubi server use guava cache as the cleanup trigger with time-based eviction, but the eviction would not happened until any get/put operation happened. This option schedule a daemon thread evict cache periodically. | duration | 1.8.1 | +| kyuubi.kubernetes.spark.cleanupTerminatedDriverPod.kind | NONE | Kyuubi server will delete the spark driver pod after the application terminates for kyuubi.kubernetes.terminatedApplicationRetainPeriod. Available options are NONE, ALL, COMPLETED and default value is None which means none of the pod will be deleted | string | 1.8.1 | | kyuubi.kubernetes.spark.forciblyRewriteDriverPodName.enabled | false | Whether to forcibly rewrite Spark driver pod name with 'kyuubi--driver'. 
If disabled, Kyuubi will try to preserve the application name while satisfying K8s' pod name policy, but some vendors may have stricter pod name policies, thus the generated name may become illegal. | boolean | 1.8.1 | | kyuubi.kubernetes.spark.forciblyRewriteExecutorPodNamePrefix.enabled | false | Whether to forcibly rewrite Spark executor pod name prefix with 'kyuubi-'. If disabled, Kyuubi will try to preserve the application name while satisfying K8s' pod name policy, but some vendors may have stricter Pod name policies, thus the generated name may become illegal. | boolean | 1.8.1 | | kyuubi.kubernetes.terminatedApplicationRetainPeriod | PT5M | The period for which the Kyuubi server retains application information after the application terminates. | duration | 1.7.1 | diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala index dcd84e7be..b655f7984 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala @@ -1231,8 +1231,19 @@ object KyuubiConf { .checkValue(_ > 0, "must be positive number") .createWithDefault(Duration.ofMinutes(5).toMillis) - val KUBERNETES_SPARK_CLEANUP_TERMINATED_DRIVER_POD: ConfigEntry[String] = - buildConf("kyuubi.kubernetes.spark.cleanupTerminatedDriverPod") + val KUBERNETES_SPARK_CLEANUP_TERMINATED_DRIVER_POD_KIND_CHECK_INTERVAL: ConfigEntry[Long] = + buildConf("kyuubi.kubernetes.spark.cleanupTerminatedDriverPod.checkInterval") + .doc("Kyuubi server use guava cache as the cleanup trigger with time-based eviction, " + + "but the eviction would not happened until any get/put operation happened. 
" + + "This option schedule a daemon thread evict cache periodically.") + .version("1.8.1") + .timeConf + // the interval drives a fixed-delay scheduler, which requires a positive delay + .checkValue(_ > 0, "must be positive number") + .createWithDefaultString("PT1M") + + val KUBERNETES_SPARK_CLEANUP_TERMINATED_DRIVER_POD_KIND: ConfigEntry[String] = + buildConf("kyuubi.kubernetes.spark.cleanupTerminatedDriverPod.kind") .doc("Kyuubi server will delete the spark driver pod after " + s"the application terminates for ${KUBERNETES_TERMINATED_APPLICATION_RETAIN_PERIOD.key}. " + "Available options are NONE, ALL, COMPLETED and " + diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala index 922dd9a15..c7c69cc16 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala @@ -18,7 +18,7 @@ package org.apache.kyuubi.engine import java.util.Locale -import java.util.concurrent.{ConcurrentHashMap, TimeUnit} +import java.util.concurrent.{ConcurrentHashMap, ScheduledExecutorService, TimeUnit} import scala.collection.JavaConverters._ import scala.util.control.NonFatal @@ -34,7 +34,7 @@ import org.apache.kyuubi.config.KyuubiConf.{KubernetesApplicationStateSource, Ku import org.apache.kyuubi.config.KyuubiConf.KubernetesApplicationStateSource.KubernetesApplicationStateSource import org.apache.kyuubi.config.KyuubiConf.KubernetesCleanupDriverPodStrategy.{ALL, COMPLETED, NONE} import org.apache.kyuubi.engine.ApplicationState.{isTerminated, ApplicationState, FAILED, FINISHED, NOT_FOUND, PENDING, RUNNING, UNKNOWN} -import org.apache.kyuubi.util.KubernetesUtils +import org.apache.kyuubi.util.{KubernetesUtils, ThreadUtils} class KubernetesApplicationOperation extends ApplicationOperation with Logging { import KubernetesApplicationOperation._ @@ -64,6 +64,8 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging { // 
key is kyuubi_unique_key private var cleanupTerminatedAppInfoTrigger: Cache[String, ApplicationState] = _ + private var expireCleanUpTriggerCacheExecutor: ScheduledExecutorService = _ + private def getOrCreateKubernetesClient(kubernetesInfo: KubernetesInfo): KubernetesClient = { checkKubernetesInfo(kubernetesInfo) kubernetesClients.computeIfAbsent(kubernetesInfo, kInfo => buildKubernetesClient(kInfo)) @@ -109,7 +111,9 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging { // Defer cleaning terminated application information val retainPeriod = conf.get(KyuubiConf.KUBERNETES_TERMINATED_APPLICATION_RETAIN_PERIOD) val cleanupDriverPodStrategy = KubernetesCleanupDriverPodStrategy.withName( - conf.get(KyuubiConf.KUBERNETES_SPARK_CLEANUP_TERMINATED_DRIVER_POD)) + conf.get(KyuubiConf.KUBERNETES_SPARK_CLEANUP_TERMINATED_DRIVER_POD_KIND)) + val cleanupDriverPodCheckInterval = conf.get( + KyuubiConf.KUBERNETES_SPARK_CLEANUP_TERMINATED_DRIVER_POD_KIND_CHECK_INTERVAL) cleanupTerminatedAppInfoTrigger = CacheBuilder.newBuilder() .expireAfterWrite(retainPeriod, TimeUnit.MILLISECONDS) .removalListener((notification: RemovalNotification[String, ApplicationState]) => { @@ -147,6 +151,24 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging { } }) .build() + expireCleanUpTriggerCacheExecutor = ThreadUtils.newDaemonSingleThreadScheduledExecutor( + "pod-cleanup-trigger-thread") + // the check interval is in milliseconds; it is used as both initial delay and period + expireCleanUpTriggerCacheExecutor.scheduleWithFixedDelay( + () => { + try { + cleanupTerminatedAppInfoTrigger.asMap().asScala.foreach { + case (key, _) => + // do get to trigger cache eviction + cleanupTerminatedAppInfoTrigger.getIfPresent(key) + } + } catch { + case NonFatal(e) => error("Failed to evict clean up terminated app cache", e) + } + }, + cleanupDriverPodCheckInterval, + cleanupDriverPodCheckInterval, + TimeUnit.MILLISECONDS) } override def isSupported(appMgrInfo: ApplicationManagerInfo): Boolean = {