[CELEBORN-1258] Support to register application info with user identifier and extra info

### What changes were proposed in this pull request?
Support to register application info with user identifier and extra info.

### Why are the changes needed?
To provide more insight for the application information.

### Does this PR introduce _any_ user-facing change?
A new RESTful API.

### How was this patch tested?
UT.

Closes #3428 from turboFei/app_info_uid.

Lead-authored-by: Wang, Fei <fwang12@ebay.com>
Co-authored-by: Fei Wang <cn.feiwang@gmail.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
This commit is contained in:
Wang, Fei 2025-09-01 11:15:40 +08:00 committed by Shuang
parent 2817f7fb9e
commit d038dd2b32
26 changed files with 844 additions and 16 deletions

View File

@ -21,7 +21,7 @@ import java.lang.{Byte => JByte}
import java.nio.ByteBuffer
import java.security.SecureRandom
import java.util
import java.util.{function, Collections, List => JList}
import java.util.{function, List => JList}
import java.util.concurrent._
import java.util.concurrent.atomic.{AtomicInteger, LongAdder}
import java.util.function.{BiConsumer, BiFunction, Consumer}
@ -42,7 +42,7 @@ import org.apache.celeborn.client.LifecycleManager.{ShuffleAllocatedWorkers, Shu
import org.apache.celeborn.client.listener.WorkerStatusListener
import org.apache.celeborn.common.{CelebornConf, CommitMetadata}
import org.apache.celeborn.common.CelebornConf.ACTIVE_STORAGE_TYPES
import org.apache.celeborn.common.client.MasterClient
import org.apache.celeborn.common.client.{ApplicationInfoProvider, MasterClient}
import org.apache.celeborn.common.identity.{IdentityProvider, UserIdentifier}
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.meta.{ApplicationMeta, ShufflePartitionLocationInfo, WorkerInfo}
@ -248,12 +248,21 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
private val messagesHelper: TransportMessagesHelper = new TransportMessagesHelper()
private def registerApplicationInfo(): Unit = {
Utils.tryLogNonFatalError(
masterClient.send(RegisterApplicationInfo(
appUniqueId,
userIdentifier,
ApplicationInfoProvider.instantiate(conf).provide().asJava)))
}
// Since method `onStart` is executed when `rpcEnv.setupEndpoint` is executed, and
// `masterClient` is initialized after `rpcEnv` is initialized, if method `onStart` contains
// a reference to `masterClient`, there may be cases where `masterClient` is null when
// `masterClient` is called. Therefore, it's necessary to uniformly execute the initialization
// method at the end of the construction of the class to perform the initialization operations.
private def initialize(): Unit = {
registerApplicationInfo()
// noinspection ConvertExpressionToSAM
commitManager.start()
heartbeater.start()

View File

@ -116,6 +116,7 @@ enum MessageType {
GET_STAGE_END_RESPONSE = 93;
READ_REDUCER_PARTITION_END = 94;
READ_REDUCER_PARTITION_END_RESPONSE = 95;
REGISTER_APPLICATION_INFO = 96;
}
enum StreamType {
@ -753,6 +754,7 @@ message PbSnapshotMetaInfo {
map<string, int64> shuffleFallbackCounts = 19;
int64 applicationTotalCount = 20;
map<string, int64> applicationFallbackCounts = 21;
map<string, PbApplicationInfo> applicationInfos = 22;
}
message PbOpenStream {
@ -904,6 +906,13 @@ message PbApplicationMeta {
string secret = 2;
}
message PbApplicationInfo {
string appId = 1;
PbUserIdentifier userIdentifier = 2;
map<string, string> extraInfo = 3;
int64 registrationTime = 4;
}
message PbApplicationMetaRequest {
string appId = 1;
}
@ -1005,6 +1014,7 @@ enum PbMetaRequestType {
ReportWorkerDecommission = 27;
BatchUnRegisterShuffle = 28;
ReviseLostShuffles = 29;
RegisterApplicationInfo = 30;
}
message PbMetaRequest {
@ -1028,6 +1038,7 @@ message PbMetaRequest {
PbApplicationMeta applicationMetaRequest = 23;
PbReportWorkerDecommission reportWorkerDecommissionRequest = 24;
PbMetaBatchUnregisterShuffles batchUnregisterShuffleRequest = 25;
PbRegisterApplicationInfo registerApplicationInfoRequest = 26;
PbReviseLostShuffles reviseLostShufflesRequest = 102;
}
@ -1074,4 +1085,11 @@ message PbMetaRequestResponse {
bool success = 2;
string message = 3;
PbMetaRequestStatus status = 4;
}
}
message PbRegisterApplicationInfo {
string appId = 1;
PbUserIdentifier userIdentifier = 2;
map<string, string> extraInfo = 3;
string requestId = 4;
}

View File

@ -30,6 +30,7 @@ import scala.util.matching.Regex
import io.netty.channel.epoll.Epoll
import org.apache.celeborn.common.authentication.AnonymousAuthenticationProviderImpl
import org.apache.celeborn.common.client.{ApplicationInfoProvider, DefaultApplicationInfoProvider}
import org.apache.celeborn.common.identity.{DefaultIdentityProvider, HadoopBasedIdentityProvider, IdentityProvider}
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.internal.config._
@ -901,7 +902,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
def metricsCollectCriticalEnabled: Boolean = get(METRICS_COLLECT_CRITICAL_ENABLED)
def metricsCapacity: Int = get(METRICS_CAPACITY)
def metricsExtraLabels: Map[String, String] =
get(METRICS_EXTRA_LABELS).map(Utils.parseMetricLabels).toMap
get(METRICS_EXTRA_LABELS).map(Utils.parseKeyValuePair).toMap
def metricsWorkerAppTopResourceConsumptionCount: Int =
get(METRICS_WORKER_APP_TOP_RESOURCE_CONSUMPTION_COUNT)
def metricsWorkerAppTopResourceConsumptionBytesWrittenThreshold: Long =
@ -954,6 +955,9 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
get(CLIENT_EXCLUDE_PEER_WORKER_ON_FAILURE_ENABLED)
def clientMrMaxPushData: Long = get(CLIENT_MR_PUSH_DATA_MAX)
def clientApplicationUUIDSuffixEnabled: Boolean = get(CLIENT_APPLICATION_UUID_SUFFIX_ENABLED)
def clientApplicationInfoProvider: String = get(CLIENT_APPLICATION_INFO_PROVIDER)
def clientApplicationInfoUserSpecific: Map[String, String] =
get(USER_SPECIFIC_APPLICATION_INFO).map(Utils.parseKeyValuePair).toMap
def clientShuffleIntegrityCheckEnabled: Boolean =
get(CLIENT_SHUFFLE_INTEGRITY_CHECK_ENABLED)
@ -5633,6 +5637,30 @@ object CelebornConf extends Logging {
.booleanConf
.createWithDefault(false)
val CLIENT_APPLICATION_INFO_PROVIDER: ConfigEntry[String] =
buildConf("celeborn.client.application.info.provider")
.categories("client")
.doc(s"ApplicationInfoProvider class name. Default class is " +
s"`${classOf[DefaultApplicationInfoProvider].getName}`. " +
s"Optional values: " +
s"${classOf[DefaultIdentityProvider].getName} user name and tenant id are default values or user-specific values.")
.version("0.6.1")
.stringConf
.createWithDefault(classOf[DefaultApplicationInfoProvider].getName)
val USER_SPECIFIC_APPLICATION_INFO: ConfigEntry[Seq[String]] =
buildConf("celeborn.client.application.info.user-specific")
.categories("client")
.version("0.6.1")
.doc("User specific information for application registration, pattern is" +
" `<key1>=<value1>[,<key2>=<value2>]*`, e.g. `cluster=celeborn`.")
.stringConf
.toSequence
.checkValue(
pairs => pairs.map(_ => Try(Utils.parseKeyValuePair(_))).forall(_.isSuccess),
"Allowed pattern is: `<key1>=<value1>[,<key2>=<value2>]*`")
.createWithDefault(Seq.empty)
val TEST_ALTERNATIVE: OptionalConfigEntry[String] =
buildConf("celeborn.test.alternative.key")
.withAlternative("celeborn.test.alternative.deprecatedKey")
@ -5700,8 +5728,8 @@ object CelebornConf extends Logging {
.stringConf
.toSequence
.checkValue(
labels => labels.map(_ => Try(Utils.parseMetricLabels(_))).forall(_.isSuccess),
"Allowed pattern is: `<label1_key>:<label1_value>[,<label2_key>:<label2_value>]*`")
labels => labels.map(_ => Try(Utils.parseKeyValuePair(_))).forall(_.isSuccess),
"Allowed pattern is: `<label1_key>=<label1_value>[,<label2_key>=<label2_value>]*`")
.createWithDefault(Seq.empty)
val METRICS_WORKER_APP_TOP_RESOURCE_CONSUMPTION_COUNT: ConfigEntry[Int] =

View File

@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.celeborn.common.client
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.util.Utils
abstract class ApplicationInfoProvider(conf: CelebornConf) {
def provide(): Map[String, String]
}
object ApplicationInfoProvider {
def instantiate(conf: CelebornConf): ApplicationInfoProvider = {
val className = conf.clientApplicationInfoProvider
Utils.instantiateClassWithCelebornConf[ApplicationInfoProvider](className, conf)
}
}

View File

@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.celeborn.common.client
import org.apache.celeborn.common.CelebornConf
class DefaultApplicationInfoProvider(conf: CelebornConf) extends ApplicationInfoProvider(conf) {
override def provide(): Map[String, String] = {
conf.clientApplicationInfoUserSpecific
}
}

View File

@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.celeborn.common.meta
import java.util.{Map => JMap}
import org.apache.celeborn.common.identity.UserIdentifier
/**
* Application info
*/
case class ApplicationInfo(
appId: String,
userIdentifier: UserIdentifier,
extraInfo: JMap[String, String],
registrationTime: Long)

View File

@ -18,8 +18,7 @@
package org.apache.celeborn.common.protocol.message
import java.util
import java.util.{Collections, UUID}
import java.util.concurrent.atomic.AtomicIntegerArray
import java.util.{Collections, Map => JMap, UUID}
import scala.collection.JavaConverters._
@ -433,6 +432,13 @@ object ControlMessages extends Logging {
case class ApplicationLostResponse(status: StatusCode) extends MasterMessage
case class RegisterApplicationInfo(
applicationId: String,
userIdentifier: UserIdentifier,
extraInfo: JMap[String, String],
override var requestId: String = ZERO_UUID)
extends MasterRequestMessage
case class HeartbeatFromApplication(
appId: String,
totalWritten: Long,
@ -878,6 +884,19 @@ object ControlMessages extends Logging {
.setStatus(status.getValue).build().toByteArray
new TransportMessage(MessageType.APPLICATION_LOST_RESPONSE, payload)
case RegisterApplicationInfo(
applicationId,
userIdentifier,
extraInfo,
requestId) =>
val payload = PbRegisterApplicationInfo.newBuilder()
.setAppId(applicationId)
.setUserIdentifier(PbSerDeUtils.toPbUserIdentifier(userIdentifier))
.putAllExtraInfo(extraInfo)
.setRequestId(requestId)
.build().toByteArray
new TransportMessage(MessageType.REGISTER_APPLICATION_INFO, payload)
case HeartbeatFromApplication(
appId,
totalWritten,
@ -1537,6 +1556,13 @@ object ControlMessages extends Logging {
case GET_STAGE_END_RESPONSE_VALUE =>
PbGetStageEndResponse.parseFrom(message.getPayload)
case REGISTER_APPLICATION_INFO_VALUE =>
val pbRegisterApplicationInfo = PbRegisterApplicationInfo.parseFrom(message.getPayload)
RegisterApplicationInfo(
pbRegisterApplicationInfo.getAppId,
PbSerDeUtils.fromPbUserIdentifier(pbRegisterApplicationInfo.getUserIdentifier),
pbRegisterApplicationInfo.getExtraInfoMap)
}
}
}

View File

@ -25,7 +25,7 @@ import scala.collection.JavaConverters._
import com.google.protobuf.InvalidProtocolBufferException
import org.apache.celeborn.common.identity.UserIdentifier
import org.apache.celeborn.common.meta.{ApplicationMeta, DeviceInfo, DiskFileInfo, DiskInfo, MapFileMeta, ReduceFileMeta, WorkerEventInfo, WorkerInfo, WorkerStatus}
import org.apache.celeborn.common.meta.{ApplicationInfo, ApplicationMeta, DeviceInfo, DiskFileInfo, DiskInfo, MapFileMeta, ReduceFileMeta, WorkerEventInfo, WorkerInfo, WorkerStatus}
import org.apache.celeborn.common.meta.MapFileMeta.SegmentIndex
import org.apache.celeborn.common.protocol._
import org.apache.celeborn.common.protocol.PartitionLocation.Mode
@ -442,6 +442,7 @@ object PbSerDeUtils {
shutdownWorkers: java.util.Set[WorkerInfo],
workerEventInfos: ConcurrentHashMap[WorkerInfo, WorkerEventInfo],
applicationMetas: ConcurrentHashMap[String, ApplicationMeta],
applicationInfos: ConcurrentHashMap[String, ApplicationInfo],
decommissionWorkers: java.util.Set[WorkerInfo]): PbSnapshotMetaInfo = {
val builder = PbSnapshotMetaInfo.newBuilder()
.setEstimatedPartitionSize(estimatedPartitionSize)
@ -478,6 +479,14 @@ object PbSerDeUtils {
if (localCollectionUtils.isNotEmpty(pbApplicationMetas)) {
builder.putAllApplicationMetas(pbApplicationMetas)
}
val pbApplicationInfos = applicationInfos.asScala.map {
case (appId, applicationInfo) => (appId, toPbApplicationInfo(applicationInfo))
}.asJava
if (localCollectionUtils.isNotEmpty(pbApplicationInfos)) {
builder.putAllApplicationInfos(pbApplicationInfos)
}
builder.build()
}
@ -491,6 +500,23 @@ object PbSerDeUtils {
ApplicationMeta(pbApplicationMeta.getAppId, pbApplicationMeta.getSecret)
}
def toPbApplicationInfo(applicationInfo: ApplicationInfo): PbApplicationInfo = {
PbApplicationInfo.newBuilder()
.setAppId(applicationInfo.appId)
.setUserIdentifier(toPbUserIdentifier(applicationInfo.userIdentifier))
.putAllExtraInfo(applicationInfo.extraInfo)
.setRegistrationTime(applicationInfo.registrationTime)
.build()
}
def fromPbApplicationInfo(pbApplicationInfo: PbApplicationInfo): ApplicationInfo = {
ApplicationInfo(
pbApplicationInfo.getAppId,
fromPbUserIdentifier(pbApplicationInfo.getUserIdentifier),
pbApplicationInfo.getExtraInfoMap,
pbApplicationInfo.getRegistrationTime)
}
def toPbWorkerStatus(workerStatus: WorkerStatus): PbWorkerStatus = {
PbWorkerStatus.newBuilder()
.setState(workerStatus.getState)

View File

@ -1184,12 +1184,12 @@ object Utils extends Logging {
}
}
def parseMetricLabels(label: String): (String, String) = {
val labelPart = label.split("=")
if (labelPart.size != 2) {
throw new IllegalArgumentException(s"Illegal metric extra labels: $label")
def parseKeyValuePair(pair: String): (String, String) = {
val parts = pair.split("=")
if (parts.size != 2) {
throw new IllegalArgumentException(s"Illegal kay=value pair: $pair")
}
labelPart(0).trim -> labelPart(1).trim
parts(0).trim -> parts(1).trim
}
def getProcessId: String = ManagementFactory.getRuntimeMXBean.getName.split("@")(0)

View File

@ -19,6 +19,7 @@ package org.apache.celeborn.common.util
import java.io.File
import java.util
import java.util.Collections
import scala.collection.JavaConverters._
import scala.collection.mutable
@ -380,6 +381,18 @@ class PbSerDeUtilsTest extends CelebornFunSuite {
assert(restoredApplicationMeta.equals(applicationMeta))
}
test("fromAndToPbApplicationInfo") {
val applicationInfo = new ApplicationInfo(
"app1",
UserIdentifier("tenant", "user"),
Collections.singletonMap("key", "value"),
System.currentTimeMillis())
val pbApplicationInfo = PbSerDeUtils.toPbApplicationInfo(applicationInfo)
val restoredApplicationInfo = PbSerDeUtils.fromPbApplicationInfo(pbApplicationInfo)
assert(restoredApplicationInfo.equals(applicationInfo))
}
test("testPackedPartitionLocationPairCase1") {
partitionLocation3.setPeer(partitionLocation2)
val pairPb = PbSerDeUtils.toPbPackedPartitionLocationsPair(

View File

@ -21,6 +21,8 @@ license: |
| --- | ------- | --------- | ----------- | ----- | ---------- |
| celeborn.client.adaptive.optimizeSkewedPartitionRead.enabled | false | false | If this is true, Celeborn will adaptively split skewed partitions instead of reading them by Spark map range. Please note that this feature requires the `Celeborn-Optimize-Skew-Partitions-spark3_3.patch`. | 0.6.0 | |
| celeborn.client.application.heartbeatInterval | 10s | false | Interval for client to send heartbeat message to master. | 0.3.0 | celeborn.application.heartbeatInterval |
| celeborn.client.application.info.provider | org.apache.celeborn.common.client.DefaultApplicationInfoProvider | false | ApplicationInfoProvider class name. Default class is `org.apache.celeborn.common.client.DefaultApplicationInfoProvider`. Optional values: org.apache.celeborn.common.identity.DefaultIdentityProvider user name and tenant id are default values or user-specific values. | 0.6.1 | |
| celeborn.client.application.info.user-specific | | false | User specific information for application registration, pattern is `<key1>=<value1>[,<key2>=<value2>]*`, e.g. `cluster=celeborn`. | 0.6.1 | |
| celeborn.client.application.unregister.enabled | true | false | When true, Celeborn client will inform celeborn master the application is already shutdown during client exit, this allows the cluster to release resources immediately, resulting in resource savings. | 0.3.2 | |
| celeborn.client.application.uuidSuffix.enabled | false | false | Whether to add UUID suffix for application id for unique. When `true`, add UUID suffix for unique application id. Currently, this only applies to Spark and MR. | 0.6.0 | |
| celeborn.client.chunk.prefetch.enabled | false | false | Whether to enable chunk prefetch when creating CelebornInputStream. | 0.5.1 | |

View File

@ -41,6 +41,7 @@ import org.slf4j.LoggerFactory;
import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.common.identity.UserIdentifier;
import org.apache.celeborn.common.meta.ApplicationInfo;
import org.apache.celeborn.common.meta.ApplicationMeta;
import org.apache.celeborn.common.meta.DiskInfo;
import org.apache.celeborn.common.meta.DiskStatus;
@ -101,9 +102,17 @@ public abstract class AbstractMetaManager implements IMetadataHandler {
public final Map<String, Long> shuffleFallbackCounts = JavaUtils.newConcurrentHashMap();
public final Map<String, Long> applicationFallbackCounts = JavaUtils.newConcurrentHashMap();
public final ConcurrentHashMap<String, ApplicationInfo> applicationInfos =
JavaUtils.newConcurrentHashMap();
public final ConcurrentHashMap<String, ApplicationMeta> applicationMetas =
JavaUtils.newConcurrentHashMap();
public void updateApplicationInfo(
String appId, UserIdentifier userIdentifier, Map<String, String> extraInfo) {
applicationInfos.putIfAbsent(
appId, new ApplicationInfo(appId, userIdentifier, extraInfo, System.currentTimeMillis()));
}
public void updateRequestSlotsMeta(
String shuffleKey, String hostName, Map<String, Map<String, Integer>> workerWithAllocations) {
Tuple2<String, Object> appIdShuffleId = Utils.splitShuffleKey(shuffleKey);
@ -175,6 +184,7 @@ public abstract class AbstractMetaManager implements IMetadataHandler {
registeredAppAndShuffles.remove(appId);
appHeartbeatTime.remove(appId);
applicationMetas.remove(appId);
applicationInfos.remove(appId);
}
@VisibleForTesting
@ -418,6 +428,7 @@ public abstract class AbstractMetaManager implements IMetadataHandler {
shutdownWorkers,
workerEventInfos,
applicationMetas,
applicationInfos,
decommissionWorkers)
.toByteArray();
Files.write(file.toPath(), snapshotBytes);
@ -523,6 +534,11 @@ public abstract class AbstractMetaManager implements IMetadataHandler {
.forEach(
(key, value) -> applicationMetas.put(key, PbSerDeUtils.fromPbApplicationMeta(value)));
snapshotMetaInfo
.getApplicationInfosMap()
.forEach(
(key, value) -> applicationInfos.put(key, PbSerDeUtils.fromPbApplicationInfo(value)));
availableWorkers.addAll(
workersMap.values().stream()
.filter(worker -> isWorkerAvailable(worker))
@ -562,6 +578,7 @@ public abstract class AbstractMetaManager implements IMetadataHandler {
applicationFallbackCounts.clear();
workerEventInfos.clear();
applicationMetas.clear();
applicationInfos.clear();
}
public void updateMetaByReportWorkerUnavailable(List<WorkerInfo> failedWorkers) {

View File

@ -28,6 +28,9 @@ import org.apache.celeborn.common.meta.WorkerStatus;
import org.apache.celeborn.common.quota.ResourceConsumption;
public interface IMetadataHandler {
void handleRegisterApplicationInfo(
String appId, UserIdentifier userIdentifier, Map<String, String> extraInfo, String requestId);
void handleRequestSlots(
String shuffleKey,
String hostName,

View File

@ -54,6 +54,15 @@ public class SingleMasterMetaManager extends AbstractMetaManager {
this.rackResolver = rackResolver;
}
@Override
public void handleRegisterApplicationInfo(
String appId,
UserIdentifier userIdentifier,
Map<String, String> extraInfo,
String requestId) {
updateApplicationInfo(appId, userIdentifier, extraInfo);
}
@Override
public void handleRequestSlots(
String shuffleKey,

View File

@ -70,6 +70,34 @@ public class HAMasterMetaManager extends AbstractMetaManager {
this.ratisServer = ratisServer;
}
@Override
public void handleRegisterApplicationInfo(
String appId,
UserIdentifier userIdentifier,
Map<String, String> extraInfo,
String requestId) {
try {
ratisServer.submitRequest(
ResourceRequest.newBuilder()
.setCmdType(Type.RegisterApplicationInfo)
.setRequestId(requestId)
.setRegisterApplicationInfoRequest(
ResourceProtos.RegisterApplicationInfoRequest.newBuilder()
.setAppId(appId)
.setUserIdentifier(
ResourceProtos.UserIdentifier.newBuilder()
.setTenantId(userIdentifier.tenantId())
.setName(userIdentifier.name())
.build())
.putAllExtraInfo(extraInfo)
.build())
.build());
} catch (CelebornRuntimeException e) {
LOG.error("Handle app lost for {} failed!", appId, e);
throw e;
}
}
@Override
public void handleRequestSlots(
String shuffleKey,

View File

@ -146,6 +146,18 @@ public class MetaHandler {
LOG.debug("Handle batch unregister shuffle for {}", shuffleKeys);
break;
case RegisterApplicationInfo:
appId = request.getRegisterApplicationInfoRequest().getAppId();
UserIdentifier userIdentifier =
new UserIdentifier(
request.getRegisterApplicationInfoRequest().getUserIdentifier().getTenantId(),
request.getRegisterApplicationInfoRequest().getUserIdentifier().getName());
Map<String, String> extraInfo =
request.getRegisterApplicationInfoRequest().getExtraInfoMap();
metaSystem.updateApplicationInfo(appId, userIdentifier, extraInfo);
LOG.debug("Handle register application info for {}/{}", appId, userIdentifier);
break;
case AppHeartbeat:
appId = request.getAppHeartbeatRequest().getAppId();
long time = request.getAppHeartbeatRequest().getTime();

View File

@ -44,6 +44,8 @@ enum Type {
BatchUnRegisterShuffle = 28;
ReviseLostShuffles = 29;
RegisterApplicationInfo = 30;
}
enum WorkerEventType {
@ -78,6 +80,7 @@ message ResourceRequest {
optional ApplicationMetaRequest applicationMetaRequest = 23;
optional ReportWorkerDecommissionRequest reportWorkerDecommissionRequest = 24;
optional BatchUnregisterShuffleRequest batchUnregisterShuffleRequest = 25;
optional RegisterApplicationInfoRequest registerApplicationInfoRequest = 26;
optional ReviseLostShufflesRequest reviseLostShufflesRequest = 102;
}
@ -265,4 +268,10 @@ message ApplicationMetaRequest {
message ReviseLostShufflesRequest {
string appId = 1 ;
repeated int32 lostShuffles = 2 ;
}
message RegisterApplicationInfoRequest {
string appId = 1;
UserIdentifier userIdentifier = 2;
map<string, string> extraInfo = 3;
}

View File

@ -20,6 +20,7 @@ package org.apache.celeborn.service.deploy.master
import java.io.IOException
import java.net.BindException
import java.util
import java.util.{Map => JMap}
import java.util.Collections
import java.util.concurrent.{ExecutorService, ScheduledFuture, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean
@ -429,6 +430,13 @@ private[celeborn] class Master(
}
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case RegisterApplicationInfo(appId, userIdentifier, extraInfo, requestId) =>
logDebug(
s"Received RegisterApplicationInfo request for app $appId/$userIdentifier/$extraInfo.")
checkAuth(context, appId)
executeWithLeaderChecker(
context,
handleRegisterApplicationInfo(context, appId, userIdentifier, extraInfo, requestId))
case HeartbeatFromApplication(
appId,
totalWritten,
@ -1166,6 +1174,16 @@ private[celeborn] class Master(
}
}
private def handleRegisterApplicationInfo(
context: RpcCallContext,
appId: String,
userIdentifier: UserIdentifier,
extraInfo: JMap[String, String],
requestId: String): Unit = {
statusSystem.handleRegisterApplicationInfo(appId, userIdentifier, extraInfo, requestId)
context.reply(OneWayMessageResponse)
}
private def handleHeartbeatFromApplication(
context: RpcCallContext,
appId: String,

View File

@ -17,7 +17,7 @@
package org.apache.celeborn.service.deploy.master.http.api.v1
import javax.ws.rs.{Consumes, DELETE, GET, Path, POST, Produces}
import javax.ws.rs.{Consumes, GET, Path, POST, Produces}
import javax.ws.rs.core.MediaType
import scala.collection.JavaConverters._
@ -27,7 +27,7 @@ import io.swagger.v3.oas.annotations.media.{Content, Schema}
import io.swagger.v3.oas.annotations.responses.ApiResponse
import io.swagger.v3.oas.annotations.tags.Tag
import org.apache.celeborn.rest.v1.model.{ApplicationHeartbeatData, ApplicationsHeartbeatResponse, DeleteAppsRequest, HandleResponse, HostnamesResponse, ReviseLostShufflesRequest}
import org.apache.celeborn.rest.v1.model.{ApplicationHeartbeatData, ApplicationInfo, ApplicationInfoResponse, ApplicationsHeartbeatResponse, DeleteAppsRequest, HandleResponse, HostnamesResponse, ReviseLostShufflesRequest}
import org.apache.celeborn.server.common.http.api.ApiRequestContext
import org.apache.celeborn.service.deploy.master.Master
@ -54,6 +54,26 @@ class ApplicationResource extends ApiRequestContext {
}.toSeq.asJava)
}
@Operation(description = "List all running application's info of the cluster.")
@ApiResponse(
responseCode = "200",
content = Array(new Content(
mediaType = MediaType.APPLICATION_JSON,
schema = new Schema(implementation = classOf[ApplicationInfoResponse]))))
@GET
@Path("/info")
def applicationsInfo(): ApplicationInfoResponse = {
new ApplicationInfoResponse()
.applications(
statusSystem.applicationInfos.asScala.map { case (appId, appInfo) =>
new ApplicationInfo()
.appId(appId)
.userIdentifier(appInfo.userIdentifier.toString)
.extraInfo(appInfo.extraInfo)
.registrationTime(appInfo.registrationTime)
}.toSeq.asJava)
}
@Operation(description = "Delete resource of apps.")
@ApiResponse(
responseCode = "200",

View File

@ -1134,4 +1134,17 @@ public class DefaultMetaSystemSuiteJ {
assertEquals(2, statusSystem.applicationFallbackCounts.get(POLICY1).longValue());
assertEquals(1, statusSystem.applicationFallbackCounts.get(POLICY2).longValue());
}
@Test
public void testRegisterApplicationInfo() {
statusSystem.applicationInfos.clear();
UserIdentifier userIdentifier = new UserIdentifier("tenant", "celeborn");
String appId = "app1";
Map<String, String> extraInfo = Collections.singletonMap("k1", "v1");
statusSystem.handleRegisterApplicationInfo(appId, userIdentifier, extraInfo, getNewReqeustId());
assertEquals(statusSystem.applicationInfos.get(appId).userIdentifier(), userIdentifier);
assertEquals(statusSystem.applicationInfos.get(appId).extraInfo(), extraInfo);
}
}

View File

@ -1985,4 +1985,19 @@ public class RatisMasterStatusSystemSuiteJ {
assertEquals(statusSystem.applicationFallbackCounts.get(POLICY1).longValue(), 2);
assertEquals(statusSystem.applicationFallbackCounts.get(POLICY2).longValue(), 1);
}
@Test
public void testRegisterApplicationInfo() {
AbstractMetaManager statusSystem = pickLeaderStatusSystem();
Assert.assertNotNull(statusSystem);
statusSystem.applicationInfos.clear();
UserIdentifier userIdentifier = new UserIdentifier("tenant", "celeborn");
String appId = "app1";
Map<String, String> extraInfo = Collections.singletonMap("k1", "v1");
statusSystem.handleRegisterApplicationInfo(appId, userIdentifier, extraInfo, getNewReqeustId());
assertEquals(statusSystem.applicationInfos.get(appId).userIdentifier(), userIdentifier);
assertEquals(statusSystem.applicationInfos.get(appId).extraInfo(), extraInfo);
}
}

View File

@ -25,6 +25,7 @@ import org.apache.celeborn.rest.v1.master.invoker.BaseApi;
import org.apache.celeborn.rest.v1.master.invoker.Configuration;
import org.apache.celeborn.rest.v1.master.invoker.Pair;
import org.apache.celeborn.rest.v1.model.ApplicationInfoResponse;
import org.apache.celeborn.rest.v1.model.ApplicationsHeartbeatResponse;
import org.apache.celeborn.rest.v1.model.DeleteAppsRequest;
import org.apache.celeborn.rest.v1.model.HandleResponse;
@ -253,6 +254,73 @@ public class ApplicationApi extends BaseApi {
);
}
/**
*
* List all running application&#39;s info of the cluster.
* @return ApplicationInfoResponse
* @throws ApiException if fails to make API call
*/
public ApplicationInfoResponse getApplicationsInfo() throws ApiException {
return this.getApplicationsInfo(Collections.emptyMap());
}
/**
*
* List all running application&#39;s info of the cluster.
* @param additionalHeaders additionalHeaders for this call
* @return ApplicationInfoResponse
* @throws ApiException if fails to make API call
*/
public ApplicationInfoResponse getApplicationsInfo(Map<String, String> additionalHeaders) throws ApiException {
Object localVarPostBody = null;
// create path and map variables
String localVarPath = "/api/v1/applications/info";
StringJoiner localVarQueryStringJoiner = new StringJoiner("&");
String localVarQueryParameterBaseName;
List<Pair> localVarQueryParams = new ArrayList<Pair>();
List<Pair> localVarCollectionQueryParams = new ArrayList<Pair>();
Map<String, String> localVarHeaderParams = new HashMap<String, String>();
Map<String, String> localVarCookieParams = new HashMap<String, String>();
Map<String, Object> localVarFormParams = new HashMap<String, Object>();
localVarHeaderParams.putAll(additionalHeaders);
final String[] localVarAccepts = {
"application/json"
};
final String localVarAccept = apiClient.selectHeaderAccept(localVarAccepts);
final String[] localVarContentTypes = {
};
final String localVarContentType = apiClient.selectHeaderContentType(localVarContentTypes);
String[] localVarAuthNames = new String[] { "basic" };
TypeReference<ApplicationInfoResponse> localVarReturnType = new TypeReference<ApplicationInfoResponse>() {};
return apiClient.invokeAPI(
localVarPath,
"GET",
localVarQueryParams,
localVarCollectionQueryParams,
localVarQueryStringJoiner.toString(),
localVarPostBody,
localVarHeaderParams,
localVarCookieParams,
localVarFormParams,
localVarAccept,
localVarContentType,
localVarAuthNames,
localVarReturnType
);
}
/**
*
* Revise lost shuffles or deleted shuffles of an application.

View File

@ -0,0 +1,211 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.celeborn.rest.v1.model;
import java.util.Objects;
import java.util.Arrays;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.fasterxml.jackson.annotation.JsonValue;
import java.util.HashMap;
import java.util.Map;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import com.fasterxml.jackson.annotation.JsonTypeName;
/**
* ApplicationInfo
*/
@JsonPropertyOrder({
ApplicationInfo.JSON_PROPERTY_APP_ID,
ApplicationInfo.JSON_PROPERTY_USER_IDENTIFIER,
ApplicationInfo.JSON_PROPERTY_EXTRA_INFO,
ApplicationInfo.JSON_PROPERTY_REGISTRATION_TIME
})
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", comments = "Generator version: 7.8.0")
public class ApplicationInfo {
public static final String JSON_PROPERTY_APP_ID = "appId";
private String appId;
public static final String JSON_PROPERTY_USER_IDENTIFIER = "userIdentifier";
private String userIdentifier;
public static final String JSON_PROPERTY_EXTRA_INFO = "extraInfo";
private Map<String, String> extraInfo = new HashMap<>();
public static final String JSON_PROPERTY_REGISTRATION_TIME = "registrationTime";
private Long registrationTime;
public ApplicationInfo() {
}
public ApplicationInfo appId(String appId) {
this.appId = appId;
return this;
}
/**
* The id of the application.
* @return appId
*/
@javax.annotation.Nonnull
@JsonProperty(JSON_PROPERTY_APP_ID)
@JsonInclude(value = JsonInclude.Include.ALWAYS)
public String getAppId() {
return appId;
}
@JsonProperty(JSON_PROPERTY_APP_ID)
@JsonInclude(value = JsonInclude.Include.ALWAYS)
public void setAppId(String appId) {
this.appId = appId;
}
public ApplicationInfo userIdentifier(String userIdentifier) {
this.userIdentifier = userIdentifier;
return this;
}
/**
* The user identifier of the application.
* @return userIdentifier
*/
@javax.annotation.Nonnull
@JsonProperty(JSON_PROPERTY_USER_IDENTIFIER)
@JsonInclude(value = JsonInclude.Include.ALWAYS)
public String getUserIdentifier() {
return userIdentifier;
}
@JsonProperty(JSON_PROPERTY_USER_IDENTIFIER)
@JsonInclude(value = JsonInclude.Include.ALWAYS)
public void setUserIdentifier(String userIdentifier) {
this.userIdentifier = userIdentifier;
}
public ApplicationInfo extraInfo(Map<String, String> extraInfo) {
this.extraInfo = extraInfo;
return this;
}
public ApplicationInfo putExtraInfoItem(String key, String extraInfoItem) {
if (this.extraInfo == null) {
this.extraInfo = new HashMap<>();
}
this.extraInfo.put(key, extraInfoItem);
return this;
}
/**
* Extra information of the application.
* @return extraInfo
*/
@javax.annotation.Nullable
@JsonProperty(JSON_PROPERTY_EXTRA_INFO)
@JsonInclude(value = JsonInclude.Include.USE_DEFAULTS)
public Map<String, String> getExtraInfo() {
return extraInfo;
}
@JsonProperty(JSON_PROPERTY_EXTRA_INFO)
@JsonInclude(value = JsonInclude.Include.USE_DEFAULTS)
public void setExtraInfo(Map<String, String> extraInfo) {
this.extraInfo = extraInfo;
}
public ApplicationInfo registrationTime(Long registrationTime) {
this.registrationTime = registrationTime;
return this;
}
/**
* The registration time of the application.
* @return registrationTime
*/
@javax.annotation.Nullable
@JsonProperty(JSON_PROPERTY_REGISTRATION_TIME)
@JsonInclude(value = JsonInclude.Include.USE_DEFAULTS)
public Long getRegistrationTime() {
return registrationTime;
}
@JsonProperty(JSON_PROPERTY_REGISTRATION_TIME)
@JsonInclude(value = JsonInclude.Include.USE_DEFAULTS)
public void setRegistrationTime(Long registrationTime) {
this.registrationTime = registrationTime;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ApplicationInfo applicationInfo = (ApplicationInfo) o;
return Objects.equals(this.appId, applicationInfo.appId) &&
Objects.equals(this.userIdentifier, applicationInfo.userIdentifier) &&
Objects.equals(this.extraInfo, applicationInfo.extraInfo) &&
Objects.equals(this.registrationTime, applicationInfo.registrationTime);
}
@Override
public int hashCode() {
return Objects.hash(appId, userIdentifier, extraInfo, registrationTime);
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("class ApplicationInfo {\n");
sb.append(" appId: ").append(toIndentedString(appId)).append("\n");
sb.append(" userIdentifier: ").append(toIndentedString(userIdentifier)).append("\n");
sb.append(" extraInfo: ").append(toIndentedString(extraInfo)).append("\n");
sb.append(" registrationTime: ").append(toIndentedString(registrationTime)).append("\n");
sb.append("}");
return sb.toString();
}
/**
* Convert the given object to string with each line indented by 4 spaces
* (except the first line).
*/
private String toIndentedString(Object o) {
if (o == null) {
return "null";
}
return o.toString().replace("\n", "\n ");
}
}

View File

@ -0,0 +1,120 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.celeborn.rest.v1.model;
import java.util.Objects;
import java.util.Arrays;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.fasterxml.jackson.annotation.JsonValue;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.celeborn.rest.v1.model.ApplicationInfo;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import com.fasterxml.jackson.annotation.JsonTypeName;
/**
* ApplicationInfoResponse
*/
@JsonPropertyOrder({
ApplicationInfoResponse.JSON_PROPERTY_APPLICATIONS
})
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", comments = "Generator version: 7.8.0")
public class ApplicationInfoResponse {
public static final String JSON_PROPERTY_APPLICATIONS = "applications";
private List<ApplicationInfo> applications = new ArrayList<>();
public ApplicationInfoResponse() {
}
public ApplicationInfoResponse applications(List<ApplicationInfo> applications) {
this.applications = applications;
return this;
}
public ApplicationInfoResponse addApplicationsItem(ApplicationInfo applicationsItem) {
if (this.applications == null) {
this.applications = new ArrayList<>();
}
this.applications.add(applicationsItem);
return this;
}
/**
* The application information.
* @return applications
*/
@javax.annotation.Nonnull
@JsonProperty(JSON_PROPERTY_APPLICATIONS)
@JsonInclude(value = JsonInclude.Include.ALWAYS)
public List<ApplicationInfo> getApplications() {
return applications;
}
@JsonProperty(JSON_PROPERTY_APPLICATIONS)
@JsonInclude(value = JsonInclude.Include.ALWAYS)
public void setApplications(List<ApplicationInfo> applications) {
this.applications = applications;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ApplicationInfoResponse applicationInfoResponse = (ApplicationInfoResponse) o;
return Objects.equals(this.applications, applicationInfoResponse.applications);
}
@Override
public int hashCode() {
return Objects.hash(applications);
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("class ApplicationInfoResponse {\n");
sb.append(" applications: ").append(toIndentedString(applications)).append("\n");
sb.append("}");
return sb.toString();
}
/**
* Convert the given object to string with each line indented by 4 spaces
* (except the first line).
*/
private String toIndentedString(Object o) {
if (o == null) {
return "null";
}
return o.toString().replace("\n", "\n ");
}
}

View File

@ -327,6 +327,20 @@ paths:
schema:
$ref: '#/components/schemas/ApplicationsHeartbeatResponse'
/api/v1/applications/info:
get:
tags:
- Application
operationId: getApplicationsInfo
description: List all running application's info of the cluster.
responses:
"200":
description: The request was successful.
content:
application/json:
schema:
$ref: '#/components/schemas/ApplicationInfoResponse'
/api/v1/applications/delete_apps:
post:
tags:
@ -793,6 +807,40 @@ components:
required:
- applications
ApplicationInfo:
type: object
properties:
appId:
type: string
description: The id of the application.
userIdentifier:
type: string
description: The user identifier of the application.
extraInfo:
type: object
description: Extra information of the application.
additionalProperties:
type: string
registrationTime:
type: integer
format: int64
description: The registration time of the application.
required:
- appId
- userIdentifier
ApplicationInfoResponse:
type: object
properties:
applications:
type: array
description: The application information.
items:
$ref: '#/components/schemas/ApplicationInfo'
required:
- applications
HostnamesResponse:
type: object
properties:

View File

@ -19,16 +19,25 @@ package org.apache.celeborn.tests.client
import java.util
import org.scalatest.concurrent.Eventually.eventually
import org.scalatest.concurrent.Futures.{interval, timeout}
import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
import org.apache.celeborn.client.{LifecycleManager, WithShuffleClientSuite}
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.identity.UserIdentifier
import org.apache.celeborn.common.protocol.message.StatusCode
import org.apache.celeborn.service.deploy.MiniClusterFeature
class LifecycleManagerSuite extends WithShuffleClientSuite with MiniClusterFeature {
override protected val userIdentifier = UserIdentifier("test", "celeborn")
celebornConf
.set(CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED.key, "true")
.set(CelebornConf.CLIENT_PUSH_BUFFER_MAX_SIZE.key, "256K")
.set(CelebornConf.USER_SPECIFIC_TENANT.key, userIdentifier.tenantId)
.set(CelebornConf.USER_SPECIFIC_USERNAME.key, userIdentifier.name)
.set(CelebornConf.USER_SPECIFIC_APPLICATION_INFO.key, "k1=v1")
override def beforeAll(): Unit = {
super.beforeAll()
@ -99,6 +108,24 @@ class LifecycleManagerSuite extends WithShuffleClientSuite with MiniClusterFeatu
lifecycleManager.stop()
}
test("CELEBORN-1258: Support to register application info with user identifier and extra info") {
val lifecycleManager: LifecycleManager = new LifecycleManager(APP, celebornConf)
val arrayList = new util.ArrayList[Integer]()
(0 to 10).foreach(i => {
arrayList.add(i)
})
lifecycleManager.requestMasterRequestSlotsWithRetry(0, arrayList)
eventually(timeout(3.seconds), interval(0.milliseconds)) {
val appInfo = masterInfo._1.statusSystem.applicationInfos.get(APP)
assert(appInfo.userIdentifier == userIdentifier)
assert(appInfo.extraInfo.get("k1") == "v1")
assert(appInfo.registrationTime > 0 && appInfo.registrationTime < System.currentTimeMillis())
}
}
override def afterAll(): Unit = {
logInfo("all test complete , stop celeborn mini cluster")
shutdownMiniCluster()