[KYUUBI #4467][K8S][BATCH] Tolerate Driver Pod ephemerally invisible after submitting

### _Why are the changes needed?_

The following discussion assumes using Spark cluster mode w/ `waitCompletion=false`.

In Spark on Yarn, the application is visible immediately after `spark-submit` is returned, but things are different in Spark on K8s, Driver Pod is ephemerally invisible after submitting, so NOT_FOUND is returned instead of UNKNOWN or PENDING.

To tolerate the above case, `kyuubi.engine.submit.timeout` is introduced, ApplicationManager will report UNKNOWN instead of NOT_FOUND during the Driver Pod scheduling period.

More detail in #4467
1. Remove `KubernetesApplicationOperation`'s `JpsApplicationOperation` for handle Client Deploy Mode(`YarnApplicationOperation` doesn't handle this either)
2. Add engine submit timeout for `KubernetesApplicationOperation` to return Unknown status when not found driver pod in time range.
3. GetApplicationInfo with it's submit time

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [ ] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #4469 from zwangsheng/4467.

Closes #4467

562b67463 [zwangsheng] [KYUUBI #4467] Fix Setting.md
362c43d1b [zwangsheng] [KYUUBI #4467] Fix Setting.md
ac69f4d81 [zwangsheng] [KYUUBI #4467] Add Config Desc
d2b9fb660 [zwangsheng] [KYUUBI #4467] save tab
eac880fcf [zwangsheng] [KYUUBI #4467] Ingnore Kubernetes Operation for client mode test
7a20b97a4 [Cheng Pan] Update kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
aa4c7716a [zwangsheng] [KYUUBI #4467] Ingnore Kubernetes Operation for client mode test
c5bd888ab [zwangsheng] [KYUUBI #4467] note it test
a86dcefba [zwangsheng] [KYUUBI #4467] Using default none
aed7f8794 [Cheng Pan] Update docs/deployment/settings.md
490df7dc0 [zwangsheng] [KYUUBI #4467] fix complie
33f3a5be8 [zwangsheng] [KYUUBI #4467] fix comments
4745790cf [zwangsheng] [KYUUBI #4467] Fix IT Test
924cfe38e [zwangsheng] [KYUUBI #4467] Fix Setting.md
5f8aeaacc [zwangsheng] [KYUUBI #4467] KubernetesApplicationOperation Wait if not fount driver pod in limit time range

Lead-authored-by: zwangsheng <2213335496@qq.com>
Co-authored-by: Cheng Pan <pan3793@gmail.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
This commit is contained in:
zwangsheng 2023-03-14 23:01:43 +08:00 committed by Cheng Pan
parent 8267108e7f
commit b23c87c318
No known key found for this signature in database
GPG Key ID: 8001952629BCC75D
11 changed files with 77 additions and 39 deletions

View File

@ -160,6 +160,7 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co
| kyuubi.engine.spark.python.env.archive | &lt;undefined&gt; | Portable Python env archive used for Spark engine Python language mode. | string | 1.7.0 |
| kyuubi.engine.spark.python.env.archive.exec.path | bin/python | The Python exec path under the Python env archive. | string | 1.7.0 |
| kyuubi.engine.spark.python.home.archive | &lt;undefined&gt; | Spark archive containing $SPARK_HOME/python directory, which is used to init session Python worker for Python language mode. | string | 1.7.0 |
| kyuubi.engine.submit.timeout | PT30S | Period to tolerant Driver Pod ephemerally invisible after submitting. In some Resource Managers, e.g. K8s, the Driver Pod is not invisible immediately after `spark-submit` is returned. | duration | 1.7.1 |
| kyuubi.engine.trino.event.loggers | JSON | A comma-separated list of engine history loggers, where engine/session/operation etc events go.<ul> <li>JSON: the events will be written to the location of kyuubi.engine.event.json.log.path</li> <li>JDBC: to be done</li> <li>CUSTOM: to be done.</li></ul> | seq | 1.7.0 |
| kyuubi.engine.trino.extra.classpath | &lt;undefined&gt; | The extra classpath for the Trino query engine, for configuring other libs which may need by the Trino engine | string | 1.6.0 |
| kyuubi.engine.trino.java.options | &lt;undefined&gt; | The extra Java options for the Trino query engine | string | 1.6.0 |

View File

@ -125,6 +125,7 @@ class SparkClusterModeOnKubernetesSuite
override protected def jdbcUrl: String = getJdbcUrl
}
// [KYUUBI #4467] KubernetesApplicationOperator doesn't support client mode
class KyuubiOperationKubernetesClusterClientModeSuite
extends SparkClientModeOnKubernetesSuiteBase {
private lazy val k8sOperation: KubernetesApplicationOperation = {
@ -136,7 +137,7 @@ class KyuubiOperationKubernetesClusterClientModeSuite
private def sessionManager: KyuubiSessionManager =
server.backendService.sessionManager.asInstanceOf[KyuubiSessionManager]
test("Spark Client Mode On Kubernetes Kyuubi KubernetesApplicationOperation Suite") {
ignore("Spark Client Mode On Kubernetes Kyuubi KubernetesApplicationOperation Suite") {
val batchRequest = newSparkBatchRequest(conf.getAll ++ Map(
KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString))

View File

@ -2541,6 +2541,15 @@ object KyuubiConf {
.booleanConf
.createWithDefault(true)
val ENGINE_SUBMIT_TIMEOUT: ConfigEntry[Long] =
buildConf("kyuubi.engine.submit.timeout")
.doc("Period to tolerant Driver Pod ephemerally invisible after submitting. " +
"In some Resource Managers, e.g. K8s, the Driver Pod is not invisible immediately " +
"after `spark-submit` is returned.")
.version("1.7.1")
.timeConf
.createWithDefaultString("PT30S")
/**
* Holds information about keys that have been deprecated.
*

View File

@ -56,9 +56,10 @@ trait ApplicationOperation {
* Get the engine/application status by the unique application tag
*
* @param tag the unique application tag for engine instance.
* @param submitTime engine submit to resourceManager time
* @return [[ApplicationInfo]]
*/
def getApplicationInfoByTag(tag: String): ApplicationInfo
def getApplicationInfoByTag(tag: String, submitTime: Option[Long] = None): ApplicationInfo
}
object ApplicationState extends Enumeration {

View File

@ -216,7 +216,10 @@ private[kyuubi] class EngineRef(
// check the engine application state from engine manager and fast fail on engine terminate
if (exitValue == Some(0)) {
Option(engineManager).foreach { engineMgr =>
engineMgr.getApplicationInfo(builder.clusterManager(), engineRefId).foreach { appInfo =>
engineMgr.getApplicationInfo(
builder.clusterManager(),
engineRefId,
Some(started)).foreach { appInfo =>
if (ApplicationState.isTerminated(appInfo.state)) {
MetricsSystem.tracing { ms =>
ms.incCount(MetricRegistry.name(ENGINE_FAIL, appUser))

View File

@ -84,7 +84,7 @@ class JpsApplicationOperation extends ApplicationOperation {
killJpsApplicationByTag(tag, true)
}
override def getApplicationInfoByTag(tag: String): ApplicationInfo = {
override def getApplicationInfoByTag(tag: String, submitTime: Option[Long]): ApplicationInfo = {
val commandOption = getEngine(tag)
if (commandOption.nonEmpty) {
val idAndCmd = commandOption.get

View File

@ -32,16 +32,15 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
@volatile
private var kubernetesClient: KubernetesClient = _
private var jpsOperation: JpsApplicationOperation = _
private var submitTimeout: Long = _
override def initialize(conf: KyuubiConf): Unit = {
jpsOperation = new JpsApplicationOperation
jpsOperation.initialize(conf)
info("Start initializing Kubernetes Client.")
kubernetesClient = KubernetesUtils.buildKubernetesClient(conf) match {
case Some(client) =>
info(s"Initialized Kubernetes Client connect to: ${client.getMasterUrl}")
submitTimeout = conf.get(KyuubiConf.ENGINE_SUBMIT_TIMEOUT)
client
case None =>
warn("Fail to init Kubernetes Client for Kubernetes Application Operation")
@ -50,6 +49,7 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
}
override def isSupported(clusterManager: Option[String]): Boolean = {
// TODO add deploy mode to check whether is supported
kubernetesClient != null && clusterManager.nonEmpty &&
clusterManager.get.toLowerCase.startsWith("k8s")
}
@ -73,8 +73,9 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
s"Operation of deleted appId: ${podList.get(0).getMetadata.getName} is completed")
}
} else {
// client mode
jpsOperation.killApplicationByTag(tag)
(
false,
s"Target Pod(tag: $tag) is not found, due to pod have been deleted or not created")
}
} catch {
case e: Exception =>
@ -85,32 +86,44 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
}
}
override def getApplicationInfoByTag(tag: String): ApplicationInfo = {
if (kubernetesClient != null) {
debug(s"Getting application info from Kubernetes cluster by $tag tag")
try {
val podList = findDriverPodByTag(tag)
if (podList.size() != 0) {
val pod = podList.get(0)
val info = ApplicationInfo(
// spark pods always tag label `spark-app-selector:<spark-app-id>`
id = pod.getMetadata.getLabels.get(SPARK_APP_ID_LABEL),
name = pod.getMetadata.getName,
state = KubernetesApplicationOperation.toApplicationState(pod.getStatus.getPhase),
error = Option(pod.getStatus.getReason))
debug(s"Successfully got application info by $tag: $info")
info
} else {
// client mode
jpsOperation.getApplicationInfoByTag(tag)
}
} catch {
case e: Exception =>
error(s"Failed to get application with $tag, due to ${e.getMessage}")
override def getApplicationInfoByTag(tag: String, submitTime: Option[Long]): ApplicationInfo = {
if (kubernetesClient == null) {
throw new IllegalStateException("Methods initialize and isSupported must be called ahead")
}
debug(s"Getting application info from Kubernetes cluster by $tag tag")
try {
val podList = findDriverPodByTag(tag)
if (podList.size() != 0) {
val pod = podList.get(0)
val info = ApplicationInfo(
// spark pods always tag label `spark-app-selector:<spark-app-id>`
id = pod.getMetadata.getLabels.get(SPARK_APP_ID_LABEL),
name = pod.getMetadata.getName,
state = KubernetesApplicationOperation.toApplicationState(pod.getStatus.getPhase),
error = Option(pod.getStatus.getReason))
debug(s"Successfully got application info by $tag: $info")
return info
}
// Kyuubi should wait second if pod is not be created
submitTime match {
case Some(time) =>
val elapsedTime = System.currentTimeMillis() - time
if (elapsedTime > submitTimeout) {
error(s"Can't find target driver pod by tag: $tag, " +
s"elapsed time: ${elapsedTime}ms exceeds ${submitTimeout}ms.")
ApplicationInfo(id = null, name = null, ApplicationState.NOT_FOUND)
} else {
warn("Wait for driver pod to be created, " +
s"elapsed time: ${elapsedTime}ms, return UNKNOWN status")
ApplicationInfo(id = null, name = null, ApplicationState.UNKNOWN)
}
case None =>
ApplicationInfo(id = null, name = null, ApplicationState.NOT_FOUND)
}
} else {
throw new IllegalStateException("Methods initialize and isSupported must be called ahead")
} catch {
case e: Exception =>
error(s"Failed to get application with $tag, due to ${e.getMessage}")
ApplicationInfo(id = null, name = null, ApplicationState.NOT_FOUND)
}
}

View File

@ -84,10 +84,11 @@ class KyuubiApplicationManager extends AbstractService("KyuubiApplicationManager
def getApplicationInfo(
clusterManager: Option[String],
tag: String): Option[ApplicationInfo] = {
tag: String,
submitTime: Option[Long] = None): Option[ApplicationInfo] = {
val operation = operations.find(_.isSupported(clusterManager))
operation match {
case Some(op) => Some(op.getApplicationInfoByTag(tag))
case Some(op) => Some(op.getApplicationInfoByTag(tag, submitTime))
case None => None
}
}

View File

@ -75,7 +75,7 @@ class YarnApplicationOperation extends ApplicationOperation with Logging {
}
}
override def getApplicationInfoByTag(tag: String): ApplicationInfo = {
override def getApplicationInfoByTag(tag: String, submitTime: Option[Long]): ApplicationInfo = {
if (yarnClient != null) {
debug(s"Getting application info from Yarn cluster by $tag tag")
val reports = yarnClient.getApplications(null, null, Set(tag).asJava)

View File

@ -105,8 +105,16 @@ class BatchJobSubmission(
override protected def currentApplicationInfo: Option[ApplicationInfo] = {
if (isTerminal(state) && _applicationInfo.nonEmpty) return _applicationInfo
// only the ApplicationInfo with non-empty id is valid for the operation
val submitTime = if (_appStartTime <= 0) {
System.currentTimeMillis()
} else {
_appStartTime
}
val applicationInfo =
applicationManager.getApplicationInfo(builder.clusterManager(), batchId).filter(_.id != null)
applicationManager.getApplicationInfo(
builder.clusterManager(),
batchId,
Some(submitTime)).filter(_.id != null)
applicationInfo.foreach { _ =>
if (_appStartTime <= 0) {
_appStartTime = System.currentTimeMillis()

View File

@ -296,7 +296,8 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
error(s"Error redirecting get batch[$batchId] to ${metadata.kyuubiInstance}", e)
val batchAppStatus = sessionManager.applicationManager.getApplicationInfo(
metadata.clusterManager,
batchId)
batchId,
Some(metadata.createTime))
buildBatch(metadata, batchAppStatus)
}
}