hdfs no host
This commit is contained in:
parent
bea9ebd0e7
commit
3779e55e65
@ -280,7 +280,7 @@ private[kyuubi] class KyuubiSession(
|
||||
}
|
||||
}
|
||||
|
||||
private[this] def closeTimedOutOperations(operations: Seq[KyuubiOperation]): Unit = {
|
||||
private def closeTimedOutOperations(operations: Seq[KyuubiOperation]): Unit = {
|
||||
acquire(false)
|
||||
try {
|
||||
operations.foreach { op =>
|
||||
|
||||
@ -27,14 +27,21 @@ import org.apache.spark.SparkConf
|
||||
import yaooqinn.kyuubi.Logging
|
||||
import yaooqinn.kyuubi.service.ServiceException
|
||||
|
||||
/**
|
||||
* Token collector for secured HDFS FileSystems
|
||||
*/
|
||||
private[security] object HDFSTokenCollector extends TokenCollector with Logging {
|
||||
|
||||
private def hadoopFStoAccess(conf: SparkConf, hadoopConf: Configuration): Set[FileSystem] = {
|
||||
conf.getOption(ACCESS_FSS)
|
||||
.orElse(conf.getOption(ACCESS_NNS))
|
||||
.map(new Path(_).getFileSystem(hadoopConf)).toSet +
|
||||
conf.getOption(STAGING_DIR)
|
||||
.map(new Path(_).getFileSystem(hadoopConf)).getOrElse(FileSystem.get(hadoopConf))
|
||||
val fileSystems = conf.getOption(ACCESS_FSS)
|
||||
.orElse(conf.getOption(ACCESS_NNS)) match {
|
||||
case Some(nns) => nns.split(",").map(new Path(_).getFileSystem(hadoopConf)).toSet
|
||||
case _ => Set.empty[FileSystem]
|
||||
}
|
||||
|
||||
fileSystems +
|
||||
conf.getOption(STAGING_DIR).map(new Path(_).getFileSystem(hadoopConf))
|
||||
.getOrElse(FileSystem.get(hadoopConf))
|
||||
}
|
||||
|
||||
private def renewer(hadoopConf: Configuration): String = {
|
||||
|
||||
@ -20,16 +20,32 @@ package yaooqinn.kyuubi.session.security
|
||||
import org.apache.hadoop.security.UserGroupInformation
|
||||
import org.apache.spark.SparkConf
|
||||
|
||||
/**
|
||||
* An interface for secured service token collectors
|
||||
*/
|
||||
private[security] trait TokenCollector {
|
||||
|
||||
/**
|
||||
* Obtain tokens from secured services, such as Hive Metastore Server. HDFS etc.
|
||||
* @param conf a SparkConf
|
||||
*/
|
||||
def obtainTokens(conf: SparkConf): Unit
|
||||
|
||||
/**
|
||||
* Check whether a service need tokens to visit
|
||||
* @param conf a SparkConf
|
||||
* @return true if the service to visit requires tokens
|
||||
*/
|
||||
def tokensRequired(conf: SparkConf): Boolean = UserGroupInformation.isSecurityEnabled
|
||||
|
||||
}
|
||||
|
||||
private[session] object TokenCollector {
|
||||
|
||||
/**
|
||||
* Obtain tokens from all secured services if required.
|
||||
* @param conf a SparkConf
|
||||
*/
|
||||
def obtainTokenIfRequired(conf: SparkConf): Unit = {
|
||||
Seq(HiveTokenCollector, HDFSTokenCollector).foreach { co =>
|
||||
if (co.tokensRequired(conf)) co.obtainTokens(conf)
|
||||
|
||||
@ -0,0 +1,58 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package yaooqinn.kyuubi.session.security
|
||||
|
||||
import org.apache.spark.{SparkConf, SparkFunSuite}
|
||||
import org.scalatest.Matchers
|
||||
|
||||
import yaooqinn.kyuubi.SecuredFunSuite
|
||||
import yaooqinn.kyuubi.service.ServiceException
|
||||
|
||||
class TokenCollectorSuite extends SparkFunSuite with Matchers with SecuredFunSuite {
|
||||
|
||||
test("token required") {
|
||||
val tokenCollector = new TokenCollector {
|
||||
override def obtainTokens(conf: SparkConf): Unit = {}
|
||||
}
|
||||
|
||||
val conf = new SparkConf()
|
||||
assert(!tokenCollector.tokensRequired(conf))
|
||||
|
||||
tryWithSecurityEnabled {
|
||||
assert(tokenCollector.tokensRequired(conf))
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
test("obtain token if required") {
|
||||
val conf = new SparkConf()
|
||||
TokenCollector.obtainTokenIfRequired(conf)
|
||||
|
||||
tryWithSecurityEnabled {
|
||||
|
||||
intercept[ServiceException](TokenCollector.obtainTokenIfRequired(conf))
|
||||
conf.set("spark.hadoop.yarn.resourcemanager.principal", "yarn/_HOST@KENT.KYUUBI.COM")
|
||||
TokenCollector.obtainTokenIfRequired(conf)
|
||||
conf.set("spark.yarn.access.namenodes", "file:///test")
|
||||
TokenCollector.obtainTokenIfRequired(conf)
|
||||
conf.set("spark.yarn.access.hadoopFileSystems", "file:///test")
|
||||
TokenCollector.obtainTokenIfRequired(conf)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@ -77,11 +77,23 @@ class KyuubiHadoopUtilSuite extends SparkFunSuite with BeforeAndAfterEach {
|
||||
}
|
||||
|
||||
KyuubiHadoopUtil.doAs(user1) {
|
||||
testf(userName1)
|
||||
assert(testf(userName1))
|
||||
}
|
||||
|
||||
KyuubiHadoopUtil.doAs(user2) {
|
||||
testf(userName2)
|
||||
assert(testf(userName2))
|
||||
}
|
||||
|
||||
KyuubiHadoopUtil.doAs(user1) {
|
||||
KyuubiHadoopUtil.doAsRealUser {
|
||||
assert(testf(userName1))
|
||||
}
|
||||
}
|
||||
|
||||
KyuubiHadoopUtil.doAs(user2) {
|
||||
KyuubiHadoopUtil.doAsRealUser {
|
||||
assert(testf(userName1))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -0,0 +1,41 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package yaooqinn.kyuubi.utils
|
||||
|
||||
import org.apache.spark.{SparkConf, SparkFunSuite}
|
||||
|
||||
class KyuubiHiveUtilSuite extends SparkFunSuite {
|
||||
|
||||
test("hive conf") {
|
||||
val uris = "thrift://yaooqinn.kyuubi"
|
||||
val conf = new SparkConf()
|
||||
.set("spark.hadoop.hive.metastore.uris", uris)
|
||||
val hiveConf = KyuubiHiveUtil.hiveConf(conf)
|
||||
assert(hiveConf.get(KyuubiHiveUtil.URIS) === uris)
|
||||
}
|
||||
|
||||
test("testURIS") {
|
||||
assert(KyuubiHiveUtil.URIS === "hive.metastore.uris")
|
||||
}
|
||||
|
||||
test("metastore principal") {
|
||||
assert(KyuubiHiveUtil.METASTORE_PRINCIPAL === "hive.metastore.kerberos.principal")
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
Loading…
Reference in New Issue
Block a user