[KYUUBI #5795][K8S] Support to cleanup the spark driver pod periodically
# 🔍 Description ## Issue References 🔗 This pull request fixes #5795 ## Describe Your Solution 🔧 Create a single daemon thread to traverse cache map periodically, which will evict expired cache and trigger a pod clean up operation. ## Types of changes 🔖 - [ ] Bugfix (non-breaking change which fixes an issue) - [x] New feature (non-breaking change which adds functionality) - [ ] Breaking change (fix or feature that would cause existing functionality to change) ## Test Plan 🧪 #### Behavior Without This Pull Request ⚰️ #### Behavior With This Pull Request 🎉 #### Related Unit Tests --- # Checklists ## 📝 Author Self Checklist - [x] My code follows the [style guidelines](https://kyuubi.readthedocs.io/en/master/contributing/code/style.html) of this project - [ ] I have performed a self-review - [ ] I have commented my code, particularly in hard-to-understand areas - [ ] I have made corresponding changes to the documentation - [ ] My changes generate no new warnings - [ ] I have added tests that prove my fix is effective or that my feature works - [ ] New and existing unit tests pass locally with my changes - [ ] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html) ## 📝 Committer Pre-Merge Checklist - [x] Pull request title is okay. - [x] No license issues. - [x] Milestone correctly set? - [ ] Test coverage is ok - [x] Assignees are selected. - [x] Minimum number of approvals - [x] No changes are requested **Be nice. Be informative.** Closes #5806 from liaoyt/master. Closes #5795 75c2b68cc [yeatsliao] cleanup driver pod periodically Authored-by: yeatsliao <liaoyt66066@gmail.com> Signed-off-by: Cheng Pan <chengpan@apache.org>
This commit is contained in:
parent
f3f643a309
commit
27ad1024b5
@ -326,7 +326,8 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co
|
||||
| kyuubi.kubernetes.master.address | <undefined> | The internal Kubernetes master (API server) address to be used for kyuubi. | string | 1.7.0 |
|
||||
| kyuubi.kubernetes.namespace | default | The namespace that will be used for running the kyuubi pods and find engines. | string | 1.7.0 |
|
||||
| kyuubi.kubernetes.namespace.allow.list || The allowed kubernetes namespace list, if it is empty, there is no kubernetes namespace limitation. | set | 1.8.0 |
|
||||
| kyuubi.kubernetes.spark.cleanupTerminatedDriverPod | NONE | Kyuubi server will delete the Spark driver pod once the application has been terminated for the period set by kyuubi.kubernetes.terminatedApplicationRetainPeriod. Available options are NONE, ALL and COMPLETED; the default value is NONE, which means no pods will be deleted. | string | 1.8.1 |
|
||||
| kyuubi.kubernetes.spark.cleanupTerminatedDriverPod.checkInterval | PT1M | Kyuubi server uses a Guava cache with time-based eviction as the cleanup trigger, but eviction does not happen until a get/put operation occurs. This option sets the interval at which a daemon thread periodically evicts expired cache entries. | duration | 1.8.1 |
|
||||
| kyuubi.kubernetes.spark.cleanupTerminatedDriverPod.kind | NONE | Kyuubi server will delete the Spark driver pod once the application has been terminated for the period set by kyuubi.kubernetes.terminatedApplicationRetainPeriod. Available options are NONE, ALL and COMPLETED; the default value is NONE, which means no pods will be deleted. | string | 1.8.1 |
|
||||
| kyuubi.kubernetes.spark.forciblyRewriteDriverPodName.enabled | false | Whether to forcibly rewrite Spark driver pod name with 'kyuubi-<uuid>-driver'. If disabled, Kyuubi will try to preserve the application name while satisfying K8s' pod name policy, but some vendors may have stricter pod name policies, thus the generated name may become illegal. | boolean | 1.8.1 |
|
||||
| kyuubi.kubernetes.spark.forciblyRewriteExecutorPodNamePrefix.enabled | false | Whether to forcibly rewrite Spark executor pod name prefix with 'kyuubi-<uuid>'. If disabled, Kyuubi will try to preserve the application name while satisfying K8s' pod name policy, but some vendors may have stricter Pod name policies, thus the generated name may become illegal. | boolean | 1.8.1 |
|
||||
| kyuubi.kubernetes.terminatedApplicationRetainPeriod | PT5M | The period for which the Kyuubi server retains application information after the application terminates. | duration | 1.7.1 |
|
||||
|
||||
@ -1231,8 +1231,17 @@ object KyuubiConf {
|
||||
.checkValue(_ > 0, "must be positive number")
|
||||
.createWithDefault(Duration.ofMinutes(5).toMillis)
|
||||
|
||||
val KUBERNETES_SPARK_CLEANUP_TERMINATED_DRIVER_POD: ConfigEntry[String] =
|
||||
buildConf("kyuubi.kubernetes.spark.cleanupTerminatedDriverPod")
|
||||
val KUBERNETES_SPARK_CLEANUP_TERMINATED_DRIVER_POD_KIND_CHECK_INTERVAL: ConfigEntry[Long] =
|
||||
buildConf("kyuubi.kubernetes.spark.cleanupTerminatedDriverPod.checkInterval")
|
||||
.doc("Kyuubi server use guava cache as the cleanup trigger with time-based eviction, " +
|
||||
"but the eviction would not happened until any get/put operation happened. " +
|
||||
"This option schedule a daemon thread evict cache periodically.")
|
||||
.version("1.8.1")
|
||||
.timeConf
|
||||
.createWithDefaultString("PT1M")
|
||||
|
||||
val KUBERNETES_SPARK_CLEANUP_TERMINATED_DRIVER_POD_KIND: ConfigEntry[String] =
|
||||
buildConf("kyuubi.kubernetes.spark.cleanupTerminatedDriverPod.kind")
|
||||
.doc("Kyuubi server will delete the spark driver pod after " +
|
||||
s"the application terminates for ${KUBERNETES_TERMINATED_APPLICATION_RETAIN_PERIOD.key}. " +
|
||||
"Available options are NONE, ALL, COMPLETED and " +
|
||||
|
||||
@ -18,7 +18,7 @@
|
||||
package org.apache.kyuubi.engine
|
||||
|
||||
import java.util.Locale
|
||||
import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
|
||||
import java.util.concurrent.{ConcurrentHashMap, ScheduledExecutorService, TimeUnit}
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.util.control.NonFatal
|
||||
@ -34,7 +34,7 @@ import org.apache.kyuubi.config.KyuubiConf.{KubernetesApplicationStateSource, Ku
|
||||
import org.apache.kyuubi.config.KyuubiConf.KubernetesApplicationStateSource.KubernetesApplicationStateSource
|
||||
import org.apache.kyuubi.config.KyuubiConf.KubernetesCleanupDriverPodStrategy.{ALL, COMPLETED, NONE}
|
||||
import org.apache.kyuubi.engine.ApplicationState.{isTerminated, ApplicationState, FAILED, FINISHED, NOT_FOUND, PENDING, RUNNING, UNKNOWN}
|
||||
import org.apache.kyuubi.util.KubernetesUtils
|
||||
import org.apache.kyuubi.util.{KubernetesUtils, ThreadUtils}
|
||||
|
||||
class KubernetesApplicationOperation extends ApplicationOperation with Logging {
|
||||
import KubernetesApplicationOperation._
|
||||
@ -64,6 +64,8 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
|
||||
// key is kyuubi_unique_key
|
||||
private var cleanupTerminatedAppInfoTrigger: Cache[String, ApplicationState] = _
|
||||
|
||||
private var expireCleanUpTriggerCacheExecutor: ScheduledExecutorService = _
|
||||
|
||||
private def getOrCreateKubernetesClient(kubernetesInfo: KubernetesInfo): KubernetesClient = {
|
||||
checkKubernetesInfo(kubernetesInfo)
|
||||
kubernetesClients.computeIfAbsent(kubernetesInfo, kInfo => buildKubernetesClient(kInfo))
|
||||
@ -109,7 +111,9 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
|
||||
// Defer cleaning terminated application information
|
||||
val retainPeriod = conf.get(KyuubiConf.KUBERNETES_TERMINATED_APPLICATION_RETAIN_PERIOD)
|
||||
val cleanupDriverPodStrategy = KubernetesCleanupDriverPodStrategy.withName(
|
||||
conf.get(KyuubiConf.KUBERNETES_SPARK_CLEANUP_TERMINATED_DRIVER_POD))
|
||||
conf.get(KyuubiConf.KUBERNETES_SPARK_CLEANUP_TERMINATED_DRIVER_POD_KIND))
|
||||
val cleanupDriverPodCheckInterval = conf.get(
|
||||
KyuubiConf.KUBERNETES_SPARK_CLEANUP_TERMINATED_DRIVER_POD_KIND_CHECK_INTERVAL)
|
||||
cleanupTerminatedAppInfoTrigger = CacheBuilder.newBuilder()
|
||||
.expireAfterWrite(retainPeriod, TimeUnit.MILLISECONDS)
|
||||
.removalListener((notification: RemovalNotification[String, ApplicationState]) => {
|
||||
@ -147,6 +151,23 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
|
||||
}
|
||||
})
|
||||
.build()
|
||||
// The trigger cache only evicts expired entries as a side effect of get/put operations, so an
// idle cache would never notify the removal listener registered on it. Run a single daemon
// thread that periodically touches every cached key to force expired entries out.
expireCleanUpTriggerCacheExecutor = ThreadUtils.newDaemonSingleThreadScheduledExecutor(
  "pod-cleanup-trigger-thread")
expireCleanUpTriggerCacheExecutor.scheduleWithFixedDelay(
  () => {
    try {
      cleanupTerminatedAppInfoTrigger.asMap().asScala.foreach {
        case (key, _) =>
          // do get to trigger cache eviction
          cleanupTerminatedAppInfoTrigger.getIfPresent(key)
      }
    } catch {
      // Log and continue: an exception escaping a scheduled task suppresses all later runs.
      case NonFatal(e) => error("Failed to evict clean up terminated app cache", e)
    }
  },
  // The check interval is a time config read as a Long; like the retain period handed to
  // expireAfterWrite, it is expressed in milliseconds. Passing it with TimeUnit.MINUTES
  // would turn the default PT1M (60000) into a 60000-minute delay, so both the initial delay
  // and the delay between runs use the configured interval in milliseconds.
  cleanupDriverPodCheckInterval,
  cleanupDriverPodCheckInterval,
  TimeUnit.MILLISECONDS)
|
||||
}
|
||||
|
||||
override def isSupported(appMgrInfo: ApplicationManagerInfo): Boolean = {
|
||||
|
||||
Loading…
Reference in New Issue
Block a user