diff --git a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/spark/SparkSessionWithUGI.scala b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/spark/SparkSessionWithUGI.scala index 6fb57fb6a..7c04d88f2 100644 --- a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/spark/SparkSessionWithUGI.scala +++ b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/spark/SparkSessionWithUGI.scala @@ -48,14 +48,17 @@ class SparkSessionWithUGI( private var sparkException: Option[Throwable] = None private lazy val newContext: Thread = { - new Thread(s"Start-SparkContext-$userName") { + val threadName = "SparkContext-Starter-" + userName + new Thread(threadName) { override def run(): Unit = { try { promisedSparkContext.trySuccess { new SparkContext(conf) } } catch { - case NonFatal(e) => sparkException = Some(e) + case e: Exception => + sparkException = Some(e) + throw e } } } diff --git a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/spark/SparkSessionWithUGISuite.scala b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/spark/SparkSessionWithUGISuite.scala index a566046f2..a41118970 100644 --- a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/spark/SparkSessionWithUGISuite.scala +++ b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/spark/SparkSessionWithUGISuite.scala @@ -49,7 +49,7 @@ class SparkSessionWithUGISuite extends SparkFunSuite { classOf[SparkSession].getName, Seq(classOf[SparkContext]), Seq(sc)).asInstanceOf[SparkSession] - + cache.init(conf) cache.start() cache.set(userName, spark) @@ -80,9 +80,9 @@ class SparkSessionWithUGISuite extends SparkFunSuite { test("test init failed with no such database") { val sparkSessionWithUGI = new SparkSessionWithUGI(user, conf, cache) - intercept[NoSuchDatabaseException](sparkSessionWithUGI.init(Map("use:database" -> "fakedb"))) - assert(ReflectUtils.getFieldValue(sparkSessionWithUGI, - "yaooqinn$kyuubi$spark$SparkSessionWithUGI$$initialDatabase") === Some("use fakedb")) + val e = intercept[NoSuchDatabaseException]( + sparkSessionWithUGI.init(Map("use:database" -> "fakedb"))) + assert(e.getMessage().contains("fakedb")) assert(cache.getAndIncrease(userName).nonEmpty) } @@ -179,4 +179,12 @@ class SparkSessionWithUGISuite extends SparkFunSuite { assert(sc.isStopped) } } + + test("user name should be switched") { + val proxyUserName = "Kent" + val proxyUser = UserGroupInformation.createProxyUser(proxyUserName, user) + val sparkSessionWithUGI = new SparkSessionWithUGI(proxyUser, conf, cache) + sparkSessionWithUGI.init(Map.empty) + assert(sparkSessionWithUGI.sparkSession.sparkContext.sparkUser === proxyUserName) + } }