diff --git a/CONFIGURATION_GUIDE.md b/CONFIGURATION_GUIDE.md index 2bfbbef8a..5be4619b7 100644 --- a/CONFIGURATION_GUIDE.md +++ b/CONFIGURATION_GUIDE.md @@ -147,6 +147,7 @@ So we should set `rss.worker.flush.queue.capacity=6553` and each RSS worker has | `rss.worker.numSlots` | -1 | int | | | `rss.rpc.max.parallelism` | 1024 | int | | | `rss.register.shuffle.max.retry` | 3 | int | | +| `rss.register.shuffle.retry.wait` | 3s | String | | | `rss.flush.timeout` | 240 s | String | | | `rss.expire.nonEmptyDir.duration` | 3 d | String | | | `rss.expire.nonEmptyDir.cleanUp.threshold` | 10 | int | | diff --git a/client/src/main/java/com/aliyun/emr/rss/client/ShuffleClientImpl.java b/client/src/main/java/com/aliyun/emr/rss/client/ShuffleClientImpl.java index 5ffb44d26..ab86b57a0 100644 --- a/client/src/main/java/com/aliyun/emr/rss/client/ShuffleClientImpl.java +++ b/client/src/main/java/com/aliyun/emr/rss/client/ShuffleClientImpl.java @@ -85,6 +85,8 @@ public class ShuffleClientImpl extends ShuffleClient { private static final Random rand = new Random(); private final RssConf conf; + private final int registerShuffleMaxRetries; + private final long registerShuffleRetryWait; private final int maxInFlight; private final int pushBufferSize; @@ -131,6 +133,8 @@ public class ShuffleClientImpl extends ShuffleClient { public ShuffleClientImpl(RssConf conf) { super(); this.conf = conf; + registerShuffleMaxRetries = RssConf.registerShuffleMaxRetry(conf); + registerShuffleRetryWait = RssConf.registerShuffleRetryWait(conf); maxInFlight = RssConf.pushDataMaxReqsInFlight(conf); pushBufferSize = RssConf.pushDataBufferSize(conf); @@ -244,7 +248,7 @@ public class ShuffleClientImpl extends ShuffleClient { private ConcurrentHashMap registerShuffle( String appId, int shuffleId, int numMappers, int numPartitions) { - int numRetries = 3; + int numRetries = registerShuffleMaxRetries; while (numRetries > 0) { try { RegisterShuffleResponse response = driverRssMetaService.askSync( @@ -267,7 +271,7 @@ 
public class ShuffleClientImpl extends ShuffleClient { } try { - TimeUnit.SECONDS.sleep(3); + TimeUnit.SECONDS.sleep(registerShuffleRetryWait); } catch (InterruptedException e) { break; } diff --git a/common/src/main/scala/com/aliyun/emr/rss/common/RssConf.scala b/common/src/main/scala/com/aliyun/emr/rss/common/RssConf.scala index 6f8622eab..d05773000 100644 --- a/common/src/main/scala/com/aliyun/emr/rss/common/RssConf.scala +++ b/common/src/main/scala/com/aliyun/emr/rss/common/RssConf.scala @@ -508,6 +508,10 @@ object RssConf extends Logging { conf.getInt("rss.register.shuffle.max.retry", 3) } + def registerShuffleRetryWait(conf: RssConf): Long = { + conf.getTimeAsSeconds("rss.register.shuffle.retry.wait", "3s") + } + def flushTimeout(conf: RssConf): Long = { conf.getTimeAsSeconds("rss.flush.timeout", "120s") }