From 1a3b9f35b50d467beb7a39788588f1bb05e65441 Mon Sep 17 00:00:00 2001 From: SteNicholas Date: Wed, 27 Aug 2025 14:21:15 +0800 Subject: [PATCH] [CELEBORN-2129] CelebornBufferStream should invoke openStreamInternal in moveToNextPartitionIfPossible to avoid client creation timeout ### What changes were proposed in this pull request? `CelebornBufferStream` should invoke `openStreamInternal` in `moveToNextPartitionIfPossible` to avoid client creation timeout. ### Why are the changes needed? There are many `CelebornIOException` that is cause by timeout client creation in production environment as follows: ``` 2025-08-22 16:20:10,681 INFO [flink-akka.actor.default-dispatcher-40] org.apache.flink.runtime.executiongraph.ExecutionGraph [] - [vertex-2]Calc(select=[lz4sql, rawsize, obcluster, ds, hh, mm, PROCTIME() AS $6]) -> Sort(orderBy=[lz4sql ASC, rawsize ASC, obcluster ASC, ds ASC, hh ASC, mm ASC, $6 DESC]) -> OverAggregate(partitionBy=[lz4sql, rawsize, obcluster, ds, hh, mm], orderBy=[$6 DESC], window#0=[ROW_NUMBER(*) AS w0$o0 ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[lz4sql, rawsize, obcluster, ds, hh, mm, $6, w0$o0]) -> Calc(select=[lz4sql, rawsize, obcluster, ds, hh, mm], where=[(w0$o0 = 1)]) (668/1900) (d8bf48183d8c69a1ab84bcd445f6d4ed_0e8289f2bf927649dd2511bbc2bb6759_667_0) switched from RUNNING to FAILED on antc4flink4172792604-taskmanager-403 (dataPort=1). java.io.IOException: org.apache.celeborn.common.exception.CelebornIOException: Connecting to /:9093 timed out (60000 ms) at org.apache.celeborn.common.network.client.TransportClientFactory.internalCreateClient(TransportClientFactory.java:313) at org.apache.celeborn.common.network.client.TransportClientFactory.createClient(TransportClientFactory.java:250) at org.apache.celeborn.common.network.client.TransportClientFactory.retryCreateClient(TransportClientFactory.java:157) at org.apache.celeborn.plugin.flink.network.FlinkTransportClientFactory.createClientWithRetry(FlinkTransportClientFactory.java:51) at org.apache.celeborn.plugin.flink.readclient.CelebornBufferStream.openStreamInternal(CelebornBufferStream.java:200) at org.apache.celeborn.plugin.flink.readclient.CelebornBufferStream.moveToNextPartitionIfPossible(CelebornBufferStream.java:183) at org.apache.celeborn.plugin.flink.RemoteBufferStreamReader.onStreamEnd(RemoteBufferStreamReader.java:161) at org.apache.celeborn.plugin.flink.RemoteBufferStreamReader.lambda$new$0(RemoteBufferStreamReader.java:79) at org.apache.celeborn.plugin.flink.network.ReadClientHandler.processMessageInternal(ReadClientHandler.java:64) at org.apache.celeborn.plugin.flink.network.ReadClientHandler.receive(ReadClientHandler.java:100) at org.apache.celeborn.plugin.flink.network.ReadClientHandler.receive(ReadClientHandler.java:111) at org.apache.celeborn.plugin.flink.network.ReadClientHandler.receive(ReadClientHandler.java:76) at org.apache.celeborn.common.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:100) at org.apache.celeborn.common.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:84) at org.apache.celeborn.common.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:156) at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) at org.apache.celeborn.shaded.io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:289) at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) at org.apache.celeborn.plugin.flink.network.TransportFrameDecoderWithBufferSupplier.decodeBody(TransportFrameDecoderWithBufferSupplier.java:95) at org.apache.celeborn.plugin.flink.network.TransportFrameDecoderWithBufferSupplier.channelRead(TransportFrameDecoderWithBufferSupplier.java:184) at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) at org.apache.celeborn.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) at org.apache.celeborn.shaded.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) at org.apache.celeborn.shaded.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) at org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788) at org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724) at org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650) at org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) at org.apache.celeborn.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) at org.apache.celeborn.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at org.apache.celeborn.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) at java.base/java.lang.Thread.run(Thread.java:991) at org.apache.celeborn.plugin.flink.RemoteBufferStreamReader.errorReceived(RemoteBufferStreamReader.java:146) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?] at org.apache.celeborn.plugin.flink.RemoteBufferStreamReader.lambda$new$0(RemoteBufferStreamReader.java:77) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?] at org.apache.celeborn.plugin.flink.readclient.CelebornBufferStream.moveToNextPartitionIfPossible(CelebornBufferStream.java:193) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?] at org.apache.celeborn.plugin.flink.RemoteBufferStreamReader.onStreamEnd(RemoteBufferStreamReader.java:161) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?] at org.apache.celeborn.plugin.flink.RemoteBufferStreamReader.lambda$new$0(RemoteBufferStreamReader.java:79) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?] at org.apache.celeborn.plugin.flink.network.ReadClientHandler.processMessageInternal(ReadClientHandler.java:64) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?] at org.apache.celeborn.plugin.flink.network.ReadClientHandler.receive(ReadClientHandler.java:100) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?] at org.apache.celeborn.plugin.flink.network.ReadClientHandler.receive(ReadClientHandler.java:111) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?] at org.apache.celeborn.plugin.flink.network.ReadClientHandler.receive(ReadClientHandler.java:76) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?] at org.apache.celeborn.common.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:100) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?] at org.apache.celeborn.common.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:84) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?] at org.apache.celeborn.common.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:156) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?] at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?] at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?] at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?] at org.apache.celeborn.shaded.io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:289) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?] at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?] at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?] at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?] at org.apache.celeborn.plugin.flink.network.TransportFrameDecoderWithBufferSupplier.decodeBody(TransportFrameDecoderWithBufferSupplier.java:95) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?] at org.apache.celeborn.plugin.flink.network.TransportFrameDecoderWithBufferSupplier.channelRead(TransportFrameDecoderWithBufferSupplier.java:184) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?] at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?] at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?] at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?] at org.apache.celeborn.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?] at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?] at org.apache.celeborn.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?] at org.apache.celeborn.shaded.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?] at org.apache.celeborn.shaded.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?] at org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?] at org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?] at org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?] at org.apache.celeborn.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?] at org.apache.celeborn.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?] at org.apache.celeborn.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?] at org.apache.celeborn.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[celeborn-client-flink-1.18-shaded_2.12-0.5.4-ANT.jar:?] at java.lang.Thread.run(Thread.java:991) ~[?:?] ``` `CelebornBufferStream` should invoke `openStreamInternal` in `moveToNextPartitionIfPossible` to avoid client creation timeout, which is caused by creating a client using the callback thread of netty. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Manual test. Closes #3450 from SteNicholas/CELEBORN-2129. Authored-by: SteNicholas Signed-off-by: SteNicholas --- .../flink/client/CelebornBufferStream.java | 66 ++++++++++++++----- .../flink/client/FlinkShuffleClientImpl.java | 13 +++- .../apache/celeborn/common/CelebornConf.scala | 10 +++ docs/configuration/client.md | 1 + 4 files changed, 73 insertions(+), 17 deletions(-) diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/client/CelebornBufferStream.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/client/CelebornBufferStream.java index a0a498ff4..04f8b2387 100644 --- a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/client/CelebornBufferStream.java +++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/client/CelebornBufferStream.java @@ -19,6 +19,8 @@ package org.apache.celeborn.plugin.flink.client; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -64,6 +66,7 @@ public class CelebornBufferStream { private Supplier bufferSupplier; private int initialCredit; private Consumer messageConsumer; + private ExecutorService openStreamThreadPool; public CelebornBufferStream() {} @@ -74,7 +77,8 @@ public class CelebornBufferStream { PartitionLocation[] locations, int subIndexStart, int subIndexEnd, - long pushDataTimeoutMs) { + long pushDataTimeoutMs, + ExecutorService openStreamThreadPool) { this.mapShuffleClient = mapShuffleClient; this.clientFactory = dataClientFactory; this.shuffleKey = shuffleKey; @@ -82,6 +86,7 @@ public class CelebornBufferStream { this.subIndexStart = subIndexStart; this.subIndexEnd = subIndexEnd; this.pushDataTimeoutMs = pushDataTimeoutMs; + this.openStreamThreadPool = openStreamThreadPool; } public void open( @@ -161,7 +166,8 @@ public class CelebornBufferStream { PartitionLocation[] locations, int subIndexStart, int subIndexEnd, - long pushDataTimeoutMs) { + long pushDataTimeoutMs, + ExecutorService openStreamThreadPool) { if (locations == null || locations.length == 0) { return empty(); } else { @@ -172,7 +178,8 @@ public class CelebornBufferStream { locations, subIndexStart, subIndexEnd, - pushDataTimeoutMs); + pushDataTimeoutMs, + openStreamThreadPool); } } @@ -214,7 +221,7 @@ public class CelebornBufferStream { @Nullable BiConsumer requiredSegmentIdConsumer, boolean sync) { logger.debug( - "MoveToNextPartitionIfPossible in this:{}, endedStreamId: {}, currentLocationIndex: {}, currentSteamId:{}, locationsLength:{}", + "MoveToNextPartitionIfPossible in this: {}, endedStreamId: {}, currentLocationIndex: {}, currentSteamId: {}, locationsLength: {}.", this, endedStreamId, currentLocationIndex.get(), @@ -226,18 +233,45 @@ public class CelebornBufferStream { } if (currentLocationIndex.get() < locations.length) { - try { - openStreamInternal(requiredSegmentIdConsumer, sync); - logger.debug( - "MoveToNextPartitionIfPossible after openStream this:{}, endedStreamId: {}, currentLocationIndex: {}, currentSteamId:{}, locationsLength:{}", - this, - endedStreamId, - currentLocationIndex.get(), - streamId, - locations.length); - } catch (Exception e) { - logger.warn("Failed to open stream and report to flink framework. ", e); - messageConsumer.accept(new TransportableError(0L, e)); + if (sync) { + try { + openStreamInternal(requiredSegmentIdConsumer, true); + logger.debug( + "MoveToNextPartitionIfPossible after openStream this: {}, endedStreamId: {}, currentLocationIndex: {}, currentSteamId: {}, locationsLength: {}.", + this, + endedStreamId, + currentLocationIndex.get(), + streamId, + locations.length); + } catch (Exception e) { + logger.warn("Failed to open stream and report to flink framework. ", e); + messageConsumer.accept(new TransportableError(0L, e)); + } + } else { + CompletableFuture.runAsync( + () -> { + try { + openStreamInternal(requiredSegmentIdConsumer, false); + } catch (IOException | InterruptedException e) { + throw new RuntimeException(e); + } + }, + openStreamThreadPool) + .whenComplete( + (result, throwable) -> { + if (throwable == null) { + logger.debug( + "MoveToNextPartitionIfPossible after openStream this: {}, endedStreamId: {}, currentLocationIndex: {}, currentSteamId: {}, locationsLength: {}.", + this, + endedStreamId, + currentLocationIndex.get(), + streamId, + locations.length); + } else { + logger.warn("Failed to open stream and report to flink framework. ", throwable); + messageConsumer.accept(new TransportableError(0L, throwable)); + } + }); } } } diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/client/FlinkShuffleClientImpl.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/client/FlinkShuffleClientImpl.java index 5b402b2ed..87a80006a 100644 --- a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/client/FlinkShuffleClientImpl.java +++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/client/FlinkShuffleClientImpl.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.*; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -64,6 +65,7 @@ import org.apache.celeborn.common.rpc.RpcEndpointRef; import org.apache.celeborn.common.util.CollectionUtils; import org.apache.celeborn.common.util.JavaUtils; import org.apache.celeborn.common.util.PbSerDeUtils; +import org.apache.celeborn.common.util.ThreadUtils; import org.apache.celeborn.common.util.Utils; import org.apache.celeborn.common.write.PushState; import org.apache.celeborn.plugin.flink.network.FlinkTransportClientFactory; @@ -85,6 +87,8 @@ public class FlinkShuffleClientImpl extends ShuffleClientImpl { /** The buffer size bytes in flink, default value is 32KB. */ private final int bufferSizeBytes; + private final ExecutorService openStreamThreadPool; + public static FlinkShuffleClientImpl get( String appUniqueId, String driverHost, @@ -160,6 +164,9 @@ public class FlinkShuffleClientImpl extends ShuffleClientImpl { if (readClientHandler != null) { readClientHandler.close(); } + if (openStreamThreadPool != null) { + ThreadUtils.shutdown(openStreamThreadPool); + } } public FlinkShuffleClientImpl( @@ -180,6 +187,9 @@ public class FlinkShuffleClientImpl extends ShuffleClientImpl { dataTransportConf, readClientHandler, conf.clientCloseIdleConnections()); this.setupLifecycleManagerRef(driverHost, port); this.driverTimestamp = driverTimestamp; + this.openStreamThreadPool = + ThreadUtils.newDaemonCachedThreadPool( + "client-buffer-stream-opener", conf.clientFlinkOpenStreamThreads(), 60); } private void initializeTransportClientFactory() { @@ -238,7 +248,8 @@ public class FlinkShuffleClientImpl extends ShuffleClientImpl { partitionLocations, subPartitionIndexStart, subPartitionIndexEnd, - pushDataTimeout); + pushDataTimeout, + openStreamThreadPool); } } diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala index 5cf7bf12e..7379a19df 100644 --- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala @@ -1473,6 +1473,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se def clientFlinkDataCompressionEnabled: Boolean = get(CLIENT_DATA_COMPRESSION_ENABLED) def clientFlinkMetricsScopeNamingShuffle: String = get(CLIENT_METRICS_SCOPE_NAMING_SHUFFLE) + def clientFlinkOpenStreamThreads: Int = + get(CLIENT_OPEN_STREAM_THREADS).getOrElse(Runtime.getRuntime.availableProcessors()) def clientChunkPrefetchEnabled = get(CLIENT_CHUNK_PREFETCH_ENABLED) def clientInputStreamCreationWindow = get(CLIENT_INPUTSTREAM_CREATION_WINDOW) @@ -6031,6 +6033,14 @@ object CelebornConf extends Logging { .createWithDefault( ".taskmanager.....") + val CLIENT_OPEN_STREAM_THREADS: OptionalConfigEntry[Int] = + buildConf("celeborn.client.flink.open.stream.threads") + .categories("client") + .doc("Thread number of flink shuffle client to open buffer stream. Default value is Runtime.getRuntime.availableProcessors.") + .version("0.6.1") + .intConf + .createOptional + val CLIENT_MR_PUSH_DATA_MAX: ConfigEntry[Long] = buildConf("celeborn.client.mr.pushData.max") .categories("client") diff --git a/docs/configuration/client.md b/docs/configuration/client.md index f9f14fe65..8e640d0f3 100644 --- a/docs/configuration/client.md +++ b/docs/configuration/client.md @@ -42,6 +42,7 @@ license: | | celeborn.client.flink.inputGate.memory | 32m | false | Memory reserved for a input gate. | 0.3.0 | remote-shuffle.job.memory-per-gate | | celeborn.client.flink.inputGate.supportFloatingBuffer | true | false | Whether to support floating buffer in Flink input gates. | 0.3.0 | remote-shuffle.job.support-floating-buffer-per-input-gate | | celeborn.client.flink.metrics.scope.shuffle | <host>.taskmanager.<tm_id>.<job_name>.<task_name>.<subtask_index>.<shuffle_id> | false | Defines the scope format string that is applied to all metrics scoped to a shuffle. Only effective when a identifier-based reporter is configured | 0.6.0 | | +| celeborn.client.flink.open.stream.threads | <undefined> | false | Thread number of flink shuffle client to open buffer stream. Default value is Runtime.getRuntime.availableProcessors. | 0.6.1 | | | celeborn.client.flink.partitionConnectionException.enabled | false | false | If enabled, `org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException` would be thrown when RemoteBufferStreamReader finds that the current exception is about connection failure, then Flink can be aware of the lost Celeborn server side nodes and be able to re-compute affected data. | 0.6.0 | | | celeborn.client.flink.resultPartition.memory | 64m | false | Memory reserved for a result partition. | 0.3.0 | remote-shuffle.job.memory-per-partition | | celeborn.client.flink.resultPartition.supportFloatingBuffer | true | false | Whether to support floating buffer for result partitions. | 0.3.0 | remote-shuffle.job.support-floating-buffer-per-output-gate |