add more uts again and again
This commit is contained in:
parent
ca2c73791e
commit
31847bc230
@ -43,6 +43,7 @@ class SparkSessionWithUGI(user: UserGroupInformation, conf: SparkConf) extends L
|
||||
def sparkSession: SparkSession = _sparkSession
|
||||
private[this] val promisedSparkContext = Promise[SparkContext]()
|
||||
private[this] var initialDatabase: Option[String] = None
|
||||
private[this] var sparkException: Option[Throwable] = None
|
||||
|
||||
private[this] def newContext(): Thread = {
|
||||
new Thread(s"Start-SparkContext-$userName") {
|
||||
@ -52,7 +53,7 @@ class SparkSessionWithUGI(user: UserGroupInformation, conf: SparkConf) extends L
|
||||
new SparkContext(conf)
|
||||
}
|
||||
} catch {
|
||||
case NonFatal(e) => throw e
|
||||
case NonFatal(e) => sparkException = Some(e)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -64,8 +65,13 @@ class SparkSessionWithUGI(user: UserGroupInformation, conf: SparkConf) extends L
|
||||
private[this] def stopContext(): Unit = {
|
||||
promisedSparkContext.future.map { sc =>
|
||||
warn(s"Error occurred during initializing SparkContext for $userName, stopping")
|
||||
sc.stop
|
||||
System.setProperty("SPARK_YARN_MODE", "true")
|
||||
try {
|
||||
sc.stop
|
||||
} catch {
|
||||
case NonFatal(e) => error(s"Error Stopping $userName's SparkContext", e)
|
||||
} finally {
|
||||
System.setProperty("SPARK_YARN_MODE", "true")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -159,15 +165,11 @@ class SparkSessionWithUGI(user: UserGroupInformation, conf: SparkConf) extends L
|
||||
SparkSessionCacheManager.get.set(userName, _sparkSession)
|
||||
} catch {
|
||||
case ute: UndeclaredThrowableException =>
|
||||
ute.getCause match {
|
||||
case te: TimeoutException =>
|
||||
stopContext()
|
||||
throw new KyuubiSQLException(
|
||||
s"Get SparkSession for [$userName] failed: " + te, "08S01", 1001, te)
|
||||
case _ =>
|
||||
stopContext()
|
||||
throw new KyuubiSQLException(ute.toString, "08S01", ute.getCause)
|
||||
}
|
||||
stopContext()
|
||||
val ke = new KyuubiSQLException(
|
||||
s"Get SparkSession for [$userName] failed: " + ute.getCause, "08S01", 1001, ute.getCause)
|
||||
sparkException.foreach(ke.addSuppressed)
|
||||
throw ke
|
||||
case e: Exception =>
|
||||
stopContext()
|
||||
throw new KyuubiSQLException(
|
||||
|
||||
@ -55,6 +55,21 @@ class SparkSessionWithUGISuite extends SparkFunSuite {
|
||||
spark.stop()
|
||||
}
|
||||
|
||||
test("test init failed with sc init failing") {
|
||||
assert(!spark.sparkContext.isStopped)
|
||||
val confClone = conf.clone().remove(KyuubiSparkUtil.MULTIPLE_CONTEXTS)
|
||||
.set(KyuubiConf.BACKEND_SESSTION_INIT_TIMEOUT.key, "3")
|
||||
val userName1 = "test1"
|
||||
val ru = UserGroupInformation.createRemoteUser(userName1)
|
||||
val sparkSessionWithUGI = new SparkSessionWithUGI(ru, confClone)
|
||||
assert(!SparkSessionWithUGI.isPartiallyConstructed(userName1))
|
||||
val e = intercept[KyuubiSQLException](sparkSessionWithUGI.init(Map.empty))
|
||||
assert(e.getCause.isInstanceOf[TimeoutException])
|
||||
val se = e.getSuppressed.head
|
||||
assert(se.isInstanceOf[SparkException])
|
||||
assert(se.getMessage.startsWith("Only one SparkContext"))
|
||||
}
|
||||
|
||||
test("test init failed with no such database") {
|
||||
val sparkSessionWithUGI = new SparkSessionWithUGI(user, conf)
|
||||
intercept[NoSuchDatabaseException](sparkSessionWithUGI.init(Map("use:database" -> "fakedb")))
|
||||
@ -103,6 +118,7 @@ class SparkSessionWithUGISuite extends SparkFunSuite {
|
||||
assert(e.getMessage.contains("has last more than 15 seconds"))
|
||||
assert(SparkSessionWithUGI.isPartiallyConstructed(userName))
|
||||
assert(!SparkSessionWithUGI.isPartiallyConstructed("Kent Yao"))
|
||||
SparkSessionWithUGI.setFullyConstructed(userName)
|
||||
}
|
||||
|
||||
test("test init failed with time out exception") {
|
||||
|
||||
Loading…
Reference in New Issue
Block a user