[CELEBORN-1960] Fix PauseSpentTime only append the interval check time

### What changes were proposed in this pull request?
Fix the pauseTime metrics count error.

### Why are the changes needed?
Assume that 0 is the NONE PAUSED status, 1 is PAUSE PUSH, and 2 is PAUSE PUSH AND REPLICATE.
The status is recorded at every check interval.
Here are the status changes:
0 -> 0 -> 1 -> 1 -> 1 -> 1 -> 2 -> 2 -> 2 -> 1 -> 1 -> 1 -> 0 -> 0 -> 0

The previous code only counted the time of a single check interval, because every check interval updated the pauseStartTime.
ps (pauseStartTime), pe (pauseEndTime); rs and re are the corresponding start and end times of the replicate pause.
0 -> 0 -> 1(ps) -> 1(ps) -> 1(ps) -> 1(ps) -> 2(rs) -> 2(rs) -> 2(re-rs) -> 1(ps) -> 1(ps) -> _1(ps) -> 0(pe)_ -> 0 -> 0

It should be
0 -> 0 -> 1(ps)-> 1 -> 1-> 1 -> 2(rs) -> 2 -> 2 -> 1(re) -> 1-> 1 -> 0(pe) -> 0 -> 0

0 -> 0 -> 1(ps)-> 1-> 1 -> 0(pe) -> 0 -> 0

0 -> 0 -> 2(ps, rs)-> 2-> 2 -> 0(pe, re) -> 0

0 -> 0 -> 1(ps)-> 1-> 2(rs) -> 2 -> 0(pe, re)

0 -> 0 -> 2(ps, rs)-> 2-> 1(re) -> 1 -> 0(pe)

The pauseReplicaTime should include pausePushTime.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
UT

Closes #3207 from zaynt4606/clb1960.

Authored-by: zhengtao <shuaizhentao.szt@alibaba-inc.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
This commit is contained in:
zhengtao 2025-05-15 23:15:28 +08:00 committed by Shuang
parent 88124d763a
commit e8ae23bc7a
2 changed files with 59 additions and 14 deletions

View File

@ -337,10 +337,13 @@ public class MemoryManager {
if (!tryResumeByPinnedMemory(servingState, lastState)) {
pausePushDataCounter.increment();
if (lastState == ServingState.PUSH_AND_REPLICATE_PAUSED) {
appendPauseSpentTime(lastState);
resumeReplicate();
} else {
logger.info("Trigger action: PAUSE PUSH");
pausePushDataStartTime = System.currentTimeMillis();
if (servingState != lastState) {
pausePushDataStartTime = System.currentTimeMillis();
logger.info("Trigger action: PAUSE PUSH");
}
resumingByPinnedMemory = false;
memoryPressureListeners.forEach(
memoryPressureListener ->
@ -362,13 +365,17 @@ public class MemoryManager {
case PUSH_AND_REPLICATE_PAUSED:
if (!tryResumeByPinnedMemory(servingState, lastState)) {
pausePushDataAndReplicateCounter.increment();
logger.info("Trigger action: PAUSE PUSH");
pausePushDataAndReplicateStartTime = System.currentTimeMillis();
if (servingState != lastState) {
pausePushDataAndReplicateStartTime = System.currentTimeMillis();
logger.info("Trigger action: PAUSE PUSH and REPLICATE");
if (lastState == ServingState.NONE_PAUSED) {
pausePushDataStartTime = pausePushDataAndReplicateStartTime;
}
}
resumingByPinnedMemory = false;
memoryPressureListeners.forEach(
memoryPressureListener ->
memoryPressureListener.onPause(TransportModuleConstants.PUSH_MODULE));
logger.info("Trigger action: PAUSE REPLICATE");
memoryPressureListeners.forEach(
memoryPressureListener ->
memoryPressureListener.onPause(TransportModuleConstants.REPLICATE_MODULE));
@ -517,10 +524,9 @@ public class MemoryManager {
private void appendPauseSpentTime(ServingState servingState) {
long nextPauseStartTime = System.currentTimeMillis();
if (servingState == ServingState.PUSH_PAUSED) {
pausePushDataTime += nextPauseStartTime - pausePushDataStartTime;
pausePushDataStartTime = nextPauseStartTime;
} else {
pausePushDataTime += nextPauseStartTime - pausePushDataStartTime;
pausePushDataStartTime = nextPauseStartTime;
if (servingState == ServingState.PUSH_AND_REPLICATE_PAUSED) {
pausePushDataAndReplicateTime += nextPauseStartTime - pausePushDataAndReplicateStartTime;
pausePushDataAndReplicateStartTime = nextPauseStartTime;
}

View File

@ -113,6 +113,7 @@ class MemoryManagerSuite extends CelebornFunSuite {
assert(pushListener.isPause)
assert(!replicateListener.isPause)
}
Thread.sleep(20)
// PAUSE PUSH -> PAUSE PUSH AND REPLICATE
memoryCounter.set(replicateThreshold + 1)
@ -120,6 +121,7 @@ class MemoryManagerSuite extends CelebornFunSuite {
assert(pushListener.isPause)
assert(replicateListener.isPause)
}
Thread.sleep(20)
// PAUSE PUSH AND REPLICATE -> PAUSE PUSH
memoryCounter.set(pushThreshold + 1)
@ -127,6 +129,7 @@ class MemoryManagerSuite extends CelebornFunSuite {
assert(pushListener.isPause)
assert(!replicateListener.isPause)
}
Thread.sleep(20)
// PAUSE PUSH -> NONE PAUSED
memoryCounter.set(0)
@ -134,10 +137,14 @@ class MemoryManagerSuite extends CelebornFunSuite {
assert(!pushListener.isPause)
assert(!replicateListener.isPause)
}
Thread.sleep(20)
// [CELEBORN-882] Test record pause push time
assert(memoryManager.getPausePushDataTime.longValue() > 0)
assert(memoryManager.getPausePushDataAndReplicateTime.longValue() == 0)
val lastPauseTime = memoryManager.getPausePushDataTime.longValue()
val lastPauseTime1 = memoryManager.getPausePushDataTime.longValue()
val lastPauseReplicaTime1 = memoryManager.getPausePushDataAndReplicateTime.longValue()
// PauseTime should count the actual waiting time
assert(lastPauseTime1 >= 60)
assert(lastPauseReplicaTime1 >= 20)
logInfo(s"lastPauseTime1: $lastPauseTime1, lastPauseReplicaTime1: $lastPauseReplicaTime1")
// NONE PAUSED -> PAUSE PUSH AND REPLICATE
memoryCounter.set(replicateThreshold + 1)
@ -146,14 +153,46 @@ class MemoryManagerSuite extends CelebornFunSuite {
assert(replicateListener.isPause)
}
Thread.sleep(20)
// PAUSE PUSH AND REPLICATE -> NONE PAUSED
memoryCounter.set(0)
eventually(timeout(30.second), interval(10.milliseconds)) {
assert(!pushListener.isPause)
assert(!replicateListener.isPause)
}
assert(memoryManager.getPausePushDataTime.longValue() == lastPauseTime)
assert(memoryManager.getPausePushDataAndReplicateTime.longValue() > 0)
// Wait for the check thread to update the metrics
memoryManager.switchServingState()
val lastPauseTime2 = memoryManager.getPausePushDataTime.longValue()
val lastPauseReplicaTime2 = memoryManager.getPausePushDataAndReplicateTime.longValue()
assert(lastPauseTime2 > lastPauseTime1)
assert(lastPauseReplicaTime2 > lastPauseReplicaTime1)
logInfo(s"lastPauseTime2: $lastPauseTime2, lastPauseReplicaTime2: $lastPauseReplicaTime2")
// NONE PAUSED -> PAUSE PUSH
memoryCounter.set(pushThreshold + 1)
eventually(timeout(30.second), interval(10.milliseconds)) {
assert(pushListener.isPause)
assert(!replicateListener.isPause)
}
Thread.sleep(20)
// PAUSE PUSH -> NONE PAUSED
memoryCounter.set(0)
eventually(timeout(30.second), interval(10.milliseconds)) {
assert(!pushListener.isPause)
assert(!replicateListener.isPause)
}
// Wait for the check thread to update the metrics
memoryManager.switchServingState()
val lastPauseTime3 = memoryManager.getPausePushDataTime.longValue()
val lastPauseReplicaTime3 = memoryManager.getPausePushDataAndReplicateTime.longValue()
assert(lastPauseTime3 > lastPauseTime2)
assert(lastPauseReplicaTime3 == lastPauseReplicaTime2)
logInfo(s"lastPauseTime3: $lastPauseTime3, lastPauseReplicaTime3: $lastPauseReplicaTime3")
}
test("[CELEBORN-1792] Test MemoryManager resume by pinned memory") {