diff --git a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/spark/SparkSessionWithUGI.scala b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/spark/SparkSessionWithUGI.scala index afd36381b..89e74e504 100644 --- a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/spark/SparkSessionWithUGI.scala +++ b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/spark/SparkSessionWithUGI.scala @@ -34,19 +34,19 @@ import org.apache.spark.ui.KyuubiSessionTab import yaooqinn.kyuubi.{KyuubiSQLException, Logging} import yaooqinn.kyuubi.author.AuthzHelper import yaooqinn.kyuubi.ui.{KyuubiServerListener, KyuubiServerMonitor} -import yaooqinn.kyuubi.utils.{KyuubiHadoopUtil, ReflectUtils} +import yaooqinn.kyuubi.utils.{KyuubiHadoopUtil, KyuubiHiveUtil, ReflectUtils} class SparkSessionWithUGI( user: UserGroupInformation, conf: SparkConf, cacheMgr: SparkSessionCacheManager) extends Logging { - import SparkSessionWithUGI._ import KyuubiHadoopUtil._ + import SparkSessionWithUGI._ private var _sparkSession: SparkSession = _ private val userName: String = user.getShortUserName private val promisedSparkContext = Promise[SparkContext]() - private var initialDatabase: Option[String] = None + private var initialDatabase: String = "use default" private val startTime = System.currentTimeMillis() private val timeout = KyuubiSparkUtil.timeStringAsMs(conf.get(BACKEND_SESSION_INIT_TIMEOUT)) @@ -71,7 +71,7 @@ class SparkSessionWithUGI( } else { conf.set(SPARK_HADOOP_PREFIX + k, value) } - case USE_DB => initialDatabase = Some("use " + value) + case USE_DB => initialDatabase = "use " + value case _ => } } @@ -94,7 +94,7 @@ class SparkSessionWithUGI( } else { _sparkSession.conf.set(SPARK_HADOOP_PREFIX + k, value) } - case USE_DB => initialDatabase = Some("use " + value) + case USE_DB => initialDatabase = "use " + value case _ => } } @@ -185,9 +185,8 @@ class SparkSessionWithUGI( try { doAs(user) { SparkSQLUtils.initializeMetaStoreClient(_sparkSession) - } - initialDatabase.foreach { db => - doAs(user)(_sparkSession.sql(db)) + _sparkSession.sql(initialDatabase) + KyuubiHiveUtil.addDelegationTokensToHiveState(user) } } catch { case e: Exception => diff --git a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/utils/KyuubiHiveUtil.scala b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/utils/KyuubiHiveUtil.scala index 2d9265ea3..a5daeff48 100644 --- a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/utils/KyuubiHiveUtil.scala +++ b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/utils/KyuubiHiveUtil.scala @@ -17,10 +17,15 @@ package yaooqinn.kyuubi.utils +import org.apache.hadoop.fs.FileSystem import org.apache.hadoop.hive.conf.HiveConf +import org.apache.hadoop.hive.ql.session.SessionState +import org.apache.hadoop.security.UserGroupInformation import org.apache.spark.{KyuubiSparkUtil, SparkConf} -object KyuubiHiveUtil { +import yaooqinn.kyuubi.Logging + +object KyuubiHiveUtil extends Logging { private val HIVE_PREFIX = "hive." private val METASTORE_PREFIX = "metastore." @@ -33,4 +38,25 @@ object KyuubiHiveUtil { new HiveConf(hadoopConf, classOf[HiveConf]) } + def addDelegationTokensToHiveState(ugi: UserGroupInformation): Unit = { + val state = SessionState.get + if (state != null) { + addDelegationTokensToHiveState(state, ugi) + } + } + + def addDelegationTokensToHiveState(state: SessionState, ugi: UserGroupInformation): Unit = { + state.getHdfsEncryptionShim match { + case shim: org.apache.hadoop.hive.shims.Hadoop23Shims#HdfsEncryptionShim => + try { + val hdfsAdmin = ReflectUtils.getFieldValue(shim, "hdfsAdmin") + val dfs = ReflectUtils.getFieldValue(hdfsAdmin, "dfs") + dfs.asInstanceOf[FileSystem].addDelegationTokens(ugi.getUserName, ugi.getCredentials) + } catch { + case e: Exception => + error("Failed add delegation token to hive session state", e) + } + case _ => + } + } } diff --git a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/operation/ExecuteStatementInClientModeWithHDFSSuite.scala b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/operation/ExecuteStatementInClientModeWithHDFSSuite.scala index e3c74b94c..3356e46c2 100644 --- a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/operation/ExecuteStatementInClientModeWithHDFSSuite.scala +++ b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/operation/ExecuteStatementInClientModeWithHDFSSuite.scala @@ -17,16 +17,22 @@ package yaooqinn.kyuubi.operation -import java.io.File +import java.io.{File, IOException} + +import scala.util.Try import org.apache.hadoop.fs.Path import org.apache.hadoop.hdfs.{HdfsConfiguration, MiniDFSCluster} +import org.apache.hadoop.hive.conf.HiveConf +import org.apache.hadoop.hive.ql.session.SessionState +import org.apache.hadoop.security.UserGroupInformation import org.apache.spark.sql.catalyst.catalog.FunctionResource import org.apache.spark.sql.execution.SparkSqlParser import org.apache.spark.sql.internal.SQLConf +import org.mockito.Mockito.when import yaooqinn.kyuubi.operation.statement.ExecuteStatementInClientMode -import yaooqinn.kyuubi.utils.ReflectUtils +import yaooqinn.kyuubi.utils.{KyuubiHiveUtil, ReflectUtils} class ExecuteStatementInClientModeWithHDFSSuite extends ExecuteStatementInClientModeSuite { val hdfsConf = new HdfsConfiguration @@ -71,4 +77,15 @@ class ExecuteStatementInClientModeWithHDFSSuite extends ExecuteStatementInClient ReflectUtils.getFieldValue(plan3, "resources").asInstanceOf[Seq[FunctionResource]].isEmpty) } + test("add delegation token with hive session state, hdfs") { + val hiveConf = new HiveConf(hdfsConf, classOf[HiveConf]) + val state = new SessionState(hiveConf) + assert(Try { + KyuubiHiveUtil.addDelegationTokensToHiveState(state, UserGroupInformation.getCurrentUser) + }.isSuccess) + + val mockuser = mock[UserGroupInformation] + when(mockuser.getUserName).thenThrow(classOf[IOException]) + KyuubiHiveUtil.addDelegationTokensToHiveState(state, mockuser) + } } diff --git a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/utils/KyuubiHiveUtilSuite.scala b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/utils/KyuubiHiveUtilSuite.scala index e6a7146c0..1895c6365 100644 --- a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/utils/KyuubiHiveUtilSuite.scala +++ b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/utils/KyuubiHiveUtilSuite.scala @@ -17,9 +17,14 @@ package yaooqinn.kyuubi.utils +import scala.util.Try + +import org.apache.hadoop.hive.ql.session.SessionState +import org.apache.hadoop.security.UserGroupInformation import org.apache.spark.{SparkConf, SparkFunSuite} class KyuubiHiveUtilSuite extends SparkFunSuite { + private val user = UserGroupInformation.getCurrentUser test("hive conf") { val uris = "thrift://yaooqinn.kyuubi" @@ -38,4 +43,14 @@ class KyuubiHiveUtilSuite extends SparkFunSuite { } + test("add delegation tokens without hive session state ") { + assert(Try {KyuubiHiveUtil.addDelegationTokensToHiveState(user)}.isSuccess) + + } + + test("add delegation token with hive session state, local fs") { + val state = new SessionState(KyuubiHiveUtil.hiveConf(new SparkConf())) + assert(Try {KyuubiHiveUtil.addDelegationTokensToHiveState(state, user) }.isSuccess) + } + }