diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 3f5303260..0fbf570e3 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -45,6 +45,9 @@ There are already some further improvements on the schedule and welcome to conta When you add new RPC message, it's recommended to follow raw PB message case, for example `RegisterWorker` and `RegisterWorkerResponse`. The RPC messages will be unified into raw PB messages eventually. +### Using `repeated` instead of `map` type field of RPC Messages +When adding fields to an RPC Message, use `repeated` instead of `map` type. `TransportMessages` contains static code blocks to initialize many `Descriptor`s and `FieldAccessorTable`s, where the instantiation of `FieldAccessorTable` includes reflection. + ### Using Error Prone Error Prone is a static analysis tool for Java that catches common programming mistakes at compile-time. diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java index cf48f95bc..be6b5aa3f 100644 --- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java +++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java @@ -56,6 +56,7 @@ import org.apache.celeborn.common.network.client.TransportClientFactory; import org.apache.celeborn.common.network.protocol.PushData; import org.apache.celeborn.common.network.protocol.PushMergedData; import org.apache.celeborn.common.network.protocol.TransportMessage; +import org.apache.celeborn.common.network.protocol.TransportMessagesHelper; import org.apache.celeborn.common.network.sasl.SaslClientBootstrap; import org.apache.celeborn.common.network.sasl.SaslCredentials; import org.apache.celeborn.common.network.server.BaseMessageHandler; @@ -185,6 +186,8 @@ public class ShuffleClientImpl extends ShuffleClient { protected final Map> reduceFileGroupsMap = JavaUtils.newConcurrentHashMap(); + private final TransportMessagesHelper messagesHelper = new TransportMessagesHelper(); + public ShuffleClientImpl(String appUniqueId, CelebornConf conf, UserIdentifier userIdentifier) { super(); this.appUniqueId = appUniqueId; @@ -1984,6 +1987,7 @@ public class ShuffleClientImpl extends ShuffleClient { shuffleIdCache.clear(); pushExcludedWorkers.clear(); fetchExcludedWorkers.clear(); + messagesHelper.close(); logger.warn("Shuffle client has been shutdown!"); } diff --git a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala index a718a2b8b..285da2296 100644 --- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala +++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala @@ -45,6 +45,7 @@ import org.apache.celeborn.common.identity.{IdentityProvider, UserIdentifier} import org.apache.celeborn.common.internal.Logging import org.apache.celeborn.common.meta.{ApplicationMeta, ShufflePartitionLocationInfo, WorkerInfo} import org.apache.celeborn.common.metrics.source.Role +import org.apache.celeborn.common.network.protocol.TransportMessagesHelper import org.apache.celeborn.common.network.sasl.registration.RegistrationInfo import org.apache.celeborn.common.protocol._ import org.apache.celeborn.common.protocol.RpcNameConstants.WORKER_EP @@ -237,6 +238,8 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends private val changePartitionManager = new ChangePartitionManager(conf, this) private val releasePartitionManager = new ReleasePartitionManager(conf, this) + private val messagesHelper: TransportMessagesHelper = new TransportMessagesHelper() + // Since method `onStart` is executed when `rpcEnv.setupEndpoint` is executed, and // `masterClient` is initialized after `rpcEnv` is initialized, if method `onStart` contains // a reference to `masterClient`, there may be cases where `masterClient` is null when @@ -287,6 +290,7 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends workerRpcEnvInUse.awaitTermination() } } + messagesHelper.close() } /** diff --git a/common/src/main/java/org/apache/celeborn/common/network/protocol/TransportMessagesHelper.java b/common/src/main/java/org/apache/celeborn/common/network/protocol/TransportMessagesHelper.java new file mode 100644 index 000000000..8cc8c79f0 --- /dev/null +++ b/common/src/main/java/org/apache/celeborn/common/network/protocol/TransportMessagesHelper.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.celeborn.common.network.protocol; + +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; + +import com.google.protobuf.ExtensionRegistry; + +import org.apache.celeborn.common.protocol.TransportMessages; +import org.apache.celeborn.common.util.ThreadUtils; + +public class TransportMessagesHelper { + + private final ScheduledExecutorService transportMessagesRunner; + private final Future runTransportMessagesStaticBlockerTask; + + public TransportMessagesHelper() { + transportMessagesRunner = + ThreadUtils.newDaemonSingleThreadScheduledExecutor("transport-messages-runner"); + runTransportMessagesStaticBlockerTask = + transportMessagesRunner.submit( + () -> + // Pre-run TransportMessages static code blocks to improve performance of protobuf + // serialization. + TransportMessages.registerAllExtensions(ExtensionRegistry.newInstance())); + } + + public void close() { + runTransportMessagesStaticBlockerTask.cancel(true); + ThreadUtils.shutdown(transportMessagesRunner); + } +} diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala index 6b5152409..1e52da3e2 100644 --- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala +++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala @@ -42,7 +42,7 @@ import org.apache.celeborn.common.meta.{DiskInfo, WorkerInfo, WorkerStatus} import org.apache.celeborn.common.metrics.MetricsSystem import org.apache.celeborn.common.metrics.source.{JVMCPUSource, JVMSource, ResourceConsumptionSource, Role, SystemMiscSource, ThreadPoolSource} import org.apache.celeborn.common.network.CelebornRackResolver -import org.apache.celeborn.common.network.protocol.TransportMessage +import org.apache.celeborn.common.network.protocol.{TransportMessage, TransportMessagesHelper} import org.apache.celeborn.common.protocol._ import org.apache.celeborn.common.protocol.message.ControlMessages._ import org.apache.celeborn.common.protocol.message.StatusCode @@ -309,6 +309,8 @@ private[celeborn] class Master( : util.concurrent.ConcurrentHashMap[String, util.Set[WorkerInfo]] = JavaUtils.newConcurrentHashMap[String, util.Set[WorkerInfo]]() + private val messagesHelper: TransportMessagesHelper = new TransportMessagesHelper() + // start threads to check timeout for workers and applications override def onStart(): Unit = { if (!threadsStarted.compareAndSet(false, true)) { @@ -363,6 +365,7 @@ private[celeborn] class Master( if (authEnabled) { sendApplicationMetaExecutor.shutdownNow() } + messagesHelper.close() logInfo("Celeborn Master is stopped.") } diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala index 5a229fa54..077adf727 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala @@ -38,6 +38,7 @@ import org.apache.celeborn.common.meta.{DiskInfo, WorkerInfo, WorkerPartitionLoc import org.apache.celeborn.common.metrics.MetricsSystem import org.apache.celeborn.common.metrics.source.{JVMCPUSource, JVMSource, ResourceConsumptionSource, Role, SystemMiscSource, ThreadPoolSource} import org.apache.celeborn.common.network.{CelebornRackResolver, TransportContext} +import org.apache.celeborn.common.network.protocol.TransportMessagesHelper import org.apache.celeborn.common.network.sasl.SaslServerBootstrap import org.apache.celeborn.common.network.server.TransportServerBootstrap import org.apache.celeborn.common.network.util.TransportConf @@ -363,6 +364,8 @@ private[celeborn] class Worker( jvmQuake.start() } + private val messagesHelper: TransportMessagesHelper = new TransportMessagesHelper() + workerSource.addGauge(WorkerSource.REGISTERED_SHUFFLE_COUNT) { () => workerInfo.getShuffleKeySet.size } @@ -623,6 +626,7 @@ private[celeborn] class Worker( if (conf.internalPortEnabled) { internalRpcEnvInUse.stop(internalRpcEndpointRef) } + messagesHelper.close() super.stop(exitKind) logInfo("Worker is stopped.")