[CELEBORN-2129] CelebornBufferStream should invoke openStreamInternal in moveToNextPartitionIfPossible to avoid client creation timeout

### What changes were proposed in this pull request?

`CelebornBufferStream` should invoke `openStreamInternal` in `moveToNextPartitionIfPossible` to avoid client creation timeout.

### Why are the changes needed?

There are many `CelebornIOException` that is cause by timeout client creation in production environment as follows:

```
2025-08-22 16:20:10,681 INFO  [flink-akka.actor.default-dispatcher-40] org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - [vertex-2]Calc(select=[lz4sql, rawsize, obcluster, ds, hh, mm, PROCTIME() AS $6]) -> Sort(orderBy=[lz4sql ASC, rawsize ASC, obcluster ASC, ds ASC, hh ASC, mm ASC, $6 DESC]) -> OverAggregate(partitionBy=[lz4sql, rawsize, obcluster, ds, hh, mm], orderBy=[$6 DESC], window#0=[ROW_NUMBER(*) AS w0$o0 ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[lz4sql, rawsize, obcluster, ds, hh, mm, $6, w0$o0]) -> Calc(select=[lz4sql, rawsize, obcluster, ds, hh, mm], where=[(w0$o0 = 1)]) (668/1900) (d8bf48183d8c69a1ab84bcd445f6d4ed_0e8289f2bf927649dd2511bbc2bb6759_667_0) switched from RUNNING to FAILED on antc4flink4172792604-taskmanager-403  (dataPort=1).
java.io.IOException: org.apache.celeborn.common.exception.CelebornIOException: Connecting to /:9093 timed out (60000 ms)
	at org.apache.celeborn.common.network.client.TransportClientFactory.internalCreateClient(TransportClientFactory.java:313)
	at org.apache.celeborn.common.network.client.TransportClientFactory.createClient(TransportClientFactory.java:250)
	at org.apache.celeborn.common.network.client.TransportClientFactory.retryCreateClient(TransportClientFactory.java:157)
	at org.apache.celeborn.plugin.flink.network.FlinkTransportClientFactory.createClientWithRetry(FlinkTransportClientFactory.java:51)
	at org.apache.celeborn.plugin.flink.readclient.CelebornBufferStream.openStreamInternal(CelebornBufferStream.java:200)
	at org.apache.celeborn.plugin.flink.readclient.CelebornBufferStream.moveToNextPartitionIfPossible(CelebornBufferStream.java:183)
	at org.apache.celeborn.plugin.flink.RemoteBufferStreamReader.onStreamEnd(RemoteBufferStreamReader.java:161)
	at org.apache.celeborn.plugin.flink.RemoteBufferStreamReader.lambda$new$0(RemoteBufferStreamReader.java:79)
	at org.apache.celeborn.plugin.flink.network.ReadClientHandler.processMessageInternal(ReadClientHandler.java:64)
	at org.apache.celeborn.plugin.flink.network.ReadClientHandler.receive(ReadClientHandler.java:100)
	at org.apache.celeborn.plugin.flink.network.ReadClientHandler.receive(ReadClientHandler.java:111)
	at org.apache.celeborn.plugin.flink.network.ReadClientHandler.receive(ReadClientHandler.java:76)
	at org.apache.celeborn.common.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:100)
	at org.apache.celeborn.common.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:84)
	at org.apache.celeborn.common.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:156)
	at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at org.apache.celeborn.shaded.io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:289)
	at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
	at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at org.apache.celeborn.plugin.flink.network.TransportFrameDecoderWithBufferSupplier.decodeBody(TransportFrameDecoderWithBufferSupplier.java:95)
	at org.apache.celeborn.plugin.flink.network.TransportFrameDecoderWithBufferSupplier.channelRead(TransportFrameDecoderWithBufferSupplier.java:184)
	at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at org.apache.celeborn.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
	at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at org.apache.celeborn.shaded.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at org.apache.celeborn.shaded.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	at org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
	at org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
	at org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
	at org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
	at org.apache.celeborn.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
	at org.apache.celeborn.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at org.apache.celeborn.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:991)

	at org.apache.celeborn.plugin.flink.RemoteBufferStreamReader.errorReceived(RemoteBufferStreamReader.java:146) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
	at org.apache.celeborn.plugin.flink.RemoteBufferStreamReader.lambda$new$0(RemoteBufferStreamReader.java:77) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
	at org.apache.celeborn.plugin.flink.readclient.CelebornBufferStream.moveToNextPartitionIfPossible(CelebornBufferStream.java:193) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
	at org.apache.celeborn.plugin.flink.RemoteBufferStreamReader.onStreamEnd(RemoteBufferStreamReader.java:161) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
	at org.apache.celeborn.plugin.flink.RemoteBufferStreamReader.lambda$new$0(RemoteBufferStreamReader.java:79) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
	at org.apache.celeborn.plugin.flink.network.ReadClientHandler.processMessageInternal(ReadClientHandler.java:64) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
	at org.apache.celeborn.plugin.flink.network.ReadClientHandler.receive(ReadClientHandler.java:100) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
	at org.apache.celeborn.plugin.flink.network.ReadClientHandler.receive(ReadClientHandler.java:111) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
	at org.apache.celeborn.plugin.flink.network.ReadClientHandler.receive(ReadClientHandler.java:76) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
	at org.apache.celeborn.common.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:100) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
	at org.apache.celeborn.common.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:84) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
	at org.apache.celeborn.common.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:156) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
	at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
	at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
	at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
	at org.apache.celeborn.shaded.io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:289) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
	at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
	at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
	at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
	at org.apache.celeborn.plugin.flink.network.TransportFrameDecoderWithBufferSupplier.decodeBody(TransportFrameDecoderWithBufferSupplier.java:95) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
	at org.apache.celeborn.plugin.flink.network.TransportFrameDecoderWithBufferSupplier.channelRead(TransportFrameDecoderWithBufferSupplier.java:184) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
	at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
	at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
	at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
	at org.apache.celeborn.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
	at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
	at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
	at org.apache.celeborn.shaded.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
	at org.apache.celeborn.shaded.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
	at org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
	at org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
	at org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
	at org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
	at org.apache.celeborn.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
	at org.apache.celeborn.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
	at org.apache.celeborn.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?]
	at java.lang.Thread.run(Thread.java:991) ~[?:?]
```

`CelebornBufferStream` should invoke `openStreamInternal` in `moveToNextPartitionIfPossible` to avoid client creation timeout, which is caused by creating a client using the callback thread of netty.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Manual test.

Closes #3450 from SteNicholas/CELEBORN-2129.

Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: SteNicholas <programgeek@163.com>
This commit is contained in:
SteNicholas 2025-08-27 14:21:15 +08:00
parent f590fb275d
commit 1a3b9f35b5
4 changed files with 73 additions and 17 deletions

View File

@ -19,6 +19,8 @@ package org.apache.celeborn.plugin.flink.client;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
@ -64,6 +66,7 @@ public class CelebornBufferStream {
private Supplier<ByteBuf> bufferSupplier;
private int initialCredit;
private Consumer<RequestMessage> messageConsumer;
private ExecutorService openStreamThreadPool;
public CelebornBufferStream() {}
@ -74,7 +77,8 @@ public class CelebornBufferStream {
PartitionLocation[] locations,
int subIndexStart,
int subIndexEnd,
long pushDataTimeoutMs) {
long pushDataTimeoutMs,
ExecutorService openStreamThreadPool) {
this.mapShuffleClient = mapShuffleClient;
this.clientFactory = dataClientFactory;
this.shuffleKey = shuffleKey;
@ -82,6 +86,7 @@ public class CelebornBufferStream {
this.subIndexStart = subIndexStart;
this.subIndexEnd = subIndexEnd;
this.pushDataTimeoutMs = pushDataTimeoutMs;
this.openStreamThreadPool = openStreamThreadPool;
}
public void open(
@ -161,7 +166,8 @@ public class CelebornBufferStream {
PartitionLocation[] locations,
int subIndexStart,
int subIndexEnd,
long pushDataTimeoutMs) {
long pushDataTimeoutMs,
ExecutorService openStreamThreadPool) {
if (locations == null || locations.length == 0) {
return empty();
} else {
@ -172,7 +178,8 @@ public class CelebornBufferStream {
locations,
subIndexStart,
subIndexEnd,
pushDataTimeoutMs);
pushDataTimeoutMs,
openStreamThreadPool);
}
}
@ -214,7 +221,7 @@ public class CelebornBufferStream {
@Nullable BiConsumer<Long, Integer> requiredSegmentIdConsumer,
boolean sync) {
logger.debug(
"MoveToNextPartitionIfPossible in this:{}, endedStreamId: {}, currentLocationIndex: {}, currentSteamId:{}, locationsLength:{}",
"MoveToNextPartitionIfPossible in this: {}, endedStreamId: {}, currentLocationIndex: {}, currentSteamId: {}, locationsLength: {}.",
this,
endedStreamId,
currentLocationIndex.get(),
@ -226,18 +233,45 @@ public class CelebornBufferStream {
}
if (currentLocationIndex.get() < locations.length) {
try {
openStreamInternal(requiredSegmentIdConsumer, sync);
logger.debug(
"MoveToNextPartitionIfPossible after openStream this:{}, endedStreamId: {}, currentLocationIndex: {}, currentSteamId:{}, locationsLength:{}",
this,
endedStreamId,
currentLocationIndex.get(),
streamId,
locations.length);
} catch (Exception e) {
logger.warn("Failed to open stream and report to flink framework. ", e);
messageConsumer.accept(new TransportableError(0L, e));
if (sync) {
try {
openStreamInternal(requiredSegmentIdConsumer, true);
logger.debug(
"MoveToNextPartitionIfPossible after openStream this: {}, endedStreamId: {}, currentLocationIndex: {}, currentSteamId: {}, locationsLength: {}.",
this,
endedStreamId,
currentLocationIndex.get(),
streamId,
locations.length);
} catch (Exception e) {
logger.warn("Failed to open stream and report to flink framework. ", e);
messageConsumer.accept(new TransportableError(0L, e));
}
} else {
CompletableFuture.runAsync(
() -> {
try {
openStreamInternal(requiredSegmentIdConsumer, false);
} catch (IOException | InterruptedException e) {
throw new RuntimeException(e);
}
},
openStreamThreadPool)
.whenComplete(
(result, throwable) -> {
if (throwable == null) {
logger.debug(
"MoveToNextPartitionIfPossible after openStream this: {}, endedStreamId: {}, currentLocationIndex: {}, currentSteamId: {}, locationsLength: {}.",
this,
endedStreamId,
currentLocationIndex.get(),
streamId,
locations.length);
} else {
logger.warn("Failed to open stream and report to flink framework. ", throwable);
messageConsumer.accept(new TransportableError(0L, throwable));
}
});
}
}
}

View File

@ -21,6 +21,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@ -64,6 +65,7 @@ import org.apache.celeborn.common.rpc.RpcEndpointRef;
import org.apache.celeborn.common.util.CollectionUtils;
import org.apache.celeborn.common.util.JavaUtils;
import org.apache.celeborn.common.util.PbSerDeUtils;
import org.apache.celeborn.common.util.ThreadUtils;
import org.apache.celeborn.common.util.Utils;
import org.apache.celeborn.common.write.PushState;
import org.apache.celeborn.plugin.flink.network.FlinkTransportClientFactory;
@ -85,6 +87,8 @@ public class FlinkShuffleClientImpl extends ShuffleClientImpl {
/** The buffer size bytes in flink, default value is 32KB. */
private final int bufferSizeBytes;
private final ExecutorService openStreamThreadPool;
public static FlinkShuffleClientImpl get(
String appUniqueId,
String driverHost,
@ -160,6 +164,9 @@ public class FlinkShuffleClientImpl extends ShuffleClientImpl {
if (readClientHandler != null) {
readClientHandler.close();
}
if (openStreamThreadPool != null) {
ThreadUtils.shutdown(openStreamThreadPool);
}
}
public FlinkShuffleClientImpl(
@ -180,6 +187,9 @@ public class FlinkShuffleClientImpl extends ShuffleClientImpl {
dataTransportConf, readClientHandler, conf.clientCloseIdleConnections());
this.setupLifecycleManagerRef(driverHost, port);
this.driverTimestamp = driverTimestamp;
this.openStreamThreadPool =
ThreadUtils.newDaemonCachedThreadPool(
"client-buffer-stream-opener", conf.clientFlinkOpenStreamThreads(), 60);
}
private void initializeTransportClientFactory() {
@ -238,7 +248,8 @@ public class FlinkShuffleClientImpl extends ShuffleClientImpl {
partitionLocations,
subPartitionIndexStart,
subPartitionIndexEnd,
pushDataTimeout);
pushDataTimeout,
openStreamThreadPool);
}
}

View File

@ -1473,6 +1473,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
def clientFlinkDataCompressionEnabled: Boolean = get(CLIENT_DATA_COMPRESSION_ENABLED)
def clientFlinkMetricsScopeNamingShuffle: String =
get(CLIENT_METRICS_SCOPE_NAMING_SHUFFLE)
def clientFlinkOpenStreamThreads: Int =
get(CLIENT_OPEN_STREAM_THREADS).getOrElse(Runtime.getRuntime.availableProcessors())
def clientChunkPrefetchEnabled = get(CLIENT_CHUNK_PREFETCH_ENABLED)
def clientInputStreamCreationWindow = get(CLIENT_INPUTSTREAM_CREATION_WINDOW)
@ -6031,6 +6033,14 @@ object CelebornConf extends Logging {
.createWithDefault(
"<host>.taskmanager.<tm_id>.<job_name>.<task_name>.<subtask_index>.<shuffle_id>")
val CLIENT_OPEN_STREAM_THREADS: OptionalConfigEntry[Int] =
buildConf("celeborn.client.flink.open.stream.threads")
.categories("client")
.doc("Thread number of flink shuffle client to open buffer stream. Default value is Runtime.getRuntime.availableProcessors.")
.version("0.6.1")
.intConf
.createOptional
val CLIENT_MR_PUSH_DATA_MAX: ConfigEntry[Long] =
buildConf("celeborn.client.mr.pushData.max")
.categories("client")

View File

@ -42,6 +42,7 @@ license: |
| celeborn.client.flink.inputGate.memory | 32m | false | Memory reserved for a input gate. | 0.3.0 | remote-shuffle.job.memory-per-gate |
| celeborn.client.flink.inputGate.supportFloatingBuffer | true | false | Whether to support floating buffer in Flink input gates. | 0.3.0 | remote-shuffle.job.support-floating-buffer-per-input-gate |
| celeborn.client.flink.metrics.scope.shuffle | &lt;host&gt;.taskmanager.&lt;tm_id&gt;.&lt;job_name&gt;.&lt;task_name&gt;.&lt;subtask_index&gt;.&lt;shuffle_id&gt; | false | Defines the scope format string that is applied to all metrics scoped to a shuffle. Only effective when a identifier-based reporter is configured | 0.6.0 | |
| celeborn.client.flink.open.stream.threads | &lt;undefined&gt; | false | Thread number of flink shuffle client to open buffer stream. Default value is Runtime.getRuntime.availableProcessors. | 0.6.1 | |
| celeborn.client.flink.partitionConnectionException.enabled | false | false | If enabled, `org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException` would be thrown when RemoteBufferStreamReader finds that the current exception is about connection failure, then Flink can be aware of the lost Celeborn server side nodes and be able to re-compute affected data. | 0.6.0 | |
| celeborn.client.flink.resultPartition.memory | 64m | false | Memory reserved for a result partition. | 0.3.0 | remote-shuffle.job.memory-per-partition |
| celeborn.client.flink.resultPartition.supportFloatingBuffer | true | false | Whether to support floating buffer for result partitions. | 0.3.0 | remote-shuffle.job.support-floating-buffer-per-output-gate |