diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala index 23a49c1ae..b9bce2e85 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala @@ -19,13 +19,14 @@ package org.apache.kyuubi.engine import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.engine.ApplicationState.ApplicationState +import org.apache.kyuubi.server.metadata.MetadataManager trait ApplicationOperation { /** * Step for initializing the instance. */ - def initialize(conf: KyuubiConf): Unit + def initialize(conf: KyuubiConf, metadataManager: Option[MetadataManager]): Unit /** * Step to clean up the instance diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/JpsApplicationOperation.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/JpsApplicationOperation.scala index 1d0d58d16..ffc233c01 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/JpsApplicationOperation.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/JpsApplicationOperation.scala @@ -22,13 +22,14 @@ import java.nio.file.Paths import scala.sys.process._ import org.apache.kyuubi.config.KyuubiConf +import org.apache.kyuubi.server.metadata.MetadataManager class JpsApplicationOperation extends ApplicationOperation { import ApplicationOperation._ private var runner: String = _ - override def initialize(conf: KyuubiConf): Unit = { + override def initialize(conf: KyuubiConf, metadataManager: Option[MetadataManager]): Unit = { val jps = sys.env.get("JAVA_HOME").orElse(sys.props.get("java.home")) .map(Paths.get(_, "bin", "jps").toString) .getOrElse("jps") diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala index ee5d06a3c..81e97dddc 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala @@ -36,8 +36,7 @@ import org.apache.kyuubi.config.KyuubiConf.KubernetesApplicationStateSource.Kube import org.apache.kyuubi.config.KyuubiConf.KubernetesCleanupDriverPodStrategy.{ALL, COMPLETED, NONE} import org.apache.kyuubi.engine.ApplicationState.{isTerminated, ApplicationState, FAILED, FINISHED, KILLED, NOT_FOUND, PENDING, RUNNING, UNKNOWN} import org.apache.kyuubi.operation.OperationState -import org.apache.kyuubi.server.KyuubiServer -import org.apache.kyuubi.session.KyuubiSessionManager +import org.apache.kyuubi.server.metadata.MetadataManager import org.apache.kyuubi.util.{KubernetesUtils, ThreadUtils} class KubernetesApplicationOperation extends ApplicationOperation with Logging { @@ -77,8 +76,7 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging { kubernetesClients.computeIfAbsent(kubernetesInfo, kInfo => buildKubernetesClient(kInfo)) } - private def metadataManager = KyuubiServer.kyuubiServer.backendService - .sessionManager.asInstanceOf[KyuubiSessionManager].metadataManager + private var metadataManager: Option[MetadataManager] = _ // Visible for testing private[engine] def checkKubernetesInfo(kubernetesInfo: KubernetesInfo): Unit = { @@ -113,8 +111,9 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging { } } - override def initialize(conf: KyuubiConf): Unit = { + override def initialize(conf: KyuubiConf, metadataManager: Option[MetadataManager]): Unit = { kyuubiConf = conf + this.metadataManager = metadataManager info("Start initializing Kubernetes application operation.") submitTimeout = conf.get(KyuubiConf.ENGINE_KUBERNETES_SUBMIT_TIMEOUT) // Defer cleaning terminated application information diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala index f0965a05a..cd2101baf 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala @@ -29,10 +29,12 @@ import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.engine.KubernetesApplicationOperation.LABEL_KYUUBI_UNIQUE_KEY import org.apache.kyuubi.engine.flink.FlinkProcessBuilder import org.apache.kyuubi.engine.spark.SparkProcessBuilder +import org.apache.kyuubi.server.metadata.MetadataManager import org.apache.kyuubi.service.AbstractService import org.apache.kyuubi.util.reflect.ReflectUtils._ -class KyuubiApplicationManager extends AbstractService("KyuubiApplicationManager") { +class KyuubiApplicationManager(metadataManager: Option[MetadataManager]) + extends AbstractService("KyuubiApplicationManager") { // TODO: maybe add a configuration is better private val operations = @@ -41,7 +43,7 @@ class KyuubiApplicationManager extends AbstractService("KyuubiApplicationManager override def initialize(conf: KyuubiConf): Unit = { operations.foreach { op => try { - op.initialize(conf) + op.initialize(conf, metadataManager) } catch { case NonFatal(e) => warn(s"Error starting ${op.getClass.getSimpleName}: ${e.getMessage}") } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/YarnApplicationOperation.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/YarnApplicationOperation.scala index 9d6ca32fa..39ee29487 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/YarnApplicationOperation.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/YarnApplicationOperation.scala @@ -32,6 +32,7 @@ import org.apache.kyuubi.config.KyuubiConf.YarnUserStrategy._ import org.apache.kyuubi.engine.ApplicationOperation._ import org.apache.kyuubi.engine.ApplicationState.ApplicationState import org.apache.kyuubi.engine.YarnApplicationOperation.toApplicationState +import org.apache.kyuubi.server.metadata.MetadataManager import org.apache.kyuubi.util.KyuubiHadoopUtils class YarnApplicationOperation extends ApplicationOperation with Logging { @@ -40,7 +41,7 @@ class YarnApplicationOperation extends ApplicationOperation with Logging { @volatile private var adminYarnClient: Option[YarnClient] = None private var submitTimeout: Long = _ - override def initialize(conf: KyuubiConf): Unit = { + override def initialize(conf: KyuubiConf, metadataManager: Option[MetadataManager]): Unit = { submitTimeout = conf.get(KyuubiConf.ENGINE_YARN_SUBMIT_TIMEOUT) yarnConf = KyuubiHadoopUtils.newYarnConfiguration(conf) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala index 9edc8218e..13869051a 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala @@ -51,12 +51,11 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) { val operationManager = new KyuubiOperationManager() val credentialsManager = new HadoopCredentialsManager() - val applicationManager = new KyuubiApplicationManager() // Currently, the metadata manager is used by the REST frontend which provides batch job APIs, // so we initialize it only when Kyuubi starts with the REST frontend. - lazy val metadataManager: Option[MetadataManager] = - if (conf.isRESTEnabled) Some(new MetadataManager()) else None + var metadataManager: Option[MetadataManager] = None + var applicationManager: KyuubiApplicationManager = _ // lazy is required for plugins since the conf is null when this class initialization lazy val sessionConfAdvisor: Seq[SessionConfAdvisor] = PluginLoader.loadSessionConfAdvisor(conf) @@ -73,6 +72,8 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) { override def initialize(conf: KyuubiConf): Unit = { this.conf = conf + if (conf.isRESTEnabled) metadataManager = Some(new MetadataManager()) + applicationManager = new KyuubiApplicationManager(metadataManager) addService(applicationManager) addService(credentialsManager) metadataManager.foreach(addService) diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala index 8d3f7b17d..eef2a024b 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala @@ -42,7 +42,7 @@ sealed trait WithKyuubiServerOnYarn extends WithKyuubiServer { protected lazy val yarnOperation: YarnApplicationOperation = { val operation = new YarnApplicationOperation() - operation.initialize(miniYarnService.getConf) + operation.initialize(miniYarnService.getConf, None) operation } diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/JpsApplicationOperationSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/JpsApplicationOperationSuite.scala index b667d9c39..c1e9a25c8 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/JpsApplicationOperationSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/JpsApplicationOperationSuite.scala @@ -35,7 +35,7 @@ import org.apache.kyuubi.util.reflect.ReflectUtils._ class JpsApplicationOperationSuite extends KyuubiFunSuite { private val jps = loadFromServiceLoader[ApplicationOperation]() .find(_.getClass.isAssignableFrom(classOf[JpsApplicationOperation])).get - jps.initialize(null) + jps.initialize(null, None) test("JpsApplicationOperation with jstat") { assert(jps.isSupported(ApplicationManagerInfo(None))) diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/KubernetesApplicationOperationSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/KubernetesApplicationOperationSuite.scala index 2ea1939d2..7454ad652 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/KubernetesApplicationOperationSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/KubernetesApplicationOperationSuite.scala @@ -28,7 +28,7 @@ class KubernetesApplicationOperationSuite extends KyuubiFunSuite { kyuubiConf.set(KyuubiConf.KUBERNETES_NAMESPACE_ALLOW_LIST.key, "ns1,ns2") val operation = new KubernetesApplicationOperation() - operation.initialize(kyuubiConf) + operation.initialize(kyuubiConf, None) operation.checkKubernetesInfo(KubernetesInfo(None, None)) operation.checkKubernetesInfo(KubernetesInfo(Some("1"), None)) diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala index bc53563f7..cd0e0d6c7 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala @@ -49,7 +49,7 @@ import org.apache.kyuubi.shaded.hive.service.rpc.thrift.TProtocolVersion.HIVE_CL class AdminResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper { - private val engineMgr = new KyuubiApplicationManager() + private val engineMgr = new KyuubiApplicationManager(None) override protected lazy val conf: KyuubiConf = KyuubiConf() .set(AUTHENTICATION_METHOD, Seq("CUSTOM"))