[CELEBORN-777][BUG] CongestionControl getPotentialConsumeSpeed throw /zero error

### What changes were proposed in this pull request?
In `TimeSlidingHub.add()` `_deque` will clear then add the pair.

```
      if (nodesToAdd >= maxQueueSize) {
        // The new node exceed existing sliding list, need to clear all old nodes
        // and create a new sliding list
        _deque.clear();
        _deque.add(Pair.of(currentTimestamp, (N) newNode.clone()));
        sumNode = (N) newNode.clone();
        return;
      }

```

Then when call `BufferStatusHub.avgBytesPerSec()`,  `currentNumBytes` can be `> 0` but `getCurrentTimeWindowsInMills` may return 0. Cause the error.

```
  public long avgBytesPerSec() {
    long currentNumBytes = sum().numBytes();
    if (currentNumBytes > 0) {
      return currentNumBytes * 1000 / (long) getCurrentTimeWindowsInMills();
    }
    return 0L;
  }
```

### Why are the changes needed?

### Does this PR introduce _any_ user-facing change?

### How was this patch tested?

Closes #1690 from AngersZhuuuu/CELEBORN-777.

Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Angerszhuuuu <angers.zhu@gmail.com>
This commit is contained in:
Angerszhuuuu 2023-07-08 21:46:37 +08:00
parent e314f85087
commit 52dcd3b5df
3 changed files with 33 additions and 30 deletions

View File

@ -19,6 +19,8 @@ package org.apache.celeborn.service.deploy.worker.congestcontrol;
import java.util.concurrent.atomic.LongAdder;
import org.apache.commons.lang3.tuple.Pair;
public class BufferStatusHub extends TimeSlidingHub<BufferStatusHub.BufferStatusNode> {
public static class BufferStatusNode implements TimeSlidingHub.TimeSlidingNode {
@ -73,9 +75,10 @@ public class BufferStatusHub extends TimeSlidingHub<BufferStatusHub.BufferStatus
}
public long avgBytesPerSec() {
long currentNumBytes = sum().numBytes();
Pair<BufferStatusNode, Integer> sumInfo = sum();
long currentNumBytes = sumInfo.getKey().numBytes();
if (currentNumBytes > 0) {
return currentNumBytes * 1000 / (long) getCurrentTimeWindowsInMills();
return currentNumBytes * 1000 / (long) sumInfo.getRight() * intervalPerBucketInMills;
}
return 0L;
}

View File

@ -25,7 +25,7 @@ import org.apache.commons.lang3.tuple.Pair;
/**
* A time sliding list that group different {@link TimeSlidingNode} with corresponding timestamp by
* exact interval 1 second. Internally hold a {@link sumNode} to get the sum of the nodes in the
* exact interval 1 second. Internally hold a {@link sumInfo} to get the sum of the nodes in the
* list.
*
* <p>This list is thread-safe, but {@link TimeSlidingNode} returned by the method {@link sum}
@ -51,20 +51,20 @@ public abstract class TimeSlidingHub<N extends TimeSlidingHub.TimeSlidingNode> {
}
// 1 second.
private final int intervalPerBucketInMills = 1000;
protected final int intervalPerBucketInMills = 1000;
private final int maxQueueSize;
private N sumNode;
private Pair<N, Integer> sumInfo;
private final LinkedBlockingDeque<Pair<Long, N>> _deque;
public TimeSlidingHub(int timeWindowsInSecs) {
this._deque = new LinkedBlockingDeque<>();
this.maxQueueSize = timeWindowsInSecs * 1000 / intervalPerBucketInMills;
this.sumNode = newEmptyNode();
this.sumInfo = Pair.of(newEmptyNode(), 0);
}
public N sum() {
return sumNode;
public Pair<N, Integer> sum() {
return sumInfo;
}
public void add(N newNode) {
@ -75,7 +75,7 @@ public abstract class TimeSlidingHub<N extends TimeSlidingHub.TimeSlidingNode> {
public synchronized void add(long currentTimestamp, N newNode) {
if (_deque.size() == 0) {
_deque.add(Pair.of(currentTimestamp, (N) newNode.clone()));
sumNode = (N) newNode.clone();
sumInfo = Pair.of((N) newNode.clone(), 1);
return;
}
@ -87,29 +87,33 @@ public abstract class TimeSlidingHub<N extends TimeSlidingHub.TimeSlidingNode> {
// The node doesn't belong to the lastNode, there might be 2 different scenarios
// 1. All existing nodes are out of date, should be removed
// 2. some nodes are out of date, should be removed
long nodesToAdd = timeDiff / intervalPerBucketInMills;
int nodesToAdd = (int) timeDiff / intervalPerBucketInMills;
if (nodesToAdd >= maxQueueSize) {
// The new node exceed existing sliding list, need to clear all old nodes
// and create a new sliding list
_deque.clear();
_deque.add(Pair.of(currentTimestamp, (N) newNode.clone()));
sumNode = (N) newNode.clone();
sumInfo = Pair.of((N) newNode.clone(), 1);
return;
}
// Add new node at the end of the list, and deprecate nodes out of timeInterval
for (long i = 1; i < nodesToAdd; i++) {
for (int i = 1; i < nodesToAdd; i++) {
N toAdd = newEmptyNode();
lastNode = Pair.of(lastNode.getLeft() + intervalPerBucketInMills, toAdd);
_deque.add(lastNode);
}
_deque.add(Pair.of(lastNode.getLeft() + intervalPerBucketInMills, (N) newNode.clone()));
sumNode.combineNode(newNode);
N nodeToCombine = sumInfo.getLeft();
nodeToCombine.combineNode(newNode);
sumInfo = Pair.of(nodeToCombine, sumInfo.getRight() + nodesToAdd);
while (_deque.size() > maxQueueSize) {
Pair<Long, N> removed = _deque.removeFirst();
sumNode.separateNode(removed.getRight());
N nodeToSeparate = sumInfo.getLeft();
nodeToSeparate.separateNode(removed.getRight());
sumInfo = Pair.of(nodeToSeparate, sumInfo.getRight() - 1);
}
return;
}
@ -121,7 +125,7 @@ public abstract class TimeSlidingHub<N extends TimeSlidingHub.TimeSlidingNode> {
Pair<Long, N> curNode = iter.next();
if (currentTimestamp - curNode.getLeft() >= 0) {
curNode.getRight().combineNode(newNode);
sumNode.combineNode(newNode);
sumInfo.getLeft().combineNode(newNode);
return;
}
}
@ -132,22 +136,18 @@ public abstract class TimeSlidingHub<N extends TimeSlidingHub.TimeSlidingNode> {
// Belong to last node
lastNode.getRight().combineNode(newNode);
sumNode.combineNode(newNode);
sumInfo.getLeft().combineNode(newNode);
}
public void clear() {
synchronized (_deque) {
_deque.clear();
sumNode = newEmptyNode();
sumInfo = Pair.of(newEmptyNode(), 0);
}
}
protected abstract N newEmptyNode();
protected int getCurrentTimeWindowsInMills() {
return _deque.size() * intervalPerBucketInMills;
}
@VisibleForTesting
protected long currentTimeMillis() {
return System.currentTimeMillis();

View File

@ -84,34 +84,34 @@ public class TestTimeSlidingHub {
hub.setDummyTimestamp(0L);
hub.add(new DummyTimeSlidingHub.DummyTimeSlidingNode(1));
Assert.assertEquals(1, hub.sum().getValue());
Assert.assertEquals(1, hub.sum().getLeft().getValue());
hub.setDummyTimestamp(1000L);
hub.add(new DummyTimeSlidingHub.DummyTimeSlidingNode(2));
Assert.assertEquals(3, hub.sum().getValue());
Assert.assertEquals(3, hub.sum().getLeft().getValue());
hub.setDummyTimestamp(2200L);
hub.add(new DummyTimeSlidingHub.DummyTimeSlidingNode(3));
Assert.assertEquals(6, hub.sum().getValue());
Assert.assertEquals(6, hub.sum().getLeft().getValue());
hub.setDummyTimestamp(2400L);
hub.add(new DummyTimeSlidingHub.DummyTimeSlidingNode(4));
Assert.assertEquals(10, hub.sum().getValue());
Assert.assertEquals(10, hub.sum().getLeft().getValue());
// Should remove the value 1
hub.setDummyTimestamp(3000L);
hub.add(new DummyTimeSlidingHub.DummyTimeSlidingNode(5));
Assert.assertEquals(14, hub.sum().getValue());
Assert.assertEquals(14, hub.sum().getLeft().getValue());
// Should remove the value 2
hub.setDummyTimestamp(4000L);
hub.add(new DummyTimeSlidingHub.DummyTimeSlidingNode(6));
Assert.assertEquals(18, hub.sum().getValue());
Assert.assertEquals(18, hub.sum().getLeft().getValue());
// Should remove the value 3 and 4
hub.setDummyTimestamp(5000L);
hub.add(new DummyTimeSlidingHub.DummyTimeSlidingNode(7));
Assert.assertEquals(18, hub.sum().getValue());
Assert.assertEquals(18, hub.sum().getLeft().getValue());
}
@Test
@ -120,10 +120,10 @@ public class TestTimeSlidingHub {
hub.setDummyTimestamp(0L);
hub.add(new DummyTimeSlidingHub.DummyTimeSlidingNode(1));
Assert.assertEquals(1, hub.sum().getValue());
Assert.assertEquals(1, hub.sum().getLeft().getValue());
hub.setDummyTimestamp(10000L);
hub.add(new DummyTimeSlidingHub.DummyTimeSlidingNode(2));
Assert.assertEquals(2, hub.sum().getValue());
Assert.assertEquals(2, hub.sum().getLeft().getValue());
}
}