[CELEBORN-443] Code refine for client and common (#1362)

This commit is contained in:
Keyong Zhou 2023-03-20 10:37:43 +08:00 committed by GitHub
parent 0b78c6d325
commit 9401db2bc8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 120 additions and 119 deletions

View File

@ -40,17 +40,27 @@ import org.apache.celeborn.common.util.FunctionConverter._
import org.apache.celeborn.common.util.ThreadUtils
case class ShuffleCommittedInfo(
// partition id -> unique partition ids
committedMasterIds: ConcurrentHashMap[Int, util.List[String]],
// partition id -> unique partition ids
committedSlaveIds: ConcurrentHashMap[Int, util.List[String]],
// unique partition id -> worker info
failedMasterPartitionIds: ConcurrentHashMap[String, WorkerInfo],
// unique partition id -> worker info
failedSlavePartitionIds: ConcurrentHashMap[String, WorkerInfo],
// unique partition id -> storage info
committedMasterStorageInfos: ConcurrentHashMap[String, StorageInfo],
// unique partition id -> storage info
committedSlaveStorageInfos: ConcurrentHashMap[String, StorageInfo],
// unique partition id -> mapid bitmat
committedMapIdBitmap: ConcurrentHashMap[String, RoaringBitmap],
// number of partition files
currentShuffleFileCount: LongAdder,
unHandledPartitionLocations: util.Set[PartitionLocation],
unhandledPartitionLocations: util.Set[PartitionLocation],
handledPartitionLocations: util.Set[PartitionLocation],
// for ReducePartition, number of in flight commit requests to worker
allInFlightCommitRequestNum: AtomicInteger,
// for MapPartition, partition id -> number of in flight commit requests
partitionInFlightCommitRequestNum: ConcurrentHashMap[Int, AtomicInteger])
object CommitManager {
@ -91,7 +101,7 @@ class CommitManager(appId: String, val conf: CelebornConf, lifecycleManager: Lif
var workerToRequests: Map[WorkerInfo, collection.Set[PartitionLocation]] = null
shuffleCommittedInfo.synchronized {
workerToRequests =
commitHandler.batchUnHandledRequests(shuffleId, shuffleCommittedInfo)
commitHandler.batchUnhandledRequests(shuffleId, shuffleCommittedInfo)
// when batch commit thread starts to commit these requests, we should increment inFlightNum,
// then stage/partition end would be able to recognize all requests are over.
commitHandler.incrementInFlightNum(shuffleCommittedInfo, workerToRequests)
@ -113,7 +123,7 @@ class CommitManager(appId: String, val conf: CelebornConf, lifecycleManager: Lif
.find(_._1.equals(worker))
.get
._1
val mastersIds =
val masterIds =
requests
.filter(_.getMode == PartitionLocation.Mode.MASTER)
.map(_.getUniqueId)
@ -131,7 +141,7 @@ class CommitManager(appId: String, val conf: CelebornConf, lifecycleManager: Lif
shuffleId,
shuffleCommittedInfo,
workerInfo,
mastersIds,
masterIds,
slaveIds,
commitFilesFailedWorkers)
}
@ -214,7 +224,7 @@ class CommitManager(appId: String, val conf: CelebornConf, lifecycleManager: Lif
if (batchHandleCommitPartitionEnabled && cause.isDefined && cause.get == StatusCode.HARD_SPLIT) {
val shuffleCommittedInfo = committedPartitionInfo.get(shuffleId)
shuffleCommittedInfo.synchronized {
shuffleCommittedInfo.unHandledPartitionLocations.add(partitionLocation)
shuffleCommittedInfo.unhandledPartitionLocations.add(partitionLocation)
}
}
}

View File

@ -43,6 +43,7 @@ import org.apache.celeborn.common.util.{PbSerDeUtils, ThreadUtils, Utils}
import org.apache.celeborn.common.util.FunctionConverter._
object LifecycleManager {
// shuffle id -> partition id -> partition locations
type ShuffleFileGroups =
ConcurrentHashMap[Int, ConcurrentHashMap[Integer, util.Set[PartitionLocation]]]
type ShuffleAllocatedWorkers =
@ -331,7 +332,7 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin
applicationId: String,
shuffleId: Int,
numMappers: Int,
numReducers: Int,
numPartitions: Int,
partitionId: Int = -1): Unit = {
val partitionType = getPartitionType(shuffleId)
registeringShuffleRequest.synchronized {
@ -366,13 +367,34 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin
}
logInfo(s"New shuffle request, shuffleId $shuffleId, partitionType: $partitionType " +
s"numMappers: $numMappers, numReducers: $numReducers.")
s"numMappers: $numMappers, numReducers: $numPartitions.")
val set = new util.HashSet[RegisterCallContext]()
set.add(context)
registeringShuffleRequest.put(shuffleId, set)
}
}
def processMapTaskReply(
applicationId: String,
shuffleId: Int,
context: RpcCallContext,
partitionId: Int,
partitionLocations: Array[PartitionLocation]): Unit = {
// if any partition location resource exist just reply
if (partitionLocations.size > 0) {
context.reply(RegisterShuffleResponse(StatusCode.SUCCESS, partitionLocations))
} else {
// request new resource for this task
changePartitionManager.handleRequestPartitionLocation(
ApplyNewLocationCallContext(context),
applicationId,
shuffleId,
partitionId,
-1,
null)
}
}
// Reply to all RegisterShuffle request for current shuffle id.
def reply(response: PbRegisterShuffleResponse): Unit = {
registeringShuffleRequest.synchronized {
@ -381,7 +403,7 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin
.foreach(_.asScala.foreach(context => {
partitionType match {
case PartitionType.MAP =>
if (response.getStatus == 0) {
if (response.getStatus == StatusCode.SUCCESS.getValue) {
val partitionLocations =
response.getPartitionLocationsList.asScala.filter(
_.getId == context.partitionId).map(r =>
@ -405,11 +427,7 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin
}
// First, request to get allocated slots from Master
val ids = new util.ArrayList[Integer]
val numPartitions: Int = partitionType match {
case PartitionType.REDUCE => numReducers
case PartitionType.MAP => numMappers
}
val ids = new util.ArrayList[Integer](numPartitions)
(0 until numPartitions).foreach(idx => ids.add(new Integer(idx)))
val res = requestSlotsWithRetry(applicationId, shuffleId, ids)
@ -494,27 +512,6 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin
}
}
private def processMapTaskReply(
applicationId: String,
shuffleId: Int,
context: RpcCallContext,
partitionId: Int,
partitionLocations: Array[PartitionLocation]): Unit = {
// if any partition location resource exist just reply
if (partitionLocations.size > 0) {
context.reply(RegisterShuffleResponse(StatusCode.SUCCESS, partitionLocations))
} else {
// request new resource for this task
changePartitionManager.handleRequestPartitionLocation(
ApplyNewLocationCallContext(context),
applicationId,
shuffleId,
partitionId,
-1,
null)
}
}
def blacklistPartition(
shuffleId: Int,
oldPartition: PartitionLocation,
@ -716,6 +713,43 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin
logInfo(s"Unregister for $shuffleId success.")
}
private def handleGetBlacklist(msg: GetBlacklist): Unit = {
val res = requestGetBlacklist(rssHARetryClient, msg)
if (res.statusCode == StatusCode.SUCCESS) {
logInfo(s"Received Blacklist from Master, blacklist: ${res.blacklist} " +
s"unknown workers: ${res.unknownWorkers}")
val current = System.currentTimeMillis()
val reserved = blacklist.asScala
.filter { case (_, entry) =>
val (statusCode, registerTime) = entry
statusCode match {
case StatusCode.WORKER_SHUTDOWN |
StatusCode.NO_AVAILABLE_WORKING_DIR |
StatusCode.RESERVE_SLOTS_FAILED |
StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_MASTER |
StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_SLAVE |
StatusCode.PUSH_DATA_CONNECTION_EXCEPTION_MASTER |
StatusCode.PUSH_DATA_CONNECTION_EXCEPTION_SLAVE |
StatusCode.PUSH_DATA_TIMEOUT_MASTER |
StatusCode.PUSH_DATA_TIMEOUT_SLAVE
if current - registerTime < workerExcludedExpireTimeout =>
true
case StatusCode.UNKNOWN_WORKER => true
case _ => false
}
}.asJava
val reservedBlackList = new ShuffleFailedWorkers()
reservedBlackList.putAll(reserved)
blacklist.clear()
blacklist.putAll(
res.blacklist.asScala.map(_ -> (StatusCode.WORKER_IN_BLACKLIST -> current)).toMap.asJava)
blacklist.putAll(
res.unknownWorkers.asScala.map(_ -> (StatusCode.UNKNOWN_WORKER -> current)).toMap.asJava)
// put reserved blacklist at last to cover blacklist's local status.
blacklist.putAll(reservedBlackList)
}
}
/* ========================================================== *
| END OF EVENT HANDLER |
* ========================================================== */
@ -1083,43 +1117,6 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin
}
}
private def handleGetBlacklist(msg: GetBlacklist): Unit = {
val res = requestGetBlacklist(rssHARetryClient, msg)
if (res.statusCode == StatusCode.SUCCESS) {
logInfo(s"Received Blacklist from Master, blacklist: ${res.blacklist} " +
s"unknown workers: ${res.unknownWorkers}")
val current = System.currentTimeMillis()
val reserved = blacklist.asScala
.filter { case (_, entry) =>
val (statusCode, registerTime) = entry
statusCode match {
case StatusCode.WORKER_SHUTDOWN |
StatusCode.NO_AVAILABLE_WORKING_DIR |
StatusCode.RESERVE_SLOTS_FAILED |
StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_MASTER |
StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_SLAVE |
StatusCode.PUSH_DATA_CONNECTION_EXCEPTION_MASTER |
StatusCode.PUSH_DATA_CONNECTION_EXCEPTION_SLAVE |
StatusCode.PUSH_DATA_TIMEOUT_MASTER |
StatusCode.PUSH_DATA_TIMEOUT_SLAVE
if current - registerTime < workerExcludedExpireTimeout =>
true
case StatusCode.UNKNOWN_WORKER => true
case _ => false
}
}.asJava
val reservedBlackList = new ShuffleFailedWorkers()
reservedBlackList.putAll(reserved)
blacklist.clear()
blacklist.putAll(
res.blacklist.asScala.map(_ -> (StatusCode.WORKER_IN_BLACKLIST -> current)).toMap.asJava)
blacklist.putAll(
res.unknownWorkers.asScala.map(_ -> (StatusCode.UNKNOWN_WORKER -> current)).toMap.asJava)
// put reserved blacklist at last to cover blacklist's local status.
blacklist.putAll(reservedBlackList)
}
}
private def requestSlotsWithRetry(
applicationId: String,
shuffleId: Int,

View File

@ -46,7 +46,6 @@ case class CommitResult(
abstract class CommitHandler(
appId: String,
conf: CelebornConf,
allocatedWorkers: ShuffleAllocatedWorkers,
committedPartitionInfo: CommittedPartitionInfo) extends Logging {
private val pushReplicateEnabled = conf.pushReplicateEnabled
@ -65,7 +64,11 @@ abstract class CommitHandler(
def isStageDataLost(shuffleId: Int): Boolean = false
def setStageEnd(shuffleId: Int): Unit
def setStageEnd(shuffleId: Int): Unit = {
throw new UnsupportedOperationException(
"Failed when do setStageEnd Operation, MapPartition shuffleType don't " +
"support set stage end")
}
/**
* return (waitStage isTimeOut, waitTime)
@ -74,20 +77,20 @@ abstract class CommitHandler(
def isPartitionInProcess(shuffleId: Int, partitionId: Int): Boolean = false
def batchUnHandledRequests(shuffleId: Int, shuffleCommittedInfo: ShuffleCommittedInfo)
def batchUnhandledRequests(shuffleId: Int, shuffleCommittedInfo: ShuffleCommittedInfo)
: Map[WorkerInfo, collection.Set[PartitionLocation]] = {
// When running to here, if handleStageEnd got lock first and commitFiles,
// then this batch get this lock, commitPartitionRequests may contains
// partitions which are already committed by stageEnd process.
// But inProcessStageEndShuffleSet should have contain this shuffle id,
// can directly return empty.
if (this.isStageEndOrInProcess(shuffleId)) {
if (isStageEndOrInProcess(shuffleId)) {
logWarning(s"Shuffle $shuffleId ended or during processing stage end.")
shuffleCommittedInfo.unHandledPartitionLocations.clear()
shuffleCommittedInfo.unhandledPartitionLocations.clear()
Map.empty[WorkerInfo, Set[PartitionLocation]]
} else {
val currentBatch = this.getUnHandledPartitionLocations(shuffleId, shuffleCommittedInfo)
shuffleCommittedInfo.unHandledPartitionLocations.clear()
val currentBatch = getUnhandledPartitionLocations(shuffleId, shuffleCommittedInfo)
shuffleCommittedInfo.unhandledPartitionLocations.clear()
currentBatch.foreach { partitionLocation =>
shuffleCommittedInfo.handledPartitionLocations.add(partitionLocation)
if (partitionLocation.getPeer != null) {
@ -112,7 +115,7 @@ abstract class CommitHandler(
}
}
protected def getUnHandledPartitionLocations(
protected def getUnhandledPartitionLocations(
shuffleId: Int,
shuffleCommittedInfo: ShuffleCommittedInfo): mutable.Set[PartitionLocation]
@ -136,7 +139,11 @@ abstract class CommitHandler(
*/
def tryFinalCommit(
shuffleId: Int,
recordWorkerFailure: ShuffleFailedWorkers => Unit): Boolean
recordWorkerFailure: ShuffleFailedWorkers => Unit): Boolean = {
throw new UnsupportedOperationException(
"Failed when do final Commit Operation, MapPartition shuffleType only " +
"support final partition Commit")
}
/**
* Only Reduce partition mode supports cache all file groups for reducer. Map partition doesn't guarantee that all
@ -349,10 +356,11 @@ abstract class CommitHandler(
slavePartMap: ConcurrentHashMap[String, PartitionLocation]): Unit = {
val committedPartitions = new util.HashMap[String, PartitionLocation]
masterPartitionUniqueIds.asScala.foreach { id =>
masterPartMap.get(id).setStorageInfo(
val partitionLocation = masterPartMap.get(id)
partitionLocation.setStorageInfo(
shuffleCommittedInfo.committedMasterStorageInfos.get(id))
masterPartMap.get(id).setMapIdBitMap(shuffleCommittedInfo.committedMapIdBitmap.get(id))
committedPartitions.put(id, masterPartMap.get(id))
partitionLocation.setMapIdBitMap(shuffleCommittedInfo.committedMapIdBitmap.get(id))
committedPartitions.put(id, partitionLocation)
}
slavePartitionUniqueIds.asScala.foreach { id =>
@ -411,12 +419,12 @@ abstract class CommitHandler(
def checkDataLost(
shuffleId: Int,
masterPartitionUniqueIdMap: util.Map[String, WorkerInfo],
slavePartitionUniqueIdMap: util.Map[String, WorkerInfo]): Boolean = {
failedMasters: util.Map[String, WorkerInfo],
failedSlaves: util.Map[String, WorkerInfo]): Boolean = {
val shuffleKey = Utils.makeShuffleKey(appId, shuffleId)
if (!pushReplicateEnabled && masterPartitionUniqueIdMap.size() != 0) {
if (!pushReplicateEnabled && failedMasters.size() != 0) {
val msg =
masterPartitionUniqueIdMap.asScala.map {
failedMasters.asScala.map {
case (partitionUniqueId, workerInfo) =>
s"Lost partition $partitionUniqueId in worker [${workerInfo.readableAddress()}]"
}.mkString("\n")
@ -427,10 +435,10 @@ abstract class CommitHandler(
|""".stripMargin)
true
} else {
val failedBothPartitionIdsToWorker = masterPartitionUniqueIdMap.asScala.flatMap {
val failedBothPartitionIdsToWorker = failedMasters.asScala.flatMap {
case (partitionUniqueId, worker) =>
if (slavePartitionUniqueIdMap.asScala.contains(partitionUniqueId)) {
Some(partitionUniqueId -> (worker, slavePartitionUniqueIdMap.get(partitionUniqueId)))
if (failedSlaves.asScala.contains(partitionUniqueId)) {
Some(partitionUniqueId -> (worker, failedSlaves.get(partitionUniqueId)))
} else {
None
}

View File

@ -49,9 +49,9 @@ import org.apache.celeborn.common.util.Utils
class MapPartitionCommitHandler(
appId: String,
conf: CelebornConf,
allocatedWorkers: ShuffleAllocatedWorkers,
shuffleAllocatedWorkers: ShuffleAllocatedWorkers,
committedPartitionInfo: CommittedPartitionInfo)
extends CommitHandler(appId, conf, allocatedWorkers, committedPartitionInfo)
extends CommitHandler(appId, conf, committedPartitionInfo)
with Logging {
private val shuffleSucceedPartitionIds = new ConcurrentHashMap[Int, util.Set[Integer]]()
@ -63,31 +63,17 @@ class MapPartitionCommitHandler(
PartitionType.MAP
}
override def setStageEnd(shuffleId: Int): Unit = {
throw new UnsupportedOperationException(
"Failed when do setStageEnd Operation, MapPartition shuffleType don't " +
"support set stage end")
}
override def isPartitionInProcess(shuffleId: Int, partitionId: Int): Boolean = {
inProcessMapPartitionEndIds.containsKey(shuffleId) && inProcessMapPartitionEndIds.get(
shuffleId).contains(partitionId)
}
override def tryFinalCommit(
shuffleId: Int,
recordWorkerFailure: ShuffleFailedWorkers => Unit): Boolean = {
throw new UnsupportedOperationException(
"Failed when do final Commit Operation, MapPartition shuffleType only " +
"support final partition Commit")
}
override def getUnHandledPartitionLocations(
override def getUnhandledPartitionLocations(
shuffleId: Int,
shuffleCommittedInfo: ShuffleCommittedInfo): mutable.Set[PartitionLocation] = {
shuffleCommittedInfo.unHandledPartitionLocations.asScala.filterNot { partitionLocation =>
shuffleCommittedInfo.unhandledPartitionLocations.asScala.filterNot { partitionLocation =>
shuffleCommittedInfo.handledPartitionLocations.contains(partitionLocation) &&
this.isPartitionInProcess(shuffleId, partitionLocation.getId)
isPartitionInProcess(shuffleId, partitionLocation.getId)
}
}
@ -98,7 +84,7 @@ class MapPartitionCommitHandler(
case (_, partitions) =>
partitions.groupBy(_.getId).foreach { case (id, _) =>
val atomicInteger = shuffleCommittedInfo.partitionInFlightCommitRequestNum
.computeIfAbsent(id, (k: Int) => new AtomicInteger(0))
.computeIfAbsent(id, (_: Int) => new AtomicInteger(0))
atomicInteger.incrementAndGet()
}
}
@ -197,7 +183,7 @@ class MapPartitionCommitHandler(
(k: Int) => ConcurrentHashMap.newKeySet[Integer]())
inProcessingPartitionIds.add(partitionId)
val partitionAllocatedWorkers = allocatedWorkers.get(shuffleId).asScala.filter(p =>
val partitionAllocatedWorkers = shuffleAllocatedWorkers.get(shuffleId).asScala.filter(p =>
p._2.containsPartition(partitionId)).asJava
var dataCommitSuccess = true

View File

@ -48,9 +48,9 @@ import org.apache.celeborn.common.rpc.netty.{LocalNettyRpcCallContext, RemoteNet
class ReducePartitionCommitHandler(
appId: String,
conf: CelebornConf,
allocatedWorkers: ShuffleAllocatedWorkers,
shuffleAllocatedWorkers: ShuffleAllocatedWorkers,
committedPartitionInfo: CommittedPartitionInfo)
extends CommitHandler(appId, conf, allocatedWorkers, committedPartitionInfo)
extends CommitHandler(appId, conf, committedPartitionInfo)
with Logging {
private val dataLostShuffleSet = ConcurrentHashMap.newKeySet[Int]()
@ -106,7 +106,7 @@ class ReducePartitionCommitHandler(
override def tryFinalCommit(
shuffleId: Int,
recordWorkerFailure: ShuffleFailedWorkers => Unit): Boolean = {
if (this.isStageEnd(shuffleId)) {
if (isStageEnd(shuffleId)) {
logInfo(s"[handleStageEnd] Shuffle $shuffleId already ended!")
return false
} else {
@ -121,8 +121,8 @@ class ReducePartitionCommitHandler(
}
// ask allLocations workers holding partitions to commit files
val shuffleAllocatedWorkers = allocatedWorkers.get(shuffleId)
val (dataLost, commitFailedWorkers) = handleFinalCommitFiles(shuffleId, shuffleAllocatedWorkers)
val allocatedWorkers = shuffleAllocatedWorkers.get(shuffleId)
val (dataLost, commitFailedWorkers) = handleFinalCommitFiles(shuffleId, allocatedWorkers)
recordWorkerFailure(commitFailedWorkers)
// reply
if (!dataLost) {
@ -171,10 +171,10 @@ class ReducePartitionCommitHandler(
(dataLost, parallelCommitResult.commitFilesFailedWorkers)
}
override def getUnHandledPartitionLocations(
override def getUnhandledPartitionLocations(
shuffleId: Int,
shuffleCommittedInfo: ShuffleCommittedInfo): mutable.Set[PartitionLocation] = {
shuffleCommittedInfo.unHandledPartitionLocations.asScala.filterNot { partitionLocation =>
shuffleCommittedInfo.unhandledPartitionLocations.asScala.filterNot { partitionLocation =>
shuffleCommittedInfo.handledPartitionLocations.contains(partitionLocation)
}
}

View File

@ -72,7 +72,7 @@ class ShufflePartitionLocationInfo {
private def addPartitions(
partitionInfo: PartitionInfo,
locations: util.List[PartitionLocation]): Unit = {
locations: util.List[PartitionLocation]): Unit = synchronized {
if (locations != null && locations.size() > 0) {
locations.asScala.foreach { loc =>
partitionInfo.putIfAbsent(loc.getId, new util.ArrayList)