### What changes were proposed in this pull request?
Reuse FileChannel/FSDataInputStream in PartitionDataReader to avoid opening too many files.
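A minimal sketch of the idea (the class and method names here are illustrative, not the actual Celeborn code): readers of the same file share one `FileChannel` from a cache instead of each opening its own, which caps the number of open file descriptors.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.ConcurrentHashMap;

public class SharedChannels {
  // One FileChannel per file path, shared by every reader of that file.
  private static final ConcurrentHashMap<String, FileChannel> CHANNELS =
      new ConcurrentHashMap<>();

  public static FileChannel channelFor(Path path) {
    return CHANNELS.computeIfAbsent(path.toString(), p -> {
      try {
        return FileChannel.open(path, StandardOpenOption.READ);
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
    });
  }

  public static void closeAll() throws IOException {
    for (FileChannel ch : CHANNELS.values()) {
      ch.close();
    }
    CHANNELS.clear();
  }
}
```

Readers should use positional reads (`read(buf, position)`) on the shared channel so they do not disturb each other's file position.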
### Why are the changes needed?
In previous implementations, different PartitionDataReaders reused the same FileChannel/FSDataInputStream, but #3349 changed this logic, so we need to restore logical consistency.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Manual test.
Closes #3445 from Alibaba-HZY/mirror-fix.
Authored-by: daowu.hzy <daowu.hzy@alibaba-inc.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
…T StatusCode
### What changes were proposed in this pull request?
When fileWriter is closed, it should return HARD_SPLIT StatusCode
### Why are the changes needed?
When fileWriter is closed, it should return HARD_SPLIT StatusCode
### Does this PR introduce _any_ user-facing change?
NO
### How was this patch tested?
CI
Closes #3448 from xy2953396112/CELEBORN-2127.
Authored-by: xxx <953396112@qq.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
Fix the condition for using OSS storage
### Why are the changes needed?
When OSS is enabled, the local disk should be used first if it is available.
### Does this PR introduce _any_ user-facing change?
NO
### How was this patch tested?
CI
Closes #3463 from xy2953396112/CELEBORN-2139.
Authored-by: xxx <953396112@qq.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
Avoid multiple accesses to HDFS when writing the index file.
### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes #3460 from taowenjun/CELEBORN-2138.
Authored-by: taowenjun <1483633867@qq.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
Remove unused `MAPGROUP` `PartitionType`.
### Why are the changes needed?
`PartitionType` `MAPGROUP` is unused at present, which could be removed.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
CI.
Closes #3459 from SteNicholas/CELEBORN-2137.
Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
Support to register application info with user identifier and extra info.
### Why are the changes needed?
To provide more insight for the application information.
### Does this PR introduce _any_ user-facing change?
A new RESTful API.
### How was this patch tested?
UT.
Closes #3428 from turboFei/app_info_uid.
Lead-authored-by: Wang, Fei <fwang12@ebay.com>
Co-authored-by: Fei Wang <cn.feiwang@gmail.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
### What changes were proposed in this pull request?
Fix NettyRpcEnv, Master, and Worker `Source` cleanup to avoid thread leaks.
### Why are the changes needed?
1. NettyRpcEnv should clean rpcSource to prevent resource leaks.
2. Master should clean resourceConsumptionSource, masterSource, threadPoolSource, jVMSource, jVMCPUSource, and systemMiscSource.
3. Worker should clean workerSource.
### Does this PR introduce _any_ user-facing change?
NO
### How was this patch tested?
local
Closes #3418 from xy2953396112/CELEBORN-2104.
Authored-by: dz <953396112@qq.com>
Signed-off-by: SteNicholas <programgeek@163.com>
### What changes were proposed in this pull request?
This PR adds support for zstd compression in CppClient.
### Why are the changes needed?
To support writing to Celeborn with CppClient.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
By compilation and UTs.
Closes #3454 from Jraaay/feat/cpp_client_zstd_compression.
Authored-by: Jray <1075860716@qq.com>
Signed-off-by: SteNicholas <programgeek@163.com>
### What changes were proposed in this pull request?
### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
<img width="1370" height="1100" alt="image" src="https://github.com/user-attachments/assets/dce7f5b4-a166-4547-bc08-4a8162f129d7" />
Closes #3457 from cxzl25/CELEBORN-2135.
Authored-by: sychen <sychen@ctrip.com>
Signed-off-by: SteNicholas <programgeek@163.com>
…ased on the mount point
### What changes were proposed in this pull request?
When creating a DiskFile, retrieve the storage type based on the mount point
### Why are the changes needed?
If the worker disk is an SSD, it should be set to the SSD type, and the storage type can be retrieved based on the mount point.
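A minimal sketch of the lookup (hypothetical names and types, not the actual Celeborn code): resolve a DiskFile's storage type from the registered mount point that is the longest prefix of its path, falling back to HDD.

```java
import java.util.Map;
import java.util.TreeMap;

public class MountPoints {
  enum StorageType { HDD, SSD }

  // Mount point -> configured storage type (values are illustrative).
  private static final TreeMap<String, StorageType> MOUNTS = new TreeMap<>();

  public static void register(String mountPoint, StorageType type) {
    MOUNTS.put(mountPoint, type);
  }

  // Longest-prefix match of the file path against known mount points.
  public static StorageType typeFor(String filePath) {
    StorageType best = StorageType.HDD; // fallback when nothing matches
    int bestLen = -1;
    for (Map.Entry<String, StorageType> e : MOUNTS.entrySet()) {
      if (filePath.startsWith(e.getKey()) && e.getKey().length() > bestLen) {
        best = e.getValue();
        bestLen = e.getKey().length();
      }
    }
    return best;
  }
}
```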
### Does this PR introduce _any_ user-facing change?
NO
### How was this patch tested?
CI
Closes #3456 from xy2953396112/CELEBORN-2134.
Authored-by: xxx <953396112@qq.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
This PR enhances the Ratis peer add operation to support clientAddress and adminAddress parameters via the RESTful API, allowing these critical RPC endpoints to be properly configured when adding new peers to the Celeborn master cluster.
### Why are the changes needed?
Currently, when expanding the Celeborn master cluster using the ratis peer add operation, newly added peers lack clientAddress and adminAddress settings. If a newly added peer becomes the Leader, all Followers will return empty addresses to clients, causing them to attempt connections to an incorrect Leader address (127.0.0.1:0). This change ensures proper client request routing in expanded clusters.
### Does this PR introduce _any_ user-facing change?
Yes, this PR extends the API for adding Ratis peers by adding support for clientAddress and adminAddress parameters. Users will now be able to specify these addresses when adding new peers to the cluster.
### How was this patch tested?
Manual testing of cluster expansion scenarios to ensure clients can correctly connect to the Leader regardless of which peer holds leadership.
```
➜ curl -sX POST zw06-data-k8s-sparktest-node007.mt:9098/api/v1/ratis/peer/add \
-H "Content-Type: application/json" \
-d '{ "peers": [{"id": "2", "address": "zw06-data-k8s-sparktest-node009.mt:9872", "clientAddress": "zw06-data-k8s-sparktest-node009.mt:9097", "adminAddress": "zw06-data-k8s-sparktest-node009.mt:9097" }] }' | jq
{
"success": true,
"message": "Successfully added peers ArrayBuffer(2|zw06-data-k8s-sparktest-node009.mt:9872) to group GroupInfoReply:client-3E7C9CE679B2->0group-47BEDE733167, cid=1031, SUCCESS, logIndex=0, commits[0:c224, 1:c224]."
}
➜ curl -s zw06-data-k8s-sparktest-node009.mt:9098/masterGroupInfo
====================== Master Group INFO ==============================
group id: c5196f6d-2c34-3ed3-8b8a-47bede733167
leader info: 0(zw06-data-k8s-sparktest-node007.mt:9872)
[server {
id: "2"
address: "zw06-data-k8s-sparktest-node009.mt:9872"
clientAddress: "zw06-data-k8s-sparktest-node009.mt:9097"
adminAddress: "zw06-data-k8s-sparktest-node009.mt:9097"
startupRole: FOLLOWER
}
commitIndex: 228
, server {
id: "0"
address: "zw06-data-k8s-sparktest-node007.mt:9872"
clientAddress: "zw06-data-k8s-sparktest-node007.mt:9097"
adminAddress: "zw06-data-k8s-sparktest-node007.mt:9097"
startupRole: FOLLOWER
}
commitIndex: 228
, server {
id: "1"
address: "zw06-data-k8s-sparktest-node008.mt:9872"
clientAddress: "zw06-data-k8s-sparktest-node008.mt:9097"
adminAddress: "zw06-data-k8s-sparktest-node008.mt:9097"
startupRole: FOLLOWER
}
commitIndex: 228
]
```
Closes #3452 from gaoyajun02/ratis.
Authored-by: gaoyajun02 <gaoyajun02@meituan.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
`CelebornBufferStream` should invoke `openStreamInternal` in `moveToNextPartitionIfPossible` to avoid client creation timeout.
### Why are the changes needed?
There are many `CelebornIOException`s caused by client creation timeouts in the production environment, as follows:
```
2025-08-22 16:20:10,681 INFO [flink-akka.actor.default-dispatcher-40] org.apache.flink.runtime.executiongraph.ExecutionGraph [] - [vertex-2]Calc(select=[lz4sql, rawsize, obcluster, ds, hh, mm, PROCTIME() AS $6]) -> Sort(orderBy=[lz4sql ASC, rawsize ASC, obcluster ASC, ds ASC, hh ASC, mm ASC, $6 DESC]) -> OverAggregate(partitionBy=[lz4sql, rawsize, obcluster, ds, hh, mm], orderBy=[$6 DESC], window#0=[ROW_NUMBER(*) AS w0$o0 ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[lz4sql, rawsize, obcluster, ds, hh, mm, $6, w0$o0]) -> Calc(select=[lz4sql, rawsize, obcluster, ds, hh, mm], where=[(w0$o0 = 1)]) (668/1900) (d8bf48183d8c69a1ab84bcd445f6d4ed_0e8289f2bf927649dd2511bbc2bb6759_667_0) switched from RUNNING to FAILED on antc4flink4172792604-taskmanager-403 (dataPort=1).
java.io.IOException: org.apache.celeborn.common.exception.CelebornIOException: Connecting to /:9093 timed out (60000 ms)
at org.apache.celeborn.common.network.client.TransportClientFactory.internalCreateClient(TransportClientFactory.java:313)
at org.apache.celeborn.common.network.client.TransportClientFactory.createClient(TransportClientFactory.java:250)
at org.apache.celeborn.common.network.client.TransportClientFactory.retryCreateClient(TransportClientFactory.java:157)
at org.apache.celeborn.plugin.flink.network.FlinkTransportClientFactory.createClientWithRetry(FlinkTransportClientFactory.java:51)
at org.apache.celeborn.plugin.flink.readclient.CelebornBufferStream.openStreamInternal(CelebornBufferStream.java:200)
at org.apache.celeborn.plugin.flink.readclient.CelebornBufferStream.moveToNextPartitionIfPossible(CelebornBufferStream.java:183)
at org.apache.celeborn.plugin.flink.RemoteBufferStreamReader.onStreamEnd(RemoteBufferStreamReader.java:161)
at org.apache.celeborn.plugin.flink.RemoteBufferStreamReader.lambda$new$0(RemoteBufferStreamReader.java:79)
at org.apache.celeborn.plugin.flink.network.ReadClientHandler.processMessageInternal(ReadClientHandler.java:64)
at org.apache.celeborn.plugin.flink.network.ReadClientHandler.receive(ReadClientHandler.java:100)
at org.apache.celeborn.plugin.flink.network.ReadClientHandler.receive(ReadClientHandler.java:111)
at org.apache.celeborn.plugin.flink.network.ReadClientHandler.receive(ReadClientHandler.java:76)
at org.apache.celeborn.common.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:100)
at org.apache.celeborn.common.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:84)
at org.apache.celeborn.common.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:156)
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at org.apache.celeborn.shaded.io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:289)
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at org.apache.celeborn.plugin.flink.network.TransportFrameDecoderWithBufferSupplier.decodeBody(TransportFrameDecoderWithBufferSupplier.java:95)
at org.apache.celeborn.plugin.flink.network.TransportFrameDecoderWithBufferSupplier.channelRead(TransportFrameDecoderWithBufferSupplier.java:184)
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at org.apache.celeborn.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at org.apache.celeborn.shaded.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at org.apache.celeborn.shaded.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
at org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
at org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
at org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
at org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
at org.apache.celeborn.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at org.apache.celeborn.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at org.apache.celeborn.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:991)
at org.apache.celeborn.plugin.flink.RemoteBufferStreamReader.errorReceived(RemoteBufferStreamReader.java:146) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.plugin.flink.RemoteBufferStreamReader.lambda$new$0(RemoteBufferStreamReader.java:77) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.plugin.flink.readclient.CelebornBufferStream.moveToNextPartitionIfPossible(CelebornBufferStream.java:193) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.plugin.flink.RemoteBufferStreamReader.onStreamEnd(RemoteBufferStreamReader.java:161) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.plugin.flink.RemoteBufferStreamReader.lambda$new$0(RemoteBufferStreamReader.java:79) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.plugin.flink.network.ReadClientHandler.processMessageInternal(ReadClientHandler.java:64) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.plugin.flink.network.ReadClientHandler.receive(ReadClientHandler.java:100) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.plugin.flink.network.ReadClientHandler.receive(ReadClientHandler.java:111) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.plugin.flink.network.ReadClientHandler.receive(ReadClientHandler.java:76) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.common.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:100) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.common.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:84) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.common.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:156) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.shaded.io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:289) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.plugin.flink.network.TransportFrameDecoderWithBufferSupplier.decodeBody(TransportFrameDecoderWithBufferSupplier.java:95) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.plugin.flink.network.TransportFrameDecoderWithBufferSupplier.channelRead(TransportFrameDecoderWithBufferSupplier.java:184) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.shaded.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.shaded.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at java.lang.Thread.run(Thread.java:991) ~[?:?]
```
`CelebornBufferStream` should invoke `openStreamInternal` in `moveToNextPartitionIfPossible` to avoid the client creation timeout, which is caused by creating a client on the Netty callback thread.
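The underlying pattern can be sketched as follows (hypothetical class and method names, not the actual Celeborn fix): a blocking connect must not run on the Netty callback (event-loop) thread, because it can stall the very loop that would complete the connect; hand the open to a dedicated thread instead.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class OffloadConnect {
  // Dedicated thread for stream-opening work so the Netty event loop
  // is never blocked by client creation.
  private final ExecutorService opener =
      Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "open-stream");
        t.setDaemon(true);
        return t;
      });

  // Called from the Netty callback; returns immediately.
  public void moveToNextPartition(Runnable openStreamInternal) {
    opener.submit(openStreamInternal);
  }

  public void shutdown() throws InterruptedException {
    opener.shutdown();
    opener.awaitTermination(10, TimeUnit.SECONDS);
  }
}
```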
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Manual test.
Closes #3450 from SteNicholas/CELEBORN-2129.
Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: SteNicholas <programgeek@163.com>
…dexes in DfsPartitionReader
### What changes were proposed in this pull request?
Avoid multiple accesses to HDFS when retrieving indexes in DfsPartitionReader.
### Why are the changes needed?
This optimization method improves read performance by reducing the number of interactions with HDFS, merging multiple small I/O operations into a single large I/O operation. Especially when index files are small, the strategy of reading the entire file at once can significantly reduce the number of I/O operations.
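The strategy can be sketched as below (hypothetical names and a plain local file standing in for HDFS; the real reader uses an FSDataInputStream): read the whole small index file with one bulk I/O, then slice the chunk offsets out of the in-memory buffer instead of issuing one small read per offset.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;

public class IndexReader {
  // Reads every chunk offset with a single bulk read instead of one small
  // read per offset; index files are small, so this is cheap.
  public static long[] readAllOffsets(Path indexFile) {
    try {
      byte[] raw = Files.readAllBytes(indexFile); // single bulk I/O
      ByteBuffer buf = ByteBuffer.wrap(raw);
      long[] offsets = new long[raw.length / Long.BYTES];
      for (int i = 0; i < offsets.length; i++) {
        offsets[i] = buf.getLong();
      }
      return offsets;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```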
### Does this PR introduce _any_ user-facing change?
NO
### How was this patch tested?
CI
Closes #3443 from xy2953396112/CELEBORN-2122.
Authored-by: xxx <953396112@qq.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
Close hadoopFs FileSystem when worker is closed.
### Why are the changes needed?
When the worker is closed, close the hadoopFs FileSystem to avoid resource leakage.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
CI.
Closes #3449 from xy2953396112/CELEBORN-2128.
Authored-by: xxx <953396112@qq.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
`LifecycleManager` should log the stack trace of `Throwable` when invoking `appShuffleTrackerCallback`.
### Why are the changes needed?
`LifecycleManager` does not log the stack trace of `Throwable` when invoking `appShuffleTrackerCallback` at present; the log is as follows:
```
ERROR LifecycleManager: java.lang.RuntimeException
```
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
CI.
Closes #3453 from SteNicholas/CELEBORN-2133.
Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: SteNicholas <programgeek@163.com>
### What changes were proposed in this pull request?
### Why are the changes needed?
The log only outputs the file ID, while the exception only has the file path and no file length.
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
```
25/08/25 18:06:37,083 ERROR [pool-1-thread-1] PartitionFilesSorter: Sorting file application-1-/var/folders/tc/r2n_8g6j4731h7clfqwntg880000gn/T/Celeborn1344631182030527062sort-suite path /var/folders/tc/r2n_8g6j4731h7clfqwntg880000gn/T/Celeborn1344631182030527062sort-suite length 2453444321 timeout after 1ms
```
Closes #3446 from cxzl25/CELEBORN-2125.
Authored-by: sychen <sychen@ctrip.com>
Signed-off-by: SteNicholas <programgeek@163.com>
### What changes were proposed in this pull request?
Introduce `IsHighWorkload` metric to monitor worker overload status.
### Why are the changes needed?
There is no metric to monitor worker overload status at present.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
[Grafana test](https://xy2953396112.grafana.net/public-dashboards/22ab1750ef874a1bb39b5879b81a24cf).
Closes #3435 from xy2953396112/CELEBORN-2118.
Authored-by: xxx <953396112@qq.com>
Signed-off-by: SteNicholas <programgeek@163.com>
### What changes were proposed in this pull request?
Add log for commit file size.
### Why are the changes needed?
Statistics on the sizes of successfully committed files enable offline analysis of file write sizes, assessment of the proportion of small files, and optimizations based on file size data.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
CI.
Closes #3444 from xy2953396112/CELEBORN-2123.
Authored-by: xxx <953396112@qq.com>
Signed-off-by: SteNicholas <programgeek@163.com>
### What changes were proposed in this pull request?
Support PushData network message in cppClient.
### Why are the changes needed?
PushData is the network message used by CppClient to write data.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Compilation and UTs.
Closes #3434 from HolyLow/issue/celeborn-2115-support-push-date-in-cpp-client.
Authored-by: HolyLow <jiaming.xie7@gmail.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
Fix `appVersion` with 0.7.0 in `Chart.yaml`. Fix #3437.
### Why are the changes needed?
The `appVersion` of `Chart.yaml` is 0.5.4 at present, which has not been updated to the latest version.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
CI.
Closes #3438 from SteNicholas/chart-app-version.
Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: SteNicholas <programgeek@163.com>
### What changes were proposed in this pull request?
Provide a user option to release only high-workload workers from the excluded set when the excluded worker set grows too large.
### Why are the changes needed?
In some cases, a large percentage of workers were excluded, but most of them were excluded due to high workload. It's better to release such workers from the excluded set to prioritize system availability.
### Does this PR introduce _any_ user-facing change?
New Configuration Option.
### How was this patch tested?
Unit tests.
Closes #3365 from Kalvin2077/exclude-high-stress-workers.
Lead-authored-by: yuanzhen <yuanzhen.hwk@alibaba-inc.com>
Co-authored-by: Kalvin2077 <wk.huang2077@outlook.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
### What changes were proposed in this pull request?
Add `PausePushDataStatus` and `PausePushDataAndReplicateStatus` metric.
### Why are the changes needed?
Introduce `PausePushDataStatus` and `PausePushDataAndReplicateStatus` metric to record status of pause push data.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Manual test. [Grafana](https://xy2953396112.grafana.net/public-dashboards/21af8e2844234c438e74c741211f0032)
Closes #3426 from xy2953396112/CELEBORN-2112.
Authored-by: xxx <953396112@qq.com>
Signed-off-by: SteNicholas <programgeek@163.com>
…ation UniqueId info
### What changes were proposed in this pull request?
CommitFile/Reserved location logs show detailed primary location UniqueId info.
### Why are the changes needed?
CommitFile/Reserved should display detailed partitionLocation uniqueId logs to facilitate troubleshooting.
### Does this PR introduce _any_ user-facing change?
NO
### How was this patch tested?
CI
Closes #3420 from xy2953396112/controller_log.
Authored-by: dz <953396112@qq.com>
Signed-off-by: SteNicholas <programgeek@163.com>
### What changes were proposed in this pull request?
Introduce `SorterCacheHitRate` metric to monitor the hit rate of the index cache for the sorter.
### Why are the changes needed?
Monitor the hit rate of `PartitionFilesSorter#indexCache`.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
The verified Grafana dashboard: https://xy2953396112.grafana.net/public-dashboards/5d1177ee0f784b53ad817fde919141b7
Closes #3416 from xy2953396112/CELEBORN_2102.
Authored-by: dz <953396112@qq.com>
Signed-off-by: SteNicholas <programgeek@163.com>
### What changes were proposed in this pull request?
Remove redundant `PartitionType`.
### Why are the changes needed?
`PartitionType` is included in `PartitionDataWriterContext`, therefore it is not necessary to use `PartitionType` as method parameter.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
CI.
Closes #3422 from xy2953396112/remove_useless_partition_type.
Authored-by: xxx <953396112@qq.com>
Signed-off-by: SteNicholas <programgeek@163.com>
### What changes were proposed in this pull request?
`DfsTierWriter` should close `s3MultipartUploadHandler` and `ossMultipartUploadHandler` when closing resources, to avoid a resource leak when destroying the file writer.
### Why are the changes needed?
`DfsTierWriter` does not close `s3MultipartUploadHandler` and `ossMultipartUploadHandler` in `closeResource`.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
CI.
Closes #3433 from SteNicholas/CELEBORN-2119.
Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: SteNicholas <programgeek@163.com>
### What changes were proposed in this pull request?
This PR adds support for lz4 compression in CppClient.
### Why are the changes needed?
To support writing to Celeborn with CppClient.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
By compilation and UTs.
Closes #3412 from Jraaay/feat/cpp_client_lz4_compression.
Authored-by: Jray <1075860716@qq.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
### What changes were proposed in this pull request?
This PR supports Revive/ChangeLocationResponse messages in cppClient.
### Why are the changes needed?
These messages are used when writing triggers revive operation.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Compilation and UTs.
Closes #3413 from HolyLow/issue/celeborn-2098-support-revive-changelocationresponse.
Authored-by: HolyLow <jiaming.xie7@gmail.com>
Signed-off-by: SteNicholas <programgeek@163.com>
### What changes were proposed in this pull request?
Fix a performance issue in `ReadOnlyByteBuffer::readToReadOnlyBuffer`.
### Why are the changes needed?
`ReadOnlyByteBuffer::readToReadOnlyBuffer` is currently slow on a long IOBuf chain because it used the wrong API to clone an IOBuf block.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
By compilation and UTs.
Closes #3415 from Jraaay/fix/readToReadOnlyBuffer.
Authored-by: Jray <1075860716@qq.com>
Signed-off-by: SteNicholas <programgeek@163.com>
### What changes were proposed in this pull request?
Support RegisterShuffle/Response messages in CppClient.
### Why are the changes needed?
To support the procedure of registering shuffle and accepting response in CppClient.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Compilation and UTs.
Closes #3410 from HolyLow/issue/celeborn-2095-support-registershuffle-response.
Authored-by: HolyLow <jiaming.xie7@gmail.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
This PR adds support for zstd decompression in CppClient.
### Why are the changes needed?
To support reading from Celeborn with CppClient.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
By compilation and UTs.
Closes #3411 from Jraaay/feat/cpp_client_zstd_decompression.
Authored-by: Jray <1075860716@qq.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
This PR adds support for lz4 decompression in CppClient.
### Why are the changes needed?
To support reading from Celeborn with CppClient.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
By compilation and UTs.
Closes #3402 from Jraaay/feat/cpp_client_lz4_decompression.
Authored-by: Jray <1075860716@qq.com>
Signed-off-by: SteNicholas <programgeek@163.com>
### What changes were proposed in this pull request?
Allocate a byte array in advance and reuse it as a transfer buffer when copying is needed during flush.
### Why are the changes needed?
HdfsFlushTask, OssFlushTask, and S3FlushTask need to copy the CompositeByteBuf parameter into a byte array when flushing, and then use their respective clients to write the byte array to storage.
When flush throughput is very high, this copying causes very serious GC problems and hurts worker performance.
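The idea can be sketched as follows (hypothetical names, and a plain `byte[]` source standing in for the CompositeByteBuf): each flusher keeps one pre-allocated array and reuses it as the transfer buffer for every flush, so the hot path allocates nothing.

```java
public class ReusableFlushBuffer {
  // Pre-allocated transfer buffer, reused across flushes by one thread.
  private byte[] transfer;

  public ReusableFlushBuffer(int initialSize) {
    transfer = new byte[initialSize];
  }

  // Copies `length` bytes from `src` into the reused buffer and returns it.
  // Grows only when a flush is larger than anything seen before.
  public byte[] copyForFlush(byte[] src, int length) {
    if (transfer.length < length) {
      transfer = new byte[length];
    }
    System.arraycopy(src, 0, transfer, 0, length);
    return transfer;
  }
}
```

The buffer is only safe to reuse because each flusher thread owns its own instance and the storage client consumes the bytes before the next flush.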
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
cluster test
Closes#3394 from TheodoreLx/copy-on-flush.
Authored-by: TheodoreLx <1548069580@qq.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
### What changes were proposed in this pull request?
Fix the condition under which `StoragePolicy` lets the worker use memory storage
### Why are the changes needed?
The condition of `StoragePolicy` for the worker to use memory storage is `order.contains(StorageInfo.Type.MEMORY.name())`. This condition is wrong because `Option#contains` checks equality with the entire wrapped value rather than substring containment, as its implementation shows:
```scala
final def contains[A1 >: A](elem: A1): Boolean = !isEmpty && this.get == elem
```
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
CI.
Closes#3408 from SteNicholas/CELEBORN-1844.
Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
Add missing break in resumeByPinnedMemory
### Why are the changes needed?
Avoid executing `resumePush` twice when resuming by pinned memory from the `PUSH_AND_REPLICATE_PAUSED` state.
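The bug pattern can be sketched as follows; the state names mirror the description above, but the class and methods are hypothetical, not the actual Celeborn memory manager:

```java
// Sketch of the missing-break bug: without the break, the
// PUSH_AND_REPLICATE_PAUSED case falls through into the next case
// and resumePush() runs a second time.
class ResumerSketch {
    int resumePushCalls = 0;

    void resumeByPinnedMemory(String state) {
        switch (state) {
            case "PUSH_AND_REPLICATE_PAUSED":
                resumeReplicate();
                resumePush();
                break; // the fix: omitting this break falls through to PUSH_PAUSED
            case "PUSH_PAUSED":
                resumePush();
                break;
            default:
                break;
        }
    }

    void resumePush() { resumePushCalls++; }
    void resumeReplicate() { }
}
```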
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
GA and cluster tests.
Closes#3407 from Flyangz/bugfix/fix-resumeByPinnedMemory-switch.
Authored-by: liuyang62 <liuyang62@staff.sina.com>
Signed-off-by: SteNicholas <programgeek@163.com>
### What changes were proposed in this pull request?
Support Flink 2.1.
### Why are the changes needed?
Flink 2.1 has been released; its release notes are available at [Release notes - Flink 2.1](https://nightlies.apache.org/flink/flink-docs-master/release-notes/flink-2.1).
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
CI.
Closes#3404 from SteNicholas/CELEBORN-2093.
Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: SteNicholas <programgeek@163.com>
### What changes were proposed in this pull request?
Re-validate hasNextChunk within getNextChunk.
### Why are the changes needed?
Fix an issue where reading a replica partition that returns zero chunks causes tasks to hang
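The defensive pattern can be sketched as follows; `ChunkReader` and its methods are illustrative names, not the actual client API:

```java
// Sketch: re-validate hasNextChunk() inside getNextChunk() so a replica
// partition with zero chunks yields null instead of waiting for data that
// will never arrive (names are hypothetical).
class ChunkReader {
    private final int totalChunks;
    private int returned = 0;

    ChunkReader(int totalChunks) { this.totalChunks = totalChunks; }

    boolean hasNextChunk() { return returned < totalChunks; }

    // Re-checks the condition instead of assuming the caller already did,
    // returning null when no chunk remains.
    Integer getNextChunk() {
        if (!hasNextChunk()) {
            return null;
        }
        return returned++;
    }
}
```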
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
manual test
Closes#3364 from littlexyw/fix_get_next_chunk.
Authored-by: xinyuwang1 <xinyuwang1@xiaohongshu.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
### What changes were proposed in this pull request?
Merge `Resource.proto` into `TransportMessages.proto`.
### Why are the changes needed?
Merge `Resource.proto` into `TransportMessages.proto` as per the design below:
https://cwiki.apache.org/confluence/display/CELEBORN/CIP-16+Merge+transport+proto+and+resource+proto+files
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes#3231 from zhaohehuhu/dev-0425.
Lead-authored-by: zhaohehuhu <luoyedeyi@163.com>
Co-authored-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
Fix an NPE when `celeborn.client.spark.fetch.cleanFailedShuffle` is true.
This PR also refines the code of `FailedShuffleCleaner`.
### Why are the changes needed?
`failedShuffleCleaner` is null on the executor side:
```
25/07/29 17:58:40 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[CoarseGrainedExecutorBackend-stop-executor,5,main]
java.lang.NullPointerException: Cannot invoke "org.apache.celeborn.spark.FailedShuffleCleaner.reset()" because "this.failedShuffleCleaner" is null
at org.apache.spark.shuffle.celeborn.SparkShuffleManager.stop(SparkShuffleManager.java:272) ~[celeborn-client-spark-3-shaded_2.12-0.6.0-rc3.jar:?]
```
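The fix amounts to a null guard before use; the field name comes from the stack trace above, while the surrounding class here is a hypothetical sketch, not the actual `SparkShuffleManager`:

```java
// Sketch: guard the cleaner before use, because on the executor side
// failedShuffleCleaner is never initialized.
class ShuffleManagerSketch {
    static class FailedShuffleCleaner {
        boolean wasReset = false;
        void reset() { wasReset = true; }
    }

    FailedShuffleCleaner failedShuffleCleaner; // stays null on executors

    void stop() {
        if (failedShuffleCleaner != null) { // null-guard avoids the executor-side NPE
            failedShuffleCleaner.reset();
        }
    }
}
```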
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
UT.
Closes#3401 from turboFei/fix_npe_cleaner.
Authored-by: Wang, Fei <fwang12@ebay.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
### What changes were proposed in this pull request?
Increment `COMMIT_FILES_FAIL_COUNT` when `TierWriter::close` times out
### Why are the changes needed?
`COMMIT_FILES_FAIL_COUNT` stays 0 even when we hit `SHUFFLE_DATA_LOST` caused by a commit-files failure.
Spark executor log:
```
25/07/30 10:10:39 WARN CelebornShuffleReader: Handle fetch exceptions for 0-0org.apache.celeborn.common.exception.CelebornIOException: Failed to load file group of shuffle 0 partition 441! Request GetReducerFileGroup(0,false,V1) return SHUFFLE_DATA_LOST for 0.
```
Spark driver log:
```
25/07/30 10:10:38 ERROR ReducePartitionCommitHandler: Failed to handle stageEnd for 0, lost file!
25/07/30 10:10:38 ERROR ReducePartitionCommitHandler:
For shuffle application_1750652300305_10219240_1-0 partition data lost:
Lost partition 307-0 in worker [Host:hdc42-mcc10-01-0910-2704-064-tess0028.stratus.rno.ebay.com:RpcPort:9200:PushPort:9202:FetchPort:9201:ReplicatePort:9203]
Lost partition 1289-0 in worker [Host:hdc42-mcc10-01-0910-2704-064-tess0028.stratus.rno.ebay.com:RpcPort:9200:PushPort:9202:FetchPort:9201:ReplicatePort:9203]
```
Worker log:
```
java.io.IOException: Wait pending actions timeout.
at org.apache.celeborn.service.deploy.worker.storage.TierWriterBase.waitOnNoPending(TierWriter.scala:158)
```
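The intended accounting can be sketched as follows, assuming a hypothetical counter and close path (the real wait logic lives in `TierWriterBase`):

```java
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;

// Sketch: count a commit-files failure when waiting on pending flush
// actions times out during close (metric and method names are hypothetical).
class CloseWithMetric {
    final AtomicLong commitFilesFailCount = new AtomicLong();
    private final boolean pendingWillDrain;

    CloseWithMetric(boolean pendingWillDrain) { this.pendingWillDrain = pendingWillDrain; }

    void close() throws IOException {
        try {
            waitOnNoPending();
        } catch (IOException e) {
            commitFilesFailCount.incrementAndGet(); // surface the failure in metrics
            throw e;
        }
    }

    private void waitOnNoPending() throws IOException {
        if (!pendingWillDrain) {
            throw new IOException("Wait pending actions timeout.");
        }
    }
}
```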
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Closes#3403 from turboFei/commit_failed.
Authored-by: Wang, Fei <fwang12@ebay.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
### What changes were proposed in this pull request?
This PR adds support for MapperEnd/Response in CppClient.
### Why are the changes needed?
To support writing to Celeborn with CppClient.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
By compilation and UTs.
Closes#3372 from HolyLow/issue/celeborn-2070-support-registershuffle-mapperend.
Authored-by: HolyLow <jiaming.xie7@gmail.com>
Signed-off-by: SteNicholas <programgeek@163.com>
### What changes were proposed in this pull request?
`S3FlushTask` and `OssFlushTask` should close `ByteArrayInputStream` to avoid resource leak.
### Why are the changes needed?
`S3FlushTask` and `OssFlushTask` don't close `ByteArrayInputStream` at present, which may cause a resource leak.
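One way to guarantee the close is the try-with-resources pattern; `uploadToRemote` here is a hypothetical stand-in for the S3/OSS client call, not the actual flush task code:

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

// Sketch: try-with-resources closes the stream even if the upload throws.
class FlushSketch {
    static int flushed = 0;

    static void flush(byte[] data) throws IOException {
        try (InputStream in = new ByteArrayInputStream(data)) {
            uploadToRemote(in); // stream is closed when this block exits
        }
    }

    // Hypothetical stand-in for the remote client's upload call.
    static void uploadToRemote(InputStream in) throws IOException {
        while (in.read() != -1) {
            flushed++;
        }
    }
}
```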
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
CI.
Closes#3395 from SteNicholas/CELEBORN-2086.
Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
### What changes were proposed in this pull request?
Document newly introduced metrics in `monitoring.md`, including `FetchChunkTransferTime`, `FetchChunkTransferSize`, `FlushWorkingQueueSize`, `LocalFlushCount`, `LocalFlushSize`, `HdfsFlushCount`, `HdfsFlushSize`, `OssFlushCount`, `OssFlushSize`, `S3FlushCount`, `S3FlushSize`.
### Why are the changes needed?
The introduced metrics `FetchChunkTransferTime`, `FetchChunkTransferSize`, `FlushWorkingQueueSize`, `LocalFlushCount`, `LocalFlushSize`, `HdfsFlushCount`, `HdfsFlushSize`, `OssFlushCount`, `OssFlushSize`, `S3FlushCount`, `S3FlushSize` are not documented in `monitoring.md`.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
No.
Closes#3398 from SteNicholas/document-monitoring.
Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
### What changes were proposed in this pull request?
`PushState` uses `JavaUtils#newConcurrentHashMap` to speed up `ConcurrentHashMap#computeIfAbsent`.
### Why are the changes needed?
Celeborn supports JDK 8, which can hit the bug described in [JDK-8161372](https://bugs.openjdk.org/browse/JDK-8161372). Therefore, it is better to use `JavaUtils#newConcurrentHashMap` to speed up `ConcurrentHashMap#computeIfAbsent`.
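The usual JDK-8161372 workaround is a get-first fast path so that lookups for keys that already exist skip `computeIfAbsent`'s locking; `FastMaps` here is an illustrative helper sketching that idea, not Celeborn's actual `JavaUtils` class:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Sketch of the get-first workaround: on JDK 8, computeIfAbsent may lock the
// bin even when the key is already present, so a plain get() short-circuits
// the common case.
final class FastMaps {
    static <K, V> V computeIfAbsent(ConcurrentHashMap<K, V> map, K key, Function<K, V> fn) {
        V value = map.get(key); // cheap lock-free fast path
        if (value != null) {
            return value;
        }
        return map.computeIfAbsent(key, fn); // fall back only for the first writer
    }
}
```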
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
CI.
Closes#3396 from SteNicholas/CELEBORN-894.
Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: SteNicholas <programgeek@163.com>
### What changes were proposed in this pull request?
Refactor similar code around configuration usage.
### Why are the changes needed?
Improve scalability for possible new remote storage in the future.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Unit tests.
Closes#3353 from Kalvin2077/draft.
Authored-by: Kalvin2077 <wk.huang2077@outlook.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
### What changes were proposed in this pull request?
For `WorkerStatusTracker`, log at error level in `recordWorkerFailure` to separate it from status changes driven by application heartbeat responses.
### Why are the changes needed?
Currently, `WorkerStatusTracker` logs a warning in two cases:
1. status changes from the application heartbeat response
ae40222351/client/src/main/scala/org/apache/celeborn/client/WorkerStatusTracker.scala (L213-L214)
2. `recordWorkerFailure` on failures such as `connectFailedWorkers`.
In our use case, the Celeborn cluster is very large and worker status changes frequently, so the log for case 1 is very noisy.
I think that:
1. case 2 is more critical and should use error level;
2. case 1 can be normal for a large Celeborn cluster, so warning level is fine.
With separate log levels, we can mute the noisy status changes from the application heartbeat response by setting the log level of `WorkerStatusTracker` to error.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Code review.
Closes#3392 from turboFei/log_level_worker_status.
Authored-by: Wang, Fei <fwang12@ebay.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>