diff --git a/kyuubi-server/src/main/scala/org/apache/spark/KyuubiSparkUtil.scala b/kyuubi-server/src/main/scala/org/apache/spark/KyuubiSparkUtil.scala index 4ae67dbba..9aaf1cfab 100644 --- a/kyuubi-server/src/main/scala/org/apache/spark/KyuubiSparkUtil.scala +++ b/kyuubi-server/src/main/scala/org/apache/spark/KyuubiSparkUtil.scala @@ -238,6 +238,10 @@ object KyuubiSparkUtil extends Logging { Utils.getPropertiesFromFile(filename) } + def setActiveSparkContext(sc: SparkContext): Unit = { + SparkContext.setActiveContext(sc, allowMultipleContexts = true) + } + /** * Get and set Kyuubi Jar First ClassLoader */ diff --git a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala index 3ce6f9325..79ec8372a 100644 --- a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala +++ b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala @@ -314,6 +314,7 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging session.getUserName) } session.sparkSession.sparkContext.setJobGroup(statementId, statement) + KyuubiSparkUtil.setActiveSparkContext(session.sparkSession.sparkContext) result = session.sparkSession.sql(statement) KyuubiServerMonitor.getListener(session.getUserName).foreach { _.onStatementParsed(statementId, result.queryExecution.toString()) diff --git a/kyuubi-server/src/test/scala/org/apache/spark/KyuubiSparkUtilSuite.scala b/kyuubi-server/src/test/scala/org/apache/spark/KyuubiSparkUtilSuite.scala index f496fa7f8..a65ae2595 100644 --- a/kyuubi-server/src/test/scala/org/apache/spark/KyuubiSparkUtilSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/spark/KyuubiSparkUtilSuite.scala @@ -335,4 +335,18 @@ class KyuubiSparkUtilSuite extends SparkFunSuite with Logging { val e9 = KyuubiSparkUtil.findCause(e0) assert(e9 === e0) } + + test("set active spark context") { + val conf = new SparkConf(true) + .setMaster("local") + .setAppName("active context") + .set(KyuubiSparkUtil.MULTIPLE_CONTEXTS, "true") + val sc1 = new SparkContext(conf) + assert(SparkContext.getActive.contains(sc1)) + sc1.stop() + assert(SparkContext.getActive.isEmpty) + KyuubiSparkUtil.setActiveSparkContext(sc1) + assert(SparkContext.getActive.contains(sc1)) + SparkContext.clearActiveContext() + } }