From 4c71415f99653b41f15e4f8a37ae31e096e84558 Mon Sep 17 00:00:00 2001 From: "Wang, Fei" Date: Thu, 29 Aug 2024 14:21:40 -0700 Subject: [PATCH] [KYUUBI #6646] Fix RESTful API NPE when restarting Kyuubi server MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit # :mag: Description ## Issue References 🔗 We hit the following issue when restarting the Kyuubi server: ``` 2024-08-28 09:48:41.507 WARN org.apache.kyuubi.server.api.RestExceptionMapper: Error occurs on accessing REST API. java.lang.NullPointerException: Cannot invoke "org.apache.kyuubi.server.KyuubiServer.getConf()" because the return value of "org.apache.kyuubi.server.KyuubiServer$.kyuubiServer()" is null at org.apache.kyuubi.server.api.v1.BatchesResource.batchV2Enabled(BatchesResource.scala:72) ~[kyuubi-server_2.12-1.10.0.0.1.0.jar:1.10.0.0.1.0] at org.apache.kyuubi.server.api.v1.BatchesResource.$anonfun$batchInfo$3(BatchesResource.scala:345) ~[kyuubi-server_2.12-1.10.0.0.1.0.jar:1.10.0.0.1.0] at scala.Option.map(Option.scala:230) ~[scala-library-2.12.18.jar:?] ``` The root cause is that `KyuubiServer.kyuubiServer` is null until the Kyuubi server has finished starting. https://github.com/apache/kyuubi/blob/db57e9365d7933942197936ac7ff711d58f7ea91/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala#L231-L234 But the RESTful API initialization finishes before that, so an NPE is thrown for any request that arrives between the REST frontend starting and all of the Kyuubi services having started. ## Describe Your Solution 🔧 Please include a summary of the change and which issue is fixed. Please also include relevant motivation and context. List any dependencies that are required for this change. In this PR, 1. move the KyuubiBatchService into KyuubiRestFrontendService 2.
move the TempFileService (introduced in #6587) into KyuubiSessionManager image ## Types of changes :bookmark: - [x] Bugfix (non-breaking change which fixes an issue) - [ ] New feature (non-breaking change which adds functionality) - [ ] Breaking change (fix or feature that would cause existing functionality to change) ## Test Plan 🧪 #### Behavior Without This Pull Request :coffin: #### Behavior With This Pull Request :tada: #### Related Unit Tests --- # Checklist 📝 - [x] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html) **Be nice. Be informative.** Closes #6646 from turboFei/npe. Closes #6646 42c1b7512 [Wang, Fei] common 42b305557 [Wang, Fei] fix e0f58d234 [Wang, Fei] fix 84492f21a [Wang, Fei] move code 530eb21c1 [Wang, Fei] fix bc206ab25 [Wang, Fei] unused dd8908861 [Wang, Fei] prevent NPE 4e6d39ecf [Wang, Fei] npe Authored-by: Wang, Fei Signed-off-by: Wang, Fei --- .../scala/org/apache/kyuubi/engine/EngineRef.scala | 5 ++--- .../apache/kyuubi/server/KyuubiBatchService.scala | 8 ++------ .../kyuubi/server/KyuubiRestFrontendService.scala | 8 ++++++++ .../org/apache/kyuubi/server/KyuubiServer.scala | 14 ++------------ .../scala/org/apache/kyuubi/server/api/api.scala | 7 +------ .../kyuubi/server/api/v1/AdminResource.scala | 2 +- .../kyuubi/server/api/v1/BatchesResource.scala | 10 ++++------ .../kyuubi/session/KyuubiSessionManager.scala | 4 ++-- .../kyuubi/server/TempFileServiceSuite.scala | 4 +++- .../server/api/v1/BatchesResourceSuite.scala | 4 ++-- 10 files changed, 27 insertions(+), 39 deletions(-) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala index 30bf32c39..b7985fcf5 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala @@ -43,7 +43,6 @@ import
org.apache.kyuubi.metrics.MetricsConstants.{ENGINE_FAIL, ENGINE_TIMEOUT, import org.apache.kyuubi.metrics.MetricsSystem import org.apache.kyuubi.operation.log.OperationLog import org.apache.kyuubi.plugin.GroupProvider -import org.apache.kyuubi.server.KyuubiServer /** * The description and functionality of an engine at server side @@ -72,8 +71,8 @@ private[kyuubi] class EngineRef( private val engineType: EngineType = EngineType.withName(conf.get(ENGINE_TYPE)) // Server-side engine pool size threshold - private val poolThreshold: Int = Option(KyuubiServer.kyuubiServer).map(_.getConf) - .getOrElse(KyuubiConf()).get(ENGINE_POOL_SIZE_THRESHOLD) + private val poolThreshold: Int = + Option(engineManager).map(_.getConf).getOrElse(KyuubiConf()).get(ENGINE_POOL_SIZE_THRESHOLD) private val clientPoolSize: Int = conf.get(ENGINE_POOL_SIZE) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala index c099f2cb9..38bb999c3 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala @@ -23,19 +23,15 @@ import org.apache.kyuubi.config.KyuubiConf.BATCH_SUBMITTER_THREADS import org.apache.kyuubi.engine.ApplicationState import org.apache.kyuubi.operation.OperationState import org.apache.kyuubi.server.metadata.MetadataManager -import org.apache.kyuubi.service.{AbstractService, Serverable} +import org.apache.kyuubi.service.AbstractService import org.apache.kyuubi.session.KyuubiSessionManager import org.apache.kyuubi.util.ThreadUtils class KyuubiBatchService( - server: Serverable, + restFrontend: KyuubiRestFrontendService, sessionManager: KyuubiSessionManager) extends AbstractService(classOf[KyuubiBatchService].getSimpleName) { - private lazy val restFrontend = server.frontendServices - .filter(_.isInstanceOf[KyuubiRestFrontendService]) - 
.head.asInstanceOf[KyuubiRestFrontendService] - private def kyuubiInstance: String = restFrontend.connectionUrl // TODO expose metrics, including pending/running/succeeded/failed batches diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala index ca2bbbe02..706bea8ab 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala @@ -57,6 +57,13 @@ class KyuubiRestFrontendService(override val serverable: Serverable) private val batchChecker = ThreadUtils.newDaemonSingleThreadScheduledExecutor("batch-checker") + private[kyuubi] lazy val batchService: Option[KyuubiBatchService] = + if (conf.get(BATCH_SUBMITTER_ENABLED)) { + Some(new KyuubiBatchService(this, sessionManager)) + } else { + None + } + lazy val host: String = conf.get(FRONTEND_REST_BIND_HOST) .getOrElse { if (JavaUtils.isWindows || JavaUtils.isMac) { @@ -92,6 +99,7 @@ class KyuubiRestFrontendService(override val serverable: Serverable) conf.get(FRONTEND_REST_MAX_WORKER_THREADS), conf.get(FRONTEND_REST_JETTY_STOP_TIMEOUT), conf.get(FRONTEND_JETTY_SEND_VERSION_ENABLED)) + batchService.foreach(addService) super.initialize(conf) } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala index f8c9ffa1b..338ac6b41 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala @@ -25,14 +25,14 @@ import org.apache.hadoop.security.UserGroupInformation import org.apache.kyuubi._ import org.apache.kyuubi.config.KyuubiConf -import org.apache.kyuubi.config.KyuubiConf.{BATCH_SUBMITTER_ENABLED, FRONTEND_PROTOCOLS, FrontendProtocols, 
KYUUBI_KUBERNETES_CONF_PREFIX} +import org.apache.kyuubi.config.KyuubiConf.{FRONTEND_PROTOCOLS, FrontendProtocols, KYUUBI_KUBERNETES_CONF_PREFIX} import org.apache.kyuubi.config.KyuubiConf.FrontendProtocols._ import org.apache.kyuubi.events.{EventBus, KyuubiServerInfoEvent, ServerEventHandlerRegister} import org.apache.kyuubi.ha.HighAvailabilityConf._ import org.apache.kyuubi.ha.client.{AuthTypes, ServiceDiscovery} import org.apache.kyuubi.metrics.{MetricsConf, MetricsSystem} import org.apache.kyuubi.server.metadata.jdbc.JDBCMetadataStoreConf -import org.apache.kyuubi.service.{AbstractBackendService, AbstractFrontendService, Serverable, ServiceState, TempFileService} +import org.apache.kyuubi.service.{AbstractBackendService, AbstractFrontendService, Serverable, ServiceState} import org.apache.kyuubi.session.KyuubiSessionManager import org.apache.kyuubi.util.{KyuubiHadoopUtils, SignalRegister} import org.apache.kyuubi.zookeeper.EmbeddedZookeeper @@ -202,8 +202,6 @@ class KyuubiServer(name: String) extends Serverable(name) { throw new UnsupportedOperationException(s"Frontend protocol $other is not supported yet.") } - final var tempFileService: TempFileService = _ - override def initialize(conf: KyuubiConf): Unit = synchronized { val kinit = new KinitAuxiliaryService() addService(kinit) @@ -211,18 +209,10 @@ class KyuubiServer(name: String) extends Serverable(name) { val periodicGCService = new PeriodicGCService addService(periodicGCService) - tempFileService = new TempFileService - addService(tempFileService) - if (conf.get(MetricsConf.METRICS_ENABLED)) { addService(new MetricsSystem) } - if (conf.isRESTEnabled && conf.get(BATCH_SUBMITTER_ENABLED)) { - addService(new KyuubiBatchService( - this, - backendService.sessionManager.asInstanceOf[KyuubiSessionManager])) - } super.initialize(conf) initLoggerEventHandler(conf) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/api.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/api.scala 
index 93953a577..b56131542 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/api.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/api.scala @@ -26,7 +26,7 @@ import javax.ws.rs.ext.{ExceptionMapper, Provider} import org.eclipse.jetty.server.handler.ContextHandler import org.apache.kyuubi.Logging -import org.apache.kyuubi.server.{KyuubiBatchService, KyuubiRestFrontendService, KyuubiServer} +import org.apache.kyuubi.server.KyuubiRestFrontendService private[api] trait ApiRequestContext { @@ -36,11 +36,6 @@ private[api] trait ApiRequestContext { @Context protected var httpRequest: HttpServletRequest = _ - protected lazy val batchService: Option[KyuubiBatchService] = - KyuubiServer.kyuubiServer.getServices - .find(_.isInstanceOf[KyuubiBatchService]) - .map(_.asInstanceOf[KyuubiBatchService]) - final protected def fe: KyuubiRestFrontendService = FrontendServiceContext.get(servletContext) } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala index 0f1544fe6..4edae8e7a 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala @@ -461,7 +461,7 @@ private[v1] class AdminResource extends ApiRequestContext with Logging { throw new NotAllowedException( s"$userName is not allowed to count the batches") } - val batchCount = batchService + val batchCount = fe.batchService .map(_.countBatch(batchType, Option(batchUser), Option(batchState))) .getOrElse(0) new Count(batchCount) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala index de69cf377..d370ee2cb 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala +++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala @@ -43,8 +43,6 @@ import org.apache.kyuubi.config.KyuubiConf._ import org.apache.kyuubi.config.KyuubiReservedKeys._ import org.apache.kyuubi.engine.{ApplicationInfo, ApplicationManagerInfo, ApplicationState, KillResponse, KyuubiApplicationManager} import org.apache.kyuubi.operation.{BatchJobSubmission, FetchOrientation, OperationState} -import org.apache.kyuubi.server.KyuubiServer -import org.apache.kyuubi.server.KyuubiServer.kyuubiServer import org.apache.kyuubi.server.api.ApiRequestContext import org.apache.kyuubi.server.api.v1.BatchesResource._ import org.apache.kyuubi.server.metadata.MetadataManager @@ -68,7 +66,7 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging { fe.getConf.get(ENGINE_SECURITY_ENABLED) private def batchV2Enabled(reqConf: Map[String, String]): Boolean = { - KyuubiServer.kyuubiServer.getConf.get(BATCH_SUBMITTER_ENABLED) && + fe.getConf.get(BATCH_SUBMITTER_ENABLED) && reqConf.getOrElse(BATCH_IMPL_VERSION.key, fe.getConf.get(BATCH_IMPL_VERSION)) == "2" } @@ -511,7 +509,7 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging { } else if (batchV2Enabled(metadata.requestConf) && metadata.state == "INITIALIZED" && // there is a chance that metadata is outdated, then `cancelUnscheduledBatch` fails // and returns false - batchService.get.cancelUnscheduledBatch(batchId)) { + fe.batchService.get.cancelUnscheduledBatch(batchId)) { new CloseBatchResponse(true, s"Unscheduled batch $batchId is canceled.") } else if (batchV2Enabled(metadata.requestConf) && metadata.kyuubiInstance == null) { // code goes here indicates metadata is outdated, recursively calls itself to refresh @@ -569,7 +567,7 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging { uploadFileFolderPath: JPath): Unit = { try { val tempFile = Utils.writeToTempFile(inputStream, uploadFileFolderPath, fileName) - 
kyuubiServer.tempFileService.addPathToExpiration(tempFile.toPath) + fe.sessionManager.tempFileService.addPathToExpiration(tempFile.toPath) request.setResource(tempFile.getPath) } catch { case e: Exception => @@ -605,7 +603,7 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging { filePart.getValueAs(classOf[InputStream]), uploadFileFolderPath, fileName) - kyuubiServer.tempFileService.addPathToExpiration(tempFile.toPath) + fe.sessionManager.tempFileService.addPathToExpiration(tempFile.toPath) tempFile.getPath } catch { case e: Exception => diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala index a20b4bc97..3c163520f 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala @@ -36,7 +36,6 @@ import org.apache.kyuubi.metrics.MetricsConstants._ import org.apache.kyuubi.metrics.MetricsSystem import org.apache.kyuubi.operation.{KyuubiOperationManager, OperationState} import org.apache.kyuubi.plugin.{GroupProvider, PluginLoader, SessionConfAdvisor} -import org.apache.kyuubi.server.KyuubiServer.kyuubiServer import org.apache.kyuubi.server.metadata.{MetadataManager, MetadataRequestsRetryRef} import org.apache.kyuubi.server.metadata.api.{Metadata, MetadataFilter} import org.apache.kyuubi.service.TempFileService @@ -73,12 +72,13 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) { private val engineConnectionAliveChecker = ThreadUtils.newDaemonSingleThreadScheduledExecutor(s"$name-engine-alive-checker") - def tempFileService: TempFileService = kyuubiServer.tempFileService + val tempFileService = new TempFileService() override def initialize(conf: KyuubiConf): Unit = { this.conf = conf addService(applicationManager) addService(credentialsManager) + addService(tempFileService) 
metadataManager.foreach(addService) initSessionLimiter(conf) initEngineStartupProcessSemaphore(conf) diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/TempFileServiceSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/TempFileServiceSuite.scala index 4b7568c1c..656aef5ad 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/TempFileServiceSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/TempFileServiceSuite.scala @@ -27,6 +27,7 @@ import org.apache.kyuubi.{Utils, WithKyuubiServer} import org.apache.kyuubi.Utils.writeToTempFile import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.config.KyuubiConf.SERVER_TEMP_FILE_EXPIRE_TIME +import org.apache.kyuubi.session.KyuubiSessionManager class TempFileServiceSuite extends WithKyuubiServer { private val expirationInMs = 100 @@ -35,7 +36,8 @@ class TempFileServiceSuite extends WithKyuubiServer { .set(SERVER_TEMP_FILE_EXPIRE_TIME, Duration.ofMillis(expirationInMs).toMillis) test("file cleaned up after expiration") { - val tempFileService = KyuubiServer.kyuubiServer.tempFileService + val tempFileService = + server.backendService.sessionManager.asInstanceOf[KyuubiSessionManager].tempFileService (0 until 3).map { i => val dir = Utils.createTempDir() writeToTempFile(new ByteArrayInputStream(s"$i".getBytes()), dir, s"$i.txt") diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala index ac287e906..dcddc15e6 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala @@ -42,7 +42,7 @@ import org.apache.kyuubi.engine.spark.SparkBatchProcessBuilder import org.apache.kyuubi.metrics.{MetricsConstants, MetricsSystem} import org.apache.kyuubi.operation.{BatchJobSubmission, OperationState} 
import org.apache.kyuubi.operation.OperationState.OperationState -import org.apache.kyuubi.server.{KyuubiBatchService, KyuubiRestFrontendService} +import org.apache.kyuubi.server.KyuubiRestFrontendService import org.apache.kyuubi.server.http.util.HttpAuthUtils.{basicAuthorizationHeader, AUTHORIZATION_HEADER} import org.apache.kyuubi.server.metadata.api.{Metadata, MetadataFilter} import org.apache.kyuubi.service.authentication.{AnonymousAuthenticationProviderImpl, AuthUtils} @@ -64,7 +64,7 @@ class BatchesV2ResourceSuite extends BatchesResourceSuiteBase { override def afterEach(): Unit = { val sessionManager = fe.be.sessionManager.asInstanceOf[KyuubiSessionManager] - val batchService = server.getServices.collectFirst { case b: KyuubiBatchService => b }.get + val batchService = fe.asInstanceOf[KyuubiRestFrontendService].batchService.get sessionManager.getBatchesFromMetadataStore(MetadataFilter(), 0, Int.MaxValue) .foreach { batch => batchService.cancelUnscheduledBatch(batch.getId) } super.afterEach()