[KYUUBI #2250] Support to limit the spark engine max running time

### _Why are the changes needed?_

To close #2250

### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #2295 from lightning-L/kyuubi-2250.

Closes #2250

71851bf8 [Tianlin Liao] [KYUUBI #2250] limit the spark engine max running time
7314df84 [Tianlin Liao] [KYUUBI #2250] add method to shutdown threadpool executor in ThreadUtils

Authored-by: Tianlin Liao <tiliao@ebay.com>
Signed-off-by: Fei Wang <fwang12@ebay.com>
This commit is contained in:
Tianlin Liao 2022-04-08 22:48:39 +08:00 committed by Fei Wang
parent 9c4af55fe7
commit 4bc14657bf
6 changed files with 116 additions and 24 deletions

View File

@ -339,6 +339,7 @@ Key | Default | Meaning | Type | Since
<code>kyuubi.session.engine.request.timeout</code>|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>PT1M</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>The timeout of awaiting response after sending request to remote sql query engine</div>|<div style='width: 30pt'>duration</div>|<div style='width: 20pt'>1.4.0</div>
<code>kyuubi.session.engine.share.level</code>|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>USER</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>(deprecated) - Using kyuubi.engine.share.level instead</div>|<div style='width: 30pt'>string</div>|<div style='width: 20pt'>1.0.0</div>
<code>kyuubi.session.engine.spark.main.resource</code>|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>&lt;undefined&gt;</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>The package used to create Spark SQL engine remote application. If it is undefined, Kyuubi will use the default</div>|<div style='width: 30pt'>string</div>|<div style='width: 20pt'>1.0.0</div>
<code>kyuubi.session.engine.spark.max.lifetime</code>|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>PT0S</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>Max lifetime for spark engine, the engine will self-terminate when it reaches the end of life. 0 or negative means not to self-terminate.</div>|<div style='width: 30pt'>duration</div>|<div style='width: 20pt'>1.6.0</div>
<code>kyuubi.session.engine.spark.progress.timeFormat</code>|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>yyyy-MM-dd HH:mm:ss.SSS</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>The time format of the progress bar</div>|<div style='width: 30pt'>string</div>|<div style='width: 20pt'>1.6.0</div>
<code>kyuubi.session.engine.spark.progress.update.interval</code>|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>PT1S</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>Update period of progress bar.</div>|<div style='width: 30pt'>duration</div>|<div style='width: 20pt'>1.6.0</div>
<code>kyuubi.session.engine.spark.showProgress</code>|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>false</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>When true, show the progress bar in the spark engine log.</div>|<div style='width: 30pt'>boolean</div>|<div style='width: 20pt'>1.6.0</div>

View File

@ -18,9 +18,10 @@
package org.apache.kyuubi.engine.spark
import java.time.Instant
import java.util.concurrent.CountDownLatch
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.duration.Duration
import scala.util.control.NonFatal
import org.apache.spark.{ui, SparkConf}
@ -38,15 +39,21 @@ import org.apache.kyuubi.engine.spark.events.{EngineEvent, EngineEventsStore}
import org.apache.kyuubi.engine.spark.events.handler.{SparkHistoryLoggingEventHandler, SparkJsonLoggingEventHandler}
import org.apache.kyuubi.events.{EventBus, EventLoggerType, KyuubiEvent}
import org.apache.kyuubi.ha.HighAvailabilityConf._
import org.apache.kyuubi.ha.client.RetryPolicies
import org.apache.kyuubi.ha.client.{EngineServiceDiscovery, RetryPolicies}
import org.apache.kyuubi.service.Serverable
import org.apache.kyuubi.util.SignalRegister
import org.apache.kyuubi.util.{SignalRegister, ThreadUtils}
case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngine") {
override val backendService = new SparkSQLBackendService(spark)
override val frontendServices = Seq(new SparkTBinaryFrontendService(this))
@volatile private var shutdown = false
@volatile private var deregistered = false
private val lifetimeTerminatingChecker =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("spark-engine-lifetime-checker")
override def initialize(conf: KyuubiConf): Unit = {
val listener = new SparkSQLEngineListener(this)
spark.sparkContext.addSparkListener(listener)
@ -64,11 +71,58 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin
assert(currentEngine.isDefined)
currentEngine.get.stop()
})
startLifetimeTerminatingChecker(() => {
assert(currentEngine.isDefined)
currentEngine.get.stop()
})
}
override def stop(): Unit = synchronized {
super.stop()
shutdown = true
val shutdownTimeout = conf.get(ENGINE_EXEC_POOL_SHUTDOWN_TIMEOUT)
ThreadUtils.shutdown(
lifetimeTerminatingChecker,
Duration(shutdownTimeout, TimeUnit.MILLISECONDS))
}
override protected def stopServer(): Unit = {
countDownLatch.countDown()
}
private[kyuubi] def startLifetimeTerminatingChecker(stop: () => Unit): Unit = {
val interval = conf.get(ENGINE_CHECK_INTERVAL)
val maxLifetime = conf.get(ENGINE_SPARK_MAX_LIFETIME)
if (maxLifetime > 0) {
val checkTask = new Runnable {
override def run(): Unit = {
if (!shutdown && System.currentTimeMillis() - getStartTime > maxLifetime) {
if (!deregistered) {
info(s"Spark engine has been running for more than $maxLifetime ms," +
s" deregistering from engine discovery space.")
frontendServices.flatMap(_.discoveryService).map {
case engineServiceDiscovery: EngineServiceDiscovery => engineServiceDiscovery.stop()
}
deregistered = true
}
if (backendService.sessionManager.getOpenSessionCount <= 0) {
info(s"Spark engine has been running for more than $maxLifetime ms" +
s" and no open session now, terminating")
stop()
}
}
}
}
lifetimeTerminatingChecker.scheduleWithFixedDelay(
checkTask,
interval,
interval,
TimeUnit.MILLISECONDS)
}
}
}
object SparkSQLEngine extends Logging {

View File

@ -19,7 +19,12 @@ package org.apache.kyuubi.engine.spark
import java.util.UUID
import org.scalatest.concurrent.PatienceConfiguration.Timeout
import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
import org.apache.kyuubi.config.KyuubiConf.ENGINE_CHECK_INTERVAL
import org.apache.kyuubi.config.KyuubiConf.ENGINE_SHARE_LEVEL
import org.apache.kyuubi.config.KyuubiConf.ENGINE_SPARK_MAX_LIFETIME
import org.apache.kyuubi.engine.ShareLevel
import org.apache.kyuubi.engine.ShareLevel.ShareLevel
import org.apache.kyuubi.operation.HiveJDBCTestHelper
@ -33,7 +38,10 @@ abstract class ShareLevelSparkEngineSuite
extends WithDiscoverySparkSQLEngine with HiveJDBCTestHelper {
def shareLevel: ShareLevel
override def withKyuubiConf: Map[String, String] = {
super.withKyuubiConf ++ Map(ENGINE_SHARE_LEVEL.key -> shareLevel.toString)
super.withKyuubiConf ++ Map(
ENGINE_SHARE_LEVEL.key -> shareLevel.toString,
ENGINE_SPARK_MAX_LIFETIME.key -> "PT20s",
ENGINE_CHECK_INTERVAL.key -> "PT5s")
}
override protected def jdbcUrl: String = getJdbcUrl
override val namespace: String = {
@ -57,6 +65,25 @@ abstract class ShareLevelSparkEngineSuite
}
}
}
test("test spark engine max life-time") {
withZkClient { zkClient =>
assert(engine.getServiceState == ServiceState.STARTED)
assert(zkClient.checkExists().forPath(namespace) != null)
withJdbcStatement() { _ => }
eventually(Timeout(30.seconds)) {
shareLevel match {
case ShareLevel.CONNECTION =>
assert(engine.getServiceState == ServiceState.STOPPED)
assert(zkClient.checkExists().forPath(namespace) == null)
case _ =>
assert(engine.getServiceState == ServiceState.STOPPED)
assert(zkClient.checkExists().forPath(namespace) != null)
}
}
}
}
}
class ConnectionLevelSparkEngineSuite extends ShareLevelSparkEngineSuite {

View File

@ -622,6 +622,14 @@ object KyuubiConf {
.stringConf
.createOptional
val ENGINE_SPARK_MAX_LIFETIME: ConfigEntry[Long] =
buildConf("kyuubi.session.engine.spark.max.lifetime")
.doc("Max lifetime for spark engine, the engine will self-terminate when it reaches the" +
" end of life. 0 or negative means not to self-terminate.")
.version("1.6.0")
.timeConf
.createWithDefault(0)
val ENGINE_FLINK_MAIN_RESOURCE: OptionalConfigEntry[String] =
buildConf("kyuubi.session.engine.flink.main.resource")
.doc("The package used to create Flink SQL engine remote job. If it is undefined," +

View File

@ -22,6 +22,7 @@ import java.nio.file.{Files, Paths}
import java.util.concurrent.{ConcurrentHashMap, Future, ThreadPoolExecutor, TimeUnit}
import scala.collection.JavaConverters._
import scala.concurrent.duration.Duration
import org.apache.hive.service.rpc.thrift.TProtocolVersion
@ -227,25 +228,9 @@ abstract class SessionManager(name: String) extends CompositeService(name) {
} else {
conf.get(SERVER_EXEC_POOL_SHUTDOWN_TIMEOUT)
}
timeoutChecker.shutdown()
try {
timeoutChecker.awaitTermination(shutdownTimeout, TimeUnit.MILLISECONDS)
} catch {
case i: InterruptedException =>
warn(s"Exceeded to shutdown session timeout checker ", i)
}
if (execPool != null) {
execPool.shutdown()
try {
execPool.awaitTermination(shutdownTimeout, TimeUnit.MILLISECONDS)
} catch {
case e: InterruptedException =>
warn(
s"Exceeded timeout($shutdownTimeout ms) to wait the exec-pool shutdown gracefully",
e)
}
}
ThreadUtils.shutdown(timeoutChecker, Duration(shutdownTimeout, TimeUnit.MILLISECONDS))
ThreadUtils.shutdown(execPool, Duration(shutdownTimeout, TimeUnit.MILLISECONDS))
}
private def startTimeoutChecker(): Unit = {

View File

@ -17,10 +17,10 @@
package org.apache.kyuubi.util
import java.util.concurrent.{LinkedBlockingQueue, ScheduledExecutorService, ScheduledThreadPoolExecutor, ThreadPoolExecutor, TimeUnit}
import java.util.concurrent.{ExecutorService, LinkedBlockingQueue, ScheduledExecutorService, ScheduledThreadPoolExecutor, ThreadPoolExecutor, TimeUnit}
import scala.concurrent.Awaitable
import scala.concurrent.duration.Duration
import scala.concurrent.duration.{Duration, FiniteDuration}
import org.apache.kyuubi.{KyuubiException, Logging}
@ -64,4 +64,21 @@ object ThreadUtils extends Logging {
throw new KyuubiException("Exception thrown in awaitResult: ", e)
}
}
def shutdown(
executor: ExecutorService,
gracePeriod: Duration = FiniteDuration(30, TimeUnit.SECONDS)): Unit = {
val shutdownTimeout = gracePeriod.toMillis
if (executor != null) {
executor.shutdown()
try {
executor.awaitTermination(shutdownTimeout, TimeUnit.MILLISECONDS)
} catch {
case e: InterruptedException =>
warn(
s"Exceeded timeout($shutdownTimeout ms) to wait the exec-pool shutdown gracefully",
e)
}
}
}
}