From b174d0c19f969997477a7043b12e01cd5bad89df Mon Sep 17 00:00:00 2001 From: zwangsheng <2213335496@qq.com> Date: Sun, 3 Jul 2022 01:15:47 +0800 Subject: [PATCH] [KYUUBI #2977] [BATCH] Using KyuubiApplicationManger#tagApplication help tag batch application close #2977 ### _Why are the changes needed?_ ### _How was this patch tested?_ - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request Closes #2982 from zwangsheng/feature/KYUUBI-2977. Closes #2977 46e5dba2 [zwangsheng] add spark app name a3943596 [zwangsheng] fix compiler 02c43f21 [zwangsheng] fix style f7e44eeb [zwangsheng] refis 3a7cb803 [zwangsheng] instead Authored-by: zwangsheng <2213335496@qq.com> Signed-off-by: Fei Wang --- .../engine/spark/SparkBatchProcessBuilder.scala | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala index 96cd21a00..657d9a436 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala @@ -20,7 +20,7 @@ package org.apache.kyuubi.engine.spark import scala.collection.mutable.ArrayBuffer import org.apache.kyuubi.config.KyuubiConf -import org.apache.kyuubi.engine.KubernetesApplicationOperation.LABEL_KYUUBI_UNIQUE_KEY +import org.apache.kyuubi.engine.KyuubiApplicationManager import org.apache.kyuubi.operation.log.OperationLog class SparkBatchProcessBuilder( @@ -44,16 +44,12 @@ class SparkBatchProcessBuilder( buffer += cla } - // tag for YARN - val batchJobTag = batchConf.get(TAG_KEY).map(_ + ",").getOrElse("") + batchId - var allConf = batchConf ++ Map(TAG_KEY -> batchJobTag) ++ sparkAppNameConf() + val batchKyuubiConf = new KyuubiConf(false) + batchConf.foreach(entry => { batchKyuubiConf.set(entry._1, entry._2) }) + // tag batch application + KyuubiApplicationManager.tagApplication(batchId, "spark", clusterManager(), batchKyuubiConf) - // tag for K8S - conf.getOption("spark.kubernetes.driver.label." + LABEL_KYUUBI_UNIQUE_KEY).foreach(option => { - allConf = allConf ++ Map("spark.kubernetes.driver.label." + LABEL_KYUUBI_UNIQUE_KEY -> option) - }) - - allConf.foreach { case (k, v) => + (batchKyuubiConf.getAll ++ sparkAppNameConf()).foreach { case (k, v) => buffer += CONF buffer += s"$k=$v" }