From 4e7df13af7db018a86fced467623062bfca8d1f2 Mon Sep 17 00:00:00 2001 From: Weijie Guo Date: Thu, 7 Nov 2024 13:51:39 +0800 Subject: [PATCH] [CELEBORN-1693] Fix storageFetcherPool concurrent problem ### What changes were proposed in this pull request? Fix storageFetcherPool concurrent problem. There may be duplicate thread pools created as multi-thread race condition. ![image](https://github.com/user-attachments/assets/ba4b0964-700e-4502-933a-b6c7cb93f32d) ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? No need. Closes #2886 from reswqa/storageFetcherPool. Authored-by: Weijie Guo Signed-off-by: SteNicholas --- .../service/deploy/worker/storage/CreditStreamManager.java | 3 ++- .../service/deploy/worker/storage/MapPartitionData.java | 3 +-- .../worker/storage/segment/SegmentMapPartitionData.java | 4 ++-- .../deploy/worker/storage/CreditStreamManagerSuiteJ.java | 1 + 4 files changed, 6 insertions(+), 5 deletions(-) diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/CreditStreamManager.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/CreditStreamManager.java index 526998208..0a3736a38 100644 --- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/CreditStreamManager.java +++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/CreditStreamManager.java @@ -44,7 +44,8 @@ public class CreditStreamManager { private final AtomicLong nextStreamId; private final ConcurrentHashMap streams; private final ConcurrentHashMap activeMapPartitions; - private final HashMap storageFetcherPool = new HashMap<>(); + private final ConcurrentHashMap storageFetcherPool = + JavaUtils.newConcurrentHashMap(); private int minReadBuffers; private int maxReadBuffers; private int threadsPerMountPoint; diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionData.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionData.java index 10cd0332b..f94c17c95 100644 --- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionData.java +++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionData.java @@ -19,7 +19,6 @@ package org.apache.celeborn.service.deploy.worker.storage; import java.io.IOException; import java.nio.channels.FileChannel; -import java.util.HashMap; import java.util.List; import java.util.PriorityQueue; import java.util.concurrent.ConcurrentHashMap; @@ -68,7 +67,7 @@ public class MapPartitionData implements MemoryManager.ReadBufferTargetChangeLis public MapPartitionData( int minReadBuffers, int maxReadBuffers, - HashMap storageFetcherPool, + ConcurrentHashMap storageFetcherPool, int threadsPerMountPoint, DiskFileInfo diskFileInfo, Consumer recycleStream, diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/segment/SegmentMapPartitionData.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/segment/SegmentMapPartitionData.java index 3d3b6cfe8..aa3090a3d 100644 --- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/segment/SegmentMapPartitionData.java +++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/segment/SegmentMapPartitionData.java @@ -19,7 +19,7 @@ package org.apache.celeborn.service.deploy.worker.storage.segment; import java.io.IOException; -import java.util.HashMap; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.function.Consumer; @@ -38,7 +38,7 @@ public class SegmentMapPartitionData extends MapPartitionData { public SegmentMapPartitionData( int minReadBuffers, int maxReadBuffers, - HashMap storageFetcherPool, + ConcurrentHashMap storageFetcherPool, int threadsPerMountPoint, DiskFileInfo fileInfo, Consumer recycleStream, diff --git a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/CreditStreamManagerSuiteJ.java b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/CreditStreamManagerSuiteJ.java index 95064b90e..d886cc864 100644 --- a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/CreditStreamManagerSuiteJ.java +++ b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/CreditStreamManagerSuiteJ.java @@ -79,6 +79,7 @@ public class CreditStreamManagerSuiteJ { new DiskFileInfo( createTemporaryFileWithIndexFile(), new UserIdentifier("default", "default"), conf); MapFileMeta mapFileMeta = new MapFileMeta(1024, 10); + mapFileMeta.setMountPoint("/tmp"); diskFileInfo.replaceFileMeta(mapFileMeta); Consumer streamIdConsumer = streamId -> Assert.assertTrue(streamId > 0);