diff --git a/client/src/main/java/org/apache/celeborn/client/read/WorkerPartitionReader.java b/client/src/main/java/org/apache/celeborn/client/read/WorkerPartitionReader.java index 1da3fd23e..e323fe6e1 100644 --- a/client/src/main/java/org/apache/celeborn/client/read/WorkerPartitionReader.java +++ b/client/src/main/java/org/apache/celeborn/client/read/WorkerPartitionReader.java @@ -55,6 +55,7 @@ public class WorkerPartitionReader implements PartitionReader { private final AtomicReference exception = new AtomicReference<>(); private final int fetchMaxReqsInFlight; + private final long fetchTimeoutMs; private boolean closed = false; // for test @@ -74,6 +75,7 @@ public class WorkerPartitionReader implements PartitionReader { throws IOException, InterruptedException { fetchMaxReqsInFlight = conf.clientFetchMaxReqsInFlight(); results = new LinkedBlockingQueue<>(); + fetchTimeoutMs = conf.clientFetchTimeoutMs(); // only add the buffer to results queue if this reader is not closed. callback = new ChunkReceivedCallback() { @@ -105,8 +107,7 @@ public class WorkerPartitionReader implements PartitionReader { } OpenStream openBlocks = new OpenStream(shuffleKey, location.getFileName(), startMapIndex, endMapIndex); - long timeoutMs = conf.clientFetchTimeoutMs(); - ByteBuffer response = client.sendRpcSync(openBlocks.toByteBuffer(), timeoutMs); + ByteBuffer response = client.sendRpcSync(openBlocks.toByteBuffer(), fetchTimeoutMs); streamHandle = (StreamHandle) Message.decode(response); this.location = location; @@ -166,7 +167,7 @@ public class WorkerPartitionReader implements PartitionReader { try { TransportClient client = clientFactory.createClient(location.getHost(), location.getFetchPort()); - client.fetchChunk(streamHandle.streamId, chunkIndex, callback); + client.fetchChunk(streamHandle.streamId, chunkIndex, fetchTimeoutMs, callback); chunkIndex++; } catch (IOException e) { logger.error( diff --git a/common/src/main/java/org/apache/celeborn/common/network/client/TransportClient.java b/common/src/main/java/org/apache/celeborn/common/network/client/TransportClient.java index 4bf6b93d7..699f1ac78 100644 --- a/common/src/main/java/org/apache/celeborn/common/network/client/TransportClient.java +++ b/common/src/main/java/org/apache/celeborn/common/network/client/TransportClient.java @@ -39,6 +39,7 @@ import org.apache.celeborn.common.exception.CelebornIOException; import org.apache.celeborn.common.network.buffer.NioManagedBuffer; import org.apache.celeborn.common.network.protocol.*; import org.apache.celeborn.common.network.util.NettyUtils; +import org.apache.celeborn.common.read.FetchRequestInfo; import org.apache.celeborn.common.write.PushRequestInfo; /** @@ -90,8 +91,9 @@ public class TransportClient implements Closeable { return channel.remoteAddress(); } - public void fetchChunk(long streamId, int chunkIndex, ChunkReceivedCallback callback) { - fetchChunk(streamId, chunkIndex, 0, Integer.MAX_VALUE, callback); + public void fetchChunk( + long streamId, int chunkIndex, long fetchDataTimeout, ChunkReceivedCallback callback) { + fetchChunk(streamId, chunkIndex, 0, Integer.MAX_VALUE, fetchDataTimeout, callback); } /** @@ -112,7 +114,12 @@ public class TransportClient implements Closeable { * @param callback Callback invoked upon successful receipt of chunk, or upon any failure. */ public void fetchChunk( - long streamId, int chunkIndex, int offset, int len, ChunkReceivedCallback callback) { + long streamId, + int chunkIndex, + int offset, + int len, + long fetchDataTimeout, + ChunkReceivedCallback callback) { if (logger.isDebugEnabled()) { logger.debug( "Sending fetch chunk request {} to {}.", @@ -129,9 +136,14 @@ public class TransportClient implements Closeable { callback.onFailure(chunkIndex, new IOException(errorMsg, cause)); } }; - handler.addFetchRequest(streamChunkSlice, callback); - channel.writeAndFlush(new ChunkFetchRequest(streamChunkSlice)).addListener(listener); + long dueTime = System.currentTimeMillis() + fetchDataTimeout; + FetchRequestInfo info = new FetchRequestInfo(dueTime, callback); + handler.addFetchRequest(streamChunkSlice, info); + + ChannelFuture channelFuture = + channel.writeAndFlush(new ChunkFetchRequest(streamChunkSlice)).addListener(listener); + info.setChannelFuture(channelFuture); } /** diff --git a/common/src/main/java/org/apache/celeborn/common/network/client/TransportResponseHandler.java b/common/src/main/java/org/apache/celeborn/common/network/client/TransportResponseHandler.java index 46f487a2c..7a2f8e61e 100644 --- a/common/src/main/java/org/apache/celeborn/common/network/client/TransportResponseHandler.java +++ b/common/src/main/java/org/apache/celeborn/common/network/client/TransportResponseHandler.java @@ -37,6 +37,7 @@ import org.apache.celeborn.common.network.util.NettyUtils; import org.apache.celeborn.common.network.util.TransportConf; import org.apache.celeborn.common.protocol.TransportModuleConstants; import org.apache.celeborn.common.protocol.message.StatusCode; +import org.apache.celeborn.common.read.FetchRequestInfo; import org.apache.celeborn.common.util.JavaUtils; import org.apache.celeborn.common.util.ThreadUtils; import org.apache.celeborn.common.write.PushRequestInfo; @@ -53,7 +54,7 @@ public class TransportResponseHandler extends MessageHandler { private final TransportConf conf; private final Channel channel; - private final Map outstandingFetches; + private final Map outstandingFetches; private final Map outstandingRpcs; private final ConcurrentHashMap outstandingPushes; @@ -63,7 +64,11 @@ public class TransportResponseHandler extends MessageHandler { private final long pushTimeoutCheckerInterval; private static ScheduledExecutorService pushTimeoutChecker = null; - private ScheduledFuture scheduleFuture; + private ScheduledFuture pushCheckerScheduleFuture; + + private final long fetchTimeoutCheckerInterval; + private static ScheduledExecutorService fetchTimeoutChecker = null; + private ScheduledFuture fetchCheckerScheduleFuture; public TransportResponseHandler(TransportConf conf, Channel channel) { this.conf = conf; @@ -72,20 +77,33 @@ public class TransportResponseHandler extends MessageHandler { this.outstandingRpcs = JavaUtils.newConcurrentHashMap(); this.outstandingPushes = JavaUtils.newConcurrentHashMap(); this.timeOfLastRequestNs = new AtomicLong(0); - pushTimeoutCheckerInterval = conf.pushDataTimeoutCheckIntervalMs(); + this.pushTimeoutCheckerInterval = conf.pushDataTimeoutCheckIntervalMs(); + this.fetchTimeoutCheckerInterval = conf.fetchDataTimeoutCheckIntervalMs(); synchronized (TransportResponseHandler.class) { if (pushTimeoutChecker == null) { pushTimeoutChecker = ThreadUtils.newDaemonThreadPoolScheduledExecutor( "push-timeout-checker", conf.pushDataTimeoutCheckerThreads()); } + if (fetchTimeoutChecker == null) { + fetchTimeoutChecker = + ThreadUtils.newDaemonThreadPoolScheduledExecutor( + "fetch-timeout-checker", conf.fetchDataTimeoutCheckerThreads()); + } } - scheduleFuture = + pushCheckerScheduleFuture = pushTimeoutChecker.scheduleAtFixedRate( () -> failExpiredPushRequest(), pushTimeoutCheckerInterval, pushTimeoutCheckerInterval, TimeUnit.MILLISECONDS); + + fetchCheckerScheduleFuture = + fetchTimeoutChecker.scheduleAtFixedRate( + () -> failExpiredFetchRequest(), + fetchTimeoutCheckerInterval, + fetchTimeoutCheckerInterval, + TimeUnit.MILLISECONDS); } public void failExpiredPushRequest() { @@ -113,12 +131,33 @@ public class TransportResponseHandler extends MessageHandler { } } - public void addFetchRequest(StreamChunkSlice streamChunkSlice, ChunkReceivedCallback callback) { + public void failExpiredFetchRequest() { + long currentTime = System.currentTimeMillis(); + Iterator> iter = + outstandingFetches.entrySet().iterator(); + while (iter.hasNext()) { + Map.Entry entry = iter.next(); + if (entry.getValue().dueTime <= currentTime) { + FetchRequestInfo info = outstandingFetches.remove(entry.getKey()); + if (info != null) { + if (info.channelFuture != null) { + info.channelFuture.cancel(true); + } + info.callback.onFailure( + entry.getKey().chunkIndex, new CelebornIOException(StatusCode.FETCH_DATA_TIMEOUT)); + info.channelFuture = null; + info.callback = null; + } + } + } + } + + public void addFetchRequest(StreamChunkSlice streamChunkSlice, FetchRequestInfo info) { updateTimeOfLastRequest(); if (outstandingFetches.containsKey(streamChunkSlice)) { logger.warn("[addFetchRequest] streamChunkSlice {} already exists!", streamChunkSlice); } - outstandingFetches.put(streamChunkSlice, callback); + outstandingFetches.put(streamChunkSlice, info); } public void removeFetchRequest(StreamChunkSlice streamChunkSlice) { @@ -154,9 +193,9 @@ public class TransportResponseHandler extends MessageHandler { * exception or pre-mature connection termination. */ private void failOutstandingRequests(Throwable cause) { - for (Map.Entry entry : outstandingFetches.entrySet()) { + for (Map.Entry entry : outstandingFetches.entrySet()) { try { - entry.getValue().onFailure(entry.getKey().chunkIndex, cause); + entry.getValue().callback.onFailure(entry.getKey().chunkIndex, cause); } catch (Exception e) { logger.warn("ChunkReceivedCallback.onFailure throws exception", e); } @@ -195,7 +234,8 @@ public class TransportResponseHandler extends MessageHandler { remoteAddress); failOutstandingRequests(new IOException("Connection from " + remoteAddress + " closed")); } - scheduleFuture.cancel(false); + pushCheckerScheduleFuture.cancel(false); + fetchCheckerScheduleFuture.cancel(false); } @Override @@ -208,28 +248,32 @@ public class TransportResponseHandler extends MessageHandler { remoteAddress); failOutstandingRequests(cause); } - scheduleFuture.cancel(false); + pushCheckerScheduleFuture.cancel(false); + fetchCheckerScheduleFuture.cancel(false); } @Override public void handle(ResponseMessage message) throws Exception { if (message instanceof ChunkFetchSuccess) { ChunkFetchSuccess resp = (ChunkFetchSuccess) message; - ChunkReceivedCallback listener = outstandingFetches.remove(resp.streamChunkSlice); - if (listener == null) { + FetchRequestInfo info = outstandingFetches.remove(resp.streamChunkSlice); + if (info == null) { logger.warn( "Ignoring response for block {} from {} since it is not outstanding", resp.streamChunkSlice, NettyUtils.getRemoteAddress(channel)); resp.body().release(); } else { - listener.onSuccess(resp.streamChunkSlice.chunkIndex, resp.body()); - resp.body().release(); + try { + info.callback.onSuccess(resp.streamChunkSlice.chunkIndex, resp.body()); + } finally { + resp.body().release(); + } } } else if (message instanceof ChunkFetchFailure) { ChunkFetchFailure resp = (ChunkFetchFailure) message; - ChunkReceivedCallback listener = outstandingFetches.remove(resp.streamChunkSlice); - if (listener == null) { + FetchRequestInfo info = outstandingFetches.remove(resp.streamChunkSlice); + if (info == null) { logger.warn( "Ignoring response for block {} from {} ({}) since it is not outstanding", resp.streamChunkSlice, @@ -237,7 +281,7 @@ public class TransportResponseHandler extends MessageHandler { resp.errorString); } else { logger.warn("Receive ChunkFetchFailure, errorMsg {}", resp.errorString); - listener.onFailure( + info.callback.onFailure( resp.streamChunkSlice.chunkIndex, new ChunkFetchFailureException( "Failure while fetching " + resp.streamChunkSlice + ": " + resp.errorString)); diff --git a/common/src/main/java/org/apache/celeborn/common/network/util/TransportConf.java b/common/src/main/java/org/apache/celeborn/common/network/util/TransportConf.java index 29d7e5dc6..aff2dfa71 100644 --- a/common/src/main/java/org/apache/celeborn/common/network/util/TransportConf.java +++ b/common/src/main/java/org/apache/celeborn/common/network/util/TransportConf.java @@ -142,6 +142,14 @@ public class TransportConf { return celebornConf.pushDataTimeoutCheckInterval(module); } + public int fetchDataTimeoutCheckerThreads() { + return celebornConf.fetchDataTimeoutCheckerThreads(module); + } + + public long fetchDataTimeoutCheckIntervalMs() { + return celebornConf.fetchDataTimeoutCheckInterval(module); + } + public long clientHearbeatInterval() { return celebornConf.clientHeartbeatInterval(module); } diff --git a/common/src/main/java/org/apache/celeborn/common/protocol/message/StatusCode.java b/common/src/main/java/org/apache/celeborn/common/protocol/message/StatusCode.java index 670d1efea..688f0def8 100644 --- a/common/src/main/java/org/apache/celeborn/common/protocol/message/StatusCode.java +++ b/common/src/main/java/org/apache/celeborn/common/protocol/message/StatusCode.java @@ -75,7 +75,9 @@ public enum StatusCode { PUSH_DATA_TIMEOUT_MASTER(42), PUSH_DATA_TIMEOUT_SLAVE(43), PUSH_DATA_MASTER_BLACKLISTED(44), - PUSH_DATA_SLAVE_BLACKLISTED(45); + PUSH_DATA_SLAVE_BLACKLISTED(45), + + FETCH_DATA_TIMEOUT(46); private final byte value; diff --git a/common/src/main/java/org/apache/celeborn/common/read/FetchRequestInfo.java b/common/src/main/java/org/apache/celeborn/common/read/FetchRequestInfo.java new file mode 100644 index 000000000..4e066ec0b --- /dev/null +++ b/common/src/main/java/org/apache/celeborn/common/read/FetchRequestInfo.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.celeborn.common.read; + +import io.netty.channel.ChannelFuture; + +import org.apache.celeborn.common.network.client.ChunkReceivedCallback; + +public class FetchRequestInfo { + public ChannelFuture channelFuture; + public long dueTime; + public ChunkReceivedCallback callback; + + public FetchRequestInfo(long dueTime, ChunkReceivedCallback callback) { + this.dueTime = dueTime; + this.callback = callback; + } + + public void setChannelFuture(ChannelFuture future) { + this.channelFuture = future; + } +} diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala index 68ef91df1..6714b29f2 100644 --- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala @@ -482,6 +482,16 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se getTimeAsMs(key, PUSH_TIMEOUT_CHECK_INTERVAL.defaultValueString) } + def fetchDataTimeoutCheckerThreads(module: String): Int = { + val key = FETCH_TIMEOUT_CHECK_THREADS.key.replace("", module) + getInt(key, FETCH_TIMEOUT_CHECK_THREADS.defaultValue.get) + } + + def fetchDataTimeoutCheckInterval(module: String): Long = { + val key = FETCH_TIMEOUT_CHECK_INTERVAL.key.replace("", module) + getTimeAsMs(key, FETCH_TIMEOUT_CHECK_INTERVAL.defaultValueString) + } + // ////////////////////////////////////////////////////// // Master // // ////////////////////////////////////////////////////// @@ -1385,6 +1395,26 @@ object CelebornConf extends Logging { .intConf .createWithDefault(16) + val FETCH_TIMEOUT_CHECK_INTERVAL: ConfigEntry[Long] = + buildConf("celeborn..fetch.timeoutCheck.interval") + .categories("network") + .doc("Interval for checking fetch data timeout. " + + s"It only support setting to `${TransportModuleConstants.DATA_MODULE}` " + + s"since it works for shuffle client fetch data and should be configured on client side.") + .version("0.3.0") + .timeConf(TimeUnit.MILLISECONDS) + .createWithDefaultString("5s") + + val FETCH_TIMEOUT_CHECK_THREADS: ConfigEntry[Int] = + buildConf("celeborn..fetch.timeoutCheck.threads") + .categories("network") + .doc("Threads num for checking fetch data timeout. " + + s"It only support setting to `${TransportModuleConstants.DATA_MODULE}` " + + s"since it works for shuffle client fetch data and should be configured on client side.") + .version("0.3.0") + .intConf + .createWithDefault(16) + val CHANNEL_HEARTBEAT_INTERVAL: ConfigEntry[Long] = buildConf("celeborn..heartbeat.interval") .withAlternative("celeborn.client.heartbeat.interval") @@ -2608,7 +2638,7 @@ object CelebornConf extends Logging { .createWithDefaultString("2s") val CLIENT_PUSH_DATA_TIMEOUT: ConfigEntry[Long] = - buildConf("celeborn.client.push.data.timeout") + buildConf("celeborn.client.push.timeout") .withAlternative("celeborn.push.data.timeout") .categories("client") .version("0.3.0") @@ -2715,7 +2745,7 @@ object CelebornConf extends Logging { .withAlternative("celeborn.fetch.timeout") .categories("client") .version("0.3.0") - .doc("Timeout for a task to fetch chunk.") + .doc("Timeout for a task to open stream and fetch chunk.") .timeConf(TimeUnit.MILLISECONDS) .createWithDefaultString("30s") diff --git a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala index 158d4287c..dd835c35b 100644 --- a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala +++ b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala @@ -903,6 +903,8 @@ object Utils extends Logging { StatusCode.PUSH_DATA_MASTER_BLACKLISTED case 45 => StatusCode.PUSH_DATA_SLAVE_BLACKLISTED + case 46 => + StatusCode.FETCH_DATA_TIMEOUT case _ => null } diff --git a/common/src/test/java/org/apache/celeborn/common/network/TransportResponseHandlerSuiteJ.java b/common/src/test/java/org/apache/celeborn/common/network/TransportResponseHandlerSuiteJ.java index c71411c36..3dd32334f 100644 --- a/common/src/test/java/org/apache/celeborn/common/network/TransportResponseHandlerSuiteJ.java +++ b/common/src/test/java/org/apache/celeborn/common/network/TransportResponseHandlerSuiteJ.java @@ -33,6 +33,7 @@ import org.apache.celeborn.common.network.client.RpcResponseCallback; import org.apache.celeborn.common.network.client.TransportResponseHandler; import org.apache.celeborn.common.network.protocol.*; import org.apache.celeborn.common.protocol.TransportModuleConstants; +import org.apache.celeborn.common.read.FetchRequestInfo; import org.apache.celeborn.common.util.Utils; import org.apache.celeborn.common.write.PushRequestInfo; @@ -46,7 +47,8 @@ public class TransportResponseHandlerSuiteJ { Utils.fromCelebornConf(new CelebornConf(), TransportModuleConstants.FETCH_MODULE, 8), new LocalChannel()); ChunkReceivedCallback callback = mock(ChunkReceivedCallback.class); - handler.addFetchRequest(streamChunkSlice, callback); + FetchRequestInfo info = new FetchRequestInfo(System.currentTimeMillis() + 30000, callback); + handler.addFetchRequest(streamChunkSlice, info); assertEquals(1, handler.numOutstandingRequests()); handler.handle(new ChunkFetchSuccess(streamChunkSlice, new TestManagedBuffer(123))); @@ -62,7 +64,8 @@ public class TransportResponseHandlerSuiteJ { Utils.fromCelebornConf(new CelebornConf(), TransportModuleConstants.FETCH_MODULE, 8), new LocalChannel()); ChunkReceivedCallback callback = mock(ChunkReceivedCallback.class); - handler.addFetchRequest(streamChunkSlice, callback); + FetchRequestInfo info = new FetchRequestInfo(System.currentTimeMillis() + 30000, callback); + handler.addFetchRequest(streamChunkSlice, info); assertEquals(1, handler.numOutstandingRequests()); handler.handle(new ChunkFetchFailure(streamChunkSlice, "some error msg")); @@ -77,9 +80,10 @@ public class TransportResponseHandlerSuiteJ { Utils.fromCelebornConf(new CelebornConf(), TransportModuleConstants.DATA_MODULE, 8), new LocalChannel()); ChunkReceivedCallback callback = mock(ChunkReceivedCallback.class); - handler.addFetchRequest(new StreamChunkSlice(1, 0), callback); - handler.addFetchRequest(new StreamChunkSlice(1, 1), callback); - handler.addFetchRequest(new StreamChunkSlice(1, 2), callback); + FetchRequestInfo info = new FetchRequestInfo(System.currentTimeMillis() + 30000, callback); + handler.addFetchRequest(new StreamChunkSlice(1, 0), info); + handler.addFetchRequest(new StreamChunkSlice(1, 1), info); + handler.addFetchRequest(new StreamChunkSlice(1, 2), info); assertEquals(3, handler.numOutstandingRequests()); handler.handle(new ChunkFetchSuccess(new StreamChunkSlice(1, 0), new TestManagedBuffer(12))); diff --git a/docs/configuration/client.md b/docs/configuration/client.md index 6ddae63ff..cbd1bb7a5 100644 --- a/docs/configuration/client.md +++ b/docs/configuration/client.md @@ -28,7 +28,7 @@ license: | | celeborn.client.fetch.excludedWorker.expireTimeout | <value of celeborn.client.excludedWorker.expireTimeout> | ShuffleClient is a static object, it will be used in the whole lifecycle of Executor,We give a expire time for blacklisted worker to avoid a transient worker issues. | 0.3.0 | | celeborn.client.fetch.maxReqsInFlight | 3 | Amount of in-flight chunk fetch request. | 0.3.0 | | celeborn.client.fetch.maxRetriesForEachReplica | 3 | Max retry times of fetch chunk on each replica | 0.3.0 | -| celeborn.client.fetch.timeout | 30s | Timeout for a task to fetch chunk. | 0.3.0 | +| celeborn.client.fetch.timeout | 30s | Timeout for a task to open stream and fetch chunk. | 0.3.0 | | celeborn.client.flink.compression.enabled | true | Whether to compress data in Flink plugin. | 0.3.0 | | celeborn.client.flink.inputGate.concurrentReadings | 2147483647 | Max concurrent reading channels for a input gate. | 0.3.0 | | celeborn.client.flink.inputGate.memory | 32m | Memory reserved for a input gate. | 0.3.0 | @@ -40,9 +40,8 @@ license: | | celeborn.client.push.blacklist.enabled | false | Whether to enable shuffle client-side push blacklist of workers. | 0.3.0 | | celeborn.client.push.buffer.initial.size | 8k | | 0.3.0 | | celeborn.client.push.buffer.max.size | 64k | Max size of reducer partition buffer memory for shuffle hash writer. The pushed data will be buffered in memory before sending to Celeborn worker. For performance consideration keep this buffer size higher than 32K. Example: If reducer amount is 2000, buffer size is 64K, then each task will consume up to `64KiB * 2000 = 125MiB` heap memory. | 0.3.0 | -| celeborn.client.push.data.timeout | 120s | Timeout for a task to push data rpc message. This value should better be more than twice of `celeborn..push.timeoutCheck.interval` | 0.3.0 | | celeborn.client.push.limit.inFlight.sleepInterval | 50ms | Sleep interval when check netty in-flight requests to be done. | 0.3.0 | -| celeborn.client.push.limit.inFlight.timeout | <undefined> | Timeout for netty in-flight requests to be done.Default value should be `celeborn.client.push.data.timeout * 2`. | 0.3.0 | +| celeborn.client.push.limit.inFlight.timeout | <undefined> | Timeout for netty in-flight requests to be done.Default value should be `celeborn.client.push.timeout * 2`. | 0.3.0 | | celeborn.client.push.limit.strategy | SIMPLE | The strategy used to control the push speed. Valid strategies are SIMPLE and SLOWSTART. the SLOWSTART strategy is usually cooperate with congest control mechanism in the worker side. | 0.3.0 | | celeborn.client.push.maxReqsInFlight | 4 | Amount of Netty in-flight requests per worker. The maximum memory is `celeborn.push.maxReqsInFlight` * `celeborn.push.buffer.max.size` * compression ratio(1 in worst case), default: 64Kib * 32 = 2Mib | 0.3.0 | | celeborn.client.push.queue.capacity | 512 | Push buffer queue size for a task. The maximum memory is `celeborn.push.buffer.max.size` * `celeborn.push.queue.capacity`, default: 64KiB * 512 = 32MiB | 0.3.0 | @@ -56,6 +55,7 @@ license: | | celeborn.client.push.stageEnd.timeout | <value of celeborn.<module>.io.connectionTimeout> | Timeout for waiting StageEnd. During this process, there are `celeborn.client.requestCommitFiles.maxRetries` times for retry opportunities for committing filesand 1 times for releasing slots request. User can customize this value according to your setting. By default, the value is the max timeout value `celeborn..io.connectionTimeout`. | 0.3.0 | | celeborn.client.push.takeTaskMaxWaitAttempts | 1 | Max wait times if no task available to push to worker. | 0.3.0 | | celeborn.client.push.takeTaskWaitInterval | 50ms | Wait interval if no task available to push to worker. | 0.3.0 | +| celeborn.client.push.timeout | 120s | Timeout for a task to push data rpc message. This value should better be more than twice of `celeborn..push.timeoutCheck.interval` | 0.3.0 | | celeborn.client.registerShuffle.maxRetries | 3 | Max retry times for client to register shuffle. | 0.3.0 | | celeborn.client.registerShuffle.retryWait | 3s | Wait time before next retry if register shuffle failed. | 0.3.0 | | celeborn.client.requestCommitFiles.maxRetries | 2 | Max retry times for requestCommitFiles RPC. | 0.3.0 | diff --git a/docs/configuration/network.md b/docs/configuration/network.md index 1718dd512..28dc7fd01 100644 --- a/docs/configuration/network.md +++ b/docs/configuration/network.md @@ -19,6 +19,8 @@ license: | | Key | Default | Description | Since | | --- | ------- | ----------- | ----- | +| celeborn.<module>.fetch.timeoutCheck.interval | 5s | Interval for checking fetch data timeout. It only support setting to `data` since it works for shuffle client fetch data and should be configured on client side. | 0.3.0 | +| celeborn.<module>.fetch.timeoutCheck.threads | 16 | Threads num for checking fetch data timeout. It only support setting to `data` since it works for shuffle client fetch data and should be configured on client side. | 0.3.0 | | celeborn.<module>.heartbeat.interval | 60s | The heartbeat interval between worker and client. If setting to `data`, it works for shuffle client push and fetch data and should be configured on client side. If setting to `replicate`, it works for worker replicate data to peer worker and should be configured on worker side. | 0.3.0 | | celeborn.<module>.io.backLog | 0 | Requested maximum length of the queue of incoming connections. Default 0 for no backlog. | | | celeborn.<module>.io.clientThreads | 0 | Number of threads used in the client thread pool. Default to 0, which is 2x#cores. | | diff --git a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/network/RequestTimeoutIntegrationSuiteJ.java b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/network/RequestTimeoutIntegrationSuiteJ.java index 56f7e49e5..11223a0d3 100644 --- a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/network/RequestTimeoutIntegrationSuiteJ.java +++ b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/network/RequestTimeoutIntegrationSuiteJ.java @@ -215,12 +215,12 @@ public class RequestTimeoutIntegrationSuiteJ { // Send one request, which will eventually fail. TestCallback callback0 = new TestCallback(); - client.fetchChunk(0, 0, callback0); + client.fetchChunk(0, 0, 10000, callback0); Uninterruptibles.sleepUninterruptibly(1200, TimeUnit.MILLISECONDS); // Send a second request before the first has failed. TestCallback callback1 = new TestCallback(); - client.fetchChunk(0, 1, callback1); + client.fetchChunk(0, 1, 10000, callback1); Uninterruptibles.sleepUninterruptibly(1200, TimeUnit.MILLISECONDS); // not complete yet, but should complete soon diff --git a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/ChunkFetchIntegrationSuiteJ.java b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/ChunkFetchIntegrationSuiteJ.java index a6dd0a162..47c0077c0 100644 --- a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/ChunkFetchIntegrationSuiteJ.java +++ b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/ChunkFetchIntegrationSuiteJ.java @@ -170,7 +170,7 @@ public class ChunkFetchIntegrationSuiteJ { }; for (int chunkIndex : chunkIndices) { - client.fetchChunk(STREAM_ID, chunkIndex, callback); + client.fetchChunk(STREAM_ID, chunkIndex, 10000, callback); } if (!sem.tryAcquire(chunkIndices.size(), 5, TimeUnit.SECONDS)) { fail("Timeout getting response from the server"); diff --git a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/FileWriterSuiteJ.java b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/FileWriterSuiteJ.java index e1b8cacf7..6a4ec4774 100644 --- a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/FileWriterSuiteJ.java +++ b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/FileWriterSuiteJ.java @@ -232,7 +232,7 @@ public class FileWriterSuiteJ { }; for (int chunkIndex : chunkIndices) { - client.fetchChunk(streamId, chunkIndex, callback); + client.fetchChunk(streamId, chunkIndex, 10000, callback); } if (!sem.tryAcquire(chunkIndices.size(), 5, TimeUnit.SECONDS)) { fail("Timeout getting response from the server");