### What changes were proposed in this pull request?
Replace the `HashSet` of `PartitionLocation`s with a concurrent version of it.
### Why are the changes needed?
We are seeing race conditions between `handleGetReducerFileGroup` and `tryFinalCommit`, where reducers complete without processing a partition even though it has data.
### Problematic logs
On the driver side:
```
25/01/31 14:23:02 {} INFO org.apache.celeborn.client.commit.ReducePartitionCommitHandler: Shuffle 23 commit files complete. File count 23200 using 240180 ms
...
25/01/31 14:23:02 {} INFO org.apache.celeborn.client.commit.ReducePartitionCommitHandler: Shuffle 23 partition 11931-0: primary lost, use replica PartitionLocation[
id-epoch:11931-0
host-rpcPort-pushPort-fetchPort-replicatePort:10.68.138.242-39557-35555-37139-39685
mode:REPLICA
peer:(empty)
storage hint:StorageInfo{type=SSD, mountPoint='', finalResult=true, filePath=}
mapIdBitMap:null].
...
25/01/31 14:23:02 {} INFO org.apache.celeborn.client.commit.ReducePartitionCommitHandler: Succeed to handle stageEnd for 23.
```
On the executor side:
```
25/01/31 14:23:02 {executorId=92, jobId=28, partitionId=420, stageId=74, taskAttemptId=82047} INFO org.apache.celeborn.client.ShuffleClientImpl: Shuffle 23 request reducer file group success using 59315 ms, result partition size 12000
...
25/01/31 14:40:54 {executorId=92, partitionId=11931, taskAttemptId=93846} INFO org.apache.spark.executor.Executor: Running task 11931.0 in stage 74.0 (TID 93846)
25/01/31 14:40:54 {jobId=28, executorId=92, taskAttemptId=93846, partitionId=11931, stageId=74} INFO org.apache.spark.shuffle.celeborn.SparkShuffleManager: Shuffle 24 write mode is changed to SORT because partition count 12000 is greater than threshold 2000
25/01/31 14:40:54 {executorId=92, jobId=28, partitionId=11931, stageId=74, taskAttemptId=93846} INFO org.apache.spark.shuffle.celeborn.CelebornShuffleReader: BatchOpenStream for 0 cost 0ms
25/01/31 14:40:54 {} WARN org.apache.celeborn.client.ShuffleClientImpl: Shuffle data is empty for shuffle 23 partition 11931.
```
### How was this patch tested?
No additional tests were added. I tried to reproduce the issue, but we have only seen it with a high number of nodes and over long execution times.
### More explanation on why/how this happens
```
// write path
override def setStageEnd(shuffleId: Int): Unit = {
  getReducerFileGroupRequest synchronized {
    stageEndShuffleSet.add(shuffleId)
  }
  ....
// read path
override def handleGetReducerFileGroup(context: RpcCallContext, shuffleId: Int): Unit = {
  // Quick return for ended stage, avoid occupy sync lock.
  if (isStageEnd(shuffleId)) {
    replyGetReducerFileGroup(context, shuffleId)
  } else {
    getReducerFileGroupRequest.synchronized {
      ...
override def isStageEnd(shuffleId: Int): Boolean = {
  stageEndShuffleSet.contains(shuffleId)
}
```
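Since the concurrency guarantees between the read and write paths rest on `ConcurrentHashMap`'s volatile values, and the quick-return branch of the read path does not take the `getReducerFileGroupRequest` lock, there is no guarantee that the reader thread sees the full contents of a plain `HashSet`.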
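For illustration only, here is a minimal sketch of what a concurrent set looks like in Scala. It assumes the replacement is something like `ConcurrentHashMap.newKeySet`; the type actually used in the patch may differ. `Location`, `ConcurrentSetSketch`, `newLocationSet` and `fill` are hypothetical names, not Celeborn code, and the sketch does not reproduce the race itself.

```scala
import java.util.concurrent.{ConcurrentHashMap, CountDownLatch}

// Hypothetical stand-in for PartitionLocation; not the real Celeborn class.
final case class Location(id: Int, epoch: Int)

object ConcurrentSetSketch {
  // A set backed by ConcurrentHashMap: adds and reads are thread-safe
  // without an external lock (assumption: the patch uses a similar type).
  def newLocationSet(): java.util.Set[Location] =
    ConcurrentHashMap.newKeySet[Location]()

  // Several writer threads add disjoint locations to one shared set.
  // Returns the set size observed after all writers have finished.
  def fill(writers: Int, perWriter: Int): Int = {
    val locations = newLocationSet()
    val done = new CountDownLatch(writers)
    (0 until writers).foreach { w =>
      val t = new Thread(() => {
        (0 until perWriter).foreach { i =>
          locations.add(Location(w * perWriter + i, 0))
        }
        done.countDown()
      })
      t.start()
    }
    // Wait until every writer has counted down before reading the size.
    done.await()
    locations.size()
  }
}
```

Calling `ConcurrentSetSketch.fill(4, 1000)` should return `writers * perWriter`, since no additions are lost. With a plain `scala.collection.mutable.HashSet` or `java.util.HashSet` in the same position, concurrent adds are not safe and a reader on another thread may observe a partially populated set.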
Closes #3100 from aidar-stripe/main-commit-concurrency-fix.
Authored-by: Aidar Bariev <aidar@stripe.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>