### What changes were proposed in this pull request?
Batch OpenStream RPCs by Worker to avoid too many RPCs.
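A minimal sketch of the batching idea, not the code in this PR: requests are grouped by the worker that hosts the data, so one RPC per worker can carry all of its entries. `OpenStreamBatcher`, `StreamRequest`, and `groupByWorker` are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration; none of these names come from Celeborn.
final class OpenStreamBatcher {
  // One pending open-stream request for a single partition file.
  static final class StreamRequest {
    final String workerAddress;
    final String fileName;

    StreamRequest(String workerAddress, String fileName) {
      this.workerAddress = workerAddress;
      this.fileName = fileName;
    }
  }

  // Groups file names by worker so each worker can be sent one batched RPC.
  static Map<String, List<String>> groupByWorker(List<StreamRequest> requests) {
    Map<String, List<String>> batches = new LinkedHashMap<>();
    for (StreamRequest r : requests) {
      batches.computeIfAbsent(r.workerAddress, k -> new ArrayList<>()).add(r.fileName);
    }
    return batches;
  }
}
```

With N files spread over W workers, this sends W messages instead of N.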
### Why are the changes needed?
ditto
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Passes GA and Manual tests.
Closes #2362 from waitinfuture/1144.
Authored-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
### What changes were proposed in this pull request?
Unify the license format of `pom.xml`.
### Why are the changes needed?
License formats differ among modules; the standard license format has an indent before `~`.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
No.
Closes #2408 from SteNicholas/maven-license-format.
Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
### What changes were proposed in this pull request?
### Why are the changes needed?
`CELEBORN-1320` uses `ReviveManager` to batch SOFT_SPLIT event RPCs, so `partitionSplitPool` is no longer used and the configuration item `celeborn.client.push.splitPartition.threads` is meaningless.
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes #2396 from cxzl25/CELEBORN-1336.
Authored-by: sychen <sychen@ctrip.com>
Signed-off-by: SteNicholas <programgeek@163.com>
### What changes were proposed in this pull request?
Currently SOFT_SPLIT bypasses `ReviveManager` and sends `PartitionSplit` requests to
LifecycleManager individually, which can cause too many messages in `inbox`; see the issue
described in https://github.com/apache/incubator-celeborn/pull/2366
This PR uses `ReviveManager`, i.e. batches RPCs for `SOFT_SPLIT` events. Before this PR,
the max size of `Inbox#messages` reached several hundred in my experiment where frequent soft splits happen:
```
24/03/11 18:33:05 WARN [rpc-server-4-7] Inbox: last max msg cnt in 1 second: 620
24/03/11 18:33:06 WARN [rpc-server-4-5] Inbox: last max msg cnt in 1 second: 105
24/03/11 18:33:07 WARN [rpc-server-4-14] Inbox: last max msg cnt in 1 second: 94
24/03/11 18:33:08 WARN [rpc-server-4-13] Inbox: last max msg cnt in 1 second: 726
24/03/11 18:33:09 WARN [rpc-server-4-3] Inbox: last max msg cnt in 1 second: 50]
24/03/11 18:33:10 WARN [rpc-server-4-16] Inbox: last max msg cnt in 1 second: 98
24/03/11 18:33:11 WARN [rpc-server-4-12] Inbox: last max msg cnt in 1 second: 83
24/03/11 18:33:12 WARN [rpc-server-4-11] Inbox: last max msg cnt in 1 second: 138
24/03/11 18:33:13 WARN [rpc-server-4-9] Inbox: last max msg cnt in 1 second: 315
24/03/11 18:33:14 WARN [rpc-server-4-4] Inbox: last max msg cnt in 1 second: 787
```
After this PR, the size is reduced by an order of magnitude:
```
24/03/11 18:39:17 WARN [rpc-server-4-5] Inbox: last max msg cnt in 1 second: 30]
24/03/11 18:39:18 WARN [rpc-server-4-12] Inbox: last max msg cnt in 1 second: 1]
24/03/11 18:39:19 WARN [rpc-server-4-19] Inbox: last max msg cnt in 1 second: 1]
24/03/11 18:39:20 WARN [rpc-server-4-15] Inbox: last max msg cnt in 1 second: 1]
24/03/11 18:39:21 WARN [rpc-server-4-3] Inbox: last max msg cnt in 1 second: 10]
24/03/11 18:39:22 WARN [rpc-server-4-20] Inbox: last max msg cnt in 1 second: 1]
24/03/11 18:39:23 WARN [rpc-server-4-12] Inbox: last max msg cnt in 1 second: 1]
24/03/11 18:39:24 WARN [rpc-server-4-24] Inbox: last max msg cnt in 1 second: 1]
24/03/11 18:39:25 WARN [rpc-server-4-9] Inbox: last max msg cnt in 1 second: 10]
24/03/11 18:39:26 WARN [rpc-server-4-13] Inbox: last max msg cnt in 1 second: 1]
24/03/11 18:39:27 WARN [rpc-server-4-2] Inbox: last max msg cnt in 1 second: 10]
24/03/11 18:39:28 WARN [rpc-server-4-2] Inbox: last max msg cnt in 1 second: 80]
```
### Why are the changes needed?
ditto
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
GA and manual test.
Closes #2377 from waitinfuture/1320.
Authored-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
As the title says, `handleRequestPartitions` is quite heavy because it makes a synchronous RPC.
It does not need to run inside the synchronized block.
This fixes the same issue as https://github.com/apache/incubator-celeborn/pull/2207
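The general pattern can be sketched as follows, assuming a queue guarded by a lock: only the cheap snapshot happens while the lock is held, and the slow call runs after the lock is released. `RequestQueue` and `drainAndHandle` are hypothetical names, not Celeborn classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical illustration of keeping slow work out of a synchronized block.
final class RequestQueue {
  private final Object lock = new Object();
  private final List<String> pending = new ArrayList<>();

  void add(String request) {
    synchronized (lock) {
      pending.add(request);
    }
  }

  // Copies and clears the pending list under the lock, then invokes the
  // slow handler without holding the lock. Returns the batch size.
  int drainAndHandle(Consumer<List<String>> slowHandler) {
    List<String> batch;
    synchronized (lock) {
      batch = new ArrayList<>(pending);
      pending.clear();
    }
    if (!batch.isEmpty()) {
      slowHandler.accept(batch); // e.g. a blocking RPC; other threads can still call add()
    }
    return batch.size();
  }
}
```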
### Why are the changes needed?
ditto
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
GA and manual test.
Closes #2364 from waitinfuture/1312.
Authored-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
This enables clients to push shuffle data to and fetch it from Celeborn Workers securely.
### Why are the changes needed?
This change is required for adding authentication. ([CELEBORN-1011](https://issues.apache.org/jira/browse/CELEBORN-1011)).
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
It is part of a bigger change that will be tested end to end.
Closes #2360 from otterc/CELEBORN-1261.
Authored-by: Chandni Singh <singh.chandni@gmail.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
To avoid excessive memory usage when CelebornShuffleReader creates input streams,
this PR does the following:
1. The constructor of `CelebornInputStream` does not fetch a chunk
2. `compressedBuf` and `rawDataBuf` are created the first time `fillBuffer` is called
3. When `fillBuffer` returns false, which means the input stream is exhausted, `close` is called and resources are released
4. `CelebornFetchFailureSuite` is only run for Spark 3.0 and newer
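Items 1 to 3 can be sketched as below. This is a simplified stand-in rather than `CelebornInputStream` itself: `LazyBufferStream` is a hypothetical class, and an in-memory array of chunks replaces the real fetch from a worker.

```java
import java.io.InputStream;

// Hypothetical stream that mirrors only the lazy-allocation pattern.
final class LazyBufferStream extends InputStream {
  private final byte[][] chunks; // stand-in for chunks fetched from a worker
  private int nextChunk = 0;
  private byte[] buf;            // allocated on the first fillBuffer() call
  private int pos = 0;
  private int limit = 0;
  private boolean closed = false;

  LazyBufferStream(byte[][] chunks) {
    this.chunks = chunks;        // no fetch and no buffer allocation here
  }

  boolean bufferAllocated() { return buf != null; }

  boolean isClosed() { return closed; }

  private boolean fillBuffer() {
    if (closed || nextChunk >= chunks.length) {
      close();                   // exhausted: release resources right away
      return false;
    }
    byte[] chunk = chunks[nextChunk++];
    if (buf == null || buf.length < chunk.length) {
      buf = new byte[chunk.length];
    }
    System.arraycopy(chunk, 0, buf, 0, chunk.length);
    pos = 0;
    limit = chunk.length;
    return true;
  }

  @Override
  public int read() {
    while (pos >= limit) {
      if (!fillBuffer()) {
        return -1;
      }
    }
    return buf[pos++] & 0xFF;
  }

  @Override
  public void close() {
    closed = true;
    buf = null;
    pos = 0;
    limit = 0;
  }
}
```

A reader that creates many such streams up front holds no buffers until each stream is actually read.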
### Why are the changes needed?
ditto
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
GA and e2e test.
Closes #2348 from waitinfuture/1300.
Lead-authored-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
Co-authored-by: Keyong Zhou <waitinfuture@gmail.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
Catch and throw FetchFailedException in CelebornInputStream#fillBuffer to enable Spark's stage rerun
when fillBuffer encounters a fetch chunk exception.
### Why are the changes needed?
ditto
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
GA
Closes #2349 from waitinfuture/1301.
Lead-authored-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
Co-authored-by: Keyong Zhou <waitinfuture@gmail.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
per https://issues.apache.org/jira/browse/CELEBORN-1271
fix the bug with SparkShuffleManager.unregisterShuffle when celeborn.client.spark.fetch.throwsFetchFailure=false
### Why are the changes needed?
The bug prevents shuffle data from being cleaned up by unregisterShuffle.
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
Manually tested.
Closes #2305 from ErikFang/CELEBORN-1271-fix-unregisterShuffle.
Authored-by: Erik.fang <fmerik@gmail.com>
Signed-off-by: waitinfuture <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
Optimize the handling of exceptions during the push of replica data, now only throwing PUSH_DATA_CONNECTION_EXCEPTION_REPLICA in specific scenarios.
### Why are the changes needed?
When handling exceptions related to pushing replica data in the worker, unmatched exceptions, such as 'file already closed,' are uniformly transformed into REPLICATE_DATA_CONNECTION_EXCEPTION_COUNT and returned to the client. The client then excludes the peer node based on this count, which may not be appropriate in certain scenarios. For instance, in the case of an exception like 'file already closed,' it typically occurs during multiple splits and commitFile operations. Excluding a large number of nodes under such circumstances is clearly not in line with expectations.

### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Through existing UTs.
Closes #2323 from lyy-pineapple/CELEBORN-1282.
Authored-by: liangyongyuan <liangyongyuan@xiaomi.com>
Signed-off-by: waitinfuture <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
Fix some typos.
### Why are the changes needed?
Ditto.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Not needed.
Closes #2314 from turboFei/fix_typo.
Authored-by: Fei Wang <fwang12@ebay.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
### What changes were proposed in this pull request?
https://github.com/apache/incubator-celeborn/pull/2145 and https://github.com/apache/incubator-celeborn/pull/2162 changed the behavior that retried commit files requests should use the same epoch. This PR reverts the behavior back.
### Why are the changes needed?
ditto
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Passes UTs.
Closes #2299 from waitinfuture/1272.
Authored-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
Signed-off-by: waitinfuture <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
Improve log of current failed workers for `WorkerStatusTracker#recordWorkerFailure` and `WorkerStatusTracker#handleHeartbeatResponse`.
### Why are the changes needed?
It's recommended to improve the log of current failed workers in `recordWorkerFailure` and `handleHeartbeatResponse` of `WorkerStatusTracker`. Meanwhile the log level of current failed workers could be warn.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
No.
Closes #2290 from SteNicholas/CELEBORN-1266.
Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: SteNicholas <programgeek@163.com>
### What changes were proposed in this pull request?
To close CELEBORN-1016, fix the issue when parsing IPv6 host addresses.
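The description does not say how the parsing was fixed. As an illustration of why IPv6 needs care, here is a hedged sketch, assuming addresses of the form `host:port` with IPv6 hosts written in brackets: a split on the first `:` breaks on `[::1]:9097`, whereas splitting on the last `:` and stripping the brackets does not. `HostPort` is a hypothetical helper, not a Celeborn class.

```java
// Hypothetical helper; assumes IPv6 hosts are written as "[addr]:port".
final class HostPort {
  final String host;
  final int port;

  HostPort(String host, int port) {
    this.host = host;
    this.port = port;
  }

  static HostPort parse(String address) {
    // The port always follows the last colon, even for IPv6 literals.
    int portSep = address.lastIndexOf(':');
    if (portSep < 0) {
      throw new IllegalArgumentException("Missing port: " + address);
    }
    String host = address.substring(0, portSep);
    if (host.startsWith("[") && host.endsWith("]")) {
      host = host.substring(1, host.length() - 1); // strip IPv6 brackets
    } else if (host.indexOf(':') >= 0) {
      throw new IllegalArgumentException("IPv6 host must be bracketed: " + address);
    }
    int port = Integer.parseInt(address.substring(portSep + 1));
    return new HostPort(host, port);
  }
}
```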
### Why are the changes needed?
Fix CELEBORN-1016
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
UT.
Closes #2293 from turboFei/CELEBORN-1016_ipv6.
Authored-by: Fei Wang <fwang12@ebay.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
This adds a secured port to Celeborn Master which is used for secure communication with LifecycleManager.
This is part of adding authentication support in Celeborn (see CELEBORN-1011).
This change targets just adding the secured port to Master. The following items from the proposal are still pending:
1. Persisting the app secrets in Ratis.
2. Forwarding secrets to Workers and having ability for the workers to pull registration info from the Master.
3. Secured and internal port in Workers.
4. Secured communication between workers and clients.
In addition, since we support both secured and unsecured communication for backward compatibility and seamless rolling upgrades, an additional change is needed. An app that registers with the Master can try to talk to the workers on unsecured ports, which is a security breach. So the workers need to know whether an app registered with the Master or not, and for that the Master has to propagate the list of unsecured apps to Celeborn workers as well. We can discuss this more in https://issues.apache.org/jira/browse/CELEBORN-1261
### Why are the changes needed?
It is needed for adding authentication support to Celeborn (CELEBORN-1011)
### Does this PR introduce _any_ user-facing change?
Yes
### How was this patch tested?
Added a simple UT.
Closes #2281 from otterc/CELEBORN-1257.
Authored-by: Chandni Singh <singh.chandni@gmail.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
Delete redundant remove operations and handle timeout requests in final check
### Why are the changes needed?
Delete redundant remove operations and handle timeout requests in final check
### Does this PR introduce _any_ user-facing change?
NO
### How was this patch tested?
PASS GA
Closes #2251 from jiaoqingbo/CELEBORN-1244.
Authored-by: jiaoqingbo <1178404354@qq.com>
Signed-off-by: SteNicholas <programgeek@163.com>
### What changes were proposed in this pull request?
Introduce `ThreadUtils#shutdown(executor)` method to improve the default gracePeriod of `ThreadUtils#shutdown`.
### Why are the changes needed?
The default value of `gracePeriod` for `ThreadUtils#shutdown` is currently 30 seconds, while most callers of `ThreadUtils#shutdown` pass a `gracePeriod` of 800 milliseconds. Therefore, the default `gracePeriod` of `ThreadUtils#shutdown` could be changed to 800 milliseconds.
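A rough sketch of such an overload pair, written against plain `java.util.concurrent` rather than Celeborn's `ThreadUtils` (whose exact signature and behavior are not shown here). `ShutdownSketch` and `DEFAULT_GRACE_PERIOD_MS` are hypothetical names.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration of a shutdown helper with a default grace period.
final class ShutdownSketch {
  static final long DEFAULT_GRACE_PERIOD_MS = 800;

  // Convenience overload so callers need not repeat the common grace period.
  static void shutdown(ExecutorService executor) {
    shutdown(executor, DEFAULT_GRACE_PERIOD_MS);
  }

  static void shutdown(ExecutorService executor, long gracePeriodMs) {
    executor.shutdown(); // stop accepting new tasks
    try {
      // Wait for running tasks; force termination if they exceed the grace period.
      if (!executor.awaitTermination(gracePeriodMs, TimeUnit.MILLISECONDS)) {
        executor.shutdownNow();
      }
    } catch (InterruptedException e) {
      executor.shutdownNow();
      Thread.currentThread().interrupt(); // preserve the interrupt status
    }
  }
}
```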
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
No.
Closes #2276 from SteNicholas/CELEBORN-1259.
Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
This connects client/server bootstraps to the RpcEnv in Celeborn. This is a prerequisite for leveraging RPC security in subsequent PRs where we will add Sasl authentication to the communication between the client and Celeborn Master/Workers.
It is part of the epic: https://issues.apache.org/jira/browse/CELEBORN-1011.
### Why are the changes needed?
The changes are needed for adding authentication to Celeborn. See [CELEBORN-1011](https://issues.apache.org/jira/browse/CELEBORN-1011).
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added some UTs
Closes #2257 from otterc/CELEBORN-1251.
Authored-by: Chandni Singh <singh.chandni@gmail.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
### What changes were proposed in this pull request?
Improve exception message of fetching chunk failure for `WorkerPartitionReader` including shuffle key.
### Why are the changes needed?
The exception message for a chunk fetch failure in `WorkerPartitionReader` does not contain the shuffle key of the chunk, so it cannot be matched against the Worker log when troubleshooting. It's recommended to add the shuffle key to the exception message for chunk fetch failures.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
No.
Closes #2261 from SteNicholas/CELEBORN-1253.
Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: SteNicholas <programgeek@163.com>
### What changes were proposed in this pull request?
Unify celeborn thread name format with the following pattern:
- client: `celeborn-client-[component]-[function]er`
- service: `[master|worker]-[component]-[function]er`
- other: `celeborn-[component]-[function]er`
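A naming convention like this is usually applied through a `ThreadFactory`. The sketch below assumes a numeric suffix per thread; `NamedThreadFactory` is a hypothetical class, and the prefix passed to it is only an example of the pattern.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical factory that stamps every thread with a common prefix.
final class NamedThreadFactory implements ThreadFactory {
  private final String prefix;
  private final AtomicInteger counter = new AtomicInteger(0);

  NamedThreadFactory(String prefix) {
    this.prefix = prefix;
  }

  @Override
  public Thread newThread(Runnable r) {
    // Produces names such as "<prefix>-0", "<prefix>-1", ...
    Thread t = new Thread(r, prefix + "-" + counter.getAndIncrement());
    t.setDaemon(true);
    return t;
  }
}
```

For example, `new NamedThreadFactory("celeborn-client-data-pusher")` yields thread names that are easy to filter in an application's thread dump.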
### Why are the changes needed?
It's recommended to unify the Celeborn thread name format, especially on the client side, which runs inside the application.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
No.
Closes #2248 from AngersZhuuuu/CELEBORN-1242.
Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
Unify creation of thread using ThreadUtils
### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes #2247 from AngersZhuuuu/CELEBORN-1226-FOLLOWUP.
Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Angerszhuuuu <angers.zhu@gmail.com>
### What changes were proposed in this pull request?
Make all single thread use standard ThreadUtils to simplify the code
### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes #2229 from AngersZhuuuu/CELEBORN-1226.
Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Angerszhuuuu <angers.zhu@gmail.com>
### What changes were proposed in this pull request?
I tested 1T TPCDS with the following Celeborn 8-worker cluster setup:
1. Workers have fixed ports for rpc/push/replicate
2. `spark.celeborn.client.spark.fetch.throwsFetchFailure` is enabled
3. graceful shutdown is enabled
I randomly ran `kill -9` and `./sbin/stop-worker.sh` (both graceful and non-graceful shutdown) on some workers and restarted them immediately. I then encountered incorrect results with low probability (1 out of 99 queries).
After digging into it, I found the reason is as follows:
1. At time T1, all workers are serving shuffle 602.
2. At time T2, I run stop-worker.sh for worker2, and then run kill -9 and start worker1. Since the workers are configured with fixed ports, clients think they are OK and the Master lets them re-register, which also succeeds. And worker2 is clean in memory.
3. At time T3, push requests to worker2 fail and revive on worker1, so worker1 has a reservation for shuffle 602. Then I start worker2.
4. At time T4, LifecycleManager sends CommitFiles to all workers; worker1 just logs that some PartitionLocations don't exist and ignores them.
5. CommitFiles succeeds, but worker1 lost some data before restarting, and no error is reported.
The following snapshot shows the process.

This PR fixes this by treating unfound PartitionLocations as failed when handling CommitFiles.
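The change in behavior can be sketched as follows, assuming a commit request is a list of location ids and the worker knows a set of ids it actually holds. `CommitSketch` and its members are hypothetical names, not the worker's real handler.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical illustration: unknown locations are reported as failed.
final class CommitSketch {
  static final class Result {
    final List<String> committed = new ArrayList<>();
    final List<String> failed = new ArrayList<>();
  }

  static Result commit(List<String> requestedIds, Set<String> knownIds) {
    Result result = new Result();
    for (String id : requestedIds) {
      if (knownIds.contains(id)) {
        result.committed.add(id);
      } else {
        // Before the fix this case was only logged and ignored, which hid
        // the data lost by a restarted worker.
        result.failed.add(id);
      }
    }
    return result;
  }
}
```

Reporting the unknown ids as failed lets the caller notice the loss instead of treating the commit as a success.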
### Why are the changes needed?
ditto
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Manual test
Closes #2235 from waitinfuture/1233.
Authored-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
dataPusher fetches partitionLocationMap only once outside the loop.
### Why are the changes needed?
If an exception occurs while obtaining partitionLocationMap in dataPusher.takePushTasks, it will result in attempting to fetch partitionLocationMap for each iteration in the workingQueue traversal. This leads to an unusually prolonged job execution time.
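A simplified sketch of hoisting the lookup out of the loop, so that an expensive or failing lookup is attempted once per pass rather than once per queued task. `PushTaskSelector` and `takeReady` are hypothetical names, and the readiness check is only a placeholder for whatever the real `takePushTasks` does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical illustration of fetching a map once before iterating.
final class PushTaskSelector {
  static List<Integer> takeReady(
      List<Integer> queuedPartitionIds, Supplier<Map<Integer, String>> locationLookup) {
    // Fetched once, outside the loop; a failure here surfaces a single time.
    Map<Integer, String> locations = locationLookup.get();
    List<Integer> ready = new ArrayList<>();
    for (Integer id : queuedPartitionIds) {
      if (locations.containsKey(id)) { // placeholder readiness check
        ready.add(id);
      }
    }
    return ready;
  }
}
```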
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Through existing UTs.
Closes #2220 from lyy-pineapple/CELEBORN-1218.
Authored-by: liangyongyuan <liangyongyuan@xiaomi.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
### What changes were proposed in this pull request?
Format the timestamp when recording worker failure information.
### Why are the changes needed?
Currently the timestamp is a raw long value, which is hard to read and confusing without consulting the source code.
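For illustration, an epoch-millisecond value can be rendered with `java.time` as below. The pattern and the `FailureTimeFormat` class are assumptions, not necessarily the format this PR uses.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Hypothetical formatter for failure timestamps; the pattern is an assumption.
final class FailureTimeFormat {
  private static final DateTimeFormatter FORMAT =
      DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

  // Converts epoch milliseconds into a readable timestamp in the given zone.
  static String format(long epochMillis, ZoneId zone) {
    return FORMAT.format(Instant.ofEpochMilli(epochMillis).atZone(zone));
  }
}
```

`FailureTimeFormat.format(0L, ZoneId.of("UTC"))` returns `1970-01-01 00:00:00.000`.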
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes #2230 from turboFei/date_format.
Authored-by: Fei Wang <fwang12@ebay.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
Improve exception message of `loadFileGroup` for `ShuffleClientImpl`.
### Why are the changes needed?
The exception message of `ShuffleClientImpl#loadFileGroup`, `org.apache.celeborn.common.exception.CelebornIOException: Shuffle data lost for shuffle %s partitionId %s!`, is confusing to users: it can indicate not only lost shuffle data but also other failure situations such as a stage end timeout. It's recommended to improve the exception message of `loadFileGroup` for `ShuffleClientImpl`.
```
Caused by: org.apache.kyuubi.jdbc.hive.KyuubiSQLException: org.apache.kyuubi.KyuubiSQLException: org.apache.kyuubi.KyuubiSQLException: Error operating ExecuteStatement: org.apache.spark.SparkException: Job aborted due to stage failure: Task 15 in stage 60.0 failed 4 times, most recent failure: Lost task 15.3 in stage 60.0 (TID 170802) (xxx executor 60): org.apache.celeborn.common.exception.CelebornIOException: Shuffle data lost for shuffle 1 partitionId 15!
at org.apache.celeborn.client.ShuffleClientImpl.loadFileGroup(ShuffleClientImpl.java:1591)
at org.apache.celeborn.client.ShuffleClientImpl.readPartition(ShuffleClientImpl.java:1600)
at org.apache.spark.shuffle.celeborn.CelebornShuffleReader.$anonfun$read$1(CelebornShuffleReader.scala:88)
at org.apache.spark.shuffle.celeborn.CelebornShuffleReader.$anonfun$read$1$adapted(CelebornShuffleReader.scala:80)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage37.sort_addToSorter_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage37.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:753)
at org.apache.spark.sql.execution.RowIteratorFromScala.advanceNext(RowIterator.scala:83)
at org.apache.spark.sql.execution.joins.SortMergeJoinScanner.advancedBufferedToRowWithNullFreeJoinKey(SortMergeJoinExec.scala:822)
at org.apache.spark.sql.execution.joins.SortMergeJoinScanner.<init>(SortMergeJoinExec.scala:686)
at org.apache.spark.sql.execution.joins.SortMergeJoinExec.$anonfun$doExecute$1(SortMergeJoinExec.scala:185)
at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:398)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:362)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:398)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:362)
at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:398)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:362)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:398)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:362)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:398)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:362)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:91)
at org.apache.spark.scheduler.Task.run(Task.scala:143)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:591)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1545)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:596)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
```
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Internal test.
Closes #2219 from SteNicholas/CELEBORN-1217.
Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
1. Add extension API to CelebornShuffleHandler.
### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
NO.
### How was this patch tested?
GA.
Closes #2206 from FMX/b1211.
Authored-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
Revert "[CELEBORN-1150] support io encryption for spark".
### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes #2208 from FMX/b1150-3.
Authored-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
MapperAttempts for a shuffle replies `MAP_ENDED` when the mapper has already ended and push data or push merged data is received from a speculative task.
Follow up #1591.
### Why are the changes needed?
When the mapper has already ended and push data or push merged data is received from a speculative task, `PushDataHandler` should trigger MapEnd instead of StageEnd for the worker. Meanwhile, `ShuffleClientImpl` should handle `STAGE_ENDED` as MapEnd; otherwise other tasks of the stage cannot send shuffle data, resulting in data loss.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Internal test.
Closes #2190 from SteNicholas/CELEBORN-678.
Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
1. Fix MissingOverride, DefaultCharset, UnnecessaryParentheses Rule
2. Exclude generated sources, FutureReturnValueIgnored, TypeParameterUnusedInFormals, UnusedVariable
### Why are the changes needed?
```
./build/make-distribution.sh --release
```
We get a lot of WARNINGs.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
GA
Closes #2177 from cxzl25/error_prone_patch.
Authored-by: sychen <sychen@ctrip.com>
Signed-off-by: Fu Chen <cfmcgrady@gmail.com>
### What changes were proposed in this pull request?
Update log level of CommitFiles success for `CommitHandler` from error to info.
### Why are the changes needed?
The log level of sending CommitFiles success for `CommitHandler` should not be error.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
No.
Closes #2174 from SteNicholas/commit-files-log.
Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
1. To support io encryption for spark.
### Why are the changes needed?
Ditto.
### Does this PR introduce _any_ user-facing change?
NO.
### How was this patch tested?
GA and manual tests on a cluster.
Closes #2135 from FMX/B1150.
Authored-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
To avoid NPE in `val future = workerInfo.endpoint.ask[DestroyWorkerSlotsResponse](destroy)`
### Why are the changes needed?
ditto
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Manual test
Closes #2166 from waitinfuture/1181.
Authored-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
While testing the main branch, I encountered the following scenario.
I ran `sbin/stop-worker.sh` nearly simultaneously on 3 out of 6 workers, expecting the 3 workers
to shut down soon because I had enabled graceful shutdown. However, only the first worker I stopped
shut down within 15s as expected; the other two did not shut down until the shutdown timeout.
After digging into it, I found that `LifecycleManager#reserveSlotsWithRetry` will reserve for the same location
twice:
1. At T1, only worker1 is shut down; pushes receive HARD_SPLIT and go to revive
2. At T2, LifecycleManager handles revive requests in batch and tries to reallocate the locs to other workers
3. At T3, reserving on worker3 succeeds because it's not shut down yet, but reserving on worker2 fails because it's shut down
4. At T4, LifecycleManager re-allocates the failed slots to workers other than worker1 and worker2. However, at this time worker3 is also shut down, so reserving on worker3 fails
5. At T5, it re-allocates the slots that failed on worker3. However, `getFailedPartitionLocations` will return slots allocated to worker3 in step 3, and increment the epoch to 2. At this time, worker3 has slots of epoch 1, but they will never be pushed to because newer epoch 3 is generated at the same time
6. Since the epoch 2 locs in worker3 will never be pushed to, they never get a chance to return HARD_SPLIT; as a result worker3 can't shut down fast and waits until the timeout.
This PR fixes this by destroying slots that failed to be reserved during `reserveSlotsWithRetry`.
### Why are the changes needed?
ditto
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Manual test.
Closes #2163 from waitinfuture/1178.
Authored-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
As title.
### Why are the changes needed?
As title.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Passes UTs.
Closes #2162 from waitinfuture/1175-2.
Authored-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
As title
### Why are the changes needed?
One user reported that LifecycleManager's parmap can create a huge number of threads and cause OOM.

There are four places where parmap is called:
1. When LifecycleManager commits files
2. When LifecycleManager reserves slots
3. When LifecycleManager sets up connections to workers
4. When LifecycleManager destroys slots
This PR fixes the fourth one. In more detail, this PR eliminates `parmap` when destroying slots, and also replaces `askSync` with `ask`.
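The idea behind replacing a thread-per-request parallel map with asynchronous asks can be sketched in plain Java with `CompletableFuture`. This is an analogy, not Celeborn's RPC API: `AsyncFanOut` and `askAll` are hypothetical, and the `ask` function stands in for a non-blocking RPC call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical illustration of fanning out async requests without a thread per call.
final class AsyncFanOut {
  // Issues one asynchronous request per worker, then waits for all replies.
  // No dedicated thread is blocked per in-flight request.
  static <R> List<R> askAll(
      List<String> workers, Function<String, CompletableFuture<R>> ask) {
    List<CompletableFuture<R>> futures = new ArrayList<>();
    for (String worker : workers) {
      futures.add(ask.apply(worker));
    }
    CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
    List<R> replies = new ArrayList<>();
    for (CompletableFuture<R> f : futures) {
      replies.add(f.join()); // already complete, so this does not block
    }
    return replies;
  }
}
```

With a blocking call per worker, the number of threads grows with the number of workers; here only the caller waits.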
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Manual test and GA.
Closes #2156 from waitinfuture/1167.
Lead-authored-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
Co-authored-by: cxzl25 <cxzl25@users.noreply.github.com>
Co-authored-by: Keyong Zhou <waitinfuture@gmail.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
as title
### Why are the changes needed?
as title
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Passes GA
Closes #2159 from waitinfuture/1171.
Authored-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
As title
### Why are the changes needed?
One user reported that LifecycleManager's parmap can create a huge number of threads and cause OOM.

There are four places where parmap is called:
1. When LifecycleManager commits files
2. When LifecycleManager reserves slots
3. When LifecycleManager sets up connections to workers
4. When StorageManager calls close
This PR fixes the third one. In more detail, this PR eliminates `parmap` when setting up connections to workers, and also replaces `askSync` with `ask`.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Manual test and GA.
Closes #2154 from waitinfuture/1166.
Authored-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
As title
### Why are the changes needed?
One user reported that LifecycleManager's parmap can create a huge number of threads and cause OOM.

There are four places where parmap is called:
1. When LifecycleManager commits files
2. When LifecycleManager reserves slots
3. When LifecycleManager sets up connections to workers
4. When StorageManager calls close
This PR fixes the second one. In more detail, this PR eliminates `parmap` when reserving slots, and also replaces `askSync` with `ask`.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Manual test and GA.
Closes #2152 from waitinfuture/1165-1.
Lead-authored-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
Co-authored-by: Cheng Pan <pan3793@gmail.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
As title
### Why are the changes needed?
One user reported that LifecycleManager's parmap can create a huge number of threads and cause OOM.

There are four places where parmap is called:
1. When LifecycleManager commits files
2. When LifecycleManager reserves slots
3. When LifecycleManager setup connection to workers
4. When StorageManager calls close
This PR fixes the first one. To be more detail, this PR eliminates `parmap` when doing committing files, and also replaces `askSync` with `ask`.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Manual test and GA.
Closes #2145 from waitinfuture/1160.
Lead-authored-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
Co-authored-by: Keyong Zhou <waitinfuture@gmail.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
When requesting slots, filter out workers excluded by the application.
### Why are the changes needed?
If a worker is alive but cannot serve requests, registering a shuffle removes the worker from the application client's exclude list, and the next shuffle may reserve slots on this worker. This causes the application to revive unexpectedly.
### Does this PR introduce _any_ user-facing change?
Yes, slot requests now filter out workers excluded by the application.
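The filtering step can be pictured with a minimal sketch. The class and method names below are hypothetical, and workers are reduced to plain strings for illustration; this is not the code changed by this PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Illustrative sketch only; names are hypothetical.
final class SlotCandidateFilterSketch {
  /** Keeps only the candidate workers that the application has not excluded. */
  static List<String> availableWorkers(List<String> candidates, Set<String> excludedByApp) {
    List<String> result = new ArrayList<>();
    for (String worker : candidates) {
      if (!excludedByApp.contains(worker)) {
        result.add(worker);
      }
    }
    return result;
  }
}
```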
### How was this patch tested?
UT.
Closes #2131 from wangshengjie123/fix-request-slots-blacklist.
Authored-by: wangshengjie <wangshengjie3@xiaomi.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
In [CELEBORN-955](https://github.com/apache/incubator-celeborn/pull/1924), the GetShuffleId RPC was introduced to generate a Celeborn shuffle id from the app shuffle id to support Spark stage rerun.
The GetShuffleId RPC assumes that the shuffle write operation always happens before the shuffle read operation, but this is not true for empty shuffle data in Celeborn, which causes the GetShuffleId RPC to throw an NPE and fail the job.
This PR fixes this bug.
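As a rough illustration of this class of bug, the sketch below shows a lookup that tolerates a read arriving before any write. It is not the fix applied in this PR: `ShuffleIdLookupSketch`, its methods, and the `UNKNOWN_SHUFFLE_ID` sentinel are all hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch only; names and the sentinel value are hypothetical.
final class ShuffleIdLookupSketch {
  static final int UNKNOWN_SHUFFLE_ID = -1; // hypothetical "no mapping yet" marker

  private final Map<Integer, Integer> appToCelebornShuffleId = new ConcurrentHashMap<>();
  private int nextId = 0;

  /** Write path: creates the mapping on first use. */
  synchronized int getForWrite(int appShuffleId) {
    return appToCelebornShuffleId.computeIfAbsent(appShuffleId, k -> nextId++);
  }

  /** Read path: with empty shuffle data no write may have happened, so never unbox null. */
  int getForRead(int appShuffleId) {
    Integer id = appToCelebornShuffleId.get(appShuffleId);
    return id == null ? UNKNOWN_SHUFFLE_ID : id;
  }
}
```

The point of the sketch is only that the read path must handle a missing mapping explicitly instead of dereferencing it.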
### Why are the changes needed?
To avoid Spark job failures with empty shuffle data.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
A new test case is included for empty shuffle data.
Closes #2136 from ErikFang/fix-GetShuffleId-RPC-NPE-for-empty-shuffle.
Lead-authored-by: Erik.fang <fmerik@gmail.com>
Co-authored-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
The `clientPushBufferMaxSize` config is also used by `CelebornInputStreamImpl`. It is a push-side config and should not be used on the fetch side. This PR introduces a fetch config to replace it.
### Why are the changes needed?
As above
### Does this PR introduce _any_ user-facing change?
Yes, a new config `celeborn.client.fetch.buffer.size` is introduced.
### How was this patch tested?
Pass CI
Closes #2118 from exmy/celeborn-1145.
Authored-by: exmy <xumovens@gmail.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
### What changes were proposed in this pull request?
As Title
### Why are the changes needed?
As Title
### Does this PR introduce _any_ user-facing change?
NO
### How was this patch tested?
PASS GA
Closes #2114 from jiaoqingbo/1142.
Authored-by: jiaoqingbo <1178404354@qq.com>
Signed-off-by: Fu Chen <cfmcgrady@gmail.com>
### What changes were proposed in this pull request?
Currently, Celeborn uses replication to handle shuffle data loss for the Celeborn shuffle reader. This PR implements an alternative solution based on Spark stage resubmission.
Design doc:
https://docs.google.com/document/d/1dkG6fww3g99VAb1wkphNlUES_MpngVPNg8601chmVp8/edit
### Why are the changes needed?
Spark stage resubmission uses fewer resources than replication, and some Celeborn users are also asking for it.
### Does this PR introduce _any_ user-facing change?
Yes, a new config `celeborn.client.fetch.throwsFetchFailure` is introduced to enable this feature.
### How was this patch tested?
Two UTs are attached, and we also tested it in Ant Group's dev Spark cluster.
Closes #1924 from ErikFang/Re-run-Spark-Stage-for-Celeborn-Shuffle-Fetch-Failure.
Lead-authored-by: Erik.fang <fmerik@gmail.com>
Co-authored-by: Cheng Pan <pan3793@gmail.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
As Title
### Why are the changes needed?
As Title
### Does this PR introduce _any_ user-facing change?
NO
### How was this patch tested?
PASS GA
Closes #2113 from jiaoqingbo/1140.
Authored-by: jiaoqingbo <1178404354@qq.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
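### What changes were proposed in this pull request?
As Title
### Why are the changes needed?
As Title
### Does this PR introduce _any_ user-facing change?
NO
### How was this patch tested?
PASS GA
Closes #2111 from jiaoqingbo/1138.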
Authored-by: jiaoqingbo <1178404354@qq.com>
Signed-off-by: Shuang <lvshuang.tb@gmail.com>
### What changes were proposed in this pull request?
Add `lastException` to `CelebornIOException` when `createReaderWithRetry` encounters an error.
### Why are the changes needed?
Currently we have to track down the specific executor to locate the detailed error message.
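The pattern of carrying the last failure along as the cause of the final exception can be sketched as below. This is an illustration under assumptions, not the code in this PR: `RetryWithCauseSketch` and `callWithRetry` are hypothetical names, and a plain `IOException` stands in for `CelebornIOException`.

```java
import java.io.IOException;
import java.util.concurrent.Callable;

// Illustrative sketch only; names are hypothetical.
final class RetryWithCauseSketch {
  /** Retries the action and, if every attempt fails, rethrows with the last failure as cause. */
  static <T> T callWithRetry(Callable<T> action, int maxAttempts) throws IOException {
    Exception lastException = null;
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        return action.call();
      } catch (Exception e) {
        lastException = e; // remember the most recent failure for diagnostics
      }
    }
    // Attaching the cause keeps the original stack trace visible to the caller.
    throw new IOException("Failed after " + maxAttempts + " attempts", lastException);
  }
}
```

With the cause attached, the root error shows up wherever the final exception is logged, rather than only in the log of the process that hit it.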
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
Closes #2103 from wxplovecc/easy-to-dedicate-error.
Authored-by: 吴祥平 <wxp4532@ly.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
### What changes were proposed in this pull request?
When I `kill -9` a Worker process, Master does not exclude the worker until the heartbeat times out.
During this time, Master still allocates slots on this Worker, causing an NPE when registering a shuffle:
```
Caused by: java.lang.NullPointerException
at org.apache.celeborn.client.LifecycleManager.requestWorkerReserveSlots(LifecycleManager.scala:1246) ~[celeborn-client-spark-3-shaded_2.12-0.4.0-SNAPSHOT.jar:?]
at org.apache.celeborn.client.LifecycleManager.$anonfun$reserveSlots$2(LifecycleManager.scala:864) ~[celeborn-client-spark-3-shaded_2.12-0.4.0-SNAPSHOT.jar:?]
at org.apache.celeborn.common.util.ThreadUtils$.$anonfun$parmap$2(ThreadUtils.scala:301) ~[celeborn-client-spark-3-shaded_2.12-0.4.0-SNAPSHOT.jar:?]
at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659) ~[scala-library-2.12.15.jar:?]
at scala.util.Success.$anonfun$map$1(Try.scala:255) ~[scala-library-2.12.15.jar:?]
at scala.util.Success.map(Try.scala:213) ~[scala-library-2.12.15.jar:?]
at scala.concurrent.Future.$anonfun$map$1(Future.scala:292) ~[scala-library-2.12.15.jar:?]
at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33) ~[scala-library-2.12.15.jar:?]
at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33) ~[scala-library-2.12.15.jar:?]
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64) ~[scala-library-2.12.15.jar:?]
at java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1402) ~[?:1.8.0_372]
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) ~[?:1.8.0_372]
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) ~[?:1.8.0_372]
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) ~[?:1.8.0_372]
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) ~[?:1.8.0_372]
```
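One way to guard against this kind of failure is to check for a missing endpoint before using it. The sketch below is only an illustration and is not claimed to be the change made in this PR; `ReserveSlotsGuardSketch`, `workersWithoutEndpoint`, and the `endpoints` map are hypothetical, with workers reduced to strings.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Illustrative sketch only; names are hypothetical.
final class ReserveSlotsGuardSketch {
  /** Returns allocated workers that have no usable endpoint, so callers can treat them as failed. */
  static List<String> workersWithoutEndpoint(
      List<String> allocated, Map<String, Object> endpoints) {
    List<String> missing = new ArrayList<>();
    for (String worker : allocated) {
      // A killed worker may still receive slots but have no endpoint on the client side.
      if (endpoints.get(worker) == null) {
        missing.add(worker);
      }
    }
    return missing;
  }
}
```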
### Why are the changes needed?
ditto
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Manual test and passes GA
Closes #2104 from waitinfuture/1130.
Authored-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>