[CELEBORN-1234] Master should persist the application meta in Ratis and push it to the Workers

### What changes were proposed in this pull request?
This enables Celeborn Master to persist application meta in Ratis and also push it to Celeborn Workers when it receives the requests for slots from the LifecycleManager.

### Why are the changes needed?
This change is required for adding authentication. ([CELEBORN-1011](https://issues.apache.org/jira/browse/CELEBORN-1011)).

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added some UTs.

Closes #2346 from otterc/CELEBORN-1234.

Authored-by: Chandni Singh <singh.chandni@gmail.com>
Signed-off-by: SteNicholas <programgeek@163.com>
This commit is contained in:
Chandni Singh 2024-03-06 17:02:04 +08:00 committed by SteNicholas
parent dbca7536b6
commit 6897b8be99
19 changed files with 272 additions and 16 deletions

View File

@ -29,6 +29,7 @@ import org.slf4j.LoggerFactory;
import org.apache.celeborn.common.exception.CelebornIOException;
import org.apache.celeborn.common.protocol.MessageType;
import org.apache.celeborn.common.protocol.PbApplicationMeta;
import org.apache.celeborn.common.protocol.PbAuthenticationInitiationRequest;
import org.apache.celeborn.common.protocol.PbAuthenticationInitiationResponse;
import org.apache.celeborn.common.protocol.PbBacklogAnnouncement;
@ -117,6 +118,8 @@ public class TransportMessage implements Serializable {
return (T) PbRegisterApplicationRequest.parseFrom(payload);
case REGISTER_APPLICATION_RESPONSE_VALUE:
return (T) PbRegisterApplicationResponse.parseFrom(payload);
case APPLICATION_META_VALUE:
return (T) PbApplicationMeta.parseFrom(payload);
default:
logger.error("Unexpected type {}", type);
}

View File

@ -26,7 +26,6 @@ public class SecretRegistryImpl implements SecretRegistry {
@Override
public void register(String appId, String secret) {
// TODO: Persist the secret in ratis. See https://issues.apache.org/jira/browse/CELEBORN-1234
secrets.compute(
appId,
(id, oldVal) -> {

View File

@ -98,6 +98,7 @@ enum MessageType {
REGISTER_APPLICATION_RESPONSE = 75;
WORKER_EVENT_REQUEST = 76;
WORKER_EVENT_RESPONSE = 77;
APPLICATION_META = 78;
}
enum StreamType {
@ -613,6 +614,7 @@ message PbSnapshotMetaInfo {
repeated PbWorkerInfo shutdownWorkers = 13;
repeated PbWorkerInfo manuallyExcludedWorkers = 14;
map<string, PbWorkerEventInfo> workerEventInfos = 15;
map<string, PbApplicationMeta> applicationMetas = 16;
}
message PbOpenStream {
@ -724,3 +726,9 @@ message PbRegisterApplicationRequest {
message PbRegisterApplicationResponse {
bool status = 1;
}
message PbApplicationMeta {
string appId = 1;
string secret = 2;
}

View File

@ -1139,9 +1139,11 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
throw new IllegalArgumentException(
s"${AUTH_ENABLED.key} is true, but ${INTERNAL_PORT_ENABLED.key} is false")
}
return authEnabled && internalPortEnabled
authEnabled && internalPortEnabled
}
def masterSendApplicationMetaThreads: Int = get(MASTER_SEND_APPLICATION_META_THREADS)
// //////////////////////////////////////////////////////
// Internal Port //
// //////////////////////////////////////////////////////
@ -4618,4 +4620,12 @@ object CelebornConf extends Logging {
.version("0.5.0")
.intConf
.createWithDefault(0)
val MASTER_SEND_APPLICATION_META_THREADS: ConfigEntry[Int] =
buildConf("celeborn.master.send.applicationMeta.threads")
.categories("master")
.doc("Number of threads used by the Master to send ApplicationMeta to Workers.")
.version("0.5.0")
.intConf
.createWithDefault(8)
}

View File

@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.celeborn.common.meta
/**
* Application meta
*/
case class ApplicationMeta(appId: String, secret: String)

View File

@ -26,11 +26,12 @@ import scala.collection.JavaConverters._
import com.google.protobuf.InvalidProtocolBufferException
import org.apache.celeborn.common.identity.UserIdentifier
import org.apache.celeborn.common.meta.{AppDiskUsage, AppDiskUsageSnapShot, DiskFileInfo, DiskInfo, FileInfo, MapFileMeta, ReduceFileMeta, WorkerEventInfo, WorkerInfo, WorkerStatus}
import org.apache.celeborn.common.meta.{AppDiskUsage, AppDiskUsageSnapShot, ApplicationMeta, DiskFileInfo, DiskInfo, FileInfo, MapFileMeta, ReduceFileMeta, WorkerEventInfo, WorkerInfo, WorkerStatus}
import org.apache.celeborn.common.protocol._
import org.apache.celeborn.common.protocol.PartitionLocation.Mode
import org.apache.celeborn.common.protocol.message.ControlMessages.WorkerResource
import org.apache.celeborn.common.quota.ResourceConsumption
import org.apache.celeborn.common.util.{CollectionUtils => localCollectionUtils}
object PbSerDeUtils {
@ -417,7 +418,8 @@ object PbSerDeUtils {
currentAppDiskUsageMetricsSnapshot: AppDiskUsageSnapShot,
lostWorkers: ConcurrentHashMap[WorkerInfo, java.lang.Long],
shutdownWorkers: java.util.Set[WorkerInfo],
workerEventInfos: ConcurrentHashMap[WorkerInfo, WorkerEventInfo]): PbSnapshotMetaInfo = {
workerEventInfos: ConcurrentHashMap[WorkerInfo, WorkerEventInfo],
applicationMetas: ConcurrentHashMap[String, ApplicationMeta]): PbSnapshotMetaInfo = {
val builder = PbSnapshotMetaInfo.newBuilder()
.setEstimatedPartitionSize(estimatedPartitionSize)
.addAllRegisteredShuffle(registeredShuffle)
@ -446,9 +448,25 @@ object PbSerDeUtils {
builder.setCurrentAppDiskUsageMetricsSnapshot(
toPbAppDiskUsageSnapshot(currentAppDiskUsageMetricsSnapshot))
}
val pbApplicationMetas = applicationMetas.asScala.map {
case (appId, applicationMeta) => (appId, toPbApplicationMeta(applicationMeta))
}.asJava
if (localCollectionUtils.isNotEmpty(pbApplicationMetas)) {
builder.putAllApplicationMetas(pbApplicationMetas)
}
builder.build()
}
def toPbApplicationMeta(meta: ApplicationMeta): PbApplicationMeta = {
PbApplicationMeta.newBuilder()
.setAppId(meta.appId)
.setSecret(meta.secret).build()
}
def fromPbApplicationMeta(pbApplicationMeta: PbApplicationMeta): ApplicationMeta = {
new ApplicationMeta(pbApplicationMeta.getAppId, pbApplicationMeta.getSecret)
}
def toPbWorkerStatus(workerStatus: WorkerStatus): PbWorkerStatus = {
PbWorkerStatus.newBuilder()
.setState(workerStatus.getState)

View File

@ -24,7 +24,7 @@ import scala.collection.JavaConverters._
import org.apache.celeborn.CelebornFunSuite
import org.apache.celeborn.common.identity.UserIdentifier
import org.apache.celeborn.common.meta.{DeviceInfo, DiskFileInfo, DiskInfo, FileInfo, ReduceFileMeta, WorkerEventInfo, WorkerInfo, WorkerStatus}
import org.apache.celeborn.common.meta.{ApplicationMeta, DeviceInfo, DiskFileInfo, DiskInfo, FileInfo, ReduceFileMeta, WorkerEventInfo, WorkerInfo, WorkerStatus}
import org.apache.celeborn.common.protocol.{PartitionLocation, StorageInfo}
import org.apache.celeborn.common.protocol.PartitionLocation
import org.apache.celeborn.common.protocol.message.ControlMessages.WorkerResource
@ -282,4 +282,12 @@ class PbSerDeUtilsTest extends CelebornFunSuite {
assert(restoredWorkerStatus.equals(workerStatus))
}
test("fromAndToPbApplicationMeta") {
val applicationMeta = new ApplicationMeta("app1", "secret1")
val pbApplicationMeta = PbSerDeUtils.toPbApplicationMeta(applicationMeta)
val restoredApplicationMeta = PbSerDeUtils.fromPbApplicationMeta(pbApplicationMeta)
assert(restoredApplicationMeta.equals(applicationMeta))
}
}

View File

@ -45,6 +45,7 @@ license: |
| celeborn.master.internal.port | 8097 | false | Internal port on the master where both workers and other master nodes connect. | 0.5.0 | |
| celeborn.master.port | 9097 | false | Port for master to bind. | 0.2.0 | |
| celeborn.master.rackResolver.refresh.interval | 30s | false | Interval for refreshing the node rack information periodically. | 0.5.0 | |
| celeborn.master.send.applicationMeta.threads | 8 | false | Number of threads used by the Master to send ApplicationMeta to Workers. | 0.5.0 | |
| celeborn.master.slot.assign.extraSlots | 2 | false | Extra slots number when master assign slots. | 0.3.0 | celeborn.slots.assign.extraSlots |
| celeborn.master.slot.assign.loadAware.diskGroupGradient | 0.1 | false | This value means how many more workload will be placed into a faster disk group than a slower group. | 0.3.0 | celeborn.slots.assign.loadAware.diskGroupGradient |
| celeborn.master.slot.assign.loadAware.fetchTimeWeight | 1.0 | false | Weight of average fetch time when calculating ordering in load-aware assignment strategy | 0.3.0 | celeborn.slots.assign.loadAware.fetchTimeWeight |

View File

@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.celeborn.service.deploy.master;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.celeborn.common.meta.ApplicationMeta;
import org.apache.celeborn.common.network.sasl.SecretRegistry;
import org.apache.celeborn.common.network.sasl.SecretRegistryImpl;
import org.apache.celeborn.service.deploy.master.clustermeta.IMetadataHandler;
/**
* A simple implementation of {@link SecretRegistry} that stores secrets in memory and Ratis. This
* persists an application secret in Ratis but the deletion of that secret happens when
* ApplicationLost is triggered.
*/
public class MasterSecretRegistryImpl extends SecretRegistryImpl {
private static final Logger LOG = LoggerFactory.getLogger(MasterSecretRegistryImpl.class);
private IMetadataHandler metadataHandler;
@Override
public void register(String appId, String secret) {
super.register(appId, secret);
if (metadataHandler != null) {
LOG.info("Persisting metadata for appId: {}", appId);
metadataHandler.handleApplicationMeta(new ApplicationMeta(appId, secret));
}
}
void setMetadataHandler(IMetadataHandler metadataHandler) {
this.metadataHandler = metadataHandler;
}
}

View File

@ -43,6 +43,7 @@ import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.common.identity.UserIdentifier;
import org.apache.celeborn.common.meta.AppDiskUsageMetric;
import org.apache.celeborn.common.meta.AppDiskUsageSnapShot;
import org.apache.celeborn.common.meta.ApplicationMeta;
import org.apache.celeborn.common.meta.DiskInfo;
import org.apache.celeborn.common.meta.DiskStatus;
import org.apache.celeborn.common.meta.WorkerEventInfo;
@ -84,6 +85,9 @@ public abstract class AbstractMetaManager implements IMetadataHandler {
public final LongAdder partitionTotalFileCount = new LongAdder();
public AppDiskUsageMetric appDiskUsageMetric = null;
public final ConcurrentHashMap<String, ApplicationMeta> applicationMetas =
JavaUtils.newConcurrentHashMap();
public void updateRequestSlotsMeta(
String shuffleKey, String hostName, Map<String, Map<String, Integer>> workerWithAllocations) {
registeredShuffle.add(shuffleKey);
@ -117,6 +121,7 @@ public abstract class AbstractMetaManager implements IMetadataHandler {
public void updateAppLostMeta(String appId) {
registeredShuffle.removeIf(shuffleKey -> shuffleKey.startsWith(appId));
appHeartbeatTime.remove(appId);
applicationMetas.remove(appId);
}
public void updateWorkerExcludeMeta(
@ -269,7 +274,8 @@ public abstract class AbstractMetaManager implements IMetadataHandler {
appDiskUsageMetric.currentSnapShot().get(),
lostWorkers,
shutdownWorkers,
workerEventInfos)
workerEventInfos,
applicationMetas)
.toByteArray();
Files.write(file.toPath(), snapshotBytes);
}
@ -357,6 +363,11 @@ public abstract class AbstractMetaManager implements IMetadataHandler {
new AtomicReference<AppDiskUsageSnapShot>(
PbSerDeUtils.fromPbAppDiskUsageSnapshot(
snapshotMetaInfo.getCurrentAppDiskUsageMetricsSnapshot())));
snapshotMetaInfo
.getApplicationMetasMap()
.forEach(
(key, value) -> applicationMetas.put(key, PbSerDeUtils.fromPbApplicationMeta(value)));
} catch (Exception e) {
throw new IOException(e);
}
@ -384,6 +395,7 @@ public abstract class AbstractMetaManager implements IMetadataHandler {
partitionTotalWritten.reset();
partitionTotalFileCount.reset();
workerEventInfos.clear();
applicationMetas.clear();
}
public void updateMetaByReportWorkerUnavailable(List<WorkerInfo> failedWorkers) {
@ -442,4 +454,8 @@ public abstract class AbstractMetaManager implements IMetadataHandler {
&& (!workerEventInfos.containsKey(workerInfo)
&& workerInfo.getWorkerStatus().getState() == PbWorkerStatus.State.Normal);
}
public void updateApplicationMeta(ApplicationMeta applicationMeta) {
applicationMetas.putIfAbsent(applicationMeta.appId(), applicationMeta);
}
}

View File

@ -21,6 +21,7 @@ import java.util.List;
import java.util.Map;
import org.apache.celeborn.common.identity.UserIdentifier;
import org.apache.celeborn.common.meta.ApplicationMeta;
import org.apache.celeborn.common.meta.DiskInfo;
import org.apache.celeborn.common.meta.WorkerInfo;
import org.apache.celeborn.common.meta.WorkerStatus;
@ -82,4 +83,6 @@ public interface IMetadataHandler {
int workerEventTypeValue, List<WorkerInfo> workerInfoList, String requestId);
void handleUpdatePartitionSize();
void handleApplicationMeta(ApplicationMeta applicationMeta);
}

View File

@ -26,6 +26,7 @@ import org.slf4j.LoggerFactory;
import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.common.identity.UserIdentifier;
import org.apache.celeborn.common.meta.AppDiskUsageMetric;
import org.apache.celeborn.common.meta.ApplicationMeta;
import org.apache.celeborn.common.meta.DiskInfo;
import org.apache.celeborn.common.meta.WorkerInfo;
import org.apache.celeborn.common.meta.WorkerStatus;
@ -163,4 +164,9 @@ public class SingleMasterMetaManager extends AbstractMetaManager {
public void handleUpdatePartitionSize() {
updatePartitionSize();
}
@Override
public void handleApplicationMeta(ApplicationMeta applicationMeta) {
updateApplicationMeta(applicationMeta);
}
}

View File

@ -29,6 +29,7 @@ import org.apache.celeborn.common.client.MasterClient;
import org.apache.celeborn.common.exception.CelebornRuntimeException;
import org.apache.celeborn.common.identity.UserIdentifier;
import org.apache.celeborn.common.meta.AppDiskUsageMetric;
import org.apache.celeborn.common.meta.ApplicationMeta;
import org.apache.celeborn.common.meta.DiskInfo;
import org.apache.celeborn.common.meta.WorkerInfo;
import org.apache.celeborn.common.meta.WorkerStatus;
@ -375,4 +376,23 @@ public class HAMasterMetaManager extends AbstractMetaManager {
throw e;
}
}
@Override
public void handleApplicationMeta(ApplicationMeta applicationMeta) {
try {
ratisServer.submitRequest(
ResourceRequest.newBuilder()
.setCmdType(Type.ApplicationMeta)
.setRequestId(MasterClient.genRequestId())
.setApplicationMetaRequest(
ResourceProtos.ApplicationMetaRequest.newBuilder()
.setAppId(applicationMeta.appId())
.setSecret(applicationMeta.secret())
.build())
.build());
} catch (CelebornRuntimeException e) {
LOG.error("Handle app meta for {} failed!", applicationMeta.appId(), e);
throw e;
}
}
}

View File

@ -27,6 +27,7 @@ import org.slf4j.LoggerFactory;
import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.common.identity.UserIdentifier;
import org.apache.celeborn.common.meta.ApplicationMeta;
import org.apache.celeborn.common.meta.DiskInfo;
import org.apache.celeborn.common.meta.WorkerInfo;
import org.apache.celeborn.common.meta.WorkerStatus;
@ -267,6 +268,12 @@ public class MetaHandler {
metaSystem.updateWorkerEventMeta(
request.getWorkerEventRequest().getWorkerEventType().getNumber(), workerInfoList);
case ApplicationMeta:
appId = request.getApplicationMetaRequest().getAppId();
String secret = request.getApplicationMetaRequest().getSecret();
metaSystem.updateApplicationMeta(new ApplicationMeta(appId, secret));
break;
default:
throw new IOException("Can not parse this command!" + request);
}

View File

@ -38,6 +38,7 @@ enum Type {
RemoveWorkersUnavailableInfo = 23;
WorkerExclude = 24;
WorkerEvent = 25;
ApplicationMeta = 26;
}
enum WorkerEventType {
@ -69,6 +70,7 @@ message ResourceRequest {
optional RemoveWorkersUnavailableInfoRequest removeWorkersUnavailableInfoRequest = 20;
optional WorkerExcludeRequest workerExcludeRequest = 21;
optional WorkerEventRequest workerEventRequest = 22;
optional ApplicationMetaRequest applicationMetaRequest = 23;
}
message DiskInfo {
@ -222,3 +224,8 @@ message ResourceResponse {
required Status status = 4;
}
message ApplicationMetaRequest {
required string appId = 1;
optional string secret = 2;
}

View File

@ -20,7 +20,7 @@ package org.apache.celeborn.service.deploy.master
import java.io.IOException
import java.net.BindException
import java.util
import java.util.concurrent.{ConcurrentHashMap, ScheduledFuture, TimeUnit}
import java.util.concurrent.{ConcurrentHashMap, ExecutorService, ScheduledFuture, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean
import java.util.function.ToLongFunction
@ -38,7 +38,7 @@ import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.meta.{DiskInfo, WorkerInfo, WorkerStatus}
import org.apache.celeborn.common.metrics.MetricsSystem
import org.apache.celeborn.common.metrics.source.{JVMCPUSource, JVMSource, ResourceConsumptionSource, SystemMiscSource, ThreadPoolSource}
import org.apache.celeborn.common.network.sasl.SecretRegistryImpl
import org.apache.celeborn.common.network.protocol.TransportMessage
import org.apache.celeborn.common.protocol._
import org.apache.celeborn.common.protocol.message.{ControlMessages, StatusCode}
import org.apache.celeborn.common.protocol.message.ControlMessages._
@ -76,7 +76,7 @@ private[celeborn] class Master(
metricsSystem.registerSource(new SystemMiscSource(conf, MetricsSystem.ROLE_MASTER))
private val authEnabled = conf.authEnabled
private val secretRegistry = new SecretRegistryImpl()
private val secretRegistry = new MasterSecretRegistryImpl()
override val rpcEnv: RpcEnv =
if (!authEnabled) {
@ -144,6 +144,7 @@ private[celeborn] class Master(
} else {
new SingleMasterMetaManager(internalRpcEnvInUse, conf, rackResolver)
}
secretRegistry.setMetadataHandler(statusSystem)
// Threads
private val forwardMessageThread =
@ -253,11 +254,24 @@ private[celeborn] class Master(
internalRpcEndpoint)
}
private val sendApplicationMetaThreads = conf.masterSendApplicationMetaThreads
// Send ApplicationMeta to workers
private var sendApplicationMetaExecutor: ExecutorService = _
// Maintains the mapping for the workers assigned to each application
private val workersAssignedToApp
: util.concurrent.ConcurrentHashMap[String, util.Set[WorkerInfo]] =
JavaUtils.newConcurrentHashMap[String, util.Set[WorkerInfo]]()
// start threads to check timeout for workers and applications
override def onStart(): Unit = {
if (!threadsStarted.compareAndSet(false, true)) {
return
}
if (authEnabled) {
sendApplicationMetaExecutor = ThreadUtils.newDaemonFixedThreadPool(
sendApplicationMetaThreads,
"send-application-meta")
}
checkForWorkerTimeOutTask = forwardMessageThread.scheduleWithFixedDelay(
new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
@ -321,6 +335,9 @@ private[celeborn] class Master(
}
forwardMessageThread.shutdownNow()
rackResolver.stop()
if (authEnabled) {
sendApplicationMetaExecutor.shutdownNow()
}
logInfo("Celeborn Master is stopped.")
}
@ -850,9 +867,46 @@ private[celeborn] class Master(
logInfo(s"Offered extra $offerSlotsExtraSize slots for $shuffleKey")
}
if (authEnabled) {
pushApplicationMetaToWorkers(requestSlots, slots)
}
context.reply(RequestSlotsResponse(StatusCode.SUCCESS, slots.asInstanceOf[WorkerResource]))
}
def pushApplicationMetaToWorkers(
requestSlots: RequestSlots,
slots: util.Map[WorkerInfo, (util.List[PartitionLocation], util.List[PartitionLocation])])
: Unit = {
// Pass application registration information to the workers
val pbApplicationMeta = PbApplicationMeta.newBuilder()
.setAppId(requestSlots.applicationId)
.setSecret(secretRegistry.getSecretKey(requestSlots.applicationId))
.build()
val transportMessage =
new TransportMessage(MessageType.APPLICATION_META, pbApplicationMeta.toByteArray)
val workerSet = workersAssignedToApp.computeIfAbsent(
requestSlots.applicationId,
new util.function.Function[String, util.Set[WorkerInfo]] {
override def apply(key: String): util.Set[WorkerInfo] =
util.Collections.newSetFromMap(new util.concurrent.ConcurrentHashMap[
WorkerInfo,
java.lang.Boolean]())
})
slots.keySet().asScala.foreach { worker =>
// The app meta info is send to a Worker only if it wasn't previously sent.
if (workerSet.add(worker)) {
sendApplicationMetaExecutor.submit(new Runnable {
override def run(): Unit = {
logDebug(s"Sending app registration info to ${worker.host}:${worker.internalPort}")
internalRpcEnvInUse.setupEndpointRef(
RpcAddress.apply(worker.host, worker.internalPort),
RpcNameConstants.WORKER_INTERNAL_EP).send(transportMessage)
}
})
}
}
}
def handleUnregisterShuffle(
context: RpcCallContext,
applicationId: String,
@ -877,6 +931,7 @@ private[celeborn] class Master(
def handleApplicationLost(context: RpcCallContext, appId: String, requestId: String): Unit = {
nonEagerHandler.submit(new Runnable {
override def run(): Unit = {
workersAssignedToApp.remove(appId)
statusSystem.handleAppLost(appId, requestId)
logInfo(s"Removed application $appId")
if (hasHDFSStorage) {

View File

@ -33,6 +33,7 @@ import org.junit.Test;
import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.common.client.MasterClient;
import org.apache.celeborn.common.identity.UserIdentifier;
import org.apache.celeborn.common.meta.ApplicationMeta;
import org.apache.celeborn.common.meta.DiskInfo;
import org.apache.celeborn.common.meta.WorkerInfo;
import org.apache.celeborn.common.meta.WorkerStatus;
@ -447,13 +448,15 @@ public class DefaultMetaSystemSuiteJ {
workersToAllocate.put(workerInfo1.toUniqueId(), allocation);
workersToAllocate.put(workerInfo2.toUniqueId(), allocation);
statusSystem.handleApplicationMeta(new ApplicationMeta(APPID1, "testSecret"));
statusSystem.handleRequestSlots(SHUFFLEKEY1, HOSTNAME1, workersToAllocate, getNewReqeustId());
assertEquals(1, statusSystem.registeredShuffle.size());
assertEquals(1, statusSystem.applicationMetas.size());
statusSystem.handleAppLost(APPID1, getNewReqeustId());
assertTrue(statusSystem.registeredShuffle.isEmpty());
assertTrue(statusSystem.applicationMetas.isEmpty());
}
@Test
@ -690,4 +693,17 @@ public class DefaultMetaSystemSuiteJ {
public void testHandleUpdatePartitionSize() {
statusSystem.handleUpdatePartitionSize();
}
@Test
public void testHandleApplicationMeta() {
String appSecret = "testSecret";
statusSystem.handleApplicationMeta(new ApplicationMeta(APPID1, appSecret));
assertEquals(appSecret, statusSystem.applicationMetas.get(APPID1).secret());
String appId2 = "app02";
String appSecret2 = "testSecret2";
statusSystem.handleApplicationMeta(new ApplicationMeta(appId2, appSecret2));
assertEquals(appSecret2, statusSystem.applicationMetas.get(appId2).secret());
assertEquals(2, statusSystem.applicationMetas.size());
}
}

View File

@ -18,8 +18,9 @@
package org.apache.celeborn.service.deploy.worker
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.exception.CelebornException
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.network.sasl.SecretRegistry
import org.apache.celeborn.common.protocol.PbApplicationMeta
import org.apache.celeborn.common.rpc._
/**
@ -28,7 +29,8 @@ import org.apache.celeborn.common.rpc._
*/
private[celeborn] class InternalRpcEndpoint(
override val rpcEnv: RpcEnv,
val conf: CelebornConf)
val conf: CelebornConf,
val secretRegistry: SecretRegistry)
extends RpcEndpoint with Logging {
override def onDisconnected(address: RpcAddress): Unit = {
@ -36,8 +38,12 @@ private[celeborn] class InternalRpcEndpoint(
}
override def receive: PartialFunction[Any, Unit] = {
// TODO: [CELEBORN-1234] Handle the application secret from the Master
case _ => throw new CelebornException(self + " not implemented")
case pb: PbApplicationMeta =>
val appId = pb.getAppId
val secret = pb.getSecret
if (!secretRegistry.isRegistered(appId)) {
logInfo(s"Received application meta for $appId from the Master.")
secretRegistry.register(appId, secret)
}
}
}

View File

@ -188,7 +188,7 @@ private[celeborn] class Worker(
private[worker] var internalRpcEndpoint: RpcEndpoint = _
private var internalRpcEndpointRef: RpcEndpointRef = _
if (conf.internalPortEnabled) {
internalRpcEndpoint = new InternalRpcEndpoint(internalRpcEnvInUse, conf)
internalRpcEndpoint = new InternalRpcEndpoint(internalRpcEnvInUse, conf, secretRegistry)
internalRpcEndpointRef = internalRpcEnvInUse.setupEndpoint(
RpcNameConstants.WORKER_INTERNAL_EP,
internalRpcEndpoint)