[CELEBORN-676] Celeborn fetch chunk should also support timeout checking

### What changes were proposed in this pull request?
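Fetch-chunk requests should also support timeout checking, as push-data requests already do. This patch adds a periodic checker that fails any outstanding fetch request whose deadline has passed with `FETCH_DATA_TIMEOUT`.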
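
As a rough illustration of the mechanism, the sketch below records a due time for each outstanding fetch and fails the expired ones in a sweep. It is a simplified stand-in, not the patched `TransportResponseHandler`: the class `FetchTimeoutSketch`, its `FailureCallback` interface, and the integer chunk keys are hypothetical names chosen for this example, and time is passed in explicitly so the behavior is easy to follow.

```java
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified model of a deadline sweep; not Celeborn's actual classes.
final class FetchTimeoutSketch {
  /** Callback type assumed for this sketch only. */
  interface FailureCallback {
    void onFailure(int chunkIndex, Exception cause);
  }

  /** One outstanding request: when it is due and whom to notify on failure. */
  static final class PendingFetch {
    final long dueTimeMs;
    final FailureCallback callback;

    PendingFetch(long dueTimeMs, FailureCallback callback) {
      this.dueTimeMs = dueTimeMs;
      this.callback = callback;
    }
  }

  private final Map<Integer, PendingFetch> outstanding = new ConcurrentHashMap<>();

  /** Registers a request sent at nowMs that must complete within timeoutMs. */
  void add(int chunkIndex, long nowMs, long timeoutMs, FailureCallback callback) {
    outstanding.put(chunkIndex, new PendingFetch(nowMs + timeoutMs, callback));
  }

  /** Called when a response arrives; returns false if the request is no longer outstanding. */
  boolean complete(int chunkIndex) {
    return outstanding.remove(chunkIndex) != null;
  }

  /** Fails every request whose due time has passed; returns how many were failed. */
  int failExpired(long nowMs) {
    int failed = 0;
    // ConcurrentHashMap iterators are weakly consistent, so removing while iterating is safe.
    for (Map.Entry<Integer, PendingFetch> entry : outstanding.entrySet()) {
      if (entry.getValue().dueTimeMs <= nowMs) {
        PendingFetch pending = outstanding.remove(entry.getKey());
        if (pending != null) {
          // Whoever wins the remove() owns the request, so the callback fires at most once.
          pending.callback.onFailure(entry.getKey(), new IOException("FETCH_DATA_TIMEOUT"));
          failed++;
        }
      }
    }
    return failed;
  }
}
```

In a real client, `failExpired` would be driven by a scheduled executor at a fixed interval, and `complete` would be called from the response handler; a response that arrives after the sweep has removed its entry is simply ignored.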

#### Test case
```
executor instance 20

SQL:
SELECT count(1) from (select /*+ REPARTITION(100) */ * from spark_auxiliary.t50g) tmp;

--conf spark.celeborn.client.spark.shuffle.writer=sort \
--conf spark.celeborn.client.fetch.excludeWorkerOnFailure.enabled=true \
--conf spark.celeborn.client.push.timeout=10s \
--conf spark.celeborn.client.push.replicate.enabled=true \
--conf spark.celeborn.client.push.revive.maxRetries=10 \
--conf spark.celeborn.client.reserveSlots.maxRetries=10 \
--conf spark.celeborn.client.registerShuffle.maxRetries=3 \
--conf spark.celeborn.client.push.blacklist.enabled=true \
--conf spark.celeborn.client.blacklistSlave.enabled=true \
--conf spark.celeborn.client.fetch.timeout=30s \
--conf spark.celeborn.client.push.data.timeout=30s \
--conf spark.celeborn.client.push.limit.inFlight.timeout=600s \
--conf spark.celeborn.client.push.maxReqsInFlight=32 \
--conf spark.celeborn.client.shuffle.compression.codec=ZSTD \
--conf spark.celeborn.rpc.askTimeout=30s \
--conf spark.celeborn.client.rpc.reserveSlots.askTimeout=30s \
--conf spark.celeborn.client.shuffle.batchHandleChangePartition.enabled=true \
--conf spark.celeborn.client.shuffle.batchHandleCommitPartition.enabled=true \
--conf spark.celeborn.client.shuffle.batchHandleReleasePartition.enabled=true
```
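
The configuration above sets `spark.celeborn.client.fetch.timeout=30s`, and the `celeborn.<module>.fetch.timeoutCheck.interval` option added by this patch defaults to `5s`. Because expiry is only detected when the checker runs, a timed-out request is reported at the first check at or after its due time, so detection can lag the configured timeout by up to one check interval. The sketch below works through that arithmetic. The class and method names are hypothetical, and it assumes for simplicity that checks fire at exact multiples of the interval measured from time zero.

```java
// Hypothetical helper for illustration; not part of Celeborn.
final class TimeoutLatencySketch {
  /**
   * Returns the time (ms) of the first check at or after the request's due time,
   * assuming checks run at intervalMs, 2 * intervalMs, 3 * intervalMs, ...
   */
  static long detectionTimeMs(long sentAtMs, long timeoutMs, long intervalMs) {
    long dueTimeMs = sentAtMs + timeoutMs;
    long ticks = (dueTimeMs + intervalMs - 1) / intervalMs; // ceiling division
    return Math.max(ticks, 1) * intervalMs; // the first check happens one interval in
  }
}
```

For a request sent at 1000 ms with a 30000 ms timeout and a 5000 ms interval, the due time is 31000 ms and `detectionTimeMs(1000, 30000, 5000)` returns 35000, that is, 4 s after the deadline. A request sent at time zero is detected exactly at 30000 ms.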

Tested with 3 workers, adding a `Thread.sleep(100s)` before the worker handles `ChunkFetchRequest`.

Before patch
<img width="1783" alt="Screenshot 2023-06-14 11:20:55 AM" src="https://github.com/apache/incubator-celeborn/assets/46485123/182dff7d-a057-4077-8368-d1552104d206">

After patch
<img width="1792" alt="image" src="https://github.com/apache/incubator-celeborn/assets/46485123/3c8b7933-8ace-426d-8e9f-04e0aabfac8e">

The log shows that the fetch timeout checker works:
```
23/06/14 11:14:54 ERROR WorkerPartitionReader: Fetch chunk 0 failed.
org.apache.celeborn.common.exception.CelebornIOException: FETCH_DATA_TIMEOUT
	at org.apache.celeborn.common.network.client.TransportResponseHandler.failExpiredFetchRequest(TransportResponseHandler.java:147)
	at org.apache.celeborn.common.network.client.TransportResponseHandler.lambda$new$1(TransportResponseHandler.java:103)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
23/06/14 11:14:54 WARN RssInputStream: Fetch chunk failed 1/6 times for location PartitionLocation[
  id-epoch:35-0
  host-rpcPort-pushPort-fetchPort-replicatePort:10.169.48.203-9092-9094-9093-9095
  mode:MASTER
  peer:(host-rpcPort-pushPort-fetchPort-replicatePort:10.169.48.202-9092-9094-9093-9095)
  storage hint:StorageInfo{type=HDD, mountPoint='/mnt/ssd/0', finalResult=true, filePath=}
  mapIdBitMap:null], change to peer
org.apache.celeborn.common.exception.CelebornIOException: Fetch chunk 0 failed.
	at org.apache.celeborn.client.read.WorkerPartitionReader$1.onFailure(WorkerPartitionReader.java:98)
	at org.apache.celeborn.common.network.client.TransportResponseHandler.failExpiredFetchRequest(TransportResponseHandler.java:146)
	at org.apache.celeborn.common.network.client.TransportResponseHandler.lambda$new$1(TransportResponseHandler.java:103)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.celeborn.common.exception.CelebornIOException: FETCH_DATA_TIMEOUT
	at org.apache.celeborn.common.network.client.TransportResponseHandler.failExpiredFetchRequest(TransportResponseHandler.java:147)
	... 8 more
23/06/14 11:14:54 INFO SortBasedShuffleWriter: Memory used 72.0 MB
```
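
In the log, the timeout surfaces as a failed fetch, after which the reader reports `Fetch chunk failed 1/6 times ... change to peer`. The `1/6` is consistent with `celeborn.client.fetch.maxRetriesForEachReplica` (default 3) applied to two replicas, though the reader's retry logic is not part of this diff. The sketch below shows one way such a fallback could look. It is an assumption made for illustration only: the class name, the `IntPredicate` standing in for a fetch attempt, and the rule of switching replicas after every failure are all hypothetical.

```java
import java.util.function.IntPredicate;

// Hypothetical retry loop for illustration; not Celeborn's actual reader logic.
final class PeerRetrySketch {
  /**
   * Tries replica 0 (primary) first and switches to the other replica after each failure.
   * tryFetch receives the replica index and returns true if the fetch succeeded.
   * Returns the index of the replica that succeeded, or -1 once
   * maxRetriesPerReplica * 2 attempts have failed.
   */
  static int fetchWithPeerFallback(IntPredicate tryFetch, int maxRetriesPerReplica) {
    int maxAttempts = maxRetriesPerReplica * 2;
    int replica = 0;
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      if (tryFetch.test(replica)) {
        return replica;
      }
      replica = 1 - replica; // a timeout or other failure moves the next attempt to the peer
    }
    return -1;
  }
}
```

With a primary that always times out and a healthy peer, the first attempt fails and the second succeeds on replica 1, which matches the shape of the log above: one failure, then a switch to the peer.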

### Why are the changes needed?

### Does this PR introduce _any_ user-facing change?

### How was this patch tested?

Closes #1587 from AngersZhuuuu/CELEBORN-676.

Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Angerszhuuuu <angers.zhu@gmail.com>
Angerszhuuuu 2023-06-15 13:54:09 +08:00
parent 47f66a87a1
commit 0aa13832b5
14 changed files with 182 additions and 40 deletions

View File

@ -55,6 +55,7 @@ public class WorkerPartitionReader implements PartitionReader {
private final AtomicReference<IOException> exception = new AtomicReference<>();
private final int fetchMaxReqsInFlight;
private final long fetchTimeoutMs;
private boolean closed = false;
// for test
@ -74,6 +75,7 @@ public class WorkerPartitionReader implements PartitionReader {
throws IOException, InterruptedException {
fetchMaxReqsInFlight = conf.clientFetchMaxReqsInFlight();
results = new LinkedBlockingQueue<>();
fetchTimeoutMs = conf.clientFetchTimeoutMs();
// only add the buffer to results queue if this reader is not closed.
callback =
new ChunkReceivedCallback() {
@ -105,8 +107,7 @@ public class WorkerPartitionReader implements PartitionReader {
}
OpenStream openBlocks =
new OpenStream(shuffleKey, location.getFileName(), startMapIndex, endMapIndex);
long timeoutMs = conf.clientFetchTimeoutMs();
ByteBuffer response = client.sendRpcSync(openBlocks.toByteBuffer(), timeoutMs);
ByteBuffer response = client.sendRpcSync(openBlocks.toByteBuffer(), fetchTimeoutMs);
streamHandle = (StreamHandle) Message.decode(response);
this.location = location;
@ -166,7 +167,7 @@ public class WorkerPartitionReader implements PartitionReader {
try {
TransportClient client =
clientFactory.createClient(location.getHost(), location.getFetchPort());
client.fetchChunk(streamHandle.streamId, chunkIndex, callback);
client.fetchChunk(streamHandle.streamId, chunkIndex, fetchTimeoutMs, callback);
chunkIndex++;
} catch (IOException e) {
logger.error(

View File

@ -39,6 +39,7 @@ import org.apache.celeborn.common.exception.CelebornIOException;
import org.apache.celeborn.common.network.buffer.NioManagedBuffer;
import org.apache.celeborn.common.network.protocol.*;
import org.apache.celeborn.common.network.util.NettyUtils;
import org.apache.celeborn.common.read.FetchRequestInfo;
import org.apache.celeborn.common.write.PushRequestInfo;
/**
@ -90,8 +91,9 @@ public class TransportClient implements Closeable {
return channel.remoteAddress();
}
public void fetchChunk(long streamId, int chunkIndex, ChunkReceivedCallback callback) {
fetchChunk(streamId, chunkIndex, 0, Integer.MAX_VALUE, callback);
public void fetchChunk(
long streamId, int chunkIndex, long fetchDataTimeout, ChunkReceivedCallback callback) {
fetchChunk(streamId, chunkIndex, 0, Integer.MAX_VALUE, fetchDataTimeout, callback);
}
/**
@ -112,7 +114,12 @@ public class TransportClient implements Closeable {
* @param callback Callback invoked upon successful receipt of chunk, or upon any failure.
*/
public void fetchChunk(
long streamId, int chunkIndex, int offset, int len, ChunkReceivedCallback callback) {
long streamId,
int chunkIndex,
int offset,
int len,
long fetchDataTimeout,
ChunkReceivedCallback callback) {
if (logger.isDebugEnabled()) {
logger.debug(
"Sending fetch chunk request {} to {}.",
@ -129,9 +136,14 @@ public class TransportClient implements Closeable {
callback.onFailure(chunkIndex, new IOException(errorMsg, cause));
}
};
handler.addFetchRequest(streamChunkSlice, callback);
channel.writeAndFlush(new ChunkFetchRequest(streamChunkSlice)).addListener(listener);
long dueTime = System.currentTimeMillis() + fetchDataTimeout;
FetchRequestInfo info = new FetchRequestInfo(dueTime, callback);
handler.addFetchRequest(streamChunkSlice, info);
ChannelFuture channelFuture =
channel.writeAndFlush(new ChunkFetchRequest(streamChunkSlice)).addListener(listener);
info.setChannelFuture(channelFuture);
}
/**

View File

@ -37,6 +37,7 @@ import org.apache.celeborn.common.network.util.NettyUtils;
import org.apache.celeborn.common.network.util.TransportConf;
import org.apache.celeborn.common.protocol.TransportModuleConstants;
import org.apache.celeborn.common.protocol.message.StatusCode;
import org.apache.celeborn.common.read.FetchRequestInfo;
import org.apache.celeborn.common.util.JavaUtils;
import org.apache.celeborn.common.util.ThreadUtils;
import org.apache.celeborn.common.write.PushRequestInfo;
@ -53,7 +54,7 @@ public class TransportResponseHandler extends MessageHandler<ResponseMessage> {
private final TransportConf conf;
private final Channel channel;
private final Map<StreamChunkSlice, ChunkReceivedCallback> outstandingFetches;
private final Map<StreamChunkSlice, FetchRequestInfo> outstandingFetches;
private final Map<Long, RpcResponseCallback> outstandingRpcs;
private final ConcurrentHashMap<Long, PushRequestInfo> outstandingPushes;
@ -63,7 +64,11 @@ public class TransportResponseHandler extends MessageHandler<ResponseMessage> {
private final long pushTimeoutCheckerInterval;
private static ScheduledExecutorService pushTimeoutChecker = null;
private ScheduledFuture scheduleFuture;
private ScheduledFuture pushCheckerScheduleFuture;
private final long fetchTimeoutCheckerInterval;
private static ScheduledExecutorService fetchTimeoutChecker = null;
private ScheduledFuture fetchCheckerScheduleFuture;
public TransportResponseHandler(TransportConf conf, Channel channel) {
this.conf = conf;
@ -72,20 +77,33 @@ public class TransportResponseHandler extends MessageHandler<ResponseMessage> {
this.outstandingRpcs = JavaUtils.newConcurrentHashMap();
this.outstandingPushes = JavaUtils.newConcurrentHashMap();
this.timeOfLastRequestNs = new AtomicLong(0);
pushTimeoutCheckerInterval = conf.pushDataTimeoutCheckIntervalMs();
this.pushTimeoutCheckerInterval = conf.pushDataTimeoutCheckIntervalMs();
this.fetchTimeoutCheckerInterval = conf.fetchDataTimeoutCheckIntervalMs();
synchronized (TransportResponseHandler.class) {
if (pushTimeoutChecker == null) {
pushTimeoutChecker =
ThreadUtils.newDaemonThreadPoolScheduledExecutor(
"push-timeout-checker", conf.pushDataTimeoutCheckerThreads());
}
if (fetchTimeoutChecker == null) {
fetchTimeoutChecker =
ThreadUtils.newDaemonThreadPoolScheduledExecutor(
"fetch-timeout-checker", conf.fetchDataTimeoutCheckerThreads());
}
}
scheduleFuture =
pushCheckerScheduleFuture =
pushTimeoutChecker.scheduleAtFixedRate(
() -> failExpiredPushRequest(),
pushTimeoutCheckerInterval,
pushTimeoutCheckerInterval,
TimeUnit.MILLISECONDS);
fetchCheckerScheduleFuture =
fetchTimeoutChecker.scheduleAtFixedRate(
() -> failExpiredFetchRequest(),
fetchTimeoutCheckerInterval,
fetchTimeoutCheckerInterval,
TimeUnit.MILLISECONDS);
}
public void failExpiredPushRequest() {
@ -113,12 +131,33 @@ public class TransportResponseHandler extends MessageHandler<ResponseMessage> {
}
}
public void addFetchRequest(StreamChunkSlice streamChunkSlice, ChunkReceivedCallback callback) {
public void failExpiredFetchRequest() {
long currentTime = System.currentTimeMillis();
Iterator<Map.Entry<StreamChunkSlice, FetchRequestInfo>> iter =
outstandingFetches.entrySet().iterator();
while (iter.hasNext()) {
Map.Entry<StreamChunkSlice, FetchRequestInfo> entry = iter.next();
if (entry.getValue().dueTime <= currentTime) {
FetchRequestInfo info = outstandingFetches.remove(entry.getKey());
if (info != null) {
if (info.channelFuture != null) {
info.channelFuture.cancel(true);
}
info.callback.onFailure(
entry.getKey().chunkIndex, new CelebornIOException(StatusCode.FETCH_DATA_TIMEOUT));
info.channelFuture = null;
info.callback = null;
}
}
}
}
public void addFetchRequest(StreamChunkSlice streamChunkSlice, FetchRequestInfo info) {
updateTimeOfLastRequest();
if (outstandingFetches.containsKey(streamChunkSlice)) {
logger.warn("[addFetchRequest] streamChunkSlice {} already exists!", streamChunkSlice);
}
outstandingFetches.put(streamChunkSlice, callback);
outstandingFetches.put(streamChunkSlice, info);
}
public void removeFetchRequest(StreamChunkSlice streamChunkSlice) {
@ -154,9 +193,9 @@ public class TransportResponseHandler extends MessageHandler<ResponseMessage> {
* exception or pre-mature connection termination.
*/
private void failOutstandingRequests(Throwable cause) {
for (Map.Entry<StreamChunkSlice, ChunkReceivedCallback> entry : outstandingFetches.entrySet()) {
for (Map.Entry<StreamChunkSlice, FetchRequestInfo> entry : outstandingFetches.entrySet()) {
try {
entry.getValue().onFailure(entry.getKey().chunkIndex, cause);
entry.getValue().callback.onFailure(entry.getKey().chunkIndex, cause);
} catch (Exception e) {
logger.warn("ChunkReceivedCallback.onFailure throws exception", e);
}
@ -195,7 +234,8 @@ public class TransportResponseHandler extends MessageHandler<ResponseMessage> {
remoteAddress);
failOutstandingRequests(new IOException("Connection from " + remoteAddress + " closed"));
}
scheduleFuture.cancel(false);
pushCheckerScheduleFuture.cancel(false);
fetchCheckerScheduleFuture.cancel(false);
}
@Override
@ -208,28 +248,32 @@ public class TransportResponseHandler extends MessageHandler<ResponseMessage> {
remoteAddress);
failOutstandingRequests(cause);
}
scheduleFuture.cancel(false);
pushCheckerScheduleFuture.cancel(false);
fetchCheckerScheduleFuture.cancel(false);
}
@Override
public void handle(ResponseMessage message) throws Exception {
if (message instanceof ChunkFetchSuccess) {
ChunkFetchSuccess resp = (ChunkFetchSuccess) message;
ChunkReceivedCallback listener = outstandingFetches.remove(resp.streamChunkSlice);
if (listener == null) {
FetchRequestInfo info = outstandingFetches.remove(resp.streamChunkSlice);
if (info == null) {
logger.warn(
"Ignoring response for block {} from {} since it is not outstanding",
resp.streamChunkSlice,
NettyUtils.getRemoteAddress(channel));
resp.body().release();
} else {
listener.onSuccess(resp.streamChunkSlice.chunkIndex, resp.body());
resp.body().release();
try {
info.callback.onSuccess(resp.streamChunkSlice.chunkIndex, resp.body());
} finally {
resp.body().release();
}
}
} else if (message instanceof ChunkFetchFailure) {
ChunkFetchFailure resp = (ChunkFetchFailure) message;
ChunkReceivedCallback listener = outstandingFetches.remove(resp.streamChunkSlice);
if (listener == null) {
FetchRequestInfo info = outstandingFetches.remove(resp.streamChunkSlice);
if (info == null) {
logger.warn(
"Ignoring response for block {} from {} ({}) since it is not outstanding",
resp.streamChunkSlice,
@ -237,7 +281,7 @@ public class TransportResponseHandler extends MessageHandler<ResponseMessage> {
resp.errorString);
} else {
logger.warn("Receive ChunkFetchFailure, errorMsg {}", resp.errorString);
listener.onFailure(
info.callback.onFailure(
resp.streamChunkSlice.chunkIndex,
new ChunkFetchFailureException(
"Failure while fetching " + resp.streamChunkSlice + ": " + resp.errorString));

View File

@ -142,6 +142,14 @@ public class TransportConf {
return celebornConf.pushDataTimeoutCheckInterval(module);
}
public int fetchDataTimeoutCheckerThreads() {
return celebornConf.fetchDataTimeoutCheckerThreads(module);
}
public long fetchDataTimeoutCheckIntervalMs() {
return celebornConf.fetchDataTimeoutCheckInterval(module);
}
public long clientHearbeatInterval() {
return celebornConf.clientHeartbeatInterval(module);
}

View File

@ -75,7 +75,9 @@ public enum StatusCode {
PUSH_DATA_TIMEOUT_MASTER(42),
PUSH_DATA_TIMEOUT_SLAVE(43),
PUSH_DATA_MASTER_BLACKLISTED(44),
PUSH_DATA_SLAVE_BLACKLISTED(45);
PUSH_DATA_SLAVE_BLACKLISTED(45),
FETCH_DATA_TIMEOUT(46);
private final byte value;

View File

@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.celeborn.common.read;
import io.netty.channel.ChannelFuture;
import org.apache.celeborn.common.network.client.ChunkReceivedCallback;
public class FetchRequestInfo {
public ChannelFuture channelFuture;
public long dueTime;
public ChunkReceivedCallback callback;
public FetchRequestInfo(long dueTime, ChunkReceivedCallback callback) {
this.dueTime = dueTime;
this.callback = callback;
}
public void setChannelFuture(ChannelFuture future) {
this.channelFuture = future;
}
}

View File

@ -482,6 +482,16 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
getTimeAsMs(key, PUSH_TIMEOUT_CHECK_INTERVAL.defaultValueString)
}
def fetchDataTimeoutCheckerThreads(module: String): Int = {
val key = FETCH_TIMEOUT_CHECK_THREADS.key.replace("<module>", module)
getInt(key, FETCH_TIMEOUT_CHECK_THREADS.defaultValue.get)
}
def fetchDataTimeoutCheckInterval(module: String): Long = {
val key = FETCH_TIMEOUT_CHECK_INTERVAL.key.replace("<module>", module)
getTimeAsMs(key, FETCH_TIMEOUT_CHECK_INTERVAL.defaultValueString)
}
// //////////////////////////////////////////////////////
// Master //
// //////////////////////////////////////////////////////
@ -1385,6 +1395,26 @@ object CelebornConf extends Logging {
.intConf
.createWithDefault(16)
val FETCH_TIMEOUT_CHECK_INTERVAL: ConfigEntry[Long] =
buildConf("celeborn.<module>.fetch.timeoutCheck.interval")
.categories("network")
.doc("Interval for checking fetch data timeout. " +
s"It only support setting <module> to `${TransportModuleConstants.DATA_MODULE}` " +
s"since it works for shuffle client fetch data and should be configured on client side.")
.version("0.3.0")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("5s")
val FETCH_TIMEOUT_CHECK_THREADS: ConfigEntry[Int] =
buildConf("celeborn.<module>.fetch.timeoutCheck.threads")
.categories("network")
.doc("Threads num for checking fetch data timeout. " +
s"It only support setting <module> to `${TransportModuleConstants.DATA_MODULE}` " +
s"since it works for shuffle client fetch data and should be configured on client side.")
.version("0.3.0")
.intConf
.createWithDefault(16)
val CHANNEL_HEARTBEAT_INTERVAL: ConfigEntry[Long] =
buildConf("celeborn.<module>.heartbeat.interval")
.withAlternative("celeborn.client.heartbeat.interval")
@ -2608,7 +2638,7 @@ object CelebornConf extends Logging {
.createWithDefaultString("2s")
val CLIENT_PUSH_DATA_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.client.push.data.timeout")
buildConf("celeborn.client.push.timeout")
.withAlternative("celeborn.push.data.timeout")
.categories("client")
.version("0.3.0")
@ -2715,7 +2745,7 @@ object CelebornConf extends Logging {
.withAlternative("celeborn.fetch.timeout")
.categories("client")
.version("0.3.0")
.doc("Timeout for a task to fetch chunk.")
.doc("Timeout for a task to open stream and fetch chunk.")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("30s")

View File

@ -903,6 +903,8 @@ object Utils extends Logging {
StatusCode.PUSH_DATA_MASTER_BLACKLISTED
case 45 =>
StatusCode.PUSH_DATA_SLAVE_BLACKLISTED
case 46 =>
StatusCode.FETCH_DATA_TIMEOUT
case _ =>
null
}

View File

@ -33,6 +33,7 @@ import org.apache.celeborn.common.network.client.RpcResponseCallback;
import org.apache.celeborn.common.network.client.TransportResponseHandler;
import org.apache.celeborn.common.network.protocol.*;
import org.apache.celeborn.common.protocol.TransportModuleConstants;
import org.apache.celeborn.common.read.FetchRequestInfo;
import org.apache.celeborn.common.util.Utils;
import org.apache.celeborn.common.write.PushRequestInfo;
@ -46,7 +47,8 @@ public class TransportResponseHandlerSuiteJ {
Utils.fromCelebornConf(new CelebornConf(), TransportModuleConstants.FETCH_MODULE, 8),
new LocalChannel());
ChunkReceivedCallback callback = mock(ChunkReceivedCallback.class);
handler.addFetchRequest(streamChunkSlice, callback);
FetchRequestInfo info = new FetchRequestInfo(System.currentTimeMillis() + 30000, callback);
handler.addFetchRequest(streamChunkSlice, info);
assertEquals(1, handler.numOutstandingRequests());
handler.handle(new ChunkFetchSuccess(streamChunkSlice, new TestManagedBuffer(123)));
@ -62,7 +64,8 @@ public class TransportResponseHandlerSuiteJ {
Utils.fromCelebornConf(new CelebornConf(), TransportModuleConstants.FETCH_MODULE, 8),
new LocalChannel());
ChunkReceivedCallback callback = mock(ChunkReceivedCallback.class);
handler.addFetchRequest(streamChunkSlice, callback);
FetchRequestInfo info = new FetchRequestInfo(System.currentTimeMillis() + 30000, callback);
handler.addFetchRequest(streamChunkSlice, info);
assertEquals(1, handler.numOutstandingRequests());
handler.handle(new ChunkFetchFailure(streamChunkSlice, "some error msg"));
@ -77,9 +80,10 @@ public class TransportResponseHandlerSuiteJ {
Utils.fromCelebornConf(new CelebornConf(), TransportModuleConstants.DATA_MODULE, 8),
new LocalChannel());
ChunkReceivedCallback callback = mock(ChunkReceivedCallback.class);
handler.addFetchRequest(new StreamChunkSlice(1, 0), callback);
handler.addFetchRequest(new StreamChunkSlice(1, 1), callback);
handler.addFetchRequest(new StreamChunkSlice(1, 2), callback);
FetchRequestInfo info = new FetchRequestInfo(System.currentTimeMillis() + 30000, callback);
handler.addFetchRequest(new StreamChunkSlice(1, 0), info);
handler.addFetchRequest(new StreamChunkSlice(1, 1), info);
handler.addFetchRequest(new StreamChunkSlice(1, 2), info);
assertEquals(3, handler.numOutstandingRequests());
handler.handle(new ChunkFetchSuccess(new StreamChunkSlice(1, 0), new TestManagedBuffer(12)));

View File

@ -28,7 +28,7 @@ license: |
| celeborn.client.fetch.excludedWorker.expireTimeout | &lt;value of celeborn.client.excludedWorker.expireTimeout&gt; | ShuffleClient is a static object, it will be used in the whole lifecycle of Executor,We give a expire time for blacklisted worker to avoid a transient worker issues. | 0.3.0 |
| celeborn.client.fetch.maxReqsInFlight | 3 | Amount of in-flight chunk fetch request. | 0.3.0 |
| celeborn.client.fetch.maxRetriesForEachReplica | 3 | Max retry times of fetch chunk on each replica | 0.3.0 |
| celeborn.client.fetch.timeout | 30s | Timeout for a task to fetch chunk. | 0.3.0 |
| celeborn.client.fetch.timeout | 30s | Timeout for a task to open stream and fetch chunk. | 0.3.0 |
| celeborn.client.flink.compression.enabled | true | Whether to compress data in Flink plugin. | 0.3.0 |
| celeborn.client.flink.inputGate.concurrentReadings | 2147483647 | Max concurrent reading channels for a input gate. | 0.3.0 |
| celeborn.client.flink.inputGate.memory | 32m | Memory reserved for a input gate. | 0.3.0 |
@ -40,9 +40,8 @@ license: |
| celeborn.client.push.blacklist.enabled | false | Whether to enable shuffle client-side push blacklist of workers. | 0.3.0 |
| celeborn.client.push.buffer.initial.size | 8k | | 0.3.0 |
| celeborn.client.push.buffer.max.size | 64k | Max size of reducer partition buffer memory for shuffle hash writer. The pushed data will be buffered in memory before sending to Celeborn worker. For performance consideration keep this buffer size higher than 32K. Example: If reducer amount is 2000, buffer size is 64K, then each task will consume up to `64KiB * 2000 = 125MiB` heap memory. | 0.3.0 |
| celeborn.client.push.data.timeout | 120s | Timeout for a task to push data rpc message. This value should better be more than twice of `celeborn.<module>.push.timeoutCheck.interval` | 0.3.0 |
| celeborn.client.push.limit.inFlight.sleepInterval | 50ms | Sleep interval when check netty in-flight requests to be done. | 0.3.0 |
| celeborn.client.push.limit.inFlight.timeout | &lt;undefined&gt; | Timeout for netty in-flight requests to be done.Default value should be `celeborn.client.push.data.timeout * 2`. | 0.3.0 |
| celeborn.client.push.limit.inFlight.timeout | &lt;undefined&gt; | Timeout for netty in-flight requests to be done.Default value should be `celeborn.client.push.timeout * 2`. | 0.3.0 |
| celeborn.client.push.limit.strategy | SIMPLE | The strategy used to control the push speed. Valid strategies are SIMPLE and SLOWSTART. the SLOWSTART strategy is usually cooperate with congest control mechanism in the worker side. | 0.3.0 |
| celeborn.client.push.maxReqsInFlight | 4 | Amount of Netty in-flight requests per worker. The maximum memory is `celeborn.push.maxReqsInFlight` * `celeborn.push.buffer.max.size` * compression ratio(1 in worst case), default: 64Kib * 32 = 2Mib | 0.3.0 |
| celeborn.client.push.queue.capacity | 512 | Push buffer queue size for a task. The maximum memory is `celeborn.push.buffer.max.size` * `celeborn.push.queue.capacity`, default: 64KiB * 512 = 32MiB | 0.3.0 |
@ -56,6 +55,7 @@ license: |
| celeborn.client.push.stageEnd.timeout | &lt;value of celeborn.&lt;module&gt;.io.connectionTimeout&gt; | Timeout for waiting StageEnd. During this process, there are `celeborn.client.requestCommitFiles.maxRetries` times for retry opportunities for committing filesand 1 times for releasing slots request. User can customize this value according to your setting. By default, the value is the max timeout value `celeborn.<module>.io.connectionTimeout`. | 0.3.0 |
| celeborn.client.push.takeTaskMaxWaitAttempts | 1 | Max wait times if no task available to push to worker. | 0.3.0 |
| celeborn.client.push.takeTaskWaitInterval | 50ms | Wait interval if no task available to push to worker. | 0.3.0 |
| celeborn.client.push.timeout | 120s | Timeout for a task to push data rpc message. This value should better be more than twice of `celeborn.<module>.push.timeoutCheck.interval` | 0.3.0 |
| celeborn.client.registerShuffle.maxRetries | 3 | Max retry times for client to register shuffle. | 0.3.0 |
| celeborn.client.registerShuffle.retryWait | 3s | Wait time before next retry if register shuffle failed. | 0.3.0 |
| celeborn.client.requestCommitFiles.maxRetries | 2 | Max retry times for requestCommitFiles RPC. | 0.3.0 |

View File

@ -19,6 +19,8 @@ license: |
<!--begin-include-->
| Key | Default | Description | Since |
| --- | ------- | ----------- | ----- |
| celeborn.&lt;module&gt;.fetch.timeoutCheck.interval | 5s | Interval for checking fetch data timeout. It only support setting <module> to `data` since it works for shuffle client fetch data and should be configured on client side. | 0.3.0 |
| celeborn.&lt;module&gt;.fetch.timeoutCheck.threads | 16 | Threads num for checking fetch data timeout. It only support setting <module> to `data` since it works for shuffle client fetch data and should be configured on client side. | 0.3.0 |
| celeborn.&lt;module&gt;.heartbeat.interval | 60s | The heartbeat interval between worker and client. If setting <module> to `data`, it works for shuffle client push and fetch data and should be configured on client side. If setting <module> to `replicate`, it works for worker replicate data to peer worker and should be configured on worker side. | 0.3.0 |
| celeborn.&lt;module&gt;.io.backLog | 0 | Requested maximum length of the queue of incoming connections. Default 0 for no backlog. | |
| celeborn.&lt;module&gt;.io.clientThreads | 0 | Number of threads used in the client thread pool. Default to 0, which is 2x#cores. | |

View File

@ -215,12 +215,12 @@ public class RequestTimeoutIntegrationSuiteJ {
// Send one request, which will eventually fail.
TestCallback callback0 = new TestCallback();
client.fetchChunk(0, 0, callback0);
client.fetchChunk(0, 0, 10000, callback0);
Uninterruptibles.sleepUninterruptibly(1200, TimeUnit.MILLISECONDS);
// Send a second request before the first has failed.
TestCallback callback1 = new TestCallback();
client.fetchChunk(0, 1, callback1);
client.fetchChunk(0, 1, 10000, callback1);
Uninterruptibles.sleepUninterruptibly(1200, TimeUnit.MILLISECONDS);
// not complete yet, but should complete soon

View File

@ -170,7 +170,7 @@ public class ChunkFetchIntegrationSuiteJ {
};
for (int chunkIndex : chunkIndices) {
client.fetchChunk(STREAM_ID, chunkIndex, callback);
client.fetchChunk(STREAM_ID, chunkIndex, 10000, callback);
}
if (!sem.tryAcquire(chunkIndices.size(), 5, TimeUnit.SECONDS)) {
fail("Timeout getting response from the server");

View File

@ -232,7 +232,7 @@ public class FileWriterSuiteJ {
};
for (int chunkIndex : chunkIndices) {
client.fetchChunk(streamId, chunkIndex, callback);
client.fetchChunk(streamId, chunkIndex, 10000, callback);
}
if (!sem.tryAcquire(chunkIndices.size(), 5, TimeUnit.SECONDS)) {
fail("Timeout getting response from the server");