[KYUUBI #5220][FOLLOWUP] Batch submitted considers application state

### _Why are the changes needed?_

This PR aims to fix the `SparkSubmit` concurrency limit implemented in #5220.

"submitted" judgment in #5220 only considered `OperationState`, actually, `ApplicationState` should be counted too. For instance, if a batch is pending in `ACCEPTED` state, the `SparkSubmit` process won't exit until changed to `RUNNING` or `FAILED` state, in such case, the `OperationState` is `RUNNING` and `ApplicationState` is `PENDING`, it should not be treated as "submitted".

Additionally, this PR treats metastore as the single of truth for batch instead of `SessionManager`

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before make a pull request

### _Was this patch authored or co-authored using generative AI tooling?_

No.

Closes #5279 from pan3793/5220-followup.

Closes #5220

903abc6d4 [Cheng Pan] Fix
0af6738d2 [Cheng Pan] [KYUUBI #5220][FOLLOWUP] Batch submmited should not contain application pending state

Authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: Cheng Pan <chengpan@apache.org>
This commit is contained in:
Cheng Pan 2023-09-14 12:13:20 +08:00
parent b21ae88545
commit dd2cd516d7
No known key found for this signature in database
GPG Key ID: 8001952629BCC75D
2 changed files with 31 additions and 18 deletions

View File

@ -20,6 +20,7 @@ package org.apache.kyuubi.server
import java.util.concurrent.atomic.AtomicBoolean
import org.apache.kyuubi.config.KyuubiConf.BATCH_SUBMITTER_THREADS
import org.apache.kyuubi.engine.ApplicationState
import org.apache.kyuubi.operation.OperationState
import org.apache.kyuubi.server.metadata.MetadataManager
import org.apache.kyuubi.service.{AbstractService, Serverable}
@ -83,26 +84,28 @@ class KyuubiBatchService(
metadata.requestArgs,
Some(metadata),
fromRecovery = false)
val sessionHandle = sessionManager.openBatchSession(batchSession)
sessionManager.openBatchSession(batchSession)
var submitted = false
while (!submitted) { // block until batch job submitted
submitted = sessionManager.getBatchSession(sessionHandle).map { batchSession =>
val batchState = batchSession.batchJobSubmissionOp.getStatus.state
batchState == OperationState.RUNNING || OperationState.isTerminal(batchState)
}.getOrElse {
error(s"Batch Session $batchId is not existed, marked as finished")
true
submitted = metadataManager.getBatchSessionMetadata(batchId) match {
case Some(metadata) if OperationState.isTerminal(metadata.opState) =>
true
case Some(metadata) if metadata.opState == OperationState.RUNNING =>
metadata.appState match {
// app that is not submitted to resource manager
case None | Some(ApplicationState.NOT_FOUND) => false
// app that is pending in resource manager
case Some(ApplicationState.PENDING) => false
// not sure, added for safe
case Some(ApplicationState.UNKNOWN) => false
case _ => true
}
case Some(_) =>
false
case None =>
error(s"$batchId does not existed in metastore, assume it is finished")
true
}
// should we always treat metastore as the single of truth?
//
// submitted = metadataManager.getBatchSessionMetadata(batchId) match {
// case Some(metadata) =>
// val batchState = OperationState.withName(metadata.state)
// batchState == OperationState.RUNNING || OperationState.isTerminal(batchState)
// case None =>
// error(s"$batchId does not existed in metastore, assume it is finished")
// true
// }
if (!submitted) Thread.sleep(1000)
}
info(s"$batchId is submitted or finished.")

View File

@ -18,7 +18,10 @@
package org.apache.kyuubi.server.metadata.api
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.engine.ApplicationManagerInfo
import org.apache.kyuubi.engine.{ApplicationManagerInfo, ApplicationState}
import org.apache.kyuubi.engine.ApplicationState.ApplicationState
import org.apache.kyuubi.operation.OperationState
import org.apache.kyuubi.operation.OperationState.OperationState
import org.apache.kyuubi.session.SessionType.SessionType
/**
@ -82,4 +85,11 @@ case class Metadata(
requestConf.get(KyuubiConf.KUBERNETES_CONTEXT.key),
requestConf.get(KyuubiConf.KUBERNETES_NAMESPACE.key))
}
def opState: OperationState = {
assert(state != null, "invalid state, a normal batch record must have non-null state")
OperationState.withName(state)
}
def appState: Option[ApplicationState] = Option(engineState).map(ApplicationState.withName)
}