Merge pull request #145 from yaooqinn/KYUUBI-143-backport

[KYUUBI-143][Backport] Set active SparkContext in KyuubiOperation
This commit is contained in:
Kent Yao 2019-01-15 09:53:54 +08:00 committed by GitHub
commit bc0cc8602c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 19 additions and 0 deletions

View File

@ -238,6 +238,10 @@ object KyuubiSparkUtil extends Logging {
Utils.getPropertiesFromFile(filename)
}
def setActiveSparkContext(sc: SparkContext): Unit = {
SparkContext.setActiveContext(sc, allowMultipleContexts = true)
}
/**
* Get and set Kyuubi Jar First ClassLoader
*/

View File

@ -314,6 +314,7 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging
session.getUserName)
}
session.sparkSession.sparkContext.setJobGroup(statementId, statement)
KyuubiSparkUtil.setActiveSparkContext(session.sparkSession.sparkContext)
result = session.sparkSession.sql(statement)
KyuubiServerMonitor.getListener(session.getUserName).foreach {
_.onStatementParsed(statementId, result.queryExecution.toString())

View File

@ -335,4 +335,18 @@ class KyuubiSparkUtilSuite extends SparkFunSuite with Logging {
val e9 = KyuubiSparkUtil.findCause(e0)
assert(e9 === e0)
}
test("set active spark context") {
val conf = new SparkConf(true)
.setMaster("local")
.setAppName("active context")
.set(KyuubiSparkUtil.MULTIPLE_CONTEXTS, "true")
val sc1 = new SparkContext(conf)
assert(SparkContext.getActive.contains(sc1))
sc1.stop()
assert(SparkContext.getActive.isEmpty)
KyuubiSparkUtil.setActiveSparkContext(sc1)
assert(SparkContext.getActive.contains(sc1))
SparkContext.clearActiveContext()
}
}