This commit is contained in:
Kent Yao 2018-12-11 15:52:26 +08:00
parent 2b172a2cba
commit 9556edd9b4
2 changed files with 17 additions and 6 deletions

View File

@ -48,14 +48,17 @@ class SparkSessionWithUGI(
private var sparkException: Option[Throwable] = None
private lazy val newContext: Thread = {
new Thread(s"Start-SparkContext-$userName") {
val threadName = "SparkContext-Starter-" + userName
new Thread(threadName) {
override def run(): Unit = {
try {
promisedSparkContext.trySuccess {
new SparkContext(conf)
}
} catch {
case NonFatal(e) => sparkException = Some(e)
case e: Exception =>
sparkException = Some(e)
throw e
}
}
}

View File

@ -49,7 +49,7 @@ class SparkSessionWithUGISuite extends SparkFunSuite {
classOf[SparkSession].getName,
Seq(classOf[SparkContext]),
Seq(sc)).asInstanceOf[SparkSession]
cache.init(conf)
cache.start()
cache.set(userName, spark)
@ -80,9 +80,9 @@ class SparkSessionWithUGISuite extends SparkFunSuite {
test("test init failed with no such database") {
val sparkSessionWithUGI = new SparkSessionWithUGI(user, conf, cache)
intercept[NoSuchDatabaseException](sparkSessionWithUGI.init(Map("use:database" -> "fakedb")))
assert(ReflectUtils.getFieldValue(sparkSessionWithUGI,
"yaooqinn$kyuubi$spark$SparkSessionWithUGI$$initialDatabase") === Some("use fakedb"))
val e = intercept[NoSuchDatabaseException](
sparkSessionWithUGI.init(Map("use:database" -> "fakedb")))
assert(e.getMessage().contains("fakedb"))
assert(cache.getAndIncrease(userName).nonEmpty)
}
@ -179,4 +179,12 @@ class SparkSessionWithUGISuite extends SparkFunSuite {
assert(sc.isStopped)
}
}
test("user name should be switched") {
val proxyUserName = "Kent"
val proxyUser = UserGroupInformation.createProxyUser(proxyUserName, user)
val sparkSessionWithUGI = new SparkSessionWithUGI(proxyUser, conf, cache)
sparkSessionWithUGI.init(Map.empty)
assert(sparkSessionWithUGI.sparkSession.sparkContext.sparkUser === proxyUserName)
}
}