### What changes were proposed in this pull request?
Support to register application info with user identifier and extra info.
### Why are the changes needed?
To provide more insight for the application information.
### Does this PR introduce _any_ user-facing change?
A new RESTful API.
### How was this patch tested?
UT.
Closes#3428 from turboFei/app_info_uid.
Lead-authored-by: Wang, Fei <fwang12@ebay.com>
Co-authored-by: Fei Wang <cn.feiwang@gmail.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
### What changes were proposed in this pull request?
`CelebornBufferStream` should invoke `openStreamInternal` in `moveToNextPartitionIfPossible` to avoid client creation timeout.
### Why are the changes needed?
There are many `CelebornIOException` that is cause by timeout client creation in production environment as follows:
```
2025-08-22 16:20:10,681 INFO [flink-akka.actor.default-dispatcher-40] org.apache.flink.runtime.executiongraph.ExecutionGraph [] - [vertex-2]Calc(select=[lz4sql, rawsize, obcluster, ds, hh, mm, PROCTIME() AS $6]) -> Sort(orderBy=[lz4sql ASC, rawsize ASC, obcluster ASC, ds ASC, hh ASC, mm ASC, $6 DESC]) -> OverAggregate(partitionBy=[lz4sql, rawsize, obcluster, ds, hh, mm], orderBy=[$6 DESC], window#0=[ROW_NUMBER(*) AS w0$o0 ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[lz4sql, rawsize, obcluster, ds, hh, mm, $6, w0$o0]) -> Calc(select=[lz4sql, rawsize, obcluster, ds, hh, mm], where=[(w0$o0 = 1)]) (668/1900) (d8bf48183d8c69a1ab84bcd445f6d4ed_0e8289f2bf927649dd2511bbc2bb6759_667_0) switched from RUNNING to FAILED on antc4flink4172792604-taskmanager-403 (dataPort=1).
java.io.IOException: org.apache.celeborn.common.exception.CelebornIOException: Connecting to /:9093 timed out (60000 ms)
at org.apache.celeborn.common.network.client.TransportClientFactory.internalCreateClient(TransportClientFactory.java:313)
at org.apache.celeborn.common.network.client.TransportClientFactory.createClient(TransportClientFactory.java:250)
at org.apache.celeborn.common.network.client.TransportClientFactory.retryCreateClient(TransportClientFactory.java:157)
at org.apache.celeborn.plugin.flink.network.FlinkTransportClientFactory.createClientWithRetry(FlinkTransportClientFactory.java:51)
at org.apache.celeborn.plugin.flink.readclient.CelebornBufferStream.openStreamInternal(CelebornBufferStream.java:200)
at org.apache.celeborn.plugin.flink.readclient.CelebornBufferStream.moveToNextPartitionIfPossible(CelebornBufferStream.java:183)
at org.apache.celeborn.plugin.flink.RemoteBufferStreamReader.onStreamEnd(RemoteBufferStreamReader.java:161)
at org.apache.celeborn.plugin.flink.RemoteBufferStreamReader.lambda$new$0(RemoteBufferStreamReader.java:79)
at org.apache.celeborn.plugin.flink.network.ReadClientHandler.processMessageInternal(ReadClientHandler.java:64)
at org.apache.celeborn.plugin.flink.network.ReadClientHandler.receive(ReadClientHandler.java:100)
at org.apache.celeborn.plugin.flink.network.ReadClientHandler.receive(ReadClientHandler.java:111)
at org.apache.celeborn.plugin.flink.network.ReadClientHandler.receive(ReadClientHandler.java:76)
at org.apache.celeborn.common.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:100)
at org.apache.celeborn.common.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:84)
at org.apache.celeborn.common.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:156)
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at org.apache.celeborn.shaded.io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:289)
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at org.apache.celeborn.plugin.flink.network.TransportFrameDecoderWithBufferSupplier.decodeBody(TransportFrameDecoderWithBufferSupplier.java:95)
at org.apache.celeborn.plugin.flink.network.TransportFrameDecoderWithBufferSupplier.channelRead(TransportFrameDecoderWithBufferSupplier.java:184)
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at org.apache.celeborn.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at org.apache.celeborn.shaded.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at org.apache.celeborn.shaded.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
at org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
at org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
at org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
at org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
at org.apache.celeborn.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at org.apache.celeborn.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at org.apache.celeborn.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:991)
at org.apache.celeborn.plugin.flink.RemoteBufferStreamReader.errorReceived(RemoteBufferStreamReader.java:146) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.plugin.flink.RemoteBufferStreamReader.lambda$new$0(RemoteBufferStreamReader.java:77) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.plugin.flink.readclient.CelebornBufferStream.moveToNextPartitionIfPossible(CelebornBufferStream.java:193) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.plugin.flink.RemoteBufferStreamReader.onStreamEnd(RemoteBufferStreamReader.java:161) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.plugin.flink.RemoteBufferStreamReader.lambda$new$0(RemoteBufferStreamReader.java:79) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.plugin.flink.network.ReadClientHandler.processMessageInternal(ReadClientHandler.java:64) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.plugin.flink.network.ReadClientHandler.receive(ReadClientHandler.java:100) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.plugin.flink.network.ReadClientHandler.receive(ReadClientHandler.java:111) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.plugin.flink.network.ReadClientHandler.receive(ReadClientHandler.java:76) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.common.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:100) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.common.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:84) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.common.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:156) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.shaded.io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:289) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.plugin.flink.network.TransportFrameDecoderWithBufferSupplier.decodeBody(TransportFrameDecoderWithBufferSupplier.java:95) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.plugin.flink.network.TransportFrameDecoderWithBufferSupplier.channelRead(TransportFrameDecoderWithBufferSupplier.java:184) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.shaded.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.shaded.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at org.apache.celeborn.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
at java.lang.Thread.run(Thread.java:991) ~[?:?]
```
`CelebornBufferStream` should invoke `openStreamInternal` in `moveToNextPartitionIfPossible` to avoid client creation timeout, which is caused by creating a client using the callback thread of netty.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Manual test.
Closes#3450 from SteNicholas/CELEBORN-2129.
Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: SteNicholas <programgeek@163.com>
### What changes were proposed in this pull request?
Provide user options to enable release workers only with high workload when the number of excluded worker set is too large.
### Why are the changes needed?
In some cases, a large percentage of workers were excluded, but most of them were due to high workload. It's better to release such workers from excluded set to ensure the system availability is a priority.
### Does this PR introduce _any_ user-facing change?
New Configuration Option.
### How was this patch tested?
Unit tests.
Closes#3365 from Kalvin2077/exclude-high-stress-workers.
Lead-authored-by: yuanzhen <yuanzhen.hwk@alibaba-inc.com>
Co-authored-by: Kalvin2077 <wk.huang2077@outlook.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
### What changes were proposed in this pull request?
Apply for a byte array in advance and use it as a transfer when copying is needed during flush
### Why are the changes needed?
For HdfsFlushTask, OssFlushTask, and S3FlushTask, you need to copy the CompositeByteBuf in the parameter to a byte array when flushing, and then use the respective clients to write the byte array to the storage.
When the flush throughput rate is very high, this copying will cause very serious GC problems and affect the performance of the worker
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
cluster test
Closes#3394 from TheodoreLx/copy-on-flush.
Authored-by: TheodoreLx <1548069580@qq.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
### What changes were proposed in this pull request?
Added a configuration for client to read non shuffle partition waiting time
### Why are the changes needed?
When the shuffle data of a task is relatively small and there are many empty shuffle partitions, it will take a lot of time for invalid waiting here
### Does this PR introduce _any_ user-facing change?
add configurable
### How was this patch tested?
production environment validation
Closes#3358 from dh20/celeborn_add-20250707.
Lead-authored-by: duanhao-jk <duanhao-jk@360shuke.com>
Co-authored-by: Wang, Fei <fwang12@ebay.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
### What changes were proposed in this pull request?
add data size limitation to inflight data by introducing a new configuration: `celeborn.client.push.maxBytesInFlight.perWorker/total` and defaults to `celeborn.client.push.buffer.max.size * celeborn.client.push.maxReqsInFlight.perWorker/total`.
for backward compatibility, also add a control: `celeborn.client.push.maxReqsInFlight.enabled`.
### Why are the changes needed?
celeborn do supports limiting the number of push inflight requests via `celeborn.client.push.maxReqsInFlight.perWorker/total`. this is a good constraint to memory usage where most requests do not exceed `celeborn.client.push.buffer.max.size`. however, in a vectorized shuffle (like blaze and gluten), a request might be greatly larger then the max buffer size, leading to too much inflight data and results OOM.
### Does this PR introduce _any_ user-facing change?
Yes, add new config for client
### How was this patch tested?
test on local env
Closes#3248 from DDDominik/CELEBORN-1917.
Lead-authored-by: DDDominik <1015545832@qq.com>
Co-authored-by: SteNicholas <programgeek@163.com>
Co-authored-by: DDDominik <zhuangxian@kuaishou.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
Design doc - https://docs.google.com/document/d/1YqK0kua-5rMufJw57kEIrHHGbLnAF9iXM5GdDweMzzg/edit?tab=t.0#heading=h.n5ldma432qnd
- End to End integrity checks provide additional confidence that Celeborn is producing complete as well as correct data
- The checks are hidden behind a client side config that is false by default. Provides users optionality to enable these if required on a per app basis
- Only compatible with Spark at the moment
- No support for Flink (can be considered in future)
- No support for Columnar Shuffle (can be considered in future)
Writer
- Whenever a mapper completes, it reports crc32 and bytes written on a per partition basis to the driver
Driver
- Driver aggregates the mapper reports - and computes aggregated CRC32 and bytes written on per partitionID basis
Reader
- Each CelebornInputStream will report (int shuffleId, int partitionId, int startMapIndex, int endMapIndex, int crc32, long bytes) to driver when it finished reading all data on the stream
- On every report
- Driver will aggregate the CRC32 and bytesRead for the partitionID
- Driver will aggregate mapRange to determine when all sub-paritions have been read for partitionID have been read
- It will then compare the aggregated CRC32 and bytes read with the expected CRC32 and bytes written for the partition
- There is special handling for skewhandlingwithoutMapRangeSplit scenario as well
- In this case, we report the number of sub-partitions and index of the sub-partition instead of startMapIndex and endMapIndex
There is separate handling for skew handling with and without map range split
As a follow up, I will do another PR that will harden up the checks and perform additional checks to add book keeping that every CelebornInputStream makes the required checks
### Why are the changes needed?
https://issues.apache.org/jira/browse/CELEBORN-894
Note: I am putting up this PR even though some tests are failing, since I want to get some early feedback on the code changes.
### Does this PR introduce _any_ user-facing change?
Not sure how to answer this. A new client side config is available to enable the checks if required
### How was this patch tested?
Unit tests + Integration tests
Closes#3261 from gauravkm/gaurav/e2e_checks_v3.
Lead-authored-by: Gaurav Mittal <gaurav@stripe.com>
Co-authored-by: Gaurav Mittal <gauravkm@gmail.com>
Co-authored-by: Fei Wang <cn.feiwang@gmail.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
### What changes were proposed in this pull request?
1. Add a new sink and allow the user to store metrics to files.
2. Celeborn will scrape its metrics periodically to make sure that the metric data won't be too large to cause OOM.
### Why are the changes needed?
A long-running worker ran out of memory and found out that the metrics are huge in the heap dump.
As you can see below, the biggest object is the time metric queue, and I got 1.6 million records.
<img width="1516" alt="Screenshot 2025-06-24 at 09 59 30" src="https://github.com/user-attachments/assets/691c7bc2-b974-4cc0-8d5a-bf626ab903c0" />
<img width="1239" alt="Screenshot 2025-06-24 at 14 45 10" src="https://github.com/user-attachments/assets/ebdf5a4d-c941-4f1e-911f-647aa156b37a" />
### Does this PR introduce _any_ user-facing change?
NO.
### How was this patch tested?
Cluster.
Closes#3346 from FMX/b2045.
Lead-authored-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
Co-authored-by: Ethan Feng <ethanfeng@apache.org>
Co-authored-by: Fei Wang <fwang12@ebay.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
### What changes were proposed in this pull request?
Add a retry mechanism when completing S3 multipart upload to ensure that completeMultipartUpload is retry when facing retryable exception like SlowDown one
### Why are the changes needed?
While running a “simple” spark jobs creating 10TiB of shuffle data (repartition from 100k partition to 20) the job was constantly failing when all files should be committed. relying on SOFT `celeborn.client.shuffle.partitionSplit.mode`
Despite an increase of `celeborn.storage.s3.mpu.maxRetries` up to `200`. Job was still failing due to SlowDown exception
Adding some debug logs on the retry policy from AWS S3 SDK I've seen that the policy is never called when doing completeMultipartUpload action while it is well called on other actions. See https://issues.apache.org/jira/browse/CELEBORN-2003
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Created a cluster on a kubernetes server relying on S3 storage.
Launch a 10TiB shuffle from 100000 partitions to 200 partitions with SOFT `celeborn.client.shuffle.partitionSplit.mode`
The job succeed and well display some warn logs indicating that the `completeMultipartUpload` is retried due to SlowDown:
```
bucket ******* key poc/spark-2c86663c948243d19c127e90f704a3d5/0/35-39-0 uploadId Pbaq.pp1qyLvtGbfZrMwA8RgLJ4QYanAMhmv0DvKUk0m6.GlCKdC3ICGngn7Q7iIa0Dw1h3wEn78EoogMlYgFD6.tDqiatOTbFprsNkk0qzLu9KY8YCC48pqaINcvgi8c1gQKKhsf1zZ.5Et5j40wQ-- upload failed to complete, will retry (1/10)
com.amazonaws.services.s3.model.AmazonS3Exception: Please reduce your request rate. (Service: null; Status Code: 0; Error Code: SlowDown; Request ID: RAV5MXX3B9Z3ZHTG; S3 Extended Request ID: 9Qqm3vfJVLFNY1Y3yKAobJHv7JkHQP2+v8hGSW2HYIOputAtiPdkqkY5MfD66lEzAl45m71aiPVB0f1TxTUD+upUo0NxXp6S; Proxy: null), S3 Extended Request ID: 9Qqm3vfJVLFNY1Y3yKAobJHv7JkHQP2+v8hGSW2HYIOputAtiPdkqkY5MfD66lEzAl45m71aiPVB0f1TxTUD+upUo0NxXp6S at com.amazonaws.services.s3.model.transform.XmlResponsesSaxParser$CompleteMultipartUploadHandler.doEndElement(XmlResponsesSaxParser.java:1906)
```
Closes#3293 from ashangit/nfraison/CELEBORN-2003.
Authored-by: nicolas.fraison@datadoghq.com <nicolas.fraison@datadoghq.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
Support min number of workers to assign slots on for a shuffle.
### Why are the changes needed?
PR https://github.com/apache/celeborn/pull/3039 updated the default value of `celeborn.master.slot.assign.extraSlots` to avoid skew in shuffle stage with less number of reducers. However, it will also affect the stage with large number of reducers, thus not ideal.
We are introducing a new config `celeborn.master.slot.assign.minWorkers` which will ensure that shuffle stages with less number of reducers will not cause load imbalance on few nodes.
### Does this PR introduce _any_ user-facing change?
Yes
### How was this patch tested?
NA
Closes#3297 from s0nskar/min_workers.
Authored-by: Sanskar Modi <sanskarmodi97@gmail.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
### What changes were proposed in this pull request?
Introduce `ShuffleMetricGroup` for `numBytesIn`, `numBytesOut`, `numRecordsOut`, `numBytesInPerSecond`, `numBytesOutPerSecond`, `numRecordsOutPerSecond` metrics.
Follow up #3272.
### Why are the changes needed?
`numBytesIn`, `numBytesOut`, `numRecordsOut`, `numBytesInPerSecond`, `numBytesOutPerSecond`, `numRecordsOutPerSecond` metrics should put shuffle id into variables, which could introduce `ShuffleMetricGroup` to support. Meanwhile, #3272 would print many same logs as follows that shoud be improved:
```
2025-05-28 10:48:54,433 WARN [flink-akka.actor.default-dispatcher-18] org.apache.flink.metrics.MetricGroup [] - Name collision: Group already contains a Metric with the name 'numRecordsOut'. Metric will not be reported.[11.66.62.202, taskmanager, antc4flink3980005426-taskmanager-3-70, antc4flink3980005426, [vertex-2]HashJoin(joinType=[LeftOuterJoin], where=[(f0 = f00)], select=[f0, f1, f2, f3, f4, f5, f6, f00, f10, f20, f30, f40, f50, f60], build=[right]) -> Sink: Sink(table=[default_catalog.default_database.sink], fields=[f0, f1, f2, f3, f4, f5, f6, f00, f10, f20, f30, f40, f50, f60]), 2, Shuffle, Remote, 1]
```
### Does this PR introduce _any_ user-facing change?
Introduce `celeborn.client.flink.metrics.scope.shuffle` config option to define the scope format string that is applied to all metrics scoped to a shuffle:
- Variables:
- Shuffle: `<task_id>, <task_name>, <task_attempt_id>, <task_attempt_num>, <subtask_index>, <shuffle_id>`.
- Metrics:
Scope | Metrics | Description | Type
-- | -- | -- | --
Shuffle | numBytesIn | The total number of bytes this shuffle has read. | Counter |
Shuffle | numBytesOut| The total number of bytes this shuffle has written. | Counter |
Shuffle | numRecordsOut | The total number of records this shuffle has written. | Counter |
Shuffle | numBytesInPerSecond | The number of bytes this shuffle reads per second. | Meter |
Shuffle | numBytesOutPerSecond | The number of bytes this shuffle writes per second. | Meter |
Shuffle | numRecordsOutPerSecond | The number of records this shuffle writes per second. | Meter |
### How was this patch tested?
Manual test.


Closes#3296 from SteNicholas/CELEBORN-2005.
Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: Weijie Guo <reswqa@163.com>
### What changes were proposed in this pull request?
`org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException` is thrown when RemoteBufferStreamReader finds that the current exception is about connection failure.
### Why are the changes needed?
If `org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException` is correctly thrown to reflect connection failure, then Flink can be aware of the lost Celeborn server side nodes and be able to re-compute affected data. Otherwise, endless retries could cause Flink job failure.
This PR is to deal with exceptions like:
```
java.io.IOException: org.apache.celeborn.common.exception.CelebornIOException: Failed to connect to ltx1-app10154.prod.linkedin.com/10.88.105.20:23924
```
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Tested in a Flink batch job with Celeborn.
Closes#3147 from Austinfjq/throw-Partition-Connection-Exception.
Lead-authored-by: Jinqian Fan <jinqianfan@icloud.com>
Co-authored-by: Austin Fan <jinqianfan@icloud.com>
Co-authored-by: Wang, Fei <fwang12@ebay.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
### What changes were proposed in this pull request?
Fix the false config version in https://celeborn.apache.org/docs/0.5.4/configuration/
In https://github.com/apache/celeborn/pull/3082, it fixed:
- celeborn.master.endpoints.resolver
- celeborn.client.chunk.prefetch.enabled
- celeborn.client.inputStream.creation.window
In this PR, it fixes the remaining
- celeborn.ssl.<module>.sslHandshakeTimeoutMs
### Why are the changes needed?
Fix the false config version in https://celeborn.apache.org/docs/0.5.4/configuration/
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
GA
Closes#3269 from turboFei/config_version.
Authored-by: Wang, Fei <fwang12@ebay.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
### What changes were proposed in this pull request?
Improve Aliyun OSS support including `SlotsAllocator#offerSlotsLoadAware`, `Worker#heartbeatToMaster` and `PartitionDataWriter#getStorageInfo`.
### Why are the changes needed?
There are many methods where OSS support is lacking in `SlotsAllocator#offerSlotsLoadAware`, `Worker#heartbeatToMaster` and `PartitionDataWriter#getStorageInfo`.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
CI.
Closes#3268 from SteNicholas/CELEBORN-1916.
Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
it's a joint work with YutingWang98
currently we have to wait for spark shuffle object gc to clean disk space occupied by celeborn shuffles
As a result, if a shuffle is failed to fetch and retried , the disk space occupied by the failed attempt cannot really be cleaned , we hit this issue internally when we have to deal with 100s of TB level shuffles in a single spark application, any hiccup in servers can double even triple the disk usage
this PR implements the mechanism to delete files from failed-to-fetch shuffles
the main idea is actually simple, it triggers clean up in LifecycleManager when it applies for a new celeborn shuffle id for a retried shuffle write stage
the tricky part is that is to avoid delete shuffle files when it is referred by multiple downstream stages: the PR introduces RunningStageManager to track the dependency between stages
### Why are the changes needed?
saving disk space
### Does this PR introduce _any_ user-facing change?
a new config
### How was this patch tested?
we manually delete some files

from the above screenshot we can see that originally we have shuffle 0, 1 and after 1 faced with chunk fetch failure, it triggers a retry of 0 (shuffle 2), but at this moment, 0 has been deleted from the workers

in the logs, we can see that in the middle the application , the unregister shuffle request was sent for shuffle 0
Closes#3109 from CodingCat/delete_fi.
Lead-authored-by: CodingCat <zhunansjtu@gmail.com>
Co-authored-by: Wang, Fei <fwang12@ebay.com>
Co-authored-by: Fei Wang <cn.feiwang@gmail.com>
Co-authored-by: Fei Wang <fwang12@ebay.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
`CelebornConf` introduces `celeborn.<module>.io.threads` to specify number of threads used in the client thread pool.
### Why are the changes needed?
`ShuffleClientImpl` and `FlinkShuffleClientImpl` use fixed configuration expression as `conf.getInt("celeborn." + module + ".io.threads", 8)`. Therefore, `CelebornConf` should introduce `celeborn.<module>.io.threads` to specify number of threads used in the client thread pool.
### Does this PR introduce _any_ user-facing change?
`CelebornConf` adds `celeborn.<module>.io.threads` config option.
### How was this patch tested?
No.
Closes#3245 from SteNicholas/CELEBORN-1993.
Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
Push applicationId as metrics label only if `celeborn.metrics.worker.appLevel.enabled` is true.
### Why are the changes needed?
At Uber, We use m3 for monitoring, it tries to make a new series using all the present metrics label. Having applicationId as a metrics introduces too much cardinality in `activeconnectioncount` and we are unable to use it, while it is an useful metric with/without applicationId as label. Similarly for resourceConsumption, userIdentifier alone can be used.
### Does this PR introduce _any_ user-facing change?
Yes, changed the default config value.
### How was this patch tested?
NA
Closes#3221 from s0nskar/application_tag.
Lead-authored-by: Sanskar Modi <sanskarmodi97@gmail.com>
Co-authored-by: Wang, Fei <fwang12@ebay.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
### What changes were proposed in this pull request?
Support all [default hadoop provider](https://github.com/apache/hadoop/blob/rel/release-3.3.6/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java#L563) for S3 authentication
### Why are the changes needed?
As of now celeborn only support authentication based on ACESS/SECRET key while other authentication mechanism can be required (for ex. ENV var, relying on [AWS_CONTAINER_CREDENTIALS_RELATIVE_URI](https://docs.aws.amazon.com/sdkref/latest/guide/feature-container-credentials.html))
### Does this PR introduce _any_ user-facing change?
yes, the `celeborn.storage.s3.secret.key` and `celeborn.storage.s3.access.key` are removed. In order to still provide those we should rely on the hadoop config (`celeborn.hadoop.fs.s3a.access.key` / `celeborn.hadoop.fs.s3a.secret.key `)
### How was this patch tested?
Tested on celeborn cluster deployed on kubernetes and configured to use S3 relying on `IAMInstanceCredentialsProvider`
Closes#3243 from ashangit/nfraison/CELEBORN-1965.
Lead-authored-by: Nicolas Fraison <nfraison@yahoo.fr>
Co-authored-by: nicolas.fraison@datadoghq.com <nicolas.fraison@datadoghq.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
Add S3 type in evict and create policies
Add S3 type in list of default evict and create policy
### Why are the changes needed?
To align with other types
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Closes#3218 from ashangit/nfraison/doc_s3.
Authored-by: Nicolas Fraison <nfraison@yahoo.fr>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
Client should send heartbeat to worker for processing heartbeat to avoid reading idleness of worker which enables heartbeat.
Follow up #1457.
### Why are the changes needed?
In Flink batch jobs, the following exception is caused by closed connection:
```
2025-04-27 23:30:28
java.io.IOException: Client /:9093 is lost, notify related stream 805472050177
at org.apache.celeborn.plugin.flink.RemoteBufferStreamReader.errorReceived(RemoteBufferStreamReader.java:146)
at org.apache.celeborn.plugin.flink.RemoteBufferStreamReader.lambda$new$0(RemoteBufferStreamReader.java:77)
at org.apache.celeborn.plugin.flink.network.ReadClientHandler.processMessageInternal(ReadClientHandler.java:64)
at org.apache.celeborn.plugin.flink.network.ReadClientHandler.lambda$channelInactive$0(ReadClientHandler.java:145)
at java.base/java.util.concurrent.ConcurrentHashMap.forEach(ConcurrentHashMap.java:1603)
at org.apache.celeborn.plugin.flink.network.ReadClientHandler.channelInactive(ReadClientHandler.java:136)
at org.apache.celeborn.common.network.server.TransportRequestHandler.channelInactive(TransportRequestHandler.java:74)
at org.apache.celeborn.common.network.server.TransportChannelHandler.channelInactive(TransportChannelHandler.java:141)
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:303)
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
at org.apache.celeborn.shaded.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
at org.apache.celeborn.common.network.client.ReconnectHandler.scheduleReconnect(ReconnectHandler.java:93)
at org.apache.celeborn.common.network.client.ReconnectHandler.channelInactive(ReconnectHandler.java:63)
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:303)
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
at org.apache.celeborn.shaded.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
at org.apache.celeborn.shaded.io.netty.handler.timeout.IdleStateHandler.channelInactive(IdleStateHandler.java:280)
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:303)
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
at org.apache.celeborn.shaded.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
at org.apache.celeborn.plugin.flink.network.TransportFrameDecoderWithBufferSupplier.channelInactive(TransportFrameDecoderWithBufferSupplier.java:207)
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:303)
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
at org.apache.celeborn.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405)
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:301)
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
at org.apache.celeborn.shaded.io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901)
at org.apache.celeborn.shaded.io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:813)
at org.apache.celeborn.shaded.io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173)
at org.apache.celeborn.shaded.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166)
at org.apache.celeborn.shaded.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
at org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:566)
at org.apache.celeborn.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at org.apache.celeborn.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at org.apache.celeborn.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:991)
```
The closed connection is caused by reading idleness of worker which enables heartbeat with troubleshooting via debug mode of log.
```
2025-04-27 23:30:32,341 [fetch-server-11-5] DEBUG util.NettyLogger - [id: 0x2dc85987, L:/:9093 - R:/33.133.79.187:44862] WRITE: MessageWithHeader [headerLength: 17, bodyLength: 26]
2025-04-27 23:30:32,341 [fetch-server-11-5] DEBUG util.NettyLogger - [id: 0x2dc85987, L:/:9093 - R:/33.133.79.187:44862] FLUSH
2025-04-27 23:30:32,380 [fetch-server-11-5] DEBUG util.NettyLogger - [id: 0x2dc85987, L:/:9093 - R:/33.133.79.187:44862] READ 38B
2025-04-27 23:30:32,380 [fetch-server-11-5] DEBUG util.NettyLogger - [id: 0x2dc85987, L:/:9093 - R:/33.133.79.187:44862] READ COMPLETE
2025-04-27 23:31:31,813 [fetch-server-11-5] DEBUG util.NettyLogger - [id: 0x2dc85987, L:/:9093 - R:/33.133.79.187:44862] WRITE 10B
2025-04-27 23:31:31,813 [fetch-server-11-5] DEBUG util.NettyLogger - [id: 0x2dc85987, L:/:9093 - R:/33.133.79.187:44862] FLUSH
2025-04-27 23:32:31,823 [fetch-server-11-5] DEBUG util.NettyLogger - [id: 0x2dc85987, L:/:9093 - R:/33.133.79.187:44862] WRITE 10B
2025-04-27 23:32:31,824 [fetch-server-11-5] DEBUG util.NettyLogger - [id: 0x2dc85987, L:/:9093 - R:/33.133.79.187:44862] FLUSH
2025-04-27 23:33:31,826 [fetch-server-11-5] DEBUG util.NettyLogger - [id: 0x2dc85987, L:/:9093 - R:/33.133.79.187:44862] WRITE 10B
2025-04-27 23:33:31,826 [fetch-server-11-5] DEBUG util.NettyLogger - [id: 0x2dc85987, L:/:9093 - R:/33.133.79.187:44862] FLUSH
2025-04-27 23:34:31,826 [fetch-server-11-5] DEBUG util.NettyLogger - [id: 0x2dc85987, L:/:9093 - R:/33.133.79.187:44862] WRITE 10B
2025-04-27 23:34:31,826 [fetch-server-11-5] DEBUG util.NettyLogger - [id: 0x2dc85987, L:/:9093 - R:/33.133.79.187:44862] FLUSH
2025-04-27 23:34:32,380 [fetch-server-11-5] DEBUG util.NettyLogger - [id: 0x2dc85987, L:/:9093 - R:/33.133.79.187:44862] CLOSE
2025-04-27 23:34:32,380 [fetch-server-11-5] DEBUG util.NettyLogger - [id: 0x2dc85987, L:/:9093 ! R:/33.133.79.187:44862] INACTIVE
2025-04-27 23:34:32,380 [fetch-server-11-5] DEBUG util.NettyLogger - [id: 0x2dc85987, L:/:9093 ! R:/33.133.79.187:44862] UNREGISTERED
```
The reading idleness of worker which enables heartbeat is resulted via one-way heartbeat from worker to client, which only keeps the channel of client active. Client should handle heartbeat to keep the channel of worker active via sending heartbeat to worker.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
`HeartbeatTest`
Closes#3239 from SteNicholas/CELEBORN-1912.
Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
Add bytes written threshold for top app consumption metrics.
### Why are the changes needed?
Used to limit and reduce the top app consumption metrics.
### Does this PR introduce _any_ user-facing change?
New config.
### How was this patch tested?
Existing GA.
Closes#3232 from turboFei/top_resource_consump_threashold.
Authored-by: Wang, Fei <fwang12@ebay.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
### What changes were proposed in this pull request?
Remove `celeborn.client.shuffle.mapPartition.split.enabled` to enable shuffle partition split at default for MapPartition.
### Why are the changes needed?
The default value of `celeborn.client.shuffle.mapPartition.split.enabled` is false, which causes that file writer fills the disk for PushData as follows:
```
2025-04-15 20:20:56,759 [push-server-6-4] WARN worker.PushDataHandler - [handlePUSH_DATA] fileWriter 1744719085150-f79d2a28c58f2115f4aa0a6aa6179b4a-1-614-0-0 partition-writer has Exception java.io.IOException: Disk quota exceeded
2025-04-15 20:20:56,760 [push-server-6-6] WARN worker.PushDataHandler - [handlePUSH_DATA] fileWriter 1744719085150-f79d2a28c58f2115f4aa0a6aa6179b4a-0-312-0-0 partition-writer has Exception java.io.IOException: Disk quota exceeded
2025-04-15 20:20:56,760 [push-server-6-4] WARN worker.PushDataHandler - [handlePUSH_DATA] fileWriter 1744719085150-f79d2a28c58f2115f4aa0a6aa6179b4a-1-614-0-0 partition-writer has Exception java.io.IOException: Disk quota exceeded
2025-04-15 20:20:56,760 [push-server-6-3] WARN worker.PushDataHandler - [handlePUSH_DATA] fileWriter 1744719085150-f79d2a28c58f2115f4aa0a6aa6179b4a-1-524-0-0 partition-writer has Exception java.io.IOException: Disk quota exceeded
2025-04-15 20:20:56,760 [LocalFlusher293474277-/home/admin/worker-6] ERROR storage.LocalFlusher - LocalFlusher293474277-/home/admin/worker write failed, report to DeviceMonitor, exception: java.io.IOException: Disk quota exceeded
2025-04-15 20:20:56,760 [push-server-6-4] WARN worker.PushDataHandler - [handlePUSH_DATA] fileWriter 1744719085150-f79d2a28c58f2115f4aa0a6aa6179b4a-0-1704-0-0 partition-writer has Exception java.io.IOException: Disk quota exceeded
2025-04-15 20:20:56,760 [push-server-6-3] WARN worker.PushDataHandler - [handlePUSH_DATA] fileWriter 1744719085150-f79d2a28c58f2115f4aa0a6aa6179b4a-1-524-0-0 partition-writer has Exception java.io.IOException: Disk quota exceeded
2025-04-15 20:20:56,760 [LocalFlusher293474277-/home/admin/worker-0] ERROR storage.LocalFlusher - LocalFlusher293474277-/home/admin/worker write failed, report to DeviceMonitor, exception: java.io.IOException: Disk quota exceeded
2025-04-15 20:20:56,760 [push-server-6-6] WARN worker.PushDataHandler - [handlePUSH_DATA] fileWriter 1744719085150-f79d2a28c58f2115f4aa0a6aa6179b4a-0-312-0-0 partition-writer has Exception java.io.IOException: Disk quota exceeded
```
It's recommended to remove celeborn.client.shuffle.mapPartition.split.enabled to enable shuffle partition split at default.
### Does this PR introduce _any_ user-facing change?
`celeborn.client.shuffle.mapPartition.split.enabled` is removed to enable shuffle partition split at default for MapPartition.
### How was this patch tested?
No.
Closes#3217 from SteNicholas/CELEBORN-1969.
Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
- close [CELEBORN-1916](https://issues.apache.org/jira/browse/CELEBORN-1916)
- This PR extends the Multipart Uploader (MPU) interface to support Aliyun OSS.
### Why are the changes needed?
- Implemented multipart-uploader-oss module based on the existing MPU extension interface.
- Added necessary configurations and dependencies for Aliyun OSS integration.
- Ensured compatibility with the existing multipart-uploader framework.
- This enhancement allows seamless multipart upload functionality for Aliyun OSS, similar to the existing AWS S3 support.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Deployment integration testing has been completed in the local environment.
Closes#3157 from shouwangyw/optimize/mpu-oss.
Lead-authored-by: veli.yang <897900564@qq.com>
Co-authored-by: yangwei <897900564@qq.com>
Co-authored-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
For spark celeborn application, if the GetReducerFileGroupResponse is larger than the threshold, Spark driver would broadcast the GetReducerFileGroupResponse to the executors, it prevents the driver from being the bottleneck in sending out multiple copies of the GetReducerFileGroupResponse (one per executor).
### Why are the changes needed?
To prevent the driver from being the bottleneck in sending out multiple copies of the GetReducerFileGroupResponse (one per executor).
### Does this PR introduce _any_ user-facing change?
No, the feature is not enabled by defaults.
### How was this patch tested?
UT.
Cluster testing with `spark.celeborn.client.spark.shuffle.getReducerFileGroup.broadcast.enabled=true`.
The broadcast response size should be always about 1kb.


Application succeed.

Closes#3158 from turboFei/broadcast_rgf.
Authored-by: Wang, Fei <fwang12@ebay.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
### What changes were proposed in this pull request?
1. Worker reports resourceConsumption to master
2. QuotaManager calculates the resourceConsumption of each app and marks the apps that exceed the quota.
2.1 When the tenant's resourceConsumption exceeds the tenant's quota, select the app with a larger consumption to mark interrupted.
2.2 When the resourceConsumption of the cluster exceeds the cluster quota, select the app with larger consumption to mark interrupted.
3. Master returns to Driver through heartbeat, whether app is marked interrupted
### Why are the changes needed?
The current storage quota logic can only limit new shuffles, and cannot limit the writing of existing shuffles. In our production environment, there is such an scenario: the cluster is small, but the user's app single shuffle is large which occupied disk resources, we want to interrupt those shuffle.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
UTs.
Closes#2819 from leixm/CELEBORN-1577-2.
Authored-by: Xianming Lei <31424839+leixm@users.noreply.github.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
### What changes were proposed in this pull request?
Whenever a `WorkerPartitionReader` is recreated (due celeborn worker restarts / any other chunk fetch failure), the entire shuffle partition file is re-read from beginning, discarding already read chunks in `CelebornInputStream` based on the batchIdSet metadata maintained.
This can be improved (only for cases where shuffle data is unreplicated) by skipping already read chunk id since they'd be discarded anyway. This improves overall shuffle read performance (reducer's total time, network usage etc).
### Why are the changes needed?
Allow skipping already read shuffle chunks
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
UTs added
Closes#3132 from saurabhd336/skipReadChunks.
Authored-by: Saurabh Dubey <saurabhd336@uber.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
### What changes were proposed in this pull request?
0.6.0 -> 0.5.4
- `celeborn.rpc.retryWait`
- `celeborn.client.rpc.retryWait`
`empty` -> 0.5.4
- `celeborn.<module>.io.conflictAvoidChooser.enable`
### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes#3142 from s0nskar/config_rpc_retry.
Authored-by: Sanskar Modi <sanskarmodi97@gmail.com>
Signed-off-by: SteNicholas <programgeek@163.com>
### What changes were proposed in this pull request?
In the switchServingState after resumeByPinnedMemory, keep the resume channel to prevent the channel from frequently resuming and pausing before memoryUsage decreases to pausePushDataThreshold.
### Why are the changes needed?
Frequent channel resume and pause will result in slow data reception and failure to quickly reduce memoryUsage to below pausePushDataThreshold.
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
ut
Closes#3099 from TheodoreLx/keep-resume.
Lead-authored-by: TheodoreLx <1548069580@qq.com >
Co-authored-by: 慧枫 <zhengqi.zzq@antgroup.com>
Co-authored-by: Zhengqi Zhang <1548069580@qq.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
### What changes were proposed in this pull request?
Support configuring the SSL handshake timeout for SSLHandler, for `rpc_app` and `rpc_service` transport modules.
### Why are the changes needed?
To make the SSLHandler handshake timeout configurable. We are working on ramping shuffle traffic to Celeborn internally, and have observed spark task failures which related to the connection timeout. This will make SSLHandler handshake timeout in line with our ecosystem’s production config, and should minimize those failures and improve robustness.
### Does this PR introduce _any_ user-facing change?
Introduces a new server side configuration.
### How was this patch tested?
Added a new UT, validated with existing UTs.
Closes#3120 from rmcyang/rmcyang/CELEBORN-1882.
Authored-by: Minchu Yang <minyang@linkedin.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
### What changes were proposed in this pull request?
Add logic to support avoid sorting shuffle files for Reduce mode when optimize skew partitions
### Why are the changes needed?
Current logic need sorting shuffle files when read Reduce mode skew partition shuffle files, we found some shuffle sorting timeout and performance issue
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Cluster test and uts
Closes#2373 from wangshengjie123/optimize-skew-partition.
Lead-authored-by: wangshengjie <wangshengjie3@xiaomi.com>
Co-authored-by: wangshengjie3 <wangshengjie3@xiaomi.com>
Co-authored-by: Fu Chen <cfmcgrady@gmail.com>
Co-authored-by: Shuang <lvshuang.tb@gmail.com>
Co-authored-by: wangshengjie3 <soldier.sj.wang@gmail.com>
Co-authored-by: Fei Wang <fwang12@ebay.com>
Co-authored-by: Wang, Fei <fwang12@ebay.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
### What changes were proposed in this pull request?
Retry seding RPC to LifecycleManager when TimeoutException.
### Why are the changes needed?
RPC messages are processed by `Dispatcher.threadpool` which its numThreads depends on `numUsableCores`.
In some cases (k8s) the numThreads of LifecycleManager are not enough while the RPCs are a lot so there are TimeoutExceptions.
Add retry when there are TimeoutExceptions.
### Does this PR introduce _any_ user-facing change?
No.
Another way is to adjust the configuration `celeborn.lifecycleManager.rpc.dispatcher.threads` to add the numThreads.
This way is more affective.
### How was this patch tested?
Cluster testing.
Closes#3008 from zaynt4606/clb1757.
Authored-by: zhengtao <shuaizhentao.szt@alibaba-inc.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
### What changes were proposed in this pull request?
Remove unused `celeborn.<module>.io.enableVerboseMetrics` option.
### Why are the changes needed?
`celeborn.<module>.io.enableVerboseMetrics` option is unused, which could be replaced with `celeborn.network.memory.allocator.verbose.metric`.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
`CelebornConfSuite`
Closes#3094 from SteNicholas/CELEBORN-1860.
Authored-by: Nicholas Jiang <programgeek@163.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
Support custom implementation of EventExecutorChooser to avoid deadlock when calling await in EventLoop thread
### Why are the changes needed?
In Flink Celeborn Client, you can create a new connection in the EventLoop thread. To wait for the connection to complete, cf.await is called, which can cause a deadlock because the thread bound to the newly connected channel may be the same as the current EventLoop thread. The current thread is suspended by wait and needs to wait for the current thread to notify. This change is to avoid binding the same thread.
### Does this PR introduce _any_ user-facing change?
celeborn.<module>.io.conflictAvoidChooser.enable is introduced.
### How was this patch tested?
manual test
Closes#3071 from littlexyw/fix_dead_lock.
Authored-by: xinyuwang1 <xinyuwang1@xiaohongshu.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
### What changes were proposed in this pull request?
0.6.0 -> 0.5.2
`celeborn.master.endpoints.resolver`
0.6.0 -> 0.5.1
`celeborn.client.chunk.prefetch.enabled`
`celeborn.client.inputStream.creation.window`
### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes#3082 from cxzl25/config_version.
Authored-by: sychen <sychen@ctrip.com>
Signed-off-by: Shaoyun Chen <csy@apache.org>
### What changes were proposed in this pull request?
Congestion and MemoryManager should use pinnedDirectMemory instead of usedDirectMemory
### Why are the changes needed?
In our production environment, after worker pausing, the usedDirectMemory keep high and does not decrease. The worker node is permanently blacklisted and cannot be used.
This problem has been bothering us for a long time. When the thred cache is turned off, in fact, **after ctx.channel().config().setAutoRead(false), the netty framework will still hold some ByteBufs**. This part of ByteBuf result in a lot of PoolChunks cannot be released.
In netty, if a chunk is 16M and 8k of this chunk has been allocated, then the pinnedMemory is 8k and the activeMemory is 16M. The remaining (16M-8k) memory can be allocated, but not yet allocated, netty allocates and releases memory in chunk units, so the 8k that has been allocated will result in 16M that cannot be returned to the operating system.
Here are some scenes from our production/test environment:
We config 10gb off-heap memory for worker, other configs as below:
```
celeborn.network.memory.allocator.allowCache false
celeborn.worker.monitor.memory.check.interval 100ms
celeborn.worker.monitor.memory.report.interval 10s
celeborn.worker.directMemoryRatioToPauseReceive 0.75
celeborn.worker.directMemoryRatioToPauseReplicate 0.85
celeborn.worker.directMemoryRatioToResume 0.5
```
When receiving high traffic, the worker's usedDirectMemory increases. After triggering trim and pause, usedDirectMemory still does not reach the resume threshold, and worker was excluded.

So we checked the heap snapshot of the abnormal worker, we can see that there are a large number of DirectByteBuffers in the heap memory. These DirectByteBuffers are all 4mb in size, which is exactly the size of chunksize. According to the path to gc root, DirectByteBuffer is held by PoolChunk, and these 4m only have 160k pinnedBytes.


There are many ByteBufs that are not released

The stack shows that these ByteBufs are allocated by netty

We tried to reproduce this situation in the test environment. When the same problem occurred, we added a restful api of the worker to force the worker to resume. After the resume, the worker returned to normal, and PushDataHandler handled many delayed requests.


So I think that when pinnedMemory is not high enough, we should not trigger pause and congestion, because at this time a large part of the memory can still be allocated.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing UTs.
Closes#3018 from leixm/CELEBORN-1792.
Authored-by: Xianming Lei <31424839+leixm@users.noreply.github.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
### What changes were proposed in this pull request?
1. Replace waitThreadPoll's thread pool with ScheduledExecutorService.
2. commitFile should reply when shuffleCommitTimeout.
### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Cluster test & UT.
Closes#3059 from zaynt4606/clb1829.
Authored-by: zhengtao <shuaizhentao.szt@alibaba-inc.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
### What changes were proposed in this pull request?
Change `celeborn.<module>.io.mode` optional to explain default behavior in description.
### Why are the changes needed?
The default value of `celeborn.<module>.io.mode` in document could be changed by whether epoll mode is available for different os. Therefore, `celeborn.<module>.io.mode` should be changed to optional and explained the default behavior in description of option.
Follow up https://github.com/apache/celeborn/pull/3039#discussion_r1899340272.
### Does this PR introduce _any_ user-facing change?
`celeborn.<module>.io.mode` is optional and explains default behavior in description.
### How was this patch tested?
CI.
Closes#3044 from SteNicholas/CELEBORN-1774.
Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: SteNicholas <programgeek@163.com>
### What changes were proposed in this pull request?
To avoid possible worker load skew for the stages with tiny reducer numbers.
### Why are the changes needed?
If a stage has tiny reducers and skewed partitions, The default value will lead to serious worker load imbalance cause some workers unable to handle shuffle data.
### Does this PR introduce _any_ user-facing change?
Yes
### How was this patch tested?
GA and cluster test.
Closes#3039 from FMX/1811.
Authored-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
Signed-off-by: SteNicholas <programgeek@163.com>
### What changes were proposed in this pull request?
Add documentation for `CELEBORN_NO_DAEMONIZE`
### Why are the changes needed?
Currently the celeborn processes starts in background and it was difficult to figure out how to change that behaviour. Setting this flag to true, will allow Celeborn processes to run in foreground.
### Does this PR introduce _any_ user-facing change?
NA
### How was this patch tested?
NA
Closes#3020 from s0nskar/no-daemonize-docs.
Authored-by: Sanskar Modi <sanskarmodi97@gmail.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
Update default value of `celeborn.<module>.io.mode` to whether epoll mode is available. Meanwhile, the io mode of transport is `NIO` for unavailable epoll mode.
### Why are the changes needed?
The JDK NIO bug produces the situation that empty polling of `Selector` could cause CPU 100%, which refers to
1. [JDK-2147719 : (se) Selector doesn't block on Selector.select(timeout) (lnx)](https://bugs.java.com/bugdatabase/view_bug.do?bug_id=2147719)
2. [JDK-6403933 : (se) Selector doesn't block on Selector.select(timeout) (lnx)](https://bugs.java.com/bugdatabase/view_bug.do?bug_id=6403933)
When the epoll mode is available, the default IO mode should be `EPOLL`, which backports [NettyServer.java#L92](https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java#L92). Meanwhile, the transport io mode should be `NIO` when the epoll mode is unavailable.
### Does this PR introduce _any_ user-facing change?
Change the default value of `celeborn.<module>.io.mode`.
### How was this patch tested?
CI.
Closes#2994 from SteNicholas/CELEBORN-1774.
Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
### What changes were proposed in this pull request?
Deprecate identity configs related with quota –
```
"celeborn.quota.identity.provider"
"celeborn.quota.identity.user-specific.tenant"
"celeborn.quota.identity.user-specific.userName"
```
In favour of identity configs independent of quota
```
"celeborn.identity.provider"
"celeborn.identity.user-specific.tenant"
"celeborn.identity.user-specific.userName"
```
### Why are the changes needed?
Current identity configs are tied with quota but identity should be free of quota because other pieces like tags are also using it. In future other new components can also make use of identity.
### Does this PR introduce _any_ user-facing change?
NA
### How was this patch tested?
Existing UTs
Closes#2952 from s0nskar/fix_identity.
Authored-by: Sanskar Modi <sanskarmodi97@gmail.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
### What changes were proposed in this pull request?
as titile
### Why are the changes needed?
The doc fail to mention S3 as one of storage layers
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Closes#2963 from zhaohehuhu/dev-1128.
Authored-by: zhaohehuhu <luoyedeyi@163.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
- Adding support to enable/disable worker tags feature by a master config flag.
- Fixed BUG: After this change #2936, admins can also define the tagsExpr for users. In a case user is passing an empty tagsExpr current code will ignore the admin defined tagsExpr and allow job to use all workers.
### Why are the changes needed?
https://cwiki.apache.org/confluence/display/CELEBORN/CIP-11+Supporting+Tags+in+Celeborn
### Does this PR introduce _any_ user-facing change?
NA
### How was this patch tested?
Existing UTs
Closes#2953 from s0nskar/tags-enabled.
Authored-by: Sanskar Modi <sanskarmodi97@gmail.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
Remove the code for app top disk usage both in master and worker end.
Prefer to use below prometheus expr to figure out the top app usages.
```
topk(50, sum by (applicationId) (metrics_diskBytesWritten_Value{role="worker", applicationId!=""}))
```
### Why are the changes needed?
To address comments: https://github.com/apache/celeborn/pull/2947#issuecomment-2499564978
> Due to the application dimension resource consumption, this feature should be included in the deprecated features. Maybe you can remove the codes for application top disk usage.
### Does this PR introduce _any_ user-facing change?
Yes, remove the app top disk usage api.
### How was this patch tested?
GA.
Closes#2949 from turboFei/remove_app_top_usage.
Authored-by: Wang, Fei <fwang12@ebay.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
Flink supports fallback to vanilla Flink built-in shuffle implementation.
### Why are the changes needed?
When quota is unenough or workers are unavailable, `RemoteShuffleMaster` does not support fallback to `NettyShuffleMaster`, and `RemoteShuffleEnvironment` does not support fallback to `NettyShuffleEnvironment` at present. Flink should support fallback to vanilla Flink built-in shuffle implementation for unenough quota and unavailable workers.

### Does this PR introduce _any_ user-facing change?
- Introduce `ShuffleFallbackPolicy` interface to determine whether fallback to vanilla Flink built-in shuffle implementation.
```
/**
* The shuffle fallback policy determines whether fallback to vanilla Flink built-in shuffle
* implementation.
*/
public interface ShuffleFallbackPolicy {
/**
* Returns whether fallback to vanilla flink built-in shuffle implementation.
*
* param shuffleContext The job shuffle context of Flink.
* param celebornConf The configuration of Celeborn.
* param lifecycleManager The {link LifecycleManager} of Celeborn.
* return Whether fallback to vanilla flink built-in shuffle implementation.
*/
boolean needFallback(
JobShuffleContext shuffleContext,
CelebornConf celebornConf,
LifecycleManager lifecycleManager);
}
```
- Introduce `celeborn.client.flink.shuffle.fallback.policy` config to support shuffle fallback policy configuration.
### How was this patch tested?
- `RemoteShuffleMasterSuiteJ#testRegisterJobWithForceFallbackPolicy`
- `WordCountTestBase#celeborn flink integration test with fallback - word count`
Closes#2932 from SteNicholas/CELEBORN-1700.
Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
### What changes were proposed in this pull request?
Support predefined tags expression for tenant and users via dynamic config. Using this admin can configure tags for users/tenants and give permission to special users to provide custom tags expression.
### Why are the changes needed?
https://cwiki.apache.org/confluence/display/CELEBORN/CIP-11+Supporting+Tags+in+Celeborn
### Does this PR introduce _any_ user-facing change?
NA
### How was this patch tested?
UTs
Closes#2936 from s0nskar/admin_tags.
Authored-by: Sanskar Modi <sanskarmodi97@gmail.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
as title
### Why are the changes needed?
AWS S3 doesn't support append, so Celeborn had to copy the historical data from s3 to worker and write to s3 again, which heavily scales out the write. This PR implements a better solution via MPU to avoid copy-and-write.
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?

I conducted an experiment with a 1GB input dataset to compare the performance of Celeborn using only S3 storage versus using SSD storage. The results showed that Celeborn with SSD storage was approximately three times faster than with only S3 storage.
<img width="1728" alt="Screenshot 2024-11-16 at 13 02 10" src="https://github.com/user-attachments/assets/8f879c47-c01a-4004-9eae-1c266c1f3ef2">
The above screenshot is the second test with 5000 mapper and reducer that I did.
Closes#2830 from zhaohehuhu/dev-1021.
Lead-authored-by: zhaohehuhu <luoyedeyi@163.com>
Co-authored-by: He Zhao <luoyedeyi459@163.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
### What changes were proposed in this pull request?
1. Introduce `celeborn.client.spark.stageRerun.enabled` with alternative `celeborn.client.spark.fetch.throwsFetchFailure` to enable spark stage rerun.
2. Change the default value of `celeborn.client.spark.fetch.throwsFetchFailure` from `false` to `true`, which enables spark stage rerun at default.
### Why are the changes needed?
User could not directly understand the meaning of `celeborn.client.spark.fetch.throwsFetchFailure` as whether to enable stage rerun, which means that client throws `FetchFailedException` instead of `CelebornIOException`. It's recommended to introduce `celeborn.client.spark.stageRerun.enabled` with alternative `celeborn.client.spark.fetch.throwsFetchFailure` to enable spark stage rerun.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
CI.
Closes#2920 from SteNicholas/CELEBORN-1719.
Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: SteNicholas <programgeek@163.com>
### What changes were proposed in this pull request?
implement queue time/processing time metrics for rpc framework
### Why are the changes needed?
to identify rpc processing bottelneck
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
local
Closes#2784 from ErikFang/main-rpc-metrics.
Lead-authored-by: Erik.fang <fmerik@gmail.com>
Co-authored-by: 仲甫 <fangming@antgroup.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>