[KYUUBI #7163][SPARK] Check whether engine context stopped in engine terminating checker

### Why are the changes needed?

To close #7163, in this PR, it checks whether engine context stopped in engine terminating checker.
1. Spark context stooped dut to OOM in `spark-listener-group-shared`, and call `tryOrStopSparkContext`.

```
25/08/03 19:08:06 ERROR Utils: uncaught error in thread spark-listener-group-shared, stopping SparkContext
java.lang.OutOfMemoryError: GC overhead limit exceeded
25/08/03 19:08:06 INFO OperationAuditLogger: operation=a7f134b9-373b-402d-a82b-2d42df568807 opType=ExecuteStatement state=INITIALIZED   user=b_hrvst    session=6a90d01c-7627-4ae6-a506-7ba826355489
...
25/08/03 19:08:23 INFO SparkSQLSessionManager: Opening session for b_hrvst10.147.254.115
25/08/03 19:08:23 ERROR SparkTBinaryFrontendService: Error opening session:
org.apache.kyuubi.KyuubiSQLException: Cannot call methods on a stopped SparkContext.
This stopped SparkContext was created at:
org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:951)
org.apache.kyuubi.engine.spark.SparkSQLEngine$.createSpark(SparkSQLEngine.scala:337)
org.apache.kyuubi.engine.spark.SparkSQLEngine$.main(SparkSQLEngine.scala:415)
org.apache.kyuubi.engine.spark.SparkSQLEngine.main(SparkSQLEngine.scala)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:498)
org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:732)
The currently active SparkContext was created at:
org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:951)
org.apache.kyuubi.engine.spark.SparkSQLEngine$.createSpark(SparkSQLEngine.scala:337)
org.apache.kyuubi.engine.spark.SparkSQLEngine$.main(SparkSQLEngine.scala:415)
org.apache.kyuubi.engine.spark.SparkSQLEngine.main(SparkSQLEngine.scala)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:498)
org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:732)

    at org.apache.kyuubi.KyuubiSQLException$.apply(KyuubiSQLException.scala:69)
    at org.apache.kyuubi.KyuubiSQLException$.apply(KyuubiSQLException.scala:73)
```

2. The kyuubi engine stop after 12 hours.
```
25/08/04 07:13:25 ERROR ZookeeperDiscoveryClient: Zookeeper client connection state changed to: LOST, but failed to reconnect in 3 seconds. Give up retry and stop gracefully .
25/08/04 07:13:25 INFO ClientCnxn: Session establishment complete on server zeus-slc-zk-3.vip.hadoop.ebay.com/10.147.141.240:2181, sessionid = 0x3939e22c983032e, negotiated timeout = 40000
25/08/04 07:13:25 INFO ConnectionStateManager: State change: RECONNECTED
25/08/04 07:13:25 INFO ZookeeperDiscoveryClient: Zookeeper client connection state changed to: RECONNECTED
25/08/04 07:13:25 INFO SparkSQLEngine: Service: [SparkTBinaryFrontend] is stopping.
25/08/04 07:13:25 INFO SparkTBinaryFrontendService: Service: [EngineServiceDiscovery] is stopping.
25/08/04 07:13:25 WARN EngineServiceDiscovery: The Zookeeper ensemble is LOST
25/08/04 07:13:25 INFO EngineServiceDiscovery: Service[EngineServiceDiscovery] is stopped.
25/08/04 07:13:25 INFO SparkTBinaryFrontendService: Service[SparkTBinaryFrontend] is stopped.
25/08/04 07:13:25 INFO SparkTBinaryFrontendService: SparkTBinaryFrontend has stopped
25/08/04 07:13:25 INFO SparkSQLEngine: Service: [SparkSQLBackendService] is stopping.
25/08/04 07:13:25 INFO SparkSQLBackendService: Service: [SparkSQLSessionManager] is stopping.
25/08/04 07:13:25 INFO SparkSQLSessionManager: Service: [SparkSQLOperationManager] is stopping.
25/08/04 07:13:45 INFO SparkSQLOperationManager: Service[SparkSQLOperationManager] is stopped.
25/08/04 07:13:45 INFO SparkSQLSessionManager: Service[SparkSQLSessionManager] is stopped.
```

3. seem the shutdown hook does not work in such case
9a0c49e791/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala (L375-L376)

4. and `SparkSQLEngineListener` did not receive `ApplicationEnd` message, maybe due to `spark-listener-group-shared` OOM? I do not have jstack for that, and can not check whether the thread alive.
9a0c49e791/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkSQLEngineListener.scala (L55-L63)

### How was this patch tested?

Existing GA.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #7167 from turboFei/check_spark_stopped.

Closes #7163

835cb3dec [Wang, Fei] SparkContext
cd542decb [Wang, Fei] Revert "no hard code"
cf9e40ef6 [Wang, Fei] no hard code
ca551c23d [Wang, Fei] check engine context stopped

Authored-by: Wang, Fei <fwang12@ebay.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
This commit is contained in:
Wang, Fei 2025-08-07 01:27:55 -07:00
parent 2c64e4e5fc
commit b31663f569
2 changed files with 15 additions and 4 deletions

View File

@ -227,4 +227,8 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession)
opHandle: OperationHandle): Path = { opHandle: OperationHandle): Path = {
new Path(getSessionResultSavePath(sessionHandle), opHandle.identifier.toString) new Path(getSessionResultSavePath(sessionHandle), opHandle.identifier.toString)
} }
override private[kyuubi] def isEngineContextStopped = {
spark.sparkContext.isStopped
}
} }

View File

@ -345,10 +345,15 @@ abstract class SessionManager(name: String) extends CompositeService(name) {
if (idleTimeout > 0) { if (idleTimeout > 0) {
val checkTask = new Runnable { val checkTask = new Runnable {
override def run(): Unit = { override def run(): Unit = {
if (!shutdown && System.currentTimeMillis() - latestLogoutTime > idleTimeout && if (!shutdown) {
getActiveUserSessionCount <= 0) { if (System.currentTimeMillis() - latestLogoutTime > idleTimeout &&
info(s"Idled for more than $idleTimeout ms, terminating") getActiveUserSessionCount <= 0) {
stop() info(s"Idled for more than $idleTimeout ms, terminating")
stop()
} else if (isEngineContextStopped) {
error(s"Engine's SparkContext is stopped, terminating")
stop()
}
} }
} }
} }
@ -360,4 +365,6 @@ abstract class SessionManager(name: String) extends CompositeService(name) {
TimeUnit.MILLISECONDS) TimeUnit.MILLISECONDS)
} }
} }
private[kyuubi] def isEngineContextStopped: Boolean = false
} }