[CELEBORN-1794] Fix TestCongestionController occasional failing

### Why are the changes needed?
There is a small probability that the TestCongestionController test fails.

![image](https://github.com/user-attachments/assets/bc5bfb91-0b40-4ee0-bd74-2b96715d7cd7)

That is because the `checkService` executes once as soon as it is initialized, which can cause a multithreading conflict with the test code.

![image](https://github.com/user-attachments/assets/dff58b08-a340-4c29-a61d-fafaeb182c5d)

### What changes were proposed in this pull request?
Fix the unit test bug.
In fact, `shutDownCheckService` still won't prevent the `checkService` from executing once immediately, but it makes the main testing thread wait for it to shut down.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
manual test.

Closes #3017 from zaynt4606/clb1794.

Authored-by: zhengtao <shuaizhentao.szt@alibaba-inc.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
This commit is contained in:
zhengtao 2024-12-23 17:47:28 +08:00 committed by Shuang
parent e496a3cfae
commit 406ceb64c1
2 changed files with 11 additions and 9 deletions

View File

@ -24,6 +24,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -291,13 +292,18 @@ public class CongestionController {
/**
 * Releases this controller's resources: logs the close, stops the two executor services
 * ({@code removeUserExecutorService} and {@code checkService}), and clears the per-user
 * buffer statuses and the consumed/produced buffer status hubs.
 *
 * <p>NOTE(review): this text comes from a diff view without +/- markers; the
 * {@code shutdownNow()} calls and the {@code ThreadUtils.shutdown(...)} calls are presumably
 * the removed and added sides of the same change rather than four consecutive calls —
 * confirm against the actual file.
 */
public void close() {
logger.info("Closing {}", this.getClass().getSimpleName());
// Stop both background executors before clearing the state below.
this.removeUserExecutorService.shutdownNow();
this.checkService.shutdownNow();
// Presumably waits for the executors to terminate, unlike shutdownNow() — TODO confirm
// against ThreadUtils.shutdown.
ThreadUtils.shutdown(this.removeUserExecutorService);
ThreadUtils.shutdown(this.checkService);
// Drop all tracked user and buffer-status state.
this.userBufferStatuses.clear();
this.consumedBufferStatusHub.clear();
this.producedBufferStatusHub.clear();
}
/**
 * Shuts down only the {@code checkService} executor, leaving the rest of the controller
 * usable. Exposed for tests, which call it right after constructing a controller.
 *
 * <p>NOTE(review): the intent appears to be that the calling thread waits for any
 * already-started check run to finish, so it cannot race with the test's own calls; this
 * depends on {@code ThreadUtils.shutdown} blocking until termination — confirm.
 */
@VisibleForTesting
public void shutDownCheckService() {
ThreadUtils.shutdown(this.checkService);
}
public static synchronized void destroy() {
if (_INSTANCE != null) {
_INSTANCE.close();

View File

@ -36,7 +36,6 @@ public class TestCongestionController {
private long pendingBytes = 0L;
private final long userInactiveTimeMills = 2000L;
private final long checkIntervalTimeMills = Integer.MAX_VALUE;
@Before
public void initialize() {
@ -56,8 +55,6 @@ public class TestCongestionController {
CelebornConf.WORKER_CONGESTION_CONTROL_WORKER_PRODUCE_SPEED_LOW_WATERMARK().key(), "10000");
celebornConf.set(
CelebornConf.WORKER_CONGESTION_CONTROL_USER_INACTIVE_INTERVAL(), userInactiveTimeMills);
celebornConf.set(
CelebornConf.WORKER_CONGESTION_CONTROL_CHECK_INTERVAL(), checkIntervalTimeMills);
// Make sampleTimeWindow a bit larger in case the tests run time exceed this window.
controller =
new CongestionController(source, 10, celebornConf, null) {
@ -71,6 +68,7 @@ public class TestCongestionController {
// No op
}
};
controller.shutDownCheckService();
}
@After
@ -176,8 +174,6 @@ public class TestCongestionController {
celebornConf.set(
CelebornConf.WORKER_CONGESTION_CONTROL_WORKER_PRODUCE_SPEED_LOW_WATERMARK().key(), "1000");
celebornConf.set(CelebornConf.WORKER_CONGESTION_CONTROL_USER_INACTIVE_INTERVAL(), 120L * 1000);
celebornConf.set(
CelebornConf.WORKER_CONGESTION_CONTROL_CHECK_INTERVAL(), checkIntervalTimeMills);
CongestionController controller1 =
new CongestionController(source, 10, celebornConf, null) {
@Override
@ -190,6 +186,7 @@ public class TestCongestionController {
// No op
}
};
controller1.shutDownCheckService();
UserIdentifier user1 = new UserIdentifier("test1", "celeborn");
UserCongestionControlContext context1 = controller1.getUserCongestionContext(user1);
@ -252,8 +249,6 @@ public class TestCongestionController {
celebornConf.set(
CelebornConf.WORKER_CONGESTION_CONTROL_WORKER_PRODUCE_SPEED_LOW_WATERMARK().key(), "700");
celebornConf.set(CelebornConf.WORKER_CONGESTION_CONTROL_USER_INACTIVE_INTERVAL(), 120L * 1000);
celebornConf.set(
CelebornConf.WORKER_CONGESTION_CONTROL_CHECK_INTERVAL(), checkIntervalTimeMills);
CongestionController controller1 =
new CongestionController(source, 10, celebornConf, null) {
@Override
@ -266,6 +261,7 @@ public class TestCongestionController {
// No op
}
};
controller1.shutDownCheckService();
UserIdentifier user1 = new UserIdentifier("test1", "celeborn");
UserCongestionControlContext context1 = controller1.getUserCongestionContext(user1);