fix #122 Obtain tokens for kerberized Hive/HDFS services

This commit is contained in:
Kent Yao 2018-11-16 11:30:14 +08:00
parent d1d34e5856
commit 6368cb1ebe
9 changed files with 245 additions and 21 deletions

View File

@ -39,26 +39,30 @@ import yaooqinn.kyuubi.Logging
object KyuubiSparkUtil extends Logging {
// PREFIXES
val SPARK_PREFIX = "spark."
private[this] val YARN_PREFIX = "yarn."
private[this] val HADOOP_PRFIX = "hadoop."
private val YARN_PREFIX = "yarn."
private val HADOOP_PRFIX = "hadoop."
val SPARK_HADOOP_PREFIX: String = SPARK_PREFIX + HADOOP_PRFIX
private[this] val DRIVER_PREFIX = "driver."
private[this] val AM_PREFIX = SPARK_PREFIX + YARN_PREFIX + "am."
private val SPARK_YARN_PREFIX: String = SPARK_PREFIX + YARN_PREFIX
private val DRIVER_PREFIX = "driver."
private val AM_PREFIX = SPARK_PREFIX + YARN_PREFIX + "am."
private[this] val UI_PREFIX = "ui."
private[this] val SQL_PREFIX = "sql."
private[this] val HIVE_PREFIX = "hive."
private[this] val METASTORE_PREFIX = "metastore."
private val UI_PREFIX = "ui."
private val SQL_PREFIX = "sql."
private val HIVE_PREFIX = "hive."
private val METASTORE_PREFIX = "metastore."
// ENVIRONMENTS
val SPARK_HOME: String = System.getenv("SPARK_HOME")
val SPARK_JARS_DIR: String = SPARK_HOME + File.separator + "jars"
// YARN
val KEYTAB: String = SPARK_PREFIX + YARN_PREFIX + "keytab"
val PRINCIPAL: String = SPARK_PREFIX + YARN_PREFIX + "principal"
val MAX_APP_ATTEMPTS: String = SPARK_PREFIX + YARN_PREFIX + "maxAppAttempts"
val SPARK_YARN_JARS: String = SPARK_PREFIX + YARN_PREFIX + "jars"
val KEYTAB: String = SPARK_YARN_PREFIX + "keytab"
val PRINCIPAL: String = SPARK_YARN_PREFIX + "principal"
val MAX_APP_ATTEMPTS: String = SPARK_YARN_PREFIX + "maxAppAttempts"
val SPARK_YARN_JARS: String = SPARK_YARN_PREFIX + "jars"
val ACCESS_NNS: String = SPARK_YARN_PREFIX + "access.namenodes"
val ACCESS_FSS: String = SPARK_YARN_PREFIX + "access.hadoopFileSystems"
val STAGING_DIR: String = SPARK_YARN_PREFIX + "stagingDir"
// DRIVER
val DRIVER_BIND_ADDR: String = SPARK_PREFIX + DRIVER_PREFIX + "bindAddress"

View File

@ -23,7 +23,7 @@ package yaooqinn.kyuubi.service
*/
class ServiceException(message: String, cause: Throwable) extends RuntimeException(message, cause) {
def this(cause: Throwable) = this("", cause)
def this(cause: Throwable) = this(cause.toString, cause)
def this(message: String) = this(message, null)
}

View File

@ -23,7 +23,7 @@ import scala.collection.mutable.{HashSet => MHSet}
import org.apache.commons.io.FileUtils
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.hive.service.cli.thrift.TProtocolVersion
import org.apache.spark.{KyuubiSparkUtil, SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
@ -34,6 +34,7 @@ import yaooqinn.kyuubi.auth.KyuubiAuthFactory
import yaooqinn.kyuubi.cli._
import yaooqinn.kyuubi.operation.{KyuubiOperation, OperationHandle, OperationManager}
import yaooqinn.kyuubi.schema.RowSet
import yaooqinn.kyuubi.session.security.TokenCollector
import yaooqinn.kyuubi.spark.SparkSessionWithUGI
/**
@ -73,7 +74,9 @@ private[kyuubi] class KyuubiSession(
// Do not check keytab file existing as spark-submit has it done
currentUser.reloginFromKeytab()
}
UserGroupInformation.createProxyUser(username, currentUser)
val user = UserGroupInformation.createProxyUser(username, currentUser)
TokenCollector.obtainTokenIfRequired(conf, user.getCredentials)
user
} else {
UserGroupInformation.createRemoteUser(username)
}
@ -134,7 +137,12 @@ private[kyuubi] class KyuubiSession(
def sparkSession: SparkSession = this.sparkSessionWithUGI.sparkSession
def ugi: UserGroupInformation = this.sessionUGI
def ugi: UserGroupInformation = {
val creds = new Credentials
TokenCollector.obtainTokenIfRequired(conf, creds)
sessionUGI.addCredentials(creds)
sessionUGI
}
@throws[KyuubiSQLException]
def open(sessionConf: Map[String, String]): Unit = {

View File

@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package yaooqinn.kyuubi.session.security
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.mapred.Master
import org.apache.hadoop.security.Credentials
import org.apache.spark.KyuubiSparkUtil._
import org.apache.spark.SparkConf
import yaooqinn.kyuubi.Logging
import yaooqinn.kyuubi.service.ServiceException
/**
 * Obtains HDFS delegation tokens for the file systems the session needs to access,
 * namely the file systems listed in `spark.yarn.access.hadoopFileSystems` (or the
 * deprecated `spark.yarn.access.namenodes`) plus the staging directory's file system.
 */
private[security] object HDFSTokenCollector extends TokenCollector with Logging {

  /**
   * Resolve the set of [[FileSystem]]s to fetch delegation tokens for.
   *
   * FIX: ACCESS_FSS / ACCESS_NNS are comma-separated lists of URIs (per Spark's
   * running-on-YARN documentation); the previous code passed the raw string to
   * `new Path(_)`, which breaks when more than one file system is configured.
   * Split on commas and resolve each entry individually.
   */
  private def hadoopFStoAccess(conf: SparkConf, hadoopConf: Configuration): Set[FileSystem] = {
    val requestedFSs = conf.getOption(ACCESS_FSS)
      .orElse(conf.getOption(ACCESS_NNS))
      .map(_.split(",").map(_.trim).filter(_.nonEmpty).toSeq)
      .getOrElse(Nil)
      .map(new Path(_).getFileSystem(hadoopConf))
      .toSet

    // Always include the staging dir's FS; fall back to the default FS when unset.
    val stagingFS = conf.getOption(STAGING_DIR)
      .map(new Path(_).getFileSystem(hadoopConf))
      .getOrElse(FileSystem.get(hadoopConf))

    requestedFSs + stagingFS
  }

  /**
   * Determine the principal allowed to renew the delegation tokens (the RM/JT
   * master principal). Fails fast with a [[ServiceException]] when it cannot be
   * resolved, since tokens obtained without a valid renewer are not renewable.
   */
  private def renewer(hadoopConf: Configuration): String = {
    val tokenRenewer = Master.getMasterPrincipal(hadoopConf)
    debug("Delegation token renewer is: " + tokenRenewer)

    if (tokenRenewer == null || tokenRenewer.length() == 0) {
      val errorMessage = "Can't get Master Kerberos principal for use as renewer."
      error(errorMessage)
      throw new ServiceException(errorMessage)
    }
    tokenRenewer
  }

  /**
   * Fetch HDFS delegation tokens for every configured file system and add them
   * to `creds`.
   */
  override def obtainTokens(conf: SparkConf, creds: Credentials): Unit = {
    val hadoopConf = newConfiguration(conf)
    val tokenRenewer = renewer(hadoopConf)
    hadoopFStoAccess(conf, hadoopConf).foreach { fs =>
      fs.addDelegationTokens(tokenRenewer, creds)
    }
  }
}

View File

@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package yaooqinn.kyuubi.session.security
import scala.util.control.NonFatal
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.ql.metadata.Hive
import org.apache.hadoop.io.Text
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.hadoop.security.token.Token
import org.apache.spark.SparkConf
import yaooqinn.kyuubi.Logging
import yaooqinn.kyuubi.utils.KyuubiHadoopUtil
import yaooqinn.kyuubi.utils.KyuubiHiveUtil._
/**
 * Obtains a Hive Metastore delegation token on behalf of the session user, so a
 * proxy user can talk to a kerberized metastore without the kerberos TGT.
 */
private[security] object HiveTokenCollector extends TokenCollector with Logging {

  /**
   * Contact the metastore as the real (logged-in) user and add the resulting
   * delegation token to `creds` under the key "hive.metastore.delegation.token".
   *
   * Failures are logged and swallowed deliberately: a missing Hive token should
   * not prevent the session from opening (best-effort behavior).
   */
  override def obtainTokens(conf: SparkConf, creds: Credentials): Unit = {
    try {
      val c = hiveConf(conf)
      val principal = c.getTrimmed(METASTORE_PRINCIPAL)
      val uris = c.getTrimmed(URIS)
      require(StringUtils.isNotEmpty(principal), METASTORE_PRINCIPAL + " Undefined")
      require(StringUtils.isNotEmpty(uris), URIS + " Undefined")
      val currentUser = UserGroupInformation.getCurrentUser.getUserName
      // The metastore must be contacted as the real user; the token is issued
      // for `currentUser` (the possibly-proxied session user).
      KyuubiHadoopUtil.doAsRealUser {
        val hive = Hive.get(c, classOf[HiveConf])
        val tokenString = hive.getDelegationToken(currentUser, principal)
        val token = new Token[DelegationTokenIdentifier]
        token.decodeFromUrlString(tokenString)
        creds.addToken(new Text("hive.metastore.delegation.token"), token)
      }
    } catch {
      case NonFatal(e) =>
        // FIX: corrected "metatore" -> "metastore" in the log message.
        error("Failed to get token from hive metastore service", e)
    } finally {
      // Hive.get caches a thread-local client; always release it.
      Hive.closeCurrent()
    }
  }

  /**
   * A Hive token is only needed when security is on AND a remote metastore is
   * configured via hive.metastore.uris.
   */
  override def tokensRequired(conf: SparkConf): Boolean = {
    UserGroupInformation.isSecurityEnabled && StringUtils.isNotBlank(hiveConf(conf).get(URIS))
  }
}

View File

@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package yaooqinn.kyuubi.session.security
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.spark.SparkConf
/**
 * A collector that fetches delegation tokens for one kerberized service and
 * adds them to a [[Credentials]] instance.
 */
private[security] trait TokenCollector {
// Fetch this service's delegation tokens and add them to `creds`.
def obtainTokens(conf: SparkConf, creds: Credentials): Unit
// Whether tokens are needed at all; by default only when Hadoop security
// (kerberos) is enabled. Implementations may add service-specific checks.
def tokensRequired(conf: SparkConf): Boolean = UserGroupInformation.isSecurityEnabled
}
/**
 * Entry point used by the session layer: runs every known collector whose
 * service actually requires tokens in the current configuration.
 */
private[session] object TokenCollector {

  /** Obtain Hive and HDFS delegation tokens into `creds` when required. */
  def obtainTokenIfRequired(conf: SparkConf, creds: Credentials): Unit = {
    val collectors: Seq[TokenCollector] = Seq(HiveTokenCollector, HDFSTokenCollector)
    for (collector <- collectors if collector.tokensRequired(conf)) {
      collector.obtainTokens(conf, creds)
    }
  }
}

View File

@ -17,6 +17,7 @@
package yaooqinn.kyuubi.utils
import java.lang.reflect.UndeclaredThrowableException
import java.security.PrivilegedExceptionAction
import scala.collection.JavaConverters._
@ -61,8 +62,22 @@ private[kyuubi] object KyuubiHadoopUtil extends Logging {
}
def doAs[T](user: UserGroupInformation)(f: => T): T = {
user.doAs(new PrivilegedExceptionAction[T] {
override def run(): T = f
})
try {
user.doAs(new PrivilegedExceptionAction[T] {
override def run(): T = f
})
} catch {
case e: UndeclaredThrowableException => throw Option(e.getCause).getOrElse(e)
}
}
/**
 * Run `f` as the real logged-in user. When the current UGI is a proxy user,
 * its real (underlying) user is used; otherwise the current user itself is.
 */
def doAsRealUser[T](f: => T): T = {
  val loggedIn = UserGroupInformation.getCurrentUser
  // getRealUser is null unless `loggedIn` is a proxy user.
  val executor = Option(loggedIn.getRealUser).getOrElse(loggedIn)
  doAs(executor)(f)
}
}

View File

@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package yaooqinn.kyuubi.utils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.spark.{KyuubiSparkUtil, SparkConf}
/**
 * Helpers and configuration keys for talking to the Hive Metastore.
 */
object KyuubiHiveUtil {

  private val HIVE_PREFIX = "hive."
  private val METASTORE_PREFIX = "metastore."

  /** Key for the remote metastore thrift URIs: "hive.metastore.uris". */
  val URIS: String = HIVE_PREFIX + METASTORE_PREFIX + "uris"

  /** Key for the metastore kerberos principal: "hive.metastore.kerberos.principal". */
  val METASTORE_PRINCIPAL: String = HIVE_PREFIX + METASTORE_PREFIX + "kerberos.principal"

  /** Build a [[HiveConf]] layered on top of the Hadoop configuration derived from `conf`. */
  def hiveConf(conf: SparkConf): Configuration =
    new HiveConf(KyuubiSparkUtil.newConfiguration(conf), classOf[HiveConf])
}

View File

@ -31,11 +31,11 @@ class ServiceExceptionSuite extends SparkFunSuite {
val e2 = new ServiceException(e1)
val tStatus1 = KyuubiSQLException.toTStatus(e2)
assert(tStatus1.isSetStatusCode)
assert(tStatus1.getErrorMessage === "")
assert(tStatus1.getErrorMessage === e1.toString)
assert(tStatus1.getSqlState === null)
assert(tStatus1.getErrorCode === 0)
assert(tStatus1.getInfoMessages === KyuubiSQLException.toString(e2).asJava)
assert(e2.getMessage.isEmpty)
assert(e2.getMessage === e1.toString)
assert(e2.getCause === e1)
val e3 = new ServiceException(msg)