diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala index 0e914987c..f2887b3e9 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala @@ -115,7 +115,10 @@ object KyuubiApplicationManager { } private def setupFlinkYarnTag(tag: String, conf: KyuubiConf): Unit = { - val originalTag = conf.getOption(FlinkProcessBuilder.YARN_TAG_KEY).map(_ + ",").getOrElse("") + val originalTag = conf + .getOption(s"${FlinkProcessBuilder.FLINK_CONF_PREFIX}.${FlinkProcessBuilder.YARN_TAG_KEY}") + .orElse(conf.getOption(FlinkProcessBuilder.YARN_TAG_KEY)) + .map(_ + ",").getOrElse("") val newTag = s"${originalTag}KYUUBI" + Some(tag).filterNot(_.isEmpty).map("," + _).getOrElse("") conf.set(FlinkProcessBuilder.YARN_TAG_KEY, newTag) } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala index 4ae714dee..241b7ec78 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala @@ -115,10 +115,15 @@ class FlinkProcessBuilder( flinkExtraJars += s"$hiveConfFile" } + val customFlinkConf = conf.getAllWithPrefix(FLINK_CONF_PREFIX, "") + // add custom yarn.ship-files + flinkExtraJars ++= customFlinkConf.get(YARN_SHIP_FILES_KEY) + val yarnAppName = customFlinkConf.get(YARN_APPLICATION_NAME_KEY) + .orElse(conf.getOption(APP_KEY)) buffer += "-t" buffer += "yarn-application" buffer += s"-Dyarn.ship-files=${flinkExtraJars.mkString(";")}" - buffer += s"-Dyarn.application.name=${conf.getOption(APP_KEY).get}" + buffer += s"-Dyarn.application.name=${yarnAppName.get}" buffer += s"-Dyarn.tags=${conf.getOption(YARN_TAG_KEY).get}" buffer += "-Dcontainerized.master.env.FLINK_CONF_DIR=." @@ -126,8 +131,10 @@ class FlinkProcessBuilder( buffer += "-Dcontainerized.master.env.HIVE_CONF_DIR=." } - val customFlinkConf = conf.getAllWithPrefix("flink", "") - customFlinkConf.filter(_._1 != "app.name").foreach { case (k, v) => + customFlinkConf.filter { case (k, _) => + !Seq("app.name", YARN_SHIP_FILES_KEY, YARN_APPLICATION_NAME_KEY, YARN_TAG_KEY) + .contains(k) + }.foreach { case (k, v) => buffer += s"-D$k=$v" } @@ -213,8 +220,12 @@ class FlinkProcessBuilder( object FlinkProcessBuilder { final val FLINK_EXEC_FILE = "flink" + final val FLINK_CONF_PREFIX = "flink" final val APP_KEY = "flink.app.name" final val YARN_TAG_KEY = "yarn.tags" + final val YARN_SHIP_FILES_KEY = "yarn.ship-files" + final val YARN_APPLICATION_NAME_KEY = "yarn.application.name" + final val FLINK_HADOOP_CLASSPATH_KEY = "FLINK_HADOOP_CLASSPATH" final val FLINK_PROXY_USER_KEY = "HADOOP_PROXY_USER" } diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala index 39fee1163..952f71c08 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala @@ -167,4 +167,35 @@ class FlinkProcessBuilderSuite extends KyuubiFunSuite { } matchActualAndExpectedApplicationMode(builder) } + + test("user configuration takes priority") { + val customShipFiles = "testFile1.jar;testFile2.jar" + val customAppName = "testAppName" + val customYarnTags = "testTag1,testTag2" + val builderConf = applicationModeConf + builderConf.set("flink.yarn.ship-files", customShipFiles) + builderConf.set("flink.yarn.application.name", customAppName) + builderConf.set("flink.yarn.tags", customYarnTags) + val builder = new FlinkProcessBuilder("test", true, builderConf) { + override def env: Map[String, String] = envWithAllHadoop + } + val actualCommands = builder.toString + // scalastyle:off line.size.limit + val expectedCommands = + escapePaths( + s"""${builder.flinkExecutable} run-application \\\\ + |\\t-t yarn-application \\\\ + |\\t-Dyarn.ship-files=.*flink-sql-client.*jar;.*flink-sql-gateway.*jar;$tempUdfJar;.*hive-site.xml;$customShipFiles \\\\ + |\\t-Dyarn.application.name=$customAppName \\\\ + |\\t-Dyarn.tags=$customYarnTags,KYUUBI \\\\ + |\\t-Dcontainerized.master.env.FLINK_CONF_DIR=. \\\\ + |\\t-Dcontainerized.master.env.HIVE_CONF_DIR=. \\\\ + |\\t-Dexecution.target=yarn-application \\\\ + |\\t-c org.apache.kyuubi.engine.flink.FlinkSQLEngine .*kyuubi-flink-sql-engine_.*jar""".stripMargin + + "(?: \\\\\\n\\t--conf \\S+=\\S+)+") + // scalastyle:on line.size.limit + val regex = new Regex(expectedCommands) + val matcher = regex.pattern.matcher(actualCommands) + assert(matcher.matches()) + } }