From 496f44eda4db798ca8280fa02df6989aef0e0b4e Mon Sep 17 00:00:00 2001 From: leesf <490081539@qq.com> Date: Mon, 7 Nov 2022 19:33:35 +0800 Subject: [PATCH] Shutdown worker if initialized failed. (#931) --- .../celeborn/service/deploy/worker/Worker.scala | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala index 5e96c5381..c1fe31def 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala @@ -358,6 +358,7 @@ private[celeborn] class Worker( private def registerWithMaster(): Unit = { var registerTimeout = conf.registerWorkerTimeout val interval = 2000 + var exception: Throwable = null while (registerTimeout > 0) { val resp = try { @@ -380,6 +381,7 @@ private[celeborn] class Worker( logWarning( s"Register worker to master failed, will retry after ${Utils.msDurationToString(interval)}", throwable) + exception = throwable null } // Register successfully @@ -393,7 +395,7 @@ private[celeborn] class Worker( registerTimeout = registerTimeout - interval } // If worker register still failed after retry, throw exception to stop worker process - throw new RssException("Register worker failed.") + throw new RssException("Register worker failed.", exception) } private def cleanup(expiredShuffleKeys: JHashSet[String]): Unit = synchronized { @@ -483,6 +485,13 @@ private[deploy] object Worker extends Logging { } val worker = new Worker(conf, workerArgs) - worker.initialize() + try { + worker.initialize() + } catch { + case e: Throwable => + logError("Initialize worker failed.", e) + System.exit(-1) + } + } }