From c0bfebd5543a578c6554530a1575d6485be1d05a Mon Sep 17 00:00:00 2001 From: liupeiyue Date: Mon, 31 Jul 2023 11:42:18 +0800 Subject: [PATCH] [KYUUBI #5065][FOLLOWUP] Graceful close the process when launch engine timeout ### _Why are the changes needed?_ #5065 ### _How was this patch tested?_ - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [ ] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before make a pull request Closes #5097 from ASiegeLion/master. Closes #5065 d50a388d6 [Cheng Pan] followup 80861dd71 [liupeiyue] [KYUUBI #5065][FOLLOWUP] Graceful close the process when launch engine timeout Lead-authored-by: liupeiyue Co-authored-by: Cheng Pan Signed-off-by: Cheng Pan --- .../main/scala/org/apache/kyuubi/Utils.scala | 23 +++++++++++++++++++ .../org/apache/kyuubi/engine/EngineRef.scala | 2 +- .../apache/kyuubi/engine/ProcBuilder.scala | 9 +------- .../kyuubi/operation/BatchJobSubmission.scala | 7 ++++-- 4 files changed, 30 insertions(+), 11 deletions(-) diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala index 06c572130..fac30a173 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala @@ -23,6 +23,7 @@ import java.nio.charset.StandardCharsets import java.nio.file.{Files, Path, Paths, StandardCopyOption} import java.text.SimpleDateFormat import java.util.{Date, Properties, TimeZone, UUID} +import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicLong import java.util.concurrent.locks.Lock @@ -417,4 +418,26 @@ object Utils extends Logging { lock.unlock() } } + + /** + * Try killing the process gracefully first, then forcibly if process does not exit in + * graceful period. + * + * @param process the being killed process + * @param gracefulPeriod the graceful killing period, in milliseconds + * @return the exit code if process exit normally, None if the process finally was killed + * forcibly + */ + def terminateProcess(process: java.lang.Process, gracefulPeriod: Long): Option[Int] = { + process.destroy() + if (process.waitFor(gracefulPeriod, TimeUnit.MILLISECONDS)) { + Some(process.exitValue()) + } else { + warn(s"Process does not exit after $gracefulPeriod ms, try to forcibly kill. " + + "Staging files generated by the process may be retained!") + process.destroyForcibly() + None + } + } + } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala index 5ade86400..123aec46c 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala @@ -224,7 +224,7 @@ private[kyuubi] class EngineRef( if (started + timeout <= System.currentTimeMillis()) { val killMessage = engineManager.killApplication(builder.appMgrInfo(), engineRefId) - process.destroyForcibly() + builder.close(true) MetricsSystem.tracing(_.incCount(MetricRegistry.name(ENGINE_TIMEOUT, appUser))) throw KyuubiSQLException( s"Timeout($timeout ms, you can modify ${ENGINE_INIT_TIMEOUT.key} to change it) to" + diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala index f1e49d687..304799db8 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala @@ -21,7 +21,6 @@ import java.io.{File, FilenameFilter, IOException} import java.net.URI import java.nio.charset.StandardCharsets import java.nio.file.{Files, Path, Paths} -import java.util.concurrent.TimeUnit import scala.collection.JavaConverters._ @@ -261,13 +260,7 @@ trait ProcBuilder { logCaptureThread = null } if (destroyProcess && process != null) { - process.destroy() - if (!process.waitFor(engineStartupDestroyTimeout, TimeUnit.MILLISECONDS)) { - warn("Engine startup process does not exit after " + - s"$engineStartupDestroyTimeout ms, try to forcibly kill. " + - "Staging files generated by the process may be retained!") - process.destroyForcibly() - } + Utils.terminateProcess(process, engineStartupDestroyTimeout) process = null } } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala index 8bb9804ec..ac723b2c6 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala @@ -26,7 +26,7 @@ import com.codahale.metrics.MetricRegistry import com.google.common.annotations.VisibleForTesting import org.apache.hive.service.rpc.thrift._ -import org.apache.kyuubi.{KyuubiException, KyuubiSQLException} +import org.apache.kyuubi.{KyuubiException, KyuubiSQLException, Utils} import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.engine.{ApplicationInfo, ApplicationState, KillResponse, ProcBuilder} import org.apache.kyuubi.engine.spark.SparkBatchProcessBuilder @@ -131,6 +131,9 @@ class BatchJobSubmission( private val applicationStarvationTimeout = session.sessionConf.get(KyuubiConf.BATCH_APPLICATION_STARVATION_TIMEOUT) + private val applicationStartupDestroyTimeout = + session.sessionConf.get(KyuubiConf.SESSION_ENGINE_STARTUP_DESTROY_TIMEOUT) + private def updateBatchMetadata(): Unit = { val endTime = if (isTerminalState(state)) lastAccessTime else 0L @@ -246,7 +249,7 @@ class BatchJobSubmission( } if (applicationFailed(_applicationInfo)) { - process.destroyForcibly() + Utils.terminateProcess(process, applicationStartupDestroyTimeout) throw new KyuubiException(s"Batch job failed: ${_applicationInfo}") } else { process.waitFor()