diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala index 731e18d46..ab30b95d4 100644 --- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala @@ -1280,17 +1280,17 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se // TODO related to `WORKER_DIRECT_MEMORY_RATIO_PAUSE_RECEIVE`, // `WORKER_DIRECT_MEMORY_RATIO_PAUSE_REPLICATE`and `WORKER_DIRECT_MEMORY_RATIO_RESUME`, // we'd better refine the logic among them - def workerCongestionControlDiskBufferLowWatermark: Option[Long] = + def workerCongestionControlDiskBufferLowWatermark: Long = get(WORKER_CONGESTION_CONTROL_DISK_BUFFER_LOW_WATERMARK) - def workerCongestionControlDiskBufferHighWatermark: Option[Long] = + def workerCongestionControlDiskBufferHighWatermark: Long = get(WORKER_CONGESTION_CONTROL_DISK_BUFFER_HIGH_WATERMARK) - def workerCongestionControlUserProduceSpeedLowWatermark: Option[Long] = + def workerCongestionControlUserProduceSpeedLowWatermark: Long = get(WORKER_CONGESTION_CONTROL_USER_PRODUCE_SPEED_LOW_WATERMARK) - def workerCongestionControlUserProduceSpeedHighWatermark: Option[Long] = + def workerCongestionControlUserProduceSpeedHighWatermark: Long = get(WORKER_CONGESTION_CONTROL_USER_PRODUCE_SPEED_HIGH_WATERMARK) - def workerCongestionControlWorkerProduceSpeedLowWatermark: Option[Long] = + def workerCongestionControlWorkerProduceSpeedLowWatermark: Long = get(WORKER_CONGESTION_CONTROL_WORKER_PRODUCE_SPEED_LOW_WATERMARK) - def workerCongestionControlWorkerProduceSpeedHighWatermark: Option[Long] = + def workerCongestionControlWorkerProduceSpeedHighWatermark: Long = get(WORKER_CONGESTION_CONTROL_WORKER_PRODUCE_SPEED_HIGH_WATERMARK) def workerCongestionControlUserInactiveIntervalMs: Long = @@ -3815,7 +3815,7 @@ object CelebornConf extends Logging { .timeConf(TimeUnit.SECONDS) .createWithDefaultString("10s") - val WORKER_CONGESTION_CONTROL_DISK_BUFFER_LOW_WATERMARK: OptionalConfigEntry[Long] = + val WORKER_CONGESTION_CONTROL_DISK_BUFFER_LOW_WATERMARK: ConfigEntry[Long] = buildConf("celeborn.worker.congestionControl.diskBuffer.low.watermark") .withAlternative("celeborn.worker.congestionControl.low.watermark") .categories("worker") @@ -3823,9 +3823,9 @@ object CelebornConf extends Logging { "this configuration") .version("0.3.0") .bytesConf(ByteUnit.BYTE) - .createOptional + .createWithDefault(Long.MaxValue) - val WORKER_CONGESTION_CONTROL_DISK_BUFFER_HIGH_WATERMARK: OptionalConfigEntry[Long] = + val WORKER_CONGESTION_CONTROL_DISK_BUFFER_HIGH_WATERMARK: ConfigEntry[Long] = buildConf("celeborn.worker.congestionControl.diskBuffer.high.watermark") .withAlternative("celeborn.worker.congestionControl.high.watermark") .categories("worker") @@ -3836,41 +3836,41 @@ object CelebornConf extends Logging { s"${WORKER_CONGESTION_CONTROL_DISK_BUFFER_LOW_WATERMARK.key}") .version("0.3.0") .bytesConf(ByteUnit.BYTE) - .createOptional + .createWithDefault(Long.MaxValue) - val WORKER_CONGESTION_CONTROL_USER_PRODUCE_SPEED_LOW_WATERMARK: OptionalConfigEntry[Long] = + val WORKER_CONGESTION_CONTROL_USER_PRODUCE_SPEED_LOW_WATERMARK: ConfigEntry[Long] = buildConf("celeborn.worker.congestionControl.userProduceSpeed.low.watermark") .categories("worker") .doc("For those users that produce byte speeds less than this configuration, " + "stop congestion for these users") .version("0.6.0") .bytesConf(ByteUnit.BYTE) - .createOptional + .createWithDefault(Long.MaxValue) - val WORKER_CONGESTION_CONTROL_USER_PRODUCE_SPEED_HIGH_WATERMARK: OptionalConfigEntry[Long] = + val WORKER_CONGESTION_CONTROL_USER_PRODUCE_SPEED_HIGH_WATERMARK: ConfigEntry[Long] = buildConf("celeborn.worker.congestionControl.userProduceSpeed.high.watermark") .categories("worker") .doc("For those users that produce byte speeds greater than this configuration, " + "start congestion for these users") .version("0.6.0") .bytesConf(ByteUnit.BYTE) - .createOptional + .createWithDefault(Long.MaxValue) - val WORKER_CONGESTION_CONTROL_WORKER_PRODUCE_SPEED_LOW_WATERMARK: OptionalConfigEntry[Long] = + val WORKER_CONGESTION_CONTROL_WORKER_PRODUCE_SPEED_LOW_WATERMARK: ConfigEntry[Long] = buildConf("celeborn.worker.congestionControl.workerProduceSpeed.low.watermark") .categories("worker") .doc("Stop congestion If worker total produce speed less than this configuration") .version("0.6.0") .bytesConf(ByteUnit.BYTE) - .createOptional + .createWithDefault(Long.MaxValue) - val WORKER_CONGESTION_CONTROL_WORKER_PRODUCE_SPEED_HIGH_WATERMARK: OptionalConfigEntry[Long] = + val WORKER_CONGESTION_CONTROL_WORKER_PRODUCE_SPEED_HIGH_WATERMARK: ConfigEntry[Long] = buildConf("celeborn.worker.congestionControl.workerProduceSpeed.high.watermark") .categories("worker") .doc("Start congestion If worker total produce speed greater than this configuration") .version("0.6.0") .bytesConf(ByteUnit.BYTE) - .createOptional + .createWithDefault(Long.MaxValue) val WORKER_CONGESTION_CONTROL_USER_INACTIVE_INTERVAL: ConfigEntry[Long] = buildConf("celeborn.worker.congestionControl.user.inactive.interval") diff --git a/common/src/main/scala/org/apache/celeborn/common/quota/UserTrafficQuota.scala b/common/src/main/scala/org/apache/celeborn/common/quota/UserTrafficQuota.scala new file mode 100644 index 000000000..820f7ac94 --- /dev/null +++ b/common/src/main/scala/org/apache/celeborn/common/quota/UserTrafficQuota.scala @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.celeborn.common.quota + +case class UserTrafficQuota( + userProduceSpeedHighWatermark: Long, + userProduceSpeedLowWatermark: Long) diff --git a/common/src/main/scala/org/apache/celeborn/common/quota/WorkerTrafficQuota.scala b/common/src/main/scala/org/apache/celeborn/common/quota/WorkerTrafficQuota.scala new file mode 100644 index 000000000..c071e6c7d --- /dev/null +++ b/common/src/main/scala/org/apache/celeborn/common/quota/WorkerTrafficQuota.scala @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.celeborn.common.quota + +case class WorkerTrafficQuota( + diskBufferHighWatermark: Long, + diskBufferLowWatermark: Long, + workerProduceSpeedHighWatermark: Long, + workerProduceSpeedLowWatermark: Long) diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md index bd03a83de..5330c42da 100644 --- a/docs/configuration/worker.md +++ b/docs/configuration/worker.md @@ -59,15 +59,15 @@ license: | | celeborn.worker.commitFiles.timeout | 120s | false | Timeout for a Celeborn worker to commit files of a shuffle. It's recommended to set at least `240s` when `HDFS` is enabled in `celeborn.storage.availableTypes`. | 0.3.0 | celeborn.worker.shuffle.commit.timeout | | celeborn.worker.commitFiles.wait.threads | 32 | false | Thread number of worker to wait for commit shuffle data files to finish. | 0.5.0 | | | celeborn.worker.congestionControl.check.interval | 10ms | false | Interval of worker checks congestion if celeborn.worker.congestionControl.enabled is true. | 0.3.2 | | -| celeborn.worker.congestionControl.diskBuffer.high.watermark | <undefined> | false | If the total bytes in disk buffer exceeds this configure, will start to congestusers whose produce rate is higher than the potential average consume rate. The congestion will stop if the produce rate is lower or equal to the average consume rate, or the total pending bytes lower than celeborn.worker.congestionControl.diskBuffer.low.watermark | 0.3.0 | celeborn.worker.congestionControl.high.watermark | -| celeborn.worker.congestionControl.diskBuffer.low.watermark | <undefined> | false | Will stop congest users if the total pending bytes of disk buffer is lower than this configuration | 0.3.0 | celeborn.worker.congestionControl.low.watermark | +| celeborn.worker.congestionControl.diskBuffer.high.watermark | 9223372036854775807b | false | If the total bytes in disk buffer exceeds this configure, will start to congestusers whose produce rate is higher than the potential average consume rate. The congestion will stop if the produce rate is lower or equal to the average consume rate, or the total pending bytes lower than celeborn.worker.congestionControl.diskBuffer.low.watermark | 0.3.0 | celeborn.worker.congestionControl.high.watermark | +| celeborn.worker.congestionControl.diskBuffer.low.watermark | 9223372036854775807b | false | Will stop congest users if the total pending bytes of disk buffer is lower than this configuration | 0.3.0 | celeborn.worker.congestionControl.low.watermark | | celeborn.worker.congestionControl.enabled | false | false | Whether to enable congestion control or not. | 0.3.0 | | | celeborn.worker.congestionControl.sample.time.window | 10s | false | The worker holds a time sliding list to calculate users' produce/consume rate | 0.3.0 | | | celeborn.worker.congestionControl.user.inactive.interval | 10min | false | How long will consider this user is inactive if it doesn't send data | 0.3.0 | | -| celeborn.worker.congestionControl.userProduceSpeed.high.watermark | <undefined> | false | For those users that produce byte speeds greater than this configuration, start congestion for these users | 0.6.0 | | -| celeborn.worker.congestionControl.userProduceSpeed.low.watermark | <undefined> | false | For those users that produce byte speeds less than this configuration, stop congestion for these users | 0.6.0 | | -| celeborn.worker.congestionControl.workerProduceSpeed.high.watermark | <undefined> | false | Start congestion If worker total produce speed greater than this configuration | 0.6.0 | | -| celeborn.worker.congestionControl.workerProduceSpeed.low.watermark | <undefined> | false | Stop congestion If worker total produce speed less than this configuration | 0.6.0 | | +| celeborn.worker.congestionControl.userProduceSpeed.high.watermark | 9223372036854775807b | false | For those users that produce byte speeds greater than this configuration, start congestion for these users | 0.6.0 | | +| celeborn.worker.congestionControl.userProduceSpeed.low.watermark | 9223372036854775807b | false | For those users that produce byte speeds less than this configuration, stop congestion for these users | 0.6.0 | | +| celeborn.worker.congestionControl.workerProduceSpeed.high.watermark | 9223372036854775807b | false | Start congestion If worker total produce speed greater than this configuration | 0.6.0 | | +| celeborn.worker.congestionControl.workerProduceSpeed.low.watermark | 9223372036854775807b | false | Stop congestion If worker total produce speed less than this configuration | 0.6.0 | | | celeborn.worker.decommission.checkInterval | 30s | false | The wait interval of checking whether all the shuffle expired during worker decommission | 0.4.0 | | | celeborn.worker.decommission.forceExitTimeout | 6h | false | The wait time of waiting for all the shuffle expire during worker decommission. | 0.4.0 | | | celeborn.worker.directMemoryRatioForMemoryFileStorage | 0.0 | false | Max ratio of direct memory to store shuffle data. This feature is experimental and disabled by default. | 0.5.0 | | diff --git a/service/src/main/java/org/apache/celeborn/server/common/service/config/BaseConfigServiceImpl.java b/service/src/main/java/org/apache/celeborn/server/common/service/config/BaseConfigServiceImpl.java index eb887191d..00b30e737 100644 --- a/service/src/main/java/org/apache/celeborn/server/common/service/config/BaseConfigServiceImpl.java +++ b/service/src/main/java/org/apache/celeborn/server/common/service/config/BaseConfigServiceImpl.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -48,6 +49,8 @@ public abstract class BaseConfigServiceImpl implements ConfigService { private final ScheduledExecutorService configRefreshService = ThreadUtils.newDaemonSingleThreadScheduledExecutor("celeborn-config-refresher"); + private LinkedBlockingDeque listeners; + public BaseConfigServiceImpl(CelebornConf celebornConf) throws IOException { this.celebornConf = celebornConf; this.systemConfigAtomicReference.set(new SystemConfig(celebornConf)); @@ -57,6 +60,7 @@ public abstract class BaseConfigServiceImpl implements ConfigService { () -> { try { refreshCache(); + notifyListenersOnConfigUpdate(); } catch (Throwable e) { LOG.error( "Failed to refresh dynamic configs. Encounter exception: {}.", e.getMessage(), e); @@ -65,6 +69,7 @@ public abstract class BaseConfigServiceImpl implements ConfigService { dynamicConfigRefreshInterval, dynamicConfigRefreshInterval, TimeUnit.MILLISECONDS); + this.listeners = new LinkedBlockingDeque<>(); } @Override @@ -101,4 +106,15 @@ public abstract class BaseConfigServiceImpl implements ConfigService { public void shutdown() { ThreadUtils.shutdown(configRefreshService); } + + @Override + public void registerListenerOnConfigUpdate(Runnable listener) { + listeners.add(listener); + } + + private void notifyListenersOnConfigUpdate() { + for (Runnable listener : listeners) { + listener.run(); + } + } } diff --git a/service/src/main/java/org/apache/celeborn/server/common/service/config/ConfigService.java b/service/src/main/java/org/apache/celeborn/server/common/service/config/ConfigService.java index 1d8531654..0570e3bca 100644 --- a/service/src/main/java/org/apache/celeborn/server/common/service/config/ConfigService.java +++ b/service/src/main/java/org/apache/celeborn/server/common/service/config/ConfigService.java @@ -108,6 +108,13 @@ public interface ConfigService { */ void refreshCache() throws IOException; + /** + * Registers a listener to be called when the configuration is updated. + * + * @param listener the listener to be registered + */ + void registerListenerOnConfigUpdate(Runnable listener); + /** Shutdowns configuration management service. */ void shutdown(); } diff --git a/service/src/main/java/org/apache/celeborn/server/common/service/config/DynamicConfig.java b/service/src/main/java/org/apache/celeborn/server/common/service/config/DynamicConfig.java index 948bcbe86..14aa73382 100644 --- a/service/src/main/java/org/apache/celeborn/server/common/service/config/DynamicConfig.java +++ b/service/src/main/java/org/apache/celeborn/server/common/service/config/DynamicConfig.java @@ -27,6 +27,8 @@ import org.slf4j.LoggerFactory; import org.apache.celeborn.common.CelebornConf; import org.apache.celeborn.common.internal.config.ConfigEntry; import org.apache.celeborn.common.quota.Quota; +import org.apache.celeborn.common.quota.UserTrafficQuota; +import org.apache.celeborn.common.quota.WorkerTrafficQuota; import org.apache.celeborn.common.util.Utils; /** @@ -129,6 +131,44 @@ public abstract class DynamicConfig { ConfigType.STRING)); } + public UserTrafficQuota getUserTrafficQuota() { + return new UserTrafficQuota( + getValue( + CelebornConf.WORKER_CONGESTION_CONTROL_USER_PRODUCE_SPEED_HIGH_WATERMARK().key(), + CelebornConf.WORKER_CONGESTION_CONTROL_USER_PRODUCE_SPEED_HIGH_WATERMARK(), + Long.TYPE, + ConfigType.BYTES), + getValue( + CelebornConf.WORKER_CONGESTION_CONTROL_USER_PRODUCE_SPEED_LOW_WATERMARK().key(), + CelebornConf.WORKER_CONGESTION_CONTROL_USER_PRODUCE_SPEED_LOW_WATERMARK(), + Long.TYPE, + ConfigType.BYTES)); + } + + public WorkerTrafficQuota getWorkerTrafficQuota() { + return new WorkerTrafficQuota( + getValue( + CelebornConf.WORKER_CONGESTION_CONTROL_DISK_BUFFER_HIGH_WATERMARK().key(), + CelebornConf.WORKER_CONGESTION_CONTROL_DISK_BUFFER_HIGH_WATERMARK(), + Long.TYPE, + ConfigType.BYTES), + getValue( + CelebornConf.WORKER_CONGESTION_CONTROL_DISK_BUFFER_LOW_WATERMARK().key(), + CelebornConf.WORKER_CONGESTION_CONTROL_DISK_BUFFER_LOW_WATERMARK(), + Long.TYPE, + ConfigType.BYTES), + getValue( + CelebornConf.WORKER_CONGESTION_CONTROL_WORKER_PRODUCE_SPEED_HIGH_WATERMARK().key(), + CelebornConf.WORKER_CONGESTION_CONTROL_WORKER_PRODUCE_SPEED_HIGH_WATERMARK(), + Long.TYPE, + ConfigType.BYTES), + getValue( + CelebornConf.WORKER_CONGESTION_CONTROL_WORKER_PRODUCE_SPEED_LOW_WATERMARK().key(), + CelebornConf.WORKER_CONGESTION_CONTROL_WORKER_PRODUCE_SPEED_LOW_WATERMARK(), + Long.TYPE, + ConfigType.BYTES)); + } + public Map getConfigs() { return configs; } diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/congestcontrol/CongestionController.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/congestcontrol/CongestionController.java index 04491e58c..11e68f4c3 100644 --- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/congestcontrol/CongestionController.java +++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/congestcontrol/CongestionController.java @@ -27,9 +27,13 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.celeborn.common.CelebornConf; import org.apache.celeborn.common.identity.UserIdentifier; +import org.apache.celeborn.common.quota.UserTrafficQuota; +import org.apache.celeborn.common.quota.WorkerTrafficQuota; import org.apache.celeborn.common.util.JavaUtils; import org.apache.celeborn.common.util.ThreadUtils; +import org.apache.celeborn.server.common.service.config.ConfigService; import org.apache.celeborn.service.deploy.worker.WorkerSource; import org.apache.celeborn.service.deploy.worker.memory.MemoryManager; @@ -40,12 +44,6 @@ public class CongestionController { private final WorkerSource workerSource; private final int sampleTimeWindowSeconds; - private final long diskBufferHighWatermark; - private final long diskBufferLowWatermark; - private final long userProduceSpeedHighWatermark; - private final long userProduceSpeedLowWatermark; - private final long workerProduceSpeedHighWatermark; - private final long workerProduceSpeedLowWatermark; private final long userInactiveTimeMills; private final AtomicBoolean overHighWatermark = new AtomicBoolean(false); @@ -63,35 +61,38 @@ public class CongestionController { private final ConcurrentHashMap userCongestionContextMap; + private final ConfigService configService; + + private final UserTrafficQuota defaultUserQuota; + + private volatile WorkerTrafficQuota workerTrafficQuota; + protected CongestionController( WorkerSource workerSource, int sampleTimeWindowSeconds, - long diskBufferHighWatermark, - long diskBufferLowWatermark, - long userProduceSpeedHighWatermark, - long userProduceSpeedLowWatermark, - long workerProduceSpeedHighWatermark, - long workerProduceSpeedLowWatermark, - long userInactiveTimeMills, - long checkIntervalTimeMills) { - assert (diskBufferHighWatermark > diskBufferLowWatermark); - assert (userProduceSpeedHighWatermark > userProduceSpeedLowWatermark); - assert (workerProduceSpeedHighWatermark > workerProduceSpeedLowWatermark); + CelebornConf conf, + ConfigService configService) { this.workerSource = workerSource; this.sampleTimeWindowSeconds = sampleTimeWindowSeconds; - this.diskBufferHighWatermark = diskBufferHighWatermark; - this.diskBufferLowWatermark = diskBufferLowWatermark; - this.userProduceSpeedHighWatermark = userProduceSpeedHighWatermark; - this.userProduceSpeedLowWatermark = userProduceSpeedLowWatermark; - this.workerProduceSpeedHighWatermark = workerProduceSpeedHighWatermark; - this.workerProduceSpeedLowWatermark = workerProduceSpeedLowWatermark; - this.userInactiveTimeMills = userInactiveTimeMills; + this.userInactiveTimeMills = conf.workerCongestionControlUserInactiveIntervalMs(); this.consumedBufferStatusHub = new BufferStatusHub(sampleTimeWindowSeconds); this.producedBufferStatusHub = new BufferStatusHub(sampleTimeWindowSeconds); this.userBufferStatuses = JavaUtils.newConcurrentHashMap(); this.userCongestionContextMap = JavaUtils.newConcurrentHashMap(); + defaultUserQuota = + new UserTrafficQuota( + conf.workerCongestionControlUserProduceSpeedHighWatermark(), + conf.workerCongestionControlUserProduceSpeedLowWatermark()); + + workerTrafficQuota = + new WorkerTrafficQuota( + conf.workerCongestionControlDiskBufferHighWatermark(), + conf.workerCongestionControlDiskBufferLowWatermark(), + conf.workerCongestionControlWorkerProduceSpeedHighWatermark(), + conf.workerCongestionControlWorkerProduceSpeedLowWatermark()); + this.removeUserExecutorService = ThreadUtils.newDaemonSingleThreadScheduledExecutor( "worker-congestion-controller-inactive-user-remover"); @@ -103,38 +104,31 @@ public class CongestionController { ThreadUtils.newDaemonSingleThreadScheduledExecutor("worker-congestion-controller-checker"); this.checkService.scheduleWithFixedDelay( - this::checkCongestion, 0, checkIntervalTimeMills, TimeUnit.MILLISECONDS); + this::checkCongestion, + 0, + conf.workerCongestionControlCheckIntervalMs(), + TimeUnit.MILLISECONDS); this.workerSource.addGauge( WorkerSource.POTENTIAL_CONSUME_SPEED(), this::getPotentialConsumeSpeed); this.workerSource.addGauge( WorkerSource.WORKER_CONSUME_SPEED(), consumedBufferStatusHub::avgBytesPerSec); + + this.configService = configService; + + if (configService != null) { + updateQuota(); + configService.registerListenerOnConfigUpdate(this::updateQuota); + } } public static synchronized CongestionController initialize( WorkerSource workSource, int sampleTimeWindowSeconds, - long highWatermarkDiskBuffer, - long lowWatermarkDiskBuffer, - long highWatermarkUserProduceSpeed, - long lowWatermarkUserProduceSpeed, - long highWatermarkWorkerProduceSpeed, - long lowWatermarkWorkerProduceSpeed, - long userInactiveTimeMills, - long checkIntervalTimeMills) { - _INSTANCE = - new CongestionController( - workSource, - sampleTimeWindowSeconds, - highWatermarkDiskBuffer, - lowWatermarkDiskBuffer, - highWatermarkUserProduceSpeed, - lowWatermarkUserProduceSpeed, - highWatermarkWorkerProduceSpeed, - lowWatermarkWorkerProduceSpeed, - userInactiveTimeMills, - checkIntervalTimeMills); + CelebornConf conf, + ConfigService configService) { + _INSTANCE = new CongestionController(workSource, sampleTimeWindowSeconds, conf, configService); return _INSTANCE; } @@ -158,6 +152,7 @@ public class CongestionController { UserIdentifier userIdentifier = userCongestionControlContext.getUserIdentifier(); long userProduceSpeed = getUserProduceSpeed(userCongestionControlContext.getUserBufferInfo()); + UserTrafficQuota userTrafficQuota = userCongestionControlContext.getUserTrafficQuota(); // If the user produce speed is higher that the avg consume speed, will congest it if (overHighWatermark.get()) { long avgConsumeSpeed = getPotentialProduceSpeed(); @@ -173,17 +168,17 @@ public class CongestionController { } } - if (userProduceSpeed > userProduceSpeedHighWatermark) { + if (userProduceSpeed > userTrafficQuota.userProduceSpeedHighWatermark()) { userCongestionControlContext.onCongestionControl(); if (logger.isDebugEnabled()) { logger.debug( "The user {}, produceSpeed is {}, while userProduceSpeedHighWatermark is {}, need to congest it.", userIdentifier, userProduceSpeed, - userProduceSpeedHighWatermark); + userTrafficQuota.userProduceSpeedHighWatermark()); } } else if (userCongestionControlContext.inCongestionControl() - && userProduceSpeed < userProduceSpeedLowWatermark) { + && userProduceSpeed < userTrafficQuota.userProduceSpeedLowWatermark()) { userCongestionControlContext.offCongestionControl(); } return userCongestionControlContext.inCongestionControl(); @@ -269,15 +264,15 @@ public class CongestionController { try { long pendingConsume = getTotalPendingBytes(); long workerProduceSpeed = producedBufferStatusHub.avgBytesPerSec(); - if (pendingConsume < diskBufferLowWatermark - && workerProduceSpeed < workerProduceSpeedLowWatermark) { + if (pendingConsume < workerTrafficQuota.diskBufferLowWatermark() + && workerProduceSpeed < workerTrafficQuota.workerProduceSpeedLowWatermark()) { if (overHighWatermark.compareAndSet(true, false)) { logger.info( "Pending consume and produce speed is lower than low watermark, exit congestion control"); } return; - } else if ((pendingConsume > diskBufferHighWatermark - || workerProduceSpeed > workerProduceSpeedHighWatermark) + } else if ((pendingConsume > workerTrafficQuota.diskBufferHighWatermark() + || workerProduceSpeed > workerTrafficQuota.workerProduceSpeedHighWatermark()) && overHighWatermark.compareAndSet(false, true)) { logger.info( "Pending consume or produce speed is higher than high watermark, need congestion control"); @@ -315,8 +310,21 @@ public class CongestionController { userIdentifier, user -> { UserBufferInfo userBufferInfo = getUserBuffer(userIdentifier); + UserTrafficQuota userTrafficQuota; + if (configService == null) { + userTrafficQuota = defaultUserQuota; + } else { + userTrafficQuota = + configService + .getTenantUserConfigFromCache(userIdentifier.tenantId(), userIdentifier.name()) + .getUserTrafficQuota(); + } return new UserCongestionControlContext( - producedBufferStatusHub, userBufferInfo, workerSource, userIdentifier); + userTrafficQuota, + producedBufferStatusHub, + userBufferInfo, + workerSource, + userIdentifier); }); } @@ -328,4 +336,17 @@ public class CongestionController { public BufferStatusHub getConsumedBufferStatusHub() { return consumedBufferStatusHub; } + + private void updateQuota() { + workerTrafficQuota = configService.getSystemConfigFromCache().getWorkerTrafficQuota(); + for (Map.Entry entry : + userCongestionContextMap.entrySet()) { + UserIdentifier user = entry.getKey(); + UserCongestionControlContext context = entry.getValue(); + context.updateUserTrafficQuota( + configService + .getTenantUserConfigFromCache(user.tenantId(), user.name()) + .getUserTrafficQuota()); + } + } } diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/congestcontrol/UserCongestionControlContext.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/congestcontrol/UserCongestionControlContext.java index fec5da7d6..340e1e98b 100644 --- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/congestcontrol/UserCongestionControlContext.java +++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/congestcontrol/UserCongestionControlContext.java @@ -19,6 +19,7 @@ package org.apache.celeborn.service.deploy.worker.congestcontrol; import org.apache.celeborn.common.identity.UserIdentifier; import org.apache.celeborn.common.metrics.source.AbstractSource; +import org.apache.celeborn.common.quota.UserTrafficQuota; import org.apache.celeborn.service.deploy.worker.WorkerSource; public class UserCongestionControlContext { @@ -31,7 +32,10 @@ public class UserCongestionControlContext { private final UserIdentifier userIdentifier; + private volatile UserTrafficQuota userTrafficQuota; + public UserCongestionControlContext( + UserTrafficQuota userTrafficQuota, BufferStatusHub workerBufferStatusHub, UserBufferInfo userBufferInfo, AbstractSource workerSource, @@ -40,6 +44,7 @@ public class UserCongestionControlContext { this.userBufferInfo = userBufferInfo; this.userIdentifier = userIdentifier; this.workerBufferStatusHub = workerBufferStatusHub; + this.userTrafficQuota = userTrafficQuota; workerSource.addGauge( WorkerSource.USER_PRODUCE_SPEED(), userIdentifier.toJMap(), @@ -72,4 +77,12 @@ public class UserCongestionControlContext { public UserIdentifier getUserIdentifier() { return userIdentifier; } + + public UserTrafficQuota getUserTrafficQuota() { + return userTrafficQuota; + } + + public void updateUserTrafficQuota(UserTrafficQuota userTrafficQuota) { + this.userTrafficQuota = userTrafficQuota; + } } diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala index 972b58dec..4eeee3850 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala @@ -181,27 +181,12 @@ private[celeborn] class Worker( val partitionsSorter = new PartitionFilesSorter(memoryManager, conf, workerSource) if (conf.workerCongestionControlEnabled) { - if (conf.workerCongestionControlDiskBufferLowWatermark.isEmpty || - conf.workerCongestionControlDiskBufferHighWatermark.isEmpty || - conf.workerCongestionControlUserProduceSpeedLowWatermark.isEmpty || - conf.workerCongestionControlUserProduceSpeedHighWatermark.isEmpty || - conf.workerCongestionControlWorkerProduceSpeedLowWatermark.isEmpty || - conf.workerCongestionControlWorkerProduceSpeedHighWatermark.isEmpty) { - throw new IllegalArgumentException("High watermark and low watermark must be set" + - " when enabling rate limit") - } CongestionController.initialize( workerSource, conf.workerCongestionControlSampleTimeWindowSeconds.toInt, - conf.workerCongestionControlDiskBufferHighWatermark.get, - conf.workerCongestionControlDiskBufferLowWatermark.get, - conf.workerCongestionControlUserProduceSpeedHighWatermark.get, - conf.workerCongestionControlUserProduceSpeedLowWatermark.get, - conf.workerCongestionControlWorkerProduceSpeedHighWatermark.get, - conf.workerCongestionControlWorkerProduceSpeedLowWatermark.get, - conf.workerCongestionControlUserInactiveIntervalMs, - conf.workerCongestionControlCheckIntervalMs) + conf, + configService) } var controller = new Controller(rpcEnv, conf, metricsSystem, workerSource) diff --git a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/congestcontrol/TestCongestionController.java b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/congestcontrol/TestCongestionController.java index ea1da91f5..6cdb1ce9f 100644 --- a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/congestcontrol/TestCongestionController.java +++ b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/congestcontrol/TestCongestionController.java @@ -17,6 +17,8 @@ package org.apache.celeborn.service.deploy.worker.congestcontrol; +import java.io.IOException; + import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -24,6 +26,7 @@ import org.junit.Test; import org.apache.celeborn.common.CelebornConf; import org.apache.celeborn.common.identity.UserIdentifier; +import org.apache.celeborn.server.common.service.config.FsConfigServiceImpl; import org.apache.celeborn.service.deploy.worker.WorkerSource; public class TestCongestionController { @@ -37,19 +40,27 @@ public class TestCongestionController { @Before public void initialize() { + CelebornConf celebornConf = new CelebornConf(); + celebornConf.set( + CelebornConf.WORKER_CONGESTION_CONTROL_DISK_BUFFER_HIGH_WATERMARK().key(), "1000"); + celebornConf.set( + CelebornConf.WORKER_CONGESTION_CONTROL_DISK_BUFFER_LOW_WATERMARK().key(), "500"); + celebornConf.set( + CelebornConf.WORKER_CONGESTION_CONTROL_USER_PRODUCE_SPEED_HIGH_WATERMARK().key(), "20000"); + celebornConf.set( + CelebornConf.WORKER_CONGESTION_CONTROL_USER_PRODUCE_SPEED_LOW_WATERMARK().key(), "10000"); + celebornConf.set( + CelebornConf.WORKER_CONGESTION_CONTROL_WORKER_PRODUCE_SPEED_HIGH_WATERMARK().key(), + "20000"); + celebornConf.set( + CelebornConf.WORKER_CONGESTION_CONTROL_WORKER_PRODUCE_SPEED_LOW_WATERMARK().key(), "10000"); + celebornConf.set( + CelebornConf.WORKER_CONGESTION_CONTROL_USER_INACTIVE_INTERVAL(), userInactiveTimeMills); + celebornConf.set( + CelebornConf.WORKER_CONGESTION_CONTROL_CHECK_INTERVAL(), checkIntervalTimeMills); // Make sampleTimeWindow a bit larger in case the tests run time exceed this window. controller = - new CongestionController( - source, - 10, - 1000, - 500, - 20000, - 10000, - 20000, - 10000, - userInactiveTimeMills, - checkIntervalTimeMills) { + new CongestionController(source, 10, celebornConf, null) { @Override public long getTotalPendingBytes() { return pendingBytes; @@ -151,9 +162,24 @@ public class TestCongestionController { @Test public void testUserLevelTrafficQuota() throws InterruptedException { + CelebornConf celebornConf = new CelebornConf(); + celebornConf.set( + CelebornConf.WORKER_CONGESTION_CONTROL_DISK_BUFFER_HIGH_WATERMARK().key(), "100000"); + celebornConf.set( + CelebornConf.WORKER_CONGESTION_CONTROL_DISK_BUFFER_LOW_WATERMARK().key(), "50000"); + celebornConf.set( + CelebornConf.WORKER_CONGESTION_CONTROL_USER_PRODUCE_SPEED_HIGH_WATERMARK().key(), "500"); + celebornConf.set( + CelebornConf.WORKER_CONGESTION_CONTROL_USER_PRODUCE_SPEED_LOW_WATERMARK().key(), "400"); + celebornConf.set( + CelebornConf.WORKER_CONGESTION_CONTROL_WORKER_PRODUCE_SPEED_HIGH_WATERMARK().key(), "1200"); + celebornConf.set( + CelebornConf.WORKER_CONGESTION_CONTROL_WORKER_PRODUCE_SPEED_LOW_WATERMARK().key(), "1000"); + celebornConf.set(CelebornConf.WORKER_CONGESTION_CONTROL_USER_INACTIVE_INTERVAL(), 120L * 1000); + celebornConf.set( + CelebornConf.WORKER_CONGESTION_CONTROL_CHECK_INTERVAL(), checkIntervalTimeMills); CongestionController controller1 = - new CongestionController( - source, 10, 100000, 50000, 500, 400, 1200, 1000, 120L * 1000, checkIntervalTimeMills) { + new CongestionController(source, 10, celebornConf, null) { @Override public long getTotalPendingBytes() { return 0; @@ -212,9 +238,24 @@ public class TestCongestionController { @Test public void testWorkerLevelTrafficQuota() throws InterruptedException { + CelebornConf celebornConf = new CelebornConf(); + celebornConf.set( + CelebornConf.WORKER_CONGESTION_CONTROL_DISK_BUFFER_HIGH_WATERMARK().key(), "100000"); + celebornConf.set( + CelebornConf.WORKER_CONGESTION_CONTROL_DISK_BUFFER_LOW_WATERMARK().key(), "50000"); + celebornConf.set( + CelebornConf.WORKER_CONGESTION_CONTROL_USER_PRODUCE_SPEED_HIGH_WATERMARK().key(), "500"); + celebornConf.set( + CelebornConf.WORKER_CONGESTION_CONTROL_USER_PRODUCE_SPEED_LOW_WATERMARK().key(), "400"); + celebornConf.set( + CelebornConf.WORKER_CONGESTION_CONTROL_WORKER_PRODUCE_SPEED_HIGH_WATERMARK().key(), "800"); + celebornConf.set( + CelebornConf.WORKER_CONGESTION_CONTROL_WORKER_PRODUCE_SPEED_LOW_WATERMARK().key(), "700"); + celebornConf.set(CelebornConf.WORKER_CONGESTION_CONTROL_USER_INACTIVE_INTERVAL(), 120L * 1000); + celebornConf.set( + CelebornConf.WORKER_CONGESTION_CONTROL_CHECK_INTERVAL(), checkIntervalTimeMills); CongestionController controller1 = - new CongestionController( - source, 10, 100000, 50000, 500, 400, 800, 700, 120 * 1000, checkIntervalTimeMills) { + new CongestionController(source, 10, celebornConf, null) { @Override public long getTotalPendingBytes() { return 0; @@ -253,6 +294,54 @@ public class TestCongestionController { controller1.close(); } + @Test + public void testDynamicConfiguration() throws IOException, InterruptedException { + String file1 = getClass().getResource("/dynamicConfig.yaml").getFile(); + CelebornConf celebornConf1 = new CelebornConf(); + celebornConf1.set( + CelebornConf.WORKER_CONGESTION_CONTROL_DISK_BUFFER_HIGH_WATERMARK().key(), "100000"); + celebornConf1.set( + CelebornConf.WORKER_CONGESTION_CONTROL_DISK_BUFFER_LOW_WATERMARK().key(), "50000"); + celebornConf1.set( + CelebornConf.WORKER_CONGESTION_CONTROL_USER_PRODUCE_SPEED_HIGH_WATERMARK().key(), "500"); + celebornConf1.set( + CelebornConf.WORKER_CONGESTION_CONTROL_USER_PRODUCE_SPEED_LOW_WATERMARK().key(), "400"); + celebornConf1.set( + CelebornConf.WORKER_CONGESTION_CONTROL_WORKER_PRODUCE_SPEED_HIGH_WATERMARK().key(), "2000"); + celebornConf1.set( + CelebornConf.WORKER_CONGESTION_CONTROL_WORKER_PRODUCE_SPEED_LOW_WATERMARK().key(), "1600"); + celebornConf1.set(CelebornConf.DYNAMIC_CONFIG_STORE_FS_PATH(), file1); + celebornConf1.set(CelebornConf.DYNAMIC_CONFIG_REFRESH_INTERVAL(), 1000L); + FsConfigServiceImpl configService = new FsConfigServiceImpl(celebornConf1); + + CongestionController controller1 = + new CongestionController(source, 10, celebornConf1, configService) { + @Override + public long getTotalPendingBytes() { + return 0; + } + + @Override + public void trimMemoryUsage() { + // No op + } + }; + + UserIdentifier user1 = new UserIdentifier("default", "Jerry"); + UserCongestionControlContext context1 = controller1.getUserCongestionContext(user1); + Assert.assertFalse(controller1.isUserCongested(context1)); + produceBytes(controller1, user1, 600); + Assert.assertTrue(controller1.isUserCongested(context1)); + + String file2 = getClass().getResource("/dynamicConfig_2.yaml").getFile(); + celebornConf1.set(CelebornConf.DYNAMIC_CONFIG_STORE_FS_PATH(), file2); + celebornConf1.set(CelebornConf.DYNAMIC_CONFIG_REFRESH_INTERVAL(), 1L); + Thread.sleep(2001); + + produceBytes(controller1, user1, 600); + Assert.assertFalse(controller1.isUserCongested(context1)); + } + private void clearBufferStatus(CongestionController controller) { controller.getProducedBufferStatusHub().clear(); controller.getConsumedBufferStatusHub().clear(); diff --git a/worker/src/test/resources/dynamicConfig.yaml b/worker/src/test/resources/dynamicConfig.yaml new file mode 100644 index 000000000..80772dc0b --- /dev/null +++ b/worker/src/test/resources/dynamicConfig.yaml @@ -0,0 +1,20 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +- level: SYSTEM + config: + celeborn.worker.congestionControl.workerProduceSpeed.low.watermark: 1600 + celeborn.worker.congestionControl.workerProduceSpeed.high.watermark: 2000 \ No newline at end of file diff --git a/worker/src/test/resources/dynamicConfig_2.yaml b/worker/src/test/resources/dynamicConfig_2.yaml new file mode 100644 index 000000000..90271d87b --- /dev/null +++ b/worker/src/test/resources/dynamicConfig_2.yaml @@ -0,0 +1,32 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +- level: SYSTEM + config: + celeborn.worker.congestionControl.diskBuffer.low.watermark: 8gb + celeborn.worker.congestionControl.diskBuffer.high.watermark: 10gb + celeborn.worker.congestionControl.workerProduceSpeed.low.watermark: 1600 + celeborn.worker.congestionControl.workerProduceSpeed.high.watermark: 2000 + +- tenantId: default + level: TENANT + users: + - name: Jerry + config: + celeborn.worker.congestionControl.userProduceSpeed.low.watermark: 800 + celeborn.worker.congestionControl.userProduceSpeed.high.watermark: 1000 + +