[KYUUBI #7041] Fix NPE when getting metadtamanager in KubernetesApplicationOperation

### Why are the changes needed?

To fix NPE.

Before, we use below method to get `metadataManager`.
```
private def metadataManager = KyuubiServer.kyuubiServer.backendService
    .sessionManager.asInstanceOf[KyuubiSessionManager].metadataManager
```
But before the kyuubi server fully restarted, the `KyuubiServer.kyuubiServer` is null and might throw NPE during batch recovery phase.

For example:

```
:2025-04-23 14:06:24.040 ERROR [KyuubiSessionManager-exec-pool: Thread-231] org.apache.kyuubi.engine.KubernetesApplicationOperation: Failed to get application by label: kyuubi-unique-tag=95116703-4240-4cc1-9886-ccae3a2ac879, due to Cannot invoke "org.apache.kyuubi.server.KyuubiServer.backendService()" because the return value of "org.apache.kyuubi.server.KyuubiServer$.kyuubiServer()" is null
```

### How was this patch tested?

Existing GA.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #7041 from turboFei/fix_NPE.

Closes #7041

064d88707 [Wang, Fei] Fix NPE

Authored-by: Wang, Fei <fwang12@ebay.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
(cherry picked from commit ee677a6feb)
Signed-off-by: Wang, Fei <fwang12@ebay.com>
This commit is contained in:
Wang, Fei 2025-04-23 20:20:39 -07:00
parent bfada2e572
commit fc345fe2d8
10 changed files with 22 additions and 17 deletions

View File

@ -19,13 +19,14 @@ package org.apache.kyuubi.engine
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.engine.ApplicationState.ApplicationState
import org.apache.kyuubi.server.metadata.MetadataManager
trait ApplicationOperation {
/**
* Step for initializing the instance.
*/
def initialize(conf: KyuubiConf): Unit
def initialize(conf: KyuubiConf, metadataManager: Option[MetadataManager]): Unit
/**
* Step to clean up the instance

View File

@ -22,13 +22,14 @@ import java.nio.file.Paths
import scala.sys.process._
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.server.metadata.MetadataManager
class JpsApplicationOperation extends ApplicationOperation {
import ApplicationOperation._
private var runner: String = _
override def initialize(conf: KyuubiConf): Unit = {
override def initialize(conf: KyuubiConf, metadataManager: Option[MetadataManager]): Unit = {
val jps = sys.env.get("JAVA_HOME").orElse(sys.props.get("java.home"))
.map(Paths.get(_, "bin", "jps").toString)
.getOrElse("jps")

View File

@ -36,8 +36,7 @@ import org.apache.kyuubi.config.KyuubiConf.KubernetesApplicationStateSource.Kube
import org.apache.kyuubi.config.KyuubiConf.KubernetesCleanupDriverPodStrategy.{ALL, COMPLETED, NONE}
import org.apache.kyuubi.engine.ApplicationState.{isTerminated, ApplicationState, FAILED, FINISHED, KILLED, NOT_FOUND, PENDING, RUNNING, UNKNOWN}
import org.apache.kyuubi.operation.OperationState
import org.apache.kyuubi.server.KyuubiServer
import org.apache.kyuubi.session.KyuubiSessionManager
import org.apache.kyuubi.server.metadata.MetadataManager
import org.apache.kyuubi.util.{KubernetesUtils, ThreadUtils}
class KubernetesApplicationOperation extends ApplicationOperation with Logging {
@ -77,8 +76,7 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
kubernetesClients.computeIfAbsent(kubernetesInfo, kInfo => buildKubernetesClient(kInfo))
}
private def metadataManager = KyuubiServer.kyuubiServer.backendService
.sessionManager.asInstanceOf[KyuubiSessionManager].metadataManager
private var metadataManager: Option[MetadataManager] = _
// Visible for testing
private[engine] def checkKubernetesInfo(kubernetesInfo: KubernetesInfo): Unit = {
@ -113,8 +111,9 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
}
}
override def initialize(conf: KyuubiConf): Unit = {
override def initialize(conf: KyuubiConf, metadataManager: Option[MetadataManager]): Unit = {
kyuubiConf = conf
this.metadataManager = metadataManager
info("Start initializing Kubernetes application operation.")
submitTimeout = conf.get(KyuubiConf.ENGINE_KUBERNETES_SUBMIT_TIMEOUT)
// Defer cleaning terminated application information

View File

@ -29,10 +29,12 @@ import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.engine.KubernetesApplicationOperation.LABEL_KYUUBI_UNIQUE_KEY
import org.apache.kyuubi.engine.flink.FlinkProcessBuilder
import org.apache.kyuubi.engine.spark.SparkProcessBuilder
import org.apache.kyuubi.server.metadata.MetadataManager
import org.apache.kyuubi.service.AbstractService
import org.apache.kyuubi.util.reflect.ReflectUtils._
class KyuubiApplicationManager extends AbstractService("KyuubiApplicationManager") {
class KyuubiApplicationManager(metadataManager: Option[MetadataManager])
extends AbstractService("KyuubiApplicationManager") {
// TODO: maybe add a configuration is better
private val operations =
@ -41,7 +43,7 @@ class KyuubiApplicationManager extends AbstractService("KyuubiApplicationManager
override def initialize(conf: KyuubiConf): Unit = {
operations.foreach { op =>
try {
op.initialize(conf)
op.initialize(conf, metadataManager)
} catch {
case NonFatal(e) => warn(s"Error starting ${op.getClass.getSimpleName}: ${e.getMessage}")
}

View File

@ -32,6 +32,7 @@ import org.apache.kyuubi.config.KyuubiConf.YarnUserStrategy._
import org.apache.kyuubi.engine.ApplicationOperation._
import org.apache.kyuubi.engine.ApplicationState.ApplicationState
import org.apache.kyuubi.engine.YarnApplicationOperation.toApplicationState
import org.apache.kyuubi.server.metadata.MetadataManager
import org.apache.kyuubi.util.KyuubiHadoopUtils
class YarnApplicationOperation extends ApplicationOperation with Logging {
@ -40,7 +41,7 @@ class YarnApplicationOperation extends ApplicationOperation with Logging {
@volatile private var adminYarnClient: Option[YarnClient] = None
private var submitTimeout: Long = _
override def initialize(conf: KyuubiConf): Unit = {
override def initialize(conf: KyuubiConf, metadataManager: Option[MetadataManager]): Unit = {
submitTimeout = conf.get(KyuubiConf.ENGINE_YARN_SUBMIT_TIMEOUT)
yarnConf = KyuubiHadoopUtils.newYarnConfiguration(conf)

View File

@ -51,12 +51,11 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
val operationManager = new KyuubiOperationManager()
val credentialsManager = new HadoopCredentialsManager()
val applicationManager = new KyuubiApplicationManager()
// Currently, the metadata manager is used by the REST frontend which provides batch job APIs,
// so we initialize it only when Kyuubi starts with the REST frontend.
lazy val metadataManager: Option[MetadataManager] =
if (conf.isRESTEnabled) Some(new MetadataManager()) else None
var metadataManager: Option[MetadataManager] = None
var applicationManager: KyuubiApplicationManager = _
// lazy is required for plugins since the conf is null when this class initialization
lazy val sessionConfAdvisor: Seq[SessionConfAdvisor] = PluginLoader.loadSessionConfAdvisor(conf)
@ -73,6 +72,8 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
override def initialize(conf: KyuubiConf): Unit = {
this.conf = conf
if (conf.isRESTEnabled) metadataManager = Some(new MetadataManager())
applicationManager = new KyuubiApplicationManager(metadataManager)
addService(applicationManager)
addService(credentialsManager)
metadataManager.foreach(addService)

View File

@ -42,7 +42,7 @@ sealed trait WithKyuubiServerOnYarn extends WithKyuubiServer {
protected lazy val yarnOperation: YarnApplicationOperation = {
val operation = new YarnApplicationOperation()
operation.initialize(miniYarnService.getConf)
operation.initialize(miniYarnService.getConf, None)
operation
}

View File

@ -35,7 +35,7 @@ import org.apache.kyuubi.util.reflect.ReflectUtils._
class JpsApplicationOperationSuite extends KyuubiFunSuite {
private val jps = loadFromServiceLoader[ApplicationOperation]()
.find(_.getClass.isAssignableFrom(classOf[JpsApplicationOperation])).get
jps.initialize(null)
jps.initialize(null, None)
test("JpsApplicationOperation with jstat") {
assert(jps.isSupported(ApplicationManagerInfo(None)))

View File

@ -28,7 +28,7 @@ class KubernetesApplicationOperationSuite extends KyuubiFunSuite {
kyuubiConf.set(KyuubiConf.KUBERNETES_NAMESPACE_ALLOW_LIST.key, "ns1,ns2")
val operation = new KubernetesApplicationOperation()
operation.initialize(kyuubiConf)
operation.initialize(kyuubiConf, None)
operation.checkKubernetesInfo(KubernetesInfo(None, None))
operation.checkKubernetesInfo(KubernetesInfo(Some("1"), None))

View File

@ -49,7 +49,7 @@ import org.apache.kyuubi.shaded.hive.service.rpc.thrift.TProtocolVersion.HIVE_CL
class AdminResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
private val engineMgr = new KyuubiApplicationManager()
private val engineMgr = new KyuubiApplicationManager(None)
override protected lazy val conf: KyuubiConf = KyuubiConf()
.set(AUTHENTICATION_METHOD, Seq("CUSTOM"))