From 714722b5d39939dcc6c676efb38399a4ce4b241a Mon Sep 17 00:00:00 2001 From: Aravind Patnam Date: Sun, 27 Apr 2025 11:13:21 +0800 Subject: [PATCH] [CELEBORN-1982] Slot Selection Perf Improvements MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ### What changes were proposed in this pull request? After profiling to see where the hotspots are for slot selection, we identified 2 main areas: - iter.remove ([link](https://github.com/apache/celeborn/blob/main/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java#L447)) is a major hotspot, especially if partitionIdList is massive - since it is an ArrayList and we are removing from the begining - resulting in O(n) deletion costs. - `haveDisk` is computed per partitionId, iterated across all workers. We precompute this and store it as a field in `WorkerInfo`. See the below flamegraph for the hotspot of `iter.remove` (`oop_disjoint_arraycopy`) after running a benchmark. ![Screenshot 2025-04-24 at 12 58 34 AM](https://github.com/user-attachments/assets/30bb38f7-9a92-4b52-8480-5e7f26b0d48b) Below is what we actually observed in production which matches with the above observation from the benchmark: ![realprodflamegraph](https://github.com/user-attachments/assets/d06e095c-2d6d-4892-982a-1c2e828eb71e) ### Why are the changes needed? speed up slot selection performance in the case of large partitionIds ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? After applying the above changes, we can see the hotspot is removed in the flamegraph: ![Screenshot 2025-04-24 at 12 53 24 AM](https://github.com/user-attachments/assets/99372140-5746-4a34-9918-642c81fb52e8) Benchmarks: Without changes: ``` # Detecting actual CPU count: 12 detected # JMH version: 1.37 # VM version: JDK 1.8.0_172, Java HotSpot(TM) 64-Bit Server VM, 25.172-b11 # VM invoker: /Library/Java/JavaVirtualMachines/jdk1.8.0_172.jdk/Contents/Home/jre/bin/java # Blackhole mode: full + dont-inline hint (auto-detected, use -Djmh.blackhole.autoDetect=false to disable) # Warmup: 5 iterations, 5 s each # Measurement: 5 iterations, 60 s each # Timeout: 10 min per iteration # Threads: 12 threads, will synchronize iterations # Benchmark mode: Average time, time/op # Benchmark: org.apache.celeborn.service.deploy.master.SlotsAllocatorBenchmark.benchmarkSlotSelection # Run progress: 0.00% complete, ETA 00:05:25 # Fork: 1 of 1 # Warmup Iteration 1: 2060198.745 ±(99.9%) 306976.270 us/op # Warmup Iteration 2: 1137534.950 ±(99.9%) 72065.776 us/op # Warmup Iteration 3: 1032434.221 ±(99.9%) 59585.256 us/op # Warmup Iteration 4: 903621.382 ±(99.9%) 41542.172 us/op # Warmup Iteration 5: 921816.398 ±(99.9%) 44025.884 us/op Iteration 1: 853276.360 ±(99.9%) 13285.688 us/op Iteration 2: 865183.111 ±(99.9%) 9691.856 us/op Iteration 3: 909971.254 ±(99.9%) 10201.037 us/op Iteration 4: 874154.240 ±(99.9%) 11287.538 us/op Iteration 5: 907655.363 ±(99.9%) 11893.789 us/op Result "org.apache.celeborn.service.deploy.master.SlotsAllocatorBenchmark.benchmarkSlotSelection": 882048.066 ±(99.9%) 98360.936 us/op [Average] (min, avg, max) = (853276.360, 882048.066, 909971.254), stdev = 25544.023 CI (99.9%): [783687.130, 980409.001] (assumes normal distribution) # Run complete. Total time: 00:05:43 REMEMBER: The numbers below are just data. To gain reusable insights, you need to follow up on why the numbers are the way they are. Use profilers (see -prof, -lprof), design factorial experiments, perform baseline and negative tests that provide experimental control, make sure the benchmarking environment is safe on JVM/OS/HW level, ask for reviews from the domain experts. Do not assume the numbers tell you what you want them to tell. Benchmark Mode Cnt Score Error Units SlotsAllocatorBenchmark.benchmarkSlotSelection avgt 5 882048.066 ± 98360.936 us/op Process finished with exit code 0 ``` With changes: ``` # Detecting actual CPU count: 12 detected # JMH version: 1.37 # VM version: JDK 1.8.0_172, Java HotSpot(TM) 64-Bit Server VM, 25.172-b11 # VM invoker: /Library/Java/JavaVirtualMachines/jdk1.8.0_172.jdk/Contents/Home/jre/bin/java # Blackhole mode: full + dont-inline hint (auto-detected, use -Djmh.blackhole.autoDetect=false to disable) # Warmup: 5 iterations, 5 s each # Measurement: 5 iterations, 60 s each # Timeout: 10 min per iteration # Threads: 12 threads, will synchronize iterations # Benchmark mode: Average time, time/op # Benchmark: org.apache.celeborn.service.deploy.master.SlotsAllocatorBenchmark.benchmarkSlotSelection # Run progress: 0.00% complete, ETA 00:05:25 # Fork: 1 of 1 # Warmup Iteration 1: 305437.719 ±(99.9%) 81860.733 us/op # Warmup Iteration 2: 137498.811 ±(99.9%) 7669.102 us/op # Warmup Iteration 3: 129355.869 ±(99.9%) 5030.972 us/op # Warmup Iteration 4: 135311.734 ±(99.9%) 6964.080 us/op # Warmup Iteration 5: 131013.323 ±(99.9%) 8560.232 us/op Iteration 1: 133695.396 ±(99.9%) 3713.684 us/op Iteration 2: 143735.961 ±(99.9%) 5858.078 us/op Iteration 3: 135619.704 ±(99.9%) 5257.352 us/op Iteration 4: 128806.160 ±(99.9%) 4541.790 us/op Iteration 5: 134179.546 ±(99.9%) 5137.425 us/op Result "org.apache.celeborn.service.deploy.master.SlotsAllocatorBenchmark.benchmarkSlotSelection": 135207.354 ±(99.9%) 20845.544 us/op [Average] (min, avg, max) = (128806.160, 135207.354, 143735.961), stdev = 5413.522 CI (99.9%): [114361.809, 156052.898] (assumes normal distribution) # Run complete. Total time: 00:05:29 REMEMBER: The numbers below are just data. To gain reusable insights, you need to follow up on why the numbers are the way they are. Use profilers (see -prof, -lprof), design factorial experiments, perform baseline and negative tests that provide experimental control, make sure the benchmarking environment is safe on JVM/OS/HW level, ask for reviews from the domain experts. Do not assume the numbers tell you what you want them to tell. Benchmark Mode Cnt Score Error Units SlotsAllocatorBenchmark.benchmarkSlotSelection avgt 5 135207.354 ± 20845.544 us/op Process finished with exit code 0 ``` 882048.066 us/ops without changes vs 135207.354 us/op with changes. That is about 6.5x improvement. Closes #3228 from akpatnam25/CELEBORN-1982. Lead-authored-by: Aravind Patnam Co-authored-by: Wang, Fei Signed-off-by: mingji --- .../celeborn/common/meta/WorkerInfo.scala | 18 ++- master/pom.xml | 10 ++ .../service/deploy/master/SlotsAllocator.java | 9 +- .../master/SlotsAllocatorJmhBenchmark.java | 87 +++++++++++ .../deploy/master/SlotsAllocatorSuiteJ.java | 144 ++++++------------ pom.xml | 13 ++ project/CelebornBuild.scala | 8 +- 7 files changed, 183 insertions(+), 106 deletions(-) create mode 100644 master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorJmhBenchmark.java diff --git a/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala b/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala index 9024c1b7f..84be612da 100644 --- a/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala +++ b/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala @@ -19,6 +19,7 @@ package org.apache.celeborn.common.meta import java.util import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicBoolean import scala.collection.JavaConverters._ @@ -45,9 +46,11 @@ class WorkerInfo( var networkLocation = NetworkTopology.DEFAULT_RACK var lastHeartbeat: Long = 0 var workerStatus = WorkerStatus.normalWorkerStatus() - val diskInfos = + val diskInfos = { if (_diskInfos != null) JavaUtils.newConcurrentHashMap[String, DiskInfo](_diskInfos) else null + } + val workerHasDisk: AtomicBoolean = new AtomicBoolean(computeWorkerHaveDisk) val userResourceConsumption = if (_userResourceConsumption != null) JavaUtils.newConcurrentHashMap[UserIdentifier, ResourceConsumption](_userResourceConsumption) @@ -236,6 +239,7 @@ class WorkerInfo( diskInfos.remove(nonExistsMountPoint) } } + workerHasDisk.set(computeWorkerHaveDisk) JavaUtils.newConcurrentHashMap[String, DiskInfo](diskInfos) } @@ -306,9 +310,17 @@ class WorkerInfo( result } + private def computeWorkerHaveDisk = { + if (diskInfos != null) { + diskInfos.values().asScala.exists(p => + p.storageType == StorageInfo.Type.SSD || p.storageType == StorageInfo.Type.HDD) + } else { + false + } + } + def haveDisk(): Boolean = { - diskInfos.values().asScala.exists(p => - p.storageType == StorageInfo.Type.SSD || p.storageType == StorageInfo.Type.HDD) + workerHasDisk.get() } } diff --git a/master/pom.xml b/master/pom.xml index 25f349f4e..94858ba63 100644 --- a/master/pom.xml +++ b/master/pom.xml @@ -121,6 +121,16 @@ test-jar test + + org.openjdk.jmh + jmh-core + test + + + org.openjdk.jmh + jmh-generator-annprocess + test + diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java index 75e680a3b..62582018f 100644 --- a/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java +++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java @@ -356,12 +356,15 @@ public class SlotsAllocator { int primaryIndex = rand.nextInt(workerSize); int replicaIndex = rand.nextInt(workerSize); - Iterator iter = partitionIdList.iterator(); + ListIterator iter = partitionIdList.listIterator(partitionIdList.size()); + // Iterate from the end to preserve O(1) removal of processed partitions. + // This is important when we have a high number of concurrent apps that have a + // high number of partitions. outer: - while (iter.hasNext()) { + while (iter.hasPrevious()) { int nextPrimaryInd = primaryIndex; - int partitionId = iter.next(); + int partitionId = iter.previous(); StorageInfo storageInfo; if (slotsRestrictions != null && !slotsRestrictions.isEmpty()) { // this means that we'll select a mount point diff --git a/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorJmhBenchmark.java b/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorJmhBenchmark.java new file mode 100644 index 000000000..7ce80fee9 --- /dev/null +++ b/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorJmhBenchmark.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.celeborn.service.deploy.master; + +import static org.apache.celeborn.service.deploy.master.SlotsAllocatorSuiteJ.*; +import static org.openjdk.jmh.annotations.Mode.*; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import org.openjdk.jmh.annotations.*; +import org.openjdk.jmh.infra.Blackhole; + +import org.apache.celeborn.common.meta.WorkerInfo; +import org.apache.celeborn.common.protocol.StorageInfo; + +public class SlotsAllocatorJmhBenchmark { + + private static final int NUM_WORKERS = 1500; + private static final long PARTITION_SIZE = 64 * 1024 * 1024; + private static final String DISK_PATH = "/mnt/disk"; + private static final long DISK_SPACE = 512 * 1024 * 1024L; + private static final boolean HAS_DISKS = true; + private static final int NUM_NETWORK_LOCATIONS = 20; + private static final int NUM_PARTITIONS = 100000; + + @State(Scope.Thread) + public static class BenchmarkState { + List workers; + List partitionIds = + Collections.unmodifiableList( + IntStream.range(0, NUM_PARTITIONS).boxed().collect(Collectors.toList())); + + @Setup + public void initialize() { + Map diskPartitionToSize = new HashMap<>(); + diskPartitionToSize.put(DISK_PATH, DISK_SPACE); + workers = + basePrepareWorkers( + NUM_WORKERS, + HAS_DISKS, + diskPartitionToSize, + PARTITION_SIZE, + NUM_NETWORK_LOCATIONS, + new Random()); + } + } + + @Benchmark + @Fork(1) + @Threads(Threads.MAX) + @BenchmarkMode(AverageTime) + @Warmup(iterations = 5, time = 5) + @Measurement(time = 60, timeUnit = TimeUnit.SECONDS, iterations = 5) + @OutputTimeUnit(TimeUnit.MICROSECONDS) + public void benchmarkSlotSelection(Blackhole blackhole, BenchmarkState state) { + + blackhole.consume( + SlotsAllocator.offerSlotsRoundRobin( + state.workers, state.partitionIds, true, true, StorageInfo.ALL_TYPES_AVAILABLE_MASK)); + } + + public static void main(String[] args) throws Exception { + org.openjdk.jmh.Main.main(args); + } +} diff --git a/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java b/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java index 95bd3a284..f43acb927 100644 --- a/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java +++ b/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java @@ -22,9 +22,12 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import java.util.*; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import scala.Tuple2; +import com.google.common.collect.ImmutableMap; import org.junit.Test; import org.apache.celeborn.common.CelebornConf; @@ -36,106 +39,17 @@ import org.apache.celeborn.common.protocol.StorageInfo; public class SlotsAllocatorSuiteJ { private List prepareWorkers(boolean hasDisks) { long assumedPartitionSize = 64 * 1024 * 1024; - Random random = new Random(); - Map disks1 = new HashMap<>(); - DiskInfo diskInfo1 = - new DiskInfo( - "/mnt/disk1", - random.nextInt() + 100 * 1024 * 1024 * 1024L, - random.nextInt(1000), - random.nextInt(1000), - 0); - DiskInfo diskInfo2 = - new DiskInfo( - "/mnt/disk2", - random.nextInt() + 95 * 1024 * 1024 * 1024L, - random.nextInt(1000), - random.nextInt(1000), - 0); - DiskInfo diskInfo3 = - new DiskInfo( - "/mnt/disk3", - random.nextInt() + 90 * 1024 * 1024 * 1024L, - random.nextInt(1000), - random.nextInt(1000), - 0); - diskInfo1.maxSlots_$eq(diskInfo1.actualUsableSpace() / assumedPartitionSize); - diskInfo2.maxSlots_$eq(diskInfo2.actualUsableSpace() / assumedPartitionSize); - diskInfo3.maxSlots_$eq(diskInfo3.actualUsableSpace() / assumedPartitionSize); - if (hasDisks) { - disks1.put("/mnt/disk1", diskInfo1); - disks1.put("/mnt/disk2", diskInfo2); - disks1.put("/mnt/disk3", diskInfo3); - } - - Map disks2 = new HashMap<>(); - DiskInfo diskInfo4 = - new DiskInfo( - "/mnt/disk1", - random.nextInt() + 100 * 1024 * 1024 * 1024L, - random.nextInt(1000), - random.nextInt(1000), - 0); - DiskInfo diskInfo5 = - new DiskInfo( - "/mnt/disk2", - random.nextInt() + 95 * 1024 * 1024 * 1024L, - random.nextInt(1000), - random.nextInt(1000), - 0); - DiskInfo diskInfo6 = - new DiskInfo( - "/mnt/disk3", - random.nextInt() + 90 * 1024 * 1024 * 1024L, - random.nextInt(1000), - random.nextInt(1000), - 0); - diskInfo4.maxSlots_$eq(diskInfo4.actualUsableSpace() / assumedPartitionSize); - diskInfo5.maxSlots_$eq(diskInfo5.actualUsableSpace() / assumedPartitionSize); - diskInfo6.maxSlots_$eq(diskInfo6.actualUsableSpace() / assumedPartitionSize); - if (hasDisks) { - disks2.put("/mnt/disk1", diskInfo4); - disks2.put("/mnt/disk2", diskInfo5); - disks2.put("/mnt/disk3", diskInfo6); - } - - Map disks3 = new HashMap<>(); - DiskInfo diskInfo7 = - new DiskInfo( - "/mnt/disk1", - random.nextInt() + 100 * 1024 * 1024 * 1024L, - random.nextInt(1000), - random.nextInt(1000), - 0); - DiskInfo diskInfo8 = - new DiskInfo( - "/mnt/disk2", - random.nextInt() + 95 * 1024 * 1024 * 1024L, - random.nextInt(1000), - random.nextInt(1000), - 0); - DiskInfo diskInfo9 = - new DiskInfo( - "/mnt/disk3", - random.nextInt() + 90 * 1024 * 1024 * 1024L, - random.nextInt(1000), - random.nextInt(1000), - 0); - diskInfo7.maxSlots_$eq(diskInfo7.actualUsableSpace() / assumedPartitionSize); - diskInfo8.maxSlots_$eq(diskInfo8.actualUsableSpace() / assumedPartitionSize); - diskInfo9.maxSlots_$eq(diskInfo9.actualUsableSpace() / assumedPartitionSize); - if (hasDisks) { - disks3.put("/mnt/disk2", diskInfo8); - disks3.put("/mnt/disk1", diskInfo7); - disks3.put("/mnt/disk3", diskInfo9); - } - - ArrayList workers = new ArrayList<>(3); - workers.add(new WorkerInfo("host1", 9, 10, 110, 113, 116, disks1, null)); - workers.add(new WorkerInfo("host2", 9, 11, 111, 114, 118, disks2, null)); - workers.add(new WorkerInfo("host3", 9, 12, 112, 115, 120, disks3, null)); - return workers; + return basePrepareWorkers( + 3, + hasDisks, + ImmutableMap.of( + "/mnt/disk1", random.nextInt() + 100 * 1024 * 1024 * 1024L, + "/mnt/disk2", random.nextInt() + 95 * 1024 * 1024 * 1024L, + "/mnt/disk3", random.nextInt() + 90 * 1024 * 1024 * 1024L), + assumedPartitionSize, + 3, + random); } @Test @@ -551,4 +465,36 @@ public class SlotsAllocatorSuiteJ { checkSlotsOnDFS(workers, partitionIds, shouldReplicate, true, true, false, true); checkSlotsOnDFS(workers, partitionIds, shouldReplicate, true, false, false, true); } + + static List basePrepareWorkers( + int numWorkers, + boolean hasDisks, + Map diskPartitionToSize, + long assumedPartitionSize, + int numNetworkLocations, + Random random) { + return IntStream.range(0, numWorkers) + .mapToObj( + i -> { + Map disks = new HashMap<>(); + if (hasDisks) { + diskPartitionToSize.forEach( + (diskMountPoint, diskSize) -> { + DiskInfo diskInfo = + new DiskInfo( + diskMountPoint, + diskSize, + random.nextInt(1000), + random.nextInt(1000), + 0); + diskInfo.maxSlots_$eq(diskInfo.actualUsableSpace() / assumedPartitionSize); + disks.put(diskMountPoint, diskInfo); + }); + } + WorkerInfo worker = new WorkerInfo("host" + i, i, i, i, i, i, disks, null); + worker.networkLocation_$eq(String.valueOf(i % numNetworkLocations)); + return worker; + }) + .collect(Collectors.toList()); + } } diff --git a/pom.xml b/pom.xml index e523c2eae..0b7d37af0 100644 --- a/pom.xml +++ b/pom.xml @@ -108,6 +108,7 @@ 1.1.10.5 3.0-9 4.7.6 + 1.37 3.5.15 @@ -740,6 +741,18 @@ picocli ${picocli.version} + + org.openjdk.jmh + jmh-core + ${jmh.version} + test + + + org.openjdk.jmh + jmh-generator-annprocess + ${jmh.version} + test + diff --git a/project/CelebornBuild.scala b/project/CelebornBuild.scala index dfeff9c30..e8818977f 100644 --- a/project/CelebornBuild.scala +++ b/project/CelebornBuild.scala @@ -85,6 +85,7 @@ object Dependencies { val httpCore5Version = "5.2.4" val jakartaAnnotationApiVersion = "1.3.5" val picocliVersion = "4.7.6" + val jmhVersion = "1.37" // For SSL support val bouncycastleVersion = "1.77" @@ -264,6 +265,9 @@ object Dependencies { ) val picocli = "info.picocli" % "picocli" % picocliVersion + + val jmhCore = "org.openjdk.jmh" % "jmh-core" % jmhVersion % "test" + val jmhGeneratorAnnprocess = "org.openjdk.jmh" % "jmh-generator-annprocess" % jmhVersion % "test" } object CelebornCommonSettings { @@ -728,6 +732,8 @@ object CelebornMaster { Seq.empty } + lazy val jmhDependencies = Seq(Dependencies.jmhCore, Dependencies.jmhGeneratorAnnprocess) + lazy val master = Project("celeborn-master", file("master")) .dependsOn(CelebornCommon.common) .dependsOn(CelebornCommon.common % "test->test;compile->compile") @@ -750,7 +756,7 @@ object CelebornMaster { Dependencies.ratisServer, Dependencies.ratisShell, Dependencies.scalatestMockito % "test", - ) ++ commonUnitTestDependencies ++ mpuDependencies + ) ++ commonUnitTestDependencies ++ mpuDependencies ++ jmhDependencies ) }