diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md index c5b488f49..edc121b59 100644 --- a/docs/deployment/settings.md +++ b/docs/deployment/settings.md @@ -147,6 +147,14 @@ kyuubi\.backend\.server
\.exec\.pool\.size|
100
|
Size of the wait queue for the operation execution thread pool of Kyuubi server
|
int
|
1.0.0
+### Credentials + +Key | Default | Meaning | Type | Since +--- | --- | --- | --- | --- +kyuubi\.credentials
\.renewal\.interval|
PT1H
|
How often Kyuubi renews one user's DelegationTokens
|
duration
|
1.4.0
+kyuubi\.credentials
\.renewal\.retryWait|
PT1M
|
How long to wait before retrying to fetch new credentials after a failure.
|
duration
|
1.4.0
+ + ### Delegation Key | Default | Meaning | Type | Since diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala index 28611cd8f..ad7b17e3e 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala @@ -232,6 +232,20 @@ object KyuubiConf { .timeConf .createWithDefault(Duration.ofHours(3).toMillis) + val CREDENTIALS_RENEWAL_INTERVAL: ConfigEntry[Long] = + buildConf("credentials.renewal.interval") + .doc("How often Kyuubi renews one user's DelegationTokens") + .version("1.4.0") + .timeConf + .createWithDefault(Duration.ofHours(1).toMillis) + + val CREDENTIALS_RENEWAL_RETRY_WAIT: ConfigEntry[Long] = + buildConf("credentials.renewal.retryWait") + .doc("How long to wait before retrying to fetch new credentials after a failure.") + .version("1.4.0") + .timeConf + .checkValue(t => t > 0, "must be positive integer") + .createWithDefault(Duration.ofMinutes(1).toMillis) ///////////////////////////////////////////////////////////////////////////////////////////////// // Frontend Service Configuration // diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/KyuubiHadoopUtils.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/KyuubiHadoopUtils.scala index e6a009b93..b8fb96704 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/KyuubiHadoopUtils.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/KyuubiHadoopUtils.scala @@ -17,8 +17,11 @@ package org.apache.kyuubi.util +import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream} + +import org.apache.commons.codec.binary.Base64 import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.security.SecurityUtil +import org.apache.hadoop.security.{Credentials, SecurityUtil} import org.apache.kyuubi.config.KyuubiConf @@ -33,4 +36,23 @@ object KyuubiHadoopUtils { def getServerPrincipal(principal: String): String = { SecurityUtil.getServerPrincipal(principal, "0.0.0.0") } + + def encodeCredentials(creds: Credentials): String = { + val byteStream = new ByteArrayOutputStream + creds.writeTokenStorageToStream(new DataOutputStream(byteStream)) + + val encoder = new Base64(0, null, false) + encoder.encodeToString(byteStream.toByteArray) + } + + def decodeCredentials(newValue: String): Credentials = { + val decoder = new Base64(0, null, false) + val decoded = decoder.decode(newValue) + + val byteStream = new ByteArrayInputStream(decoded) + val creds = new Credentials() + creds.readTokenStorageStream(new DataInputStream(byteStream)) + creds + } + } diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/util/KyuubiHadoopUtilsSuite.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/util/KyuubiHadoopUtilsSuite.scala index 03fc055d6..9a6d76732 100644 --- a/kyuubi-common/src/test/scala/org/apache/kyuubi/util/KyuubiHadoopUtilsSuite.scala +++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/util/KyuubiHadoopUtilsSuite.scala @@ -17,8 +17,15 @@ package org.apache.kyuubi.util +import scala.util.Random + +import org.apache.hadoop.io.Text +import org.apache.hadoop.security.Credentials +import org.apache.hadoop.security.token.Token + import org.apache.kyuubi.KyuubiFunSuite import org.apache.kyuubi.config.KyuubiConf +import org.apache.kyuubi.service.authentication.KyuubiDelegationTokenIdentifier class KyuubiHadoopUtilsSuite extends KyuubiFunSuite { @@ -35,4 +42,21 @@ class KyuubiHadoopUtilsSuite extends KyuubiFunSuite { assert(hadoopConf.get(xyz) === "abc") assert(hadoopConf.get(test) === "t") } + + test("encode/decode credentials") { + val identifier = new KyuubiDelegationTokenIdentifier() + val password = new Array[Byte](128) + Random.nextBytes(password) + val token = new Token[KyuubiDelegationTokenIdentifier]( + identifier.getBytes, + password, + identifier.getKind, + new Text("")) + val credentials = new Credentials() + credentials.addToken(token.getKind, token) + + val decoded = KyuubiHadoopUtils.decodeCredentials( + KyuubiHadoopUtils.encodeCredentials(credentials)) + assert(decoded.getToken(token.getKind) == credentials.getToken(token.getKind)) + } } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/CredentialsRef.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/CredentialsRef.scala new file mode 100644 index 000000000..9ccf445b5 --- /dev/null +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/CredentialsRef.scala @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.credentials + +import org.apache.hadoop.security.Credentials + +import org.apache.kyuubi.credentials.CredentialsRef.UNSET_EPOCH +import org.apache.kyuubi.util.KyuubiHadoopUtils + +class CredentialsRef(appUser: String) { + + @volatile + private var epoch = UNSET_EPOCH + + private var encodedCredentials: String = _ + + def getEpoch: Long = epoch + + def getAppUser: String = appUser + + def getEncodedCredentials: String = { + encodedCredentials + } + + def updateCredentials(creds: Credentials): Unit = { + encodedCredentials = KyuubiHadoopUtils.encodeCredentials(creds) + epoch += 1 + } + +} + +object CredentialsRef { + val UNSET_EPOCH: Long = -1L +} diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopCredentialsManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopCredentialsManager.scala new file mode 100644 index 000000000..253778d11 --- /dev/null +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopCredentialsManager.scala @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.credentials + +import java.util.ServiceLoader +import java.util.concurrent._ + +import scala.collection.mutable +import scala.util.{Failure, Success, Try} + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.security.Credentials + +import org.apache.kyuubi.Logging +import org.apache.kyuubi.config.KyuubiConf +import org.apache.kyuubi.config.KyuubiConf._ +import org.apache.kyuubi.service.AbstractService +import org.apache.kyuubi.util.{KyuubiHadoopUtils, ThreadUtils} + +/** + * [[HadoopCredentialsManager]] manages and renews delegation tokens, which are used by SQL engines + * to access kerberos secured services. + * + * Delegation tokens are sent to SQL engines by calling [[sendCredentialsIfNeeded]]. + * [[sendCredentialsIfNeeded]] executes the following steps: + *
    + *
  1. + * Get or create a cached [[CredentialsRef]](contains delegation tokens) object by key + * appUser. If [[CredentialsRef]] is newly created, spawn a scheduled task to renew the + * delegation tokens. + *
  2. + *
  3. + * Get or create a cached session credentials epoch object by key sessionId. + *
  4. + *
  5. + * Compare [[CredentialsRef]] epoch with session credentials epoch. (Both epochs are set + * to -1 when created. [[CredentialsRef]] epoch is increased when delegation tokens are + * renewed.) + *
  6. + *
  7. + * If epochs are equal, return. Else, send delegation tokens to the SQL engine. + *
  8. + *
  9. + * If sending succeeds, set session credentials epoch to [[CredentialsRef]] epoch. Else, + * record the exception and return. + *
  10. + *
+ * + * @note + *
    + *
  1. + * Session credentials epochs are created in session scope and should be removed using + * [[removeSessionCredentialsEpoch]] when session closes. + *
  2. + *
  3. + * [[HadoopCredentialsManager]] does not renew and send credentials if no provider is left after + * initialize. + *
  4. + *
+ */ +class HadoopCredentialsManager private (name: String) extends AbstractService(name) + with Logging { + + def this() = this(classOf[HadoopCredentialsManager].getSimpleName) + + private val userCredentialsRefMap = new ConcurrentHashMap[String, CredentialsRef]() + private val sessionCredentialsEpochMap = new ConcurrentHashMap[String, Long]() + + private var providers: Map[String, HadoopDelegationTokenProvider] = _ + private var renewalInterval: Long = _ + private var renewalRetryWait: Long = _ + private var hadoopConf: Configuration = _ + + private[credentials] var renewalExecutor: Option[ScheduledExecutorService] = None + + override def initialize(conf: KyuubiConf): Unit = { + hadoopConf = KyuubiHadoopUtils.newHadoopConf(conf) + providers = HadoopCredentialsManager.loadProviders(conf) + .filter { case (_, provider) => + val required = provider.delegationTokensRequired(hadoopConf, conf) + if (!required) { + warn(s"Service ${provider.serviceName} does not require a token." + + s" Check your configuration to see if security is disabled or not.") + } + required + } + + if (providers.isEmpty) { + warn("No delegation token is required by services.") + } else { + info("Using the following builtin delegation token providers: " + + s"${providers.keys.mkString(", ")}.") + } + + renewalInterval = conf.get(CREDENTIALS_RENEWAL_INTERVAL) + renewalRetryWait = conf.get(CREDENTIALS_RENEWAL_RETRY_WAIT) + super.initialize(conf) + } + + override def start(): Unit = { + if (providers.nonEmpty) { + renewalExecutor = + Some(ThreadUtils.newDaemonSingleThreadScheduledExecutor("Delegation Token Renewal Thread")) + } + super.start() + } + + override def stop(): Unit = { + renewalExecutor.foreach { executor => + executor.shutdownNow() + try { + executor.awaitTermination(10, TimeUnit.SECONDS) + } catch { + case _: InterruptedException => + } + } + super.stop() + } + + /** + * Send credentials to SQL engine which the specified session is talking to if + * [[HadoopCredentialsManager]] has a newer credentials. + * + * @param sessionId Specify the session which is talking with SQL engine + * @param appUser User identity that the SQL engine uses. + * @param send Function to send encoded credentials to SQL engine + */ + def sendCredentialsIfNeeded( + sessionId: String, + appUser: String, + send: String => Unit): Unit = { + val userRef = getOrCreateUserCredentialsRef(appUser) + val sessionEpoch = getSessionCredentialsEpoch(sessionId) + + if (userRef.getEpoch > sessionEpoch) { + val currentEpoch = userRef.getEpoch + val currentCreds = userRef.getEncodedCredentials + info(s"Send new credentials with epoch $currentEpoch to SQL engine through session " + + s"$sessionId") + Try(send(currentCreds)) match { + case Success(_) => + info(s"Update session credentials epoch from $sessionEpoch to $currentEpoch") + sessionCredentialsEpochMap.put(sessionId, currentEpoch) + case Failure(exception) => + warn( + s"Failed to send new credentials to SQL engine through session $sessionId", + exception) + } + } + } + + /** + * Remove session credentials epoch corresponding to `sessionId`. + * + * @param sessionId KyuubiSession id + */ + def removeSessionCredentialsEpoch(sessionId: String): Unit = { + sessionCredentialsEpochMap.remove(sessionId) + } + + // Visible for testing. + private[credentials] def getOrCreateUserCredentialsRef(appUser: String): CredentialsRef = + userCredentialsRefMap.computeIfAbsent( + appUser, + appUser => { + val ref = new CredentialsRef(appUser) + scheduleRenewal(ref, 0) + info(s"Created CredentialsRef for user $appUser and scheduled a renewal task") + ref + }) + + // Visible for testing. + private[credentials] def getSessionCredentialsEpoch(sessionId: String): Long = { + sessionCredentialsEpochMap.getOrDefault(sessionId, CredentialsRef.UNSET_EPOCH) + } + + // Visible for testing. + private[credentials] def containsProvider(serviceName: String): Boolean = { + providers.contains(serviceName) + } + + private def scheduleRenewal(userRef: CredentialsRef, delay: Long): Unit = { + val renewalTask = new Runnable { + override def run(): Unit = { + try { + val creds = new Credentials() + providers.values + .foreach(_.obtainDelegationTokens(hadoopConf, conf, userRef.getAppUser, creds)) + userRef.updateCredentials(creds) + scheduleRenewal(userRef, renewalInterval) + } catch { + case _: InterruptedException => + // Server is shutting down + case e: Exception => + warn( + s"Failed to update tokens for ${userRef.getAppUser}, try again in" + + s" $renewalRetryWait ms", + e) + scheduleRenewal(userRef, renewalRetryWait) + } + } + } + + renewalExecutor.foreach { executor => + info(s"Scheduling renewal in $delay ms.") + executor.schedule(renewalTask, delay, TimeUnit.MILLISECONDS) + } + } + +} + +object HadoopCredentialsManager extends Logging { + + private val providerEnabledConfig = "kyuubi.credentials.%s.enabled" + + def loadProviders(kyuubiConf: KyuubiConf): Map[String, HadoopDelegationTokenProvider] = { + val loader = + ServiceLoader.load(classOf[HadoopDelegationTokenProvider], getClass.getClassLoader) + val providers = mutable.ArrayBuffer[HadoopDelegationTokenProvider]() + + val iterator = loader.iterator + while (iterator.hasNext) { + try { + providers += iterator.next + } catch { + case t: Throwable => + warn(s"Failed to load built in provider.", t) + } + } + + // Filter out providers for which kyuubi.credentials.{service}.enabled is false. + providers + .filter { p => HadoopCredentialsManager.isServiceEnabled(kyuubiConf, p.serviceName) } + .map { p => (p.serviceName, p) } + .toMap + } + + def isServiceEnabled(kyuubiConf: KyuubiConf, serviceName: String): Boolean = { + val key = providerEnabledConfig.format(serviceName) + kyuubiConf + .getOption(key) + .map(_.toBoolean) + .getOrElse(true) + } + +} diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopDelegationProvider.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopDelegationProvider.scala new file mode 100644 index 000000000..cba5761c3 --- /dev/null +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopDelegationProvider.scala @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.credentials + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.security.Credentials + +import org.apache.kyuubi.Logging +import org.apache.kyuubi.config.KyuubiConf + +trait HadoopDelegationTokenProvider extends Logging { + + /** + * Name of the service to provide delegation tokens. This name should be unique. Kyuubi will + * internally use this name to differentiate delegation token providers. + */ + def serviceName: String + + /** + * Returns true if delegation tokens are required for this service. By default, it is based on + * whether Hadoop security is enabled. + */ + def delegationTokensRequired(hadoopConf: Configuration, kyuubiConf: KyuubiConf): Boolean + + /** + * Obtain delegation tokens for this service. + * + * @param hadoopConf Configuration of current Hadoop Compatible system. + * @param owner DelegationToken owner. + * @param creds Credentials to add tokens and security keys to. + */ + def obtainDelegationTokens( + hadoopConf: Configuration, + kyuubiConf: KyuubiConf, + owner: String, + creds: Credentials): Unit + +} diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala index 8d416e68a..973e4c2e7 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala @@ -94,6 +94,7 @@ class KyuubiSessionImpl( override def close(): Unit = { super.close() sessionManager.operationManager.removeConnection(handle) + sessionManager.credentialsManager.removeSessionCredentialsEpoch(handle.identifier.toString) try { if (client != null) client.closeSession() } catch { diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala index bd2aeabdd..760c57e27 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala @@ -22,6 +22,7 @@ import org.apache.hive.service.rpc.thrift.TProtocolVersion import org.apache.kyuubi.KyuubiSQLException import org.apache.kyuubi.config.KyuubiConf +import org.apache.kyuubi.credentials.HadoopCredentialsManager import org.apache.kyuubi.metrics.MetricsConstants._ import org.apache.kyuubi.metrics.MetricsSystem import org.apache.kyuubi.operation.KyuubiOperationManager @@ -31,8 +32,10 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) { def this() = this(classOf[KyuubiSessionManager].getSimpleName) val operationManager = new KyuubiOperationManager() + val credentialsManager = new HadoopCredentialsManager() override def initialize(conf: KyuubiConf): Unit = { + addService(credentialsManager) super.initialize(conf) } diff --git a/kyuubi-server/src/test/resources/META-INF/services/org.apache.kyuubi.credentials.HadoopDelegationTokenProvider b/kyuubi-server/src/test/resources/META-INF/services/org.apache.kyuubi.credentials.HadoopDelegationTokenProvider new file mode 100644 index 000000000..40645622c --- /dev/null +++ b/kyuubi-server/src/test/resources/META-INF/services/org.apache.kyuubi.credentials.HadoopDelegationTokenProvider @@ -0,0 +1,20 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +org.apache.kyuubi.credentials.ExceptionThrowingDelegationTokenProvider +org.apache.kyuubi.credentials.UnRequiredDelegationTokenProvider +org.apache.kyuubi.credentials.UnstableDelegationTokenProvider diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/credentials/HadoopCredentialsManagerSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/credentials/HadoopCredentialsManagerSuite.scala new file mode 100644 index 000000000..f1a30e067 --- /dev/null +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/credentials/HadoopCredentialsManagerSuite.scala @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.credentials + +import java.io.IOException +import java.util.UUID + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.security.Credentials +import org.scalatest.time.SpanSugar.convertIntToGrainOfTime + +import org.apache.kyuubi.KyuubiFunSuite +import org.apache.kyuubi.config.KyuubiConf + +class HadoopCredentialsManagerSuite extends KyuubiFunSuite { + + private val sessionId = UUID.randomUUID().toString + private val appUser = "who" + private val send = (_: String) => {} + + private def withStartedManager(kyuubiConf: KyuubiConf)(f: HadoopCredentialsManager => Unit) + : Unit = { + val manager = new HadoopCredentialsManager() + manager.initialize(kyuubiConf) + manager.start() + + try f(manager) + finally manager.stop() + } + + test("load default providers") { + ExceptionThrowingDelegationTokenProvider.constructed = false + val providers = HadoopCredentialsManager.loadProviders(new KyuubiConf(false)) + assert(providers.contains("unstable")) + assert(providers.contains("unrequired")) + // This checks that providers are loaded independently and they have no effect on each other + assert(ExceptionThrowingDelegationTokenProvider.constructed) + assert(!providers.contains("throw")) + } + + test("disable a provider") { + val kyuubiConf = + new KyuubiConf(false) + .set("kyuubi.credentials.unstable.enabled", "false") + val providers = HadoopCredentialsManager.loadProviders(kyuubiConf) + assert(!providers.contains("unstable")) + } + + test("filter providers when initialize") { + // Filter out providers if `delegationTokensRequired` returns false. + val manager = new HadoopCredentialsManager() + manager.initialize(new KyuubiConf(false)) + assert(!manager.containsProvider("unrequired")) + } + + test("no provider left after initialize") { + val kyuubiConf = + new KyuubiConf(false) + .set("kyuubi.credentials.unstable.enabled", "false") + withStartedManager(kyuubiConf) { manager => + // All providers are filtered out either because of being disabled or + // because does not require a token + assert(manager.renewalExecutor.isEmpty) + } + } + + test("schedule credentials renewal") { + val kyuubiConf = new KyuubiConf(false) + .set(KyuubiConf.CREDENTIALS_RENEWAL_INTERVAL, 1000L) + withStartedManager(kyuubiConf) { manager => + val userRef = manager.getOrCreateUserCredentialsRef(appUser) + // Tolerate 100 ms delay + eventually(timeout(1100.milliseconds), interval(100.milliseconds)) { + assert(userRef.getEpoch == 1) + } + } + } + + test("schedule credentials renewal retry when failed") { + val kyuubiConf = new KyuubiConf(false) + .set(KyuubiConf.CREDENTIALS_RENEWAL_INTERVAL, 1000L) + .set(KyuubiConf.CREDENTIALS_RENEWAL_RETRY_WAIT, 1000L) + withStartedManager(kyuubiConf) { manager => + try { + UnstableDelegationTokenProvider.throwException = true + + val userRef = manager.getOrCreateUserCredentialsRef(appUser) + // Tolerate 100 ms delay + eventually(timeout(2100.milliseconds), interval(100.milliseconds)) { + // 1 scheduled call and 2 scheduled retrying call + assert(UnstableDelegationTokenProvider.exceptionCount == 3) + } + assert(userRef.getEpoch == CredentialsRef.UNSET_EPOCH) + } finally { + UnstableDelegationTokenProvider.throwException = false + } + } + } + + test("send credentials if needed") { + val kyuubiConf = new KyuubiConf(false) + .set(KyuubiConf.CREDENTIALS_RENEWAL_INTERVAL, 1000L) + withStartedManager(kyuubiConf) { manager => + // Trigger UserCredentialsRef's initialization + val userRef = manager.getOrCreateUserCredentialsRef(appUser) + eventually(interval(100.milliseconds)) { + assert(userRef.getEpoch == 0) + } + + manager.sendCredentialsIfNeeded(sessionId, appUser, send) + + val sessionEpoch = manager.getSessionCredentialsEpoch(sessionId) + assert(sessionEpoch == userRef.getEpoch) + } + } + + test("credentials sending failure") { + withStartedManager(new KyuubiConf(false)) { manager => + // Trigger UserCredentialsRef's initialization + val userRef = manager.getOrCreateUserCredentialsRef(appUser) + eventually(interval(100.milliseconds)) { + assert(userRef.getEpoch == 0) + } + + var called = false + manager.sendCredentialsIfNeeded( + sessionId, + appUser, + _ => { + called = true + throw new IOException + }) + + assert(called) + assert(manager.getSessionCredentialsEpoch(sessionId) == CredentialsRef.UNSET_EPOCH) + } + } +} + +private class ExceptionThrowingDelegationTokenProvider extends HadoopDelegationTokenProvider { + ExceptionThrowingDelegationTokenProvider.constructed = true + throw new IllegalArgumentException + + override def serviceName: String = "throw" + + override def delegationTokensRequired( + hadoopConf: Configuration, + kyuubiConf: KyuubiConf): Boolean = true + + override def obtainDelegationTokens( + hadoopConf: Configuration, + kyuubiConf: KyuubiConf, + owner: String, + creds: Credentials): Unit = {} + +} + +private object ExceptionThrowingDelegationTokenProvider { + var constructed = false +} + +private class UnRequiredDelegationTokenProvider extends HadoopDelegationTokenProvider { + + override def serviceName: String = "unrequired" + + override def delegationTokensRequired( + hadoopConf: Configuration, + kyuubiConf: KyuubiConf): Boolean = false + + override def obtainDelegationTokens( + hadoopConf: Configuration, + kyuubiConf: KyuubiConf, + owner: String, + creds: Credentials): Unit = {} + +} + +private class UnstableDelegationTokenProvider extends HadoopDelegationTokenProvider { + + override def serviceName: String = "unstable" + + override def delegationTokensRequired( + hadoopConf: Configuration, + kyuubiConf: KyuubiConf): Boolean = true + + override def obtainDelegationTokens( + hadoopConf: Configuration, + kyuubiConf: KyuubiConf, + owner: String, + creds: Credentials): Unit = { + if (UnstableDelegationTokenProvider.throwException) { + UnstableDelegationTokenProvider.exceptionCount += 1 + throw new IllegalArgumentException + } + } + +} + +private object UnstableDelegationTokenProvider { + + @volatile + var throwException: Boolean = false + + @volatile + var exceptionCount = 0 + +}