From 6897b8be99ab8827ae87462bbca71fc2aba4dc5c Mon Sep 17 00:00:00 2001 From: Chandni Singh Date: Wed, 6 Mar 2024 17:02:04 +0800 Subject: [PATCH] [CELEBORN-1234] Master should persist the application meta in Ratis and push it to the Workers ### What changes were proposed in this pull request? This enables Celeborn Master to persist application meta in Ratis and also push it to Celeborn Workers when it receives the requests for slots from the LifecycleManager. ### Why are the changes needed? This change is required for adding authentication. ([CELEBORN-1011](https://issues.apache.org/jira/browse/CELEBORN-1011)). ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added some UTs. Closes #2346 from otterc/CELEBORN-1234. Authored-by: Chandni Singh Signed-off-by: SteNicholas --- .../network/protocol/TransportMessage.java | 3 + .../network/sasl/SecretRegistryImpl.java | 1 - common/src/main/proto/TransportMessages.proto | 8 +++ .../apache/celeborn/common/CelebornConf.scala | 12 +++- .../common/meta/ApplicationMeta.scala | 23 +++++++ .../celeborn/common/util/PbSerDeUtils.scala | 22 ++++++- .../common/util/PbSerDeUtilsTest.scala | 10 ++- docs/configuration/master.md | 1 + .../master/MasterSecretRegistryImpl.java | 50 +++++++++++++++ .../clustermeta/AbstractMetaManager.java | 18 +++++- .../master/clustermeta/IMetadataHandler.java | 3 + .../clustermeta/SingleMasterMetaManager.java | 6 ++ .../clustermeta/ha/HAMasterMetaManager.java | 20 ++++++ .../master/clustermeta/ha/MetaHandler.java | 7 +++ master/src/main/proto/Resource.proto | 7 +++ .../service/deploy/master/Master.scala | 61 ++++++++++++++++++- .../clustermeta/DefaultMetaSystemSuiteJ.java | 18 +++++- .../deploy/worker/InternalRpcEndpoint.scala | 16 +++-- .../service/deploy/worker/Worker.scala | 2 +- 19 files changed, 272 insertions(+), 16 deletions(-) create mode 100644 common/src/main/scala/org/apache/celeborn/common/meta/ApplicationMeta.scala create mode 100644 
master/src/main/java/org/apache/celeborn/service/deploy/master/MasterSecretRegistryImpl.java diff --git a/common/src/main/java/org/apache/celeborn/common/network/protocol/TransportMessage.java b/common/src/main/java/org/apache/celeborn/common/network/protocol/TransportMessage.java index 4e2978b9d..67c3f329d 100644 --- a/common/src/main/java/org/apache/celeborn/common/network/protocol/TransportMessage.java +++ b/common/src/main/java/org/apache/celeborn/common/network/protocol/TransportMessage.java @@ -29,6 +29,7 @@ import org.slf4j.LoggerFactory; import org.apache.celeborn.common.exception.CelebornIOException; import org.apache.celeborn.common.protocol.MessageType; +import org.apache.celeborn.common.protocol.PbApplicationMeta; import org.apache.celeborn.common.protocol.PbAuthenticationInitiationRequest; import org.apache.celeborn.common.protocol.PbAuthenticationInitiationResponse; import org.apache.celeborn.common.protocol.PbBacklogAnnouncement; @@ -117,6 +118,8 @@ public class TransportMessage implements Serializable { return (T) PbRegisterApplicationRequest.parseFrom(payload); case REGISTER_APPLICATION_RESPONSE_VALUE: return (T) PbRegisterApplicationResponse.parseFrom(payload); + case APPLICATION_META_VALUE: + return (T) PbApplicationMeta.parseFrom(payload); default: logger.error("Unexpected type {}", type); } diff --git a/common/src/main/java/org/apache/celeborn/common/network/sasl/SecretRegistryImpl.java b/common/src/main/java/org/apache/celeborn/common/network/sasl/SecretRegistryImpl.java index 18342b7b5..d647ade5c 100644 --- a/common/src/main/java/org/apache/celeborn/common/network/sasl/SecretRegistryImpl.java +++ b/common/src/main/java/org/apache/celeborn/common/network/sasl/SecretRegistryImpl.java @@ -26,7 +26,6 @@ public class SecretRegistryImpl implements SecretRegistry { @Override public void register(String appId, String secret) { - // TODO: Persist the secret in ratis. 
See https://issues.apache.org/jira/browse/CELEBORN-1234 secrets.compute( appId, (id, oldVal) -> { diff --git a/common/src/main/proto/TransportMessages.proto b/common/src/main/proto/TransportMessages.proto index 9547e08f0..c16077e76 100644 --- a/common/src/main/proto/TransportMessages.proto +++ b/common/src/main/proto/TransportMessages.proto @@ -98,6 +98,7 @@ enum MessageType { REGISTER_APPLICATION_RESPONSE = 75; WORKER_EVENT_REQUEST = 76; WORKER_EVENT_RESPONSE = 77; + APPLICATION_META = 78; } enum StreamType { @@ -613,6 +614,7 @@ message PbSnapshotMetaInfo { repeated PbWorkerInfo shutdownWorkers = 13; repeated PbWorkerInfo manuallyExcludedWorkers = 14; map workerEventInfos = 15; + map<string, PbApplicationMeta> applicationMetas = 16; } message PbOpenStream { @@ -724,3 +726,9 @@ message PbRegisterApplicationRequest { message PbRegisterApplicationResponse { bool status = 1; } + +message PbApplicationMeta { + string appId = 1; + string secret = 2; +} + diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala index 475ac0cd2..d402173fd 100644 --- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala @@ -1139,9 +1139,11 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se throw new IllegalArgumentException( s"${AUTH_ENABLED.key} is true, but ${INTERNAL_PORT_ENABLED.key} is false") } - return authEnabled && internalPortEnabled + authEnabled && internalPortEnabled } + def masterSendApplicationMetaThreads: Int = get(MASTER_SEND_APPLICATION_META_THREADS) + // ////////////////////////////////////////////////////// // Internal Port // // ////////////////////////////////////////////////////// @@ -4618,4 +4620,12 @@ object CelebornConf extends Logging { .version("0.5.0") .intConf .createWithDefault(0) + + val MASTER_SEND_APPLICATION_META_THREADS: ConfigEntry[Int] = +
buildConf("celeborn.master.send.applicationMeta.threads") + .categories("master") + .doc("Number of threads used by the Master to send ApplicationMeta to Workers.") + .version("0.5.0") + .intConf + .createWithDefault(8) } diff --git a/common/src/main/scala/org/apache/celeborn/common/meta/ApplicationMeta.scala b/common/src/main/scala/org/apache/celeborn/common/meta/ApplicationMeta.scala new file mode 100644 index 000000000..6911d5bb5 --- /dev/null +++ b/common/src/main/scala/org/apache/celeborn/common/meta/ApplicationMeta.scala @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.celeborn.common.meta + +/** + * Application meta + */ +case class ApplicationMeta(appId: String, secret: String) diff --git a/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala b/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala index 533238095..87d18f28a 100644 --- a/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala +++ b/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala @@ -26,11 +26,12 @@ import scala.collection.JavaConverters._ import com.google.protobuf.InvalidProtocolBufferException import org.apache.celeborn.common.identity.UserIdentifier -import org.apache.celeborn.common.meta.{AppDiskUsage, AppDiskUsageSnapShot, DiskFileInfo, DiskInfo, FileInfo, MapFileMeta, ReduceFileMeta, WorkerEventInfo, WorkerInfo, WorkerStatus} +import org.apache.celeborn.common.meta.{AppDiskUsage, AppDiskUsageSnapShot, ApplicationMeta, DiskFileInfo, DiskInfo, FileInfo, MapFileMeta, ReduceFileMeta, WorkerEventInfo, WorkerInfo, WorkerStatus} import org.apache.celeborn.common.protocol._ import org.apache.celeborn.common.protocol.PartitionLocation.Mode import org.apache.celeborn.common.protocol.message.ControlMessages.WorkerResource import org.apache.celeborn.common.quota.ResourceConsumption +import org.apache.celeborn.common.util.{CollectionUtils => localCollectionUtils} object PbSerDeUtils { @@ -417,7 +418,8 @@ object PbSerDeUtils { currentAppDiskUsageMetricsSnapshot: AppDiskUsageSnapShot, lostWorkers: ConcurrentHashMap[WorkerInfo, java.lang.Long], shutdownWorkers: java.util.Set[WorkerInfo], - workerEventInfos: ConcurrentHashMap[WorkerInfo, WorkerEventInfo]): PbSnapshotMetaInfo = { + workerEventInfos: ConcurrentHashMap[WorkerInfo, WorkerEventInfo], + applicationMetas: ConcurrentHashMap[String, ApplicationMeta]): PbSnapshotMetaInfo = { val builder = PbSnapshotMetaInfo.newBuilder() .setEstimatedPartitionSize(estimatedPartitionSize) 
.addAllRegisteredShuffle(registeredShuffle) @@ -446,9 +448,25 @@ object PbSerDeUtils { builder.setCurrentAppDiskUsageMetricsSnapshot( toPbAppDiskUsageSnapshot(currentAppDiskUsageMetricsSnapshot)) } + val pbApplicationMetas = applicationMetas.asScala.map { + case (appId, applicationMeta) => (appId, toPbApplicationMeta(applicationMeta)) + }.asJava + if (localCollectionUtils.isNotEmpty(pbApplicationMetas)) { + builder.putAllApplicationMetas(pbApplicationMetas) + } builder.build() } + def toPbApplicationMeta(meta: ApplicationMeta): PbApplicationMeta = { + PbApplicationMeta.newBuilder() + .setAppId(meta.appId) + .setSecret(meta.secret).build() + } + + def fromPbApplicationMeta(pbApplicationMeta: PbApplicationMeta): ApplicationMeta = { + new ApplicationMeta(pbApplicationMeta.getAppId, pbApplicationMeta.getSecret) + } + def toPbWorkerStatus(workerStatus: WorkerStatus): PbWorkerStatus = { PbWorkerStatus.newBuilder() .setState(workerStatus.getState) diff --git a/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala b/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala index e5d18e071..0d8063d63 100644 --- a/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala +++ b/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala @@ -24,7 +24,7 @@ import scala.collection.JavaConverters._ import org.apache.celeborn.CelebornFunSuite import org.apache.celeborn.common.identity.UserIdentifier -import org.apache.celeborn.common.meta.{DeviceInfo, DiskFileInfo, DiskInfo, FileInfo, ReduceFileMeta, WorkerEventInfo, WorkerInfo, WorkerStatus} +import org.apache.celeborn.common.meta.{ApplicationMeta, DeviceInfo, DiskFileInfo, DiskInfo, FileInfo, ReduceFileMeta, WorkerEventInfo, WorkerInfo, WorkerStatus} import org.apache.celeborn.common.protocol.{PartitionLocation, StorageInfo} import org.apache.celeborn.common.protocol.PartitionLocation import 
org.apache.celeborn.common.protocol.message.ControlMessages.WorkerResource @@ -282,4 +282,12 @@ class PbSerDeUtilsTest extends CelebornFunSuite { assert(restoredWorkerStatus.equals(workerStatus)) } + + test("fromAndToPbApplicationMeta") { + val applicationMeta = new ApplicationMeta("app1", "secret1") + val pbApplicationMeta = PbSerDeUtils.toPbApplicationMeta(applicationMeta) + val restoredApplicationMeta = PbSerDeUtils.fromPbApplicationMeta(pbApplicationMeta) + + assert(restoredApplicationMeta.equals(applicationMeta)) + } } diff --git a/docs/configuration/master.md b/docs/configuration/master.md index f0f293885..e1efa174d 100644 --- a/docs/configuration/master.md +++ b/docs/configuration/master.md @@ -45,6 +45,7 @@ license: | | celeborn.master.internal.port | 8097 | false | Internal port on the master where both workers and other master nodes connect. | 0.5.0 | | | celeborn.master.port | 9097 | false | Port for master to bind. | 0.2.0 | | | celeborn.master.rackResolver.refresh.interval | 30s | false | Interval for refreshing the node rack information periodically. | 0.5.0 | | +| celeborn.master.send.applicationMeta.threads | 8 | false | Number of threads used by the Master to send ApplicationMeta to Workers. | 0.5.0 | | | celeborn.master.slot.assign.extraSlots | 2 | false | Extra slots number when master assign slots. | 0.3.0 | celeborn.slots.assign.extraSlots | | celeborn.master.slot.assign.loadAware.diskGroupGradient | 0.1 | false | This value means how many more workload will be placed into a faster disk group than a slower group. 
| 0.3.0 | celeborn.slots.assign.loadAware.diskGroupGradient | | celeborn.master.slot.assign.loadAware.fetchTimeWeight | 1.0 | false | Weight of average fetch time when calculating ordering in load-aware assignment strategy | 0.3.0 | celeborn.slots.assign.loadAware.fetchTimeWeight | diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/MasterSecretRegistryImpl.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/MasterSecretRegistryImpl.java new file mode 100644 index 000000000..d78cf6628 --- /dev/null +++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/MasterSecretRegistryImpl.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.celeborn.service.deploy.master; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.celeborn.common.meta.ApplicationMeta; +import org.apache.celeborn.common.network.sasl.SecretRegistry; +import org.apache.celeborn.common.network.sasl.SecretRegistryImpl; +import org.apache.celeborn.service.deploy.master.clustermeta.IMetadataHandler; + +/** + * A simple implementation of {@link SecretRegistry} that stores secrets in memory and Ratis. 
This + * persists an application secret in Ratis but the deletion of that secret happens when + * ApplicationLost is triggered. + */ +public class MasterSecretRegistryImpl extends SecretRegistryImpl { + + private static final Logger LOG = LoggerFactory.getLogger(MasterSecretRegistryImpl.class); + private IMetadataHandler metadataHandler; + + @Override + public void register(String appId, String secret) { + super.register(appId, secret); + if (metadataHandler != null) { + LOG.info("Persisting metadata for appId: {}", appId); + metadataHandler.handleApplicationMeta(new ApplicationMeta(appId, secret)); + } + } + + void setMetadataHandler(IMetadataHandler metadataHandler) { + this.metadataHandler = metadataHandler; + } +} diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java index 86b34b258..7957fd1aa 100644 --- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java +++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java @@ -43,6 +43,7 @@ import org.apache.celeborn.common.CelebornConf; import org.apache.celeborn.common.identity.UserIdentifier; import org.apache.celeborn.common.meta.AppDiskUsageMetric; import org.apache.celeborn.common.meta.AppDiskUsageSnapShot; +import org.apache.celeborn.common.meta.ApplicationMeta; import org.apache.celeborn.common.meta.DiskInfo; import org.apache.celeborn.common.meta.DiskStatus; import org.apache.celeborn.common.meta.WorkerEventInfo; @@ -84,6 +85,9 @@ public abstract class AbstractMetaManager implements IMetadataHandler { public final LongAdder partitionTotalFileCount = new LongAdder(); public AppDiskUsageMetric appDiskUsageMetric = null; + public final ConcurrentHashMap<String, ApplicationMeta> applicationMetas = + JavaUtils.newConcurrentHashMap(); + public void updateRequestSlotsMeta( String
shuffleKey, String hostName, Map> workerWithAllocations) { registeredShuffle.add(shuffleKey); @@ -117,6 +121,7 @@ public abstract class AbstractMetaManager implements IMetadataHandler { public void updateAppLostMeta(String appId) { registeredShuffle.removeIf(shuffleKey -> shuffleKey.startsWith(appId)); appHeartbeatTime.remove(appId); + applicationMetas.remove(appId); } public void updateWorkerExcludeMeta( @@ -269,7 +274,8 @@ public abstract class AbstractMetaManager implements IMetadataHandler { appDiskUsageMetric.currentSnapShot().get(), lostWorkers, shutdownWorkers, - workerEventInfos) + workerEventInfos, + applicationMetas) .toByteArray(); Files.write(file.toPath(), snapshotBytes); } @@ -357,6 +363,11 @@ public abstract class AbstractMetaManager implements IMetadataHandler { new AtomicReference( PbSerDeUtils.fromPbAppDiskUsageSnapshot( snapshotMetaInfo.getCurrentAppDiskUsageMetricsSnapshot()))); + + snapshotMetaInfo + .getApplicationMetasMap() + .forEach( + (key, value) -> applicationMetas.put(key, PbSerDeUtils.fromPbApplicationMeta(value))); } catch (Exception e) { throw new IOException(e); } @@ -384,6 +395,7 @@ public abstract class AbstractMetaManager implements IMetadataHandler { partitionTotalWritten.reset(); partitionTotalFileCount.reset(); workerEventInfos.clear(); + applicationMetas.clear(); } public void updateMetaByReportWorkerUnavailable(List failedWorkers) { @@ -442,4 +454,8 @@ public abstract class AbstractMetaManager implements IMetadataHandler { && (!workerEventInfos.containsKey(workerInfo) && workerInfo.getWorkerStatus().getState() == PbWorkerStatus.State.Normal); } + + public void updateApplicationMeta(ApplicationMeta applicationMeta) { + applicationMetas.putIfAbsent(applicationMeta.appId(), applicationMeta); + } } diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java index 
fd87768b4..ed9eeba5a 100644 --- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java +++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java @@ -21,6 +21,7 @@ import java.util.List; import java.util.Map; import org.apache.celeborn.common.identity.UserIdentifier; +import org.apache.celeborn.common.meta.ApplicationMeta; import org.apache.celeborn.common.meta.DiskInfo; import org.apache.celeborn.common.meta.WorkerInfo; import org.apache.celeborn.common.meta.WorkerStatus; @@ -82,4 +83,6 @@ public interface IMetadataHandler { int workerEventTypeValue, List workerInfoList, String requestId); void handleUpdatePartitionSize(); + + void handleApplicationMeta(ApplicationMeta applicationMeta); } diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java index eff63a634..b4db36e2a 100644 --- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java +++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java @@ -26,6 +26,7 @@ import org.slf4j.LoggerFactory; import org.apache.celeborn.common.CelebornConf; import org.apache.celeborn.common.identity.UserIdentifier; import org.apache.celeborn.common.meta.AppDiskUsageMetric; +import org.apache.celeborn.common.meta.ApplicationMeta; import org.apache.celeborn.common.meta.DiskInfo; import org.apache.celeborn.common.meta.WorkerInfo; import org.apache.celeborn.common.meta.WorkerStatus; @@ -163,4 +164,9 @@ public class SingleMasterMetaManager extends AbstractMetaManager { public void handleUpdatePartitionSize() { updatePartitionSize(); } + + @Override + public void handleApplicationMeta(ApplicationMeta applicationMeta) { + updateApplicationMeta(applicationMeta); + } } diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java index 77c31807a..c3b18b3d7 100644 --- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java +++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java @@ -29,6 +29,7 @@ import org.apache.celeborn.common.client.MasterClient; import org.apache.celeborn.common.exception.CelebornRuntimeException; import org.apache.celeborn.common.identity.UserIdentifier; import org.apache.celeborn.common.meta.AppDiskUsageMetric; +import org.apache.celeborn.common.meta.ApplicationMeta; import org.apache.celeborn.common.meta.DiskInfo; import org.apache.celeborn.common.meta.WorkerInfo; import org.apache.celeborn.common.meta.WorkerStatus; @@ -375,4 +376,23 @@ public class HAMasterMetaManager extends AbstractMetaManager { throw e; } } + + @Override + public void handleApplicationMeta(ApplicationMeta applicationMeta) { + try { + ratisServer.submitRequest( + ResourceRequest.newBuilder() + .setCmdType(Type.ApplicationMeta) + .setRequestId(MasterClient.genRequestId()) + .setApplicationMetaRequest( + ResourceProtos.ApplicationMetaRequest.newBuilder() + .setAppId(applicationMeta.appId()) + .setSecret(applicationMeta.secret()) + .build()) + .build()); + } catch (CelebornRuntimeException e) { + LOG.error("Handle app meta for {} failed!", applicationMeta.appId(), e); + throw e; + } + } } diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java index 37d7797a0..2cfb6a5bb 100644 --- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java +++ 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java @@ -27,6 +27,7 @@ import org.slf4j.LoggerFactory; import org.apache.celeborn.common.CelebornConf; import org.apache.celeborn.common.identity.UserIdentifier; +import org.apache.celeborn.common.meta.ApplicationMeta; import org.apache.celeborn.common.meta.DiskInfo; import org.apache.celeborn.common.meta.WorkerInfo; import org.apache.celeborn.common.meta.WorkerStatus; @@ -267,6 +268,12 @@ public class MetaHandler { metaSystem.updateWorkerEventMeta( request.getWorkerEventRequest().getWorkerEventType().getNumber(), workerInfoList); + case ApplicationMeta: + appId = request.getApplicationMetaRequest().getAppId(); + String secret = request.getApplicationMetaRequest().getSecret(); + metaSystem.updateApplicationMeta(new ApplicationMeta(appId, secret)); + break; + default: throw new IOException("Can not parse this command!" + request); } diff --git a/master/src/main/proto/Resource.proto b/master/src/main/proto/Resource.proto index 91a69f536..6c630646c 100644 --- a/master/src/main/proto/Resource.proto +++ b/master/src/main/proto/Resource.proto @@ -38,6 +38,7 @@ enum Type { RemoveWorkersUnavailableInfo = 23; WorkerExclude = 24; WorkerEvent = 25; + ApplicationMeta = 26; } enum WorkerEventType { @@ -69,6 +70,7 @@ message ResourceRequest { optional RemoveWorkersUnavailableInfoRequest removeWorkersUnavailableInfoRequest = 20; optional WorkerExcludeRequest workerExcludeRequest = 21; optional WorkerEventRequest workerEventRequest = 22; + optional ApplicationMetaRequest applicationMetaRequest = 23; } message DiskInfo { @@ -222,3 +224,8 @@ message ResourceResponse { required Status status = 4; } + +message ApplicationMetaRequest { + required string appId = 1; + optional string secret = 2; +} diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala index 33754a673..7af28474a 100644 
--- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala +++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala @@ -20,7 +20,7 @@ package org.apache.celeborn.service.deploy.master import java.io.IOException import java.net.BindException import java.util -import java.util.concurrent.{ConcurrentHashMap, ScheduledFuture, TimeUnit} +import java.util.concurrent.{ConcurrentHashMap, ExecutorService, ScheduledFuture, TimeUnit} import java.util.concurrent.atomic.AtomicBoolean import java.util.function.ToLongFunction @@ -38,7 +38,7 @@ import org.apache.celeborn.common.internal.Logging import org.apache.celeborn.common.meta.{DiskInfo, WorkerInfo, WorkerStatus} import org.apache.celeborn.common.metrics.MetricsSystem import org.apache.celeborn.common.metrics.source.{JVMCPUSource, JVMSource, ResourceConsumptionSource, SystemMiscSource, ThreadPoolSource} -import org.apache.celeborn.common.network.sasl.SecretRegistryImpl +import org.apache.celeborn.common.network.protocol.TransportMessage import org.apache.celeborn.common.protocol._ import org.apache.celeborn.common.protocol.message.{ControlMessages, StatusCode} import org.apache.celeborn.common.protocol.message.ControlMessages._ @@ -76,7 +76,7 @@ private[celeborn] class Master( metricsSystem.registerSource(new SystemMiscSource(conf, MetricsSystem.ROLE_MASTER)) private val authEnabled = conf.authEnabled - private val secretRegistry = new SecretRegistryImpl() + private val secretRegistry = new MasterSecretRegistryImpl() override val rpcEnv: RpcEnv = if (!authEnabled) { @@ -144,6 +144,7 @@ private[celeborn] class Master( } else { new SingleMasterMetaManager(internalRpcEnvInUse, conf, rackResolver) } + secretRegistry.setMetadataHandler(statusSystem) // Threads private val forwardMessageThread = @@ -253,11 +254,24 @@ private[celeborn] class Master( internalRpcEndpoint) } + private val sendApplicationMetaThreads = conf.masterSendApplicationMetaThreads + // Send ApplicationMeta to 
workers + private var sendApplicationMetaExecutor: ExecutorService = _ + // Maintains the mapping for the workers assigned to each application + private val workersAssignedToApp + : util.concurrent.ConcurrentHashMap[String, util.Set[WorkerInfo]] = + JavaUtils.newConcurrentHashMap[String, util.Set[WorkerInfo]]() + // start threads to check timeout for workers and applications override def onStart(): Unit = { if (!threadsStarted.compareAndSet(false, true)) { return } + if (authEnabled) { + sendApplicationMetaExecutor = ThreadUtils.newDaemonFixedThreadPool( + sendApplicationMetaThreads, + "send-application-meta") + } checkForWorkerTimeOutTask = forwardMessageThread.scheduleWithFixedDelay( new Runnable { override def run(): Unit = Utils.tryLogNonFatalError { @@ -321,6 +335,9 @@ private[celeborn] class Master( } forwardMessageThread.shutdownNow() rackResolver.stop() + if (authEnabled) { + sendApplicationMetaExecutor.shutdownNow() + } logInfo("Celeborn Master is stopped.") } @@ -850,9 +867,46 @@ private[celeborn] class Master( logInfo(s"Offered extra $offerSlotsExtraSize slots for $shuffleKey") } + if (authEnabled) { + pushApplicationMetaToWorkers(requestSlots, slots) + } context.reply(RequestSlotsResponse(StatusCode.SUCCESS, slots.asInstanceOf[WorkerResource])) } + def pushApplicationMetaToWorkers( + requestSlots: RequestSlots, + slots: util.Map[WorkerInfo, (util.List[PartitionLocation], util.List[PartitionLocation])]) + : Unit = { + // Pass application registration information to the workers + val pbApplicationMeta = PbApplicationMeta.newBuilder() + .setAppId(requestSlots.applicationId) + .setSecret(secretRegistry.getSecretKey(requestSlots.applicationId)) + .build() + val transportMessage = + new TransportMessage(MessageType.APPLICATION_META, pbApplicationMeta.toByteArray) + val workerSet = workersAssignedToApp.computeIfAbsent( + requestSlots.applicationId, + new util.function.Function[String, util.Set[WorkerInfo]] { + override def apply(key: String): 
util.Set[WorkerInfo] = + util.Collections.newSetFromMap(new util.concurrent.ConcurrentHashMap[ + WorkerInfo, + java.lang.Boolean]()) + }) + slots.keySet().asScala.foreach { worker => + // The app meta info is sent to a Worker only if it wasn't previously sent. + if (workerSet.add(worker)) { + sendApplicationMetaExecutor.submit(new Runnable { + override def run(): Unit = { + logDebug(s"Sending app registration info to ${worker.host}:${worker.internalPort}") + internalRpcEnvInUse.setupEndpointRef( + RpcAddress.apply(worker.host, worker.internalPort), + RpcNameConstants.WORKER_INTERNAL_EP).send(transportMessage) + } + }) + } + } + } + def handleUnregisterShuffle( context: RpcCallContext, applicationId: String, @@ -877,6 +931,7 @@ private[celeborn] class Master( def handleApplicationLost(context: RpcCallContext, appId: String, requestId: String): Unit = { nonEagerHandler.submit(new Runnable { override def run(): Unit = { + workersAssignedToApp.remove(appId) statusSystem.handleAppLost(appId, requestId) logInfo(s"Removed application $appId") if (hasHDFSStorage) { diff --git a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java index 25094ea13..45b059da5 100644 --- a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java +++ b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java @@ -33,6 +33,7 @@ import org.junit.Test; import org.apache.celeborn.common.CelebornConf; import org.apache.celeborn.common.client.MasterClient; import org.apache.celeborn.common.identity.UserIdentifier; +import org.apache.celeborn.common.meta.ApplicationMeta; import org.apache.celeborn.common.meta.DiskInfo; import org.apache.celeborn.common.meta.WorkerInfo; import org.apache.celeborn.common.meta.WorkerStatus; @@ -447,13 +448,15 @@ public class
DefaultMetaSystemSuiteJ { workersToAllocate.put(workerInfo1.toUniqueId(), allocation); workersToAllocate.put(workerInfo2.toUniqueId(), allocation); + statusSystem.handleApplicationMeta(new ApplicationMeta(APPID1, "testSecret")); statusSystem.handleRequestSlots(SHUFFLEKEY1, HOSTNAME1, workersToAllocate, getNewReqeustId()); assertEquals(1, statusSystem.registeredShuffle.size()); - + assertEquals(1, statusSystem.applicationMetas.size()); statusSystem.handleAppLost(APPID1, getNewReqeustId()); assertTrue(statusSystem.registeredShuffle.isEmpty()); + assertTrue(statusSystem.applicationMetas.isEmpty()); } @Test @@ -690,4 +693,17 @@ public class DefaultMetaSystemSuiteJ { public void testHandleUpdatePartitionSize() { statusSystem.handleUpdatePartitionSize(); } + + @Test + public void testHandleApplicationMeta() { + String appSecret = "testSecret"; + statusSystem.handleApplicationMeta(new ApplicationMeta(APPID1, appSecret)); + assertEquals(appSecret, statusSystem.applicationMetas.get(APPID1).secret()); + + String appId2 = "app02"; + String appSecret2 = "testSecret2"; + statusSystem.handleApplicationMeta(new ApplicationMeta(appId2, appSecret2)); + assertEquals(appSecret2, statusSystem.applicationMetas.get(appId2).secret()); + assertEquals(2, statusSystem.applicationMetas.size()); + } } diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/InternalRpcEndpoint.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/InternalRpcEndpoint.scala index 5f84aed4a..d78aecce5 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/InternalRpcEndpoint.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/InternalRpcEndpoint.scala @@ -18,8 +18,9 @@ package org.apache.celeborn.service.deploy.worker import org.apache.celeborn.common.CelebornConf -import org.apache.celeborn.common.exception.CelebornException import org.apache.celeborn.common.internal.Logging +import 
org.apache.celeborn.common.network.sasl.SecretRegistry +import org.apache.celeborn.common.protocol.PbApplicationMeta import org.apache.celeborn.common.rpc._ /** @@ -28,7 +29,8 @@ import org.apache.celeborn.common.rpc._ */ private[celeborn] class InternalRpcEndpoint( override val rpcEnv: RpcEnv, - val conf: CelebornConf) + val conf: CelebornConf, + val secretRegistry: SecretRegistry) extends RpcEndpoint with Logging { override def onDisconnected(address: RpcAddress): Unit = { @@ -36,8 +38,12 @@ private[celeborn] class InternalRpcEndpoint( } override def receive: PartialFunction[Any, Unit] = { - // TODO: [CELEBORN-1234] Handle the application secret from the Master - case _ => throw new CelebornException(self + " not implemented") + case pb: PbApplicationMeta => + val appId = pb.getAppId + val secret = pb.getSecret + if (!secretRegistry.isRegistered(appId)) { + logInfo(s"Received application meta for $appId from the Master.") + secretRegistry.register(appId, secret) + } } - } diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala index 604a588e6..5743d2788 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala @@ -188,7 +188,7 @@ private[celeborn] class Worker( private[worker] var internalRpcEndpoint: RpcEndpoint = _ private var internalRpcEndpointRef: RpcEndpointRef = _ if (conf.internalPortEnabled) { - internalRpcEndpoint = new InternalRpcEndpoint(internalRpcEnvInUse, conf) + internalRpcEndpoint = new InternalRpcEndpoint(internalRpcEnvInUse, conf, secretRegistry) internalRpcEndpointRef = internalRpcEnvInUse.setupEndpoint( RpcNameConstants.WORKER_INTERNAL_EP, internalRpcEndpoint)