From 52dcd3b5df9c6ee27261f776545b341ceeeabdcd Mon Sep 17 00:00:00 2001
From: Angerszhuuuu
Date: Sat, 8 Jul 2023 21:46:37 +0800
Subject: [PATCH] [CELEBORN-777][BUG] CongestionControl getPotentialConsumeSpeed throw /zero error

### What changes were proposed in this pull request?

In `TimeSlidingHub.add()`, `_deque` is cleared and then the new pair is added:

```
if (nodesToAdd >= maxQueueSize) {
  // The new node exceed existing sliding list, need to clear all old nodes
  // and create a new sliding list
  _deque.clear();
  _deque.add(Pair.of(currentTimestamp, (N) newNode.clone()));
  sumNode = (N) newNode.clone();
  return;
}
```

`BufferStatusHub.avgBytesPerSec()` is not synchronized with `add()`, so it can read `_deque.size()` in the window between `clear()` and `add(...)`, when the deque is empty but the sum has not been replaced yet. In that case `currentNumBytes` can be `> 0` while `getCurrentTimeWindowsInMills` returns 0, which causes a division-by-zero error:

```
public long avgBytesPerSec() {
  long currentNumBytes = sum().numBytes();
  if (currentNumBytes > 0) {
    return currentNumBytes * 1000 / (long) getCurrentTimeWindowsInMills();
  }
  return 0L;
}
```

This patch keeps the bucket count next to the sum in a single `sumInfo` pair, so `avgBytesPerSec()` no longer reads `_deque.size()`.

### Why are the changes needed?

### Does this PR introduce _any_ user-facing change?

### How was this patch tested?

Closes #1690 from AngersZhuuuu/CELEBORN-777.
Authored-by: Angerszhuuuu Signed-off-by: Angerszhuuuu --- .../congestcontrol/BufferStatusHub.java | 7 +++- .../worker/congestcontrol/TimeSlidingHub.java | 38 +++++++++---------- .../congestcontrol/TestTimeSlidingHub.java | 18 ++++----- 3 files changed, 33 insertions(+), 30 deletions(-) diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/congestcontrol/BufferStatusHub.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/congestcontrol/BufferStatusHub.java index 38c7282f4..14a0dfd5a 100644 --- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/congestcontrol/BufferStatusHub.java +++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/congestcontrol/BufferStatusHub.java @@ -19,6 +19,8 @@ package org.apache.celeborn.service.deploy.worker.congestcontrol; import java.util.concurrent.atomic.LongAdder; +import org.apache.commons.lang3.tuple.Pair; + public class BufferStatusHub extends TimeSlidingHub { public static class BufferStatusNode implements TimeSlidingHub.TimeSlidingNode { @@ -73,9 +75,10 @@ public class BufferStatusHub extends TimeSlidingHub sumInfo = sum(); + long currentNumBytes = sumInfo.getKey().numBytes(); if (currentNumBytes > 0) { - return currentNumBytes * 1000 / (long) getCurrentTimeWindowsInMills(); + return currentNumBytes * 1000 / (long) sumInfo.getRight() * intervalPerBucketInMills; } return 0L; } diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/congestcontrol/TimeSlidingHub.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/congestcontrol/TimeSlidingHub.java index ef34f07c6..6951b0d5d 100644 --- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/congestcontrol/TimeSlidingHub.java +++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/congestcontrol/TimeSlidingHub.java @@ -25,7 +25,7 @@ import org.apache.commons.lang3.tuple.Pair; /** * A time sliding list that group different {@link TimeSlidingNode} with 
corresponding timestamp by - * exact interval 1 second. Internally hold a {@link sumNode} to get the sum of the nodes in the + * exact interval 1 second. Internally hold a {@link sumInfo} to get the sum of the nodes in the * list. * *

This list is thread-safe, but {@link TimeSlidingNode} returned by the method {@link sum} @@ -51,20 +51,20 @@ public abstract class TimeSlidingHub { } // 1 second. - private final int intervalPerBucketInMills = 1000; + protected final int intervalPerBucketInMills = 1000; private final int maxQueueSize; - private N sumNode; + private Pair sumInfo; private final LinkedBlockingDeque> _deque; public TimeSlidingHub(int timeWindowsInSecs) { this._deque = new LinkedBlockingDeque<>(); this.maxQueueSize = timeWindowsInSecs * 1000 / intervalPerBucketInMills; - this.sumNode = newEmptyNode(); + this.sumInfo = Pair.of(newEmptyNode(), 0); } - public N sum() { - return sumNode; + public Pair sum() { + return sumInfo; } public void add(N newNode) { @@ -75,7 +75,7 @@ public abstract class TimeSlidingHub { public synchronized void add(long currentTimestamp, N newNode) { if (_deque.size() == 0) { _deque.add(Pair.of(currentTimestamp, (N) newNode.clone())); - sumNode = (N) newNode.clone(); + sumInfo = Pair.of((N) newNode.clone(), 1); return; } @@ -87,29 +87,33 @@ public abstract class TimeSlidingHub { // The node doesn't belong to the lastNode, there might be 2 different scenarios // 1. All existing nodes are out of date, should be removed // 2. 
some nodes are out of date, should be removed - long nodesToAdd = timeDiff / intervalPerBucketInMills; + int nodesToAdd = (int) timeDiff / intervalPerBucketInMills; if (nodesToAdd >= maxQueueSize) { // The new node exceed existing sliding list, need to clear all old nodes // and create a new sliding list _deque.clear(); _deque.add(Pair.of(currentTimestamp, (N) newNode.clone())); - sumNode = (N) newNode.clone(); + sumInfo = Pair.of((N) newNode.clone(), 1); return; } // Add new node at the end of the list, and deprecate nodes out of timeInterval - for (long i = 1; i < nodesToAdd; i++) { + for (int i = 1; i < nodesToAdd; i++) { N toAdd = newEmptyNode(); lastNode = Pair.of(lastNode.getLeft() + intervalPerBucketInMills, toAdd); _deque.add(lastNode); } _deque.add(Pair.of(lastNode.getLeft() + intervalPerBucketInMills, (N) newNode.clone())); - sumNode.combineNode(newNode); + N nodeToCombine = sumInfo.getLeft(); + nodeToCombine.combineNode(newNode); + sumInfo = Pair.of(nodeToCombine, sumInfo.getRight() + nodesToAdd); while (_deque.size() > maxQueueSize) { Pair removed = _deque.removeFirst(); - sumNode.separateNode(removed.getRight()); + N nodeToSeparate = sumInfo.getLeft(); + nodeToSeparate.separateNode(removed.getRight()); + sumInfo = Pair.of(nodeToSeparate, sumInfo.getRight() - 1); } return; } @@ -121,7 +125,7 @@ public abstract class TimeSlidingHub { Pair curNode = iter.next(); if (currentTimestamp - curNode.getLeft() >= 0) { curNode.getRight().combineNode(newNode); - sumNode.combineNode(newNode); + sumInfo.getLeft().combineNode(newNode); return; } } @@ -132,22 +136,18 @@ public abstract class TimeSlidingHub { // Belong to last node lastNode.getRight().combineNode(newNode); - sumNode.combineNode(newNode); + sumInfo.getLeft().combineNode(newNode); } public void clear() { synchronized (_deque) { _deque.clear(); - sumNode = newEmptyNode(); + sumInfo = Pair.of(newEmptyNode(), 0); } } protected abstract N newEmptyNode(); - protected int getCurrentTimeWindowsInMills() { - 
return _deque.size() * intervalPerBucketInMills; - } - @VisibleForTesting protected long currentTimeMillis() { return System.currentTimeMillis(); diff --git a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/congestcontrol/TestTimeSlidingHub.java b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/congestcontrol/TestTimeSlidingHub.java index 683eafa5b..0bcc7af8b 100644 --- a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/congestcontrol/TestTimeSlidingHub.java +++ b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/congestcontrol/TestTimeSlidingHub.java @@ -84,34 +84,34 @@ public class TestTimeSlidingHub { hub.setDummyTimestamp(0L); hub.add(new DummyTimeSlidingHub.DummyTimeSlidingNode(1)); - Assert.assertEquals(1, hub.sum().getValue()); + Assert.assertEquals(1, hub.sum().getLeft().getValue()); hub.setDummyTimestamp(1000L); hub.add(new DummyTimeSlidingHub.DummyTimeSlidingNode(2)); - Assert.assertEquals(3, hub.sum().getValue()); + Assert.assertEquals(3, hub.sum().getLeft().getValue()); hub.setDummyTimestamp(2200L); hub.add(new DummyTimeSlidingHub.DummyTimeSlidingNode(3)); - Assert.assertEquals(6, hub.sum().getValue()); + Assert.assertEquals(6, hub.sum().getLeft().getValue()); hub.setDummyTimestamp(2400L); hub.add(new DummyTimeSlidingHub.DummyTimeSlidingNode(4)); - Assert.assertEquals(10, hub.sum().getValue()); + Assert.assertEquals(10, hub.sum().getLeft().getValue()); // Should remove the value 1 hub.setDummyTimestamp(3000L); hub.add(new DummyTimeSlidingHub.DummyTimeSlidingNode(5)); - Assert.assertEquals(14, hub.sum().getValue()); + Assert.assertEquals(14, hub.sum().getLeft().getValue()); // Should remove the value 2 hub.setDummyTimestamp(4000L); hub.add(new DummyTimeSlidingHub.DummyTimeSlidingNode(6)); - Assert.assertEquals(18, hub.sum().getValue()); + Assert.assertEquals(18, hub.sum().getLeft().getValue()); // Should remove the value 3 and 4 hub.setDummyTimestamp(5000L); hub.add(new 
DummyTimeSlidingHub.DummyTimeSlidingNode(7)); - Assert.assertEquals(18, hub.sum().getValue()); + Assert.assertEquals(18, hub.sum().getLeft().getValue()); } @Test @@ -120,10 +120,10 @@ public class TestTimeSlidingHub { hub.setDummyTimestamp(0L); hub.add(new DummyTimeSlidingHub.DummyTimeSlidingNode(1)); - Assert.assertEquals(1, hub.sum().getValue()); + Assert.assertEquals(1, hub.sum().getLeft().getValue()); hub.setDummyTimestamp(10000L); hub.add(new DummyTimeSlidingHub.DummyTimeSlidingNode(2)); - Assert.assertEquals(2, hub.sum().getValue()); + Assert.assertEquals(2, hub.sum().getLeft().getValue()); } }