diff --git a/client-flink/flink-1.14/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterSuiteJ.java b/client-flink/flink-1.14/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterSuiteJ.java index ced5dede2..73c08d2d3 100644 --- a/client-flink/flink-1.14/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterSuiteJ.java +++ b/client-flink/flink-1.14/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterSuiteJ.java @@ -58,7 +58,7 @@ public class RemoteShuffleMasterSuiteJ { @Before public void setUp() { configuration = new Configuration(); - int startPort = Utils$.MODULE$.selectRandomPort(1024, 65535); + int startPort = Utils$.MODULE$.selectRandomInt(1024, 65535); configuration.setInteger("celeborn.master.port", startPort); configuration.setString("celeborn.master.endpoints", "localhost:" + startPort); remoteShuffleMaster = createShuffleMaster(configuration); diff --git a/client-flink/flink-1.15/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterSuiteJ.java b/client-flink/flink-1.15/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterSuiteJ.java index ced5dede2..73c08d2d3 100644 --- a/client-flink/flink-1.15/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterSuiteJ.java +++ b/client-flink/flink-1.15/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterSuiteJ.java @@ -58,7 +58,7 @@ public class RemoteShuffleMasterSuiteJ { @Before public void setUp() { configuration = new Configuration(); - int startPort = Utils$.MODULE$.selectRandomPort(1024, 65535); + int startPort = Utils$.MODULE$.selectRandomInt(1024, 65535); configuration.setInteger("celeborn.master.port", startPort); configuration.setString("celeborn.master.endpoints", "localhost:" + startPort); remoteShuffleMaster = createShuffleMaster(configuration); diff --git a/client-flink/flink-1.16/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterSuiteJ.java b/client-flink/flink-1.16/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterSuiteJ.java index 8b474e56c..95eb47538 100644 --- a/client-flink/flink-1.16/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterSuiteJ.java +++ b/client-flink/flink-1.16/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterSuiteJ.java @@ -65,7 +65,7 @@ public class RemoteShuffleMasterSuiteJ { @Before public void setUp() { configuration = new Configuration(); - int startPort = Utils$.MODULE$.selectRandomPort(1024, 65535); + int startPort = Utils$.MODULE$.selectRandomInt(1024, 65535); configuration.setInteger("celeborn.master.port", startPort); configuration.setString("celeborn.master.endpoints", "localhost:" + startPort); remoteShuffleMaster = createShuffleMaster(configuration); diff --git a/client-flink/flink-1.17/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterSuiteJ.java b/client-flink/flink-1.17/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterSuiteJ.java index b9a2c94f4..ed6fc18e4 100644 --- a/client-flink/flink-1.17/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterSuiteJ.java +++ b/client-flink/flink-1.17/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterSuiteJ.java @@ -65,7 +65,7 @@ public class RemoteShuffleMasterSuiteJ { @Before public void setUp() { configuration = new Configuration(); - int startPort = Utils$.MODULE$.selectRandomPort(1024, 65535); + int startPort = Utils$.MODULE$.selectRandomInt(1024, 65535); configuration.setInteger("celeborn.master.port", startPort); configuration.setString("celeborn.master.endpoints", "localhost:" + startPort); remoteShuffleMaster = createShuffleMaster(configuration); diff --git a/client-flink/flink-1.18/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterSuiteJ.java b/client-flink/flink-1.18/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterSuiteJ.java index b9a2c94f4..ed6fc18e4 100644 --- a/client-flink/flink-1.18/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterSuiteJ.java +++ b/client-flink/flink-1.18/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterSuiteJ.java @@ -65,7 +65,7 @@ public class RemoteShuffleMasterSuiteJ { @Before public void setUp() { configuration = new Configuration(); - int startPort = Utils$.MODULE$.selectRandomPort(1024, 65535); + int startPort = Utils$.MODULE$.selectRandomInt(1024, 65535); configuration.setInteger("celeborn.master.port", startPort); configuration.setString("celeborn.master.endpoints", "localhost:" + startPort); remoteShuffleMaster = createShuffleMaster(configuration); diff --git a/client-flink/flink-1.19/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterSuiteJ.java b/client-flink/flink-1.19/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterSuiteJ.java index c02c87ac6..bd50c87b1 100644 --- a/client-flink/flink-1.19/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterSuiteJ.java +++ b/client-flink/flink-1.19/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterSuiteJ.java @@ -65,7 +65,7 @@ public class RemoteShuffleMasterSuiteJ { @Before public void setUp() { configuration = new Configuration(); - int startPort = Utils$.MODULE$.selectRandomPort(1024, 65535); + int startPort = Utils$.MODULE$.selectRandomInt(1024, 65535); configuration.setInteger("celeborn.master.port", startPort); configuration.setString("celeborn.master.endpoints", "localhost:" + startPort); remoteShuffleMaster = createShuffleMaster(configuration); diff --git a/client-flink/flink-1.20/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterSuiteJ.java b/client-flink/flink-1.20/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterSuiteJ.java index ba8b9c2e4..db5da0f38 100644 --- a/client-flink/flink-1.20/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterSuiteJ.java +++ b/client-flink/flink-1.20/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterSuiteJ.java @@ -68,7 +68,7 @@ public class RemoteShuffleMasterSuiteJ { @Before public void setUp() { configuration = new Configuration(); - int startPort = Utils$.MODULE$.selectRandomPort(1024, 65535); + int startPort = Utils$.MODULE$.selectRandomInt(1024, 65535); configuration.setInteger("celeborn.master.port", startPort); configuration.setString("celeborn.master.endpoints", "localhost:" + startPort); remoteShuffleMaster = createShuffleMaster(configuration); diff --git a/client-flink/flink-1.20/src/test/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierMasterAgentSuiteJ.java b/client-flink/flink-1.20/src/test/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierMasterAgentSuiteJ.java index f53d010cd..1ee5b8a19 100644 --- a/client-flink/flink-1.20/src/test/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierMasterAgentSuiteJ.java +++ b/client-flink/flink-1.20/src/test/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierMasterAgentSuiteJ.java @@ -53,7 +53,7 @@ public class CelebornTierMasterAgentSuiteJ { @Before public void setUp() { Configuration configuration = new Configuration(); - int startPort = Utils$.MODULE$.selectRandomPort(1024, 65535); + int startPort = Utils$.MODULE$.selectRandomInt(1024, 65535); configuration.setInteger("celeborn.master.port", startPort); configuration.setString("celeborn.master.endpoints", "localhost:" + startPort); masterAgent = createMasterAgent(configuration); diff --git a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala index 7c8f16846..545685378 100644 --- a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala +++ b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala @@ -245,8 +245,15 @@ object Utils extends Logging { } } - def selectRandomPort(from: Int, to: Int): Int = { - ScalaRandom.nextInt(to - from) + from + /** + * Select a random integer within the specified range. + * + * @param from the lower bound of the range (inclusive) + * @param until the upper bound of the range (exclusive) + * @return a randomly selected integer within the range [from, until) + */ + def selectRandomInt(from: Int, until: Int): Int = { + ScalaRandom.nextInt(until - 1 - from) + from } def startServiceOnPort[T]( diff --git a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java index b04a419f4..5f9e07de2 100644 --- a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java +++ b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java @@ -119,7 +119,7 @@ public class RatisMasterStatusSystemSuiteJ { String id2 = UUID.randomUUID().toString(); String id3 = UUID.randomUUID().toString(); - int ratisPort1 = Utils$.MODULE$.selectRandomPort(1024, 65535); + int ratisPort1 = Utils$.MODULE$.selectRandomInt(1024, 65535); int ratisPort2 = ratisPort1 + 1; int ratisPort3 = ratisPort2 + 1; diff --git a/master/src/test/scala/org/apache/celeborn/service/deploy/master/ApiMasterResourceAuthenticationSuite.scala b/master/src/test/scala/org/apache/celeborn/service/deploy/master/ApiMasterResourceAuthenticationSuite.scala index 73a4e3307..50a3eaf6f 100644 --- a/master/src/test/scala/org/apache/celeborn/service/deploy/master/ApiMasterResourceAuthenticationSuite.scala +++ b/master/src/test/scala/org/apache/celeborn/service/deploy/master/ApiMasterResourceAuthenticationSuite.scala @@ -36,7 +36,7 @@ class ApiMasterResourceAuthenticationSuite extends ApiBaseResourceAuthentication } override def beforeAll(): Unit = { - val randomMasterPort = Utils.selectRandomPort(1024, 65535) + val randomMasterPort = Utils.selectRandomInt(1024, 65535) val randomHttpPort = randomMasterPort + 1 celebornConf.set(CelebornConf.HA_ENABLED.key, "false") celebornConf.set(CelebornConf.HA_MASTER_RATIS_STORAGE_DIR.key, getTmpDir()) diff --git a/master/src/test/scala/org/apache/celeborn/service/deploy/master/MasterSuite.scala b/master/src/test/scala/org/apache/celeborn/service/deploy/master/MasterSuite.scala index 940ac017f..ed27aac48 100644 --- a/master/src/test/scala/org/apache/celeborn/service/deploy/master/MasterSuite.scala +++ b/master/src/test/scala/org/apache/celeborn/service/deploy/master/MasterSuite.scala @@ -41,7 +41,7 @@ class MasterSuite extends AnyFunSuite test("test single node startup functionality") { val conf = new CelebornConf() - val randomMasterPort = Utils.selectRandomPort(1024, 65535) + val randomMasterPort = Utils.selectRandomInt(1024, 65535) val randomHttpPort = randomMasterPort + 1 conf.set(CelebornConf.HA_ENABLED.key, "false") conf.set(CelebornConf.HA_MASTER_RATIS_STORAGE_DIR.key, getTmpDir()) @@ -104,7 +104,7 @@ class MasterSuite extends AnyFunSuite test("test master worker host allow and deny pattern") { val conf = new CelebornConf() - val randomMasterPort = Utils.selectRandomPort(1024, 65535) + val randomMasterPort = Utils.selectRandomInt(1024, 65535) val randomHttpPort = randomMasterPort + 1 conf.set(CelebornConf.HA_ENABLED.key, "false") conf.set(CelebornConf.HA_MASTER_RATIS_STORAGE_DIR.key, getTmpDir()) diff --git a/master/src/test/scala/org/apache/celeborn/service/deploy/master/http/api/ApiMasterResourceSuite.scala b/master/src/test/scala/org/apache/celeborn/service/deploy/master/http/api/ApiMasterResourceSuite.scala index b0c3a5e21..fba331bb2 100644 --- a/master/src/test/scala/org/apache/celeborn/service/deploy/master/http/api/ApiMasterResourceSuite.scala +++ b/master/src/test/scala/org/apache/celeborn/service/deploy/master/http/api/ApiMasterResourceSuite.scala @@ -39,7 +39,7 @@ class ApiMasterResourceSuite extends ApiBaseResourceSuite { } override def beforeAll(): Unit = { - val randomMasterPort = Utils.selectRandomPort(1024, 65535) + val randomMasterPort = Utils.selectRandomInt(1024, 65535) val randomHttpPort = randomMasterPort + 1 celebornConf.set(CelebornConf.HA_ENABLED.key, "false") celebornConf.set(CelebornConf.HA_MASTER_RATIS_STORAGE_DIR.key, getTmpDir()) diff --git a/master/src/test/scala/org/apache/celeborn/service/deploy/master/http/api/v1/ApiV1MasterResourceSuite.scala b/master/src/test/scala/org/apache/celeborn/service/deploy/master/http/api/v1/ApiV1MasterResourceSuite.scala index 400761109..089e28df1 100644 --- a/master/src/test/scala/org/apache/celeborn/service/deploy/master/http/api/v1/ApiV1MasterResourceSuite.scala +++ b/master/src/test/scala/org/apache/celeborn/service/deploy/master/http/api/v1/ApiV1MasterResourceSuite.scala @@ -42,7 +42,7 @@ class ApiV1MasterResourceSuite extends ApiV1BaseResourceSuite { } override def beforeAll(): Unit = { - val randomMasterPort = Utils.selectRandomPort(1024, 65535) + val randomMasterPort = Utils.selectRandomInt(1024, 65535) val randomHttpPort = randomMasterPort + 1 celebornConf.set(CelebornConf.HA_ENABLED.key, "false") celebornConf.set(CelebornConf.HA_MASTER_RATIS_STORAGE_DIR.key, getTmpDir()) diff --git a/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala b/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala index c66a36d51..bf5994527 100644 --- a/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala +++ b/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala @@ -28,7 +28,7 @@ import scala.collection.mutable import org.apache.celeborn.common.CelebornConf import org.apache.celeborn.common.internal.Logging import org.apache.celeborn.common.util.{CelebornExitKind, Utils} -import org.apache.celeborn.common.util.Utils.selectRandomPort +import org.apache.celeborn.common.util.Utils.selectRandomInt import org.apache.celeborn.service.deploy.master.{Master, MasterArguments} import org.apache.celeborn.service.deploy.worker.{Worker, WorkerArguments} import org.apache.celeborn.service.deploy.worker.memory.MemoryManager @@ -56,8 +56,8 @@ trait MiniClusterFeature extends Logging { var workers: collection.Set[Worker] = null while (!created) { try { - val randomPort = selectRandomPort(1024, 65535) - val randomInternalPort = selectRandomPort(1024, 65535) + val randomPort = selectRandomInt(1024, 65535) + val randomInternalPort = selectRandomInt(1024, 65535) val finalMasterConf = Map( s"${CelebornConf.MASTER_HOST.key}" -> "localhost", s"${CelebornConf.PORT_MAX_RETRY.key}" -> "0", @@ -103,7 +103,7 @@ trait MiniClusterFeature extends Logging { private def createMaster(map: Map[String, String] = null): Master = { val conf = new CelebornConf() conf.set(CelebornConf.METRICS_ENABLED.key, "false") - val httpPort = selectRandomPort(1024, 65535) + val httpPort = selectRandomInt(1024, 65535) conf.set(CelebornConf.MASTER_HTTP_PORT.key, s"$httpPort") logInfo(s"set ${CelebornConf.MASTER_HTTP_PORT.key} to $httpPort") if (map != null) { @@ -128,7 +128,7 @@ trait MiniClusterFeature extends Logging { conf.set(CelebornConf.WORKER_STORAGE_DIRS.key, storageDir) conf.set(CelebornConf.WORKER_DISK_MONITOR_ENABLED.key, "false") conf.set(CelebornConf.CLIENT_PUSH_BUFFER_MAX_SIZE.key, "256K") - conf.set(CelebornConf.WORKER_HTTP_PORT.key, s"${selectRandomPort(1024, 65535)}") + conf.set(CelebornConf.WORKER_HTTP_PORT.key, s"${selectRandomInt(1024, 65535)}") conf.set("celeborn.fetch.io.threads", "4") conf.set("celeborn.push.io.threads", "4") if (map != null) {