From ceed216a3976c97e8ffb87be02ef30781dfe00ca Mon Sep 17 00:00:00 2001 From: Fei Wang Date: Thu, 28 Mar 2024 10:28:47 +0800 Subject: [PATCH] [CELEBORN-1317][FOLLOWUP] Retry to setup mini cluster if the cause is BindException ### What changes were proposed in this pull request? To fix the UT for http server port already in use issue. For Jetty HttpServer, if failed to bind port, the exception is IOException and the cause is BindException, we should retry for that. Before: ``` case e: BindException => // retry to setup mini cluster ``` Now: ``` case e: IOException if e.isInstanceOf[BindException] || Option(e.getCause).exists( _.isInstanceOf[BindException]) => // retry to setup mini cluster ``` ### Why are the changes needed? To fix the UT for http server port already in use issue. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Will trigger GA for 3 three times. Closes #2424 from turboFei/set_connector_stop_timeout. Authored-by: Fei Wang Signed-off-by: Shuang --- .../api}/ApiMasterResourceSuite.scala | 3 +- .../server/common/http/HttpServer.scala | 33 +++++++++++-------- .../service/deploy/MiniClusterFeature.scala | 5 ++- .../api}/ApiWorkerResourceSuite.scala | 22 ++++++------- 4 files changed, 36 insertions(+), 27 deletions(-) rename master/src/test/scala/org/apache/celeborn/service/deploy/master/{ => http/api}/ApiMasterResourceSuite.scala (96%) rename worker/src/test/scala/org/apache/celeborn/service/deploy/worker/{storage => http/api}/ApiWorkerResourceSuite.scala (75%) diff --git a/master/src/test/scala/org/apache/celeborn/service/deploy/master/ApiMasterResourceSuite.scala b/master/src/test/scala/org/apache/celeborn/service/deploy/master/http/api/ApiMasterResourceSuite.scala similarity index 96% rename from master/src/test/scala/org/apache/celeborn/service/deploy/master/ApiMasterResourceSuite.scala rename to master/src/test/scala/org/apache/celeborn/service/deploy/master/http/api/ApiMasterResourceSuite.scala index f5e06e6a5..5c272446f 100644 --- a/master/src/test/scala/org/apache/celeborn/service/deploy/master/ApiMasterResourceSuite.scala +++ b/master/src/test/scala/org/apache/celeborn/service/deploy/master/http/api/ApiMasterResourceSuite.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.celeborn.service.deploy.master +package org.apache.celeborn.service.deploy.master.http.api import javax.ws.rs.core.MediaType @@ -25,6 +25,7 @@ import org.apache.celeborn.common.CelebornConf import org.apache.celeborn.common.util.{CelebornExitKind, Utils} import org.apache.celeborn.server.common.HttpService import org.apache.celeborn.server.common.http.ApiBaseResourceSuite +import org.apache.celeborn.service.deploy.master.{Master, MasterArguments} class ApiMasterResourceSuite extends ApiBaseResourceSuite { private var master: Master = _ diff --git a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpServer.scala b/service/src/main/scala/org/apache/celeborn/server/common/http/HttpServer.scala index 336132576..cbb61136e 100644 --- a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpServer.scala +++ b/service/src/main/scala/org/apache/celeborn/server/common/http/HttpServer.scala @@ -44,27 +44,34 @@ private[celeborn] case class HttpServer( isStarted = true } catch { case e: Exception => - stop(CelebornExitKind.EXIT_IMMEDIATELY) + stopInternal(CelebornExitKind.EXIT_IMMEDIATELY) throw e } } def stop(exitCode: Int): Unit = synchronized { if (isStarted) { - if (exitCode == CelebornExitKind.EXIT_IMMEDIATELY) { - server.setStopTimeout(0) - } - logInfo(s"$role: Stopping HttpServer") - server.stop() - connector.stop() - server.getThreadPool match { - case lifeCycle: LifeCycle => lifeCycle.stop() - case _ => - } - logInfo(s"$role: HttpServer stopped.") - isStarted = false + stopInternal(exitCode) } } + + private def stopInternal(exitCode: Int): Unit = { + if (exitCode == CelebornExitKind.EXIT_IMMEDIATELY) { + server.setStopTimeout(0) + connector.setStopTimeout(0) + } + logInfo(s"$role: Stopping HttpServer") + server.stop() + server.join() + connector.stop() + server.getThreadPool match { + case lifeCycle: LifeCycle => lifeCycle.stop() + case _ => + } + logInfo(s"$role: HttpServer stopped.") + isStarted = false + } + def getServerUri: String = connector.getHost + ":" + connector.getLocalPort def addHandler(handler: Handler): Unit = synchronized { diff --git a/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala b/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala index beecb76a5..914512cca 100644 --- a/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala +++ b/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala @@ -17,6 +17,7 @@ package org.apache.celeborn.service.deploy +import java.io.IOException import java.net.BindException import java.nio.file.Files import java.util.concurrent.locks.{Lock, ReentrantLock} @@ -70,7 +71,9 @@ trait MiniClusterFeature extends Logging { workers = w created = true } catch { - case e: BindException => + case e: IOException + if e.isInstanceOf[BindException] || Option(e.getCause).exists( + _.isInstanceOf[BindException]) => logError(s"failed to setup mini cluster, retrying (retry count: $retryCount)", e) retryCount += 1 if (retryCount == 3) { diff --git a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/ApiWorkerResourceSuite.scala b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/http/api/ApiWorkerResourceSuite.scala similarity index 75% rename from worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/ApiWorkerResourceSuite.scala rename to worker/src/test/scala/org/apache/celeborn/service/deploy/worker/http/api/ApiWorkerResourceSuite.scala index cc09764d0..d950d0674 100644 --- a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/ApiWorkerResourceSuite.scala +++ b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/http/api/ApiWorkerResourceSuite.scala @@ -15,33 +15,31 @@ * limitations under the License. */ -package org.apache.celeborn.service.deploy.worker.storage +package org.apache.celeborn.service.deploy.worker.http.api import javax.ws.rs.core.MediaType -import org.apache.celeborn.common.CelebornConf -import org.apache.celeborn.common.util.{CelebornExitKind, Utils} import org.apache.celeborn.server.common.HttpService import org.apache.celeborn.server.common.http.ApiBaseResourceSuite -import org.apache.celeborn.service.deploy.worker.{Worker, WorkerArguments} +import org.apache.celeborn.service.deploy.MiniClusterFeature +import org.apache.celeborn.service.deploy.worker.Worker -class ApiWorkerResourceSuite extends ApiBaseResourceSuite { +class ApiWorkerResourceSuite extends ApiBaseResourceSuite with MiniClusterFeature { private var worker: Worker = _ override protected def httpService: HttpService = worker override def beforeAll(): Unit = { - val workerArgs = new WorkerArguments(Array(), celebornConf) - worker = new Worker(celebornConf, workerArgs) - worker.metricsSystem.start() - worker.startHttpServer() + logInfo("test initialized, setup celeborn mini cluster") + val (_, w) = + setupMiniClusterWithRandomPorts(workerConf = celebornConf.getAll.toMap, workerNum = 1) + worker = w.head super.beforeAll() } override def afterAll(): Unit = { super.afterAll() - worker.metricsSystem.stop() - worker.rpcEnv.shutdown() - worker.stop(CelebornExitKind.EXIT_IMMEDIATELY) + logInfo("all test complete, stop celeborn mini cluster") + shutdownMiniCluster() } test("listPartitionLocationInfo") {