diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala index 5d3b67a61..f986d7601 100644 --- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala +++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala @@ -17,16 +17,17 @@ package org.apache.kyuubi.engine.flink +import java.time.Instant import java.util.concurrent.CountDownLatch import scala.collection.JavaConverters._ import org.apache.flink.client.cli.{CliFrontend, CustomCommandLine, DefaultCLI} -import org.apache.flink.configuration.GlobalConfiguration +import org.apache.flink.configuration.{DeploymentOptions, GlobalConfiguration} import org.apache.flink.table.client.gateway.context.DefaultContext import org.apache.kyuubi.Logging -import org.apache.kyuubi.Utils.{addShutdownHook, FLINK_ENGINE_SHUTDOWN_PRIORITY} +import org.apache.kyuubi.Utils.{addShutdownHook, currentUser, FLINK_ENGINE_SHUTDOWN_PRIORITY} import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.engine.flink.FlinkSQLEngine.{countDownLatch, currentEngine} import org.apache.kyuubi.service.Serverable @@ -62,6 +63,8 @@ object FlinkSQLEngine extends Logging { val kyuubiConf: KyuubiConf = KyuubiConf() var currentEngine: Option[FlinkSQLEngine] = None + private val user = currentUser + private val countDownLatch = new CountDownLatch(1) def main(args: Array[String]): Unit = { @@ -72,6 +75,24 @@ object FlinkSQLEngine extends Logging { try { val flinkConfDir = CliFrontend.getConfigurationDirectoryFromEnv val flinkConf = GlobalConfiguration.loadConfiguration(flinkConfDir) + + val executionTarget = flinkConf.getString(DeploymentOptions.TARGET) + // set cluster name for per-job and application mode + executionTarget match { + case "yarn-per-job" | "yarn-application" => + if (!flinkConf.containsKey("yarn.application.name")) { + val appName = s"kyuubi_${user}_flink_${Instant.now}" + flinkConf.setString("yarn.application.name", appName) + } + case "kubernetes-application" => + if (!flinkConf.containsKey("kubernetes.cluster-id")) { + val appName = s"kyuubi-${user}-flink-${Instant.now}" + flinkConf.setString("kubernetes.cluster-id", appName) + } + case other => + debug(s"Skip generating app name for execution target $other") + } + val engineContext = new DefaultContext( List.empty.asJava, flinkConf,