Add SparkSQLEngineListener

This commit is contained in:
Kent Yao 2021-01-03 02:09:25 +08:00 committed by Kent Yao
parent aaaeddeb48
commit 4c433fc59a
No known key found for this signature in database
GPG Key ID: A4F0BE81C89B595B
4 changed files with 91 additions and 3 deletions

View File

@ -40,6 +40,8 @@ private[spark] final class SparkSQLEngine(name: String, spark: SparkSession)
private val discoveryService = new ServiceDiscovery(this)
override def initialize(conf: KyuubiConf): Unit = {
val listener = new SparkSQLEngineListener(this)
spark.sparkContext.addSparkListener(listener)
super.initialize(conf)
if (ServiceDiscovery.supportServiceDiscovery(conf)) {
addService(discoveryService)
@ -49,7 +51,6 @@ private[spark] final class SparkSQLEngine(name: String, spark: SparkSession)
// Shuts down the engine's backing server: releases the keep-alive latch so the
// awaiting main thread can exit, then stops the underlying SparkSession.
override protected def stopServer(): Unit = {
// Unblock whoever is waiting on countDownLatch (the engine keep-alive await).
countDownLatch.countDown()
// NOTE(review): SparkSession.stop() also stops the SparkContext; assumes it is
// safe to call even if Spark shutdown is already in progress — confirm.
spark.stop()
}
}
@ -112,9 +113,11 @@ object SparkSQLEngine extends Logging {
error("Error start SparkSQLEngine", t)
if (engine != null) {
engine.stop()
} else if (spark != null) {
spark.stop()
}
} finally {
if (spark != null) {
spark.stop()
}
}
}
}

View File

@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.spark
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
import org.apache.kyuubi.Logging
import org.apache.kyuubi.service.{Serverable, ServiceState}
/**
 * A [[SparkListener]] that ties the engine's lifecycle to the Spark
 * application's: when Spark reports that the application has ended, the engine
 * service is stopped as well, so it does not linger after its backing
 * SparkContext is gone.
 *
 * @param server the engine service to stop once the Spark application ends
 */
class SparkSQLEngineListener(server: Serverable) extends SparkListener with Logging {

  override def onApplicationEnd(event: SparkListenerApplicationEnd): Unit = {
    server.getServiceState match {
      case ServiceState.STOPPED =>
        // The engine already shut down (it most likely initiated this Spark
        // shutdown itself); nothing left to do.
        // Fix: log message said "form Spark" — corrected to "from Spark".
        debug("Received ApplicationEnd Message from Spark after the engine has stopped")
      case state =>
        // Spark is going away underneath us; stop the engine too.
        info(s"Received ApplicationEnd Message from Spark at $state, stopping")
        server.stop()
    }
  }
}

View File

@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.spark
import org.apache.spark.sql.SparkSession
import org.apache.kyuubi.KyuubiFunSuite
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.service.ServiceState
class SparkSQLEngineListenerSuite extends KyuubiFunSuite {

  /** Ensure no SparkSession from an earlier suite leaks into this one. */
  override def beforeAll(): Unit = {
    SparkSession.clearDefaultSession()
    SparkSession.clearActiveSession()
    super.beforeAll()
  }

  // Stopping the SparkSession should drive the engine to STOPPED via the
  // registered SparkSQLEngineListener's onApplicationEnd callback.
  test("application end") {
    val session = SparkSession.builder()
      .master("local")
      .getOrCreate()
    val engine = new SparkSQLEngine(session)
    engine.initialize(KyuubiConf())
    engine.start()
    assert(engine.getServiceState === ServiceState.STARTED)
    session.stop()
    assert(engine.getServiceState === ServiceState.STOPPED)
  }
}

View File

@ -39,6 +39,8 @@ trait WithSparkSQLEngine extends JDBCTests {
System.setProperty("spark.sql.warehouse.dir", warehousePath.toString)
System.setProperty("spark.sql.hive.metastore.sharedPrefixes", "org.apache.hive.jdbc")
SparkSession.clearActiveSession()
SparkSession.clearDefaultSession()
protected val spark: SparkSession = SparkSQLEngine.createSpark()
protected var engine: SparkSQLEngine = _
@ -56,6 +58,7 @@ trait WithSparkSQLEngine extends JDBCTests {
if (engine != null) {
engine.stop()
}
spark.stop()
SessionState.detachSession()
Hive.closeCurrent()
}