[KYUUBI #6028] Exited spark-submit process should not block batch submit queue

# 🔍 Description
## Issue References 🔗

While enabling batch implementation V2 with the following configurations
```
kyuubi.batch.impl.version=2
kyuubi.batch.submitter.enabled=true
kyuubi.batch.submitter.threads=48
spark.master=yarn
spark.submit.deployMode=cluster
spark.yarn.submit.waitAppCompletion=false
```

I found that the batch jobs will be blocked in the DB queue once a YARN queue has no resources, this brings an issue, the subsequential batch jobs that are going to be submitted to another YARN queue also be queued in DB, rather than YARN queue.

```
mysql> select state, engine_state, count(1) from metadata where state in ('INITIALIZED', 'PENDING', 'RUNNING') group by state, engine_state;
+-------------+--------------+----------+
| state       | engine_state | count(1) |
+-------------+--------------+----------+
| INITIALIZED | NULL         |      166 |
| PENDING     | NULL         |        1 |
| RUNNING     | PENDING      |      148 |
| RUNNING     | RUNNING      |      415 |
+-------------+--------------+----------+
```

## Describe Your Solution 🔧

The submitter queue whose size is controlled by `kyuubi.batch.submitter.threads` is designed to address the `spark-submit` process concurrency issue, too many `spark-submit` processes may run out of the Kyuubi server's node CPU/memory resources and eventually crash the service. For Spark YARN cluster mode, if set `spark.yarn.submit.waitAppCompletion=false`, the local `spark-submit` process exits immediately once the Application goes ACCEPTED status, even no resource could be allocated for the AM container, we should not block such case in submitter queue.

## Types of changes 🔖

- [ ] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)

## Test Plan 🧪

Pass GA, and roll out into internal cluster.

---

# Checklist 📝

- [x] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)

**Be nice. Be informative.**

Closes #6028 from pan3793/batch-submit.

Closes #6028

05fcc758f [Cheng Pan] Exited spark-submit process should not block batch submit queue

Authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: Cheng Pan <chengpan@apache.org>
This commit is contained in:
Cheng Pan 2024-01-30 23:38:52 +08:00
parent 10230b3f94
commit 208354c327
No known key found for this signature in database
GPG Key ID: 8001952629BCC75D
7 changed files with 18 additions and 10 deletions

View File

@ -193,7 +193,7 @@ class KyuubiOperationKubernetesClusterClusterModeSuite
Seq("_123", "spark_exec", "spark@", "a" * 238).foreach { invalid =>
conf.set(KUBERNETES_EXECUTOR_POD_NAME_PREFIX, invalid)
val builder = new SparkProcessBuilder("test", true, conf)
val e = intercept[KyuubiException](builder.validateConf)
val e = intercept[KyuubiException](builder.validateConf())
assert(e.getMessage === s"'$invalid' in spark.kubernetes.executor.podNamePrefix is" +
s" invalid. must conform https://kubernetes.io/docs/concepts/overview/" +
"working-with-objects/names/#dns-subdomain-names and the value length <= 237")

View File

@ -225,7 +225,7 @@ private[kyuubi] class EngineRef(
acquiredPermit = true
val redactedCmd = builder.toString
info(s"Launching engine:\n$redactedCmd")
builder.validateConf
builder.validateConf()
val process = builder.start
var exitValue: Option[Int] = None
var lastApplicationInfo: Option[ApplicationInfo] = None

View File

@ -24,7 +24,6 @@ import java.nio.file.{Files, Path, Paths}
import scala.collection.JavaConverters._
import com.google.common.annotations.VisibleForTesting
import com.google.common.collect.EvictingQueue
import org.apache.commons.lang3.StringUtils.containsIgnoreCase
@ -166,9 +165,8 @@ trait ProcBuilder {
// Visible for test
@volatile private[kyuubi] var logCaptureThreadReleased: Boolean = true
private var logCaptureThread: Thread = _
private var process: Process = _
@VisibleForTesting
@volatile private[kyuubi] var processLaunched: Boolean = _
@volatile private[kyuubi] var process: Process = _
@volatile private[kyuubi] var processLaunched: Boolean = false
private[kyuubi] lazy val engineLog: File = ProcBuilder.synchronized {
val engineLogTimeout = conf.get(KyuubiConf.ENGINE_LOG_TIMEOUT)
@ -206,7 +204,7 @@ trait ProcBuilder {
file
}
def validateConf: Unit = {}
def validateConf(): Unit = {}
final def start: Process = synchronized {
process = processBuilder.start()

View File

@ -302,7 +302,7 @@ class SparkProcessBuilder(
conf.getOption(KUBERNETES_NAMESPACE_KEY).orElse(defaultsConf.get(KUBERNETES_NAMESPACE_KEY))
}
override def validateConf: Unit = Validator.validateConf(conf)
override def validateConf(): Unit = Validator.validateConf(conf)
// For spark on kubernetes, spark pod using env SPARK_USER_NAME as current user
def setSparkUserName(userName: String, buffer: mutable.Buffer[String]): Unit = {

View File

@ -100,6 +100,9 @@ class BatchJobSubmission(
getOperationLog)
}
def startupProcessAlive: Boolean =
builder.processLaunched && Option(builder.process).exists(_.isAlive)
override def currentApplicationInfo(): Option[ApplicationInfo] = {
if (isTerminal(state) && _applicationInfo.map(_.state).exists(ApplicationState.isTerminated)) {
return _applicationInfo

View File

@ -94,8 +94,13 @@ class KyuubiBatchService(
metadata.appState match {
// app that is not submitted to resource manager
case None | Some(ApplicationState.NOT_FOUND) => false
// app that is pending in resource manager
case Some(ApplicationState.PENDING) => false
// app that is pending in resource manager while the local startup
// process is alive. For example, in Spark YARN cluster mode, if set
// spark.yarn.submit.waitAppCompletion=false, the local spark-submit
// process exits immediately once Application goes ACCEPTED status,
// even no resource could be allocated for the AM container.
case Some(ApplicationState.PENDING) if batchSession.startupProcessAlive =>
false
// not sure, added for safe
case Some(ApplicationState.UNKNOWN) => false
case _ => true

View File

@ -111,6 +111,8 @@ class KyuubiBatchSession(
batchArgs,
metadata)
def startupProcessAlive: Boolean = batchJobSubmissionOp.startupProcessAlive
private def waitMetadataRequestsRetryCompletion(): Unit = {
val batchId = batchJobSubmissionOp.batchId
sessionManager.getMetadataRequestsRetryRef(batchId).foreach {