diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala index 138a1501c..17f3f5cc4 100644 --- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala @@ -373,6 +373,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se def portMaxRetries: Int = get(PORT_MAX_RETRY) def networkTimeout: RpcTimeout = new RpcTimeout(get(NETWORK_TIMEOUT).milli, NETWORK_TIMEOUT.key) + def rpcIoThreads: Option[Int] = get(RPC_IO_THREAD) def rpcConnectThreads: Int = get(RPC_CONNECT_THREADS) def rpcLookupTimeout: RpcTimeout = new RpcTimeout(get(RPC_LOOKUP_TIMEOUT).milli, RPC_LOOKUP_TIMEOUT.key) @@ -1071,6 +1072,16 @@ object CelebornConf extends Logging { .timeConf(TimeUnit.MILLISECONDS) .createWithDefaultString("10s") + val RPC_IO_THREAD: OptionalConfigEntry[Int] = + buildConf("celeborn.rpc.io.threads") + .withAlternative("rss.rpc.io.threads") + .categories("network") + .doc("Netty IO thread number of NettyRpcEnv to handle RPC request. " + + "The default threads number is the number of runtime available processors.") + .version("0.2.0") + .intConf + .createOptional + val RPC_CONNECT_THREADS: ConfigEntry[Int] = buildConf("celeborn.rpc.connect.threads") .categories("network") diff --git a/common/src/main/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala b/common/src/main/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala index 57f2c1c33..1efeb2cbe 100644 --- a/common/src/main/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala +++ b/common/src/main/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala @@ -52,7 +52,7 @@ class NettyRpcEnv( private[celeborn] val transportConf = Utils.fromCelebornConf( conf.clone, TransportModuleConstants.RPC_MODULE, - conf.getInt("celeborn.rpc.io.threads", numUsableCores)) + conf.rpcIoThreads.getOrElse(numUsableCores)) private val dispatcher: Dispatcher = new Dispatcher(this, numUsableCores) diff --git a/docs/configuration/network.md b/docs/configuration/network.md index 8fb4fef81..b990454b6 100644 --- a/docs/configuration/network.md +++ b/docs/configuration/network.md @@ -39,6 +39,7 @@ license: | | celeborn.rpc.askTimeout | <value of celeborn.network.timeout> | Timeout for RPC ask operations. | 0.2.0 | | celeborn.rpc.connect.threads | 64 | | 0.2.0 | | celeborn.rpc.haClient.askTimeout | <value of celeborn.network.timeout> | Timeout for HA client RPC ask operations. | 0.2.0 | +| celeborn.rpc.io.threads | <undefined> | Netty IO thread number of NettyRpcEnv to handle RPC request. The default threads number is the number of runtime available processors. | 0.2.0 | | celeborn.rpc.lookupTimeout | 30s | Timeout for RPC lookup operations. | 0.2.0 | | celeborn.shuffle.maxChunksBeingTransferred | 9223372036854775807 | The max number of chunks allowed to be transferred at the same time on shuffle service. Note that new incoming connections will be closed when the max number is hit. The client will retry according to the shuffle retry configs (see `celeborn.shuffle.io.maxRetries` and `celeborn.shuffle.io.retryWait`), if those limits are reached the task will fail with fetch failure. | 0.2.0 |