[CELEBORN-1711][TEST] Fix flaky test caused by master/worker setup issue

### What changes were proposed in this pull request?

1. retry on BindException when starting master/worker http server
2. record the used ports and pre-check whether the selected port is used or bounded before binding

### Why are the changes needed?

To fix flaky test.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?
GA.

Closes #2906 from turboFei/retry_master_suite.

Authored-by: Wang, Fei <fwang12@ebay.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
This commit is contained in:
Wang, Fei 2024-12-17 10:45:40 +08:00 committed by Shuang
parent 17df678c77
commit 2efdf755cc
9 changed files with 291 additions and 184 deletions

View File

@ -17,50 +17,22 @@
package org.apache.celeborn.service.deploy.master
import java.nio.file.Files
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.util.{CelebornExitKind, ThreadUtils, Utils}
import org.apache.celeborn.server.common.HttpService
import org.apache.celeborn.server.common.http.ApiBaseResourceAuthenticationSuite
class ApiMasterResourceAuthenticationSuite extends ApiBaseResourceAuthenticationSuite {
class ApiMasterResourceAuthenticationSuite extends ApiBaseResourceAuthenticationSuite
with MasterClusterFeature {
private var master: Master = _
override protected def httpService: HttpService = master
def getTmpDir(): String = {
val tmpDir = Files.createTempDirectory(null).toFile
tmpDir.deleteOnExit()
tmpDir.getAbsolutePath
}
override def beforeAll(): Unit = {
val randomMasterPort = Utils.selectRandomInt(1024, 65535)
val randomHttpPort = randomMasterPort + 1
celebornConf.set(CelebornConf.HA_ENABLED.key, "false")
celebornConf.set(CelebornConf.HA_MASTER_RATIS_STORAGE_DIR.key, getTmpDir())
celebornConf.set(CelebornConf.WORKER_STORAGE_DIRS.key, getTmpDir())
celebornConf.set(CelebornConf.MASTER_HTTP_HOST.key, "127.0.0.1")
celebornConf.set(CelebornConf.MASTER_HTTP_PORT.key, randomHttpPort.toString)
val args = Array("-h", "localhost", "-p", randomMasterPort.toString)
val masterArgs = new MasterArguments(args, celebornConf)
master = new Master(celebornConf, masterArgs)
ThreadUtils.newThread(
new Runnable {
override def run(): Unit = {
master.initialize()
}
},
"api-master-thread").start()
master = setupMasterWithRandomPort(celebornConf.getAll.toMap)
super.beforeAll()
}
override def afterAll(): Unit = {
super.afterAll()
master.stop(CelebornExitKind.EXIT_IMMEDIATELY)
master.rpcEnv.shutdown()
shutdownMaster()
}
}

View File

@ -0,0 +1,157 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.celeborn.service.deploy.master
import java.io.IOException
import java.net.{BindException, InetSocketAddress, Socket}
import java.util.concurrent.TimeUnit
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.util.{CelebornExitKind, Utils}
trait MasterClusterFeature extends Logging {
var masterInfo: (Master, Thread) = _
val maxRetries = 3
val masterWaitingTimeoutMs = TimeUnit.SECONDS.toMillis(30)
class RunnerWrap[T](code: => T) extends Thread {
override def run(): Unit = {
Utils.tryLogNonFatalError(code)
}
}
val usedPorts = new java.util.HashSet[Integer]()
def portBounded(port: Int): Boolean = {
val socket = new Socket()
try {
socket.connect(new InetSocketAddress("localhost", port), 100)
true
} catch {
case _: IOException => false
} finally {
socket.close()
}
}
def selectRandomPort(): Int = synchronized {
val port = Utils.selectRandomInt(1024, 65535)
val portUsed = usedPorts.contains(port) || portBounded(port)
usedPorts.add(port)
if (portUsed) {
selectRandomPort()
} else {
port
}
}
def withRetryOnPortBindException(f: () => Unit): Unit = {
var retryCount = 0
var pass = false
while (!pass) {
try {
f()
pass = true
} catch {
case e: IOException
if e.isInstanceOf[BindException] || Option(e.getCause).exists(
_.isInstanceOf[BindException]) =>
logError(s"failed due to BindException, retrying (retry count: $retryCount)", e)
retryCount += 1
if (retryCount == maxRetries) {
logError("failed due to reach the max retry count", e)
throw e
}
}
}
}
def setupMasterWithRandomPort(masterConf: Map[String, String] = Map()): Master = {
var master: Master = null
withRetryOnPortBindException { () =>
val randomPort = selectRandomPort()
val randomInternalPort = selectRandomPort()
val finalMasterConf = Map(
s"${CelebornConf.MASTER_HOST.key}" -> "localhost",
s"${CelebornConf.PORT_MAX_RETRY.key}" -> "0",
s"${CelebornConf.MASTER_PORT.key}" -> s"$randomPort",
s"${CelebornConf.MASTER_ENDPOINTS.key}" -> s"localhost:$randomPort",
s"${CelebornConf.MASTER_INTERNAL_PORT.key}" -> s"$randomInternalPort",
s"${CelebornConf.MASTER_INTERNAL_ENDPOINTS.key}" -> s"localhost:$randomInternalPort") ++
masterConf
master = setUpMaster(masterConf = finalMasterConf)
}
master
}
private def createMaster(map: Map[String, String] = null): Master = {
val conf = new CelebornConf()
conf.set(CelebornConf.METRICS_ENABLED.key, "false")
val httpPort = selectRandomPort()
conf.set(CelebornConf.MASTER_HTTP_PORT.key, s"$httpPort")
logInfo(s"set ${CelebornConf.MASTER_HTTP_PORT.key} to $httpPort")
if (map != null) {
map.foreach(m => conf.set(m._1, m._2))
}
val masterArguments = new MasterArguments(Array(), conf)
val master = new Master(conf, masterArguments)
if (conf.metricsSystemEnable) {
master.metricsSystem.start()
}
master.startHttpServer()
Thread.sleep(3000L)
master
}
def setUpMaster(masterConf: Map[String, String] = null): Master = {
val master = createMaster(masterConf)
val masterStartedSignal = Array(false)
val masterThread = new RunnerWrap({
try {
masterStartedSignal(0) = true
master.rpcEnv.awaitTermination()
} catch {
case ex: Exception =>
masterStartedSignal(0) = false
throw ex
}
})
masterThread.start()
masterInfo = (master, masterThread)
var masterStartWaitingTime = 0
while (!masterStartedSignal.head) {
logInfo("waiting for master node starting")
if (masterStartWaitingTime >= masterWaitingTimeoutMs) {
throw new BindException("cannot start master rpc endpoint")
}
Thread.sleep(3000)
masterStartWaitingTime += 3000
}
master
}
def shutdownMaster(): Unit = {
masterInfo._1.stop(CelebornExitKind.EXIT_IMMEDIATELY)
masterInfo._1.rpcEnv.shutdown()
Thread.sleep(3000)
masterInfo._2.interrupt()
usedPorts.clear()
}
}

View File

@ -24,14 +24,13 @@ import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
import org.scalatest.funsuite.AnyFunSuite
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.protocol.{PbCheckForWorkerTimeout, PbRegisterWorker}
import org.apache.celeborn.common.util.{CelebornExitKind, ThreadUtils, Utils}
import org.apache.celeborn.common.util.{CelebornExitKind, ThreadUtils}
class MasterSuite extends AnyFunSuite
with BeforeAndAfterAll
with BeforeAndAfterEach
with Logging {
with MasterClusterFeature {
def getTmpDir(): String = {
val tmpDir = Files.createTempDirectory(null).toFile
@ -40,72 +39,86 @@ class MasterSuite extends AnyFunSuite
}
test("test single node startup functionality") {
val conf = new CelebornConf()
val randomMasterPort = Utils.selectRandomInt(1024, 65535)
val randomHttpPort = randomMasterPort + 1
conf.set(CelebornConf.HA_ENABLED.key, "false")
conf.set(CelebornConf.HA_MASTER_RATIS_STORAGE_DIR.key, getTmpDir())
conf.set(CelebornConf.WORKER_STORAGE_DIRS.key, getTmpDir())
conf.set(CelebornConf.METRICS_ENABLED.key, "true")
conf.set(CelebornConf.MASTER_HTTP_HOST.key, "127.0.0.1")
conf.set(CelebornConf.MASTER_HTTP_PORT.key, randomHttpPort.toString)
withRetryOnPortBindException { () =>
val conf = new CelebornConf()
val randomMasterPort = selectRandomPort()
val randomHttpPort = selectRandomPort()
conf.set(CelebornConf.HA_ENABLED.key, "false")
conf.set(CelebornConf.HA_MASTER_RATIS_STORAGE_DIR.key, getTmpDir())
conf.set(CelebornConf.WORKER_STORAGE_DIRS.key, getTmpDir())
conf.set(CelebornConf.METRICS_ENABLED.key, "true")
conf.set(CelebornConf.MASTER_HTTP_HOST.key, "127.0.0.1")
conf.set(CelebornConf.MASTER_HTTP_PORT.key, randomHttpPort.toString)
val args = Array("-h", "localhost", "-p", randomMasterPort.toString)
val args = Array("-h", "localhost", "-p", randomMasterPort.toString)
val masterArgs = new MasterArguments(args, conf)
val master = new Master(conf, masterArgs)
ThreadUtils.newThread(
new Runnable {
override def run(): Unit = {
master.initialize()
}
},
"master-init-thread").start()
Thread.sleep(5000L)
master.stop(CelebornExitKind.EXIT_IMMEDIATELY)
master.rpcEnv.shutdown()
val masterArgs = new MasterArguments(args, conf)
val master = new Master(conf, masterArgs)
ThreadUtils.newThread(
new Runnable {
override def run(): Unit = {
master.initialize()
}
},
"master-init-thread").start()
Thread.sleep(5000L)
master.stop(CelebornExitKind.EXIT_IMMEDIATELY)
master.rpcEnv.shutdown()
}
}
test("test dedicated internal port receives") {
val conf = new CelebornConf()
conf.set(CelebornConf.HA_ENABLED.key, "false")
conf.set(CelebornConf.HA_MASTER_RATIS_STORAGE_DIR.key, getTmpDir())
conf.set(CelebornConf.WORKER_STORAGE_DIRS.key, getTmpDir())
conf.set(CelebornConf.METRICS_ENABLED.key, "true")
conf.set(CelebornConf.INTERNAL_PORT_ENABLED.key, "true")
withRetryOnPortBindException { () =>
val conf = new CelebornConf()
val randomMasterPort = selectRandomPort()
val randomHttpPort = selectRandomPort()
val randomInternalPort = selectRandomPort()
conf.set(CelebornConf.HA_ENABLED.key, "false")
conf.set(CelebornConf.HA_MASTER_RATIS_STORAGE_DIR.key, getTmpDir())
conf.set(CelebornConf.WORKER_STORAGE_DIRS.key, getTmpDir())
conf.set(CelebornConf.METRICS_ENABLED.key, "true")
conf.set(CelebornConf.INTERNAL_PORT_ENABLED.key, "true")
conf.set(CelebornConf.MASTER_HTTP_PORT.key, randomHttpPort.toString)
val args = Array("-h", "localhost", "-p", "9097", "--internal-port", "8097")
val args = Array(
"-h",
"localhost",
"-p",
randomMasterPort.toString,
"--internal-port",
randomInternalPort.toString)
val masterArgs = new MasterArguments(args, conf)
val master = new Master(conf, masterArgs)
ThreadUtils.newThread(
new Runnable {
override def run(): Unit = {
master.initialize()
}
},
"master-init-thread").start()
Thread.sleep(5000L)
master.receive.applyOrElse(
PbCheckForWorkerTimeout.newBuilder().build(),
(_: Any) => fail("Unexpected message"))
master.internalRpcEndpoint.receive.applyOrElse(
PbCheckForWorkerTimeout.newBuilder().build(),
(_: Any) => fail("Unexpected message"))
val masterArgs = new MasterArguments(args, conf)
val master = new Master(conf, masterArgs)
ThreadUtils.newThread(
new Runnable {
override def run(): Unit = {
master.initialize()
}
},
"master-init-thread").start()
Thread.sleep(5000L)
master.receive.applyOrElse(
PbCheckForWorkerTimeout.newBuilder().build(),
(_: Any) => fail("Unexpected message"))
master.internalRpcEndpoint.receive.applyOrElse(
PbCheckForWorkerTimeout.newBuilder().build(),
(_: Any) => fail("Unexpected message"))
master.internalRpcEndpoint.receiveAndReply(
mock(classOf[org.apache.celeborn.common.rpc.RpcCallContext])).applyOrElse(
PbRegisterWorker.newBuilder().build(),
(_: Any) => fail("Unexpected message"))
master.stop(CelebornExitKind.EXIT_IMMEDIATELY)
master.rpcEnv.shutdown()
master.internalRpcEnvInUse.shutdown()
master.internalRpcEndpoint.receiveAndReply(
mock(classOf[org.apache.celeborn.common.rpc.RpcCallContext])).applyOrElse(
PbRegisterWorker.newBuilder().build(),
(_: Any) => fail("Unexpected message"))
master.stop(CelebornExitKind.EXIT_IMMEDIATELY)
master.rpcEnv.shutdown()
master.internalRpcEnvInUse.shutdown()
}
}
test("test master worker host allow and deny pattern") {
val conf = new CelebornConf()
val randomMasterPort = Utils.selectRandomInt(1024, 65535)
val randomHttpPort = randomMasterPort + 1
val randomMasterPort = selectRandomPort()
val randomHttpPort = selectRandomPort()
conf.set(CelebornConf.HA_ENABLED.key, "false")
conf.set(CelebornConf.HA_MASTER_RATIS_STORAGE_DIR.key, getTmpDir())
conf.set(CelebornConf.WORKER_STORAGE_DIRS.key, getTmpDir())

View File

@ -17,54 +17,26 @@
package org.apache.celeborn.service.deploy.master.http.api
import java.nio.file.Files
import javax.ws.rs.client.Entity
import javax.ws.rs.core.{Form, MediaType}
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.util.{CelebornExitKind, ThreadUtils, Utils}
import org.apache.celeborn.server.common.HttpService
import org.apache.celeborn.server.common.http.ApiBaseResourceSuite
import org.apache.celeborn.service.deploy.master.{Master, MasterArguments}
import org.apache.celeborn.service.deploy.master.{Master, MasterClusterFeature}
class ApiMasterResourceSuite extends ApiBaseResourceSuite {
class ApiMasterResourceSuite extends ApiBaseResourceSuite with MasterClusterFeature {
private var master: Master = _
override protected def httpService: HttpService = master
def getTmpDir(): String = {
val tmpDir = Files.createTempDirectory(null).toFile
tmpDir.deleteOnExit()
tmpDir.getAbsolutePath
}
override def beforeAll(): Unit = {
val randomMasterPort = Utils.selectRandomInt(1024, 65535)
val randomHttpPort = randomMasterPort + 1
celebornConf.set(CelebornConf.HA_ENABLED.key, "false")
celebornConf.set(CelebornConf.HA_MASTER_RATIS_STORAGE_DIR.key, getTmpDir())
celebornConf.set(CelebornConf.WORKER_STORAGE_DIRS.key, getTmpDir())
celebornConf.set(CelebornConf.MASTER_HTTP_HOST.key, "127.0.0.1")
celebornConf.set(CelebornConf.MASTER_HTTP_PORT.key, randomHttpPort.toString)
val args = Array("-h", "localhost", "-p", randomMasterPort.toString)
val masterArgs = new MasterArguments(args, celebornConf)
master = new Master(celebornConf, masterArgs)
ThreadUtils.newThread(
new Runnable {
override def run(): Unit = {
master.initialize()
}
},
"master-init-thread").start()
master = setupMasterWithRandomPort(celebornConf.getAll.toMap)
super.beforeAll()
}
override def afterAll(): Unit = {
super.afterAll()
master.stop(CelebornExitKind.EXIT_IMMEDIATELY)
master.rpcEnv.shutdown()
shutdownMaster()
}
test("masterGroupInfo") {

View File

@ -17,57 +17,29 @@
package org.apache.celeborn.service.deploy.master.http.api.v1
import java.nio.file.Files
import java.util.Collections
import javax.servlet.http.HttpServletResponse
import javax.ws.rs.client.Entity
import javax.ws.rs.core.MediaType
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.util.{CelebornExitKind, ThreadUtils, Utils}
import org.apache.celeborn.rest.v1.model.{ApplicationsResponse, ExcludeWorkerRequest, HandleResponse, HostnamesResponse, RemoveWorkersUnavailableInfoRequest, SendWorkerEventRequest, ShufflesResponse, WorkerEventsResponse, WorkerId, WorkersResponse}
import org.apache.celeborn.server.common.HttpService
import org.apache.celeborn.server.common.http.api.v1.ApiV1BaseResourceSuite
import org.apache.celeborn.service.deploy.master.{Master, MasterArguments}
import org.apache.celeborn.service.deploy.master.{Master, MasterClusterFeature}
class ApiV1MasterResourceSuite extends ApiV1BaseResourceSuite {
class ApiV1MasterResourceSuite extends ApiV1BaseResourceSuite with MasterClusterFeature {
private var master: Master = _
override protected def httpService: HttpService = master
def getTmpDir(): String = {
val tmpDir = Files.createTempDirectory(null).toFile
tmpDir.deleteOnExit()
tmpDir.getAbsolutePath
}
override def beforeAll(): Unit = {
val randomMasterPort = Utils.selectRandomInt(1024, 65535)
val randomHttpPort = randomMasterPort + 1
celebornConf.set(CelebornConf.HA_ENABLED.key, "false")
celebornConf.set(CelebornConf.HA_MASTER_RATIS_STORAGE_DIR.key, getTmpDir())
celebornConf.set(CelebornConf.WORKER_STORAGE_DIRS.key, getTmpDir())
celebornConf.set(CelebornConf.MASTER_HTTP_HOST.key, "127.0.0.1")
celebornConf.set(CelebornConf.MASTER_HTTP_PORT.key, randomHttpPort.toString)
val args = Array("-h", "localhost", "-p", randomMasterPort.toString)
val masterArgs = new MasterArguments(args, celebornConf)
master = new Master(celebornConf, masterArgs)
ThreadUtils.newThread(
new Runnable {
override def run(): Unit = {
master.initialize()
}
},
"master-init-thread").start()
master = setupMasterWithRandomPort(celebornConf.getAll.toMap)
super.beforeAll()
}
override def afterAll(): Unit = {
super.afterAll()
master.stop(CelebornExitKind.EXIT_IMMEDIATELY)
master.rpcEnv.shutdown()
shutdownMaster()
}
test("shuffle resource") {

View File

@ -64,7 +64,6 @@ trait HttpTestHelper extends AnyFunSuite
override def beforeAll(): Unit = {
super.beforeAll()
restApiBaseSuite.setUp()
Thread.sleep(1000) // sleep for http server initialization
}
override def afterAll(): Unit = {

View File

@ -18,7 +18,7 @@
package org.apache.celeborn.service.deploy
import java.io.IOException
import java.net.BindException
import java.net.{BindException, InetSocketAddress, Socket}
import java.nio.file.Files
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock
@ -28,7 +28,6 @@ import scala.collection.mutable
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.util.{CelebornExitKind, Utils}
import org.apache.celeborn.common.util.Utils.selectRandomInt
import org.apache.celeborn.service.deploy.master.{Master, MasterArguments}
import org.apache.celeborn.service.deploy.worker.{Worker, WorkerArguments}
import org.apache.celeborn.service.deploy.worker.memory.MemoryManager
@ -39,6 +38,10 @@ trait MiniClusterFeature extends Logging {
val workerInfos = new mutable.HashMap[Worker, Thread]()
var workerConfForAdding: Map[String, String] = _
val maxRetries = 4
val masterWaitingTimeoutMs = TimeUnit.SECONDS.toMillis(60)
val workersWaitingTimeoutMs = TimeUnit.SECONDS.toMillis(60)
class RunnerWrap[T](code: => T) extends Thread {
override def run(): Unit = {
@ -46,6 +49,29 @@ trait MiniClusterFeature extends Logging {
}
}
val usedPorts = new java.util.HashSet[Integer]()
def portBounded(port: Int): Boolean = {
val socket = new Socket()
try {
socket.connect(new InetSocketAddress("localhost", port), 100)
true
} catch {
case _: IOException => false
} finally {
socket.close()
}
}
def selectRandomPort(): Int = synchronized {
val port = Utils.selectRandomInt(1024, 65535)
val portUsed = usedPorts.contains(port) || portBounded(port)
usedPorts.add(port)
if (portUsed) {
selectRandomPort()
} else {
port
}
}
def setupMiniClusterWithRandomPorts(
masterConf: Map[String, String] = Map(),
workerConf: Map[String, String] = Map(),
@ -56,8 +82,8 @@ trait MiniClusterFeature extends Logging {
var workers: collection.Set[Worker] = null
while (!created) {
try {
val randomPort = selectRandomInt(1024, 65535)
val randomInternalPort = selectRandomInt(1024, 65535)
val randomPort = selectRandomPort()
val randomInternalPort = selectRandomPort()
val finalMasterConf = Map(
s"${CelebornConf.MASTER_HOST.key}" -> "localhost",
s"${CelebornConf.PORT_MAX_RETRY.key}" -> "0",
@ -84,7 +110,7 @@ trait MiniClusterFeature extends Logging {
_.isInstanceOf[BindException]) =>
logError(s"failed to setup mini cluster, retrying (retry count: $retryCount)", e)
retryCount += 1
if (retryCount == 3) {
if (retryCount == maxRetries) {
logError("failed to setup mini cluster, reached the max retry count", e)
throw e
}
@ -103,7 +129,7 @@ trait MiniClusterFeature extends Logging {
private def createMaster(map: Map[String, String] = null): Master = {
val conf = new CelebornConf()
conf.set(CelebornConf.METRICS_ENABLED.key, "false")
val httpPort = selectRandomInt(1024, 65535)
val httpPort = selectRandomPort()
conf.set(CelebornConf.MASTER_HTTP_PORT.key, s"$httpPort")
logInfo(s"set ${CelebornConf.MASTER_HTTP_PORT.key} to $httpPort")
if (map != null) {
@ -128,7 +154,7 @@ trait MiniClusterFeature extends Logging {
conf.set(CelebornConf.WORKER_STORAGE_DIRS.key, storageDir)
conf.set(CelebornConf.WORKER_DISK_MONITOR_ENABLED.key, "false")
conf.set(CelebornConf.CLIENT_PUSH_BUFFER_MAX_SIZE.key, "256K")
conf.set(CelebornConf.WORKER_HTTP_PORT.key, s"${selectRandomInt(1024, 65535)}")
conf.set(CelebornConf.WORKER_HTTP_PORT.key, s"${selectRandomPort()}")
conf.set("celeborn.fetch.io.threads", "4")
conf.set("celeborn.push.io.threads", "4")
if (map != null) {
@ -151,7 +177,6 @@ trait MiniClusterFeature extends Logging {
}
def setUpMaster(masterConf: Map[String, String] = null): Master = {
val timeout = 30000
val master = createMaster(masterConf)
val masterStartedSignal = Array(false)
val masterThread = new RunnerWrap({
@ -167,13 +192,13 @@ trait MiniClusterFeature extends Logging {
masterThread.start()
masterInfo = (master, masterThread)
var masterStartWaitingTime = 0
Thread.sleep(5000)
while (!masterStartedSignal.head) {
logInfo("waiting for master node starting")
masterStartWaitingTime += 5000
if (masterStartWaitingTime >= timeout) {
if (masterStartWaitingTime >= masterWaitingTimeoutMs) {
throw new BindException("cannot start master rpc endpoint")
}
Thread.sleep(5000)
masterStartWaitingTime += 5000
}
master
}
@ -181,7 +206,6 @@ trait MiniClusterFeature extends Logging {
def setUpWorkers(
workerConf: Map[String, String] = null,
workerNum: Int = 3): collection.Set[Worker] = {
val timeout = 30000
val workers = new Array[Worker](workerNum)
val flagUpdateLock = new ReentrantLock()
val threads = (1 to workerNum).map { i =>
@ -204,11 +228,9 @@ trait MiniClusterFeature extends Logging {
workerStarted = false
workerStartRetry += 1
logError(s"cannot start worker $i, retrying: ", ex)
if (workerStartRetry == 3) {
if (workerStartRetry == maxRetries) {
logError(s"cannot start worker $i, reached to max retrying", ex)
throw ex
} else {
TimeUnit.SECONDS.sleep(Math.pow(2, workerStartRetry).toLong)
}
}
}
@ -238,12 +260,12 @@ trait MiniClusterFeature extends Logging {
} catch {
case ex: Throwable =>
logError("all workers haven't been started retrying", ex)
Thread.sleep(5000)
workersWaitingTime += 5000
if (workersWaitingTime >= timeout) {
logError(s"cannot start all workers after $timeout ms", ex)
if (workersWaitingTime >= workersWaitingTimeoutMs) {
logError(s"cannot start all workers after $workersWaitingTimeoutMs ms", ex)
throw ex
}
Thread.sleep(5000)
workersWaitingTime += 5000
}
}
workerInfos.keySet
@ -278,5 +300,7 @@ trait MiniClusterFeature extends Logging {
workerInfos.clear()
masterInfo._2.interrupt()
MemoryManager.reset()
usedPorts.clear()
}
}

View File

@ -32,7 +32,7 @@ class ApiV1WorkerResourceSuite extends ApiV1BaseResourceSuite with MiniClusterFe
override def beforeAll(): Unit = {
logInfo("test initialized, setup celeborn mini cluster")
val (m, w) =
val (_, w) =
setupMiniClusterWithRandomPorts(workerConf = celebornConf.getAll.toMap, workerNum = 1)
worker = w.head
super.beforeAll()

View File

@ -17,27 +17,25 @@
package org.apache.celeborn.service.deploy.worker.storage
import org.apache.celeborn.common.util.CelebornExitKind
import org.apache.celeborn.server.common.HttpService
import org.apache.celeborn.server.common.http.ApiBaseResourceAuthenticationSuite
import org.apache.celeborn.service.deploy.worker.{Worker, WorkerArguments}
import org.apache.celeborn.service.deploy.MiniClusterFeature
import org.apache.celeborn.service.deploy.worker.Worker
class ApiWorkerResourceAuthenticationSuite extends ApiBaseResourceAuthenticationSuite {
class ApiWorkerResourceAuthenticationSuite extends ApiBaseResourceAuthenticationSuite
with MiniClusterFeature {
private var worker: Worker = _
override protected def httpService: HttpService = worker
override def beforeAll(): Unit = {
val workerArgs = new WorkerArguments(Array(), celebornConf)
worker = new Worker(celebornConf, workerArgs)
worker.metricsSystem.start()
worker.startHttpServer()
val (_, w) =
setupMiniClusterWithRandomPorts(workerConf = celebornConf.getAll.toMap, workerNum = 1)
worker = w.head
super.beforeAll()
}
override def afterAll(): Unit = {
super.afterAll()
worker.metricsSystem.stop()
worker.rpcEnv.shutdown()
worker.stop(CelebornExitKind.EXIT_IMMEDIATELY)
shutdownMiniCluster()
}
}