[CELEBORN-645][REFACTOR] Refine logic about handle HeartbeatFromWorkerResponse

### What changes were proposed in this pull request?
Refine the logic here to make it easier understand.

### Why are the changes needed?

### Does this PR introduce _any_ user-facing change?

### How was this patch tested?

Closes #1555 from AngersZhuuuu/CELEBORN-645.

Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Angerszhuuuu <angers.zhu@gmail.com>
This commit is contained in:
Angerszhuuuu 2023-06-07 16:34:44 +08:00
parent 9502b1f26d
commit d4cb6dd8ab

View File

@ -304,13 +304,10 @@ private[celeborn] class Worker(
activeShuffleKeys,
estimatedAppDiskUsage),
classOf[HeartbeatResponse])
if (response.registered) {
response.expiredShuffleKeys.asScala.foreach(shuffleKey => workerInfo.releaseSlots(shuffleKey))
cleanTaskQueue.put(response.expiredShuffleKeys)
} else {
response.expiredShuffleKeys.asScala.foreach(shuffleKey => workerInfo.releaseSlots(shuffleKey))
cleanTaskQueue.put(response.expiredShuffleKeys)
if (!response.registered) {
logError("Worker not registered in master, clean expired shuffle data and register again.")
// Clean expired shuffle.
cleanup(response.expiredShuffleKeys)
try {
registerWithMaster()
} catch {