[CELEBORN-2066] Release workers only with high workload when the number of excluded worker set is too large

### What changes were proposed in this pull request?

Provide a user option to enable releasing only workers with high workload from the excluded worker set when that set grows too large.

### Why are the changes needed?

In some cases, a large percentage of workers were excluded, and most of them only because of high workload. It is better to release such workers from the excluded set so that system availability takes priority.

### Does this PR introduce _any_ user-facing change?

New configuration options (`celeborn.master.excludeWorker.autoReleaseHighWorkLoadEnabled` and `celeborn.master.excludeWorker.autoReleaseHighWorkLoadRatioThreshold`).

### How was this patch tested?
Unit tests.

Closes #3365 from Kalvin2077/exclude-high-stress-workers.

Lead-authored-by: yuanzhen <yuanzhen.hwk@alibaba-inc.com>
Co-authored-by: Kalvin2077 <wk.huang2077@outlook.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
This commit is contained in:
yuanzhen 2025-08-22 10:14:38 +08:00 committed by Shuang
parent 661a096b77
commit 8effb735f7
8 changed files with 566 additions and 30 deletions

View File

@ -812,6 +812,10 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
}
// Max ratio of unhealthy disks a worker may have before the master excludes it.
def masterExcludeWorkerUnhealthyDiskRatioThreshold: Double =
  get(MASTER_EXCLUDE_WORKER_UNHEALTHY_DISK_RATIO_THRESHOLD)
// Whether excluded workers whose only issue is high workload may be auto-released.
def masterAutoReleaseHighWorkloadWorkerEnabled: Boolean =
  get(MASTER_AUTO_RELEASE_HIGH_WORKLOAD_WORKER_ENABLED)
// Excluded-worker ratio (relative to all workers) that triggers the auto-release.
def masterAutoReleaseHighWorkloadWorkerRatioThreshold: Double =
  get(MASTER_AUTO_RELEASE_HIGH_WORKLOAD_WORKER_RATIO_THRESHOLD)
// //////////////////////////////////////////////////////
// Worker //
@ -6536,6 +6540,25 @@ object CelebornConf extends Logging {
.checkValue(v => v > 0.0 && v <= 1.0, "Should be in (0.0, 1.0].")
.createWithDefault(1)
// Feature switch: when true, the master may remove workers from the excluded worker
// set again if their only problem is high workload. The companion ratio-threshold
// configuration controls when the release is triggered.
val MASTER_AUTO_RELEASE_HIGH_WORKLOAD_WORKER_ENABLED: ConfigEntry[Boolean] =
  buildConf("celeborn.master.excludeWorker.autoReleaseHighWorkLoadEnabled")
    .categories("master")
    .version("0.7.0")
    .doc("Whether to release workers with high workload in excluded worker list.")
    .booleanConf
    .createWithDefault(false)
// Trigger for releasing high-workload workers from the excluded set: once
// excludedWorkers.size >= floor(totalWorkers * threshold), excluded workers that are
// merely under high workload (but still have storage and free slots) are released.
// The previous doc text described the condition incorrectly: the check is on the
// size of the excluded set relative to all workers, not on the number of
// high-workload workers, and a value of 0 makes the release ALWAYS fire.
val MASTER_AUTO_RELEASE_HIGH_WORKLOAD_WORKER_RATIO_THRESHOLD: ConfigEntry[Double] =
  buildConf("celeborn.master.excludeWorker.autoReleaseHighWorkLoadRatioThreshold")
    .categories("master")
    .version("0.7.0")
    .doc("When the size of the excluded worker set reaches this ratio of the total " +
      "worker number, master will release the workers in the excluded worker list " +
      "whose only issue is high workload. If this value is set to 0, such workers " +
      "are always released immediately.")
    .doubleConf
    .checkValue(v => v >= 0.0 && v < 1.0, "Should be in [0.0, 1.0).")
    .createWithDefault(0.3)
val QUOTA_CLUSTER_DISK_BYTES_WRITTEN: ConfigEntry[Long] =
buildConf("celeborn.quota.cluster.diskBytesWritten")
.categories("quota")

View File

@ -47,6 +47,7 @@ class WorkerInfo(
var nextInterruptionNotice = Long.MaxValue
var lastHeartbeat: Long = 0
var workerStatus = WorkerStatus.normalWorkerStatus()
var isHighWorkLoad: Boolean = false;
val diskInfos = {
if (_diskInfos != null) JavaUtils.newConcurrentHashMap[String, DiskInfo](_diskInfos)
else null
@ -182,6 +183,10 @@ class WorkerInfo(
this.workerStatus = workerStatus;
}
/** Record whether this worker reported a high workload in its latest heartbeat. */
def setWorkLoad(isHighWorkLoad: Boolean): Unit =
  this.isHighWorkLoad = isHighWorkLoad
def updateDiskSlots(estimatedPartitionSize: Long): Unit = this.synchronized {
diskInfos.asScala.foreach { case (_, disk) =>
disk.maxSlots = disk.totalSpace / estimatedPartitionSize

View File

@ -43,6 +43,8 @@ license: |
| celeborn.master.estimatedPartitionSize.minSize | 8mb | false | Ignore partition size smaller than this configuration of partition size for estimation. | 0.3.0 | celeborn.shuffle.minPartitionSizeToEstimate |
| celeborn.master.estimatedPartitionSize.update.initialDelay | 5min | false | Initial delay time before start updating partition size for estimation. | 0.3.0 | celeborn.shuffle.estimatedPartitionSize.update.initialDelay |
| celeborn.master.estimatedPartitionSize.update.interval | 10min | false | Interval of updating partition size for estimation. | 0.3.0 | celeborn.shuffle.estimatedPartitionSize.update.interval |
| celeborn.master.excludeWorker.autoReleaseHighWorkLoadEnabled | false | false | Whether to release workers with high workload in excluded worker list. | 0.7.0 | |
| celeborn.master.excludeWorker.autoReleaseHighWorkLoadRatioThreshold | 0.3 | false | When the size of the excluded worker set reaches this ratio of the total worker number, master will release the workers in the excluded worker list whose only issue is high workload. If this value is set to 0, such workers are always released immediately. | 0.7.0 | |
| celeborn.master.excludeWorker.unhealthyDiskRatioThreshold | 1.0 | false | Max ratio of unhealthy disks for excluding worker, when unhealthy disk is larger than max unhealthy count, master will exclude worker. If this value is set to 1, master will exclude worker of which disks are all unhealthy. | 0.6.0 | |
| celeborn.master.heartbeat.application.timeout | 300s | false | Application heartbeat timeout. | 0.3.0 | celeborn.application.heartbeat.timeout |
| celeborn.master.heartbeat.worker.timeout | 120s | false | Worker heartbeat timeout. | 0.3.0 | celeborn.worker.heartbeat.timeout |

View File

@ -32,6 +32,8 @@ import scala.Option;
import scala.Tuple2;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
import org.slf4j.Logger;
@ -89,6 +91,9 @@ public abstract class AbstractMetaManager implements IMetadataHandler {
public long initialEstimatedPartitionSize;
public long estimatedPartitionSize;
public double unhealthyDiskRatioThreshold;
protected boolean autoReleaseHighWorkLoadEnabled;
protected double autoReleaseHighWorkLoadRatioThreshold;
protected boolean hasRemoteStorage;
public final LongAdder partitionTotalWritten = new LongAdder();
public final LongAdder partitionTotalFileCount = new LongAdder();
public final LongAdder shuffleTotalCount = new LongAdder();
@ -249,6 +254,34 @@ public abstract class AbstractMetaManager implements IMetadataHandler {
}
}
/**
 * Decides whether a worker still has usable storage.
 *
 * <p>A worker qualifies when it has local disks whose unhealthy ratio stays below the
 * configured threshold, or — failing that — when remote storage is available. Workers
 * that qualify through neither path are reported as having no available storage.
 */
private boolean hasAvailableStorage(WorkerInfo workerInfo) {
  Map<String, DiskInfo> disks = workerInfo.diskInfos();
  if (!disks.isEmpty()) {
    Pair<Boolean, Long> unhealthyCheck = isExceedingUnhealthyThreshold(disks);
    if (!unhealthyCheck.getLeft()) {
      // Enough healthy local disks: storage is available.
      return true;
    }
    LOG.warn(
        "Worker {} doesn't have enough healthy local disk (unhealthy count: {}). Has remote storage: {}",
        workerInfo,
        unhealthyCheck.getRight(),
        hasRemoteStorage);
  }
  if (hasRemoteStorage) {
    // Remote storage can back the worker even without healthy local disks.
    return true;
  }
  LOG.warn("Worker {} has no available storage", workerInfo);
  return false;
}
public void updateWorkerHeartbeatMeta(
String host,
int rpcPort,
@ -271,6 +304,7 @@ public abstract class AbstractMetaManager implements IMetadataHandler {
availableSlots.set(info.totalAvailableSlots());
info.lastHeartbeat_$eq(time);
info.setWorkerStatus(workerStatus);
info.setWorkLoad(highWorkload);
});
}
@ -284,23 +318,34 @@ public abstract class AbstractMetaManager implements IMetadataHandler {
}
// If using HDFSONLY mode, workers with empty disks should not be put into excluded worker list.
long unhealthyDiskNum =
disks.values().stream().filter(s -> !s.status().equals(DiskStatus.HEALTHY)).count();
boolean exceed = unhealthyDiskNum * 1.0 / disks.size() >= unhealthyDiskRatioThreshold;
boolean remoteStorageDirsDefined = conf.remoteStorageDirs().isDefined();
if (!excludedWorkers.contains(worker)
&& (((disks.isEmpty() || exceed) && !remoteStorageDirsDefined) || highWorkload)) {
LOG.warn(
"Worker {} (unhealthy disks num: {}, high workload: {}) adds to excluded workers",
worker,
unhealthyDiskNum,
highWorkload);
if (!excludedWorkers.contains(worker) && (!hasAvailableStorage(worker) || highWorkload)) {
LOG.warn("Worker {} adds to excluded workers, high workload: {}", worker, highWorkload);
excludedWorkers.add(worker);
} else if ((availableSlots.get() > 0 || remoteStorageDirsDefined) && !highWorkload) {
} else if ((availableSlots.get() > 0 || hasRemoteStorage) && !highWorkload) {
// only unblack if numSlots larger than 0
excludedWorkers.remove(worker);
}
// release high work load workers when too many excluded workers
if (autoReleaseHighWorkLoadEnabled
&& excludedWorkers.size()
>= Math.floor(workersMap.size() * autoReleaseHighWorkLoadRatioThreshold)) {
synchronized (workersMap) {
List<WorkerInfo> toRemoved =
excludedWorkers.stream()
.filter(
w -> {
WorkerInfo info = workersMap.get(w.toUniqueId());
return info != null
&& info.isHighWorkLoad()
&& hasAvailableStorage(w)
&& info.totalAvailableSlots() > 0;
})
.collect(Collectors.toList());
updateExcludedWorkersMeta(new ArrayList<>(), toRemoved);
}
}
// try to update the available workers if the worker status is Normal
if (workerStatus.getState() == PbWorkerStatus.State.Normal) {
updateAvailableWorkers(worker);
@ -639,4 +684,11 @@ public abstract class AbstractMetaManager implements IMetadataHandler {
workerInfo.ifPresent(info -> info.updateThenGetUserResourceConsumption(resourceConsumptions));
}
}
/**
 * Checks whether the share of unhealthy disks in {@code diskMap} reaches
 * {@code unhealthyDiskRatioThreshold}.
 *
 * @param diskMap mount point to disk info
 * @return pair of (threshold reached?, number of unhealthy disks)
 */
private Pair<Boolean, Long> isExceedingUnhealthyThreshold(Map<String, DiskInfo> diskMap) {
  // Guard the empty case explicitly: previously 0 * 1.0 / 0 evaluated to NaN and the
  // comparison only happened to be false thanks to NaN ordering semantics.
  if (diskMap.isEmpty()) {
    return new ImmutablePair<>(false, 0L);
  }
  long unhealthyCount =
      diskMap.values().stream().filter(disk -> !disk.status().equals(DiskStatus.HEALTHY)).count();
  return new ImmutablePair<>(
      unhealthyCount * 1.0 / diskMap.size() >= unhealthyDiskRatioThreshold, unhealthyCount);
}
}

View File

@ -47,6 +47,10 @@ public class SingleMasterMetaManager extends AbstractMetaManager {
this.initialEstimatedPartitionSize = conf.initialEstimatedPartitionSize();
this.estimatedPartitionSize = initialEstimatedPartitionSize;
this.unhealthyDiskRatioThreshold = conf.masterExcludeWorkerUnhealthyDiskRatioThreshold();
this.autoReleaseHighWorkLoadEnabled = conf.masterAutoReleaseHighWorkloadWorkerEnabled();
this.autoReleaseHighWorkLoadRatioThreshold =
conf.masterAutoReleaseHighWorkloadWorkerRatioThreshold();
this.hasRemoteStorage = conf.remoteStorageDirs().isDefined();
this.rackResolver = rackResolver;
}

View File

@ -56,6 +56,9 @@ public class HAMasterMetaManager extends AbstractMetaManager {
this.initialEstimatedPartitionSize = conf.initialEstimatedPartitionSize();
this.estimatedPartitionSize = initialEstimatedPartitionSize;
this.unhealthyDiskRatioThreshold = conf.masterExcludeWorkerUnhealthyDiskRatioThreshold();
this.autoReleaseHighWorkLoadEnabled = conf.masterAutoReleaseHighWorkloadWorkerEnabled();
this.autoReleaseHighWorkLoadRatioThreshold =
conf.masterAutoReleaseHighWorkloadWorkerRatioThreshold();
this.rackResolver = rackResolver;
}

View File

@ -704,8 +704,8 @@ public class DefaultMetaSystemSuiteJ {
workerStatus,
getNewReqeustId());
assertEquals(statusSystem.excludedWorkers.size(), 1);
assertEquals(statusSystem.availableWorkers.size(), 2);
assertEquals(1, statusSystem.excludedWorkers.size());
assertEquals(2, statusSystem.availableWorkers.size());
statusSystem.handleWorkerHeartbeat(
HOSTNAME2,
@ -720,8 +720,8 @@ public class DefaultMetaSystemSuiteJ {
workerStatus,
getNewReqeustId());
assertEquals(statusSystem.excludedWorkers.size(), 2);
assertEquals(statusSystem.availableWorkers.size(), 1);
assertEquals(2, statusSystem.excludedWorkers.size());
assertEquals(1, statusSystem.availableWorkers.size());
statusSystem.handleWorkerHeartbeat(
HOSTNAME3,
@ -736,8 +736,8 @@ public class DefaultMetaSystemSuiteJ {
workerStatus,
getNewReqeustId());
assertEquals(statusSystem.excludedWorkers.size(), 2);
assertEquals(statusSystem.availableWorkers.size(), 1);
assertEquals(2, statusSystem.excludedWorkers.size());
assertEquals(1, statusSystem.availableWorkers.size());
statusSystem.handleWorkerHeartbeat(
HOSTNAME3,
@ -752,8 +752,184 @@ public class DefaultMetaSystemSuiteJ {
workerStatus,
getNewReqeustId());
assertEquals(statusSystem.excludedWorkers.size(), 3);
assertEquals(statusSystem.availableWorkers.size(), 0);
assertEquals(3, statusSystem.excludedWorkers.size());
assertEquals(0, statusSystem.availableWorkers.size());
}
/**
 * Single-master path: excluded workers whose only problem is high workload are released
 * again once the excluded set reaches the configured ratio of the cluster size. With
 * three workers and a 0.8 threshold, floor(3 * 0.8) = 2 excluded workers trigger the
 * release.
 */
@Test
public void testAutoReleaseHighWorkLoadWorkers() {
  // Enable the auto-release feature with a 0.8 ratio threshold.
  conf.set(CelebornConf.MASTER_AUTO_RELEASE_HIGH_WORKLOAD_WORKER_ENABLED(), true);
  conf.set(CelebornConf.MASTER_AUTO_RELEASE_HIGH_WORKLOAD_WORKER_RATIO_THRESHOLD(), 0.8);
  statusSystem = new SingleMasterMetaManager(mockRpcEnv, conf);
  // Register three healthy workers.
  statusSystem.handleRegisterWorker(
      HOSTNAME1,
      RPCPORT1,
      PUSHPORT1,
      FETCHPORT1,
      REPLICATEPORT1,
      INTERNALPORT1,
      NETWORK_LOCATION1,
      disks1,
      userResourceConsumption1,
      getNewReqeustId());
  statusSystem.handleRegisterWorker(
      HOSTNAME2,
      RPCPORT2,
      PUSHPORT2,
      FETCHPORT2,
      REPLICATEPORT2,
      INTERNALPORT2,
      NETWORK_LOCATION2,
      disks2,
      userResourceConsumption2,
      getNewReqeustId());
  statusSystem.handleRegisterWorker(
      HOSTNAME3,
      RPCPORT3,
      PUSHPORT3,
      FETCHPORT3,
      REPLICATEPORT3,
      INTERNALPORT3,
      NETWORK_LOCATION3,
      disks3,
      userResourceConsumption3,
      getNewReqeustId());
  // worker2 and worker3 heartbeat with empty disk maps (unhealthy) and no high
  // workload: each is excluded, and since neither is high-workload, none is released
  // even after the excluded set reaches the threshold.
  statusSystem.handleWorkerHeartbeat(
      HOSTNAME2,
      RPCPORT2,
      PUSHPORT2,
      FETCHPORT2,
      REPLICATEPORT2,
      new HashMap<>(),
      userResourceConsumption2,
      1,
      false,
      workerStatus,
      getNewReqeustId());
  assertEquals(1, statusSystem.excludedWorkers.size());
  assertEquals(2, statusSystem.availableWorkers.size());
  statusSystem.handleWorkerHeartbeat(
      HOSTNAME3,
      RPCPORT3,
      PUSHPORT3,
      FETCHPORT3,
      REPLICATEPORT3,
      new HashMap<>(),
      userResourceConsumption3,
      1,
      false,
      workerStatus,
      getNewReqeustId());
  assertEquals(2, statusSystem.excludedWorkers.size());
  assertEquals(1, statusSystem.availableWorkers.size());
  // worker2 and worker3 recover: healthy disks and no high workload, so both are
  // removed from the excluded set again.
  statusSystem.handleWorkerHeartbeat(
      HOSTNAME2,
      RPCPORT2,
      PUSHPORT2,
      FETCHPORT2,
      REPLICATEPORT2,
      disks2,
      userResourceConsumption2,
      1,
      false,
      workerStatus,
      getNewReqeustId());
  assertEquals(1, statusSystem.excludedWorkers.size());
  assertEquals(2, statusSystem.availableWorkers.size());
  statusSystem.handleWorkerHeartbeat(
      HOSTNAME3,
      RPCPORT3,
      PUSHPORT3,
      FETCHPORT3,
      REPLICATEPORT3,
      disks3,
      userResourceConsumption3,
      1,
      false,
      workerStatus,
      getNewReqeustId());
  assertEquals(0, statusSystem.excludedWorkers.size());
  assertEquals(3, statusSystem.availableWorkers.size());
  // worker2 and worker3 now report high workload with healthy disks: once both are
  // excluded the set reaches the threshold (2 >= floor(3 * 0.8)) and, since both have
  // available storage and free slots, both are auto-released.
  statusSystem.handleWorkerHeartbeat(
      HOSTNAME2,
      RPCPORT2,
      PUSHPORT2,
      FETCHPORT2,
      REPLICATEPORT2,
      disks2,
      userResourceConsumption2,
      1,
      true,
      workerStatus,
      getNewReqeustId());
  assertEquals(1, statusSystem.excludedWorkers.size());
  assertEquals(2, statusSystem.availableWorkers.size());
  statusSystem.handleWorkerHeartbeat(
      HOSTNAME3,
      RPCPORT3,
      PUSHPORT3,
      FETCHPORT3,
      REPLICATEPORT3,
      disks3,
      userResourceConsumption3,
      1,
      true,
      workerStatus,
      getNewReqeustId());
  // release 2 workers with high workload
  assertEquals(0, statusSystem.excludedWorkers.size());
  assertEquals(3, statusSystem.availableWorkers.size());
  // worker2 has high workload and worker3 is unhealthy: only worker2 qualifies for
  // auto-release (worker3 has no available storage).
  statusSystem.handleWorkerHeartbeat(
      HOSTNAME2,
      RPCPORT2,
      PUSHPORT2,
      FETCHPORT2,
      REPLICATEPORT2,
      disks2,
      userResourceConsumption2,
      1,
      true,
      workerStatus,
      getNewReqeustId());
  assertEquals(1, statusSystem.excludedWorkers.size());
  assertEquals(2, statusSystem.availableWorkers.size());
  statusSystem.handleWorkerHeartbeat(
      HOSTNAME3,
      RPCPORT3,
      PUSHPORT3,
      FETCHPORT3,
      REPLICATEPORT3,
      new HashMap<>(),
      userResourceConsumption3,
      1,
      false,
      workerStatus,
      getNewReqeustId());
  // release worker2
  assertEquals(1, statusSystem.excludedWorkers.size());
  assertEquals(2, statusSystem.availableWorkers.size());
  // worker2 reports high workload but also has no healthy disks, so it cannot be
  // released; both workers stay excluded.
  statusSystem.handleWorkerHeartbeat(
      HOSTNAME2,
      RPCPORT2,
      PUSHPORT2,
      FETCHPORT2,
      REPLICATEPORT2,
      new HashMap<>(),
      userResourceConsumption2,
      1,
      true,
      workerStatus,
      getNewReqeustId());
  assertEquals(2, statusSystem.excludedWorkers.size());
  assertEquals(1, statusSystem.availableWorkers.size());
}
@Test
@ -837,7 +1013,7 @@ public class DefaultMetaSystemSuiteJ {
// Size between minEstimateSize -> maxEstimateSize
statusSystem.handleUpdatePartitionSize();
Assert.assertEquals(statusSystem.estimatedPartitionSize, 500000000);
Assert.assertEquals(500000000, statusSystem.estimatedPartitionSize);
statusSystem.handleAppHeartbeat(
APPID1, 1000l, 1, 1, 1, new HashMap<>(), new HashMap<>(), dummy, getNewReqeustId());
@ -951,11 +1127,11 @@ public class DefaultMetaSystemSuiteJ {
dummy,
getNewReqeustId());
assertEquals(statusSystem.shuffleTotalCount.longValue(), 5);
assertEquals(statusSystem.applicationTotalCount.longValue(), 3);
assertEquals(statusSystem.shuffleFallbackCounts.get(POLICY1).longValue(), 3);
assertEquals(statusSystem.shuffleFallbackCounts.get(POLICY2).longValue(), 2);
assertEquals(statusSystem.applicationFallbackCounts.get(POLICY1).longValue(), 2);
assertEquals(statusSystem.applicationFallbackCounts.get(POLICY2).longValue(), 1);
assertEquals(5, statusSystem.shuffleTotalCount.longValue());
assertEquals(3, statusSystem.applicationTotalCount.longValue());
assertEquals(3, statusSystem.shuffleFallbackCounts.get(POLICY1).longValue());
assertEquals(2, statusSystem.shuffleFallbackCounts.get(POLICY2).longValue());
assertEquals(2, statusSystem.applicationFallbackCounts.get(POLICY1).longValue());
assertEquals(1, statusSystem.applicationFallbackCounts.get(POLICY2).longValue());
}
}

View File

@ -108,9 +108,9 @@ public class RatisMasterStatusSystemSuiteJ {
while (!serversStarted) {
try {
STATUSSYSTEM1 = new HAMasterMetaManager(mockRpcEnv, new CelebornConf());
STATUSSYSTEM2 = new HAMasterMetaManager(mockRpcEnv, new CelebornConf());
STATUSSYSTEM3 = new HAMasterMetaManager(mockRpcEnv, new CelebornConf());
STATUSSYSTEM1 = new HAMasterMetaManager(mockRpcEnv, conf1);
STATUSSYSTEM2 = new HAMasterMetaManager(mockRpcEnv, conf2);
STATUSSYSTEM3 = new HAMasterMetaManager(mockRpcEnv, conf3);
MetaHandler handler1 = new MetaHandler(STATUSSYSTEM1);
MetaHandler handler2 = new MetaHandler(STATUSSYSTEM2);
@ -1138,6 +1138,277 @@ public class RatisMasterStatusSystemSuiteJ {
Assert.assertEquals(1, STATUSSYSTEM3.availableWorkers.size());
}
/**
 * HA (Ratis) path: same scenario as the single-master test — excluded workers whose only
 * problem is high workload are released once the excluded set reaches the 0.8 ratio
 * threshold (floor(3 * 0.8) = 2) — and the resulting state must be replicated to all
 * three status systems.
 */
@Test
public void testAutoReleaseHighWorkLoadWorkers() throws InterruptedException, IOException {
  // All three masters get the auto-release feature enabled with a 0.8 threshold.
  CelebornConf conf1 = new CelebornConf();
  conf1.set(CelebornConf.MASTER_AUTO_RELEASE_HIGH_WORKLOAD_WORKER_ENABLED(), true);
  conf1.set(CelebornConf.MASTER_AUTO_RELEASE_HIGH_WORKLOAD_WORKER_RATIO_THRESHOLD(), 0.8);
  CelebornConf conf2 = new CelebornConf();
  conf2.set(CelebornConf.MASTER_AUTO_RELEASE_HIGH_WORKLOAD_WORKER_ENABLED(), true);
  conf2.set(CelebornConf.MASTER_AUTO_RELEASE_HIGH_WORKLOAD_WORKER_RATIO_THRESHOLD(), 0.8);
  CelebornConf conf3 = new CelebornConf();
  conf3.set(CelebornConf.MASTER_AUTO_RELEASE_HIGH_WORKLOAD_WORKER_ENABLED(), true);
  conf3.set(CelebornConf.MASTER_AUTO_RELEASE_HIGH_WORKLOAD_WORKER_RATIO_THRESHOLD(), 0.8);
  try {
    resetRaftServer(
        configureServerConf(conf1, 1),
        configureServerConf(conf2, 2),
        configureServerConf(conf3, 3),
        false);
    AbstractMetaManager statusSystem = pickLeaderStatusSystem();
    Assert.assertNotNull(statusSystem);
    // Register three healthy workers on the leader.
    statusSystem.handleRegisterWorker(
        HOSTNAME1,
        RPCPORT1,
        PUSHPORT1,
        FETCHPORT1,
        REPLICATEPORT1,
        INTERNALPORT1,
        NETWORK_LOCATION1,
        disks1,
        userResourceConsumption1,
        getNewReqeustId());
    statusSystem.handleRegisterWorker(
        HOSTNAME2,
        RPCPORT2,
        PUSHPORT2,
        FETCHPORT2,
        REPLICATEPORT2,
        INTERNALPORT2,
        NETWORK_LOCATION2,
        disks2,
        userResourceConsumption2,
        getNewReqeustId());
    statusSystem.handleRegisterWorker(
        HOSTNAME3,
        RPCPORT3,
        PUSHPORT3,
        FETCHPORT3,
        REPLICATEPORT3,
        INTERNALPORT3,
        NETWORK_LOCATION3,
        disks3,
        userResourceConsumption3,
        getNewReqeustId());
    // NOTE(review): the fixed sleeps presumably allow the Ratis state machine to apply
    // and replicate updates to all peers before asserting.
    Thread.sleep(3000L);
    // worker2 and worker3 heartbeat with empty disk maps (unhealthy) and no high
    // workload: both get excluded and, not being high-workload, are not released.
    statusSystem.handleWorkerHeartbeat(
        HOSTNAME2,
        RPCPORT2,
        PUSHPORT2,
        FETCHPORT2,
        REPLICATEPORT2,
        new HashMap<>(),
        userResourceConsumption2,
        1,
        false,
        workerStatus,
        getNewReqeustId());
    Thread.sleep(3000L);
    Assert.assertEquals(1, statusSystem.excludedWorkers.size());
    Assert.assertEquals(1, STATUSSYSTEM1.excludedWorkers.size());
    Assert.assertEquals(1, STATUSSYSTEM2.excludedWorkers.size());
    Assert.assertEquals(1, STATUSSYSTEM3.excludedWorkers.size());
    Assert.assertEquals(2, statusSystem.availableWorkers.size());
    Assert.assertEquals(2, STATUSSYSTEM1.availableWorkers.size());
    Assert.assertEquals(2, STATUSSYSTEM2.availableWorkers.size());
    Assert.assertEquals(2, STATUSSYSTEM3.availableWorkers.size());
    statusSystem.handleWorkerHeartbeat(
        HOSTNAME3,
        RPCPORT3,
        PUSHPORT3,
        FETCHPORT3,
        REPLICATEPORT3,
        new HashMap<>(),
        userResourceConsumption3,
        1,
        false,
        workerStatus,
        getNewReqeustId());
    Thread.sleep(3000L);
    Assert.assertEquals(2, statusSystem.excludedWorkers.size());
    Assert.assertEquals(2, STATUSSYSTEM1.excludedWorkers.size());
    Assert.assertEquals(2, STATUSSYSTEM2.excludedWorkers.size());
    Assert.assertEquals(2, STATUSSYSTEM3.excludedWorkers.size());
    Assert.assertEquals(1, statusSystem.availableWorkers.size());
    Assert.assertEquals(1, STATUSSYSTEM1.availableWorkers.size());
    Assert.assertEquals(1, STATUSSYSTEM2.availableWorkers.size());
    Assert.assertEquals(1, STATUSSYSTEM3.availableWorkers.size());
    // worker2 and worker3 recover: healthy disks and no high workload, so both are
    // removed from the excluded set again.
    statusSystem.handleWorkerHeartbeat(
        HOSTNAME2,
        RPCPORT2,
        PUSHPORT2,
        FETCHPORT2,
        REPLICATEPORT2,
        disks2,
        userResourceConsumption2,
        1,
        false,
        workerStatus,
        getNewReqeustId());
    Thread.sleep(3000L);
    Assert.assertEquals(1, statusSystem.excludedWorkers.size());
    Assert.assertEquals(1, STATUSSYSTEM1.excludedWorkers.size());
    Assert.assertEquals(1, STATUSSYSTEM2.excludedWorkers.size());
    Assert.assertEquals(1, STATUSSYSTEM3.excludedWorkers.size());
    Assert.assertEquals(2, statusSystem.availableWorkers.size());
    Assert.assertEquals(2, STATUSSYSTEM1.availableWorkers.size());
    Assert.assertEquals(2, STATUSSYSTEM2.availableWorkers.size());
    Assert.assertEquals(2, STATUSSYSTEM3.availableWorkers.size());
    statusSystem.handleWorkerHeartbeat(
        HOSTNAME3,
        RPCPORT3,
        PUSHPORT3,
        FETCHPORT3,
        REPLICATEPORT3,
        disks3,
        userResourceConsumption3,
        1,
        false,
        workerStatus,
        getNewReqeustId());
    Thread.sleep(3000L);
    Assert.assertEquals(0, statusSystem.excludedWorkers.size());
    Assert.assertEquals(0, STATUSSYSTEM1.excludedWorkers.size());
    Assert.assertEquals(0, STATUSSYSTEM2.excludedWorkers.size());
    Assert.assertEquals(0, STATUSSYSTEM3.excludedWorkers.size());
    Assert.assertEquals(3, statusSystem.availableWorkers.size());
    Assert.assertEquals(3, STATUSSYSTEM1.availableWorkers.size());
    Assert.assertEquals(3, STATUSSYSTEM2.availableWorkers.size());
    Assert.assertEquals(3, STATUSSYSTEM3.availableWorkers.size());
    // worker2 and worker3 now report high workload with healthy disks: when the second
    // one is excluded the set reaches the threshold (2 >= floor(3 * 0.8)) and both are
    // auto-released.
    statusSystem.handleWorkerHeartbeat(
        HOSTNAME2,
        RPCPORT2,
        PUSHPORT2,
        FETCHPORT2,
        REPLICATEPORT2,
        disks2,
        userResourceConsumption2,
        1,
        true,
        workerStatus,
        getNewReqeustId());
    Thread.sleep(3000L);
    Assert.assertEquals(1, statusSystem.excludedWorkers.size());
    Assert.assertEquals(1, STATUSSYSTEM1.excludedWorkers.size());
    Assert.assertEquals(1, STATUSSYSTEM2.excludedWorkers.size());
    Assert.assertEquals(1, STATUSSYSTEM3.excludedWorkers.size());
    Assert.assertEquals(2, statusSystem.availableWorkers.size());
    Assert.assertEquals(2, STATUSSYSTEM1.availableWorkers.size());
    Assert.assertEquals(2, STATUSSYSTEM2.availableWorkers.size());
    Assert.assertEquals(2, STATUSSYSTEM3.availableWorkers.size());
    statusSystem.handleWorkerHeartbeat(
        HOSTNAME3,
        RPCPORT3,
        PUSHPORT3,
        FETCHPORT3,
        REPLICATEPORT3,
        disks3,
        userResourceConsumption3,
        1,
        true,
        workerStatus,
        getNewReqeustId());
    Thread.sleep(3000L);
    // release 2 workers with high workload
    Assert.assertEquals(0, statusSystem.excludedWorkers.size());
    Assert.assertEquals(0, STATUSSYSTEM1.excludedWorkers.size());
    Assert.assertEquals(0, STATUSSYSTEM2.excludedWorkers.size());
    Assert.assertEquals(0, STATUSSYSTEM3.excludedWorkers.size());
    Assert.assertEquals(3, statusSystem.availableWorkers.size());
    Assert.assertEquals(3, STATUSSYSTEM1.availableWorkers.size());
    Assert.assertEquals(3, STATUSSYSTEM2.availableWorkers.size());
    Assert.assertEquals(3, STATUSSYSTEM3.availableWorkers.size());
    // worker2 has high workload and worker3 is unhealthy: only worker2 qualifies for
    // auto-release (worker3 has no available storage).
    statusSystem.handleWorkerHeartbeat(
        HOSTNAME2,
        RPCPORT2,
        PUSHPORT2,
        FETCHPORT2,
        REPLICATEPORT2,
        disks2,
        userResourceConsumption2,
        1,
        true,
        workerStatus,
        getNewReqeustId());
    Thread.sleep(3000L);
    Assert.assertEquals(1, statusSystem.excludedWorkers.size());
    Assert.assertEquals(1, STATUSSYSTEM1.excludedWorkers.size());
    Assert.assertEquals(1, STATUSSYSTEM2.excludedWorkers.size());
    Assert.assertEquals(1, STATUSSYSTEM3.excludedWorkers.size());
    Assert.assertEquals(2, statusSystem.availableWorkers.size());
    Assert.assertEquals(2, STATUSSYSTEM1.availableWorkers.size());
    Assert.assertEquals(2, STATUSSYSTEM2.availableWorkers.size());
    Assert.assertEquals(2, STATUSSYSTEM3.availableWorkers.size());
    statusSystem.handleWorkerHeartbeat(
        HOSTNAME3,
        RPCPORT3,
        PUSHPORT3,
        FETCHPORT3,
        REPLICATEPORT3,
        new HashMap<>(),
        userResourceConsumption3,
        1,
        false,
        workerStatus,
        getNewReqeustId());
    Thread.sleep(3000L);
    // release worker2
    Assert.assertEquals(1, statusSystem.excludedWorkers.size());
    Assert.assertEquals(1, STATUSSYSTEM1.excludedWorkers.size());
    Assert.assertEquals(1, STATUSSYSTEM2.excludedWorkers.size());
    Assert.assertEquals(1, STATUSSYSTEM3.excludedWorkers.size());
    Assert.assertEquals(2, statusSystem.availableWorkers.size());
    Assert.assertEquals(2, STATUSSYSTEM1.availableWorkers.size());
    Assert.assertEquals(2, STATUSSYSTEM2.availableWorkers.size());
    Assert.assertEquals(2, STATUSSYSTEM3.availableWorkers.size());
    // worker2 reports high workload but also has no healthy disks, so it cannot be
    // released; both workers stay excluded.
    statusSystem.handleWorkerHeartbeat(
        HOSTNAME2,
        RPCPORT2,
        PUSHPORT2,
        FETCHPORT2,
        REPLICATEPORT2,
        new HashMap<>(),
        userResourceConsumption2,
        1,
        true,
        workerStatus,
        getNewReqeustId());
    Thread.sleep(3000L);
    Assert.assertEquals(2, statusSystem.excludedWorkers.size());
    Assert.assertEquals(2, STATUSSYSTEM1.excludedWorkers.size());
    Assert.assertEquals(2, STATUSSYSTEM2.excludedWorkers.size());
    Assert.assertEquals(2, STATUSSYSTEM3.excludedWorkers.size());
    Assert.assertEquals(1, statusSystem.availableWorkers.size());
    Assert.assertEquals(1, STATUSSYSTEM1.availableWorkers.size());
    Assert.assertEquals(1, STATUSSYSTEM2.availableWorkers.size());
    Assert.assertEquals(1, STATUSSYSTEM3.availableWorkers.size());
  } catch (Exception e) {
    Assert.fail(e.getMessage());
  } finally {
    // Restore default configuration for the shared raft servers used by other tests.
    resetRaftServer(
        configureServerConf(new CelebornConf(), 1),
        configureServerConf(new CelebornConf(), 2),
        configureServerConf(new CelebornConf(), 3),
        false);
  }
}
@Before
public void resetStatus() {
STATUSSYSTEM1.registeredAppAndShuffles.clear();