[CELEBORN-596] Worker don't need to update disk max slots (#1502)
This commit is contained in:
parent
6e166662f1
commit
6619015a63
@ -197,7 +197,7 @@ class WorkerInfo(
|
||||
|
||||
def updateThenGetDiskInfos(
|
||||
newDiskInfos: java.util.Map[String, DiskInfo],
|
||||
estimatedPartitionSize: Long): util.Map[String, DiskInfo] = this.synchronized {
|
||||
estimatedPartitionSize: Option[Long] = None): util.Map[String, DiskInfo] = this.synchronized {
|
||||
import scala.collection.JavaConverters._
|
||||
for (newDisk <- newDiskInfos.values().asScala) {
|
||||
val mountPoint: String = newDisk.mountPoint
|
||||
@ -207,10 +207,14 @@ class WorkerInfo(
|
||||
curDisk.activeSlots_$eq(Math.max(curDisk.activeSlots, newDisk.activeSlots))
|
||||
curDisk.avgFlushTime_$eq(newDisk.avgFlushTime)
|
||||
curDisk.avgFetchTime_$eq(newDisk.avgFetchTime)
|
||||
curDisk.maxSlots_$eq(curDisk.actualUsableSpace / estimatedPartitionSize)
|
||||
if (estimatedPartitionSize.nonEmpty) {
|
||||
curDisk.maxSlots_$eq(curDisk.actualUsableSpace / estimatedPartitionSize.get)
|
||||
}
|
||||
curDisk.setStatus(newDisk.status)
|
||||
} else {
|
||||
newDisk.maxSlots_$eq(newDisk.actualUsableSpace / estimatedPartitionSize)
|
||||
if (estimatedPartitionSize.nonEmpty) {
|
||||
newDisk.maxSlots_$eq(newDisk.actualUsableSpace / estimatedPartitionSize.get)
|
||||
}
|
||||
diskInfos.put(mountPoint, newDisk)
|
||||
}
|
||||
}
|
||||
|
||||
@ -33,6 +33,8 @@ import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.concurrent.atomic.LongAdder;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import scala.Option;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@ -200,7 +202,7 @@ public abstract class AbstractMetaManager implements IMetadataHandler {
|
||||
Optional<WorkerInfo> workerInfo = workers.stream().filter(w -> w.equals(worker)).findFirst();
|
||||
workerInfo.ifPresent(
|
||||
info -> {
|
||||
info.updateThenGetDiskInfos(disks, estimatedPartitionSize);
|
||||
info.updateThenGetDiskInfos(disks, Option.apply(estimatedPartitionSize));
|
||||
info.updateThenGetUserResourceConsumption(userResourceConsumption);
|
||||
availableSlots.set(info.totalAvailableSlots());
|
||||
info.lastHeartbeat_$eq(time);
|
||||
|
||||
@ -285,9 +285,9 @@ private[celeborn] class Worker(
|
||||
Seq.empty[DiskInfo]
|
||||
} else {
|
||||
storageManager.updateDiskInfos()
|
||||
workerInfo.updateThenGetDiskInfos(
|
||||
storageManager.disksSnapshot().map { disk => disk.mountPoint -> disk }.toMap.asJava,
|
||||
conf.initialEstimatedPartitionSize).values().asScala.toSeq
|
||||
workerInfo.updateThenGetDiskInfos(storageManager.disksSnapshot().map { disk =>
|
||||
disk.mountPoint -> disk
|
||||
}.toMap.asJava).values().asScala.toSeq
|
||||
}
|
||||
val resourceConsumption = workerInfo.updateThenGetUserResourceConsumption(
|
||||
storageManager.userResourceConsumptionSnapshot().asJava)
|
||||
|
||||
Loading…
Reference in New Issue
Block a user