From dab865b68ba105fde7c99d8b266960481dabc2a5 Mon Sep 17 00:00:00 2001 From: "zky.zhoukeyong" Date: Sat, 10 Jun 2023 18:36:25 +0800 Subject: [PATCH] [CELEBORN-662] Report worker unavailable regardless graceful shutdown ### What changes were proposed in this pull request? In this PR, worker always report node unavailable regardless graceful shutdown is turned on or off. ### Why are the changes needed? To inform master the shutting down worker as soon as possible. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Manual test. Closes #1575 from waitinfuture/662. Authored-by: zky.zhoukeyong Signed-off-by: zky.zhoukeyong --- .../service/deploy/worker/Worker.scala | 27 +++++++++---------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala index 9a3b050ef..55cbf737a 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala @@ -534,22 +534,21 @@ private[celeborn] class Worker( new Thread(new Runnable { override def run(): Unit = { logInfo("Shutdown hook called.") + // During shutdown, to avoid allocate slots in this worker, + // add this worker to master's blacklist. When restart, register worker will + // make master remove this worker from blacklist. + try { + rssHARetryClient.askSync( + ReportWorkerUnavailable(List(workerInfo).asJava), + OneWayMessageResponse.getClass) + } catch { + case e: Throwable => + logError( + s"Fail report to master, need wait PartitionLocation auto release: \n$partitionLocationInfo", + e) + } shutdown.set(true) if (gracefulShutdown) { - // During graceful shutdown, to avoid allocate slots in this worker, - // add this worker to master's blacklist. When restart, register worker will - // make master remove this worker from blacklist. - try { - rssHARetryClient.askSync( - ReportWorkerUnavailable(List(workerInfo).asJava), - OneWayMessageResponse.getClass) - } catch { - case e: Throwable => - logError( - s"Fail report to master, need wait PartitionLocation auto release: \n$partitionLocationInfo", - e) - } - val interval = conf.workerGracefulShutdownCheckSlotsFinishedInterval val timeout = conf.workerGracefulShutdownCheckSlotsFinishedTimeoutMs var waitTimes = 0