diff --git a/common/src/main/java/org/apache/celeborn/common/network/protocol/TransportMessage.java b/common/src/main/java/org/apache/celeborn/common/network/protocol/TransportMessage.java index 4e2978b9d..67c3f329d 100644 --- a/common/src/main/java/org/apache/celeborn/common/network/protocol/TransportMessage.java +++ b/common/src/main/java/org/apache/celeborn/common/network/protocol/TransportMessage.java @@ -29,6 +29,7 @@ import org.slf4j.LoggerFactory; import org.apache.celeborn.common.exception.CelebornIOException; import org.apache.celeborn.common.protocol.MessageType; +import org.apache.celeborn.common.protocol.PbApplicationMeta; import org.apache.celeborn.common.protocol.PbAuthenticationInitiationRequest; import org.apache.celeborn.common.protocol.PbAuthenticationInitiationResponse; import org.apache.celeborn.common.protocol.PbBacklogAnnouncement; @@ -117,6 +118,8 @@ public class TransportMessage implements Serializable { return (T) PbRegisterApplicationRequest.parseFrom(payload); case REGISTER_APPLICATION_RESPONSE_VALUE: return (T) PbRegisterApplicationResponse.parseFrom(payload); + case APPLICATION_META_VALUE: + return (T) PbApplicationMeta.parseFrom(payload); default: logger.error("Unexpected type {}", type); } diff --git a/common/src/main/java/org/apache/celeborn/common/network/sasl/SecretRegistryImpl.java b/common/src/main/java/org/apache/celeborn/common/network/sasl/SecretRegistryImpl.java index 18342b7b5..d647ade5c 100644 --- a/common/src/main/java/org/apache/celeborn/common/network/sasl/SecretRegistryImpl.java +++ b/common/src/main/java/org/apache/celeborn/common/network/sasl/SecretRegistryImpl.java @@ -26,7 +26,6 @@ public class SecretRegistryImpl implements SecretRegistry { @Override public void register(String appId, String secret) { - // TODO: Persist the secret in ratis. See https://issues.apache.org/jira/browse/CELEBORN-1234 secrets.compute( appId, (id, oldVal) -> { diff --git a/common/src/main/proto/TransportMessages.proto b/common/src/main/proto/TransportMessages.proto index 9547e08f0..c16077e76 100644 --- a/common/src/main/proto/TransportMessages.proto +++ b/common/src/main/proto/TransportMessages.proto @@ -98,6 +98,7 @@ enum MessageType { REGISTER_APPLICATION_RESPONSE = 75; WORKER_EVENT_REQUEST = 76; WORKER_EVENT_RESPONSE = 77; + APPLICATION_META = 78; } enum StreamType { @@ -613,6 +614,7 @@ message PbSnapshotMetaInfo { repeated PbWorkerInfo shutdownWorkers = 13; repeated PbWorkerInfo manuallyExcludedWorkers = 14; map workerEventInfos = 15; + map applicationMetas = 16; } message PbOpenStream { @@ -724,3 +726,9 @@ message PbRegisterApplicationRequest { message PbRegisterApplicationResponse { bool status = 1; } + +message PbApplicationMeta { + string appId = 1; + string secret = 2; +} + diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala index 475ac0cd2..d402173fd 100644 --- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala @@ -1139,9 +1139,11 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se throw new IllegalArgumentException( s"${AUTH_ENABLED.key} is true, but ${INTERNAL_PORT_ENABLED.key} is false") } - return authEnabled && internalPortEnabled + authEnabled && internalPortEnabled } + def masterSendApplicationMetaThreads: Int = get(MASTER_SEND_APPLICATION_META_THREADS) + // ////////////////////////////////////////////////////// // Internal Port // // ////////////////////////////////////////////////////// @@ -4618,4 +4620,12 @@ object CelebornConf extends Logging { .version("0.5.0") .intConf .createWithDefault(0) + + val MASTER_SEND_APPLICATION_META_THREADS: ConfigEntry[Int] = + buildConf("celeborn.master.send.applicationMeta.threads") + .categories("master") + .doc("Number of threads used by the Master to send ApplicationMeta to Workers.") + .version("0.5.0") + .intConf + .createWithDefault(8) } diff --git a/common/src/main/scala/org/apache/celeborn/common/meta/ApplicationMeta.scala b/common/src/main/scala/org/apache/celeborn/common/meta/ApplicationMeta.scala new file mode 100644 index 000000000..6911d5bb5 --- /dev/null +++ b/common/src/main/scala/org/apache/celeborn/common/meta/ApplicationMeta.scala @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.celeborn.common.meta + +/** + * Application meta + */ +case class ApplicationMeta(appId: String, secret: String) diff --git a/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala b/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala index 533238095..87d18f28a 100644 --- a/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala +++ b/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala @@ -26,11 +26,12 @@ import scala.collection.JavaConverters._ import com.google.protobuf.InvalidProtocolBufferException import org.apache.celeborn.common.identity.UserIdentifier -import org.apache.celeborn.common.meta.{AppDiskUsage, AppDiskUsageSnapShot, DiskFileInfo, DiskInfo, FileInfo, MapFileMeta, ReduceFileMeta, WorkerEventInfo, WorkerInfo, WorkerStatus} +import org.apache.celeborn.common.meta.{AppDiskUsage, AppDiskUsageSnapShot, ApplicationMeta, DiskFileInfo, DiskInfo, FileInfo, MapFileMeta, ReduceFileMeta, WorkerEventInfo, WorkerInfo, WorkerStatus} import org.apache.celeborn.common.protocol._ import org.apache.celeborn.common.protocol.PartitionLocation.Mode import org.apache.celeborn.common.protocol.message.ControlMessages.WorkerResource import org.apache.celeborn.common.quota.ResourceConsumption +import org.apache.celeborn.common.util.{CollectionUtils => localCollectionUtils} object PbSerDeUtils { @@ -417,7 +418,8 @@ object PbSerDeUtils { currentAppDiskUsageMetricsSnapshot: AppDiskUsageSnapShot, lostWorkers: ConcurrentHashMap[WorkerInfo, java.lang.Long], shutdownWorkers: java.util.Set[WorkerInfo], - workerEventInfos: ConcurrentHashMap[WorkerInfo, WorkerEventInfo]): PbSnapshotMetaInfo = { + workerEventInfos: ConcurrentHashMap[WorkerInfo, WorkerEventInfo], + applicationMetas: ConcurrentHashMap[String, ApplicationMeta]): PbSnapshotMetaInfo = { val builder = PbSnapshotMetaInfo.newBuilder() .setEstimatedPartitionSize(estimatedPartitionSize) .addAllRegisteredShuffle(registeredShuffle) @@ -446,9 +448,25 @@ object PbSerDeUtils { builder.setCurrentAppDiskUsageMetricsSnapshot( toPbAppDiskUsageSnapshot(currentAppDiskUsageMetricsSnapshot)) } + val pbApplicationMetas = applicationMetas.asScala.map { + case (appId, applicationMeta) => (appId, toPbApplicationMeta(applicationMeta)) + }.asJava + if (localCollectionUtils.isNotEmpty(pbApplicationMetas)) { + builder.putAllApplicationMetas(pbApplicationMetas) + } builder.build() } + def toPbApplicationMeta(meta: ApplicationMeta): PbApplicationMeta = { + PbApplicationMeta.newBuilder() + .setAppId(meta.appId) + .setSecret(meta.secret).build() + } + + def fromPbApplicationMeta(pbApplicationMeta: PbApplicationMeta): ApplicationMeta = { + new ApplicationMeta(pbApplicationMeta.getAppId, pbApplicationMeta.getSecret) + } + def toPbWorkerStatus(workerStatus: WorkerStatus): PbWorkerStatus = { PbWorkerStatus.newBuilder() .setState(workerStatus.getState) diff --git a/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala b/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala index e5d18e071..0d8063d63 100644 --- a/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala +++ b/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala @@ -24,7 +24,7 @@ import scala.collection.JavaConverters._ import org.apache.celeborn.CelebornFunSuite import org.apache.celeborn.common.identity.UserIdentifier -import org.apache.celeborn.common.meta.{DeviceInfo, DiskFileInfo, DiskInfo, FileInfo, ReduceFileMeta, WorkerEventInfo, WorkerInfo, WorkerStatus} +import org.apache.celeborn.common.meta.{ApplicationMeta, DeviceInfo, DiskFileInfo, DiskInfo, FileInfo, ReduceFileMeta, WorkerEventInfo, WorkerInfo, WorkerStatus} import org.apache.celeborn.common.protocol.{PartitionLocation, StorageInfo} import org.apache.celeborn.common.protocol.PartitionLocation import org.apache.celeborn.common.protocol.message.ControlMessages.WorkerResource @@ -282,4 +282,12 @@ class PbSerDeUtilsTest extends CelebornFunSuite { assert(restoredWorkerStatus.equals(workerStatus)) } + + test("fromAndToPbApplicationMeta") { + val applicationMeta = new ApplicationMeta("app1", "secret1") + val pbApplicationMeta = PbSerDeUtils.toPbApplicationMeta(applicationMeta) + val restoredApplicationMeta = PbSerDeUtils.fromPbApplicationMeta(pbApplicationMeta) + + assert(restoredApplicationMeta.equals(applicationMeta)) + } } diff --git a/docs/configuration/master.md b/docs/configuration/master.md index f0f293885..e1efa174d 100644 --- a/docs/configuration/master.md +++ b/docs/configuration/master.md @@ -45,6 +45,7 @@ license: | | celeborn.master.internal.port | 8097 | false | Internal port on the master where both workers and other master nodes connect. | 0.5.0 | | | celeborn.master.port | 9097 | false | Port for master to bind. | 0.2.0 | | | celeborn.master.rackResolver.refresh.interval | 30s | false | Interval for refreshing the node rack information periodically. | 0.5.0 | | +| celeborn.master.send.applicationMeta.threads | 8 | false | Number of threads used by the Master to send ApplicationMeta to Workers. | 0.5.0 | | | celeborn.master.slot.assign.extraSlots | 2 | false | Extra slots number when master assign slots. | 0.3.0 | celeborn.slots.assign.extraSlots | | celeborn.master.slot.assign.loadAware.diskGroupGradient | 0.1 | false | This value means how many more workload will be placed into a faster disk group than a slower group. | 0.3.0 | celeborn.slots.assign.loadAware.diskGroupGradient | | celeborn.master.slot.assign.loadAware.fetchTimeWeight | 1.0 | false | Weight of average fetch time when calculating ordering in load-aware assignment strategy | 0.3.0 | celeborn.slots.assign.loadAware.fetchTimeWeight | diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/MasterSecretRegistryImpl.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/MasterSecretRegistryImpl.java new file mode 100644 index 000000000..d78cf6628 --- /dev/null +++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/MasterSecretRegistryImpl.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.celeborn.service.deploy.master; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.celeborn.common.meta.ApplicationMeta; +import org.apache.celeborn.common.network.sasl.SecretRegistry; +import org.apache.celeborn.common.network.sasl.SecretRegistryImpl; +import org.apache.celeborn.service.deploy.master.clustermeta.IMetadataHandler; + +/** + * A simple implementation of {@link SecretRegistry} that stores secrets in memory and Ratis. This + * persists an application secret in Ratis but the deletion of that secret happens when + * ApplicationLost is triggered. + */ +public class MasterSecretRegistryImpl extends SecretRegistryImpl { + + private static final Logger LOG = LoggerFactory.getLogger(MasterSecretRegistryImpl.class); + private IMetadataHandler metadataHandler; + + @Override + public void register(String appId, String secret) { + super.register(appId, secret); + if (metadataHandler != null) { + LOG.info("Persisting metadata for appId: {}", appId); + metadataHandler.handleApplicationMeta(new ApplicationMeta(appId, secret)); + } + } + + void setMetadataHandler(IMetadataHandler metadataHandler) { + this.metadataHandler = metadataHandler; + } +} diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java index 86b34b258..7957fd1aa 100644 --- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java +++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java @@ -43,6 +43,7 @@ import org.apache.celeborn.common.CelebornConf; import org.apache.celeborn.common.identity.UserIdentifier; import org.apache.celeborn.common.meta.AppDiskUsageMetric; import org.apache.celeborn.common.meta.AppDiskUsageSnapShot; +import org.apache.celeborn.common.meta.ApplicationMeta; import org.apache.celeborn.common.meta.DiskInfo; import org.apache.celeborn.common.meta.DiskStatus; import org.apache.celeborn.common.meta.WorkerEventInfo; @@ -84,6 +85,9 @@ public abstract class AbstractMetaManager implements IMetadataHandler { public final LongAdder partitionTotalFileCount = new LongAdder(); public AppDiskUsageMetric appDiskUsageMetric = null; + public final ConcurrentHashMap applicationMetas = + JavaUtils.newConcurrentHashMap(); + public void updateRequestSlotsMeta( String shuffleKey, String hostName, Map> workerWithAllocations) { registeredShuffle.add(shuffleKey); @@ -117,6 +121,7 @@ public abstract class AbstractMetaManager implements IMetadataHandler { public void updateAppLostMeta(String appId) { registeredShuffle.removeIf(shuffleKey -> shuffleKey.startsWith(appId)); appHeartbeatTime.remove(appId); + applicationMetas.remove(appId); } public void updateWorkerExcludeMeta( @@ -269,7 +274,8 @@ public abstract class AbstractMetaManager implements IMetadataHandler { appDiskUsageMetric.currentSnapShot().get(), lostWorkers, shutdownWorkers, - workerEventInfos) + workerEventInfos, + applicationMetas) .toByteArray(); Files.write(file.toPath(), snapshotBytes); } @@ -357,6 +363,11 @@ public abstract class AbstractMetaManager implements IMetadataHandler { new AtomicReference( PbSerDeUtils.fromPbAppDiskUsageSnapshot( snapshotMetaInfo.getCurrentAppDiskUsageMetricsSnapshot()))); + + snapshotMetaInfo + .getApplicationMetasMap() + .forEach( + (key, value) -> applicationMetas.put(key, PbSerDeUtils.fromPbApplicationMeta(value))); } catch (Exception e) { throw new IOException(e); } @@ -384,6 +395,7 @@ public abstract class AbstractMetaManager implements IMetadataHandler { partitionTotalWritten.reset(); partitionTotalFileCount.reset(); workerEventInfos.clear(); + applicationMetas.clear(); } public void updateMetaByReportWorkerUnavailable(List failedWorkers) { @@ -442,4 +454,8 @@ public abstract class AbstractMetaManager implements IMetadataHandler { && (!workerEventInfos.containsKey(workerInfo) && workerInfo.getWorkerStatus().getState() == PbWorkerStatus.State.Normal); } + + public void updateApplicationMeta(ApplicationMeta applicationMeta) { + applicationMetas.putIfAbsent(applicationMeta.appId(), applicationMeta); + } } diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java index fd87768b4..ed9eeba5a 100644 --- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java +++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java @@ -21,6 +21,7 @@ import java.util.List; import java.util.Map; import org.apache.celeborn.common.identity.UserIdentifier; +import org.apache.celeborn.common.meta.ApplicationMeta; import org.apache.celeborn.common.meta.DiskInfo; import org.apache.celeborn.common.meta.WorkerInfo; import org.apache.celeborn.common.meta.WorkerStatus; @@ -82,4 +83,6 @@ public interface IMetadataHandler { int workerEventTypeValue, List workerInfoList, String requestId); void handleUpdatePartitionSize(); + + void handleApplicationMeta(ApplicationMeta applicationMeta); } diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java index eff63a634..b4db36e2a 100644 --- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java +++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java @@ -26,6 +26,7 @@ import org.slf4j.LoggerFactory; import org.apache.celeborn.common.CelebornConf; import org.apache.celeborn.common.identity.UserIdentifier; import org.apache.celeborn.common.meta.AppDiskUsageMetric; +import org.apache.celeborn.common.meta.ApplicationMeta; import org.apache.celeborn.common.meta.DiskInfo; import org.apache.celeborn.common.meta.WorkerInfo; import org.apache.celeborn.common.meta.WorkerStatus; @@ -163,4 +164,9 @@ public class SingleMasterMetaManager extends AbstractMetaManager { public void handleUpdatePartitionSize() { updatePartitionSize(); } + + @Override + public void handleApplicationMeta(ApplicationMeta applicationMeta) { + updateApplicationMeta(applicationMeta); + } } diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java index 77c31807a..c3b18b3d7 100644 --- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java +++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java @@ -29,6 +29,7 @@ import org.apache.celeborn.common.client.MasterClient; import org.apache.celeborn.common.exception.CelebornRuntimeException; import org.apache.celeborn.common.identity.UserIdentifier; import org.apache.celeborn.common.meta.AppDiskUsageMetric; +import org.apache.celeborn.common.meta.ApplicationMeta; import org.apache.celeborn.common.meta.DiskInfo; import org.apache.celeborn.common.meta.WorkerInfo; import org.apache.celeborn.common.meta.WorkerStatus; @@ -375,4 +376,23 @@ public class HAMasterMetaManager extends AbstractMetaManager { throw e; } } + + @Override + public void handleApplicationMeta(ApplicationMeta applicationMeta) { + try { + ratisServer.submitRequest( + ResourceRequest.newBuilder() + .setCmdType(Type.ApplicationMeta) + .setRequestId(MasterClient.genRequestId()) + .setApplicationMetaRequest( + ResourceProtos.ApplicationMetaRequest.newBuilder() + .setAppId(applicationMeta.appId()) + .setSecret(applicationMeta.secret()) + .build()) + .build()); + } catch (CelebornRuntimeException e) { + LOG.error("Handle app meta for {} failed!", applicationMeta.appId(), e); + throw e; + } + } } diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java index 37d7797a0..2cfb6a5bb 100644 --- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java +++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java @@ -27,6 +27,7 @@ import org.slf4j.LoggerFactory; import org.apache.celeborn.common.CelebornConf; import org.apache.celeborn.common.identity.UserIdentifier; +import org.apache.celeborn.common.meta.ApplicationMeta; import org.apache.celeborn.common.meta.DiskInfo; import org.apache.celeborn.common.meta.WorkerInfo; import org.apache.celeborn.common.meta.WorkerStatus; @@ -267,6 +268,12 @@ public class MetaHandler { metaSystem.updateWorkerEventMeta( request.getWorkerEventRequest().getWorkerEventType().getNumber(), workerInfoList); + case ApplicationMeta: + appId = request.getApplicationMetaRequest().getAppId(); + String secret = request.getApplicationMetaRequest().getSecret(); + metaSystem.updateApplicationMeta(new ApplicationMeta(appId, secret)); + break; + default: throw new IOException("Can not parse this command!" + request); } diff --git a/master/src/main/proto/Resource.proto b/master/src/main/proto/Resource.proto index 91a69f536..6c630646c 100644 --- a/master/src/main/proto/Resource.proto +++ b/master/src/main/proto/Resource.proto @@ -38,6 +38,7 @@ enum Type { RemoveWorkersUnavailableInfo = 23; WorkerExclude = 24; WorkerEvent = 25; + ApplicationMeta = 26; } enum WorkerEventType { @@ -69,6 +70,7 @@ message ResourceRequest { optional RemoveWorkersUnavailableInfoRequest removeWorkersUnavailableInfoRequest = 20; optional WorkerExcludeRequest workerExcludeRequest = 21; optional WorkerEventRequest workerEventRequest = 22; + optional ApplicationMetaRequest applicationMetaRequest = 23; } message DiskInfo { @@ -222,3 +224,8 @@ message ResourceResponse { required Status status = 4; } + +message ApplicationMetaRequest { + required string appId = 1; + optional string secret = 2; +} diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala index 33754a673..7af28474a 100644 --- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala +++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala @@ -20,7 +20,7 @@ package org.apache.celeborn.service.deploy.master import java.io.IOException import java.net.BindException import java.util -import java.util.concurrent.{ConcurrentHashMap, ScheduledFuture, TimeUnit} +import java.util.concurrent.{ConcurrentHashMap, ExecutorService, ScheduledFuture, TimeUnit} import java.util.concurrent.atomic.AtomicBoolean import java.util.function.ToLongFunction @@ -38,7 +38,7 @@ import org.apache.celeborn.common.internal.Logging import org.apache.celeborn.common.meta.{DiskInfo, WorkerInfo, WorkerStatus} import org.apache.celeborn.common.metrics.MetricsSystem import org.apache.celeborn.common.metrics.source.{JVMCPUSource, JVMSource, ResourceConsumptionSource, SystemMiscSource, ThreadPoolSource} -import org.apache.celeborn.common.network.sasl.SecretRegistryImpl +import org.apache.celeborn.common.network.protocol.TransportMessage import org.apache.celeborn.common.protocol._ import org.apache.celeborn.common.protocol.message.{ControlMessages, StatusCode} import org.apache.celeborn.common.protocol.message.ControlMessages._ @@ -76,7 +76,7 @@ private[celeborn] class Master( metricsSystem.registerSource(new SystemMiscSource(conf, MetricsSystem.ROLE_MASTER)) private val authEnabled = conf.authEnabled - private val secretRegistry = new SecretRegistryImpl() + private val secretRegistry = new MasterSecretRegistryImpl() override val rpcEnv: RpcEnv = if (!authEnabled) { @@ -144,6 +144,7 @@ private[celeborn] class Master( } else { new SingleMasterMetaManager(internalRpcEnvInUse, conf, rackResolver) } + secretRegistry.setMetadataHandler(statusSystem) // Threads private val forwardMessageThread = @@ -253,11 +254,24 @@ private[celeborn] class Master( internalRpcEndpoint) } + private val sendApplicationMetaThreads = conf.masterSendApplicationMetaThreads + // Send ApplicationMeta to workers + private var sendApplicationMetaExecutor: ExecutorService = _ + // Maintains the mapping for the workers assigned to each application + private val workersAssignedToApp + : util.concurrent.ConcurrentHashMap[String, util.Set[WorkerInfo]] = + JavaUtils.newConcurrentHashMap[String, util.Set[WorkerInfo]]() + // start threads to check timeout for workers and applications override def onStart(): Unit = { if (!threadsStarted.compareAndSet(false, true)) { return } + if (authEnabled) { + sendApplicationMetaExecutor = ThreadUtils.newDaemonFixedThreadPool( + sendApplicationMetaThreads, + "send-application-meta") + } checkForWorkerTimeOutTask = forwardMessageThread.scheduleWithFixedDelay( new Runnable { override def run(): Unit = Utils.tryLogNonFatalError { @@ -321,6 +335,9 @@ private[celeborn] class Master( } forwardMessageThread.shutdownNow() rackResolver.stop() + if (authEnabled) { + sendApplicationMetaExecutor.shutdownNow() + } logInfo("Celeborn Master is stopped.") } @@ -850,9 +867,46 @@ private[celeborn] class Master( logInfo(s"Offered extra $offerSlotsExtraSize slots for $shuffleKey") } + if (authEnabled) { + pushApplicationMetaToWorkers(requestSlots, slots) + } context.reply(RequestSlotsResponse(StatusCode.SUCCESS, slots.asInstanceOf[WorkerResource])) } + def pushApplicationMetaToWorkers( + requestSlots: RequestSlots, + slots: util.Map[WorkerInfo, (util.List[PartitionLocation], util.List[PartitionLocation])]) + : Unit = { + // Pass application registration information to the workers + val pbApplicationMeta = PbApplicationMeta.newBuilder() + .setAppId(requestSlots.applicationId) + .setSecret(secretRegistry.getSecretKey(requestSlots.applicationId)) + .build() + val transportMessage = + new TransportMessage(MessageType.APPLICATION_META, pbApplicationMeta.toByteArray) + val workerSet = workersAssignedToApp.computeIfAbsent( + requestSlots.applicationId, + new util.function.Function[String, util.Set[WorkerInfo]] { + override def apply(key: String): util.Set[WorkerInfo] = + util.Collections.newSetFromMap(new util.concurrent.ConcurrentHashMap[ + WorkerInfo, + java.lang.Boolean]()) + }) + slots.keySet().asScala.foreach { worker => + // The app meta info is send to a Worker only if it wasn't previously sent. + if (workerSet.add(worker)) { + sendApplicationMetaExecutor.submit(new Runnable { + override def run(): Unit = { + logDebug(s"Sending app registration info to ${worker.host}:${worker.internalPort}") + internalRpcEnvInUse.setupEndpointRef( + RpcAddress.apply(worker.host, worker.internalPort), + RpcNameConstants.WORKER_INTERNAL_EP).send(transportMessage) + } + }) + } + } + } + def handleUnregisterShuffle( context: RpcCallContext, applicationId: String, @@ -877,6 +931,7 @@ private[celeborn] class Master( def handleApplicationLost(context: RpcCallContext, appId: String, requestId: String): Unit = { nonEagerHandler.submit(new Runnable { override def run(): Unit = { + workersAssignedToApp.remove(appId) statusSystem.handleAppLost(appId, requestId) logInfo(s"Removed application $appId") if (hasHDFSStorage) { diff --git a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java index 25094ea13..45b059da5 100644 --- a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java +++ b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java @@ -33,6 +33,7 @@ import org.junit.Test; import org.apache.celeborn.common.CelebornConf; import org.apache.celeborn.common.client.MasterClient; import org.apache.celeborn.common.identity.UserIdentifier; +import org.apache.celeborn.common.meta.ApplicationMeta; import org.apache.celeborn.common.meta.DiskInfo; import org.apache.celeborn.common.meta.WorkerInfo; import org.apache.celeborn.common.meta.WorkerStatus; @@ -447,13 +448,15 @@ public class DefaultMetaSystemSuiteJ { workersToAllocate.put(workerInfo1.toUniqueId(), allocation); workersToAllocate.put(workerInfo2.toUniqueId(), allocation); + statusSystem.handleApplicationMeta(new ApplicationMeta(APPID1, "testSecret")); statusSystem.handleRequestSlots(SHUFFLEKEY1, HOSTNAME1, workersToAllocate, getNewReqeustId()); assertEquals(1, statusSystem.registeredShuffle.size()); - + assertEquals(1, statusSystem.applicationMetas.size()); statusSystem.handleAppLost(APPID1, getNewReqeustId()); assertTrue(statusSystem.registeredShuffle.isEmpty()); + assertTrue(statusSystem.applicationMetas.isEmpty()); } @Test @@ -690,4 +693,17 @@ public class DefaultMetaSystemSuiteJ { public void testHandleUpdatePartitionSize() { statusSystem.handleUpdatePartitionSize(); } + + @Test + public void testHandleApplicationMeta() { + String appSecret = "testSecret"; + statusSystem.handleApplicationMeta(new ApplicationMeta(APPID1, appSecret)); + assertEquals(appSecret, statusSystem.applicationMetas.get(APPID1).secret()); + + String appId2 = "app02"; + String appSecret2 = "testSecret2"; + statusSystem.handleApplicationMeta(new ApplicationMeta(appId2, appSecret2)); + assertEquals(appSecret2, statusSystem.applicationMetas.get(appId2).secret()); + assertEquals(2, statusSystem.applicationMetas.size()); + } } diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/InternalRpcEndpoint.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/InternalRpcEndpoint.scala index 5f84aed4a..d78aecce5 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/InternalRpcEndpoint.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/InternalRpcEndpoint.scala @@ -18,8 +18,9 @@ package org.apache.celeborn.service.deploy.worker import org.apache.celeborn.common.CelebornConf -import org.apache.celeborn.common.exception.CelebornException import org.apache.celeborn.common.internal.Logging +import org.apache.celeborn.common.network.sasl.SecretRegistry +import org.apache.celeborn.common.protocol.PbApplicationMeta import org.apache.celeborn.common.rpc._ /** @@ -28,7 +29,8 @@ import org.apache.celeborn.common.rpc._ */ private[celeborn] class InternalRpcEndpoint( override val rpcEnv: RpcEnv, - val conf: CelebornConf) + val conf: CelebornConf, + val secretRegistry: SecretRegistry) extends RpcEndpoint with Logging { override def onDisconnected(address: RpcAddress): Unit = { @@ -36,8 +38,12 @@ private[celeborn] class InternalRpcEndpoint( } override def receive: PartialFunction[Any, Unit] = { - // TODO: [CELEBORN-1234] Handle the application secret from the Master - case _ => throw new CelebornException(self + " not implemented") + case pb: PbApplicationMeta => + val appId = pb.getAppId + val secret = pb.getSecret + if (!secretRegistry.isRegistered(appId)) { + logInfo(s"Received application meta for $appId from the Master.") + secretRegistry.register(appId, secret) + } } - } diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala index 604a588e6..5743d2788 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala @@ -188,7 +188,7 @@ private[celeborn] class Worker( private[worker] var internalRpcEndpoint: RpcEndpoint = _ private var internalRpcEndpointRef: RpcEndpointRef = _ if (conf.internalPortEnabled) { - internalRpcEndpoint = new InternalRpcEndpoint(internalRpcEnvInUse, conf) + internalRpcEndpoint = new InternalRpcEndpoint(internalRpcEnvInUse, conf, secretRegistry) internalRpcEndpointRef = internalRpcEnvInUse.setupEndpoint( RpcNameConstants.WORKER_INTERNAL_EP, internalRpcEndpoint)