Merge pull request #81 from yaooqinn/i80
disable fs cache for long-cached SparkSessions to avoid token expiry
commit bfb6163c81
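For background on the fix: Hadoop's FileSystem.get hands out a process-wide cached client per (scheme, authority, user), so a long-lived SparkSession keeps reusing a client whose delegation tokens eventually expire; setting fs.<scheme>.impl.disable.cache forces a fresh client per call. A minimal sketch of that cache behavior, using the local file:// scheme so it runs without a cluster (illustrative, not code from this patch):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem

object FsCacheSketch extends App {
  val conf = new Configuration()

  // Default: the cache returns the same client object on every call.
  val a = FileSystem.get(conf)
  val b = FileSystem.get(conf)
  assert(a eq b)

  // With the cache disabled for the scheme, each call builds a new client
  // that picks up the caller's current (renewed) credentials.
  conf.setBoolean("fs.file.impl.disable.cache", true)
  val c = FileSystem.get(conf)
  val d = FileSystem.get(conf)
  assert(!(c eq d))
}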
@@ -69,6 +69,10 @@ object KyuubiSparkUtil extends Logging {
   val USE_DB: Regex = """use:([^=]+)""".r
   val QUEUE = SPARK_PREFIX + YARN_PREFIX + "queue"
   val DEPRECATED_QUEUE = "mapred.job.queue.name"
+  val HDFS_CLIENT_CACHE = SPARK_HADOOP_PREFIX + "fs.hdfs.impl.disable.cache"
+  val HDFS_CLIENT_CACHE_DEFAULT = "true"
+  val FILE_CLIENT_CACHE = SPARK_HADOOP_PREFIX + "fs.file.impl.disable.cache"
+  val FILE_CLIENT_CACHE_DEFAULT = "true"

   // Runtime Spark Version
   val SPARK_VERSION = org.apache.spark.SPARK_VERSION
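These constants lean on Spark's spark.hadoop.* passthrough: SPARK_HADOOP_PREFIX is "spark.hadoop." (the suite below asserts as much), and Spark strips that prefix when building the Hadoop Configuration, so HDFS clients see the plain Hadoop key. A quick sketch of the resolution (illustrative, not part of the patch):

import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkHadoopUtil

val conf = new SparkConf(false)
  .set("spark.hadoop.fs.hdfs.impl.disable.cache", "true")

// Spark copies every "spark.hadoop.*" entry into the Hadoop Configuration
// with the prefix removed.
val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
assert(hadoopConf.get("fs.hdfs.impl.disable.cache") == "true")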
@@ -125,6 +125,13 @@ object KyuubiServer extends Logging {

     conf.setIfMissing(
       KyuubiSparkUtil.SPARK_LOCAL_DIR, conf.get(KyuubiConf.BACKEND_SESSION_LOCAL_DIR.key))
+
+    if (UserGroupInformation.isSecurityEnabled) {
+      conf.setIfMissing(KyuubiSparkUtil.HDFS_CLIENT_CACHE,
+        KyuubiSparkUtil.HDFS_CLIENT_CACHE_DEFAULT)
+      conf.setIfMissing(KyuubiSparkUtil.FILE_CLIENT_CACHE,
+        KyuubiSparkUtil.FILE_CLIENT_CACHE_DEFAULT)
+    }
   }

   private[kyuubi] def validate(): Unit = {
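Because setupCommonConfig uses setIfMissing, the cache-disabling keys are only defaults: an operator who deliberately wants the caches back on can still set them to false before the server starts. A standalone illustration of that semantics (not from the patch):

import org.apache.spark.SparkConf

val conf = new SparkConf(false)
  .set("spark.hadoop.fs.hdfs.impl.disable.cache", "false") // explicit user choice

// setIfMissing is a no-op when the key is already present.
conf.setIfMissing("spark.hadoop.fs.hdfs.impl.disable.cache", "true")
assert(conf.get("spark.hadoop.fs.hdfs.impl.disable.cache") == "false")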
@@ -191,4 +191,11 @@ class KyuubiSparkUtilSuite extends SparkFunSuite with Logging {
     KyuubiSparkUtil.addShutdownHook(f)
     assert(y === 0)
   }
+
+  test("testHDFS_CLIENT_CACHE") {
+    assert(KyuubiSparkUtil.HDFS_CLIENT_CACHE === "spark.hadoop.fs.hdfs.impl.disable.cache")
+    assert(KyuubiSparkUtil.HDFS_CLIENT_CACHE_DEFAULT.toBoolean)
+    assert(KyuubiSparkUtil.FILE_CLIENT_CACHE === "spark.hadoop.fs.file.impl.disable.cache")
+    assert(KyuubiSparkUtil.FILE_CLIENT_CACHE_DEFAULT.toBoolean)
+  }
 }
@@ -17,7 +17,13 @@

 package yaooqinn.kyuubi.server

+import java.io.{File, IOException}
+import java.util.concurrent.atomic.AtomicBoolean
+
+import org.apache.hadoop.minikdc.MiniKdc
+import org.apache.hadoop.security.UserGroupInformation
 import org.apache.spark.{KyuubiSparkUtil, SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.SparkHadoopUtil

 import yaooqinn.kyuubi.KyuubiServerException
 import yaooqinn.kyuubi.utils.ReflectUtils
@@ -38,6 +44,7 @@ class KyuubiServerSuite extends SparkFunSuite {
     assert(conf.get(KyuubiSparkUtil.SPARK_UI_PORT) === KyuubiSparkUtil.SPARK_UI_PORT_DEFAULT)
     assert(conf.get(KyuubiSparkUtil.SPARK_LOCAL_DIR)
       .startsWith(System.getProperty("java.io.tmpdir")))
+    assert(!conf.contains(KyuubiSparkUtil.HDFS_CLIENT_CACHE))
     val foo = "spark.foo"
     val e = intercept[NoSuchElementException](conf.get(foo))
     assert(e.getMessage === foo)
@@ -64,4 +71,54 @@ class KyuubiServerSuite extends SparkFunSuite {
     assert(KyuubiSparkUtil.SPARK_VERSION === oldVersion)

   }
+
+  test("init KyuubiServer") {
+    val conf = new SparkConf(true)
+    KyuubiServer.setupCommonConfig(conf)
+    val server = new KyuubiServer()
+    server.init(conf)
+    assert(server.feService !== null)
+    assert(server.beService !== null)
+    assert(server.beService.getSessionManager !== null)
+    assert(server.beService.getSessionManager.getOperationMgr !== null)
+    server.start()
+    assert(ReflectUtils.getFieldValue(server, "started").asInstanceOf[AtomicBoolean].get)
+    server.stop()
+    assert(!ReflectUtils.getFieldValue(server, "started").asInstanceOf[AtomicBoolean].get)
+  }
+
+  test("disable fs caches for secured cluster") {
+
+    var kdc: MiniKdc = null
+    val baseDir = KyuubiSparkUtil.createTempDir(namePrefix = "kyuubi-kdc")
+    try {
+      val kdcConf = MiniKdc.createConf()
+      kdcConf.setProperty(MiniKdc.INSTANCE, "KyuubiKrbServer")
+      kdcConf.setProperty(MiniKdc.ORG_NAME, "KYUUBI")
+      kdcConf.setProperty(MiniKdc.ORG_DOMAIN, "COM")
+
+      if (kdc == null) {
+        kdc = new MiniKdc(kdcConf, baseDir)
+        kdc.start()
+      }
+    } catch {
+      case e: IOException =>
+        throw new AssertionError("unable to create temporary directory: " + e.getMessage)
+    }
+
+    assert(!UserGroupInformation.isSecurityEnabled)
+    val conf = new SparkConf(true)
+    conf.set("spark.hadoop.hadoop.security.authentication", "KERBEROS")
+    System.setProperty("java.security.krb5.realm", kdc.getRealm)
+    val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
+    UserGroupInformation.setConfiguration(hadoopConf)
+    KyuubiServer.setupCommonConfig(conf)
+    assert(conf.contains(KyuubiSparkUtil.HDFS_CLIENT_CACHE))
+    assert(conf.get(KyuubiSparkUtil.HDFS_CLIENT_CACHE) === "true")
+    assert(conf.get(KyuubiSparkUtil.FILE_CLIENT_CACHE) === "true")
+    System.clearProperty("java.security.krb5.realm")
+    if (kdc !== null) {
+      kdc.stop()
+    }
+  }
 }
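The secured-cluster test above toggles JVM-global security state through UserGroupInformation.setConfiguration; the minimal sequence it relies on looks like this (side-effecting sketch, not code from the patch):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.UserGroupInformation

val hadoopConf = new Configuration()
hadoopConf.set("hadoop.security.authentication", "kerberos")

// setConfiguration mutates process-global state: after this call,
// UserGroupInformation.isSecurityEnabled reports true for the whole JVM,
// which is what makes setupCommonConfig apply the cache-disabling defaults.
UserGroupInformation.setConfiguration(hadoopConf)
assert(UserGroupInformation.isSecurityEnabled)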