[CELEBORN-662] Report worker unavailable regardless of graceful shutdown

### What changes were proposed in this pull request?
In this PR, the worker always reports itself as unavailable to the master during shutdown, regardless of whether graceful shutdown is turned on or off.
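
The resulting control flow can be sketched as follows. This is a minimal, self-contained illustration and not the Celeborn code itself: `MasterClient`, `ShutdownFlow`, and the recorded step names are hypothetical stand-ins for `rssHARetryClient`, the shutdown hook body, and the slot-waiting logic.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in for the master client; not Celeborn's actual API.
trait MasterClient {
  def reportWorkerUnavailable(workerId: String): Unit
}

// Records the order of shutdown steps so the flow can be inspected.
class ShutdownFlow(client: MasterClient, gracefulShutdown: Boolean, workerId: String) {
  val shutdown = new AtomicBoolean(false)
  val steps = ListBuffer.empty[String]

  def run(): Unit = {
    // Always notify the master first, whether or not graceful shutdown is on.
    try {
      client.reportWorkerUnavailable(workerId)
      steps += "reported"
    } catch {
      // A failed report must not block the rest of the shutdown.
      case e: Exception => steps += s"report-failed: ${e.getMessage}"
    }
    shutdown.set(true)
    steps += "shutdown-flag-set"
    if (gracefulShutdown) {
      // Placeholder for the graceful-only work (waiting for slots to finish).
      steps += "wait-for-slots"
    }
  }
}

object ShutdownFlowDemo {
  def main(args: Array[String]): Unit = {
    val client = new MasterClient {
      def reportWorkerUnavailable(workerId: String): Unit = ()
    }
    val flow = new ShutdownFlow(client, gracefulShutdown = false, workerId = "worker-1")
    flow.run()
    println(flow.steps.mkString(", "))
  }
}
```

With `gracefulShutdown = false` the report step still runs. Only the slot-waiting step is skipped.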

### Why are the changes needed?
To inform the master as soon as possible that the worker is shutting down, so that it stops allocating slots on this worker.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Manual test.

Closes #1575 from waitinfuture/662.

Authored-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
zky.zhoukeyong 2023-06-10 18:36:25 +08:00
parent 6b725202a2
commit dab865b68b


@@ -534,22 +534,21 @@ private[celeborn] class Worker(
 new Thread(new Runnable {
   override def run(): Unit = {
     logInfo("Shutdown hook called.")
+    // During shutdown, to avoid allocate slots in this worker,
+    // add this worker to master's blacklist. When restart, register worker will
+    // make master remove this worker from blacklist.
+    try {
+      rssHARetryClient.askSync(
+        ReportWorkerUnavailable(List(workerInfo).asJava),
+        OneWayMessageResponse.getClass)
+    } catch {
+      case e: Throwable =>
+        logError(
+          s"Fail report to master, need wait PartitionLocation auto release: \n$partitionLocationInfo",
+          e)
+    }
     shutdown.set(true)
     if (gracefulShutdown) {
-      // During graceful shutdown, to avoid allocate slots in this worker,
-      // add this worker to master's blacklist. When restart, register worker will
-      // make master remove this worker from blacklist.
-      try {
-        rssHARetryClient.askSync(
-          ReportWorkerUnavailable(List(workerInfo).asJava),
-          OneWayMessageResponse.getClass)
-      } catch {
-        case e: Throwable =>
-          logError(
-            s"Fail report to master, need wait PartitionLocation auto release: \n$partitionLocationInfo",
-            e)
-      }
       val interval = conf.workerGracefulShutdownCheckSlotsFinishedInterval
       val timeout = conf.workerGracefulShutdownCheckSlotsFinishedTimeoutMs
       var waitTimes = 0