diff --git a/src/main/scala/yaooqinn/kyuubi/spark/SparkSessionWithUGI.scala b/src/main/scala/yaooqinn/kyuubi/spark/SparkSessionWithUGI.scala index b0d88e971..afc751ef3 100644 --- a/src/main/scala/yaooqinn/kyuubi/spark/SparkSessionWithUGI.scala +++ b/src/main/scala/yaooqinn/kyuubi/spark/SparkSessionWithUGI.scala @@ -43,6 +43,7 @@ class SparkSessionWithUGI(user: UserGroupInformation, conf: SparkConf) extends L def sparkSession: SparkSession = _sparkSession private[this] val promisedSparkContext = Promise[SparkContext]() private[this] var initialDatabase: Option[String] = None + private[this] var sparkException: Option[Throwable] = None private[this] def newContext(): Thread = { new Thread(s"Start-SparkContext-$userName") { @@ -52,7 +53,7 @@ class SparkSessionWithUGI(user: UserGroupInformation, conf: SparkConf) extends L new SparkContext(conf) } } catch { - case NonFatal(e) => throw e + case NonFatal(e) => sparkException = Some(e) } } } @@ -64,8 +65,13 @@ class SparkSessionWithUGI(user: UserGroupInformation, conf: SparkConf) extends L private[this] def stopContext(): Unit = { promisedSparkContext.future.map { sc => warn(s"Error occurred during initializing SparkContext for $userName, stopping") - sc.stop - System.setProperty("SPARK_YARN_MODE", "true") + try { + sc.stop + } catch { + case NonFatal(e) => error(s"Error Stopping $userName's SparkContext", e) + } finally { + System.setProperty("SPARK_YARN_MODE", "true") + } } } @@ -159,15 +165,11 @@ class SparkSessionWithUGI(user: UserGroupInformation, conf: SparkConf) extends L SparkSessionCacheManager.get.set(userName, _sparkSession) } catch { case ute: UndeclaredThrowableException => - ute.getCause match { - case te: TimeoutException => - stopContext() - throw new KyuubiSQLException( - s"Get SparkSession for [$userName] failed: " + te, "08S01", 1001, te) - case _ => - stopContext() - throw new KyuubiSQLException(ute.toString, "08S01", ute.getCause) - } + stopContext() + val ke = new KyuubiSQLException( + s"Get SparkSession for [$userName] failed: " + ute.getCause, "08S01", 1001, ute.getCause) + sparkException.foreach(ke.addSuppressed) + throw ke case e: Exception => stopContext() throw new KyuubiSQLException( diff --git a/src/test/scala/yaooqinn/kyuubi/spark/SparkSessionWithUGISuite.scala b/src/test/scala/yaooqinn/kyuubi/spark/SparkSessionWithUGISuite.scala index 2ec486861..e895b5490 100644 --- a/src/test/scala/yaooqinn/kyuubi/spark/SparkSessionWithUGISuite.scala +++ b/src/test/scala/yaooqinn/kyuubi/spark/SparkSessionWithUGISuite.scala @@ -55,6 +55,21 @@ class SparkSessionWithUGISuite extends SparkFunSuite { spark.stop() } + test("test init failed with sc init failing") { + assert(!spark.sparkContext.isStopped) + val confClone = conf.clone().remove(KyuubiSparkUtil.MULTIPLE_CONTEXTS) + .set(KyuubiConf.BACKEND_SESSTION_INIT_TIMEOUT.key, "3") + val userName1 = "test1" + val ru = UserGroupInformation.createRemoteUser(userName1) + val sparkSessionWithUGI = new SparkSessionWithUGI(ru, confClone) + assert(!SparkSessionWithUGI.isPartiallyConstructed(userName1)) + val e = intercept[KyuubiSQLException](sparkSessionWithUGI.init(Map.empty)) + assert(e.getCause.isInstanceOf[TimeoutException]) + val se = e.getSuppressed.head + assert(se.isInstanceOf[SparkException]) + assert(se.getMessage.startsWith("Only one SparkContext")) + } + test("test init failed with no such database") { val sparkSessionWithUGI = new SparkSessionWithUGI(user, conf) intercept[NoSuchDatabaseException](sparkSessionWithUGI.init(Map("use:database" -> "fakedb"))) @@ -103,6 +118,7 @@ class SparkSessionWithUGISuite extends SparkFunSuite { assert(e.getMessage.contains("has last more than 15 seconds")) assert(SparkSessionWithUGI.isPartiallyConstructed(userName)) assert(!SparkSessionWithUGI.isPartiallyConstructed("Kent Yao")) + SparkSessionWithUGI.setFullyConstructed(userName) } test("test init failed with time out exception") {