[KYUUBI #4780] Get engine application info at an interval to prevent frequent calls to the resource manager

### _Why are the changes needed?_

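To prevent frequent calls to the resource manager. While waiting for the engine to register, the application info is now queried only after the discovery lookup finds no engine, and once a previous query has returned application info, the loop sleeps for one second before querying again.

### _How was this patch tested?_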
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request

Closes #4780 from turboFei/engine_ref.

Closes #4780

09f67c699 [fwang12] re-order
88c1cb33c [fwang12] sleep

Authored-by: fwang12 <fwang12@ebay.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
This commit is contained in:
fwang12 2023-05-03 20:24:59 +08:00 committed by Cheng Pan
parent 0875ce7066
commit 66de0ad8a0
No known key found for this signature in database
GPG Key ID: 8001952629BCC75D

View File

@ -206,10 +206,11 @@ private[kyuubi] class EngineRef(
builder.validateConf
val process = builder.start
var exitValue: Option[Int] = None
var lastApplicationInfo: Option[ApplicationInfo] = None
while (engineRef.isEmpty) {
if (exitValue.isEmpty && process.waitFor(1, TimeUnit.SECONDS)) {
exitValue = Some(process.exitValue())
if (exitValue.get != 0) {
if (exitValue != Some(0)) {
val error = builder.getError
MetricsSystem.tracing { ms =>
ms.incCount(MetricRegistry.name(ENGINE_FAIL, appUser))
@ -219,30 +220,6 @@ private[kyuubi] class EngineRef(
}
}
// even the submit process succeeds, the application might meet failure when initializing,
// check the engine application state from engine manager and fast fail on engine terminate
if (exitValue == Some(0)) {
Option(engineManager).foreach { engineMgr =>
engineMgr.getApplicationInfo(
builder.clusterManager(),
engineRefId,
Some(started)).foreach { appInfo =>
if (ApplicationState.isTerminated(appInfo.state)) {
MetricsSystem.tracing { ms =>
ms.incCount(MetricRegistry.name(ENGINE_FAIL, appUser))
ms.incCount(MetricRegistry.name(ENGINE_FAIL, "ENGINE_TERMINATE"))
}
throw new KyuubiSQLException(
s"""
|The engine application has been terminated. Please check the engine log.
|ApplicationInfo: ${appInfo.toMap.mkString("(\n", ",\n", "\n)")}
|""".stripMargin,
builder.getError)
}
}
}
}
if (started + timeout <= System.currentTimeMillis()) {
val killMessage = engineManager.killApplication(builder.clusterManager(), engineRefId)
process.destroyForcibly()
@ -253,6 +230,38 @@ private[kyuubi] class EngineRef(
builder.getError)
}
engineRef = discoveryClient.getEngineByRefId(engineSpace, engineRefId)
// even the submit process succeeds, the application might meet failure when initializing,
// check the engine application state from engine manager and fast fail on engine terminate
if (engineRef.isEmpty && exitValue == Some(0)) {
Option(engineManager).foreach { engineMgr =>
if (lastApplicationInfo.isDefined) {
TimeUnit.SECONDS.sleep(1)
}
val applicationInfo = engineMgr.getApplicationInfo(
builder.clusterManager(),
engineRefId,
Some(started))
applicationInfo.foreach { appInfo =>
if (ApplicationState.isTerminated(appInfo.state)) {
MetricsSystem.tracing { ms =>
ms.incCount(MetricRegistry.name(ENGINE_FAIL, appUser))
ms.incCount(MetricRegistry.name(ENGINE_FAIL, "ENGINE_TERMINATE"))
}
throw new KyuubiSQLException(
s"""
|The engine application has been terminated. Please check the engine log.
|ApplicationInfo: ${appInfo.toMap.mkString("(\n", ",\n", "\n)")}
|""".stripMargin,
builder.getError)
}
}
lastApplicationInfo = applicationInfo
}
}
}
engineRef.get
} finally {