add more uts again
This commit is contained in:
parent
5318d99d1a
commit
ca2c73791e
@ -113,7 +113,8 @@ class SparkSessionWithUGI(user: UserGroupInformation, conf: SparkConf) extends L
|
||||
}
|
||||
|
||||
private[this] def getOrCreate(sessionConf: Map[String, String]): Unit = synchronized {
|
||||
var checkRound = math.max(conf.get(BACKEND_SESSION_WAIT_OTHER_TIMES.key).toInt, 15)
|
||||
val totalRounds = math.max(conf.get(BACKEND_SESSION_WAIT_OTHER_TIMES.key).toInt, 15)
|
||||
var checkRound = totalRounds
|
||||
val interval = conf.getTimeAsMs(BACKEND_SESSION_WAIT_OTHER_INTERVAL.key)
|
||||
// if user's sc is being constructed by another
|
||||
while (SparkSessionWithUGI.isPartiallyConstructed(userName)) {
|
||||
@ -121,7 +122,7 @@ class SparkSessionWithUGI(user: UserGroupInformation, conf: SparkConf) extends L
|
||||
checkRound -= 1
|
||||
if (checkRound <= 0) {
|
||||
throw new KyuubiSQLException(s"A partially constructed SparkContext for [$userName] " +
|
||||
s"has last more than ${checkRound * interval} seconds")
|
||||
s"has last more than ${totalRounds * interval / 1000} seconds")
|
||||
}
|
||||
info(s"A partially constructed SparkContext for [$userName], $checkRound times countdown.")
|
||||
}
|
||||
|
||||
@ -17,11 +17,14 @@
|
||||
|
||||
package yaooqinn.kyuubi.spark
|
||||
|
||||
import scala.concurrent.TimeoutException
|
||||
|
||||
import org.apache.hadoop.security.UserGroupInformation
|
||||
import org.apache.spark.{KyuubiSparkUtil, SparkConf, SparkContext, SparkFunSuite}
|
||||
import org.apache.spark._
|
||||
import org.apache.spark.sql.SparkSession
|
||||
import org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException
|
||||
|
||||
import yaooqinn.kyuubi.KyuubiSQLException
|
||||
import yaooqinn.kyuubi.server.KyuubiServer
|
||||
import yaooqinn.kyuubi.utils.ReflectUtils
|
||||
|
||||
@ -91,11 +94,30 @@ class SparkSessionWithUGISuite extends SparkFunSuite {
|
||||
}
|
||||
|
||||
test("testSetPartiallyConstructed") {
|
||||
SparkSessionWithUGI.setPartiallyConstructed("Kent")
|
||||
assert(SparkSessionWithUGI.isPartiallyConstructed("Kent"))
|
||||
val confClone = conf.clone().set(KyuubiConf.BACKEND_SESSION_WAIT_OTHER_TIMES.key, "3")
|
||||
SparkSessionWithUGI.setPartiallyConstructed(userName)
|
||||
val sparkSessionWithUGI = new SparkSessionWithUGI(user, confClone)
|
||||
val e = intercept[KyuubiSQLException](sparkSessionWithUGI.init(Map.empty))
|
||||
assert(e.getMessage.startsWith("A partially constructed SparkContext for"))
|
||||
assert(e.getMessage.contains(userName))
|
||||
assert(e.getMessage.contains("has last more than 15 seconds"))
|
||||
assert(SparkSessionWithUGI.isPartiallyConstructed(userName))
|
||||
assert(!SparkSessionWithUGI.isPartiallyConstructed("Kent Yao"))
|
||||
}
|
||||
|
||||
test("test init failed with time out exception") {
|
||||
// point to an non-exist cluster manager
|
||||
val confClone = conf.clone().setMaster("spark://localhost:7077")
|
||||
.set(KyuubiConf.BACKEND_SESSTION_INIT_TIMEOUT.key, "3")
|
||||
val userName1 = "test"
|
||||
val ru = UserGroupInformation.createRemoteUser(userName1)
|
||||
val sparkSessionWithUGI = new SparkSessionWithUGI(ru, confClone)
|
||||
assert(!SparkSessionWithUGI.isPartiallyConstructed(userName1))
|
||||
val e = intercept[KyuubiSQLException](sparkSessionWithUGI.init(Map.empty))
|
||||
assert(e.getCause.isInstanceOf[TimeoutException])
|
||||
assert(e.getMessage.startsWith("Get SparkSession"))
|
||||
}
|
||||
|
||||
test("testSetFullyConstructed") {
|
||||
SparkSessionWithUGI.setPartiallyConstructed("Kent")
|
||||
assert(SparkSessionWithUGI.isPartiallyConstructed("Kent"))
|
||||
|
||||
Loading…
Reference in New Issue
Block a user