[KYUUBI-208] add delegation tokens to hive session state's dfs client (#209)

* [KYUUBI-208] add delegation tokens to hive session state's dfs client

* add ut

* handle exception

* should wrap the session-state token handling in doAs

fix #208
This commit is contained in:
Kent Yao 2019-07-25 10:39:45 +08:00 committed by GitHub
parent 7729ff0ba5
commit 552bb100c1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 68 additions and 11 deletions

View File

@ -34,19 +34,19 @@ import org.apache.spark.ui.KyuubiSessionTab
import yaooqinn.kyuubi.{KyuubiSQLException, Logging}
import yaooqinn.kyuubi.author.AuthzHelper
import yaooqinn.kyuubi.ui.{KyuubiServerListener, KyuubiServerMonitor}
import yaooqinn.kyuubi.utils.{KyuubiHadoopUtil, ReflectUtils}
import yaooqinn.kyuubi.utils.{KyuubiHadoopUtil, KyuubiHiveUtil, ReflectUtils}
class SparkSessionWithUGI(
user: UserGroupInformation,
conf: SparkConf,
cacheMgr: SparkSessionCacheManager) extends Logging {
import SparkSessionWithUGI._
import KyuubiHadoopUtil._
import SparkSessionWithUGI._
private var _sparkSession: SparkSession = _
private val userName: String = user.getShortUserName
private val promisedSparkContext = Promise[SparkContext]()
private var initialDatabase: Option[String] = None
private var initialDatabase: String = "use default"
private val startTime = System.currentTimeMillis()
private val timeout = KyuubiSparkUtil.timeStringAsMs(conf.get(BACKEND_SESSION_INIT_TIMEOUT))
@ -71,7 +71,7 @@ class SparkSessionWithUGI(
} else {
conf.set(SPARK_HADOOP_PREFIX + k, value)
}
case USE_DB => initialDatabase = Some("use " + value)
case USE_DB => initialDatabase = "use " + value
case _ =>
}
}
@ -94,7 +94,7 @@ class SparkSessionWithUGI(
} else {
_sparkSession.conf.set(SPARK_HADOOP_PREFIX + k, value)
}
case USE_DB => initialDatabase = Some("use " + value)
case USE_DB => initialDatabase = "use " + value
case _ =>
}
}
@ -185,9 +185,8 @@ class SparkSessionWithUGI(
try {
doAs(user) {
SparkSQLUtils.initializeMetaStoreClient(_sparkSession)
}
initialDatabase.foreach { db =>
doAs(user)(_sparkSession.sql(db))
_sparkSession.sql(initialDatabase)
KyuubiHiveUtil.addDelegationTokensToHiveState(user)
}
} catch {
case e: Exception =>

View File

@ -17,10 +17,15 @@
package yaooqinn.kyuubi.utils
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.{KyuubiSparkUtil, SparkConf}
object KyuubiHiveUtil {
import yaooqinn.kyuubi.Logging
object KyuubiHiveUtil extends Logging {
private val HIVE_PREFIX = "hive."
private val METASTORE_PREFIX = "metastore."
@ -33,4 +38,25 @@ object KyuubiHiveUtil {
new HiveConf(hadoopConf, classOf[HiveConf])
}
/**
 * Adds the given user's delegation tokens to the current thread's Hive
 * [[SessionState]], if one is attached. A no-op when no session state exists.
 *
 * @param ugi the user whose credentials receive the fetched tokens
 */
def addDelegationTokensToHiveState(ugi: UserGroupInformation): Unit = {
  // SessionState.get is thread-local and may be null when no Hive session
  // has been started on this thread; wrap it so that case is a silent no-op.
  Option(SessionState.get).foreach(addDelegationTokensToHiveState(_, ugi))
}
/**
 * Adds the given user's HDFS delegation tokens to the DFS client held by the
 * Hive [[SessionState]]'s encryption shim, so that subsequent secure HDFS
 * calls made through the session state can authenticate as the user.
 *
 * Best-effort: any recoverable failure while reflecting into the shim or
 * fetching tokens is logged and swallowed, never propagated to the caller.
 *
 * @param state the Hive session state whose DFS client receives the tokens
 * @param ugi   the user whose credentials receive the fetched tokens
 */
def addDelegationTokensToHiveState(state: SessionState, ugi: UserGroupInformation): Unit = {
  import scala.util.control.NonFatal
  state.getHdfsEncryptionShim match {
    case shim: org.apache.hadoop.hive.shims.Hadoop23Shims#HdfsEncryptionShim =>
      try {
        // The shim does not expose its DFS client, so reach it via reflection:
        // shim.hdfsAdmin.dfs is the underlying FileSystem.
        val hdfsAdmin = ReflectUtils.getFieldValue(shim, "hdfsAdmin")
        val dfs = ReflectUtils.getFieldValue(hdfsAdmin, "dfs")
        dfs.asInstanceOf[FileSystem].addDelegationTokens(ugi.getUserName, ugi.getCredentials)
      } catch {
        // NonFatal (not bare Exception) so InterruptedException and fatal
        // VM errors still propagate instead of being silently swallowed.
        case NonFatal(e) =>
          error("Failed add delegation token to hive session state", e)
      }
    case _ => // no Hadoop 2.3+ encryption shim: nothing to do
  }
}
}

View File

@ -17,16 +17,22 @@
package yaooqinn.kyuubi.operation
import java.io.File
import java.io.{File, IOException}
import scala.util.Try
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hdfs.{HdfsConfiguration, MiniDFSCluster}
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.sql.catalyst.catalog.FunctionResource
import org.apache.spark.sql.execution.SparkSqlParser
import org.apache.spark.sql.internal.SQLConf
import org.mockito.Mockito.when
import yaooqinn.kyuubi.operation.statement.ExecuteStatementInClientMode
import yaooqinn.kyuubi.utils.ReflectUtils
import yaooqinn.kyuubi.utils.{KyuubiHiveUtil, ReflectUtils}
class ExecuteStatementInClientModeWithHDFSSuite extends ExecuteStatementInClientModeSuite {
val hdfsConf = new HdfsConfiguration
@ -71,4 +77,15 @@ class ExecuteStatementInClientModeWithHDFSSuite extends ExecuteStatementInClient
ReflectUtils.getFieldValue(plan3, "resources").asInstanceOf[Seq[FunctionResource]].isEmpty)
}
// Exercises token propagation against a real HDFS-backed SessionState
// (hdfsConf presumably comes from the suite's MiniDFSCluster — confirm in
// the enclosing suite setup).
test("add delegation token with hive session state, hdfs") {
val hiveConf = new HiveConf(hdfsConf, classOf[HiveConf])
val state = new SessionState(hiveConf)
// Happy path: adding tokens for the current user must not throw.
assert(Try {
KyuubiHiveUtil.addDelegationTokensToHiveState(state, UserGroupInformation.getCurrentUser)
}.isSuccess)
// Failure path: an IOException raised while resolving the user name must be
// swallowed by the helper, not propagated to the caller.
val mockuser = mock[UserGroupInformation]
when(mockuser.getUserName).thenThrow(classOf[IOException])
KyuubiHiveUtil.addDelegationTokensToHiveState(state, mockuser)
}
}

View File

@ -17,9 +17,14 @@
package yaooqinn.kyuubi.utils
import scala.util.Try
import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.{SparkConf, SparkFunSuite}
class KyuubiHiveUtilSuite extends SparkFunSuite {
private val user = UserGroupInformation.getCurrentUser
test("hive conf") {
val uris = "thrift://yaooqinn.kyuubi"
@ -38,4 +43,14 @@ class KyuubiHiveUtilSuite extends SparkFunSuite {
}
// No SessionState is attached to this thread, so the single-argument
// overload must complete as a silent no-op.
test("add delegation tokens without hive session state ") {
  assert(Try(KyuubiHiveUtil.addDelegationTokensToHiveState(user)).isSuccess)
}
// With a SessionState built from a default SparkConf (local file system),
// the call must still complete without throwing.
test("add delegation token with hive session state, local fs") {
  val localState = new SessionState(KyuubiHiveUtil.hiveConf(new SparkConf()))
  assert(Try(KyuubiHiveUtil.addDelegationTokensToHiveState(localState, user)).isSuccess)
}
}