diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala index 48a354b0f..99a9ee56b 100644 --- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala +++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala @@ -31,6 +31,7 @@ import org.apache.flink.table.gateway.service.context.DefaultContext import org.apache.kyuubi.{Logging, Utils} import org.apache.kyuubi.Utils.{addShutdownHook, currentUser, FLINK_ENGINE_SHUTDOWN_PRIORITY} import org.apache.kyuubi.config.KyuubiConf +import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_ENGINE_NAME, KYUUBI_SESSION_USER_KEY} import org.apache.kyuubi.engine.flink.FlinkSQLEngine.{countDownLatch, currentEngine} import org.apache.kyuubi.service.Serverable import org.apache.kyuubi.util.SignalRegister @@ -92,26 +93,7 @@ object FlinkSQLEngine extends Logging { flinkConf.addAll(Configuration.fromMap(flinkConfFromArgs.asJava)) val executionTarget = flinkConf.getString(DeploymentOptions.TARGET) - // set cluster name for per-job and application mode - executionTarget match { - case "yarn-per-job" | "yarn-application" => - if (!flinkConf.containsKey("yarn.application.name")) { - val appName = s"kyuubi_${user}_flink_${Instant.now}" - flinkConf.setString("yarn.application.name", appName) - } - if (flinkConf.containsKey("high-availability.cluster-id")) { - flinkConf.setString( - "yarn.application.id", - flinkConf.toMap.get("high-availability.cluster-id")) - } - case "kubernetes-application" => - if (!flinkConf.containsKey("kubernetes.cluster-id")) { - val appName = s"kyuubi-${user}-flink-${Instant.now}" - flinkConf.setString("kubernetes.cluster-id", appName) - } - case other => - debug(s"Skip generating app name for execution target $other") - } + setDeploymentConf(executionTarget, 
flinkConf) kyuubiConf.setIfMissing(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0) @@ -153,4 +135,35 @@ object FlinkSQLEngine extends Logging { res.await() info("Initial Flink SQL finished.") } +
+ /** + * Forwards Kyuubi engine variables into the Flink configuration and, for per-job and + * application execution targets, fills in a generated cluster name when none is configured. + * + * @param executionTarget execution target string the deployment mode is matched against + * @param flinkConf Flink configuration that is updated in place + */ + private def setDeploymentConf(executionTarget: String, flinkConf: Configuration): Unit = { + // writes `value` under `key` only when the configuration does not already contain `key` + def setIfAbsent(key: String, value: String): Unit = { + if (!flinkConf.containsKey(key)) flinkConf.setString(key, value) + } + + // one timestamp is shared by every name generated in this call + val startedAt = Instant.now + val generatedEngineName = s"kyuubi_${user}_flink_$startedAt" + flinkConf.setString(KYUUBI_ENGINE_NAME, generatedEngineName) + + // forward the session user when kyuubiConf carries one + for (sessionUser <- kyuubiConf.getOption(KYUUBI_SESSION_USER_KEY)) { + flinkConf.setString(KYUUBI_SESSION_USER_KEY, sessionUser) + } + + // set cluster name for per-job and application mode + executionTarget match { + case "yarn-per-job" | "yarn-application" => + setIfAbsent("yarn.application.name", generatedEngineName) + val haClusterIdKey = "high-availability.cluster-id" + if (flinkConf.containsKey(haClusterIdKey)) { + // mirror the high-availability cluster id into yarn.application.id + flinkConf.setString("yarn.application.id", flinkConf.toMap.get(haClusterIdKey)) + } + case "kubernetes-application" => + // same pattern as the engine name, with '-' instead of '_' as the separator + setIfAbsent("kubernetes.cluster-id", s"kyuubi-${user}-flink-$startedAt") + case other => + debug(s"Skip generating app name for execution target $other") + } + } }
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala index 10a48f1a1..b8d1f8569 100644 --- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala +++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala @@ -30,6 +30,7 @@ import org.apache.hive.service.rpc.thrift.{TGetInfoType, TGetInfoValue, TProtoco import org.apache.kyuubi.KyuubiSQLException import
org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_HANDLE_KEY import org.apache.kyuubi.engine.flink.FlinkEngineUtils +import org.apache.kyuubi.engine.flink.udf.KDFRegistry import org.apache.kyuubi.session.{AbstractSession, SessionHandle, SessionManager, USE_CATALOG, USE_DATABASE} class FlinkSessionImpl( @@ -46,10 +47,12 @@ class FlinkSessionImpl( conf.get(KYUUBI_SESSION_HANDLE_KEY).map(SessionHandle.fromUUID) .getOrElse(SessionHandle.fromUUID(fSession.getSessionHandle.getIdentifier.toString)) - lazy val sessionContext: SessionContext = { + val sessionContext: SessionContext = { FlinkEngineUtils.getSessionContext(fSession) } + KDFRegistry.registerAll(sessionContext) + private def setModifiableConfig(key: String, value: String): Unit = { try { sessionContext.set(key, value) diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/udf/KDFRegistry.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/udf/KDFRegistry.scala new file mode 100644 index 000000000..b6729cff3 --- /dev/null +++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/udf/KDFRegistry.scala @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.engine.flink.udf + +import java.util + +import scala.collection.mutable.ArrayBuffer + +import org.apache.flink.configuration.Configuration +import org.apache.flink.table.functions.{ScalarFunction, UserDefinedFunction} +import org.apache.flink.table.gateway.service.context.SessionContext + +import org.apache.kyuubi.{KYUUBI_VERSION, Utils} +import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_ENGINE_NAME, KYUUBI_SESSION_USER_KEY} +import org.apache.kyuubi.engine.flink.FlinkEngineUtils +import org.apache.kyuubi.util.reflect.DynMethods +
+/** + * Creates the Kyuubi defined functions (KDFs) of the Flink engine and registers them on a + * session. + */ +object KDFRegistry { + + /** + * Builds the KDFs, each backed by the configuration map read from `sessionContext`. + * + * @param sessionContext session whose configuration map is handed to every function + * @return kyuubi_version, kyuubi_engine_name, kyuubi_engine_id, kyuubi_system_user and + * kyuubi_session_user, in that order + */ + def createKyuubiDefinedFunctions(sessionContext: SessionContext): Array[KyuubiDefinedFunction] = { + + // the configuration accessor is invoked reflectively: getConfigMap on Flink 1.16, + // getSessionConf otherwise + val flinkConfigMap: util.Map[String, String] = { + if (FlinkEngineUtils.isFlinkVersionEqualTo("1.16")) { + DynMethods + .builder("getConfigMap") + .impl(classOf[SessionContext]) + .build() + .invoke(sessionContext) + .asInstanceOf[util.Map[String, String]] + } else { + DynMethods + .builder("getSessionConf") + .impl(classOf[SessionContext]) + .build() + .invoke(sessionContext) + .asInstanceOf[Configuration] + .toMap + } + } + + // every function below shares the same return type and since-version labels + val stringType = "string" + val sinceVersion = "1.8.0" + val kdfs = new ArrayBuffer[KyuubiDefinedFunction] + + kdfs += create( + "kyuubi_version", + new KyuubiVersionFunction(flinkConfigMap), + "Return the version of Kyuubi Server", + stringType, + sinceVersion) + + kdfs += create( + "kyuubi_engine_name", + new EngineNameFunction(flinkConfigMap), + "Return the application name for the associated query engine", + stringType, + sinceVersion) + + kdfs += create( + "kyuubi_engine_id", + new EngineIdFunction(flinkConfigMap), + "Return the application id for the associated query engine", + stringType, + sinceVersion) + + kdfs += create( + "kyuubi_system_user", + new SystemUserFunction(flinkConfigMap), + "Return the system user name for the associated query engine", + stringType, + sinceVersion) + + kdfs += create( + "kyuubi_session_user", + new SessionUserFunction(flinkConfigMap), + "Return the session username for the associated query engine", + stringType, + sinceVersion) + + kdfs.toArray + } + + /** + * Wraps a function and its metadata into a [[KyuubiDefinedFunction]]. + * + * @param name function name + * @param udf function implementation + * @param description function description + * @param returnType return type name of the function + * @param since version label recorded for the function + * @return the wrapping [[KyuubiDefinedFunction]] + */ + def create( + name: String, + udf: UserDefinedFunction, + description: String, + returnType: String, + since: String): KyuubiDefinedFunction = { + KyuubiDefinedFunction(name, udf, description, returnType, since) + } + + /** + * Registers every function built by [[createKyuubiDefinedFunctions]] on the session's + * function catalog as a temporary system function. + * + * @param sessionContext session to register the functions on + */ + def registerAll(sessionContext: SessionContext): Unit = { + val kdfs = createKyuubiDefinedFunctions(sessionContext) + val catalog = sessionContext.getSessionState.functionCatalog + // NOTE(review): the trailing `true` is presumably an ignore-if-exists flag; confirm + kdfs.foreach { kdf => + catalog.registerTemporarySystemFunction(kdf.name, kdf.udf, true) + } + } +} +
+/** Scalar function whose `eval` returns `KYUUBI_VERSION`; `confMap` is not read. */ +class KyuubiVersionFunction(confMap: util.Map[String, String]) extends ScalarFunction { + def eval(): String = KYUUBI_VERSION +} + +/** + * Scalar function resolving the engine name from `confMap`: yarn.application.name first, then + * kubernetes.cluster-id, then the `KYUUBI_ENGINE_NAME` key, else "unknown-engine-name". + */ +class EngineNameFunction(confMap: util.Map[String, String]) extends ScalarFunction { + def eval(): String = { + if (confMap.containsKey("yarn.application.name")) { + confMap.get("yarn.application.name") + } else if (confMap.containsKey("kubernetes.cluster-id")) { + confMap.get("kubernetes.cluster-id") + } else { + confMap.getOrDefault(KYUUBI_ENGINE_NAME, "unknown-engine-name") + } + } +} + +/** + * Scalar function resolving the engine id from `confMap`: yarn.application.id first, then + * kubernetes.cluster-id, then high-availability.cluster-id, else "unknown-engine-id". + */ +class EngineIdFunction(confMap: util.Map[String, String]) extends ScalarFunction { + def eval(): String = { + if (confMap.containsKey("yarn.application.id")) { + confMap.get("yarn.application.id") + } else if (confMap.containsKey("kubernetes.cluster-id")) { + confMap.get("kubernetes.cluster-id") + } else { + confMap.getOrDefault("high-availability.cluster-id", "unknown-engine-id") + } + } +} + +/** Scalar function whose `eval` returns `Utils.currentUser`; `confMap` is not read. */ +class SystemUserFunction(confMap: util.Map[String, String]) extends ScalarFunction { + def eval(): String = Utils.currentUser +} + +/** + * Scalar function returning the value stored under `KYUUBI_SESSION_USER_KEY` in `confMap`, + * or "unknown-user" when the key is absent. + */ +class SessionUserFunction(confMap: util.Map[String, String]) extends ScalarFunction { + def eval(): String = confMap.getOrDefault(KYUUBI_SESSION_USER_KEY, "unknown-user") +}
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/udf/KyuubiDefinedFunction.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/udf/KyuubiDefinedFunction.scala new file mode 100644 index 000000000..5cfce86d6 --- /dev/null +++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/udf/KyuubiDefinedFunction.scala @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.kyuubi.engine.flink.udf + +import org.apache.flink.table.functions.UserDefinedFunction + +/** + * A wrapper for Flink's [[UserDefinedFunction]] + * + * @param name function name + * @param udf user-defined function + * @param description function description + * @param returnType return type of the function, given as a type name + * @param since version label recorded for the function (presumably the Kyuubi release that + * introduced it; confirm) + */ +case class KyuubiDefinedFunction( + name: String, + udf: UserDefinedFunction, + description: String, + returnType: String, + since: String) diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationLocalSuite.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationLocalSuite.scala index 0f4b38d36..b8f6768cc 100644 --- a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationLocalSuite.scala +++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationLocalSuite.scala @@ -19,7 +19,9 @@ package org.apache.kyuubi.engine.flink.operation import java.util.UUID +import org.apache.kyuubi.{KYUUBI_VERSION, Utils} import org.apache.kyuubi.config.KyuubiConf._ +import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY import org.apache.kyuubi.engine.ShareLevel import org.apache.kyuubi.engine.flink.{WithDiscoveryFlinkSQLEngine, WithFlinkSQLEngineLocal} import org.apache.kyuubi.ha.HighAvailabilityConf.{HA_ENGINE_REF_ID, HA_NAMESPACE} @@ -33,11 +35,13 @@ class FlinkOperationLocalSuite extends FlinkOperationSuite override def withKyuubiConf: Map[String, String] = { Map( "flink.execution.target" -> "remote", + "flink.high-availability.cluster-id" -> "flink-mini-cluster", HA_NAMESPACE.key -> namespace, HA_ENGINE_REF_ID.key -> engineRefId, ENGINE_TYPE.key -> "FLINK_SQL", ENGINE_SHARE_LEVEL.key -> shareLevel, - OPERATION_PLAN_ONLY_MODE.key -> NoneMode.name) ++ testExtraConf + OPERATION_PLAN_ONLY_MODE.key -> NoneMode.name, + KYUUBI_SESSION_USER_KEY -> "paullin") ++
testExtraConf } override protected def engineRefId: String = UUID.randomUUID().toString @@ -48,4 +52,27 @@ class FlinkOperationLocalSuite extends FlinkOperationSuite def engineType: String = "flink"
+ test("execute statement - kyuubi defined functions") { + withJdbcStatement() { statement => + // runs `sql`, asserts that it produced a row and returns that row's first column + def firstColumn(sql: String): String = { + val rows = statement.executeQuery(sql) + assert(rows.next()) + rows.getString(1) + } + + assert(firstColumn("select kyuubi_version() as kyuubi_version") === KYUUBI_VERSION) + + val engineName = firstColumn("select kyuubi_engine_name() as engine_name") + assert(engineName.startsWith(s"kyuubi_${Utils.currentUser}_flink")) + + assert(firstColumn("select kyuubi_engine_id() as engine_id") === "flink-mini-cluster") + + assert(firstColumn("select kyuubi_system_user() as `system_user`") === Utils.currentUser) + + assert(firstColumn("select kyuubi_session_user() as `session_user`") === "paullin") + } + } }
diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationOnYarnSuite.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationOnYarnSuite.scala index 931d500f7..25e23d82a 100644 --- a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationOnYarnSuite.scala +++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationOnYarnSuite.scala @@ -19,7 +19,9 @@ package org.apache.kyuubi.engine.flink.operation import java.util.UUID +import org.apache.kyuubi.{KYUUBI_VERSION, Utils} import org.apache.kyuubi.config.KyuubiConf.{ENGINE_SHARE_LEVEL, ENGINE_TYPE} +import
org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY import org.apache.kyuubi.engine.ShareLevel import org.apache.kyuubi.engine.flink.{WithDiscoveryFlinkSQLEngine, WithFlinkSQLEngineOnYarn} import org.apache.kyuubi.ha.HighAvailabilityConf.{HA_ENGINE_REF_ID, HA_NAMESPACE} @@ -34,7 +36,8 @@ class FlinkOperationOnYarnSuite extends FlinkOperationSuite HA_NAMESPACE.key -> namespace, HA_ENGINE_REF_ID.key -> engineRefId, ENGINE_TYPE.key -> "FLINK_SQL", - ENGINE_SHARE_LEVEL.key -> shareLevel) ++ testExtraConf + ENGINE_SHARE_LEVEL.key -> shareLevel, + KYUUBI_SESSION_USER_KEY -> "paullin") ++ testExtraConf } override protected def engineRefId: String = UUID.randomUUID().toString @@ -44,4 +47,28 @@ class FlinkOperationOnYarnSuite extends FlinkOperationSuite def shareLevel: String = ShareLevel.USER.toString def engineType: String = "flink" +
+ test("execute statement - kyuubi defined functions") { + withJdbcStatement() { statement => + // runs `sql`, asserts that it produced a row and returns that row's first column + def firstColumn(sql: String): String = { + val rows = statement.executeQuery(sql) + assert(rows.next()) + rows.getString(1) + } + + assert(firstColumn("select kyuubi_version() as kyuubi_version") === KYUUBI_VERSION) + + val engineName = firstColumn("select kyuubi_engine_name() as engine_name") + assert(engineName.startsWith(s"kyuubi_${Utils.currentUser}_flink")) + + // on YARN only the prefix of the engine id is checked + assert(firstColumn("select kyuubi_engine_id() as engine_id").startsWith("application_")) + + assert(firstColumn("select kyuubi_system_user() as `system_user`") === Utils.currentUser) + + assert(firstColumn("select kyuubi_session_user() as `session_user`") === "paullin") + } + } }
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/udf/KyuubiDefinedFunction.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/udf/KyuubiDefinedFunction.scala index 30228bf72..6bc2e3ddb 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/udf/KyuubiDefinedFunction.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/udf/KyuubiDefinedFunction.scala @@ -20,7 +20,7 @@ package org.apache.kyuubi.engine.spark.udf import org.apache.spark.sql.expressions.UserDefinedFunction /** - * A wrapper for Spark' [[UserDefinedFunction]] + * A wrapper for Spark's [[UserDefinedFunction]] * * @param name function name * @param udf user-defined function