From 7ee25b2220d683fc1361511412c06f2d3b49cc31 Mon Sep 17 00:00:00 2001 From: sychen Date: Thu, 24 Nov 2022 16:11:07 +0800 Subject: [PATCH] [KYUUBI #3792] [SPARK] Engine UI support grace stop ### _Why are the changes needed?_ Currently, the kill operation in the Spark engine UI shuts the engine down immediately, so any SQL that the engine is still executing fails. This change adds a graceful stop: the engine first de-registers itself from the znode (engine discovery space), and then waits until there are no open sessions, so that in-flight SQL can complete, before stopping. ### _How was this patch tested?_ - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [x] Add screenshots for manual tests if appropriate - [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request Closes #3792 from cxzl25/spark_ui_grace_stop. Closes #3792 ebde958d [sychen] global 2676c193 [sychen] remove print session count 9acdf37a [sychen] address comment 25203ef9 [sychen] address comment 6e54ddb7 [sychen] engine ui grace stop Authored-by: sychen Signed-off-by: Shaoyun Chen --- .../kyuubi/engine/spark/SparkSQLEngine.scala | 33 +++++++++++++--- .../org/apache/spark/ui/EnginePage.scala | 21 +++++++--- .../scala/org/apache/spark/ui/EngineTab.scala | 39 ++++++++++++++----- 3 files changed, 72 insertions(+), 21 deletions(-) diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala index 9ad1a8212..9f65881a5 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala @@ -18,7 +18,7 @@ package org.apache.kyuubi.engine.spark import java.time.Instant -import java.util.concurrent.{CountDownLatch, ScheduledExecutorService, TimeUnit} +import
java.util.concurrent.{CountDownLatch, ScheduledExecutorService, ThreadPoolExecutor, TimeUnit} import java.util.concurrent.atomic.AtomicBoolean import scala.concurrent.duration.Duration @@ -48,8 +48,10 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin override val frontendServices = Seq(new SparkTBinaryFrontendService(this)) private val shutdown = new AtomicBoolean(false) + private val gracefulStopDeregistered = new AtomicBoolean(false) @volatile private var lifetimeTerminatingChecker: Option[ScheduledExecutorService] = None + @volatile private var stopEngineExec: Option[ThreadPoolExecutor] = None override def initialize(conf: KyuubiConf): Unit = { val listener = new SparkSQLEngineListener(this) @@ -83,6 +85,28 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin checker, Duration(shutdownTimeout, TimeUnit.MILLISECONDS)) }) + stopEngineExec.foreach(exec => { + ThreadUtils.shutdown( + exec, + Duration(60, TimeUnit.SECONDS)) + }) + } + + def gracefulStop(): Unit = if (gracefulStopDeregistered.compareAndSet(false, true)) { + val stopTask: Runnable = () => { + if (!shutdown.get) { + info(s"Spark engine is de-registering from engine discovery space.") + frontendServices.flatMap(_.discoveryService).foreach(_.stop()) + while (backendService.sessionManager.getOpenSessionCount > 0) { + Thread.sleep(1000 * 60) + } + info(s"Spark engine has no open session now, terminating.") + stop() + } + } + stopEngineExec = + Some(ThreadUtils.newDaemonFixedThreadPool(1, "spark-engine-graceful-stop")) + stopEngineExec.get.execute(stopTask) } override protected def stopServer(): Unit = { @@ -102,12 +126,9 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin frontendServices.flatMap(_.discoveryService).foreach(_.stop()) } - val openSessionCount = backendService.sessionManager.getOpenSessionCount - if (openSessionCount > 0) { - info(s"${openSessionCount} connection(s) are active, delay shutdown") - 
} else { + if (backendService.sessionManager.getOpenSessionCount <= 0) { info(s"Spark engine has been running for more than $maxLifetime ms" + - s" and no open session now, terminating") + s" and no open session now, terminating.") stop() } } diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EnginePage.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EnginePage.scala index 9e8cf96e4..dafdb0eb6 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EnginePage.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EnginePage.scala @@ -92,14 +92,23 @@ case class EnginePage(parent: EngineTab) extends WebUIPage("") { private def stop(request: HttpServletRequest): Seq[Node] = { val basePath = UIUtils.prependBaseUri(request, parent.basePath) if (parent.killEnabled) { - val confirm = - s"if (window.confirm('Are you sure you want to kill kyuubi engine ?')) " + + val confirmForceStop = + s"if (window.confirm('Are you sure you want to stop kyuubi engine immediately ?')) " + "{ this.parentNode.submit(); return true; } else { return false; }" - val stopLinkUri = s"$basePath/kyuubi/stop" -
    + val forceStopLinkUri = s"$basePath/kyuubi/stop" + + val confirmGracefulStop = + s"if (window.confirm('Are you sure you want to stop kyuubi engine gracefully ?')) " + + "{ this.parentNode.submit(); return true; } else { return false; }" + val gracefulStopLinkUri = s"$basePath/kyuubi/gracefulstop" + + } else { diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EngineTab.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EngineTab.scala index b9632f5b6..b7cebbd97 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EngineTab.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EngineTab.scala @@ -64,19 +64,34 @@ case class EngineTab( try { // Spark shade the jetty package so here we use reflection val sparkServletContextHandlerClz = loadSparkServletContextHandler - Class.forName("org.apache.spark.ui.SparkUI") + val attachHandlerMethod = Class.forName("org.apache.spark.ui.SparkUI") .getMethod("attachHandler", sparkServletContextHandlerClz) + val createRedirectHandlerMethod = Class.forName("org.apache.spark.ui.JettyUtils") + .getMethod( + "createRedirectHandler", + classOf[String], + classOf[String], + classOf[(HttpServletRequest) => Unit], + classOf[String], + classOf[Set[String]]) + + attachHandlerMethod .invoke( ui, - Class.forName("org.apache.spark.ui.JettyUtils") - .getMethod( - "createRedirectHandler", - classOf[String], - classOf[String], - classOf[HttpServletRequest => Unit], - classOf[String], - classOf[scala.collection.immutable.Set[String]]) + createRedirectHandlerMethod .invoke(null, "/kyuubi/stop", "/kyuubi", handleKillRequest _, "", Set("GET", "POST"))) + + attachHandlerMethod + .invoke( + ui, + createRedirectHandlerMethod + .invoke( + null, + "/kyuubi/gracefulstop", + "/kyuubi", + handleGracefulKillRequest _, + "", + Set("GET", "POST"))) } catch { case NonFatal(cause) => reportInstallError(cause) case cause: NoClassDefFoundError => 
reportInstallError(cause) @@ -107,4 +122,10 @@ case class EngineTab( engine.get.stop() } } + + def handleGracefulKillRequest(request: HttpServletRequest): Unit = { + if (killEnabled && engine.isDefined && engine.get.getServiceState != ServiceState.STOPPED) { + engine.get.gracefulStop() + } + } }