From 2efdf755ccbc5d44d22b8f7ed3dadfa80afb3fb7 Mon Sep 17 00:00:00 2001 From: "Wang, Fei" Date: Tue, 17 Dec 2024 10:45:40 +0800 Subject: [PATCH] [CELEBORN-1711][TEST] Fix flaky test caused by master/worker setup issue ### What changes were proposed in this pull request? 1. retry on BindException when starting master/worker http server 2. record the used ports and pre-check whether the selected port is used or bounded before binding ### Why are the changes needed? To fix flaky test. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? GA. Closes #2906 from turboFei/retry_master_suite. Authored-by: Wang, Fei Signed-off-by: Shuang --- ...ApiMasterResourceAuthenticationSuite.scala | 36 +--- .../deploy/master/MasterClusterFeature.scala | 157 ++++++++++++++++++ .../service/deploy/master/MasterSuite.scala | 127 +++++++------- .../http/api/ApiMasterResourceSuite.scala | 36 +--- .../api/v1/ApiV1MasterResourceSuite.scala | 36 +--- .../server/common/http/HttpTestHelper.scala | 1 - .../service/deploy/MiniClusterFeature.scala | 62 ++++--- .../api/v1/ApiV1WorkerResourceSuite.scala | 2 +- ...ApiWorkerResourceAuthenticationSuite.scala | 18 +- 9 files changed, 291 insertions(+), 184 deletions(-) create mode 100644 master/src/test/scala/org/apache/celeborn/service/deploy/master/MasterClusterFeature.scala diff --git a/master/src/test/scala/org/apache/celeborn/service/deploy/master/ApiMasterResourceAuthenticationSuite.scala b/master/src/test/scala/org/apache/celeborn/service/deploy/master/ApiMasterResourceAuthenticationSuite.scala index 50a3eaf6f..d96607197 100644 --- a/master/src/test/scala/org/apache/celeborn/service/deploy/master/ApiMasterResourceAuthenticationSuite.scala +++ b/master/src/test/scala/org/apache/celeborn/service/deploy/master/ApiMasterResourceAuthenticationSuite.scala @@ -17,50 +17,22 @@ package org.apache.celeborn.service.deploy.master -import java.nio.file.Files - -import org.apache.celeborn.common.CelebornConf -import org.apache.celeborn.common.util.{CelebornExitKind, ThreadUtils, Utils} import org.apache.celeborn.server.common.HttpService import org.apache.celeborn.server.common.http.ApiBaseResourceAuthenticationSuite -class ApiMasterResourceAuthenticationSuite extends ApiBaseResourceAuthenticationSuite { +class ApiMasterResourceAuthenticationSuite extends ApiBaseResourceAuthenticationSuite + with MasterClusterFeature { private var master: Master = _ override protected def httpService: HttpService = master - def getTmpDir(): String = { - val tmpDir = Files.createTempDirectory(null).toFile - tmpDir.deleteOnExit() - tmpDir.getAbsolutePath - } - override def beforeAll(): Unit = { - val randomMasterPort = Utils.selectRandomInt(1024, 65535) - val randomHttpPort = randomMasterPort + 1 - celebornConf.set(CelebornConf.HA_ENABLED.key, "false") - celebornConf.set(CelebornConf.HA_MASTER_RATIS_STORAGE_DIR.key, getTmpDir()) - celebornConf.set(CelebornConf.WORKER_STORAGE_DIRS.key, getTmpDir()) - celebornConf.set(CelebornConf.MASTER_HTTP_HOST.key, "127.0.0.1") - celebornConf.set(CelebornConf.MASTER_HTTP_PORT.key, randomHttpPort.toString) - - val args = Array("-h", "localhost", "-p", randomMasterPort.toString) - - val masterArgs = new MasterArguments(args, celebornConf) - master = new Master(celebornConf, masterArgs) - ThreadUtils.newThread( - new Runnable { - override def run(): Unit = { - master.initialize() - } - }, - "api-master-thread").start() + master = setupMasterWithRandomPort(celebornConf.getAll.toMap) super.beforeAll() } override def afterAll(): Unit = { super.afterAll() - master.stop(CelebornExitKind.EXIT_IMMEDIATELY) - master.rpcEnv.shutdown() + shutdownMaster() } } diff --git a/master/src/test/scala/org/apache/celeborn/service/deploy/master/MasterClusterFeature.scala b/master/src/test/scala/org/apache/celeborn/service/deploy/master/MasterClusterFeature.scala new file mode 100644 index 000000000..65995bfc3 --- /dev/null +++ b/master/src/test/scala/org/apache/celeborn/service/deploy/master/MasterClusterFeature.scala @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.celeborn.service.deploy.master + +import java.io.IOException +import java.net.{BindException, InetSocketAddress, Socket} +import java.util.concurrent.TimeUnit + +import org.apache.celeborn.common.CelebornConf +import org.apache.celeborn.common.internal.Logging +import org.apache.celeborn.common.util.{CelebornExitKind, Utils} + +trait MasterClusterFeature extends Logging { + var masterInfo: (Master, Thread) = _ + + val maxRetries = 3 + val masterWaitingTimeoutMs = TimeUnit.SECONDS.toMillis(30) + + class RunnerWrap[T](code: => T) extends Thread { + override def run(): Unit = { + Utils.tryLogNonFatalError(code) + } + } + + val usedPorts = new java.util.HashSet[Integer]() + def portBounded(port: Int): Boolean = { + val socket = new Socket() + try { + socket.connect(new InetSocketAddress("localhost", port), 100) + true + } catch { + case _: IOException => false + } finally { + socket.close() + } + } + def selectRandomPort(): Int = synchronized { + val port = Utils.selectRandomInt(1024, 65535) + val portUsed = usedPorts.contains(port) || portBounded(port) + usedPorts.add(port) + if (portUsed) { + selectRandomPort() + } else { + port + } + } + + def withRetryOnPortBindException(f: () => Unit): Unit = { + var retryCount = 0 + var pass = false + while (!pass) { + try { + f() + pass = true + } catch { + case e: IOException + if e.isInstanceOf[BindException] || Option(e.getCause).exists( + _.isInstanceOf[BindException]) => + logError(s"failed due to BindException, retrying (retry count: $retryCount)", e) + retryCount += 1 + if (retryCount == maxRetries) { + logError("failed due to reach the max retry count", e) + throw e + } + } + } + } + + def setupMasterWithRandomPort(masterConf: Map[String, String] = Map()): Master = { + var master: Master = null + withRetryOnPortBindException { () => + val randomPort = selectRandomPort() + val randomInternalPort = selectRandomPort() + val finalMasterConf = Map( + s"${CelebornConf.MASTER_HOST.key}" -> "localhost", + s"${CelebornConf.PORT_MAX_RETRY.key}" -> "0", + s"${CelebornConf.MASTER_PORT.key}" -> s"$randomPort", + s"${CelebornConf.MASTER_ENDPOINTS.key}" -> s"localhost:$randomPort", + s"${CelebornConf.MASTER_INTERNAL_PORT.key}" -> s"$randomInternalPort", + s"${CelebornConf.MASTER_INTERNAL_ENDPOINTS.key}" -> s"localhost:$randomInternalPort") ++ + masterConf + master = setUpMaster(masterConf = finalMasterConf) + } + master + } + + private def createMaster(map: Map[String, String] = null): Master = { + val conf = new CelebornConf() + conf.set(CelebornConf.METRICS_ENABLED.key, "false") + val httpPort = selectRandomPort() + conf.set(CelebornConf.MASTER_HTTP_PORT.key, s"$httpPort") + logInfo(s"set ${CelebornConf.MASTER_HTTP_PORT.key} to $httpPort") + if (map != null) { + map.foreach(m => conf.set(m._1, m._2)) + } + + val masterArguments = new MasterArguments(Array(), conf) + val master = new Master(conf, masterArguments) + if (conf.metricsSystemEnable) { + master.metricsSystem.start() + } + master.startHttpServer() + + Thread.sleep(3000L) + master + } + + def setUpMaster(masterConf: Map[String, String] = null): Master = { + val master = createMaster(masterConf) + val masterStartedSignal = Array(false) + val masterThread = new RunnerWrap({ + try { + masterStartedSignal(0) = true + master.rpcEnv.awaitTermination() + } catch { + case ex: Exception => + masterStartedSignal(0) = false + throw ex + } + }) + masterThread.start() + masterInfo = (master, masterThread) + var masterStartWaitingTime = 0 + while (!masterStartedSignal.head) { + logInfo("waiting for master node starting") + if (masterStartWaitingTime >= masterWaitingTimeoutMs) { + throw new BindException("cannot start master rpc endpoint") + } + Thread.sleep(3000) + masterStartWaitingTime += 3000 + } + master + } + + def shutdownMaster(): Unit = { + masterInfo._1.stop(CelebornExitKind.EXIT_IMMEDIATELY) + masterInfo._1.rpcEnv.shutdown() + Thread.sleep(3000) + masterInfo._2.interrupt() + usedPorts.clear() + } +} diff --git a/master/src/test/scala/org/apache/celeborn/service/deploy/master/MasterSuite.scala b/master/src/test/scala/org/apache/celeborn/service/deploy/master/MasterSuite.scala index ed27aac48..7b7a7a1f6 100644 --- a/master/src/test/scala/org/apache/celeborn/service/deploy/master/MasterSuite.scala +++ b/master/src/test/scala/org/apache/celeborn/service/deploy/master/MasterSuite.scala @@ -24,14 +24,13 @@ import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} import org.scalatest.funsuite.AnyFunSuite import org.apache.celeborn.common.CelebornConf -import org.apache.celeborn.common.internal.Logging import org.apache.celeborn.common.protocol.{PbCheckForWorkerTimeout, PbRegisterWorker} -import org.apache.celeborn.common.util.{CelebornExitKind, ThreadUtils, Utils} +import org.apache.celeborn.common.util.{CelebornExitKind, ThreadUtils} class MasterSuite extends AnyFunSuite with BeforeAndAfterAll with BeforeAndAfterEach - with Logging { + with MasterClusterFeature { def getTmpDir(): String = { val tmpDir = Files.createTempDirectory(null).toFile @@ -40,72 +39,86 @@ class MasterSuite extends AnyFunSuite } test("test single node startup functionality") { - val conf = new CelebornConf() - val randomMasterPort = Utils.selectRandomInt(1024, 65535) - val randomHttpPort = randomMasterPort + 1 - conf.set(CelebornConf.HA_ENABLED.key, "false") - conf.set(CelebornConf.HA_MASTER_RATIS_STORAGE_DIR.key, getTmpDir()) - conf.set(CelebornConf.WORKER_STORAGE_DIRS.key, getTmpDir()) - conf.set(CelebornConf.METRICS_ENABLED.key, "true") - conf.set(CelebornConf.MASTER_HTTP_HOST.key, "127.0.0.1") - conf.set(CelebornConf.MASTER_HTTP_PORT.key, randomHttpPort.toString) + withRetryOnPortBindException { () => + val conf = new CelebornConf() + val randomMasterPort = selectRandomPort() + val randomHttpPort = selectRandomPort() + conf.set(CelebornConf.HA_ENABLED.key, "false") + conf.set(CelebornConf.HA_MASTER_RATIS_STORAGE_DIR.key, getTmpDir()) + conf.set(CelebornConf.WORKER_STORAGE_DIRS.key, getTmpDir()) + conf.set(CelebornConf.METRICS_ENABLED.key, "true") + conf.set(CelebornConf.MASTER_HTTP_HOST.key, "127.0.0.1") + conf.set(CelebornConf.MASTER_HTTP_PORT.key, randomHttpPort.toString) - val args = Array("-h", "localhost", "-p", randomMasterPort.toString) + val args = Array("-h", "localhost", "-p", randomMasterPort.toString) - val masterArgs = new MasterArguments(args, conf) - val master = new Master(conf, masterArgs) - ThreadUtils.newThread( - new Runnable { - override def run(): Unit = { - master.initialize() - } - }, - "master-init-thread").start() - Thread.sleep(5000L) - master.stop(CelebornExitKind.EXIT_IMMEDIATELY) - master.rpcEnv.shutdown() + val masterArgs = new MasterArguments(args, conf) + val master = new Master(conf, masterArgs) + ThreadUtils.newThread( + new Runnable { + override def run(): Unit = { + master.initialize() + } + }, + "master-init-thread").start() + Thread.sleep(5000L) + master.stop(CelebornExitKind.EXIT_IMMEDIATELY) + master.rpcEnv.shutdown() + } } test("test dedicated internal port receives") { - val conf = new CelebornConf() - conf.set(CelebornConf.HA_ENABLED.key, "false") - conf.set(CelebornConf.HA_MASTER_RATIS_STORAGE_DIR.key, getTmpDir()) - conf.set(CelebornConf.WORKER_STORAGE_DIRS.key, getTmpDir()) - conf.set(CelebornConf.METRICS_ENABLED.key, "true") - conf.set(CelebornConf.INTERNAL_PORT_ENABLED.key, "true") + withRetryOnPortBindException { () => + val conf = new CelebornConf() + val randomMasterPort = selectRandomPort() + val randomHttpPort = selectRandomPort() + val randomInternalPort = selectRandomPort() + conf.set(CelebornConf.HA_ENABLED.key, "false") + conf.set(CelebornConf.HA_MASTER_RATIS_STORAGE_DIR.key, getTmpDir()) + conf.set(CelebornConf.WORKER_STORAGE_DIRS.key, getTmpDir()) + conf.set(CelebornConf.METRICS_ENABLED.key, "true") + conf.set(CelebornConf.INTERNAL_PORT_ENABLED.key, "true") + conf.set(CelebornConf.MASTER_HTTP_PORT.key, randomHttpPort.toString) - val args = Array("-h", "localhost", "-p", "9097", "--internal-port", "8097") + val args = Array( + "-h", + "localhost", + "-p", + randomMasterPort.toString, + "--internal-port", + randomInternalPort.toString) - val masterArgs = new MasterArguments(args, conf) - val master = new Master(conf, masterArgs) - ThreadUtils.newThread( - new Runnable { - override def run(): Unit = { - master.initialize() - } - }, - "master-init-thread").start() - Thread.sleep(5000L) - master.receive.applyOrElse( - PbCheckForWorkerTimeout.newBuilder().build(), - (_: Any) => fail("Unexpected message")) - master.internalRpcEndpoint.receive.applyOrElse( - PbCheckForWorkerTimeout.newBuilder().build(), - (_: Any) => fail("Unexpected message")) + val masterArgs = new MasterArguments(args, conf) + val master = new Master(conf, masterArgs) + ThreadUtils.newThread( + new Runnable { + override def run(): Unit = { + master.initialize() + } + }, + "master-init-thread").start() + Thread.sleep(5000L) + master.receive.applyOrElse( + PbCheckForWorkerTimeout.newBuilder().build(), + (_: Any) => fail("Unexpected message")) + master.internalRpcEndpoint.receive.applyOrElse( + PbCheckForWorkerTimeout.newBuilder().build(), + (_: Any) => fail("Unexpected message")) - master.internalRpcEndpoint.receiveAndReply( - mock(classOf[org.apache.celeborn.common.rpc.RpcCallContext])).applyOrElse( - PbRegisterWorker.newBuilder().build(), - (_: Any) => fail("Unexpected message")) - master.stop(CelebornExitKind.EXIT_IMMEDIATELY) - master.rpcEnv.shutdown() - master.internalRpcEnvInUse.shutdown() + master.internalRpcEndpoint.receiveAndReply( + mock(classOf[org.apache.celeborn.common.rpc.RpcCallContext])).applyOrElse( + PbRegisterWorker.newBuilder().build(), + (_: Any) => fail("Unexpected message")) + master.stop(CelebornExitKind.EXIT_IMMEDIATELY) + master.rpcEnv.shutdown() + master.internalRpcEnvInUse.shutdown() + } } test("test master worker host allow and deny pattern") { val conf = new CelebornConf() - val randomMasterPort = Utils.selectRandomInt(1024, 65535) - val randomHttpPort = randomMasterPort + 1 + val randomMasterPort = selectRandomPort() + val randomHttpPort = selectRandomPort() conf.set(CelebornConf.HA_ENABLED.key, "false") conf.set(CelebornConf.HA_MASTER_RATIS_STORAGE_DIR.key, getTmpDir()) conf.set(CelebornConf.WORKER_STORAGE_DIRS.key, getTmpDir()) diff --git a/master/src/test/scala/org/apache/celeborn/service/deploy/master/http/api/ApiMasterResourceSuite.scala b/master/src/test/scala/org/apache/celeborn/service/deploy/master/http/api/ApiMasterResourceSuite.scala index fba331bb2..d34377f08 100644 --- a/master/src/test/scala/org/apache/celeborn/service/deploy/master/http/api/ApiMasterResourceSuite.scala +++ b/master/src/test/scala/org/apache/celeborn/service/deploy/master/http/api/ApiMasterResourceSuite.scala @@ -17,54 +17,26 @@ package org.apache.celeborn.service.deploy.master.http.api -import java.nio.file.Files import javax.ws.rs.client.Entity import javax.ws.rs.core.{Form, MediaType} -import org.apache.celeborn.common.CelebornConf -import org.apache.celeborn.common.util.{CelebornExitKind, ThreadUtils, Utils} import org.apache.celeborn.server.common.HttpService import org.apache.celeborn.server.common.http.ApiBaseResourceSuite -import org.apache.celeborn.service.deploy.master.{Master, MasterArguments} +import org.apache.celeborn.service.deploy.master.{Master, MasterClusterFeature} -class ApiMasterResourceSuite extends ApiBaseResourceSuite { +class ApiMasterResourceSuite extends ApiBaseResourceSuite with MasterClusterFeature { private var master: Master = _ override protected def httpService: HttpService = master - def getTmpDir(): String = { - val tmpDir = Files.createTempDirectory(null).toFile - tmpDir.deleteOnExit() - tmpDir.getAbsolutePath - } - override def beforeAll(): Unit = { - val randomMasterPort = Utils.selectRandomInt(1024, 65535) - val randomHttpPort = randomMasterPort + 1 - celebornConf.set(CelebornConf.HA_ENABLED.key, "false") - celebornConf.set(CelebornConf.HA_MASTER_RATIS_STORAGE_DIR.key, getTmpDir()) - celebornConf.set(CelebornConf.WORKER_STORAGE_DIRS.key, getTmpDir()) - celebornConf.set(CelebornConf.MASTER_HTTP_HOST.key, "127.0.0.1") - celebornConf.set(CelebornConf.MASTER_HTTP_PORT.key, randomHttpPort.toString) - - val args = Array("-h", "localhost", "-p", randomMasterPort.toString) - - val masterArgs = new MasterArguments(args, celebornConf) - master = new Master(celebornConf, masterArgs) - ThreadUtils.newThread( - new Runnable { - override def run(): Unit = { - master.initialize() - } - }, - "master-init-thread").start() + master = setupMasterWithRandomPort(celebornConf.getAll.toMap) super.beforeAll() } override def afterAll(): Unit = { super.afterAll() - master.stop(CelebornExitKind.EXIT_IMMEDIATELY) - master.rpcEnv.shutdown() + shutdownMaster() } test("masterGroupInfo") { diff --git a/master/src/test/scala/org/apache/celeborn/service/deploy/master/http/api/v1/ApiV1MasterResourceSuite.scala b/master/src/test/scala/org/apache/celeborn/service/deploy/master/http/api/v1/ApiV1MasterResourceSuite.scala index edcec11c0..69dd19e4d 100644 --- a/master/src/test/scala/org/apache/celeborn/service/deploy/master/http/api/v1/ApiV1MasterResourceSuite.scala +++ b/master/src/test/scala/org/apache/celeborn/service/deploy/master/http/api/v1/ApiV1MasterResourceSuite.scala @@ -17,57 +17,29 @@ package org.apache.celeborn.service.deploy.master.http.api.v1 -import java.nio.file.Files import java.util.Collections import javax.servlet.http.HttpServletResponse import javax.ws.rs.client.Entity import javax.ws.rs.core.MediaType -import org.apache.celeborn.common.CelebornConf -import org.apache.celeborn.common.util.{CelebornExitKind, ThreadUtils, Utils} import org.apache.celeborn.rest.v1.model.{ApplicationsResponse, ExcludeWorkerRequest, HandleResponse, HostnamesResponse, RemoveWorkersUnavailableInfoRequest, SendWorkerEventRequest, ShufflesResponse, WorkerEventsResponse, WorkerId, WorkersResponse} import org.apache.celeborn.server.common.HttpService import org.apache.celeborn.server.common.http.api.v1.ApiV1BaseResourceSuite -import org.apache.celeborn.service.deploy.master.{Master, MasterArguments} +import org.apache.celeborn.service.deploy.master.{Master, MasterClusterFeature} -class ApiV1MasterResourceSuite extends ApiV1BaseResourceSuite { +class ApiV1MasterResourceSuite extends ApiV1BaseResourceSuite with MasterClusterFeature { private var master: Master = _ override protected def httpService: HttpService = master - def getTmpDir(): String = { - val tmpDir = Files.createTempDirectory(null).toFile - tmpDir.deleteOnExit() - tmpDir.getAbsolutePath - } - override def beforeAll(): Unit = { - val randomMasterPort = Utils.selectRandomInt(1024, 65535) - val randomHttpPort = randomMasterPort + 1 - celebornConf.set(CelebornConf.HA_ENABLED.key, "false") - celebornConf.set(CelebornConf.HA_MASTER_RATIS_STORAGE_DIR.key, getTmpDir()) - celebornConf.set(CelebornConf.WORKER_STORAGE_DIRS.key, getTmpDir()) - celebornConf.set(CelebornConf.MASTER_HTTP_HOST.key, "127.0.0.1") - celebornConf.set(CelebornConf.MASTER_HTTP_PORT.key, randomHttpPort.toString) - - val args = Array("-h", "localhost", "-p", randomMasterPort.toString) - - val masterArgs = new MasterArguments(args, celebornConf) - master = new Master(celebornConf, masterArgs) - ThreadUtils.newThread( - new Runnable { - override def run(): Unit = { - master.initialize() - } - }, - "master-init-thread").start() + master = setupMasterWithRandomPort(celebornConf.getAll.toMap) super.beforeAll() } override def afterAll(): Unit = { super.afterAll() - master.stop(CelebornExitKind.EXIT_IMMEDIATELY) - master.rpcEnv.shutdown() + shutdownMaster() } test("shuffle resource") { diff --git a/service/src/test/scala/org/apache/celeborn/server/common/http/HttpTestHelper.scala b/service/src/test/scala/org/apache/celeborn/server/common/http/HttpTestHelper.scala index 6a6ee9242..8674fc39c 100644 --- a/service/src/test/scala/org/apache/celeborn/server/common/http/HttpTestHelper.scala +++ b/service/src/test/scala/org/apache/celeborn/server/common/http/HttpTestHelper.scala @@ -64,7 +64,6 @@ trait HttpTestHelper extends AnyFunSuite override def beforeAll(): Unit = { super.beforeAll() restApiBaseSuite.setUp() - Thread.sleep(1000) // sleep for http server initialization } override def afterAll(): Unit = { diff --git a/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala b/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala index f393872f9..8c3a47a07 100644 --- a/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala +++ b/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala @@ -18,7 +18,7 @@ package org.apache.celeborn.service.deploy import java.io.IOException -import java.net.BindException +import java.net.{BindException, InetSocketAddress, Socket} import java.nio.file.Files import java.util.concurrent.TimeUnit import java.util.concurrent.locks.ReentrantLock @@ -28,7 +28,6 @@ import scala.collection.mutable import org.apache.celeborn.common.CelebornConf import org.apache.celeborn.common.internal.Logging import org.apache.celeborn.common.util.{CelebornExitKind, Utils} -import org.apache.celeborn.common.util.Utils.selectRandomInt import org.apache.celeborn.service.deploy.master.{Master, MasterArguments} import org.apache.celeborn.service.deploy.worker.{Worker, WorkerArguments} import org.apache.celeborn.service.deploy.worker.memory.MemoryManager @@ -39,6 +38,10 @@ trait MiniClusterFeature extends Logging { val workerInfos = new mutable.HashMap[Worker, Thread]() var workerConfForAdding: Map[String, String] = _ + val maxRetries = 4 + val masterWaitingTimeoutMs = TimeUnit.SECONDS.toMillis(60) + val workersWaitingTimeoutMs = TimeUnit.SECONDS.toMillis(60) + class RunnerWrap[T](code: => T) extends Thread { override def run(): Unit = { @@ -46,6 +49,29 @@ trait MiniClusterFeature extends Logging { } } + val usedPorts = new java.util.HashSet[Integer]() + def portBounded(port: Int): Boolean = { + val socket = new Socket() + try { + socket.connect(new InetSocketAddress("localhost", port), 100) + true + } catch { + case _: IOException => false + } finally { + socket.close() + } + } + def selectRandomPort(): Int = synchronized { + val port = Utils.selectRandomInt(1024, 65535) + val portUsed = usedPorts.contains(port) || portBounded(port) + usedPorts.add(port) + if (portUsed) { + selectRandomPort() + } else { + port + } + } + def setupMiniClusterWithRandomPorts( masterConf: Map[String, String] = Map(), workerConf: Map[String, String] = Map(), @@ -56,8 +82,8 @@ trait MiniClusterFeature extends Logging { var workers: collection.Set[Worker] = null while (!created) { try { - val randomPort = selectRandomInt(1024, 65535) - val randomInternalPort = selectRandomInt(1024, 65535) + val randomPort = selectRandomPort() + val randomInternalPort = selectRandomPort() val finalMasterConf = Map( s"${CelebornConf.MASTER_HOST.key}" -> "localhost", s"${CelebornConf.PORT_MAX_RETRY.key}" -> "0", @@ -84,7 +110,7 @@ trait MiniClusterFeature extends Logging { _.isInstanceOf[BindException]) => logError(s"failed to setup mini cluster, retrying (retry count: $retryCount)", e) retryCount += 1 - if (retryCount == 3) { + if (retryCount == maxRetries) { logError("failed to setup mini cluster, reached the max retry count", e) throw e } @@ -103,7 +129,7 @@ trait MiniClusterFeature extends Logging { private def createMaster(map: Map[String, String] = null): Master = { val conf = new CelebornConf() conf.set(CelebornConf.METRICS_ENABLED.key, "false") - val httpPort = selectRandomInt(1024, 65535) + val httpPort = selectRandomPort() conf.set(CelebornConf.MASTER_HTTP_PORT.key, s"$httpPort") logInfo(s"set ${CelebornConf.MASTER_HTTP_PORT.key} to $httpPort") if (map != null) { @@ -128,7 +154,7 @@ trait MiniClusterFeature extends Logging { conf.set(CelebornConf.WORKER_STORAGE_DIRS.key, storageDir) conf.set(CelebornConf.WORKER_DISK_MONITOR_ENABLED.key, "false") conf.set(CelebornConf.CLIENT_PUSH_BUFFER_MAX_SIZE.key, "256K") - conf.set(CelebornConf.WORKER_HTTP_PORT.key, s"${selectRandomInt(1024, 65535)}") + conf.set(CelebornConf.WORKER_HTTP_PORT.key, s"${selectRandomPort()}") conf.set("celeborn.fetch.io.threads", "4") conf.set("celeborn.push.io.threads", "4") if (map != null) { @@ -151,7 +177,6 @@ trait MiniClusterFeature extends Logging { } def setUpMaster(masterConf: Map[String, String] = null): Master = { - val timeout = 30000 val master = createMaster(masterConf) val masterStartedSignal = Array(false) val masterThread = new RunnerWrap({ @@ -167,13 +192,13 @@ trait MiniClusterFeature extends Logging { masterThread.start() masterInfo = (master, masterThread) var masterStartWaitingTime = 0 - Thread.sleep(5000) while (!masterStartedSignal.head) { logInfo("waiting for master node starting") - masterStartWaitingTime += 5000 - if (masterStartWaitingTime >= timeout) { + if (masterStartWaitingTime >= masterWaitingTimeoutMs) { throw new BindException("cannot start master rpc endpoint") } + Thread.sleep(5000) + masterStartWaitingTime += 5000 } master } @@ -181,7 +206,6 @@ trait MiniClusterFeature extends Logging { def setUpWorkers( workerConf: Map[String, String] = null, workerNum: Int = 3): collection.Set[Worker] = { - val timeout = 30000 val workers = new Array[Worker](workerNum) val flagUpdateLock = new ReentrantLock() val threads = (1 to workerNum).map { i => @@ -204,11 +228,9 @@ trait MiniClusterFeature extends Logging { workerStarted = false workerStartRetry += 1 logError(s"cannot start worker $i, retrying: ", ex) - if (workerStartRetry == 3) { + if (workerStartRetry == maxRetries) { logError(s"cannot start worker $i, reached to max retrying", ex) throw ex - } else { - TimeUnit.SECONDS.sleep(Math.pow(2, workerStartRetry).toLong) } } } @@ -238,12 +260,12 @@ trait MiniClusterFeature extends Logging { } catch { case ex: Throwable => logError("all workers haven't been started retrying", ex) - Thread.sleep(5000) - workersWaitingTime += 5000 - if (workersWaitingTime >= timeout) { - logError(s"cannot start all workers after $timeout ms", ex) + if (workersWaitingTime >= workersWaitingTimeoutMs) { + logError(s"cannot start all workers after $workersWaitingTimeoutMs ms", ex) throw ex } + Thread.sleep(5000) + workersWaitingTime += 5000 } } workerInfos.keySet @@ -278,5 +300,7 @@ trait MiniClusterFeature extends Logging { workerInfos.clear() masterInfo._2.interrupt() MemoryManager.reset() + + usedPorts.clear() } } diff --git a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/http/api/v1/ApiV1WorkerResourceSuite.scala b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/http/api/v1/ApiV1WorkerResourceSuite.scala index 87bc65481..81342ba68 100644 --- a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/http/api/v1/ApiV1WorkerResourceSuite.scala +++ b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/http/api/v1/ApiV1WorkerResourceSuite.scala @@ -32,7 +32,7 @@ class ApiV1WorkerResourceSuite extends ApiV1BaseResourceSuite with MiniClusterFe override def beforeAll(): Unit = { logInfo("test initialized, setup celeborn mini cluster") - val (m, w) = + val (_, w) = setupMiniClusterWithRandomPorts(workerConf = celebornConf.getAll.toMap, workerNum = 1) worker = w.head super.beforeAll() diff --git a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/ApiWorkerResourceAuthenticationSuite.scala b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/ApiWorkerResourceAuthenticationSuite.scala index 8f0a862e6..5e28441a5 100644 --- a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/ApiWorkerResourceAuthenticationSuite.scala +++ b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/ApiWorkerResourceAuthenticationSuite.scala @@ -17,27 +17,25 @@ package org.apache.celeborn.service.deploy.worker.storage -import org.apache.celeborn.common.util.CelebornExitKind import org.apache.celeborn.server.common.HttpService import org.apache.celeborn.server.common.http.ApiBaseResourceAuthenticationSuite -import org.apache.celeborn.service.deploy.worker.{Worker, WorkerArguments} +import org.apache.celeborn.service.deploy.MiniClusterFeature +import org.apache.celeborn.service.deploy.worker.Worker -class ApiWorkerResourceAuthenticationSuite extends ApiBaseResourceAuthenticationSuite { +class ApiWorkerResourceAuthenticationSuite extends ApiBaseResourceAuthenticationSuite + with MiniClusterFeature { private var worker: Worker = _ override protected def httpService: HttpService = worker override def beforeAll(): Unit = { - val workerArgs = new WorkerArguments(Array(), celebornConf) - worker = new Worker(celebornConf, workerArgs) - worker.metricsSystem.start() - worker.startHttpServer() + val (_, w) = + setupMiniClusterWithRandomPorts(workerConf = celebornConf.getAll.toMap, workerNum = 1) + worker = w.head super.beforeAll() } override def afterAll(): Unit = { super.afterAll() - worker.metricsSystem.stop() - worker.rpcEnv.shutdown() - worker.stop(CelebornExitKind.EXIT_IMMEDIATELY) + shutdownMiniCluster() } }