[CELEBORN-1909] Support pre-run static code blocks of TransportMessages to improve performance of protobuf serialization

### What changes were proposed in this pull request?

Support pre-run static code blocks of `TransportMessages` to improve performance of protobuf serialization.

### Why are the changes needed?

The protobuf message protocol defines many map type fields, which makes it time-consuming to build these message instances. This is because `TransportMessages` contains static code blocks to initialize a large number of `Descriptor`s and `FieldAccessorTable`s, where the instantiation of `FieldAccessorTable` includes reflection. The test result proves that the static code blocks execute in about 70 milliseconds.

Therefore, it's better to pre-run the static code blocks of `TransportMessages` to improve the performance of protobuf serialization. Meanwhile, it's recommended to use `repeated` instead of `map` type fields for RPC messages.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

CI.

Closes #3149 from SteNicholas/CELEBORN-1909.

Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
This commit is contained in:
SteNicholas 2025-03-18 11:34:39 +08:00 committed by Shuang
parent d96457909d
commit 38f3bdd375
6 changed files with 67 additions and 1 deletions

View File

@ -45,6 +45,9 @@ There are already some further improvements on the schedule and welcome to conta
When you add new RPC message, it's recommended to follow raw PB message case, for example When you add new RPC message, it's recommended to follow raw PB message case, for example
`RegisterWorker` and `RegisterWorkerResponse`. The RPC messages will be unified into raw PB messages eventually. `RegisterWorker` and `RegisterWorkerResponse`. The RPC messages will be unified into raw PB messages eventually.
### Using `repeated` instead of `map` type field of RPC Messages
When adding fields to an RPC message, prefer `repeated` over `map` type fields. `TransportMessages` contains static code blocks that initialize many `Descriptor`s and `FieldAccessorTable`s, and instantiating a `FieldAccessorTable` involves reflection, so the many `map` type fields in the protocol make building these message instances time-consuming.
### Using Error Prone ### Using Error Prone
Error Prone is a static analysis tool for Java that catches common programming mistakes at compile-time. Error Prone is a static analysis tool for Java that catches common programming mistakes at compile-time.

View File

@ -56,6 +56,7 @@ import org.apache.celeborn.common.network.client.TransportClientFactory;
import org.apache.celeborn.common.network.protocol.PushData; import org.apache.celeborn.common.network.protocol.PushData;
import org.apache.celeborn.common.network.protocol.PushMergedData; import org.apache.celeborn.common.network.protocol.PushMergedData;
import org.apache.celeborn.common.network.protocol.TransportMessage; import org.apache.celeborn.common.network.protocol.TransportMessage;
import org.apache.celeborn.common.network.protocol.TransportMessagesHelper;
import org.apache.celeborn.common.network.sasl.SaslClientBootstrap; import org.apache.celeborn.common.network.sasl.SaslClientBootstrap;
import org.apache.celeborn.common.network.sasl.SaslCredentials; import org.apache.celeborn.common.network.sasl.SaslCredentials;
import org.apache.celeborn.common.network.server.BaseMessageHandler; import org.apache.celeborn.common.network.server.BaseMessageHandler;
@ -185,6 +186,8 @@ public class ShuffleClientImpl extends ShuffleClient {
protected final Map<Integer, Tuple3<ReduceFileGroups, String, Exception>> reduceFileGroupsMap = protected final Map<Integer, Tuple3<ReduceFileGroups, String, Exception>> reduceFileGroupsMap =
JavaUtils.newConcurrentHashMap(); JavaUtils.newConcurrentHashMap();
private final TransportMessagesHelper messagesHelper = new TransportMessagesHelper();
public ShuffleClientImpl(String appUniqueId, CelebornConf conf, UserIdentifier userIdentifier) { public ShuffleClientImpl(String appUniqueId, CelebornConf conf, UserIdentifier userIdentifier) {
super(); super();
this.appUniqueId = appUniqueId; this.appUniqueId = appUniqueId;
@ -1984,6 +1987,7 @@ public class ShuffleClientImpl extends ShuffleClient {
shuffleIdCache.clear(); shuffleIdCache.clear();
pushExcludedWorkers.clear(); pushExcludedWorkers.clear();
fetchExcludedWorkers.clear(); fetchExcludedWorkers.clear();
messagesHelper.close();
logger.warn("Shuffle client has been shutdown!"); logger.warn("Shuffle client has been shutdown!");
} }

View File

@ -45,6 +45,7 @@ import org.apache.celeborn.common.identity.{IdentityProvider, UserIdentifier}
import org.apache.celeborn.common.internal.Logging import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.meta.{ApplicationMeta, ShufflePartitionLocationInfo, WorkerInfo} import org.apache.celeborn.common.meta.{ApplicationMeta, ShufflePartitionLocationInfo, WorkerInfo}
import org.apache.celeborn.common.metrics.source.Role import org.apache.celeborn.common.metrics.source.Role
import org.apache.celeborn.common.network.protocol.TransportMessagesHelper
import org.apache.celeborn.common.network.sasl.registration.RegistrationInfo import org.apache.celeborn.common.network.sasl.registration.RegistrationInfo
import org.apache.celeborn.common.protocol._ import org.apache.celeborn.common.protocol._
import org.apache.celeborn.common.protocol.RpcNameConstants.WORKER_EP import org.apache.celeborn.common.protocol.RpcNameConstants.WORKER_EP
@ -237,6 +238,8 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
private val changePartitionManager = new ChangePartitionManager(conf, this) private val changePartitionManager = new ChangePartitionManager(conf, this)
private val releasePartitionManager = new ReleasePartitionManager(conf, this) private val releasePartitionManager = new ReleasePartitionManager(conf, this)
private val messagesHelper: TransportMessagesHelper = new TransportMessagesHelper()
// Since method `onStart` is executed when `rpcEnv.setupEndpoint` is executed, and // Since method `onStart` is executed when `rpcEnv.setupEndpoint` is executed, and
// `masterClient` is initialized after `rpcEnv` is initialized, if method `onStart` contains // `masterClient` is initialized after `rpcEnv` is initialized, if method `onStart` contains
// a reference to `masterClient`, there may be cases where `masterClient` is null when // a reference to `masterClient`, there may be cases where `masterClient` is null when
@ -287,6 +290,7 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
workerRpcEnvInUse.awaitTermination() workerRpcEnvInUse.awaitTermination()
} }
} }
messagesHelper.close()
} }
/** /**

View File

@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.celeborn.common.network.protocol;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import com.google.protobuf.ExtensionRegistry;
import org.apache.celeborn.common.protocol.TransportMessages;
import org.apache.celeborn.common.util.ThreadUtils;
/**
 * Pre-runs the static code blocks of {@link TransportMessages} on a background thread so that the
 * initialization cost is not paid by the first protobuf serialization.
 *
 * <p>Constructing an instance submits a single task to a dedicated executor named {@code
 * transport-messages-runner}. Owners should call {@link #close()} when they shut down; because
 * this class implements {@link AutoCloseable} it can also be managed by try-with-resources.
 */
public class TransportMessagesHelper implements AutoCloseable {

  /** Executor that runs the one-off warm-up task; released in {@link #close()}. */
  private final ScheduledExecutorService transportMessagesRunner;

  /** Handle to the submitted warm-up task, kept so that {@link #close()} can cancel it. */
  private final Future<?> runTransportMessagesStaticBlockerTask;

  /** Creates the executor and immediately submits the warm-up task to it. */
  public TransportMessagesHelper() {
    transportMessagesRunner =
        ThreadUtils.newDaemonSingleThreadScheduledExecutor("transport-messages-runner");
    runTransportMessagesStaticBlockerTask =
        transportMessagesRunner.submit(
            () ->
                // Pre-run TransportMessages static code blocks to improve performance of protobuf
                // serialization.
                TransportMessages.registerAllExtensions(ExtensionRegistry.newInstance()));
  }

  /**
   * Cancels the warm-up task (interrupting it if it is still running) and then shuts down the
   * executor through {@code ThreadUtils.shutdown}.
   */
  @Override
  public void close() {
    runTransportMessagesStaticBlockerTask.cancel(true);
    ThreadUtils.shutdown(transportMessagesRunner);
  }
}

View File

@ -42,7 +42,7 @@ import org.apache.celeborn.common.meta.{DiskInfo, WorkerInfo, WorkerStatus}
import org.apache.celeborn.common.metrics.MetricsSystem import org.apache.celeborn.common.metrics.MetricsSystem
import org.apache.celeborn.common.metrics.source.{JVMCPUSource, JVMSource, ResourceConsumptionSource, Role, SystemMiscSource, ThreadPoolSource} import org.apache.celeborn.common.metrics.source.{JVMCPUSource, JVMSource, ResourceConsumptionSource, Role, SystemMiscSource, ThreadPoolSource}
import org.apache.celeborn.common.network.CelebornRackResolver import org.apache.celeborn.common.network.CelebornRackResolver
import org.apache.celeborn.common.network.protocol.TransportMessage import org.apache.celeborn.common.network.protocol.{TransportMessage, TransportMessagesHelper}
import org.apache.celeborn.common.protocol._ import org.apache.celeborn.common.protocol._
import org.apache.celeborn.common.protocol.message.ControlMessages._ import org.apache.celeborn.common.protocol.message.ControlMessages._
import org.apache.celeborn.common.protocol.message.StatusCode import org.apache.celeborn.common.protocol.message.StatusCode
@ -309,6 +309,8 @@ private[celeborn] class Master(
: util.concurrent.ConcurrentHashMap[String, util.Set[WorkerInfo]] = : util.concurrent.ConcurrentHashMap[String, util.Set[WorkerInfo]] =
JavaUtils.newConcurrentHashMap[String, util.Set[WorkerInfo]]() JavaUtils.newConcurrentHashMap[String, util.Set[WorkerInfo]]()
private val messagesHelper: TransportMessagesHelper = new TransportMessagesHelper()
// start threads to check timeout for workers and applications // start threads to check timeout for workers and applications
override def onStart(): Unit = { override def onStart(): Unit = {
if (!threadsStarted.compareAndSet(false, true)) { if (!threadsStarted.compareAndSet(false, true)) {
@ -363,6 +365,7 @@ private[celeborn] class Master(
if (authEnabled) { if (authEnabled) {
sendApplicationMetaExecutor.shutdownNow() sendApplicationMetaExecutor.shutdownNow()
} }
messagesHelper.close()
logInfo("Celeborn Master is stopped.") logInfo("Celeborn Master is stopped.")
} }

View File

@ -38,6 +38,7 @@ import org.apache.celeborn.common.meta.{DiskInfo, WorkerInfo, WorkerPartitionLoc
import org.apache.celeborn.common.metrics.MetricsSystem import org.apache.celeborn.common.metrics.MetricsSystem
import org.apache.celeborn.common.metrics.source.{JVMCPUSource, JVMSource, ResourceConsumptionSource, Role, SystemMiscSource, ThreadPoolSource} import org.apache.celeborn.common.metrics.source.{JVMCPUSource, JVMSource, ResourceConsumptionSource, Role, SystemMiscSource, ThreadPoolSource}
import org.apache.celeborn.common.network.{CelebornRackResolver, TransportContext} import org.apache.celeborn.common.network.{CelebornRackResolver, TransportContext}
import org.apache.celeborn.common.network.protocol.TransportMessagesHelper
import org.apache.celeborn.common.network.sasl.SaslServerBootstrap import org.apache.celeborn.common.network.sasl.SaslServerBootstrap
import org.apache.celeborn.common.network.server.TransportServerBootstrap import org.apache.celeborn.common.network.server.TransportServerBootstrap
import org.apache.celeborn.common.network.util.TransportConf import org.apache.celeborn.common.network.util.TransportConf
@ -363,6 +364,8 @@ private[celeborn] class Worker(
jvmQuake.start() jvmQuake.start()
} }
private val messagesHelper: TransportMessagesHelper = new TransportMessagesHelper()
workerSource.addGauge(WorkerSource.REGISTERED_SHUFFLE_COUNT) { () => workerSource.addGauge(WorkerSource.REGISTERED_SHUFFLE_COUNT) { () =>
workerInfo.getShuffleKeySet.size workerInfo.getShuffleKeySet.size
} }
@ -623,6 +626,7 @@ private[celeborn] class Worker(
if (conf.internalPortEnabled) { if (conf.internalPortEnabled) {
internalRpcEnvInUse.stop(internalRpcEndpointRef) internalRpcEnvInUse.stop(internalRpcEndpointRef)
} }
messagesHelper.close()
super.stop(exitKind) super.stop(exitKind)
logInfo("Worker is stopped.") logInfo("Worker is stopped.")