diff --git a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index 86f780316..694fae6dd 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -805,8 +805,8 @@ class LifecycleManager(appId: String, val conf: RssConf) extends RpcEndpoint wit
     val committedMasterStorageInfos = new ConcurrentHashMap[String, StorageInfo]()
     val committedSlaveStorageInfos = new ConcurrentHashMap[String, StorageInfo]()
     val committedMapIdBitmap = new ConcurrentHashMap[String, RoaringBitmap]()
-    val failedMasterIds = ConcurrentHashMap.newKeySet[String]()
-    val failedSlaveIds = ConcurrentHashMap.newKeySet[String]()
+    val failedMasterPartitionIds = new ConcurrentHashMap[String, WorkerInfo]()
+    val failedSlavePartitionIds = new ConcurrentHashMap[String, WorkerInfo]()
     val allocatedWorkers = shuffleAllocatedWorkers.get(shuffleId)
     val commitFilesFailedWorkers = ConcurrentHashMap.newKeySet[WorkerInfo]()
 
@@ -864,8 +864,8 @@ class LifecycleManager(appId: String, val conf: RssConf) extends RpcEndpoint wit
           committedSlaveStorageInfos.putAll(res.committedSlaveStorageInfos)
 
           // record failed partitions
-          failedMasterIds.addAll(res.failedMasterIds)
-          failedSlaveIds.addAll(res.failedSlaveIds)
+          failedMasterPartitionIds.putAll(res.failedMasterIds.asScala.map((_, worker)).toMap.asJava)
+          failedSlavePartitionIds.putAll(res.failedSlaveIds.asScala.map((_, worker)).toMap.asJava)
 
           if (!res.committedMapIdBitMap.isEmpty) {
             committedMapIdBitmap.putAll(res.committedMapIdBitMap)
@@ -888,16 +888,51 @@ class LifecycleManager(appId: String, val conf: RssConf) extends RpcEndpoint wit
       ReleaseSlots(applicationId, shuffleId, List.empty.asJava, List.empty.asJava))
 
+    /**
+     * Tells whether committing this shuffle lost partition data, logging every lost
+     * partition with the worker(s) that failed to commit it. When `ShouldReplicate` is
+     * false, any failed master partition counts as lost; otherwise a partition counts as
+     * lost only if it failed on both its master and its slave worker.
+     */
     def hasCommitFailedIds: Boolean = {
-      if (!ShouldReplicate && failedMasterIds.size() != 0) {
-        return true
-      }
-      failedMasterIds.asScala.foreach { id =>
-        if (failedSlaveIds.contains(id)) {
-          logError(s"For $shuffleId partition $id: data lost.")
-          return true
+      val shuffleKey = Utils.makeShuffleKey(applicationId, shuffleId)
+      if (!ShouldReplicate && failedMasterPartitionIds.size() != 0) {
+        val msg = failedMasterPartitionIds.asScala.map { case (partitionId, workerInfo) =>
+          s"Lost partition $partitionId in worker [${workerInfo.readableAddress()}]"
+        }.mkString("\n")
+        logError(
+          s"""
+             |For shuffle $shuffleKey partition data lost:
+             |$msg
+             |""".stripMargin)
+        true
+      } else {
+        val failedBothPartitionIdsToWorker = failedMasterPartitionIds.asScala.flatMap {
+          case (partitionId, worker) =>
+            // Key lookup: ConcurrentHashMap.contains(Object) is a legacy alias of
+            // containsValue and would never match a partition id.
+            if (failedSlavePartitionIds.containsKey(partitionId)) {
+              Some(partitionId -> (worker, failedSlavePartitionIds.get(partitionId)))
+            } else {
+              None
+            }
+        }
+        if (failedBothPartitionIdsToWorker.nonEmpty) {
+          val msg = failedBothPartitionIdsToWorker.map {
+            case (partitionId, (masterWorker, slaveWorker)) =>
+              s"Lost partition $partitionId " +
+                s"in master worker [${masterWorker.readableAddress()}] and " +
+                s"slave worker [${slaveWorker.readableAddress()}]"
+          }.mkString("\n")
+          logError(
+            s"""
+               |For shuffle $shuffleKey partition data lost:
+               |$msg
+               |""".stripMargin)
+          true
+        } else {
+          false
         }
       }
-      false
     }
 
     val dataLost = hasCommitFailedIds