From 6830cb61efb09a1bbeb1ee8a6f54e92528339e8d Mon Sep 17 00:00:00 2001 From: Angerszhuuuu Date: Fri, 21 Apr 2023 11:21:41 +0800 Subject: [PATCH] [CELEBORN-540][Refactor] Add config entity of celeborn.rpc.io.threads (#1443) * [CELEBORN-540][CONF] Add config entity of celeborn.rpc.io.threads --- .../org/apache/celeborn/common/CelebornConf.scala | 11 +++++++++++ .../celeborn/common/rpc/netty/NettyRpcEnv.scala | 2 +- docs/configuration/network.md | 1 + 3 files changed, 13 insertions(+), 1 deletion(-) diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala index 138a1501c..17f3f5cc4 100644 --- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala @@ -373,6 +373,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se def portMaxRetries: Int = get(PORT_MAX_RETRY) def networkTimeout: RpcTimeout = new RpcTimeout(get(NETWORK_TIMEOUT).milli, NETWORK_TIMEOUT.key) + def rpcIoThreads: Option[Int] = get(RPC_IO_THREAD) def rpcConnectThreads: Int = get(RPC_CONNECT_THREADS) def rpcLookupTimeout: RpcTimeout = new RpcTimeout(get(RPC_LOOKUP_TIMEOUT).milli, RPC_LOOKUP_TIMEOUT.key) @@ -1071,6 +1072,16 @@ object CelebornConf extends Logging { .timeConf(TimeUnit.MILLISECONDS) .createWithDefaultString("10s") + val RPC_IO_THREAD: OptionalConfigEntry[Int] = + buildConf("celeborn.rpc.io.threads") + .withAlternative("rss.rpc.io.threads") + .categories("network") + .doc("Netty IO thread number of NettyRpcEnv to handle RPC request. " + + "The default threads number is the number of runtime available processors.") + .version("0.2.0") + .intConf + .createOptional + val RPC_CONNECT_THREADS: ConfigEntry[Int] = buildConf("celeborn.rpc.connect.threads") .categories("network") diff --git a/common/src/main/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala b/common/src/main/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala index 57f2c1c33..1efeb2cbe 100644 --- a/common/src/main/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala +++ b/common/src/main/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala @@ -52,7 +52,7 @@ class NettyRpcEnv( private[celeborn] val transportConf = Utils.fromCelebornConf( conf.clone, TransportModuleConstants.RPC_MODULE, - conf.getInt("celeborn.rpc.io.threads", numUsableCores)) + conf.rpcIoThreads.getOrElse(numUsableCores)) private val dispatcher: Dispatcher = new Dispatcher(this, numUsableCores) diff --git a/docs/configuration/network.md b/docs/configuration/network.md index 8fb4fef81..b990454b6 100644 --- a/docs/configuration/network.md +++ b/docs/configuration/network.md @@ -39,6 +39,7 @@ license: | | celeborn.rpc.askTimeout | <value of celeborn.network.timeout> | Timeout for RPC ask operations. | 0.2.0 | | celeborn.rpc.connect.threads | 64 | | 0.2.0 | | celeborn.rpc.haClient.askTimeout | <value of celeborn.network.timeout> | Timeout for HA client RPC ask operations. | 0.2.0 | +| celeborn.rpc.io.threads | <undefined> | Netty IO thread number of NettyRpcEnv to handle RPC request. The default threads number is the number of runtime available processors. | 0.2.0 | | celeborn.rpc.lookupTimeout | 30s | Timeout for RPC lookup operations. | 0.2.0 | | celeborn.shuffle.maxChunksBeingTransferred | 9223372036854775807 | The max number of chunks allowed to be transferred at the same time on shuffle service. Note that new incoming connections will be closed when the max number is hit. The client will retry according to the shuffle retry configs (see `celeborn.shuffle.io.maxRetries` and `celeborn.shuffle.io.retryWait`), if those limits are reached the task will fail with fetch failure. | 0.2.0 |