Commit Graph

2455 Commits

Author SHA1 Message Date
daowu.hzy
3efc60cc9f [CELEBORN-2047] Reuse FileChannel/FSDataInputStream in PartitionDataReader
### What changes were proposed in this pull request?

Reuse FileChannel/FSDataInputStream in PartitionDataReader to avoid opening too many files.

### Why are the changes needed?

In previous implementations, different PartitionDataReader instances reused the same FileChannel/FSDataInputStream, but #3349 changed this logic, so the reuse needs to be restored to keep the behavior consistent.
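
A minimal sketch of the reuse idea, not the actual Celeborn code: several readers borrow one open `FileChannel` and use positional reads, so a file is opened once no matter how many readers consume it. The class `SharedChannelReader` and its methods are hypothetical names chosen for illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical illustration: each reader borrows a shared channel instead of
// opening the file itself.
final class SharedChannelReader {
  private final FileChannel channel; // shared; owned by whoever opened it
  private final long offset;
  private final int length;

  SharedChannelReader(FileChannel channel, long offset, int length) {
    this.channel = channel;
    this.offset = offset;
    this.length = length;
  }

  // Positional read: it does not move the channel's own position, so several
  // readers can use the same channel without interfering with each other.
  String readRegion() throws IOException {
    ByteBuffer buf = ByteBuffer.allocate(length);
    long pos = offset;
    while (buf.hasRemaining()) {
      int n = channel.read(buf, pos);
      if (n < 0) {
        break; // end of file
      }
      pos += n;
    }
    buf.flip();
    return StandardCharsets.UTF_8.decode(buf).toString();
  }

  // Demo helper: one file, one open channel, two readers.
  static String[] demo() {
    try {
      Path file = Files.createTempFile("partition", ".data");
      try {
        Files.write(file, "AAAABBBB".getBytes(StandardCharsets.UTF_8));
        try (FileChannel ch = FileChannel.open(file, StandardOpenOption.READ)) {
          SharedChannelReader first = new SharedChannelReader(ch, 0, 4);
          SharedChannelReader second = new SharedChannelReader(ch, 4, 4);
          return new String[] {first.readRegion(), second.readRegion()};
        }
      } finally {
        Files.deleteIfExists(file);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

In this sketch the code that opens the channel is also responsible for closing it, once no reader needs it any more.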

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Manual test.

Closes #3445 from Alibaba-HZY/mirror-fix.

Authored-by: daowu.hzy <daowu.hzy@alibaba-inc.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
2025-09-03 16:14:56 +08:00
xxx
449cb5d588 [CELEBORN-2127] When fileWriter is closed, it should return HARD_SPLIT StatusCode

### What changes were proposed in this pull request?

When fileWriter is closed, it should return HARD_SPLIT StatusCode

### Why are the changes needed?

When fileWriter is closed, it should return HARD_SPLIT StatusCode

### Does this PR introduce _any_ user-facing change?

NO

### How was this patch tested?

CI

Closes #3448 from xy2953396112/CELEBORN-2127.

Authored-by: xxx <953396112@qq.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
2025-09-03 16:04:32 +08:00
xxx
750aeefbc6 [CELEBORN-2139] Fix the condition for using OSS storage
### What changes were proposed in this pull request?

Fix the condition for using OSS storage

### Why are the changes needed?

When OSS is enabled, the local disk should be used first if it is available.
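
The intended preference order can be sketched as follows. This is an assumption-laden illustration: `StorageChoice`, its enum, and the two boolean inputs are hypothetical, and the real condition in Celeborn likely depends on more state than shown here.

```java
// Hypothetical illustration of "local disk first, OSS as fallback".
final class StorageChoice {
  enum Storage { LOCAL_DISK, OSS, NONE }

  static Storage choose(boolean localDiskAvailable, boolean ossEnabled) {
    if (localDiskAvailable) {
      return Storage.LOCAL_DISK; // preferred even when OSS is enabled
    }
    if (ossEnabled) {
      return Storage.OSS; // only used when no local disk is usable
    }
    return Storage.NONE;
  }
}
```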

### Does this PR introduce _any_ user-facing change?

NO

### How was this patch tested?

CI

Closes #3463 from xy2953396112/CELEBORN-2139.

Authored-by: xxx <953396112@qq.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
2025-09-03 15:40:42 +08:00
taowenjun
6c102441c3 [CELEBORN-2138] Avoiding multiple accesses to HDFS when writing index file
### What changes were proposed in this pull request?

Avoiding multiple accesses to HDFS when writing index file.

### Why are the changes needed?

### Does this PR introduce _any_ user-facing change?

### How was this patch tested?

Closes #3460 from taowenjun/CELEBORN-2138.

Authored-by: taowenjun <1483633867@qq.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
2025-09-03 10:43:13 +08:00
SteNicholas
7f08eb8f1d [CELEBORN-2137] Remove unused MAPGROUP PartitionType
### What changes were proposed in this pull request?

Remove unused `MAPGROUP` `PartitionType`.

### Why are the changes needed?

The `MAPGROUP` `PartitionType` is currently unused and can be removed.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

CI.

Closes #3459 from SteNicholas/CELEBORN-2137.

Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
2025-09-03 09:58:07 +08:00
Wang, Fei
d038dd2b32 [CELEBORN-1258] Support to register application info with user identifier and extra info
### What changes were proposed in this pull request?
Support to register application info with user identifier and extra info.

### Why are the changes needed?
To provide more insight into application information.

### Does this PR introduce _any_ user-facing change?
A new RESTful API.

### How was this patch tested?
UT.

Closes #3428 from turboFei/app_info_uid.

Lead-authored-by: Wang, Fei <fwang12@ebay.com>
Co-authored-by: Fei Wang <cn.feiwang@gmail.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
2025-09-01 11:15:40 +08:00
dz
2817f7fb9e [CELEBORN-2104] Clean up sources of NettyRpcEnv, Master and Worker to avoid thread leaks
### What changes were proposed in this pull request?

Clean up the `Source` instances of NettyRpcEnv, Master and Worker to avoid thread leaks.

### Why are the changes needed?

1. NettyRpcEnv should clean up rpcSource to prevent resource leaks.
2. Master should clean up resourceConsumptionSource, masterSource, threadPoolSource, jVMSource, jVMCPUSource and systemMiscSource.
3. Worker should clean up workerSource.
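
Why an uncleaned source leaks a thread can be sketched as follows. This is a generic illustration rather than Celeborn's `Source` API: `MetricsSource` and `Component` are hypothetical stand-ins, and the sketch assumes the source owns a background scheduler.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class SourceCleanupSketch {
  // Stand-in for a metrics source that owns a background thread.
  static final class MetricsSource implements AutoCloseable {
    private final ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor(r -> {
          Thread t = new Thread(r, "metrics-source");
          t.setDaemon(true);
          return t;
        });

    MetricsSource() {
      // Periodic sampling keeps the scheduler thread alive until shutdown.
      scheduler.scheduleAtFixedRate(() -> { /* sample gauges */ }, 0, 1, TimeUnit.SECONDS);
    }

    boolean isStopped() {
      return scheduler.isShutdown();
    }

    @Override
    public void close() {
      scheduler.shutdownNow();
    }
  }

  // Stand-in for a component (for example an RPC environment) that creates a source.
  static final class Component {
    final MetricsSource source = new MetricsSource();

    void stop() {
      // Without this call the scheduler thread outlives the component.
      source.close();
    }
  }
}
```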

### Does this PR introduce _any_ user-facing change?

NO

### How was this patch tested?

local

Closes #3418 from xy2953396112/CELEBORN-2104.

Authored-by: dz <953396112@qq.com>
Signed-off-by: SteNicholas <programgeek@163.com>
2025-08-29 19:04:19 +08:00
Jray
ffdaef98c3 [CELEBORN-2097] Support Zstd Compression in CppClient
### What changes were proposed in this pull request?
This PR adds support for zstd compression in CppClient.

### Why are the changes needed?
To support writing to Celeborn with CppClient.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
By compilation and UTs.

Closes #3454 from Jraaay/feat/cpp_client_zstd_compression.

Authored-by: Jray <1075860716@qq.com>
Signed-off-by: SteNicholas <programgeek@163.com>
2025-08-29 18:58:22 +08:00
sychen
185890381b [CELEBORN-2135] Rename Blaze to Auron
### What changes were proposed in this pull request?

### Why are the changes needed?

### Does this PR introduce _any_ user-facing change?

### How was this patch tested?
<img width="1370" height="1100" alt="image" src="https://github.com/user-attachments/assets/dce7f5b4-a166-4547-bc08-4a8162f129d7" />

Closes #3457 from cxzl25/CELEBORN-2135.

Authored-by: sychen <sychen@ctrip.com>
Signed-off-by: SteNicholas <programgeek@163.com>
2025-08-29 18:55:45 +08:00
xxx
db70473de2 [CELEBORN-2134] When creating a DiskFile, retrieve the storage type b…
…ased on the mount point

### What changes were proposed in this pull request?

When creating a DiskFile, retrieve the storage type based on the mount point.

### Why are the changes needed?

If the worker disk is an SSD, it should be set to the SSD type, and the storage type can be retrieved based on the mount point.

### Does this PR introduce _any_ user-facing change?

NO

### How was this patch tested?

CI

Closes #3456 from xy2953396112/CELEBORN-2134.

Authored-by: xxx <953396112@qq.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
2025-08-28 17:21:51 +08:00
gaoyajun02
1be3094fb2 [CELEBORN-2132] Enhance ratis peer add operation to support clientAddress & adminAddress
### What changes were proposed in this pull request?

This PR enhances the Ratis peer add operation to support the clientAddress and adminAddress parameters through the RESTful API, allowing these critical RPC endpoints to be properly configured when adding new peers to the Celeborn master cluster.

### Why are the changes needed?

Currently, when expanding the Celeborn master cluster using the ratis peer add operation, newly added peers lack clientAddress and adminAddress settings. If a newly added peer becomes the Leader, all Followers will return empty addresses to clients, causing them to attempt connections to an incorrect Leader address (127.0.0.1:0). This change ensures proper client request routing in expanded clusters.

### Does this PR introduce _any_ user-facing change?

Yes, this PR extends the API for adding Ratis peers by adding support for clientAddress and adminAddress parameters. Users will now be able to specify these addresses when adding new peers to the cluster.

### How was this patch tested?

Manual testing of cluster expansion scenarios to ensure clients can correctly connect to the Leader regardless of which peer holds leadership

```
➜ curl -sX  POST zw06-data-k8s-sparktest-node007.mt:9098/api/v1/ratis/peer/add \
  -H "Content-Type: application/json" \
  -d '{ "peers": [{"id": "2", "address": "zw06-data-k8s-sparktest-node009.mt:9872", "clientAddress": "zw06-data-k8s-sparktest-node009.mt:9097", "adminAddress":  "zw06-data-k8s-sparktest-node009.mt:9097" }] }' | jq

{
  "success": true,
  "message": "Successfully added peers ArrayBuffer(2|zw06-data-k8s-sparktest-node009.mt:9872) to group GroupInfoReply:client-3E7C9CE679B2->0group-47BEDE733167, cid=1031, SUCCESS, logIndex=0, commits[0:c224, 1:c224]."
}

➜ curl -s zw06-data-k8s-sparktest-node009.mt:9098/masterGroupInfo
====================== Master Group INFO ==============================
group id: c5196f6d-2c34-3ed3-8b8a-47bede733167
leader info: 0(zw06-data-k8s-sparktest-node007.mt:9872)

[server {
  id: "2"
  address: "zw06-data-k8s-sparktest-node009.mt:9872"
  clientAddress: "zw06-data-k8s-sparktest-node009.mt:9097"
  adminAddress: "zw06-data-k8s-sparktest-node009.mt:9097"
  startupRole: FOLLOWER
}
commitIndex: 228
, server {
  id: "0"
  address: "zw06-data-k8s-sparktest-node007.mt:9872"
  clientAddress: "zw06-data-k8s-sparktest-node007.mt:9097"
  adminAddress: "zw06-data-k8s-sparktest-node007.mt:9097"
  startupRole: FOLLOWER
}
commitIndex: 228
, server {
  id: "1"
  address: "zw06-data-k8s-sparktest-node008.mt:9872"
  clientAddress: "zw06-data-k8s-sparktest-node008.mt:9097"
  adminAddress: "zw06-data-k8s-sparktest-node008.mt:9097"
  startupRole: FOLLOWER
}
commitIndex: 228
]

```

Closes #3452 from gaoyajun02/ratis.

Authored-by: gaoyajun02 <gaoyajun02@meituan.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
2025-08-28 11:29:51 +08:00
SteNicholas
1a3b9f35b5 [CELEBORN-2129] CelebornBufferStream should invoke openStreamInternal in moveToNextPartitionIfPossible to avoid client creation timeout
### What changes were proposed in this pull request?

`CelebornBufferStream` should invoke `openStreamInternal` in `moveToNextPartitionIfPossible` to avoid client creation timeout.

### Why are the changes needed?

Many `CelebornIOException`s caused by client creation timeouts occur in the production environment, as follows:

```
2025-08-22 16:20:10,681 INFO  [flink-akka.actor.default-dispatcher-40] org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - [vertex-2]Calc(select=[lz4sql, rawsize, obcluster, ds, hh, mm, PROCTIME() AS $6]) -> Sort(orderBy=[lz4sql ASC, rawsize ASC, obcluster ASC, ds ASC, hh ASC, mm ASC, $6 DESC]) -> OverAggregate(partitionBy=[lz4sql, rawsize, obcluster, ds, hh, mm], orderBy=[$6 DESC], window#0=[ROW_NUMBER(*) AS w0$o0 ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[lz4sql, rawsize, obcluster, ds, hh, mm, $6, w0$o0]) -> Calc(select=[lz4sql, rawsize, obcluster, ds, hh, mm], where=[(w0$o0 = 1)]) (668/1900) (d8bf48183d8c69a1ab84bcd445f6d4ed_0e8289f2bf927649dd2511bbc2bb6759_667_0) switched from RUNNING to FAILED on antc4flink4172792604-taskmanager-403  (dataPort=1).
java.io.IOException: org.apache.celeborn.common.exception.CelebornIOException: Connecting to /:9093 timed out (60000 ms)
	at org.apache.celeborn.common.network.client.TransportClientFactory.internalCreateClient(TransportClientFactory.java:313)
	at org.apache.celeborn.common.network.client.TransportClientFactory.createClient(TransportClientFactory.java:250)
	at org.apache.celeborn.common.network.client.TransportClientFactory.retryCreateClient(TransportClientFactory.java:157)
	at org.apache.celeborn.plugin.flink.network.FlinkTransportClientFactory.createClientWithRetry(FlinkTransportClientFactory.java:51)
	at org.apache.celeborn.plugin.flink.readclient.CelebornBufferStream.openStreamInternal(CelebornBufferStream.java:200)
	at org.apache.celeborn.plugin.flink.readclient.CelebornBufferStream.moveToNextPartitionIfPossible(CelebornBufferStream.java:183)
	at org.apache.celeborn.plugin.flink.RemoteBufferStreamReader.onStreamEnd(RemoteBufferStreamReader.java:161)
	at org.apache.celeborn.plugin.flink.RemoteBufferStreamReader.lambda$new$0(RemoteBufferStreamReader.java:79)
	at org.apache.celeborn.plugin.flink.network.ReadClientHandler.processMessageInternal(ReadClientHandler.java:64)
	at org.apache.celeborn.plugin.flink.network.ReadClientHandler.receive(ReadClientHandler.java:100)
	at org.apache.celeborn.plugin.flink.network.ReadClientHandler.receive(ReadClientHandler.java:111)
	at org.apache.celeborn.plugin.flink.network.ReadClientHandler.receive(ReadClientHandler.java:76)
	at org.apache.celeborn.common.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:100)
	at org.apache.celeborn.common.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:84)
	at org.apache.celeborn.common.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:156)
	at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at org.apache.celeborn.shaded.io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:289)
	at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
	at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at org.apache.celeborn.plugin.flink.network.TransportFrameDecoderWithBufferSupplier.decodeBody(TransportFrameDecoderWithBufferSupplier.java:95)
	at org.apache.celeborn.plugin.flink.network.TransportFrameDecoderWithBufferSupplier.channelRead(TransportFrameDecoderWithBufferSupplier.java:184)
	at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at org.apache.celeborn.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
	at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at org.apache.celeborn.shaded.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at org.apache.celeborn.shaded.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	at org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
	at org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
	at org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
	at org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
	at org.apache.celeborn.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
	at org.apache.celeborn.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at org.apache.celeborn.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:991)

	at org.apache.celeborn.plugin.flink.RemoteBufferStreamReader.errorReceived(RemoteBufferStreamReader.java:146) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
	at org.apache.celeborn.plugin.flink.RemoteBufferStreamReader.lambda$new$0(RemoteBufferStreamReader.java:77) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
	at org.apache.celeborn.plugin.flink.readclient.CelebornBufferStream.moveToNextPartitionIfPossible(CelebornBufferStream.java:193) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
	at org.apache.celeborn.plugin.flink.RemoteBufferStreamReader.onStreamEnd(RemoteBufferStreamReader.java:161) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
	at org.apache.celeborn.plugin.flink.RemoteBufferStreamReader.lambda$new$0(RemoteBufferStreamReader.java:79) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
	at org.apache.celeborn.plugin.flink.network.ReadClientHandler.processMessageInternal(ReadClientHandler.java:64) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
	at org.apache.celeborn.plugin.flink.network.ReadClientHandler.receive(ReadClientHandler.java:100) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
	at org.apache.celeborn.plugin.flink.network.ReadClientHandler.receive(ReadClientHandler.java:111) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
	at org.apache.celeborn.plugin.flink.network.ReadClientHandler.receive(ReadClientHandler.java:76) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
	at org.apache.celeborn.common.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:100) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
	at org.apache.celeborn.common.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:84) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
	at org.apache.celeborn.common.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:156) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
	at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
	at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
	at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
	at org.apache.celeborn.shaded.io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:289) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
	at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
	at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
	at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
	at org.apache.celeborn.plugin.flink.network.TransportFrameDecoderWithBufferSupplier.decodeBody(TransportFrameDecoderWithBufferSupplier.java:95) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
	at org.apache.celeborn.plugin.flink.network.TransportFrameDecoderWithBufferSupplier.channelRead(TransportFrameDecoderWithBufferSupplier.java:184) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
	at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
	at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
	at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
	at org.apache.celeborn.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
	at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
	at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
	at org.apache.celeborn.shaded.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
	at org.apache.celeborn.shaded.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
	at org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
	at org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
	at org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
	at org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
	at org.apache.celeborn.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
	at org.apache.celeborn.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
	at org.apache.celeborn.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
	at java.lang.Thread.run(Thread.java:991) ~[?:?]
```

`CelebornBufferStream` should invoke `openStreamInternal` in `moveToNextPartitionIfPossible` to avoid client creation timeout, which is caused by creating a client using the callback thread of netty.
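
One common way to keep blocking work off an I/O callback thread is to hand it to a separate executor, as sketched below. This is a generic illustration and not necessarily how the PR is implemented: `OffloadSketch`, `blockingCreateClient`, and the address string are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class OffloadSketch {
  // Stand-in for a blocking call such as creating a transport client.
  static String blockingCreateClient(String address) {
    try {
      Thread.sleep(50); // simulates waiting for the connection to complete
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return "client->" + address;
  }

  // Meant to be called from the I/O callback thread: it returns immediately
  // and lets a worker thread perform the blocking connect.
  static CompletableFuture<String> onStreamEnd(String nextAddress, ExecutorService workers) {
    return CompletableFuture.supplyAsync(() -> blockingCreateClient(nextAddress), workers);
  }

  static String demo() {
    ExecutorService workers = Executors.newFixedThreadPool(1);
    try {
      return onStreamEnd("worker-2:9093", workers).join();
    } finally {
      workers.shutdown();
    }
  }
}
```

The point of the hand-off is that the callback thread stays free to process the network events that the pending connection itself depends on.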

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Manual test.

Closes #3450 from SteNicholas/CELEBORN-2129.

Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: SteNicholas <programgeek@163.com>
2025-08-27 14:21:15 +08:00
xxx
f590fb275d [CELEBORN-2122] Avoiding multiple accesses to HDFS when retrieving in…
…dexes in DfsPartitionReader

### What changes were proposed in this pull request?

Avoiding multiple accesses to HDFS when retrieving indexes in DfsPartitionReader

### Why are the changes needed?

This optimization method improves read performance by reducing the number of interactions with HDFS, merging multiple small I/O operations into a single large I/O operation. Especially when index files are small, the strategy of reading the entire file at once can significantly reduce the number of I/O operations.
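
The idea of replacing many small reads with one bulk read can be sketched as follows. The index layout used here (a sequence of 8-byte big-endian offsets) and the names `IndexReadSketch` and `readOffsets` are assumptions for illustration, not the actual `DfsPartitionReader` code.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;

final class IndexReadSketch {
  // Fetches `count` long offsets with one bulk read request instead of
  // issuing `count` separate 8-byte reads against the remote stream.
  static long[] readOffsets(InputStream in, int count) {
    try {
      byte[] raw = new byte[count * Long.BYTES];
      new DataInputStream(in).readFully(raw); // fetch the whole index at once
      ByteBuffer buf = ByteBuffer.wrap(raw);  // ByteBuffer is big-endian by default
      long[] offsets = new long[count];
      for (int i = 0; i < count; i++) {
        offsets[i] = buf.getLong(); // parsing happens in memory, with no further I/O
      }
      return offsets;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // Demo helper: builds a three-entry index in memory and reads it back.
  static long[] demo() {
    ByteBuffer src = ByteBuffer.allocate(3 * Long.BYTES);
    src.putLong(0L).putLong(128L).putLong(4096L);
    return readOffsets(new ByteArrayInputStream(src.array()), 3);
  }
}
```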

### Does this PR introduce _any_ user-facing change?

NO

### How was this patch tested?

CI

Closes #3443 from xy2953396112/CELEBORN-2122.

Authored-by: xxx <953396112@qq.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
2025-08-27 14:16:59 +08:00
xxx
d4e13b6ba2 [CELEBORN-2128] Close hadoopFs FileSystem when worker is closed
### What changes were proposed in this pull request?

Close hadoopFs FileSystem when worker is closed.

### Why are the changes needed?

When the worker is closed, close the hadoopFs FileSystem to avoid resource leakage.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

CI.

Closes #3449 from xy2953396112/CELEBORN-2128.

Authored-by: xxx <953396112@qq.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
2025-08-27 14:12:55 +08:00
SteNicholas
20e36ca72d [CELEBORN-2133] LifecycleManager should log stack trace of Throwable for invoking appShuffleTrackerCallback
### What changes were proposed in this pull request?

`LifecycleManager` should log stack trace of `Throwable` for invoking `appShuffleTrackerCallback`.

### Why are the changes needed?

`LifecycleManager` does not currently log the stack trace of a `Throwable` thrown when invoking `appShuffleTrackerCallback`; the log contains only the exception class, as follows:

```

ERROR LifecycleManager: java.lang.RuntimeException

```
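
The difference between logging only the exception's string form and attaching the `Throwable` itself can be sketched as follows. To stay dependency-free the sketch uses `java.util.logging`, which is an assumption made for illustration and is not the logging API used by `LifecycleManager`; `LogThrowableSketch` and `Capture` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

final class LogThrowableSketch {
  // Collects records so the two logging calls can be compared.
  static final class Capture extends Handler {
    final List<LogRecord> records = new ArrayList<>();

    @Override
    public void publish(LogRecord record) {
      records.add(record);
    }

    @Override
    public void flush() {}

    @Override
    public void close() {}
  }

  static Capture demo() {
    Logger log = Logger.getLogger("LogThrowableSketch");
    log.setUseParentHandlers(false);
    Capture capture = new Capture();
    log.addHandler(capture);
    Throwable t = new RuntimeException();
    try {
      // Message only: the record carries "java.lang.RuntimeException" and no stack trace.
      log.log(Level.SEVERE, t.toString());
      // Throwable attached: formatters can print the full stack trace.
      log.log(Level.SEVERE, "appShuffleTrackerCallback failed", t);
    } finally {
      log.removeHandler(capture);
    }
    return capture;
  }
}
```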

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

CI.

Closes #3453 from SteNicholas/CELEBORN-2133.

Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: SteNicholas <programgeek@163.com>
2025-08-27 10:31:56 +08:00
sychen
679df6c0f5 [CELEBORN-2125] Improve PartitionFilesSorter sort timeout log
### What changes were proposed in this pull request?

### Why are the changes needed?
The log only outputs the file ID, and the exception only contains the file path; neither includes the file length.

### Does this PR introduce _any_ user-facing change?

### How was this patch tested?

```
25/08/25 18:06:37,083 ERROR [pool-1-thread-1] PartitionFilesSorter: Sorting file application-1-/var/folders/tc/r2n_8g6j4731h7clfqwntg880000gn/T/Celeborn1344631182030527062sort-suite path /var/folders/tc/r2n_8g6j4731h7clfqwntg880000gn/T/Celeborn1344631182030527062sort-suite length 2453444321 timeout after 1ms
```

Closes #3446 from cxzl25/CELEBORN-2125.

Authored-by: sychen <sychen@ctrip.com>
Signed-off-by: SteNicholas <programgeek@163.com>
2025-08-26 19:23:22 +08:00
xxx
a9490d6e24 [CELEBORN-2118] Introduce IsHighWorkload metric to monitor worker overload status
### What changes were proposed in this pull request?

Introduce `IsHighWorkload` metric to monitor worker overload status.

### Why are the changes needed?

There is currently no metric to monitor worker overload status.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

[Grafana test](https://xy2953396112.grafana.net/public-dashboards/22ab1750ef874a1bb39b5879b81a24cf).

Closes #3435 from xy2953396112/CELEBORN-2118.

Authored-by: xxx <953396112@qq.com>
Signed-off-by: SteNicholas <programgeek@163.com>
2025-08-25 20:46:17 +08:00
xxx
1d6299717f [CELEBORN-2123] Add log for commit file size
### What changes were proposed in this pull request?

Add log for commit file size.

### Why are the changes needed?

Logging the size of successfully committed files enables offline analysis of file write sizes, assessment of the proportion of small files, and optimizations based on file size data.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

CI.

Closes #3444 from xy2953396112/CELEBORN-2123.

Authored-by: xxx <953396112@qq.com>
Signed-off-by: SteNicholas <programgeek@163.com>
2025-08-25 17:18:32 +08:00
HolyLow
d6df794ae7 [CELEBORN-2115][CIP-14] Support PushData in cppClient
### What changes were proposed in this pull request?
Support PushData network message in cppClient.

### Why are the changes needed?
PushData is the network message of writing to cppClient.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Compilation and UTs.

Closes #3434 from HolyLow/issue/celeborn-2115-support-push-date-in-cpp-client.

Authored-by: HolyLow <jiaming.xie7@gmail.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
2025-08-25 15:03:33 +08:00
sychen
5f3884f5fb [MINOR] Fix migration doc style
### What changes were proposed in this pull request?

### Why are the changes needed?

<img width="1057" height="442" alt="image" src="https://github.com/user-attachments/assets/a9731a78-4132-4663-8835-fb85aa049a40" />

### Does this PR introduce _any_ user-facing change?

### How was this patch tested?

<img width="961" height="412" alt="image" src="https://github.com/user-attachments/assets/4c5eb70a-abed-45a4-9709-cab1fcf0346e" />

Closes #3441 from cxzl25/minor_doc_style.

Authored-by: sychen <sychen@ctrip.com>
Signed-off-by: SteNicholas <programgeek@163.com>
2025-08-22 20:52:31 +08:00
sychen
3c9fe9897c [MINOR] Fix doc about PushMergedData split
### What changes were proposed in this pull request?

### Why are the changes needed?

[CELEBORN-1721][CIP-12] Support HARD_SPLIT in PushMergedData

https://issues.apache.org/jira/browse/CELEBORN-1721

<img width="775" height="149" alt="image" src="https://github.com/user-attachments/assets/deb7a741-5d72-403c-8405-77f837c25f59" />

### Does this PR introduce _any_ user-facing change?

### How was this patch tested?

<img width="675" height="108" alt="image" src="https://github.com/user-attachments/assets/b33bead1-6f26-42d7-8ef3-7fd6df3b334e" />

Closes #3442 from cxzl25/doc_PushMergedData_split.

Authored-by: sychen <sychen@ctrip.com>
Signed-off-by: SteNicholas <programgeek@163.com>
2025-08-22 20:50:05 +08:00
SteNicholas
686242cc1b [MINOR] Fix appVersion in Chart.yaml
### What changes were proposed in this pull request?

Fix `appVersion` with 0.7.0 in `Chart.yaml`. Fix #3437.

### Why are the changes needed?

The `appVersion` in `Chart.yaml` is currently 0.5.4, which has not been updated to the latest version.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

CI.

Closes #3438 from SteNicholas/chart-app-version.

Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: SteNicholas <programgeek@163.com>
2025-08-22 10:55:33 +08:00
yuanzhen
8effb735f7 [CELEBORN-2066] Release workers only with high workload when the number of excluded worker set is too large
### What changes were proposed in this pull request?

Provide a user option to release only the workers excluded for high workload when the excluded worker set grows too large.

### Why are the changes needed?

In some cases, a large percentage of workers are excluded, but most of them only because of high workload. It is better to release such workers from the excluded set so that system availability takes priority.
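
The behavior described above could look roughly like the following sketch. It is an illustration only: the class, the `Reason` enum, and the `maxExcludedRatio` threshold are hypothetical names, not the configuration option or code added by this PR.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical sketch: names, reasons, and the threshold are illustrative only.
final class ExcludedWorkerRelease {
    enum Reason { HIGH_WORKLOAD, UNREACHABLE }

    /**
     * Returns the workers to release from the excluded set. Nothing is released
     * unless the excluded fraction of the cluster exceeds maxExcludedRatio, and
     * even then only workers excluded for HIGH_WORKLOAD are released.
     */
    static Set<String> workersToRelease(
            Map<String, Reason> excluded, int totalWorkers, double maxExcludedRatio) {
        if (totalWorkers <= 0
                || (double) excluded.size() / totalWorkers <= maxExcludedRatio) {
            return Collections.emptySet();
        }
        return excluded.entrySet().stream()
                .filter(e -> e.getValue() == Reason.HIGH_WORKLOAD)
                .map(Map.Entry::getKey)
                .collect(Collectors.toSet());
    }
}
```

Workers excluded for other reasons stay excluded in this sketch, which matches the stated intent of releasing only the high-workload ones.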

### Does this PR introduce _any_ user-facing change?

New Configuration Option.

### How was this patch tested?
Unit tests.

Closes #3365 from Kalvin2077/exclude-high-stress-workers.

Lead-authored-by: yuanzhen <yuanzhen.hwk@alibaba-inc.com>
Co-authored-by: Kalvin2077 <wk.huang2077@outlook.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
2025-08-22 10:14:38 +08:00
xxx
661a096b77 [CELEBORN-2112] Introduce PausePushDataStatus and PausePushDataAndReplicateStatus metric to record status of pause push data
### What changes were proposed in this pull request?

Add `PausePushDataStatus` and `PausePushDataAndReplicateStatus` metric.

### Why are the changes needed?

Introduce the `PausePushDataStatus` and `PausePushDataAndReplicateStatus` metrics to record the status of paused push data.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Manual test. [Grafana](https://xy2953396112.grafana.net/public-dashboards/21af8e2844234c438e74c741211f0032)

Closes #3426 from xy2953396112/CELEBORN-2112.

Authored-by: xxx <953396112@qq.com>
Signed-off-by: SteNicholas <programgeek@163.com>
2025-08-21 11:17:44 +08:00
dz
8a37a7ca17 [CELEBORN-2106] CommitFile/Reserved location shows detail primary location UniqueId

### What changes were proposed in this pull request?

Show detailed primary location UniqueId info in the CommitFile/Reserved location logs.

### Why are the changes needed?

CommitFile/Reserved should display detailed partitionLocation uniqueId logs to facilitate troubleshooting.

### Does this PR introduce _any_ user-facing change?

NO

### How was this patch tested?

CI

Closes #3420 from xy2953396112/controller_log.

Authored-by: dz <953396112@qq.com>
Signed-off-by: SteNicholas <programgeek@163.com>
2025-08-21 10:45:30 +08:00
dz
11b41f97ad [CELEBORN-2102] Introduce SorterCacheHitRate metric to monitor the hit reate of index cache for sorter
### What changes were proposed in this pull request?

Introduce the `SorterCacheHitRate` metric to monitor the hit rate of the sorter's index cache.

### Why are the changes needed?

Monitor the hit rate of `PartitionFilesSorter#indexCache`.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

The verified grafana dashboard: https://xy2953396112.grafana.net/public-dashboards/5d1177ee0f784b53ad817fde919141b7

Closes #3416 from xy2953396112/CELEBORN_2102.

Authored-by: dz <953396112@qq.com>
Signed-off-by: SteNicholas <programgeek@163.com>
2025-08-20 10:47:38 +08:00
xxx
b537798e37 [CELEBORN-2108] Remove redundant PartitionType
### What changes were proposed in this pull request?

Remove redundant `PartitionType`.

### Why are the changes needed?

`PartitionType` is included in `PartitionDataWriterContext`, therefore it is not necessary to use `PartitionType` as method parameter.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

CI.

Closes #3422 from xy2953396112/remove_useless_partition_type.

Authored-by: xxx <953396112@qq.com>
Signed-off-by: SteNicholas <programgeek@163.com>
2025-08-19 16:38:19 +08:00
SteNicholas
adfc563828 [CELEBORN-2119] DfsTierWriter should close s3MultipartUploadHandler and ossMultipartUploadHandler for close resource
### What changes were proposed in this pull request?

`DfsTierWriter` should close `s3MultipartUploadHandler` and `ossMultipartUploadHandler` in `closeResource` to avoid a resource leak when the file writer is destroyed.

### Why are the changes needed?

`DfsTierWriter` does not close `s3MultipartUploadHandler` and `ossMultipartUploadHandler` in `closeResource`.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

CI.

Closes #3433 from SteNicholas/CELEBORN-2119.

Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: SteNicholas <programgeek@163.com>
2025-08-19 14:57:16 +08:00
sychen
a8f6de5cc6 [CELEBORN-2117] Use git submodules for Chart Actions
### What changes were proposed in this pull request?

### Why are the changes needed?

> The action helm/chart-testing-actionv2.6.1 is not allowed in apache/celeborn because all actions must be from a repository owned by your enterprise, created by GitHub, verified in the GitHub Marketplace

https://github.com/apache/celeborn/actions/runs/17004559972

---

Refer to this PR implementation

https://github.com/apache/ozone-helm-charts/pull/6

```bash
git submodule add --force https://github.com/helm/chart-testing-action .github/actions/chart-testing-action
git -C .github/actions/chart-testing-action checkout e6669bcd63d7cb57cb4380c33043eebe5d111992
```

### Does this PR introduce _any_ user-facing change?

### How was this patch tested?
https://github.com/apache/celeborn/actions/runs/17037324387/job/48292733845?pr=3431

Closes #3431 from cxzl25/chart-testing-action.

Authored-by: sychen <sychen@ctrip.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
2025-08-19 14:25:48 +08:00
Jray
0882db926d [CELEBORN-2096] Support Lz4 Compression in CppClient
### What changes were proposed in this pull request?
This PR adds support for lz4 compression in CppClient.

### Why are the changes needed?
To support writing to Celeborn with CppClient.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
By compilation and UTs.

Closes #3412 from Jraaay/feat/cpp_client_lz4_compression.

Authored-by: Jray <1075860716@qq.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
2025-08-19 10:05:13 +08:00
HolyLow
7e13c9934f [CELEBORN-2098][CIP-14] Support Revive/Response in cppClient
### What changes were proposed in this pull request?
This PR supports Revive/ChangeLocationResponse messages in cppClient.

### Why are the changes needed?
These messages are used when a write triggers a revive operation.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Compilation and UTs.

Closes #3413 from HolyLow/issue/celeborn-2098-support-revive-changelocationresponse.

Authored-by: HolyLow <jiaming.xie7@gmail.com>
Signed-off-by: SteNicholas <programgeek@163.com>
2025-08-16 12:59:32 +08:00
Jray
180a74146c [CELEBORN-2100] Fix performance issue on readToReadOnlyBuffer
### What changes were proposed in this pull request?
Fix performance issue on ReadOnlyByteBuffer::readToReadOnlyBuffer.

### Why are the changes needed?
`ReadOnlyByteBuffer::readToReadOnlyBuffer` is currently slow on a long iobuf chain because it uses the wrong API to clone an iobuf block.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
By compilation and UTs.

Closes #3415 from Jraaay/fix/readToReadOnlyBuffer.

Authored-by: Jray <1075860716@qq.com>
Signed-off-by: SteNicholas <programgeek@163.com>
2025-08-16 12:56:38 +08:00
HolyLow
1ed2abc6bf [CELEBORN-2095][CIP-14] Support RegisterShuffle/Response in cppClient
### What changes were proposed in this pull request?
Support RegisterShuffle/Response messages in CppClient.

### Why are the changes needed?
To support the procedure of registering shuffle and accepting response in CppClient.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Compilation and UTs.

Closes #3410 from HolyLow/issue/celeborn-2095-support-registershuffle-response.

Authored-by: HolyLow <jiaming.xie7@gmail.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
2025-08-13 10:29:34 +08:00
Jray
eb5e8a46f8 [CELEBORN-2091] Support Zstd Decompression in CppClient
### What changes were proposed in this pull request?
This PR adds support for zstd decompression in CppClient.

### Why are the changes needed?
To support reading from Celeborn with CppClient.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
By compilation and UTs.

Closes #3411 from Jraaay/feat/cpp_client_zstd_decompression.

Authored-by: Jray <1075860716@qq.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
2025-08-11 15:55:32 +08:00
Jray
cfb490c938 [CELEBORN-2090] Support Lz4 Decompression in CppClient
### What changes were proposed in this pull request?
This PR adds support for lz4 decompression in CppClient.

### Why are the changes needed?
To support reading from Celeborn with CppClient.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
By compilation and UTs.

Closes #3402 from Jraaay/feat/cpp_client_lz4_decompression.

Authored-by: Jray <1075860716@qq.com>
Signed-off-by: SteNicholas <programgeek@163.com>
2025-08-08 18:19:48 +08:00
TheodoreLx
1ead784fa1 [CELEBORN-2085] Use a fixed buffer for flush copying to reduce GC
### What changes were proposed in this pull request?

Allocate a byte array in advance and reuse it as a transfer buffer whenever a copy is needed during flush.

### Why are the changes needed?

`HdfsFlushTask`, `OssFlushTask`, and `S3FlushTask` each copy the `CompositeByteBuf` parameter into a byte array when flushing, then use their respective clients to write that byte array to storage.
When flush throughput is very high, this copying causes severe GC pressure and degrades worker performance.
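
As a rough illustration of the idea, the sketch below copies data through a single pre-allocated array instead of allocating a new one per flush. It is not the code from this PR: `FixedBufferFlusher` is a hypothetical name, and `java.nio.ByteBuffer` stands in for Netty's `CompositeByteBuf` to keep the example self-contained.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;

// Hypothetical sketch: java.nio.ByteBuffer stands in for Netty's CompositeByteBuf.
final class FixedBufferFlusher {
    // Allocated once and reused for every flush; this sketch is not thread-safe.
    private final byte[] transfer;

    FixedBufferFlusher(int bufferSize) {
        this.transfer = new byte[bufferSize];
    }

    /** Copies all remaining bytes of src to out through the shared transfer array. */
    void flush(ByteBuffer src, OutputStream out) throws IOException {
        while (src.hasRemaining()) {
            int n = Math.min(src.remaining(), transfer.length);
            src.get(transfer, 0, n);
            out.write(transfer, 0, n);
        }
    }
}
```

Because the array is shared, a real implementation would need one buffer per flushing thread or some other form of synchronization.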

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

cluster test

Closes #3394 from TheodoreLx/copy-on-flush.

Authored-by: TheodoreLx <1548069580@qq.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
2025-08-08 13:57:21 +08:00
SteNicholas
5a459250b0 [CELEBORN-1844][FOLLOWUP] Fix the condition of StoragePolicy that worker uses memory storage
### What changes were proposed in this pull request?

Fix the condition of `StoragePolicy` that worker uses memory storage

### Why are the changes needed?

The condition under which `StoragePolicy` decides that the worker uses memory storage is `order.contains(StorageInfo.Type.MEMORY.name())`. This is wrong because `Option#contains` only tests whether the wrapped value equals the argument:

```
final def contains[A1 >: A](elem: A1): Boolean = !isEmpty && this.get == elem
```
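
The difference can be illustrated with a small Java analogue. This is a sketch under an assumption the description does not state: that `order` wraps a comma-separated string such as `MEMORY,SSD`. The class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.Optional;

// Hypothetical sketch: the comma-separated order string is an assumption.
final class StorageOrderCheck {
    /** Mirrors Option#contains: true only if the whole wrapped value equals elem. */
    static boolean equalsWholeValue(Optional<String> order, String elem) {
        return order.isPresent() && order.get().equals(elem);
    }

    /** Checks whether elem is one of the comma-separated entries in the wrapped value. */
    static boolean listsType(Optional<String> order, String elem) {
        return order
                .map(o -> Arrays.stream(o.split(",")).map(String::trim).anyMatch(elem::equals))
                .orElse(false);
    }
}
```

With `Optional.of("MEMORY,SSD")`, the equality check rejects `MEMORY` while the entry-wise check accepts it, which is the kind of mismatch the fix addresses.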

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

CI.

Closes #3408 from SteNicholas/CELEBORN-1844.

Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
2025-08-06 10:36:07 +08:00
liuyang62
fdcc108689 [CELEBORN-1792][FOLLOWUP] Add missing break in resumeByPinnedMemory
### What changes were proposed in this pull request?
Add missing break in resumeByPinnedMemory

### Why are the changes needed?
Avoid executing `resumePush` twice when resuming by pinned memory from the `PUSH_AND_REPLICATE_PAUSED` state.
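
A minimal sketch of how a missing `break` produces the double call is shown below. Only `resumePush` and `PUSH_AND_REPLICATE_PAUSED` come from the description; the `PUSH_PAUSED` state, `resumeReplicate`, and the class itself are hypothetical stand-ins for the real switch.

```java
// Hypothetical sketch: only PUSH_AND_REPLICATE_PAUSED and resumePush come from the description.
final class ResumeSwitchDemo {
    enum State { PUSH_AND_REPLICATE_PAUSED, PUSH_PAUSED }

    int resumePushCalls = 0;
    int resumeReplicateCalls = 0;

    void resumePush() { resumePushCalls++; }
    void resumeReplicate() { resumeReplicateCalls++; }

    /** Without the break, PUSH_AND_REPLICATE_PAUSED falls through into the next case. */
    void resumeWithoutBreak(State state) {
        switch (state) {
            case PUSH_AND_REPLICATE_PAUSED:
                resumeReplicate();
                resumePush();
                // missing break: execution continues into PUSH_PAUSED
            case PUSH_PAUSED:
                resumePush();
                break;
        }
    }

    /** With the break, each state resumes push exactly once. */
    void resumeWithBreak(State state) {
        switch (state) {
            case PUSH_AND_REPLICATE_PAUSED:
                resumeReplicate();
                resumePush();
                break;
            case PUSH_PAUSED:
                resumePush();
                break;
        }
    }
}
```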

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
GA and cluster tests.

Closes #3407 from Flyangz/bugfix/fix-resumeByPinnedMemory-switch.

Authored-by: liuyang62 <liuyang62@staff.sina.com>
Signed-off-by: SteNicholas <programgeek@163.com>
2025-08-05 20:32:55 +08:00
SteNicholas
75446a05d3 [CELEBORN-2093] Support Flink 2.1
### What changes were proposed in this pull request?

Support Flink 2.1.

### Why are the changes needed?

Flink 2.1 has been released; see the [Release notes - Flink 2.1](https://nightlies.apache.org/flink/flink-docs-master/release-notes/flink-2.1).

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

CI.

Closes #3404 from SteNicholas/CELEBORN-2093.

Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: SteNicholas <programgeek@163.com>
2025-08-04 14:12:55 +08:00
xinyuwang1
a61d6a517f [CELEBORN-2064] Fix the issue where reading replica partition that returns zero chunk causes tasks to hang
### What changes were proposed in this pull request?
Re-validate hasNextChunk within getNextChunk.

### Why are the changes needed?
Fix the issue where reading a replica partition that returns zero chunks causes tasks to hang.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
manual test

Closes #3364 from littlexyw/fix_get_next_chunk.

Authored-by: xinyuwang1 <xinyuwang1@xiaohongshu.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
2025-08-01 13:47:07 -07:00
zhaohehuhu
a498b1137f [CELEBORN-1984] Merge ResourceRequest to transportMessageProtobuf
### What changes were proposed in this pull request?
as title

### Why are the changes needed?

Merge Resource.proto into TransportMessages.proto as per the below design

https://cwiki.apache.org/confluence/display/CELEBORN/CIP-16+Merge+transport+proto+and+resource+proto+files

### Does this PR introduce _any_ user-facing change?

### How was this patch tested?

Closes #3231 from zhaohehuhu/dev-0425.

Lead-authored-by: zhaohehuhu <luoyedeyi@163.com>
Co-authored-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
2025-08-01 23:28:32 +08:00
Wang, Fei
20a629a432 [CELEBORN-2088] Fix NPE if celeborn.client.spark.fetch.cleanFailedShuffle enabled
### What changes were proposed in this pull request?

Fix NPE if `celeborn.client.spark.fetch.cleanFailedShuffle` is true.

This PR also refines the code for `FailedShuffleCleaner`.
### Why are the changes needed?

`failedShuffleCleaner` is null on the executor side.
```
25/07/29 17:58:40 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[CoarseGrainedExecutorBackend-stop-executor,5,main]
java.lang.NullPointerException: Cannot invoke "org.apache.celeborn.spark.FailedShuffleCleaner.reset()" because "this.failedShuffleCleaner" is null
	at org.apache.spark.shuffle.celeborn.SparkShuffleManager.stop(SparkShuffleManager.java:272) ~[celeborn-client-spark-3-shaded_2.12-0.6.0-rc3.jar:?]
```
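
One way to avoid this kind of failure is to guard the call with a null check, sketched below. The class shapes are hypothetical and simplified; this is not the actual `SparkShuffleManager` code, and the assumption that the cleaner is created only on the driver is inferred from the description rather than confirmed.

```java
// Hypothetical sketch: class shapes are illustrative, not the actual Celeborn classes.
final class ShuffleManagerStopSketch {
    static final class Cleaner {
        boolean resetCalled = false;
        void reset() { resetCalled = true; }
    }

    // Assumed to be created only on the driver, so it stays null on executors.
    private final Cleaner failedShuffleCleaner;

    ShuffleManagerStopSketch(boolean isDriver) {
        this.failedShuffleCleaner = isDriver ? new Cleaner() : null;
    }

    /** Returns true if the cleaner was reset, false if there was none to reset. */
    boolean stop() {
        if (failedShuffleCleaner == null) {
            return false;
        }
        failedShuffleCleaner.reset();
        return true;
    }
}
```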

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
UT.

Closes #3401 from turboFei/fix_npe_cleaner.

Authored-by: Wang, Fei <fwang12@ebay.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
2025-07-31 21:15:51 -07:00
Wang, Fei
604485779c [CELEBORN-2092] Inc COMMIT_FILES_FAIL_COUNT when TimerWriter::close timeout
### What changes were proposed in this pull request?
Increment `COMMIT_FILES_FAIL_COUNT` when `TimerWriter::close` times out.

### Why are the changes needed?

`COMMIT_FILES_FAIL_COUNT` stays at 0 even when we hit `SHUFFLE_DATA_LOST` caused by a commit files failure.

Spark executor log:
```

25/07/30 10:10:39 WARN CelebornShuffleReader: Handle fetch exceptions for 0-0org.apache.celeborn.common.exception.CelebornIOException: Failed to load file group of shuffle 0 partition 441! Request GetReducerFileGroup(0,false,V1) return SHUFFLE_DATA_LOST for 0.
```

Spark driver log:
```
25/07/30 10:10:38 ERROR ReducePartitionCommitHandler: Failed to handle stageEnd for 0, lost file!

25/07/30 10:10:38 ERROR ReducePartitionCommitHandler:
For shuffle application_1750652300305_10219240_1-0 partition data lost:
Lost partition 307-0 in worker [Host:hdc42-mcc10-01-0910-2704-064-tess0028.stratus.rno.ebay.com:RpcPort:9200:PushPort:9202:FetchPort:9201:ReplicatePort:9203]
Lost partition 1289-0 in worker [Host:hdc42-mcc10-01-0910-2704-064-tess0028.stratus.rno.ebay.com:RpcPort:9200:PushPort:9202:FetchPort:9201:ReplicatePort:9203]
```

Worker log:
```
java.io.IOException: Wait pending actions timeout.
	at org.apache.celeborn.service.deploy.worker.storage.TierWriterBase.waitOnNoPending(TierWriter.scala:158)
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Closes #3403 from turboFei/commit_failed.

Authored-by: Wang, Fei <fwang12@ebay.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
2025-07-31 21:12:21 -07:00
HolyLow
f3c6f306c1 [CELEBORN-2070][CIP-14] Support MapperEnd/Response in CppClient
### What changes were proposed in this pull request?
This PR adds support for MapperEnd/Response in CppClient.

### Why are the changes needed?
To support writing to Celeborn with CppClient.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
By compilation and UTs.

Closes #3372 from HolyLow/issue/celeborn-2070-support-registershuffle-mapperend.

Authored-by: HolyLow <jiaming.xie7@gmail.com>
Signed-off-by: SteNicholas <programgeek@163.com>
2025-07-30 14:40:55 +08:00
Wang, Fei
0a465adea7 [CELEBORN-2087] Refine the docs configuration table view
### What changes were proposed in this pull request?

Enhance the docs configuration view.

### Why are the changes needed?

Currently, it is hard to review the config items because I have to scroll horizontally for every one of them.
https://celeborn.apache.org/docs/latest/configuration/#master
<img width="1188" height="666" alt="image" src="https://github.com/user-attachments/assets/acc02440-f302-452d-9c1c-734332756bde" />

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

```
mkdocs serve
```
<img width="1629" height="925" alt="image" src="https://github.com/user-attachments/assets/772c9f19-283d-4080-82d6-5b49494adf4f" />

Closes #3399 from turboFei/celeborn_config.

Authored-by: Wang, Fei <fwang12@ebay.com>
Signed-off-by: SteNicholas <programgeek@163.com>
2025-07-30 10:33:07 +08:00
SteNicholas
392f6186df [CELEBORN-2086] S3FlushTask and OssFlushTask should close ByteArrayInputStream to avoid resource leak
### What changes were proposed in this pull request?

`S3FlushTask` and `OssFlushTask` should close `ByteArrayInputStream` to avoid resource leak.

### Why are the changes needed?

`S3FlushTask` and `OssFlushTask` currently do not close `ByteArrayInputStream`, which may cause a resource leak.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

CI.

Closes #3395 from SteNicholas/CELEBORN-2086.

Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
2025-07-29 17:19:18 +08:00
SteNicholas
4540b5772b [MINOR] Document introduced metrics into monitoring.md
### What changes were proposed in this pull request?

Document the introduced metrics in `monitoring.md`, including `FetchChunkTransferTime`, `FetchChunkTransferSize`, `FlushWorkingQueueSize`, `LocalFlushCount`, `LocalFlushSize`, `HdfsFlushCount`, `HdfsFlushSize`, `OssFlushCount`, `OssFlushSize`, `S3FlushCount`, `S3FlushSize`.

### Why are the changes needed?

The introduced metrics `FetchChunkTransferTime`, `FetchChunkTransferSize`, `FlushWorkingQueueSize`, `LocalFlushCount`, `LocalFlushSize`, `HdfsFlushCount`, `HdfsFlushSize`, `OssFlushCount`, `OssFlushSize`, `S3FlushCount`, `S3FlushSize` are not documented in `monitoring.md`.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

No.

Closes #3398 from SteNicholas/document-monitoring.

Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
2025-07-29 14:33:46 +08:00
SteNicholas
3ff44fae3f [CELEBORN-894][CELEBORN-474][FOLLOWUP] PushState uses JavaUtils#newConcurrentHashMap to speed up ConcurrentHashMap#computeIfAbsent
### What changes were proposed in this pull request?

`PushState`  uses `JavaUtils#newConcurrentHashMap` to speed up `ConcurrentHashMap#computeIfAbsent`.

### Why are the changes needed?

Celeborn supports JDK8, which can hit the bug described in [JDK-8161372](https://bugs.openjdk.org/browse/JDK-8161372). Therefore, it's better to use `JavaUtils#newConcurrentHashMap` to speed up `ConcurrentHashMap#computeIfAbsent`.
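
A common workaround for that JDK issue is to try a lock-free `get` before falling back to `computeIfAbsent`, so that lookups of already-present keys avoid the bin lock. The sketch below shows that pattern with a hypothetical class name; whether `JavaUtils#newConcurrentHashMap` is implemented exactly this way is an assumption.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical sketch of the get-first workaround; not necessarily Celeborn's implementation.
final class FastPathConcurrentHashMap<K, V> extends ConcurrentHashMap<K, V> {
    private static final long serialVersionUID = 1L;

    @Override
    public V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction) {
        // Fast path: a plain get does not lock, so present keys skip contention.
        V existing = get(key);
        return existing != null ? existing : super.computeIfAbsent(key, mappingFunction);
    }
}
```

The fallback to `super.computeIfAbsent` keeps the atomic insert semantics for keys that are actually missing.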

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

CI.

Closes #3396 from SteNicholas/CELEBORN-894.

Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: SteNicholas <programgeek@163.com>
2025-07-29 10:30:31 +08:00
Kalvin2077
c6e68fddfa [CELEBORN-2053] Refactor remote storage configration usage
### What changes were proposed in this pull request?

Refactor duplicated code around remote storage configuration usage.

### Why are the changes needed?

Improve scalability for possible new remote storage in the future.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit tests.

Closes #3353 from Kalvin2077/draft.

Authored-by: Kalvin2077 <wk.huang2077@outlook.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
2025-07-28 16:56:32 +08:00
Wang, Fei
7ab6268e38 [CELEBORN-2083] For WorkerStatusTracker, log error for recordWorkerFailure
### What changes were proposed in this pull request?

For `WorkerStatusTracker`, log `recordWorkerFailure` at error level to distinguish it from status changes reported in the application heartbeat response.

### Why are the changes needed?

Currently, `WorkerStatusTracker` logs a warning in two cases:
1. status change from application heartbeat response
ae40222351/client/src/main/scala/org/apache/celeborn/client/WorkerStatusTracker.scala (L213-L214)

2. `recordWorkerFailure` on some failures, such as `connectFailedWorkers`.

In our use case, the Celeborn cluster is very large and worker status changes frequently, so the log for case 1 is very noisy.

I think that:
1. Case 2 is more critical and should use the error level.
2. Case 1 may be normal for a large Celeborn cluster, so the warning level is fine.

With separated log levels, we can mute the noisy status change from application heartbeat response by setting the log level for `WorkerStatusTracker` to error.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?
Code review.

Closes #3392 from turboFei/log_level_worker_status.

Authored-by: Wang, Fei <fwang12@ebay.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
2025-07-27 22:46:20 -07:00