diff --git a/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala b/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
index 9024c1b7f..84be612da 100644
--- a/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
@@ -19,6 +19,7 @@ package org.apache.celeborn.common.meta
import java.util
import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.JavaConverters._
@@ -45,9 +46,11 @@ class WorkerInfo(
var networkLocation = NetworkTopology.DEFAULT_RACK
var lastHeartbeat: Long = 0
var workerStatus = WorkerStatus.normalWorkerStatus()
- val diskInfos =
+ val diskInfos = {
if (_diskInfos != null) JavaUtils.newConcurrentHashMap[String, DiskInfo](_diskInfos)
else null
+ }
+ val workerHasDisk: AtomicBoolean = new AtomicBoolean(computeWorkerHaveDisk)
val userResourceConsumption =
if (_userResourceConsumption != null)
JavaUtils.newConcurrentHashMap[UserIdentifier, ResourceConsumption](_userResourceConsumption)
@@ -236,6 +239,7 @@ class WorkerInfo(
diskInfos.remove(nonExistsMountPoint)
}
}
+ workerHasDisk.set(computeWorkerHaveDisk)
JavaUtils.newConcurrentHashMap[String, DiskInfo](diskInfos)
}
@@ -306,9 +310,17 @@ class WorkerInfo(
result
}
+ private def computeWorkerHaveDisk = {
+ if (diskInfos != null) {
+ diskInfos.values().asScala.exists(p =>
+ p.storageType == StorageInfo.Type.SSD || p.storageType == StorageInfo.Type.HDD)
+ } else {
+ false
+ }
+ }
+
def haveDisk(): Boolean = {
- diskInfos.values().asScala.exists(p =>
- p.storageType == StorageInfo.Type.SSD || p.storageType == StorageInfo.Type.HDD)
+ workerHasDisk.get()
}
}
diff --git a/master/pom.xml b/master/pom.xml
index 25f349f4e..94858ba63 100644
--- a/master/pom.xml
+++ b/master/pom.xml
@@ -121,6 +121,16 @@
test-jar
test
+
+ org.openjdk.jmh
+ jmh-core
+ test
+
+
+ org.openjdk.jmh
+ jmh-generator-annprocess
+ test
+
diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
index 75e680a3b..62582018f 100644
--- a/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
+++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
@@ -356,12 +356,15 @@ public class SlotsAllocator {
int primaryIndex = rand.nextInt(workerSize);
int replicaIndex = rand.nextInt(workerSize);
- Iterator iter = partitionIdList.iterator();
+ ListIterator iter = partitionIdList.listIterator(partitionIdList.size());
+ // Iterate from the end so that removing a processed partition stays O(1)
+ // (no elements need to shift). This matters when there are many concurrent
+ // apps, each registering a large number of partitions.
outer:
- while (iter.hasNext()) {
+ while (iter.hasPrevious()) {
int nextPrimaryInd = primaryIndex;
- int partitionId = iter.next();
+ int partitionId = iter.previous();
StorageInfo storageInfo;
if (slotsRestrictions != null && !slotsRestrictions.isEmpty()) {
// this means that we'll select a mount point
diff --git a/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorJmhBenchmark.java b/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorJmhBenchmark.java
new file mode 100644
index 000000000..7ce80fee9
--- /dev/null
+++ b/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorJmhBenchmark.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.master;
+
+import static org.apache.celeborn.service.deploy.master.SlotsAllocatorSuiteJ.*;
+import static org.openjdk.jmh.annotations.Mode.*;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.openjdk.jmh.annotations.*;
+import org.openjdk.jmh.infra.Blackhole;
+
+import org.apache.celeborn.common.meta.WorkerInfo;
+import org.apache.celeborn.common.protocol.StorageInfo;
+
+public class SlotsAllocatorJmhBenchmark {
+
+ private static final int NUM_WORKERS = 1500;
+ private static final long PARTITION_SIZE = 64 * 1024 * 1024;
+ private static final String DISK_PATH = "/mnt/disk";
+ private static final long DISK_SPACE = 512 * 1024 * 1024L;
+ private static final boolean HAS_DISKS = true;
+ private static final int NUM_NETWORK_LOCATIONS = 20;
+ private static final int NUM_PARTITIONS = 100000;
+
+ @State(Scope.Thread)
+ public static class BenchmarkState {
+ List workers;
+ List partitionIds =
+ Collections.unmodifiableList(
+ IntStream.range(0, NUM_PARTITIONS).boxed().collect(Collectors.toList()));
+
+ @Setup
+ public void initialize() {
+ Map diskPartitionToSize = new HashMap<>();
+ diskPartitionToSize.put(DISK_PATH, DISK_SPACE);
+ workers =
+ basePrepareWorkers(
+ NUM_WORKERS,
+ HAS_DISKS,
+ diskPartitionToSize,
+ PARTITION_SIZE,
+ NUM_NETWORK_LOCATIONS,
+ new Random());
+ }
+ }
+
+ @Benchmark
+ @Fork(1)
+ @Threads(Threads.MAX)
+ @BenchmarkMode(AverageTime)
+ @Warmup(iterations = 5, time = 5)
+ @Measurement(time = 60, timeUnit = TimeUnit.SECONDS, iterations = 5)
+ @OutputTimeUnit(TimeUnit.MICROSECONDS)
+ public void benchmarkSlotSelection(Blackhole blackhole, BenchmarkState state) {
+
+ blackhole.consume(
+ SlotsAllocator.offerSlotsRoundRobin(
+ state.workers, state.partitionIds, true, true, StorageInfo.ALL_TYPES_AVAILABLE_MASK));
+ }
+
+ public static void main(String[] args) throws Exception {
+ org.openjdk.jmh.Main.main(args);
+ }
+}
diff --git a/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java b/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java
index 95bd3a284..f43acb927 100644
--- a/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java
+++ b/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java
@@ -22,9 +22,12 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.util.*;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import scala.Tuple2;
+import com.google.common.collect.ImmutableMap;
import org.junit.Test;
import org.apache.celeborn.common.CelebornConf;
@@ -36,106 +39,17 @@ import org.apache.celeborn.common.protocol.StorageInfo;
public class SlotsAllocatorSuiteJ {
private List prepareWorkers(boolean hasDisks) {
long assumedPartitionSize = 64 * 1024 * 1024;
-
Random random = new Random();
- Map disks1 = new HashMap<>();
- DiskInfo diskInfo1 =
- new DiskInfo(
- "/mnt/disk1",
- random.nextInt() + 100 * 1024 * 1024 * 1024L,
- random.nextInt(1000),
- random.nextInt(1000),
- 0);
- DiskInfo diskInfo2 =
- new DiskInfo(
- "/mnt/disk2",
- random.nextInt() + 95 * 1024 * 1024 * 1024L,
- random.nextInt(1000),
- random.nextInt(1000),
- 0);
- DiskInfo diskInfo3 =
- new DiskInfo(
- "/mnt/disk3",
- random.nextInt() + 90 * 1024 * 1024 * 1024L,
- random.nextInt(1000),
- random.nextInt(1000),
- 0);
- diskInfo1.maxSlots_$eq(diskInfo1.actualUsableSpace() / assumedPartitionSize);
- diskInfo2.maxSlots_$eq(diskInfo2.actualUsableSpace() / assumedPartitionSize);
- diskInfo3.maxSlots_$eq(diskInfo3.actualUsableSpace() / assumedPartitionSize);
- if (hasDisks) {
- disks1.put("/mnt/disk1", diskInfo1);
- disks1.put("/mnt/disk2", diskInfo2);
- disks1.put("/mnt/disk3", diskInfo3);
- }
-
- Map disks2 = new HashMap<>();
- DiskInfo diskInfo4 =
- new DiskInfo(
- "/mnt/disk1",
- random.nextInt() + 100 * 1024 * 1024 * 1024L,
- random.nextInt(1000),
- random.nextInt(1000),
- 0);
- DiskInfo diskInfo5 =
- new DiskInfo(
- "/mnt/disk2",
- random.nextInt() + 95 * 1024 * 1024 * 1024L,
- random.nextInt(1000),
- random.nextInt(1000),
- 0);
- DiskInfo diskInfo6 =
- new DiskInfo(
- "/mnt/disk3",
- random.nextInt() + 90 * 1024 * 1024 * 1024L,
- random.nextInt(1000),
- random.nextInt(1000),
- 0);
- diskInfo4.maxSlots_$eq(diskInfo4.actualUsableSpace() / assumedPartitionSize);
- diskInfo5.maxSlots_$eq(diskInfo5.actualUsableSpace() / assumedPartitionSize);
- diskInfo6.maxSlots_$eq(diskInfo6.actualUsableSpace() / assumedPartitionSize);
- if (hasDisks) {
- disks2.put("/mnt/disk1", diskInfo4);
- disks2.put("/mnt/disk2", diskInfo5);
- disks2.put("/mnt/disk3", diskInfo6);
- }
-
- Map disks3 = new HashMap<>();
- DiskInfo diskInfo7 =
- new DiskInfo(
- "/mnt/disk1",
- random.nextInt() + 100 * 1024 * 1024 * 1024L,
- random.nextInt(1000),
- random.nextInt(1000),
- 0);
- DiskInfo diskInfo8 =
- new DiskInfo(
- "/mnt/disk2",
- random.nextInt() + 95 * 1024 * 1024 * 1024L,
- random.nextInt(1000),
- random.nextInt(1000),
- 0);
- DiskInfo diskInfo9 =
- new DiskInfo(
- "/mnt/disk3",
- random.nextInt() + 90 * 1024 * 1024 * 1024L,
- random.nextInt(1000),
- random.nextInt(1000),
- 0);
- diskInfo7.maxSlots_$eq(diskInfo7.actualUsableSpace() / assumedPartitionSize);
- diskInfo8.maxSlots_$eq(diskInfo8.actualUsableSpace() / assumedPartitionSize);
- diskInfo9.maxSlots_$eq(diskInfo9.actualUsableSpace() / assumedPartitionSize);
- if (hasDisks) {
- disks3.put("/mnt/disk2", diskInfo8);
- disks3.put("/mnt/disk1", diskInfo7);
- disks3.put("/mnt/disk3", diskInfo9);
- }
-
- ArrayList workers = new ArrayList<>(3);
- workers.add(new WorkerInfo("host1", 9, 10, 110, 113, 116, disks1, null));
- workers.add(new WorkerInfo("host2", 9, 11, 111, 114, 118, disks2, null));
- workers.add(new WorkerInfo("host3", 9, 12, 112, 115, 120, disks3, null));
- return workers;
+ return basePrepareWorkers(
+ 3,
+ hasDisks,
+ ImmutableMap.of(
+ "/mnt/disk1", random.nextInt() + 100 * 1024 * 1024 * 1024L,
+ "/mnt/disk2", random.nextInt() + 95 * 1024 * 1024 * 1024L,
+ "/mnt/disk3", random.nextInt() + 90 * 1024 * 1024 * 1024L),
+ assumedPartitionSize,
+ 3,
+ random);
}
@Test
@@ -551,4 +465,36 @@ public class SlotsAllocatorSuiteJ {
checkSlotsOnDFS(workers, partitionIds, shouldReplicate, true, true, false, true);
checkSlotsOnDFS(workers, partitionIds, shouldReplicate, true, false, false, true);
}
+
+ static List basePrepareWorkers(
+ int numWorkers,
+ boolean hasDisks,
+ Map diskPartitionToSize,
+ long assumedPartitionSize,
+ int numNetworkLocations,
+ Random random) {
+ return IntStream.range(0, numWorkers)
+ .mapToObj(
+ i -> {
+ Map disks = new HashMap<>();
+ if (hasDisks) {
+ diskPartitionToSize.forEach(
+ (diskMountPoint, diskSize) -> {
+ DiskInfo diskInfo =
+ new DiskInfo(
+ diskMountPoint,
+ diskSize,
+ random.nextInt(1000),
+ random.nextInt(1000),
+ 0);
+ diskInfo.maxSlots_$eq(diskInfo.actualUsableSpace() / assumedPartitionSize);
+ disks.put(diskMountPoint, diskInfo);
+ });
+ }
+ WorkerInfo worker = new WorkerInfo("host" + i, i, i, i, i, i, disks, null);
+ worker.networkLocation_$eq(String.valueOf(i % numNetworkLocations));
+ return worker;
+ })
+ .collect(Collectors.toList());
+ }
}
diff --git a/pom.xml b/pom.xml
index e523c2eae..0b7d37af0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -108,6 +108,7 @@
1.1.10.5
3.0-9
4.7.6
+ 1.37
3.5.15
@@ -740,6 +741,18 @@
picocli
${picocli.version}
+
+ org.openjdk.jmh
+ jmh-core
+ ${jmh.version}
+ test
+
+
+ org.openjdk.jmh
+ jmh-generator-annprocess
+ ${jmh.version}
+ test
+
diff --git a/project/CelebornBuild.scala b/project/CelebornBuild.scala
index dfeff9c30..e8818977f 100644
--- a/project/CelebornBuild.scala
+++ b/project/CelebornBuild.scala
@@ -85,6 +85,7 @@ object Dependencies {
val httpCore5Version = "5.2.4"
val jakartaAnnotationApiVersion = "1.3.5"
val picocliVersion = "4.7.6"
+ val jmhVersion = "1.37"
// For SSL support
val bouncycastleVersion = "1.77"
@@ -264,6 +265,9 @@ object Dependencies {
)
val picocli = "info.picocli" % "picocli" % picocliVersion
+
+ val jmhCore = "org.openjdk.jmh" % "jmh-core" % jmhVersion % "test"
+ val jmhGeneratorAnnprocess = "org.openjdk.jmh" % "jmh-generator-annprocess" % jmhVersion % "test"
}
object CelebornCommonSettings {
@@ -728,6 +732,8 @@ object CelebornMaster {
Seq.empty
}
+ lazy val jmhDependencies = Seq(Dependencies.jmhCore, Dependencies.jmhGeneratorAnnprocess)
+
lazy val master = Project("celeborn-master", file("master"))
.dependsOn(CelebornCommon.common)
.dependsOn(CelebornCommon.common % "test->test;compile->compile")
@@ -750,7 +756,7 @@ object CelebornMaster {
Dependencies.ratisServer,
Dependencies.ratisShell,
Dependencies.scalatestMockito % "test",
- ) ++ commonUnitTestDependencies ++ mpuDependencies
+ ) ++ commonUnitTestDependencies ++ mpuDependencies ++ jmhDependencies
)
}