From 27ad1024b581f7016dccabbec8ddf0f7fa314bf7 Mon Sep 17 00:00:00 2001 From: yeatsliao Date: Thu, 7 Dec 2023 13:55:16 +0800 Subject: [PATCH] [KYUUBI #5795][K8S] Support to cleanup the spark driver pod periodically MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit # :mag: Description ## Issue References 🔗 This pull request fixes #5795 ## Describe Your Solution 🔧 Create a single daemon thread to traverse cache map periodically, which will evict expired cache and trigger a pod clean up operation. ## Types of changes :bookmark: - [ ] Bugfix (non-breaking change which fixes an issue) - [x] New feature (non-breaking change which adds functionality) - [ ] Breaking change (fix or feature that would cause existing functionality to change) ## Test Plan 🧪 #### Behavior Without This Pull Request :coffin: #### Behavior With This Pull Request :tada: #### Related Unit Tests --- # Checklists ## 📝 Author Self Checklist - [x] My code follows the [style guidelines](https://kyuubi.readthedocs.io/en/master/contributing/code/style.html) of this project - [ ] I have performed a self-review - [ ] I have commented my code, particularly in hard-to-understand areas - [ ] I have made corresponding changes to the documentation - [ ] My changes generate no new warnings - [ ] I have added tests that prove my fix is effective or that my feature works - [ ] New and existing unit tests pass locally with my changes - [ ] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html) ## 📝 Committer Pre-Merge Checklist - [x] Pull request title is okay. - [x] No license issues. - [x] Milestone correctly set? - [ ] Test coverage is ok - [x] Assignees are selected. - [x] Minimum number of approvals - [x] No changes are requested **Be nice. Be informative.** Closes #5806 from liaoyt/master. 
Closes #5795 75c2b68cc [yeatsliao] cleanup driver pod periodically Authored-by: yeatsliao Signed-off-by: Cheng Pan --- docs/configuration/settings.md | 3 ++- .../org/apache/kyuubi/config/KyuubiConf.scala | 13 +++++++-- .../KubernetesApplicationOperation.scala | 27 ++++++++++++++++--- 3 files changed, 37 insertions(+), 6 deletions(-) diff --git a/docs/configuration/settings.md b/docs/configuration/settings.md index 651829b4f..a3c0ca0df 100644 --- a/docs/configuration/settings.md +++ b/docs/configuration/settings.md @@ -326,7 +326,8 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co | kyuubi.kubernetes.master.address | <undefined> | The internal Kubernetes master (API server) address to be used for kyuubi. | string | 1.7.0 | | kyuubi.kubernetes.namespace | default | The namespace that will be used for running the kyuubi pods and find engines. | string | 1.7.0 | | kyuubi.kubernetes.namespace.allow.list || The allowed kubernetes namespace list, if it is empty, there is no kubernetes namespace limitation. | set | 1.8.0 | -| kyuubi.kubernetes.spark.cleanupTerminatedDriverPod | NONE | Kyuubi server will delete the spark driver pod after the application terminates for kyuubi.kubernetes.terminatedApplicationRetainPeriod. Available options are NONE, ALL, COMPLETED and default value is None which means none of the pod will be deleted | string | 1.8.1 | +| kyuubi.kubernetes.spark.cleanupTerminatedDriverPod.checkInterval | PT1M | Kyuubi server use guava cache as the cleanup trigger with time-based eviction, but the eviction would not happened until any get/put operation happened. This option schedule a daemon thread evict cache periodically. | duration | 1.8.1 | +| kyuubi.kubernetes.spark.cleanupTerminatedDriverPod.kind | NONE | Kyuubi server will delete the spark driver pod after the application terminates for kyuubi.kubernetes.terminatedApplicationRetainPeriod. 
Available options are NONE, ALL, COMPLETED and default value is None which means none of the pod will be deleted | string | 1.8.1 | | kyuubi.kubernetes.spark.forciblyRewriteDriverPodName.enabled | false | Whether to forcibly rewrite Spark driver pod name with 'kyuubi-<uuid>-driver'. If disabled, Kyuubi will try to preserve the application name while satisfying K8s' pod name policy, but some vendors may have stricter pod name policies, thus the generated name may become illegal. | boolean | 1.8.1 | | kyuubi.kubernetes.spark.forciblyRewriteExecutorPodNamePrefix.enabled | false | Whether to forcibly rewrite Spark executor pod name prefix with 'kyuubi-<uuid>'. If disabled, Kyuubi will try to preserve the application name while satisfying K8s' pod name policy, but some vendors may have stricter Pod name policies, thus the generated name may become illegal. | boolean | 1.8.1 | | kyuubi.kubernetes.terminatedApplicationRetainPeriod | PT5M | The period for which the Kyuubi server retains application information after the application terminates. 
| duration | 1.7.1 | diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala index dcd84e7be..b655f7984 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala @@ -1231,8 +1231,17 @@ object KyuubiConf { .checkValue(_ > 0, "must be positive number") .createWithDefault(Duration.ofMinutes(5).toMillis) - val KUBERNETES_SPARK_CLEANUP_TERMINATED_DRIVER_POD: ConfigEntry[String] = - buildConf("kyuubi.kubernetes.spark.cleanupTerminatedDriverPod") + val KUBERNETES_SPARK_CLEANUP_TERMINATED_DRIVER_POD_KIND_CHECK_INTERVAL: ConfigEntry[Long] = + buildConf("kyuubi.kubernetes.spark.cleanupTerminatedDriverPod.checkInterval") + .doc("Kyuubi server use guava cache as the cleanup trigger with time-based eviction, " + + "but the eviction would not happened until any get/put operation happened. " + + "This option schedule a daemon thread evict cache periodically.") + .version("1.8.1") + .timeConf + .createWithDefaultString("PT1M") + + val KUBERNETES_SPARK_CLEANUP_TERMINATED_DRIVER_POD_KIND: ConfigEntry[String] = + buildConf("kyuubi.kubernetes.spark.cleanupTerminatedDriverPod.kind") .doc("Kyuubi server will delete the spark driver pod after " + s"the application terminates for ${KUBERNETES_TERMINATED_APPLICATION_RETAIN_PERIOD.key}. 
" + "Available options are NONE, ALL, COMPLETED and " + diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala index 922dd9a15..c7c69cc16 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala @@ -18,7 +18,7 @@ package org.apache.kyuubi.engine import java.util.Locale -import java.util.concurrent.{ConcurrentHashMap, TimeUnit} +import java.util.concurrent.{ConcurrentHashMap, ScheduledExecutorService, TimeUnit} import scala.collection.JavaConverters._ import scala.util.control.NonFatal @@ -34,7 +34,7 @@ import org.apache.kyuubi.config.KyuubiConf.{KubernetesApplicationStateSource, Ku import org.apache.kyuubi.config.KyuubiConf.KubernetesApplicationStateSource.KubernetesApplicationStateSource import org.apache.kyuubi.config.KyuubiConf.KubernetesCleanupDriverPodStrategy.{ALL, COMPLETED, NONE} import org.apache.kyuubi.engine.ApplicationState.{isTerminated, ApplicationState, FAILED, FINISHED, NOT_FOUND, PENDING, RUNNING, UNKNOWN} -import org.apache.kyuubi.util.KubernetesUtils +import org.apache.kyuubi.util.{KubernetesUtils, ThreadUtils} class KubernetesApplicationOperation extends ApplicationOperation with Logging { import KubernetesApplicationOperation._ @@ -64,6 +64,8 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging { // key is kyuubi_unique_key private var cleanupTerminatedAppInfoTrigger: Cache[String, ApplicationState] = _ + private var expireCleanUpTriggerCacheExecutor: ScheduledExecutorService = _ + private def getOrCreateKubernetesClient(kubernetesInfo: KubernetesInfo): KubernetesClient = { checkKubernetesInfo(kubernetesInfo) kubernetesClients.computeIfAbsent(kubernetesInfo, kInfo => buildKubernetesClient(kInfo)) @@ -109,7 +111,9 @@ class 
KubernetesApplicationOperation extends ApplicationOperation with Logging { // Defer cleaning terminated application information val retainPeriod = conf.get(KyuubiConf.KUBERNETES_TERMINATED_APPLICATION_RETAIN_PERIOD) val cleanupDriverPodStrategy = KubernetesCleanupDriverPodStrategy.withName( - conf.get(KyuubiConf.KUBERNETES_SPARK_CLEANUP_TERMINATED_DRIVER_POD)) + conf.get(KyuubiConf.KUBERNETES_SPARK_CLEANUP_TERMINATED_DRIVER_POD_KIND)) + val cleanupDriverPodCheckInterval = conf.get( + KyuubiConf.KUBERNETES_SPARK_CLEANUP_TERMINATED_DRIVER_POD_KIND_CHECK_INTERVAL) cleanupTerminatedAppInfoTrigger = CacheBuilder.newBuilder() .expireAfterWrite(retainPeriod, TimeUnit.MILLISECONDS) .removalListener((notification: RemovalNotification[String, ApplicationState]) => { @@ -147,6 +151,23 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging { } }) .build() + expireCleanUpTriggerCacheExecutor = ThreadUtils.newDaemonSingleThreadScheduledExecutor( + "pod-cleanup-trigger-thread") + expireCleanUpTriggerCacheExecutor.scheduleWithFixedDelay( + () => { + try { + cleanupTerminatedAppInfoTrigger.asMap().asScala.foreach { + case (key, _) => + // do get to trigger cache eviction + cleanupTerminatedAppInfoTrigger.getIfPresent(key) + } + } catch { + case NonFatal(e) => error("Failed to evict clean up terminated app cache", e) + } + }, + cleanupDriverPodCheckInterval, + cleanupDriverPodCheckInterval, + TimeUnit.MILLISECONDS) } override def isSupported(appMgrInfo: ApplicationManagerInfo): Boolean = {