[KYUUBI #6542] KyuubiBatchService should wait for HTTP server started before picking jobs

# 🔍 Description

This is similar to https://github.com/apache/kyuubi/pull/5310.

We need to wait for Jetty Server started before picking jobs, otherwise, it may get the wrong local HTTP server port -1.

This only affects Batch V2

## Types of changes 🔖

- [x] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)

## Test Plan 🧪

Tested in internal workloads.

---

# Checklist 📝

- [x] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)

**Be nice. Be informative.**

Closes #6542 from pan3793/batch-wait-started.

Closes #6542

1f62debfe [Cheng Pan] fix
ee1d05d07 [Cheng Pan] KyuubiBatchService should wait for HTTP server started before picking jobs

Authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: Cheng Pan <chengpan@apache.org>
This commit is contained in:
Cheng Pan 2024-07-18 11:39:58 +08:00
parent 5a20267dbc
commit 1309819749
No known key found for this signature in database
GPG Key ID: 8001952629BCC75D
3 changed files with 15 additions and 10 deletions

View File

@ -34,7 +34,7 @@ class KyuubiBatchService(
private lazy val restFrontend = server.frontendServices
.filter(_.isInstanceOf[KyuubiRestFrontendService])
.head
.head.asInstanceOf[KyuubiRestFrontendService]
private def kyuubiInstance: String = restFrontend.connectionUrl
@ -66,6 +66,7 @@ class KyuubiBatchService(
override def start(): Unit = {
assert(running.compareAndSet(false, true))
val submitTask: Runnable = () => {
restFrontend.waitForServerStarted()
while (running.get) {
metadataManager.pickBatchForSubmitting(kyuubiInstance) match {
case None => Thread.sleep(1000)

View File

@ -193,19 +193,23 @@ class KyuubiRestFrontendService(override val serverable: Serverable)
}
}
def waitForServerStarted(): Unit = {
// block until the HTTP server is started, otherwise, we may get
// the wrong HTTP server port -1
while (!server.isStarted) {
info(s"Waiting for $getName's HTTP server getting started")
Thread.sleep(1000)
}
}
override def start(): Unit = synchronized {
if (!isStarted.get) {
try {
server.start()
startInternal()
waitForServerStarted()
isStarted.set(true)
startBatchChecker()
startInternal()
// block until the HTTP server is started, otherwise, we may get
// the wrong HTTP server port -1
while (server.getState != "STARTED") {
info(s"Waiting for $getName's HTTP server getting started")
Thread.sleep(1000)
}
recoverBatchSessions()
} catch {
case e: Exception => throw new KyuubiException(s"Cannot start $getName", e)

View File

@ -24,7 +24,7 @@ import org.eclipse.jetty.util.thread.{QueuedThreadPool, ScheduledExecutorSchedul
import org.apache.kyuubi.util.JavaUtils
private[kyuubi] case class JettyServer(
private[kyuubi] class JettyServer(
server: Server,
connector: ServerConnector,
rootHandler: ContextHandlerCollection) {
@ -68,7 +68,7 @@ private[kyuubi] case class JettyServer(
addHandler(JettyUtils.createRedirectHandler(src, dest))
}
def getState: String = server.getState
def isStarted: Boolean = server.isStarted
}
object JettyServer {