[CELEBORN-421] Add shutdown and registered to http request (#1346)

* [CELEBORN-421] Add shutdown and registered to http request
This commit is contained in:
Angerszhuuuu 2023-03-14 18:23:21 +08:00 committed by GitHub
parent 7d7279a9bc
commit 3907d70212
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 17 additions and 6 deletions

View File

@ -777,6 +777,10 @@ private[celeborn] class Master(
override def getUnavailablePeers: String = throw new UnsupportedOperationException()
override def isShutdown: Boolean = throw new UnsupportedOperationException()
override def isRegistered: Boolean = throw new UnsupportedOperationException()
private def requestGetWorkerInfos(endpoint: RpcEndpointRef): GetWorkerInfosResponse = {
try {
if (endpoint != null) {

View File

@ -55,6 +55,10 @@ abstract class HttpService extends Service with Logging {
def getUnavailablePeers: String
def isShutdown: Boolean
def isRegistered: Boolean
def startHttpServer(): Unit = {
val handlers =
if (metricsSystem.running) {

View File

@ -84,6 +84,10 @@ class HttpRequestHandler(
service.listPartitionLocationInfo
case "/unavailablePeers" if service.serviceName == Service.WORKER =>
service.getUnavailablePeers
case "/isShutdown" if service.serviceName == Service.WORKER =>
service.isShutdown.toString
case "/isRegistered" if service.serviceName == Service.WORKER =>
service.isRegistered.toString
case _ => INVALID
}
}

View File

@ -463,6 +463,10 @@ private[celeborn] class Worker(
storageManager.shuffleKeySet().asScala.mkString("\n")
}
override def isShutdown: Boolean = shutdown.get()
override def isRegistered: Boolean = registered.get()
override def listTopDiskUseApps: String = {
val stringBuilder = new StringBuilder()
storageManager.topAppDiskUsage.asScala.foreach { case (appId, usage) =>
@ -484,11 +488,6 @@ private[celeborn] class Worker(
sb.toString()
}
@VisibleForTesting
def isRegistered(): Boolean = {
registered.get()
}
ShutdownHookManager.get().addShutdownHook(
new Thread(new Runnable {
override def run(): Unit = {

View File

@ -114,7 +114,7 @@ trait MiniClusterFeature extends Logging {
Thread.sleep(5000L)
workerInfos.foreach {
case (worker, _) => assert(worker.isRegistered())
case (worker, _) => assert(worker.isRegistered)
}
(master, workerInfos.keySet)
}