[KYUUBI #6646] Fix RESTful API NPE when restarting Kyuubi server

# 🔍 Description
## Issue References 🔗

We meet issue when restarting the kyuubi server:
```
2024-08-28 09:48:41.507 WARN org.apache.kyuubi.server.api.RestExceptionMapper: Error occurs on accessing REST API.
java.lang.NullPointerException: Cannot invoke "org.apache.kyuubi.server.KyuubiServer.getConf()" because the return value of "org.apache.kyuubi.server.KyuubiServer$.kyuubiServer()" is null
	at org.apache.kyuubi.server.api.v1.BatchesResource.batchV2Enabled(BatchesResource.scala:72) ~[kyuubi-server_2.12-1.10.0.0.1.0.jar:1.10.0.0.1.0]
	at org.apache.kyuubi.server.api.v1.BatchesResource.$anonfun$batchInfo$3(BatchesResource.scala:345) ~[kyuubi-server_2.12-1.10.0.0.1.0.jar:1.10.0.0.1.0]
	at scala.Option.map(Option.scala:230) ~[scala-library-2.12.18.jar:?]
```

The root cause is that, the `KyuubiServer.kyuubiServer` is null until the kyuubi server start finished.

db57e9365d/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala (L231-L234)

But the RESTful api initialization finished before than that, so, NPE will be thrown between REST front started to all the kyuubi service started.
## Describe Your Solution 🔧

Please include a summary of the change and which issue is fixed. Please also include relevant motivation and context. List any dependencies that are required for this change.

In this PR,
1. move the KyuubiBatchService into KyuubiRestFrontendService
2. move the TempFileService (introduced in #6587) into KyuubiSessionManager

<img width="872" alt="image" src="https://github.com/user-attachments/assets/a4c5fe38-1d34-4fee-933f-72511bc06f27">

## Types of changes 🔖

- [x] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)

## Test Plan 🧪

#### Behavior Without This Pull Request ⚰️

#### Behavior With This Pull Request 🎉

#### Related Unit Tests

---

# Checklist 📝

- [x] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)

**Be nice. Be informative.**

Closes #6646 from turboFei/npe.

Closes #6646

42c1b7512 [Wang, Fei] common
42b305557 [Wang, Fei] fix
e0f58d234 [Wang, Fei] fix
84492f21a [Wang, Fei] move code
530eb21c1 [Wang, Fei] fix
bc206ab25 [Wang, Fei] unused
dd8908861 [Wang, Fei] prevent NPE
4e6d39ecf [Wang, Fei] npe

Authored-by: Wang, Fei <fwang12@ebay.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
This commit is contained in:
Wang, Fei 2024-08-29 14:21:40 -07:00
parent 4ebcc5cc14
commit 4c71415f99
10 changed files with 27 additions and 39 deletions

View File

@ -43,7 +43,6 @@ import org.apache.kyuubi.metrics.MetricsConstants.{ENGINE_FAIL, ENGINE_TIMEOUT,
import org.apache.kyuubi.metrics.MetricsSystem
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.plugin.GroupProvider
import org.apache.kyuubi.server.KyuubiServer
/**
* The description and functionality of an engine at server side
@ -72,8 +71,8 @@ private[kyuubi] class EngineRef(
private val engineType: EngineType = EngineType.withName(conf.get(ENGINE_TYPE))
// Server-side engine pool size threshold
private val poolThreshold: Int = Option(KyuubiServer.kyuubiServer).map(_.getConf)
.getOrElse(KyuubiConf()).get(ENGINE_POOL_SIZE_THRESHOLD)
private val poolThreshold: Int =
Option(engineManager).map(_.getConf).getOrElse(KyuubiConf()).get(ENGINE_POOL_SIZE_THRESHOLD)
private val clientPoolSize: Int = conf.get(ENGINE_POOL_SIZE)

View File

@ -23,19 +23,15 @@ import org.apache.kyuubi.config.KyuubiConf.BATCH_SUBMITTER_THREADS
import org.apache.kyuubi.engine.ApplicationState
import org.apache.kyuubi.operation.OperationState
import org.apache.kyuubi.server.metadata.MetadataManager
import org.apache.kyuubi.service.{AbstractService, Serverable}
import org.apache.kyuubi.service.AbstractService
import org.apache.kyuubi.session.KyuubiSessionManager
import org.apache.kyuubi.util.ThreadUtils
class KyuubiBatchService(
server: Serverable,
restFrontend: KyuubiRestFrontendService,
sessionManager: KyuubiSessionManager)
extends AbstractService(classOf[KyuubiBatchService].getSimpleName) {
private lazy val restFrontend = server.frontendServices
.filter(_.isInstanceOf[KyuubiRestFrontendService])
.head.asInstanceOf[KyuubiRestFrontendService]
private def kyuubiInstance: String = restFrontend.connectionUrl
// TODO expose metrics, including pending/running/succeeded/failed batches

View File

@ -57,6 +57,13 @@ class KyuubiRestFrontendService(override val serverable: Serverable)
private val batchChecker = ThreadUtils.newDaemonSingleThreadScheduledExecutor("batch-checker")
private[kyuubi] lazy val batchService: Option[KyuubiBatchService] =
if (conf.get(BATCH_SUBMITTER_ENABLED)) {
Some(new KyuubiBatchService(this, sessionManager))
} else {
None
}
lazy val host: String = conf.get(FRONTEND_REST_BIND_HOST)
.getOrElse {
if (JavaUtils.isWindows || JavaUtils.isMac) {
@ -92,6 +99,7 @@ class KyuubiRestFrontendService(override val serverable: Serverable)
conf.get(FRONTEND_REST_MAX_WORKER_THREADS),
conf.get(FRONTEND_REST_JETTY_STOP_TIMEOUT),
conf.get(FRONTEND_JETTY_SEND_VERSION_ENABLED))
batchService.foreach(addService)
super.initialize(conf)
}

View File

@ -25,14 +25,14 @@ import org.apache.hadoop.security.UserGroupInformation
import org.apache.kyuubi._
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.{BATCH_SUBMITTER_ENABLED, FRONTEND_PROTOCOLS, FrontendProtocols, KYUUBI_KUBERNETES_CONF_PREFIX}
import org.apache.kyuubi.config.KyuubiConf.{FRONTEND_PROTOCOLS, FrontendProtocols, KYUUBI_KUBERNETES_CONF_PREFIX}
import org.apache.kyuubi.config.KyuubiConf.FrontendProtocols._
import org.apache.kyuubi.events.{EventBus, KyuubiServerInfoEvent, ServerEventHandlerRegister}
import org.apache.kyuubi.ha.HighAvailabilityConf._
import org.apache.kyuubi.ha.client.{AuthTypes, ServiceDiscovery}
import org.apache.kyuubi.metrics.{MetricsConf, MetricsSystem}
import org.apache.kyuubi.server.metadata.jdbc.JDBCMetadataStoreConf
import org.apache.kyuubi.service.{AbstractBackendService, AbstractFrontendService, Serverable, ServiceState, TempFileService}
import org.apache.kyuubi.service.{AbstractBackendService, AbstractFrontendService, Serverable, ServiceState}
import org.apache.kyuubi.session.KyuubiSessionManager
import org.apache.kyuubi.util.{KyuubiHadoopUtils, SignalRegister}
import org.apache.kyuubi.zookeeper.EmbeddedZookeeper
@ -202,8 +202,6 @@ class KyuubiServer(name: String) extends Serverable(name) {
throw new UnsupportedOperationException(s"Frontend protocol $other is not supported yet.")
}
final var tempFileService: TempFileService = _
override def initialize(conf: KyuubiConf): Unit = synchronized {
val kinit = new KinitAuxiliaryService()
addService(kinit)
@ -211,18 +209,10 @@ class KyuubiServer(name: String) extends Serverable(name) {
val periodicGCService = new PeriodicGCService
addService(periodicGCService)
tempFileService = new TempFileService
addService(tempFileService)
if (conf.get(MetricsConf.METRICS_ENABLED)) {
addService(new MetricsSystem)
}
if (conf.isRESTEnabled && conf.get(BATCH_SUBMITTER_ENABLED)) {
addService(new KyuubiBatchService(
this,
backendService.sessionManager.asInstanceOf[KyuubiSessionManager]))
}
super.initialize(conf)
initLoggerEventHandler(conf)

View File

@ -26,7 +26,7 @@ import javax.ws.rs.ext.{ExceptionMapper, Provider}
import org.eclipse.jetty.server.handler.ContextHandler
import org.apache.kyuubi.Logging
import org.apache.kyuubi.server.{KyuubiBatchService, KyuubiRestFrontendService, KyuubiServer}
import org.apache.kyuubi.server.KyuubiRestFrontendService
private[api] trait ApiRequestContext {
@ -36,11 +36,6 @@ private[api] trait ApiRequestContext {
@Context
protected var httpRequest: HttpServletRequest = _
protected lazy val batchService: Option[KyuubiBatchService] =
KyuubiServer.kyuubiServer.getServices
.find(_.isInstanceOf[KyuubiBatchService])
.map(_.asInstanceOf[KyuubiBatchService])
final protected def fe: KyuubiRestFrontendService = FrontendServiceContext.get(servletContext)
}

View File

@ -461,7 +461,7 @@ private[v1] class AdminResource extends ApiRequestContext with Logging {
throw new NotAllowedException(
s"$userName is not allowed to count the batches")
}
val batchCount = batchService
val batchCount = fe.batchService
.map(_.countBatch(batchType, Option(batchUser), Option(batchState)))
.getOrElse(0)
new Count(batchCount)

View File

@ -43,8 +43,6 @@ import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.config.KyuubiReservedKeys._
import org.apache.kyuubi.engine.{ApplicationInfo, ApplicationManagerInfo, ApplicationState, KillResponse, KyuubiApplicationManager}
import org.apache.kyuubi.operation.{BatchJobSubmission, FetchOrientation, OperationState}
import org.apache.kyuubi.server.KyuubiServer
import org.apache.kyuubi.server.KyuubiServer.kyuubiServer
import org.apache.kyuubi.server.api.ApiRequestContext
import org.apache.kyuubi.server.api.v1.BatchesResource._
import org.apache.kyuubi.server.metadata.MetadataManager
@ -68,7 +66,7 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
fe.getConf.get(ENGINE_SECURITY_ENABLED)
private def batchV2Enabled(reqConf: Map[String, String]): Boolean = {
KyuubiServer.kyuubiServer.getConf.get(BATCH_SUBMITTER_ENABLED) &&
fe.getConf.get(BATCH_SUBMITTER_ENABLED) &&
reqConf.getOrElse(BATCH_IMPL_VERSION.key, fe.getConf.get(BATCH_IMPL_VERSION)) == "2"
}
@ -511,7 +509,7 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
} else if (batchV2Enabled(metadata.requestConf) && metadata.state == "INITIALIZED" &&
// there is a chance that metadata is outdated, then `cancelUnscheduledBatch` fails
// and returns false
batchService.get.cancelUnscheduledBatch(batchId)) {
fe.batchService.get.cancelUnscheduledBatch(batchId)) {
new CloseBatchResponse(true, s"Unscheduled batch $batchId is canceled.")
} else if (batchV2Enabled(metadata.requestConf) && metadata.kyuubiInstance == null) {
// code goes here indicates metadata is outdated, recursively calls itself to refresh
@ -569,7 +567,7 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
uploadFileFolderPath: JPath): Unit = {
try {
val tempFile = Utils.writeToTempFile(inputStream, uploadFileFolderPath, fileName)
kyuubiServer.tempFileService.addPathToExpiration(tempFile.toPath)
fe.sessionManager.tempFileService.addPathToExpiration(tempFile.toPath)
request.setResource(tempFile.getPath)
} catch {
case e: Exception =>
@ -605,7 +603,7 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
filePart.getValueAs(classOf[InputStream]),
uploadFileFolderPath,
fileName)
kyuubiServer.tempFileService.addPathToExpiration(tempFile.toPath)
fe.sessionManager.tempFileService.addPathToExpiration(tempFile.toPath)
tempFile.getPath
} catch {
case e: Exception =>

View File

@ -36,7 +36,6 @@ import org.apache.kyuubi.metrics.MetricsConstants._
import org.apache.kyuubi.metrics.MetricsSystem
import org.apache.kyuubi.operation.{KyuubiOperationManager, OperationState}
import org.apache.kyuubi.plugin.{GroupProvider, PluginLoader, SessionConfAdvisor}
import org.apache.kyuubi.server.KyuubiServer.kyuubiServer
import org.apache.kyuubi.server.metadata.{MetadataManager, MetadataRequestsRetryRef}
import org.apache.kyuubi.server.metadata.api.{Metadata, MetadataFilter}
import org.apache.kyuubi.service.TempFileService
@ -73,12 +72,13 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
private val engineConnectionAliveChecker =
ThreadUtils.newDaemonSingleThreadScheduledExecutor(s"$name-engine-alive-checker")
def tempFileService: TempFileService = kyuubiServer.tempFileService
val tempFileService = new TempFileService()
override def initialize(conf: KyuubiConf): Unit = {
this.conf = conf
addService(applicationManager)
addService(credentialsManager)
addService(tempFileService)
metadataManager.foreach(addService)
initSessionLimiter(conf)
initEngineStartupProcessSemaphore(conf)

View File

@ -27,6 +27,7 @@ import org.apache.kyuubi.{Utils, WithKyuubiServer}
import org.apache.kyuubi.Utils.writeToTempFile
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.SERVER_TEMP_FILE_EXPIRE_TIME
import org.apache.kyuubi.session.KyuubiSessionManager
class TempFileServiceSuite extends WithKyuubiServer {
private val expirationInMs = 100
@ -35,7 +36,8 @@ class TempFileServiceSuite extends WithKyuubiServer {
.set(SERVER_TEMP_FILE_EXPIRE_TIME, Duration.ofMillis(expirationInMs).toMillis)
test("file cleaned up after expiration") {
val tempFileService = KyuubiServer.kyuubiServer.tempFileService
val tempFileService =
server.backendService.sessionManager.asInstanceOf[KyuubiSessionManager].tempFileService
(0 until 3).map { i =>
val dir = Utils.createTempDir()
writeToTempFile(new ByteArrayInputStream(s"$i".getBytes()), dir, s"$i.txt")

View File

@ -42,7 +42,7 @@ import org.apache.kyuubi.engine.spark.SparkBatchProcessBuilder
import org.apache.kyuubi.metrics.{MetricsConstants, MetricsSystem}
import org.apache.kyuubi.operation.{BatchJobSubmission, OperationState}
import org.apache.kyuubi.operation.OperationState.OperationState
import org.apache.kyuubi.server.{KyuubiBatchService, KyuubiRestFrontendService}
import org.apache.kyuubi.server.KyuubiRestFrontendService
import org.apache.kyuubi.server.http.util.HttpAuthUtils.{basicAuthorizationHeader, AUTHORIZATION_HEADER}
import org.apache.kyuubi.server.metadata.api.{Metadata, MetadataFilter}
import org.apache.kyuubi.service.authentication.{AnonymousAuthenticationProviderImpl, AuthUtils}
@ -64,7 +64,7 @@ class BatchesV2ResourceSuite extends BatchesResourceSuiteBase {
override def afterEach(): Unit = {
val sessionManager = fe.be.sessionManager.asInstanceOf[KyuubiSessionManager]
val batchService = server.getServices.collectFirst { case b: KyuubiBatchService => b }.get
val batchService = fe.asInstanceOf[KyuubiRestFrontendService].batchService.get
sessionManager.getBatchesFromMetadataStore(MetadataFilter(), 0, Int.MaxValue)
.foreach { batch => batchService.cancelUnscheduledBatch(batch.getId) }
super.afterEach()