[ISSUE-768][REFACTOR] Shuffle data lost error should show more clearly which worker lost the data (#769)
This commit is contained in:
parent
373b4a744a
commit
3bad403c8b
@ -805,8 +805,8 @@ class LifecycleManager(appId: String, val conf: RssConf) extends RpcEndpoint wit
|
||||
val committedMasterStorageInfos = new ConcurrentHashMap[String, StorageInfo]()
|
||||
val committedSlaveStorageInfos = new ConcurrentHashMap[String, StorageInfo]()
|
||||
val committedMapIdBitmap = new ConcurrentHashMap[String, RoaringBitmap]()
|
||||
val failedMasterIds = ConcurrentHashMap.newKeySet[String]()
|
||||
val failedSlaveIds = ConcurrentHashMap.newKeySet[String]()
|
||||
val failedMasterPartitionIds = new ConcurrentHashMap[String, WorkerInfo]()
|
||||
val failedSlavePartitionIds = new ConcurrentHashMap[String, WorkerInfo]()
|
||||
|
||||
val allocatedWorkers = shuffleAllocatedWorkers.get(shuffleId)
|
||||
val commitFilesFailedWorkers = ConcurrentHashMap.newKeySet[WorkerInfo]()
|
||||
@ -864,8 +864,8 @@ class LifecycleManager(appId: String, val conf: RssConf) extends RpcEndpoint wit
|
||||
committedSlaveStorageInfos.putAll(res.committedSlaveStorageInfos)
|
||||
|
||||
// record failed partitions
|
||||
failedMasterIds.addAll(res.failedMasterIds)
|
||||
failedSlaveIds.addAll(res.failedSlaveIds)
|
||||
failedMasterPartitionIds.putAll(res.failedMasterIds.asScala.map((_, worker)).toMap.asJava)
|
||||
failedSlavePartitionIds.putAll(res.failedSlaveIds.asScala.map((_, worker)).toMap.asJava)
|
||||
|
||||
if (!res.committedMapIdBitMap.isEmpty) {
|
||||
committedMapIdBitmap.putAll(res.committedMapIdBitMap)
|
||||
@ -888,16 +888,42 @@ class LifecycleManager(appId: String, val conf: RssConf) extends RpcEndpoint wit
|
||||
ReleaseSlots(applicationId, shuffleId, List.empty.asJava, List.empty.asJava))
|
||||
|
||||
/**
 * Reports whether any partition failed to commit in a way that loses its data, and logs
 * every lost partition together with the worker(s) on which the commit failed.
 *
 * When `ShouldReplicate` is false, every entry in `failedMasterPartitionIds` counts as lost.
 * Otherwise a partition counts as lost only when its id is a key of both
 * `failedMasterPartitionIds` and `failedSlavePartitionIds`.
 *
 * @return true if at least one partition is considered lost, false otherwise
 */
def hasCommitFailedIds: Boolean = {
  val shuffleKey = Utils.makeShuffleKey(applicationId, shuffleId)
  if (!ShouldReplicate && failedMasterPartitionIds.size() != 0) {
    val msg = failedMasterPartitionIds.asScala.map { case (partitionId, workerInfo) =>
      s"Lost partition $partitionId in worker [${workerInfo.readableAddress()}]"
    }.mkString("\n")
    logError(
      s"""
         |For shuffle $shuffleKey partition data lost:
         |$msg
         |""".stripMargin)
    true
  } else {
    // Look the partition id up as a KEY of the slave map. ConcurrentHashMap.contains(x) is a
    // legacy alias for containsValue(x), so it must not be used for a key lookup here.
    // A single get wrapped in Option also avoids a separate check-then-get on a concurrent map.
    val failedBothPartitionIdsToWorker = failedMasterPartitionIds.asScala.flatMap {
      case (partitionId, masterWorker) =>
        Option(failedSlavePartitionIds.get(partitionId)).map { slaveWorker =>
          partitionId -> (masterWorker, slaveWorker)
        }
    }
    if (failedBothPartitionIdsToWorker.nonEmpty) {
      val msg = failedBothPartitionIdsToWorker.map {
        case (partitionId, (masterWorker, slaveWorker)) =>
          // Format both workers the same way so the log line is uniform.
          s"Lost partition $partitionId " +
            s"in master worker [${masterWorker.readableAddress()}] " +
            s"and slave worker [${slaveWorker.readableAddress()}]"
      }.mkString("\n")
      logError(
        s"""
           |For shuffle $shuffleKey partition data lost:
           |$msg
           |""".stripMargin)
      true
    } else {
      false
    }
  }
}
|
||||
|
||||
val dataLost = hasCommitFailedIds
|
||||
|
||||
Loading…
Reference in New Issue
Block a user