fix #125 hotfix session ugi not wrapping sparkcontext initializing

This commit is contained in:
Kent Yao 2018-12-11 11:59:10 +08:00
parent df1abd4702
commit 2b172a2cba

View File

@ -47,7 +47,7 @@ class SparkSessionWithUGI(
private var initialDatabase: Option[String] = None
private var sparkException: Option[Throwable] = None
private def newContext(): Thread = {
private lazy val newContext: Thread = {
new Thread(s"Start-SparkContext-$userName") {
override def run(): Unit = {
try {
@ -156,10 +156,9 @@ class SparkSessionWithUGI(
conf.setAppName(appName)
configureSparkConf(sessionConf)
val totalWaitTime: Long = conf.getTimeAsSeconds(BACKEND_SESSTION_INIT_TIMEOUT)
val newContextThread = newContext()
try {
KyuubiHadoopUtil.doAs(user) {
newContextThread.start()
newContext.start()
val context =
Await.result(promisedSparkContext.future, Duration(totalWaitTime, TimeUnit.SECONDS))
_sparkSession = ReflectUtils.newInstance(
@ -182,9 +181,7 @@ class SparkSessionWithUGI(
throw ke
} finally {
SparkSessionWithUGI.setFullyConstructed(userName)
if (newContextThread.isAlive) {
newContextThread.join()
}
newContext.join()
}
KyuubiServerMonitor.setListener(userName, new KyuubiServerListener(conf))