From f42a7d6cca8f4bbacdd383c700511a7998b2c0b6 Mon Sep 17 00:00:00 2001 From: Paul Lin Date: Thu, 24 Aug 2023 15:28:00 +0800 Subject: [PATCH] [KYUUBI #5190] [FLINK] Explicitly name Flink bootstrap SQL in application mode ### _Why are the changes needed?_ Currently, the name of flink bootstrap SQL is auto-generated 'collect'. ### _How was this patch tested?_ - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [x] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before make a pull request ### _Was this patch authored or co-authored using generative AI tooling?_ No. Closes #5190 from link3280/bootstrap_job_name. Closes #5190 ac769295c [Paul Lin] Explicit name Flink bootstrap sql Authored-by: Paul Lin Signed-off-by: Paul Lin --- .../org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala index 6adda6390..4b147d020 100644 --- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala +++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala @@ -24,7 +24,7 @@ import java.util.concurrent.CountDownLatch import scala.collection.JavaConverters._ -import org.apache.flink.configuration.{Configuration, DeploymentOptions, GlobalConfiguration} +import org.apache.flink.configuration.{Configuration, DeploymentOptions, GlobalConfiguration, PipelineOptions} import org.apache.flink.table.api.TableEnvironment import org.apache.flink.table.gateway.service.context.DefaultContext @@ -131,11 +131,12 @@ object FlinkSQLEngine extends Logging { private def bootstrapFlinkApplicationExecutor() = { // trigger an execution to initiate EmbeddedExecutor with the default flink conf val flinkConf = new Configuration() - debug(s"Running initial Flink SQL in application mode with flink conf: $flinkConf.") + flinkConf.set(PipelineOptions.NAME, "kyuubi-bootstrap-sql") + debug(s"Running bootstrap Flink SQL in application mode with flink conf: $flinkConf.") val tableEnv = TableEnvironment.create(flinkConf) val res = tableEnv.executeSql("select 'kyuubi'") res.await() - info("Initial Flink SQL finished.") + info("Bootstrap Flink SQL finished.") } private def setDeploymentConf(executionTarget: String, flinkConf: Configuration): Unit = {