[KYUUBI #2977] [BATCH] Using KyuubiApplicationManger#tagApplication help tag batch application

close #2977
### _Why are the changes needed?_

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #2982 from zwangsheng/feature/KYUUBI-2977.

Closes #2977

46e5dba2 [zwangsheng] add spark app name
a3943596 [zwangsheng] fix compiler
02c43f21 [zwangsheng] fix style
f7e44eeb [zwangsheng] refis
3a7cb803 [zwangsheng] instead

Authored-by: zwangsheng <2213335496@qq.com>
Signed-off-by: Fei Wang <fwang12@ebay.com>
This commit is contained in:
zwangsheng 2022-07-03 01:15:47 +08:00 committed by Fei Wang
parent 163e0f8232
commit b174d0c19f

View File

@ -20,7 +20,7 @@ package org.apache.kyuubi.engine.spark
import scala.collection.mutable.ArrayBuffer
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.engine.KubernetesApplicationOperation.LABEL_KYUUBI_UNIQUE_KEY
import org.apache.kyuubi.engine.KyuubiApplicationManager
import org.apache.kyuubi.operation.log.OperationLog
class SparkBatchProcessBuilder(
@ -44,16 +44,12 @@ class SparkBatchProcessBuilder(
buffer += cla
}
// tag for YARN
val batchJobTag = batchConf.get(TAG_KEY).map(_ + ",").getOrElse("") + batchId
var allConf = batchConf ++ Map(TAG_KEY -> batchJobTag) ++ sparkAppNameConf()
val batchKyuubiConf = new KyuubiConf(false)
batchConf.foreach(entry => { batchKyuubiConf.set(entry._1, entry._2) })
// tag batch application
KyuubiApplicationManager.tagApplication(batchId, "spark", clusterManager(), batchKyuubiConf)
// tag for K8S
conf.getOption("spark.kubernetes.driver.label." + LABEL_KYUUBI_UNIQUE_KEY).foreach(option => {
allConf = allConf ++ Map("spark.kubernetes.driver.label." + LABEL_KYUUBI_UNIQUE_KEY -> option)
})
allConf.foreach { case (k, v) =>
(batchKyuubiConf.getAll ++ sparkAppNameConf()).foreach { case (k, v) =>
buffer += CONF
buffer += s"$k=$v"
}