diff --git a/master/src/test/scala/org/apache/celeborn/service/deploy/master/ApiMasterResourceSuite.scala b/master/src/test/scala/org/apache/celeborn/service/deploy/master/http/api/ApiMasterResourceSuite.scala similarity index 96% rename from master/src/test/scala/org/apache/celeborn/service/deploy/master/ApiMasterResourceSuite.scala rename to master/src/test/scala/org/apache/celeborn/service/deploy/master/http/api/ApiMasterResourceSuite.scala index f5e06e6a5..5c272446f 100644 --- a/master/src/test/scala/org/apache/celeborn/service/deploy/master/ApiMasterResourceSuite.scala +++ b/master/src/test/scala/org/apache/celeborn/service/deploy/master/http/api/ApiMasterResourceSuite.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.celeborn.service.deploy.master +package org.apache.celeborn.service.deploy.master.http.api import javax.ws.rs.core.MediaType @@ -25,6 +25,7 @@ import org.apache.celeborn.common.CelebornConf import org.apache.celeborn.common.util.{CelebornExitKind, Utils} import org.apache.celeborn.server.common.HttpService import org.apache.celeborn.server.common.http.ApiBaseResourceSuite +import org.apache.celeborn.service.deploy.master.{Master, MasterArguments} class ApiMasterResourceSuite extends ApiBaseResourceSuite { private var master: Master = _ diff --git a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpServer.scala b/service/src/main/scala/org/apache/celeborn/server/common/http/HttpServer.scala index 336132576..cbb61136e 100644 --- a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpServer.scala +++ b/service/src/main/scala/org/apache/celeborn/server/common/http/HttpServer.scala @@ -44,27 +44,34 @@ private[celeborn] case class HttpServer( isStarted = true } catch { case e: Exception => - stop(CelebornExitKind.EXIT_IMMEDIATELY) + stopInternal(CelebornExitKind.EXIT_IMMEDIATELY) throw e } } def stop(exitCode: Int): Unit = synchronized { if (isStarted) { - if (exitCode == CelebornExitKind.EXIT_IMMEDIATELY) { - server.setStopTimeout(0) - } - logInfo(s"$role: Stopping HttpServer") - server.stop() - connector.stop() - server.getThreadPool match { - case lifeCycle: LifeCycle => lifeCycle.stop() - case _ => - } - logInfo(s"$role: HttpServer stopped.") - isStarted = false + stopInternal(exitCode) } } + + private def stopInternal(exitCode: Int): Unit = { + if (exitCode == CelebornExitKind.EXIT_IMMEDIATELY) { + server.setStopTimeout(0) + connector.setStopTimeout(0) + } + logInfo(s"$role: Stopping HttpServer") + server.stop() + server.join() + connector.stop() + server.getThreadPool match { + case lifeCycle: LifeCycle => lifeCycle.stop() + case _ => + } + logInfo(s"$role: HttpServer stopped.") + isStarted = false + } + def getServerUri: String = connector.getHost + ":" + connector.getLocalPort def addHandler(handler: Handler): Unit = synchronized { diff --git a/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala b/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala index beecb76a5..914512cca 100644 --- a/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala +++ b/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala @@ -17,6 +17,7 @@ package org.apache.celeborn.service.deploy +import java.io.IOException import java.net.BindException import java.nio.file.Files import java.util.concurrent.locks.{Lock, ReentrantLock} @@ -70,7 +71,9 @@ trait MiniClusterFeature extends Logging { workers = w created = true } catch { - case e: BindException => + case e: IOException + if e.isInstanceOf[BindException] || Option(e.getCause).exists( + _.isInstanceOf[BindException]) => logError(s"failed to setup mini cluster, retrying (retry count: $retryCount)", e) retryCount += 1 if (retryCount == 3) { diff --git a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/ApiWorkerResourceSuite.scala b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/http/api/ApiWorkerResourceSuite.scala similarity index 75% rename from worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/ApiWorkerResourceSuite.scala rename to worker/src/test/scala/org/apache/celeborn/service/deploy/worker/http/api/ApiWorkerResourceSuite.scala index cc09764d0..d950d0674 100644 --- a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/ApiWorkerResourceSuite.scala +++ b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/http/api/ApiWorkerResourceSuite.scala @@ -15,33 +15,31 @@ * limitations under the License. */ -package org.apache.celeborn.service.deploy.worker.storage +package org.apache.celeborn.service.deploy.worker.http.api import javax.ws.rs.core.MediaType -import org.apache.celeborn.common.CelebornConf -import org.apache.celeborn.common.util.{CelebornExitKind, Utils} import org.apache.celeborn.server.common.HttpService import org.apache.celeborn.server.common.http.ApiBaseResourceSuite -import org.apache.celeborn.service.deploy.worker.{Worker, WorkerArguments} +import org.apache.celeborn.service.deploy.MiniClusterFeature +import org.apache.celeborn.service.deploy.worker.Worker -class ApiWorkerResourceSuite extends ApiBaseResourceSuite { +class ApiWorkerResourceSuite extends ApiBaseResourceSuite with MiniClusterFeature { private var worker: Worker = _ override protected def httpService: HttpService = worker override def beforeAll(): Unit = { - val workerArgs = new WorkerArguments(Array(), celebornConf) - worker = new Worker(celebornConf, workerArgs) - worker.metricsSystem.start() - worker.startHttpServer() + logInfo("test initialized, setup celeborn mini cluster") + val (_, w) = + setupMiniClusterWithRandomPorts(workerConf = celebornConf.getAll.toMap, workerNum = 1) + worker = w.head super.beforeAll() } override def afterAll(): Unit = { super.afterAll() - worker.metricsSystem.stop() - worker.rpcEnv.shutdown() - worker.stop(CelebornExitKind.EXIT_IMMEDIATELY) + logInfo("all test complete, stop celeborn mini cluster") + shutdownMiniCluster() } test("listPartitionLocationInfo") {