From b47e1cdc8bf1c8a76056fba787ca8ebbc40a0de5 Mon Sep 17 00:00:00 2001 From: Paul Lin Date: Tue, 25 Jan 2022 10:05:02 +0800 Subject: [PATCH] [KYUUBI #1825] Generate appName for Flink applications ### _Why are the changes needed?_ Generate Flink application names for per-job clusters and application-mode clusters with respect to the pattern in Spark engine. This is a sub-task of KPIP-2 #1322. ### _How was this patch tested?_ - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request Closes #1827 from link3280/KYUUBI-1825. Closes #1825 c6670f9b [Paul Lin] [KYUUBI #1825] Remove `null` execution target case branch 1c093d68 [Paul Lin] [KYUUBI #1825] Set app name only if absent ee2845f0 [Paul Lin] [KYUUBI #1825] Add the default branch for pattern matching 3d9e0928 [Paul Lin] [KYUUBI #1825] Generate appName for Flink applications Authored-by: Paul Lin Signed-off-by: Kent Yao --- .../kyuubi/engine/flink/FlinkSQLEngine.scala | 25 +++++++++++++++++-- 1 file changed, 23 insertions(+), 2 deletions(-) diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala index 5d3b67a61..f986d7601 100644 --- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala +++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala @@ -17,16 +17,17 @@ package org.apache.kyuubi.engine.flink +import java.time.Instant import java.util.concurrent.CountDownLatch import scala.collection.JavaConverters._ import org.apache.flink.client.cli.{CliFrontend, CustomCommandLine, DefaultCLI} -import org.apache.flink.configuration.GlobalConfiguration +import org.apache.flink.configuration.{DeploymentOptions, GlobalConfiguration} import org.apache.flink.table.client.gateway.context.DefaultContext import org.apache.kyuubi.Logging -import org.apache.kyuubi.Utils.{addShutdownHook, FLINK_ENGINE_SHUTDOWN_PRIORITY} +import org.apache.kyuubi.Utils.{addShutdownHook, currentUser, FLINK_ENGINE_SHUTDOWN_PRIORITY} import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.engine.flink.FlinkSQLEngine.{countDownLatch, currentEngine} import org.apache.kyuubi.service.Serverable @@ -62,6 +63,8 @@ object FlinkSQLEngine extends Logging { val kyuubiConf: KyuubiConf = KyuubiConf() var currentEngine: Option[FlinkSQLEngine] = None + private val user = currentUser + private val countDownLatch = new CountDownLatch(1) def main(args: Array[String]): Unit = { @@ -72,6 +75,24 @@ object FlinkSQLEngine extends Logging { try { val flinkConfDir = CliFrontend.getConfigurationDirectoryFromEnv val flinkConf = GlobalConfiguration.loadConfiguration(flinkConfDir) + + val executionTarget = flinkConf.getString(DeploymentOptions.TARGET) + // set cluster name for per-job and application mode + executionTarget match { + case "yarn-per-job" | "yarn-application" => + if (!flinkConf.containsKey("yarn.application.name")) { + val appName = s"kyuubi_${user}_flink_${Instant.now}" + flinkConf.setString("yarn.application.name", appName) + } + case "kubernetes-application" => + if (!flinkConf.containsKey("kubernetes.cluster-id")) { + val appName = s"kyuubi-${user}-flink-${Instant.now}" + flinkConf.setString("kubernetes.cluster-id", appName) + } + case other => + debug(s"Skip generating app name for execution target $other") + } + val engineContext = new DefaultContext( List.empty.asJava, flinkConf,