diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleMaster.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleMaster.java index 8bcd2827b..e1eb5b481 100644 --- a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleMaster.java +++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleMaster.java @@ -21,7 +21,7 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import org.apache.flink.api.common.BatchShuffleMode; @@ -56,9 +56,9 @@ public class RemoteShuffleMaster implements ShuffleMaster> pushTaskQueues; private ScheduledExecutorService cleaner = - ThreadUtils.newDaemonSingleThreadScheduledExecutor("celeborn-sendBufferPool-cleaner"); + ThreadUtils.newDaemonSingleThreadScheduledExecutor("celeborn-client-sendBufferPool-cleaner"); private SendBufferPool(int capacity, long checkInterval, long timeout) { assert capacity > 0; diff --git a/client/src/main/java/org/apache/celeborn/client/ReviveManager.java b/client/src/main/java/org/apache/celeborn/client/ReviveManager.java index 3c52e6b9f..eb07aa375 100644 --- a/client/src/main/java/org/apache/celeborn/client/ReviveManager.java +++ b/client/src/main/java/org/apache/celeborn/client/ReviveManager.java @@ -40,7 +40,8 @@ class ReviveManager { private final int batchSize; ShuffleClientImpl shuffleClient; private final ScheduledExecutorService batchReviveRequestScheduler = - ThreadUtils.newDaemonSingleThreadScheduledExecutor("batch-revive-scheduler"); + ThreadUtils.newDaemonSingleThreadScheduledExecutor( + "celeborn-client-lifecycle-manager-batch-revive-scheduler"); public ReviveManager(ShuffleClientImpl shuffleClient, CelebornConf conf) { this.shuffleClient = shuffleClient; diff --git a/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java b/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java index 458e5d0e5..38306f6c2 100644 --- a/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java +++ b/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java @@ -128,7 +128,7 @@ public class DfsPartitionReader implements PartitionReader { numChunks = chunkOffsets.size() - 1; fetchThread = ThreadUtils.newDaemonSingleThreadExecutor( - "Dfs-fetch-thread" + location.getStorageInfo().getFilePath()); + "celeborn-client-dfs-partition-fetcher" + location.getStorageInfo().getFilePath()); logger.debug("Start dfs read on location {}", location); ShuffleClient.incrementTotalReadCounter(); } diff --git a/client/src/main/java/org/apache/celeborn/client/read/LocalPartitionReader.java b/client/src/main/java/org/apache/celeborn/client/read/LocalPartitionReader.java index 97a3acb34..ace28c61e 100644 --- a/client/src/main/java/org/apache/celeborn/client/read/LocalPartitionReader.java +++ b/client/src/main/java/org/apache/celeborn/client/read/LocalPartitionReader.java @@ -83,7 +83,7 @@ public class LocalPartitionReader implements PartitionReader { if (readLocalShufflePool == null) { readLocalShufflePool = ThreadUtils.newDaemonCachedThreadPool( - "local-shuffle-reader-thread", conf.readLocalShuffleThreads(), 60); + "celeborn-client-local-shuffle-reader", conf.readLocalShuffleThreads(), 60); } } } diff --git a/client/src/main/java/org/apache/celeborn/client/write/DataPusher.java b/client/src/main/java/org/apache/celeborn/client/write/DataPusher.java index 25b26f4ad..5acfbe938 100644 --- a/client/src/main/java/org/apache/celeborn/client/write/DataPusher.java +++ b/client/src/main/java/org/apache/celeborn/client/write/DataPusher.java @@ -100,7 +100,7 @@ public class DataPusher { this.mapStatusLengths = mapStatusLengths; pushThread = - new Thread("DataPusher-" + taskId) { + new Thread("celeborn-client-data-pusher-" + taskId) { private void reclaimTask(PushTask task) throws InterruptedException { idleLock.lockInterruptibly(); try { diff --git a/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala b/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala index 67692da74..20a02b207 100644 --- a/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala +++ b/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala @@ -42,7 +42,8 @@ class ApplicationHeartbeater( private val appHeartbeatIntervalMs = conf.appHeartbeatIntervalMs private val applicationUnregisterEnabled = conf.applicationUnregisterEnabled private val appHeartbeatHandlerThread = - ThreadUtils.newDaemonSingleThreadScheduledExecutor("celeborn-app-heartbeat") + ThreadUtils.newDaemonSingleThreadScheduledExecutor( + "celeborn-client-lifecycle-manager-app-heartbeater") private var appHeartbeat: ScheduledFuture[_] = _ def start(): Unit = { diff --git a/client/src/main/scala/org/apache/celeborn/client/ChangePartitionManager.scala b/client/src/main/scala/org/apache/celeborn/client/ChangePartitionManager.scala index 4edda41da..b565da4a5 100644 --- a/client/src/main/scala/org/apache/celeborn/client/ChangePartitionManager.scala +++ b/client/src/main/scala/org/apache/celeborn/client/ChangePartitionManager.scala @@ -53,14 +53,14 @@ class ChangePartitionManager( private val batchHandleChangePartitionEnabled = conf.batchHandleChangePartitionEnabled private val batchHandleChangePartitionExecutors = ThreadUtils.newDaemonCachedThreadPool( - "celeborn-lifecycle-manager-change-partition-executor", + "celeborn-client-lifecycle-manager-change-partition-executor", conf.batchHandleChangePartitionNumThreads) private val batchHandleChangePartitionRequestInterval = conf.batchHandleChangePartitionRequestInterval private val batchHandleChangePartitionSchedulerThread: Option[ScheduledExecutorService] = if (batchHandleChangePartitionEnabled) { Some(ThreadUtils.newDaemonSingleThreadScheduledExecutor( - "celeborn-lifecycle-manager-change-partition-scheduler")) + "celeborn-client-lifecycle-manager-change-partition-scheduler")) } else { None } diff --git a/client/src/main/scala/org/apache/celeborn/client/CommitManager.scala b/client/src/main/scala/org/apache/celeborn/client/CommitManager.scala index bc4e57b28..c1be16644 100644 --- a/client/src/main/scala/org/apache/celeborn/client/CommitManager.scala +++ b/client/src/main/scala/org/apache/celeborn/client/CommitManager.scala @@ -78,14 +78,14 @@ class CommitManager(appUniqueId: String, val conf: CelebornConf, lifecycleManage val committedPartitionInfo = new CommittedPartitionInfo private val batchHandleCommitPartitionEnabled = conf.batchHandleCommitPartitionEnabled private val batchHandleCommitPartitionExecutors = ThreadUtils.newDaemonCachedThreadPool( - "celeborn-lifecycle-manager-commit-partition-executor", + "celeborn-client-lifecycle-manager-commit-partition-executor", conf.batchHandleCommitPartitionNumThreads) private val batchHandleCommitPartitionRequestInterval = conf.batchHandleCommitPartitionRequestInterval private val batchHandleCommitPartitionSchedulerThread: Option[ScheduledExecutorService] = if (batchHandleCommitPartitionEnabled) { Some(ThreadUtils.newDaemonSingleThreadScheduledExecutor( - "celeborn-lifecycle-manager-commit-partition-scheduler")) + "celeborn-client-lifecycle-manager-commit-partition-scheduler")) } else { None } diff --git a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala index 3c1e32523..a716b62a6 100644 --- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala +++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala @@ -139,10 +139,13 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends // Threads private val forwardMessageThread = - ThreadUtils.newDaemonSingleThreadScheduledExecutor("master-forward-message-thread") + ThreadUtils.newDaemonSingleThreadScheduledExecutor("master-message-forwarder") private var checkForShuffleRemoval: ScheduledFuture[_] = _ val rpcSharedThreadPool = - ThreadUtils.newDaemonCachedThreadPool("shared-rpc-pool", conf.clientRpcSharedThreads, 30) + ThreadUtils.newDaemonCachedThreadPool( + "celeborn-client-lifecycle-manager-shared-rpc-pool", + conf.clientRpcSharedThreads, + 30) val ec = ExecutionContext.fromExecutor(rpcSharedThreadPool) // init driver celeborn LifecycleManager rpc service diff --git a/client/src/main/scala/org/apache/celeborn/client/ReleasePartitionManager.scala b/client/src/main/scala/org/apache/celeborn/client/ReleasePartitionManager.scala index aabb965ae..32620d396 100644 --- a/client/src/main/scala/org/apache/celeborn/client/ReleasePartitionManager.scala +++ b/client/src/main/scala/org/apache/celeborn/client/ReleasePartitionManager.scala @@ -38,14 +38,14 @@ class ReleasePartitionManager( private val shuffleReleasePartitionRequests = JavaUtils.newConcurrentHashMap[Int, util.Set[Int]] private val batchHandleReleasePartitionEnabled = conf.batchHandleReleasePartitionEnabled private val batchHandleReleasePartitionExecutors = ThreadUtils.newDaemonCachedThreadPool( - "celeborn-lifecycle-manager-release-partition-executor", + "celeborn-client-lifecycle-manager-release-partition-executor", conf.batchHandleReleasePartitionNumThreads) private val batchHandleReleasePartitionRequestInterval = conf.batchHandleReleasePartitionRequestInterval private val batchHandleReleasePartitionSchedulerThread: Option[ScheduledExecutorService] = if (batchHandleReleasePartitionEnabled) { Some(ThreadUtils.newDaemonSingleThreadScheduledExecutor( - "celeborn-lifecycle-manager-release-partition-scheduler")) + "celeborn-client-lifecycle-manager-release-partition-scheduler")) } else { None } diff --git a/common/src/main/java/org/apache/celeborn/common/client/MasterClient.java b/common/src/main/java/org/apache/celeborn/common/client/MasterClient.java index a53691645..2e8468d23 100644 --- a/common/src/main/java/org/apache/celeborn/common/client/MasterClient.java +++ b/common/src/main/java/org/apache/celeborn/common/client/MasterClient.java @@ -67,7 +67,8 @@ public class MasterClient { this.maxRetries = Math.max(masterEndpoints.size(), conf.masterClientMaxRetries()); this.rpcTimeout = conf.masterClientRpcAskTimeout(); this.rpcEndpointRef = new AtomicReference<>(); - this.oneWayMessageSender = ThreadUtils.newDaemonSingleThreadExecutor("One-Way-Message-Sender"); + this.oneWayMessageSender = + ThreadUtils.newDaemonSingleThreadExecutor("celeborn-one-way-message-sender"); } private static final String SPLITTER = "#"; diff --git a/common/src/main/java/org/apache/celeborn/common/network/client/TransportResponseHandler.java b/common/src/main/java/org/apache/celeborn/common/network/client/TransportResponseHandler.java index 48193549d..d0659c352 100644 --- a/common/src/main/java/org/apache/celeborn/common/network/client/TransportResponseHandler.java +++ b/common/src/main/java/org/apache/celeborn/common/network/client/TransportResponseHandler.java @@ -95,7 +95,7 @@ public class TransportResponseHandler extends MessageHandler { if (pushTimeoutChecker == null) { pushTimeoutChecker = ThreadUtils.newDaemonThreadPoolScheduledExecutor( - "push-timeout-checker", conf.pushDataTimeoutCheckerThreads()); + "celeborn-push-timeout-checker", conf.pushDataTimeoutCheckerThreads()); } } @@ -103,7 +103,7 @@ public class TransportResponseHandler extends MessageHandler { if (fetchTimeoutChecker == null) { fetchTimeoutChecker = ThreadUtils.newDaemonThreadPoolScheduledExecutor( - "fetch-timeout-checker", conf.fetchDataTimeoutCheckerThreads()); + "celeborn-fetch-timeout-checker", conf.fetchDataTimeoutCheckerThreads()); } } } diff --git a/common/src/main/java/org/apache/celeborn/common/util/ShutdownHookManager.java b/common/src/main/java/org/apache/celeborn/common/util/ShutdownHookManager.java index 523d4c0e5..47659fda0 100644 --- a/common/src/main/java/org/apache/celeborn/common/util/ShutdownHookManager.java +++ b/common/src/main/java/org/apache/celeborn/common/util/ShutdownHookManager.java @@ -24,13 +24,11 @@ import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -63,8 +61,7 @@ public final class ShutdownHookManager { public static final TimeUnit TIME_UNIT_DEFAULT = TimeUnit.MILLISECONDS; private static final ExecutorService EXECUTOR = - Executors.newSingleThreadExecutor( - new ThreadFactoryBuilder().setDaemon(true).setNameFormat("shutdown-hook-%01d").build()); + ThreadUtils.newDaemonSingleThreadExecutor("celeborn-shutdown-hook-%01d"); static { try { diff --git a/common/src/main/scala/org/apache/celeborn/common/meta/AppDiskUsageMetric.scala b/common/src/main/scala/org/apache/celeborn/common/meta/AppDiskUsageMetric.scala index f66756ba1..92e08b5e0 100644 --- a/common/src/main/scala/org/apache/celeborn/common/meta/AppDiskUsageMetric.scala +++ b/common/src/main/scala/org/apache/celeborn/common/meta/AppDiskUsageMetric.scala @@ -118,9 +118,9 @@ class AppDiskUsageMetric(conf: CelebornConf) extends Logging { val interval = conf.metricsAppTopDiskUsageInterval val snapShots = new Array[AppDiskUsageSnapShot](snapshotCount) val logExecutor = - ThreadUtils.newDaemonSingleThreadScheduledExecutor("App_disk_usage_log_thread") + ThreadUtils.newDaemonSingleThreadScheduledExecutor("master-app-disk-usage-metrics-logger") val updateExecutor = - ThreadUtils.newDaemonSingleThreadExecutor("App_disk_usage_metric_thread") + ThreadUtils.newDaemonSingleThreadExecutor("master-app-disk-usage-metrics-updater") var currentSnapShot: AtomicReference[AppDiskUsageSnapShot] = new AtomicReference[AppDiskUsageSnapShot]() diff --git a/common/src/main/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala b/common/src/main/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala index 153751808..293e113cb 100644 --- a/common/src/main/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala +++ b/common/src/main/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala @@ -64,7 +64,7 @@ class NettyRpcEnv( val clientFactory = transportContext.createClientFactory() private val timeoutScheduler = - ThreadUtils.newDaemonSingleThreadScheduledExecutor("netty-rpc-env-timeout") + ThreadUtils.newDaemonSingleThreadScheduledExecutor("celeborn-netty-rpc-env-timeout-checker") // Because TransportClientFactory.createClient is blocking, we need to run it in this thread pool // to implement non-blocking send/ask. diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HARaftServer.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HARaftServer.java index 4c2573955..9e502058f 100644 --- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HARaftServer.java +++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HARaftServer.java @@ -85,7 +85,7 @@ public class HARaftServer { private final StateMachine masterStateMachine; private final ScheduledExecutorService scheduledRoleChecker = - ThreadUtils.newDaemonSingleThreadScheduledExecutor("ratis-role-checker"); + ThreadUtils.newDaemonSingleThreadScheduledExecutor("master-ratis-role-checker"); private long roleCheckIntervalMs; private final ReentrantReadWriteLock roleCheckLock = new ReentrantReadWriteLock(); private Optional cachedPeerRole = Optional.empty(); diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/StateMachine.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/StateMachine.java index 21d9bdb18..a6be2b317 100644 --- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/StateMachine.java +++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/StateMachine.java @@ -90,7 +90,8 @@ public class StateMachine extends BaseStateMachine { this.masterRatisServer = ratisServer; this.metaHandler = ratisServer.getMetaHandler(); - this.executorService = ThreadUtils.newDaemonSingleThreadExecutor("Master-Meta-StateMachine"); + this.executorService = + ThreadUtils.newDaemonSingleThreadExecutor("master-state-machine-executor"); } /** Initializes the State Machine with the given server, group and storage. */ diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala index 1704a95a0..fed3c8a7a 100644 --- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala +++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala @@ -92,7 +92,7 @@ private[celeborn] class Master( // Threads private val forwardMessageThread = - ThreadUtils.newDaemonSingleThreadScheduledExecutor("master-forward-message-thread") + ThreadUtils.newDaemonSingleThreadScheduledExecutor("master-message-forwarder") private var checkForWorkerTimeOutTask: ScheduledFuture[_] = _ private var checkForApplicationTimeOutTask: ScheduledFuture[_] = _ private var checkForUnavailableWorkerTimeOutTask: ScheduledFuture[_] = _ @@ -135,7 +135,7 @@ private[celeborn] class Master( private val estimatedPartitionSizeForEstimationUpdateInterval = conf.estimatedPartitionSizeForEstimationUpdateInterval private val partitionSizeUpdateService = - ThreadUtils.newDaemonSingleThreadScheduledExecutor("partition-size-updater") + ThreadUtils.newDaemonSingleThreadScheduledExecutor("master-partition-size-updater") partitionSizeUpdateService.scheduleWithFixedDelay( new Runnable { override def run(): Unit = { diff --git a/service/src/main/java/org/apache/celeborn/server/common/service/config/FsConfigServiceImpl.java b/service/src/main/java/org/apache/celeborn/server/common/service/config/FsConfigServiceImpl.java index bb7519d9b..3e160d8e7 100644 --- a/service/src/main/java/org/apache/celeborn/server/common/service/config/FsConfigServiceImpl.java +++ b/service/src/main/java/org/apache/celeborn/server/common/service/config/FsConfigServiceImpl.java @@ -48,7 +48,7 @@ public class FsConfigServiceImpl implements ConfigService { private static final String CONF_CONFIG = "config"; private final ScheduledExecutorService configRefreshService = - ThreadUtils.newDaemonSingleThreadScheduledExecutor("config-refresh-service"); + ThreadUtils.newDaemonSingleThreadScheduledExecutor("celeborn-config-refresher"); public FsConfigServiceImpl(CelebornConf celebornConf) { this.celebornConf = celebornConf; diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/congestcontrol/CongestionController.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/congestcontrol/CongestionController.java index 662d1fe30..0a5cc3eae 100644 --- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/congestcontrol/CongestionController.java +++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/congestcontrol/CongestionController.java @@ -72,12 +72,14 @@ public class CongestionController { this.userBufferStatuses = JavaUtils.newConcurrentHashMap(); this.removeUserExecutorService = - ThreadUtils.newDaemonSingleThreadScheduledExecutor("remove-inactive-user"); + ThreadUtils.newDaemonSingleThreadScheduledExecutor( + "worker-congestion-controller-inactive-user-remover"); this.removeUserExecutorService.scheduleWithFixedDelay( this::removeInactiveUsers, 0, userInactiveTimeMills, TimeUnit.MILLISECONDS); - this.checkService = ThreadUtils.newDaemonSingleThreadScheduledExecutor("congestion-checker"); + this.checkService = + ThreadUtils.newDaemonSingleThreadScheduledExecutor("worker-congestion-controller-checker"); this.checkService.scheduleWithFixedDelay( this::checkCongestion, 0, checkIntervalTimeMills, TimeUnit.MILLISECONDS); diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java index 71861f17a..ec6a0d065 100644 --- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java +++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java @@ -52,13 +52,13 @@ public class MemoryManager { private final List memoryPressureListeners = new ArrayList<>(); private final ScheduledExecutorService checkService = - ThreadUtils.newDaemonSingleThreadScheduledExecutor("memory-manager-checker"); + ThreadUtils.newDaemonSingleThreadScheduledExecutor("worker-memory-manager-checker"); private final ScheduledExecutorService reportService = - ThreadUtils.newDaemonSingleThreadScheduledExecutor("memory-manager-reporter"); + ThreadUtils.newDaemonSingleThreadScheduledExecutor("worker-memory-manager-reporter"); private final ExecutorService actionService = - ThreadUtils.newDaemonSingleThreadExecutor("memory-manager-actor"); + ThreadUtils.newDaemonSingleThreadExecutor("worker-memory-manager-actor"); private final AtomicBoolean trimInProcess = new AtomicBoolean(false); @@ -82,7 +82,7 @@ public class MemoryManager { private long lastNotifiedTarget = 0; private final ScheduledExecutorService readBufferTargetUpdateService = ThreadUtils.newDaemonSingleThreadScheduledExecutor( - "memory-manager-read-buffer-target-updater"); + "worker-memory-manager-read-buffer-target-updater"); private CreditStreamManager creditStreamManager = null; private long memoryShuffleStorageThreshold = 0; diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/CreditStreamManager.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/CreditStreamManager.java index 7e2c6ee3a..1fb7766a8 100644 --- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/CreditStreamManager.java +++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/CreditStreamManager.java @@ -190,7 +190,7 @@ public class CreditStreamManager { synchronized (lock) { if (recycleThread == null) { recycleThread = - ThreadUtils.newDaemonSingleThreadExecutor("credit-stream-manager-recycle-thread"); + ThreadUtils.newDaemonSingleThreadExecutor("worker-credit-stream-manager-recycler"); recycleThread.submit( () -> { while (true) { diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala index 6ca5e7d21..c438ff8fe 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala @@ -262,7 +262,7 @@ private[celeborn] class Worker( "worker-clean-expired-shuffle-keys", conf.workerCleanThreads) val asyncReplyPool: ScheduledExecutorService = - ThreadUtils.newDaemonSingleThreadScheduledExecutor("async-reply") + ThreadUtils.newDaemonSingleThreadScheduledExecutor("worker-rpc-async-replier") val timer = new HashedWheelTimer() // Configs @@ -271,7 +271,7 @@ private[celeborn] class Worker( private val cleanTaskQueue = new LinkedBlockingQueue[JHashSet[String]] var cleaner: ExecutorService = - ThreadUtils.newDaemonSingleThreadExecutor("worker-cleaner") + ThreadUtils.newDaemonSingleThreadExecutor("worker-expired-shuffle-cleaner") private val workerResourceConsumptionInterval = conf.workerResourceConsumptionInterval private val userResourceConsumptions = diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/monitor/JVMQuake.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/monitor/JVMQuake.scala index a88a39e22..40b14c6d0 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/monitor/JVMQuake.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/monitor/JVMQuake.scala @@ -65,7 +65,7 @@ class JVMQuake(conf: CelebornConf, uniqueId: String = UUID.randomUUID().toString if (enabled) { lastExitTime = getLastExitTime lastGCTime = getLastGCTime - scheduler = ThreadUtils.newDaemonSingleThreadScheduledExecutor("jvm-quake") + scheduler = ThreadUtils.newDaemonSingleThreadScheduledExecutor("worker-jvm-quake-scheduler") scheduler.scheduleWithFixedDelay( new Runnable() { override def run(): Unit = { diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala index f2171b7f5..d4438da56 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala @@ -222,7 +222,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs JavaUtils.newConcurrentHashMap[String, ConcurrentHashMap[String, FileInfo]]() saveCommittedFileInfosExecutor = ThreadUtils.newDaemonSingleThreadScheduledExecutor( - "StorageManager-save-committed-fileinfo-thread") + "worker-storage-manager-committed-fileinfo-saver") saveCommittedFileInfosExecutor.scheduleWithFixedDelay( new Runnable { override def run(): Unit = { @@ -520,7 +520,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs } private val storageScheduler = - ThreadUtils.newDaemonSingleThreadScheduledExecutor("storage-scheduler") + ThreadUtils.newDaemonSingleThreadScheduledExecutor("worker-storage-manager-scheduler") storageScheduler.scheduleWithFixedDelay( new Runnable {