diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/CreditStreamManager.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/CreditStreamManager.java index 526998208..0a3736a38 100644 --- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/CreditStreamManager.java +++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/CreditStreamManager.java @@ -44,7 +44,8 @@ public class CreditStreamManager { private final AtomicLong nextStreamId; private final ConcurrentHashMap streams; private final ConcurrentHashMap activeMapPartitions; - private final HashMap storageFetcherPool = new HashMap<>(); + private final ConcurrentHashMap storageFetcherPool = + JavaUtils.newConcurrentHashMap(); private int minReadBuffers; private int maxReadBuffers; private int threadsPerMountPoint; diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionData.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionData.java index 10cd0332b..f94c17c95 100644 --- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionData.java +++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionData.java @@ -19,7 +19,6 @@ package org.apache.celeborn.service.deploy.worker.storage; import java.io.IOException; import java.nio.channels.FileChannel; -import java.util.HashMap; import java.util.List; import java.util.PriorityQueue; import java.util.concurrent.ConcurrentHashMap; @@ -68,7 +67,7 @@ public class MapPartitionData implements MemoryManager.ReadBufferTargetChangeLis public MapPartitionData( int minReadBuffers, int maxReadBuffers, - HashMap storageFetcherPool, + ConcurrentHashMap storageFetcherPool, int threadsPerMountPoint, DiskFileInfo diskFileInfo, Consumer recycleStream, diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/segment/SegmentMapPartitionData.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/segment/SegmentMapPartitionData.java index 3d3b6cfe8..aa3090a3d 100644 --- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/segment/SegmentMapPartitionData.java +++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/segment/SegmentMapPartitionData.java @@ -19,7 +19,7 @@ package org.apache.celeborn.service.deploy.worker.storage.segment; import java.io.IOException; -import java.util.HashMap; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.function.Consumer; @@ -38,7 +38,7 @@ public class SegmentMapPartitionData extends MapPartitionData { public SegmentMapPartitionData( int minReadBuffers, int maxReadBuffers, - HashMap storageFetcherPool, + ConcurrentHashMap storageFetcherPool, int threadsPerMountPoint, DiskFileInfo fileInfo, Consumer recycleStream, diff --git a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/CreditStreamManagerSuiteJ.java b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/CreditStreamManagerSuiteJ.java index 95064b90e..d886cc864 100644 --- a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/CreditStreamManagerSuiteJ.java +++ b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/CreditStreamManagerSuiteJ.java @@ -79,6 +79,7 @@ public class CreditStreamManagerSuiteJ { new DiskFileInfo( createTemporaryFileWithIndexFile(), new UserIdentifier("default", "default"), conf); MapFileMeta mapFileMeta = new MapFileMeta(1024, 10); + mapFileMeta.setMountPoint("/tmp"); diskFileInfo.replaceFileMeta(mapFileMeta); Consumer streamIdConsumer = streamId -> Assert.assertTrue(streamId > 0);