From 4c433fc59af1ab8d799ad1c97df054a33dbba4bf Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Sun, 3 Jan 2021 02:09:25 +0800 Subject: [PATCH] Add SparkSQLEngineListener --- .../kyuubi/engine/spark/SparkSQLEngine.scala | 9 ++-- .../engine/spark/SparkSQLEngineListener.scala | 37 +++++++++++++++ .../spark/SparkSQLEngineListenerSuite.scala | 45 +++++++++++++++++++ .../engine/spark/WithSparkSQLEngine.scala | 3 ++ 4 files changed, 91 insertions(+), 3 deletions(-) create mode 100644 externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngineListener.scala create mode 100644 externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/SparkSQLEngineListenerSuite.scala diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala index 9afeae3e0..251d548da 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala @@ -40,6 +40,8 @@ private[spark] final class SparkSQLEngine(name: String, spark: SparkSession) private val discoveryService = new ServiceDiscovery(this) override def initialize(conf: KyuubiConf): Unit = { + val listener = new SparkSQLEngineListener(this) + spark.sparkContext.addSparkListener(listener) super.initialize(conf) if (ServiceDiscovery.supportServiceDiscovery(conf)) { addService(discoveryService) @@ -49,7 +51,6 @@ private[spark] final class SparkSQLEngine(name: String, spark: SparkSession) override protected def stopServer(): Unit = { countDownLatch.countDown() - spark.stop() } } @@ -112,9 +113,11 @@ object SparkSQLEngine extends Logging { error("Error start SparkSQLEngine", t) if (engine != null) { engine.stop() - } else if (spark != null) { - spark.stop() } + } finally { + if (spark != null) { + spark.stop() + } } } } diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngineListener.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngineListener.scala new file mode 100644 index 000000000..b7970c71a --- /dev/null +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngineListener.scala @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.engine.spark + +import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd} + +import org.apache.kyuubi.Logging +import org.apache.kyuubi.service.{Serverable, ServiceState} + +class SparkSQLEngineListener(server: Serverable) extends SparkListener with Logging { + + override def onApplicationEnd(event: SparkListenerApplicationEnd): Unit = { + server.getServiceState match { + case ServiceState.STOPPED => debug("Received ApplicationEnd Message form Spark after the" + + " engine has stopped") + case state => + info(s"Received ApplicationEnd Message from Spark at $state, stopping") + server.stop() + } + } + +} diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/SparkSQLEngineListenerSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/SparkSQLEngineListenerSuite.scala new file mode 100644 index 000000000..0c2a956a7 --- /dev/null +++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/SparkSQLEngineListenerSuite.scala @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.engine.spark + +import org.apache.spark.sql.SparkSession + +import org.apache.kyuubi.KyuubiFunSuite +import org.apache.kyuubi.config.KyuubiConf +import org.apache.kyuubi.service.ServiceState + +class SparkSQLEngineListenerSuite extends KyuubiFunSuite { + + override def beforeAll(): Unit = { + SparkSession.clearActiveSession() + SparkSession.clearDefaultSession() + super.beforeAll() + } + + test("application end") { + val spark = SparkSession + .builder().master("local").getOrCreate() + + val engine = new SparkSQLEngine(spark) + engine.initialize(KyuubiConf()) + engine.start() + assert(engine.getServiceState === ServiceState.STARTED) + spark.stop() + assert(engine.getServiceState === ServiceState.STOPPED) + } +} diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithSparkSQLEngine.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithSparkSQLEngine.scala index 987c2b97f..bf8d4f682 100644 --- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithSparkSQLEngine.scala +++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithSparkSQLEngine.scala @@ -39,6 +39,8 @@ trait WithSparkSQLEngine extends JDBCTests { System.setProperty("spark.sql.warehouse.dir", warehousePath.toString) System.setProperty("spark.sql.hive.metastore.sharedPrefixes", "org.apache.hive.jdbc") + SparkSession.clearActiveSession() + SparkSession.clearDefaultSession() protected val spark: SparkSession = SparkSQLEngine.createSpark() protected var engine: SparkSQLEngine = _ @@ -56,6 +58,7 @@ trait WithSparkSQLEngine extends JDBCTests { if (engine != null) { engine.stop() } + spark.stop() SessionState.detachSession() Hive.closeCurrent() }