[CELEBORN-1242] Unify celeborn thread name format

### What changes were proposed in this pull request?

Unify celeborn thread name format with the following pattern:

- client: `celeborn-client-[component]-[function]er`
- service: `[master|worker]-[component]-[function]er`
- other: `celeborn-[component]-[function]er`

### Why are the changes needed?

It's recommended to unify celeborn thread name format especially client side for application.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

No.

Closes #2248 from AngersZhuuuu/CELEBORN-1242.

Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
This commit is contained in:
Angerszhuuuu 2024-01-23 16:56:40 +08:00 committed by mingji
parent 37a513deaa
commit 67e6cbfb51
No known key found for this signature in database
GPG Key ID: 6392F71F37356FA0
26 changed files with 51 additions and 45 deletions

View File

@ -21,7 +21,7 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.BatchShuffleMode;
@ -56,9 +56,9 @@ public class RemoteShuffleMaster implements ShuffleMaster<RemoteShuffleDescripto
private volatile LifecycleManager lifecycleManager;
private final ShuffleTaskInfo shuffleTaskInfo = new ShuffleTaskInfo();
private ShuffleResourceTracker shuffleResourceTracker;
private final ScheduledThreadPoolExecutor executor =
new ScheduledThreadPoolExecutor(
1, ThreadUtils.namedThreadFactory("remote-shuffle-master-executor"));
private final ScheduledExecutorService executor =
ThreadUtils.newDaemonSingleThreadScheduledExecutor(
"celeborn-client-remote-shuffle-master-executor");
private final ResultPartitionAdapter resultPartitionDelegation;
private final long lifecycleManagerTimestamp;

View File

@ -48,7 +48,7 @@ public class SendBufferPool {
private final LinkedList<LinkedBlockingQueue<PushTask>> pushTaskQueues;
private ScheduledExecutorService cleaner =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("celeborn-sendBufferPool-cleaner");
ThreadUtils.newDaemonSingleThreadScheduledExecutor("celeborn-client-sendBufferPool-cleaner");
private SendBufferPool(int capacity, long checkInterval, long timeout) {
assert capacity > 0;

View File

@ -40,7 +40,8 @@ class ReviveManager {
private final int batchSize;
ShuffleClientImpl shuffleClient;
private final ScheduledExecutorService batchReviveRequestScheduler =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("batch-revive-scheduler");
ThreadUtils.newDaemonSingleThreadScheduledExecutor(
"celeborn-client-lifecycle-manager-batch-revive-scheduler");
public ReviveManager(ShuffleClientImpl shuffleClient, CelebornConf conf) {
this.shuffleClient = shuffleClient;

View File

@ -128,7 +128,7 @@ public class DfsPartitionReader implements PartitionReader {
numChunks = chunkOffsets.size() - 1;
fetchThread =
ThreadUtils.newDaemonSingleThreadExecutor(
"Dfs-fetch-thread" + location.getStorageInfo().getFilePath());
"celeborn-client-dfs-partition-fetcher" + location.getStorageInfo().getFilePath());
logger.debug("Start dfs read on location {}", location);
ShuffleClient.incrementTotalReadCounter();
}

View File

@ -83,7 +83,7 @@ public class LocalPartitionReader implements PartitionReader {
if (readLocalShufflePool == null) {
readLocalShufflePool =
ThreadUtils.newDaemonCachedThreadPool(
"local-shuffle-reader-thread", conf.readLocalShuffleThreads(), 60);
"celeborn-client-local-shuffle-reader", conf.readLocalShuffleThreads(), 60);
}
}
}

View File

@ -100,7 +100,7 @@ public class DataPusher {
this.mapStatusLengths = mapStatusLengths;
pushThread =
new Thread("DataPusher-" + taskId) {
new Thread("celeborn-client-data-pusher-" + taskId) {
private void reclaimTask(PushTask task) throws InterruptedException {
idleLock.lockInterruptibly();
try {

View File

@ -42,7 +42,8 @@ class ApplicationHeartbeater(
private val appHeartbeatIntervalMs = conf.appHeartbeatIntervalMs
private val applicationUnregisterEnabled = conf.applicationUnregisterEnabled
private val appHeartbeatHandlerThread =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("celeborn-app-heartbeat")
ThreadUtils.newDaemonSingleThreadScheduledExecutor(
"celeborn-client-lifecycle-manager-app-heartbeater")
private var appHeartbeat: ScheduledFuture[_] = _
def start(): Unit = {

View File

@ -53,14 +53,14 @@ class ChangePartitionManager(
private val batchHandleChangePartitionEnabled = conf.batchHandleChangePartitionEnabled
private val batchHandleChangePartitionExecutors = ThreadUtils.newDaemonCachedThreadPool(
"celeborn-lifecycle-manager-change-partition-executor",
"celeborn-client-lifecycle-manager-change-partition-executor",
conf.batchHandleChangePartitionNumThreads)
private val batchHandleChangePartitionRequestInterval =
conf.batchHandleChangePartitionRequestInterval
private val batchHandleChangePartitionSchedulerThread: Option[ScheduledExecutorService] =
if (batchHandleChangePartitionEnabled) {
Some(ThreadUtils.newDaemonSingleThreadScheduledExecutor(
"celeborn-lifecycle-manager-change-partition-scheduler"))
"celeborn-client-lifecycle-manager-change-partition-scheduler"))
} else {
None
}

View File

@ -78,14 +78,14 @@ class CommitManager(appUniqueId: String, val conf: CelebornConf, lifecycleManage
val committedPartitionInfo = new CommittedPartitionInfo
private val batchHandleCommitPartitionEnabled = conf.batchHandleCommitPartitionEnabled
private val batchHandleCommitPartitionExecutors = ThreadUtils.newDaemonCachedThreadPool(
"celeborn-lifecycle-manager-commit-partition-executor",
"celeborn-client-lifecycle-manager-commit-partition-executor",
conf.batchHandleCommitPartitionNumThreads)
private val batchHandleCommitPartitionRequestInterval =
conf.batchHandleCommitPartitionRequestInterval
private val batchHandleCommitPartitionSchedulerThread: Option[ScheduledExecutorService] =
if (batchHandleCommitPartitionEnabled) {
Some(ThreadUtils.newDaemonSingleThreadScheduledExecutor(
"celeborn-lifecycle-manager-commit-partition-scheduler"))
"celeborn-client-lifecycle-manager-commit-partition-scheduler"))
} else {
None
}

View File

@ -139,10 +139,13 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
// Threads
private val forwardMessageThread =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("master-forward-message-thread")
ThreadUtils.newDaemonSingleThreadScheduledExecutor("master-message-forwarder")
private var checkForShuffleRemoval: ScheduledFuture[_] = _
val rpcSharedThreadPool =
ThreadUtils.newDaemonCachedThreadPool("shared-rpc-pool", conf.clientRpcSharedThreads, 30)
ThreadUtils.newDaemonCachedThreadPool(
"celeborn-client-lifecycle-manager-shared-rpc-pool",
conf.clientRpcSharedThreads,
30)
val ec = ExecutionContext.fromExecutor(rpcSharedThreadPool)
// init driver celeborn LifecycleManager rpc service

View File

@ -38,14 +38,14 @@ class ReleasePartitionManager(
private val shuffleReleasePartitionRequests = JavaUtils.newConcurrentHashMap[Int, util.Set[Int]]
private val batchHandleReleasePartitionEnabled = conf.batchHandleReleasePartitionEnabled
private val batchHandleReleasePartitionExecutors = ThreadUtils.newDaemonCachedThreadPool(
"celeborn-lifecycle-manager-release-partition-executor",
"celeborn-client-lifecycle-manager-release-partition-executor",
conf.batchHandleReleasePartitionNumThreads)
private val batchHandleReleasePartitionRequestInterval =
conf.batchHandleReleasePartitionRequestInterval
private val batchHandleReleasePartitionSchedulerThread: Option[ScheduledExecutorService] =
if (batchHandleReleasePartitionEnabled) {
Some(ThreadUtils.newDaemonSingleThreadScheduledExecutor(
"celeborn-lifecycle-manager-release-partition-scheduler"))
"celeborn-client-lifecycle-manager-release-partition-scheduler"))
} else {
None
}

View File

@ -67,7 +67,8 @@ public class MasterClient {
this.maxRetries = Math.max(masterEndpoints.size(), conf.masterClientMaxRetries());
this.rpcTimeout = conf.masterClientRpcAskTimeout();
this.rpcEndpointRef = new AtomicReference<>();
this.oneWayMessageSender = ThreadUtils.newDaemonSingleThreadExecutor("One-Way-Message-Sender");
this.oneWayMessageSender =
ThreadUtils.newDaemonSingleThreadExecutor("celeborn-one-way-message-sender");
}
private static final String SPLITTER = "#";

View File

@ -95,7 +95,7 @@ public class TransportResponseHandler extends MessageHandler<ResponseMessage> {
if (pushTimeoutChecker == null) {
pushTimeoutChecker =
ThreadUtils.newDaemonThreadPoolScheduledExecutor(
"push-timeout-checker", conf.pushDataTimeoutCheckerThreads());
"celeborn-push-timeout-checker", conf.pushDataTimeoutCheckerThreads());
}
}
@ -103,7 +103,7 @@ public class TransportResponseHandler extends MessageHandler<ResponseMessage> {
if (fetchTimeoutChecker == null) {
fetchTimeoutChecker =
ThreadUtils.newDaemonThreadPoolScheduledExecutor(
"fetch-timeout-checker", conf.fetchDataTimeoutCheckerThreads());
"celeborn-fetch-timeout-checker", conf.fetchDataTimeoutCheckerThreads());
}
}
}

View File

@ -24,13 +24,11 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -63,8 +61,7 @@ public final class ShutdownHookManager {
public static final TimeUnit TIME_UNIT_DEFAULT = TimeUnit.MILLISECONDS;
private static final ExecutorService EXECUTOR =
Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("shutdown-hook-%01d").build());
ThreadUtils.newDaemonSingleThreadExecutor("celeborn-shutdown-hook-%01d");
static {
try {

View File

@ -118,9 +118,9 @@ class AppDiskUsageMetric(conf: CelebornConf) extends Logging {
val interval = conf.metricsAppTopDiskUsageInterval
val snapShots = new Array[AppDiskUsageSnapShot](snapshotCount)
val logExecutor =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("App_disk_usage_log_thread")
ThreadUtils.newDaemonSingleThreadScheduledExecutor("master-app-disk-usage-metrics-logger")
val updateExecutor =
ThreadUtils.newDaemonSingleThreadExecutor("App_disk_usage_metric_thread")
ThreadUtils.newDaemonSingleThreadExecutor("master-app-disk-usage-metrics-updater")
var currentSnapShot: AtomicReference[AppDiskUsageSnapShot] =
new AtomicReference[AppDiskUsageSnapShot]()

View File

@ -64,7 +64,7 @@ class NettyRpcEnv(
val clientFactory = transportContext.createClientFactory()
private val timeoutScheduler =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("netty-rpc-env-timeout")
ThreadUtils.newDaemonSingleThreadScheduledExecutor("celeborn-netty-rpc-env-timeout-checker")
// Because TransportClientFactory.createClient is blocking, we need to run it in this thread pool
// to implement non-blocking send/ask.

View File

@ -85,7 +85,7 @@ public class HARaftServer {
private final StateMachine masterStateMachine;
private final ScheduledExecutorService scheduledRoleChecker =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("ratis-role-checker");
ThreadUtils.newDaemonSingleThreadScheduledExecutor("master-ratis-role-checker");
private long roleCheckIntervalMs;
private final ReentrantReadWriteLock roleCheckLock = new ReentrantReadWriteLock();
private Optional<RaftProtos.RaftPeerRole> cachedPeerRole = Optional.empty();

View File

@ -90,7 +90,8 @@ public class StateMachine extends BaseStateMachine {
this.masterRatisServer = ratisServer;
this.metaHandler = ratisServer.getMetaHandler();
this.executorService = ThreadUtils.newDaemonSingleThreadExecutor("Master-Meta-StateMachine");
this.executorService =
ThreadUtils.newDaemonSingleThreadExecutor("master-state-machine-executor");
}
/** Initializes the State Machine with the given server, group and storage. */

View File

@ -92,7 +92,7 @@ private[celeborn] class Master(
// Threads
private val forwardMessageThread =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("master-forward-message-thread")
ThreadUtils.newDaemonSingleThreadScheduledExecutor("master-message-forwarder")
private var checkForWorkerTimeOutTask: ScheduledFuture[_] = _
private var checkForApplicationTimeOutTask: ScheduledFuture[_] = _
private var checkForUnavailableWorkerTimeOutTask: ScheduledFuture[_] = _
@ -135,7 +135,7 @@ private[celeborn] class Master(
private val estimatedPartitionSizeForEstimationUpdateInterval =
conf.estimatedPartitionSizeForEstimationUpdateInterval
private val partitionSizeUpdateService =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("partition-size-updater")
ThreadUtils.newDaemonSingleThreadScheduledExecutor("master-partition-size-updater")
partitionSizeUpdateService.scheduleWithFixedDelay(
new Runnable {
override def run(): Unit = {

View File

@ -48,7 +48,7 @@ public class FsConfigServiceImpl implements ConfigService {
private static final String CONF_CONFIG = "config";
private final ScheduledExecutorService configRefreshService =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("config-refresh-service");
ThreadUtils.newDaemonSingleThreadScheduledExecutor("celeborn-config-refresher");
public FsConfigServiceImpl(CelebornConf celebornConf) {
this.celebornConf = celebornConf;

View File

@ -72,12 +72,14 @@ public class CongestionController {
this.userBufferStatuses = JavaUtils.newConcurrentHashMap();
this.removeUserExecutorService =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("remove-inactive-user");
ThreadUtils.newDaemonSingleThreadScheduledExecutor(
"worker-congestion-controller-inactive-user-remover");
this.removeUserExecutorService.scheduleWithFixedDelay(
this::removeInactiveUsers, 0, userInactiveTimeMills, TimeUnit.MILLISECONDS);
this.checkService = ThreadUtils.newDaemonSingleThreadScheduledExecutor("congestion-checker");
this.checkService =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("worker-congestion-controller-checker");
this.checkService.scheduleWithFixedDelay(
this::checkCongestion, 0, checkIntervalTimeMills, TimeUnit.MILLISECONDS);

View File

@ -52,13 +52,13 @@ public class MemoryManager {
private final List<MemoryPressureListener> memoryPressureListeners = new ArrayList<>();
private final ScheduledExecutorService checkService =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("memory-manager-checker");
ThreadUtils.newDaemonSingleThreadScheduledExecutor("worker-memory-manager-checker");
private final ScheduledExecutorService reportService =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("memory-manager-reporter");
ThreadUtils.newDaemonSingleThreadScheduledExecutor("worker-memory-manager-reporter");
private final ExecutorService actionService =
ThreadUtils.newDaemonSingleThreadExecutor("memory-manager-actor");
ThreadUtils.newDaemonSingleThreadExecutor("worker-memory-manager-actor");
private final AtomicBoolean trimInProcess = new AtomicBoolean(false);
@ -82,7 +82,7 @@ public class MemoryManager {
private long lastNotifiedTarget = 0;
private final ScheduledExecutorService readBufferTargetUpdateService =
ThreadUtils.newDaemonSingleThreadScheduledExecutor(
"memory-manager-read-buffer-target-updater");
"worker-memory-manager-read-buffer-target-updater");
private CreditStreamManager creditStreamManager = null;
private long memoryShuffleStorageThreshold = 0;

View File

@ -190,7 +190,7 @@ public class CreditStreamManager {
synchronized (lock) {
if (recycleThread == null) {
recycleThread =
ThreadUtils.newDaemonSingleThreadExecutor("credit-stream-manager-recycle-thread");
ThreadUtils.newDaemonSingleThreadExecutor("worker-credit-stream-manager-recycler");
recycleThread.submit(
() -> {
while (true) {

View File

@ -262,7 +262,7 @@ private[celeborn] class Worker(
"worker-clean-expired-shuffle-keys",
conf.workerCleanThreads)
val asyncReplyPool: ScheduledExecutorService =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("async-reply")
ThreadUtils.newDaemonSingleThreadScheduledExecutor("worker-rpc-async-replier")
val timer = new HashedWheelTimer()
// Configs
@ -271,7 +271,7 @@ private[celeborn] class Worker(
private val cleanTaskQueue = new LinkedBlockingQueue[JHashSet[String]]
var cleaner: ExecutorService =
ThreadUtils.newDaemonSingleThreadExecutor("worker-cleaner")
ThreadUtils.newDaemonSingleThreadExecutor("worker-expired-shuffle-cleaner")
private val workerResourceConsumptionInterval = conf.workerResourceConsumptionInterval
private val userResourceConsumptions =

View File

@ -65,7 +65,7 @@ class JVMQuake(conf: CelebornConf, uniqueId: String = UUID.randomUUID().toString
if (enabled) {
lastExitTime = getLastExitTime
lastGCTime = getLastGCTime
scheduler = ThreadUtils.newDaemonSingleThreadScheduledExecutor("jvm-quake")
scheduler = ThreadUtils.newDaemonSingleThreadScheduledExecutor("worker-jvm-quake-scheduler")
scheduler.scheduleWithFixedDelay(
new Runnable() {
override def run(): Unit = {

View File

@ -222,7 +222,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
JavaUtils.newConcurrentHashMap[String, ConcurrentHashMap[String, FileInfo]]()
saveCommittedFileInfosExecutor =
ThreadUtils.newDaemonSingleThreadScheduledExecutor(
"StorageManager-save-committed-fileinfo-thread")
"worker-storage-manager-committed-fileinfo-saver")
saveCommittedFileInfosExecutor.scheduleWithFixedDelay(
new Runnable {
override def run(): Unit = {
@ -520,7 +520,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
}
private val storageScheduler =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("storage-scheduler")
ThreadUtils.newDaemonSingleThreadScheduledExecutor("worker-storage-manager-scheduler")
storageScheduler.scheduleWithFixedDelay(
new Runnable {