diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala index 9ad1a8212..9f65881a5 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala @@ -18,7 +18,7 @@ package org.apache.kyuubi.engine.spark import java.time.Instant -import java.util.concurrent.{CountDownLatch, ScheduledExecutorService, TimeUnit} +import java.util.concurrent.{CountDownLatch, ScheduledExecutorService, ThreadPoolExecutor, TimeUnit} import java.util.concurrent.atomic.AtomicBoolean import scala.concurrent.duration.Duration @@ -48,8 +48,10 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin override val frontendServices = Seq(new SparkTBinaryFrontendService(this)) private val shutdown = new AtomicBoolean(false) + private val gracefulStopDeregistered = new AtomicBoolean(false) @volatile private var lifetimeTerminatingChecker: Option[ScheduledExecutorService] = None + @volatile private var stopEngineExec: Option[ThreadPoolExecutor] = None override def initialize(conf: KyuubiConf): Unit = { val listener = new SparkSQLEngineListener(this) @@ -83,6 +85,28 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin checker, Duration(shutdownTimeout, TimeUnit.MILLISECONDS)) }) + stopEngineExec.foreach(exec => { + ThreadUtils.shutdown( + exec, + Duration(60, TimeUnit.SECONDS)) + }) + } + + def gracefulStop(): Unit = if (gracefulStopDeregistered.compareAndSet(false, true)) { + val stopTask: Runnable = () => { + if (!shutdown.get) { + info(s"Spark engine is de-registering from engine discovery space.") + frontendServices.flatMap(_.discoveryService).foreach(_.stop()) + while (backendService.sessionManager.getOpenSessionCount > 0) { + Thread.sleep(1000 * 60) + } + info(s"Spark engine has no open session now, terminating.") + stop() + } + } + stopEngineExec = + Some(ThreadUtils.newDaemonFixedThreadPool(1, "spark-engine-graceful-stop")) + stopEngineExec.get.execute(stopTask) } override protected def stopServer(): Unit = { @@ -102,12 +126,9 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin frontendServices.flatMap(_.discoveryService).foreach(_.stop()) } - val openSessionCount = backendService.sessionManager.getOpenSessionCount - if (openSessionCount > 0) { - info(s"${openSessionCount} connection(s) are active, delay shutdown") - } else { + if (backendService.sessionManager.getOpenSessionCount <= 0) { info(s"Spark engine has been running for more than $maxLifetime ms" + - s" and no open session now, terminating") + s" and no open session now, terminating.") stop() } } diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EnginePage.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EnginePage.scala index 9e8cf96e4..dafdb0eb6 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EnginePage.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EnginePage.scala @@ -92,14 +92,23 @@ case class EnginePage(parent: EngineTab) extends WebUIPage("") { private def stop(request: HttpServletRequest): Seq[Node] = { val basePath = UIUtils.prependBaseUri(request, parent.basePath) if (parent.killEnabled) { - val confirm = - s"if (window.confirm('Are you sure you want to kill kyuubi engine ?')) " + + val confirmForceStop = + s"if (window.confirm('Are you sure you want to stop kyuubi engine immediately ?')) " + "{ this.parentNode.submit(); return true; } else { return false; }" - val stopLinkUri = s"$basePath/kyuubi/stop" -