[KYUUBI #3792] [SPARK] Engine UI support grace stop

### _Why are the changes needed?_
Currently, the kill operation in the Spark engine UI shuts the engine down immediately, so any SQL that the engine is still executing fails.

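We can support a graceful stop: first deregister the engine from its znode so that it no longer accepts new connections, and then wait for the running SQL to complete (that is, until no sessions remain open) before stopping the engine.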

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [x] Add screenshots for manual tests if appropriate

- [x] [Run tests](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request

Closes #3792 from cxzl25/spark_ui_grace_stop.

Closes #3792

ebde958d [sychen] global
2676c193 [sychen] remove print session count
9acdf37a [sychen] address comment
25203ef9 [sychen] address comment
6e54ddb7 [sychen] engine ui grace stop

Authored-by: sychen <sychen@ctrip.com>
Signed-off-by: Shaoyun Chen <csy@apache.org>
This commit is contained in:
sychen 2022-11-24 16:11:07 +08:00 committed by Shaoyun Chen
parent 35669b375a
commit 7ee25b2220
No known key found for this signature in database
GPG Key ID: 81A87B8D54DB73A3
3 changed files with 72 additions and 21 deletions

View File

@ -18,7 +18,7 @@
package org.apache.kyuubi.engine.spark
import java.time.Instant
import java.util.concurrent.{CountDownLatch, ScheduledExecutorService, TimeUnit}
import java.util.concurrent.{CountDownLatch, ScheduledExecutorService, ThreadPoolExecutor, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.duration.Duration
@ -48,8 +48,10 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin
override val frontendServices = Seq(new SparkTBinaryFrontendService(this))
private val shutdown = new AtomicBoolean(false)
private val gracefulStopDeregistered = new AtomicBoolean(false)
@volatile private var lifetimeTerminatingChecker: Option[ScheduledExecutorService] = None
@volatile private var stopEngineExec: Option[ThreadPoolExecutor] = None
override def initialize(conf: KyuubiConf): Unit = {
val listener = new SparkSQLEngineListener(this)
@ -83,6 +85,28 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin
checker,
Duration(shutdownTimeout, TimeUnit.MILLISECONDS))
})
stopEngineExec.foreach(exec => {
ThreadUtils.shutdown(
exec,
Duration(60, TimeUnit.SECONDS))
})
}
/**
 * Stops the engine gracefully instead of terminating it immediately.
 *
 * Only the first invocation has any effect (guarded by `gracefulStopDeregistered`); later
 * calls return without doing anything. The first call submits a background task to a
 * single-thread pool that:
 *  1. stops every discovery service of the frontend services, de-registering the engine
 *     from the engine discovery space;
 *  2. polls the open session count once a minute until it drops to zero;
 *  3. calls `stop()` to terminate the engine.
 *
 * The task does nothing if `shutdown` is already set when it starts, and it stops waiting
 * as soon as `shutdown` becomes set while it is polling.
 */
def gracefulStop(): Unit = if (gracefulStopDeregistered.compareAndSet(false, true)) {
  val stopTask: Runnable = () => {
    if (!shutdown.get) {
      info(s"Spark engine is de-registering from engine discovery space.")
      frontendServices.flatMap(_.discoveryService).foreach(_.stop())
      // Re-check `shutdown` on every tick: if the engine has been stopped through another
      // path while we were waiting, there is nothing left to wait for.
      while (!shutdown.get && backendService.sessionManager.getOpenSessionCount > 0) {
        Thread.sleep(1000 * 60)
      }
      // NOTE(review): presumably `shutdown` is flipped by `stop()` itself; in that case the
      // engine is already terminating and must not be stopped (or logged) a second time.
      if (!shutdown.get) {
        info(s"Spark engine has no open session now, terminating.")
        stop()
      }
    }
  }
  // Keep a local handle so the task is submitted to exactly the pool created here,
  // without reading the Option back through `.get`.
  val executor = ThreadUtils.newDaemonFixedThreadPool(1, "spark-engine-graceful-stop")
  stopEngineExec = Some(executor)
  executor.execute(stopTask)
}
override protected def stopServer(): Unit = {
@ -102,12 +126,9 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin
frontendServices.flatMap(_.discoveryService).foreach(_.stop())
}
val openSessionCount = backendService.sessionManager.getOpenSessionCount
if (openSessionCount > 0) {
info(s"${openSessionCount} connection(s) are active, delay shutdown")
} else {
if (backendService.sessionManager.getOpenSessionCount <= 0) {
info(s"Spark engine has been running for more than $maxLifetime ms" +
s" and no open session now, terminating")
s" and no open session now, terminating.")
stop()
}
}

View File

@ -92,14 +92,23 @@ case class EnginePage(parent: EngineTab) extends WebUIPage("") {
private def stop(request: HttpServletRequest): Seq[Node] = {
val basePath = UIUtils.prependBaseUri(request, parent.basePath)
if (parent.killEnabled) {
val confirm =
s"if (window.confirm('Are you sure you want to kill kyuubi engine ?')) " +
val confirmForceStop =
s"if (window.confirm('Are you sure you want to stop kyuubi engine immediately ?')) " +
"{ this.parentNode.submit(); return true; } else { return false; }"
val stopLinkUri = s"$basePath/kyuubi/stop"
<ul class ="list-unstyled">
val forceStopLinkUri = s"$basePath/kyuubi/stop"
val confirmGracefulStop =
s"if (window.confirm('Are you sure you want to stop kyuubi engine gracefully ?')) " +
"{ this.parentNode.submit(); return true; } else { return false; }"
val gracefulStopLinkUri = s"$basePath/kyuubi/gracefulstop"
<ul class="list-unstyled">
<li>
<strong>Stop kyuubi engine: </strong>
<a href={stopLinkUri} onclick={confirm} class="stop-link">(kill)</a>
<strong>Stop kyuubi engine:</strong>
<a href={forceStopLinkUri} onclick={confirmForceStop} class="stop-link">
(Stop Immediately)</a>
<a href={gracefulStopLinkUri} onclick={confirmGracefulStop} class="stop-link">
(Stop Gracefully)</a>
</li>
</ul>
} else {

View File

@ -64,19 +64,34 @@ case class EngineTab(
try {
// Spark shade the jetty package so here we use reflection
val sparkServletContextHandlerClz = loadSparkServletContextHandler
Class.forName("org.apache.spark.ui.SparkUI")
val attachHandlerMethod = Class.forName("org.apache.spark.ui.SparkUI")
.getMethod("attachHandler", sparkServletContextHandlerClz)
val createRedirectHandlerMethod = Class.forName("org.apache.spark.ui.JettyUtils")
.getMethod(
"createRedirectHandler",
classOf[String],
classOf[String],
classOf[(HttpServletRequest) => Unit],
classOf[String],
classOf[Set[String]])
attachHandlerMethod
.invoke(
ui,
Class.forName("org.apache.spark.ui.JettyUtils")
.getMethod(
"createRedirectHandler",
classOf[String],
classOf[String],
classOf[HttpServletRequest => Unit],
classOf[String],
classOf[scala.collection.immutable.Set[String]])
createRedirectHandlerMethod
.invoke(null, "/kyuubi/stop", "/kyuubi", handleKillRequest _, "", Set("GET", "POST")))
attachHandlerMethod
.invoke(
ui,
createRedirectHandlerMethod
.invoke(
null,
"/kyuubi/gracefulstop",
"/kyuubi",
handleGracefulKillRequest _,
"",
Set("GET", "POST")))
} catch {
case NonFatal(cause) => reportInstallError(cause)
case cause: NoClassDefFoundError => reportInstallError(cause)
@ -107,4 +122,10 @@ case class EngineTab(
engine.get.stop()
}
}
/**
 * Handles a request to stop the engine gracefully.
 *
 * When killing is enabled and an engine is present whose service state is not
 * `ServiceState.STOPPED`, its `gracefulStop()` is invoked; otherwise nothing happens.
 *
 * @param request the incoming HTTP request (not inspected here)
 */
def handleGracefulKillRequest(request: HttpServletRequest): Unit = {
  if (killEnabled) {
    engine
      .filter(_.getServiceState != ServiceState.STOPPED)
      .foreach(_.gracefulStop())
  }
}
}