diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala index 18a5b7560..a2226b6b0 100644 --- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala +++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala @@ -615,22 +615,31 @@ private[celeborn] class Master( val numReducers = requestSlots.partitionIdList.size() val shuffleKey = Utils.makeShuffleKey(requestSlots.applicationId, requestSlots.shuffleId) - var availableWorkers = workersAvailable() - Collections.shuffle(availableWorkers) + val availableWorkers = workersAvailable() + val numAvailableWorkers = availableWorkers.size() val numWorkers = Math.min( Math.max( if (requestSlots.shouldReplicate) 2 else 1, if (requestSlots.maxWorkers <= 0) slotsAssignMaxWorkers else Math.min(slotsAssignMaxWorkers, requestSlots.maxWorkers)), - availableWorkers.size()) - availableWorkers = availableWorkers.subList(0, numWorkers) + numAvailableWorkers) + val startIndex = Random.nextInt(numAvailableWorkers) + val selectedWorkers = new util.ArrayList[WorkerInfo](numWorkers) + selectedWorkers.addAll(availableWorkers.subList( + startIndex, + Math.min(numAvailableWorkers, startIndex + numWorkers))) + if (startIndex + numWorkers > numAvailableWorkers) { + selectedWorkers.addAll(availableWorkers.subList( + 0, + startIndex + numWorkers - numAvailableWorkers)) + } // offer slots val slots = masterSource.sample(MasterSource.OFFER_SLOTS_TIME, s"offerSlots-${Random.nextInt()}") { statusSystem.workers.synchronized { if (slotsAssignPolicy == SlotsAssignPolicy.LOADAWARE && !hasHDFSStorage) { SlotsAllocator.offerSlotsLoadAware( - availableWorkers, + selectedWorkers, requestSlots.partitionIdList, requestSlots.shouldReplicate, requestSlots.shouldRackAware, @@ -641,7 +650,7 @@ private[celeborn] class Master( loadAwareFetchTimeWeight) } else { SlotsAllocator.offerSlotsRoundRobin( - availableWorkers, + selectedWorkers, requestSlots.partitionIdList, requestSlots.shouldReplicate, requestSlots.shouldRackAware)