From 669b9767e78baeebd35ef087e706368f0dad1a84 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Wed, 30 May 2018 17:03:41 +0800 Subject: [PATCH 1/4] set fs.hdfs.impl.disable.cache true fix #80 --- src/main/scala/org/apache/spark/KyuubiSparkUtil.scala | 2 ++ src/main/scala/yaooqinn/kyuubi/server/KyuubiServer.scala | 5 +++++ src/test/scala/org/apache/spark/KyuubiSparkUtilSuite.scala | 5 +++++ 3 files changed, 12 insertions(+) diff --git a/src/main/scala/org/apache/spark/KyuubiSparkUtil.scala b/src/main/scala/org/apache/spark/KyuubiSparkUtil.scala index 861f879ed..d3dd0c7fb 100644 --- a/src/main/scala/org/apache/spark/KyuubiSparkUtil.scala +++ b/src/main/scala/org/apache/spark/KyuubiSparkUtil.scala @@ -69,6 +69,8 @@ object KyuubiSparkUtil extends Logging { val USE_DB: Regex = """use:([^=]+)""".r val QUEUE = SPARK_PREFIX + YARN_PREFIX + "queue" val DEPRECATED_QUEUE = "mapred.job.queue.name" + val HDFS_CLIENT_CACHE = SPARK_HADOOP_PREFIX + "fs.hdfs.impl.disable.cache" + val HDFS_CLIENT_CACHE_DEFAULT = "true" // Runtime Spark Version val SPARK_VERSION = org.apache.spark.SPARK_VERSION diff --git a/src/main/scala/yaooqinn/kyuubi/server/KyuubiServer.scala b/src/main/scala/yaooqinn/kyuubi/server/KyuubiServer.scala index 6b4bd5b71..02a52ee53 100644 --- a/src/main/scala/yaooqinn/kyuubi/server/KyuubiServer.scala +++ b/src/main/scala/yaooqinn/kyuubi/server/KyuubiServer.scala @@ -125,6 +125,11 @@ object KyuubiServer extends Logging { conf.setIfMissing( KyuubiSparkUtil.SPARK_LOCAL_DIR, conf.get(KyuubiConf.BACKEND_SESSION_LOCAL_DIR.key)) + + if (UserGroupInformation.isSecurityEnabled) { + conf.setIfMissing(KyuubiSparkUtil.HDFS_CLIENT_CACHE, + KyuubiSparkUtil.HDFS_CLIENT_CACHE_DEFAULT) + } } private[kyuubi] def validate(): Unit = { diff --git a/src/test/scala/org/apache/spark/KyuubiSparkUtilSuite.scala b/src/test/scala/org/apache/spark/KyuubiSparkUtilSuite.scala index 364c2fab2..2e3211fa3 100644 --- a/src/test/scala/org/apache/spark/KyuubiSparkUtilSuite.scala +++ b/src/test/scala/org/apache/spark/KyuubiSparkUtilSuite.scala @@ -191,4 +191,9 @@ class KyuubiSparkUtilSuite extends SparkFunSuite with Logging { KyuubiSparkUtil.addShutdownHook(f) assert(y === 0) } + + test("testHDFS_CLIENT_CACHE") { + assert(KyuubiSparkUtil.HDFS_CLIENT_CACHE === "spark.hadoop.fs.hdfs.impl.disable.cache") + assert(KyuubiSparkUtil.HDFS_CLIENT_CACHE_DEFAULT.toBoolean) + } } From fef50782d244062b842c0099bc373fd6d75a6aba Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Wed, 30 May 2018 18:37:40 +0800 Subject: [PATCH 2/4] add ut --- .../org/apache/spark/KyuubiSparkUtil.scala | 2 ++ .../yaooqinn/kyuubi/server/KyuubiServer.scala | 2 ++ .../kyuubi/server/KyuubiServerSuite.scala | 32 +++++++++++++++++++ 3 files changed, 36 insertions(+) diff --git a/src/main/scala/org/apache/spark/KyuubiSparkUtil.scala b/src/main/scala/org/apache/spark/KyuubiSparkUtil.scala index d3dd0c7fb..5677ec85f 100644 --- a/src/main/scala/org/apache/spark/KyuubiSparkUtil.scala +++ b/src/main/scala/org/apache/spark/KyuubiSparkUtil.scala @@ -71,6 +71,8 @@ object KyuubiSparkUtil extends Logging { val DEPRECATED_QUEUE = "mapred.job.queue.name" val HDFS_CLIENT_CACHE = SPARK_HADOOP_PREFIX + "fs.hdfs.impl.disable.cache" val HDFS_CLIENT_CACHE_DEFAULT = "true" + val FILE_CLIENT_CACHE = SPARK_HADOOP_PREFIX + "fs.file.impl.disable.cache" + val FILE_CLIENT_CACHE_DEFAULT = "true" // Runtime Spark Version val SPARK_VERSION = org.apache.spark.SPARK_VERSION diff --git a/src/main/scala/yaooqinn/kyuubi/server/KyuubiServer.scala b/src/main/scala/yaooqinn/kyuubi/server/KyuubiServer.scala index 02a52ee53..344a9693c 100644 --- a/src/main/scala/yaooqinn/kyuubi/server/KyuubiServer.scala +++ b/src/main/scala/yaooqinn/kyuubi/server/KyuubiServer.scala @@ -129,6 +129,8 @@ object KyuubiServer extends Logging { if (UserGroupInformation.isSecurityEnabled) { conf.setIfMissing(KyuubiSparkUtil.HDFS_CLIENT_CACHE, KyuubiSparkUtil.HDFS_CLIENT_CACHE_DEFAULT) + conf.setIfMissing(KyuubiSparkUtil.FILE_CLIENT_CACHE, + KyuubiSparkUtil.FILE_CLIENT_CACHE_DEFAULT) } } diff --git a/src/test/scala/yaooqinn/kyuubi/server/KyuubiServerSuite.scala b/src/test/scala/yaooqinn/kyuubi/server/KyuubiServerSuite.scala index 51836c42e..878a0da2e 100644 --- a/src/test/scala/yaooqinn/kyuubi/server/KyuubiServerSuite.scala +++ b/src/test/scala/yaooqinn/kyuubi/server/KyuubiServerSuite.scala @@ -17,7 +17,11 @@ package yaooqinn.kyuubi.server +import java.util.concurrent.atomic.AtomicBoolean + +import org.apache.hadoop.security.UserGroupInformation import org.apache.spark.{KyuubiSparkUtil, SparkConf, SparkFunSuite} +import org.apache.spark.deploy.SparkHadoopUtil import yaooqinn.kyuubi.KyuubiServerException import yaooqinn.kyuubi.utils.ReflectUtils @@ -38,6 +42,7 @@ class KyuubiServerSuite extends SparkFunSuite { assert(conf.get(KyuubiSparkUtil.SPARK_UI_PORT) === KyuubiSparkUtil.SPARK_UI_PORT_DEFAULT) assert(conf.get(KyuubiSparkUtil.SPARK_LOCAL_DIR) .startsWith(System.getProperty("java.io.tmpdir"))) + assert(!conf.contains(KyuubiSparkUtil.HDFS_CLIENT_CACHE)) val foo = "spark.foo" val e = intercept[NoSuchElementException](conf.get(foo)) assert(e.getMessage === foo) @@ -64,4 +69,31 @@ class KyuubiServerSuite extends SparkFunSuite { assert(KyuubiSparkUtil.SPARK_VERSION === oldVersion) } + + test("init KyuubiServer") { + val conf = new SparkConf(true) + KyuubiServer.setupCommonConfig(conf) + val server = new KyuubiServer() + server.init(conf) + assert(server.feService !== null) + assert(server.beService !== null) + assert(server.beService.getSessionManager !== null) + assert(server.beService.getSessionManager.getOperationMgr !== null) + server.start() + assert(ReflectUtils.getFieldValue(server, "started").asInstanceOf[AtomicBoolean].get) + server.stop() + assert(!ReflectUtils.getFieldValue(server, "started").asInstanceOf[AtomicBoolean].get) + } + + test("disable fs caches for secured cluster") { + assert(!UserGroupInformation.isSecurityEnabled) + val conf = new SparkConf(true) + conf.set("spark.hadoop.hadoop.security.authentication", "KERBEROS") + val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf) + UserGroupInformation.setConfiguration(hadoopConf) + KyuubiServer.setupCommonConfig(conf) + assert(conf.contains(KyuubiSparkUtil.HDFS_CLIENT_CACHE)) + assert(conf.get(KyuubiSparkUtil.HDFS_CLIENT_CACHE) === "true") + assert(conf.get(KyuubiSparkUtil.HDFS_CLIENT_CACHE) === "true") + } } From 9f45c3f9113cebf4294bb21502d31c1c82d7e007 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Wed, 30 May 2018 18:44:59 +0800 Subject: [PATCH 3/4] add ut --- src/test/scala/org/apache/spark/KyuubiSparkUtilSuite.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/test/scala/org/apache/spark/KyuubiSparkUtilSuite.scala b/src/test/scala/org/apache/spark/KyuubiSparkUtilSuite.scala index 2e3211fa3..6028c8b33 100644 --- a/src/test/scala/org/apache/spark/KyuubiSparkUtilSuite.scala +++ b/src/test/scala/org/apache/spark/KyuubiSparkUtilSuite.scala @@ -195,5 +195,7 @@ class KyuubiSparkUtilSuite extends SparkFunSuite with Logging { test("testHDFS_CLIENT_CACHE") { assert(KyuubiSparkUtil.HDFS_CLIENT_CACHE === "spark.hadoop.fs.hdfs.impl.disable.cache") assert(KyuubiSparkUtil.HDFS_CLIENT_CACHE_DEFAULT.toBoolean) + assert(KyuubiSparkUtil.FILE_CLIENT_CACHE === "spark.hadoop.fs.file.impl.disable.cache") + assert(KyuubiSparkUtil.FILE_CLIENT_CACHE_DEFAULT.toBoolean) } } From 81cdf9af3e8c030563e224e2f8d91e7401a9bb48 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Wed, 30 May 2018 20:13:32 +0800 Subject: [PATCH 4/4] fix ut --- .../kyuubi/server/KyuubiServerSuite.scala | 25 +++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/src/test/scala/yaooqinn/kyuubi/server/KyuubiServerSuite.scala b/src/test/scala/yaooqinn/kyuubi/server/KyuubiServerSuite.scala index 878a0da2e..a606039eb 100644 --- a/src/test/scala/yaooqinn/kyuubi/server/KyuubiServerSuite.scala +++ b/src/test/scala/yaooqinn/kyuubi/server/KyuubiServerSuite.scala @@ -17,8 +17,10 @@ package yaooqinn.kyuubi.server +import java.io.{File, IOException} import java.util.concurrent.atomic.AtomicBoolean +import org.apache.hadoop.minikdc.MiniKdc import org.apache.hadoop.security.UserGroupInformation import org.apache.spark.{KyuubiSparkUtil, SparkConf, SparkFunSuite} import org.apache.spark.deploy.SparkHadoopUtil @@ -86,14 +88,37 @@ class KyuubiServerSuite extends SparkFunSuite { } test("disable fs caches for secured cluster") { + + var kdc: MiniKdc = null + val baseDir = KyuubiSparkUtil.createTempDir(namePrefix = "kyuubi-kdc") + try { + val kdcConf = MiniKdc.createConf() + kdcConf.setProperty(MiniKdc.INSTANCE, "KyuubiKrbServer") + kdcConf.setProperty(MiniKdc.ORG_NAME, "KYUUBI") + kdcConf.setProperty(MiniKdc.ORG_DOMAIN, "COM") + + if (kdc == null) { + kdc = new MiniKdc(kdcConf, baseDir) + kdc.start() + } + } catch { + case e: IOException => + throw new AssertionError("unable to create temporary directory: " + e.getMessage) + } + assert(!UserGroupInformation.isSecurityEnabled) val conf = new SparkConf(true) conf.set("spark.hadoop.hadoop.security.authentication", "KERBEROS") + System.setProperty("java.security.krb5.realm", kdc.getRealm) val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf) UserGroupInformation.setConfiguration(hadoopConf) KyuubiServer.setupCommonConfig(conf) assert(conf.contains(KyuubiSparkUtil.HDFS_CLIENT_CACHE)) assert(conf.get(KyuubiSparkUtil.HDFS_CLIENT_CACHE) === "true") assert(conf.get(KyuubiSparkUtil.HDFS_CLIENT_CACHE) === "true") + System.clearProperty("java.security.krb5.realm") + if (kdc !== null) { + kdc.stop() + } } }