diff --git a/src/main/scala/org/apache/spark/KyuubiSparkUtil.scala b/src/main/scala/org/apache/spark/KyuubiSparkUtil.scala
index d3dd0c7fb..5677ec85f 100644
--- a/src/main/scala/org/apache/spark/KyuubiSparkUtil.scala
+++ b/src/main/scala/org/apache/spark/KyuubiSparkUtil.scala
@@ -71,6 +71,8 @@ object KyuubiSparkUtil extends Logging {
   val DEPRECATED_QUEUE = "mapred.job.queue.name"
   val HDFS_CLIENT_CACHE = SPARK_HADOOP_PREFIX + "fs.hdfs.impl.disable.cache"
   val HDFS_CLIENT_CACHE_DEFAULT = "true"
+  val FILE_CLIENT_CACHE = SPARK_HADOOP_PREFIX + "fs.file.impl.disable.cache"
+  val FILE_CLIENT_CACHE_DEFAULT = "true"
 
   // Runtime Spark Version
   val SPARK_VERSION = org.apache.spark.SPARK_VERSION
diff --git a/src/main/scala/yaooqinn/kyuubi/server/KyuubiServer.scala b/src/main/scala/yaooqinn/kyuubi/server/KyuubiServer.scala
index 02a52ee53..344a9693c 100644
--- a/src/main/scala/yaooqinn/kyuubi/server/KyuubiServer.scala
+++ b/src/main/scala/yaooqinn/kyuubi/server/KyuubiServer.scala
@@ -129,6 +129,8 @@ object KyuubiServer extends Logging {
     if (UserGroupInformation.isSecurityEnabled) {
       conf.setIfMissing(KyuubiSparkUtil.HDFS_CLIENT_CACHE,
         KyuubiSparkUtil.HDFS_CLIENT_CACHE_DEFAULT)
+      conf.setIfMissing(KyuubiSparkUtil.FILE_CLIENT_CACHE,
+        KyuubiSparkUtil.FILE_CLIENT_CACHE_DEFAULT)
     }
   }
 
diff --git a/src/test/scala/yaooqinn/kyuubi/server/KyuubiServerSuite.scala b/src/test/scala/yaooqinn/kyuubi/server/KyuubiServerSuite.scala
index 51836c42e..878a0da2e 100644
--- a/src/test/scala/yaooqinn/kyuubi/server/KyuubiServerSuite.scala
+++ b/src/test/scala/yaooqinn/kyuubi/server/KyuubiServerSuite.scala
@@ -17,7 +17,11 @@
 package yaooqinn.kyuubi.server
 
+import java.util.concurrent.atomic.AtomicBoolean
+
+import org.apache.hadoop.security.UserGroupInformation
 import org.apache.spark.{KyuubiSparkUtil, SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.SparkHadoopUtil
 
 import yaooqinn.kyuubi.KyuubiServerException
 import yaooqinn.kyuubi.utils.ReflectUtils
@@ -38,6 +42,7 @@ class KyuubiServerSuite extends SparkFunSuite {
     assert(conf.get(KyuubiSparkUtil.SPARK_UI_PORT) === KyuubiSparkUtil.SPARK_UI_PORT_DEFAULT)
     assert(conf.get(KyuubiSparkUtil.SPARK_LOCAL_DIR)
       .startsWith(System.getProperty("java.io.tmpdir")))
+    assert(!conf.contains(KyuubiSparkUtil.HDFS_CLIENT_CACHE))
     val foo = "spark.foo"
     val e = intercept[NoSuchElementException](conf.get(foo))
     assert(e.getMessage === foo)
@@ -64,4 +69,31 @@ class KyuubiServerSuite extends SparkFunSuite {
     assert(KyuubiSparkUtil.SPARK_VERSION === oldVersion)
   }
+
+  test("init KyuubiServer") {
+    val conf = new SparkConf(true)
+    KyuubiServer.setupCommonConfig(conf)
+    val server = new KyuubiServer()
+    server.init(conf)
+    assert(server.feService !== null)
+    assert(server.beService !== null)
+    assert(server.beService.getSessionManager !== null)
+    assert(server.beService.getSessionManager.getOperationMgr !== null)
+    server.start()
+    assert(ReflectUtils.getFieldValue(server, "started").asInstanceOf[AtomicBoolean].get)
+    server.stop()
+    assert(!ReflectUtils.getFieldValue(server, "started").asInstanceOf[AtomicBoolean].get)
+  }
+
+  test("disable fs caches for secured cluster") {
+    assert(!UserGroupInformation.isSecurityEnabled)
+    val conf = new SparkConf(true)
+    conf.set("spark.hadoop.hadoop.security.authentication", "KERBEROS")
+    val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
+    UserGroupInformation.setConfiguration(hadoopConf)
+    KyuubiServer.setupCommonConfig(conf)
+    assert(conf.contains(KyuubiSparkUtil.HDFS_CLIENT_CACHE))
+    assert(conf.get(KyuubiSparkUtil.HDFS_CLIENT_CACHE) === "true")
+    assert(conf.get(KyuubiSparkUtil.FILE_CLIENT_CACHE) === "true")
+  }
 
 }