[CELEBORN-1982] Slot Selection Perf Improvements

### What changes were proposed in this pull request?
Profiling slot selection identified two main hotspots:
- `iter.remove` ([link](https://github.com/apache/celeborn/blob/main/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java#L447)) is a major hotspot, especially when `partitionIdList` is very large. The list is an `ArrayList` and we remove from the beginning, so each removal costs O(n). We now iterate from the end of the list so that removing a processed partition is O(1).
- `haveDisk` is recomputed for every partitionId across all workers. We now precompute it and store it as a field in `WorkerInfo`.
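
As a rough illustration of the first hotspot, the sketch below drains an `ArrayList` from the front with `Iterator.remove` and from the back with `ListIterator.remove`. The class and method names (`RemovalOrderSketch`, `drainFromFront`, `drainFromBack`) are made up for this example and are not part of Celeborn. The real loop in `SlotsAllocator` does more work per partition and may not remove every element.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;

// Hypothetical sketch, not Celeborn code.
public class RemovalOrderSketch {

  // Front to back: every remove() shifts all remaining elements one slot left.
  public static long drainFromFront(List<Integer> ids) {
    long sum = 0;
    Iterator<Integer> iter = ids.iterator();
    while (iter.hasNext()) {
      sum += iter.next();
      iter.remove();
    }
    return sum;
  }

  // Back to front: every remove() deletes the current last element, so nothing shifts.
  public static long drainFromBack(List<Integer> ids) {
    long sum = 0;
    ListIterator<Integer> iter = ids.listIterator(ids.size());
    while (iter.hasPrevious()) {
      sum += iter.previous();
      iter.remove();
    }
    return sum;
  }

  public static void main(String[] args) {
    int n = 100_000; // assumed size, chosen only to make the difference visible
    List<Integer> front = new ArrayList<>();
    List<Integer> back = new ArrayList<>();
    for (int i = 0; i < n; i++) {
      front.add(i);
      back.add(i);
    }

    long t0 = System.nanoTime();
    drainFromFront(front);
    long t1 = System.nanoTime();
    drainFromBack(back);
    long t2 = System.nanoTime();

    System.out.println("front-to-back: " + (t1 - t0) / 1_000 + " us");
    System.out.println("back-to-front: " + (t2 - t1) / 1_000 + " us");
  }
}
```

Both methods visit every element and leave the list empty; only the visiting order differs. This is a single-shot timing, not a JMH measurement, so the printed numbers are only indicative.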

The flamegraph below shows the `iter.remove` hotspot (`oop_disjoint_arraycopy`) after running a benchmark.

![Screenshot 2025-04-24 at 12 58 34 AM](https://github.com/user-attachments/assets/30bb38f7-9a92-4b52-8480-5e7f26b0d48b)

Below is what we actually observed in production, which matches the benchmark observation above:
![realprodflamegraph](https://github.com/user-attachments/assets/d06e095c-2d6d-4892-982a-1c2e828eb71e)

### Why are the changes needed?
Speed up slot selection when the number of partitionIds is large.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
After applying the above changes, we can see the hotspot is removed in the flamegraph:
![Screenshot 2025-04-24 at 12 53 24 AM](https://github.com/user-attachments/assets/99372140-5746-4a34-9918-642c81fb52e8)

Benchmarks:
Without changes:
```
# Detecting actual CPU count: 12 detected
# JMH version: 1.37
# VM version: JDK 1.8.0_172, Java HotSpot(TM) 64-Bit Server VM, 25.172-b11
# VM invoker: /Library/Java/JavaVirtualMachines/jdk1.8.0_172.jdk/Contents/Home/jre/bin/java
# Blackhole mode: full + dont-inline hint (auto-detected, use -Djmh.blackhole.autoDetect=false to disable)
# Warmup: 5 iterations, 5 s each
# Measurement: 5 iterations, 60 s each
# Timeout: 10 min per iteration
# Threads: 12 threads, will synchronize iterations
# Benchmark mode: Average time, time/op
# Benchmark: org.apache.celeborn.service.deploy.master.SlotsAllocatorBenchmark.benchmarkSlotSelection

# Run progress: 0.00% complete, ETA 00:05:25
# Fork: 1 of 1
# Warmup Iteration   1: 2060198.745 ±(99.9%) 306976.270 us/op
# Warmup Iteration   2: 1137534.950 ±(99.9%) 72065.776 us/op
# Warmup Iteration   3: 1032434.221 ±(99.9%) 59585.256 us/op
# Warmup Iteration   4: 903621.382 ±(99.9%) 41542.172 us/op
# Warmup Iteration   5: 921816.398 ±(99.9%) 44025.884 us/op
Iteration   1: 853276.360 ±(99.9%) 13285.688 us/op
Iteration   2: 865183.111 ±(99.9%) 9691.856 us/op
Iteration   3: 909971.254 ±(99.9%) 10201.037 us/op
Iteration   4: 874154.240 ±(99.9%) 11287.538 us/op
Iteration   5: 907655.363 ±(99.9%) 11893.789 us/op

Result "org.apache.celeborn.service.deploy.master.SlotsAllocatorBenchmark.benchmarkSlotSelection":
  882048.066 ±(99.9%) 98360.936 us/op [Average]
  (min, avg, max) = (853276.360, 882048.066, 909971.254), stdev = 25544.023
  CI (99.9%): [783687.130, 980409.001] (assumes normal distribution)

# Run complete. Total time: 00:05:43

REMEMBER: The numbers below are just data. To gain reusable insights, you need to follow up on
why the numbers are the way they are. Use profilers (see -prof, -lprof), design factorial
experiments, perform baseline and negative tests that provide experimental control, make sure
the benchmarking environment is safe on JVM/OS/HW level, ask for reviews from the domain experts.
Do not assume the numbers tell you what you want them to tell.

Benchmark                                       Mode  Cnt       Score       Error  Units
SlotsAllocatorBenchmark.benchmarkSlotSelection  avgt    5  882048.066 ± 98360.936  us/op

Process finished with exit code 0
```
With changes:
```
# Detecting actual CPU count: 12 detected
# JMH version: 1.37
# VM version: JDK 1.8.0_172, Java HotSpot(TM) 64-Bit Server VM, 25.172-b11
# VM invoker: /Library/Java/JavaVirtualMachines/jdk1.8.0_172.jdk/Contents/Home/jre/bin/java
# Blackhole mode: full + dont-inline hint (auto-detected, use -Djmh.blackhole.autoDetect=false to disable)
# Warmup: 5 iterations, 5 s each
# Measurement: 5 iterations, 60 s each
# Timeout: 10 min per iteration
# Threads: 12 threads, will synchronize iterations
# Benchmark mode: Average time, time/op
# Benchmark: org.apache.celeborn.service.deploy.master.SlotsAllocatorBenchmark.benchmarkSlotSelection

# Run progress: 0.00% complete, ETA 00:05:25
# Fork: 1 of 1
# Warmup Iteration   1: 305437.719 ±(99.9%) 81860.733 us/op
# Warmup Iteration   2: 137498.811 ±(99.9%) 7669.102 us/op
# Warmup Iteration   3: 129355.869 ±(99.9%) 5030.972 us/op
# Warmup Iteration   4: 135311.734 ±(99.9%) 6964.080 us/op
# Warmup Iteration   5: 131013.323 ±(99.9%) 8560.232 us/op
Iteration   1: 133695.396 ±(99.9%) 3713.684 us/op
Iteration   2: 143735.961 ±(99.9%) 5858.078 us/op
Iteration   3: 135619.704 ±(99.9%) 5257.352 us/op
Iteration   4: 128806.160 ±(99.9%) 4541.790 us/op
Iteration   5: 134179.546 ±(99.9%) 5137.425 us/op

Result "org.apache.celeborn.service.deploy.master.SlotsAllocatorBenchmark.benchmarkSlotSelection":
  135207.354 ±(99.9%) 20845.544 us/op [Average]
  (min, avg, max) = (128806.160, 135207.354, 143735.961), stdev = 5413.522
  CI (99.9%): [114361.809, 156052.898] (assumes normal distribution)

# Run complete. Total time: 00:05:29

REMEMBER: The numbers below are just data. To gain reusable insights, you need to follow up on
why the numbers are the way they are. Use profilers (see -prof, -lprof), design factorial
experiments, perform baseline and negative tests that provide experimental control, make sure
the benchmarking environment is safe on JVM/OS/HW level, ask for reviews from the domain experts.
Do not assume the numbers tell you what you want them to tell.

Benchmark                                       Mode  Cnt       Score       Error  Units
SlotsAllocatorBenchmark.benchmarkSlotSelection  avgt    5  135207.354 ± 20845.544  us/op

Process finished with exit code 0
```

882048.066 us/op without changes vs 135207.354 us/op with changes, which is about a 6.5x improvement (882048.066 / 135207.354 ≈ 6.52).

Closes #3228 from akpatnam25/CELEBORN-1982.

Lead-authored-by: Aravind Patnam <akpatnam25@gmail.com>
Co-authored-by: Wang, Fei <fwang12@ebay.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
Aravind Patnam 2025-04-27 11:13:21 +08:00 committed by mingji
parent 5dc79a6353
commit 714722b5d3
7 changed files with 183 additions and 106 deletions


@ -19,6 +19,7 @@ package org.apache.celeborn.common.meta
 import java.util
 import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic.AtomicBoolean
 import scala.collection.JavaConverters._
@ -45,9 +46,11 @@ class WorkerInfo(
 var networkLocation = NetworkTopology.DEFAULT_RACK
 var lastHeartbeat: Long = 0
 var workerStatus = WorkerStatus.normalWorkerStatus()
-val diskInfos =
+val diskInfos = {
 if (_diskInfos != null) JavaUtils.newConcurrentHashMap[String, DiskInfo](_diskInfos)
 else null
+}
+val workerHasDisk: AtomicBoolean = new AtomicBoolean(computeWorkerHaveDisk)
 val userResourceConsumption =
 if (_userResourceConsumption != null)
 JavaUtils.newConcurrentHashMap[UserIdentifier, ResourceConsumption](_userResourceConsumption)
@ -236,6 +239,7 @@ class WorkerInfo(
 diskInfos.remove(nonExistsMountPoint)
 }
 }
+workerHasDisk.set(computeWorkerHaveDisk)
 JavaUtils.newConcurrentHashMap[String, DiskInfo](diskInfos)
 }
@ -306,9 +310,17 @@ class WorkerInfo(
 result
 }
+private def computeWorkerHaveDisk = {
+if (diskInfos != null) {
+diskInfos.values().asScala.exists(p =>
+p.storageType == StorageInfo.Type.SSD || p.storageType == StorageInfo.Type.HDD)
+} else {
+false
+}
+}
 def haveDisk(): Boolean = {
-diskInfos.values().asScala.exists(p =>
-p.storageType == StorageInfo.Type.SSD || p.storageType == StorageInfo.Type.HDD)
+workerHasDisk.get()
 }
 }
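
The caching pattern applied to `WorkerInfo.haveDisk` in the hunks above can be sketched in Java roughly as follows. This is a simplified illustration, not the Scala implementation: `CachedDiskFlagSketch`, `StorageType`, and `updateDisks` are hypothetical names, and the disk map holds only a storage type per mount point.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch, not Celeborn code.
public class CachedDiskFlagSketch {

  public enum StorageType { SSD, HDD, HDFS }

  private final Map<String, StorageType> disks = new ConcurrentHashMap<>();
  // Cached answer, refreshed only when the disk map changes.
  private final AtomicBoolean hasDisk = new AtomicBoolean(false);

  // The scan that was previously done on every haveDisk() call.
  private boolean computeHasDisk() {
    return disks.values().stream()
        .anyMatch(t -> t == StorageType.SSD || t == StorageType.HDD);
  }

  // Replaces the disk map and refreshes the cached flag in the same place.
  public void updateDisks(Map<String, StorageType> latest) {
    disks.clear();
    disks.putAll(latest);
    hasDisk.set(computeHasDisk());
  }

  // Hot path: a single atomic read instead of a scan over all disks.
  public boolean haveDisk() {
    return hasDisk.get();
  }

  public static void main(String[] args) {
    CachedDiskFlagSketch worker = new CachedDiskFlagSketch();
    System.out.println(worker.haveDisk());
    worker.updateDisks(Map.of("/mnt/disk1", StorageType.SSD));
    System.out.println(worker.haveDisk());
  }
}
```

The trade-off is that every code path that mutates the disk map must also refresh the flag; otherwise readers see a stale value.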


@ -121,6 +121,16 @@
 <type>test-jar</type>
 <scope>test</scope>
 </dependency>
+<dependency>
+<groupId>org.openjdk.jmh</groupId>
+<artifactId>jmh-core</artifactId>
+<scope>test</scope>
+</dependency>
+<dependency>
+<groupId>org.openjdk.jmh</groupId>
+<artifactId>jmh-generator-annprocess</artifactId>
+<scope>test</scope>
+</dependency>
 </dependencies>
 <build>


@ -356,12 +356,15 @@ public class SlotsAllocator {
 int primaryIndex = rand.nextInt(workerSize);
 int replicaIndex = rand.nextInt(workerSize);
-Iterator<Integer> iter = partitionIdList.iterator();
+ListIterator<Integer> iter = partitionIdList.listIterator(partitionIdList.size());
+// Iterate from the end to preserve O(1) removal of processed partitions.
+// This is important when we have a high number of concurrent apps that have a
+// high number of partitions.
 outer:
-while (iter.hasNext()) {
+while (iter.hasPrevious()) {
 int nextPrimaryInd = primaryIndex;
-int partitionId = iter.next();
+int partitionId = iter.previous();
 StorageInfo storageInfo;
 if (slotsRestrictions != null && !slotsRestrictions.isEmpty()) {
 // this means that we'll select a mount point


@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.celeborn.service.deploy.master;
import static org.apache.celeborn.service.deploy.master.SlotsAllocatorSuiteJ.*;
import static org.openjdk.jmh.annotations.Mode.*;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.openjdk.jmh.annotations.*;
import org.openjdk.jmh.infra.Blackhole;
import org.apache.celeborn.common.meta.WorkerInfo;
import org.apache.celeborn.common.protocol.StorageInfo;
public class SlotsAllocatorJmhBenchmark {
private static final int NUM_WORKERS = 1500;
private static final long PARTITION_SIZE = 64 * 1024 * 1024;
private static final String DISK_PATH = "/mnt/disk";
private static final long DISK_SPACE = 512 * 1024 * 1024L;
private static final boolean HAS_DISKS = true;
private static final int NUM_NETWORK_LOCATIONS = 20;
private static final int NUM_PARTITIONS = 100000;
@State(Scope.Thread)
public static class BenchmarkState {
List<WorkerInfo> workers;
List<Integer> partitionIds =
Collections.unmodifiableList(
IntStream.range(0, NUM_PARTITIONS).boxed().collect(Collectors.toList()));
@Setup
public void initialize() {
Map<String, Long> diskPartitionToSize = new HashMap<>();
diskPartitionToSize.put(DISK_PATH, DISK_SPACE);
workers =
basePrepareWorkers(
NUM_WORKERS,
HAS_DISKS,
diskPartitionToSize,
PARTITION_SIZE,
NUM_NETWORK_LOCATIONS,
new Random());
}
}
@Benchmark
@Fork(1)
@Threads(Threads.MAX)
@BenchmarkMode(AverageTime)
@Warmup(iterations = 5, time = 5)
@Measurement(time = 60, timeUnit = TimeUnit.SECONDS, iterations = 5)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public void benchmarkSlotSelection(Blackhole blackhole, BenchmarkState state) {
blackhole.consume(
SlotsAllocator.offerSlotsRoundRobin(
state.workers, state.partitionIds, true, true, StorageInfo.ALL_TYPES_AVAILABLE_MASK));
}
public static void main(String[] args) throws Exception {
org.openjdk.jmh.Main.main(args);
}
}


@ -22,9 +22,12 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import java.util.*;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import scala.Tuple2;
+import com.google.common.collect.ImmutableMap;
 import org.junit.Test;
 import org.apache.celeborn.common.CelebornConf;
@ -36,106 +39,17 @@ import org.apache.celeborn.common.protocol.StorageInfo;
 public class SlotsAllocatorSuiteJ {
 private List<WorkerInfo> prepareWorkers(boolean hasDisks) {
 long assumedPartitionSize = 64 * 1024 * 1024;
 Random random = new Random();
-Map<String, DiskInfo> disks1 = new HashMap<>();
-DiskInfo diskInfo1 =
-new DiskInfo(
-"/mnt/disk1",
-random.nextInt() + 100 * 1024 * 1024 * 1024L,
-random.nextInt(1000),
-random.nextInt(1000),
-0);
-DiskInfo diskInfo2 =
-new DiskInfo(
-"/mnt/disk2",
-random.nextInt() + 95 * 1024 * 1024 * 1024L,
-random.nextInt(1000),
-random.nextInt(1000),
-0);
-DiskInfo diskInfo3 =
-new DiskInfo(
-"/mnt/disk3",
-random.nextInt() + 90 * 1024 * 1024 * 1024L,
-random.nextInt(1000),
-random.nextInt(1000),
-0);
-diskInfo1.maxSlots_$eq(diskInfo1.actualUsableSpace() / assumedPartitionSize);
-diskInfo2.maxSlots_$eq(diskInfo2.actualUsableSpace() / assumedPartitionSize);
-diskInfo3.maxSlots_$eq(diskInfo3.actualUsableSpace() / assumedPartitionSize);
-if (hasDisks) {
-disks1.put("/mnt/disk1", diskInfo1);
-disks1.put("/mnt/disk2", diskInfo2);
-disks1.put("/mnt/disk3", diskInfo3);
-}
-Map<String, DiskInfo> disks2 = new HashMap<>();
-DiskInfo diskInfo4 =
-new DiskInfo(
-"/mnt/disk1",
-random.nextInt() + 100 * 1024 * 1024 * 1024L,
-random.nextInt(1000),
-random.nextInt(1000),
-0);
-DiskInfo diskInfo5 =
-new DiskInfo(
-"/mnt/disk2",
-random.nextInt() + 95 * 1024 * 1024 * 1024L,
-random.nextInt(1000),
-random.nextInt(1000),
-0);
-DiskInfo diskInfo6 =
-new DiskInfo(
-"/mnt/disk3",
-random.nextInt() + 90 * 1024 * 1024 * 1024L,
-random.nextInt(1000),
-random.nextInt(1000),
-0);
-diskInfo4.maxSlots_$eq(diskInfo4.actualUsableSpace() / assumedPartitionSize);
-diskInfo5.maxSlots_$eq(diskInfo5.actualUsableSpace() / assumedPartitionSize);
-diskInfo6.maxSlots_$eq(diskInfo6.actualUsableSpace() / assumedPartitionSize);
-if (hasDisks) {
-disks2.put("/mnt/disk1", diskInfo4);
-disks2.put("/mnt/disk2", diskInfo5);
-disks2.put("/mnt/disk3", diskInfo6);
-}
-Map<String, DiskInfo> disks3 = new HashMap<>();
-DiskInfo diskInfo7 =
-new DiskInfo(
-"/mnt/disk1",
-random.nextInt() + 100 * 1024 * 1024 * 1024L,
-random.nextInt(1000),
-random.nextInt(1000),
-0);
-DiskInfo diskInfo8 =
-new DiskInfo(
-"/mnt/disk2",
-random.nextInt() + 95 * 1024 * 1024 * 1024L,
-random.nextInt(1000),
-random.nextInt(1000),
-0);
-DiskInfo diskInfo9 =
-new DiskInfo(
-"/mnt/disk3",
-random.nextInt() + 90 * 1024 * 1024 * 1024L,
-random.nextInt(1000),
-random.nextInt(1000),
-0);
-diskInfo7.maxSlots_$eq(diskInfo7.actualUsableSpace() / assumedPartitionSize);
-diskInfo8.maxSlots_$eq(diskInfo8.actualUsableSpace() / assumedPartitionSize);
-diskInfo9.maxSlots_$eq(diskInfo9.actualUsableSpace() / assumedPartitionSize);
-if (hasDisks) {
-disks3.put("/mnt/disk2", diskInfo8);
-disks3.put("/mnt/disk1", diskInfo7);
-disks3.put("/mnt/disk3", diskInfo9);
-}
-ArrayList<WorkerInfo> workers = new ArrayList<>(3);
-workers.add(new WorkerInfo("host1", 9, 10, 110, 113, 116, disks1, null));
-workers.add(new WorkerInfo("host2", 9, 11, 111, 114, 118, disks2, null));
-workers.add(new WorkerInfo("host3", 9, 12, 112, 115, 120, disks3, null));
-return workers;
+return basePrepareWorkers(
+3,
+hasDisks,
+ImmutableMap.of(
+"/mnt/disk1", random.nextInt() + 100 * 1024 * 1024 * 1024L,
+"/mnt/disk2", random.nextInt() + 95 * 1024 * 1024 * 1024L,
+"/mnt/disk3", random.nextInt() + 90 * 1024 * 1024 * 1024L),
+assumedPartitionSize,
+3,
+random);
 }
@Test
@ -551,4 +465,36 @@ public class SlotsAllocatorSuiteJ {
 checkSlotsOnDFS(workers, partitionIds, shouldReplicate, true, true, false, true);
 checkSlotsOnDFS(workers, partitionIds, shouldReplicate, true, false, false, true);
 }
+static List<WorkerInfo> basePrepareWorkers(
+int numWorkers,
+boolean hasDisks,
+Map<String, Long> diskPartitionToSize,
+long assumedPartitionSize,
+int numNetworkLocations,
+Random random) {
+return IntStream.range(0, numWorkers)
+.mapToObj(
+i -> {
+Map<String, DiskInfo> disks = new HashMap<>();
+if (hasDisks) {
+diskPartitionToSize.forEach(
+(diskMountPoint, diskSize) -> {
+DiskInfo diskInfo =
+new DiskInfo(
+diskMountPoint,
+diskSize,
+random.nextInt(1000),
+random.nextInt(1000),
+0);
+diskInfo.maxSlots_$eq(diskInfo.actualUsableSpace() / assumedPartitionSize);
+disks.put(diskMountPoint, diskInfo);
+});
+}
+WorkerInfo worker = new WorkerInfo("host" + i, i, i, i, i, i, disks, null);
+worker.networkLocation_$eq(String.valueOf(i % numNetworkLocations));
+return worker;
+})
+.collect(Collectors.toList());
+}
 }

pom.xml

@ -108,6 +108,7 @@
 <snappy.version>1.1.10.5</snappy.version>
 <ap.loader.version>3.0-9</ap.loader.version>
 <picocli.version>4.7.6</picocli.version>
+<jmh.version>1.37</jmh.version>
 <!-- Db dependencies -->
 <mybatis.version>3.5.15</mybatis.version>
@ -740,6 +741,18 @@
 <artifactId>picocli</artifactId>
 <version>${picocli.version}</version>
 </dependency>
+<dependency>
+<groupId>org.openjdk.jmh</groupId>
+<artifactId>jmh-core</artifactId>
+<version>${jmh.version}</version>
+<scope>test</scope>
+</dependency>
+<dependency>
+<groupId>org.openjdk.jmh</groupId>
+<artifactId>jmh-generator-annprocess</artifactId>
+<version>${jmh.version}</version>
+<scope>test</scope>
+</dependency>
 </dependencies>
 </dependencyManagement>


@ -85,6 +85,7 @@ object Dependencies {
 val httpCore5Version = "5.2.4"
 val jakartaAnnotationApiVersion = "1.3.5"
 val picocliVersion = "4.7.6"
+val jmhVersion = "1.37"
 // For SSL support
 val bouncycastleVersion = "1.77"
@ -264,6 +265,9 @@ object Dependencies {
 )
 val picocli = "info.picocli" % "picocli" % picocliVersion
+val jmhCore = "org.openjdk.jmh" % "jmh-core" % jmhVersion % "test"
+val jmhGeneratorAnnprocess = "org.openjdk.jmh" % "jmh-generator-annprocess" % jmhVersion % "test"
 }
 object CelebornCommonSettings {
@ -728,6 +732,8 @@ object CelebornMaster {
 Seq.empty
 }
+lazy val jmhDependencies = Seq(Dependencies.jmhCore, Dependencies.jmhGeneratorAnnprocess)
 lazy val master = Project("celeborn-master", file("master"))
 .dependsOn(CelebornCommon.common)
 .dependsOn(CelebornCommon.common % "test->test;compile->compile")
@ -750,7 +756,7 @@ object CelebornMaster {
 Dependencies.ratisServer,
 Dependencies.ratisShell,
 Dependencies.scalatestMockito % "test",
-) ++ commonUnitTestDependencies ++ mpuDependencies
+) ++ commonUnitTestDependencies ++ mpuDependencies ++ jmhDependencies
 )
 }