[KYUUBI #6344] FlinkProcessBuilder prioritizes user configurations
# 🔍 Description ## Issue References 🔗 This pull request fixes #6344 `FlinkProcessBuilder` specifies `yarn.ship-files`, `yarn.application.name` and `yarn.tags` configurations of kyuubi platform. Sometimes we also need to customize these configurations, so we should prioritize these user configurations. ## Describe Your Solution 🔧 FlinkProcessBuilder prioritizes user configurations. ## Types of changes 🔖 - [x] Bugfix (non-breaking change which fixes an issue) - [ ] New feature (non-breaking change which adds functionality) - [ ] Breaking change (fix or feature that would cause existing functionality to change) ## Test Plan 🧪 #### Behavior Without This Pull Request ⚰️ #### Behavior With This Pull Request 🎉 #### Related Unit Tests added new unit test --- # Checklist 📝 - [X] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html) **Be nice. Be informative.** Closes #6342 from wForget/hotfix2. Closes #6344 feca972ca [wforget] address comment 17df0844d [wforget] fix test and add flink constant ece91cc0c [wforget] FlinkProcessBuilder prioritizes user configurations Authored-by: wforget <643348094@qq.com> Signed-off-by: wforget <643348094@qq.com>
This commit is contained in:
parent
86ce5e1554
commit
c8e645734b
@ -115,7 +115,10 @@ object KyuubiApplicationManager {
|
||||
}
|
||||
|
||||
private def setupFlinkYarnTag(tag: String, conf: KyuubiConf): Unit = {
|
||||
val originalTag = conf.getOption(FlinkProcessBuilder.YARN_TAG_KEY).map(_ + ",").getOrElse("")
|
||||
val originalTag = conf
|
||||
.getOption(s"${FlinkProcessBuilder.FLINK_CONF_PREFIX}.${FlinkProcessBuilder.YARN_TAG_KEY}")
|
||||
.orElse(conf.getOption(FlinkProcessBuilder.YARN_TAG_KEY))
|
||||
.map(_ + ",").getOrElse("")
|
||||
val newTag = s"${originalTag}KYUUBI" + Some(tag).filterNot(_.isEmpty).map("," + _).getOrElse("")
|
||||
conf.set(FlinkProcessBuilder.YARN_TAG_KEY, newTag)
|
||||
}
|
||||
|
||||
@ -115,10 +115,15 @@ class FlinkProcessBuilder(
|
||||
flinkExtraJars += s"$hiveConfFile"
|
||||
}
|
||||
|
||||
val customFlinkConf = conf.getAllWithPrefix(FLINK_CONF_PREFIX, "")
|
||||
// add custom yarn.ship-files
|
||||
flinkExtraJars ++= customFlinkConf.get(YARN_SHIP_FILES_KEY)
|
||||
val yarnAppName = customFlinkConf.get(YARN_APPLICATION_NAME_KEY)
|
||||
.orElse(conf.getOption(APP_KEY))
|
||||
buffer += "-t"
|
||||
buffer += "yarn-application"
|
||||
buffer += s"-Dyarn.ship-files=${flinkExtraJars.mkString(";")}"
|
||||
buffer += s"-Dyarn.application.name=${conf.getOption(APP_KEY).get}"
|
||||
buffer += s"-Dyarn.application.name=${yarnAppName.get}"
|
||||
buffer += s"-Dyarn.tags=${conf.getOption(YARN_TAG_KEY).get}"
|
||||
buffer += "-Dcontainerized.master.env.FLINK_CONF_DIR=."
|
||||
|
||||
@ -126,8 +131,10 @@ class FlinkProcessBuilder(
|
||||
buffer += "-Dcontainerized.master.env.HIVE_CONF_DIR=."
|
||||
}
|
||||
|
||||
val customFlinkConf = conf.getAllWithPrefix("flink", "")
|
||||
customFlinkConf.filter(_._1 != "app.name").foreach { case (k, v) =>
|
||||
customFlinkConf.filter { case (k, _) =>
|
||||
!Seq("app.name", YARN_SHIP_FILES_KEY, YARN_APPLICATION_NAME_KEY, YARN_TAG_KEY)
|
||||
.contains(k)
|
||||
}.foreach { case (k, v) =>
|
||||
buffer += s"-D$k=$v"
|
||||
}
|
||||
|
||||
@ -213,8 +220,12 @@ class FlinkProcessBuilder(
|
||||
|
||||
object FlinkProcessBuilder {
|
||||
final val FLINK_EXEC_FILE = "flink"
|
||||
final val FLINK_CONF_PREFIX = "flink"
|
||||
final val APP_KEY = "flink.app.name"
|
||||
final val YARN_TAG_KEY = "yarn.tags"
|
||||
final val YARN_SHIP_FILES_KEY = "yarn.ship-files"
|
||||
final val YARN_APPLICATION_NAME_KEY = "yarn.application.name"
|
||||
|
||||
final val FLINK_HADOOP_CLASSPATH_KEY = "FLINK_HADOOP_CLASSPATH"
|
||||
final val FLINK_PROXY_USER_KEY = "HADOOP_PROXY_USER"
|
||||
}
|
||||
|
||||
@ -167,4 +167,35 @@ class FlinkProcessBuilderSuite extends KyuubiFunSuite {
|
||||
}
|
||||
matchActualAndExpectedApplicationMode(builder)
|
||||
}
|
||||
|
||||
test("user configuration takes priority") {
|
||||
val customShipFiles = "testFile1.jar;testFile2.jar"
|
||||
val customAppName = "testAppName"
|
||||
val customYarnTags = "testTag1,testTag2"
|
||||
val builderConf = applicationModeConf
|
||||
builderConf.set("flink.yarn.ship-files", customShipFiles)
|
||||
builderConf.set("flink.yarn.application.name", customAppName)
|
||||
builderConf.set("flink.yarn.tags", customYarnTags)
|
||||
val builder = new FlinkProcessBuilder("test", true, builderConf) {
|
||||
override def env: Map[String, String] = envWithAllHadoop
|
||||
}
|
||||
val actualCommands = builder.toString
|
||||
// scalastyle:off line.size.limit
|
||||
val expectedCommands =
|
||||
escapePaths(
|
||||
s"""${builder.flinkExecutable} run-application \\\\
|
||||
|\\t-t yarn-application \\\\
|
||||
|\\t-Dyarn.ship-files=.*flink-sql-client.*jar;.*flink-sql-gateway.*jar;$tempUdfJar;.*hive-site.xml;$customShipFiles \\\\
|
||||
|\\t-Dyarn.application.name=$customAppName \\\\
|
||||
|\\t-Dyarn.tags=$customYarnTags,KYUUBI \\\\
|
||||
|\\t-Dcontainerized.master.env.FLINK_CONF_DIR=. \\\\
|
||||
|\\t-Dcontainerized.master.env.HIVE_CONF_DIR=. \\\\
|
||||
|\\t-Dexecution.target=yarn-application \\\\
|
||||
|\\t-c org.apache.kyuubi.engine.flink.FlinkSQLEngine .*kyuubi-flink-sql-engine_.*jar""".stripMargin +
|
||||
"(?: \\\\\\n\\t--conf \\S+=\\S+)+")
|
||||
// scalastyle:on line.size.limit
|
||||
val regex = new Regex(expectedCommands)
|
||||
val matcher = regex.pattern.matcher(actualCommands)
|
||||
assert(matcher.matches())
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user