diff --git a/kyuubi-server/src/main/scala/org/apache/spark/KyuubiSparkUtil.scala b/kyuubi-server/src/main/scala/org/apache/spark/KyuubiSparkUtil.scala index 1aaa91984..d110b4ff4 100644 --- a/kyuubi-server/src/main/scala/org/apache/spark/KyuubiSparkUtil.scala +++ b/kyuubi-server/src/main/scala/org/apache/spark/KyuubiSparkUtil.scala @@ -39,26 +39,30 @@ import yaooqinn.kyuubi.Logging object KyuubiSparkUtil extends Logging { // PREFIXES val SPARK_PREFIX = "spark." - private[this] val YARN_PREFIX = "yarn." - private[this] val HADOOP_PRFIX = "hadoop." + private val YARN_PREFIX = "yarn." + private val HADOOP_PRFIX = "hadoop." val SPARK_HADOOP_PREFIX: String = SPARK_PREFIX + HADOOP_PRFIX - private[this] val DRIVER_PREFIX = "driver." - private[this] val AM_PREFIX = SPARK_PREFIX + YARN_PREFIX + "am." + private val SPARK_YARN_PREFIX: String = SPARK_PREFIX + YARN_PREFIX + private val DRIVER_PREFIX = "driver." + private val AM_PREFIX = SPARK_PREFIX + YARN_PREFIX + "am." - private[this] val UI_PREFIX = "ui." - private[this] val SQL_PREFIX = "sql." - private[this] val HIVE_PREFIX = "hive." - private[this] val METASTORE_PREFIX = "metastore." + private val UI_PREFIX = "ui." + private val SQL_PREFIX = "sql." + private val HIVE_PREFIX = "hive." + private val METASTORE_PREFIX = "metastore." 
// ENVIRONMENTS val SPARK_HOME: String = System.getenv("SPARK_HOME") val SPARK_JARS_DIR: String = SPARK_HOME + File.separator + "jars" // YARN - val KEYTAB: String = SPARK_PREFIX + YARN_PREFIX + "keytab" - val PRINCIPAL: String = SPARK_PREFIX + YARN_PREFIX + "principal" - val MAX_APP_ATTEMPTS: String = SPARK_PREFIX + YARN_PREFIX + "maxAppAttempts" - val SPARK_YARN_JARS: String = SPARK_PREFIX + YARN_PREFIX + "jars" + val KEYTAB: String = SPARK_YARN_PREFIX + "keytab" + val PRINCIPAL: String = SPARK_YARN_PREFIX + "principal" + val MAX_APP_ATTEMPTS: String = SPARK_YARN_PREFIX + "maxAppAttempts" + val SPARK_YARN_JARS: String = SPARK_YARN_PREFIX + "jars" + val ACCESS_NNS: String = SPARK_YARN_PREFIX + "access.namenodes" + val ACCESS_FSS: String = SPARK_YARN_PREFIX + "access.hadoopFileSystems" + val STAGING_DIR: String = SPARK_YARN_PREFIX + "stagingDir" // DRIVER val DRIVER_BIND_ADDR: String = SPARK_PREFIX + DRIVER_PREFIX + "bindAddress" diff --git a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/service/ServiceException.scala b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/service/ServiceException.scala index 0428f3032..f810bc095 100644 --- a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/service/ServiceException.scala +++ b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/service/ServiceException.scala @@ -23,7 +23,7 @@ package yaooqinn.kyuubi.service */ class ServiceException(message: String, cause: Throwable) extends RuntimeException(message, cause) { - def this(cause: Throwable) = this("", cause) + def this(cause: Throwable) = this(cause.toString, cause) def this(message: String) = this(message, null) } diff --git a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/session/KyuubiSession.scala b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/session/KyuubiSession.scala index e71ba5548..738637997 100644 --- a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/session/KyuubiSession.scala +++ b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/session/KyuubiSession.scala @@ -23,7 +23,7 @@ import 
scala.collection.mutable.{HashSet => MHSet} import org.apache.commons.io.FileUtils import org.apache.hadoop.fs.FileSystem -import org.apache.hadoop.security.UserGroupInformation +import org.apache.hadoop.security.{Credentials, UserGroupInformation} import org.apache.hive.service.cli.thrift.TProtocolVersion import org.apache.spark.{KyuubiSparkUtil, SparkConf, SparkContext} import org.apache.spark.sql.SparkSession @@ -34,6 +34,7 @@ import yaooqinn.kyuubi.auth.KyuubiAuthFactory import yaooqinn.kyuubi.cli._ import yaooqinn.kyuubi.operation.{KyuubiOperation, OperationHandle, OperationManager} import yaooqinn.kyuubi.schema.RowSet +import yaooqinn.kyuubi.session.security.TokenCollector import yaooqinn.kyuubi.spark.SparkSessionWithUGI /** @@ -73,7 +74,9 @@ private[kyuubi] class KyuubiSession( // Do not check keytab file existing as spark-submit has it done currentUser.reloginFromKeytab() } - UserGroupInformation.createProxyUser(username, currentUser) + val user = UserGroupInformation.createProxyUser(username, currentUser) + user.addCredentials { val creds = new Credentials; TokenCollector.obtainTokenIfRequired(conf, creds); creds } + user } else { UserGroupInformation.createRemoteUser(username) } @@ -134,7 +137,12 @@ private[kyuubi] class KyuubiSession( def sparkSession: SparkSession = this.sparkSessionWithUGI.sparkSession - def ugi: UserGroupInformation = this.sessionUGI + def ugi: UserGroupInformation = { + val creds = new Credentials + TokenCollector.obtainTokenIfRequired(conf, creds) + sessionUGI.addCredentials(creds) + sessionUGI + } @throws[KyuubiSQLException] def open(sessionConf: Map[String, String]): Unit = { diff --git a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/session/security/HDFSTokenCollector.scala b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/session/security/HDFSTokenCollector.scala new file mode 100644 index 000000000..6b96dca90 --- /dev/null +++ b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/session/security/HDFSTokenCollector.scala @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software
Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package yaooqinn.kyuubi.session.security + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.hadoop.mapred.Master +import org.apache.hadoop.security.Credentials +import org.apache.spark.KyuubiSparkUtil._ +import org.apache.spark.SparkConf + +import yaooqinn.kyuubi.Logging +import yaooqinn.kyuubi.service.ServiceException + +private[security] object HDFSTokenCollector extends TokenCollector with Logging { + + private def hadoopFStoAccess(conf: SparkConf, hadoopConf: Configuration): Set[FileSystem] = { + conf.getOption(ACCESS_FSS) + .orElse(conf.getOption(ACCESS_NNS)) + .map(_.split(",").map(p => new Path(p.trim).getFileSystem(hadoopConf)).toSet).getOrElse(Set.empty[FileSystem]) + + conf.getOption(STAGING_DIR) + .map(new Path(_).getFileSystem(hadoopConf)).getOrElse(FileSystem.get(hadoopConf)) + } + + private def renewer(hadoopConf: Configuration): String = { + val tokenRenewer = Master.getMasterPrincipal(hadoopConf) + debug("Delegation token renewer is: " + tokenRenewer) + + if (tokenRenewer == null || tokenRenewer.length() == 0) { + val errorMessage = "Can't get Master Kerberos principal for use as renewer."
+ error(errorMessage) + throw new ServiceException(errorMessage) + } + tokenRenewer + } + + override def obtainTokens(conf: SparkConf, creds: Credentials): Unit = { + val hadoopConf = newConfiguration(conf) + val tokenRenewer = renewer(hadoopConf) + hadoopFStoAccess(conf, hadoopConf).foreach { fs => + fs.addDelegationTokens(tokenRenewer, creds) + } + } +} diff --git a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/session/security/HiveTokenCollector.scala b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/session/security/HiveTokenCollector.scala new file mode 100644 index 000000000..99765fc0b --- /dev/null +++ b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/session/security/HiveTokenCollector.scala @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package yaooqinn.kyuubi.session.security + +import scala.util.control.NonFatal + +import org.apache.commons.lang3.StringUtils +import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier +import org.apache.hadoop.hive.conf.HiveConf +import org.apache.hadoop.hive.ql.metadata.Hive +import org.apache.hadoop.io.Text +import org.apache.hadoop.security.{Credentials, UserGroupInformation} +import org.apache.hadoop.security.token.Token +import org.apache.spark.SparkConf + +import yaooqinn.kyuubi.Logging +import yaooqinn.kyuubi.utils.KyuubiHadoopUtil +import yaooqinn.kyuubi.utils.KyuubiHiveUtil._ + +private[security] object HiveTokenCollector extends TokenCollector with Logging { + + override def obtainTokens(conf: SparkConf, creds: Credentials): Unit = { + try { + val c = hiveConf(conf) + val principal = c.getTrimmed(METASTORE_PRINCIPAL) + val uris = c.getTrimmed(URIS) + require(StringUtils.isNotEmpty(principal), METASTORE_PRINCIPAL + " Undefined") + require(StringUtils.isNotEmpty(uris), URIS + " Undefined") + val currentUser = UserGroupInformation.getCurrentUser.getUserName + KyuubiHadoopUtil.doAsRealUser { + val hive = Hive.get(c, classOf[HiveConf]) + val tokenString = hive.getDelegationToken(currentUser, principal) + val token = new Token[DelegationTokenIdentifier] + token.decodeFromUrlString(tokenString) + creds.addToken(new Text("hive.metastore.delegation.token"), token) + } + } catch { + case NonFatal(e) => + error("Failed to get token from hive metastore service", e) + } finally { + Hive.closeCurrent() + } + } + + override def tokensRequired(conf: SparkConf): Boolean = { + UserGroupInformation.isSecurityEnabled && StringUtils.isNotBlank(hiveConf(conf).get(URIS)) + } +} diff --git a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/session/security/TokenCollector.scala b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/session/security/TokenCollector.scala new file mode 100644 index 000000000..98b8a187c --- /dev/null +++ 
b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/session/security/TokenCollector.scala @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package yaooqinn.kyuubi.session.security + +import org.apache.hadoop.security.{Credentials, UserGroupInformation} +import org.apache.spark.SparkConf + +private[security] trait TokenCollector { + + def obtainTokens(conf: SparkConf, creds: Credentials): Unit + + def tokensRequired(conf: SparkConf): Boolean = UserGroupInformation.isSecurityEnabled + +} + +private[session] object TokenCollector { + + def obtainTokenIfRequired(conf: SparkConf, creds: Credentials): Unit = { + Seq(HiveTokenCollector, HDFSTokenCollector).foreach { co => + if (co.tokensRequired(conf)) co.obtainTokens(conf, creds) + } + } +} diff --git a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/utils/KyuubiHadoopUtil.scala b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/utils/KyuubiHadoopUtil.scala index cd363a1e2..a0cf8e697 100644 --- a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/utils/KyuubiHadoopUtil.scala +++ b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/utils/KyuubiHadoopUtil.scala @@ -17,6 +17,7 @@ package yaooqinn.kyuubi.utils +import java.lang.reflect.UndeclaredThrowableException import 
java.security.PrivilegedExceptionAction import scala.collection.JavaConverters._ @@ -61,8 +62,22 @@ private[kyuubi] object KyuubiHadoopUtil extends Logging { } def doAs[T](user: UserGroupInformation)(f: => T): T = { - user.doAs(new PrivilegedExceptionAction[T] { - override def run(): T = f - }) + try { + user.doAs(new PrivilegedExceptionAction[T] { + override def run(): T = f + }) + } catch { + case e: UndeclaredThrowableException => throw Option(e.getCause).getOrElse(e) + } + } + + /** + * Run some code as the real logged in user (which may differ from the current user, for + * example, when using proxying). + */ + def doAsRealUser[T](f: => T): T = { + val currentUser = UserGroupInformation.getCurrentUser + val realUser = Option(currentUser.getRealUser).getOrElse(currentUser) + doAs(realUser)(f) } } diff --git a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/utils/KyuubiHiveUtil.scala b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/utils/KyuubiHiveUtil.scala new file mode 100644 index 000000000..ec3d10bff --- /dev/null +++ b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/utils/KyuubiHiveUtil.scala @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package yaooqinn.kyuubi.utils + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.hive.conf.HiveConf +import org.apache.spark.{KyuubiSparkUtil, SparkConf} + +object KyuubiHiveUtil { + + private val HIVE_PREFIX = "hive." + private val METASTORE_PREFIX = "metastore." + + val URIS: String = HIVE_PREFIX + METASTORE_PREFIX + "uris" + val METASTORE_PRINCIPAL: String = HIVE_PREFIX + METASTORE_PREFIX + "kerberos.principal" + + def hiveConf(conf: SparkConf): Configuration = { + val hadoopConf = KyuubiSparkUtil.newConfiguration(conf) + new HiveConf(hadoopConf, classOf[HiveConf]) + } + +} diff --git a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/service/ServiceExceptionSuite.scala b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/service/ServiceExceptionSuite.scala index 942d1b1ce..05a01abd5 100644 --- a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/service/ServiceExceptionSuite.scala +++ b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/service/ServiceExceptionSuite.scala @@ -31,11 +31,11 @@ class ServiceExceptionSuite extends SparkFunSuite { val e2 = new ServiceException(e1) val tStatus1 = KyuubiSQLException.toTStatus(e2) assert(tStatus1.isSetStatusCode) - assert(tStatus1.getErrorMessage === "") + assert(tStatus1.getErrorMessage === e1.toString) assert(tStatus1.getSqlState === null) assert(tStatus1.getErrorCode === 0) assert(tStatus1.getInfoMessages === KyuubiSQLException.toString(e2).asJava) - assert(e2.getMessage.isEmpty) + assert(e2.getMessage === e1.toString) assert(e2.getCause === e1) val e3 = new ServiceException(msg)