Commit Graph

2455 Commits

Author SHA1 Message Date
Cheng Pan
dfeaef1355
[MINOR] Add spec link to JavaSerializer
### What changes were proposed in this pull request?

Document [Java Object Serialization Specification](https://docs.oracle.com/javase/8/docs/platform/serialization/spec/protocol.html) on `JavaSerializer`

### Why are the changes needed?

To help developers who extend `Serializer` understand the existing `JavaSerializer` protocol.
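
As a small illustration of the stream protocol the linked specification describes, the sketch below serializes a string with the JDK's `ObjectOutputStream` and inspects the first bytes. It uses only standard JDK classes; the class name `SerializationHeaderDemo` and its helper methods are made up for this example and are not part of Celeborn.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;

// Illustrative only: a hypothetical helper class, not Celeborn code.
final class SerializationHeaderDemo {

    /** Serializes a value with plain Java serialization and returns the raw bytes. */
    static byte[] serialize(Object value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Reads an unsigned big-endian 16-bit value at the given offset. */
    static int readUnsignedShort(byte[] data, int offset) {
        return ((data[offset] & 0xFF) << 8) | (data[offset + 1] & 0xFF);
    }

    public static void main(String[] args) {
        byte[] data = serialize("celeborn");
        // The stream starts with a 2-byte magic number followed by a 2-byte version.
        System.out.printf("magic=0x%04X version=%d%n",
                readUnsignedShort(data, 0), readUnsignedShort(data, 2));
        // The next byte is the type code of the first object written.
        System.out.printf("first type code=0x%02X%n", data[4] & 0xFF);
    }
}
```

Anyone implementing a custom `Serializer` can use a check like this to confirm whether a payload was produced by plain Java serialization.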

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Review.

Closes #3194 from pan3793/minor-docs.

Authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: Cheng Pan <chengpan@apache.org>
2025-04-02 14:26:32 +08:00
Cheng Pan
0d923c37bf
[CELEBORN-1956] Forward GitHub discussion to ASF mailing list
### What changes were proposed in this pull request?

As title.

### Why are the changes needed?

To satisfy ASF policy.

```
An error occurred while processing the github feature in .asf.yaml:

GitHub discussions can only be enabled if a mailing list target exists for it.

---
With regards, ASF Infra.
For further information, please see the .asf.yaml documentation at:
https://github.com/apache/infrastructure-asfyaml/blob/main/README.md
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

This can be manually tested after merging to main.

Closes #3195 from pan3793/CELEBORN-1956.

Authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: Cheng Pan <chengpan@apache.org>
2025-04-02 14:25:37 +08:00
Wang, Fei
621afaa5d7 [CELEBORN-1949][FOLLOWUP] Fix typo for kind:deploy label
### What changes were proposed in this pull request?

Fix typo, followup for https://github.com/apache/celeborn/pull/3189

### Why are the changes needed?

Fix typo for kind:deploy label

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?

I have confirmed that all the match conditions are now `any-glob-to-any-file`.
<img width="1260" alt="image" src="https://github.com/user-attachments/assets/dde86439-b7b9-4690-99a2-889ff64fdd8b" />

<img width="1250" alt="image" src="https://github.com/user-attachments/assets/f3b8be49-c0d5-46bd-8c1a-9ae2518c7958" />

Closes #3197 from turboFei/fix_labeler_typo.

Authored-by: Wang, Fei <fwang12@ebay.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
2025-04-01 23:11:29 -07:00
mingji
951b626a98 [CELEBORN-1844][CIP-8] introduce tier writer proxy and simplify partition data writer
### What changes were proposed in this pull request?
Most of this PR consists of changes to unit tests; it is derived from another PR, #3083.

### Why are the changes needed?
Introduce tier writer proxy and simplify the partition data writer.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
GA and manual tests.
I have verified data correctness on a cluster using TPC-DS tests.
The average performance test results (with this PR : without this PR) were 2562.5 : 2576.85.
There is no performance regression.

Closes #3085 from FMX/1844.

Authored-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
2025-04-02 13:29:39 +08:00
Yi Chen
8dbbebc644
[CELEBORN-1954][HELM] Add a new value image.registry
### What changes were proposed in this pull request?

Add a new value `image.registry` to `values.yaml`.

### Why are the changes needed?

Add a new value `image.registry` so we can easily switch between different image registries (e.g. `docker.io`, `ghcr.io`, `quay.io` and private image registries).

This PR is part of #2654.

### Does this PR introduce _any_ user-facing change?

Yes, but it should be backward-compatible.

### How was this patch tested?

Update helm unit test and run `helm unittest charts/celeborn  --file "tests/**/*_test.yaml" --strict --debug`.

Closes #3187 from ChenYi015/helm/image.

Authored-by: Yi Chen <github@chenyicn.net>
Signed-off-by: Cheng Pan <chengpan@apache.org>
2025-04-02 11:02:35 +08:00
Yi Chen
6e5bd2403c
[CELEBORN-1952][HELM] Define template helpers for master/worker respectively
### What changes were proposed in this pull request?

Define Helm template helpers for master/worker respectively.

### Why are the changes needed?

For better code organization, all master/worker related files will be placed at `charts/celeborn/templates/master` and `charts/celeborn/templates/worker` respectively.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Run Helm unit tests locally by `helm unittest charts/celeborn  --file "tests/**/*_test.yaml" --strict --debug`

Closes #3183 from ChenYi015/helm/template-helpers.

Authored-by: Yi Chen <github@chenyicn.net>
Signed-off-by: Cheng Pan <chengpan@apache.org>
2025-04-02 11:01:27 +08:00
Wang, Fei
5e12b7d607 [CELEBORN-1921] Broadcast large GetReducerFileGroupResponse to prevent Spark driver network exhausted
### What changes were proposed in this pull request?

For Spark applications using Celeborn, if the `GetReducerFileGroupResponse` is larger than the threshold, the Spark driver broadcasts it to the executors. This prevents the driver from becoming a bottleneck by sending out multiple copies of the `GetReducerFileGroupResponse` (one per executor).

### Why are the changes needed?
To prevent the driver from being the bottleneck in sending out multiple copies of the GetReducerFileGroupResponse (one per executor).
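
The trade-off can be sketched roughly as follows. This is not Celeborn's implementation: the class, the enum, and the cost model (the driver seeds a broadcast payload once and then sends each executor only a small handle) are assumptions made for illustration.

```java
// Illustrative only: hypothetical names and a simplified cost model, not Celeborn code.
final class ResponseDispatchSketch {

    /** Hypothetical delivery modes for a response. */
    enum Mode { DIRECT, BROADCAST }

    /** Broadcast only when the serialized response is strictly larger than the threshold. */
    static Mode chooseMode(int serializedSizeBytes, int thresholdBytes) {
        return serializedSizeBytes > thresholdBytes ? Mode.BROADCAST : Mode.DIRECT;
    }

    /**
     * Estimates the bytes the driver itself has to send.
     * DIRECT: one full copy of the payload per executor.
     * BROADCAST (assumed model): the payload once, plus a small handle per executor.
     */
    static long driverBytesSent(int payloadBytes, int handleBytes, int executors, Mode mode) {
        if (mode == Mode.BROADCAST) {
            return (long) handleBytes * executors + payloadBytes;
        }
        return (long) payloadBytes * executors;
    }

    public static void main(String[] args) {
        int payload = 1_000_000;
        int handle = 1024;
        int executors = 100;
        Mode mode = chooseMode(payload, 512 * 1024);
        System.out.println(mode + " -> " + driverBytesSent(payload, handle, executors, mode) + " bytes");
    }
}
```

Under this model the driver's outbound traffic grows with the handle size rather than the payload size once broadcasting is chosen.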

### Does this PR introduce _any_ user-facing change?
No, the feature is not enabled by default.

### How was this patch tested?

UT.

Cluster testing with `spark.celeborn.client.spark.shuffle.getReducerFileGroup.broadcast.enabled=true`.

The broadcast response size should always be about 1 KB.
![image](https://github.com/user-attachments/assets/d5d1b751-762d-43c8-8a84-0674630a5638)
![image](https://github.com/user-attachments/assets/4841a29e-5d11-4932-9fa5-f6e78b7bc521)
The application succeeded.
![image](https://github.com/user-attachments/assets/9b570f70-1433-4457-90ae-b8292e5476ba)

Closes #3158 from turboFei/broadcast_rgf.

Authored-by: Wang, Fei <fwang12@ebay.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
2025-04-01 08:29:21 -07:00
Wang, Fei
1e30f159b9 [CELEBORN-1577][FOLLOWUP] Add UpdateResourceConsumptionTime timer and prevent NPE if metrics not found
### What changes were proposed in this pull request?
Follow up for https://github.com/apache/celeborn/pull/2819
1. add timer for UpdateResourceConsumptionTime
2. prevent NPE if metrics not found

### Why are the changes needed?

The timer was not added, which causes an exception when the metric is not found:
```
25/03/31 13:18:48,219 WARN [master-quota-checker] MasterSource: Metric UpdateResourceConsumptionTime{instance="zeus-slc-cm-1.zeus-slc-cm-cm.hm-dev.svc.140.tess.io:8080",role="master"} not found!
25/03/31 13:18:48,220 WARN [master-quota-checker] MasterSource: Exception encountered during stop timer of metric UpdateResourceConsumptionTime{instance="zeus-slc-cm-1.zeus-slc-cm-cm.hm-dev.svc.140.tess.io:8080",role="master"}
scala.MatchError: null
	at org.apache.celeborn.common.metrics.source.AbstractSource.doStopTimer(AbstractSource.scala:316)
	at org.apache.celeborn.common.metrics.source.AbstractSource.sample(AbstractSource.scala:279)
	at org.apache.celeborn.service.deploy.master.quota.QuotaManager.updateResourceConsumption(QuotaManager.scala:201)
	at org.apache.celeborn.service.deploy.master.quota.QuotaManager$$anon$1.run(QuotaManager.scala:59)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
	at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)
```
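
The defensive pattern described above can be sketched as follows. The class `TimerRegistrySketch` and its methods are hypothetical and much simpler than Celeborn's `AbstractSource`; the point is only that stopping a timer for an unregistered or never-started metric returns an empty result instead of throwing.

```java
import java.util.Map;
import java.util.OptionalLong;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative only: a hypothetical timer registry, not Celeborn's AbstractSource.
final class TimerRegistrySketch {
    private final Map<String, Long> totalNanosByMetric = new ConcurrentHashMap<>();
    private final Map<String, Long> startNanosByKey = new ConcurrentHashMap<>();

    /** Registers a timer metric; timers must be added before they can be used. */
    void addTimer(String metric) {
        totalNanosByMetric.putIfAbsent(metric, 0L);
    }

    /** Starts timing; returns false (instead of failing) if the metric was never registered. */
    boolean startTimer(String metric, String key, long nowNanos) {
        if (!totalNanosByMetric.containsKey(metric)) {
            return false;
        }
        startNanosByKey.put(metric + "/" + key, nowNanos);
        return true;
    }

    /** Stops timing; returns empty if the metric is unknown or the timer was never started. */
    OptionalLong stopTimer(String metric, String key, long nowNanos) {
        Long start = startNanosByKey.remove(metric + "/" + key);
        if (start == null || !totalNanosByMetric.containsKey(metric)) {
            return OptionalLong.empty();
        }
        long elapsed = nowNanos - start;
        totalNanosByMetric.merge(metric, elapsed, Long::sum);
        return OptionalLong.of(elapsed);
    }

    public static void main(String[] args) {
        TimerRegistrySketch registry = new TimerRegistrySketch();
        // Not registered yet: the call is a no-op rather than an error.
        System.out.println(registry.stopTimer("UpdateResourceConsumptionTime", "check", 10L));
        registry.addTimer("UpdateResourceConsumptionTime");
        registry.startTimer("UpdateResourceConsumptionTime", "check", 5L);
        System.out.println(registry.stopTimer("UpdateResourceConsumptionTime", "check", 12L));
    }
}
```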

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?

Existing UT.

Closes #3190 from turboFei/fix_npe.

Authored-by: Wang, Fei <fwang12@ebay.com>
Signed-off-by: SteNicholas <programgeek@163.com>
2025-04-01 19:24:23 +08:00
Wang, Fei
5adce2b408 [CELEBORN-1949] Add a labeler github action to triage PRs
### What changes were proposed in this pull request?

Add a labeler github action to triage PRs.

### Why are the changes needed?

Provide insights for the PR.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
See testing in https://github.com/apache/celeborn/pull/3188

<img width="934" alt="image" src="https://github.com/user-attachments/assets/41a6048f-eb9a-4fa9-9696-4c3f6bb3ab6f" />

<img width="1367" alt="image" src="https://github.com/user-attachments/assets/f9ad32ea-d2f7-43d6-b041-f336baca02d9" />

Closes #3189 from turboFei/labels.

Authored-by: Wang, Fei <fwang12@ebay.com>
Signed-off-by: SteNicholas <programgeek@163.com>
2025-04-01 19:13:34 +08:00
Saurabh Dubey
193dc6cf8b [CELEBORN-1929] Avoid unnecessary buffer loss to get better buffer reusability
### What changes were proposed in this pull request?
There is an unnecessary explicit `buffer[i] = null` that causes a certain set of buffers not to be reused from the pool, leading to higher memory usage and GC pressure.

### Why are the changes needed?
https://github.com/apache/celeborn/pull/131 introduced a feature to reuse buffers across mapper tasks. However, when closing the writer, we explicitly set certain buffers (i.e. those that are non-empty and need to be flushed on close) to null, so these buffers are not returned to the pool.

As a result, such buffers are permanently lost (GC'ed), and we end up creating more buffers than really necessary [here](https://github.com/apache/celeborn/blob/main/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java#L270), which contributes to the overall memory usage.

I checked the flow: the buffer being merged is compressed [here](https://github.com/apache/celeborn/blob/main/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java#L1019) and then copied into a new buffer [here](https://github.com/apache/celeborn/blob/main/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java#L1031). All of these are synchronous operations, so a newly started writer task overwriting the buffer should not cause any data corruption; hence it is safe to do so.
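
The effect on allocations can be shown with a toy pool. This is a minimal, single-threaded sketch with hypothetical names (`BufferPoolSketch`, `closeWriter`, `dropFlushedBuffers`); it is not the actual `HashBasedShuffleWriter` logic, and for simplicity it treats every buffer as one that was flushed on close.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Illustrative only: a toy, non-thread-safe pool, not Celeborn's buffer handling.
final class BufferPoolSketch {
    private final Deque<byte[]> free = new ArrayDeque<>();
    private final int bufferSize;
    private int allocations = 0;

    BufferPoolSketch(int bufferSize) {
        this.bufferSize = bufferSize;
    }

    /** Reuses a pooled buffer if one is available, otherwise allocates a new one. */
    byte[] acquire() {
        byte[] buffer = free.poll();
        if (buffer == null) {
            allocations++;
            buffer = new byte[bufferSize];
        }
        return buffer;
    }

    /** Returns a buffer to the pool; null references are ignored. */
    void release(byte[] buffer) {
        if (buffer != null && buffer.length == bufferSize) {
            free.push(buffer);
        }
    }

    int allocations() {
        return allocations;
    }

    /**
     * Simulates closing a writer. With dropFlushedBuffers = true the references are simply
     * cleared (the behavior the PR removes); with false they go back to the pool.
     */
    static void closeWriter(byte[][] buffers, BufferPoolSketch pool, boolean dropFlushedBuffers) {
        for (int i = 0; i < buffers.length; i++) {
            byte[] buffer = buffers[i];
            buffers[i] = null; // the writer no longer owns the buffer
            if (!dropFlushedBuffers) {
                pool.release(buffer);
            }
        }
    }

    public static void main(String[] args) {
        BufferPoolSketch pool = new BufferPoolSketch(64);
        byte[][] firstWriter = { pool.acquire(), pool.acquire() };
        closeWriter(firstWriter, pool, false);
        pool.acquire();
        pool.acquire();
        System.out.println("allocations with reuse: " + pool.allocations());
    }
}
```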

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Unit tests are still TODO; manually tested once.

Closes #3173 from saurabhd336/bufferReuse.

Authored-by: Saurabh Dubey <saurabhd336@uber.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
2025-03-30 23:53:48 -07:00
zhengtao
99ca4dffe8 [CELEBORN-1918] Add batchOpenStream time to fetch wait time
### What changes were proposed in this pull request?
1. Add the RPC time and BatchopenStreamTime to the read wait time.
2. Remove the updateFileGroup RPC time from the BatchopenStreamTime log.
3. Reduce the sleep time while waiting for the inputStream to be created, since it costs a lot of sleeping time for tiny shuffle reads.

### Why are the changes needed?

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Tested in cluster.

Closes #3180 from zaynt4606/clb1918.

Authored-by: zhengtao <shuaizhentao.szt@alibaba-inc.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
2025-03-30 21:51:53 -07:00
Björn Boschman
3038942233 [CELEBORN-1900][FOLLOWUP] push celeborn docker image
### What changes were proposed in this pull request?

The workflow was only building the images, not pushing them.

### Why are the changes needed?

Images are now also pushed.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Not yet.

Closes #3174 from jesusch/CELEBORN-1900-docker-images-fix-3.

Authored-by: Björn Boschman <bjoern.boschman@innovid.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
2025-03-31 11:32:15 +08:00
Yi Chen
d8495e5b65 [CELEBORN-1532][HELM] Read log4j2 and metrics configurations from file
### What changes were proposed in this pull request?

- Create files `charts/celeborn/files/conf/log4j2.xml` and `charts/celeborn/files/conf/metrics.properties` to store log4j2 and metrics configurations respectively.
- ConfigMap will read configurations from these two files.

### Why are the changes needed?

For better code readability.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Run Helm unit tests locally by:

```
helm unittest charts/celeborn
```

Closes #3182 from ChenYi015/refactor/helm.

Authored-by: Yi Chen <github@chenyicn.net>
Signed-off-by: SteNicholas <programgeek@163.com>
2025-03-31 11:00:38 +08:00
SteNicholas
56bf87d3c1 [CELEBORN-1543][FOLLOWUP] celeborn-flink-it project should set FLINK_VERSION environment variable for HybridShuffleWordCountTest
### What changes were proposed in this pull request?

`celeborn-flink-it` project should set `FLINK_VERSION` environment variable for `HybridShuffleWordCountTest`.

Follow up #2662.

### Why are the changes needed?

Because of the lack of `FLINK_VERSION` environment variable, the sbt flink project of CI cancels the tests of `HybridShuffleWordCountTest` as follows:

```
[info] - Celeborn Flink Hybrid Shuffle Integration test(Local) - word count !!! CANCELED !!!
[info]   "" was empty (HybridShuffleWordCountTest.scala:75)
[info]   org.scalatest.exceptions.TestCanceledException:
[info]   at org.scalatest.Assertions.newTestCanceledException(Assertions.scala:475)
[info]   at org.scalatest.Assertions.newTestCanceledException$(Assertions.scala:474)
[info]   at org.scalatest.Assertions$.newTestCanceledException(Assertions.scala:1231)
[info]   at org.scalatest.Assertions$AssertionsHelper.macroAssume(Assertions.scala:1310)
[info]   at org.apache.celeborn.tests.flink.HybridShuffleWordCountTest.assumeFlinkVersion(HybridShuffleWordCountTest.scala:75)
[info]   at org.apache.celeborn.tests.flink.HybridShuffleWordCountTest.$anonfun$new$1(HybridShuffleWordCountTest.scala:62)
[info]   at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
[info]   at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
[info]   at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
[info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:226)
[info]   at org.scalatest.TestSuite.withFixture(TestSuite.scala:196)
[info]   at org.scalatest.TestSuite.withFixture$(TestSuite.scala:195)
[info]   at org.scalatest.funsuite.AnyFunSuite.withFixture(AnyFunSuite.scala:1564)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:224)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:236)
[info]   at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:236)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:218)
[info]   at org.scalatest.funsuite.AnyFunSuite.runTest(AnyFunSuite.scala:1564)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:269)
[info]   at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
[info]   at scala.collection.immutable.List.foreach(List.scala:431)
[info]   at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
[info]   at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
[info]   at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:269)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:268)
[info]   at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1564)
[info]   at org.scalatest.Suite.run(Suite.scala:1114)
[info]   at org.scalatest.Suite.run$(Suite.scala:1096)
[info]   at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1564)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:273)
[info]   at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:273)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:272)
[info]   at org.apache.celeborn.tests.flink.HybridShuffleWordCountTest.org$scalatest$BeforeAndAfterAll$$super$run(HybridShuffleWordCountTest.scala:37)
[info]   at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
[info]   at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
[info]   at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
[info]   at org.apache.celeborn.tests.flink.HybridShuffleWordCountTest.run(HybridShuffleWordCountTest.scala:37)
[info]   at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:321)
[info]   at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:517)
[info]   at sbt.ForkMain$Run.lambda$runTest$1(ForkMain.java:414)
[info]   at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
[info]   at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
[info]   at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
[info]   at java.base/java.lang.Thread.run(Thread.java:829)
[info] - Celeborn Flink Hybrid Shuffle Integration test(Flink mini cluster) single tier - word count !!! CANCELED !!!
[info]   "" was empty (HybridShuffleWordCountTest.scala:75)
[info]   org.scalatest.exceptions.TestCanceledException:
[info]   at org.scalatest.Assertions.newTestCanceledException(Assertions.scala:475)
[info]   at org.scalatest.Assertions.newTestCanceledException$(Assertions.scala:474)
[info]   at org.scalatest.Assertions$.newTestCanceledException(Assertions.scala:1231)
[info]   at org.scalatest.Assertions$AssertionsHelper.macroAssume(Assertions.scala:1310)
[info]   at org.apache.celeborn.tests.flink.HybridShuffleWordCountTest.assumeFlinkVersion(HybridShuffleWordCountTest.scala:75)
[info]   at org.apache.celeborn.tests.flink.HybridShuffleWordCountTest.$anonfun$new$2(HybridShuffleWordCountTest.scala:68)
[info]   at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
[info]   at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
[info]   at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
[info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:226)
[info]   at org.scalatest.TestSuite.withFixture(TestSuite.scala:196)
[info]   at org.scalatest.TestSuite.withFixture$(TestSuite.scala:195)
[info]   at org.scalatest.funsuite.AnyFunSuite.withFixture(AnyFunSuite.scala:1564)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:224)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:236)
[info]   at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:236)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:218)
[info]   at org.scalatest.funsuite.AnyFunSuite.runTest(AnyFunSuite.scala:1564)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:269)
[info]   at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
[info]   at scala.collection.immutable.List.foreach(List.scala:431)
[info]   at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
[info]   at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
[info]   at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:269)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:268)
[info]   at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1564)
[info]   at org.scalatest.Suite.run(Suite.scala:1114)
[info]   at org.scalatest.Suite.run$(Suite.scala:1096)
[info]   at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1564)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:273)
[info]   at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:273)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:272)
[info]   at org.apache.celeborn.tests.flink.HybridShuffleWordCountTest.org$scalatest$BeforeAndAfterAll$$super$run(HybridShuffleWordCountTest.scala:37)
[info]   at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
[info]   at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
[info]   at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
[info]   at org.apache.celeborn.tests.flink.HybridShuffleWordCountTest.run(HybridShuffleWordCountTest.scala:37)
[info]   at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:321)
[info]   at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:517)
[info]   at sbt.ForkMain$Run.lambda$runTest$1(ForkMain.java:414)
[info]   at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
[info]   at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
[info]   at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
[info]   at java.base/java.lang.Thread.run(Thread.java:829)
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

`HybridShuffleWordCountTest`

- Flink 1.16
```
[info] - Celeborn Flink Hybrid Shuffle Integration test(Local) - word count !!! CANCELED !!!
[info]   "1.16.3" was not empty, but FlinkVersion.fromVersionStr(scala.Predef.refArrayOps[String](scala.Predef.refArrayOps[String](flinkVersion.split("\\.")).take(2)).mkString(".")).isNewerOrEqualVersionThan(v1_20) was false (HybridShuffleWordCountTest.scala:75)
[info]   org.scalatest.exceptions.TestCanceledException:
[info]   at org.scalatest.Assertions.newTestCanceledException(Assertions.scala:475)
[info]   at org.scalatest.Assertions.newTestCanceledException$(Assertions.scala:474)
[info]   at org.scalatest.Assertions$.newTestCanceledException(Assertions.scala:1231)
[info]   at org.scalatest.Assertions$AssertionsHelper.macroAssume(Assertions.scala:1310)
[info]   at org.apache.celeborn.tests.flink.HybridShuffleWordCountTest.assumeFlinkVersion(HybridShuffleWordCountTest.scala:75)
[info]   at org.apache.celeborn.tests.flink.HybridShuffleWordCountTest.$anonfun$new$1(HybridShuffleWordCountTest.scala:62)
[info]   at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
[info]   at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
[info]   at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
[info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:226)
[info]   at org.scalatest.TestSuite.withFixture(TestSuite.scala:196)
[info]   at org.scalatest.TestSuite.withFixture$(TestSuite.scala:195)
[info]   at org.scalatest.funsuite.AnyFunSuite.withFixture(AnyFunSuite.scala:1564)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:224)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:236)
[info]   at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:236)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:218)
[info]   at org.scalatest.funsuite.AnyFunSuite.runTest(AnyFunSuite.scala:1564)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:269)
[info]   at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
[info]   at scala.collection.immutable.List.foreach(List.scala:431)
[info]   at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
[info]   at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
[info]   at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:269)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:268)
[info]   at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1564)
[info]   at org.scalatest.Suite.run(Suite.scala:1114)
[info]   at org.scalatest.Suite.run$(Suite.scala:1096)
[info]   at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1564)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:273)
[info]   at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:273)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:272)
[info]   at org.apache.celeborn.tests.flink.HybridShuffleWordCountTest.org$scalatest$BeforeAndAfterAll$$super$run(HybridShuffleWordCountTest.scala:37)
[info]   at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
[info]   at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
[info]   at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
[info]   at org.apache.celeborn.tests.flink.HybridShuffleWordCountTest.run(HybridShuffleWordCountTest.scala:37)
[info]   at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:321)
[info]   at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:517)
[info]   at sbt.ForkMain$Run.lambda$runTest$1(ForkMain.java:414)
[info]   at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
[info]   at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
[info]   at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
[info]   at java.base/java.lang.Thread.run(Thread.java:829)
[info] - Celeborn Flink Hybrid Shuffle Integration test(Flink mini cluster) single tier - word count !!! CANCELED !!!
[info]   "1.16.3" was not empty, but FlinkVersion.fromVersionStr(scala.Predef.refArrayOps[String](scala.Predef.refArrayOps[String](flinkVersion.split("\\.")).take(2)).mkString(".")).isNewerOrEqualVersionThan(v1_20) was false (HybridShuffleWordCountTest.scala:75)
[info]   org.scalatest.exceptions.TestCanceledException:
[info]   at org.scalatest.Assertions.newTestCanceledException(Assertions.scala:475)
[info]   at org.scalatest.Assertions.newTestCanceledException$(Assertions.scala:474)
[info]   at org.scalatest.Assertions$.newTestCanceledException(Assertions.scala:1231)
[info]   at org.scalatest.Assertions$AssertionsHelper.macroAssume(Assertions.scala:1310)
[info]   at org.apache.celeborn.tests.flink.HybridShuffleWordCountTest.assumeFlinkVersion(HybridShuffleWordCountTest.scala:75)
[info]   at org.apache.celeborn.tests.flink.HybridShuffleWordCountTest.$anonfun$new$2(HybridShuffleWordCountTest.scala:68)
[info]   at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
[info]   at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
[info]   at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
[info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:226)
[info]   at org.scalatest.TestSuite.withFixture(TestSuite.scala:196)
[info]   at org.scalatest.TestSuite.withFixture$(TestSuite.scala:195)
[info]   at org.scalatest.funsuite.AnyFunSuite.withFixture(AnyFunSuite.scala:1564)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:224)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:236)
[info]   at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:236)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:218)
[info]   at org.scalatest.funsuite.AnyFunSuite.runTest(AnyFunSuite.scala:1564)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:269)
[info]   at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
[info]   at scala.collection.immutable.List.foreach(List.scala:431)
[info]   at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
[info]   at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
[info]   at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:269)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:268)
[info]   at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1564)
[info]   at org.scalatest.Suite.run(Suite.scala:1114)
[info]   at org.scalatest.Suite.run$(Suite.scala:1096)
[info]   at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1564)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:273)
[info]   at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:273)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:272)
[info]   at org.apache.celeborn.tests.flink.HybridShuffleWordCountTest.org$scalatest$BeforeAndAfterAll$$super$run(HybridShuffleWordCountTest.scala:37)
[info]   at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
[info]   at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
[info]   at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
[info]   at org.apache.celeborn.tests.flink.HybridShuffleWordCountTest.run(HybridShuffleWordCountTest.scala:37)
[info]   at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:321)
[info]   at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:517)
[info]   at sbt.ForkMain$Run.lambda$runTest$1(ForkMain.java:414)
[info]   at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
[info]   at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
[info]   at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
[info]   at java.base/java.lang.Thread.run(Thread.java:829)
```
- Flink 1.20
```
[info] - Celeborn Flink Hybrid Shuffle Integration test(Local) - word count
2025-03-28T08:15:25.149779Z flink-pekko.actor.default-dispatcher-10 WARN found 2 argument placeholders, but provided 3 for pattern `Disconnect TaskExecutor {} because: {}`
25/03/28 08:15:25,199 ERROR [celeborn-client-lifecycle-manager-release-partition-executor-1] LifecycleManager: Request DestroyWorkerSlots(1743149720044-d5114c42fdccf6220ec7a9e1fb856ac3-1,[1-0, 4-0, 7-0],[],false) to NettyRpcEndpointRef(celeborn://WorkerEndpoint10.1.0.100:37683) failed 1/3 for 1743149720044-d5114c42fdccf6220ec7a9e1fb856ac3-1, reason: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask351c2453[Not completed, task = java.util.concurrent.Executors$RunnableAdaptera979257[Wrapped task = org.apache.celeborn.common.rpc.netty.NettyRpcEnv$$anon$16312b56f]] rejected from java.util.concurrent.ScheduledThreadPoolExecutor23161ba2[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0], will retry.
...
[info] - Celeborn Flink Hybrid Shuffle Integration test(Flink mini cluster) single tier - word count
25/03/28 08:15:30,980 ERROR [worker 1 starter thread] MasterClient: Send rpc with failure, has tried 0, max try 15!
```

Closes #3184 from SteNicholas/CELEBORN-1543.

Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
2025-03-31 10:51:12 +08:00
sychen
f1c963d0b0 [CELEBORN-1947] Reduce log for CelebornShuffleReader sleeping before inputStream ready
### What changes were proposed in this pull request?

### Why are the changes needed?
When a shuffle read times out, a large number of log lines like the following are written to the executor log.

```
inputStream is null, sleeping...
```

<img width="799" alt="image" src="https://github.com/user-attachments/assets/f5b2bfde-5874-4b1e-8992-7037a9c81aa5" />

### Does this PR introduce _any_ user-facing change?

### How was this patch tested?

Closes #3181 from cxzl25/CELEBORN-1947.

Authored-by: sychen <sychen@ctrip.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
2025-03-28 10:50:12 -07:00
Angerszhuuuu
3cf1802e78 [CELEBORN-1928][CIP-12] Support HARD_SPLIT in PushMergedData should support handle older worker success response
### What changes were proposed in this pull request?
With HARD_SPLIT supported in PushMergedData, the client should also handle the success response from older workers.

### Why are the changes needed?

### Does this PR introduce _any_ user-facing change?

### How was this patch tested?

Closes #3172 from AngersZhuuuu/CELEBORN-1928-.

Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
2025-03-27 15:06:01 +08:00
Xianming Lei
15ea5d3664 [CELEBORN-1930][CIP-12] Support HARD_SPLIT in PushMergedData should handle congestion control NPE issue
### What changes were proposed in this pull request?
When a hard split happens, some FileWriters may be null, which causes an NPE when handling congestion control.

### Why are the changes needed?
When a hard split happens, some FileWriters may be null, which causes an NPE when handling congestion control.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Existing UTs.

Closes #3176 from leixm/CELEBORN-1930.

Authored-by: Xianming Lei <31424839+leixm@users.noreply.github.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
2025-03-26 23:44:04 -07:00
SteNicholas
5f298f5ce2 [CELEBORN-1190][FOLLOWUP] Use -XepDisableWarningsInGeneratedCode to disable warnings for openapi-client module
### What changes were proposed in this pull request?

Use `-XepDisableWarningsInGeneratedCode` to disable warnings for `openapi-client` module.

### Why are the changes needed?

There are some warnings in compilation of `openapi-client` module as follows:

```
$ mvn clean install -pl openapi/openapi-client -DskipTests -Dcheckstyle.skip=true -Drat.skip=true -Dspotless.check.skip=true | grep "\[WARNING\].*java.*"
[WARNING] /Users/nicholasjiang/Github/celeborn/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/worker/invoker/ApiException.java:[100,18] [OverrideThrowableToString] To return a custom message with a Throwable class, one should override getMessage() instead of toString().
[WARNING] /Users/nicholasjiang/Github/celeborn/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/model/DynamicConfig.java:[55,19] [ImmutableEnumChecker] enums should be immutable: 'LevelEnum' has non-final field 'value'
[WARNING] /Users/nicholasjiang/Github/celeborn/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/master/invoker/ApiException.java:[100,18] [OverrideThrowableToString] To return a custom message with a Throwable class, one should override getMessage() instead of toString().
[WARNING] /Users/nicholasjiang/Github/celeborn/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/model/WorkerExitRequest.java:[51,19] [ImmutableEnumChecker] enums should be immutable: 'TypeEnum' has non-final field 'value'
[WARNING] /Users/nicholasjiang/Github/celeborn/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/model/PartitionLocationData.java:[58,19] [ImmutableEnumChecker] enums should be immutable: 'ModeEnum' has non-final field 'value'
[WARNING] /Users/nicholasjiang/Github/celeborn/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/model/PartitionLocationData.java:[107,19] [ImmutableEnumChecker] enums should be immutable: 'StorageEnum' has non-final field 'value'
[WARNING] /Users/nicholasjiang/Github/celeborn/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/model/SendWorkerEventRequest.java:[60,19] [ImmutableEnumChecker] enums should be immutable: 'EventTypeEnum' has non-final field 'value'
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Local test:

```
$ mvn clean install -pl openapi/openapi-client -DskipTests -Dcheckstyle.skip=true -Drat.skip=true -Dspotless.check.skip=true | grep "\[WARNING\].*java.*"
```

Closes #3169 from SteNicholas/CELEBORN-1190.

Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: SteNicholas <programgeek@163.com>
2025-03-26 12:04:07 +08:00
Björn Boschman
d5645a98d7 [CELEBORN-1900][FOLLOWUP] use github actions to login to docker hub
### What changes were proposed in this pull request?

Use the correct Docker secrets.

### Why are the changes needed?

Use GitHub Actions to log in to Docker Hub.

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

Cannot test this patch because the forked repo does not have Docker secrets.

Closes #3168 from jesusch/CELEBORN-1900-docker-images-fix-2.

Authored-by: Björn Boschman <bjoern.boschman@innovid.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
2025-03-25 19:58:53 +08:00
HolyLow
9bae3fbd5e [CELEBORN-1915][CIP-14] Add reader's ShuffleClient to cppClient
### What changes were proposed in this pull request?
This PR adds the reader-side ShuffleClient to cppClient.

### Why are the changes needed?
ShuffleClient is the user interface for cppClient usage.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Compilation.

Closes #3156 from HolyLow/issue/celeborn-1915-add-reader-shuffle-client-to-cppclient.

Authored-by: HolyLow <jiaming.xie7@gmail.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
2025-03-25 17:54:34 +08:00
Xianming Lei
0a97ca0aa9 [CELEBORN-1577][PHASE2] QuotaManager should support interrupt shuffle
### What changes were proposed in this pull request?
1. Worker reports resourceConsumption to master.
2. QuotaManager calculates the resourceConsumption of each app and marks the apps that exceed the quota.
    2.1 When a tenant's resourceConsumption exceeds the tenant's quota, the app with the larger consumption is selected and marked as interrupted.
    2.2 When the cluster's resourceConsumption exceeds the cluster quota, the app with the larger consumption is selected and marked as interrupted.
3. Master tells the Driver through the heartbeat whether the app is marked as interrupted.
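
The marking in step 2 can be sketched roughly as follows. This is a minimal illustration, not Celeborn's `QuotaManager`: the class `QuotaSketch`, the method `markInterrupted`, the plain byte counts, and the stop condition (mark the largest consumers until the remaining usage fits the quota) are all assumptions made for the example. The same routine could be applied once per tenant and once for the whole cluster.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch; names and the stop condition are assumptions, not Celeborn APIs.
final class QuotaSketch {
  /**
   * Returns the app ids to mark as interrupted: largest consumers first,
   * until the remaining consumption no longer exceeds the quota.
   */
  static Set<String> markInterrupted(Map<String, Long> bytesByApp, long quotaBytes) {
    long total = 0L;
    for (long bytes : bytesByApp.values()) {
      total += bytes;
    }
    // Sort apps by consumption, largest first.
    List<Map.Entry<String, Long>> apps = new ArrayList<>(bytesByApp.entrySet());
    apps.sort(Map.Entry.<String, Long>comparingByValue(Comparator.reverseOrder()));

    Set<String> interrupted = new LinkedHashSet<>();
    for (Map.Entry<String, Long> app : apps) {
      if (total <= quotaBytes) {
        break; // usage fits the quota again, nothing more to mark
      }
      interrupted.add(app.getKey());
      total -= app.getValue();
    }
    return interrupted;
  }
}
```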

### Why are the changes needed?
The current storage quota logic can only limit new shuffles; it cannot limit writes to existing shuffles. In our production environment we see the following scenario: the cluster is small, but a single shuffle of a user's app is large and occupies a lot of disk resources. We want to interrupt such shuffles.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
UTs.

Closes #2819 from leixm/CELEBORN-1577-2.

Authored-by: Xianming Lei <31424839+leixm@users.noreply.github.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
2025-03-24 22:05:45 +08:00
wangshengjie3
4bacd1f211 [CELEBORN-1856] Support stage-rerun when read partition by chunkOffsets when enable optimize skew partition read
### What changes were proposed in this pull request?
Support stage rerun when reading partitions by chunkOffsets with optimized skew partition read enabled.

### Why are the changes needed?
In [CELEBORN-1319](https://issues.apache.org/jira/browse/CELEBORN-1319), we implemented the skew partition read optimization based on chunk offsets, but shuffle retry is not supported for skew partitions, so we need to support stage rerun.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Cluster test

Closes #3118 from wangshengjie123/support-stage-rerun.

Lead-authored-by: wangshengjie3 <wangshengjie3@xiaomi.com>
Co-authored-by: Wang, Fei <fwang12@ebay.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
2025-03-24 22:03:15 +08:00
Björn Boschman
192213dafb [CELEBORN-1900][FOLLOWUP] fixed wrong CI parameter
### What changes were proposed in this pull request?

The referenced step ID used to extract the version was wrong.

### Why are the changes needed?

To fix docker build workflow failures.

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

Manual tests.

Closes #3160 from jesusch/CELEBORN-1900-docker-images-fix.

Authored-by: Björn Boschman <bjoern.boschman@innovid.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
2025-03-24 11:27:18 +08:00
Angerszhuuuu
151fd35676 [CELEBORN-1923] Correct Celeborn available slots calculation logic
### What changes were proposed in this pull request?
Fix the incorrect logic for calculating a disk's available slots.

### Why are the changes needed?
Now we use `usableSize / estimatedPartitionSize = maxSlots`
Then `availableSlots = maxSlots - allocatedSlots`
But `availableSlots` should be `usableSize / estimatedPartitionSize`
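
The difference between the two formulas can be illustrated with a small calculation. This sketch assumes that `usableSize` already reflects the space still free on the disk, which would explain why subtracting `allocatedSlots` again undercounts; that reading, and the class and method names, are assumptions and not taken from the Celeborn code.

```java
// Hypothetical sketch of the two formulas from the description; not Celeborn code.
final class SlotMath {
  // Previous logic: derive maxSlots from usableSize, then subtract the allocated slots.
  static long oldAvailableSlots(long usableSize, long estimatedPartitionSize, long allocatedSlots) {
    long maxSlots = usableSize / estimatedPartitionSize;
    return maxSlots - allocatedSlots;
  }

  // Corrected logic: available slots come directly from the usable size.
  static long newAvailableSlots(long usableSize, long estimatedPartitionSize) {
    return usableSize / estimatedPartitionSize;
  }
}
```

For example, with `usableSize = 102400`, `estimatedPartitionSize = 1024` and 30 allocated slots, the old formula yields 70 slots while the corrected one yields 100.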

### Does this PR introduce _any_ user-facing change?
Yes.

### How was this patch tested?
MT

Closes #3162 from AngersZhuuuu/CELEBORN-1923.

Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
2025-03-23 15:27:06 -07:00
SteNicholas
7b73f59173 [CELEBORN-1678][FOLLOWUP] Update master and worker commands in celeborn_cli.md
### What changes were proposed in this pull request?

Update master and worker commands in `celeborn_cli.md` including:

- Remove `--show-top-disk-used-apps` command of master and worker.
- Add `--show-workers-topology` command of master.

### Why are the changes needed?

- #2949 removes `--show-top-disk-used-apps` command of master and worker.
- #3119 adds `--show-workers-topology` command of master.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

No.

Closes #3165 from SteNicholas/CELEBORN-1678.

Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: SteNicholas <programgeek@163.com>
2025-03-21 15:50:55 +08:00
Björn Boschman
9e8f9f6b19 [CELEBORN-1900] docker images
### What changes were proposed in this pull request?

Build and publish Docker images on release.

### Why are the changes needed?

No official images are available.

### Does this PR introduce _any_ user-facing change?

### How was this patch tested?

I wish I could - but I cannot push against a non existing docker repo :)

Closes #3152 from jesusch/CELEBORN-1900-docker-images.

Authored-by: Björn Boschman <bjoern.boschman@innovid.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
2025-03-19 15:37:20 +08:00
TheodoreLx
7174275533 [CELEBORN-1914] incWriteTime when ShuffleWriter invoke pushGiantRecord
### What changes were proposed in this pull request?

Call incWriteTime when ShuffleWriter invokes pushGiantRecord.

### Why are the changes needed?
When ShuffleWriter calls pushGiantRecord, the task thread needs to wait synchronously for the push to complete, just like pushing the last part of data in the close method. The time spent on pushGiantRecord needs to be included in writeTime.
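
The idea of charging a blocking push to the write-time metric can be sketched as below. `WriteTimeSketch`, `WriteMetrics` and this `pushGiantRecord` signature are stand-ins invented for the example; they are not the Celeborn or Spark classes.

```java
import java.util.concurrent.atomic.LongAdder;

// Hypothetical sketch; WriteMetrics and pushGiantRecord are stand-ins, not Celeborn or Spark APIs.
final class WriteTimeSketch {
  static final class WriteMetrics {
    private final LongAdder writeTimeNanos = new LongAdder();

    void incWriteTime(long nanos) {
      writeTimeNanos.add(nanos);
    }

    long writeTimeNanos() {
      return writeTimeNanos.sum();
    }
  }

  // Runs a blocking push and adds its elapsed time to the write-time metric,
  // even if the push throws.
  static void pushGiantRecord(Runnable blockingPush, WriteMetrics metrics) {
    long start = System.nanoTime();
    try {
      blockingPush.run();
    } finally {
      metrics.incWriteTime(System.nanoTime() - start);
    }
  }
}
```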

### Does this PR introduce _any_ user-facing change?
no

### How was this patch tested?
ut

Closes #3155 from TheodoreLx/fix-write-time.

Authored-by: TheodoreLx <1548069580@qq.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
2025-03-18 15:38:36 +08:00
Saurabh Dubey
7571e10ad5 [CELEBORN-1894] Allow skipping already read chunks during unreplicated shuffle read retried
### What changes were proposed in this pull request?
Whenever a `WorkerPartitionReader` is recreated (due to Celeborn worker restarts or any other chunk fetch failure), the entire shuffle partition file is re-read from the beginning, and already read chunks are discarded in `CelebornInputStream` based on the batchIdSet metadata it maintains.

This can be improved (only for cases where shuffle data is unreplicated) by skipping already read chunk IDs, since they would be discarded anyway. This improves overall shuffle read performance (the reducer's total time, network usage, etc.).
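
A rough sketch of the idea: a recreated reader starts at the first unread chunk instead of at chunk 0. `ChunkReader` and `readWithOneRetry` are hypothetical names used only for illustration; the real `WorkerPartitionReader` is more involved.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; ChunkReader is a stand-in, not Celeborn's WorkerPartitionReader.
final class SkipChunksSketch {
  static final class ChunkReader {
    private final int numChunks;
    private int nextChunk;

    // startChunk lets a recreated reader resume past chunks that were already consumed.
    ChunkReader(int numChunks, int startChunk) {
      this.numChunks = numChunks;
      this.nextChunk = startChunk;
    }

    boolean hasNext() {
      return nextChunk < numChunks;
    }

    int next() {
      return nextChunk++;
    }
  }

  // Reads all chunks, recreating the reader once after `failAfter` chunks were consumed.
  static List<Integer> readWithOneRetry(int numChunks, int failAfter) {
    List<Integer> fetched = new ArrayList<>();
    ChunkReader reader = new ChunkReader(numChunks, 0);
    int consumed = 0;
    while (reader.hasNext() && consumed < failAfter) {
      fetched.add(reader.next());
      consumed++;
    }
    // Simulated failure: the new reader starts at the first unread chunk, not at 0.
    reader = new ChunkReader(numChunks, consumed);
    while (reader.hasNext()) {
      fetched.add(reader.next());
    }
    return fetched;
  }
}
```

In this sketch no chunk is fetched twice, so nothing has to be discarded after the retry.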

### Why are the changes needed?
Allow skipping already read shuffle chunks

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
UTs added

Closes #3132 from saurabhd336/skipReadChunks.

Authored-by: Saurabh Dubey <saurabhd336@uber.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
2025-03-18 11:37:33 +08:00
SteNicholas
38f3bdd375 [CELEBORN-1909] Support pre-run static code blocks of TransportMessages to improve performance of protobuf serialization
### What changes were proposed in this pull request?

Support pre-run static code blocks of `TransportMessages` to improve performance of protobuf serialization.

### Why are the changes needed?

The protobuf message protocol defines many map type fields, which makes it time-consuming to build these message instances. This is because `TransportMessages` contains static code blocks to initialize a large number of `Descriptor`s and `FieldAccessorTable`s, where the instantiation of `FieldAccessorTable` includes reflection. The test result proves that the static code blocks execute in about 70 milliseconds.

Therefore, it's better to pre-run static code blocks of `TransportMessages` to improve performance of protobuf serialization. Meanwhile, it's recommended to use repeated instead of map type field for rpc messages.
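
One common way to run a class's static initializers ahead of time in Java is `Class.forName` with `initialize = true`. The sketch below shows that mechanism only; whether this PR uses it is not stated here, and `PreloadSketch` and `HeavyMessages` are hypothetical stand-ins for a generated class such as `TransportMessages`.

```java
// Hypothetical sketch; HeavyMessages stands in for a generated class with costly static blocks.
final class PreloadSketch {
  static volatile boolean initialized = false;

  static final class HeavyMessages {
    static {
      // Imagine expensive descriptor and accessor-table setup here.
      PreloadSketch.initialized = true;
    }
  }

  // Forces the static initializer to run now instead of on first use.
  static void preload() {
    try {
      // The class literal alone does not trigger initialization; forName(..., true, ...) does.
      Class.forName(HeavyMessages.class.getName(), true, HeavyMessages.class.getClassLoader());
    } catch (ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }
}
```

Calling `preload()` during service startup would move the one-time initialization cost off the first RPC that builds a message.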

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

CI.

Closes #3149 from SteNicholas/CELEBORN-1909.

Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
2025-03-18 11:34:39 +08:00
veli.yang
d96457909d [CELEBORN-1911] Move multipart-uploader to multipart-uploader/multipart-uploader-s3 for extensibility
### What changes were proposed in this pull request?
- close [CELEBORN-1911](https://issues.apache.org/jira/browse/CELEBORN-1911)

This PR refactors the project structure by moving the multipart-uploader module into multipart-uploader/multipart-uploader-s3.

### Why are the changes needed?
This change improves modularity and enables future extensions, such as multipart-uploader/multipart-uploader-oss, allowing better support for multiple object storage backends.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Deployment integration testing has been completed in the local environment.

Closes #3153 from shouwangyw/optimize/mpu-s3.

Authored-by: veli.yang <897900564@qq.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
2025-03-14 22:34:32 +08:00
HolyLow
a5214e2535 [CELEBORN-1906][CIP-14] Add CelebornInputStream to cppClient
### What changes were proposed in this pull request?
This PR adds CelebornInputStream to cppClient.

### Why are the changes needed?
CelebornInputStream is the stream that feeds data to the reader client.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Compilation.

Closes #3151 from HolyLow/issue/celeborn-1906-add-celeborn-input-stream-to-cppclient.

Authored-by: HolyLow <jiaming.xie7@gmail.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
2025-03-14 22:31:51 +08:00
zhengtao
b5fab42604 [CELEBORN-1822] Respond to RegisterShuffle with max epoch PartitionLocation to avoid revive
### What changes were proposed in this pull request?
LifecycleManager responds to RegisterShuffle with the max-epoch PartitionLocation.

### Why are the changes needed?
Newly spun up executors in a Spark job will still get the partitionLocations with the minEpoch, which point to the lost Celeborn worker.

These executors will fail to connect to the lost worker and then call into revive to get the latest PartitionLocation for a given partitionId in `ChangePartitionManager.getLatestPartition()`.

Returning the max-epoch location reduces such revive requests.
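
Selecting the max-epoch location per partition can be sketched as follows. `Location` is a simplified stand-in for `PartitionLocation` with only the fields needed for the example, and `latestByPartition` is a hypothetical helper, not the LifecycleManager code.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; Location is a simplified stand-in for PartitionLocation.
final class MaxEpochSketch {
  static final class Location {
    final int partitionId;
    final int epoch;
    final String worker;

    Location(int partitionId, int epoch, String worker) {
      this.partitionId = partitionId;
      this.epoch = epoch;
      this.worker = worker;
    }
  }

  // Keeps, for each partition id, only the location with the highest epoch.
  static Map<Integer, Location> latestByPartition(List<Location> all) {
    Map<Integer, Location> latest = new HashMap<>();
    for (Location loc : all) {
      latest.merge(loc.partitionId, loc, (a, b) -> a.epoch >= b.epoch ? a : b);
    }
    return latest;
  }
}
```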

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
UT.

Closes #3135 from zaynt4606/clb1822.

Authored-by: zhengtao <shuaizhentao.szt@alibaba-inc.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
2025-03-14 16:08:05 +08:00
SteNicholas
c1fb94d6e3 [CELEBORN-1910] Remove redundant synchronized of isTerminated in ThreadUtils#sameThreadExecutorService
### What changes were proposed in this pull request?

Remove the redundant synchronized keyword from the `isTerminated` method in `sameThreadExecutorService()` implementation within the `ThreadUtils` class. The method was using both a synchronized block and an explicit `ReentrantLock`, which is unnecessary and potentially problematic.

Backport https://github.com/apache/spark/pull/50210.

### Why are the changes needed?

The changes are needed for several reasons:

1. **Eliminates redundant synchronization**: The current implementation uses both synchronized keyword and explicit ReentrantLock, which is redundant and creates unnecessary overhead.
2. **Reduces potential deadlock risks**: Using two different locking mechanisms (built-in synchronized and explicit `ReentrantLock`) in the same method could lead to complex lock ordering issues and increase deadlock risks. Although the risk of deadlock in the current implementation is low, if someone modifies the code in the future and adds a method that acquires these two locks in a different order, it would introduce a deadlock risk.
3. **Improves performance**: Removing the unnecessary synchronization layer reduces lock contention and context switching overhead.
4. **Code consistency**: Other methods in the same class only use `ReentrantLock` for synchronization, so removing synchronized makes the code style more consistent.
5. **More precise control**: `ReentrantLock` already provides all the synchronization needed with more features than the implicit synchronized keyword.
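
The points above can be illustrated with a simplified executor state guarded by a single `ReentrantLock`. This is a sketch under assumptions, not the `ThreadUtils` source: the class `SameThreadStateSketch` and its fields are invented for the example.

```java
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.locks.ReentrantLock;

// Simplified, hypothetical sketch of executor state guarded by a single lock.
final class SameThreadStateSketch {
  private final ReentrantLock lock = new ReentrantLock();
  private int runningTasks = 0;
  private boolean shutdown = false;

  void shutdown() {
    lock.lock();
    try {
      shutdown = true;
    } finally {
      lock.unlock();
    }
  }

  // No `synchronized` modifier: the ReentrantLock alone guards the shared fields.
  boolean isTerminated() {
    lock.lock();
    try {
      return shutdown && runningTasks == 0;
    } finally {
      lock.unlock();
    }
  }

  // Runs the task on the calling thread while tracking it as running.
  void execute(Runnable task) {
    lock.lock();
    try {
      if (shutdown) {
        throw new RejectedExecutionException("executor already shut down");
      }
      runningTasks++;
    } finally {
      lock.unlock();
    }
    try {
      task.run();
    } finally {
      lock.lock();
      try {
        runningTasks--;
      } finally {
        lock.unlock();
      }
    }
  }
}
```

Because every method takes the same lock, there is only one lock order to reason about.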

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

CI.

Closes #3150 from SteNicholas/CELEBORN-1910.

Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: SteNicholas <programgeek@163.com>
2025-03-13 16:10:07 +08:00
veli.yang
464a3842e3 [CELEBORN-1899] Fix configuration bug in shuffle s3
### What changes were proposed in this pull request?
close [issues-3145](https://github.com/apache/celeborn/issues/3145)

### Why are the changes needed?
1. Support the s3 scheme.
2. Fix the problem that diskFileInfo determined the S3 type incorrectly.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Deployment integration testing has been completed in the local environment.

Closes #3146 from shouwangyw/bugfix/resolve_bugs_3145.

Authored-by: veli.yang <897900564@qq.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
2025-03-12 10:40:52 +08:00
Björn Boschman
8d6c49aed3 [CELEBORN-1901] updated base docker image tag
CELEBORN-1901

### What changes were proposed in this pull request?

Update the base Docker image to Ubuntu 24.04 (noble).

### Why are the changes needed?

The currently used Ubuntu 20.04 is close to end-of-life.

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

Tested on our celeborn installation

Closes #3143 from jesusch/CELEBORN-1901-update-docker-base-image.

Authored-by: Björn Boschman <bjoern.boschman@innovid.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
2025-03-11 15:09:57 +08:00
Sanskar Modi
05b6ad4a7b [MINOR] Change config versions
### What changes were proposed in this pull request?

0.6.0 -> 0.5.4

- `celeborn.rpc.retryWait`
- `celeborn.client.rpc.retryWait`

`empty` -> 0.5.4

- `celeborn.<module>.io.conflictAvoidChooser.enable`

### Why are the changes needed?

### Does this PR introduce _any_ user-facing change?

### How was this patch tested?

Closes #3142 from s0nskar/config_rpc_retry.

Authored-by: Sanskar Modi <sanskarmodi97@gmail.com>
Signed-off-by: SteNicholas <programgeek@163.com>
2025-03-11 07:39:32 +08:00
HolyLow
595ab41f5e [CELEBORN-1881][CIP-14] Add WorkerPartitionReader to cppClient
### What changes were proposed in this pull request?
This PR adds WorkerPartitionReader to cppClient.

### Why are the changes needed?
WorkerPartitionReader is the building block of CelebornInputStream.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Compilation and UTs.

Closes #3137 from HolyLow/issue/celeborn-1881-add-workerpartitionreader-to-cppclient.

Authored-by: HolyLow <jiaming.xie7@gmail.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
2025-03-10 20:47:17 +08:00
Cheng Pan
df809159d1 [CELEBORN-1898] SparkOutOfMemoryError compatible with Spark 4.0 and 4.1
### What changes were proposed in this pull request?

SPARK-49946 (4.0.0) removes single String constructor of class `SparkOutOfMemoryError` and introduces `_LEGACY_ERROR_TEMP_3301` error condition, SPARK-51386 (4.1.0) renames the error condition to `POINTER_ARRAY_OUT_OF_MEMORY`.

### Why are the changes needed?

To be compatible with Spark 4.0 and 4.1

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

GHA checks cover Spark 2.4 to Spark 3.5. I manually tested with Spark 4.0.0 RC2.

Closes #3141 from pan3793/CELEBORN-1898.

Authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
2025-03-10 15:19:50 +08:00
CodingCat
18b268d085 [CELEBORN-1897] Avoid calling toString for too long messages
### What changes were proposed in this pull request?

At 196ad607cd/common/src/main/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala (L256) we hit an issue where the ignored message is too long: calling toString on it requests an array that exceeds the JVM's limit.

I don't think this log conveys much information, so we can avoid calling toString to improve the robustness of applications.
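
One way to keep a little information in such a log line without building the full string is to log only the message type. This is an illustrative sketch, not the change made in this PR; `SafeLogSketch.describe` is a hypothetical helper.

```java
// Hypothetical sketch; describe() is an illustrative helper, not part of NettyRpcEnv.
final class SafeLogSketch {
  // Describes a message by its type only, so toString() is never called on it.
  static String describe(Object message) {
    return message == null ? "null" : message.getClass().getName();
  }
}
```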

### Why are the changes needed?

more details in JIRA

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

prod

Closes #3139 from CodingCat/log.

Authored-by: CodingCat <zhunansjtu@gmail.com>
Signed-off-by: SteNicholas <programgeek@163.com>
2025-03-10 11:33:13 +08:00
Wang, Fei
3d05c8998f [CELEBORN-1895] Bump log4j2 version to 2.24.3
### What changes were proposed in this pull request?

Bump log4j2 version to 2.24.3
https://github.com/apache/logging-log4j2/releases/tag/rel%2F2.24.3

### Why are the changes needed?
Bump to latest log4j2 bug fix release.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
GA.

Closes #3134 from turboFei/log4j2.

Authored-by: Wang, Fei <fwang12@ebay.com>
Signed-off-by: SteNicholas <programgeek@163.com>
2025-03-10 11:30:52 +08:00
Wang, Fei
6f5ad2dde8 [MINOR] Refine the log for fetch failure and rpc metrics dump
### What changes were proposed in this pull request?
Minor change to the logs to make the fetch failure message and the rpc metrics dump much clearer.

### Why are the changes needed?
As title.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

GA.

Closes #3136 from turboFei/log_shuffle.

Authored-by: Wang, Fei <fwang12@ebay.com>
Signed-off-by: SteNicholas <programgeek@163.com>
2025-03-10 10:56:53 +08:00
TheodoreLx
196ad607cd [CELEBORN-1792][FOLLOWUP] Keep resume for a while after resumeByPinnedMemory
### What changes were proposed in this pull request?

In switchServingState, after resumeByPinnedMemory, keep the channel resumed for a while to prevent it from frequently resuming and pausing before memoryUsage decreases to pausePushDataThreshold.

### Why are the changes needed?

Frequent channel resume and pause will result in slow data reception and failure to quickly reduce memoryUsage to below pausePushDataThreshold.

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

ut

Closes #3099 from TheodoreLx/keep-resume.

Lead-authored-by: TheodoreLx <1548069580@qq.com>
Co-authored-by: 慧枫 <zhengqi.zzq@antgroup.com>
Co-authored-by: Zhengqi Zhang <1548069580@qq.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
2025-03-05 09:37:59 +08:00
Sanskar Modi
fa4327e093 [CELEBORN-1885] Fix nullptr exceptions in FetchChunk after worker restart
### What changes were proposed in this pull request?

Handle the NullPointerException in FetchHandler after a worker restarts.

### Why are the changes needed?

The current code throws a NullPointerException during handleChunkFetchRequest:

```
25/02/27 09:23:25 WARN [data-client-5-1] TransportResponseHandler: Ignoring response for RPC 425133 from phx61-u46.prod.uber.internal/10.154.141.80:19103 (java.lang.NullPointerException: Cannot read field "shuffleKey" because "state" is null
	at org.apache.celeborn.service.deploy.worker.storage.ChunkStreamManager.getShuffleKeyAndFileName(ChunkStreamManager.java:199)
	at org.apache.celeborn.service.deploy.worker.FetchHandler.handleChunkFetchRequest(FetchHandler.scala:493)
	at org.apache.celeborn.service.deploy.worker.FetchHandler.handleRpcRequest(FetchHandler.scala:181)
	at org.apache.celeborn.service.deploy.worker.FetchHandler.receive(FetchHandler.scala:101)
	at org.apache.celeborn.common.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:100)
	at org.apache.celeborn.common.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:84)
	at org.apache.celeborn.common.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:156)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:289)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at org.apache.celeborn.common.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:74)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:840)
) since it is not outstanding
```
and during handleEndStreamFromClient:

```
2025-02-27T17:31:03+05:30 WARN [data-client-5-3] TransportResponseHandler: Ignoring response for RPC 461011 from phx61-hkr.prod.uber.internal/10.154.133.115:19103 (java.lang.NullPointerException: Cannot read field "shuffleKey" because "streamState" is null
	at org.apache.celeborn.service.deploy.worker.FetchHandler.handleEndStreamFromClient(FetchHandler.scala:458)
	at org.apache.celeborn.service.deploy.worker.FetchHandler.handleRpcRequest(FetchHandler.scala:174)
	at org.apache.celeborn.service.deploy.worker.FetchHandler.receive(FetchHandler.scala:101)
	at org.apache.celeborn.common.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:100)
	at org.apache.celeborn.common.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:84)
	at org.apache.celeborn.common.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:156)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:289)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at org.apache.celeborn.common.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:74)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:840)
) since it is not outstanding
```
As a result, this error is not handled in the `TransportResponseHandler` as a `ChunkFetchFailure`; instead, it is treated as an `RpcFailure`.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Tested in our staging cluster.

Closes #3128 from s0nskar/fix_nullptr_restart.

Authored-by: Sanskar Modi <sanskarmodi97@gmail.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
2025-03-04 22:26:38 +08:00
Cheng Pan
e85207e2c7 [CELEBORN-1413][FOLLOWUP] Rename celeborn-client-spark-3-4 back to celeborn-client-spark-3
### What changes were proposed in this pull request?

This PR partially reverts the change in https://github.com/apache/celeborn/pull/2813; namely, it restores the module name `celeborn-client-spark-3`.

### Why are the changes needed?

The renaming is not necessary and might cause confusion; for example, I wrongly interpreted `spark-3-4` as Spark 3.4. It also increases the backport effort for branch-0.5.

### Does this PR introduce _any_ user-facing change?

No, it's dev only. Before and after this change, end users always use the shaded client:

```
celeborn-client-spark-2-shaded_2.11-0.6.0-SNAPSHOT.jar
celeborn-client-spark-3-shaded_2.12-0.6.0-SNAPSHOT.jar
celeborn-client-spark-4-shaded_2.13-0.6.0-SNAPSHOT.jar
```

### How was this patch tested?

Pass GA.

Closes #3133 from pan3793/CELEBORN-1413-followup.

Authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
2025-03-04 22:25:10 +08:00
SteNicholas
15e34eca6e [CELEBORN-1890] Bump Spark from 3.5.4 to 3.5.5
### What changes were proposed in this pull request?

Bump Spark from 3.5.4 to 3.5.5.

### Why are the changes needed?

Spark 3.5.5 has been released: [Spark 3.5.5 released](https://spark.apache.org/news/spark-3-5-5-released.html). The `spark-3.5` profile can therefore bump Spark from 3.5.4 to 3.5.5.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

CI.

Closes #3129 from SteNicholas/CELEBORN-1890.

Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: SteNicholas <programgeek@163.com>
2025-03-04 14:15:04 +08:00
Chongchen Chen
3a83ac7693 [CELEBORN-1889] Fix Scala 2.13 compile error
### What changes were proposed in this pull request?

Add `()` to the overriding method, because the method in the parent class is declared with `()`.

### Why are the changes needed?

`./build/make-distribution.sh -Dscala.version=2.13.5 -Dscala.binary.version=2.13 -Pjdk-17` fails with a compile error.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Unit test

Closes #3127 from chenkovsky/fix/tierwriter.

Authored-by: Chongchen Chen <chenkovsky@qq.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
2025-03-03 15:41:02 +08:00
Wang, Fei
660cf24deb [CELEBORN-1319][FOLLOWUP] Fix IndexOutOfBoundsException when using old celeborn client
### What changes were proposed in this pull request?
Follow-up for https://github.com/apache/celeborn/pull/2373:
fix the `IndexOutOfBoundsException` thrown when an old Celeborn client is used.

### Why are the changes needed?

I hit the exception below when using an old Celeborn client; it appears to be a compatibility issue.

The error log on the worker side:
```
25/02/27 14:18:24,182 ERROR [rpc_service-server-4-6] NettyRpcHandler: Error while invoking NettyRpcHandler#receive() on RPC id 4330
java.lang.IndexOutOfBoundsException: Index:0, Size:0
        at com.google.protobuf.LongArrayList.ensureIndexInRange(LongArrayList.java:265)
        at com.google.protobuf.LongArrayList.getLong(LongArrayList.java:113)
        at org.apache.celeborn.common.protocol.PbPackedPartitionLocations.getFileSizes(PbPackedPartitionLocations.java:465)
        at org.apache.celeborn.common.util.PbSerDeUtils$.fromPackedPartitionLocations(PbSerDeUtils.scala:653)
        at org.apache.celeborn.common.util.PbSerDeUtils$.fromPbPackedPartitionLocationsPair(PbSerDeUtils.scala:591)
        at org.apache.celeborn.common.protocol.message.ControlMessages$.fromTransportMessage(ControlMessages.scala:1312)
        at org.apache.celeborn.common.util.Utils$.fromTransportMessage(Utils.scala:1056)
        at org.apache.celeborn.common.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:110)
        at org.apache.celeborn.common.rpc.netty.NettyRpcEnv.$anonfun$deserialize$2(NettyRpcEnv.scala:313)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
        at org.apache.celeborn.common.rpc.netty.NettyRpcEnv.deserialize(NettyRpcEnv.scala:363)
        at org.apache.celeborn.common.rpc.netty.NettyRpcEnv.$anonfun$deserialize$1(NettyRpcEnv.scala:312)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
        at org.apache.celeborn.common.rpc.netty.NettyRpcEnv.deserialize(NettyRpcEnv.scala:312)
        at org.apache.celeborn.common.rpc.netty.RequestMessage$.apply(NettyRpcEnv.scala:555)
        at org.apache.celeborn.common.rpc.netty.NettyRpcHandler.internalReceive(NettyRpcEnv.scala:619)
        at org.apache.celeborn.common.rpc.netty.NettyRpcHandler.receive(NettyRpcEnv.scala:605)
        at org.apache.celeborn.common.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:101)
        at org.apache.celeborn.common.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:85)
        at org.apache.celeborn.common.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:156)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
        at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:289)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
        at org.apache.celeborn.common.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:74)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1357)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:868)
        at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:799)
        at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:501)
        at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:399)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:998)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.base/java.lang.Thread.run(Thread.java:833)
25/02/27 14:18:24,234 WARN [celeborn-dispatcher-81] Controller: Shuffle application_1739172886147_152994_1-0 not registered!
25/02/27 14:18:24,284 WARN [celeborn-dispatcher-82] Controller: Shuffle application_1739172886147_152994_1-0 not registered!
25/02/27 14:18:24,335 WARN [celeborn-dispatcher-83] Controller: Shuffle application_1739172886147_152994_1-0 not registered!
```

The log on the client side:
```
org.apache.celeborn.common.exception.CelebornIOException: Register shuffle failed for shuffle 0, reason: RESERVE_SLOTS_FAILED
	at org.apache.celeborn.client.ShuffleClientImpl.registerShuffleInternal(ShuffleClientImpl.java:710)
	at org.apache.celeborn.client.ShuffleClientImpl.registerShuffle(ShuffleClientImpl.java:519)
	at org.apache.celeborn.client.ShuffleClientImpl.lambda$getPartitionLocation$4(ShuffleClientImpl.java:579)
	at java.base/java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1708)
	at org.apache.celeborn.client.ShuffleClientImpl.getPartitionLocation(ShuffleClientImpl.java:575)
	at org.apache.celeborn.client.write.DataPushQueue.takePushTasks(DataPushQueue.java:92)
	at org.apache.celeborn.client.write.DataPusher$1.run(DataPusher.java:124)
	at java.base/java.lang.Thread.run(Thread.java:833)

```
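
The worker-side trace fails inside `getFileSizes(index)` with `Index:0, Size:0`, which is consistent with a repeated field that an older sender never populated. A minimal sketch of a defensive read is shown below; it is not necessarily what this patch does, and the class name, method name, and fallback value are hypothetical.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class PackedFieldCompatSketch {

    // Assumed fallback for senders that predate the field (hypothetical choice).
    static final long UNKNOWN_FILE_SIZE = 0L;

    /**
     * Reads element {@code index} of a repeated field that older senders may leave empty.
     * Calling {@code fileSizes.get(index)} directly on an empty list would throw
     * IndexOutOfBoundsException, as in the worker log above.
     */
    static long fileSizeAt(List<Long> fileSizes, int index) {
        if (index >= fileSizes.size()) {
            return UNKNOWN_FILE_SIZE;
        }
        return fileSizes.get(index);
    }

    public static void main(String[] args) {
        List<Long> fromNewClient = Arrays.asList(128L, 256L);
        List<Long> fromOldClient = Collections.emptyList();

        System.out.println(fileSizeAt(fromNewClient, 1)); // field present: value is read
        System.out.println(fileSizeAt(fromOldClient, 0)); // field absent: fallback is used
    }
}
```

Checking the element count before indexing keeps the receiver tolerant of messages that omit fields added in later versions.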

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing UT and IT.

Closes #3122 from turboFei/index_outof.

Authored-by: Wang, Fei <fwang12@ebay.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
2025-02-28 11:34:14 +08:00
SteNicholas
d90cf0d427 [CELEBORN-1884] Bump rocksdbjni version from 9.5.2 to 9.10.0
### What changes were proposed in this pull request?

Bump rocksdbjni version from 9.5.2 to 9.10.0.

### Why are the changes needed?

The new version includes bug fixes and performance improvements. The full release notes:

- https://github.com/facebook/rocksdb/releases/tag/v9.6.1
- https://github.com/facebook/rocksdb/releases/tag/v9.7.3
- https://github.com/facebook/rocksdb/releases/tag/v9.7.4
- https://github.com/facebook/rocksdb/releases/tag/v9.8.4
- https://github.com/facebook/rocksdb/releases/tag/v9.9.3
- https://github.com/facebook/rocksdb/releases/tag/v9.10.0

Backport:

- https://github.com/apache/spark/pull/48155
- https://github.com/apache/spark/pull/49538
- https://github.com/apache/spark/pull/50076

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

CI.

Closes #3121 from SteNicholas/CELEBORN-1884.

Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
2025-02-28 11:29:42 +08:00
Minchu Yang
44d772df75 [CELEBORN-1882] Support configuring the SSL handshake timeout for SSLHandler
### What changes were proposed in this pull request?
Support configuring the SSL handshake timeout for SSLHandler, for `rpc_app` and `rpc_service` transport modules.

### Why are the changes needed?
To make the SSLHandler handshake timeout configurable. We are ramping up shuffle traffic to Celeborn internally and have observed Spark task failures related to connection timeouts. This change brings the SSLHandler handshake timeout in line with our ecosystem's production config, which should minimize those failures and improve robustness.
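
A per-module timeout lookup could look roughly like the sketch below. The key layout, class name, and 10-second fallback are assumptions made for illustration, not the configuration this PR introduces; the resolved value would typically be passed to Netty's `SslHandler.setHandshakeTimeoutMillis(long)`.

```java
import java.util.Properties;

final class HandshakeTimeoutSketch {

    // Assumed fallback of 10 seconds when no value is configured.
    static final long DEFAULT_HANDSHAKE_TIMEOUT_MS = 10_000L;

    // Hypothetical key layout; the real configuration name may differ.
    static String key(String module) {
        return "example.ssl." + module + ".handshakeTimeoutMs";
    }

    /** Resolves the handshake timeout for a transport module such as rpc_app or rpc_service. */
    static long handshakeTimeoutMs(Properties conf, String module) {
        String raw = conf.getProperty(key(module));
        if (raw == null) {
            return DEFAULT_HANDSHAKE_TIMEOUT_MS;
        }
        long value = Long.parseLong(raw.trim());
        if (value < 0) {
            throw new IllegalArgumentException("Handshake timeout must not be negative: " + value);
        }
        return value;
    }

    public static void main(String[] args) {
        Properties conf = new Properties();
        conf.setProperty(key("rpc_service"), "30000");

        System.out.println(handshakeTimeoutMs(conf, "rpc_service")); // configured value
        System.out.println(handshakeTimeoutMs(conf, "rpc_app"));     // falls back to the default
    }
}
```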

### Does this PR introduce _any_ user-facing change?
Introduces a new server-side configuration.

### How was this patch tested?
Added a new UT, validated with existing UTs.

Closes #3120 from rmcyang/rmcyang/CELEBORN-1882.

Authored-by: Minchu Yang <minyang@linkedin.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
2025-02-27 15:43:32 -06:00
SteNicholas
cc501928ce [CELEBORN-1875][FOLLOWUP] Support master --show-workers-topology command to show registered workers topology
### What changes were proposed in this pull request?

Follow up #3112.

Support `master --show-workers-topology` command to show registered workers topology.

### Why are the changes needed?

The REST API `/api/v1/workers/topology` exposes the grouped worker topology info, but there is no CLI command to show the registered workers' topology.
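
For readers who want to query the endpoint directly, the sketch below builds a plain GET request with the JDK's `java.net.http` client. Only the path comes from this description; the class name and the master address are hypothetical, and the response format is not assumed.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

final class WorkersTopologySketch {

    // Path taken from the description above.
    static final String TOPOLOGY_PATH = "/api/v1/workers/topology";

    /** Builds a GET request for the topology endpoint of the given master HTTP address. */
    static HttpRequest topologyRequest(String masterHttpAddress) {
        return HttpRequest.newBuilder(URI.create(masterHttpAddress + TOPOLOGY_PATH))
            .GET()
            .build();
    }

    /** Sends the request and returns the raw response body without interpreting it. */
    static String fetchTopology(String masterHttpAddress) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpResponse<String> response =
            client.send(topologyRequest(masterHttpAddress), HttpResponse.BodyHandlers.ofString());
        return response.body();
    }

    public static void main(String[] args) {
        // Hypothetical address; replace with the master's real HTTP endpoint.
        System.out.println(topologyRequest("http://localhost:9098").uri());
    }
}
```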

### Does this PR introduce _any_ user-facing change?

Introduce `master --show-workers-topology` command.

### How was this patch tested?

`TestCelebornCliCommands#master --show-workers-topology`

Closes #3119 from SteNicholas/CELEBORN-1875.

Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
2025-02-27 10:44:51 +08:00