diff --git a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/session/KyuubiSession.scala b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/session/KyuubiSession.scala index facd7d607..ff1e4c0fd 100644 --- a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/session/KyuubiSession.scala +++ b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/session/KyuubiSession.scala @@ -280,7 +280,7 @@ private[kyuubi] class KyuubiSession( } } - private[this] def closeTimedOutOperations(operations: Seq[KyuubiOperation]): Unit = { + private def closeTimedOutOperations(operations: Seq[KyuubiOperation]): Unit = { acquire(false) try { operations.foreach { op => diff --git a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/session/security/HDFSTokenCollector.scala b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/session/security/HDFSTokenCollector.scala index f69fbb8e0..a92c52cea 100644 --- a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/session/security/HDFSTokenCollector.scala +++ b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/session/security/HDFSTokenCollector.scala @@ -27,14 +27,21 @@ import org.apache.spark.SparkConf import yaooqinn.kyuubi.Logging import yaooqinn.kyuubi.service.ServiceException +/** + * Token collector for secured HDFS FileSystems + */ private[security] object HDFSTokenCollector extends TokenCollector with Logging { private def hadoopFStoAccess(conf: SparkConf, hadoopConf: Configuration): Set[FileSystem] = { - conf.getOption(ACCESS_FSS) - .orElse(conf.getOption(ACCESS_NNS)) - .map(new Path(_).getFileSystem(hadoopConf)).toSet + - conf.getOption(STAGING_DIR) - .map(new Path(_).getFileSystem(hadoopConf)).getOrElse(FileSystem.get(hadoopConf)) + val fileSystems = conf.getOption(ACCESS_FSS) + .orElse(conf.getOption(ACCESS_NNS)) match { + case Some(nns) => nns.split(",").map(new Path(_).getFileSystem(hadoopConf)).toSet + case _ => Set.empty[FileSystem] + } + + fileSystems + + conf.getOption(STAGING_DIR).map(new Path(_).getFileSystem(hadoopConf)) + .getOrElse(FileSystem.get(hadoopConf)) } private def renewer(hadoopConf: Configuration): String = { diff --git a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/session/security/TokenCollector.scala b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/session/security/TokenCollector.scala index 27d2cc8d0..67704968f 100644 --- a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/session/security/TokenCollector.scala +++ b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/session/security/TokenCollector.scala @@ -20,16 +20,32 @@ package yaooqinn.kyuubi.session.security import org.apache.hadoop.security.UserGroupInformation import org.apache.spark.SparkConf +/** + * An interface for secured service token collectors + */ private[security] trait TokenCollector { + /** + * Obtain tokens from secured services, such as Hive Metastore Server. HDFS etc. + * @param conf a SparkConf + */ def obtainTokens(conf: SparkConf): Unit + /** + * Check whether a service need tokens to visit + * @param conf a SparkConf + * @return true if the service to visit requires tokens + */ def tokensRequired(conf: SparkConf): Boolean = UserGroupInformation.isSecurityEnabled } private[session] object TokenCollector { + /** + * Obtain tokens from all secured services if required. + * @param conf a SparkConf + */ def obtainTokenIfRequired(conf: SparkConf): Unit = { Seq(HiveTokenCollector, HDFSTokenCollector).foreach { co => if (co.tokensRequired(conf)) co.obtainTokens(conf) diff --git a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/session/security/TokenCollectorSuite.scala b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/session/security/TokenCollectorSuite.scala new file mode 100644 index 000000000..ca77d36b6 --- /dev/null +++ b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/session/security/TokenCollectorSuite.scala @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package yaooqinn.kyuubi.session.security + +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.scalatest.Matchers + +import yaooqinn.kyuubi.SecuredFunSuite +import yaooqinn.kyuubi.service.ServiceException + +class TokenCollectorSuite extends SparkFunSuite with Matchers with SecuredFunSuite { + + test("token required") { + val tokenCollector = new TokenCollector { + override def obtainTokens(conf: SparkConf): Unit = {} + } + + val conf = new SparkConf() + assert(!tokenCollector.tokensRequired(conf)) + + tryWithSecurityEnabled { + assert(tokenCollector.tokensRequired(conf)) + + } + } + + test("obtain token if required") { + val conf = new SparkConf() + TokenCollector.obtainTokenIfRequired(conf) + + tryWithSecurityEnabled { + + intercept[ServiceException](TokenCollector.obtainTokenIfRequired(conf)) + conf.set("spark.hadoop.yarn.resourcemanager.principal", "yarn/_HOST@KENT.KYUUBI.COM") + TokenCollector.obtainTokenIfRequired(conf) + conf.set("spark.yarn.access.namenodes", "file:///test") + TokenCollector.obtainTokenIfRequired(conf) + conf.set("spark.yarn.access.hadoopFileSystems", "file:///test") + TokenCollector.obtainTokenIfRequired(conf) + } + } + +} diff --git a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/utils/KyuubiHadoopUtilSuite.scala b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/utils/KyuubiHadoopUtilSuite.scala index bd67357b6..39467381d 100644 --- a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/utils/KyuubiHadoopUtilSuite.scala +++ b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/utils/KyuubiHadoopUtilSuite.scala @@ -77,11 +77,23 @@ class KyuubiHadoopUtilSuite extends SparkFunSuite with BeforeAndAfterEach { } KyuubiHadoopUtil.doAs(user1) { - testf(userName1) + assert(testf(userName1)) } KyuubiHadoopUtil.doAs(user2) { - testf(userName2) + assert(testf(userName2)) + } + + KyuubiHadoopUtil.doAs(user1) { + KyuubiHadoopUtil.doAsRealUser { + assert(testf(userName1)) + } + } + + KyuubiHadoopUtil.doAs(user2) { + KyuubiHadoopUtil.doAsRealUser { + assert(testf(userName1)) + } } } diff --git a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/utils/KyuubiHiveUtilSuite.scala b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/utils/KyuubiHiveUtilSuite.scala new file mode 100644 index 000000000..e6a7146c0 --- /dev/null +++ b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/utils/KyuubiHiveUtilSuite.scala @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package yaooqinn.kyuubi.utils + +import org.apache.spark.{SparkConf, SparkFunSuite} + +class KyuubiHiveUtilSuite extends SparkFunSuite { + + test("hive conf") { + val uris = "thrift://yaooqinn.kyuubi" + val conf = new SparkConf() + .set("spark.hadoop.hive.metastore.uris", uris) + val hiveConf = KyuubiHiveUtil.hiveConf(conf) + assert(hiveConf.get(KyuubiHiveUtil.URIS) === uris) + } + + test("testURIS") { + assert(KyuubiHiveUtil.URIS === "hive.metastore.uris") + } + + test("metastore principal") { + assert(KyuubiHiveUtil.METASTORE_PRINCIPAL === "hive.metastore.kerberos.principal") + + } + +}