From fc345fe2d89d87e68e7038dc70bf3d0a00db69e3 Mon Sep 17 00:00:00 2001 From: "Wang, Fei" Date: Wed, 23 Apr 2025 20:20:39 -0700 Subject: [PATCH] [KYUUBI #7041] Fix NPE when getting metadataManager in KubernetesApplicationOperation ### Why are the changes needed? To fix an NPE. Previously, we used the method below to get `metadataManager`. ``` private def metadataManager = KyuubiServer.kyuubiServer.backendService .sessionManager.asInstanceOf[KyuubiSessionManager].metadataManager ``` But before the Kyuubi server has fully restarted, `KyuubiServer.kyuubiServer` is null, so accessing it might throw an NPE during the batch recovery phase. For example: ``` :2025-04-23 14:06:24.040 ERROR [KyuubiSessionManager-exec-pool: Thread-231] org.apache.kyuubi.engine.KubernetesApplicationOperation: Failed to get application by label: kyuubi-unique-tag=95116703-4240-4cc1-9886-ccae3a2ac879, due to Cannot invoke "org.apache.kyuubi.server.KyuubiServer.backendService()" because the return value of "org.apache.kyuubi.server.KyuubiServer$.kyuubiServer()" is null ``` This patch passes the `MetadataManager` into `ApplicationOperation.initialize` explicitly, so the operation no longer reads it through the `KyuubiServer` singleton. ### How was this patch tested? Existing GA. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #7041 from turboFei/fix_NPE. 
Closes #7041 064d88707 [Wang, Fei] Fix NPE Authored-by: Wang, Fei Signed-off-by: Wang, Fei (cherry picked from commit ee677a6feb5cf18f27118a9c335d52af59a1ecb6) Signed-off-by: Wang, Fei --- .../org/apache/kyuubi/engine/ApplicationOperation.scala | 3 ++- .../apache/kyuubi/engine/JpsApplicationOperation.scala | 3 ++- .../kyuubi/engine/KubernetesApplicationOperation.scala | 9 ++++----- .../apache/kyuubi/engine/KyuubiApplicationManager.scala | 6 ++++-- .../apache/kyuubi/engine/YarnApplicationOperation.scala | 3 ++- .../org/apache/kyuubi/session/KyuubiSessionManager.scala | 7 ++++--- .../scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala | 2 +- .../kyuubi/engine/JpsApplicationOperationSuite.scala | 2 +- .../engine/KubernetesApplicationOperationSuite.scala | 2 +- .../apache/kyuubi/server/api/v1/AdminResourceSuite.scala | 2 +- 10 files changed, 22 insertions(+), 17 deletions(-) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala index 23a49c1ae..b9bce2e85 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala @@ -19,13 +19,14 @@ package org.apache.kyuubi.engine import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.engine.ApplicationState.ApplicationState +import org.apache.kyuubi.server.metadata.MetadataManager trait ApplicationOperation { /** * Step for initializing the instance. 
*/ - def initialize(conf: KyuubiConf): Unit + def initialize(conf: KyuubiConf, metadataManager: Option[MetadataManager]): Unit /** * Step to clean up the instance diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/JpsApplicationOperation.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/JpsApplicationOperation.scala index 1d0d58d16..ffc233c01 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/JpsApplicationOperation.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/JpsApplicationOperation.scala @@ -22,13 +22,14 @@ import java.nio.file.Paths import scala.sys.process._ import org.apache.kyuubi.config.KyuubiConf +import org.apache.kyuubi.server.metadata.MetadataManager class JpsApplicationOperation extends ApplicationOperation { import ApplicationOperation._ private var runner: String = _ - override def initialize(conf: KyuubiConf): Unit = { + override def initialize(conf: KyuubiConf, metadataManager: Option[MetadataManager]): Unit = { val jps = sys.env.get("JAVA_HOME").orElse(sys.props.get("java.home")) .map(Paths.get(_, "bin", "jps").toString) .getOrElse("jps") diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala index ee5d06a3c..81e97dddc 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala @@ -36,8 +36,7 @@ import org.apache.kyuubi.config.KyuubiConf.KubernetesApplicationStateSource.Kube import org.apache.kyuubi.config.KyuubiConf.KubernetesCleanupDriverPodStrategy.{ALL, COMPLETED, NONE} import org.apache.kyuubi.engine.ApplicationState.{isTerminated, ApplicationState, FAILED, FINISHED, KILLED, NOT_FOUND, PENDING, RUNNING, UNKNOWN} import org.apache.kyuubi.operation.OperationState -import 
org.apache.kyuubi.server.KyuubiServer -import org.apache.kyuubi.session.KyuubiSessionManager +import org.apache.kyuubi.server.metadata.MetadataManager import org.apache.kyuubi.util.{KubernetesUtils, ThreadUtils} class KubernetesApplicationOperation extends ApplicationOperation with Logging { @@ -77,8 +76,7 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging { kubernetesClients.computeIfAbsent(kubernetesInfo, kInfo => buildKubernetesClient(kInfo)) } - private def metadataManager = KyuubiServer.kyuubiServer.backendService - .sessionManager.asInstanceOf[KyuubiSessionManager].metadataManager + private var metadataManager: Option[MetadataManager] = _ // Visible for testing private[engine] def checkKubernetesInfo(kubernetesInfo: KubernetesInfo): Unit = { @@ -113,8 +111,9 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging { } } - override def initialize(conf: KyuubiConf): Unit = { + override def initialize(conf: KyuubiConf, metadataManager: Option[MetadataManager]): Unit = { kyuubiConf = conf + this.metadataManager = metadataManager info("Start initializing Kubernetes application operation.") submitTimeout = conf.get(KyuubiConf.ENGINE_KUBERNETES_SUBMIT_TIMEOUT) // Defer cleaning terminated application information diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala index f0965a05a..cd2101baf 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala @@ -29,10 +29,12 @@ import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.engine.KubernetesApplicationOperation.LABEL_KYUUBI_UNIQUE_KEY import org.apache.kyuubi.engine.flink.FlinkProcessBuilder import org.apache.kyuubi.engine.spark.SparkProcessBuilder +import 
org.apache.kyuubi.server.metadata.MetadataManager import org.apache.kyuubi.service.AbstractService import org.apache.kyuubi.util.reflect.ReflectUtils._ -class KyuubiApplicationManager extends AbstractService("KyuubiApplicationManager") { +class KyuubiApplicationManager(metadataManager: Option[MetadataManager]) + extends AbstractService("KyuubiApplicationManager") { // TODO: maybe add a configuration is better private val operations = @@ -41,7 +43,7 @@ class KyuubiApplicationManager extends AbstractService("KyuubiApplicationManager override def initialize(conf: KyuubiConf): Unit = { operations.foreach { op => try { - op.initialize(conf) + op.initialize(conf, metadataManager) } catch { case NonFatal(e) => warn(s"Error starting ${op.getClass.getSimpleName}: ${e.getMessage}") } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/YarnApplicationOperation.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/YarnApplicationOperation.scala index 9d6ca32fa..39ee29487 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/YarnApplicationOperation.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/YarnApplicationOperation.scala @@ -32,6 +32,7 @@ import org.apache.kyuubi.config.KyuubiConf.YarnUserStrategy._ import org.apache.kyuubi.engine.ApplicationOperation._ import org.apache.kyuubi.engine.ApplicationState.ApplicationState import org.apache.kyuubi.engine.YarnApplicationOperation.toApplicationState +import org.apache.kyuubi.server.metadata.MetadataManager import org.apache.kyuubi.util.KyuubiHadoopUtils class YarnApplicationOperation extends ApplicationOperation with Logging { @@ -40,7 +41,7 @@ class YarnApplicationOperation extends ApplicationOperation with Logging { @volatile private var adminYarnClient: Option[YarnClient] = None private var submitTimeout: Long = _ - override def initialize(conf: KyuubiConf): Unit = { + override def initialize(conf: KyuubiConf, metadataManager: Option[MetadataManager]): Unit = { 
submitTimeout = conf.get(KyuubiConf.ENGINE_YARN_SUBMIT_TIMEOUT) yarnConf = KyuubiHadoopUtils.newYarnConfiguration(conf) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala index 9edc8218e..13869051a 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala @@ -51,12 +51,11 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) { val operationManager = new KyuubiOperationManager() val credentialsManager = new HadoopCredentialsManager() - val applicationManager = new KyuubiApplicationManager() // Currently, the metadata manager is used by the REST frontend which provides batch job APIs, // so we initialize it only when Kyuubi starts with the REST frontend. - lazy val metadataManager: Option[MetadataManager] = - if (conf.isRESTEnabled) Some(new MetadataManager()) else None + var metadataManager: Option[MetadataManager] = None + var applicationManager: KyuubiApplicationManager = _ // lazy is required for plugins since the conf is null when this class initialization lazy val sessionConfAdvisor: Seq[SessionConfAdvisor] = PluginLoader.loadSessionConfAdvisor(conf) @@ -73,6 +72,8 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) { override def initialize(conf: KyuubiConf): Unit = { this.conf = conf + if (conf.isRESTEnabled) metadataManager = Some(new MetadataManager()) + applicationManager = new KyuubiApplicationManager(metadataManager) addService(applicationManager) addService(credentialsManager) metadataManager.foreach(addService) diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala index 8d3f7b17d..eef2a024b 100644 --- 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala @@ -42,7 +42,7 @@ sealed trait WithKyuubiServerOnYarn extends WithKyuubiServer { protected lazy val yarnOperation: YarnApplicationOperation = { val operation = new YarnApplicationOperation() - operation.initialize(miniYarnService.getConf) + operation.initialize(miniYarnService.getConf, None) operation } diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/JpsApplicationOperationSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/JpsApplicationOperationSuite.scala index b667d9c39..c1e9a25c8 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/JpsApplicationOperationSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/JpsApplicationOperationSuite.scala @@ -35,7 +35,7 @@ import org.apache.kyuubi.util.reflect.ReflectUtils._ class JpsApplicationOperationSuite extends KyuubiFunSuite { private val jps = loadFromServiceLoader[ApplicationOperation]() .find(_.getClass.isAssignableFrom(classOf[JpsApplicationOperation])).get - jps.initialize(null) + jps.initialize(null, None) test("JpsApplicationOperation with jstat") { assert(jps.isSupported(ApplicationManagerInfo(None))) diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/KubernetesApplicationOperationSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/KubernetesApplicationOperationSuite.scala index 2ea1939d2..7454ad652 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/KubernetesApplicationOperationSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/KubernetesApplicationOperationSuite.scala @@ -28,7 +28,7 @@ class KubernetesApplicationOperationSuite extends KyuubiFunSuite { kyuubiConf.set(KyuubiConf.KUBERNETES_NAMESPACE_ALLOW_LIST.key, "ns1,ns2") val operation = new KubernetesApplicationOperation() - operation.initialize(kyuubiConf) + 
operation.initialize(kyuubiConf, None) operation.checkKubernetesInfo(KubernetesInfo(None, None)) operation.checkKubernetesInfo(KubernetesInfo(Some("1"), None)) diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala index bc53563f7..cd0e0d6c7 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala @@ -49,7 +49,7 @@ import org.apache.kyuubi.shaded.hive.service.rpc.thrift.TProtocolVersion.HIVE_CL class AdminResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper { - private val engineMgr = new KyuubiApplicationManager() + private val engineMgr = new KyuubiApplicationManager(None) override protected lazy val conf: KyuubiConf = KyuubiConf() .set(AUTHENTICATION_METHOD, Seq("CUSTOM"))