[CELEBORN-945] Change ShutdownHook's timeout for decommission

### What changes were proposed in this pull request?
When shutdown type is decommission, we should change the `ShutdownHookManager#HookEntry`'s
timeout to `celeborn.worker.decommission.forceExitTimeout`.

### Why are the changes needed?
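Without this change, a decommissioning worker's shutdown hooks keep the timeout they were registered with instead of honoring `celeborn.worker.decommission.forceExitTimeout`.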

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Manual test

Closes #1877 from waitinfuture/945.

Lead-authored-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
Co-authored-by: Keyong Zhou <waitinfuture@gmail.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
This commit is contained in:
zky.zhoukeyong 2023-09-05 10:24:08 +08:00
parent a42ec85a6e
commit 8d005b8d39
2 changed files with 20 additions and 2 deletions

View File

@ -52,6 +52,8 @@ import org.apache.celeborn.common.CelebornConf;
*/
public final class ShutdownHookManager {
private static final Logger logger = LoggerFactory.getLogger(ShutdownHookManager.class);
private static final ShutdownHookManager MGR = new ShutdownHookManager();
private static final Logger LOG = LoggerFactory.getLogger(ShutdownHookManager.class);
@ -105,6 +107,10 @@ public final class ShutdownHookManager {
for (HookEntry entry : MGR.getShutdownHooksInOrder()) {
Future<?> future = EXECUTOR.submit(entry.getHook());
try {
// Log the timeout granted to this hook. Utils.msDurationToString is handed a millisecond
// value (per its name; its definition is not visible here), so the entry's timeout has to be
// converted FROM the entry's own unit TO milliseconds. The earlier
// entry.getTimeUnit().convert(timeout, MILLISECONDS) converted in the opposite direction and
// logged a wrong duration for any entry whose unit is not milliseconds.
logger.info(
    "timeout {}", Utils.msDurationToString(entry.getTimeUnit().toMillis(entry.getTimeout())));
future.get(entry.getTimeout(), entry.getTimeUnit());
} catch (TimeoutException ex) {
timeouts++;
@ -165,8 +171,8 @@ public final class ShutdownHookManager {
static class HookEntry {
private final Runnable hook;
private final int priority;
private final long timeout;
private final TimeUnit unit;
private long timeout;
private TimeUnit unit;
HookEntry(Runnable hook, int priority) {
this(hook, priority, getShutdownTimeout(new CelebornConf()), TIME_UNIT_DEFAULT);
@ -207,6 +213,11 @@ public final class ShutdownHookManager {
return timeout;
}
/**
 * Replaces the timeout and time unit stored on this entry.
 *
 * <p>NOTE(review): no synchronization is visible on these two writes; confirm that the thread
 * reading them via the getters is guaranteed to observe the new values.
 *
 * @param timeout new timeout value, expressed in {@code unit}
 * @param unit time unit that {@code timeout} is expressed in
 */
public void setTimeout(long timeout, TimeUnit unit) {
this.timeout = timeout;
this.unit = unit;
}
/**
 * Returns the time unit currently stored on this entry.
 *
 * @return the unit in which this entry's timeout is expressed
 */
TimeUnit getTimeUnit() {
  return this.unit;
}
@ -279,6 +290,10 @@ public final class ShutdownHookManager {
hooks.add(new HookEntry(shutdownHook, priority, timeout, unit));
}
/**
 * Applies the given timeout to every shutdown hook currently held in {@code hooks}.
 *
 * @param timeout new timeout value handed to each hook entry
 * @param unit time unit that {@code timeout} is expressed in
 */
public void updateTimeout(long timeout, TimeUnit unit) {
  // NOTE(review): deliberately uses the collection's own forEach rather than an external
  // loop. If hooks is a synchronized wrapper (its declaration is not visible here), forEach
  // runs under the wrapper's lock, while an external for-each loop would not.
  hooks.forEach(
      entry -> {
        entry.setTimeout(timeout, unit);
      });
}
/**
* Removes a shutdownHook.
*

View File

@ -581,6 +581,9 @@ private[celeborn] class Worker(
exitType match {
case "DECOMMISSION" =>
exitKind = CelebornExitKind.WORKER_DECOMMISSION
ShutdownHookManager.get().updateTimeout(
conf.workerDecommissionForceExitTimeout,
TimeUnit.MILLISECONDS)
case "GRACEFUL" =>
exitKind = CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN
case "IMMEDIATELY" =>