[CELEBORN-1034] Offer slots uses a random range of available workers instead of shuffling

### What changes were proposed in this pull request?
In the original design, (primary worker, replica worker) pairs tend to stay stable: for example,
the replica PartitionLocations for primary PartitionLocations on Worker A will generally all land on
Worker B in normal scenarios, i.e. when all workers are healthy and working well. This way, one Worker
holds only one (or very few) connections to other workers' replicate netty servers.

However, https://github.com/apache/incubator-celeborn/pull/1790 calls `Collections.shuffle(availableWorkers)`,
which causes the number of replica connections to increase dramatically:
![image](https://github.com/apache/incubator-celeborn/assets/948245/013c7bc8-a224-413e-9c0c-519ae76c9d32)
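To see why shuffling inflates connections, here is a hypothetical back-of-the-envelope simulation (not Celeborn code; the worker counts, the simple "replicate to the next worker in the ordering" rule, and all names are assumptions for illustration only). It counts distinct (primary, replica) worker pairings under a stable ordering versus per-shuffle shuffling:

```scala
import scala.util.Random

// Hypothetical illustration, not Celeborn code: count distinct primary -> replica
// worker pairings when the worker ordering is stable vs. shuffled for every shuffle.
object ReplicaConnectionCount {
  def main(args: Array[String]): Unit = {
    val numWorkers = 50
    val numShuffles = 2000
    val workers = (0 until numWorkers).toVector
    val rnd = new Random(0)

    // Toy pairing rule: each worker replicates to the next worker in the ordering.
    def pairsOf(order: Seq[Int]): Seq[(Int, Int)] =
      order.indices.map(i => (order(i), order((i + 1) % order.size)))

    val stablePairs = (0 until numShuffles).flatMap(_ => pairsOf(workers)).toSet
    val shuffledPairs = (0 until numShuffles).flatMap(_ => pairsOf(rnd.shuffle(workers))).toSet

    // A stable ordering keeps roughly numWorkers distinct connections; per-shuffle
    // shuffling approaches numWorkers * (numWorkers - 1).
    println(s"stable ordering:   ${stablePairs.size} distinct replica connections")
    println(s"shuffled ordering: ${shuffledPairs.size} distinct replica connections")
  }
}
```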

### Why are the changes needed?
This PR refines the logic of selecting a limited number of workers: instead of shuffling the whole list,
the Master randomly picks a contiguous range of available workers, so the (primary, replica) pairings
stay largely stable and the number of replica connections stays small.
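The diff below contains the actual change; as a reading aid, here is a minimal standalone sketch of the same wrap-around range selection (the `selectWorkerRange` helper name and its generic signature are illustrative, not part of the patch):

```scala
import java.util.{ArrayList => JArrayList, List => JList}

import scala.util.Random

object RangeSelectionSketch {
  // Sketch only: pick a random start index, then take a contiguous window of
  // `numWorkers` entries from the available list, wrapping around its end.
  // Assumes availableWorkers is non-empty, as the real code path does.
  def selectWorkerRange[T](availableWorkers: JList[T], numWorkers: Int): JList[T] = {
    val numAvailable = availableWorkers.size()
    val startIndex = Random.nextInt(numAvailable)
    val selected = new JArrayList[T](numWorkers)
    // First segment: from startIndex up to the end of the window or of the list.
    selected.addAll(availableWorkers.subList(
      startIndex,
      Math.min(numAvailable, startIndex + numWorkers)))
    // Wrap-around segment: remaining workers taken from the head of the list.
    if (startIndex + numWorkers > numAvailable) {
      selected.addAll(availableWorkers.subList(0, startIndex + numWorkers - numAvailable))
    }
    selected
  }
}
```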

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Manual test.

Closes #1975 from waitinfuture/1034.

Lead-authored-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
Co-authored-by: Keyong Zhou <waitinfuture@gmail.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>

```diff
@@ -615,22 +615,31 @@ private[celeborn] class Master(
     val numReducers = requestSlots.partitionIdList.size()
     val shuffleKey = Utils.makeShuffleKey(requestSlots.applicationId, requestSlots.shuffleId)
-    var availableWorkers = workersAvailable()
-    Collections.shuffle(availableWorkers)
+    val availableWorkers = workersAvailable()
+    val numAvailableWorkers = availableWorkers.size()
     val numWorkers = Math.min(
       Math.max(
         if (requestSlots.shouldReplicate) 2 else 1,
         if (requestSlots.maxWorkers <= 0) slotsAssignMaxWorkers
         else Math.min(slotsAssignMaxWorkers, requestSlots.maxWorkers)),
-      availableWorkers.size())
-    availableWorkers = availableWorkers.subList(0, numWorkers)
+      numAvailableWorkers)
+    val startIndex = Random.nextInt(numAvailableWorkers)
+    val selectedWorkers = new util.ArrayList[WorkerInfo](numWorkers)
+    selectedWorkers.addAll(availableWorkers.subList(
+      startIndex,
+      Math.min(numAvailableWorkers, startIndex + numWorkers)))
+    if (startIndex + numWorkers > numAvailableWorkers) {
+      selectedWorkers.addAll(availableWorkers.subList(
+        0,
+        startIndex + numWorkers - numAvailableWorkers))
+    }
     // offer slots
     val slots =
       masterSource.sample(MasterSource.OFFER_SLOTS_TIME, s"offerSlots-${Random.nextInt()}") {
         statusSystem.workers.synchronized {
           if (slotsAssignPolicy == SlotsAssignPolicy.LOADAWARE && !hasHDFSStorage) {
             SlotsAllocator.offerSlotsLoadAware(
-              availableWorkers,
+              selectedWorkers,
               requestSlots.partitionIdList,
               requestSlots.shouldReplicate,
               requestSlots.shouldRackAware,
@@ -641,7 +650,7 @@
               loadAwareFetchTimeWeight)
           } else {
             SlotsAllocator.offerSlotsRoundRobin(
-              availableWorkers,
+              selectedWorkers,
               requestSlots.partitionIdList,
               requestSlots.shouldReplicate,
               requestSlots.shouldRackAware)
```
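
With this change the Master still randomizes which workers serve a shuffle, but the selected workers form a contiguous window of the available-worker list. For example (illustrative numbers, not from the patch): with 10 available workers, `startIndex = 8` and `numWorkers = 4`, the selected workers are those at indices 8, 9, 0 and 1, where the wrap-around branch contributes the last two.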