This commit is contained in:
Kent Yao 2018-05-30 18:37:40 +08:00
parent 669b9767e7
commit fef50782d2
3 changed files with 36 additions and 0 deletions

View File

@@ -71,6 +71,8 @@ object KyuubiSparkUtil extends Logging {
// Legacy MapReduce-era key for the YARN queue name.
val DEPRECATED_QUEUE = "mapred.job.queue.name"
// Key controlling whether the HDFS FileSystem client cache is disabled
// (SPARK_HADOOP_PREFIX is presumably "spark.hadoop." — confirm upstream).
val HDFS_CLIENT_CACHE = SPARK_HADOOP_PREFIX + "fs.hdfs.impl.disable.cache"
val HDFS_CLIENT_CACHE_DEFAULT = "true"
// Key controlling whether the local (file://) FileSystem client cache is disabled.
val FILE_CLIENT_CACHE = SPARK_HADOOP_PREFIX + "fs.file.impl.disable.cache"
val FILE_CLIENT_CACHE_DEFAULT = "true"
// Runtime Spark Version
val SPARK_VERSION = org.apache.spark.SPARK_VERSION

View File

@@ -129,6 +129,8 @@ object KyuubiServer extends Logging {
// On a security-enabled (Kerberized) cluster, disable both the HDFS and the
// local FileSystem client caches unless the user set them explicitly
// (setIfMissing lets user-provided values win).
// NOTE(review): presumably this avoids sharing cached FileSystem clients
// across user sessions — confirm against Kyuubi's multi-tenant design.
if (UserGroupInformation.isSecurityEnabled) {
conf.setIfMissing(KyuubiSparkUtil.HDFS_CLIENT_CACHE,
KyuubiSparkUtil.HDFS_CLIENT_CACHE_DEFAULT)
conf.setIfMissing(KyuubiSparkUtil.FILE_CLIENT_CACHE,
KyuubiSparkUtil.FILE_CLIENT_CACHE_DEFAULT)
}
}

View File

@@ -17,7 +17,11 @@
package yaooqinn.kyuubi.server
import java.util.concurrent.atomic.AtomicBoolean
import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.{KyuubiSparkUtil, SparkConf, SparkFunSuite}
import org.apache.spark.deploy.SparkHadoopUtil
import yaooqinn.kyuubi.KyuubiServerException
import yaooqinn.kyuubi.utils.ReflectUtils
@@ -38,6 +42,7 @@ class KyuubiServerSuite extends SparkFunSuite {
// Defaults applied by setupCommonConfig: UI port and a tmpdir-based local dir.
assert(conf.get(KyuubiSparkUtil.SPARK_UI_PORT) === KyuubiSparkUtil.SPARK_UI_PORT_DEFAULT)
assert(conf.get(KyuubiSparkUtil.SPARK_LOCAL_DIR)
.startsWith(System.getProperty("java.io.tmpdir")))
// Security is presumably disabled in this test JVM, so the fs-cache
// override must NOT have been set — TODO confirm test environment.
assert(!conf.contains(KyuubiSparkUtil.HDFS_CLIENT_CACHE))
val foo = "spark.foo"
// An unset key raises NoSuchElementException whose message is the key itself.
val e = intercept[NoSuchElementException](conf.get(foo))
assert(e.getMessage === foo)
@@ -64,4 +69,31 @@ class KyuubiServerSuite extends SparkFunSuite {
assert(KyuubiSparkUtil.SPARK_VERSION === oldVersion)
}
test("init KyuubiServer") {
  val conf = new SparkConf(true)
  KyuubiServer.setupCommonConfig(conf)
  val server = new KyuubiServer()
  server.init(conf)
  // Reads the server's private `started` flag via reflection.
  def isStarted: Boolean =
    ReflectUtils.getFieldValue(server, "started").asInstanceOf[AtomicBoolean].get
  // After init(), the whole service tree must be wired up.
  assert(server.feService !== null)
  assert(server.beService !== null)
  assert(server.beService.getSessionManager !== null)
  assert(server.beService.getSessionManager.getOperationMgr !== null)
  // start()/stop() must toggle the started flag.
  server.start()
  assert(isStarted)
  server.stop()
  assert(!isStarted)
}
test("disable fs caches for secured cluster") {
  // Sanity check: security must start out disabled in this JVM.
  assert(!UserGroupInformation.isSecurityEnabled)
  val conf = new SparkConf(true)
  conf.set("spark.hadoop.hadoop.security.authentication", "KERBEROS")
  val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
  // Flip UGI into secure mode so setupCommonConfig takes the Kerberos branch.
  UserGroupInformation.setConfiguration(hadoopConf)
  KyuubiServer.setupCommonConfig(conf)
  // Both the HDFS and the local FileSystem client caches must be disabled.
  // BUG FIX: the original asserted HDFS_CLIENT_CACHE twice and never
  // checked FILE_CLIENT_CACHE — the config this change introduces.
  assert(conf.contains(KyuubiSparkUtil.HDFS_CLIENT_CACHE))
  assert(conf.get(KyuubiSparkUtil.HDFS_CLIENT_CACHE) === "true")
  assert(conf.contains(KyuubiSparkUtil.FILE_CLIENT_CACHE))
  assert(conf.get(KyuubiSparkUtil.FILE_CLIENT_CACHE) === "true")
}
}