[CELEBORN-668] Report WorkerLost instead of WorkerUnavailable if graceful is disabled

### What changes were proposed in this pull request?
The worker should report WorkerLost instead of WorkerUnavailable in its shutdown hook if graceful shutdown is disabled.

### Why are the changes needed?
To avoid unnecessary commit-file requests from the lifecycle manager, since the worker is not shutting down gracefully.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?

Closes #1580 from waitinfuture/668.

Authored-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
This commit is contained in:
zky.zhoukeyong 2023-06-13 11:30:59 +08:00
parent e284f72c95
commit 76831e805d
2 changed files with 33 additions and 8 deletions

View File

@ -224,7 +224,7 @@ private[celeborn] class Master(
val fetchPort = pb.getFetchPort
val replicatePort = pb.getReplicatePort
val requestId = pb.getRequestId
logDebug(s"Received worker lost $host:$rpcPort:$pushPort:$fetchPort.")
logDebug(s"Received worker lost $host:$rpcPort:$pushPort:$fetchPort:$replicatePort.")
executeWithLeaderChecker(
null,
handleWorkerLost(null, host, rpcPort, pushPort, fetchPort, replicatePort, requestId))
@ -310,7 +310,7 @@ private[celeborn] class Master(
estimatedAppDiskUsage,
requestId) =>
logDebug(s"Received heartbeat from" +
s" worker $host:$rpcPort:$pushPort:$fetchPort with $disks.")
s" worker $host:$rpcPort:$pushPort:$fetchPort:$replicatePort with $disks.")
executeWithLeaderChecker(
context,
handleHeartbeatFromWorker(
@ -334,6 +334,18 @@ private[celeborn] class Master(
context,
handleReportNodeUnavailable(context, failedWorkers, requestId))
case pb: PbWorkerLost =>
val host = pb.getHost
val rpcPort = pb.getRpcPort
val pushPort = pb.getPushPort
val fetchPort = pb.getFetchPort
val replicatePort = pb.getReplicatePort
val requestId = pb.getRequestId
logInfo(s"Received worker lost $host:$rpcPort:$pushPort:$fetchPort:$replicatePort.")
executeWithLeaderChecker(
context,
handleWorkerLost(context, host, rpcPort, pushPort, fetchPort, replicatePort, requestId))
case CheckQuota(userIdentifier) =>
executeWithLeaderChecker(context, handleCheckQuota(userIdentifier, context))
}
@ -420,7 +432,8 @@ private[celeborn] class Master(
val expiredShuffleKeys = new util.HashSet[String]
activeShuffleKeys.asScala.foreach { shuffleKey =>
if (!statusSystem.registeredShuffle.contains(shuffleKey)) {
logWarning(s"Shuffle $shuffleKey expired on $host:$rpcPort:$pushPort:$fetchPort.")
logWarning(
s"Shuffle $shuffleKey expired on $host:$rpcPort:$pushPort:$fetchPort:$replicatePort.")
expiredShuffleKeys.add(shuffleKey)
}
}
@ -449,7 +462,7 @@ private[celeborn] class Master(
.find(_ == targetWorker)
.orNull
if (worker == null) {
logWarning(s"Unknown worker $host:$rpcPort:$pushPort:$fetchPort" +
logWarning(s"Unknown worker $host:$rpcPort:$pushPort:$fetchPort:$replicatePort" +
s" for WorkerLost handler!")
return
}

View File

@ -38,7 +38,7 @@ import org.apache.celeborn.common.meta.{DiskInfo, WorkerInfo, WorkerPartitionLoc
import org.apache.celeborn.common.metrics.MetricsSystem
import org.apache.celeborn.common.metrics.source.{JVMCPUSource, JVMSource, SystemMiscSource}
import org.apache.celeborn.common.network.TransportContext
import org.apache.celeborn.common.protocol.{PartitionType, PbRegisterWorkerResponse, RpcNameConstants, TransportModuleConstants}
import org.apache.celeborn.common.protocol.{PartitionType, PbRegisterWorkerResponse, PbWorkerLostResponse, RpcNameConstants, TransportModuleConstants}
import org.apache.celeborn.common.protocol.message.ControlMessages._
import org.apache.celeborn.common.quota.ResourceConsumption
import org.apache.celeborn.common.rpc._
@ -538,9 +538,21 @@ private[celeborn] class Worker(
// add this worker to master's blacklist. When restart, register worker will
// make master remove this worker from blacklist.
try {
rssHARetryClient.askSync(
ReportWorkerUnavailable(List(workerInfo).asJava),
OneWayMessageResponse.getClass)
if (gracefulShutdown) {
rssHARetryClient.askSync(
ReportWorkerUnavailable(List(workerInfo).asJava),
OneWayMessageResponse.getClass)
} else {
rssHARetryClient.askSync[PbWorkerLostResponse](
WorkerLost(
host,
rpcPort,
pushPort,
fetchPort,
replicatePort,
RssHARetryClient.genRequestId()),
classOf[PbWorkerLostResponse])
}
} catch {
case e: Throwable =>
logError(