diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala index dff9aa602..85232688e 100644 --- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala +++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala @@ -31,7 +31,7 @@ import org.apache.flink.table.gateway.service.context.DefaultContext import org.apache.kyuubi.{Logging, Utils} import org.apache.kyuubi.Utils.{addShutdownHook, currentUser, FLINK_ENGINE_SHUTDOWN_PRIORITY} -import org.apache.kyuubi.config.KyuubiConf +import org.apache.kyuubi.config.{KyuubiConf, KyuubiReservedKeys} import org.apache.kyuubi.config.KyuubiConf.ENGINE_FLINK_INITIALIZE_SQL import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_ENGINE_NAME, KYUUBI_SESSION_USER_KEY} import org.apache.kyuubi.engine.flink.FlinkSQLEngine.{countDownLatch, currentEngine} @@ -99,6 +99,12 @@ object FlinkSQLEngine extends Logging { kyuubiConf.setIfMissing(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0) + val engineCredentials = kyuubiConf.getOption(KyuubiReservedKeys.KYUUBI_ENGINE_CREDENTIALS_KEY) + kyuubiConf.unset(KyuubiReservedKeys.KYUUBI_ENGINE_CREDENTIALS_KEY) + engineCredentials.filter(_.nonEmpty).foreach { credentials => + FlinkTBinaryFrontendService.renewDelegationToken(credentials) + } + val engineContext = FlinkEngineUtils.getDefaultContext(args, flinkConf, flinkConfDir) startEngine(engineContext) info("Flink engine started") diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkTBinaryFrontendService.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkTBinaryFrontendService.scala index a709ea760..79a6dff96 100644 --- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkTBinaryFrontendService.scala +++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkTBinaryFrontendService.scala @@ -17,13 +17,36 @@ package org.apache.kyuubi.engine.flink +import org.apache.hadoop.security.{Credentials, UserGroupInformation} + +import org.apache.kyuubi.{KyuubiSQLException, Logging} +import org.apache.kyuubi.engine.flink.FlinkTBinaryFrontendService.renewDelegationToken import org.apache.kyuubi.ha.client.{EngineServiceDiscovery, ServiceDiscovery} import org.apache.kyuubi.service.{Serverable, Service, TBinaryFrontendService} +import org.apache.kyuubi.service.TFrontendService.OK_STATUS +import org.apache.kyuubi.shaded.hive.service.rpc.thrift.{TRenewDelegationTokenReq, TRenewDelegationTokenResp} +import org.apache.kyuubi.util.KyuubiHadoopUtils class FlinkTBinaryFrontendService( override val serverable: Serverable) extends TBinaryFrontendService("FlinkThriftBinaryFrontendService") { + override def RenewDelegationToken(req: TRenewDelegationTokenReq): TRenewDelegationTokenResp = { + debug(req.toString) + // We hacked `TCLIService.Iface.RenewDelegationToken` to transfer Credentials from Kyuubi + // Server to Flink SQL engine + val resp = new TRenewDelegationTokenResp() + try { + renewDelegationToken(req.getDelegationToken) + resp.setStatus(OK_STATUS) + } catch { + case e: Exception => + warn("Error renew delegation tokens: ", e) + resp.setStatus(KyuubiSQLException.toTStatus(e)) + } + resp + } + override lazy val discoveryService: Option[Service] = { if (ServiceDiscovery.supportServiceDiscovery(conf)) { Some(new EngineServiceDiscovery(this)) @@ -33,3 +56,33 @@ class FlinkTBinaryFrontendService( } } + +object FlinkTBinaryFrontendService extends Logging { + private[flink] def renewDelegationToken(delegationToken: String): Unit = { + val newCreds = KyuubiHadoopUtils.decodeCredentials(delegationToken) + val newTokens = KyuubiHadoopUtils.getTokenMap(newCreds) + + val updateCreds = new Credentials() + val oldCreds = UserGroupInformation.getCurrentUser.getCredentials + newTokens.foreach { case (alias, newToken) => + val oldToken = oldCreds.getToken(alias) + if (oldToken != null) { + if (KyuubiHadoopUtils.compareIssueDate(newToken, oldToken) > 0) { + updateCreds.addToken(alias, newToken) + } else { + warn(s"Ignore token with earlier issue date: $newToken") + } + } else { + info(s"Add new unknown token $newToken") + updateCreds.addToken(alias, newToken) + } + } + + if (updateCreds.numberOfTokens() > 0) { + info("Update delegation tokens. " + + s"The number of tokens sent by the server is ${newCreds.numberOfTokens()}. " + + s"The actual number of updated tokens is ${updateCreds.numberOfTokens()}.") + UserGroupInformation.getCurrentUser.addCredentials(updateCreds) + } + } +} diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkTBinaryFrontendService.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkTBinaryFrontendService.scala index 7ca2e8fbe..2eed5253d 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkTBinaryFrontendService.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkTBinaryFrontendService.scala @@ -162,7 +162,7 @@ object SparkTBinaryFrontendService extends Logging { } .map(_._2) newToken.foreach { token => - if (compareIssueDate(token, oldAliasAndToken.get._2) > 0) { + if (KyuubiHadoopUtils.compareIssueDate(token, oldAliasAndToken.get._2) > 0) { updateCreds.addToken(oldAliasAndToken.get._1, token) } else { warn(s"Ignore Hive token with earlier issue date: $token") @@ -186,7 +186,7 @@ object SparkTBinaryFrontendService extends Logging { tokens.foreach { case (alias, newToken) => val oldToken = oldCreds.getToken(alias) if (oldToken != null) { - if (compareIssueDate(newToken, oldToken) > 0) { + if (KyuubiHadoopUtils.compareIssueDate(newToken, oldToken) > 0) { updateCreds.addToken(alias, newToken) } else { warn(s"Ignore token with earlier issue date: $newToken") @@ -197,18 +197,6 @@ object SparkTBinaryFrontendService extends Logging { } } - private def compareIssueDate( - newToken: Token[_ <: TokenIdentifier], - oldToken: Token[_ <: TokenIdentifier]): Int = { - val newDate = KyuubiHadoopUtils.getTokenIssueDate(newToken) - val oldDate = KyuubiHadoopUtils.getTokenIssueDate(oldToken) - if (newDate.isDefined && oldDate.isDefined && newDate.get <= oldDate.get) { - -1 - } else { - 1 - } - } - private[kyuubi] def hiveConf(hadoopConf: Configuration): Configuration = { if (_hiveConf == null) { synchronized { diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/KyuubiHadoopUtils.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/KyuubiHadoopUtils.scala index a3047d0f4..93135ba3e 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/KyuubiHadoopUtils.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/KyuubiHadoopUtils.scala @@ -116,6 +116,18 @@ object KyuubiHadoopUtils extends Logging { } } + def compareIssueDate( + newToken: Token[_ <: TokenIdentifier], + oldToken: Token[_ <: TokenIdentifier]): Int = { + val newDate = KyuubiHadoopUtils.getTokenIssueDate(newToken) + val oldDate = KyuubiHadoopUtils.getTokenIssueDate(oldToken) + if (newDate.isDefined && oldDate.isDefined && newDate.get <= oldDate.get) { + -1 + } else { + 1 + } + } + /** * Add a path variable to the given environment map. * If the map already contains this key, append the value to the existing value instead. diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala index 241b7ec78..3fa7ea50a 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala @@ -25,7 +25,7 @@ import scala.collection.mutable import com.google.common.annotations.VisibleForTesting import org.apache.kyuubi._ -import org.apache.kyuubi.config.{KyuubiConf, KyuubiReservedKeys} +import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.config.KyuubiConf._ import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY import org.apache.kyuubi.engine.{ApplicationManagerInfo, KyuubiApplicationManager, ProcBuilder} @@ -80,8 +80,6 @@ class FlinkProcessBuilder( override protected val commands: Iterable[String] = { KyuubiApplicationManager.tagApplication(engineRefId, shortName, clusterManager(), conf) - // unset engine credentials because Flink doesn't support them at the moment - conf.unset(KyuubiReservedKeys.KYUUBI_ENGINE_CREDENTIALS_KEY) // flink.execution.target are required in Kyuubi conf currently executionTarget match { case Some("yarn-application") => diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala index 952f71c08..8786ef798 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala @@ -72,7 +72,6 @@ class FlinkProcessBuilderSuite extends KyuubiFunSuite { (FLINK_HADOOP_CLASSPATH_KEY -> s"${File.separator}hadoop") private def confStr: String = { sessionModeConf.clone.getAll - .filter(!_._1.equals(KYUUBI_ENGINE_CREDENTIALS_KEY)) .map { case (k, v) => s"\\\\\\n\\t--conf $k=$v" } .mkString(" ") }