From e8ae23bc7a7a44e468fafa1ccde1a3d4dd4938a1 Mon Sep 17 00:00:00 2001 From: zhengtao Date: Thu, 15 May 2025 23:15:28 +0800 Subject: [PATCH] [CELEBORN-1960] Fix PauseSpentTime only append the interval check time ### What changes were proposed in this pull request? Fix the pauseTime metrics count error. ### Why are the changes needed? Assume that 0 is NONE PAUSED status, 1 is PAUSE PUSH, 2 is PAUSE PUSH AND REPLICATE. Every check interval will record the status. Here is the status changements: 0 -> 0 -> 1 -> 1 -> 1 -> 1 -> 2 -> 2 -> 2 -> 1 -> 1 -> 1 -> 0 -> 0 -> 0 The previous code only count the interval time and every interval time will update the pauseStartTime. ps(pauseStartTime), pe(pauseEndTime) 0 -> 0 -> 1(ps)-> 1(ps ) -> 1(ps ) -> 1(ps) -> 2(rs) -> 2(rs) -> 2(re-rs) -> 1(ps) -> 1(ps) ->` _1(ps) -> 0(pe)_` -> 0 -> 0 It should be 0 -> 0 -> 1(ps)-> 1 -> 1-> 1 -> 2(rs) -> 2 -> 2 -> 1(re) -> 1-> 1 -> 0(pe) -> 0 -> 0 0 -> 0 -> 1(ps)-> 1-> 1 -> 0(pe) -> 0 -> 0 0 -> 0 -> 2(ps, rs)-> 2-> 2 -> 0(pe, re) -> 0 0 -> 0 -> 1(ps)-> 1-> 2(rs) -> 2 -> 0(pe, re) 0 -> 0 -> 2(ps, rs)-> 2-> 1(re) -> 1 -> 0(pe) The pauseRpelicaTime should include pausePushTime. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? UT Closes #3207 from zaynt4606/clb1960. Authored-by: zhengtao Signed-off-by: Shuang --- .../deploy/worker/memory/MemoryManager.java | 24 +++++---- .../deploy/memory/MemoryManagerSuite.scala | 49 +++++++++++++++++-- 2 files changed, 59 insertions(+), 14 deletions(-) diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java index b3891d9b5..ff08c077c 100644 --- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java +++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java @@ -337,10 +337,13 @@ public class MemoryManager { if (!tryResumeByPinnedMemory(servingState, lastState)) { pausePushDataCounter.increment(); if (lastState == ServingState.PUSH_AND_REPLICATE_PAUSED) { + appendPauseSpentTime(lastState); resumeReplicate(); } else { - logger.info("Trigger action: PAUSE PUSH"); - pausePushDataStartTime = System.currentTimeMillis(); + if (servingState != lastState) { + pausePushDataStartTime = System.currentTimeMillis(); + logger.info("Trigger action: PAUSE PUSH"); + } resumingByPinnedMemory = false; memoryPressureListeners.forEach( memoryPressureListener -> @@ -362,13 +365,17 @@ public class MemoryManager { case PUSH_AND_REPLICATE_PAUSED: if (!tryResumeByPinnedMemory(servingState, lastState)) { pausePushDataAndReplicateCounter.increment(); - logger.info("Trigger action: PAUSE PUSH"); - pausePushDataAndReplicateStartTime = System.currentTimeMillis(); + if (servingState != lastState) { + pausePushDataAndReplicateStartTime = System.currentTimeMillis(); + logger.info("Trigger action: PAUSE PUSH and REPLICATE"); + if (lastState == ServingState.NONE_PAUSED) { + pausePushDataStartTime = pausePushDataAndReplicateStartTime; + } + } resumingByPinnedMemory = false; memoryPressureListeners.forEach( memoryPressureListener -> memoryPressureListener.onPause(TransportModuleConstants.PUSH_MODULE)); - logger.info("Trigger action: PAUSE REPLICATE"); memoryPressureListeners.forEach( memoryPressureListener -> memoryPressureListener.onPause(TransportModuleConstants.REPLICATE_MODULE)); @@ -517,10 +524,9 @@ public class MemoryManager { private void appendPauseSpentTime(ServingState servingState) { long nextPauseStartTime = System.currentTimeMillis(); - if (servingState == ServingState.PUSH_PAUSED) { - pausePushDataTime += nextPauseStartTime - pausePushDataStartTime; - pausePushDataStartTime = nextPauseStartTime; - } else { + pausePushDataTime += nextPauseStartTime - pausePushDataStartTime; + pausePushDataStartTime = nextPauseStartTime; + if (servingState == ServingState.PUSH_AND_REPLICATE_PAUSED) { pausePushDataAndReplicateTime += nextPauseStartTime - pausePushDataAndReplicateStartTime; pausePushDataAndReplicateStartTime = nextPauseStartTime; } diff --git a/worker/src/test/scala/org/apache/celeborn/service/deploy/memory/MemoryManagerSuite.scala b/worker/src/test/scala/org/apache/celeborn/service/deploy/memory/MemoryManagerSuite.scala index d74f1889e..2a9f8fa09 100644 --- a/worker/src/test/scala/org/apache/celeborn/service/deploy/memory/MemoryManagerSuite.scala +++ b/worker/src/test/scala/org/apache/celeborn/service/deploy/memory/MemoryManagerSuite.scala @@ -113,6 +113,7 @@ class MemoryManagerSuite extends CelebornFunSuite { assert(pushListener.isPause) assert(!replicateListener.isPause) } + Thread.sleep(20) // PAUSE PUSH -> PAUSE PUSH AND REPLICATE memoryCounter.set(replicateThreshold + 1) @@ -120,6 +121,7 @@ class MemoryManagerSuite extends CelebornFunSuite { assert(pushListener.isPause) assert(replicateListener.isPause) } + Thread.sleep(20) // PAUSE PUSH AND REPLICATE -> PAUSE PUSH memoryCounter.set(pushThreshold + 1) @@ -127,6 +129,7 @@ class MemoryManagerSuite extends CelebornFunSuite { assert(pushListener.isPause) assert(!replicateListener.isPause) } + Thread.sleep(20) // PAUSE PUSH -> NONE PAUSED memoryCounter.set(0) @@ -134,10 +137,14 @@ class MemoryManagerSuite extends CelebornFunSuite { assert(!pushListener.isPause) assert(!replicateListener.isPause) } + Thread.sleep(20) // [CELEBORN-882] Test record pause push time - assert(memoryManager.getPausePushDataTime.longValue() > 0) - assert(memoryManager.getPausePushDataAndReplicateTime.longValue() == 0) - val lastPauseTime = memoryManager.getPausePushDataTime.longValue() + val lastPauseTime1 = memoryManager.getPausePushDataTime.longValue() + val lastPauseReplicaTime1 = memoryManager.getPausePushDataAndReplicateTime.longValue() + // PauseTime should count the actual waiting time + assert(lastPauseTime1 >= 60) + assert(lastPauseReplicaTime1 >= 20) + logInfo(s"lastPauseTime1: $lastPauseTime1, lastPauseReplicaTime1: $lastPauseReplicaTime1") // NONE PAUSED -> PAUSE PUSH AND REPLICATE memoryCounter.set(replicateThreshold + 1) @@ -146,14 +153,46 @@ class MemoryManagerSuite extends CelebornFunSuite { assert(replicateListener.isPause) } + Thread.sleep(20) + // PAUSE PUSH AND REPLICATE -> NONE PAUSED memoryCounter.set(0) eventually(timeout(30.second), interval(10.milliseconds)) { assert(!pushListener.isPause) assert(!replicateListener.isPause) } - assert(memoryManager.getPausePushDataTime.longValue() == lastPauseTime) - assert(memoryManager.getPausePushDataAndReplicateTime.longValue() > 0) + + // Wait for the check thread to update the metrics + memoryManager.switchServingState() + val lastPauseTime2 = memoryManager.getPausePushDataTime.longValue() + val lastPauseReplicaTime2 = memoryManager.getPausePushDataAndReplicateTime.longValue() + assert(lastPauseTime2 > lastPauseTime1) + assert(lastPauseReplicaTime2 > lastPauseReplicaTime1) + logInfo(s"lastPauseTime2: $lastPauseTime2, lastPauseReplicaTime2: $lastPauseReplicaTime2") + + // NONE PAUSED -> PAUSE PUSH + memoryCounter.set(pushThreshold + 1) + eventually(timeout(30.second), interval(10.milliseconds)) { + assert(pushListener.isPause) + assert(!replicateListener.isPause) + } + + Thread.sleep(20) + + // PAUSE PUSH -> NONE PAUSED + memoryCounter.set(0) + eventually(timeout(30.second), interval(10.milliseconds)) { + assert(!pushListener.isPause) + assert(!replicateListener.isPause) + } + + // Wait for the check thread to update the metrics + memoryManager.switchServingState() + val lastPauseTime3 = memoryManager.getPausePushDataTime.longValue() + val lastPauseReplicaTime3 = memoryManager.getPausePushDataAndReplicateTime.longValue() + assert(lastPauseTime3 > lastPauseTime2) + assert(lastPauseReplicaTime3 == lastPauseReplicaTime2) + logInfo(s"lastPauseTime3: $lastPauseTime3, lastPauseReplicaTime3: $lastPauseReplicaTime3") } test("[CELEBORN-1792] Test MemoryManager resume by pinned memory") {