[CELEBORN-540][Refactor] Add config entity of celeborn.rpc.io.threads (#1443)

* [CELEBORN-540][CONF] Add config entity of celeborn.rpc.io.threads
This commit is contained in:
Angerszhuuuu 2023-04-21 11:21:41 +08:00 committed by GitHub
parent 62d60de8c5
commit 6830cb61ef
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 13 additions and 1 deletions

View File

@ -373,6 +373,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
def portMaxRetries: Int = get(PORT_MAX_RETRY)
def networkTimeout: RpcTimeout =
new RpcTimeout(get(NETWORK_TIMEOUT).milli, NETWORK_TIMEOUT.key)
def rpcIoThreads: Option[Int] = get(RPC_IO_THREAD)
def rpcConnectThreads: Int = get(RPC_CONNECT_THREADS)
def rpcLookupTimeout: RpcTimeout =
new RpcTimeout(get(RPC_LOOKUP_TIMEOUT).milli, RPC_LOOKUP_TIMEOUT.key)
@ -1071,6 +1072,16 @@ object CelebornConf extends Logging {
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("10s")
val RPC_IO_THREAD: OptionalConfigEntry[Int] =
buildConf("celeborn.rpc.io.threads")
.withAlternative("rss.rpc.io.threads")
.categories("network")
.doc("Netty IO thread number of NettyRpcEnv to handle RPC request. " +
"The default threads number is the number of runtime available processors.")
.version("0.2.0")
.intConf
.createOptional
val RPC_CONNECT_THREADS: ConfigEntry[Int] =
buildConf("celeborn.rpc.connect.threads")
.categories("network")

View File

@ -52,7 +52,7 @@ class NettyRpcEnv(
private[celeborn] val transportConf = Utils.fromCelebornConf(
conf.clone,
TransportModuleConstants.RPC_MODULE,
conf.getInt("celeborn.rpc.io.threads", numUsableCores))
conf.rpcIoThreads.getOrElse(numUsableCores))
private val dispatcher: Dispatcher = new Dispatcher(this, numUsableCores)

View File

@ -39,6 +39,7 @@ license: |
| celeborn.rpc.askTimeout | <value of celeborn.network.timeout> | Timeout for RPC ask operations. | 0.2.0 |
| celeborn.rpc.connect.threads | 64 | | 0.2.0 |
| celeborn.rpc.haClient.askTimeout | <value of celeborn.network.timeout> | Timeout for HA client RPC ask operations. | 0.2.0 |
| celeborn.rpc.io.threads | <undefined> | Netty IO thread number of NettyRpcEnv to handle RPC request. The default threads number is the number of runtime available processors. | 0.2.0 |
| celeborn.rpc.lookupTimeout | 30s | Timeout for RPC lookup operations. | 0.2.0 |
| celeborn.shuffle.maxChunksBeingTransferred | 9223372036854775807 | The max number of chunks allowed to be transferred at the same time on shuffle service. Note that new incoming connections will be closed when the max number is hit. The client will retry according to the shuffle retry configs (see `celeborn.shuffle.io.maxRetries` and `celeborn.shuffle.io.retryWait`), if those limits are reached the task will fail with fetch failure. | 0.2.0 |
<!--end-include-->