diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md index 2bcc7fa72..7c5d63d97 100644 --- a/docs/deployment/settings.md +++ b/docs/deployment/settings.md @@ -339,6 +339,7 @@ Key | Default | Meaning | Type | Since kyuubi.session.engine.request.timeout|
PT1M
|
The timeout of awaiting response after sending request to remote sql query engine
|
duration
|
1.4.0
kyuubi.session.engine.share.level|
USER
|
(deprecated) - Using kyuubi.engine.share.level instead
|
string
|
1.0.0
kyuubi.session.engine.spark.main.resource|
<undefined>
|
The package used to create Spark SQL engine remote application. If it is undefined, Kyuubi will use the default
|
string
|
1.0.0
+kyuubi.session.engine.spark.max.lifetime|
PT0S
|
Max lifetime for the Spark engine. Once the engine has been running longer than this, it deregisters itself from service discovery and self-terminates as soon as it has no open sessions. 0 or a negative value disables self-termination.
|
duration
|
1.6.0
kyuubi.session.engine.spark.progress.timeFormat|
yyyy-MM-dd HH:mm:ss.SSS
|
The time format of the progress bar
|
string
|
1.6.0
kyuubi.session.engine.spark.progress.update.interval|
PT1S
|
Update period of progress bar.
|
duration
|
1.6.0
kyuubi.session.engine.spark.showProgress|
false
|
When true, show the progress bar in the spark engine log.
|
boolean
|
1.6.0
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala index 6ff551f2b..191825f06 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala @@ -18,9 +18,10 @@ package org.apache.kyuubi.engine.spark import java.time.Instant -import java.util.concurrent.CountDownLatch +import java.util.concurrent.{CountDownLatch, TimeUnit} import java.util.concurrent.atomic.AtomicBoolean +import scala.concurrent.duration.Duration import scala.util.control.NonFatal import org.apache.spark.{ui, SparkConf} @@ -38,15 +39,21 @@ import org.apache.kyuubi.engine.spark.events.{EngineEvent, EngineEventsStore} import org.apache.kyuubi.engine.spark.events.handler.{SparkHistoryLoggingEventHandler, SparkJsonLoggingEventHandler} import org.apache.kyuubi.events.{EventBus, EventLoggerType, KyuubiEvent} import org.apache.kyuubi.ha.HighAvailabilityConf._ -import org.apache.kyuubi.ha.client.RetryPolicies +import org.apache.kyuubi.ha.client.{EngineServiceDiscovery, RetryPolicies} import org.apache.kyuubi.service.Serverable -import org.apache.kyuubi.util.SignalRegister +import org.apache.kyuubi.util.{SignalRegister, ThreadUtils} case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngine") { override val backendService = new SparkSQLBackendService(spark) override val frontendServices = Seq(new SparkTBinaryFrontendService(this)) + @volatile private var shutdown = false + @volatile private var deregistered = false + + private val lifetimeTerminatingChecker = + ThreadUtils.newDaemonSingleThreadScheduledExecutor("spark-engine-lifetime-checker") + override def initialize(conf: KyuubiConf): Unit = { val listener = new SparkSQLEngineListener(this) 
spark.sparkContext.addSparkListener(listener) @@ -64,11 +71,58 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin assert(currentEngine.isDefined) currentEngine.get.stop() }) + + startLifetimeTerminatingChecker(() => { + assert(currentEngine.isDefined) + currentEngine.get.stop() + }) + } + + override def stop(): Unit = synchronized { + super.stop() + + shutdown = true + val shutdownTimeout = conf.get(ENGINE_EXEC_POOL_SHUTDOWN_TIMEOUT) + ThreadUtils.shutdown( + lifetimeTerminatingChecker, + Duration(shutdownTimeout, TimeUnit.MILLISECONDS)) } override protected def stopServer(): Unit = { countDownLatch.countDown() } + + private[kyuubi] def startLifetimeTerminatingChecker(stop: () => Unit): Unit = { + val interval = conf.get(ENGINE_CHECK_INTERVAL) + val maxLifetime = conf.get(ENGINE_SPARK_MAX_LIFETIME) + if (maxLifetime > 0) { + val checkTask = new Runnable { + override def run(): Unit = { + if (!shutdown && System.currentTimeMillis() - getStartTime > maxLifetime) { + if (!deregistered) { + info(s"Spark engine has been running for more than $maxLifetime ms," + + s" deregistering from engine discovery space.") + frontendServices.flatMap(_.discoveryService).map { + case engineServiceDiscovery: EngineServiceDiscovery => engineServiceDiscovery.stop() + } + deregistered = true + } + + if (backendService.sessionManager.getOpenSessionCount <= 0) { + info(s"Spark engine has been running for more than $maxLifetime ms" + + s" and no open session now, terminating") + stop() + } + } + } + } + lifetimeTerminatingChecker.scheduleWithFixedDelay( + checkTask, + interval, + interval, + TimeUnit.MILLISECONDS) + } + } } object SparkSQLEngine extends Logging { diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/ShareLevelSparkEngineSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/ShareLevelSparkEngineSuite.scala index 008f55323..47ed53ce4 100644 --- 
a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/ShareLevelSparkEngineSuite.scala +++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/ShareLevelSparkEngineSuite.scala @@ -19,7 +19,12 @@ package org.apache.kyuubi.engine.spark import java.util.UUID +import org.scalatest.concurrent.PatienceConfiguration.Timeout +import org.scalatest.time.SpanSugar.convertIntToGrainOfTime + +import org.apache.kyuubi.config.KyuubiConf.ENGINE_CHECK_INTERVAL import org.apache.kyuubi.config.KyuubiConf.ENGINE_SHARE_LEVEL +import org.apache.kyuubi.config.KyuubiConf.ENGINE_SPARK_MAX_LIFETIME import org.apache.kyuubi.engine.ShareLevel import org.apache.kyuubi.engine.ShareLevel.ShareLevel import org.apache.kyuubi.operation.HiveJDBCTestHelper @@ -33,7 +38,10 @@ abstract class ShareLevelSparkEngineSuite extends WithDiscoverySparkSQLEngine with HiveJDBCTestHelper { def shareLevel: ShareLevel override def withKyuubiConf: Map[String, String] = { - super.withKyuubiConf ++ Map(ENGINE_SHARE_LEVEL.key -> shareLevel.toString) + super.withKyuubiConf ++ Map( + ENGINE_SHARE_LEVEL.key -> shareLevel.toString, + ENGINE_SPARK_MAX_LIFETIME.key -> "PT20s", + ENGINE_CHECK_INTERVAL.key -> "PT5s") } override protected def jdbcUrl: String = getJdbcUrl override val namespace: String = { @@ -57,6 +65,25 @@ abstract class ShareLevelSparkEngineSuite } } } + + test("test spark engine max life-time") { + withZkClient { zkClient => + assert(engine.getServiceState == ServiceState.STARTED) + assert(zkClient.checkExists().forPath(namespace) != null) + withJdbcStatement() { _ => } + + eventually(Timeout(30.seconds)) { + shareLevel match { + case ShareLevel.CONNECTION => + assert(engine.getServiceState == ServiceState.STOPPED) + assert(zkClient.checkExists().forPath(namespace) == null) + case _ => + assert(engine.getServiceState == ServiceState.STOPPED) + assert(zkClient.checkExists().forPath(namespace) != null) + } + } + } + } } class 
ConnectionLevelSparkEngineSuite extends ShareLevelSparkEngineSuite { diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala index bb54b37a6..4e8fb9b51 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala @@ -622,6 +622,14 @@ object KyuubiConf { .stringConf .createOptional + val ENGINE_SPARK_MAX_LIFETIME: ConfigEntry[Long] = + buildConf("kyuubi.session.engine.spark.max.lifetime") + .doc("Max lifetime for spark engine, the engine will self-terminate when it reaches the" + + " end of life. 0 or negative means not to self-terminate.") + .version("1.6.0") + .timeConf + .createWithDefault(0) + val ENGINE_FLINK_MAIN_RESOURCE: OptionalConfigEntry[String] = buildConf("kyuubi.session.engine.flink.main.resource") .doc("The package used to create Flink SQL engine remote job. If it is undefined," + diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala index 9aa469418..dfacc9ece 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala @@ -22,6 +22,7 @@ import java.nio.file.{Files, Paths} import java.util.concurrent.{ConcurrentHashMap, Future, ThreadPoolExecutor, TimeUnit} import scala.collection.JavaConverters._ +import scala.concurrent.duration.Duration import org.apache.hive.service.rpc.thrift.TProtocolVersion @@ -227,25 +228,9 @@ abstract class SessionManager(name: String) extends CompositeService(name) { } else { conf.get(SERVER_EXEC_POOL_SHUTDOWN_TIMEOUT) } - timeoutChecker.shutdown() - try { - timeoutChecker.awaitTermination(shutdownTimeout, TimeUnit.MILLISECONDS) - } catch { - case i: InterruptedException => - warn(s"Exceeded to shutdown 
session timeout checker ", i) - } - if (execPool != null) { - execPool.shutdown() - try { - execPool.awaitTermination(shutdownTimeout, TimeUnit.MILLISECONDS) - } catch { - case e: InterruptedException => - warn( - s"Exceeded timeout($shutdownTimeout ms) to wait the exec-pool shutdown gracefully", - e) - } - } + ThreadUtils.shutdown(timeoutChecker, Duration(shutdownTimeout, TimeUnit.MILLISECONDS)) + ThreadUtils.shutdown(execPool, Duration(shutdownTimeout, TimeUnit.MILLISECONDS)) } private def startTimeoutChecker(): Unit = { diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/ThreadUtils.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/ThreadUtils.scala index a540b954d..c5a3944e6 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/ThreadUtils.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/ThreadUtils.scala @@ -17,10 +17,10 @@ package org.apache.kyuubi.util -import java.util.concurrent.{LinkedBlockingQueue, ScheduledExecutorService, ScheduledThreadPoolExecutor, ThreadPoolExecutor, TimeUnit} +import java.util.concurrent.{ExecutorService, LinkedBlockingQueue, ScheduledExecutorService, ScheduledThreadPoolExecutor, ThreadPoolExecutor, TimeUnit} import scala.concurrent.Awaitable -import scala.concurrent.duration.Duration +import scala.concurrent.duration.{Duration, FiniteDuration} import org.apache.kyuubi.{KyuubiException, Logging} @@ -64,4 +64,21 @@ object ThreadUtils extends Logging { throw new KyuubiException("Exception thrown in awaitResult: ", e) } } + + def shutdown( + executor: ExecutorService, + gracePeriod: Duration = FiniteDuration(30, TimeUnit.SECONDS)): Unit = { + val shutdownTimeout = gracePeriod.toMillis + if (executor != null) { + executor.shutdown() + try { + executor.awaitTermination(shutdownTimeout, TimeUnit.MILLISECONDS) + } catch { + case e: InterruptedException => + warn( + s"Exceeded timeout($shutdownTimeout ms) to wait the exec-pool shutdown gracefully", + e) + } + } + } }