[CELEBORN-1317][FOLLOWUP] Retry to setup mini cluster if the cause is BindException

### What changes were proposed in this pull request?
Fix the unit-test failure caused by the HTTP server port already being in use.

For the Jetty HttpServer, a failure to bind the port surfaces as an IOException whose cause is a BindException, so we should retry in that case as well.

Before:
```
    case e: BindException => // retry to setup mini cluster
```

Now:
```
    case e: IOException
         if e.isInstanceOf[BindException] || Option(e.getCause).exists(
           _.isInstanceOf[BindException]) =>  // retry to setup mini cluster
```

### Why are the changes needed?

Fix the unit-test failure caused by the HTTP server port already being in use.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Will trigger GA three times.

Closes #2424 from turboFei/set_connector_stop_timeout.

Authored-by: Fei Wang <fwang12@ebay.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
This commit is contained in:
Fei Wang 2024-03-28 10:28:47 +08:00 committed by Shuang
parent adbc77cd4f
commit ceed216a39
4 changed files with 36 additions and 27 deletions

View File

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.celeborn.service.deploy.master
package org.apache.celeborn.service.deploy.master.http.api
import javax.ws.rs.core.MediaType
@ -25,6 +25,7 @@ import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.util.{CelebornExitKind, Utils}
import org.apache.celeborn.server.common.HttpService
import org.apache.celeborn.server.common.http.ApiBaseResourceSuite
import org.apache.celeborn.service.deploy.master.{Master, MasterArguments}
class ApiMasterResourceSuite extends ApiBaseResourceSuite {
private var master: Master = _

View File

@ -44,27 +44,34 @@ private[celeborn] case class HttpServer(
isStarted = true
} catch {
case e: Exception =>
stop(CelebornExitKind.EXIT_IMMEDIATELY)
stopInternal(CelebornExitKind.EXIT_IMMEDIATELY)
throw e
}
}
def stop(exitCode: Int): Unit = synchronized {
if (isStarted) {
if (exitCode == CelebornExitKind.EXIT_IMMEDIATELY) {
server.setStopTimeout(0)
}
logInfo(s"$role: Stopping HttpServer")
server.stop()
connector.stop()
server.getThreadPool match {
case lifeCycle: LifeCycle => lifeCycle.stop()
case _ =>
}
logInfo(s"$role: HttpServer stopped.")
isStarted = false
stopInternal(exitCode)
}
}
/**
 * Shuts down the HTTP server without checking `isStarted` first.
 *
 * Called by `stop` after it has verified that the server is started, and by the
 * exception handler that follows `isStarted = true`, before that handler rethrows.
 *
 * @param exitCode exit kind; when it equals `CelebornExitKind.EXIT_IMMEDIATELY`, the stop
 *                 timeout of both the server and the connector is set to 0 before stopping
 */
private def stopInternal(exitCode: Int): Unit = {
if (exitCode == CelebornExitKind.EXIT_IMMEDIATELY) {
// NOTE(review): a stop timeout of 0 presumably skips any graceful-shutdown wait — confirm
// against the Jetty documentation.
server.setStopTimeout(0)
connector.setStopTimeout(0)
}
logInfo(s"$role: Stopping HttpServer")
server.stop()
// NOTE(review): presumably blocks until the server's threads have finished — confirm.
server.join()
connector.stop()
// The thread pool is stopped explicitly only when it implements LifeCycle.
server.getThreadPool match {
case lifeCycle: LifeCycle => lifeCycle.stop()
case _ =>
}
logInfo(s"$role: HttpServer stopped.")
// Reached only when none of the stop calls above threw.
isStarted = false
}
/** Returns the connector's host and local port joined with a colon, i.e. "host:port". */
def getServerUri: String = connector.getHost + ":" + connector.getLocalPort
def addHandler(handler: Handler): Unit = synchronized {

View File

@ -17,6 +17,7 @@
package org.apache.celeborn.service.deploy
import java.io.IOException
import java.net.BindException
import java.nio.file.Files
import java.util.concurrent.locks.{Lock, ReentrantLock}
@ -70,7 +71,9 @@ trait MiniClusterFeature extends Logging {
workers = w
created = true
} catch {
case e: BindException =>
case e: IOException
if e.isInstanceOf[BindException] || Option(e.getCause).exists(
_.isInstanceOf[BindException]) =>
logError(s"failed to setup mini cluster, retrying (retry count: $retryCount)", e)
retryCount += 1
if (retryCount == 3) {

View File

@ -15,33 +15,31 @@
* limitations under the License.
*/
package org.apache.celeborn.service.deploy.worker.storage
package org.apache.celeborn.service.deploy.worker.http.api
import javax.ws.rs.core.MediaType
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.util.{CelebornExitKind, Utils}
import org.apache.celeborn.server.common.HttpService
import org.apache.celeborn.server.common.http.ApiBaseResourceSuite
import org.apache.celeborn.service.deploy.worker.{Worker, WorkerArguments}
import org.apache.celeborn.service.deploy.MiniClusterFeature
import org.apache.celeborn.service.deploy.worker.Worker
class ApiWorkerResourceSuite extends ApiBaseResourceSuite {
class ApiWorkerResourceSuite extends ApiBaseResourceSuite with MiniClusterFeature {
private var worker: Worker = _
override protected def httpService: HttpService = worker
override def beforeAll(): Unit = {
val workerArgs = new WorkerArguments(Array(), celebornConf)
worker = new Worker(celebornConf, workerArgs)
worker.metricsSystem.start()
worker.startHttpServer()
logInfo("test initialized, setup celeborn mini cluster")
val (_, w) =
setupMiniClusterWithRandomPorts(workerConf = celebornConf.getAll.toMap, workerNum = 1)
worker = w.head
super.beforeAll()
}
override def afterAll(): Unit = {
super.afterAll()
worker.metricsSystem.stop()
worker.rpcEnv.shutdown()
worker.stop(CelebornExitKind.EXIT_IMMEDIATELY)
logInfo("all test complete, stop celeborn mini cluster")
shutdownMiniCluster()
}
test("listPartitionLocationInfo") {