diff --git a/common/src/main/java/org/apache/celeborn/common/write/InFlightRequestTracker.java b/common/src/main/java/org/apache/celeborn/common/write/InFlightRequestTracker.java index 6e11aa501..cab0a3624 100644 --- a/common/src/main/java/org/apache/celeborn/common/write/InFlightRequestTracker.java +++ b/common/src/main/java/org/apache/celeborn/common/write/InFlightRequestTracker.java @@ -58,28 +58,22 @@ public class InFlightRequestTracker { } public void addBatch(int batchId, String hostAndPushPort) { - Set batchIdSet = + Set batchIdSetPerPair = inflightBatchesPerAddress.computeIfAbsent( hostAndPushPort, id -> ConcurrentHashMap.newKeySet()); - if (batchIdSet.add(batchId)) { - totalInflightReqs.increment(); - } else { - logger.debug("{} has already been inflight.", batchId); - } + batchIdSetPerPair.add(batchId); + totalInflightReqs.increment(); } public void removeBatch(int batchId, String hostAndPushPort) { Set batchIdSet = inflightBatchesPerAddress.get(hostAndPushPort); // TODO: Need to debug why batchIdSet will be null. if (batchIdSet != null) { - if (batchIdSet.remove(batchId)) { - totalInflightReqs.decrement(); - } else { - logger.debug("BatchIdSet has removed {}.", batchId); - } + batchIdSet.remove(batchId); } else { logger.warn("BatchIdSet of {} is null.", hostAndPushPort); } + totalInflightReqs.decrement(); } public void onSuccess(String hostAndPushPort) { @@ -103,7 +97,7 @@ public class InFlightRequestTracker { pushStrategy.limitPushSpeed(pushState, hostAndPushPort); int currentMaxReqsInFlight = pushStrategy.getCurrentMaxReqsInFlight(hostAndPushPort); - Set batchIdSet = getBatchIdSetByAddressPair(hostAndPushPort); + Set batchIdSet = getBatchIdSetByAddressPair(hostAndPushPort); long times = waitInflightTimeoutMs / delta; try { while (times > 0) {