[KYUUBI #6367] Flink SQL engine supports RenewDelegationToken

# 🔍 Description
## Issue References 🔗

This pull request fixes #6367

## Describe Your Solution 🔧

+ Implement `RenewDelegationToken` method in `FlinkTBinaryFrontendService`.
+ Pass `kyuubi.engine.credentials` configuration when starting flink engine.

## Types of changes 🔖

- [ ] Bugfix (non-breaking change which fixes an issue)
- [X] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)

## Test Plan 🧪

#### Behavior Without This Pull Request ⚰️

#### Behavior With This Pull Request 🎉
test connection:

```
"jdbc:hive2://hadoop-master1.orb.local:10009/default;hive.server2.proxy.user=spark;principal=kyuubi/_HOST@TEST.ORG?kyuubi.engine.type=FLINK_SQL;flink.execution.target=yarn-application"
```

flink engine builder command:

![image](https://github.com/apache/kyuubi/assets/17894939/dcdb8466-c423-464d-8119-9c4236f17ce7)

jobmanager log:

```
2024-05-22 07:46:46,545 INFO  org.apache.kyuubi.engine.flink.FlinkTBinaryFrontendService   [] - Add new unknown token Kind: HIVE_DELEGATION_TOKEN, Service: , Ident: 00 05 73 70 61 72 6b 04 68 69 76 65 28 6b 79 75 75 62 69 2f 68 61 64 6f 6f 70 2d 6d 61 73 74 65 72 31 2e 6f 72 62 2e 6c 6f 63 61 6c 40 54 45 53 54 2e 4f 52 47 8a 01 8f 9f 3f d5 4c 8a 01 8f c3 4c 59 4c 0b 06
2024-05-22 07:46:46,547 WARN  org.apache.kyuubi.engine.flink.FlinkTBinaryFrontendService   [] - Ignore token with earlier issue date: Kind: HDFS_DELEGATION_TOKEN, Service: 172.20.0.5:8020, Ident: (token for spark: HDFS_DELEGATION_TOKEN owner=spark, renewer=spark, realUser=kyuubi/hadoop-master1.orb.local@TEST.ORG, issueDate=1716363711750, maxDate=1716968511750, sequenceNumber=15, masterKeyId=7)
2024-05-22 07:46:46,548 INFO  org.apache.kyuubi.engine.flink.FlinkTBinaryFrontendService   [] - Update delegation tokens. The number of tokens sent by the server is 2. The actual number of updated tokens is 1.

```

#### Related Unit Tests

---

# Checklist 📝

- [X] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)

**Be nice. Be informative.**

Closes #6371 from wForget/KYUUBI-6367.

Closes #6367

83b402aa0 [wforget] Revert "change Base64 encoder/decoder"
f5c08eb45 [wforget] change Base64 encoder/decoder
e8c66dfc5 [wforget] fix test
e59820b3e [wforget] [KYUUBI #6367] Support RenewDelegationToken for flink sql engine

Authored-by: wforget <643348094@qq.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
This commit is contained in:
wforget 2024-05-24 12:15:44 +08:00 committed by Cheng Pan
parent b89c185eec
commit 4cbecdc12f
No known key found for this signature in database
GPG Key ID: 8001952629BCC75D
6 changed files with 75 additions and 19 deletions

View File

@ -31,7 +31,7 @@ import org.apache.flink.table.gateway.service.context.DefaultContext
import org.apache.kyuubi.{Logging, Utils}
import org.apache.kyuubi.Utils.{addShutdownHook, currentUser, FLINK_ENGINE_SHUTDOWN_PRIORITY}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.{KyuubiConf, KyuubiReservedKeys}
import org.apache.kyuubi.config.KyuubiConf.ENGINE_FLINK_INITIALIZE_SQL
import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_ENGINE_NAME, KYUUBI_SESSION_USER_KEY}
import org.apache.kyuubi.engine.flink.FlinkSQLEngine.{countDownLatch, currentEngine}
@ -99,6 +99,12 @@ object FlinkSQLEngine extends Logging {
kyuubiConf.setIfMissing(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
val engineCredentials = kyuubiConf.getOption(KyuubiReservedKeys.KYUUBI_ENGINE_CREDENTIALS_KEY)
kyuubiConf.unset(KyuubiReservedKeys.KYUUBI_ENGINE_CREDENTIALS_KEY)
engineCredentials.filter(_.nonEmpty).foreach { credentials =>
FlinkTBinaryFrontendService.renewDelegationToken(credentials)
}
val engineContext = FlinkEngineUtils.getDefaultContext(args, flinkConf, flinkConfDir)
startEngine(engineContext)
info("Flink engine started")

View File

@ -17,13 +17,36 @@
package org.apache.kyuubi.engine.flink
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.kyuubi.{KyuubiSQLException, Logging}
import org.apache.kyuubi.engine.flink.FlinkTBinaryFrontendService.renewDelegationToken
import org.apache.kyuubi.ha.client.{EngineServiceDiscovery, ServiceDiscovery}
import org.apache.kyuubi.service.{Serverable, Service, TBinaryFrontendService}
import org.apache.kyuubi.service.TFrontendService.OK_STATUS
import org.apache.kyuubi.shaded.hive.service.rpc.thrift.{TRenewDelegationTokenReq, TRenewDelegationTokenResp}
import org.apache.kyuubi.util.KyuubiHadoopUtils
class FlinkTBinaryFrontendService(
override val serverable: Serverable)
extends TBinaryFrontendService("FlinkThriftBinaryFrontendService") {
override def RenewDelegationToken(req: TRenewDelegationTokenReq): TRenewDelegationTokenResp = {
  debug(req.toString)
  // `TCLIService.Iface.RenewDelegationToken` is repurposed here: the Kyuubi server uses it
  // to push a serialized `Credentials` blob down to the Flink SQL engine, rather than to
  // renew a classic Hive delegation token.
  val response = new TRenewDelegationTokenResp()
  val status =
    try {
      renewDelegationToken(req.getDelegationToken)
      OK_STATUS
    } catch {
      case e: Exception =>
        warn("Error renew delegation tokens: ", e)
        KyuubiSQLException.toTStatus(e)
    }
  response.setStatus(status)
  response
}
override lazy val discoveryService: Option[Service] = {
if (ServiceDiscovery.supportServiceDiscovery(conf)) {
Some(new EngineServiceDiscovery(this))
@ -33,3 +56,33 @@ class FlinkTBinaryFrontendService(
}
}
object FlinkTBinaryFrontendService extends Logging {

  /**
   * Merge the delegation tokens sent by the Kyuubi server into the engine-side
   * current user's credentials.
   *
   * A token is applied only when it is new (unknown alias) or strictly newer than the
   * token currently held under the same alias; stale tokens are logged and dropped.
   *
   * @param delegationToken serialized Hadoop `Credentials`, decoded via
   *                        `KyuubiHadoopUtils.decodeCredentials` — presumably the
   *                        Base64 form produced by the server side (confirm against caller)
   */
  private[flink] def renewDelegationToken(delegationToken: String): Unit = {
    val newCreds = KyuubiHadoopUtils.decodeCredentials(delegationToken)
    val newTokens = KyuubiHadoopUtils.getTokenMap(newCreds)
    // Accumulates only the tokens that should actually be applied to the current UGI.
    val updateCreds = new Credentials()
    val oldCreds = UserGroupInformation.getCurrentUser.getCredentials
    newTokens.foreach { case (alias, newToken) =>
      val oldToken = oldCreds.getToken(alias)
      if (oldToken != null) {
        // Known alias: accept the incoming token only if its issue date is later.
        if (KyuubiHadoopUtils.compareIssueDate(newToken, oldToken) > 0) {
          updateCreds.addToken(alias, newToken)
        } else {
          warn(s"Ignore token with earlier issue date: $newToken")
        }
      } else {
        // Unknown alias: always add it.
        info(s"Add new unknown token $newToken")
        updateCreds.addToken(alias, newToken)
      }
    }
    if (updateCreds.numberOfTokens() > 0) {
      info("Update delegation tokens. " +
        s"The number of tokens sent by the server is ${newCreds.numberOfTokens()}. " +
        s"The actual number of updated tokens is ${updateCreds.numberOfTokens()}.")
      // Mutates the current UGI so subsequent Hadoop calls pick up the fresh tokens.
      UserGroupInformation.getCurrentUser.addCredentials(updateCreds)
    }
  }
}

View File

@ -162,7 +162,7 @@ object SparkTBinaryFrontendService extends Logging {
}
.map(_._2)
newToken.foreach { token =>
if (compareIssueDate(token, oldAliasAndToken.get._2) > 0) {
if (KyuubiHadoopUtils.compareIssueDate(token, oldAliasAndToken.get._2) > 0) {
updateCreds.addToken(oldAliasAndToken.get._1, token)
} else {
warn(s"Ignore Hive token with earlier issue date: $token")
@ -186,7 +186,7 @@ object SparkTBinaryFrontendService extends Logging {
tokens.foreach { case (alias, newToken) =>
val oldToken = oldCreds.getToken(alias)
if (oldToken != null) {
if (compareIssueDate(newToken, oldToken) > 0) {
if (KyuubiHadoopUtils.compareIssueDate(newToken, oldToken) > 0) {
updateCreds.addToken(alias, newToken)
} else {
warn(s"Ignore token with earlier issue date: $newToken")
@ -197,18 +197,6 @@ object SparkTBinaryFrontendService extends Logging {
}
}
private def compareIssueDate(
newToken: Token[_ <: TokenIdentifier],
oldToken: Token[_ <: TokenIdentifier]): Int = {
val newDate = KyuubiHadoopUtils.getTokenIssueDate(newToken)
val oldDate = KyuubiHadoopUtils.getTokenIssueDate(oldToken)
if (newDate.isDefined && oldDate.isDefined && newDate.get <= oldDate.get) {
-1
} else {
1
}
}
private[kyuubi] def hiveConf(hadoopConf: Configuration): Configuration = {
if (_hiveConf == null) {
synchronized {

View File

@ -116,6 +116,18 @@ object KyuubiHadoopUtils extends Logging {
}
}
/**
 * Compare two tokens by issue date.
 *
 * Returns -1 when both issue dates are readable and the new token is not newer
 * (its issue date is <= the old one's); returns 1 in every other case, including
 * when either issue date cannot be extracted — callers treat a positive result as
 * "accept the new token", so unknown dates fail open.
 */
def compareIssueDate(
    newToken: Token[_ <: TokenIdentifier],
    oldToken: Token[_ <: TokenIdentifier]): Int = {
  val comparison = for {
    newDate <- KyuubiHadoopUtils.getTokenIssueDate(newToken)
    oldDate <- KyuubiHadoopUtils.getTokenIssueDate(oldToken)
  } yield if (newDate <= oldDate) -1 else 1
  comparison.getOrElse(1)
}
/**
* Add a path variable to the given environment map.
* If the map already contains this key, append the value to the existing value instead.

View File

@ -25,7 +25,7 @@ import scala.collection.mutable
import com.google.common.annotations.VisibleForTesting
import org.apache.kyuubi._
import org.apache.kyuubi.config.{KyuubiConf, KyuubiReservedKeys}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY
import org.apache.kyuubi.engine.{ApplicationManagerInfo, KyuubiApplicationManager, ProcBuilder}
@ -80,8 +80,6 @@ class FlinkProcessBuilder(
override protected val commands: Iterable[String] = {
KyuubiApplicationManager.tagApplication(engineRefId, shortName, clusterManager(), conf)
// unset engine credentials because Flink doesn't support them at the moment
conf.unset(KyuubiReservedKeys.KYUUBI_ENGINE_CREDENTIALS_KEY)
// flink.execution.target are required in Kyuubi conf currently
executionTarget match {
case Some("yarn-application") =>

View File

@ -72,7 +72,6 @@ class FlinkProcessBuilderSuite extends KyuubiFunSuite {
(FLINK_HADOOP_CLASSPATH_KEY -> s"${File.separator}hadoop")
private def confStr: String = {
sessionModeConf.clone.getAll
.filter(!_._1.equals(KYUUBI_ENGINE_CREDENTIALS_KEY))
.map { case (k, v) => s"\\\\\\n\\t--conf $k=$v" }
.mkString(" ")
}