### What changes were proposed in this pull request?
Document [Java Object Serialization Specification](https://docs.oracle.com/javase/8/docs/platform/serialization/spec/protocol.html) on `JavaSerializer`
### Why are the changes needed?
To help developers who extend the `Serializer` to understand the existing `JavaSerializer` protocol.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Review.
Closes#3194 from pan3793/minor-docs.
Authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: Cheng Pan <chengpan@apache.org>
### What changes were proposed in this pull request?
As title.
### Why are the changes needed?
To satisfy ASF policy.
```
An error occurred while processing the github feature in .asf.yaml:
GitHub discussions can only be enabled if a mailing list target exists for it.
---
With regards, ASF Infra.
For further information, please see the .asf.yaml documentation at:
https://github.com/apache/infrastructure-asfyaml/blob/main/README.md
```
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
This can be manually tested after merging to main.
Closes#3195 from pan3793/CELEBORN-1956.
Authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: Cheng Pan <chengpan@apache.org>
### What changes were proposed in this pull request?
Most of the contents of this PR is the changes for UT, and it's derived from another PR #3083.
### Why are the changes needed?
Introduce tier writer proxy and simplify the partition data writer.
### Does this PR introduce _any_ user-facing change?
NO.
### How was this patch tested?
GA and manual tests.
I have verified data correctness on a cluster using TPC-DS tests.
The average performance tests result for this PR:
with this PR : without this PR = 2562.5 : 2576.85
There is no performance reduction.
Closes#3085 from FMX/1844.
Authored-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
### What changes were proposed in this pull request?
Add a new value `image.registry` to `values.yaml`.
### Why are the changes needed?
Add a new value `image.registry` so we can easily switch between different image registries (e.g. `docker.io`, `ghcr.io`, `quay.io` and private image registries).
This PR is part of #2654.
### Does this PR introduce _any_ user-facing change?
Yes, but it should be backward-compatible.
### How was this patch tested?
Update helm unit test and run `helm unittest charts/celeborn --file "tests/**/*_test.yaml" --strict --debug`.
Closes#3187 from ChenYi015/helm/image.
Authored-by: Yi Chen <github@chenyicn.net>
Signed-off-by: Cheng Pan <chengpan@apache.org>
### What changes were proposed in this pull request?
Define Helm template helpers for master/worker respectively.
### Why are the changes needed?
For better code organization, all master/worker related files will be placed at `charts/celeborn/templates/master` and `charts/celeborn/templates/worker` respectively.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Run Helm unit tests locally by `helm unittest charts/celeborn --file "tests/**/*_test.yaml" --strict --debug`
Closes#3183 from ChenYi015/helm/template-helpers.
Authored-by: Yi Chen <github@chenyicn.net>
Signed-off-by: Cheng Pan <chengpan@apache.org>
### What changes were proposed in this pull request?
For spark celeborn application, if the GetReducerFileGroupResponse is larger than the threshold, Spark driver would broadcast the GetReducerFileGroupResponse to the executors, it prevents the driver from being the bottleneck in sending out multiple copies of the GetReducerFileGroupResponse (one per executor).
### Why are the changes needed?
To prevent the driver from being the bottleneck in sending out multiple copies of the GetReducerFileGroupResponse (one per executor).
### Does this PR introduce _any_ user-facing change?
No, the feature is not enabled by defaults.
### How was this patch tested?
UT.
Cluster testing with `spark.celeborn.client.spark.shuffle.getReducerFileGroup.broadcast.enabled=true`.
The broadcast response size should be always about 1kb.


Application succeed.

Closes#3158 from turboFei/broadcast_rgf.
Authored-by: Wang, Fei <fwang12@ebay.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
### What changes were proposed in this pull request?
Follow up for https://github.com/apache/celeborn/pull/2819
1. add timer for UpdateResourceConsumptionTime
2. prevent NPE if metrics not found
### Why are the changes needed?
The timer not added and cause NPE.
```
25/03/31 13:18:48,219 WARN [master-quota-checker] MasterSource: Metric UpdateResourceConsumptionTime{instance="zeus-slc-cm-1.zeus-slc-cm-cm.hm-dev.svc.140.tess.io:8080",role="master"} not found!
25/03/31 13:18:48,220 WARN [master-quota-checker] MasterSource: Exception encountered during stop timer of metric UpdateResourceConsumptionTime{instance="zeus-slc-cm-1.zeus-slc-cm-cm.hm-dev.svc.140.tess.io:8080",role="master"}
scala.MatchError: null
at org.apache.celeborn.common.metrics.source.AbstractSource.doStopTimer(AbstractSource.scala:316)
at org.apache.celeborn.common.metrics.source.AbstractSource.sample(AbstractSource.scala:279)
at org.apache.celeborn.service.deploy.master.quota.QuotaManager.updateResourceConsumption(QuotaManager.scala:201)
at org.apache.celeborn.service.deploy.master.quota.QuotaManager$$anon$1.run(QuotaManager.scala:59)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
```
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing UT.
Closes#3190 from turboFei/fix_npe.
Authored-by: Wang, Fei <fwang12@ebay.com>
Signed-off-by: SteNicholas <programgeek@163.com>
### What changes were proposed in this pull request?
1. Add the rpc times and BatchopenStreamTime to read wait time.
2. remove the updateFileGroup rpc time for BatchopenStreamTime log.
3. reduce the sleep time waiting for creating inputStream since it cost a lot of sleeping time for tiny shuffle read.
### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Tested in cluster.
Closes#3180 from zaynt4606/clb1918.
Authored-by: zhengtao <shuaizhentao.szt@alibaba-inc.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
### What changes were proposed in this pull request?
the workflow was just building the images, but not pushing
### Why are the changes needed?
images are now also pushed
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
not yet
Closes#3174 from jesusch/CELEBORN-1900-docker-images-fix-3.
Authored-by: Björn Boschman <bjoern.boschman@innovid.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
- Create file `charts/celeborn/files/conf/log4j2.xml` and `charts/celeborn/files/conf/metrics.properties` to store log4j2 and metrics configurations respectively.
- ConfigMap will read configurations from these two files.
### Why are the changes needed?
For better code readability.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Run Helm unit tests locally by:
```
helm unittest charts/celeborn
```
Closes#3182 from ChenYi015/refactor/helm.
Authored-by: Yi Chen <github@chenyicn.net>
Signed-off-by: SteNicholas <programgeek@163.com>
### What changes were proposed in this pull request?
`celeborn-flink-it` project should set `FLINK_VERSION` environment variable for `HybridShuffleWordCountTest`.
Follow up #2662.
### Why are the changes needed?
Because of the lack of `FLINK_VERSION` environment variable, the sbt flink project of CI cancels the tests of `HybridShuffleWordCountTest` as follows:
```
[info] - Celeborn Flink Hybrid Shuffle Integration test(Local) - word count !!! CANCELED !!!
[info] "" was empty (HybridShuffleWordCountTest.scala:75)
[info] org.scalatest.exceptions.TestCanceledException:
[info] at org.scalatest.Assertions.newTestCanceledException(Assertions.scala:475)
[info] at org.scalatest.Assertions.newTestCanceledException$(Assertions.scala:474)
[info] at org.scalatest.Assertions$.newTestCanceledException(Assertions.scala:1231)
[info] at org.scalatest.Assertions$AssertionsHelper.macroAssume(Assertions.scala:1310)
[info] at org.apache.celeborn.tests.flink.HybridShuffleWordCountTest.assumeFlinkVersion(HybridShuffleWordCountTest.scala:75)
[info] at org.apache.celeborn.tests.flink.HybridShuffleWordCountTest.$anonfun$new$1(HybridShuffleWordCountTest.scala:62)
[info] at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
[info] at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
[info] at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
[info] at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info] at org.scalatest.Transformer.apply(Transformer.scala:22)
[info] at org.scalatest.Transformer.apply(Transformer.scala:20)
[info] at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:226)
[info] at org.scalatest.TestSuite.withFixture(TestSuite.scala:196)
[info] at org.scalatest.TestSuite.withFixture$(TestSuite.scala:195)
[info] at org.scalatest.funsuite.AnyFunSuite.withFixture(AnyFunSuite.scala:1564)
[info] at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:224)
[info] at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:236)
[info] at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
[info] at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:236)
[info] at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:218)
[info] at org.scalatest.funsuite.AnyFunSuite.runTest(AnyFunSuite.scala:1564)
[info] at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:269)
[info] at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
[info] at scala.collection.immutable.List.foreach(List.scala:431)
[info] at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
[info] at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
[info] at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
[info] at org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:269)
[info] at org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:268)
[info] at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1564)
[info] at org.scalatest.Suite.run(Suite.scala:1114)
[info] at org.scalatest.Suite.run$(Suite.scala:1096)
[info] at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1564)
[info] at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:273)
[info] at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
[info] at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:273)
[info] at org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:272)
[info] at org.apache.celeborn.tests.flink.HybridShuffleWordCountTest.org$scalatest$BeforeAndAfterAll$$super$run(HybridShuffleWordCountTest.scala:37)
[info] at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
[info] at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
[info] at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
[info] at org.apache.celeborn.tests.flink.HybridShuffleWordCountTest.run(HybridShuffleWordCountTest.scala:37)
[info] at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:321)
[info] at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:517)
[info] at sbt.ForkMain$Run.lambda$runTest$1(ForkMain.java:414)
[info] at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
[info] at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
[info] at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
[info] at java.base/java.lang.Thread.run(Thread.java:829)
[info] - Celeborn Flink Hybrid Shuffle Integration test(Flink mini cluster) single tier - word count !!! CANCELED !!!
[info] "" was empty (HybridShuffleWordCountTest.scala:75)
[info] org.scalatest.exceptions.TestCanceledException:
[info] at org.scalatest.Assertions.newTestCanceledException(Assertions.scala:475)
[info] at org.scalatest.Assertions.newTestCanceledException$(Assertions.scala:474)
[info] at org.scalatest.Assertions$.newTestCanceledException(Assertions.scala:1231)
[info] at org.scalatest.Assertions$AssertionsHelper.macroAssume(Assertions.scala:1310)
[info] at org.apache.celeborn.tests.flink.HybridShuffleWordCountTest.assumeFlinkVersion(HybridShuffleWordCountTest.scala:75)
[info] at org.apache.celeborn.tests.flink.HybridShuffleWordCountTest.$anonfun$new$2(HybridShuffleWordCountTest.scala:68)
[info] at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
[info] at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
[info] at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
[info] at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info] at org.scalatest.Transformer.apply(Transformer.scala:22)
[info] at org.scalatest.Transformer.apply(Transformer.scala:20)
[info] at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:226)
[info] at org.scalatest.TestSuite.withFixture(TestSuite.scala:196)
[info] at org.scalatest.TestSuite.withFixture$(TestSuite.scala:195)
[info] at org.scalatest.funsuite.AnyFunSuite.withFixture(AnyFunSuite.scala:1564)
[info] at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:224)
[info] at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:236)
[info] at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
[info] at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:236)
[info] at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:218)
[info] at org.scalatest.funsuite.AnyFunSuite.runTest(AnyFunSuite.scala:1564)
[info] at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:269)
[info] at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
[info] at scala.collection.immutable.List.foreach(List.scala:431)
[info] at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
[info] at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
[info] at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
[info] at org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:269)
[info] at org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:268)
[info] at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1564)
[info] at org.scalatest.Suite.run(Suite.scala:1114)
[info] at org.scalatest.Suite.run$(Suite.scala:1096)
[info] at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1564)
[info] at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:273)
[info] at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
[info] at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:273)
[info] at org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:272)
[info] at org.apache.celeborn.tests.flink.HybridShuffleWordCountTest.org$scalatest$BeforeAndAfterAll$$super$run(HybridShuffleWordCountTest.scala:37)
[info] at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
[info] at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
[info] at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
[info] at org.apache.celeborn.tests.flink.HybridShuffleWordCountTest.run(HybridShuffleWordCountTest.scala:37)
[info] at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:321)
[info] at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:517)
[info] at sbt.ForkMain$Run.lambda$runTest$1(ForkMain.java:414)
[info] at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
[info] at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
[info] at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
[info] at java.base/java.lang.Thread.run(Thread.java:829)
```
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
`HybridShuffleWordCountTest`
- Flink 1.16
```
[info] - Celeborn Flink Hybrid Shuffle Integration test(Local) - word count !!! CANCELED !!!
[info] "1.16.3" was not empty, but FlinkVersion.fromVersionStr(scala.Predef.refArrayOps[String](scala.Predef.refArrayOps[String](flinkVersion.split("\\.")).take(2)).mkString(".")).isNewerOrEqualVersionThan(v1_20) was false (HybridShuffleWordCountTest.scala:75)
[info] org.scalatest.exceptions.TestCanceledException:
[info] at org.scalatest.Assertions.newTestCanceledException(Assertions.scala:475)
[info] at org.scalatest.Assertions.newTestCanceledException$(Assertions.scala:474)
[info] at org.scalatest.Assertions$.newTestCanceledException(Assertions.scala:1231)
[info] at org.scalatest.Assertions$AssertionsHelper.macroAssume(Assertions.scala:1310)
[info] at org.apache.celeborn.tests.flink.HybridShuffleWordCountTest.assumeFlinkVersion(HybridShuffleWordCountTest.scala:75)
[info] at org.apache.celeborn.tests.flink.HybridShuffleWordCountTest.$anonfun$new$1(HybridShuffleWordCountTest.scala:62)
[info] at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
[info] at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
[info] at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
[info] at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info] at org.scalatest.Transformer.apply(Transformer.scala:22)
[info] at org.scalatest.Transformer.apply(Transformer.scala:20)
[info] at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:226)
[info] at org.scalatest.TestSuite.withFixture(TestSuite.scala:196)
[info] at org.scalatest.TestSuite.withFixture$(TestSuite.scala:195)
[info] at org.scalatest.funsuite.AnyFunSuite.withFixture(AnyFunSuite.scala:1564)
[info] at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:224)
[info] at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:236)
[info] at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
[info] at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:236)
[info] at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:218)
[info] at org.scalatest.funsuite.AnyFunSuite.runTest(AnyFunSuite.scala:[1564](https://github.com/apache/celeborn/actions/runs/14124619270/job/39570964025?pr=3184#step:4:1565))
[info] at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:269)
[info] at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
[info] at scala.collection.immutable.List.foreach(List.scala:431)
[info] at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
[info] at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
[info] at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
[info] at org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:269)
[info] at org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:268)
[info] at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1564)
[info] at org.scalatest.Suite.run(Suite.scala:1114)
[info] at org.scalatest.Suite.run$(Suite.scala:1096)
[info] at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1564)
[info] at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:273)
[info] at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
[info] at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:273)
[info] at org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:272)
[info] at org.apache.celeborn.tests.flink.HybridShuffleWordCountTest.org$scalatest$BeforeAndAfterAll$$super$run(HybridShuffleWordCountTest.scala:37)
[info] at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
[info] at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
[info] at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
[info] at org.apache.celeborn.tests.flink.HybridShuffleWordCountTest.run(HybridShuffleWordCountTest.scala:37)
[info] at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:321)
[info] at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:517)
[info] at sbt.ForkMain$Run.lambda$runTest$1(ForkMain.java:414)
[info] at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
[info] at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
[info] at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
[info] at java.base/java.lang.Thread.run(Thread.java:829)
[info] - Celeborn Flink Hybrid Shuffle Integration test(Flink mini cluster) single tier - word count !!! CANCELED !!!
[info] "1.16.3" was not empty, but FlinkVersion.fromVersionStr(scala.Predef.refArrayOps[String](scala.Predef.refArrayOps[String](flinkVersion.split("\\.")).take(2)).mkString(".")).isNewerOrEqualVersionThan(v1_20) was false (HybridShuffleWordCountTest.scala:75)
[info] org.scalatest.exceptions.TestCanceledException:
[info] at org.scalatest.Assertions.newTestCanceledException(Assertions.scala:475)
[info] at org.scalatest.Assertions.newTestCanceledException$(Assertions.scala:474)
[info] at org.scalatest.Assertions$.newTestCanceledException(Assertions.scala:1231)
[info] at org.scalatest.Assertions$AssertionsHelper.macroAssume(Assertions.scala:1310)
[info] at org.apache.celeborn.tests.flink.HybridShuffleWordCountTest.assumeFlinkVersion(HybridShuffleWordCountTest.scala:75)
[info] at org.apache.celeborn.tests.flink.HybridShuffleWordCountTest.$anonfun$new$2(HybridShuffleWordCountTest.scala:68)
[info] at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
[info] at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
[info] at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
[info] at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info] at org.scalatest.Transformer.apply(Transformer.scala:22)
[info] at org.scalatest.Transformer.apply(Transformer.scala:20)
[info] at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:226)
[info] at org.scalatest.TestSuite.withFixture(TestSuite.scala:196)
[info] at org.scalatest.TestSuite.withFixture$(TestSuite.scala:195)
[info] at org.scalatest.funsuite.AnyFunSuite.withFixture(AnyFunSuite.scala:1564)
[info] at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:224)
[info] at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:236)
[info] at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
[info] at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:236)
[info] at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:218)
[info] at org.scalatest.funsuite.AnyFunSuite.runTest(AnyFunSuite.scala:1564)
[info] at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:269)
[info] at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
[info] at scala.collection.immutable.List.foreach(List.scala:431)
[info] at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
[info] at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
[info] at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
[info] at org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:269)
[info] at org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:268)
[info] at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1564)
[info] at org.scalatest.Suite.run(Suite.scala:1114)
[info] at org.scalatest.Suite.run$(Suite.scala:1096)
[info] at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1564)
[info] at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:273)
[info] at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
[info] at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:273)
[info] at org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:272)
[info] at org.apache.celeborn.tests.flink.HybridShuffleWordCountTest.org$scalatest$BeforeAndAfterAll$$super$run(HybridShuffleWordCountTest.scala:37)
[info] at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
[info] at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
[info] at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
[info] at org.apache.celeborn.tests.flink.HybridShuffleWordCountTest.run(HybridShuffleWordCountTest.scala:37)
[info] at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:321)
[info] at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:517)
[info] at sbt.ForkMain$Run.lambda$runTest$1(ForkMain.java:414)
[info] at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
[info] at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
[info] at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
[info] at java.base/java.lang.Thread.run(Thread.java:829)
```
- Flink 1.20
```
[info] - Celeborn Flink Hybrid Shuffle Integration test(Local) - word count
2025-03-28T08:15:25.149779Z flink-pekko.actor.default-dispatcher-10 WARN found 2 argument placeholders, but provided 3 for pattern `Disconnect TaskExecutor {} because: {}`
25/03/28 08:15:25,199 ERROR [celeborn-client-lifecycle-manager-release-partition-executor-1] LifecycleManager: Request DestroyWorkerSlots(1743149720044-d5114c42fdccf6220ec7a9e1fb856ac3-1,[1-0, 4-0, 7-0],[],false) to NettyRpcEndpointRef(celeborn://WorkerEndpoint10.1.0.100:37683) failed 1/3 for 1743149720044-d5114c42fdccf6220ec7a9e1fb856ac3-1, reason: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask351c2453[Not completed, task = java.util.concurrent.Executors$RunnableAdaptera979257[Wrapped task = org.apache.celeborn.common.rpc.netty.NettyRpcEnv$$anon$16312b56f]] rejected from java.util.concurrent.ScheduledThreadPoolExecutor23161ba2[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0], will retry.
...
[info] - Celeborn Flink Hybrid Shuffle Integration test(Flink mini cluster) single tier - word count
25/03/28 08:15:30,980 ERROR [worker 1 starter thread] MasterClient: Send rpc with failure, has tried 0, max try 15!
```
Closes#3184 from SteNicholas/CELEBORN-1543.
Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
### Why are the changes needed?
When shuffle read timeout, a large number of logs are output in the executor log.
```
inputStream is null, sleeping...
```
<img width="799" alt="image" src="https://github.com/user-attachments/assets/f5b2bfde-5874-4b1e-8992-7037a9c81aa5" />
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes#3181 from cxzl25/CELEBORN-1947.
Authored-by: sychen <sychen@ctrip.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
### What changes were proposed in this pull request?
Support HARD_SPLIT in PushMergedData should support handle older worker success response
### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes#3172 from AngersZhuuuu/CELEBORN-1928-.
Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
### What changes were proposed in this pull request?
When hard split happens, some FileWriters may be null which causes handle congestion control NPE.
### Why are the changes needed?
When hard split happens, some FileWriters may be null which causes handle congestion control NPE.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing uts.
Closes#3176 from leixm/CELEBORN-1930.
Authored-by: Xianming Lei <31424839+leixm@users.noreply.github.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
### What changes were proposed in this pull request?
Use `-XepDisableWarningsInGeneratedCode` to disable warnings for `openapi-client` module.
### Why are the changes needed?
There are some warnings in compilation of `openapi-client` module as follows:
```
$ mvn clean install -pl openapi/openapi-client -DskipTests -Dcheckstyle.skip=true -Drat.skip=true -Dspotless.check.skip=true|grep "[WARNING].*java.*"
[WARNING] /Users/nicholasjiang/Github/celeborn/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/worker/invoker/ApiException.java:[100,18] [OverrideThrowableToString] To return a custom message with a Throwable class, one should override getMessage() instead of toString().
[WARNING] /Users/nicholasjiang/Github/celeborn/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/model/DynamicConfig.java:[55,19] [ImmutableEnumChecker] enums should be immutable: 'LevelEnum' has non-final field 'value'
[WARNING] /Users/nicholasjiang/Github/celeborn/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/master/invoker/ApiException.java:[100,18] [OverrideThrowableToString] To return a custom message with a Throwable class, one should override getMessage() instead of toString().
[WARNING] /Users/nicholasjiang/Github/celeborn/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/model/WorkerExitRequest.java:[51,19] [ImmutableEnumChecker] enums should be immutable: 'TypeEnum' has non-final field 'value'
[WARNING] /Users/nicholasjiang/Github/celeborn/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/model/PartitionLocationData.java:[58,19] [ImmutableEnumChecker] enums should be immutable: 'ModeEnum' has non-final field 'value'
[WARNING] /Users/nicholasjiang/Github/celeborn/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/model/PartitionLocationData.java:[107,19] [ImmutableEnumChecker] enums should be immutable: 'StorageEnum' has non-final field 'value'
[WARNING] /Users/nicholasjiang/Github/celeborn/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/model/SendWorkerEventRequest.java:[60,19] [ImmutableEnumChecker] enums should be immutable: 'EventTypeEnum' has non-final field 'value'
```
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Local test:
```
$ mvn clean install -pl openapi/openapi-client -DskipTests -Dcheckstyle.skip=true -Drat.skip=true -Dspotless.check.skip=true|grep "[WARNING].*java.*"
```
Closes#3169 from SteNicholas/CELEBORN-1190.
Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: SteNicholas <programgeek@163.com>
### What changes were proposed in this pull request?
use correct docker secrets
### Why are the changes needed?
use github actions to login to docker hub
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
Can test this patch because the forked repo does not have docker secrets.
Closes#3168 from jesusch/CELEBORN-1900-docker-images-fix-2.
Authored-by: Björn Boschman <bjoern.boschman@innovid.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
This PR adds reader end's ShuffleClient to cppClient.
### Why are the changes needed?
ShuffleClient is the user interface for cppClient usage.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Compilation.
Closes#3156 from HolyLow/issue/celeborn-1915-add-reader-shuffle-client-to-cppclient.
Authored-by: HolyLow <jiaming.xie7@gmail.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
1. Worker reports resourceConsumption to master
2. QuotaManager calculates the resourceConsumption of each app and marks the apps that exceed the quota.
2.1 When the tenant's resourceConsumption exceeds the tenant's quota, select the app with a larger consumption to mark interrupted.
2.2 When the resourceConsumption of the cluster exceeds the cluster quota, select the app with larger consumption to mark interrupted.
3. Master returns to Driver through heartbeat, whether app is marked interrupted
### Why are the changes needed?
The current storage quota logic can only limit new shuffles, and cannot limit the writing of existing shuffles. In our production environment, there is such an scenario: the cluster is small, but the user's app single shuffle is large which occupied disk resources, we want to interrupt those shuffle.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
UTs.
Closes#2819 from leixm/CELEBORN-1577-2.
Authored-by: Xianming Lei <31424839+leixm@users.noreply.github.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
### What changes were proposed in this pull request?
Support stage-rerun when read partition by chunkOffsets when enable optimize skew partition read
### Why are the changes needed?
In [CELEBORN-1319](https://issues.apache.org/jira/browse/CELEBORN-1319), we have already implemented the skew partition read optimization based on chunk offsets, but we don't support skew partition shuffle retry, so we need support the stage rerun.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Cluster test
Closes#3118 from wangshengjie123/support-stage-rerun.
Lead-authored-by: wangshengjie3 <wangshengjie3@xiaomi.com>
Co-authored-by: Wang, Fei <fwang12@ebay.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
### What changes were proposed in this pull request?
The referred step ID used to extract the version was wrong
### Why are the changes needed?
To fix docker build workflow failures.
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
Manually tests.
Closes#3160 from jesusch/CELEBORN-1900-docker-images-fix.
Authored-by: Björn Boschman <bjoern.boschman@innovid.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
Fix incorrect logic when calculate disk available slots
### Why are the changes needed?
Now we use `usableSize / estimatedPartitionSize = maxSlots`
Then `availableSlots = maxSlots - allocatedSlots`
But `availableSlots` should be `usableSize / estimizatedPartitionSize`
### Does this PR introduce _any_ user-facing change?
Yea
### How was this patch tested?
MT
Closes#3162 from AngersZhuuuu/CELEBORN-1923.
Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
### What changes were proposed in this pull request?
Update master and worker commands in `celeborn_cli.md` including:
- Remove `--show-top-disk-used-apps` command of master and worker.
- Add `--show-workers-topology` command of master.
### Why are the changes needed?
- #2949 removes `--show-top-disk-used-apps` command of master and worker.
- #3119 adds `--show-workers-topology` command of master.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
No.
Closes#3165 from SteNicholas/CELEBORN-1678.
Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: SteNicholas <programgeek@163.com>
### What changes were proposed in this pull request?
build & publish docker images on release
### Why are the changes needed?
No offical images available
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
I wish I could - but I cannot push against a non existing docker repo :)
Closes#3152 from jesusch/CELEBORN-1900-docker-images.
Authored-by: Björn Boschman <bjoern.boschman@innovid.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
incWriteTime when ShuffleWriter invoke pushGiantRecord
### Why are the changes needed?
When ShuffleWriter calls pushGiantRecord, the task thread needs to wait synchronously for the push to complete, just like pushing the last part of data in the close method. The time spent on pushGiantRecord needs to be included in writeTime.
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
ut
Closes#3155 from TheodoreLx/fix-write-time.
Authored-by: TheodoreLx <1548069580@qq.com >
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
Whenever a `WorkerPartitionReader` is recreated (due celeborn worker restarts / any other chunk fetch failure), the entire shuffle partition file is re-read from beginning, discarding already read chunks in `CelebornInputStream` based on the batchIdSet metadata maintained.
This can be improved (only for cases where shuffle data is unreplicated) by skipping already read chunk id since they'd be discarded anyway. This improves overall shuffle read performance (reducer's total time, network usage etc).
### Why are the changes needed?
Allow skipping already read shuffle chunks
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
UTs added
Closes#3132 from saurabhd336/skipReadChunks.
Authored-by: Saurabh Dubey <saurabhd336@uber.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
### What changes were proposed in this pull request?
Support pre-run static code blocks of `TransportMessages` to improve performance of protobuf serialization.
### Why are the changes needed?
The protobuf message protocol defines many map type fields, which makes it time-consuming to build these message instances. This is because `TransportMessages` contains static code blocks to initialize a large number of `Descriptor`s and `FieldAccessorTable`s, where the instantiation of `FieldAccessorTable` includes reflection. The test result proves that the static code blocks execute in about 70 milliseconds.
Therefore, it's better to pre-run static code blocks of `TransportMessages` to improve performance of protobuf serialization. Meanwhile, it's recommended to use repeated instead of map type field for rpc messages.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
CI.
Closes#3149 from SteNicholas/CELEBORN-1909.
Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
### What changes were proposed in this pull request?
- close [CELEBORN-1911](https://issues.apache.org/jira/browse/CELEBORN-1911)
This PR refactors the project structure by moving the multipart-uploader module into multipart-uploader/multipart-uploader-s3.
### Why are the changes needed?
This change improves modularity and enables future extensions, such as multipart-uploader/multipart-uploader-oss, allowing better support for multiple object storage backends.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Deployment integration testing has been completed in the local environment.
Closes#3153 from shouwangyw/optimize/mpu-s3.
Authored-by: veli.yang <897900564@qq.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
This PR adds CelebornInputStream to cppClient.
### Why are the changes needed?
The CelebornInputStream is the readerClient's feeding stream.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Compilation.
Closes#3151 from HolyLow/issue/celeborn-1906-add-celeborn-input-stream-to-cppclient.
Authored-by: HolyLow <jiaming.xie7@gmail.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
LifecycleManager respond to RegisterShuffle with max epoch PartitionLocation.
### Why are the changes needed?
Newly spun up executors in a Spark job will still get the partitionLocations with the minEpoch of the celeborn lost worker.
These executors will fail to connect to the lost worker and then call into revive to get the latest PartitionLocation for a given partitionId in `ChangePartitionManager.getLatestPartition()`.
Return with max epoch can reduce such revive requests.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
UT.
Closes#3135 from zaynt4606/clb1822.
Authored-by: zhengtao <shuaizhentao.szt@alibaba-inc.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
### What changes were proposed in this pull request?
Remove the redundant synchronized keyword from the `isTerminated` method in `sameThreadExecutorService()` implementation within the `ThreadUtils` class. The method was using both a synchronized block and an explicit `ReentrantLock`, which is unnecessary and potentially problematic.
Backport https://github.com/apache/spark/pull/50210.
### Why are the changes needed?
The changes are needed for several reasons:
1. **Eliminates redundant synchronization**: The current implementation uses both synchronized keyword and explicit ReentrantLock, which is redundant and creates unnecessary overhead.
2. **Reduces potential deadlock risks**: Using two different locking mechanisms (built-in synchronized and explicit `ReentrantLock`) in the same method could lead to complex lock ordering issues and increase deadlock risks. Although the risk of deadlock in the current implementation is low, if someone modifies the code in the future and adds a method that acquires these two locks in a different order, it would introduce a deadlock risk.
3. **Improves performance**: Removing the unnecessary synchronization layer reduces lock contention and context switching overhead.
4. **Code consistency**: Other methods in the same class only use `ReentrantLock` for synchronization, so removing synchronized makes the code style more consistent.
5. **More precise control**: `ReentrantLock` already provides all the synchronization needed with more features than the implicit synchronized keyword.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
CI.
Closes#3150 from SteNicholas/CELEBORN-1910.
Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: SteNicholas <programgeek@163.com>
### What changes were proposed in this pull request?
close [issues-3145](https://github.com/apache/celeborn/issues/3145)
### Why are the changes needed?
1. Support s3 schema.
2. Fixed the problem that diskFileInfo judged the S3 type incorrectly.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Deployment integration testing has been completed in the local environment.
Closes#3146 from shouwangyw/bugfix/resolve_bugs_3145.
Authored-by: veli.yang <897900564@qq.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
<!--
Thanks for sending a pull request! Here are some tips for you:
- Make sure the PR title start w/ a JIRA ticket, e.g. '[CELEBORN-XXXX] Your PR title ...'.
- Be sure to keep the PR description updated to reflect all changes.
- Please write your PR title to summarize what this PR proposes.
- If possible, provide a concise example to reproduce the issue for a faster review.
-->
CELEBORN-1901
### What changes were proposed in this pull request?
Update base docker image to ubuntu 24.04 (noble)
### Why are the changes needed?
Current used ubuntu 20.04 is close to end-of-life
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
Tested on our celeborn installation
Closes#3143 from jesusch/CELEBORN-1901-update-docker-base-image.
Authored-by: Björn Boschman <bjoern.boschman@innovid.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
0.6.0 -> 0.5.4
- `celeborn.rpc.retryWait`
- `celeborn.client.rpc.retryWait`
`empty` -> 0.5.4
- `celeborn.<module>.io.conflictAvoidChooser.enable`
### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes#3142 from s0nskar/config_rpc_retry.
Authored-by: Sanskar Modi <sanskarmodi97@gmail.com>
Signed-off-by: SteNicholas <programgeek@163.com>
### What changes were proposed in this pull request?
This PR adds WorkerPartitionReader to cppClient.
### Why are the changes needed?
WorkerPartitionReader is the building block of CelebornInputStream.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Compilation and UTs.
Closes#3137 from HolyLow/issue/celeborn-1881-add-workerpartitionreader-to-cppclient.
Authored-by: HolyLow <jiaming.xie7@gmail.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
SPARK-49946 (4.0.0) removes single String constructor of class `SparkOutOfMemoryError` and introduces `_LEGACY_ERROR_TEMP_3301` error condition, SPARK-51386 (4.1.0) renames the error condition to `POINTER_ARRAY_OUT_OF_MEMORY`.
### Why are the changes needed?
To be compatible with Spark 4.0 and 4.1
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
GHA checks Spark 2.4 to Spark 3.5, I manually tested with Spark 4.0.0 RC2
Closes#3141 from pan3793/CELEBORN-1898.
Authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
196ad607cd/common/src/main/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala (L256) we hit the issue that the ignored message is too long , and when calling toString, it applies for a too large array which is beyond the JVM's limit
I don't think this log convey too much info, , so we could avoid calling toString to improve robustness of the applications
### Why are the changes needed?
more details in JIRA
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
prod
Closes#3139 from CodingCat/log.
Authored-by: CodingCat <zhunansjtu@gmail.com>
Signed-off-by: SteNicholas <programgeek@163.com>
### What changes were proposed in this pull request?
Bump log4j2 version to 2.24.3
https://github.com/apache/logging-log4j2/releases/tag/rel%2F2.24.3
### Why are the changes needed?
Bump to latest log4j2 bug fix release.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
GA.
Closes#3134 from turboFei/log4j2.
Authored-by: Wang, Fei <fwang12@ebay.com>
Signed-off-by: SteNicholas <programgeek@163.com>
### What changes were proposed in this pull request?
Minor change for the log, to make the fetch failure message and rpc metrics dump much clear.
### Why are the changes needed?
As title.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
GA.
Closes#3136 from turboFei/log_shuffle.
Authored-by: Wang, Fei <fwang12@ebay.com>
Signed-off-by: SteNicholas <programgeek@163.com>
### What changes were proposed in this pull request?
In the switchServingState after resumeByPinnedMemory, keep the resume channel to prevent the channel from frequently resuming and pausing before memoryUsage decreases to pausePushDataThreshold.
### Why are the changes needed?
Frequent channel resume and pause will result in slow data reception and failure to quickly reduce memoryUsage to below pausePushDataThreshold.
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
ut
Closes#3099 from TheodoreLx/keep-resume.
Lead-authored-by: TheodoreLx <1548069580@qq.com >
Co-authored-by: 慧枫 <zhengqi.zzq@antgroup.com>
Co-authored-by: Zhengqi Zhang <1548069580@qq.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
### What changes were proposed in this pull request?
Handling nullptr exception in FetchHandler after worker restarts.
### Why are the changes needed?
Current code throw nullptr during handleChunkFetchRequest –
```
25/02/27 09:23:25 WARN [data-client-5-1] TransportResponseHandler: Ignoring response for RPC 425133 from phx61-u46.prod.uber.internal/10.154.141.80:19103 (java.lang.NullPointerException: Cannot read field "shuffleKey" because "state" is null
at org.apache.celeborn.service.deploy.worker.storage.ChunkStreamManager.getShuffleKeyAndFileName(ChunkStreamManager.java:199)
at org.apache.celeborn.service.deploy.worker.FetchHandler.handleChunkFetchRequest(FetchHandler.scala:493)
at org.apache.celeborn.service.deploy.worker.FetchHandler.handleRpcRequest(FetchHandler.scala:181)
at org.apache.celeborn.service.deploy.worker.FetchHandler.receive(FetchHandler.scala:101)
at org.apache.celeborn.common.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:100)
at org.apache.celeborn.common.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:84)
at org.apache.celeborn.common.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:156)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:289)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at org.apache.celeborn.common.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:74)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:840)
) since it is not outstanding
```
and during handleEndStreamFromClient –
```
2025-02-27T17:31:03+05:30 WARN [data-client-5-3] TransportResponseHandler: Ignoring response for RPC 461011 from phx61-hkr.prod.uber.internal/10.154.133.115:19103 (java.lang.NullPointerException: Cannot read field "shuffleKey" because "streamState" is null
at org.apache.celeborn.service.deploy.worker.FetchHandler.handleEndStreamFromClient(FetchHandler.scala:458)
at org.apache.celeborn.service.deploy.worker.FetchHandler.handleRpcRequest(FetchHandler.scala:174)
at org.apache.celeborn.service.deploy.worker.FetchHandler.receive(FetchHandler.scala:101)
at org.apache.celeborn.common.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:100)
at org.apache.celeborn.common.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:84)
at org.apache.celeborn.common.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:156)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:289)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at org.apache.celeborn.common.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:74)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:840)
) since it is not outstanding
```
Due to which this error does not get handled properly in the `TransportResponseHandler` as ChunkFetchFailure. Instead it considered this failure as `RpcFailure`.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Tested in our staging cluster.
Closes#3128 from s0nskar/fix_nullptr_restart.
Authored-by: Sanskar Modi <sanskarmodi97@gmail.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
This PR partially reverts the change of https://github.com/apache/celeborn/pull/2813, namely, restores the renaming of `celeborn-client-spark-3`
### Why are the changes needed?
The renaming is not necessary, and might cause some confusion, for example, I wrongly interpreted the `spark-3-4` as Spark 3.4, it also increases the backport efforts for branch-0.5
### Does this PR introduce _any_ user-facing change?
No, it's dev only, before/after this change, the end users always use the shaded client
```
celeborn-client-spark-2-shaded_2.11-0.6.0-SNAPSHOT.jar
celeborn-client-spark-3-shaded_2.12-0.6.0-SNAPSHOT.jar
celeborn-client-spark-4-shaded_2.13-0.6.0-SNAPSHOT.jar
```
### How was this patch tested?
Pass GA.
Closes#3133 from pan3793/CELEBORN-1413-followup.
Authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
Bump Spark from 3.5.4 to 3.5.5.
### Why are the changes needed?
Spark 3.5.5 has been announced to release: [Spark 3.5.5 released](https://spark.apache.org/news/spark-3-5-5-released.html). The profile spark-3.5 could bump Spark from 3.5.4 to 3.5.5.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
CI.
Closes#3129 from SteNicholas/CELEBORN-1890.
Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: SteNicholas <programgeek@163.com>
### What changes were proposed in this pull request?
add '()' to override method, because method in parent class has '()'
### Why are the changes needed?
./build/make-distribution.sh -Dscala.version=2.13.5 -Dscala.binary.version=2.13 -Pjdk-17
complains error
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Unit test
Closes#3127 from chenkovsky/fix/tierwriter.
Authored-by: Chongchen Chen <chenkovsky@qq.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
Followup for https://github.com/apache/celeborn/pull/2373,
fix the `IndexOutOfBoundsException` when using old celeborn client.
### Why are the changes needed?
I meet below exception when using old celeborn client, seem incompatibility issue.
The error log in worker end:
```
25/02/27 14:18:24,182 ERROR [rpc_service-server-4-6] NettyRpcHandler: Error while invoking NettyRpcHandler#receive() on RPC id 4330
java.lang.IndexOutOfBoundsException: Index:0, Size:0
at com.google.protobuf.LongArrayList.ensureIndexInRange(LongArrayList.java:265)
at com.google.protobuf.LongArrayList.getLong(LongArrayList.java:113)
at org.apache.celeborn.common.protocol.PbPackedPartitionLocations.getFileSizes(PbPackedPartitionLocations.java:465)
at org.apache.celeborn.common.util.PbSerDeUtils$.fromPackedPartitionLocations(PbSerDeUtils.scala:653)
at org.apache.celeborn.common.util.PbSerDeUtils$.fromPbPackedPartitionLocationsPair(PbSerDeUtils.scala:591)
at org.apache.celeborn.common.protocol.message.ControlMessages$.fromTransportMessage(ControlMessages.scala:1312)
at org.apache.celeborn.common.util.Utils$.fromTransportMessage(Utils.scala:1056)
at org.apache.celeborn.common.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:110)
at org.apache.celeborn.common.rpc.netty.NettyRpcEnv.$anonfun$deserialize$2(NettyRpcEnv.scala:313)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
at org.apache.celeborn.common.rpc.netty.NettyRpcEnv.deserialize(NettyRpcEnv.scala:363)
at org.apache.celeborn.common.rpc.netty.NettyRpcEnv.$anonfun$deserialize$1(NettyRpcEnv.scala:312)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
at org.apache.celeborn.common.rpc.netty.NettyRpcEnv.deserialize(NettyRpcEnv.scala:312)
at org.apache.celeborn.common.rpc.netty.RequestMessage$.apply(NettyRpcEnv.scala:555)
at org.apache.celeborn.common.rpc.netty.NettyRpcHandler.internalReceive(NettyRpcEnv.scala:619)
at org.apache.celeborn.common.rpc.netty.NettyRpcHandler.receive(NettyRpcEnv.scala:605)
at org.apache.celeborn.common.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:101)
at org.apache.celeborn.common.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:85)
at org.apache.celeborn.common.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:156)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:289)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at org.apache.celeborn.common.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:74)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1357)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:868)
at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:799)
at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:501)
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:399)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:998)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:833)
25/02/27 14:18:24,234 WARN [celeborn-dispatcher-81] Controller: Shuffle application_1739172886147_152994_1-0 not registered!
25/02/27 14:18:24,284 WARN [celeborn-dispatcher-82] Controller: Shuffle application_1739172886147_152994_1-0 not registered!
25/02/27 14:18:24,335 WARN [celeborn-dispatcher-83] Controller: Shuffle application_1739172886147_152994_1-0 not registered!
```
The client end log:
```
org.apache.celeborn.common.exception.CelebornIOException: Register shuffle failed for shuffle 0, reason: RESERVE_SLOTS_FAILED
at org.apache.celeborn.client.ShuffleClientImpl.registerShuffleInternal(ShuffleClientImpl.java:710)
at org.apache.celeborn.client.ShuffleClientImpl.registerShuffle(ShuffleClientImpl.java:519)
at org.apache.celeborn.client.ShuffleClientImpl.lambda$getPartitionLocation$4(ShuffleClientImpl.java:579)
at java.base/java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1708)
at org.apache.celeborn.client.ShuffleClientImpl.getPartitionLocation(ShuffleClientImpl.java:575)
at org.apache.celeborn.client.write.DataPushQueue.takePushTasks(DataPushQueue.java:92)
at org.apache.celeborn.client.write.DataPusher$1.run(DataPusher.java:124)
at java.base/java.lang.Thread.run(Thread.java:833)
```
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing UT and IT.
Closes#3122 from turboFei/index_outof.
Authored-by: Wang, Fei <fwang12@ebay.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
Support configuring the SSL handshake timeout for SSLHandler, for `rpc_app` and `rpc_service` transport modules.
### Why are the changes needed?
To make the SSLHandler handshake timeout configurable. We are working on ramping shuffle traffic to Celeborn internally, and have observed spark task failures which related to the connection timeout. This will make SSLHandler handshake timeout in line with our ecosystem’s production config, and should minimize those failures and improve robustness.
### Does this PR introduce _any_ user-facing change?
Introduces a new server side configuration.
### How was this patch tested?
Added a new UT, validated with existing UTs.
Closes#3120 from rmcyang/rmcyang/CELEBORN-1882.
Authored-by: Minchu Yang <minyang@linkedin.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
### What changes were proposed in this pull request?
Follow up #3112.
Support `master --show-workers-topology` command to show registered workers topology.
### Why are the changes needed?
Rest API `/api/v1/workers/topology` supports the grouped workers topology info, which is lack of cli command to show registered workers topology.
### Does this PR introduce _any_ user-facing change?
Introduce `master --show-workers-topology` command.
### How was this patch tested?
`TestCelebornCliCommands#master --show-workers-topology`
Closes#3119 from SteNicholas/CELEBORN-1875.
Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>