[BUG] Register shuffle with configurable retry times and retry wait time (#83)

This commit is contained in:
AngersZhuuuu 2022-04-01 16:59:37 +08:00 committed by GitHub
parent 4bd3a539a5
commit 86bbeea9b4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 11 additions and 2 deletions

View File

@ -147,6 +147,7 @@ So we should set `rss.worker.flush.queue.capacity=6553` and each RSS worker has
| `rss.worker.numSlots` | -1 | int | |
| `rss.rpc.max.parallelism` | 1024 | int | |
| `rss.register.shuffle.max.retry` | 3 | int | |
| `rss.register.shuffle.retry.wait` | 3s | int | |
| `rss.flush.timeout` | 240 s | String | |
| `rss.expire.nonEmptyDir.duration` | 3 d | String | |
| `rss.expire.nonEmptyDir.cleanUp.threshold` | 10 | int | |

View File

@ -85,6 +85,8 @@ public class ShuffleClientImpl extends ShuffleClient {
private static final Random rand = new Random();
private final RssConf conf;
private final int registerShuffleMaxRetries;
private final long registerShuffleRetryWait;
private final int maxInFlight;
private final int pushBufferSize;
@ -131,6 +133,8 @@ public class ShuffleClientImpl extends ShuffleClient {
public ShuffleClientImpl(RssConf conf) {
super();
this.conf = conf;
registerShuffleMaxRetries = RssConf.registerShuffleMaxRetry(conf);
registerShuffleRetryWait = RssConf.registerShuffleRetryWait(conf);
maxInFlight = RssConf.pushDataMaxReqsInFlight(conf);
pushBufferSize = RssConf.pushDataBufferSize(conf);
@ -244,7 +248,7 @@ public class ShuffleClientImpl extends ShuffleClient {
private ConcurrentHashMap<Integer, PartitionLocation> registerShuffle(
String appId, int shuffleId, int numMappers, int numPartitions) {
int numRetries = 3;
int numRetries = registerShuffleMaxRetries;
while (numRetries > 0) {
try {
RegisterShuffleResponse response = driverRssMetaService.<RegisterShuffleResponse>askSync(
@ -267,7 +271,7 @@ public class ShuffleClientImpl extends ShuffleClient {
}
try {
TimeUnit.SECONDS.sleep(3);
TimeUnit.SECONDS.sleep(registerShuffleRetryWait);
} catch (InterruptedException e) {
break;
}

View File

@ -508,6 +508,10 @@ object RssConf extends Logging {
conf.getInt("rss.register.shuffle.max.retry", 3)
}
def registerShuffleRetryWait(conf: RssConf): Long = {
conf.getTimeAsSeconds("rss.register.shuffle.retry.wait", "3s")
}
def flushTimeout(conf: RssConf): Long = {
conf.getTimeAsSeconds("rss.flush.timeout", "120s")
}