[KYUUBI #5065][FOLLOWUP] Graceful close the process when launch engine timeout

### _Why are the changes needed?_
#5065

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [ ] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before make a pull request

Closes #5097 from ASiegeLion/master.

Closes #5065

d50a388d6 [Cheng Pan] followup
80861dd71 [liupeiyue] [KYUUBI #5065][FOLLOWUP] Graceful close the process when launch engine timeout

Lead-authored-by: liupeiyue <liupeiyue@yy.com>
Co-authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: Cheng Pan <chengpan@apache.org>
This commit is contained in:
liupeiyue 2023-07-31 11:42:18 +08:00 committed by Cheng Pan
parent 998b5d5349
commit c0bfebd554
No known key found for this signature in database
GPG Key ID: 8001952629BCC75D
4 changed files with 30 additions and 11 deletions

View File

@ -23,6 +23,7 @@ import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path, Paths, StandardCopyOption}
import java.text.SimpleDateFormat
import java.util.{Date, Properties, TimeZone, UUID}
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.locks.Lock
@ -417,4 +418,26 @@ object Utils extends Logging {
lock.unlock()
}
}
/**
* Try killing the process gracefully first, then forcibly if process does not exit in
* graceful period.
*
* @param process the being killed process
* @param gracefulPeriod the graceful killing period, in milliseconds
* @return the exit code if process exit normally, None if the process finally was killed
* forcibly
*/
def terminateProcess(process: java.lang.Process, gracefulPeriod: Long): Option[Int] = {
process.destroy()
if (process.waitFor(gracefulPeriod, TimeUnit.MILLISECONDS)) {
Some(process.exitValue())
} else {
warn(s"Process does not exit after $gracefulPeriod ms, try to forcibly kill. " +
"Staging files generated by the process may be retained!")
process.destroyForcibly()
None
}
}
}

View File

@ -224,7 +224,7 @@ private[kyuubi] class EngineRef(
if (started + timeout <= System.currentTimeMillis()) {
val killMessage = engineManager.killApplication(builder.appMgrInfo(), engineRefId)
process.destroyForcibly()
builder.close(true)
MetricsSystem.tracing(_.incCount(MetricRegistry.name(ENGINE_TIMEOUT, appUser)))
throw KyuubiSQLException(
s"Timeout($timeout ms, you can modify ${ENGINE_INIT_TIMEOUT.key} to change it) to" +

View File

@ -21,7 +21,6 @@ import java.io.{File, FilenameFilter, IOException}
import java.net.URI
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path, Paths}
import java.util.concurrent.TimeUnit
import scala.collection.JavaConverters._
@ -261,13 +260,7 @@ trait ProcBuilder {
logCaptureThread = null
}
if (destroyProcess && process != null) {
process.destroy()
if (!process.waitFor(engineStartupDestroyTimeout, TimeUnit.MILLISECONDS)) {
warn("Engine startup process does not exit after " +
s"$engineStartupDestroyTimeout ms, try to forcibly kill. " +
"Staging files generated by the process may be retained!")
process.destroyForcibly()
}
Utils.terminateProcess(process, engineStartupDestroyTimeout)
process = null
}
}

View File

@ -26,7 +26,7 @@ import com.codahale.metrics.MetricRegistry
import com.google.common.annotations.VisibleForTesting
import org.apache.hive.service.rpc.thrift._
import org.apache.kyuubi.{KyuubiException, KyuubiSQLException}
import org.apache.kyuubi.{KyuubiException, KyuubiSQLException, Utils}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.engine.{ApplicationInfo, ApplicationState, KillResponse, ProcBuilder}
import org.apache.kyuubi.engine.spark.SparkBatchProcessBuilder
@ -131,6 +131,9 @@ class BatchJobSubmission(
private val applicationStarvationTimeout =
session.sessionConf.get(KyuubiConf.BATCH_APPLICATION_STARVATION_TIMEOUT)
private val applicationStartupDestroyTimeout =
session.sessionConf.get(KyuubiConf.SESSION_ENGINE_STARTUP_DESTROY_TIMEOUT)
private def updateBatchMetadata(): Unit = {
val endTime = if (isTerminalState(state)) lastAccessTime else 0L
@ -246,7 +249,7 @@ class BatchJobSubmission(
}
if (applicationFailed(_applicationInfo)) {
process.destroyForcibly()
Utils.terminateProcess(process, applicationStartupDestroyTimeout)
throw new KyuubiException(s"Batch job failed: ${_applicationInfo}")
} else {
process.waitFor()