[KYUUBI-167][FOLLOWUP]populate tokens via spark session cache mgr (#183)

* Prepare releasing v0.6.0

* fix #KYUUBI-167 populate token via spark session cache mgr

* typo

* fix ut

* code cov
This commit is contained in:
Kent Yao 2019-05-07 10:32:04 +08:00 committed by GitHub
parent 43c95476fb
commit 1f0bc742e4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 54 additions and 226 deletions

View File

@ -28,18 +28,18 @@ deploy:
jobs:
include:
- stage: spark2.4
language: scala
script: ./build/mvn clean install -Pspark-2.4 -Dmaven.javadoc.skip=true -B -V
- stage: spark2.3
language: scala
script: ./build/mvn clean install -Pspark-2.3 -Dmaven.javadoc.skip=true -B -V
- stage: spark2.2
language: scala
script: ./build/mvn clean install -Pspark-2.2 -Dmaven.javadoc.skip=true -B -V
- stage: spark2.1
language: scala
script: ./build/mvn clean install -Pspark-2.1 -Dmaven.javadoc.skip=true -B -V
- stage: spark2.2
language: scala
script: ./build/mvn clean install -Pspark-2.2 -Dmaven.javadoc.skip=true -B -V
- stage: spark2.3
language: scala
script: ./build/mvn clean install -Pspark-2.3 -Dmaven.javadoc.skip=true -B -V
- stage: spark2.4
language: scala
script: ./build/mvn clean install -Pspark-2.4 -Dmaven.javadoc.skip=true -B -V
after_success:
- bash <(curl -s https://codecov.io/bash)

View File

@ -89,7 +89,6 @@ spark.kyuubi.<br />backend.session.idle.timeout|30min|SparkSession timeout.
spark.kyuubi.<br />backend.session.local.dir|KYUUBI_HOME/<br />local|Default value to set `spark.local.dir`. For YARN mode, this only affect the Kyuubi server side settings according to the rule of Spark treating `spark.local.dir`.
spark.kyuubi.<br />backend.session.long.cache|${UserGroupInformation.<br />isSecurityEnabled}|Whether to update the tokens of Spark's executors to support long-cached SparkSessions. It takes effect iff this is true && `spark.kyuubi.backend.session.token.update.class` is loadable. This is intended for kerberized Hadoop clusters where `spark.kyuubi.backend.session.idle.timeout` is set longer than the token expiration time limit or the SparkSession never idles.
spark.kyuubi.<br />backend.session.token.update.class|org.apache.spark.<br />scheduler.cluster.<br />CoarseGrainedClusterMessages$<br />UpdateDelegationTokens|The `CoarseGrainedClusterMessages` class for the token update message sent from the Spark driver to the executors; it is loadable only in newer Spark releases (2.3 and later).
spark.kyuubi.<br />backend.session.token.renew.interval|2h|Interval at which KyuubiServiceCredentialProvider, a ServiceCredentialProvider implementation for Apache Spark, updates tokens.
#### Operation

View File

@ -1,116 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package yaooqinn.kyuubi
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.mapred.Master
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.spark.SparkConf
import org.apache.spark.deploy.yarn.security.ServiceCredentialProvider
/**
 * An implementation of Apache Spark's [[ServiceCredentialProvider]]. This class will be called
 * in both the Kyuubi Server and the ApplicationMasters. Its main purpose is updating the
 * ApplicationMaster's delegation tokens; it does not care whether the tokens are written to HDFS
 * for other executors to do their own token update job or not. The reason for doing this here is
 * that this part of the Spark source code is messy and changed capriciously before Spark 3.x,
 * which is not released yet. If tokens are written to HDFS, the driver (here the same as the
 * Kyuubi Server instance) and the executors can update their tokens on purpose. If that fails,
 * the ApplicationMaster will, with some warning messages, schedule this task every hour, which
 * is enough to invoke this class and renew the ApplicationMaster's delegation tokens.
 * Another harmless problem is that the driver and executors will continuously run the token
 * update task every minute, which annoyingly prints 'CredentialUpdater: Scheduling credentials
 * refresh from HDFS in 60000 ms' to the logs. Kyuubi posts all delegation tokens to the executor
 * side by the server itself but not to the ApplicationMaster, so the ApplicationMaster must
 * update tokens by itself in case new executors are requested at runtime.
 *
 * In Kyuubi Server:
 *   This is called when SparkContext is instantiating itself, with `spark.yarn.credentials.file`,
 *   `spark.yarn.credentials.renewalTime` and `spark.yarn.credentials.updateTime` set. Because
 *   inside Kyuubi we explicitly turn off the other credential providers of Spark, the renew time
 *   and the update time are decided by [[KyuubiServiceCredentialProvider#obtainCredentials]]. A
 *   credentials file will be set to `spark.yarn.credentials.file` but not be created yet. If the
 *   Spark Driver (here the same as the Kyuubi Server instance) itself and the executors detect
 *   that `spark.yarn.credentials.file` is set, a credential updater thread will start and
 *   periodically read the latest `spark.yarn.credentials.file` to update their own tokens if the
 *   file is present.
 *
 * In ApplicationMaster:
 *   If the ApplicationMaster detects that `spark.yarn.credentials.file` is set, it will launch a
 *   token renew thread to call every token provider's obtainCredentials method, so this class
 *   will be called to update its own delegation tokens. When new containers are requested by this
 *   ApplicationMaster, it will add the right tokens to them. The Kyuubi Server side generates
 *   tokens and posts them via UpdateToken messages to the executors, so unlike in a normal Spark
 *   application, here the ApplicationMaster only needs to take care of itself.
 */
class KyuubiServiceCredentialProvider extends ServiceCredentialProvider with Logging {

  /**
   * Name of the service to provide credentials. This name should be unique; Spark internally
   * uses this name to differentiate credential providers.
   */
  override def serviceName: String = "kyuubi"

  /**
   * Obtain credentials for this service and get the time of the next renewal.
   *
   * Delegation tokens are requested from every file system returned by `hadoopFStoAccess`,
   * collected into `creds`, and then added to the current user's UGI.
   *
   * @param hadoopConf Configuration of current Hadoop Compatible system.
   * @param sparkConf Spark configuration.
   * @param creds Credentials to add tokens and security keys to.
   * @return the absolute time (epoch millis) of the next renewal, computed as now plus
   *         `spark.kyuubi.backend.session.token.renew.interval` (default "2h").
   * @throws RuntimeException with message "Failed to renew token" wrapping any exception raised
   *                          while resolving the renewer or fetching tokens.
   */
  override def obtainCredentials(
      hadoopConf: Configuration,
      sparkConf: SparkConf,
      creds: Credentials): Option[Long] = {
    try {
      val tokenRenewer = renewer(hadoopConf)
      hadoopFStoAccess(sparkConf, hadoopConf).foreach { fs =>
        fs.addDelegationTokens(tokenRenewer, creds)
      }
      UserGroupInformation.getCurrentUser.addCredentials(creds)
    } catch {
      case e: Exception =>
        throw new RuntimeException("Failed to renew token", e)
    }
    val interval = sparkConf.getTimeAsMs("spark.kyuubi.backend.session.token.renew.interval", "2h")
    info(s"Token updated, next renew interval will be ${interval / 1000} seconds")
    Some(System.currentTimeMillis() + interval)
  }

  /**
   * Resolve the principal to be used as the token renewer via `Master.getMasterPrincipal`.
   *
   * @throws RuntimeException if the principal is null or empty.
   */
  private def renewer(hadoopConf: Configuration): String = {
    val tokenRenewer = Master.getMasterPrincipal(hadoopConf)
    if (tokenRenewer == null || tokenRenewer.length() == 0) {
      val errorMessage = "Can't get Master Kerberos principal for use as renewer."
      throw new RuntimeException(errorMessage)
    }
    tokenRenewer
  }

  /**
   * Collect the file systems to fetch delegation tokens for: those listed in
   * `spark.yarn.access.hadoopFileSystems` (falling back to `spark.yarn.access.namenodes`), plus
   * the file system of `spark.yarn.stagingDir`, or the default file system when that is unset.
   */
  private def hadoopFStoAccess(conf: SparkConf, hadoopConf: Configuration): Set[FileSystem] = {
    val fileSystems = conf.getOption("spark.yarn.access.hadoopFileSystems")
      .orElse(conf.getOption("spark.yarn.access.namenodes")) match {
      case Some(nns) =>
        // Entries are comma separated and may carry surrounding whitespace (e.g. "a, b") or be
        // empty; trim and drop blanks so that every remaining entry is a usable Path string.
        nns.split(",").map(_.trim).filter(_.nonEmpty)
          .map(new Path(_).getFileSystem(hadoopConf)).toSet
      case _ => Set.empty[FileSystem]
    }
    fileSystems +
      conf.getOption("spark.yarn.stagingDir").map(new Path(_).getFileSystem(hadoopConf))
        .getOrElse(FileSystem.get(hadoopConf))
  }
}

View File

@ -1,59 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package yaooqinn.kyuubi
import java.net.InetAddress
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.yarn.security.ServiceCredentialProvider
import org.scalatest.mock.MockitoSugar
/**
 * Tests for [[KyuubiServiceCredentialProvider]]: failure when no renewer principal is
 * configured, the default renewal interval, and failure on unreachable file systems.
 */
class KyuubiServiceCredentialProviderSuite extends SparkFunSuite with MockitoSugar {

  test("obtain credentials") {
    val sparkConf = new SparkConf()
    val userName = UserGroupInformation.getCurrentUser.getShortUserName
    val hadoopConf = new Configuration()
    val credential = new Credentials()
    val provider = new KyuubiServiceCredentialProvider

    // No ResourceManager principal configured at all.
    val e1 =
      intercept[RuntimeException](provider.obtainCredentials(hadoopConf, sparkConf, credential))
    assert(e1.getMessage === "Failed to renew token")

    // An empty principal is rejected the same way.
    hadoopConf.set(YarnConfiguration.RM_PRINCIPAL, "")
    val e2 =
      intercept[RuntimeException](provider.obtainCredentials(hadoopConf, sparkConf, credential))
    assert(e2.getMessage === "Failed to renew token")

    hadoopConf.set(YarnConfiguration.RM_PRINCIPAL,
      userName + "/" + InetAddress.getLocalHost.getHostName + "@" + "KYUUBI.ORG")
    assert(provider.isInstanceOf[ServiceCredentialProvider])
    assert(provider.isInstanceOf[Logging])
    assert(!provider.credentialsRequired(hadoopConf))
    assert(provider.serviceName === "kyuubi")

    val now = System.currentTimeMillis()
    val renewalTime = provider.obtainCredentials(hadoopConf, sparkConf, credential)
    assert(renewalTime.isDefined)
    // The default renew interval is "2h", i.e. 2 * 60 * 60 * 1000 milliseconds.
    assert(renewalTime.get - now >= 2L * 60 * 60 * 1000)

    sparkConf.set("spark.yarn.access.hadoopFileSystems", "hdfs://a/b/c")
    intercept[Exception](provider.obtainCredentials(hadoopConf, sparkConf, credential))
    sparkConf.set("spark.yarn.access.namenodes", "hdfs://a/b/c, hdfs://d/e/f")
    intercept[Exception](provider.obtainCredentials(hadoopConf, sparkConf, credential))
  }
}

View File

@ -39,12 +39,6 @@
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>yaooqinn</groupId>
<artifactId>kyuubi-security</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${spark.group}</groupId>
<artifactId>spark-yarn_${scala.binary.version}</artifactId>
@ -257,7 +251,6 @@
<artifactSet>
<includes>
<include>yaooqinn:kyuubi-common</include>
<include>yaooqinn:kyuubi-security</include>
</includes>
</artifactSet>
</configuration>

View File

@ -329,22 +329,6 @@ object KyuubiSparkUtil extends Logging {
// ForkJoinPool which points to another calling context. Turn off parallel listing seems
// to be a solution to this issue.
conf.setIfMissing(RDD_PAR_LISTING, Int.MaxValue.toString)
val sparkTokenProviders = List("hdfs", "hadoopfs", "hive", "hbase")
val tokenProviderPattens = List(
"spark.yarn.security.tokens.%s.enabled",
"spark.yarn.security.credentials.%s.enabled",
"spark.security.credentials.%s.enabled")
// Set kyuubi credential renewer on, if we do not explicitly turn it off.
tokenProviderPattens.map(_.format("kyuubi")).foreach(conf.setIfMissing(_, "true"))
// Force to turn off Spark's internal token providers, because all useful works will be done
// in KyuubiServiceCredentialProvider, and those ones in Spark always have impersonation
// issue while renew tokens
sparkTokenProviders.foreach { service =>
tokenProviderPattens.map(_.format(service)).foreach {
conf.set(_, "false")
}
}
}
val kyuubiJar = Option(System.getenv("KYUUBI_JAR")).getOrElse("")
@ -355,12 +339,6 @@ object KyuubiSparkUtil extends Logging {
}
conf.set(SPARK_YARN_DIST_JARS, distJars)
// We should obey our client side hadoop settings while running Kyuubi towards HDFS
// federations with maybe only one Yarn cluster
// see https://github.com/apache/spark/pull/24120
val hadoopConf = newConfiguration(conf)
Option(hadoopConf.get("fs.defaultFS"))
.foreach(conf.setIfMissing("spark.hadoop.fs.defaultFS", _))
}
@tailrec

View File

@ -22,7 +22,7 @@ import java.io.{ByteArrayOutputStream, DataOutputStream}
import scala.collection.mutable
import scala.util.control.NonFatal
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.spark.KyuubiConf._
import org.apache.spark.SparkContext
@ -43,13 +43,21 @@ object KyuubiSparkExecutorUtils extends Logging {
* @param user the UserGroupInformation associated with the current KyuubiSession
*/
def populateTokens(sc: SparkContext, user: UserGroupInformation): Unit = {
  // Hand the credentials held by the given UGI to the Credentials-based overload.
  val userCredentials = user.getCredentials
  populateTokens(sc, userCredentials)
}
/**
* Populate the tokens contained in the current KyuubiSession's ugi to the all the alive
* executors associated with its own SparkContext.
*/
def populateTokens(sc: SparkContext, creds: Credentials): Unit = {
val schedulerBackend = sc.schedulerBackend
schedulerBackend match {
case backend: CoarseGrainedSchedulerBackend =>
try {
val byteStream = new ByteArrayOutputStream
val dataStream = new DataOutputStream(byteStream)
user.getCredentials.writeTokenStorageToStream(dataStream)
creds.writeTokenStorageToStream(dataStream)
val tokens = byteStream.toByteArray
val executorField =
classOf[CoarseGrainedSchedulerBackend].getName.replace('.', '$') + "$$executorDataMap"

View File

@ -56,7 +56,7 @@ private[security] object HDFSTokenCollector extends TokenCollector with Logging
tokenRenewer
}
override def obtainTokens(conf: SparkConf): Unit = {
override def obtainTokens(conf: SparkConf): Unit = try {
val hadoopConf = newConfiguration(conf)
val tokenRenewer = renewer(hadoopConf)
val creds = new Credentials()
@ -64,5 +64,8 @@ private[security] object HDFSTokenCollector extends TokenCollector with Logging
fs.addDelegationTokens(tokenRenewer, creds)
}
UserGroupInformation.getCurrentUser.addCredentials(creds)
} catch {
case e: Exception =>
error("Failed to obtain HDFS tokens", e)
}
}

View File

@ -25,8 +25,10 @@ import java.util.concurrent.atomic.AtomicInteger
import scala.collection.JavaConverters._
import com.google.common.util.concurrent.ThreadFactoryBuilder
import org.apache.hadoop.security.Credentials
import org.apache.spark.{KyuubiSparkUtil, SparkConf}
import org.apache.spark.KyuubiConf._
import org.apache.spark.SparkConf
import org.apache.spark.scheduler.cluster.KyuubiSparkExecutorUtils
import org.apache.spark.sql.SparkSession
import yaooqinn.kyuubi.Logging
@ -46,24 +48,34 @@ class SparkSessionCacheManager private(name: String) extends AbstractService(nam
private val userLatestLogout = new ConcurrentHashMap[String, Long]
private var idleTimeout: Long = _
private val userToCredentials = new ConcurrentHashMap[String, Credentials]
private var needPopulateToken: Boolean = _
private val sessionCleaner = new Runnable {
override def run(): Unit = {
userToSession.asScala.foreach {
case (user, (session, _)) if session.sparkContext.isStopped =>
warn(s"SparkSession for $user might already be stopped by forces outside Kyuubi," +
warn(s"SparkSession for $user might already be stopped outside Kyuubi," +
s" cleaning it..")
removeSparkSession(user)
case (user, (_, times)) if times.get() > 0 =>
case (user, (session, times)) if times.get() > 0 || !userLatestLogout.containsKey(user) =>
debug(s"There are $times active connection(s) bound to the SparkSession instance" +
s" of $user ")
case (user, (_, _)) if !userLatestLogout.containsKey(user) =>
if (needPopulateToken) {
val credentials = userToCredentials.getOrDefault(user, new Credentials)
KyuubiSparkExecutorUtils.populateTokens(session.sparkContext, credentials)
}
case (user, (session, _))
if userLatestLogout.get(user) + idleTimeout <= System.currentTimeMillis() =>
info(s"Stopping idle SparkSession for user [$user].")
removeSparkSession(user)
session.stop()
System.setProperty("SPARK_YARN_MODE", "true")
case _ =>
case (user, (session, _)) =>
if (needPopulateToken) {
val credentials = userToCredentials.getOrDefault(user, new Credentials)
KyuubiSparkExecutorUtils.populateTokens(session.sparkContext, credentials)
}
}
}
}
@ -105,8 +117,14 @@ class SparkSessionCacheManager private(name: String) extends AbstractService(nam
}
}
/**
 * Store `creds` in `userToCredentials` under the key `user`.
 *
 * @param user the user name used as the map key
 * @param creds the credentials to associate with that user
 */
def setupCredentials(user: String, creds: Credentials): Unit =
  userToCredentials.put(user, creds)
/**
 * Read the idle timeout and the token-population switch from `conf`, then delegate to the
 * parent service's `init`.
 */
override def init(conf: SparkConf): Unit = {
  // The idle timeout is floored at 60 * 1000 milliseconds.
  val configuredTimeout = conf.getTimeAsMs(BACKEND_SESSION_IDLE_TIMEOUT.key)
  idleTimeout = math.max(configuredTimeout, 60 * 1000)
  // Tokens are only populated when long caching is enabled and the update class is loadable.
  needPopulateToken =
    conf.get(BACKEND_SESSION_LONG_CACHE).toBoolean &&
      KyuubiSparkUtil.classIsLoadable(conf.get(BACKEND_SESSION_TOKEN_UPDATE_CLASS))
  super.init(conf)
}
@ -114,7 +132,6 @@ class SparkSessionCacheManager private(name: String) extends AbstractService(nam
* Periodically close idle SparkSessions in 'spark.kyuubi.session.clean.interval(default 1min)'
*/
override def start(): Unit = {
// at least 1 minutes
val interval = math.max(conf.getTimeAsSeconds(BACKEND_SESSION_CHECK_INTERVAL.key), 1)
info(s"Scheduling SparkSession cache cleaning every $interval seconds")
cacheManager.scheduleAtFixedRate(sessionCleaner, interval, interval, TimeUnit.SECONDS)

View File

@ -101,6 +101,9 @@ class SparkSessionWithUGI(
case _ =>
}
}
// proxy user does not have rights to get token as real user
conf.remove(KEYTAB)
conf.remove(PRINCIPAL)
}
/**
@ -204,6 +207,8 @@ class SparkSessionWithUGI(
def init(sessionConf: Map[String, String]): Unit = {
getOrCreate(sessionConf)
cache.setupCredentials(userName, user.getCredentials)
try {
initialDatabase.foreach { db =>
KyuubiHadoopUtil.doAs(user) {

View File

@ -45,7 +45,7 @@ class TokenCollectorSuite extends SparkFunSuite with Matchers with SecuredFunSui
tryWithSecurityEnabled {
intercept[ServiceException](TokenCollector.obtainTokenIfRequired(conf))
TokenCollector.obtainTokenIfRequired(conf)
conf.set("spark.hadoop.yarn.resourcemanager.principal", "yarn/_HOST@KENT.KYUUBI.COM")
TokenCollector.obtainTokenIfRequired(conf)
conf.set("spark.yarn.access.namenodes", "file:///test")

View File

@ -18,6 +18,7 @@
package yaooqinn.kyuubi.spark
import org.apache.spark._
import org.apache.spark.KyuubiConf._
import org.apache.spark.sql.SparkSession
import org.mockito.Mockito._
import org.scalatest.Matchers
@ -51,6 +52,7 @@ class SparkSessionCacheManagerSuite extends SparkFunSuite with Matchers with Moc
test("start cache") {
val cache = new SparkSessionCacheManager()
val conf = new SparkConf()
conf.set(BACKEND_SESSION_LONG_CACHE, "true")
KyuubiSparkUtil.setupCommonConfig(conf)
cache.init(conf)
cache.start()

View File

@ -26,7 +26,6 @@
<version>0.6.0-SNAPSHOT</version>
<modules>
<module>kyuubi-common</module>
<module>kyuubi-security</module>
<module>kyuubi-server</module>
</modules>
<packaging>pom</packaging>