From d4cb6dd8abc230b7ad8bd7a7ec163c221deb98d2 Mon Sep 17 00:00:00 2001 From: Angerszhuuuu Date: Wed, 7 Jun 2023 16:34:44 +0800 Subject: [PATCH] [CELEBORN-645][REFACTOR] Refine logic about handle HeartbeatFromWorkerResponse ### What changes were proposed in this pull request? Refine the logic here to make it easier understand. ### Why are the changes needed? ### Does this PR introduce _any_ user-facing change? ### How was this patch tested? Closes #1555 from AngersZhuuuu/CELEBORN-645. Authored-by: Angerszhuuuu Signed-off-by: Angerszhuuuu --- .../apache/celeborn/service/deploy/worker/Worker.scala | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala index c0a819d29..9a3b050ef 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala @@ -304,13 +304,10 @@ private[celeborn] class Worker( activeShuffleKeys, estimatedAppDiskUsage), classOf[HeartbeatResponse]) - if (response.registered) { - response.expiredShuffleKeys.asScala.foreach(shuffleKey => workerInfo.releaseSlots(shuffleKey)) - cleanTaskQueue.put(response.expiredShuffleKeys) - } else { + response.expiredShuffleKeys.asScala.foreach(shuffleKey => workerInfo.releaseSlots(shuffleKey)) + cleanTaskQueue.put(response.expiredShuffleKeys) + if (!response.registered) { logError("Worker not registered in master, clean expired shuffle data and register again.") - // Clean expired shuffle. - cleanup(response.expiredShuffleKeys) try { registerWithMaster() } catch {