Shutdown worker if initialized failed. (#931)
This commit is contained in:
parent
99a7b85708
commit
496f44eda4
@ -358,6 +358,7 @@ private[celeborn] class Worker(
|
||||
private def registerWithMaster(): Unit = {
|
||||
var registerTimeout = conf.registerWorkerTimeout
|
||||
val interval = 2000
|
||||
var exception: Throwable = null
|
||||
while (registerTimeout > 0) {
|
||||
val resp =
|
||||
try {
|
||||
@ -380,6 +381,7 @@ private[celeborn] class Worker(
|
||||
logWarning(
|
||||
s"Register worker to master failed, will retry after ${Utils.msDurationToString(interval)}",
|
||||
throwable)
|
||||
exception = throwable
|
||||
null
|
||||
}
|
||||
// Register successfully
|
||||
@ -393,7 +395,7 @@ private[celeborn] class Worker(
|
||||
registerTimeout = registerTimeout - interval
|
||||
}
|
||||
// If worker register still failed after retry, throw exception to stop worker process
|
||||
throw new RssException("Register worker failed.")
|
||||
throw new RssException("Register worker failed.", exception)
|
||||
}
|
||||
|
||||
private def cleanup(expiredShuffleKeys: JHashSet[String]): Unit = synchronized {
|
||||
@ -483,6 +485,13 @@ private[deploy] object Worker extends Logging {
|
||||
}
|
||||
|
||||
val worker = new Worker(conf, workerArgs)
|
||||
worker.initialize()
|
||||
try {
|
||||
worker.initialize()
|
||||
} catch {
|
||||
case e: Throwable =>
|
||||
logError("Initialize worker failed.", e)
|
||||
System.exit(-1)
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user