[KYUUBI #1007] Implement delegation token renewal framework

<!--
Thanks for sending a pull request!

Here are some tips for you:
  1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html
  2. If the PR is related to an issue in https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
  3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'.
-->

### _Why are the changes needed?_
<!--
Please clarify why the changes are needed. For instance,
  1. If you add a feature, you can talk about the use case of it.
  2. If you fix a bug, you can clarify why it is a bug.
-->
This PR finishes sub-task #1007 under umbrella issue #913.

### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [ ] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #1015 from zhouyifan279/KYUUBI#1007.

Closes #1007

94263804 [zhouyifan279] [KYUUBI #1007] Implement delegation token renewal framework
6cea96d0 [zhouyifan279] [KYUUBI #1007] Implement delegation token renewal framework
dbffcf15 [zhouyifan279] [KYUUBI #1007] Implement delegation token renewal framework
57100271 [zhouyifan279] [KYUUBI #1007] Implement delegation token renewal framework

Authored-by: zhouyifan279 <zhouyifan279@gmail.com>
Signed-off-by: Kent Yao <yao@apache.org>
This commit is contained in:
zhouyifan279 2021-09-03 18:40:30 +08:00 committed by Kent Yao
parent 01bebe36e8
commit f8056f458b
No known key found for this signature in database
GPG Key ID: F7051850A0AF904D
11 changed files with 678 additions and 1 deletions

View File

@ -147,6 +147,14 @@ kyuubi\.backend\.server<br>\.exec\.pool\.size|<div style='width: 65pt;word-wrap:
kyuubi\.backend\.server<br>\.exec\.pool\.wait\.queue<br>\.size|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>100</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>Size of the wait queue for the operation execution thread pool of Kyuubi server</div>|<div style='width: 30pt'>int</div>|<div style='width: 20pt'>1.0.0</div>
### Credentials
Key | Default | Meaning | Type | Since
--- | --- | --- | --- | ---
kyuubi\.credentials<br>\.renewal\.interval|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>PT1H</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>How often Kyuubi renews one user's DelegationTokens</div>|<div style='width: 30pt'>duration</div>|<div style='width: 20pt'>1.4.0</div>
kyuubi\.credentials<br>\.renewal\.retryWait|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>PT1M</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>How long to wait before retrying to fetch new credentials after a failure.</div>|<div style='width: 30pt'>duration</div>|<div style='width: 20pt'>1.4.0</div>
### Delegation
Key | Default | Meaning | Type | Since

View File

@ -232,6 +232,20 @@ object KyuubiConf {
.timeConf
.createWithDefault(Duration.ofHours(3).toMillis)
val CREDENTIALS_RENEWAL_INTERVAL: ConfigEntry[Long] =
buildConf("credentials.renewal.interval")
.doc("How often Kyuubi renews one user's DelegationTokens")
.version("1.4.0")
.timeConf
.createWithDefault(Duration.ofHours(1).toMillis)
val CREDENTIALS_RENEWAL_RETRY_WAIT: ConfigEntry[Long] =
buildConf("credentials.renewal.retryWait")
.doc("How long to wait before retrying to fetch new credentials after a failure.")
.version("1.4.0")
.timeConf
.checkValue(t => t > 0, "must be positive integer")
.createWithDefault(Duration.ofMinutes(1).toMillis)
/////////////////////////////////////////////////////////////////////////////////////////////////
// Frontend Service Configuration //

View File

@ -17,8 +17,11 @@
package org.apache.kyuubi.util
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
import org.apache.commons.codec.binary.Base64
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.SecurityUtil
import org.apache.hadoop.security.{Credentials, SecurityUtil}
import org.apache.kyuubi.config.KyuubiConf
@ -33,4 +36,23 @@ object KyuubiHadoopUtils {
def getServerPrincipal(principal: String): String = {
SecurityUtil.getServerPrincipal(principal, "0.0.0.0")
}
def encodeCredentials(creds: Credentials): String = {
val byteStream = new ByteArrayOutputStream
creds.writeTokenStorageToStream(new DataOutputStream(byteStream))
val encoder = new Base64(0, null, false)
encoder.encodeToString(byteStream.toByteArray)
}
def decodeCredentials(newValue: String): Credentials = {
val decoder = new Base64(0, null, false)
val decoded = decoder.decode(newValue)
val byteStream = new ByteArrayInputStream(decoded)
val creds = new Credentials()
creds.readTokenStorageStream(new DataInputStream(byteStream))
creds
}
}

View File

@ -17,8 +17,15 @@
package org.apache.kyuubi.util
import scala.util.Random
import org.apache.hadoop.io.Text
import org.apache.hadoop.security.Credentials
import org.apache.hadoop.security.token.Token
import org.apache.kyuubi.KyuubiFunSuite
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.service.authentication.KyuubiDelegationTokenIdentifier
class KyuubiHadoopUtilsSuite extends KyuubiFunSuite {
@ -35,4 +42,21 @@ class KyuubiHadoopUtilsSuite extends KyuubiFunSuite {
assert(hadoopConf.get(xyz) === "abc")
assert(hadoopConf.get(test) === "t")
}
test("encode/decode credentials") {
val identifier = new KyuubiDelegationTokenIdentifier()
val password = new Array[Byte](128)
Random.nextBytes(password)
val token = new Token[KyuubiDelegationTokenIdentifier](
identifier.getBytes,
password,
identifier.getKind,
new Text(""))
val credentials = new Credentials()
credentials.addToken(token.getKind, token)
val decoded = KyuubiHadoopUtils.decodeCredentials(
KyuubiHadoopUtils.encodeCredentials(credentials))
assert(decoded.getToken(token.getKind) == credentials.getToken(token.getKind))
}
}

View File

@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.credentials
import org.apache.hadoop.security.Credentials
import org.apache.kyuubi.credentials.CredentialsRef.UNSET_EPOCH
import org.apache.kyuubi.util.KyuubiHadoopUtils
class CredentialsRef(appUser: String) {
@volatile
private var epoch = UNSET_EPOCH
private var encodedCredentials: String = _
def getEpoch: Long = epoch
def getAppUser: String = appUser
def getEncodedCredentials: String = {
encodedCredentials
}
def updateCredentials(creds: Credentials): Unit = {
encodedCredentials = KyuubiHadoopUtils.encodeCredentials(creds)
epoch += 1
}
}
object CredentialsRef {
val UNSET_EPOCH: Long = -1L
}

View File

@ -0,0 +1,261 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.credentials
import java.util.ServiceLoader
import java.util.concurrent._
import scala.collection.mutable
import scala.util.{Failure, Success, Try}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.Credentials
import org.apache.kyuubi.Logging
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.service.AbstractService
import org.apache.kyuubi.util.{KyuubiHadoopUtils, ThreadUtils}
/**
* [[HadoopCredentialsManager]] manages and renews delegation tokens, which are used by SQL engines
* to access kerberos secured services.
*
* Delegation tokens are sent to SQL engines by calling [[sendCredentialsIfNeeded]].
* [[sendCredentialsIfNeeded]] executes the following steps:
* <ol>
* <li>
* Get or create a cached [[CredentialsRef]](contains delegation tokens) object by key
* appUser. If [[CredentialsRef]] is newly created, spawn a scheduled task to renew the
* delegation tokens.
* </li>
* <li>
* Get or create a cached session credentials epoch object by key sessionId.
* </li>
* <li>
* Compare [[CredentialsRef]] epoch with session credentials epoch. (Both epochs are set
* to -1 when created. [[CredentialsRef]] epoch is increased when delegation tokens are
* renewed.)
* </li>
* <li>
* If epochs are equal, return. Else, send delegation tokens to the SQL engine.
* </li>
* <li>
* If sending succeeds, set session credentials epoch to [[CredentialsRef]] epoch. Else,
* record the exception and return.
* </li>
* </ol>
*
* @note
* <ol>
* <li>
* Session credentials epochs are created in session scope and should be removed using
* [[removeSessionCredentialsEpoch]] when session closes.
* </li>
* <li>
* [[HadoopCredentialsManager]] does not renew and send credentials if no provider is left after
* initialize.
* </li>
* </ol>
*/
class HadoopCredentialsManager private (name: String) extends AbstractService(name)
with Logging {
def this() = this(classOf[HadoopCredentialsManager].getSimpleName)
private val userCredentialsRefMap = new ConcurrentHashMap[String, CredentialsRef]()
private val sessionCredentialsEpochMap = new ConcurrentHashMap[String, Long]()
private var providers: Map[String, HadoopDelegationTokenProvider] = _
private var renewalInterval: Long = _
private var renewalRetryWait: Long = _
private var hadoopConf: Configuration = _
private[credentials] var renewalExecutor: Option[ScheduledExecutorService] = None
override def initialize(conf: KyuubiConf): Unit = {
hadoopConf = KyuubiHadoopUtils.newHadoopConf(conf)
providers = HadoopCredentialsManager.loadProviders(conf)
.filter { case (_, provider) =>
val required = provider.delegationTokensRequired(hadoopConf, conf)
if (!required) {
warn(s"Service ${provider.serviceName} does not require a token." +
s" Check your configuration to see if security is disabled or not.")
}
required
}
if (providers.isEmpty) {
warn("No delegation token is required by services.")
} else {
info("Using the following builtin delegation token providers: " +
s"${providers.keys.mkString(", ")}.")
}
renewalInterval = conf.get(CREDENTIALS_RENEWAL_INTERVAL)
renewalRetryWait = conf.get(CREDENTIALS_RENEWAL_RETRY_WAIT)
super.initialize(conf)
}
override def start(): Unit = {
if (providers.nonEmpty) {
renewalExecutor =
Some(ThreadUtils.newDaemonSingleThreadScheduledExecutor("Delegation Token Renewal Thread"))
}
super.start()
}
override def stop(): Unit = {
renewalExecutor.foreach { executor =>
executor.shutdownNow()
try {
executor.awaitTermination(10, TimeUnit.SECONDS)
} catch {
case _: InterruptedException =>
}
}
super.stop()
}
/**
* Send credentials to SQL engine which the specified session is talking to if
* [[HadoopCredentialsManager]] has a newer credentials.
*
* @param sessionId Specify the session which is talking with SQL engine
* @param appUser User identity that the SQL engine uses.
* @param send Function to send encoded credentials to SQL engine
*/
def sendCredentialsIfNeeded(
sessionId: String,
appUser: String,
send: String => Unit): Unit = {
val userRef = getOrCreateUserCredentialsRef(appUser)
val sessionEpoch = getSessionCredentialsEpoch(sessionId)
if (userRef.getEpoch > sessionEpoch) {
val currentEpoch = userRef.getEpoch
val currentCreds = userRef.getEncodedCredentials
info(s"Send new credentials with epoch $currentEpoch to SQL engine through session " +
s"$sessionId")
Try(send(currentCreds)) match {
case Success(_) =>
info(s"Update session credentials epoch from $sessionEpoch to $currentEpoch")
sessionCredentialsEpochMap.put(sessionId, currentEpoch)
case Failure(exception) =>
warn(
s"Failed to send new credentials to SQL engine through session $sessionId",
exception)
}
}
}
/**
* Remove session credentials epoch corresponding to `sessionId`.
*
* @param sessionId KyuubiSession id
*/
def removeSessionCredentialsEpoch(sessionId: String): Unit = {
sessionCredentialsEpochMap.remove(sessionId)
}
// Visible for testing.
private[credentials] def getOrCreateUserCredentialsRef(appUser: String): CredentialsRef =
userCredentialsRefMap.computeIfAbsent(
appUser,
appUser => {
val ref = new CredentialsRef(appUser)
scheduleRenewal(ref, 0)
info(s"Created CredentialsRef for user $appUser and scheduled a renewal task")
ref
})
// Visible for testing.
private[credentials] def getSessionCredentialsEpoch(sessionId: String): Long = {
sessionCredentialsEpochMap.getOrDefault(sessionId, CredentialsRef.UNSET_EPOCH)
}
// Visible for testing.
private[credentials] def containsProvider(serviceName: String): Boolean = {
providers.contains(serviceName)
}
private def scheduleRenewal(userRef: CredentialsRef, delay: Long): Unit = {
val renewalTask = new Runnable {
override def run(): Unit = {
try {
val creds = new Credentials()
providers.values
.foreach(_.obtainDelegationTokens(hadoopConf, conf, userRef.getAppUser, creds))
userRef.updateCredentials(creds)
scheduleRenewal(userRef, renewalInterval)
} catch {
case _: InterruptedException =>
// Server is shutting down
case e: Exception =>
warn(
s"Failed to update tokens for ${userRef.getAppUser}, try again in" +
s" $renewalRetryWait ms",
e)
scheduleRenewal(userRef, renewalRetryWait)
}
}
}
renewalExecutor.foreach { executor =>
info(s"Scheduling renewal in $delay ms.")
executor.schedule(renewalTask, delay, TimeUnit.MILLISECONDS)
}
}
}
object HadoopCredentialsManager extends Logging {
private val providerEnabledConfig = "kyuubi.credentials.%s.enabled"
def loadProviders(kyuubiConf: KyuubiConf): Map[String, HadoopDelegationTokenProvider] = {
val loader =
ServiceLoader.load(classOf[HadoopDelegationTokenProvider], getClass.getClassLoader)
val providers = mutable.ArrayBuffer[HadoopDelegationTokenProvider]()
val iterator = loader.iterator
while (iterator.hasNext) {
try {
providers += iterator.next
} catch {
case t: Throwable =>
warn(s"Failed to load built in provider.", t)
}
}
// Filter out providers for which kyuubi.credentials.{service}.enabled is false.
providers
.filter { p => HadoopCredentialsManager.isServiceEnabled(kyuubiConf, p.serviceName) }
.map { p => (p.serviceName, p) }
.toMap
}
def isServiceEnabled(kyuubiConf: KyuubiConf, serviceName: String): Boolean = {
val key = providerEnabledConfig.format(serviceName)
kyuubiConf
.getOption(key)
.map(_.toBoolean)
.getOrElse(true)
}
}

View File

@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.credentials
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.Credentials
import org.apache.kyuubi.Logging
import org.apache.kyuubi.config.KyuubiConf
trait HadoopDelegationTokenProvider extends Logging {
/**
* Name of the service to provide delegation tokens. This name should be unique. Kyuubi will
* internally use this name to differentiate delegation token providers.
*/
def serviceName: String
/**
* Returns true if delegation tokens are required for this service. By default, it is based on
* whether Hadoop security is enabled.
*/
def delegationTokensRequired(hadoopConf: Configuration, kyuubiConf: KyuubiConf): Boolean
/**
* Obtain delegation tokens for this service.
*
* @param hadoopConf Configuration of current Hadoop Compatible system.
* @param owner DelegationToken owner.
* @param creds Credentials to add tokens and security keys to.
*/
def obtainDelegationTokens(
hadoopConf: Configuration,
kyuubiConf: KyuubiConf,
owner: String,
creds: Credentials): Unit
}

View File

@ -94,6 +94,7 @@ class KyuubiSessionImpl(
override def close(): Unit = {
super.close()
sessionManager.operationManager.removeConnection(handle)
sessionManager.credentialsManager.removeSessionCredentialsEpoch(handle.identifier.toString)
try {
if (client != null) client.closeSession()
} catch {

View File

@ -22,6 +22,7 @@ import org.apache.hive.service.rpc.thrift.TProtocolVersion
import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.credentials.HadoopCredentialsManager
import org.apache.kyuubi.metrics.MetricsConstants._
import org.apache.kyuubi.metrics.MetricsSystem
import org.apache.kyuubi.operation.KyuubiOperationManager
@ -31,8 +32,10 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
def this() = this(classOf[KyuubiSessionManager].getSimpleName)
val operationManager = new KyuubiOperationManager()
val credentialsManager = new HadoopCredentialsManager()
override def initialize(conf: KyuubiConf): Unit = {
addService(credentialsManager)
super.initialize(conf)
}

View File

@ -0,0 +1,20 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
org.apache.kyuubi.credentials.ExceptionThrowingDelegationTokenProvider
org.apache.kyuubi.credentials.UnRequiredDelegationTokenProvider
org.apache.kyuubi.credentials.UnstableDelegationTokenProvider

View File

@ -0,0 +1,222 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.credentials
import java.io.IOException
import java.util.UUID
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.Credentials
import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
import org.apache.kyuubi.KyuubiFunSuite
import org.apache.kyuubi.config.KyuubiConf
class HadoopCredentialsManagerSuite extends KyuubiFunSuite {
private val sessionId = UUID.randomUUID().toString
private val appUser = "who"
private val send = (_: String) => {}
private def withStartedManager(kyuubiConf: KyuubiConf)(f: HadoopCredentialsManager => Unit)
: Unit = {
val manager = new HadoopCredentialsManager()
manager.initialize(kyuubiConf)
manager.start()
try f(manager)
finally manager.stop()
}
test("load default providers") {
ExceptionThrowingDelegationTokenProvider.constructed = false
val providers = HadoopCredentialsManager.loadProviders(new KyuubiConf(false))
assert(providers.contains("unstable"))
assert(providers.contains("unrequired"))
// This checks that providers are loaded independently and they have no effect on each other
assert(ExceptionThrowingDelegationTokenProvider.constructed)
assert(!providers.contains("throw"))
}
test("disable a provider") {
val kyuubiConf =
new KyuubiConf(false)
.set("kyuubi.credentials.unstable.enabled", "false")
val providers = HadoopCredentialsManager.loadProviders(kyuubiConf)
assert(!providers.contains("unstable"))
}
test("filter providers when initialize") {
// Filter out providers if `delegationTokensRequired` returns false.
val manager = new HadoopCredentialsManager()
manager.initialize(new KyuubiConf(false))
assert(!manager.containsProvider("unrequired"))
}
test("no provider left after initialize") {
val kyuubiConf =
new KyuubiConf(false)
.set("kyuubi.credentials.unstable.enabled", "false")
withStartedManager(kyuubiConf) { manager =>
// All providers are filtered out either because of being disabled or
// because does not require a token
assert(manager.renewalExecutor.isEmpty)
}
}
test("schedule credentials renewal") {
val kyuubiConf = new KyuubiConf(false)
.set(KyuubiConf.CREDENTIALS_RENEWAL_INTERVAL, 1000L)
withStartedManager(kyuubiConf) { manager =>
val userRef = manager.getOrCreateUserCredentialsRef(appUser)
// Tolerate 100 ms delay
eventually(timeout(1100.milliseconds), interval(100.milliseconds)) {
assert(userRef.getEpoch == 1)
}
}
}
test("schedule credentials renewal retry when failed") {
val kyuubiConf = new KyuubiConf(false)
.set(KyuubiConf.CREDENTIALS_RENEWAL_INTERVAL, 1000L)
.set(KyuubiConf.CREDENTIALS_RENEWAL_RETRY_WAIT, 1000L)
withStartedManager(kyuubiConf) { manager =>
try {
UnstableDelegationTokenProvider.throwException = true
val userRef = manager.getOrCreateUserCredentialsRef(appUser)
// Tolerate 100 ms delay
eventually(timeout(2100.milliseconds), interval(100.milliseconds)) {
// 1 scheduled call and 2 scheduled retrying call
assert(UnstableDelegationTokenProvider.exceptionCount == 3)
}
assert(userRef.getEpoch == CredentialsRef.UNSET_EPOCH)
} finally {
UnstableDelegationTokenProvider.throwException = false
}
}
}
test("send credentials if needed") {
val kyuubiConf = new KyuubiConf(false)
.set(KyuubiConf.CREDENTIALS_RENEWAL_INTERVAL, 1000L)
withStartedManager(kyuubiConf) { manager =>
// Trigger UserCredentialsRef's initialization
val userRef = manager.getOrCreateUserCredentialsRef(appUser)
eventually(interval(100.milliseconds)) {
assert(userRef.getEpoch == 0)
}
manager.sendCredentialsIfNeeded(sessionId, appUser, send)
val sessionEpoch = manager.getSessionCredentialsEpoch(sessionId)
assert(sessionEpoch == userRef.getEpoch)
}
}
test("credentials sending failure") {
withStartedManager(new KyuubiConf(false)) { manager =>
// Trigger UserCredentialsRef's initialization
val userRef = manager.getOrCreateUserCredentialsRef(appUser)
eventually(interval(100.milliseconds)) {
assert(userRef.getEpoch == 0)
}
var called = false
manager.sendCredentialsIfNeeded(
sessionId,
appUser,
_ => {
called = true
throw new IOException
})
assert(called)
assert(manager.getSessionCredentialsEpoch(sessionId) == CredentialsRef.UNSET_EPOCH)
}
}
}
private class ExceptionThrowingDelegationTokenProvider extends HadoopDelegationTokenProvider {
ExceptionThrowingDelegationTokenProvider.constructed = true
throw new IllegalArgumentException
override def serviceName: String = "throw"
override def delegationTokensRequired(
hadoopConf: Configuration,
kyuubiConf: KyuubiConf): Boolean = true
override def obtainDelegationTokens(
hadoopConf: Configuration,
kyuubiConf: KyuubiConf,
owner: String,
creds: Credentials): Unit = {}
}
private object ExceptionThrowingDelegationTokenProvider {
var constructed = false
}
private class UnRequiredDelegationTokenProvider extends HadoopDelegationTokenProvider {
override def serviceName: String = "unrequired"
override def delegationTokensRequired(
hadoopConf: Configuration,
kyuubiConf: KyuubiConf): Boolean = false
override def obtainDelegationTokens(
hadoopConf: Configuration,
kyuubiConf: KyuubiConf,
owner: String,
creds: Credentials): Unit = {}
}
private class UnstableDelegationTokenProvider extends HadoopDelegationTokenProvider {
override def serviceName: String = "unstable"
override def delegationTokensRequired(
hadoopConf: Configuration,
kyuubiConf: KyuubiConf): Boolean = true
override def obtainDelegationTokens(
hadoopConf: Configuration,
kyuubiConf: KyuubiConf,
owner: String,
creds: Credentials): Unit = {
if (UnstableDelegationTokenProvider.throwException) {
UnstableDelegationTokenProvider.exceptionCount += 1
throw new IllegalArgumentException
}
}
}
private object UnstableDelegationTokenProvider {
@volatile
var throwException: Boolean = false
@volatile
var exceptionCount = 0
}