diff --git a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala index 14dfdafff..57349f130 100644 --- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala +++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala @@ -21,7 +21,7 @@ import java.lang.{Byte => JByte} import java.nio.ByteBuffer import java.security.SecureRandom import java.util -import java.util.{function, Collections, List => JList} +import java.util.{function, List => JList} import java.util.concurrent._ import java.util.concurrent.atomic.{AtomicInteger, LongAdder} import java.util.function.{BiConsumer, BiFunction, Consumer} @@ -42,7 +42,7 @@ import org.apache.celeborn.client.LifecycleManager.{ShuffleAllocatedWorkers, Shu import org.apache.celeborn.client.listener.WorkerStatusListener import org.apache.celeborn.common.{CelebornConf, CommitMetadata} import org.apache.celeborn.common.CelebornConf.ACTIVE_STORAGE_TYPES -import org.apache.celeborn.common.client.MasterClient +import org.apache.celeborn.common.client.{ApplicationInfoProvider, MasterClient} import org.apache.celeborn.common.identity.{IdentityProvider, UserIdentifier} import org.apache.celeborn.common.internal.Logging import org.apache.celeborn.common.meta.{ApplicationMeta, ShufflePartitionLocationInfo, WorkerInfo} @@ -248,12 +248,21 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends private val messagesHelper: TransportMessagesHelper = new TransportMessagesHelper() + private def registerApplicationInfo(): Unit = { + Utils.tryLogNonFatalError( + masterClient.send(RegisterApplicationInfo( + appUniqueId, + userIdentifier, + ApplicationInfoProvider.instantiate(conf).provide().asJava))) + } + // Since method `onStart` is executed when `rpcEnv.setupEndpoint` is executed, and // `masterClient` is initialized after `rpcEnv` is initialized, if method `onStart` contains // a reference to `masterClient`, there may be cases where `masterClient` is null when // `masterClient` is called. Therefore, it's necessary to uniformly execute the initialization // method at the end of the construction of the class to perform the initialization operations. private def initialize(): Unit = { + registerApplicationInfo() // noinspection ConvertExpressionToSAM commitManager.start() heartbeater.start() diff --git a/common/src/main/proto/TransportMessages.proto b/common/src/main/proto/TransportMessages.proto index 76ec5b457..be8af738a 100644 --- a/common/src/main/proto/TransportMessages.proto +++ b/common/src/main/proto/TransportMessages.proto @@ -116,6 +116,7 @@ enum MessageType { GET_STAGE_END_RESPONSE = 93; READ_REDUCER_PARTITION_END = 94; READ_REDUCER_PARTITION_END_RESPONSE = 95; + REGISTER_APPLICATION_INFO = 96; } enum StreamType { @@ -753,6 +754,7 @@ message PbSnapshotMetaInfo { map shuffleFallbackCounts = 19; int64 applicationTotalCount = 20; map applicationFallbackCounts = 21; + map applicationInfos = 22; } message PbOpenStream { @@ -904,6 +906,13 @@ message PbApplicationMeta { string secret = 2; } +message PbApplicationInfo { + string appId = 1; + PbUserIdentifier userIdentifier = 2; + map extraInfo = 3; + int64 registrationTime = 4; +} + message PbApplicationMetaRequest { string appId = 1; } @@ -1005,6 +1014,7 @@ enum PbMetaRequestType { ReportWorkerDecommission = 27; BatchUnRegisterShuffle = 28; ReviseLostShuffles = 29; + RegisterApplicationInfo = 30; } message PbMetaRequest { @@ -1028,6 +1038,7 @@ message PbMetaRequest { PbApplicationMeta applicationMetaRequest = 23; PbReportWorkerDecommission reportWorkerDecommissionRequest = 24; PbMetaBatchUnregisterShuffles batchUnregisterShuffleRequest = 25; + PbRegisterApplicationInfo registerApplicationInfoRequest = 26; PbReviseLostShuffles reviseLostShufflesRequest = 102; } @@ -1074,4 +1085,11 @@ message PbMetaRequestResponse { bool success = 2; string message = 3; PbMetaRequestStatus status = 4; -} \ No newline at end of file +} + +message PbRegisterApplicationInfo { + string appId = 1; + PbUserIdentifier userIdentifier = 2; + map extraInfo = 3; + string requestId = 4; +} diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala index 7379a19df..75c0dae22 100644 --- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala @@ -30,6 +30,7 @@ import scala.util.matching.Regex import io.netty.channel.epoll.Epoll import org.apache.celeborn.common.authentication.AnonymousAuthenticationProviderImpl +import org.apache.celeborn.common.client.{ApplicationInfoProvider, DefaultApplicationInfoProvider} import org.apache.celeborn.common.identity.{DefaultIdentityProvider, HadoopBasedIdentityProvider, IdentityProvider} import org.apache.celeborn.common.internal.Logging import org.apache.celeborn.common.internal.config._ @@ -901,7 +902,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se def metricsCollectCriticalEnabled: Boolean = get(METRICS_COLLECT_CRITICAL_ENABLED) def metricsCapacity: Int = get(METRICS_CAPACITY) def metricsExtraLabels: Map[String, String] = - get(METRICS_EXTRA_LABELS).map(Utils.parseMetricLabels).toMap + get(METRICS_EXTRA_LABELS).map(Utils.parseKeyValuePair).toMap def metricsWorkerAppTopResourceConsumptionCount: Int = get(METRICS_WORKER_APP_TOP_RESOURCE_CONSUMPTION_COUNT) def metricsWorkerAppTopResourceConsumptionBytesWrittenThreshold: Long = @@ -954,6 +955,9 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se get(CLIENT_EXCLUDE_PEER_WORKER_ON_FAILURE_ENABLED) def clientMrMaxPushData: Long = get(CLIENT_MR_PUSH_DATA_MAX) def clientApplicationUUIDSuffixEnabled: Boolean = get(CLIENT_APPLICATION_UUID_SUFFIX_ENABLED) + def clientApplicationInfoProvider: String = get(CLIENT_APPLICATION_INFO_PROVIDER) + def clientApplicationInfoUserSpecific: Map[String, String] = + get(USER_SPECIFIC_APPLICATION_INFO).map(Utils.parseKeyValuePair).toMap def clientShuffleIntegrityCheckEnabled: Boolean = get(CLIENT_SHUFFLE_INTEGRITY_CHECK_ENABLED) @@ -5633,6 +5637,30 @@ object CelebornConf extends Logging { .booleanConf .createWithDefault(false) + val CLIENT_APPLICATION_INFO_PROVIDER: ConfigEntry[String] = + buildConf("celeborn.client.application.info.provider") + .categories("client") + .doc(s"ApplicationInfoProvider class name. Default class is " + + s"`${classOf[DefaultApplicationInfoProvider].getName}`. " + + s"Optional values: " + + s"${classOf[DefaultIdentityProvider].getName} user name and tenant id are default values or user-specific values.") + .version("0.6.1") + .stringConf + .createWithDefault(classOf[DefaultApplicationInfoProvider].getName) + + val USER_SPECIFIC_APPLICATION_INFO: ConfigEntry[Seq[String]] = + buildConf("celeborn.client.application.info.user-specific") + .categories("client") + .version("0.6.1") + .doc("User specific information for application registration, pattern is" + + " `=[,=]*`, e.g. `cluster=celeborn`.") + .stringConf + .toSequence + .checkValue( + pairs => pairs.map(_ => Try(Utils.parseKeyValuePair(_))).forall(_.isSuccess), + "Allowed pattern is: `=[,=]*`") + .createWithDefault(Seq.empty) + val TEST_ALTERNATIVE: OptionalConfigEntry[String] = buildConf("celeborn.test.alternative.key") .withAlternative("celeborn.test.alternative.deprecatedKey") @@ -5700,8 +5728,8 @@ object CelebornConf extends Logging { .stringConf .toSequence .checkValue( - labels => labels.map(_ => Try(Utils.parseMetricLabels(_))).forall(_.isSuccess), - "Allowed pattern is: `:[,:]*`") + labels => labels.map(_ => Try(Utils.parseKeyValuePair(_))).forall(_.isSuccess), + "Allowed pattern is: `=[,=]*`") .createWithDefault(Seq.empty) val METRICS_WORKER_APP_TOP_RESOURCE_CONSUMPTION_COUNT: ConfigEntry[Int] = diff --git a/common/src/main/scala/org/apache/celeborn/common/client/ApplicationInfoProvider.scala b/common/src/main/scala/org/apache/celeborn/common/client/ApplicationInfoProvider.scala new file mode 100644 index 000000000..d2ccedbfe --- /dev/null +++ b/common/src/main/scala/org/apache/celeborn/common/client/ApplicationInfoProvider.scala @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.celeborn.common.client + +import org.apache.celeborn.common.CelebornConf +import org.apache.celeborn.common.util.Utils +abstract class ApplicationInfoProvider(conf: CelebornConf) { + def provide(): Map[String, String] +} + +object ApplicationInfoProvider { + def instantiate(conf: CelebornConf): ApplicationInfoProvider = { + val className = conf.clientApplicationInfoProvider + Utils.instantiateClassWithCelebornConf[ApplicationInfoProvider](className, conf) + } +} diff --git a/common/src/main/scala/org/apache/celeborn/common/client/DefaultApplicationInfoProvider.scala b/common/src/main/scala/org/apache/celeborn/common/client/DefaultApplicationInfoProvider.scala new file mode 100644 index 000000000..99e31d843 --- /dev/null +++ b/common/src/main/scala/org/apache/celeborn/common/client/DefaultApplicationInfoProvider.scala @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.celeborn.common.client + +import org.apache.celeborn.common.CelebornConf + +class DefaultApplicationInfoProvider(conf: CelebornConf) extends ApplicationInfoProvider(conf) { + override def provide(): Map[String, String] = { + conf.clientApplicationInfoUserSpecific + } +} diff --git a/common/src/main/scala/org/apache/celeborn/common/meta/ApplicationInfo.scala b/common/src/main/scala/org/apache/celeborn/common/meta/ApplicationInfo.scala new file mode 100644 index 000000000..e685c7da4 --- /dev/null +++ b/common/src/main/scala/org/apache/celeborn/common/meta/ApplicationInfo.scala @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.celeborn.common.meta + +import java.util.{Map => JMap} + +import org.apache.celeborn.common.identity.UserIdentifier + +/** + * Application info + */ +case class ApplicationInfo( + appId: String, + userIdentifier: UserIdentifier, + extraInfo: JMap[String, String], + registrationTime: Long) diff --git a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala index 7fcb0fd2f..0640e7e47 100644 --- a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala +++ b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala @@ -18,8 +18,7 @@ package org.apache.celeborn.common.protocol.message import java.util -import java.util.{Collections, UUID} -import java.util.concurrent.atomic.AtomicIntegerArray +import java.util.{Collections, Map => JMap, UUID} import scala.collection.JavaConverters._ @@ -433,6 +432,13 @@ object ControlMessages extends Logging { case class ApplicationLostResponse(status: StatusCode) extends MasterMessage + case class RegisterApplicationInfo( + applicationId: String, + userIdentifier: UserIdentifier, + extraInfo: JMap[String, String], + override var requestId: String = ZERO_UUID) + extends MasterRequestMessage + case class HeartbeatFromApplication( appId: String, totalWritten: Long, @@ -878,6 +884,19 @@ object ControlMessages extends Logging { .setStatus(status.getValue).build().toByteArray new TransportMessage(MessageType.APPLICATION_LOST_RESPONSE, payload) + case RegisterApplicationInfo( + applicationId, + userIdentifier, + extraInfo, + requestId) => + val payload = PbRegisterApplicationInfo.newBuilder() + .setAppId(applicationId) + .setUserIdentifier(PbSerDeUtils.toPbUserIdentifier(userIdentifier)) + .putAllExtraInfo(extraInfo) + .setRequestId(requestId) + .build().toByteArray + new TransportMessage(MessageType.REGISTER_APPLICATION_INFO, payload) + case HeartbeatFromApplication( appId, totalWritten, @@ -1537,6 +1556,13 @@ object ControlMessages extends Logging { case GET_STAGE_END_RESPONSE_VALUE => PbGetStageEndResponse.parseFrom(message.getPayload) + + case REGISTER_APPLICATION_INFO_VALUE => + val pbRegisterApplicationInfo = PbRegisterApplicationInfo.parseFrom(message.getPayload) + RegisterApplicationInfo( + pbRegisterApplicationInfo.getAppId, + PbSerDeUtils.fromPbUserIdentifier(pbRegisterApplicationInfo.getUserIdentifier), + pbRegisterApplicationInfo.getExtraInfoMap) } } } diff --git a/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala b/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala index 6b2c9bde7..b74a31fdf 100644 --- a/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala +++ b/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala @@ -25,7 +25,7 @@ import scala.collection.JavaConverters._ import com.google.protobuf.InvalidProtocolBufferException import org.apache.celeborn.common.identity.UserIdentifier -import org.apache.celeborn.common.meta.{ApplicationMeta, DeviceInfo, DiskFileInfo, DiskInfo, MapFileMeta, ReduceFileMeta, WorkerEventInfo, WorkerInfo, WorkerStatus} +import org.apache.celeborn.common.meta.{ApplicationInfo, ApplicationMeta, DeviceInfo, DiskFileInfo, DiskInfo, MapFileMeta, ReduceFileMeta, WorkerEventInfo, WorkerInfo, WorkerStatus} import org.apache.celeborn.common.meta.MapFileMeta.SegmentIndex import org.apache.celeborn.common.protocol._ import org.apache.celeborn.common.protocol.PartitionLocation.Mode @@ -442,6 +442,7 @@ object PbSerDeUtils { shutdownWorkers: java.util.Set[WorkerInfo], workerEventInfos: ConcurrentHashMap[WorkerInfo, WorkerEventInfo], applicationMetas: ConcurrentHashMap[String, ApplicationMeta], + applicationInfos: ConcurrentHashMap[String, ApplicationInfo], decommissionWorkers: java.util.Set[WorkerInfo]): PbSnapshotMetaInfo = { val builder = PbSnapshotMetaInfo.newBuilder() .setEstimatedPartitionSize(estimatedPartitionSize) @@ -478,6 +479,14 @@ object PbSerDeUtils { if (localCollectionUtils.isNotEmpty(pbApplicationMetas)) { builder.putAllApplicationMetas(pbApplicationMetas) } + + val pbApplicationInfos = applicationInfos.asScala.map { + case (appId, applicationInfo) => (appId, toPbApplicationInfo(applicationInfo)) + }.asJava + if (localCollectionUtils.isNotEmpty(pbApplicationInfos)) { + builder.putAllApplicationInfos(pbApplicationInfos) + } + builder.build() } @@ -491,6 +500,23 @@ object PbSerDeUtils { ApplicationMeta(pbApplicationMeta.getAppId, pbApplicationMeta.getSecret) } + def toPbApplicationInfo(applicationInfo: ApplicationInfo): PbApplicationInfo = { + PbApplicationInfo.newBuilder() + .setAppId(applicationInfo.appId) + .setUserIdentifier(toPbUserIdentifier(applicationInfo.userIdentifier)) + .putAllExtraInfo(applicationInfo.extraInfo) + .setRegistrationTime(applicationInfo.registrationTime) + .build() + } + + def fromPbApplicationInfo(pbApplicationInfo: PbApplicationInfo): ApplicationInfo = { + ApplicationInfo( + pbApplicationInfo.getAppId, + fromPbUserIdentifier(pbApplicationInfo.getUserIdentifier), + pbApplicationInfo.getExtraInfoMap, + pbApplicationInfo.getRegistrationTime) + } + def toPbWorkerStatus(workerStatus: WorkerStatus): PbWorkerStatus = { PbWorkerStatus.newBuilder() .setState(workerStatus.getState) diff --git a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala index aa719cff1..cddcfec40 100644 --- a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala +++ b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala @@ -1184,12 +1184,12 @@ object Utils extends Logging { } } - def parseMetricLabels(label: String): (String, String) = { - val labelPart = label.split("=") - if (labelPart.size != 2) { - throw new IllegalArgumentException(s"Illegal metric extra labels: $label") + def parseKeyValuePair(pair: String): (String, String) = { + val parts = pair.split("=") + if (parts.size != 2) { + throw new IllegalArgumentException(s"Illegal kay=value pair: $pair") } - labelPart(0).trim -> labelPart(1).trim + parts(0).trim -> parts(1).trim } def getProcessId: String = ManagementFactory.getRuntimeMXBean.getName.split("@")(0) diff --git a/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala b/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala index e8252c4d2..fa2ebbfbb 100644 --- a/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala +++ b/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala @@ -19,6 +19,7 @@ package org.apache.celeborn.common.util import java.io.File import java.util +import java.util.Collections import scala.collection.JavaConverters._ import scala.collection.mutable @@ -380,6 +381,18 @@ class PbSerDeUtilsTest extends CelebornFunSuite { assert(restoredApplicationMeta.equals(applicationMeta)) } + test("fromAndToPbApplicationInfo") { + val applicationInfo = new ApplicationInfo( + "app1", + UserIdentifier("tenant", "user"), + Collections.singletonMap("key", "value"), + System.currentTimeMillis()) + val pbApplicationInfo = PbSerDeUtils.toPbApplicationInfo(applicationInfo) + val restoredApplicationInfo = PbSerDeUtils.fromPbApplicationInfo(pbApplicationInfo) + + assert(restoredApplicationInfo.equals(applicationInfo)) + } + test("testPackedPartitionLocationPairCase1") { partitionLocation3.setPeer(partitionLocation2) val pairPb = PbSerDeUtils.toPbPackedPartitionLocationsPair( diff --git a/docs/configuration/client.md b/docs/configuration/client.md index 8e640d0f3..8fd4ef30a 100644 --- a/docs/configuration/client.md +++ b/docs/configuration/client.md @@ -21,6 +21,8 @@ license: | | --- | ------- | --------- | ----------- | ----- | ---------- | | celeborn.client.adaptive.optimizeSkewedPartitionRead.enabled | false | false | If this is true, Celeborn will adaptively split skewed partitions instead of reading them by Spark map range. Please note that this feature requires the `Celeborn-Optimize-Skew-Partitions-spark3_3.patch`. | 0.6.0 | | | celeborn.client.application.heartbeatInterval | 10s | false | Interval for client to send heartbeat message to master. | 0.3.0 | celeborn.application.heartbeatInterval | +| celeborn.client.application.info.provider | org.apache.celeborn.common.client.DefaultApplicationInfoProvider | false | ApplicationInfoProvider class name. Default class is `org.apache.celeborn.common.client.DefaultApplicationInfoProvider`. Optional values: org.apache.celeborn.common.identity.DefaultIdentityProvider user name and tenant id are default values or user-specific values. | 0.6.1 | | +| celeborn.client.application.info.user-specific | | false | User specific information for application registration, pattern is `=[,=]*`, e.g. `cluster=celeborn`. | 0.6.1 | | | celeborn.client.application.unregister.enabled | true | false | When true, Celeborn client will inform celeborn master the application is already shutdown during client exit, this allows the cluster to release resources immediately, resulting in resource savings. | 0.3.2 | | | celeborn.client.application.uuidSuffix.enabled | false | false | Whether to add UUID suffix for application id for unique. When `true`, add UUID suffix for unique application id. Currently, this only applies to Spark and MR. | 0.6.0 | | | celeborn.client.chunk.prefetch.enabled | false | false | Whether to enable chunk prefetch when creating CelebornInputStream. | 0.5.1 | | diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java index 4ae879743..ae639d2e5 100644 --- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java +++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java @@ -41,6 +41,7 @@ import org.slf4j.LoggerFactory; import org.apache.celeborn.common.CelebornConf; import org.apache.celeborn.common.identity.UserIdentifier; +import org.apache.celeborn.common.meta.ApplicationInfo; import org.apache.celeborn.common.meta.ApplicationMeta; import org.apache.celeborn.common.meta.DiskInfo; import org.apache.celeborn.common.meta.DiskStatus; @@ -101,9 +102,17 @@ public abstract class AbstractMetaManager implements IMetadataHandler { public final Map shuffleFallbackCounts = JavaUtils.newConcurrentHashMap(); public final Map applicationFallbackCounts = JavaUtils.newConcurrentHashMap(); + public final ConcurrentHashMap applicationInfos = + JavaUtils.newConcurrentHashMap(); public final ConcurrentHashMap applicationMetas = JavaUtils.newConcurrentHashMap(); + public void updateApplicationInfo( + String appId, UserIdentifier userIdentifier, Map extraInfo) { + applicationInfos.putIfAbsent( + appId, new ApplicationInfo(appId, userIdentifier, extraInfo, System.currentTimeMillis())); + } + public void updateRequestSlotsMeta( String shuffleKey, String hostName, Map> workerWithAllocations) { Tuple2 appIdShuffleId = Utils.splitShuffleKey(shuffleKey); @@ -175,6 +184,7 @@ public abstract class AbstractMetaManager implements IMetadataHandler { registeredAppAndShuffles.remove(appId); appHeartbeatTime.remove(appId); applicationMetas.remove(appId); + applicationInfos.remove(appId); } @VisibleForTesting @@ -418,6 +428,7 @@ public abstract class AbstractMetaManager implements IMetadataHandler { shutdownWorkers, workerEventInfos, applicationMetas, + applicationInfos, decommissionWorkers) .toByteArray(); Files.write(file.toPath(), snapshotBytes); @@ -523,6 +534,11 @@ public abstract class AbstractMetaManager implements IMetadataHandler { .forEach( (key, value) -> applicationMetas.put(key, PbSerDeUtils.fromPbApplicationMeta(value))); + snapshotMetaInfo + .getApplicationInfosMap() + .forEach( + (key, value) -> applicationInfos.put(key, PbSerDeUtils.fromPbApplicationInfo(value))); + availableWorkers.addAll( workersMap.values().stream() .filter(worker -> isWorkerAvailable(worker)) @@ -562,6 +578,7 @@ public abstract class AbstractMetaManager implements IMetadataHandler { applicationFallbackCounts.clear(); workerEventInfos.clear(); applicationMetas.clear(); + applicationInfos.clear(); } public void updateMetaByReportWorkerUnavailable(List failedWorkers) { diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java index 34fd74e34..e3e063348 100644 --- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java +++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java @@ -28,6 +28,9 @@ import org.apache.celeborn.common.meta.WorkerStatus; import org.apache.celeborn.common.quota.ResourceConsumption; public interface IMetadataHandler { + void handleRegisterApplicationInfo( + String appId, UserIdentifier userIdentifier, Map extraInfo, String requestId); + void handleRequestSlots( String shuffleKey, String hostName, diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java index 1a0beeffe..c4b2e843f 100644 --- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java +++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java @@ -54,6 +54,15 @@ public class SingleMasterMetaManager extends AbstractMetaManager { this.rackResolver = rackResolver; } + @Override + public void handleRegisterApplicationInfo( + String appId, + UserIdentifier userIdentifier, + Map extraInfo, + String requestId) { + updateApplicationInfo(appId, userIdentifier, extraInfo); + } + @Override public void handleRequestSlots( String shuffleKey, diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java index 72f531d95..3372143aa 100644 --- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java +++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java @@ -70,6 +70,34 @@ public class HAMasterMetaManager extends AbstractMetaManager { this.ratisServer = ratisServer; } + @Override + public void handleRegisterApplicationInfo( + String appId, + UserIdentifier userIdentifier, + Map extraInfo, + String requestId) { + try { + ratisServer.submitRequest( + ResourceRequest.newBuilder() + .setCmdType(Type.RegisterApplicationInfo) + .setRequestId(requestId) + .setRegisterApplicationInfoRequest( + ResourceProtos.RegisterApplicationInfoRequest.newBuilder() + .setAppId(appId) + .setUserIdentifier( + ResourceProtos.UserIdentifier.newBuilder() + .setTenantId(userIdentifier.tenantId()) + .setName(userIdentifier.name()) + .build()) + .putAllExtraInfo(extraInfo) + .build()) + .build()); + } catch (CelebornRuntimeException e) { + LOG.error("Handle app lost for {} failed!", appId, e); + throw e; + } + } + @Override public void handleRequestSlots( String shuffleKey, diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java index 8cbf4de95..fa18ce925 100644 --- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java +++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java @@ -146,6 +146,18 @@ public class MetaHandler { LOG.debug("Handle batch unregister shuffle for {}", shuffleKeys); break; + case RegisterApplicationInfo: + appId = request.getRegisterApplicationInfoRequest().getAppId(); + UserIdentifier userIdentifier = + new UserIdentifier( + request.getRegisterApplicationInfoRequest().getUserIdentifier().getTenantId(), + request.getRegisterApplicationInfoRequest().getUserIdentifier().getName()); + Map extraInfo = + request.getRegisterApplicationInfoRequest().getExtraInfoMap(); + metaSystem.updateApplicationInfo(appId, userIdentifier, extraInfo); + LOG.debug("Handle register application info for {}/{}", appId, userIdentifier); + break; + case AppHeartbeat: appId = request.getAppHeartbeatRequest().getAppId(); long time = request.getAppHeartbeatRequest().getTime(); diff --git a/master/src/main/proto/Resource.proto b/master/src/main/proto/Resource.proto index e3c688aa1..a10a368fa 100644 --- a/master/src/main/proto/Resource.proto +++ b/master/src/main/proto/Resource.proto @@ -44,6 +44,8 @@ enum Type { BatchUnRegisterShuffle = 28; ReviseLostShuffles = 29; + + RegisterApplicationInfo = 30; } enum WorkerEventType { @@ -78,6 +80,7 @@ message ResourceRequest { optional ApplicationMetaRequest applicationMetaRequest = 23; optional ReportWorkerDecommissionRequest reportWorkerDecommissionRequest = 24; optional BatchUnregisterShuffleRequest batchUnregisterShuffleRequest = 25; + optional RegisterApplicationInfoRequest registerApplicationInfoRequest = 26; optional ReviseLostShufflesRequest reviseLostShufflesRequest = 102; } @@ -265,4 +268,10 @@ message ApplicationMetaRequest { message ReviseLostShufflesRequest { string appId = 1 ; repeated int32 lostShuffles = 2 ; +} + +message RegisterApplicationInfoRequest { + string appId = 1; + UserIdentifier userIdentifier = 2; + map extraInfo = 3; } \ No newline at end of file diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala index f4caae517..c5e31defb 100644 --- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala +++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala @@ -20,6 +20,7 @@ package org.apache.celeborn.service.deploy.master import java.io.IOException import java.net.BindException import java.util +import java.util.{Map => JMap} import java.util.Collections import java.util.concurrent.{ExecutorService, ScheduledFuture, TimeUnit} import java.util.concurrent.atomic.AtomicBoolean @@ -429,6 +430,13 @@ private[celeborn] class Master( } override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { + case RegisterApplicationInfo(appId, userIdentifier, extraInfo, requestId) => + logDebug( + s"Received RegisterApplicationInfo request for app $appId/$userIdentifier/$extraInfo.") + checkAuth(context, appId) + executeWithLeaderChecker( + context, + handleRegisterApplicationInfo(context, appId, userIdentifier, extraInfo, requestId)) case HeartbeatFromApplication( appId, totalWritten, @@ -1166,6 +1174,16 @@ private[celeborn] class Master( } } + private def handleRegisterApplicationInfo( + context: RpcCallContext, + appId: String, + userIdentifier: UserIdentifier, + extraInfo: JMap[String, String], + requestId: String): Unit = { + statusSystem.handleRegisterApplicationInfo(appId, userIdentifier, extraInfo, requestId) + context.reply(OneWayMessageResponse) + } + private def handleHeartbeatFromApplication( context: RpcCallContext, appId: String, diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/v1/ApplicationResource.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/v1/ApplicationResource.scala index f34e79159..ae7c18948 100644 --- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/v1/ApplicationResource.scala +++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/v1/ApplicationResource.scala @@ -17,7 +17,7 @@ package org.apache.celeborn.service.deploy.master.http.api.v1 -import javax.ws.rs.{Consumes, DELETE, GET, Path, POST, Produces} +import javax.ws.rs.{Consumes, GET, Path, POST, Produces} import javax.ws.rs.core.MediaType import scala.collection.JavaConverters._ @@ -27,7 +27,7 @@ import io.swagger.v3.oas.annotations.media.{Content, Schema} import io.swagger.v3.oas.annotations.responses.ApiResponse import io.swagger.v3.oas.annotations.tags.Tag -import org.apache.celeborn.rest.v1.model.{ApplicationHeartbeatData, ApplicationsHeartbeatResponse, DeleteAppsRequest, HandleResponse, HostnamesResponse, ReviseLostShufflesRequest} +import org.apache.celeborn.rest.v1.model.{ApplicationHeartbeatData, ApplicationInfo, ApplicationInfoResponse, ApplicationsHeartbeatResponse, DeleteAppsRequest, HandleResponse, HostnamesResponse, ReviseLostShufflesRequest} import org.apache.celeborn.server.common.http.api.ApiRequestContext import org.apache.celeborn.service.deploy.master.Master @@ -54,6 +54,26 @@ class ApplicationResource extends ApiRequestContext { }.toSeq.asJava) } + @Operation(description = "List all running application's info of the cluster.") + @ApiResponse( + responseCode = "200", + content = Array(new Content( + mediaType = MediaType.APPLICATION_JSON, + schema = new Schema(implementation = classOf[ApplicationInfoResponse])))) + @GET + @Path("/info") + def applicationsInfo(): ApplicationInfoResponse = { + new ApplicationInfoResponse() + .applications( + statusSystem.applicationInfos.asScala.map { case (appId, appInfo) => + new ApplicationInfo() + .appId(appId) + .userIdentifier(appInfo.userIdentifier.toString) + .extraInfo(appInfo.extraInfo) + .registrationTime(appInfo.registrationTime) + }.toSeq.asJava) + } + @Operation(description = "Delete resource of apps.") @ApiResponse( responseCode = "200", diff --git a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java index 04a984cbc..898e7686e 100644 --- a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java +++ b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java @@ -1134,4 +1134,17 @@ public class DefaultMetaSystemSuiteJ { assertEquals(2, statusSystem.applicationFallbackCounts.get(POLICY1).longValue()); assertEquals(1, statusSystem.applicationFallbackCounts.get(POLICY2).longValue()); } + + @Test + public void testRegisterApplicationInfo() { + statusSystem.applicationInfos.clear(); + UserIdentifier userIdentifier = new UserIdentifier("tenant", "celeborn"); + + String appId = "app1"; + Map extraInfo = Collections.singletonMap("k1", "v1"); + statusSystem.handleRegisterApplicationInfo(appId, userIdentifier, extraInfo, getNewReqeustId()); + + assertEquals(statusSystem.applicationInfos.get(appId).userIdentifier(), userIdentifier); + assertEquals(statusSystem.applicationInfos.get(appId).extraInfo(), extraInfo); + } } diff --git a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java index 952789a32..3e8d8ab60 100644 --- a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java +++ b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java @@ -1985,4 +1985,19 @@ public class RatisMasterStatusSystemSuiteJ { assertEquals(statusSystem.applicationFallbackCounts.get(POLICY1).longValue(), 2); assertEquals(statusSystem.applicationFallbackCounts.get(POLICY2).longValue(), 1); } + + @Test + public void testRegisterApplicationInfo() { + AbstractMetaManager statusSystem = pickLeaderStatusSystem(); + Assert.assertNotNull(statusSystem); + statusSystem.applicationInfos.clear(); + UserIdentifier userIdentifier = new UserIdentifier("tenant", "celeborn"); + + String appId = "app1"; + Map extraInfo = Collections.singletonMap("k1", "v1"); + statusSystem.handleRegisterApplicationInfo(appId, userIdentifier, extraInfo, getNewReqeustId()); + + assertEquals(statusSystem.applicationInfos.get(appId).userIdentifier(), userIdentifier); + assertEquals(statusSystem.applicationInfos.get(appId).extraInfo(), extraInfo); + } } diff --git a/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/master/ApplicationApi.java b/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/master/ApplicationApi.java index 47ecf36d1..4c8e51d1c 100644 --- a/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/master/ApplicationApi.java +++ b/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/master/ApplicationApi.java @@ -25,6 +25,7 @@ import org.apache.celeborn.rest.v1.master.invoker.BaseApi; import org.apache.celeborn.rest.v1.master.invoker.Configuration; import org.apache.celeborn.rest.v1.master.invoker.Pair; +import org.apache.celeborn.rest.v1.model.ApplicationInfoResponse; import org.apache.celeborn.rest.v1.model.ApplicationsHeartbeatResponse; import org.apache.celeborn.rest.v1.model.DeleteAppsRequest; import org.apache.celeborn.rest.v1.model.HandleResponse; @@ -253,6 +254,73 @@ public class ApplicationApi extends BaseApi { ); } + /** + * + * List all running application's info of the cluster. + * @return ApplicationInfoResponse + * @throws ApiException if fails to make API call + */ + public ApplicationInfoResponse getApplicationsInfo() throws ApiException { + return this.getApplicationsInfo(Collections.emptyMap()); + } + + + /** + * + * List all running application's info of the cluster. + * @param additionalHeaders additionalHeaders for this call + * @return ApplicationInfoResponse + * @throws ApiException if fails to make API call + */ + public ApplicationInfoResponse getApplicationsInfo(Map additionalHeaders) throws ApiException { + Object localVarPostBody = null; + + // create path and map variables + String localVarPath = "/api/v1/applications/info"; + + StringJoiner localVarQueryStringJoiner = new StringJoiner("&"); + String localVarQueryParameterBaseName; + List localVarQueryParams = new ArrayList(); + List localVarCollectionQueryParams = new ArrayList(); + Map localVarHeaderParams = new HashMap(); + Map localVarCookieParams = new HashMap(); + Map localVarFormParams = new HashMap(); + + + localVarHeaderParams.putAll(additionalHeaders); + + + + final String[] localVarAccepts = { + "application/json" + }; + final String localVarAccept = apiClient.selectHeaderAccept(localVarAccepts); + + final String[] localVarContentTypes = { + + }; + final String localVarContentType = apiClient.selectHeaderContentType(localVarContentTypes); + + String[] localVarAuthNames = new String[] { "basic" }; + + TypeReference localVarReturnType = new TypeReference() {}; + return apiClient.invokeAPI( + localVarPath, + "GET", + localVarQueryParams, + localVarCollectionQueryParams, + localVarQueryStringJoiner.toString(), + localVarPostBody, + localVarHeaderParams, + localVarCookieParams, + localVarFormParams, + localVarAccept, + localVarContentType, + localVarAuthNames, + localVarReturnType + ); + } + /** * * Revise lost shuffles or deleted shuffles of an application. diff --git a/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/model/ApplicationInfo.java b/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/model/ApplicationInfo.java new file mode 100644 index 000000000..f81f32ed3 --- /dev/null +++ b/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/model/ApplicationInfo.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.celeborn.rest.v1.model; + +import java.util.Objects; +import java.util.Arrays; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonTypeName; +import com.fasterxml.jackson.annotation.JsonValue; +import java.util.HashMap; +import java.util.Map; +import com.fasterxml.jackson.annotation.JsonPropertyOrder; +import com.fasterxml.jackson.annotation.JsonTypeName; + +/** + * ApplicationInfo + */ +@JsonPropertyOrder({ + ApplicationInfo.JSON_PROPERTY_APP_ID, + ApplicationInfo.JSON_PROPERTY_USER_IDENTIFIER, + ApplicationInfo.JSON_PROPERTY_EXTRA_INFO, + ApplicationInfo.JSON_PROPERTY_REGISTRATION_TIME +}) +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", comments = "Generator version: 7.8.0") +public class ApplicationInfo { + public static final String JSON_PROPERTY_APP_ID = "appId"; + private String appId; + + public static final String JSON_PROPERTY_USER_IDENTIFIER = "userIdentifier"; + private String userIdentifier; + + public static final String JSON_PROPERTY_EXTRA_INFO = "extraInfo"; + private Map extraInfo = new HashMap<>(); + + public static final String JSON_PROPERTY_REGISTRATION_TIME = "registrationTime"; + private Long registrationTime; + + public ApplicationInfo() { + } + + public ApplicationInfo appId(String appId) { + + this.appId = appId; + return this; + } + + /** + * The id of the application. + * @return appId + */ + @javax.annotation.Nonnull + @JsonProperty(JSON_PROPERTY_APP_ID) + @JsonInclude(value = JsonInclude.Include.ALWAYS) + + public String getAppId() { + return appId; + } + + + @JsonProperty(JSON_PROPERTY_APP_ID) + @JsonInclude(value = JsonInclude.Include.ALWAYS) + public void setAppId(String appId) { + this.appId = appId; + } + + public ApplicationInfo userIdentifier(String userIdentifier) { + + this.userIdentifier = userIdentifier; + return this; + } + + /** + * The user identifier of the application. + * @return userIdentifier + */ + @javax.annotation.Nonnull + @JsonProperty(JSON_PROPERTY_USER_IDENTIFIER) + @JsonInclude(value = JsonInclude.Include.ALWAYS) + + public String getUserIdentifier() { + return userIdentifier; + } + + + @JsonProperty(JSON_PROPERTY_USER_IDENTIFIER) + @JsonInclude(value = JsonInclude.Include.ALWAYS) + public void setUserIdentifier(String userIdentifier) { + this.userIdentifier = userIdentifier; + } + + public ApplicationInfo extraInfo(Map extraInfo) { + + this.extraInfo = extraInfo; + return this; + } + + public ApplicationInfo putExtraInfoItem(String key, String extraInfoItem) { + if (this.extraInfo == null) { + this.extraInfo = new HashMap<>(); + } + this.extraInfo.put(key, extraInfoItem); + return this; + } + + /** + * Extra information of the application. + * @return extraInfo + */ + @javax.annotation.Nullable + @JsonProperty(JSON_PROPERTY_EXTRA_INFO) + @JsonInclude(value = JsonInclude.Include.USE_DEFAULTS) + + public Map getExtraInfo() { + return extraInfo; + } + + + @JsonProperty(JSON_PROPERTY_EXTRA_INFO) + @JsonInclude(value = JsonInclude.Include.USE_DEFAULTS) + public void setExtraInfo(Map extraInfo) { + this.extraInfo = extraInfo; + } + + public ApplicationInfo registrationTime(Long registrationTime) { + + this.registrationTime = registrationTime; + return this; + } + + /** + * The registration time of the application. + * @return registrationTime + */ + @javax.annotation.Nullable + @JsonProperty(JSON_PROPERTY_REGISTRATION_TIME) + @JsonInclude(value = JsonInclude.Include.USE_DEFAULTS) + + public Long getRegistrationTime() { + return registrationTime; + } + + + @JsonProperty(JSON_PROPERTY_REGISTRATION_TIME) + @JsonInclude(value = JsonInclude.Include.USE_DEFAULTS) + public void setRegistrationTime(Long registrationTime) { + this.registrationTime = registrationTime; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ApplicationInfo applicationInfo = (ApplicationInfo) o; + return Objects.equals(this.appId, applicationInfo.appId) && + Objects.equals(this.userIdentifier, applicationInfo.userIdentifier) && + Objects.equals(this.extraInfo, applicationInfo.extraInfo) && + Objects.equals(this.registrationTime, applicationInfo.registrationTime); + } + + @Override + public int hashCode() { + return Objects.hash(appId, userIdentifier, extraInfo, registrationTime); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("class ApplicationInfo {\n"); + sb.append(" appId: ").append(toIndentedString(appId)).append("\n"); + sb.append(" userIdentifier: ").append(toIndentedString(userIdentifier)).append("\n"); + sb.append(" extraInfo: ").append(toIndentedString(extraInfo)).append("\n"); + sb.append(" registrationTime: ").append(toIndentedString(registrationTime)).append("\n"); + sb.append("}"); + return sb.toString(); + } + + /** + * Convert the given object to string with each line indented by 4 spaces + * (except the first line). + */ + private String toIndentedString(Object o) { + if (o == null) { + return "null"; + } + return o.toString().replace("\n", "\n "); + } + +} + diff --git a/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/model/ApplicationInfoResponse.java b/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/model/ApplicationInfoResponse.java new file mode 100644 index 000000000..906cc1c03 --- /dev/null +++ b/openapi/openapi-client/src/main/java/org/apache/celeborn/rest/v1/model/ApplicationInfoResponse.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.celeborn.rest.v1.model; + +import java.util.Objects; +import java.util.Arrays; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonTypeName; +import com.fasterxml.jackson.annotation.JsonValue; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import org.apache.celeborn.rest.v1.model.ApplicationInfo; +import com.fasterxml.jackson.annotation.JsonPropertyOrder; +import com.fasterxml.jackson.annotation.JsonTypeName; + +/** + * ApplicationInfoResponse + */ +@JsonPropertyOrder({ + ApplicationInfoResponse.JSON_PROPERTY_APPLICATIONS +}) +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", comments = "Generator version: 7.8.0") +public class ApplicationInfoResponse { + public static final String JSON_PROPERTY_APPLICATIONS = "applications"; + private List applications = new ArrayList<>(); + + public ApplicationInfoResponse() { + } + + public ApplicationInfoResponse applications(List applications) { + + this.applications = applications; + return this; + } + + public ApplicationInfoResponse addApplicationsItem(ApplicationInfo applicationsItem) { + if (this.applications == null) { + this.applications = new ArrayList<>(); + } + this.applications.add(applicationsItem); + return this; + } + + /** + * The application information. + * @return applications + */ + @javax.annotation.Nonnull + @JsonProperty(JSON_PROPERTY_APPLICATIONS) + @JsonInclude(value = JsonInclude.Include.ALWAYS) + + public List getApplications() { + return applications; + } + + + @JsonProperty(JSON_PROPERTY_APPLICATIONS) + @JsonInclude(value = JsonInclude.Include.ALWAYS) + public void setApplications(List applications) { + this.applications = applications; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ApplicationInfoResponse applicationInfoResponse = (ApplicationInfoResponse) o; + return Objects.equals(this.applications, applicationInfoResponse.applications); + } + + @Override + public int hashCode() { + return Objects.hash(applications); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("class ApplicationInfoResponse {\n"); + sb.append(" applications: ").append(toIndentedString(applications)).append("\n"); + sb.append("}"); + return sb.toString(); + } + + /** + * Convert the given object to string with each line indented by 4 spaces + * (except the first line). + */ + private String toIndentedString(Object o) { + if (o == null) { + return "null"; + } + return o.toString().replace("\n", "\n "); + } + +} + diff --git a/openapi/openapi-client/src/main/openapi3/master_rest_v1.yaml b/openapi/openapi-client/src/main/openapi3/master_rest_v1.yaml index 675978dea..18e7de891 100644 --- a/openapi/openapi-client/src/main/openapi3/master_rest_v1.yaml +++ b/openapi/openapi-client/src/main/openapi3/master_rest_v1.yaml @@ -327,6 +327,20 @@ paths: schema: $ref: '#/components/schemas/ApplicationsHeartbeatResponse' + /api/v1/applications/info: + get: + tags: + - Application + operationId: getApplicationsInfo + description: List all running application's info of the cluster. + responses: + "200": + description: The request was successful. + content: + application/json: + schema: + $ref: '#/components/schemas/ApplicationInfoResponse' + /api/v1/applications/delete_apps: post: tags: @@ -793,6 +807,40 @@ components: required: - applications + + ApplicationInfo: + type: object + properties: + appId: + type: string + description: The id of the application. + userIdentifier: + type: string + description: The user identifier of the application. + extraInfo: + type: object + description: Extra information of the application. + additionalProperties: + type: string + registrationTime: + type: integer + format: int64 + description: The registration time of the application. + required: + - appId + - userIdentifier + + ApplicationInfoResponse: + type: object + properties: + applications: + type: array + description: The application information. + items: + $ref: '#/components/schemas/ApplicationInfo' + required: + - applications + HostnamesResponse: type: object properties: diff --git a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/client/LifecycleManagerSuite.scala b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/client/LifecycleManagerSuite.scala index 87b05c766..d34c79419 100644 --- a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/client/LifecycleManagerSuite.scala +++ b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/client/LifecycleManagerSuite.scala @@ -19,16 +19,25 @@ package org.apache.celeborn.tests.client import java.util +import org.scalatest.concurrent.Eventually.eventually +import org.scalatest.concurrent.Futures.{interval, timeout} +import org.scalatest.time.SpanSugar.convertIntToGrainOfTime + import org.apache.celeborn.client.{LifecycleManager, WithShuffleClientSuite} import org.apache.celeborn.common.CelebornConf +import org.apache.celeborn.common.identity.UserIdentifier import org.apache.celeborn.common.protocol.message.StatusCode import org.apache.celeborn.service.deploy.MiniClusterFeature class LifecycleManagerSuite extends WithShuffleClientSuite with MiniClusterFeature { + override protected val userIdentifier = UserIdentifier("test", "celeborn") celebornConf .set(CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED.key, "true") .set(CelebornConf.CLIENT_PUSH_BUFFER_MAX_SIZE.key, "256K") + .set(CelebornConf.USER_SPECIFIC_TENANT.key, userIdentifier.tenantId) + .set(CelebornConf.USER_SPECIFIC_USERNAME.key, userIdentifier.name) + .set(CelebornConf.USER_SPECIFIC_APPLICATION_INFO.key, "k1=v1") override def beforeAll(): Unit = { super.beforeAll() @@ -99,6 +108,24 @@ class LifecycleManagerSuite extends WithShuffleClientSuite with MiniClusterFeatu lifecycleManager.stop() } + test("CELEBORN-1258: Support to register application info with user identifier and extra info") { + val lifecycleManager: LifecycleManager = new LifecycleManager(APP, celebornConf) + + val arrayList = new util.ArrayList[Integer]() + (0 to 10).foreach(i => { + arrayList.add(i) + }) + + lifecycleManager.requestMasterRequestSlotsWithRetry(0, arrayList) + + eventually(timeout(3.seconds), interval(0.milliseconds)) { + val appInfo = masterInfo._1.statusSystem.applicationInfos.get(APP) + assert(appInfo.userIdentifier == userIdentifier) + assert(appInfo.extraInfo.get("k1") == "v1") + assert(appInfo.registrationTime > 0 && appInfo.registrationTime < System.currentTimeMillis()) + } + } + override def afterAll(): Unit = { logInfo("all test complete , stop celeborn mini cluster") shutdownMiniCluster()