[CELEBORN-1056][FOLLOWUP] Support upsert and delete of dynamic configuration management

### What changes were proposed in this pull request?

Support upsert and delete of dynamic configuration management.

### Why are the changes needed?

There is only listing dynamic configuration interface for dynamic configuration management. It should support upserting and deleting dynamic configuration.

### Does this PR introduce _any_ user-facing change?

- Rest API:
  - `/api/v1/conf/dynamic/upsert` to upsert dynamic configurations.
  - `/api/v1/conf/dynamic/delete` to delete dynamic configurations.
- CLI:
  - `--upsert-dynamic-conf` to upsert dynamic configurations.
  - `--delete-dynamic-conf` to upsert dynamic configurations.

### How was this patch tested?

- `ConfigServiceSuiteJ`
- `ApiV1BaseResourceSuite`
- `TestCelebornCliCommands`

Closes #3323 from SteNicholas/CELEBORN-1056.

Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
This commit is contained in:
SteNicholas 2025-06-17 14:54:50 -07:00 committed by Wang, Fei
parent 3d614f848e
commit 46c998067e
29 changed files with 1544 additions and 43 deletions

View File

@ -17,7 +17,69 @@
package org.apache.celeborn.cli.common
import picocli.CommandLine.Command
import java.util
import scala.collection.JavaConverters._
import org.apache.commons.lang3.StringUtils
import picocli.CommandLine.{Command, ParameterException}
import picocli.CommandLine.Model.CommandSpec
import org.apache.celeborn.rest.v1.model.{DeleteDynamicConfigRequest, HandleResponse, UpsertDynamicConfigRequest}
@Command(mixinStandardHelpOptions = true, versionProvider = classOf[CliVersionProvider])
abstract class BaseCommand extends Runnable with CliLogging {}
abstract class BaseCommand extends Runnable with CliLogging {
private[cli] def upsertDynamicConf(
commonOptions: CommonOptions,
spec: CommandSpec,
upsert: (UpsertDynamicConfigRequest, util.Map[String, String]) => HandleResponse)
: HandleResponse = {
if (StringUtils.isBlank(commonOptions.configLevel)) {
throw new ParameterException(
spec.commandLine(),
"Config level must be provided for this command.")
}
if (StringUtils.isBlank(commonOptions.upsertConfigs)) {
throw new ParameterException(
spec.commandLine(),
"Configs to upsert must be provided for this command.")
}
val upsertConfigs =
commonOptions.upsertConfigs.split(',').map(_.trim).filter(_.nonEmpty).map { config =>
val Array(k, v) = config.split(':').map(_.trim)
k -> v
}.toMap.asJava
upsert(
new UpsertDynamicConfigRequest()
.level(UpsertDynamicConfigRequest.LevelEnum.fromValue(commonOptions.configLevel))
.configs(upsertConfigs)
.tenant(commonOptions.configTenant)
.name(commonOptions.configName),
commonOptions.getAuthHeader)
}
private[cli] def deleteDynamicConf(
commonOptions: CommonOptions,
spec: CommandSpec,
delete: (DeleteDynamicConfigRequest, util.Map[String, String]) => HandleResponse)
: HandleResponse = {
if (StringUtils.isBlank(commonOptions.configLevel)) {
throw new ParameterException(
spec.commandLine(),
"Config level must be provided for this command.")
}
if (StringUtils.isBlank(commonOptions.deleteConfigs)) {
throw new ParameterException(
spec.commandLine(),
"Configs to delete must be provided for this command.")
}
delete(
new DeleteDynamicConfigRequest()
.level(DeleteDynamicConfigRequest.LevelEnum.fromValue(commonOptions.configLevel))
.configs(util.Arrays.asList[String](commonOptions.deleteConfigs.split(","): _*))
.tenant(commonOptions.configTenant)
.name(commonOptions.configName),
commonOptions.getAuthHeader)
}
}

View File

@ -75,10 +75,22 @@ class CommonOptions {
description = Array("The username of the TENANT_USER level."))
private[cli] var configName: String = _
@Option(
names = Array("--upsert-configs"),
paramLabel = "k1:v1,k2:v2,k3:v3...",
description = Array("The dynamic configs to upsert in the format of `[key]:[value]`."))
private[cli] var upsertConfigs: String = _
@Option(
names = Array("--delete-configs"),
paramLabel = "c1,c2,c3...",
description = Array("The comma separated dynamic configs to delete."))
private[cli] var deleteConfigs: String = _
@Option(
names = Array("--apps"),
paramLabel = "appId",
description = Array("The application Id list seperated by comma."))
description = Array("The application Id list separated by comma."))
private[cli] var apps: String = _
@Option(

View File

@ -88,6 +88,12 @@ final class MasterOptions {
@Option(names = Array("--show-dynamic-conf"), description = Array("Show dynamic master conf"))
private[master] var showDynamicConf: Boolean = _
@Option(names = Array("--upsert-dynamic-conf"), description = Array("Upsert dynamic master conf"))
private[master] var upsertDynamicConf: Boolean = _
@Option(names = Array("--delete-dynamic-conf"), description = Array("Delete dynamic master conf"))
private[master] var deleteDynamicConf: Boolean = _
@Option(names = Array("--show-thread-dump"), description = Array("Show master thread dump"))
private[master] var showThreadDump: Boolean = _

View File

@ -109,6 +109,10 @@ trait MasterSubcommand extends BaseCommand {
private[master] def runShowDynamicConf: DynamicConfigResponse
private[master] def runUpsertDynamicConf: HandleResponse
private[master] def runDeleteDynamicConf: HandleResponse
private[master] def runShowThreadDump: ThreadStackResponse
private[master] def reviseLostShuffles: HandleResponse

View File

@ -51,6 +51,8 @@ class MasterSubcommandImpl extends MasterSubcommand {
if (masterOptions.showConf) log(runShowConf)
if (masterOptions.showContainerInfo) log(runShowContainerInfo)
if (masterOptions.showDynamicConf) log(runShowDynamicConf)
if (masterOptions.upsertDynamicConf) log(runUpsertDynamicConf)
if (masterOptions.deleteDynamicConf) log(runDeleteDynamicConf)
if (masterOptions.showThreadDump) log(runShowThreadDump)
if (masterOptions.reviseLostShuffles) log(reviseLostShuffles)
if (masterOptions.deleteApps) log(deleteApps)
@ -212,6 +214,14 @@ class MasterSubcommandImpl extends MasterSubcommand {
commonOptions.configName,
commonOptions.getAuthHeader)
private[master] def runUpsertDynamicConf: HandleResponse = {
upsertDynamicConf(commonOptions, spec, confApi.upsertDynamicConf)
}
private[master] def runDeleteDynamicConf: HandleResponse = {
deleteDynamicConf(commonOptions, spec, confApi.deleteDynamicConf)
}
private[master] def runShowThreadDump: ThreadStackResponse =
defaultApi.getThreadDump(commonOptions.getAuthHeader)

View File

@ -70,6 +70,12 @@ final class WorkerOptions {
@Option(names = Array("--show-dynamic-conf"), description = Array("Show dynamic worker conf"))
private[worker] var showDynamicConf: Boolean = _
@Option(names = Array("--upsert-dynamic-conf"), description = Array("Upsert dynamic worker conf"))
private[worker] var upsertDynamicConf: Boolean = _
@Option(names = Array("--delete-dynamic-conf"), description = Array("Delete dynamic worker conf"))
private[worker] var deleteDynamicConf: Boolean = _
@Option(names = Array("--show-thread-dump"), description = Array("Show worker thread dump"))
private[worker] var showThreadDump: Boolean = _

View File

@ -80,6 +80,10 @@ trait WorkerSubcommand extends BaseCommand {
private[worker] def runShowDynamicConf: DynamicConfigResponse
private[worker] def runUpsertDynamicConf: HandleResponse
private[worker] def runDeleteDynamicConf: HandleResponse
private[worker] def runShowThreadDump: ThreadStackResponse
}

View File

@ -38,6 +38,8 @@ class WorkerSubcommandImpl extends WorkerSubcommand {
if (workerOptions.showConf) log(runShowConf)
if (workerOptions.showContainerInfo) log(runShowContainerInfo)
if (workerOptions.showDynamicConf) log(runShowDynamicConf)
if (workerOptions.upsertDynamicConf) log(runUpsertDynamicConf)
if (workerOptions.deleteDynamicConf) log(runDeleteDynamicConf)
if (workerOptions.showThreadDump) log(runShowThreadDump)
}
@ -78,6 +80,14 @@ class WorkerSubcommandImpl extends WorkerSubcommand {
commonOptions.configName,
commonOptions.getAuthHeader)
private[worker] def runUpsertDynamicConf: HandleResponse = {
upsertDynamicConf(commonOptions, spec, confApi.upsertDynamicConf)
}
private[worker] def runDeleteDynamicConf: HandleResponse = {
deleteDynamicConf(commonOptions, spec, confApi.deleteDynamicConf)
}
private[worker] def runShowThreadDump: ThreadStackResponse =
defaultApi.getThreadDump(commonOptions.getAuthHeader)

View File

@ -148,6 +148,28 @@ class TestCelebornCliCommands extends CelebornFunSuite with MiniClusterFeature {
captureOutputAndValidateResponse(args, "")
}
test("worker --upsert-dynamic-conf") {
cancel("This test is temporarily disabled since dynamic conf is not enabled in unit tests.")
val args = prepareWorkerArgs() ++ Array(
"--upsert-dynamic-conf",
"--config-level",
"SYSTEM",
"--upsert-configs",
"key1:val1,key2:val2")
captureOutputAndValidateResponse(args, "success: true")
}
test("worker --delete-dynamic-conf") {
cancel("This test is temporarily disabled since dynamic conf is not enabled in unit tests.")
val args = prepareWorkerArgs() ++ Array(
"--delete-dynamic-conf",
"--config-level",
"SYSTEM",
"--delete-configs",
"conf1,conf2")
captureOutputAndValidateResponse(args, "success: true")
}
test("worker --show-thread-dump") {
val args = prepareWorkerArgs() :+ "--show-thread-dump"
captureOutputAndValidateResponse(args, "ThreadStackResponse")
@ -230,6 +252,28 @@ class TestCelebornCliCommands extends CelebornFunSuite with MiniClusterFeature {
captureOutputAndValidateResponse(args, "")
}
test("master --upsert-dynamic-conf") {
cancel("This test is temporarily disabled since dynamic conf is not enabled in unit tests.")
val args = prepareMasterArgs() ++ Array(
"--upsert-dynamic-conf",
"--config-level",
"SYSTEM",
"--upsert-configs",
"key1:val1,key2:val2")
captureOutputAndValidateResponse(args, "success: true")
}
test("master --delete-dynamic-conf") {
cancel("This test is temporarily disabled since dynamic conf is not enabled in unit tests.")
val args = prepareMasterArgs() ++ Array(
"--delete-dynamic-conf",
"--config-level",
"SYSTEM",
"--delete-configs",
"conf1,conf2")
captureOutputAndValidateResponse(args, "success: true")
}
test("master --show-thread-dump") {
val args = prepareMasterArgs() :+ "--show-thread-dump"
captureOutputAndValidateResponse(args, "ThreadStackResponse")

View File

@ -81,12 +81,14 @@ $ celeborn-cli master -h
Usage: celeborn-cli master [-hV] [--apps=appId] [--auth-header=authHeader]
[--cluster=cluster_alias] [--config-level=level]
[--config-name=username] [--config-tenant=tenant_id]
[--host-list=h1,h2,h3...] [--hostport=host:port]
[--worker-ids=w1,w2,w3...] (--show-masters-info |
--show-cluster-apps | --show-cluster-shuffles |
--exclude-worker | --remove-excluded-worker |
--send-worker-event=IMMEDIATELY | DECOMMISSION |
DECOMMISSION_THEN_IDLE | GRACEFUL | RECOMMISSION |
[--delete-configs=c1,c2,c3...] [--host-list=h1,h2,
h3...] [--hostport=host:port] [--upsert-configs=k1:
v1,k2:v2,k3:v3...] [--worker-ids=w1,w2,w3...]
(--show-masters-info | --show-cluster-apps |
--show-cluster-shuffles | --exclude-worker |
--remove-excluded-worker |
--send-worker-event=IMMEDIATELY | DECOMMISSION |
DECOMMISSION_THEN_IDLE | GRACEFUL | RECOMMISSION |
NONE | --show-worker-event-info |
--show-lost-workers | --show-excluded-workers |
--show-manual-excluded-workers |
@ -94,7 +96,8 @@ Usage: celeborn-cli master [-hV] [--apps=appId] [--auth-header=authHeader]
--show-decommissioning-workers |
--show-lifecycle-managers | --show-workers |
--show-workers-topology | --show-conf |
--show-dynamic-conf | --show-thread-dump |
--show-dynamic-conf | --upsert-dynamic-conf |
--delete-dynamic-conf | --show-thread-dump |
--show-container-info | --add-cluster-alias=alias |
--remove-cluster-alias=alias |
--remove-workers-unavailable-info |
@ -105,7 +108,7 @@ Usage: celeborn-cli master [-hV] [--apps=appId] [--auth-header=authHeader]
--add-cluster-alias=alias
Add alias to use in the cli for the given set of
masters
--apps=appId The application Id list seperated by comma.
--apps=appId The application Id list separated by comma.
--auth-header=authHeader
The http `Authorization` header for
authentication. It should be in the format of
@ -118,6 +121,9 @@ Usage: celeborn-cli master [-hV] [--apps=appId] [--auth-header=authHeader]
--config-tenant=tenant_id
The tenant id of TENANT or TENANT_USER level.
--delete-apps Delete resource of an application.
--delete-configs=c1,c2,c3...
The comma separated dynamic configs to delete.
--delete-dynamic-conf Delete dynamic master conf
--exclude-worker Exclude workers by ID
-h, --help Show this help message and exit.
--host-list=h1,h2,h3...
@ -165,6 +171,10 @@ Usage: celeborn-cli master [-hV] [--apps=appId] [--auth-header=authHeader]
--update-interruption-notices=workerId1=timestamp,workerId2=timestamp,
workerId3=timestamp
Update interruption notices of workers.
--upsert-configs=k1:v1,k2:v2,k3:v3...
The dynamic configs to upsert in the format of `
[key]:[value]`.
--upsert-dynamic-conf Upsert dynamic master conf
-V, --version Print version information and exit.
--worker-ids=w1,w2,w3...
List of workerIds to pass to the command. Each
@ -179,16 +189,19 @@ $ celeborn-cli worker -h
Usage: celeborn-cli worker [-hV] [--apps=appId] [--auth-header=authHeader]
[--cluster=cluster_alias] [--config-level=level]
[--config-name=username] [--config-tenant=tenant_id]
[--host-list=h1,h2,h3...] [--hostport=host:port]
[--worker-ids=w1,w2,w3...] (--show-worker-info |
--show-apps-on-worker | --show-shuffles-on-worker |
[--delete-configs=c1,c2,c3...] [--host-list=h1,h2,
h3...] [--hostport=host:port] [--upsert-configs=k1:
v1,k2:v2,k3:v3...] [--worker-ids=w1,w2,w3...]
(--show-worker-info | --show-apps-on-worker |
--show-shuffles-on-worker |
--show-partition-location-info |
--show-unavailable-peers | --is-shutdown |
--is-decommissioning | --is-registered |
--exit=exit_type | --show-conf |
--show-container-info | --show-dynamic-conf |
--upsert-dynamic-conf | --delete-dynamic-conf |
--show-thread-dump)
--apps=appId The application Id list seperated by comma.
--apps=appId The application Id list separated by comma.
--auth-header=authHeader
The http `Authorization` header for
authentication. It should be in the format of
@ -200,6 +213,9 @@ Usage: celeborn-cli worker [-hV] [--apps=appId] [--auth-header=authHeader]
--config-name=username The username of the TENANT_USER level.
--config-tenant=tenant_id
The tenant id of TENANT or TENANT_USER level.
--delete-configs=c1,c2,c3...
The comma separated dynamic configs to delete.
--delete-dynamic-conf Delete dynamic worker conf
--exit=exit_type Exit the application with a specified type
-h, --help Show this help message and exit.
--host-list=h1,h2,h3...
@ -220,6 +236,10 @@ Usage: celeborn-cli worker [-hV] [--apps=appId] [--auth-header=authHeader]
--show-unavailable-peers
Show unavailable peers
--show-worker-info Show worker info
--upsert-configs=k1:v1,k2:v2,k3:v3...
The dynamic configs to upsert in the format of `
[key]:[value]`.
--upsert-dynamic-conf Upsert dynamic worker conf
-V, --version Print version information and exit.
--worker-ids=w1,w2,w3...
List of workerIds to pass to the command. Each

View File

@ -26,7 +26,10 @@ import org.apache.celeborn.rest.v1.master.invoker.Configuration;
import org.apache.celeborn.rest.v1.master.invoker.Pair;
import org.apache.celeborn.rest.v1.model.ConfResponse;
import org.apache.celeborn.rest.v1.model.DeleteDynamicConfigRequest;
import org.apache.celeborn.rest.v1.model.DynamicConfigResponse;
import org.apache.celeborn.rest.v1.model.HandleResponse;
import org.apache.celeborn.rest.v1.model.UpsertDynamicConfigRequest;
import java.util.ArrayList;
@ -47,6 +50,75 @@ public class ConfApi extends BaseApi {
super(apiClient);
}
/**
*
* Delete the dynamic configs. The parameter level specifies the config level of dynamic configs. The parameter tenant specifies the tenant id of TENANT or TENANT_USER level. The parameter name specifies the user name of TENANT_USER level. Meanwhile, either none or all of the parameter tenant and name are specified for TENANT_USER level.
* @param deleteDynamicConfigRequest (optional)
* @return HandleResponse
* @throws ApiException if fails to make API call
*/
public HandleResponse deleteDynamicConf(DeleteDynamicConfigRequest deleteDynamicConfigRequest) throws ApiException {
return this.deleteDynamicConf(deleteDynamicConfigRequest, Collections.emptyMap());
}
/**
*
* Delete the dynamic configs. The parameter level specifies the config level of dynamic configs. The parameter tenant specifies the tenant id of TENANT or TENANT_USER level. The parameter name specifies the user name of TENANT_USER level. Meanwhile, either none or all of the parameter tenant and name are specified for TENANT_USER level.
* @param deleteDynamicConfigRequest (optional)
* @param additionalHeaders additionalHeaders for this call
* @return HandleResponse
* @throws ApiException if fails to make API call
*/
public HandleResponse deleteDynamicConf(DeleteDynamicConfigRequest deleteDynamicConfigRequest, Map<String, String> additionalHeaders) throws ApiException {
Object localVarPostBody = deleteDynamicConfigRequest;
// create path and map variables
String localVarPath = "/api/v1/conf/dynamic/delete";
StringJoiner localVarQueryStringJoiner = new StringJoiner("&");
String localVarQueryParameterBaseName;
List<Pair> localVarQueryParams = new ArrayList<Pair>();
List<Pair> localVarCollectionQueryParams = new ArrayList<Pair>();
Map<String, String> localVarHeaderParams = new HashMap<String, String>();
Map<String, String> localVarCookieParams = new HashMap<String, String>();
Map<String, Object> localVarFormParams = new HashMap<String, Object>();
localVarHeaderParams.putAll(additionalHeaders);
final String[] localVarAccepts = {
"application/json"
};
final String localVarAccept = apiClient.selectHeaderAccept(localVarAccepts);
final String[] localVarContentTypes = {
"application/json"
};
final String localVarContentType = apiClient.selectHeaderContentType(localVarContentTypes);
String[] localVarAuthNames = new String[] { "basic" };
TypeReference<HandleResponse> localVarReturnType = new TypeReference<HandleResponse>() {};
return apiClient.invokeAPI(
localVarPath,
"POST",
localVarQueryParams,
localVarCollectionQueryParams,
localVarQueryStringJoiner.toString(),
localVarPostBody,
localVarHeaderParams,
localVarCookieParams,
localVarFormParams,
localVarAccept,
localVarContentType,
localVarAuthNames,
localVarReturnType
);
}
/**
*
* List the conf setting.
@ -190,6 +262,75 @@ public class ConfApi extends BaseApi {
);
}
/**
*
* Upsert the dynamic configs. The parameter level specifies the config level of dynamic configs. The parameter tenant specifies the tenant id of TENANT or TENANT_USER level. The parameter name specifies the user name of TENANT_USER level. Meanwhile, either none or all of the parameter tenant and name are specified for TENANT_USER level.
* @param upsertDynamicConfigRequest (optional)
* @return HandleResponse
* @throws ApiException if fails to make API call
*/
public HandleResponse upsertDynamicConf(UpsertDynamicConfigRequest upsertDynamicConfigRequest) throws ApiException {
return this.upsertDynamicConf(upsertDynamicConfigRequest, Collections.emptyMap());
}
/**
*
* Upsert the dynamic configs. The parameter level specifies the config level of dynamic configs. The parameter tenant specifies the tenant id of TENANT or TENANT_USER level. The parameter name specifies the user name of TENANT_USER level. Meanwhile, either none or all of the parameter tenant and name are specified for TENANT_USER level.
* @param upsertDynamicConfigRequest (optional)
* @param additionalHeaders additionalHeaders for this call
* @return HandleResponse
* @throws ApiException if fails to make API call
*/
public HandleResponse upsertDynamicConf(UpsertDynamicConfigRequest upsertDynamicConfigRequest, Map<String, String> additionalHeaders) throws ApiException {
Object localVarPostBody = upsertDynamicConfigRequest;
// create path and map variables
String localVarPath = "/api/v1/conf/dynamic/upsert";
StringJoiner localVarQueryStringJoiner = new StringJoiner("&");
String localVarQueryParameterBaseName;
List<Pair> localVarQueryParams = new ArrayList<Pair>();
List<Pair> localVarCollectionQueryParams = new ArrayList<Pair>();
Map<String, String> localVarHeaderParams = new HashMap<String, String>();
Map<String, String> localVarCookieParams = new HashMap<String, String>();
Map<String, Object> localVarFormParams = new HashMap<String, Object>();
localVarHeaderParams.putAll(additionalHeaders);
final String[] localVarAccepts = {
"application/json"
};
final String localVarAccept = apiClient.selectHeaderAccept(localVarAccepts);
final String[] localVarContentTypes = {
"application/json"
};
final String localVarContentType = apiClient.selectHeaderContentType(localVarContentTypes);
String[] localVarAuthNames = new String[] { "basic" };
TypeReference<HandleResponse> localVarReturnType = new TypeReference<HandleResponse>() {};
return apiClient.invokeAPI(
localVarPath,
"POST",
localVarQueryParams,
localVarCollectionQueryParams,
localVarQueryStringJoiner.toString(),
localVarPostBody,
localVarHeaderParams,
localVarCookieParams,
localVarFormParams,
localVarAccept,
localVarContentType,
localVarAuthNames,
localVarReturnType
);
}
@Override
public <T> T invokeAPI(String url, String method, Object request, TypeReference<T> returnType, Map<String, String> additionalHeaders) throws ApiException {
String localVarPath = url.replace(apiClient.getBaseURL(), "");
@ -208,7 +349,7 @@ public class ConfApi extends BaseApi {
final String localVarAccept = apiClient.selectHeaderAccept(localVarAccepts);
final String[] localVarContentTypes = {
"application/json"
};
final String localVarContentType = apiClient.selectHeaderContentType(localVarContentTypes);

View File

@ -0,0 +1,249 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.celeborn.rest.v1.model;
import java.util.Objects;
import java.util.Arrays;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.fasterxml.jackson.annotation.JsonValue;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import com.fasterxml.jackson.annotation.JsonTypeName;
/**
* DeleteDynamicConfigRequest
*/
@JsonPropertyOrder({
DeleteDynamicConfigRequest.JSON_PROPERTY_LEVEL,
DeleteDynamicConfigRequest.JSON_PROPERTY_CONFIGS,
DeleteDynamicConfigRequest.JSON_PROPERTY_TENANT,
DeleteDynamicConfigRequest.JSON_PROPERTY_NAME
})
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", comments = "Generator version: 7.8.0")
public class DeleteDynamicConfigRequest {
/**
* The config level of dynamic configs.
*/
public enum LevelEnum {
SYSTEM("SYSTEM"),
TENANT("TENANT"),
TENANT_USER("TENANT_USER");
private String value;
LevelEnum(String value) {
this.value = value;
}
@JsonValue
public String getValue() {
return value;
}
@Override
public String toString() {
return String.valueOf(value);
}
@JsonCreator
public static LevelEnum fromValue(String value) {
for (LevelEnum b : LevelEnum.values()) {
if (b.value.equalsIgnoreCase(value)) {
return b;
}
}
throw new IllegalArgumentException("Unexpected value '" + value + "'");
}
}
public static final String JSON_PROPERTY_LEVEL = "level";
private LevelEnum level;
public static final String JSON_PROPERTY_CONFIGS = "configs";
private List<String> configs = new ArrayList<>();
public static final String JSON_PROPERTY_TENANT = "tenant";
private String tenant;
public static final String JSON_PROPERTY_NAME = "name";
private String name;
public DeleteDynamicConfigRequest() {
}
public DeleteDynamicConfigRequest level(LevelEnum level) {
this.level = level;
return this;
}
/**
* The config level of dynamic configs.
* @return level
*/
@javax.annotation.Nonnull
@JsonProperty(JSON_PROPERTY_LEVEL)
@JsonInclude(value = JsonInclude.Include.ALWAYS)
public LevelEnum getLevel() {
return level;
}
@JsonProperty(JSON_PROPERTY_LEVEL)
@JsonInclude(value = JsonInclude.Include.ALWAYS)
public void setLevel(LevelEnum level) {
this.level = level;
}
public DeleteDynamicConfigRequest configs(List<String> configs) {
this.configs = configs;
return this;
}
public DeleteDynamicConfigRequest addConfigsItem(String configsItem) {
if (this.configs == null) {
this.configs = new ArrayList<>();
}
this.configs.add(configsItem);
return this;
}
/**
* The dynamic configs to delete.
* @return configs
*/
@javax.annotation.Nonnull
@JsonProperty(JSON_PROPERTY_CONFIGS)
@JsonInclude(value = JsonInclude.Include.ALWAYS)
public List<String> getConfigs() {
return configs;
}
@JsonProperty(JSON_PROPERTY_CONFIGS)
@JsonInclude(value = JsonInclude.Include.ALWAYS)
public void setConfigs(List<String> configs) {
this.configs = configs;
}
public DeleteDynamicConfigRequest tenant(String tenant) {
this.tenant = tenant;
return this;
}
/**
* The tenant id of TENANT or TENANT_USER level.
* @return tenant
*/
@javax.annotation.Nullable
@JsonProperty(JSON_PROPERTY_TENANT)
@JsonInclude(value = JsonInclude.Include.USE_DEFAULTS)
public String getTenant() {
return tenant;
}
@JsonProperty(JSON_PROPERTY_TENANT)
@JsonInclude(value = JsonInclude.Include.USE_DEFAULTS)
public void setTenant(String tenant) {
this.tenant = tenant;
}
public DeleteDynamicConfigRequest name(String name) {
this.name = name;
return this;
}
/**
* The user name of TENANT_USER level.
* @return name
*/
@javax.annotation.Nullable
@JsonProperty(JSON_PROPERTY_NAME)
@JsonInclude(value = JsonInclude.Include.USE_DEFAULTS)
public String getName() {
return name;
}
@JsonProperty(JSON_PROPERTY_NAME)
@JsonInclude(value = JsonInclude.Include.USE_DEFAULTS)
public void setName(String name) {
this.name = name;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
DeleteDynamicConfigRequest deleteDynamicConfigRequest = (DeleteDynamicConfigRequest) o;
return Objects.equals(this.level, deleteDynamicConfigRequest.level) &&
Objects.equals(this.configs, deleteDynamicConfigRequest.configs) &&
Objects.equals(this.tenant, deleteDynamicConfigRequest.tenant) &&
Objects.equals(this.name, deleteDynamicConfigRequest.name);
}
@Override
public int hashCode() {
return Objects.hash(level, configs, tenant, name);
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("class DeleteDynamicConfigRequest {\n");
sb.append(" level: ").append(toIndentedString(level)).append("\n");
sb.append(" configs: ").append(toIndentedString(configs)).append("\n");
sb.append(" tenant: ").append(toIndentedString(tenant)).append("\n");
sb.append(" name: ").append(toIndentedString(name)).append("\n");
sb.append("}");
return sb.toString();
}
/**
* Convert the given object to string with each line indented by 4 spaces
* (except the first line).
*/
private String toIndentedString(Object o) {
if (o == null) {
return "null";
}
return o.toString().replace("\n", "\n ");
}
}

View File

@ -0,0 +1,245 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.celeborn.rest.v1.model;
import java.util.Objects;
import java.util.Arrays;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.fasterxml.jackson.annotation.JsonValue;
import java.util.HashMap;
import java.util.Map;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import com.fasterxml.jackson.annotation.JsonTypeName;
/**
* UpsertDynamicConfigRequest
*/
@JsonPropertyOrder({
UpsertDynamicConfigRequest.JSON_PROPERTY_LEVEL,
UpsertDynamicConfigRequest.JSON_PROPERTY_CONFIGS,
UpsertDynamicConfigRequest.JSON_PROPERTY_TENANT,
UpsertDynamicConfigRequest.JSON_PROPERTY_NAME
})
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", comments = "Generator version: 7.8.0")
public class UpsertDynamicConfigRequest {
/**
* The config level of dynamic configs.
*/
public enum LevelEnum {
SYSTEM("SYSTEM"),
TENANT("TENANT"),
TENANT_USER("TENANT_USER");
private String value;
LevelEnum(String value) {
this.value = value;
}
@JsonValue
public String getValue() {
return value;
}
@Override
public String toString() {
return String.valueOf(value);
}
@JsonCreator
public static LevelEnum fromValue(String value) {
for (LevelEnum b : LevelEnum.values()) {
if (b.value.equalsIgnoreCase(value)) {
return b;
}
}
throw new IllegalArgumentException("Unexpected value '" + value + "'");
}
}
public static final String JSON_PROPERTY_LEVEL = "level";
private LevelEnum level;
public static final String JSON_PROPERTY_CONFIGS = "configs";
private Map<String, String> configs = new HashMap<>();
public static final String JSON_PROPERTY_TENANT = "tenant";
private String tenant;
public static final String JSON_PROPERTY_NAME = "name";
private String name;
public UpsertDynamicConfigRequest() {
}
public UpsertDynamicConfigRequest level(LevelEnum level) {
this.level = level;
return this;
}
/**
* The config level of dynamic configs.
* @return level
*/
@javax.annotation.Nonnull
@JsonProperty(JSON_PROPERTY_LEVEL)
@JsonInclude(value = JsonInclude.Include.ALWAYS)
public LevelEnum getLevel() {
return level;
}
@JsonProperty(JSON_PROPERTY_LEVEL)
@JsonInclude(value = JsonInclude.Include.ALWAYS)
public void setLevel(LevelEnum level) {
this.level = level;
}
public UpsertDynamicConfigRequest configs(Map<String, String> configs) {
this.configs = configs;
return this;
}
public UpsertDynamicConfigRequest putConfigsItem(String key, String configsItem) {
this.configs.put(key, configsItem);
return this;
}
/**
* The dynamic configs to upsert.
* @return configs
*/
@javax.annotation.Nonnull
@JsonProperty(JSON_PROPERTY_CONFIGS)
@JsonInclude(value = JsonInclude.Include.ALWAYS)
public Map<String, String> getConfigs() {
return configs;
}
@JsonProperty(JSON_PROPERTY_CONFIGS)
@JsonInclude(value = JsonInclude.Include.ALWAYS)
public void setConfigs(Map<String, String> configs) {
this.configs = configs;
}
public UpsertDynamicConfigRequest tenant(String tenant) {
this.tenant = tenant;
return this;
}
/**
* The tenant id of TENANT or TENANT_USER level.
* @return tenant
*/
@javax.annotation.Nullable
@JsonProperty(JSON_PROPERTY_TENANT)
@JsonInclude(value = JsonInclude.Include.USE_DEFAULTS)
public String getTenant() {
return tenant;
}
@JsonProperty(JSON_PROPERTY_TENANT)
@JsonInclude(value = JsonInclude.Include.USE_DEFAULTS)
public void setTenant(String tenant) {
this.tenant = tenant;
}
public UpsertDynamicConfigRequest name(String name) {
this.name = name;
return this;
}
/**
* The user name of TENANT_USER level.
* @return name
*/
@javax.annotation.Nullable
@JsonProperty(JSON_PROPERTY_NAME)
@JsonInclude(value = JsonInclude.Include.USE_DEFAULTS)
public String getName() {
return name;
}
@JsonProperty(JSON_PROPERTY_NAME)
@JsonInclude(value = JsonInclude.Include.USE_DEFAULTS)
public void setName(String name) {
this.name = name;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
UpsertDynamicConfigRequest upsertDynamicConfigRequest = (UpsertDynamicConfigRequest) o;
return Objects.equals(this.level, upsertDynamicConfigRequest.level) &&
Objects.equals(this.configs, upsertDynamicConfigRequest.configs) &&
Objects.equals(this.tenant, upsertDynamicConfigRequest.tenant) &&
Objects.equals(this.name, upsertDynamicConfigRequest.name);
}
@Override
public int hashCode() {
return Objects.hash(level, configs, tenant, name);
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("class UpsertDynamicConfigRequest {\n");
sb.append(" level: ").append(toIndentedString(level)).append("\n");
sb.append(" configs: ").append(toIndentedString(configs)).append("\n");
sb.append(" tenant: ").append(toIndentedString(tenant)).append("\n");
sb.append(" name: ").append(toIndentedString(name)).append("\n");
sb.append("}");
return sb.toString();
}
/**
* Convert the given object to string with each line indented by 4 spaces
* (except the first line).
*/
private String toIndentedString(Object o) {
if (o == null) {
return "null";
}
return o.toString().replace("\n", "\n ");
}
}

View File

@ -26,7 +26,10 @@ import org.apache.celeborn.rest.v1.worker.invoker.Configuration;
import org.apache.celeborn.rest.v1.worker.invoker.Pair;
import org.apache.celeborn.rest.v1.model.ConfResponse;
import org.apache.celeborn.rest.v1.model.DeleteDynamicConfigRequest;
import org.apache.celeborn.rest.v1.model.DynamicConfigResponse;
import org.apache.celeborn.rest.v1.model.HandleResponse;
import org.apache.celeborn.rest.v1.model.UpsertDynamicConfigRequest;
import java.util.ArrayList;
@ -47,6 +50,75 @@ public class ConfApi extends BaseApi {
super(apiClient);
}
/**
*
* Delete the dynamic configs. The parameter level specifies the config level of dynamic configs. The parameter tenant specifies the tenant id of TENANT or TENANT_USER level. The parameter name specifies the user name of TENANT_USER level. Meanwhile, either none or all of the parameter tenant and name are specified for TENANT_USER level.
* @param deleteDynamicConfigRequest (optional)
* @return HandleResponse
* @throws ApiException if fails to make API call
*/
public HandleResponse deleteDynamicConf(DeleteDynamicConfigRequest deleteDynamicConfigRequest) throws ApiException {
return this.deleteDynamicConf(deleteDynamicConfigRequest, Collections.emptyMap());
}
/**
*
* Delete the dynamic configs. The parameter level specifies the config level of dynamic configs. The parameter tenant specifies the tenant id of TENANT or TENANT_USER level. The parameter name specifies the user name of TENANT_USER level. Meanwhile, either none or all of the parameter tenant and name are specified for TENANT_USER level.
* @param deleteDynamicConfigRequest (optional)
* @param additionalHeaders additionalHeaders for this call
* @return HandleResponse
* @throws ApiException if fails to make API call
*/
public HandleResponse deleteDynamicConf(DeleteDynamicConfigRequest deleteDynamicConfigRequest, Map<String, String> additionalHeaders) throws ApiException {
Object localVarPostBody = deleteDynamicConfigRequest;
// create path and map variables
String localVarPath = "/api/v1/conf/dynamic/delete";
StringJoiner localVarQueryStringJoiner = new StringJoiner("&");
String localVarQueryParameterBaseName;
List<Pair> localVarQueryParams = new ArrayList<Pair>();
List<Pair> localVarCollectionQueryParams = new ArrayList<Pair>();
Map<String, String> localVarHeaderParams = new HashMap<String, String>();
Map<String, String> localVarCookieParams = new HashMap<String, String>();
Map<String, Object> localVarFormParams = new HashMap<String, Object>();
localVarHeaderParams.putAll(additionalHeaders);
final String[] localVarAccepts = {
"application/json"
};
final String localVarAccept = apiClient.selectHeaderAccept(localVarAccepts);
final String[] localVarContentTypes = {
"application/json"
};
final String localVarContentType = apiClient.selectHeaderContentType(localVarContentTypes);
String[] localVarAuthNames = new String[] { "basic" };
TypeReference<HandleResponse> localVarReturnType = new TypeReference<HandleResponse>() {};
return apiClient.invokeAPI(
localVarPath,
"POST",
localVarQueryParams,
localVarCollectionQueryParams,
localVarQueryStringJoiner.toString(),
localVarPostBody,
localVarHeaderParams,
localVarCookieParams,
localVarFormParams,
localVarAccept,
localVarContentType,
localVarAuthNames,
localVarReturnType
);
}
/**
*
* List the conf setting.
@ -190,6 +262,75 @@ public class ConfApi extends BaseApi {
);
}
/**
*
* Upsert the dynamic configs. The parameter level specifies the config level of dynamic configs. The parameter tenant specifies the tenant id of TENANT or TENANT_USER level. The parameter name specifies the user name of TENANT_USER level. Meanwhile, either none or all of the parameter tenant and name are specified for TENANT_USER level.
* @param upsertDynamicConfigRequest (optional)
* @return HandleResponse
* @throws ApiException if fails to make API call
*/
public HandleResponse upsertDynamicConf(UpsertDynamicConfigRequest upsertDynamicConfigRequest) throws ApiException {
return this.upsertDynamicConf(upsertDynamicConfigRequest, Collections.emptyMap());
}
/**
*
* Upsert the dynamic configs. The parameter level specifies the config level of dynamic configs. The parameter tenant specifies the tenant id of TENANT or TENANT_USER level. The parameter name specifies the user name of TENANT_USER level. Meanwhile, either none or all of the parameter tenant and name are specified for TENANT_USER level.
* @param upsertDynamicConfigRequest (optional)
* @param additionalHeaders additionalHeaders for this call
* @return HandleResponse
* @throws ApiException if fails to make API call
*/
public HandleResponse upsertDynamicConf(UpsertDynamicConfigRequest upsertDynamicConfigRequest, Map<String, String> additionalHeaders) throws ApiException {
Object localVarPostBody = upsertDynamicConfigRequest;
// create path and map variables
String localVarPath = "/api/v1/conf/dynamic/upsert";
StringJoiner localVarQueryStringJoiner = new StringJoiner("&");
String localVarQueryParameterBaseName;
List<Pair> localVarQueryParams = new ArrayList<Pair>();
List<Pair> localVarCollectionQueryParams = new ArrayList<Pair>();
Map<String, String> localVarHeaderParams = new HashMap<String, String>();
Map<String, String> localVarCookieParams = new HashMap<String, String>();
Map<String, Object> localVarFormParams = new HashMap<String, Object>();
localVarHeaderParams.putAll(additionalHeaders);
final String[] localVarAccepts = {
"application/json"
};
final String localVarAccept = apiClient.selectHeaderAccept(localVarAccepts);
final String[] localVarContentTypes = {
"application/json"
};
final String localVarContentType = apiClient.selectHeaderContentType(localVarContentTypes);
String[] localVarAuthNames = new String[] { "basic" };
TypeReference<HandleResponse> localVarReturnType = new TypeReference<HandleResponse>() {};
return apiClient.invokeAPI(
localVarPath,
"POST",
localVarQueryParams,
localVarCollectionQueryParams,
localVarQueryStringJoiner.toString(),
localVarPostBody,
localVarHeaderParams,
localVarCookieParams,
localVarFormParams,
localVarAccept,
localVarContentType,
localVarAuthNames,
localVarReturnType
);
}
@Override
public <T> T invokeAPI(String url, String method, Object request, TypeReference<T> returnType, Map<String, String> additionalHeaders) throws ApiException {
String localVarPath = url.replace(apiClient.getBaseURL(), "");
@ -208,7 +349,7 @@ public class ConfApi extends BaseApi {
final String localVarAccept = apiClient.selectHeaderAccept(localVarAccepts);
final String[] localVarContentTypes = {
"application/json"
};
final String localVarContentType = apiClient.selectHeaderContentType(localVarContentTypes);

View File

@ -93,6 +93,58 @@ paths:
"503":
description: Dynamic configuration is disabled.
/api/v1/conf/dynamic/upsert:
post:
tags:
- Conf
operationId: upsertDynamicConf
description: |
Upsert the dynamic configs.
The parameter level specifies the config level of dynamic configs.
The parameter tenant specifies the tenant id of TENANT or TENANT_USER level.
The parameter name specifies the user name of TENANT_USER level.
Meanwhile, either none or all of the parameter tenant and name are specified for TENANT_USER level.
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/UpsertDynamicConfigRequest'
responses:
"200":
description: The request was successful.
content:
application/json:
schema:
$ref: '#/components/schemas/HandleResponse'
"503":
description: Dynamic configuration is disabled.
/api/v1/conf/dynamic/delete:
post:
tags:
- Conf
operationId: deleteDynamicConf
description: |
Delete the dynamic configs.
The parameter level specifies the config level of dynamic configs.
The parameter tenant specifies the tenant id of TENANT or TENANT_USER level.
The parameter name specifies the user name of TENANT_USER level.
Meanwhile, either none or all of the parameter tenant and name are specified for TENANT_USER level.
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/DeleteDynamicConfigRequest'
responses:
"200":
description: The request was successful.
content:
application/json:
schema:
$ref: '#/components/schemas/HandleResponse'
"503":
description: Dynamic configuration is disabled.
/api/v1/thread_dump:
get:
operationId: getThreadDump
@ -588,6 +640,56 @@ components:
items:
$ref: '#/components/schemas/DynamicConfig'
UpsertDynamicConfigRequest:
type: object
properties:
level:
type: string
description: The config level of dynamic configs.
enum:
- SYSTEM
- TENANT
- TENANT_USER
configs:
type: object
description: The dynamic configs to upsert.
additionalProperties:
type: string
tenant:
type: string
description: The tenant id of TENANT or TENANT_USER level.
name:
type: string
description: The user name of TENANT_USER level.
required:
- level
- configs
DeleteDynamicConfigRequest:
type: object
properties:
level:
type: string
description: The config level of dynamic configs.
enum:
- SYSTEM
- TENANT
- TENANT_USER
configs:
type: array
description: The dynamic configs to delete.
items:
type: string
tenant:
type: string
description: The tenant id of TENANT or TENANT_USER level.
name:
type: string
description: The user name of TENANT_USER level.
required:
- level
- configs
ThreadStack:
type: object
properties:

View File

@ -93,6 +93,58 @@ paths:
"503":
description: Dynamic configuration is disabled.
/api/v1/conf/dynamic/upsert:
post:
tags:
- Conf
operationId: upsertDynamicConf
description: |
Upsert the dynamic configs.
The parameter level specifies the config level of dynamic configs.
The parameter tenant specifies the tenant id of TENANT or TENANT_USER level.
The parameter name specifies the user name of TENANT_USER level.
Meanwhile, either none or all of the parameter tenant and name are specified for TENANT_USER level.
requestBody:
content:
application/json:
schema:
$ref: './master_rest_v1.yaml#/components/schemas/UpsertDynamicConfigRequest'
responses:
"200":
description: The request was successful.
content:
application/json:
schema:
$ref: '#/components/schemas/HandleResponse'
"503":
description: Dynamic configuration is disabled.
/api/v1/conf/dynamic/delete:
post:
tags:
- Conf
operationId: deleteDynamicConf
description: |
Delete the dynamic configs.
The parameter level specifies the config level of dynamic configs.
The parameter tenant specifies the tenant id of TENANT or TENANT_USER level.
The parameter name specifies the user name of TENANT_USER level.
Meanwhile, either none or all of the parameter tenant and name are specified for TENANT_USER level.
requestBody:
content:
application/json:
schema:
$ref: './master_rest_v1.yaml#/components/schemas/DeleteDynamicConfigRequest'
responses:
"200":
description: The request was successful.
content:
application/json:
schema:
$ref: '#/components/schemas/HandleResponse'
"503":
description: Dynamic configuration is disabled.
/api/v1/thread_dump:
get:
operationId: getThreadDump

View File

@ -19,6 +19,7 @@ package org.apache.celeborn.server.common.service.config;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.apache.celeborn.common.CelebornConf;
@ -117,4 +118,42 @@ public interface ConfigService {
/** Shutdowns configuration management service. */
void shutdown();
/**
* Upsert the system level dynamic configurations of {@link SystemConfig}.
*
* @param systemConfigs The system level dynamic configurations to upsert.
*/
void upsertSystemConfig(Map<String, String> systemConfigs);
/**
* Upsert the tenant or tenant user level dynamic configurations of {@link TenantConfig}.
*
* @param configLevel The config level to upsert.
* @param tenantId The tenant id to upsert.
* @param name The name to upsert.
* @param tenantConfigs The tenant or tenant user level dynamic configurations to upsert.
*/
void upsertTenantConfig(
ConfigLevel configLevel, String tenantId, String name, Map<String, String> tenantConfigs);
/**
* Delete the system level dynamic configurations of {@link SystemConfig} by config keys.
*
* @param configKeys The config keys of system level dynamic configurations to delete.
*/
void deleteSystemConfigByKeys(List<String> configKeys);
/**
* Delete the tenant or tenant user level dynamic configurations of {@link TenantConfig} by config
* keys.
*
* @param configLevel The config level to delete.
* @param tenantId The tenant id to delete.
* @param name The name to delete.
* @param configKeys The config keys of tenant or tenant user level dynamic configurations to
* delete.
*/
void deleteTenantConfigByKeys(
ConfigLevel configLevel, String tenantId, String name, List<String> configKeys);
}

View File

@ -18,6 +18,8 @@
package org.apache.celeborn.server.common.service.config;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
@ -59,6 +61,28 @@ public class DbConfigServiceImpl extends BaseConfigServiceImpl implements Config
Function.identity())));
}
@Override
public void upsertSystemConfig(Map<String, String> systemConfigs) {
iServiceManager.upsertSystemConfig(systemConfigs);
}
@Override
public void upsertTenantConfig(
ConfigLevel configLevel, String tenantId, String name, Map<String, String> tenantConfigs) {
iServiceManager.upsertTenantConfig(configLevel, tenantId, name, tenantConfigs);
}
@Override
public void deleteSystemConfigByKeys(List<String> configKeys) {
iServiceManager.deleteSystemConfigByKeys(configKeys);
}
@Override
public void deleteTenantConfigByKeys(
ConfigLevel configLevel, String tenantId, String name, List<String> configKeys) {
iServiceManager.deleteTenantConfigByKeys(configLevel, tenantId, name, configKeys);
}
@VisibleForTesting
public IServiceManager getServiceManager() {
return iServiceManager;

View File

@ -42,7 +42,7 @@ public class FsConfigServiceImpl extends BaseConfigServiceImpl implements Config
}
@Override
public synchronized void refreshCache() throws IOException {
public synchronized void refreshCache() {
try (FileInputStream fileInputStream = new FileInputStream(getConfigFile(System.getenv()))) {
Map<String, TenantConfig> tenantConfigs = new HashMap<>();
Map<Pair<String, String>, TenantConfig> tenantUserConfigs = new HashMap<>();
@ -79,9 +79,33 @@ public class FsConfigServiceImpl extends BaseConfigServiceImpl implements Config
tenantConfigAtomicReference.set(tenantConfigs);
tenantUserConfigAtomicReference.set(tenantUserConfigs);
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public void upsertSystemConfig(Map<String, String> systemConfigs) {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public void upsertTenantConfig(
ConfigLevel configLevel, String tenantId, String name, Map<String, String> tenantConfigs) {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public void deleteSystemConfigByKeys(List<String> configKeys) {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public void deleteTenantConfigByKeys(
ConfigLevel configLevel, String tenantId, String name, List<String> configKeys) {
throw new UnsupportedOperationException("Not supported yet.");
}
private Map<String, String> getConfigs(Map<String, Object> configMap) {
Map<String, Object> configs = (Map<String, Object>) configMap.get(CONF_CONFIG);
if (configs == null) return Collections.emptyMap();

View File

@ -18,7 +18,9 @@
package org.apache.celeborn.server.common.service.store;
import java.util.List;
import java.util.Map;
import org.apache.celeborn.server.common.service.config.ConfigLevel;
import org.apache.celeborn.server.common.service.config.TenantConfig;
import org.apache.celeborn.server.common.service.model.ClusterInfo;
import org.apache.celeborn.server.common.service.model.ClusterSystemConfig;
@ -37,4 +39,14 @@ public interface IServiceManager {
List<ClusterSystemConfig> getSystemConfig();
List<ClusterTag> getClusterTags();
void upsertSystemConfig(Map<String, String> systemConfigs);
void upsertTenantConfig(
ConfigLevel configLevel, String tenantId, String name, Map<String, String> tenantConfigs);
void deleteSystemConfigByKeys(List<String> configKeys);
void deleteTenantConfigByKeys(
ConfigLevel configLevel, String tenantId, String name, List<String> configKeys);
}

View File

@ -22,6 +22,7 @@ import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;
@ -171,4 +172,85 @@ public class DbServiceManagerImpl implements IServiceManager {
return mapper.getClusterTags(clusterId);
}
}
@Override
public void upsertSystemConfig(Map<String, String> systemConfigs) {
try (SqlSession sqlSession = sqlSessionFactory.openSession(true)) {
ClusterSystemConfigMapper mapper = sqlSession.getMapper(ClusterSystemConfigMapper.class);
for (Entry<String, String> systemConfig : systemConfigs.entrySet()) {
ClusterSystemConfig config = new ClusterSystemConfig();
Instant now = Instant.now();
config.setClusterId(clusterId);
config.setConfigKey(systemConfig.getKey());
config.setConfigValue(systemConfig.getValue());
config.setGmtCreate(now);
config.setGmtModify(now);
int updated = mapper.update(config);
if (updated == 0) {
mapper.insert(config);
}
}
}
}
@Override
public void upsertTenantConfig(
ConfigLevel configLevel, String tenantId, String name, Map<String, String> tenantConfigs) {
try (SqlSession sqlSession = sqlSessionFactory.openSession(true)) {
ClusterTenantConfigMapper mapper = sqlSession.getMapper(ClusterTenantConfigMapper.class);
for (Entry<String, String> systemConfig : tenantConfigs.entrySet()) {
ClusterTenantConfig config = new ClusterTenantConfig();
Instant now = Instant.now();
config.setClusterId(clusterId);
config.setLevel(configLevel.name());
config.setTenantId(tenantId);
config.setName(name);
config.setConfigKey(systemConfig.getKey());
config.setConfigValue(systemConfig.getValue());
config.setGmtCreate(now);
config.setGmtModify(now);
int updated =
ConfigLevel.TENANT.equals(configLevel)
? mapper.updateConfig(config)
: mapper.updateUserConfig(config);
if (updated == 0) {
mapper.insert(config);
}
}
}
}
@Override
public void deleteSystemConfigByKeys(List<String> configKeys) {
try (SqlSession sqlSession = sqlSessionFactory.openSession(true)) {
ClusterSystemConfigMapper mapper = sqlSession.getMapper(ClusterSystemConfigMapper.class);
for (String configKey : configKeys) {
ClusterSystemConfig config = new ClusterSystemConfig();
config.setClusterId(clusterId);
config.setConfigKey(configKey);
mapper.delete(config);
}
}
}
@Override
public void deleteTenantConfigByKeys(
ConfigLevel configLevel, String tenantId, String name, List<String> configKeys) {
try (SqlSession sqlSession = sqlSessionFactory.openSession(true)) {
ClusterTenantConfigMapper mapper = sqlSession.getMapper(ClusterTenantConfigMapper.class);
for (String configKey : configKeys) {
ClusterTenantConfig config = new ClusterTenantConfig();
config.setClusterId(clusterId);
config.setLevel(configLevel.name());
config.setTenantId(tenantId);
config.setName(name);
config.setConfigKey(configKey);
if (ConfigLevel.TENANT.equals(configLevel)) {
mapper.deleteConfig(config);
} else {
mapper.deleteUserConfig(config);
}
}
}
}
}

View File

@ -19,12 +19,30 @@ package org.apache.celeborn.server.common.service.store.db.mapper;
import java.util.List;
import org.apache.ibatis.annotations.Delete;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Select;
import org.apache.ibatis.annotations.Update;
import org.apache.celeborn.server.common.service.model.ClusterSystemConfig;
public interface ClusterSystemConfigMapper {
@Insert(
"INSERT INTO celeborn_cluster_system_config(cluster_id, config_key, config_value, gmt_create, gmt_modify) "
+ "VALUES (#{clusterId}, #{configKey}, #{configValue}, #{gmtCreate}, #{gmtModify})")
void insert(ClusterSystemConfig clusterSystemConfig);
@Update(
"UPDATE celeborn_cluster_system_config SET config_value = #{configValue}, gmt_modify = #{gmtModify} "
+ "WHERE cluster_id = #{clusterId} AND config_key = #{configKey}")
int update(ClusterSystemConfig clusterSystemConfig);
@Delete(
"DELETE FROM celeborn_cluster_system_config "
+ "WHERE cluster_id = #{clusterId} AND config_key = #{configKey}")
int delete(ClusterSystemConfig clusterSystemConfig);
@Select(
"SELECT id, cluster_id, config_key, config_value, type, gmt_create, gmt_modify "
+ "FROM celeborn_cluster_system_config WHERE cluster_id = #{clusterId}")

View File

@ -19,16 +19,44 @@ package org.apache.celeborn.server.common.service.store.db.mapper;
import java.util.List;
import org.apache.ibatis.annotations.Delete;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
import org.apache.ibatis.annotations.Update;
import org.apache.celeborn.server.common.service.model.ClusterTenantConfig;
public interface ClusterTenantConfigMapper {
@Insert(
"INSERT INTO celeborn_cluster_tenant_config(cluster_id, tenant_id, level, name, config_key, config_value, gmt_create, gmt_modify) "
+ "VALUES (#{clusterId}, #{tenantId}, #{level}, #{name}, #{configKey}, #{configValue}, #{gmtCreate}, #{gmtModify})")
void insert(ClusterTenantConfig clusterTenantConfig);
@Update(
"UPDATE celeborn_cluster_tenant_config SET config_value = #{configValue}, gmt_modify = #{gmtModify} "
+ "WHERE cluster_id = #{clusterId} AND level = #{level} AND tenant_id = #{tenantId} AND config_key = #{configKey}")
int updateConfig(ClusterTenantConfig clusterTenantConfig);
@Update(
"UPDATE celeborn_cluster_tenant_config SET config_value = #{configValue}, gmt_modify = #{gmtModify} "
+ "WHERE cluster_id = #{clusterId} AND level = #{level} AND tenant_id = #{tenantId} AND name = #{name} AND config_key = #{configKey}")
int updateUserConfig(ClusterTenantConfig clusterTenantConfig);
@Delete(
"DELETE FROM celeborn_cluster_tenant_config "
+ "WHERE cluster_id = #{clusterId} AND level = #{level} AND tenant_id = #{tenantId} AND config_key = #{configKey}")
int deleteConfig(ClusterTenantConfig clusterTenantConfig);
@Delete(
"DELETE FROM celeborn_cluster_tenant_config "
+ "WHERE cluster_id = #{clusterId} AND level = #{level} AND tenant_id = #{tenantId} AND name = #{name} AND config_key = #{configKey}")
int deleteUserConfig(ClusterTenantConfig clusterTenantConfig);
@Select(
"SELECT id, cluster_id, tenant_id, level, name, config_key, config_value, type, gmt_create, gmt_modify "
+ "FROM celeborn_cluster_tenant_config WHERE cluster_id = #{clusterId} AND level=#{level} LIMIT #{offset}, #{pageSize}")
+ "FROM celeborn_cluster_tenant_config WHERE cluster_id = #{clusterId} AND level = #{level} LIMIT #{offset}, #{pageSize}")
List<ClusterTenantConfig> getClusterTenantConfigs(
@Param("clusterId") int clusterId,
@Param("level") String configLevel,
@ -36,7 +64,7 @@ public interface ClusterTenantConfigMapper {
@Param("pageSize") int pageSize);
@Select(
"SELECT count(*) FROM celeborn_cluster_tenant_config WHERE cluster_id = #{clusterId} AND level=#{level}")
"SELECT count(*) FROM celeborn_cluster_tenant_config WHERE cluster_id = #{clusterId} AND level = #{level}")
int getClusterTenantConfigsNum(
@Param("clusterId") int clusterId, @Param("level") String configLevel);
}

View File

@ -45,7 +45,7 @@ CREATE TABLE IF NOT EXISTS celeborn_cluster_tenant_config
id int NOT NULL AUTO_INCREMENT,
cluster_id int NOT NULL,
tenant_id varchar(255) NOT NULL,
level varchar(255) NOT NULL COMMENT 'config level, valid level is TENANT,USER',
level varchar(255) NOT NULL COMMENT 'config level, valid level is TENANT,TENANT_USER',
name varchar(255) DEFAULT NULL COMMENT 'tenant sub user',
config_key varchar(255) NOT NULL,
config_value varchar(255) NOT NULL,

View File

@ -17,7 +17,7 @@
package org.apache.celeborn.server.common.http.api.v1
import javax.ws.rs.{Consumes, GET, Path, Produces, QueryParam, ServiceUnavailableException}
import javax.ws.rs.{Consumes, GET, Path, POST, Produces, QueryParam, ServiceUnavailableException}
import javax.ws.rs.core.MediaType
import scala.collection.JavaConverters._
@ -30,7 +30,7 @@ import org.apache.commons.lang3.StringUtils
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.util.Utils
import org.apache.celeborn.rest.v1.model.{ConfigData, ConfResponse, DynamicConfig, DynamicConfigResponse}
import org.apache.celeborn.rest.v1.model.{ConfigData, ConfResponse, DeleteDynamicConfigRequest, DynamicConfig, DynamicConfigResponse, HandleResponse, UpsertDynamicConfigRequest}
import org.apache.celeborn.rest.v1.model.DynamicConfig.LevelEnum
import org.apache.celeborn.server.common.http.api.ApiRequestContext
import org.apache.celeborn.server.common.service.config.ConfigLevel
@ -89,6 +89,52 @@ private[api] class ConfResource extends ApiRequestContext {
}
}
@Operation(description = "Upsert the dynamic configs. " +
"The parameter level specifies the config level of dynamic configs. " +
"The parameter tenant specifies the tenant id of TENANT or TENANT_USER level. " +
"The parameter name specifies the user name of TENANT_USER level. " +
"Meanwhile, either none or all of the parameter tenant and name are specified for TENANT_USER level.")
@ApiResponse(
responseCode = "200",
content = Array(new Content(
mediaType = MediaType.APPLICATION_JSON,
schema = new Schema(implementation = classOf[HandleResponse]))))
@POST
@Path("/dynamic/upsert")
def upsertDynamicConf(request: UpsertDynamicConfigRequest): HandleResponse = {
if (configService == null) {
throw new ServiceUnavailableException(
s"Dynamic configuration is disabled. Please check whether to config" +
s" `${CelebornConf.DYNAMIC_CONFIG_STORE_BACKEND.key}`.")
} else {
upsertDynamicConfig(request)
new HandleResponse().success(true).message(s"Upsert dynamic configs of `$request`.")
}
}
@Operation(description = "Delete the dynamic configs. " +
"The parameter level specifies the config level of dynamic configs. " +
"The parameter tenant specifies the tenant id of TENANT or TENANT_USER level. " +
"The parameter name specifies the user name of TENANT_USER level. " +
"Meanwhile, either none or all of the parameter tenant and name are specified for TENANT_USER level.")
@ApiResponse(
responseCode = "200",
content = Array(new Content(
mediaType = MediaType.APPLICATION_JSON,
schema = new Schema(implementation = classOf[HandleResponse]))))
@POST
@Path("/dynamic/delete")
def deleteDynamicConf(request: DeleteDynamicConfigRequest): HandleResponse = {
if (configService == null) {
throw new ServiceUnavailableException(
s"Dynamic configuration is disabled. Please check whether to config" +
s" `${CelebornConf.DYNAMIC_CONFIG_STORE_BACKEND.key}`.")
} else {
deleteDynamicConfig(request)
new HandleResponse().success(true).message(s"Delete dynamic configs of `$request`.")
}
}
private def getDynamicConfig(level: String, tenant: String, name: String): Seq[DynamicConfig] = {
if (ConfigLevel.SYSTEM.name().equalsIgnoreCase(level)) {
val config = configService.getSystemConfigFromCache.getConfigs.asScala
@ -133,4 +179,32 @@ private[api] class ConfResource extends ApiRequestContext {
Seq.empty[DynamicConfig]
}
}
private def upsertDynamicConfig(request: UpsertDynamicConfigRequest): Unit = {
val level = request.getLevel.name()
if (ConfigLevel.SYSTEM.name().equalsIgnoreCase(level)) {
configService.upsertSystemConfig(request.getConfigs)
} else if (ConfigLevel.TENANT.name().equalsIgnoreCase(level)
|| ConfigLevel.TENANT_USER.name().equalsIgnoreCase(level)) {
configService.upsertTenantConfig(
ConfigLevel.valueOf(level),
request.getTenant,
request.getName,
request.getConfigs)
}
}
private def deleteDynamicConfig(request: DeleteDynamicConfigRequest): Unit = {
val level = request.getLevel.name()
if (ConfigLevel.SYSTEM.name().equalsIgnoreCase(level)) {
configService.deleteSystemConfigByKeys(request.getConfigs)
} else if (ConfigLevel.TENANT.name().equalsIgnoreCase(level)
|| ConfigLevel.TENANT_USER.name().equalsIgnoreCase(level)) {
configService.deleteTenantConfigByKeys(
ConfigLevel.valueOf(level),
request.getTenant,
request.getName,
request.getConfigs)
}
}
}

View File

@ -24,6 +24,9 @@ import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
@ -61,12 +64,11 @@ public class ConfigServiceSuiteJ {
verifyTenantUserConfig(configService);
verifyTags(configService);
configService.upsertSystemConfig(Collections.singletonMap("celeborn.test.int.only", "100"));
SqlSessionFactory sqlSessionFactory = DBSessionFactory.get(celebornConf);
try (SqlSession sqlSession = sqlSessionFactory.openSession(true)) {
Statement statement = sqlSession.getConnection().createStatement();
statement.execute(
"UPDATE celeborn_cluster_system_config SET config_value = 100 WHERE config_key='celeborn.test.int.only'");
statement.execute("UPDATE celeborn_cluster_tags SET tag = 'tag3' WHERE tag='tag2'");
} catch (SQLException e) {
throw new RuntimeException(e);
@ -79,6 +81,82 @@ public class ConfigServiceSuiteJ {
((DbConfigServiceImpl) configService).getServiceManager(),
celebornConf,
DATE_FORMAT.get().parse("2023-08-26 22:08:30").toInstant());
configService.upsertSystemConfig(
Collections.singletonMap("celeborn.test.system.upsert", "insert"));
configService.refreshCache();
SystemConfig systemConfig = configService.getSystemConfigFromCache();
Assert.assertEquals(
"insert",
systemConfig.getValue(
"celeborn.test.system.upsert", null, String.class, ConfigType.STRING));
configService.deleteSystemConfigByKeys(Collections.singletonList("celeborn.test.int.only"));
configService.refreshCache();
systemConfig = configService.getSystemConfigFromCache();
Assert.assertFalse(systemConfig.configs.containsKey("celeborn.test.int.only"));
Map<String, String> tenantConfigs = new HashMap<>();
tenantConfigs.put(CelebornConf.CLIENT_PUSH_BUFFER_INITIAL_SIZE().key(), "20480");
tenantConfigs.put("celeborn.test.tenant.upsert", "insert");
configService.upsertTenantConfig(ConfigLevel.TENANT, "tenant_id", null, tenantConfigs);
configService.refreshCache();
TenantConfig tenantConfig = configService.getRawTenantConfigFromCache("tenant_id");
Assert.assertEquals(
20480,
tenantConfig
.getValue(
CelebornConf.CLIENT_PUSH_BUFFER_INITIAL_SIZE().key(),
CelebornConf.CLIENT_PUSH_BUFFER_INITIAL_SIZE(),
Long.TYPE,
ConfigType.BYTES)
.longValue());
Assert.assertEquals(
"insert",
tenantConfig.getValue(
"celeborn.test.tenant.upsert", null, String.class, ConfigType.STRING));
configService.deleteTenantConfigByKeys(
ConfigLevel.TENANT,
"tenant_id",
null,
Arrays.asList(
CelebornConf.CLIENT_PUSH_BUFFER_INITIAL_SIZE().key(), "celeborn.test.tenant.upsert"));
configService.refreshCache();
tenantConfig = configService.getRawTenantConfigFromCache("tenant_id");
Assert.assertFalse(
tenantConfig.configs.containsKey(CelebornConf.CLIENT_PUSH_BUFFER_INITIAL_SIZE().key()));
Assert.assertFalse(tenantConfig.configs.containsKey("celeborn.test.tenant.upsert"));
tenantConfigs = new HashMap<>();
tenantConfigs.put(CelebornConf.CLIENT_PUSH_BUFFER_INITIAL_SIZE().key(), "2k");
tenantConfigs.put("celeborn.test.tenant.user.upsert", "insert");
configService.upsertTenantConfig(ConfigLevel.TENANT_USER, "tenant_id1", "Jerry", tenantConfigs);
configService.refreshCache();
tenantConfig = configService.getRawTenantUserConfigFromCache("tenant_id1", "Jerry");
Assert.assertEquals(
2048,
tenantConfig
.getValue(
CelebornConf.CLIENT_PUSH_BUFFER_INITIAL_SIZE().key(),
CelebornConf.CLIENT_PUSH_BUFFER_INITIAL_SIZE(),
Long.TYPE,
ConfigType.BYTES)
.longValue());
Assert.assertEquals(
"insert",
tenantConfig.getValue(
"celeborn.test.tenant.user.upsert", null, String.class, ConfigType.STRING));
configService.deleteTenantConfigByKeys(
ConfigLevel.TENANT_USER,
"tenant_id1",
"Jerry",
Arrays.asList(
CelebornConf.CLIENT_PUSH_BUFFER_INITIAL_SIZE().key(),
"celeborn.test.tenant.user.upsert"));
configService.refreshCache();
tenantConfig = configService.getRawTenantUserConfigFromCache("tenant_id1", "Jerry");
Assert.assertFalse(
tenantConfig.configs.containsKey(CelebornConf.CLIENT_PUSH_BUFFER_INITIAL_SIZE().key()));
Assert.assertFalse(tenantConfig.configs.containsKey("celeborn.test.tenant.user.upsert"));
}
@Test
@ -338,8 +416,6 @@ public class ConfigServiceSuiteJ {
}
private void verifyTagsChanged(ConfigService configService) {
System.out.println("Tags changed");
SystemConfig systemConfig = configService.getSystemConfigFromCache();
Map<String, Set<String>> tags = systemConfig.getTags();

View File

@ -29,20 +29,20 @@ VALUES
( 8, 1, 'celeborn.test.int.only', '10', 'QUOTA', '2023-08-26 22:08:30', '2023-08-26 22:08:30' );
INSERT INTO `celeborn_cluster_tenant_config` ( `id`, `cluster_id`, `tenant_id`, `level`, `name`, `config_key`, `config_value`, `type`, `gmt_create`, `gmt_modify` )
VALUES
( 1, 1, 'tenant_id', 'TENANT', '', 'celeborn.client.push.buffer.initial.size', '10240', 'QUOTA', '2023-08-26 22:08:30', '2023-08-26 22:08:30' ),
( 2, 1, 'tenant_id', 'TENANT', '', 'celeborn.client.push.buffer.initial.size.only', '102400', 'QUOTA', '2023-08-26 22:08:30', '2023-08-26 22:08:30' ),
( 3, 1, 'tenant_id', 'TENANT', '', 'celeborn.worker.fetch.heartbeat.enabled', 'false', 'QUOTA', '2023-08-26 22:08:30', '2023-08-26 22:08:30' ),
( 4, 1, 'tenant_id', 'TENANT', '', 'celeborn.test.tenant.timeoutMs.only', '100s', 'QUOTA', '2023-08-26 22:08:30', '2023-08-26 22:08:30' ),
( 5, 1, 'tenant_id', 'TENANT', '', 'celeborn.test.tenant.enabled.only', 'false', 'QUOTA', '2023-08-26 22:08:30', '2023-08-26 22:08:30' ),
( 6, 1, 'tenant_id', 'TENANT', '', 'celeborn.test.tenant.int.only', '100s', 'QUOTA', '2023-08-26 22:08:30', '2023-08-26 22:08:30' ),
( 7, 1, 'tenant_id', 'TENANT', '', 'celeborn.client.push.queue.capacity', '1024', 'QUOTA', '2023-08-26 22:08:30', '2023-08-26 22:08:30' ),
( 8, 1, 'tenant_id1', 'TENANT', '', 'celeborn.client.push.buffer.initial.size', '10240', 'QUOTA', '2023-08-26 22:08:30', '2023-08-26 22:08:30' ),
( 9, 1, 'tenant_id1', 'TENANT', '', 'celeborn.client.push.buffer.initial.size.only', '102400', 'QUOTA', '2023-08-26 22:08:30', '2023-08-26 22:08:30' ),
( 10, 1, 'tenant_id1', 'TENANT', '', 'celeborn.worker.fetch.heartbeat.enabled', 'false', 'QUOTA', '2023-08-26 22:08:30', '2023-08-26 22:08:30' ),
( 11, 1, 'tenant_id1', 'TENANT', '', 'celeborn.test.tenant.timeoutMs.only', '100s', 'QUOTA', '2023-08-26 22:08:30', '2023-08-26 22:08:30' ),
( 12, 1, 'tenant_id1', 'TENANT', '', 'celeborn.test.tenant.enabled.only', 'false', 'QUOTA', '2023-08-26 22:08:30', '2023-08-26 22:08:30' ),
( 13, 1, 'tenant_id1', 'TENANT', '', 'celeborn.test.tenant.int.only', '100s', 'QUOTA', '2023-08-26 22:08:30', '2023-08-26 22:08:30' ),
( 14, 1, 'tenant_id1', 'TENANT', '', 'celeborn.client.push.queue.capacity', '1024', 'QUOTA', '2023-08-26 22:08:30', '2023-08-26 22:08:30' ),
( 1, 1, 'tenant_id', 'TENANT', NULL, 'celeborn.client.push.buffer.initial.size', '10240', 'QUOTA', '2023-08-26 22:08:30', '2023-08-26 22:08:30' ),
( 2, 1, 'tenant_id', 'TENANT', NULL, 'celeborn.client.push.buffer.initial.size.only', '102400', 'QUOTA', '2023-08-26 22:08:30', '2023-08-26 22:08:30' ),
( 3, 1, 'tenant_id', 'TENANT', NULL, 'celeborn.worker.fetch.heartbeat.enabled', 'false', 'QUOTA', '2023-08-26 22:08:30', '2023-08-26 22:08:30' ),
( 4, 1, 'tenant_id', 'TENANT', NULL, 'celeborn.test.tenant.timeoutMs.only', '100s', 'QUOTA', '2023-08-26 22:08:30', '2023-08-26 22:08:30' ),
( 5, 1, 'tenant_id', 'TENANT', NULL, 'celeborn.test.tenant.enabled.only', 'false', 'QUOTA', '2023-08-26 22:08:30', '2023-08-26 22:08:30' ),
( 6, 1, 'tenant_id', 'TENANT', NULL, 'celeborn.test.tenant.int.only', '100s', 'QUOTA', '2023-08-26 22:08:30', '2023-08-26 22:08:30' ),
( 7, 1, 'tenant_id', 'TENANT', NULL, 'celeborn.client.push.queue.capacity', '1024', 'QUOTA', '2023-08-26 22:08:30', '2023-08-26 22:08:30' ),
( 8, 1, 'tenant_id1', 'TENANT', NULL, 'celeborn.client.push.buffer.initial.size', '10240', 'QUOTA', '2023-08-26 22:08:30', '2023-08-26 22:08:30' ),
( 9, 1, 'tenant_id1', 'TENANT', NULL, 'celeborn.client.push.buffer.initial.size.only', '102400', 'QUOTA', '2023-08-26 22:08:30', '2023-08-26 22:08:30' ),
( 10, 1, 'tenant_id1', 'TENANT', NULL, 'celeborn.worker.fetch.heartbeat.enabled', 'false', 'QUOTA', '2023-08-26 22:08:30', '2023-08-26 22:08:30' ),
( 11, 1, 'tenant_id1', 'TENANT', NULL, 'celeborn.test.tenant.timeoutMs.only', '100s', 'QUOTA', '2023-08-26 22:08:30', '2023-08-26 22:08:30' ),
( 12, 1, 'tenant_id1', 'TENANT', NULL, 'celeborn.test.tenant.enabled.only', 'false', 'QUOTA', '2023-08-26 22:08:30', '2023-08-26 22:08:30' ),
( 13, 1, 'tenant_id1', 'TENANT', NULL, 'celeborn.test.tenant.int.only', '100s', 'QUOTA', '2023-08-26 22:08:30', '2023-08-26 22:08:30' ),
( 14, 1, 'tenant_id1', 'TENANT', NULL, 'celeborn.client.push.queue.capacity', '1024', 'QUOTA', '2023-08-26 22:08:30', '2023-08-26 22:08:30' ),
( 15, 1, 'tenant_id1', 'TENANT_USER', 'Jerry', 'celeborn.client.push.buffer.initial.size', '1k', 'QUOTA', '2023-08-26 22:08:30', '2023-08-26 22:08:30' ),
( 16, 1, 'tenant_id1', 'TENANT_USER', 'Jerry', 'celeborn.client.push.buffer.initial.size.user.only', '512k', 'QUOTA', '2023-08-26 22:08:30', '2023-08-26 22:08:30' );
INSERT INTO `celeborn_cluster_tags` ( `id`, `cluster_id`, `tag`, `worker_id`, `gmt_create`, `gmt_modify` )

View File

@ -54,7 +54,7 @@ CREATE TABLE IF NOT EXISTS celeborn_cluster_tenant_config
cluster_id int NOT NULL,
tenant_id varchar(255) NOT NULL,
level varchar(255) NOT NULL COMMENT 'config level, valid level is TENANT,USER',
name varchar(255) NOT NULL COMMENT 'tenant sub user',
name varchar(255) DEFAULT NULL COMMENT 'tenant sub user',
config_key varchar(255) NOT NULL,
config_value varchar(255) NOT NULL,
type varchar(255) DEFAULT NULL COMMENT 'conf categories, such as quota',

View File

@ -24,7 +24,7 @@ import javax.ws.rs.core.{MediaType, UriBuilder}
import scala.collection.JavaConverters._
import org.apache.celeborn.rest.v1.model.{ConfResponse, LoggerInfo, LoggerInfos, ThreadStackResponse}
import org.apache.celeborn.rest.v1.model.{ConfResponse, DeleteDynamicConfigRequest, LoggerInfo, LoggerInfos, ThreadStackResponse, UpsertDynamicConfigRequest}
import org.apache.celeborn.server.common.http.HttpTestHelper
abstract class ApiV1BaseResourceSuite extends HttpTestHelper {
@ -39,6 +39,22 @@ abstract class ApiV1BaseResourceSuite extends HttpTestHelper {
response = webTarget.path("conf/dynamic").request(MediaType.APPLICATION_JSON).get()
assert(HttpServletResponse.SC_SERVICE_UNAVAILABLE == response.getStatus)
assert(response.readEntity(classOf[String]).contains("Dynamic configuration is disabled."))
response =
webTarget.path("conf/dynamic/upsert").request(MediaType.APPLICATION_JSON).post(Entity.entity(
new UpsertDynamicConfigRequest().level(UpsertDynamicConfigRequest.LevelEnum.SYSTEM).configs(
Map("test.system.config" -> "upsert").asJava),
MediaType.APPLICATION_JSON))
assert(HttpServletResponse.SC_SERVICE_UNAVAILABLE == response.getStatus)
assert(response.readEntity(classOf[String]).contains("Dynamic configuration is disabled."))
response =
webTarget.path("conf/dynamic/delete").request(MediaType.APPLICATION_JSON).post(Entity.entity(
new DeleteDynamicConfigRequest().level(DeleteDynamicConfigRequest.LevelEnum.SYSTEM).configs(
List("test.system.config").asJava),
MediaType.APPLICATION_JSON))
assert(HttpServletResponse.SC_SERVICE_UNAVAILABLE == response.getStatus)
assert(response.readEntity(classOf[String]).contains("Dynamic configuration is disabled."))
}
test("logger resource") {