[KYUUBI #4938][FLINK] Implement Kyuubi UDF in Flink engine

### _Why are the changes needed?_
As titled.

### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before make a pull request

Closes #5014 from link3280/KYUUBI-4938.

Closes #4938

c43d480f9 [Paul Lin] [KYUUBI #4938][FLINK] Update function description
0dd991f03 [Paul Lin] [KYUUBI #4938][FLINK] Fix compatibility problems with Flink 1.16
7e6a3b184 [Paul Lin] [KYUUBI #4938][FLINK] Fix inconsistent instant in engine names
6ecde4c60 [Paul Lin] [KYUUBI #4938][FLINK] Implement Kyuubi UDF in Flink engine

Authored-by: Paul Lin <paullin3280@gmail.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
This commit is contained in:
Paul Lin 2023-07-10 12:29:55 +08:00 committed by Cheng Pan
parent 47562b464b
commit 8d0010dee0
No known key found for this signature in database
GPG Key ID: 8001952629BCC75D
7 changed files with 278 additions and 24 deletions

View File

@ -31,6 +31,7 @@ import org.apache.flink.table.gateway.service.context.DefaultContext
import org.apache.kyuubi.{Logging, Utils}
import org.apache.kyuubi.Utils.{addShutdownHook, currentUser, FLINK_ENGINE_SHUTDOWN_PRIORITY}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_ENGINE_NAME, KYUUBI_SESSION_USER_KEY}
import org.apache.kyuubi.engine.flink.FlinkSQLEngine.{countDownLatch, currentEngine}
import org.apache.kyuubi.service.Serverable
import org.apache.kyuubi.util.SignalRegister
@ -92,26 +93,7 @@ object FlinkSQLEngine extends Logging {
flinkConf.addAll(Configuration.fromMap(flinkConfFromArgs.asJava))
val executionTarget = flinkConf.getString(DeploymentOptions.TARGET)
// set cluster name for per-job and application mode
executionTarget match {
case "yarn-per-job" | "yarn-application" =>
if (!flinkConf.containsKey("yarn.application.name")) {
val appName = s"kyuubi_${user}_flink_${Instant.now}"
flinkConf.setString("yarn.application.name", appName)
}
if (flinkConf.containsKey("high-availability.cluster-id")) {
flinkConf.setString(
"yarn.application.id",
flinkConf.toMap.get("high-availability.cluster-id"))
}
case "kubernetes-application" =>
if (!flinkConf.containsKey("kubernetes.cluster-id")) {
val appName = s"kyuubi-${user}-flink-${Instant.now}"
flinkConf.setString("kubernetes.cluster-id", appName)
}
case other =>
debug(s"Skip generating app name for execution target $other")
}
setDeploymentConf(executionTarget, flinkConf)
kyuubiConf.setIfMissing(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
@ -153,4 +135,35 @@ object FlinkSQLEngine extends Logging {
res.await()
info("Initial Flink SQL finished.")
}
/**
 * Forwards Kyuubi engine metadata into the Flink configuration and derives
 * default cluster/application names for per-job and application deployments.
 *
 * @param executionTarget the Flink deployment target, e.g. "yarn-application"
 * @param flinkConf the Flink configuration, enriched in place
 */
private def setDeploymentConf(executionTarget: String, flinkConf: Configuration): Unit = {
  val now = Instant.now
  // expose the engine name so it can be read back at runtime
  // (e.g. by Kyuubi-defined functions)
  val defaultEngineName = s"kyuubi_${user}_flink_$now"
  flinkConf.setString(KYUUBI_ENGINE_NAME, defaultEngineName)
  // propagate the Kyuubi session user into the Flink configuration, if set
  kyuubiConf.getOption(KYUUBI_SESSION_USER_KEY).foreach { sessionUser =>
    flinkConf.setString(KYUUBI_SESSION_USER_KEY, sessionUser)
  }
  // set cluster name for per-job and application mode
  executionTarget match {
    case "yarn-per-job" | "yarn-application" =>
      if (!flinkConf.containsKey("yarn.application.name")) {
        flinkConf.setString("yarn.application.name", defaultEngineName)
      }
      // reuse the HA cluster id as the YARN application id when present
      if (flinkConf.containsKey("high-availability.cluster-id")) {
        flinkConf.setString(
          "yarn.application.id",
          flinkConf.toMap.get("high-availability.cluster-id"))
      }
    case "kubernetes-application" =>
      // Kubernetes cluster ids must not contain underscores, hence the dashes
      if (!flinkConf.containsKey("kubernetes.cluster-id")) {
        flinkConf.setString("kubernetes.cluster-id", s"kyuubi-${user}-flink-$now")
      }
    case other =>
      debug(s"Skip generating app name for execution target $other")
  }
}
}

View File

@ -30,6 +30,7 @@ import org.apache.hive.service.rpc.thrift.{TGetInfoType, TGetInfoValue, TProtoco
import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_HANDLE_KEY
import org.apache.kyuubi.engine.flink.FlinkEngineUtils
import org.apache.kyuubi.engine.flink.udf.KDFRegistry
import org.apache.kyuubi.session.{AbstractSession, SessionHandle, SessionManager, USE_CATALOG, USE_DATABASE}
class FlinkSessionImpl(
@ -46,10 +47,12 @@ class FlinkSessionImpl(
conf.get(KYUUBI_SESSION_HANDLE_KEY).map(SessionHandle.fromUUID)
.getOrElse(SessionHandle.fromUUID(fSession.getSessionHandle.getIdentifier.toString))
lazy val sessionContext: SessionContext = {
val sessionContext: SessionContext = {
FlinkEngineUtils.getSessionContext(fSession)
}
KDFRegistry.registerAll(sessionContext)
private def setModifiableConfig(key: String, value: String): Unit = {
try {
sessionContext.set(key, value)

View File

@ -0,0 +1,150 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.flink.udf
import java.util
import scala.collection.mutable.ArrayBuffer
import org.apache.flink.configuration.Configuration
import org.apache.flink.table.functions.{ScalarFunction, UserDefinedFunction}
import org.apache.flink.table.gateway.service.context.SessionContext
import org.apache.kyuubi.{KYUUBI_VERSION, Utils}
import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_ENGINE_NAME, KYUUBI_SESSION_USER_KEY}
import org.apache.kyuubi.engine.flink.FlinkEngineUtils
import org.apache.kyuubi.util.reflect.DynMethods
object KDFRegistry {

  /**
   * Builds the full set of Kyuubi-defined functions for the given session.
   *
   * The session configuration is fetched reflectively because the accessor on
   * [[SessionContext]] differs across Flink versions.
   *
   * @param sessionContext the Flink session to read the configuration from
   * @return all Kyuubi-defined functions, ready for registration
   */
  def createKyuubiDefinedFunctions(sessionContext: SessionContext): Array[KyuubiDefinedFunction] = {
    val flinkConfigMap: util.Map[String, String] =
      if (FlinkEngineUtils.isFlinkVersionEqualTo("1.16")) {
        // Flink 1.16 exposes the session configuration via getConfigMap
        DynMethods
          .builder("getConfigMap")
          .impl(classOf[SessionContext])
          .build()
          .invoke(sessionContext)
          .asInstanceOf[util.Map[String, String]]
      } else {
        // other supported versions return a Configuration via getSessionConf
        DynMethods
          .builder("getSessionConf")
          .impl(classOf[SessionContext])
          .build()
          .invoke(sessionContext)
          .asInstanceOf[Configuration]
          .toMap
      }
    Array(
      create(
        "kyuubi_version",
        new KyuubiVersionFunction(flinkConfigMap),
        "Return the version of Kyuubi Server",
        "string",
        "1.8.0"),
      create(
        "kyuubi_engine_name",
        new EngineNameFunction(flinkConfigMap),
        "Return the application name for the associated query engine",
        "string",
        "1.8.0"),
      create(
        "kyuubi_engine_id",
        new EngineIdFunction(flinkConfigMap),
        "Return the application id for the associated query engine",
        "string",
        "1.8.0"),
      create(
        "kyuubi_system_user",
        new SystemUserFunction(flinkConfigMap),
        "Return the system user name for the associated query engine",
        "string",
        "1.8.0"),
      create(
        "kyuubi_session_user",
        new SessionUserFunction(flinkConfigMap),
        "Return the session username for the associated query engine",
        "string",
        "1.8.0"))
  }

  /** Wraps a Flink UDF together with its Kyuubi metadata. */
  def create(
      name: String,
      udf: UserDefinedFunction,
      description: String,
      returnType: String,
      since: String): KyuubiDefinedFunction =
    KyuubiDefinedFunction(name, udf, description, returnType, since)

  /**
   * Registers every Kyuubi-defined function as a temporary system function
   * in the session's function catalog, overriding existing ones if present.
   */
  def registerAll(sessionContext: SessionContext): Unit = {
    createKyuubiDefinedFunctions(sessionContext).foreach { func =>
      sessionContext.getSessionState.functionCatalog
        .registerTemporarySystemFunction(func.name, func.udf, true)
    }
  }
}
/**
 * Implements the `kyuubi_version` function: returns the Kyuubi version this
 * engine was built with. The configuration map is unused but kept for a
 * uniform constructor shape across the Kyuubi-defined functions.
 */
class KyuubiVersionFunction(confMap: util.Map[String, String]) extends ScalarFunction {
  def eval(): String = KYUUBI_VERSION
}
/**
 * Implements the `kyuubi_engine_name` function: returns the application name
 * of this engine, preferring the YARN application name, then the Kubernetes
 * cluster id, and finally the engine name recorded by Kyuubi itself.
 */
class EngineNameFunction(confMap: util.Map[String, String]) extends ScalarFunction {
  def eval(): String = {
    if (confMap.containsKey("yarn.application.name")) {
      confMap.get("yarn.application.name")
    } else if (confMap.containsKey("kubernetes.cluster-id")) {
      confMap.get("kubernetes.cluster-id")
    } else {
      confMap.getOrDefault(KYUUBI_ENGINE_NAME, "unknown-engine-name")
    }
  }
}
/**
 * Implements the `kyuubi_engine_id` function: returns the application id of
 * this engine, preferring the YARN application id, then the Kubernetes
 * cluster id, and finally the high-availability cluster id.
 */
class EngineIdFunction(confMap: util.Map[String, String]) extends ScalarFunction {
  def eval(): String = {
    if (confMap.containsKey("yarn.application.id")) {
      confMap.get("yarn.application.id")
    } else if (confMap.containsKey("kubernetes.cluster-id")) {
      confMap.get("kubernetes.cluster-id")
    } else {
      confMap.getOrDefault("high-availability.cluster-id", "unknown-engine-id")
    }
  }
}
/**
 * Implements the `kyuubi_system_user` function: returns the OS/system user
 * the engine process runs as. The configuration map is unused but kept for a
 * uniform constructor shape across the Kyuubi-defined functions.
 */
class SystemUserFunction(confMap: util.Map[String, String]) extends ScalarFunction {
  def eval(): String = Utils.currentUser
}
/**
 * Implements the `kyuubi_session_user` function: returns the Kyuubi session
 * user forwarded into the Flink configuration, or "unknown-user" when the
 * session user key was never set.
 */
class SessionUserFunction(confMap: util.Map[String, String]) extends ScalarFunction {
  def eval(): String = confMap.getOrDefault(KYUUBI_SESSION_USER_KEY, "unknown-user")
}

View File

@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.flink.udf
import org.apache.flink.table.functions.UserDefinedFunction
/**
 * A wrapper for Flink's [[UserDefinedFunction]]
 *
 * @param name function name
 * @param udf user-defined function
 * @param description function description
 * @param returnType SQL type name of the function result, e.g. "string"
 * @param since the Kyuubi version that introduced this function
 */
case class KyuubiDefinedFunction(
    name: String,
    udf: UserDefinedFunction,
    description: String,
    returnType: String,
    since: String)

View File

@ -19,7 +19,9 @@ package org.apache.kyuubi.engine.flink.operation
import java.util.UUID
import org.apache.kyuubi.{KYUUBI_VERSION, Utils}
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY
import org.apache.kyuubi.engine.ShareLevel
import org.apache.kyuubi.engine.flink.{WithDiscoveryFlinkSQLEngine, WithFlinkSQLEngineLocal}
import org.apache.kyuubi.ha.HighAvailabilityConf.{HA_ENGINE_REF_ID, HA_NAMESPACE}
@ -33,11 +35,13 @@ class FlinkOperationLocalSuite extends FlinkOperationSuite
override def withKyuubiConf: Map[String, String] = {
Map(
"flink.execution.target" -> "remote",
"flink.high-availability.cluster-id" -> "flink-mini-cluster",
HA_NAMESPACE.key -> namespace,
HA_ENGINE_REF_ID.key -> engineRefId,
ENGINE_TYPE.key -> "FLINK_SQL",
ENGINE_SHARE_LEVEL.key -> shareLevel,
OPERATION_PLAN_ONLY_MODE.key -> NoneMode.name) ++ testExtraConf
OPERATION_PLAN_ONLY_MODE.key -> NoneMode.name,
KYUUBI_SESSION_USER_KEY -> "paullin") ++ testExtraConf
}
override protected def engineRefId: String = UUID.randomUUID().toString
@ -48,4 +52,27 @@ class FlinkOperationLocalSuite extends FlinkOperationSuite
def engineType: String = "flink"
test("execute statement - kyuubi defined functions") {
  withJdbcStatement() { statement =>
    // Runs the query and returns the first column of its first row,
    // asserting that at least one row came back.
    def firstString(sql: String): String = {
      val rs = statement.executeQuery(sql)
      assert(rs.next())
      rs.getString(1)
    }
    assert(firstString("select kyuubi_version() as kyuubi_version") === KYUUBI_VERSION)
    assert(firstString("select kyuubi_engine_name() as engine_name")
      .startsWith(s"kyuubi_${Utils.currentUser}_flink"))
    // the engine id resolves to the configured HA cluster id in local mode
    assert(firstString("select kyuubi_engine_id() as engine_id") === "flink-mini-cluster")
    assert(firstString("select kyuubi_system_user() as `system_user`") === Utils.currentUser)
    assert(firstString("select kyuubi_session_user() as `session_user`") === "paullin")
  }
}
}

View File

@ -19,7 +19,9 @@ package org.apache.kyuubi.engine.flink.operation
import java.util.UUID
import org.apache.kyuubi.{KYUUBI_VERSION, Utils}
import org.apache.kyuubi.config.KyuubiConf.{ENGINE_SHARE_LEVEL, ENGINE_TYPE}
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY
import org.apache.kyuubi.engine.ShareLevel
import org.apache.kyuubi.engine.flink.{WithDiscoveryFlinkSQLEngine, WithFlinkSQLEngineOnYarn}
import org.apache.kyuubi.ha.HighAvailabilityConf.{HA_ENGINE_REF_ID, HA_NAMESPACE}
@ -34,7 +36,8 @@ class FlinkOperationOnYarnSuite extends FlinkOperationSuite
HA_NAMESPACE.key -> namespace,
HA_ENGINE_REF_ID.key -> engineRefId,
ENGINE_TYPE.key -> "FLINK_SQL",
ENGINE_SHARE_LEVEL.key -> shareLevel) ++ testExtraConf
ENGINE_SHARE_LEVEL.key -> shareLevel,
KYUUBI_SESSION_USER_KEY -> "paullin") ++ testExtraConf
}
override protected def engineRefId: String = UUID.randomUUID().toString
@ -44,4 +47,28 @@ class FlinkOperationOnYarnSuite extends FlinkOperationSuite
def shareLevel: String = ShareLevel.USER.toString
def engineType: String = "flink"
test("execute statement - kyuubi defined functions") {
  withJdbcStatement() { statement =>
    // Runs the query and returns the first column of its first row,
    // asserting that at least one row came back.
    def firstString(sql: String): String = {
      val rs = statement.executeQuery(sql)
      assert(rs.next())
      rs.getString(1)
    }
    assert(firstString("select kyuubi_version() as kyuubi_version") === KYUUBI_VERSION)
    assert(firstString("select kyuubi_engine_name() as engine_name")
      .startsWith(s"kyuubi_${Utils.currentUser}_flink"))
    // on YARN the engine id is the YARN application id, e.g. application_<ts>_<seq>
    assert(firstString("select kyuubi_engine_id() as engine_id").startsWith("application_"))
    assert(firstString("select kyuubi_system_user() as `system_user`") === Utils.currentUser)
    assert(firstString("select kyuubi_session_user() as `session_user`") === "paullin")
  }
}
}

View File

@ -20,7 +20,7 @@ package org.apache.kyuubi.engine.spark.udf
import org.apache.spark.sql.expressions.UserDefinedFunction
/**
* A wrapper for Spark' [[UserDefinedFunction]]
* A wrapper for Spark's [[UserDefinedFunction]]
*
* @param name function name
* @param udf user-defined function