diff --git a/src/main/scala/org/apache/spark/KyuubiSparkUtil.scala b/src/main/scala/org/apache/spark/KyuubiSparkUtil.scala index 861f879ed..5677ec85f 100644 --- a/src/main/scala/org/apache/spark/KyuubiSparkUtil.scala +++ b/src/main/scala/org/apache/spark/KyuubiSparkUtil.scala @@ -69,6 +69,10 @@ object KyuubiSparkUtil extends Logging { val USE_DB: Regex = """use:([^=]+)""".r val QUEUE = SPARK_PREFIX + YARN_PREFIX + "queue" val DEPRECATED_QUEUE = "mapred.job.queue.name" + val HDFS_CLIENT_CACHE = SPARK_HADOOP_PREFIX + "fs.hdfs.impl.disable.cache" + val HDFS_CLIENT_CACHE_DEFAULT = "true" + val FILE_CLIENT_CACHE = SPARK_HADOOP_PREFIX + "fs.file.impl.disable.cache" + val FILE_CLIENT_CACHE_DEFAULT = "true" // Runtime Spark Version val SPARK_VERSION = org.apache.spark.SPARK_VERSION diff --git a/src/main/scala/yaooqinn/kyuubi/server/KyuubiServer.scala b/src/main/scala/yaooqinn/kyuubi/server/KyuubiServer.scala index 6b4bd5b71..344a9693c 100644 --- a/src/main/scala/yaooqinn/kyuubi/server/KyuubiServer.scala +++ b/src/main/scala/yaooqinn/kyuubi/server/KyuubiServer.scala @@ -125,6 +125,13 @@ object KyuubiServer extends Logging { conf.setIfMissing( KyuubiSparkUtil.SPARK_LOCAL_DIR, conf.get(KyuubiConf.BACKEND_SESSION_LOCAL_DIR.key)) + + if (UserGroupInformation.isSecurityEnabled) { + conf.setIfMissing(KyuubiSparkUtil.HDFS_CLIENT_CACHE, + KyuubiSparkUtil.HDFS_CLIENT_CACHE_DEFAULT) + conf.setIfMissing(KyuubiSparkUtil.FILE_CLIENT_CACHE, + KyuubiSparkUtil.FILE_CLIENT_CACHE_DEFAULT) + } } private[kyuubi] def validate(): Unit = { diff --git a/src/test/scala/org/apache/spark/KyuubiSparkUtilSuite.scala b/src/test/scala/org/apache/spark/KyuubiSparkUtilSuite.scala index 364c2fab2..6028c8b33 100644 --- a/src/test/scala/org/apache/spark/KyuubiSparkUtilSuite.scala +++ b/src/test/scala/org/apache/spark/KyuubiSparkUtilSuite.scala @@ -191,4 +191,11 @@ class KyuubiSparkUtilSuite extends SparkFunSuite with Logging { KyuubiSparkUtil.addShutdownHook(f) assert(y === 0) } + + 
test("testHDFS_CLIENT_CACHE") { + assert(KyuubiSparkUtil.HDFS_CLIENT_CACHE === "spark.hadoop.fs.hdfs.impl.disable.cache") + assert(KyuubiSparkUtil.HDFS_CLIENT_CACHE_DEFAULT.toBoolean) + assert(KyuubiSparkUtil.FILE_CLIENT_CACHE === "spark.hadoop.fs.file.impl.disable.cache") + assert(KyuubiSparkUtil.FILE_CLIENT_CACHE_DEFAULT.toBoolean) + } } diff --git a/src/test/scala/yaooqinn/kyuubi/server/KyuubiServerSuite.scala b/src/test/scala/yaooqinn/kyuubi/server/KyuubiServerSuite.scala index 51836c42e..a606039eb 100644 --- a/src/test/scala/yaooqinn/kyuubi/server/KyuubiServerSuite.scala +++ b/src/test/scala/yaooqinn/kyuubi/server/KyuubiServerSuite.scala @@ -17,7 +17,13 @@ package yaooqinn.kyuubi.server +import java.io.{File, IOException} +import java.util.concurrent.atomic.AtomicBoolean + +import org.apache.hadoop.minikdc.MiniKdc +import org.apache.hadoop.security.UserGroupInformation import org.apache.spark.{KyuubiSparkUtil, SparkConf, SparkFunSuite} +import org.apache.spark.deploy.SparkHadoopUtil import yaooqinn.kyuubi.KyuubiServerException import yaooqinn.kyuubi.utils.ReflectUtils @@ -38,6 +44,7 @@ class KyuubiServerSuite extends SparkFunSuite { assert(conf.get(KyuubiSparkUtil.SPARK_UI_PORT) === KyuubiSparkUtil.SPARK_UI_PORT_DEFAULT) assert(conf.get(KyuubiSparkUtil.SPARK_LOCAL_DIR) .startsWith(System.getProperty("java.io.tmpdir"))) + assert(!conf.contains(KyuubiSparkUtil.HDFS_CLIENT_CACHE)) val foo = "spark.foo" val e = intercept[NoSuchElementException](conf.get(foo)) assert(e.getMessage === foo) @@ -64,4 +71,54 @@ class KyuubiServerSuite extends SparkFunSuite { assert(KyuubiSparkUtil.SPARK_VERSION === oldVersion) } + + test("init KyuubiServer") { + val conf = new SparkConf(true) + KyuubiServer.setupCommonConfig(conf) + val server = new KyuubiServer() + server.init(conf) + assert(server.feService !== null) + assert(server.beService !== null) + assert(server.beService.getSessionManager !== null) + assert(server.beService.getSessionManager.getOperationMgr !== null) + 
server.start() + assert(ReflectUtils.getFieldValue(server, "started").asInstanceOf[AtomicBoolean].get) + server.stop() + assert(!ReflectUtils.getFieldValue(server, "started").asInstanceOf[AtomicBoolean].get) + } + + test("disable fs caches for secured cluster") { + + var kdc: MiniKdc = null + val baseDir = KyuubiSparkUtil.createTempDir(namePrefix = "kyuubi-kdc") + try { + val kdcConf = MiniKdc.createConf() + kdcConf.setProperty(MiniKdc.INSTANCE, "KyuubiKrbServer") + kdcConf.setProperty(MiniKdc.ORG_NAME, "KYUUBI") + kdcConf.setProperty(MiniKdc.ORG_DOMAIN, "COM") + + if (kdc == null) { + kdc = new MiniKdc(kdcConf, baseDir) + kdc.start() + } + } catch { + case e: IOException => + throw new AssertionError("unable to create temporary directory: " + e.getMessage) + } + + assert(!UserGroupInformation.isSecurityEnabled) + val conf = new SparkConf(true) + conf.set("spark.hadoop.hadoop.security.authentication", "KERBEROS") + System.setProperty("java.security.krb5.realm", kdc.getRealm) + val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf) + UserGroupInformation.setConfiguration(hadoopConf) + KyuubiServer.setupCommonConfig(conf) + assert(conf.contains(KyuubiSparkUtil.HDFS_CLIENT_CACHE)) + assert(conf.get(KyuubiSparkUtil.HDFS_CLIENT_CACHE) === "true") + assert(conf.get(KyuubiSparkUtil.FILE_CLIENT_CACHE) === "true") + System.clearProperty("java.security.krb5.realm") + if (kdc != null) { + kdc.stop() + } + }