diff --git a/client/src/main/scala/org/apache/celeborn/client/CommitManager.scala b/client/src/main/scala/org/apache/celeborn/client/CommitManager.scala index baf000228..b0abfa139 100644 --- a/client/src/main/scala/org/apache/celeborn/client/CommitManager.scala +++ b/client/src/main/scala/org/apache/celeborn/client/CommitManager.scala @@ -40,17 +40,27 @@ import org.apache.celeborn.common.util.FunctionConverter._ import org.apache.celeborn.common.util.ThreadUtils case class ShuffleCommittedInfo( + // partition id -> unique partition ids committedMasterIds: ConcurrentHashMap[Int, util.List[String]], + // partition id -> unique partition ids committedSlaveIds: ConcurrentHashMap[Int, util.List[String]], + // unique partition id -> worker info failedMasterPartitionIds: ConcurrentHashMap[String, WorkerInfo], + // unique partition id -> worker info failedSlavePartitionIds: ConcurrentHashMap[String, WorkerInfo], + // unique partition id -> storage info committedMasterStorageInfos: ConcurrentHashMap[String, StorageInfo], + // unique partition id -> storage info committedSlaveStorageInfos: ConcurrentHashMap[String, StorageInfo], + // unique partition id -> mapid bitmat committedMapIdBitmap: ConcurrentHashMap[String, RoaringBitmap], + // number of partition files currentShuffleFileCount: LongAdder, - unHandledPartitionLocations: util.Set[PartitionLocation], + unhandledPartitionLocations: util.Set[PartitionLocation], handledPartitionLocations: util.Set[PartitionLocation], + // for ReducePartition, number of in flight commit requests to worker allInFlightCommitRequestNum: AtomicInteger, + // for MapPartition, partition id -> number of in flight commit requests partitionInFlightCommitRequestNum: ConcurrentHashMap[Int, AtomicInteger]) object CommitManager { @@ -91,7 +101,7 @@ class CommitManager(appId: String, val conf: CelebornConf, lifecycleManager: Lif var workerToRequests: Map[WorkerInfo, collection.Set[PartitionLocation]] = null shuffleCommittedInfo.synchronized { workerToRequests = - commitHandler.batchUnHandledRequests(shuffleId, shuffleCommittedInfo) + commitHandler.batchUnhandledRequests(shuffleId, shuffleCommittedInfo) // when batch commit thread starts to commit these requests, we should increment inFlightNum, // then stage/partition end would be able to recognize all requests are over. commitHandler.incrementInFlightNum(shuffleCommittedInfo, workerToRequests) @@ -113,7 +123,7 @@ class CommitManager(appId: String, val conf: CelebornConf, lifecycleManager: Lif .find(_._1.equals(worker)) .get ._1 - val mastersIds = + val masterIds = requests .filter(_.getMode == PartitionLocation.Mode.MASTER) .map(_.getUniqueId) @@ -131,7 +141,7 @@ class CommitManager(appId: String, val conf: CelebornConf, lifecycleManager: Lif shuffleId, shuffleCommittedInfo, workerInfo, - mastersIds, + masterIds, slaveIds, commitFilesFailedWorkers) } @@ -214,7 +224,7 @@ class CommitManager(appId: String, val conf: CelebornConf, lifecycleManager: Lif if (batchHandleCommitPartitionEnabled && cause.isDefined && cause.get == StatusCode.HARD_SPLIT) { val shuffleCommittedInfo = committedPartitionInfo.get(shuffleId) shuffleCommittedInfo.synchronized { - shuffleCommittedInfo.unHandledPartitionLocations.add(partitionLocation) + shuffleCommittedInfo.unhandledPartitionLocations.add(partitionLocation) } } } diff --git a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala index df08f06c0..a0a45f4ff 100644 --- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala +++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala @@ -43,6 +43,7 @@ import org.apache.celeborn.common.util.{PbSerDeUtils, ThreadUtils, Utils} import org.apache.celeborn.common.util.FunctionConverter._ object LifecycleManager { + // shuffle id -> partition id -> partition locations type ShuffleFileGroups = ConcurrentHashMap[Int, ConcurrentHashMap[Integer, util.Set[PartitionLocation]]] type ShuffleAllocatedWorkers = @@ -331,7 +332,7 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin applicationId: String, shuffleId: Int, numMappers: Int, - numReducers: Int, + numPartitions: Int, partitionId: Int = -1): Unit = { val partitionType = getPartitionType(shuffleId) registeringShuffleRequest.synchronized { @@ -366,13 +367,34 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin } logInfo(s"New shuffle request, shuffleId $shuffleId, partitionType: $partitionType " + - s"numMappers: $numMappers, numReducers: $numReducers.") + s"numMappers: $numMappers, numReducers: $numPartitions.") val set = new util.HashSet[RegisterCallContext]() set.add(context) registeringShuffleRequest.put(shuffleId, set) } } + def processMapTaskReply( + applicationId: String, + shuffleId: Int, + context: RpcCallContext, + partitionId: Int, + partitionLocations: Array[PartitionLocation]): Unit = { + // if any partition location resource exist just reply + if (partitionLocations.size > 0) { + context.reply(RegisterShuffleResponse(StatusCode.SUCCESS, partitionLocations)) + } else { + // request new resource for this task + changePartitionManager.handleRequestPartitionLocation( + ApplyNewLocationCallContext(context), + applicationId, + shuffleId, + partitionId, + -1, + null) + } + } + // Reply to all RegisterShuffle request for current shuffle id. def reply(response: PbRegisterShuffleResponse): Unit = { registeringShuffleRequest.synchronized { @@ -381,7 +403,7 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin .foreach(_.asScala.foreach(context => { partitionType match { case PartitionType.MAP => - if (response.getStatus == 0) { + if (response.getStatus == StatusCode.SUCCESS.getValue) { val partitionLocations = response.getPartitionLocationsList.asScala.filter( _.getId == context.partitionId).map(r => @@ -405,11 +427,7 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin } // First, request to get allocated slots from Master - val ids = new util.ArrayList[Integer] - val numPartitions: Int = partitionType match { - case PartitionType.REDUCE => numReducers - case PartitionType.MAP => numMappers - } + val ids = new util.ArrayList[Integer](numPartitions) (0 until numPartitions).foreach(idx => ids.add(new Integer(idx))) val res = requestSlotsWithRetry(applicationId, shuffleId, ids) @@ -494,27 +512,6 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin } } - private def processMapTaskReply( - applicationId: String, - shuffleId: Int, - context: RpcCallContext, - partitionId: Int, - partitionLocations: Array[PartitionLocation]): Unit = { - // if any partition location resource exist just reply - if (partitionLocations.size > 0) { - context.reply(RegisterShuffleResponse(StatusCode.SUCCESS, partitionLocations)) - } else { - // request new resource for this task - changePartitionManager.handleRequestPartitionLocation( - ApplyNewLocationCallContext(context), - applicationId, - shuffleId, - partitionId, - -1, - null) - } - } - def blacklistPartition( shuffleId: Int, oldPartition: PartitionLocation, @@ -716,6 +713,43 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin logInfo(s"Unregister for $shuffleId success.") } + private def handleGetBlacklist(msg: GetBlacklist): Unit = { + val res = requestGetBlacklist(rssHARetryClient, msg) + if (res.statusCode == StatusCode.SUCCESS) { + logInfo(s"Received Blacklist from Master, blacklist: ${res.blacklist} " + + s"unknown workers: ${res.unknownWorkers}") + val current = System.currentTimeMillis() + val reserved = blacklist.asScala + .filter { case (_, entry) => + val (statusCode, registerTime) = entry + statusCode match { + case StatusCode.WORKER_SHUTDOWN | + StatusCode.NO_AVAILABLE_WORKING_DIR | + StatusCode.RESERVE_SLOTS_FAILED | + StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_MASTER | + StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_SLAVE | + StatusCode.PUSH_DATA_CONNECTION_EXCEPTION_MASTER | + StatusCode.PUSH_DATA_CONNECTION_EXCEPTION_SLAVE | + StatusCode.PUSH_DATA_TIMEOUT_MASTER | + StatusCode.PUSH_DATA_TIMEOUT_SLAVE + if current - registerTime < workerExcludedExpireTimeout => + true + case StatusCode.UNKNOWN_WORKER => true + case _ => false + } + }.asJava + val reservedBlackList = new ShuffleFailedWorkers() + reservedBlackList.putAll(reserved) + blacklist.clear() + blacklist.putAll( + res.blacklist.asScala.map(_ -> (StatusCode.WORKER_IN_BLACKLIST -> current)).toMap.asJava) + blacklist.putAll( + res.unknownWorkers.asScala.map(_ -> (StatusCode.UNKNOWN_WORKER -> current)).toMap.asJava) + // put reserved blacklist at last to cover blacklist's local status. + blacklist.putAll(reservedBlackList) + } + } + /* ========================================================== * | END OF EVENT HANDLER | * ========================================================== */ @@ -1083,43 +1117,6 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin } } - private def handleGetBlacklist(msg: GetBlacklist): Unit = { - val res = requestGetBlacklist(rssHARetryClient, msg) - if (res.statusCode == StatusCode.SUCCESS) { - logInfo(s"Received Blacklist from Master, blacklist: ${res.blacklist} " + - s"unknown workers: ${res.unknownWorkers}") - val current = System.currentTimeMillis() - val reserved = blacklist.asScala - .filter { case (_, entry) => - val (statusCode, registerTime) = entry - statusCode match { - case StatusCode.WORKER_SHUTDOWN | - StatusCode.NO_AVAILABLE_WORKING_DIR | - StatusCode.RESERVE_SLOTS_FAILED | - StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_MASTER | - StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_SLAVE | - StatusCode.PUSH_DATA_CONNECTION_EXCEPTION_MASTER | - StatusCode.PUSH_DATA_CONNECTION_EXCEPTION_SLAVE | - StatusCode.PUSH_DATA_TIMEOUT_MASTER | - StatusCode.PUSH_DATA_TIMEOUT_SLAVE - if current - registerTime < workerExcludedExpireTimeout => - true - case StatusCode.UNKNOWN_WORKER => true - case _ => false - } - }.asJava - val reservedBlackList = new ShuffleFailedWorkers() - reservedBlackList.putAll(reserved) - blacklist.clear() - blacklist.putAll( - res.blacklist.asScala.map(_ -> (StatusCode.WORKER_IN_BLACKLIST -> current)).toMap.asJava) - blacklist.putAll( - res.unknownWorkers.asScala.map(_ -> (StatusCode.UNKNOWN_WORKER -> current)).toMap.asJava) - // put reserved blacklist at last to cover blacklist's local status. - blacklist.putAll(reservedBlackList) - } - } - private def requestSlotsWithRetry( applicationId: String, shuffleId: Int, diff --git a/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala b/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala index f885de1ad..d413d0191 100644 --- a/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala +++ b/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala @@ -46,7 +46,6 @@ case class CommitResult( abstract class CommitHandler( appId: String, conf: CelebornConf, - allocatedWorkers: ShuffleAllocatedWorkers, committedPartitionInfo: CommittedPartitionInfo) extends Logging { private val pushReplicateEnabled = conf.pushReplicateEnabled @@ -65,7 +64,11 @@ abstract class CommitHandler( def isStageDataLost(shuffleId: Int): Boolean = false - def setStageEnd(shuffleId: Int): Unit + def setStageEnd(shuffleId: Int): Unit = { + throw new UnsupportedOperationException( + "Failed when do setStageEnd Operation, MapPartition shuffleType don't " + + "support set stage end") + } /** * return (waitStage isTimeOut, waitTime) @@ -74,20 +77,20 @@ abstract class CommitHandler( def isPartitionInProcess(shuffleId: Int, partitionId: Int): Boolean = false - def batchUnHandledRequests(shuffleId: Int, shuffleCommittedInfo: ShuffleCommittedInfo) + def batchUnhandledRequests(shuffleId: Int, shuffleCommittedInfo: ShuffleCommittedInfo) : Map[WorkerInfo, collection.Set[PartitionLocation]] = { // When running to here, if handleStageEnd got lock first and commitFiles, // then this batch get this lock, commitPartitionRequests may contains // partitions which are already committed by stageEnd process. // But inProcessStageEndShuffleSet should have contain this shuffle id, // can directly return empty. - if (this.isStageEndOrInProcess(shuffleId)) { + if (isStageEndOrInProcess(shuffleId)) { logWarning(s"Shuffle $shuffleId ended or during processing stage end.") - shuffleCommittedInfo.unHandledPartitionLocations.clear() + shuffleCommittedInfo.unhandledPartitionLocations.clear() Map.empty[WorkerInfo, Set[PartitionLocation]] } else { - val currentBatch = this.getUnHandledPartitionLocations(shuffleId, shuffleCommittedInfo) - shuffleCommittedInfo.unHandledPartitionLocations.clear() + val currentBatch = getUnhandledPartitionLocations(shuffleId, shuffleCommittedInfo) + shuffleCommittedInfo.unhandledPartitionLocations.clear() currentBatch.foreach { partitionLocation => shuffleCommittedInfo.handledPartitionLocations.add(partitionLocation) if (partitionLocation.getPeer != null) { @@ -112,7 +115,7 @@ abstract class CommitHandler( } } - protected def getUnHandledPartitionLocations( + protected def getUnhandledPartitionLocations( shuffleId: Int, shuffleCommittedInfo: ShuffleCommittedInfo): mutable.Set[PartitionLocation] @@ -136,7 +139,11 @@ abstract class CommitHandler( */ def tryFinalCommit( shuffleId: Int, - recordWorkerFailure: ShuffleFailedWorkers => Unit): Boolean + recordWorkerFailure: ShuffleFailedWorkers => Unit): Boolean = { + throw new UnsupportedOperationException( + "Failed when do final Commit Operation, MapPartition shuffleType only " + + "support final partition Commit") + } /** * Only Reduce partition mode supports cache all file groups for reducer. Map partition doesn't guarantee that all @@ -349,10 +356,11 @@ abstract class CommitHandler( slavePartMap: ConcurrentHashMap[String, PartitionLocation]): Unit = { val committedPartitions = new util.HashMap[String, PartitionLocation] masterPartitionUniqueIds.asScala.foreach { id => - masterPartMap.get(id).setStorageInfo( + val partitionLocation = masterPartMap.get(id) + partitionLocation.setStorageInfo( shuffleCommittedInfo.committedMasterStorageInfos.get(id)) - masterPartMap.get(id).setMapIdBitMap(shuffleCommittedInfo.committedMapIdBitmap.get(id)) - committedPartitions.put(id, masterPartMap.get(id)) + partitionLocation.setMapIdBitMap(shuffleCommittedInfo.committedMapIdBitmap.get(id)) + committedPartitions.put(id, partitionLocation) } slavePartitionUniqueIds.asScala.foreach { id => @@ -411,12 +419,12 @@ abstract class CommitHandler( def checkDataLost( shuffleId: Int, - masterPartitionUniqueIdMap: util.Map[String, WorkerInfo], - slavePartitionUniqueIdMap: util.Map[String, WorkerInfo]): Boolean = { + failedMasters: util.Map[String, WorkerInfo], + failedSlaves: util.Map[String, WorkerInfo]): Boolean = { val shuffleKey = Utils.makeShuffleKey(appId, shuffleId) - if (!pushReplicateEnabled && masterPartitionUniqueIdMap.size() != 0) { + if (!pushReplicateEnabled && failedMasters.size() != 0) { val msg = - masterPartitionUniqueIdMap.asScala.map { + failedMasters.asScala.map { case (partitionUniqueId, workerInfo) => s"Lost partition $partitionUniqueId in worker [${workerInfo.readableAddress()}]" }.mkString("\n") @@ -427,10 +435,10 @@ abstract class CommitHandler( |""".stripMargin) true } else { - val failedBothPartitionIdsToWorker = masterPartitionUniqueIdMap.asScala.flatMap { + val failedBothPartitionIdsToWorker = failedMasters.asScala.flatMap { case (partitionUniqueId, worker) => - if (slavePartitionUniqueIdMap.asScala.contains(partitionUniqueId)) { - Some(partitionUniqueId -> (worker, slavePartitionUniqueIdMap.get(partitionUniqueId))) + if (failedSlaves.asScala.contains(partitionUniqueId)) { + Some(partitionUniqueId -> (worker, failedSlaves.get(partitionUniqueId))) } else { None } diff --git a/client/src/main/scala/org/apache/celeborn/client/commit/MapPartitionCommitHandler.scala b/client/src/main/scala/org/apache/celeborn/client/commit/MapPartitionCommitHandler.scala index 090c7a7f1..297bc5877 100644 --- a/client/src/main/scala/org/apache/celeborn/client/commit/MapPartitionCommitHandler.scala +++ b/client/src/main/scala/org/apache/celeborn/client/commit/MapPartitionCommitHandler.scala @@ -49,9 +49,9 @@ import org.apache.celeborn.common.util.Utils class MapPartitionCommitHandler( appId: String, conf: CelebornConf, - allocatedWorkers: ShuffleAllocatedWorkers, + shuffleAllocatedWorkers: ShuffleAllocatedWorkers, committedPartitionInfo: CommittedPartitionInfo) - extends CommitHandler(appId, conf, allocatedWorkers, committedPartitionInfo) + extends CommitHandler(appId, conf, committedPartitionInfo) with Logging { private val shuffleSucceedPartitionIds = new ConcurrentHashMap[Int, util.Set[Integer]]() @@ -63,31 +63,17 @@ class MapPartitionCommitHandler( PartitionType.MAP } - override def setStageEnd(shuffleId: Int): Unit = { - throw new UnsupportedOperationException( - "Failed when do setStageEnd Operation, MapPartition shuffleType don't " + - "support set stage end") - } - override def isPartitionInProcess(shuffleId: Int, partitionId: Int): Boolean = { inProcessMapPartitionEndIds.containsKey(shuffleId) && inProcessMapPartitionEndIds.get( shuffleId).contains(partitionId) } - override def tryFinalCommit( - shuffleId: Int, - recordWorkerFailure: ShuffleFailedWorkers => Unit): Boolean = { - throw new UnsupportedOperationException( - "Failed when do final Commit Operation, MapPartition shuffleType only " + - "support final partition Commit") - } - - override def getUnHandledPartitionLocations( + override def getUnhandledPartitionLocations( shuffleId: Int, shuffleCommittedInfo: ShuffleCommittedInfo): mutable.Set[PartitionLocation] = { - shuffleCommittedInfo.unHandledPartitionLocations.asScala.filterNot { partitionLocation => + shuffleCommittedInfo.unhandledPartitionLocations.asScala.filterNot { partitionLocation => shuffleCommittedInfo.handledPartitionLocations.contains(partitionLocation) && - this.isPartitionInProcess(shuffleId, partitionLocation.getId) + isPartitionInProcess(shuffleId, partitionLocation.getId) } } @@ -98,7 +84,7 @@ class MapPartitionCommitHandler( case (_, partitions) => partitions.groupBy(_.getId).foreach { case (id, _) => val atomicInteger = shuffleCommittedInfo.partitionInFlightCommitRequestNum - .computeIfAbsent(id, (k: Int) => new AtomicInteger(0)) + .computeIfAbsent(id, (_: Int) => new AtomicInteger(0)) atomicInteger.incrementAndGet() } } @@ -197,7 +183,7 @@ class MapPartitionCommitHandler( (k: Int) => ConcurrentHashMap.newKeySet[Integer]()) inProcessingPartitionIds.add(partitionId) - val partitionAllocatedWorkers = allocatedWorkers.get(shuffleId).asScala.filter(p => + val partitionAllocatedWorkers = shuffleAllocatedWorkers.get(shuffleId).asScala.filter(p => p._2.containsPartition(partitionId)).asJava var dataCommitSuccess = true diff --git a/client/src/main/scala/org/apache/celeborn/client/commit/ReducePartitionCommitHandler.scala b/client/src/main/scala/org/apache/celeborn/client/commit/ReducePartitionCommitHandler.scala index 12475a4cd..97929a832 100644 --- a/client/src/main/scala/org/apache/celeborn/client/commit/ReducePartitionCommitHandler.scala +++ b/client/src/main/scala/org/apache/celeborn/client/commit/ReducePartitionCommitHandler.scala @@ -48,9 +48,9 @@ import org.apache.celeborn.common.rpc.netty.{LocalNettyRpcCallContext, RemoteNet class ReducePartitionCommitHandler( appId: String, conf: CelebornConf, - allocatedWorkers: ShuffleAllocatedWorkers, + shuffleAllocatedWorkers: ShuffleAllocatedWorkers, committedPartitionInfo: CommittedPartitionInfo) - extends CommitHandler(appId, conf, allocatedWorkers, committedPartitionInfo) + extends CommitHandler(appId, conf, committedPartitionInfo) with Logging { private val dataLostShuffleSet = ConcurrentHashMap.newKeySet[Int]() @@ -106,7 +106,7 @@ class ReducePartitionCommitHandler( override def tryFinalCommit( shuffleId: Int, recordWorkerFailure: ShuffleFailedWorkers => Unit): Boolean = { - if (this.isStageEnd(shuffleId)) { + if (isStageEnd(shuffleId)) { logInfo(s"[handleStageEnd] Shuffle $shuffleId already ended!") return false } else { @@ -121,8 +121,8 @@ class ReducePartitionCommitHandler( } // ask allLocations workers holding partitions to commit files - val shuffleAllocatedWorkers = allocatedWorkers.get(shuffleId) - val (dataLost, commitFailedWorkers) = handleFinalCommitFiles(shuffleId, shuffleAllocatedWorkers) + val allocatedWorkers = shuffleAllocatedWorkers.get(shuffleId) + val (dataLost, commitFailedWorkers) = handleFinalCommitFiles(shuffleId, allocatedWorkers) recordWorkerFailure(commitFailedWorkers) // reply if (!dataLost) { @@ -171,10 +171,10 @@ class ReducePartitionCommitHandler( (dataLost, parallelCommitResult.commitFilesFailedWorkers) } - override def getUnHandledPartitionLocations( + override def getUnhandledPartitionLocations( shuffleId: Int, shuffleCommittedInfo: ShuffleCommittedInfo): mutable.Set[PartitionLocation] = { - shuffleCommittedInfo.unHandledPartitionLocations.asScala.filterNot { partitionLocation => + shuffleCommittedInfo.unhandledPartitionLocations.asScala.filterNot { partitionLocation => shuffleCommittedInfo.handledPartitionLocations.contains(partitionLocation) } } diff --git a/common/src/main/scala/org/apache/celeborn/common/meta/ShufflePartitionLocationInfo.scala b/common/src/main/scala/org/apache/celeborn/common/meta/ShufflePartitionLocationInfo.scala index 21427d4e7..c888d7883 100644 --- a/common/src/main/scala/org/apache/celeborn/common/meta/ShufflePartitionLocationInfo.scala +++ b/common/src/main/scala/org/apache/celeborn/common/meta/ShufflePartitionLocationInfo.scala @@ -72,7 +72,7 @@ class ShufflePartitionLocationInfo { private def addPartitions( partitionInfo: PartitionInfo, - locations: util.List[PartitionLocation]): Unit = { + locations: util.List[PartitionLocation]): Unit = synchronized { if (locations != null && locations.size() > 0) { locations.asScala.foreach { loc => partitionInfo.putIfAbsent(loc.getId, new util.ArrayList)