[CELEBORN-1487][PHASE2] CongestionController support dynamic config
### What changes were proposed in this pull request? CongestionController supports dynamic config. ### Why are the changes needed? Currently, Celeborn only supports quota management based on disk file bytes/count, and this quota management cannot cope with sudden increases in traffic, which can cause damage to the cluster. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Unit tests (UT). Closes #2817 from leixm/CELEBORN-1487-2. Authored-by: Xianming Lei <31424839+leixm@users.noreply.github.com> Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
This commit is contained in:
parent
497bfdf5d7
commit
7c9a008a14
@ -1280,17 +1280,17 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
|
||||
// TODO related to `WORKER_DIRECT_MEMORY_RATIO_PAUSE_RECEIVE`,
|
||||
// `WORKER_DIRECT_MEMORY_RATIO_PAUSE_REPLICATE`and `WORKER_DIRECT_MEMORY_RATIO_RESUME`,
|
||||
// we'd better refine the logic among them
|
||||
def workerCongestionControlDiskBufferLowWatermark: Option[Long] =
|
||||
def workerCongestionControlDiskBufferLowWatermark: Long =
|
||||
get(WORKER_CONGESTION_CONTROL_DISK_BUFFER_LOW_WATERMARK)
|
||||
def workerCongestionControlDiskBufferHighWatermark: Option[Long] =
|
||||
def workerCongestionControlDiskBufferHighWatermark: Long =
|
||||
get(WORKER_CONGESTION_CONTROL_DISK_BUFFER_HIGH_WATERMARK)
|
||||
def workerCongestionControlUserProduceSpeedLowWatermark: Option[Long] =
|
||||
def workerCongestionControlUserProduceSpeedLowWatermark: Long =
|
||||
get(WORKER_CONGESTION_CONTROL_USER_PRODUCE_SPEED_LOW_WATERMARK)
|
||||
def workerCongestionControlUserProduceSpeedHighWatermark: Option[Long] =
|
||||
def workerCongestionControlUserProduceSpeedHighWatermark: Long =
|
||||
get(WORKER_CONGESTION_CONTROL_USER_PRODUCE_SPEED_HIGH_WATERMARK)
|
||||
def workerCongestionControlWorkerProduceSpeedLowWatermark: Option[Long] =
|
||||
def workerCongestionControlWorkerProduceSpeedLowWatermark: Long =
|
||||
get(WORKER_CONGESTION_CONTROL_WORKER_PRODUCE_SPEED_LOW_WATERMARK)
|
||||
def workerCongestionControlWorkerProduceSpeedHighWatermark: Option[Long] =
|
||||
def workerCongestionControlWorkerProduceSpeedHighWatermark: Long =
|
||||
get(WORKER_CONGESTION_CONTROL_WORKER_PRODUCE_SPEED_HIGH_WATERMARK)
|
||||
|
||||
def workerCongestionControlUserInactiveIntervalMs: Long =
|
||||
@ -3815,7 +3815,7 @@ object CelebornConf extends Logging {
|
||||
.timeConf(TimeUnit.SECONDS)
|
||||
.createWithDefaultString("10s")
|
||||
|
||||
val WORKER_CONGESTION_CONTROL_DISK_BUFFER_LOW_WATERMARK: OptionalConfigEntry[Long] =
|
||||
val WORKER_CONGESTION_CONTROL_DISK_BUFFER_LOW_WATERMARK: ConfigEntry[Long] =
|
||||
buildConf("celeborn.worker.congestionControl.diskBuffer.low.watermark")
|
||||
.withAlternative("celeborn.worker.congestionControl.low.watermark")
|
||||
.categories("worker")
|
||||
@ -3823,9 +3823,9 @@ object CelebornConf extends Logging {
|
||||
"this configuration")
|
||||
.version("0.3.0")
|
||||
.bytesConf(ByteUnit.BYTE)
|
||||
.createOptional
|
||||
.createWithDefault(Long.MaxValue)
|
||||
|
||||
val WORKER_CONGESTION_CONTROL_DISK_BUFFER_HIGH_WATERMARK: OptionalConfigEntry[Long] =
|
||||
val WORKER_CONGESTION_CONTROL_DISK_BUFFER_HIGH_WATERMARK: ConfigEntry[Long] =
|
||||
buildConf("celeborn.worker.congestionControl.diskBuffer.high.watermark")
|
||||
.withAlternative("celeborn.worker.congestionControl.high.watermark")
|
||||
.categories("worker")
|
||||
@ -3836,41 +3836,41 @@ object CelebornConf extends Logging {
|
||||
s"${WORKER_CONGESTION_CONTROL_DISK_BUFFER_LOW_WATERMARK.key}")
|
||||
.version("0.3.0")
|
||||
.bytesConf(ByteUnit.BYTE)
|
||||
.createOptional
|
||||
.createWithDefault(Long.MaxValue)
|
||||
|
||||
val WORKER_CONGESTION_CONTROL_USER_PRODUCE_SPEED_LOW_WATERMARK: OptionalConfigEntry[Long] =
|
||||
val WORKER_CONGESTION_CONTROL_USER_PRODUCE_SPEED_LOW_WATERMARK: ConfigEntry[Long] =
|
||||
buildConf("celeborn.worker.congestionControl.userProduceSpeed.low.watermark")
|
||||
.categories("worker")
|
||||
.doc("For those users that produce byte speeds less than this configuration, " +
|
||||
"stop congestion for these users")
|
||||
.version("0.6.0")
|
||||
.bytesConf(ByteUnit.BYTE)
|
||||
.createOptional
|
||||
.createWithDefault(Long.MaxValue)
|
||||
|
||||
val WORKER_CONGESTION_CONTROL_USER_PRODUCE_SPEED_HIGH_WATERMARK: OptionalConfigEntry[Long] =
|
||||
val WORKER_CONGESTION_CONTROL_USER_PRODUCE_SPEED_HIGH_WATERMARK: ConfigEntry[Long] =
|
||||
buildConf("celeborn.worker.congestionControl.userProduceSpeed.high.watermark")
|
||||
.categories("worker")
|
||||
.doc("For those users that produce byte speeds greater than this configuration, " +
|
||||
"start congestion for these users")
|
||||
.version("0.6.0")
|
||||
.bytesConf(ByteUnit.BYTE)
|
||||
.createOptional
|
||||
.createWithDefault(Long.MaxValue)
|
||||
|
||||
val WORKER_CONGESTION_CONTROL_WORKER_PRODUCE_SPEED_LOW_WATERMARK: OptionalConfigEntry[Long] =
|
||||
val WORKER_CONGESTION_CONTROL_WORKER_PRODUCE_SPEED_LOW_WATERMARK: ConfigEntry[Long] =
|
||||
buildConf("celeborn.worker.congestionControl.workerProduceSpeed.low.watermark")
|
||||
.categories("worker")
|
||||
.doc("Stop congestion If worker total produce speed less than this configuration")
|
||||
.version("0.6.0")
|
||||
.bytesConf(ByteUnit.BYTE)
|
||||
.createOptional
|
||||
.createWithDefault(Long.MaxValue)
|
||||
|
||||
val WORKER_CONGESTION_CONTROL_WORKER_PRODUCE_SPEED_HIGH_WATERMARK: OptionalConfigEntry[Long] =
|
||||
val WORKER_CONGESTION_CONTROL_WORKER_PRODUCE_SPEED_HIGH_WATERMARK: ConfigEntry[Long] =
|
||||
buildConf("celeborn.worker.congestionControl.workerProduceSpeed.high.watermark")
|
||||
.categories("worker")
|
||||
.doc("Start congestion If worker total produce speed greater than this configuration")
|
||||
.version("0.6.0")
|
||||
.bytesConf(ByteUnit.BYTE)
|
||||
.createOptional
|
||||
.createWithDefault(Long.MaxValue)
|
||||
|
||||
val WORKER_CONGESTION_CONTROL_USER_INACTIVE_INTERVAL: ConfigEntry[Long] =
|
||||
buildConf("celeborn.worker.congestionControl.user.inactive.interval")
|
||||
|
||||
@ -0,0 +1,22 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.celeborn.common.quota
|
||||
|
||||
/**
 * Immutable pair of produce-speed watermarks applied to a single user.
 *
 * The class is `final` because it is a plain value holder; case classes are not meant to be
 * extended.
 *
 * @param userProduceSpeedHighWatermark produce speed above which congestion control is switched
 *                                      on for the user
 * @param userProduceSpeedLowWatermark  produce speed below which an already congested user is
 *                                      released from congestion control
 */
final case class UserTrafficQuota(
    userProduceSpeedHighWatermark: Long,
    userProduceSpeedLowWatermark: Long)
|
||||
@ -0,0 +1,24 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.celeborn.common.quota
|
||||
|
||||
/**
 * Immutable set of worker-level watermarks used to enter and leave congestion control.
 *
 * The class is `final` because it is a plain value holder; case classes are not meant to be
 * extended.
 *
 * @param diskBufferHighWatermark         pending disk-buffer bytes above which congestion
 *                                        control may start
 * @param diskBufferLowWatermark          pending disk-buffer bytes below which congestion
 *                                        control may stop
 * @param workerProduceSpeedHighWatermark total worker produce speed above which congestion
 *                                        control may start
 * @param workerProduceSpeedLowWatermark  total worker produce speed below which congestion
 *                                        control may stop
 */
final case class WorkerTrafficQuota(
    diskBufferHighWatermark: Long,
    diskBufferLowWatermark: Long,
    workerProduceSpeedHighWatermark: Long,
    workerProduceSpeedLowWatermark: Long)
|
||||
@ -59,15 +59,15 @@ license: |
|
||||
| celeborn.worker.commitFiles.timeout | 120s | false | Timeout for a Celeborn worker to commit files of a shuffle. It's recommended to set at least `240s` when `HDFS` is enabled in `celeborn.storage.availableTypes`. | 0.3.0 | celeborn.worker.shuffle.commit.timeout |
|
||||
| celeborn.worker.commitFiles.wait.threads | 32 | false | Thread number of worker to wait for commit shuffle data files to finish. | 0.5.0 | |
|
||||
| celeborn.worker.congestionControl.check.interval | 10ms | false | Interval of worker checks congestion if celeborn.worker.congestionControl.enabled is true. | 0.3.2 | |
|
||||
| celeborn.worker.congestionControl.diskBuffer.high.watermark | <undefined> | false | If the total bytes in disk buffer exceeds this configure, will start to congestusers whose produce rate is higher than the potential average consume rate. The congestion will stop if the produce rate is lower or equal to the average consume rate, or the total pending bytes lower than celeborn.worker.congestionControl.diskBuffer.low.watermark | 0.3.0 | celeborn.worker.congestionControl.high.watermark |
|
||||
| celeborn.worker.congestionControl.diskBuffer.low.watermark | <undefined> | false | Will stop congest users if the total pending bytes of disk buffer is lower than this configuration | 0.3.0 | celeborn.worker.congestionControl.low.watermark |
|
||||
| celeborn.worker.congestionControl.diskBuffer.high.watermark | 9223372036854775807b | false | If the total bytes in disk buffer exceeds this configure, will start to congestusers whose produce rate is higher than the potential average consume rate. The congestion will stop if the produce rate is lower or equal to the average consume rate, or the total pending bytes lower than celeborn.worker.congestionControl.diskBuffer.low.watermark | 0.3.0 | celeborn.worker.congestionControl.high.watermark |
|
||||
| celeborn.worker.congestionControl.diskBuffer.low.watermark | 9223372036854775807b | false | Will stop congest users if the total pending bytes of disk buffer is lower than this configuration | 0.3.0 | celeborn.worker.congestionControl.low.watermark |
|
||||
| celeborn.worker.congestionControl.enabled | false | false | Whether to enable congestion control or not. | 0.3.0 | |
|
||||
| celeborn.worker.congestionControl.sample.time.window | 10s | false | The worker holds a time sliding list to calculate users' produce/consume rate | 0.3.0 | |
|
||||
| celeborn.worker.congestionControl.user.inactive.interval | 10min | false | How long will consider this user is inactive if it doesn't send data | 0.3.0 | |
|
||||
| celeborn.worker.congestionControl.userProduceSpeed.high.watermark | <undefined> | false | For those users that produce byte speeds greater than this configuration, start congestion for these users | 0.6.0 | |
|
||||
| celeborn.worker.congestionControl.userProduceSpeed.low.watermark | <undefined> | false | For those users that produce byte speeds less than this configuration, stop congestion for these users | 0.6.0 | |
|
||||
| celeborn.worker.congestionControl.workerProduceSpeed.high.watermark | <undefined> | false | Start congestion If worker total produce speed greater than this configuration | 0.6.0 | |
|
||||
| celeborn.worker.congestionControl.workerProduceSpeed.low.watermark | <undefined> | false | Stop congestion If worker total produce speed less than this configuration | 0.6.0 | |
|
||||
| celeborn.worker.congestionControl.userProduceSpeed.high.watermark | 9223372036854775807b | false | For those users that produce byte speeds greater than this configuration, start congestion for these users | 0.6.0 | |
|
||||
| celeborn.worker.congestionControl.userProduceSpeed.low.watermark | 9223372036854775807b | false | For those users that produce byte speeds less than this configuration, stop congestion for these users | 0.6.0 | |
|
||||
| celeborn.worker.congestionControl.workerProduceSpeed.high.watermark | 9223372036854775807b | false | Start congestion If worker total produce speed greater than this configuration | 0.6.0 | |
|
||||
| celeborn.worker.congestionControl.workerProduceSpeed.low.watermark | 9223372036854775807b | false | Stop congestion If worker total produce speed less than this configuration | 0.6.0 | |
|
||||
| celeborn.worker.decommission.checkInterval | 30s | false | The wait interval of checking whether all the shuffle expired during worker decommission | 0.4.0 | |
|
||||
| celeborn.worker.decommission.forceExitTimeout | 6h | false | The wait time of waiting for all the shuffle expire during worker decommission. | 0.4.0 | |
|
||||
| celeborn.worker.directMemoryRatioForMemoryFileStorage | 0.0 | false | Max ratio of direct memory to store shuffle data. This feature is experimental and disabled by default. | 0.5.0 | |
|
||||
|
||||
@ -22,6 +22,7 @@ import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.LinkedBlockingDeque;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
@ -48,6 +49,8 @@ public abstract class BaseConfigServiceImpl implements ConfigService {
|
||||
private final ScheduledExecutorService configRefreshService =
|
||||
ThreadUtils.newDaemonSingleThreadScheduledExecutor("celeborn-config-refresher");
|
||||
|
||||
private LinkedBlockingDeque<Runnable> listeners;
|
||||
|
||||
public BaseConfigServiceImpl(CelebornConf celebornConf) throws IOException {
|
||||
this.celebornConf = celebornConf;
|
||||
this.systemConfigAtomicReference.set(new SystemConfig(celebornConf));
|
||||
@ -57,6 +60,7 @@ public abstract class BaseConfigServiceImpl implements ConfigService {
|
||||
() -> {
|
||||
try {
|
||||
refreshCache();
|
||||
notifyListenersOnConfigUpdate();
|
||||
} catch (Throwable e) {
|
||||
LOG.error(
|
||||
"Failed to refresh dynamic configs. Encounter exception: {}.", e.getMessage(), e);
|
||||
@ -65,6 +69,7 @@ public abstract class BaseConfigServiceImpl implements ConfigService {
|
||||
dynamicConfigRefreshInterval,
|
||||
dynamicConfigRefreshInterval,
|
||||
TimeUnit.MILLISECONDS);
|
||||
this.listeners = new LinkedBlockingDeque<>();
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -101,4 +106,15 @@ public abstract class BaseConfigServiceImpl implements ConfigService {
|
||||
public void shutdown() {
|
||||
ThreadUtils.shutdown(configRefreshService);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void registerListenerOnConfigUpdate(Runnable listener) {
|
||||
listeners.add(listener);
|
||||
}
|
||||
|
||||
private void notifyListenersOnConfigUpdate() {
|
||||
for (Runnable listener : listeners) {
|
||||
listener.run();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -108,6 +108,13 @@ public interface ConfigService {
|
||||
*/
|
||||
void refreshCache() throws IOException;
|
||||
|
||||
/**
|
||||
* Registers a listener to be called when the configuration is updated.
|
||||
*
|
||||
* @param listener the listener to be registered
|
||||
*/
|
||||
void registerListenerOnConfigUpdate(Runnable listener);
|
||||
|
||||
/** Shutdowns configuration management service. */
|
||||
void shutdown();
|
||||
}
|
||||
|
||||
@ -27,6 +27,8 @@ import org.slf4j.LoggerFactory;
|
||||
import org.apache.celeborn.common.CelebornConf;
|
||||
import org.apache.celeborn.common.internal.config.ConfigEntry;
|
||||
import org.apache.celeborn.common.quota.Quota;
|
||||
import org.apache.celeborn.common.quota.UserTrafficQuota;
|
||||
import org.apache.celeborn.common.quota.WorkerTrafficQuota;
|
||||
import org.apache.celeborn.common.util.Utils;
|
||||
|
||||
/**
|
||||
@ -129,6 +131,44 @@ public abstract class DynamicConfig {
|
||||
ConfigType.STRING));
|
||||
}
|
||||
|
||||
public UserTrafficQuota getUserTrafficQuota() {
|
||||
return new UserTrafficQuota(
|
||||
getValue(
|
||||
CelebornConf.WORKER_CONGESTION_CONTROL_USER_PRODUCE_SPEED_HIGH_WATERMARK().key(),
|
||||
CelebornConf.WORKER_CONGESTION_CONTROL_USER_PRODUCE_SPEED_HIGH_WATERMARK(),
|
||||
Long.TYPE,
|
||||
ConfigType.BYTES),
|
||||
getValue(
|
||||
CelebornConf.WORKER_CONGESTION_CONTROL_USER_PRODUCE_SPEED_LOW_WATERMARK().key(),
|
||||
CelebornConf.WORKER_CONGESTION_CONTROL_USER_PRODUCE_SPEED_LOW_WATERMARK(),
|
||||
Long.TYPE,
|
||||
ConfigType.BYTES));
|
||||
}
|
||||
|
||||
public WorkerTrafficQuota getWorkerTrafficQuota() {
|
||||
return new WorkerTrafficQuota(
|
||||
getValue(
|
||||
CelebornConf.WORKER_CONGESTION_CONTROL_DISK_BUFFER_HIGH_WATERMARK().key(),
|
||||
CelebornConf.WORKER_CONGESTION_CONTROL_DISK_BUFFER_HIGH_WATERMARK(),
|
||||
Long.TYPE,
|
||||
ConfigType.BYTES),
|
||||
getValue(
|
||||
CelebornConf.WORKER_CONGESTION_CONTROL_DISK_BUFFER_LOW_WATERMARK().key(),
|
||||
CelebornConf.WORKER_CONGESTION_CONTROL_DISK_BUFFER_LOW_WATERMARK(),
|
||||
Long.TYPE,
|
||||
ConfigType.BYTES),
|
||||
getValue(
|
||||
CelebornConf.WORKER_CONGESTION_CONTROL_WORKER_PRODUCE_SPEED_HIGH_WATERMARK().key(),
|
||||
CelebornConf.WORKER_CONGESTION_CONTROL_WORKER_PRODUCE_SPEED_HIGH_WATERMARK(),
|
||||
Long.TYPE,
|
||||
ConfigType.BYTES),
|
||||
getValue(
|
||||
CelebornConf.WORKER_CONGESTION_CONTROL_WORKER_PRODUCE_SPEED_LOW_WATERMARK().key(),
|
||||
CelebornConf.WORKER_CONGESTION_CONTROL_WORKER_PRODUCE_SPEED_LOW_WATERMARK(),
|
||||
Long.TYPE,
|
||||
ConfigType.BYTES));
|
||||
}
|
||||
|
||||
public Map<String, String> getConfigs() {
|
||||
return configs;
|
||||
}
|
||||
|
||||
@ -27,9 +27,13 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.celeborn.common.CelebornConf;
|
||||
import org.apache.celeborn.common.identity.UserIdentifier;
|
||||
import org.apache.celeborn.common.quota.UserTrafficQuota;
|
||||
import org.apache.celeborn.common.quota.WorkerTrafficQuota;
|
||||
import org.apache.celeborn.common.util.JavaUtils;
|
||||
import org.apache.celeborn.common.util.ThreadUtils;
|
||||
import org.apache.celeborn.server.common.service.config.ConfigService;
|
||||
import org.apache.celeborn.service.deploy.worker.WorkerSource;
|
||||
import org.apache.celeborn.service.deploy.worker.memory.MemoryManager;
|
||||
|
||||
@ -40,12 +44,6 @@ public class CongestionController {
|
||||
private final WorkerSource workerSource;
|
||||
|
||||
private final int sampleTimeWindowSeconds;
|
||||
private final long diskBufferHighWatermark;
|
||||
private final long diskBufferLowWatermark;
|
||||
private final long userProduceSpeedHighWatermark;
|
||||
private final long userProduceSpeedLowWatermark;
|
||||
private final long workerProduceSpeedHighWatermark;
|
||||
private final long workerProduceSpeedLowWatermark;
|
||||
private final long userInactiveTimeMills;
|
||||
|
||||
private final AtomicBoolean overHighWatermark = new AtomicBoolean(false);
|
||||
@ -63,35 +61,38 @@ public class CongestionController {
|
||||
private final ConcurrentHashMap<UserIdentifier, UserCongestionControlContext>
|
||||
userCongestionContextMap;
|
||||
|
||||
private final ConfigService configService;
|
||||
|
||||
private final UserTrafficQuota defaultUserQuota;
|
||||
|
||||
private volatile WorkerTrafficQuota workerTrafficQuota;
|
||||
|
||||
protected CongestionController(
|
||||
WorkerSource workerSource,
|
||||
int sampleTimeWindowSeconds,
|
||||
long diskBufferHighWatermark,
|
||||
long diskBufferLowWatermark,
|
||||
long userProduceSpeedHighWatermark,
|
||||
long userProduceSpeedLowWatermark,
|
||||
long workerProduceSpeedHighWatermark,
|
||||
long workerProduceSpeedLowWatermark,
|
||||
long userInactiveTimeMills,
|
||||
long checkIntervalTimeMills) {
|
||||
assert (diskBufferHighWatermark > diskBufferLowWatermark);
|
||||
assert (userProduceSpeedHighWatermark > userProduceSpeedLowWatermark);
|
||||
assert (workerProduceSpeedHighWatermark > workerProduceSpeedLowWatermark);
|
||||
CelebornConf conf,
|
||||
ConfigService configService) {
|
||||
|
||||
this.workerSource = workerSource;
|
||||
this.sampleTimeWindowSeconds = sampleTimeWindowSeconds;
|
||||
this.diskBufferHighWatermark = diskBufferHighWatermark;
|
||||
this.diskBufferLowWatermark = diskBufferLowWatermark;
|
||||
this.userProduceSpeedHighWatermark = userProduceSpeedHighWatermark;
|
||||
this.userProduceSpeedLowWatermark = userProduceSpeedLowWatermark;
|
||||
this.workerProduceSpeedHighWatermark = workerProduceSpeedHighWatermark;
|
||||
this.workerProduceSpeedLowWatermark = workerProduceSpeedLowWatermark;
|
||||
this.userInactiveTimeMills = userInactiveTimeMills;
|
||||
this.userInactiveTimeMills = conf.workerCongestionControlUserInactiveIntervalMs();
|
||||
this.consumedBufferStatusHub = new BufferStatusHub(sampleTimeWindowSeconds);
|
||||
this.producedBufferStatusHub = new BufferStatusHub(sampleTimeWindowSeconds);
|
||||
this.userBufferStatuses = JavaUtils.newConcurrentHashMap();
|
||||
this.userCongestionContextMap = JavaUtils.newConcurrentHashMap();
|
||||
|
||||
defaultUserQuota =
|
||||
new UserTrafficQuota(
|
||||
conf.workerCongestionControlUserProduceSpeedHighWatermark(),
|
||||
conf.workerCongestionControlUserProduceSpeedLowWatermark());
|
||||
|
||||
workerTrafficQuota =
|
||||
new WorkerTrafficQuota(
|
||||
conf.workerCongestionControlDiskBufferHighWatermark(),
|
||||
conf.workerCongestionControlDiskBufferLowWatermark(),
|
||||
conf.workerCongestionControlWorkerProduceSpeedHighWatermark(),
|
||||
conf.workerCongestionControlWorkerProduceSpeedLowWatermark());
|
||||
|
||||
this.removeUserExecutorService =
|
||||
ThreadUtils.newDaemonSingleThreadScheduledExecutor(
|
||||
"worker-congestion-controller-inactive-user-remover");
|
||||
@ -103,38 +104,31 @@ public class CongestionController {
|
||||
ThreadUtils.newDaemonSingleThreadScheduledExecutor("worker-congestion-controller-checker");
|
||||
|
||||
this.checkService.scheduleWithFixedDelay(
|
||||
this::checkCongestion, 0, checkIntervalTimeMills, TimeUnit.MILLISECONDS);
|
||||
this::checkCongestion,
|
||||
0,
|
||||
conf.workerCongestionControlCheckIntervalMs(),
|
||||
TimeUnit.MILLISECONDS);
|
||||
|
||||
this.workerSource.addGauge(
|
||||
WorkerSource.POTENTIAL_CONSUME_SPEED(), this::getPotentialConsumeSpeed);
|
||||
|
||||
this.workerSource.addGauge(
|
||||
WorkerSource.WORKER_CONSUME_SPEED(), consumedBufferStatusHub::avgBytesPerSec);
|
||||
|
||||
this.configService = configService;
|
||||
|
||||
if (configService != null) {
|
||||
updateQuota();
|
||||
configService.registerListenerOnConfigUpdate(this::updateQuota);
|
||||
}
|
||||
}
|
||||
|
||||
public static synchronized CongestionController initialize(
|
||||
WorkerSource workSource,
|
||||
int sampleTimeWindowSeconds,
|
||||
long highWatermarkDiskBuffer,
|
||||
long lowWatermarkDiskBuffer,
|
||||
long highWatermarkUserProduceSpeed,
|
||||
long lowWatermarkUserProduceSpeed,
|
||||
long highWatermarkWorkerProduceSpeed,
|
||||
long lowWatermarkWorkerProduceSpeed,
|
||||
long userInactiveTimeMills,
|
||||
long checkIntervalTimeMills) {
|
||||
_INSTANCE =
|
||||
new CongestionController(
|
||||
workSource,
|
||||
sampleTimeWindowSeconds,
|
||||
highWatermarkDiskBuffer,
|
||||
lowWatermarkDiskBuffer,
|
||||
highWatermarkUserProduceSpeed,
|
||||
lowWatermarkUserProduceSpeed,
|
||||
highWatermarkWorkerProduceSpeed,
|
||||
lowWatermarkWorkerProduceSpeed,
|
||||
userInactiveTimeMills,
|
||||
checkIntervalTimeMills);
|
||||
CelebornConf conf,
|
||||
ConfigService configService) {
|
||||
_INSTANCE = new CongestionController(workSource, sampleTimeWindowSeconds, conf, configService);
|
||||
return _INSTANCE;
|
||||
}
|
||||
|
||||
@ -158,6 +152,7 @@ public class CongestionController {
|
||||
|
||||
UserIdentifier userIdentifier = userCongestionControlContext.getUserIdentifier();
|
||||
long userProduceSpeed = getUserProduceSpeed(userCongestionControlContext.getUserBufferInfo());
|
||||
UserTrafficQuota userTrafficQuota = userCongestionControlContext.getUserTrafficQuota();
|
||||
// If the user produce speed is higher that the avg consume speed, will congest it
|
||||
if (overHighWatermark.get()) {
|
||||
long avgConsumeSpeed = getPotentialProduceSpeed();
|
||||
@ -173,17 +168,17 @@ public class CongestionController {
|
||||
}
|
||||
}
|
||||
|
||||
if (userProduceSpeed > userProduceSpeedHighWatermark) {
|
||||
if (userProduceSpeed > userTrafficQuota.userProduceSpeedHighWatermark()) {
|
||||
userCongestionControlContext.onCongestionControl();
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug(
|
||||
"The user {}, produceSpeed is {}, while userProduceSpeedHighWatermark is {}, need to congest it.",
|
||||
userIdentifier,
|
||||
userProduceSpeed,
|
||||
userProduceSpeedHighWatermark);
|
||||
userTrafficQuota.userProduceSpeedHighWatermark());
|
||||
}
|
||||
} else if (userCongestionControlContext.inCongestionControl()
|
||||
&& userProduceSpeed < userProduceSpeedLowWatermark) {
|
||||
&& userProduceSpeed < userTrafficQuota.userProduceSpeedLowWatermark()) {
|
||||
userCongestionControlContext.offCongestionControl();
|
||||
}
|
||||
return userCongestionControlContext.inCongestionControl();
|
||||
@ -269,15 +264,15 @@ public class CongestionController {
|
||||
try {
|
||||
long pendingConsume = getTotalPendingBytes();
|
||||
long workerProduceSpeed = producedBufferStatusHub.avgBytesPerSec();
|
||||
if (pendingConsume < diskBufferLowWatermark
|
||||
&& workerProduceSpeed < workerProduceSpeedLowWatermark) {
|
||||
if (pendingConsume < workerTrafficQuota.diskBufferLowWatermark()
|
||||
&& workerProduceSpeed < workerTrafficQuota.workerProduceSpeedLowWatermark()) {
|
||||
if (overHighWatermark.compareAndSet(true, false)) {
|
||||
logger.info(
|
||||
"Pending consume and produce speed is lower than low watermark, exit congestion control");
|
||||
}
|
||||
return;
|
||||
} else if ((pendingConsume > diskBufferHighWatermark
|
||||
|| workerProduceSpeed > workerProduceSpeedHighWatermark)
|
||||
} else if ((pendingConsume > workerTrafficQuota.diskBufferHighWatermark()
|
||||
|| workerProduceSpeed > workerTrafficQuota.workerProduceSpeedHighWatermark())
|
||||
&& overHighWatermark.compareAndSet(false, true)) {
|
||||
logger.info(
|
||||
"Pending consume or produce speed is higher than high watermark, need congestion control");
|
||||
@ -315,8 +310,21 @@ public class CongestionController {
|
||||
userIdentifier,
|
||||
user -> {
|
||||
UserBufferInfo userBufferInfo = getUserBuffer(userIdentifier);
|
||||
UserTrafficQuota userTrafficQuota;
|
||||
if (configService == null) {
|
||||
userTrafficQuota = defaultUserQuota;
|
||||
} else {
|
||||
userTrafficQuota =
|
||||
configService
|
||||
.getTenantUserConfigFromCache(userIdentifier.tenantId(), userIdentifier.name())
|
||||
.getUserTrafficQuota();
|
||||
}
|
||||
return new UserCongestionControlContext(
|
||||
producedBufferStatusHub, userBufferInfo, workerSource, userIdentifier);
|
||||
userTrafficQuota,
|
||||
producedBufferStatusHub,
|
||||
userBufferInfo,
|
||||
workerSource,
|
||||
userIdentifier);
|
||||
});
|
||||
}
|
||||
|
||||
@ -328,4 +336,17 @@ public class CongestionController {
|
||||
public BufferStatusHub getConsumedBufferStatusHub() {
|
||||
return consumedBufferStatusHub;
|
||||
}
|
||||
|
||||
private void updateQuota() {
|
||||
workerTrafficQuota = configService.getSystemConfigFromCache().getWorkerTrafficQuota();
|
||||
for (Map.Entry<UserIdentifier, UserCongestionControlContext> entry :
|
||||
userCongestionContextMap.entrySet()) {
|
||||
UserIdentifier user = entry.getKey();
|
||||
UserCongestionControlContext context = entry.getValue();
|
||||
context.updateUserTrafficQuota(
|
||||
configService
|
||||
.getTenantUserConfigFromCache(user.tenantId(), user.name())
|
||||
.getUserTrafficQuota());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -19,6 +19,7 @@ package org.apache.celeborn.service.deploy.worker.congestcontrol;
|
||||
|
||||
import org.apache.celeborn.common.identity.UserIdentifier;
|
||||
import org.apache.celeborn.common.metrics.source.AbstractSource;
|
||||
import org.apache.celeborn.common.quota.UserTrafficQuota;
|
||||
import org.apache.celeborn.service.deploy.worker.WorkerSource;
|
||||
|
||||
public class UserCongestionControlContext {
|
||||
@ -31,7 +32,10 @@ public class UserCongestionControlContext {
|
||||
|
||||
private final UserIdentifier userIdentifier;
|
||||
|
||||
private volatile UserTrafficQuota userTrafficQuota;
|
||||
|
||||
public UserCongestionControlContext(
|
||||
UserTrafficQuota userTrafficQuota,
|
||||
BufferStatusHub workerBufferStatusHub,
|
||||
UserBufferInfo userBufferInfo,
|
||||
AbstractSource workerSource,
|
||||
@ -40,6 +44,7 @@ public class UserCongestionControlContext {
|
||||
this.userBufferInfo = userBufferInfo;
|
||||
this.userIdentifier = userIdentifier;
|
||||
this.workerBufferStatusHub = workerBufferStatusHub;
|
||||
this.userTrafficQuota = userTrafficQuota;
|
||||
workerSource.addGauge(
|
||||
WorkerSource.USER_PRODUCE_SPEED(),
|
||||
userIdentifier.toJMap(),
|
||||
@ -72,4 +77,12 @@ public class UserCongestionControlContext {
|
||||
public UserIdentifier getUserIdentifier() {
|
||||
return userIdentifier;
|
||||
}
|
||||
|
||||
public UserTrafficQuota getUserTrafficQuota() {
|
||||
return userTrafficQuota;
|
||||
}
|
||||
|
||||
public void updateUserTrafficQuota(UserTrafficQuota userTrafficQuota) {
|
||||
this.userTrafficQuota = userTrafficQuota;
|
||||
}
|
||||
}
|
||||
|
||||
@ -181,27 +181,12 @@ private[celeborn] class Worker(
|
||||
val partitionsSorter = new PartitionFilesSorter(memoryManager, conf, workerSource)
|
||||
|
||||
if (conf.workerCongestionControlEnabled) {
|
||||
if (conf.workerCongestionControlDiskBufferLowWatermark.isEmpty ||
|
||||
conf.workerCongestionControlDiskBufferHighWatermark.isEmpty ||
|
||||
conf.workerCongestionControlUserProduceSpeedLowWatermark.isEmpty ||
|
||||
conf.workerCongestionControlUserProduceSpeedHighWatermark.isEmpty ||
|
||||
conf.workerCongestionControlWorkerProduceSpeedLowWatermark.isEmpty ||
|
||||
conf.workerCongestionControlWorkerProduceSpeedHighWatermark.isEmpty) {
|
||||
throw new IllegalArgumentException("High watermark and low watermark must be set" +
|
||||
" when enabling rate limit")
|
||||
}
|
||||
|
||||
CongestionController.initialize(
|
||||
workerSource,
|
||||
conf.workerCongestionControlSampleTimeWindowSeconds.toInt,
|
||||
conf.workerCongestionControlDiskBufferHighWatermark.get,
|
||||
conf.workerCongestionControlDiskBufferLowWatermark.get,
|
||||
conf.workerCongestionControlUserProduceSpeedHighWatermark.get,
|
||||
conf.workerCongestionControlUserProduceSpeedLowWatermark.get,
|
||||
conf.workerCongestionControlWorkerProduceSpeedHighWatermark.get,
|
||||
conf.workerCongestionControlWorkerProduceSpeedLowWatermark.get,
|
||||
conf.workerCongestionControlUserInactiveIntervalMs,
|
||||
conf.workerCongestionControlCheckIntervalMs)
|
||||
conf,
|
||||
configService)
|
||||
}
|
||||
|
||||
var controller = new Controller(rpcEnv, conf, metricsSystem, workerSource)
|
||||
|
||||
@ -17,6 +17,8 @@
|
||||
|
||||
package org.apache.celeborn.service.deploy.worker.congestcontrol;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
@ -24,6 +26,7 @@ import org.junit.Test;
|
||||
|
||||
import org.apache.celeborn.common.CelebornConf;
|
||||
import org.apache.celeborn.common.identity.UserIdentifier;
|
||||
import org.apache.celeborn.server.common.service.config.FsConfigServiceImpl;
|
||||
import org.apache.celeborn.service.deploy.worker.WorkerSource;
|
||||
|
||||
public class TestCongestionController {
|
||||
@ -37,19 +40,27 @@ public class TestCongestionController {
|
||||
|
||||
@Before
|
||||
public void initialize() {
|
||||
CelebornConf celebornConf = new CelebornConf();
|
||||
celebornConf.set(
|
||||
CelebornConf.WORKER_CONGESTION_CONTROL_DISK_BUFFER_HIGH_WATERMARK().key(), "1000");
|
||||
celebornConf.set(
|
||||
CelebornConf.WORKER_CONGESTION_CONTROL_DISK_BUFFER_LOW_WATERMARK().key(), "500");
|
||||
celebornConf.set(
|
||||
CelebornConf.WORKER_CONGESTION_CONTROL_USER_PRODUCE_SPEED_HIGH_WATERMARK().key(), "20000");
|
||||
celebornConf.set(
|
||||
CelebornConf.WORKER_CONGESTION_CONTROL_USER_PRODUCE_SPEED_LOW_WATERMARK().key(), "10000");
|
||||
celebornConf.set(
|
||||
CelebornConf.WORKER_CONGESTION_CONTROL_WORKER_PRODUCE_SPEED_HIGH_WATERMARK().key(),
|
||||
"20000");
|
||||
celebornConf.set(
|
||||
CelebornConf.WORKER_CONGESTION_CONTROL_WORKER_PRODUCE_SPEED_LOW_WATERMARK().key(), "10000");
|
||||
celebornConf.set(
|
||||
CelebornConf.WORKER_CONGESTION_CONTROL_USER_INACTIVE_INTERVAL(), userInactiveTimeMills);
|
||||
celebornConf.set(
|
||||
CelebornConf.WORKER_CONGESTION_CONTROL_CHECK_INTERVAL(), checkIntervalTimeMills);
|
||||
// Make sampleTimeWindow a bit larger in case the tests' run time exceeds this window.
|
||||
controller =
|
||||
new CongestionController(
|
||||
source,
|
||||
10,
|
||||
1000,
|
||||
500,
|
||||
20000,
|
||||
10000,
|
||||
20000,
|
||||
10000,
|
||||
userInactiveTimeMills,
|
||||
checkIntervalTimeMills) {
|
||||
new CongestionController(source, 10, celebornConf, null) {
|
||||
@Override
|
||||
public long getTotalPendingBytes() {
|
||||
return pendingBytes;
|
||||
@ -151,9 +162,24 @@ public class TestCongestionController {
|
||||
|
||||
@Test
|
||||
public void testUserLevelTrafficQuota() throws InterruptedException {
|
||||
CelebornConf celebornConf = new CelebornConf();
|
||||
celebornConf.set(
|
||||
CelebornConf.WORKER_CONGESTION_CONTROL_DISK_BUFFER_HIGH_WATERMARK().key(), "100000");
|
||||
celebornConf.set(
|
||||
CelebornConf.WORKER_CONGESTION_CONTROL_DISK_BUFFER_LOW_WATERMARK().key(), "50000");
|
||||
celebornConf.set(
|
||||
CelebornConf.WORKER_CONGESTION_CONTROL_USER_PRODUCE_SPEED_HIGH_WATERMARK().key(), "500");
|
||||
celebornConf.set(
|
||||
CelebornConf.WORKER_CONGESTION_CONTROL_USER_PRODUCE_SPEED_LOW_WATERMARK().key(), "400");
|
||||
celebornConf.set(
|
||||
CelebornConf.WORKER_CONGESTION_CONTROL_WORKER_PRODUCE_SPEED_HIGH_WATERMARK().key(), "1200");
|
||||
celebornConf.set(
|
||||
CelebornConf.WORKER_CONGESTION_CONTROL_WORKER_PRODUCE_SPEED_LOW_WATERMARK().key(), "1000");
|
||||
celebornConf.set(CelebornConf.WORKER_CONGESTION_CONTROL_USER_INACTIVE_INTERVAL(), 120L * 1000);
|
||||
celebornConf.set(
|
||||
CelebornConf.WORKER_CONGESTION_CONTROL_CHECK_INTERVAL(), checkIntervalTimeMills);
|
||||
CongestionController controller1 =
|
||||
new CongestionController(
|
||||
source, 10, 100000, 50000, 500, 400, 1200, 1000, 120L * 1000, checkIntervalTimeMills) {
|
||||
new CongestionController(source, 10, celebornConf, null) {
|
||||
@Override
|
||||
public long getTotalPendingBytes() {
|
||||
return 0;
|
||||
@ -212,9 +238,24 @@ public class TestCongestionController {
|
||||
|
||||
@Test
|
||||
public void testWorkerLevelTrafficQuota() throws InterruptedException {
|
||||
CelebornConf celebornConf = new CelebornConf();
|
||||
celebornConf.set(
|
||||
CelebornConf.WORKER_CONGESTION_CONTROL_DISK_BUFFER_HIGH_WATERMARK().key(), "100000");
|
||||
celebornConf.set(
|
||||
CelebornConf.WORKER_CONGESTION_CONTROL_DISK_BUFFER_LOW_WATERMARK().key(), "50000");
|
||||
celebornConf.set(
|
||||
CelebornConf.WORKER_CONGESTION_CONTROL_USER_PRODUCE_SPEED_HIGH_WATERMARK().key(), "500");
|
||||
celebornConf.set(
|
||||
CelebornConf.WORKER_CONGESTION_CONTROL_USER_PRODUCE_SPEED_LOW_WATERMARK().key(), "400");
|
||||
celebornConf.set(
|
||||
CelebornConf.WORKER_CONGESTION_CONTROL_WORKER_PRODUCE_SPEED_HIGH_WATERMARK().key(), "800");
|
||||
celebornConf.set(
|
||||
CelebornConf.WORKER_CONGESTION_CONTROL_WORKER_PRODUCE_SPEED_LOW_WATERMARK().key(), "700");
|
||||
celebornConf.set(CelebornConf.WORKER_CONGESTION_CONTROL_USER_INACTIVE_INTERVAL(), 120L * 1000);
|
||||
celebornConf.set(
|
||||
CelebornConf.WORKER_CONGESTION_CONTROL_CHECK_INTERVAL(), checkIntervalTimeMills);
|
||||
CongestionController controller1 =
|
||||
new CongestionController(
|
||||
source, 10, 100000, 50000, 500, 400, 800, 700, 120 * 1000, checkIntervalTimeMills) {
|
||||
new CongestionController(source, 10, celebornConf, null) {
|
||||
@Override
|
||||
public long getTotalPendingBytes() {
|
||||
return 0;
|
||||
@ -253,6 +294,54 @@ public class TestCongestionController {
|
||||
controller1.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDynamicConfiguration() throws IOException, InterruptedException {
|
||||
String file1 = getClass().getResource("/dynamicConfig.yaml").getFile();
|
||||
CelebornConf celebornConf1 = new CelebornConf();
|
||||
celebornConf1.set(
|
||||
CelebornConf.WORKER_CONGESTION_CONTROL_DISK_BUFFER_HIGH_WATERMARK().key(), "100000");
|
||||
celebornConf1.set(
|
||||
CelebornConf.WORKER_CONGESTION_CONTROL_DISK_BUFFER_LOW_WATERMARK().key(), "50000");
|
||||
celebornConf1.set(
|
||||
CelebornConf.WORKER_CONGESTION_CONTROL_USER_PRODUCE_SPEED_HIGH_WATERMARK().key(), "500");
|
||||
celebornConf1.set(
|
||||
CelebornConf.WORKER_CONGESTION_CONTROL_USER_PRODUCE_SPEED_LOW_WATERMARK().key(), "400");
|
||||
celebornConf1.set(
|
||||
CelebornConf.WORKER_CONGESTION_CONTROL_WORKER_PRODUCE_SPEED_HIGH_WATERMARK().key(), "2000");
|
||||
celebornConf1.set(
|
||||
CelebornConf.WORKER_CONGESTION_CONTROL_WORKER_PRODUCE_SPEED_LOW_WATERMARK().key(), "1600");
|
||||
celebornConf1.set(CelebornConf.DYNAMIC_CONFIG_STORE_FS_PATH(), file1);
|
||||
celebornConf1.set(CelebornConf.DYNAMIC_CONFIG_REFRESH_INTERVAL(), 1000L);
|
||||
FsConfigServiceImpl configService = new FsConfigServiceImpl(celebornConf1);
|
||||
|
||||
CongestionController controller1 =
|
||||
new CongestionController(source, 10, celebornConf1, configService) {
|
||||
@Override
|
||||
public long getTotalPendingBytes() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void trimMemoryUsage() {
|
||||
// No op
|
||||
}
|
||||
};
|
||||
|
||||
UserIdentifier user1 = new UserIdentifier("default", "Jerry");
|
||||
UserCongestionControlContext context1 = controller1.getUserCongestionContext(user1);
|
||||
Assert.assertFalse(controller1.isUserCongested(context1));
|
||||
produceBytes(controller1, user1, 600);
|
||||
Assert.assertTrue(controller1.isUserCongested(context1));
|
||||
|
||||
String file2 = getClass().getResource("/dynamicConfig_2.yaml").getFile();
|
||||
celebornConf1.set(CelebornConf.DYNAMIC_CONFIG_STORE_FS_PATH(), file2);
|
||||
celebornConf1.set(CelebornConf.DYNAMIC_CONFIG_REFRESH_INTERVAL(), 1L);
|
||||
Thread.sleep(2001);
|
||||
|
||||
produceBytes(controller1, user1, 600);
|
||||
Assert.assertFalse(controller1.isUserCongested(context1));
|
||||
}
|
||||
|
||||
private void clearBufferStatus(CongestionController controller) {
|
||||
controller.getProducedBufferStatusHub().clear();
|
||||
controller.getConsumedBufferStatusHub().clear();
|
||||
|
||||
20
worker/src/test/resources/dynamicConfig.yaml
Normal file
20
worker/src/test/resources/dynamicConfig.yaml
Normal file
@ -0,0 +1,20 @@
|
||||
#
|
||||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
- level: SYSTEM
|
||||
config:
|
||||
celeborn.worker.congestionControl.workerProduceSpeed.low.watermark: 1600
|
||||
celeborn.worker.congestionControl.workerProduceSpeed.high.watermark: 2000
|
||||
32
worker/src/test/resources/dynamicConfig_2.yaml
Normal file
32
worker/src/test/resources/dynamicConfig_2.yaml
Normal file
@ -0,0 +1,32 @@
|
||||
#
|
||||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
- level: SYSTEM
|
||||
config:
|
||||
celeborn.worker.congestionControl.diskBuffer.low.watermark: 8gb
|
||||
celeborn.worker.congestionControl.diskBuffer.high.watermark: 10gb
|
||||
celeborn.worker.congestionControl.workerProduceSpeed.low.watermark: 1600
|
||||
celeborn.worker.congestionControl.workerProduceSpeed.high.watermark: 2000
|
||||
|
||||
- tenantId: default
|
||||
level: TENANT
|
||||
users:
|
||||
- name: Jerry
|
||||
config:
|
||||
celeborn.worker.congestionControl.userProduceSpeed.low.watermark: 800
|
||||
celeborn.worker.congestionControl.userProduceSpeed.high.watermark: 1000
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user