[CELEBORN-1317] Refine celeborn http server and support swagger ui

### What changes were proposed in this pull request?

Previously, there was no HTTP request spec such as query params, HTTP method, or response media type.
In addition, each API needed its own HttpEndpoint class.

In this PR, we refine the code for http service and provide swagger ui.

Note: this PR does not change the original API request and response behavior, including the metrics APIs.

TODO:
1. define DTO
2. http request authentication

<img width="1900" alt="image" src="https://github.com/apache/incubator-celeborn/assets/6757692/7f8c2363-170d-4bdf-b2c9-74260e31d3e5">

<img width="1138" alt="image" src="https://github.com/apache/incubator-celeborn/assets/6757692/3ae6ec8e-00a8-475b-bb37-0329536185f6">

### Why are the changes needed?

To close CELEBORN-1317

### Does this PR introduce _any_ user-facing change?

The API remains aligned with the previous behavior.

### How was this patch tested?
UT.

Closes #2371 from turboFei/jetty.

Authored-by: Fei Wang <fwang12@ebay.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
This commit is contained in:
Fei Wang 2024-03-27 23:18:18 +08:00 committed by Shuang
parent e9e984b517
commit adbc77cd4f
38 changed files with 1676 additions and 667 deletions

View File

@ -248,6 +248,11 @@ io.netty:netty-transport-native-unix-common
io.netty:netty-transport-rxtx
io.netty:netty-transport-sctp
io.netty:netty-transport-udt
io.swagger.core.v3:swagger-annotations
io.swagger.core.v3:swagger-core
io.swagger.core.v3:swagger-integration
io.swagger.core.v3:swagger-jaxrs2
io.swagger.core.v3:swagger-models
org.apache.commons:commons-crypto
org.apache.commons:commons-lang3
org.apache.hadoop:hadoop-client-api
@ -267,6 +272,13 @@ org.apache.ratis:ratis-server
org.apache.ratis:ratis-server-api
org.apache.ratis:ratis-shell
org.apache.ratis:ratis-thirdparty-misc
org.eclipse.jetty:jetty-http
org.eclipse.jetty:jetty-io
org.eclipse.jetty:jetty-security
org.eclipse.jetty:jetty-server
org.eclipse.jetty:jetty-servlet
org.eclipse.jetty:jetty-util-ajax
org.eclipse.jetty:jetty-util
org.javassist:javassist
org.reflections:reflections
org.roaringbitmap:RoaringBitmap
@ -275,6 +287,7 @@ org.rocksdb:rocksdbjni
org.scala-lang:scala-library
org.scala-lang:scala-reflect
org.slf4j:jcl-over-slf4j
org.webjars:swagger-ui
org.xerial.snappy:snappy-java
org.yaml:snakeyaml
@ -308,3 +321,15 @@ org.slf4j:slf4j-api
------------
See licenses/LICENSE-javassist.txt for detail.
org.javassist:javassist
Eclipse Public License (EPL) 2.0
--------------------------------
jakarta.annotation:jakarta.annotation-api
jakarta.servlet:jakarta.servlet-api
jakarta.ws.rs:jakarta.ws.rs-api
org.glassfish.hk2:hk2-api
org.glassfish.hk2:hk2-locator
org.glassfish.hk2:hk2-utils
org.glassfish.hk2.external:aopalliance-repackaged
org.glassfish.hk2.external:jakarta.inject
org.glassfish.hk2:osgi-resource-locator

View File

@ -586,6 +586,10 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
def masterHttpPort: Int = get(MASTER_HTTP_PORT)
def masterHttpMaxWorkerThreads: Int = get(MASTER_HTTP_MAX_WORKER_THREADS)
def masterHttpStopTimeout: Long = get(MASTER_HTTP_STOP_TIMEOUT)
def haEnabled: Boolean = get(HA_ENABLED)
def haMasterNodeId: Option[String] = get(HA_MASTER_NODE_ID)
@ -676,6 +680,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
def workerHttpHost: String =
get(WORKER_HTTP_HOST).replace("<localhost>", Utils.localHostName(this))
def workerHttpPort: Int = get(WORKER_HTTP_PORT)
def workerHttpMaxWorkerThreads: Int = get(WORKER_HTTP_MAX_WORKER_THREADS)
def workerHttpStopTimeout: Long = get(WORKER_HTTP_STOP_TIMEOUT)
def workerRpcPort: Int = get(WORKER_RPC_PORT)
def workerPushPort: Int = get(WORKER_PUSH_PORT)
def workerFetchPort: Int = get(WORKER_FETCH_PORT)
@ -1976,6 +1982,23 @@ object CelebornConf extends Logging {
.checkValue(p => p >= 1024 && p < 65535, "Invalid port")
.createWithDefault(9098)
// Config entry for the upper bound of the master http worker thread pool.
// The value must be strictly positive; when unset it defaults to 999.
val MASTER_HTTP_MAX_WORKER_THREADS: ConfigEntry[Int] =
buildConf("celeborn.master.http.maxWorkerThreads")
.categories("master")
.version("0.5.0")
.doc("Maximum number of threads in the master http worker thread pool.")
.intConf
.checkValue(_ > 0, "Must be positive.")
.createWithDefault(999)
// Config entry for the master http server stop timeout.
// Declared as a time config with TimeUnit.MILLISECONDS; the default is the string "5s".
val MASTER_HTTP_STOP_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.master.http.stopTimeout")
.categories("master")
.version("0.5.0")
.doc("Master http server stop timeout.")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("5s")
val HA_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.master.ha.enabled")
.withAlternative("celeborn.ha.enabled")
@ -2534,6 +2557,23 @@ object CelebornConf extends Logging {
.checkValue(p => p >= 1024 && p < 65535, "Invalid port")
.createWithDefault(9096)
// Config entry for the upper bound of the worker http worker thread pool.
// The value must be strictly positive; when unset it defaults to 999.
val WORKER_HTTP_MAX_WORKER_THREADS: ConfigEntry[Int] =
buildConf("celeborn.worker.http.maxWorkerThreads")
.categories("worker")
.version("0.5.0")
.doc("Maximum number of threads in the worker http worker thread pool.")
.intConf
.checkValue(_ > 0, "Must be positive.")
.createWithDefault(999)
// Config entry for the worker http server stop timeout.
// Declared as a time config with TimeUnit.MILLISECONDS; the default is the string "5s".
val WORKER_HTTP_STOP_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.worker.http.stopTimeout")
.categories("worker")
.version("0.5.0")
.doc("Worker http server stop timeout.")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("5s")
val WORKER_RPC_PORT: ConfigEntry[Int] =
buildConf("celeborn.worker.rpc.port")
.categories("worker")

View File

@ -15,19 +15,19 @@
* limitations under the License.
*/
package org.apache.celeborn.server.common.http
package org.apache.celeborn.common.metrics
import io.netty.channel.{ChannelInitializer, SimpleChannelInboundHandler}
import io.netty.channel.socket.SocketChannel
import io.netty.handler.codec.http.{HttpObjectAggregator, HttpServerCodec}
import java.util.concurrent.TimeUnit
class HttpServerInitializer(
handlers: SimpleChannelInboundHandler[_]) extends ChannelInitializer[SocketChannel] {
object MetricsUtils {
private[this] val MINIMAL_POLL_UNIT = TimeUnit.SECONDS
private[this] val MINIMAL_POLL_PERIOD = 1
override def initChannel(channel: SocketChannel): Unit = {
val pipeline = channel.pipeline()
pipeline.addLast(new HttpServerCodec())
.addLast("httpAggregator", new HttpObjectAggregator(512 * 1024))
.addLast(handlers)
def checkMinimalPollingPeriod(pollUnit: TimeUnit, pollPeriod: Int) {
val period = MINIMAL_POLL_UNIT.convert(pollPeriod, pollUnit)
if (period < MINIMAL_POLL_PERIOD) {
throw new IllegalArgumentException("Polling period " + pollPeriod + " " + pollUnit +
" below than minimal polling period ")
}
}
}

View File

@ -23,7 +23,7 @@ import java.util.concurrent.TimeUnit
import com.codahale.metrics.{CsvReporter, MetricRegistry}
import org.apache.celeborn.common.metrics.MetricsSystem
import org.apache.celeborn.common.metrics.MetricsUtils
class CsvSink(val property: Properties, val registry: MetricRegistry) extends Sink {
val CSV_KEY_PERIOD = "period"
@ -44,7 +44,7 @@ class CsvSink(val property: Properties, val registry: MetricRegistry) extends Si
case None => TimeUnit.valueOf(CSV_DEFAULT_UNIT)
}
MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod)
MetricsUtils.checkMinimalPollingPeriod(pollUnit, pollPeriod)
val pollDir = Option(property.getProperty(CSV_KEY_DIR)) match {
case Some(s) => s

View File

@ -23,7 +23,7 @@ import java.util.concurrent.TimeUnit
import com.codahale.metrics.MetricRegistry
import com.codahale.metrics.graphite.{Graphite, GraphiteReporter, GraphiteUDP}
import org.apache.celeborn.common.metrics.MetricsSystem
import org.apache.celeborn.common.metrics.MetricsUtils
private class GraphiteSink(val property: Properties, val registry: MetricRegistry) extends Sink {
val GRAPHITE_DEFAULT_PERIOD = 10
@ -62,7 +62,7 @@ private class GraphiteSink(val property: Properties, val registry: MetricRegistr
val prefix = propertyToOption(GRAPHITE_KEY_PREFIX).getOrElse(GRAPHITE_DEFAULT_PREFIX)
MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod)
MetricsUtils.checkMinimalPollingPeriod(pollUnit, pollPeriod)
val graphite = propertyToOption(GRAPHITE_KEY_PROTOCOL).map(_.toLowerCase(Locale.ROOT)) match {
case Some("udp") => new GraphiteUDP(host, port)

View File

@ -17,7 +17,9 @@
HikariCP/4.0.3//HikariCP-4.0.3.jar
RoaringBitmap/0.9.32//RoaringBitmap-0.9.32.jar
aopalliance-repackaged/2.6.1//aopalliance-repackaged-2.6.1.jar
ap-loader-all/3.0-8//ap-loader-all-3.0-8.jar
classgraph/4.8.138//classgraph-4.8.138.jar
commons-cli/1.5.0//commons-cli-1.5.0.jar
commons-crypto/1.0.0//commons-crypto-1.0.0.jar
commons-io/2.13.0//commons-io-2.13.0.jar
@ -27,13 +29,43 @@ failureaccess/1.0.1//failureaccess-1.0.1.jar
guava/32.1.3-jre//guava-32.1.3-jre.jar
hadoop-client-api/3.3.6//hadoop-client-api-3.3.6.jar
hadoop-client-runtime/3.3.6//hadoop-client-runtime-3.3.6.jar
hk2-api/2.6.1//hk2-api-2.6.1.jar
hk2-locator/2.6.1//hk2-locator-2.6.1.jar
hk2-utils/2.6.1//hk2-utils-2.6.1.jar
jackson-annotations/2.15.3//jackson-annotations-2.15.3.jar
jackson-core/2.15.3//jackson-core-2.15.3.jar
jackson-databind/2.15.3//jackson-databind-2.15.3.jar
jackson-dataformat-yaml/2.13.2//jackson-dataformat-yaml-2.13.2.jar
jackson-datatype-jsr310/2.13.2//jackson-datatype-jsr310-2.13.2.jar
jackson-jaxrs-base/2.13.2//jackson-jaxrs-base-2.13.2.jar
jackson-jaxrs-json-provider/2.13.2//jackson-jaxrs-json-provider-2.13.2.jar
jackson-module-jaxb-annotations/2.14.1//jackson-module-jaxb-annotations-2.14.1.jar
jackson-module-scala_2.12/2.15.3//jackson-module-scala_2.12-2.15.3.jar
javassist/3.28.0-GA//javassist-3.28.0-GA.jar
javax.servlet-api/3.1.0//javax.servlet-api-3.1.0.jar
jakarta.annotation-api/1.3.5//jakarta.annotation-api-1.3.5.jar
jakarta.inject/2.6.1//jakarta.inject-2.6.1.jar
jakarta.servlet-api/4.0.4//jakarta.servlet-api-4.0.4.jar
jakarta.validation-api/2.0.2//jakarta.validation-api-2.0.2.jar
jakarta.ws.rs-api/2.1.6//jakarta.ws.rs-api-2.1.6.jar
jakarta.xml.bind-api/2.3.3//jakarta.xml.bind-api-2.3.3.jar
javassist/3.29.0-GA//javassist-3.29.0-GA.jar
jcl-over-slf4j/1.7.36//jcl-over-slf4j-1.7.36.jar
jersey-client/2.39.1//jersey-client-2.39.1.jar
jersey-common/2.39.1//jersey-common-2.39.1.jar
jersey-container-servlet-core/2.39.1//jersey-container-servlet-core-2.39.1.jar
jersey-entity-filtering/2.39.1//jersey-entity-filtering-2.39.1.jar
jersey-hk2/2.39.1//jersey-hk2-2.39.1.jar
jersey-media-json-jackson/2.39.1//jersey-media-json-jackson-2.39.1.jar
jersey-media-multipart/2.39.1//jersey-media-multipart-2.39.1.jar
jersey-server/2.39.1//jersey-server-2.39.1.jar
jetty-client/9.4.52.v20230823//jetty-client-9.4.52.v20230823.jar
jetty-http/9.4.52.v20230823//jetty-http-9.4.52.v20230823.jar
jetty-io/9.4.52.v20230823//jetty-io-9.4.52.v20230823.jar
jetty-proxy/9.4.52.v20230823//jetty-proxy-9.4.52.v20230823.jar
jetty-security/9.4.52.v20230823//jetty-security-9.4.52.v20230823.jar
jetty-server/9.4.52.v20230823//jetty-server-9.4.52.v20230823.jar
jetty-servlet/9.4.52.v20230823//jetty-servlet-9.4.52.v20230823.jar
jetty-util-ajax/9.4.52.v20230823//jetty-util-ajax-9.4.52.v20230823.jar
jetty-util/9.4.52.v20230823//jetty-util-9.4.52.v20230823.jar
jsr305/1.3.9//jsr305-1.3.9.jar
jul-to-slf4j/1.7.36//jul-to-slf4j-1.7.36.jar
leveldbjni-all/1.8//leveldbjni-all-1.8.jar
@ -46,6 +78,7 @@ maven-jdk-tools-wrapper/0.1//maven-jdk-tools-wrapper-0.1.jar
metrics-core/3.2.6//metrics-core-3.2.6.jar
metrics-graphite/3.2.6//metrics-graphite-3.2.6.jar
metrics-jvm/3.2.6//metrics-jvm-3.2.6.jar
mimepull/1.9.15//mimepull-1.9.15.jar
mybatis/3.5.15//mybatis-3.5.15.jar
netty-all/4.1.107.Final//netty-all-4.1.107.Final.jar
netty-buffer/4.1.107.Final//netty-buffer-4.1.107.Final.jar
@ -81,6 +114,7 @@ netty-transport-rxtx/4.1.107.Final//netty-transport-rxtx-4.1.107.Final.jar
netty-transport-sctp/4.1.107.Final//netty-transport-sctp-4.1.107.Final.jar
netty-transport-udt/4.1.107.Final//netty-transport-udt-4.1.107.Final.jar
netty-transport/4.1.107.Final//netty-transport-4.1.107.Final.jar
osgi-resource-locator/1.0.3//osgi-resource-locator-1.0.3.jar
paranamer/2.8//paranamer-2.8.jar
protobuf-java/3.21.7//protobuf-java-3.21.7.jar
ratis-client/2.5.1//ratis-client-2.5.1.jar
@ -101,4 +135,10 @@ shims/0.9.32//shims-0.9.32.jar
slf4j-api/1.7.36//slf4j-api-1.7.36.jar
snakeyaml/2.2//snakeyaml-2.2.jar
snappy-java/1.1.10.5//snappy-java-1.1.10.5.jar
swagger-annotations/2.2.1//swagger-annotations-2.2.1.jar
swagger-core/2.2.1//swagger-core-2.2.1.jar
swagger-integration/2.2.1//swagger-integration-2.2.1.jar
swagger-jaxrs2/2.2.1//swagger-jaxrs2-2.2.1.jar
swagger-models/2.2.1//swagger-models-2.2.1.jar
swagger-ui/4.9.1//swagger-ui-4.9.1.jar
zstd-jni/1.5.2-1//zstd-jni-1.5.2-1.jar

View File

@ -43,7 +43,9 @@ license: |
| celeborn.master.heartbeat.worker.timeout | 120s | false | Worker heartbeat timeout. | 0.3.0 | celeborn.worker.heartbeat.timeout |
| celeborn.master.host | &lt;localhost&gt; | false | Hostname for master to bind. | 0.2.0 | |
| celeborn.master.http.host | &lt;localhost&gt; | false | Master's http host. | 0.4.0 | celeborn.metrics.master.prometheus.host,celeborn.master.metrics.prometheus.host |
| celeborn.master.http.maxWorkerThreads | 999 | false | Maximum number of threads in the master http worker thread pool. | 0.5.0 | |
| celeborn.master.http.port | 9098 | false | Master's http port. | 0.4.0 | celeborn.metrics.master.prometheus.port,celeborn.master.metrics.prometheus.port |
| celeborn.master.http.stopTimeout | 5s | false | Master http server stop timeout. | 0.5.0 | |
| celeborn.master.internal.port | 8097 | false | Internal port on the master where both workers and other master nodes connect. | 0.5.0 | |
| celeborn.master.port | 9097 | false | Port for master to bind. | 0.2.0 | |
| celeborn.master.rackResolver.refresh.interval | 30s | false | Interval for refreshing the node rack information periodically. | 0.5.0 | |

View File

@ -83,7 +83,9 @@ license: |
| celeborn.worker.graceful.shutdown.saveCommittedFileInfo.sync | false | false | Whether to call sync method to save committed file infos into Level DB to handle OS crash. | 0.3.1 | |
| celeborn.worker.graceful.shutdown.timeout | 600s | false | The worker's graceful shutdown timeout time. | 0.2.0 | |
| celeborn.worker.http.host | &lt;localhost&gt; | false | Worker's http host. | 0.4.0 | celeborn.metrics.worker.prometheus.host,celeborn.worker.metrics.prometheus.host |
| celeborn.worker.http.maxWorkerThreads | 999 | false | Maximum number of threads in the worker http worker thread pool. | 0.5.0 | |
| celeborn.worker.http.port | 9096 | false | Worker's http port. | 0.4.0 | celeborn.metrics.worker.prometheus.port,celeborn.worker.metrics.prometheus.port |
| celeborn.worker.http.stopTimeout | 5s | false | Worker http server stop timeout. | 0.5.0 | |
| celeborn.worker.internal.port | 0 | false | Internal server port on the Worker where the master nodes connect. | 0.5.0 | |
| celeborn.worker.jvmProfiler.enabled | false | false | Turn on code profiling via async_profiler in workers. | 0.5.0 | |
| celeborn.worker.jvmProfiler.localDir | . | false | Local file system path on worker where profiler output is saved. Defaults to the working directory of the worker process. | 0.5.0 | |

View File

@ -83,11 +83,27 @@
<artifactId>log4j-1.2-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.celeborn</groupId>
<artifactId>celeborn-service_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.glassfish.jersey.test-framework</groupId>
<artifactId>jersey-test-framework-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.glassfish.jersey.test-framework.providers</groupId>
<artifactId>jersey-test-framework-provider-jetty</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>

View File

@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.celeborn.service.deploy.master.http.api
import javax.ws.rs.{GET, Path, POST, QueryParam}
import javax.ws.rs.core.MediaType
import io.swagger.v3.oas.annotations.media.Content
import io.swagger.v3.oas.annotations.responses.ApiResponse
import org.apache.celeborn.server.common.http.api.ApiRequestContext
/**
 * JAX-RS resource that exposes the master's HTTP API under the root path.
 *
 * Every endpoint documents a 200 response with `text/plain` content and returns the
 * string produced by the corresponding `httpService` call. `httpService` and
 * `normalizeParam` are not defined in this class; presumably they are inherited from
 * [[ApiRequestContext]] -- confirm there.
 */
@Path("/")
class ApiMasterResource extends ApiRequestContext {

  /** GET /masterGroupInfo: returns `httpService.getMasterGroupInfo`. */
  @Path("/masterGroupInfo")
  @ApiResponse(
    responseCode = "200",
    content = Array(new Content(
      mediaType = MediaType.TEXT_PLAIN)),
    description =
      "List master group information of the service. It will list all master's LEADER, FOLLOWER information.")
  @GET
  def masterGroupInfo: String = httpService.getMasterGroupInfo

  /** GET /lostWorkers: returns `httpService.getLostWorkers`. */
  @Path("/lostWorkers")
  @ApiResponse(
    responseCode = "200",
    content = Array(new Content(
      mediaType = MediaType.TEXT_PLAIN)),
    description = "List all lost workers of the master.")
  @GET
  def lostWorkers: String = httpService.getLostWorkers

  /** GET /excludedWorkers: returns `httpService.getExcludedWorkers`. */
  @Path("/excludedWorkers")
  @ApiResponse(
    responseCode = "200",
    content = Array(new Content(
      mediaType = MediaType.TEXT_PLAIN)),
    description = "List all excluded workers of the master.")
  @GET
  def excludedWorkers: String = httpService.getExcludedWorkers

  /** GET /shutdownWorkers: returns `httpService.getShutdownWorkers`. */
  @Path("/shutdownWorkers")
  @ApiResponse(
    responseCode = "200",
    content = Array(new Content(
      mediaType = MediaType.TEXT_PLAIN)),
    description = "List all shutdown workers of the master.")
  @GET
  def shutdownWorkers: String = httpService.getShutdownWorkers

  /** GET /hostnames: returns `httpService.getHostnameList`. */
  @Path("/hostnames")
  @ApiResponse(
    responseCode = "200",
    content = Array(new Content(
      mediaType = MediaType.TEXT_PLAIN)),
    description = "List all running application's LifecycleManager's hostnames of the cluster.")
  @GET
  def hostnames: String = httpService.getHostnameList

  /**
   * POST /sendWorkerEvent: forwards a worker event request to `httpService.handleWorkerEvent`.
   *
   * @param eventType value of the `TYPE` query parameter, passed through `normalizeParam`
   * @param workers value of the `WORKERS` query parameter, passed through `normalizeParam`
   * @return the string returned by `httpService.handleWorkerEvent`
   */
  @Path("/sendWorkerEvent")
  @ApiResponse(
    responseCode = "200",
    content = Array(new Content(
      mediaType = MediaType.TEXT_PLAIN)),
    description =
      "For Master(Leader) can send worker event to manager workers. Legal types are 'None', 'Immediately', 'Decommission', 'DecommissionThenIdle', 'Graceful', 'Recommission'")
  @POST
  def sendWorkerEvent(
      @QueryParam("TYPE") eventType: String,
      @QueryParam("WORKERS") workers: String): String = {
    httpService.handleWorkerEvent(normalizeParam(eventType), normalizeParam(workers))
  }

  /** GET /workerEventInfo: returns `httpService.getWorkerEventInfo()`. */
  @Path("/workerEventInfo")
  @ApiResponse(
    responseCode = "200",
    content = Array(new Content(
      mediaType = MediaType.TEXT_PLAIN)),
    description = "List all worker event infos of the master.")
  @GET
  def workerEventInfo: String = httpService.getWorkerEventInfo()

  /**
   * POST /exclude: forwards the workers to add and remove to `httpService.exclude`.
   *
   * @param addWorkers value of the `ADD` query parameter, passed through `normalizeParam`
   * @param removeWorkers value of the `REMOVE` query parameter, passed through `normalizeParam`
   * @return the string returned by `httpService.exclude`
   */
  @Path("/exclude")
  @ApiResponse(
    responseCode = "200",
    content = Array(new Content(
      mediaType = MediaType.TEXT_PLAIN)),
    // This description previously duplicated the /workerEventInfo text, which made the
    // generated API documentation describe the wrong operation.
    description =
      "Manually add workers to or remove workers from the excluded workers of the master. The ADD and REMOVE parameters specify the workers to add or remove.")
  @POST
  def excludeWorkers(
      @QueryParam("ADD") addWorkers: String,
      @QueryParam("REMOVE") removeWorkers: String): String = {
    httpService.exclude(normalizeParam(addWorkers), normalizeParam(removeWorkers))
  }
}

View File

@ -0,0 +1,108 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.celeborn.service.deploy.master
import javax.ws.rs.core.MediaType
import com.google.common.io.Files
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.util.{CelebornExitKind, Utils}
import org.apache.celeborn.server.common.HttpService
import org.apache.celeborn.server.common.http.ApiBaseResourceSuite
/**
 * Exercises the master HTTP API endpoints against a [[Master]] started in-process.
 *
 * Each test only asserts that the endpoint answers with HTTP status 200. `webTarget` and
 * `celebornConf` are not defined here; presumably they come from [[ApiBaseResourceSuite]].
 */
class ApiMasterResourceSuite extends ApiBaseResourceSuite {
  private var master: Master = _

  override protected def httpService: HttpService = master

  /** Creates a temporary directory, registers it for deletion on exit, and returns its path. */
  def getTmpDir(): String = {
    val tmpDir = Files.createTempDir()
    tmpDir.deleteOnExit()
    tmpDir.getAbsolutePath
  }

  override def beforeAll(): Unit = {
    // The HTTP port is derived as masterPort + 1, and the master HTTP port config only accepts
    // values below 65535. Cap the random range so the derived port is always valid, whether
    // the upper bound of selectRandomPort is inclusive or exclusive.
    val randomMasterPort = Utils.selectRandomPort(1024, 65533)
    val randomHttpPort = randomMasterPort + 1
    celebornConf.set(CelebornConf.HA_ENABLED.key, "false")
    celebornConf.set(CelebornConf.HA_MASTER_RATIS_STORAGE_DIR.key, getTmpDir())
    celebornConf.set(CelebornConf.WORKER_STORAGE_DIRS.key, getTmpDir())
    celebornConf.set(CelebornConf.MASTER_HTTP_HOST.key, "127.0.0.1")
    celebornConf.set(CelebornConf.MASTER_HTTP_PORT.key, randomHttpPort.toString)

    val args = Array("-h", "localhost", "-p", randomMasterPort.toString)

    val masterArgs = new MasterArguments(args, celebornConf)
    master = new Master(celebornConf, masterArgs)
    // Initialize the master on a separate thread so that beforeAll can continue.
    new Thread() {
      override def run(): Unit = {
        master.initialize()
      }
    }.start()
    super.beforeAll()
  }

  override def afterAll(): Unit = {
    super.afterAll()
    master.stop(CelebornExitKind.EXIT_IMMEDIATELY)
    master.rpcEnv.shutdown()
  }

  test("masterGroupInfo") {
    val response = webTarget.path("masterGroupInfo").request(MediaType.TEXT_PLAIN).get()
    assert(200 == response.getStatus)
  }

  test("lostWorkers") {
    val response = webTarget.path("lostWorkers").request(MediaType.TEXT_PLAIN).get()
    assert(200 == response.getStatus)
  }

  test("excludedWorkers") {
    val response = webTarget.path("excludedWorkers").request(MediaType.TEXT_PLAIN).get()
    assert(200 == response.getStatus)
  }

  test("shutdownWorkers") {
    val response = webTarget.path("shutdownWorkers").request(MediaType.TEXT_PLAIN).get()
    assert(200 == response.getStatus)
  }

  test("hostnames") {
    val response = webTarget.path("hostnames").request(MediaType.TEXT_PLAIN).get()
    assert(200 == response.getStatus)
  }

  test("sendWorkerEvent") {
    // POST with no entity and no query parameters; only the status code is checked.
    val response = webTarget.path("sendWorkerEvent")
      .request(MediaType.TEXT_PLAIN)
      .post(null)
    assert(200 == response.getStatus)
  }

  test("workerEventInfo") {
    val response = webTarget.path("workerEventInfo").request(MediaType.TEXT_PLAIN).get()
    assert(200 == response.getStatus)
  }

  test("exclude") {
    // POST with no entity and no query parameters; only the status code is checked.
    val response = webTarget.path("exclude").request(MediaType.TEXT_PLAIN).post(null)
    assert(200 == response.getStatus)
  }
}

112
pom.xml
View File

@ -80,7 +80,6 @@
<google.jsr305.version>1.3.9</google.jsr305.version>
<grpc.version>1.44.0</grpc.version>
<guava.version>32.1.3-jre</guava.version>
<javaxservlet.version>3.1.0</javaxservlet.version>
<junit.version>4.13.2</junit.version>
<leveldb.version>1.8</leveldb.version>
<log4j2.version>2.17.2</log4j2.version>
@ -107,6 +106,13 @@
<hikaricp.version>4.0.3</hikaricp.version>
<h2.version>2.2.224</h2.version>
<!-- RESTful service dependencies -->
<swagger.version>2.2.1</swagger.version>
<swagger-ui.version>4.9.1</swagger-ui.version>
<jersey.version>2.39.1</jersey.version>
<jetty.version>9.4.52.v20230823</jetty.version>
<jakarta.servlet-api.version>4.0.4</jakarta.servlet-api.version>
<shading.prefix>org.apache.celeborn.shaded</shading.prefix>
<maven.plugin.antrun.version>3.0.0</maven.plugin.antrun.version>
@ -355,11 +361,6 @@
<artifactId>protobuf-java</artifactId>
<version>${protobuf.version}</version>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId>
<version>${javaxservlet.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
@ -457,6 +458,105 @@
<scope>test</scope>
</dependency>
<!-- RESTful service dependencies -->
<dependency>
<groupId>jakarta.servlet</groupId>
<artifactId>jakarta.servlet-api</artifactId>
<version>${jakarta.servlet-api.version}</version>
</dependency>
<dependency>
<groupId>org.glassfish.jersey.core</groupId>
<artifactId>jersey-server</artifactId>
<version>${jersey.version}</version>
<exclusions>
<exclusion>
<groupId>jakarta.xml.bind</groupId>
<artifactId>jakarta.xml.bind-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.glassfish.jersey.containers</groupId>
<artifactId>jersey-container-servlet-core</artifactId>
<version>${jersey.version}</version>
</dependency>
<dependency>
<groupId>org.glassfish.jersey.inject</groupId>
<artifactId>jersey-hk2</artifactId>
<version>${jersey.version}</version>
</dependency>
<dependency>
<groupId>org.glassfish.jersey.media</groupId>
<artifactId>jersey-media-json-jackson</artifactId>
<version>${jersey.version}</version>
</dependency>
<dependency>
<groupId>org.glassfish.jersey.media</groupId>
<artifactId>jersey-media-multipart</artifactId>
<version>${jersey.version}</version>
</dependency>
<dependency>
<groupId>org.glassfish.jersey.test-framework</groupId>
<artifactId>jersey-test-framework-core</artifactId>
<version>${jersey.version}</version>
<exclusions>
<exclusion>
<groupId>jakarta.activation</groupId>
<artifactId>jakarta.activation-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.glassfish.jersey.test-framework.providers</groupId>
<artifactId>jersey-test-framework-provider-jetty</artifactId>
<version>${jersey.version}</version>
<exclusions>
<exclusion>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-util</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-continuation</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.swagger.core.v3</groupId>
<artifactId>swagger-jaxrs2</artifactId>
<version>${swagger.version}</version>
<exclusions>
<exclusion>
<groupId>com.sun.activation</groupId>
<artifactId>jakarta.activation</artifactId>
</exclusion>
<exclusion>
<groupId>org.javassist</groupId>
<artifactId>javassist</artifactId>
</exclusion>
</exclusions>
</dependency>
<!--
1. This library only contains swagger-ui static resource (.html/.css/.js/.png), for more detail, see
https://github.com/swagger-api/swagger-ui/blob/master/dist/
2. Note that when trying to upgrade swagger-ui, we should also update the version in the file(
service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala).
-->
<dependency>
<groupId>org.webjars</groupId>
<artifactId>swagger-ui</artifactId>
<version>${swagger-ui.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>

View File

@ -45,7 +45,6 @@ object Dependencies {
val findbugsVersion = "1.3.9"
val guavaVersion = "32.1.3-jre"
val hadoopVersion = "3.3.6"
val javaxServletVersion = "3.1.0"
val junitInterfaceVersion = "0.13.3"
// don't forget update `junitInterfaceVersion` when we upgrade junit
val junitVersion = "4.13.2"
@ -67,6 +66,11 @@ object Dependencies {
val mybatisVersion = "3.5.15"
val hikaricpVersion = "4.0.3"
val h2Version = "2.2.224"
val swaggerVersion = "2.2.1"
val swaggerUiVersion = "4.9.1"
val jerseyVersion = "2.39.1"
val jettyVersion = "9.4.52.v20230823"
val jakartaServeletApiVersion = "4.0.4"
// For SSL support
val bouncycastleVersion = "1.77"
@ -105,7 +109,6 @@ object Dependencies {
val ioDropwizardMetricsJvm = "io.dropwizard.metrics" % "metrics-jvm" % metricsVersion
val ioNetty = "io.netty" % "netty-all" % nettyVersion excludeAll(
ExclusionRule("io.netty", "netty-handler-ssl-ocsp"))
val javaxServletApi = "javax.servlet" % "javax.servlet-api" % javaxServletVersion
val leveldbJniAll = "org.fusesource.leveldbjni" % "leveldbjni-all" % leveldbJniVersion
val log4j12Api = "org.apache.logging.log4j" % "log4j-1.2-api" % log4j2Version
val log4jSlf4jImpl = "org.apache.logging.log4j" % "log4j-slf4j-impl" % log4j2Version
@ -133,6 +136,23 @@ object Dependencies {
val zstdJni = "com.github.luben" % "zstd-jni" % zstdJniVersion
val mybatis = "org.mybatis" % "mybatis" % mybatisVersion
val hikaricp = "com.zaxxer" % "HikariCP" % hikaricpVersion
val jettyServer = "org.eclipse.jetty" % "jetty-server" % jettyVersion excludeAll(
ExclusionRule("javax.servlet", "javax.servlet-api"))
val jettyServlet = "org.eclipse.jetty" % "jetty-servlet" % jettyVersion excludeAll(
ExclusionRule("javax.servlet", "javax.servlet-api"))
val jettyProxy = "org.eclipse.jetty" % "jetty-proxy" % jettyVersion
val jakartaServletApi = "jakarta.servlet" % "jakarta.servlet-api" % jakartaServeletApiVersion
val jerseyServer = "org.glassfish.jersey.core" % "jersey-server" % jerseyVersion excludeAll(
ExclusionRule("jakarta.xml.bind", "jakarta.xml.bind-api"))
val jerseyContainerServletCore = "org.glassfish.jersey.containers" % "jersey-container-servlet-core" % jerseyVersion
val jerseyHk2 = "org.glassfish.jersey.inject" % "jersey-hk2" % jerseyVersion
val jerseyMediaJsonJackson = "org.glassfish.jersey.media" % "jersey-media-json-jackson" % jerseyVersion
val jerseyMediaMultipart = "org.glassfish.jersey.media" % "jersey-media-multipart" % jerseyVersion
val swaggerJaxrs2 = "io.swagger.core.v3" % "swagger-jaxrs2" %swaggerVersion excludeAll(
ExclusionRule("com.sun.activation", "jakarta.activation"),
ExclusionRule("org.javassist", "javassist"),
ExclusionRule("jakarta.activation", "jakarta.activation-api"))
val swaggerUi = "org.webjars" % "swagger-ui" % swaggerUiVersion
// Test dependencies
// https://www.scala-sbt.org/1.x/docs/Testing.html
@ -143,6 +163,10 @@ object Dependencies {
val scalatestMockito = "org.mockito" %% "mockito-scala-scalatest" % scalatestMockitoVersion
val scalatest = "org.scalatest" %% "scalatest" % scalatestVersion
val h2 = "com.h2database" % "h2" % h2Version
val jerseyTestFrameworkCore = "org.glassfish.jersey.test-framework" % "jersey-test-framework-core" % jerseyVersion
val jerseyTestFrameworkProviderJetty = "org.glassfish.jersey.test-framework.providers" % "jersey-test-framework-provider-jetty" % jerseyVersion excludeAll(
ExclusionRule("org.eclipse.jetty", "jetty-util"),
ExclusionRule("org.eclipse.jetty", "jetty-continuation"))
// SSL support
val bouncycastleBcprovJdk18on = "org.bouncycastle" % "bcprov-jdk18on" % bouncycastleVersion % "test"
@ -458,21 +482,34 @@ object CelebornService {
Dependencies.findbugsJsr305,
Dependencies.commonsIo,
Dependencies.ioNetty,
Dependencies.javaxServletApi,
Dependencies.commonsCrypto,
Dependencies.slf4jApi,
Dependencies.mybatis,
Dependencies.hikaricp,
Dependencies.swaggerJaxrs2,
Dependencies.swaggerUi,
Dependencies.jakartaServletApi,
Dependencies.jerseyServer,
Dependencies.jerseyContainerServletCore,
Dependencies.jerseyHk2,
Dependencies.jerseyMediaJsonJackson,
Dependencies.jerseyMediaMultipart,
Dependencies.jettyServer,
Dependencies.jettyServlet,
Dependencies.jettyProxy,
Dependencies.log4jSlf4jImpl % "test",
Dependencies.log4j12Api % "test",
Dependencies.h2 % "test"
Dependencies.h2 % "test",
Dependencies.jerseyTestFrameworkCore % "test",
Dependencies.jerseyTestFrameworkProviderJetty % "test"
) ++ commonUnitTestDependencies
)
}
object CelebornMaster {
lazy val master = Project("celeborn-master", file("master"))
.dependsOn(CelebornCommon.common, CelebornService.service)
.dependsOn(CelebornCommon.common)
.dependsOn(CelebornService.service % "test->test;compile->compile")
.settings (
commonSettings,
protoSettings,
@ -497,6 +534,7 @@ object CelebornWorker {
lazy val worker = Project("celeborn-worker", file("worker"))
.dependsOn(CelebornService.service)
.dependsOn(CelebornCommon.common % "test->test;compile->compile")
.dependsOn(CelebornService.service % "test->test;compile->compile")
.dependsOn(CelebornClient.client % "test->compile")
.dependsOn(CelebornMaster.master % "test->compile")
.settings (
@ -516,7 +554,9 @@ object CelebornWorker {
Dependencies.leveldbJniAll,
Dependencies.roaringBitmap,
Dependencies.rocksdbJni,
Dependencies.scalatestMockito % "test"
Dependencies.scalatestMockito % "test",
Dependencies.jerseyTestFrameworkCore % "test",
Dependencies.jerseyTestFrameworkProviderJetty % "test"
) ++ commonUnitTestDependencies
)
}
@ -724,7 +764,9 @@ trait SparkClientProjects {
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % sparkVersion % "test",
"org.apache.spark" %% "spark-sql" % sparkVersion % "test",
"org.apache.spark" %% "spark-core" % sparkVersion % "test" classifier "tests",
"org.apache.spark" %% "spark-core" % sparkVersion % "test" classifier "tests" excludeAll(
ExclusionRule("org.glassfish.jersey.inject", "*"),
ExclusionRule("org.glassfish.jersey.core", "*")),
"org.apache.spark" %% "spark-sql" % sparkVersion % "test" classifier "tests"
) ++ commonUnitTestDependencies
)

View File

@ -29,6 +29,36 @@
<packaging>jar</packaging>
<name>Celeborn Service</name>
<dependencyManagement>
<dependencies>
<!-- RESTful service dependencies, place here to prevent impacting sbt mr dependencies -->
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
<version>${jetty.version}</version>
<exclusions>
<!--
Use `jakarta.servlet-api` instead.
-->
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlet</artifactId>
<version>${jetty.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-proxy</artifactId>
<version>${jetty.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.celeborn</groupId>
@ -43,10 +73,6 @@
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
@ -75,6 +101,62 @@
<scope>test</scope>
</dependency>
<!-- RESTful dependencies -->
<dependency>
<groupId>io.swagger.core.v3</groupId>
<artifactId>swagger-jaxrs2</artifactId>
</dependency>
<dependency>
<groupId>org.webjars</groupId>
<artifactId>swagger-ui</artifactId>
</dependency>
<dependency>
<groupId>jakarta.servlet</groupId>
<artifactId>jakarta.servlet-api</artifactId>
</dependency>
<dependency>
<groupId>org.glassfish.jersey.core</groupId>
<artifactId>jersey-server</artifactId>
</dependency>
<dependency>
<groupId>org.glassfish.jersey.containers</groupId>
<artifactId>jersey-container-servlet-core</artifactId>
</dependency>
<dependency>
<groupId>org.glassfish.jersey.inject</groupId>
<artifactId>jersey-hk2</artifactId>
</dependency>
<dependency>
<groupId>org.glassfish.jersey.media</groupId>
<artifactId>jersey-media-json-jackson</artifactId>
</dependency>
<dependency>
<groupId>org.glassfish.jersey.media</groupId>
<artifactId>jersey-media-multipart</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlet</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-proxy</artifactId>
</dependency>
<!-- Test dependencies -->
<dependency>
<groupId>org.mockito</groupId>
@ -91,5 +173,15 @@
<artifactId>log4j-1.2-api</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.glassfish.jersey.test-framework</groupId>
<artifactId>jersey-test-framework-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.glassfish.jersey.test-framework.providers</groupId>
<artifactId>jersey-test-framework-provider-jetty</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,73 @@
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Apache Celeborn(Incubating) REST API Documentation</title>
<link rel="stylesheet" type="text/css" href="../swagger-static/swagger-ui.css" />
<link rel="icon" type="image/png" href="../swagger-static/favicon-32x32.png" sizes="32x32" />
<link rel="icon" type="image/png" href="../swagger-static/favicon-16x16.png" sizes="16x16" />
<style>
html
{
box-sizing: border-box;
overflow: -moz-scrollbars-vertical;
overflow-y: scroll;
}
*,
*:before,
*:after
{
box-sizing: inherit;
}
body
{
margin:0;
background: #fafafa;
}
</style>
</head>
<body>
<div id="swagger-ui"></div>
<script src="../swagger-static/swagger-ui-bundle.js" charset="UTF-8"> </script>
<script src="../swagger-static/swagger-ui-standalone-preset.js" charset="UTF-8"> </script>
<script>
window.onload = function() {
// Begin Swagger UI call region
window.ui = SwaggerUIBundle({
url: location.origin + "/openapi.json",
dom_id: '#swagger-ui',
deepLinking: true,
presets: [
SwaggerUIBundle.presets.apis,
SwaggerUIStandalonePreset
],
plugins: [
SwaggerUIBundle.plugins.DownloadUrl
],
layout: "StandaloneLayout"
});
// End Swagger UI call region
};
</script>
</body>
</html>

View File

@ -25,11 +25,12 @@ import scala.collection.mutable.ArrayBuffer
import scala.util.matching.Regex
import com.codahale.metrics.{Metric, MetricFilter, MetricRegistry}
import org.eclipse.jetty.servlet.ServletContextHandler
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.CelebornConf.{METRICS_JSON_PATH, METRICS_PROMETHEUS_PATH}
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.metrics.sink.{JsonServlet, PrometheusServlet, ServletHttpRequestHandler, Sink}
import org.apache.celeborn.common.metrics.sink.{JsonServlet, PrometheusServlet, Sink}
import org.apache.celeborn.common.metrics.source.Source
import org.apache.celeborn.common.util.Utils
@ -51,7 +52,7 @@ class MetricsSystem(
metricsConfig.initialize()
def getServletHandlers: Array[ServletHttpRequestHandler] = {
def getServletContextHandlers: Array[ServletContextHandler] = {
require(running, "Can only call getServletHandlers on a running MetricsSystem")
prometheusServlet.map(_.getHandlers(conf)).getOrElse(Array()) ++
jsonServlet.map(_.getHandlers(conf)).getOrElse(Array())
@ -139,7 +140,7 @@ class MetricsSystem(
prometheusServlet = Some(servlet.newInstance(
kv._2,
registry,
sources.asScala,
sources.asScala.toSeq,
prometheusServletPath).asInstanceOf[PrometheusServlet])
} else if (kv._1 == "jsonServlet") {
val servlet = Utils.classForName(classPath)
@ -152,7 +153,7 @@ class MetricsSystem(
jsonServlet = Some(servlet.newInstance(
kv._2,
registry,
sources.asScala,
sources.asScala.toSeq,
jsonServletPath,
conf.metricsJsonPrettyEnabled.asInstanceOf[Object]).asInstanceOf[JsonServlet])
} else {

View File

@ -16,17 +16,19 @@
*/
package org.apache.celeborn.common.metrics.sink
import org.eclipse.jetty.servlet.ServletContextHandler
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.metrics.source.Source
abstract class AbstractServlet(sources: Seq[Source]) extends Sink with Logging {
def getHandlers(conf: CelebornConf): Array[ServletHttpRequestHandler] = {
Array[ServletHttpRequestHandler](
createHttpRequestHandler())
def getHandlers(conf: CelebornConf): Array[ServletContextHandler] = {
Array[ServletContextHandler](
createServletHandler())
}
def createHttpRequestHandler(): ServletHttpRequestHandler
def createServletHandler(): ServletContextHandler
def getMetricsSnapshot: String = {
sources.map(_.getMetrics).mkString
@ -38,11 +40,3 @@ abstract class AbstractServlet(sources: Seq[Source]) extends Sink with Logging {
override def report(): Unit = {}
}
/**
 * Base class of the handlers that answer requests for one servlet path.
 *
 * @param path the servlet path of this handler, exposed through `getServletPath()`
 */
abstract class ServletHttpRequestHandler(path: String) extends Logging {
// Produces the response body for the request with the given URI.
def handleRequest(uri: String): String
// Returns the `path` constructor argument unchanged.
def getServletPath(): String = path
}

View File

@ -24,10 +24,12 @@ import scala.collection.mutable.ArrayBuffer
import com.codahale.metrics.MetricRegistry
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.{ClassTagExtensions, DefaultScalaModule}
import io.netty.channel.ChannelHandler.Sharable
import org.eclipse.jetty.servlet.ServletContextHandler
import org.apache.celeborn.common.metrics.{CelebornHistogram, CelebornTimer, ResettableSlidingWindowReservoir}
import org.apache.celeborn.common.metrics.source.{AbstractSource, NamedCounter, NamedGauge, NamedHistogram, NamedTimer, Source}
import org.apache.celeborn.common.metrics.source._
import org.apache.celeborn.server.common.http.HttpUtils
import org.apache.celeborn.server.common.http.HttpUtils.ServletParams
object JsonConverter {
val mapper = new ObjectMapper() with ClassTagExtensions
@ -72,8 +74,10 @@ class JsonServlet(
}
}
override def createHttpRequestHandler(): ServletHttpRequestHandler = {
new JsonHttpRequestHandler(servletPath, this)
override def createServletHandler(): ServletContextHandler = {
HttpUtils.createServletHandler(
servletPath,
new ServletParams(_ => getMetricsSnapshot, "text/json"))
}
override def stop(): Unit = {}
@ -344,12 +348,3 @@ class JsonServlet(
}
}
}
/**
 * Request handler that answers every request with the metrics snapshot of the wrapped
 * [[JsonServlet]]; the request URI is not inspected.
 *
 * @param path the servlet path this handler is registered under
 * @param jsonServlet the servlet whose metrics snapshot is returned
 */
@Sharable
class JsonHttpRequestHandler(path: String, jsonServlet: JsonServlet)
  extends ServletHttpRequestHandler(path) {

  override def handleRequest(uri: String): String = jsonServlet.getMetricsSnapshot
}

View File

@ -20,9 +20,11 @@ package org.apache.celeborn.common.metrics.sink
import java.util.Properties
import com.codahale.metrics.MetricRegistry
import io.netty.channel.ChannelHandler.Sharable
import org.eclipse.jetty.servlet.ServletContextHandler
import org.apache.celeborn.common.metrics.source.Source
import org.apache.celeborn.server.common.http.HttpUtils
import org.apache.celeborn.server.common.http.HttpUtils.ServletParams
class PrometheusServlet(
val property: Properties,
@ -30,17 +32,9 @@ class PrometheusServlet(
val sources: Seq[Source],
val servletPath: String) extends AbstractServlet(sources) {
override def createHttpRequestHandler(): ServletHttpRequestHandler = {
new PrometheusHttpRequestHandler(servletPath, this)
}
}
@Sharable
class PrometheusHttpRequestHandler(
path: String,
prometheusServlet: PrometheusServlet) extends ServletHttpRequestHandler(path) {
override def handleRequest(uri: String): String = {
prometheusServlet.getMetricsSnapshot
override def createServletHandler(): ServletContextHandler = {
HttpUtils.createServletHandler(
servletPath,
new ServletParams(_ => getMetricsSnapshot, "text/plain"))
}
}

View File

@ -23,7 +23,8 @@ import scala.collection.JavaConverters._
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.server.common.http.{HttpRequestHandler, HttpServer, HttpServerInitializer}
import org.apache.celeborn.server.common.http.HttpServer
import org.apache.celeborn.server.common.http.api.ApiRootResource
import org.apache.celeborn.server.common.service.config.ConfigLevel
abstract class HttpService extends Service with Logging {
@ -173,18 +174,20 @@ abstract class HttpService extends Service with Logging {
def getWorkerEventInfo(): String = throw new UnsupportedOperationException()
def startHttpServer(): Unit = {
val handlers =
if (metricsSystem.running) {
new HttpRequestHandler(this, metricsSystem.getServletHandlers)
} else {
new HttpRequestHandler(this, null)
}
httpServer = new HttpServer(
httpServer = HttpServer(
serviceName,
httpHost(),
httpPort(),
new HttpServerInitializer(handlers))
httpMaxWorkerThreads(),
httpStopTimeout())
httpServer.start()
startInternal()
// block until the HTTP server is started, otherwise, we may get
// the wrong HTTP server port -1
while (httpServer.getState != "STARTED") {
logInfo(s"Waiting for $serviceName's HTTP server getting started")
Thread.sleep(1000)
}
}
private def httpHost(): String = {
@ -205,6 +208,41 @@ abstract class HttpService extends Service with Logging {
}
}
/**
 * Size of the HTTP server's worker thread pool, taken from the configuration entry that
 * belongs to this service's role. Roles other than master and worker are not matched and
 * fail with a `MatchError`.
 */
private def httpMaxWorkerThreads(): Int = serviceName match {
  case Service.MASTER => conf.masterHttpMaxWorkerThreads
  case Service.WORKER => conf.workerHttpMaxWorkerThreads
}
/**
 * Stop timeout handed to the HTTP server, taken from the configuration entry that belongs
 * to this service's role. Roles other than master and worker are not matched and fail
 * with a `MatchError`.
 */
private def httpStopTimeout(): Long = serviceName match {
  case Service.MASTER => conf.masterHttpStopTimeout
  case Service.WORKER => conf.workerHttpStopTimeout
}
/** The `host:port` string of the HTTP server, as reported by `HttpServer.getServerUri`. */
def connectionUrl: String = httpServer.getServerUri
/**
 * Registers the handlers served by the HTTP server: the REST API root, the static
 * swagger resources, the `/help` and `/docs` redirects to `/swagger`, and, when the
 * metrics system is running, its servlet context handlers.
 */
protected def startInternal(): Unit = {
  val apiHandler = ApiRootResource.getServletHandler(this)
  httpServer.addHandler(apiHandler)

  httpServer.addStaticHandler("META-INF/resources/webjars/swagger-ui/4.9.1/", "/swagger-static/")
  httpServer.addStaticHandler("org/apache/celeborn/swagger", "/swagger")

  httpServer.addRedirectHandler("/help", "/swagger")
  httpServer.addRedirectHandler("/docs", "/swagger")

  if (metricsSystem.running) {
    for (metricsHandler <- metricsSystem.getServletContextHandlers) {
      httpServer.addHandler(metricsHandler)
    }
  }
}
override def initialize(): Unit = {
super.initialize()
startHttpServer()

View File

@ -1,269 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.celeborn.server.common.http
import org.apache.celeborn.server.common.{HttpService, Service}
import org.apache.celeborn.server.common.service.config.ConfigLevel
/**
 * HTTP endpoints of Rest API providers.
 *
 * Each endpoint is identified by its `path`, describes itself per service through
 * `description`, and produces the response text in `handle`.
 */
trait HttpEndpoint {
// Request path this endpoint is looked up by.
def path: String
// Human-readable description of the endpoint for the given service name.
def description(service: String): String
// Produces the response text for a request; `parameters` holds the parsed query parameters.
def handle(service: HttpService, parameters: Map[String, String]): String
}
/** Endpoint `/conf`: responds with `service.getConf`. */
case object Conf extends HttpEndpoint {
override def path: String = "/conf"
override def description(service: String): String = s"List the conf setting of the $service."
override def handle(service: HttpService, parameters: Map[String, String]): String =
service.getConf
}
/**
 * Endpoint `/listDynamicConfigs`: responds with `service.getDynamicConfigs` for the
 * trimmed `LEVEL`, `TENANT` and `NAME` parameters (each defaulting to the empty string).
 */
case object ListDynamicConfigs extends HttpEndpoint {
  override def path: String = "/listDynamicConfigs"

  override def description(service: String): String = {
    val tenantLevel = ConfigLevel.TENANT.name()
    val tenantUserLevel = ConfigLevel.TENANT_USER.name()
    // The sentences are joined with a single space, exactly as in the concatenated form.
    Seq(
      s"List the dynamic configs of the $service.",
      "The parameter level specifies the config level of dynamic configs.",
      s"The parameter tenant specifies the tenant id of $tenantLevel or $tenantUserLevel level.",
      s"The parameter name specifies the user name of $tenantUserLevel level.",
      s"Meanwhile, either none or all of the parameter tenant and name are specified for $tenantUserLevel level.").mkString(
      " ")
  }

  override def handle(service: HttpService, parameters: Map[String, String]): String = {
    def trimmed(key: String): String = parameters.getOrElse(key, "").trim
    service.getDynamicConfigs(trimmed("LEVEL"), trimmed("TENANT"), trimmed("NAME"))
  }
}
/** Endpoint `/workerInfo`: responds with `service.getWorkerInfo`. */
case object WorkerInfo extends HttpEndpoint {
  override def path: String = "/workerInfo"

  // The description differs between the master and every other service.
  override def description(service: String): String = service match {
    case Service.MASTER =>
      "List worker information of the service. It will list all registered workers 's information."
    case _ =>
      "List the worker information of the worker."
  }

  override def handle(service: HttpService, parameters: Map[String, String]): String =
    service.getWorkerInfo
}
/** Endpoint `/threadDump`: responds with `service.getThreadDump`. */
case object ThreadDump extends HttpEndpoint {
override def path: String = "/threadDump"
override def description(service: String): String =
s"List the current thread dump of the $service."
override def handle(service: HttpService, parameters: Map[String, String]): String =
service.getThreadDump
}
/** Endpoint `/shuffles`: responds with `service.getShuffleList`. */
case object Shuffles extends HttpEndpoint {
  override def path: String = "/shuffles"

  // The description differs between the master and every other service.
  override def description(service: String): String = service match {
    case Service.MASTER =>
      "List all running shuffle keys of the service. It will return all running shuffle's key of the cluster."
    case _ =>
      "List all the running shuffle keys of the worker. It only return keys of shuffles running in that worker."
  }

  override def handle(service: HttpService, parameters: Map[String, String]): String =
    service.getShuffleList
}
/** Endpoint `/applications`: responds with `service.getApplicationList`. */
case object Applications extends HttpEndpoint {
  override def path: String = "/applications"

  // The description differs between the master and every other service.
  override def description(service: String): String = service match {
    case Service.MASTER =>
      "List all running application's ids of the cluster."
    case _ =>
      "List all running application's ids of the worker. It only return application ids running in that worker."
  }

  override def handle(service: HttpService, parameters: Map[String, String]): String =
    service.getApplicationList
}
/** Endpoint `/listTopDiskUsedApps`: responds with `service.listTopDiskUseApps`. */
case object ListTopDiskUsedApps extends HttpEndpoint {
  override def path: String = "/listTopDiskUsedApps"

  // The description differs between the master and every other service.
  override def description(service: String): String = service match {
    case Service.MASTER =>
      "List the top disk usage application ids. It will return the top disk usage application ids for the cluster."
    case _ =>
      "List the top disk usage application ids. It only return application ids running in that worker."
  }

  override def handle(service: HttpService, parameters: Map[String, String]): String =
    service.listTopDiskUseApps
}
/** Endpoint `/help`: responds with `HttpUtils.help` for the service's name. */
case object Help extends HttpEndpoint {
override def path: String = "/help"
override def description(service: String): String =
s"List the available API providers of the $service."
override def handle(service: HttpService, parameters: Map[String, String]): String =
HttpUtils.help(service.serviceName)
}
/**
 * Fallback endpoint used when no other endpoint matches the request path. Its `handle`
 * returns the sentinel string `invalid`, which `HttpRequestHandler` matches on.
 */
case object Invalid extends HttpEndpoint {
// Sentinel response text that marks a request as unmatched.
val invalid = "invalid"
// Evaluates to the string "None".
// NOTE(review): presumably chosen so that it never equals a real request path -- confirm.
override def path: String = None.toString
override def description(service: String): String = s"Invalid uri of the $service."
override def handle(service: HttpService, parameters: Map[String, String]): String = invalid
}
/** Endpoint `/masterGroupInfo`: responds with `service.getMasterGroupInfo`. */
case object MasterGroupInfo extends HttpEndpoint {
override def path: String = "/masterGroupInfo"
override def description(service: String): String =
"List master group information of the service. It will list all master's LEADER, FOLLOWER information."
override def handle(service: HttpService, parameters: Map[String, String]): String =
service.getMasterGroupInfo
}
/** Endpoint `/lostWorkers`: responds with `service.getLostWorkers`. */
case object LostWorkers extends HttpEndpoint {
override def path: String = "/lostWorkers"
override def description(service: String): String = "List all lost workers of the master."
override def handle(service: HttpService, parameters: Map[String, String]): String =
service.getLostWorkers
}
/** Endpoint `/excludedWorkers`: responds with `service.getExcludedWorkers`. */
case object ExcludedWorkers extends HttpEndpoint {
override def path: String = "/excludedWorkers"
override def description(service: String): String = "List all excluded workers of the master."
override def handle(service: HttpService, parameters: Map[String, String]): String =
service.getExcludedWorkers
}
/** Endpoint `/shutdownWorkers`: responds with `service.getShutdownWorkers`. */
case object ShutdownWorkers extends HttpEndpoint {
override def path: String = "/shutdownWorkers"
override def description(service: String): String = "List all shutdown workers of the master."
override def handle(service: HttpService, parameters: Map[String, String]): String =
service.getShutdownWorkers
}
/** Endpoint `/hostnames`: responds with `service.getHostnameList`. */
case object Hostnames extends HttpEndpoint {
override def path: String = "/hostnames"
override def description(service: String): String =
"List all running application's LifecycleManager's hostnames of the cluster."
override def handle(service: HttpService, parameters: Map[String, String]): String =
service.getHostnameList
}
/**
 * Endpoint `/exclude`: responds with `service.exclude` for the trimmed `ADD` and `REMOVE`
 * parameters (each defaulting to the empty string).
 */
case object Exclude extends HttpEndpoint {
  override def path: String = "/exclude"

  override def description(service: String): String =
    "Excluded workers of the master add or remove the worker manually given worker id. The parameter add or remove specifies the excluded workers to add or remove, which value is separated by commas."

  override def handle(service: HttpService, parameters: Map[String, String]): String = {
    val workersToAdd = parameters.getOrElse("ADD", "").trim
    val workersToRemove = parameters.getOrElse("REMOVE", "").trim
    service.exclude(workersToAdd, workersToRemove)
  }
}
/** Endpoint `/listPartitionLocationInfo`: responds with `service.listPartitionLocationInfo`. */
case object ListPartitionLocationInfo extends HttpEndpoint {
override def path: String = "/listPartitionLocationInfo"
override def description(service: String): String =
"List all the living PartitionLocation information in that worker."
override def handle(service: HttpService, parameters: Map[String, String]): String =
service.listPartitionLocationInfo
}
/** Endpoint `/unavailablePeers`: responds with `service.getUnavailablePeers`. */
case object UnavailablePeers extends HttpEndpoint {
override def path: String = "/unavailablePeers"
override def description(service: String): String =
"List the unavailable peers of the worker, this always means the worker connect to the peer failed."
override def handle(service: HttpService, parameters: Map[String, String]): String =
service.getUnavailablePeers
}
/** Endpoint `/isShutdown`: responds with `service.isShutdown`. */
case object IsShutdown extends HttpEndpoint {
override def path: String = "/isShutdown"
override def description(service: String): String =
"Show if the worker is during the process of shutdown."
override def handle(service: HttpService, parameters: Map[String, String]): String =
service.isShutdown
}
/** Endpoint `/isRegistered`: responds with `service.isRegistered`. */
case object IsRegistered extends HttpEndpoint {
override def path: String = "/isRegistered"
override def description(service: String): String =
"Show if the worker is registered to the master success."
override def handle(service: HttpService, parameters: Map[String, String]): String =
service.isRegistered
}
/**
 * Endpoint `/exit`: responds with `service.exit` for the `TYPE` parameter, which defaults
 * to the empty string and is passed on untrimmed.
 */
case object Exit extends HttpEndpoint {
  override def path: String = "/exit"

  override def description(service: String): String =
    "Trigger this worker to exit. Legal types are 'DECOMMISSION', 'GRACEFUL' and 'IMMEDIATELY'."

  override def handle(service: HttpService, parameters: Map[String, String]): String = {
    val exitType = parameters.getOrElse("TYPE", "")
    service.exit(exitType)
  }
}
/**
 * Endpoint `/sendWorkerEvent`: responds with `service.handleWorkerEvent` for the `TYPE`
 * and `WORKERS` parameters, each defaulting to the empty string and passed on untrimmed.
 */
case object SendWorkerEvent extends HttpEndpoint {
  override def path: String = "/sendWorkerEvent"

  override def description(service: String): String =
    "For Master(Leader) can send worker event to manager workers. Legal types are 'None', 'Immediately', 'Decommission', 'DecommissionThenIdle', 'Graceful', 'Recommission'"

  override def handle(service: HttpService, parameters: Map[String, String]): String = {
    val eventType = parameters.getOrElse("TYPE", "")
    val workers = parameters.getOrElse("WORKERS", "")
    service.handleWorkerEvent(eventType, workers)
  }
}
/** Endpoint `/workerEventInfo`: responds with `service.getWorkerEventInfo()`. */
case object WorkerEventInfo extends HttpEndpoint {
override def path: String = "/workerEventInfo"
override def description(service: String): String =
"List all worker event infos of the master."
override def handle(service: HttpService, parameters: Map[String, String]): String =
service.getWorkerEventInfo()
}

View File

@ -1,77 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.celeborn.server.common.http
import io.netty.buffer.Unpooled
import io.netty.channel.{ChannelFutureListener, ChannelHandlerContext, SimpleChannelInboundHandler}
import io.netty.channel.ChannelHandler.Sharable
import io.netty.handler.codec.http._
import io.netty.util.CharsetUtil
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.metrics.sink.{JsonHttpRequestHandler, ServletHttpRequestHandler}
import org.apache.celeborn.server.common.HttpService
/**
 * A handler for the REST API that defines how to handle the HTTP request given a message.
 *
 * The request is first dispatched to the endpoints of the service. When none of them
 * matches, the servlet handlers are consulted by request path; if no servlet handler is
 * registered under that path a plain-text error message is returned. Every response is
 * sent with status 200 and the connection is closed afterwards.
 *
 * @param service The service of HTTP server.
 * @param servletHttpRequestHandlers The servlet handlers to fall back to, or null when
 *                                   none are available.
 */
@Sharable
class HttpRequestHandler(
    service: HttpService,
    servletHttpRequestHandlers: Array[ServletHttpRequestHandler])
  extends SimpleChannelInboundHandler[FullHttpRequest] with Logging {

  override def channelReadComplete(ctx: ChannelHandlerContext): Unit = {
    ctx.flush()
  }

  override def channelRead0(ctx: ChannelHandlerContext, req: FullHttpRequest): Unit = {
    val uri = req.uri()
    val (path, parameters) = HttpUtils.parseUri(uri)
    val msg = HttpUtils.handleRequest(service, path, parameters)
    val textType = "text/plain; charset=UTF-8"
    val jsonType = "application/json"
    val (response, contentType) = msg match {
      case Invalid.invalid =>
        Option(servletHttpRequestHandlers) match {
          case Some(servletHandlers) =>
            // Compare against the parsed path rather than the raw URI, so that a request
            // carrying a query string still reaches the servlet registered for its path.
            servletHandlers.find(servlet => servlet.getServletPath() == path).map {
              case jsonHandler: JsonHttpRequestHandler =>
                (jsonHandler.handleRequest(uri), jsonType)
              case handler: ServletHttpRequestHandler =>
                (handler.handleRequest(uri), textType)
            }.getOrElse((s"Unknown path $uri!", textType))
          case None =>
            (
              s"${Invalid.description(service.serviceName)} ${HttpUtils.help(service.serviceName)}",
              textType)
        }
      case _ => (msg, textType)
    }

    val res = new DefaultFullHttpResponse(
      HttpVersion.HTTP_1_1,
      HttpResponseStatus.OK,
      Unpooled.copiedBuffer(response, CharsetUtil.UTF_8))
    res.headers().set(HttpHeaderNames.CONTENT_TYPE, contentType)
    // Close the connection once the response has been written.
    ctx.writeAndFlush(res).addListener(ChannelFutureListener.CLOSE)
  }
}

View File

@ -17,75 +17,108 @@
package org.apache.celeborn.server.common.http
import java.net.InetSocketAddress
import java.util.concurrent.TimeUnit
import io.netty.bootstrap.ServerBootstrap
import io.netty.channel.{ChannelFuture, ChannelInitializer}
import io.netty.channel.socket.nio.NioServerSocketChannel
import io.netty.handler.logging.{LoggingHandler, LogLevel}
import org.apache.commons.lang3.SystemUtils
import org.eclipse.jetty.server.{Handler, HttpConfiguration, HttpConnectionFactory, Server, ServerConnector}
import org.eclipse.jetty.server.handler.{ContextHandlerCollection, ErrorHandler}
import org.eclipse.jetty.util.component.LifeCycle
import org.eclipse.jetty.util.thread.{QueuedThreadPool, ScheduledExecutorScheduler}
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.network.util.{IOMode, NettyUtils}
import org.apache.celeborn.common.util.{CelebornExitKind, Utils}
import org.apache.celeborn.common.util.CelebornExitKind
class HttpServer(
private[celeborn] case class HttpServer(
role: String,
host: String,
port: Int,
channelInitializer: ChannelInitializer[_]) extends Logging {
server: Server,
connector: ServerConnector,
rootHandler: ContextHandlerCollection) extends Logging {
private var bootstrap: ServerBootstrap = _
private var bindFuture: ChannelFuture = _
@volatile private var isStarted = false
@throws[Exception]
def start(): Unit = synchronized {
val boss = NettyUtils.createEventLoop(IOMode.NIO, 1, role + "-http-boss")
val worker = NettyUtils.createEventLoop(IOMode.NIO, 2, role + "-http-worker")
bootstrap = new ServerBootstrap
bootstrap
.group(boss, worker)
.handler(new LoggingHandler(LogLevel.DEBUG))
.channel(classOf[NioServerSocketChannel])
.childHandler(channelInitializer)
val address = new InetSocketAddress(host, port)
bindFuture = bootstrap.bind(address).sync
bindFuture.syncUninterruptibly()
logInfo(s"$role: HttpServer started on ${address.getHostString}:$port.")
isStarted = true
try {
  server.start()
  connector.start()
  server.addConnector(connector)
  // Log the port that was actually bound (the same value getServerUri reports); the
  // configured port can differ from it, e.g. when an ephemeral port was requested.
  logInfo(s"$role: HttpServer started on ${connector.getHost}:${connector.getLocalPort}.")
  isStarted = true
} catch {
  case e: Exception =>
    // stop(...) only acts while isStarted is true, which is never the case on this path,
    // so release whatever was partially started here. Failures during this cleanup are
    // attached to the original exception instead of replacing it.
    server.setStopTimeout(0)
    try {
      server.stop()
      connector.stop()
      server.getThreadPool match {
        case lifeCycle: LifeCycle => lifeCycle.stop()
        case _ =>
      }
    } catch {
      case cleanupFailure: Exception => e.addSuppressed(cleanupFailure)
    }
    throw e
}
}
def stop(exitCode: Int): Unit = synchronized {
if (isStarted) {
if (exitCode == CelebornExitKind.EXIT_IMMEDIATELY) {
server.setStopTimeout(0)
}
logInfo(s"$role: Stopping HttpServer")
if (bindFuture != null) {
// close is a local operation and should finish within milliseconds; timeout just to be safe
bindFuture.channel.close.awaitUninterruptibly(10, TimeUnit.SECONDS)
bindFuture = null
server.stop()
connector.stop()
server.getThreadPool match {
case lifeCycle: LifeCycle => lifeCycle.stop()
case _ =>
}
if (bootstrap != null && bootstrap.config.group != null) {
Utils.tryLogNonFatalError {
if (exitCode == CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN) {
bootstrap.config.group.shutdownGracefully(3, 5, TimeUnit.SECONDS)
} else {
bootstrap.config.group.shutdownGracefully(0, 0, TimeUnit.SECONDS)
}
}
}
if (bootstrap != null && bootstrap.config.childGroup != null) {
Utils.tryLogNonFatalError {
if (exitCode == CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN) {
bootstrap.config.childGroup.shutdownGracefully(3, 5, TimeUnit.SECONDS)
} else {
bootstrap.config.childGroup.shutdownGracefully(0, 0, TimeUnit.SECONDS)
}
}
}
bootstrap = null
logInfo(s"$role: HttpServer stopped.")
isStarted = false
}
}
/** The connector's host and its local port, joined as `host:port`. */
def getServerUri: String = s"${connector.getHost}:${connector.getLocalPort}"
/**
 * Adds the handler to the root handler collection and starts it unless it reports that
 * it is already started.
 */
def addHandler(handler: Handler): Unit = synchronized {
  rootHandler.addHandler(handler)
  val needsStart = !handler.isStarted
  if (needsStart) {
    handler.start()
  }
}
/**
 * Adds the handler that `HttpUtils.createStaticHandler` builds for the given resource
 * base and context path.
 */
def addStaticHandler(resourceBase: String, contextPath: String): Unit = {
  val staticHandler = HttpUtils.createStaticHandler(resourceBase, contextPath)
  addHandler(staticHandler)
}
/**
 * Adds the handler that `HttpUtils.createRedirectHandler` builds for redirecting `src`
 * to `dest`.
 */
def addRedirectHandler(src: String, dest: String): Unit = {
  val redirectHandler = HttpUtils.createRedirectHandler(src, dest)
  addHandler(redirectHandler)
}
/** State string of the underlying Jetty server; HttpService waits for it to become "STARTED". */
def getState: String = server.getState
}
/** Factory that wires up the Jetty objects backing an [[HttpServer]]. */
object HttpServer {
/**
 * Builds an HttpServer whose Jetty server, connector and root handler collection are
 * configured but not started; call `start()` on the result to start them.
 *
 * @param role name used as prefix of the Jetty thread pool and scheduler names
 * @param host host set on the connector
 * @param port port set on the connector
 * @param poolSize value passed to the QueuedThreadPool constructor
 * @param stopTimeout stop timeout set on the Jetty server
 *                    (NOTE(review): presumably milliseconds -- confirm against Jetty)
 */
def apply(role: String, host: String, port: Int, poolSize: Int, stopTimeout: Long): HttpServer = {
// Daemon thread pool named after the role.
val pool = new QueuedThreadPool(poolSize)
pool.setName(s"$role-JettyThreadPool")
pool.setDaemon(true)
val server = new Server(pool)
server.setStopTimeout(stopTimeout)
// NOTE(review): setShowStacks(true) presumably makes error responses include stack
// traces -- confirm that exposing them is intended.
val errorHandler = new ErrorHandler()
errorHandler.setShowStacks(true)
errorHandler.setServer(server)
server.addBean(errorHandler)
// Root collection that handlers are added to later through HttpServer.addHandler.
val collection = new ContextHandlerCollection
server.setHandler(collection)
val serverExecutor = new ScheduledExecutorScheduler(s"$role-JettyScheduler", true)
val httpConf = new HttpConfiguration()
// NOTE(review): the null and -1 arguments presumably select Jetty's defaults for the
// executor, buffer pool, acceptor count and selector count -- confirm.
val connector = new ServerConnector(
server,
null,
serverExecutor,
null,
-1,
-1,
new HttpConnectionFactory(httpConf))
connector.setHost(host)
connector.setPort(port)
// Address reuse is enabled on every OS except Windows.
connector.setReuseAddress(!SystemUtils.IS_OS_WINDOWS)
// The accept queue size is the connector's acceptor count, capped at 8.
connector.setAcceptQueueSize(math.min(connector.getAcceptors, 8))
new HttpServer(role, server, connector, collection)
}
}

View File

@ -18,78 +18,111 @@
package org.apache.celeborn.server.common.http
import java.net.URL
import java.util.Locale
import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse}
import org.apache.celeborn.server.common.{HttpService, Service}
import org.eclipse.jetty.servlet.{DefaultServlet, ServletContextHandler, ServletHolder}
object HttpUtils {
import org.apache.celeborn.common.exception.CelebornException
import org.apache.celeborn.common.internal.Logging
private val baseEndpoints: List[HttpEndpoint] =
List(
Conf,
ListDynamicConfigs,
WorkerInfo,
ThreadDump,
Shuffles,
Applications,
ListTopDiskUsedApps,
Help)
private val masterEndpoints: List[HttpEndpoint] = List(
MasterGroupInfo,
LostWorkers,
ExcludedWorkers,
ShutdownWorkers,
Hostnames,
SendWorkerEvent,
WorkerEventInfo,
Exclude) ++ baseEndpoints
private val workerEndpoints: List[HttpEndpoint] =
List(
ListPartitionLocationInfo,
UnavailablePeers,
IsShutdown,
IsRegistered,
Exit) ++ baseEndpoints
private[celeborn] object HttpUtils extends Logging {
// Base type for a function that returns something based on an HTTP request. Allows for
// implicit conversion from many types of functions to jetty Handlers.
type Responder[T] = HttpServletRequest => T
/**
 * Splits a request URI into its path and its query parameters.
 *
 * Parameter names are upper-cased with `Locale.ROOT`; values are returned verbatim
 * (no percent-decoding is performed). A parameter without a value (`?flag` or `?key=`)
 * maps to the empty string, empty segments (`a=1&&b=2`, trailing `?`) are ignored, and only
 * the first `=` of a segment separates the name from the value, so `?k=a=b` yields
 * `K -> "a=b"`. When a name occurs more than once the last occurrence wins.
 *
 * @param uri request URI starting with the path, e.g. `/exit?type=decommission`
 * @return the path and the parsed parameters (empty when there is no query string)
 */
def parseUri(uri: String): (String, Map[String, String]) = {
  // java.net.URL only parses absolute URLs, so a throw-away scheme/host/port is prepended.
  val url = new URL(s"https://127.0.0.1:9000$uri")
  val parameter = Option(url.getQuery) match {
    case None => Map.empty[String, String]
    case Some(query) =>
      query
        .split("&")
        .filter(_.nonEmpty)
        // Limit 2 keeps any further '=' inside the value instead of dropping the remainder.
        .map(_.split("=", 2))
        .map(arr => arr(0).toUpperCase(Locale.ROOT) -> (if (arr.length > 1) arr(1) else ""))
        .toMap
  }
  (url.getPath, parameter)
}
/**
 * Bundles everything needed to answer a request with a servlet.
 *
 * @param responder   function producing the result object for a request
 * @param contentType content type advertised in the response
 * @param extractFn   renders the result as the response body; defaults to `toString`
 */
class ServletParams[T <: AnyRef](
    val responder: Responder[T],
    val contentType: String,
    val extractFn: T => String = (value: Any) => value.toString)
def handleRequest(
service: HttpService,
/** Create a context handler that responds to a request with the given path prefix */
def createServletHandler[T <: AnyRef](
path: String,
parameters: Map[String, String]): String = {
endpoints(service.serviceName).find(endpoint => endpoint.path == path).orElse(
Some(Invalid)).get.handle(
service,
parameters)
servletParams: ServletParams[T]): ServletContextHandler = {
createServletHandler(path, createServlet(servletParams))
}
def help(service: String): String = {
val sb = new StringBuilder
sb.append("Available API providers include:\n")
val httpEndpoints: List[HttpEndpoint] = endpoints(service)
val maxLength = httpEndpoints.map(_.path.length).max
httpEndpoints.sortBy(_.path).foreach(endpoint =>
sb.append(
s"${endpoint.path.padTo(maxLength, " ").mkString} ${endpoint.description(service)}\n"))
sb.toString
/**
 * Wraps `servletParams` in a servlet that answers GET requests.
 *
 * The response is marked `200 OK` with the configured content type (UTF-8) before the
 * responder runs. An `IllegalArgumentException` is reported as `400 Bad Request`; any other
 * exception is logged and rethrown. TRACE requests are rejected with `405`.
 */
private def createServlet[T <: AnyRef](servletParams: ServletParams[T]): HttpServlet = {
  new HttpServlet {
    override def doGet(req: HttpServletRequest, resp: HttpServletResponse): Unit = {
      try {
        val contentTypeHeader = "%s;charset=utf-8".format(servletParams.contentType)
        resp.setContentType(contentTypeHeader)
        resp.setStatus(HttpServletResponse.SC_OK)
        val result = servletParams.responder(req)
        resp.getWriter.print(servletParams.extractFn(result))
      } catch {
        case badRequest: IllegalArgumentException =>
          resp.sendError(HttpServletResponse.SC_BAD_REQUEST, badRequest.getMessage)
        case failure: Exception =>
          logWarning(s"GET ${req.getRequestURI} failed: $failure", failure)
          throw failure
      }
    }

    override protected def doTrace(req: HttpServletRequest, res: HttpServletResponse): Unit = {
      res.sendError(HttpServletResponse.SC_METHOD_NOT_ALLOWED)
    }
  }
}
private def endpoints(service: String): List[HttpEndpoint] = {
if (service == Service.MASTER)
masterEndpoints
else
workerEndpoints
/**
 * Builds a handler that serves static files located through the context class loader.
 *
 * @param resourceBase resource directory that contains the static files
 * @param contextPath the context path to set for the handler
 * @return a [[ServletContextHandler]] with a `DefaultServlet` mapped to "/"
 * @throws CelebornException if `resourceBase` cannot be resolved by the class loader
 */
def createStaticHandler(
    resourceBase: String,
    contextPath: String): ServletContextHandler = {
  val staticHandler = new ServletContextHandler()
  val servletHolder = new ServletHolder(classOf[DefaultServlet])
  val classLoader = Thread.currentThread().getContextClassLoader
  // getResource returns null for an unknown resource; surface that as a CelebornException.
  val resourceUrl = Option(classLoader.getResource(resourceBase)).getOrElse(
    throw new CelebornException("Could not find resource path for Web UI: " + resourceBase))
  servletHolder.setInitParameter("resourceBase", resourceUrl.toString)
  staticHandler.setContextPath(contextPath)
  staticHandler.addServlet(servletHolder, "/")
  staticHandler
}
/**
 * Creates a context handler at `contextPath` whose root mapping ("/") is served by `servlet`.
 */
def createServletHandler(contextPath: String, servlet: HttpServlet): ServletContextHandler = {
  val contextHandler = new ServletContextHandler()
  contextHandler.setContextPath(contextPath)
  contextHandler.addServlet(new ServletHolder(servlet), "/")
  contextHandler
}
/**
 * Creates a handler at context path `src` that redirects GET, PUT, POST and DELETE requests
 * to `dest`, resolved against the URL of the incoming request. TRACE is answered with `405`.
 */
def createRedirectHandler(src: String, dest: String): ServletContextHandler = {
  val redirectServlet = new HttpServlet {
    // Shared by every redirecting HTTP method below.
    private def redirect(req: HttpServletRequest, resp: HttpServletResponse): Unit = {
      val requestUrl = new URL(req.getRequestURL.toString)
      val target = new URL(requestUrl, dest).toString
      resp.sendRedirect(target)
    }

    override def doGet(req: HttpServletRequest, resp: HttpServletResponse): Unit =
      redirect(req, resp)

    override def doPut(req: HttpServletRequest, resp: HttpServletResponse): Unit =
      redirect(req, resp)

    override def doPost(req: HttpServletRequest, resp: HttpServletResponse): Unit =
      redirect(req, resp)

    override def doDelete(req: HttpServletRequest, resp: HttpServletResponse): Unit =
      redirect(req, resp)

    override protected def doTrace(req: HttpServletRequest, resp: HttpServletResponse): Unit =
      resp.sendError(HttpServletResponse.SC_METHOD_NOT_ALLOWED)
  }
  createServletHandler(src, redirectServlet)
}
}

View File

@ -0,0 +1,127 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.celeborn.server.common.http.api
import javax.ws.rs.{GET, Path, Produces, QueryParam}
import javax.ws.rs.core.MediaType
import io.swagger.v3.oas.annotations.media.Content
import io.swagger.v3.oas.annotations.responses.ApiResponse
/**
 * REST endpoints backed by the [[org.apache.celeborn.server.common.HttpService]] bound to
 * the servlet context. Every endpoint returns the text produced by that service.
 */
@Path("/")
private[api] class ApiBaseResource extends ApiRequestContext {

  /** Name of the service that answers the requests. */
  def service: String = httpService.serviceName

  /** Liveness probe; always answers "pong". */
  @GET
  @Path("ping")
  @Produces(Array(MediaType.TEXT_PLAIN))
  def ping(): String = "pong"

  @GET
  @Path("/conf")
  @ApiResponse(
    responseCode = "200",
    content = Array(new Content(mediaType = MediaType.TEXT_PLAIN)),
    description = "List the conf setting.")
  def conf: String = httpService.getConf

  /** Missing query parameters are passed to the service as empty strings. */
  @GET
  @Path("/listDynamicConfigs")
  @ApiResponse(
    responseCode = "200",
    content = Array(new Content(mediaType = MediaType.TEXT_PLAIN)),
    description = "List the dynamic configs. " +
      "The parameter level specifies the config level of dynamic configs. " +
      "The parameter tenant specifies the tenant id of TENANT or TENANT_USER level. " +
      "The parameter name specifies the user name of TENANT_USER level. " +
      "Meanwhile, either none or all of the parameter tenant and name are specified for TENANT_USER level.")
  def listDynamicConfigs(
      @QueryParam("LEVEL") level: String,
      @QueryParam("TENANT") tenant: String,
      @QueryParam("NAME") name: String): String = {
    val normalizedLevel = normalizeParam(level)
    val normalizedTenant = normalizeParam(tenant)
    val normalizedName = normalizeParam(name)
    httpService.getDynamicConfigs(normalizedLevel, normalizedTenant, normalizedName)
  }

  @GET
  @Path("/workerInfo")
  @ApiResponse(
    responseCode = "200",
    content = Array(new Content(mediaType = MediaType.TEXT_PLAIN)),
    description =
      "For MASTER: List worker information of the service. It will list all registered workers 's information.\n" +
        "For WORKER: List the worker information of the worker.")
  def workerInfo(): String = httpService.getWorkerInfo

  @GET
  @Path("/threadDump")
  @ApiResponse(
    responseCode = "200",
    content = Array(new Content(mediaType = MediaType.TEXT_PLAIN)),
    description = "List the current thread dump.")
  def threadDump(): String = httpService.getThreadDump

  // NOTE(review): the removed HttpUtils help text listed this endpoint as "/shuffles";
  // confirm that serving it under "shuffle" is intended.
  @GET
  @Path("shuffle")
  @ApiResponse(
    responseCode = "200",
    content = Array(new Content(mediaType = MediaType.TEXT_PLAIN)),
    description =
      "For MASTER: List all running shuffle keys of the service. It will return all running shuffle's key of the cluster.\n" +
        "For WORKER: List all the running shuffle keys of the worker. It only return keys of shuffles running in that worker.")
  def shuffles(): String = httpService.getShuffleList

  @GET
  @Path("applications")
  @ApiResponse(
    responseCode = "200",
    content = Array(new Content(mediaType = MediaType.TEXT_PLAIN)),
    description =
      "For MASTER: List all running application's ids of the cluster.\n" +
        "For WORKER: List all running application's ids of the worker. It only return application ids running in that worker.")
  def applications(): String = httpService.getApplicationList

  @GET
  @Path("listTopDiskUsedApps")
  @ApiResponse(
    responseCode = "200",
    content = Array(new Content(mediaType = MediaType.TEXT_PLAIN)),
    description =
      "For MASTER: List the top disk usage application ids. It will return the top disk usage application ids for the cluster.\n" +
        "For WORKER: List the top disk usage application ids. It only return application ids running in that worker.")
  def listTopDiskUsedApps(): String = httpService.listTopDiskUseApps
}

View File

@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.celeborn.server.common.http.api
import javax.servlet.ServletContext
import javax.servlet.http.HttpServletRequest
import javax.ws.rs.core.Context
import org.eclipse.jetty.server.handler.ContextHandler
import org.apache.celeborn.server.common.HttpService
/**
 * Mixin for REST resources: exposes the injected servlet context and request, and looks up
 * the [[HttpService]] stored in the servlet context through [[HttpServiceContext]].
 */
private[celeborn] trait ApiRequestContext {

  @Context
  protected var servletContext: ServletContext = _

  @Context
  protected var httpRequest: HttpServletRequest = _

  /** The service registered for the current servlet context. */
  final protected def httpService: HttpService = {
    HttpServiceContext.get(servletContext)
  }

  /** Trims `param`; a `null` (absent) parameter becomes the empty string. */
  protected def normalizeParam(param: String): String = {
    Option(param).fold("")(_.trim)
  }
}
/**
 * Stores an [[HttpService]] as a context attribute and reads it back. The attribute name is
 * the canonical class name of this object.
 */
private[celeborn] object HttpServiceContext {
  private val attribute = getClass.getCanonicalName

  /** Attaches `rs` to `contextHandler`. */
  def set(contextHandler: ContextHandler, rs: HttpService): Unit =
    contextHandler.setAttribute(attribute, rs)

  /** Reads the attribute from `context` and casts it to [[HttpService]]. */
  def get(context: ServletContext): HttpService = {
    val stored = context.getAttribute(attribute)
    stored.asInstanceOf[HttpService]
  }
}

View File

@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.celeborn.server.common.http.api
import org.eclipse.jetty.servlet.{ServletContextHandler, ServletHolder}
import org.glassfish.jersey.server.ResourceConfig
import org.glassfish.jersey.servlet.ServletContainer
import org.apache.celeborn.server.common.HttpService
/** Builds the root servlet handler that exposes the REST resources. */
private[celeborn] object ApiRootResource {

  /**
   * Creates a session-less handler at "/" whose "/*" mapping is a Jersey `ServletContainer`
   * configured by [[OpenAPIConfig]]. `rs` is stored on the handler via [[HttpServiceContext]].
   */
  def getServletHandler(rs: HttpService): ServletContextHandler = {
    val jerseyConfig: ResourceConfig = new OpenAPIConfig
    val jerseyServlet = new ServletHolder(new ServletContainer(jerseyConfig))
    val rootHandler = new ServletContextHandler(ServletContextHandler.NO_SESSIONS)
    rootHandler.setContextPath("/")
    HttpServiceContext.set(rootHandler, rs)
    rootHandler.addServlet(jerseyServlet, "/*")
    rootHandler
  }
}

View File

@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.celeborn.server.common.http.api
import javax.servlet.ServletConfig
import javax.ws.rs.{GET, Path, PathParam, Produces}
import javax.ws.rs.core.{Application, Context, HttpHeaders, MediaType, Response, UriInfo}
import scala.collection.JavaConverters._
import com.fasterxml.jackson.core.util.DefaultPrettyPrinter
import io.swagger.v3.jaxrs2.integration.JaxrsOpenApiContextBuilder
import io.swagger.v3.jaxrs2.integration.resources.BaseOpenApiResource
import io.swagger.v3.oas.annotations.Operation
import io.swagger.v3.oas.integration.api.OpenApiContext
import io.swagger.v3.oas.models.OpenAPI
import io.swagger.v3.oas.models.info.{Info, License}
import io.swagger.v3.oas.models.servers.Server
import org.apache.commons.lang3.StringUtils
/**
 * Serves the generated OpenAPI document as `openapi.json` or `openapi.yaml`, with the
 * Celeborn title, role description, license and server URL filled in.
 */
@Path("/openapi.{type:json|yaml}")
class CelebornOpenApiResource extends BaseOpenApiResource with ApiRequestContext {

  @Context
  protected var config: ServletConfig = _

  @Context
  protected var app: Application = _

  /**
   * Builds the OpenAPI context for the configured resource packages and renders the
   * document pretty-printed, as YAML when `tpe` is "yaml" (case-insensitive) and as JSON
   * otherwise.
   */
  @GET
  @Produces(Array(MediaType.APPLICATION_JSON, "application/yaml"))
  @Operation(hidden = true)
  def getOpenApi(
      @Context headers: HttpHeaders,
      @Context uriInfo: UriInfo,
      @PathParam("type") tpe: String): Response = {
    val ctxId = getContextId(config)
    val contextBuilder = new CelebornJaxrsOpenApiContextBuilder()
      .servletConfig(config)
      .application(app)
      .resourcePackages(OpenAPIConfig.packages.toSet.asJava)
      .configLocation(configLocation)
      .openApiConfiguration(openApiConfiguration)
      .ctxId(ctxId)
    val ctx: OpenApiContext = contextBuilder.buildContext(true)
    val openApi = setCelebornOpenAPIDefinition(ctx.read())

    val wantsYaml = StringUtils.isNotBlank(tpe) && tpe.trim().equalsIgnoreCase("yaml")
    val mapper = if (wantsYaml) ctx.getOutputYamlMapper() else ctx.getOutputJsonMapper
    val document = mapper.writer(new DefaultPrettyPrinter()).writeValueAsString(openApi)
    val response = Response.status(Response.Status.OK).entity(document)
    if (wantsYaml) {
      response.`type`("application/yaml").build()
    } else {
      response.`type`(MediaType.APPLICATION_JSON_TYPE).build()
    }
  }

  /** Adds title, role description, license and the server URL to `openApi`. */
  private def setCelebornOpenAPIDefinition(openApi: OpenAPI): OpenAPI = {
    // TODO: to improve when https is enabled.
    val apiUrl = s"http://${httpService.connectionUrl}/"
    val license = new License()
      .name("Apache License 2.0")
      .url("https://www.apache.org/licenses/LICENSE-2.0.txt")
    val info = new Info()
      .title("Apache Celeborn (Incubating) REST API Documentation")
      .description(s"Role: ${httpService.serviceName}")
      .license(license)
    val servers = List(new Server().url(apiUrl)).asJava
    openApi.info(info).servers(servers)
  }
}
/**
 * Concrete `JaxrsOpenApiContextBuilder` whose type parameter is bound to itself; it adds no
 * members of its own.
 * NOTE(review): presumably exists so the self-typed fluent builder can be instantiated from
 * Scala -- confirm.
 */
class CelebornJaxrsOpenApiContextBuilder
extends JaxrsOpenApiContextBuilder[CelebornJaxrsOpenApiContextBuilder]

View File

@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.celeborn.server.common.http.api
import javax.ws.rs.ext.ContextResolver
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
/**
 * JAX-RS context resolver that hands out one shared Jackson `ObjectMapper` with the
 * `DefaultScalaModule` registered, regardless of the requested class.
 */
class CelebornScalaObjectMapper extends ContextResolver[ObjectMapper] {

  private val scalaAwareMapper: ObjectMapper = {
    val base = new ObjectMapper()
    base.registerModule(DefaultScalaModule)
  }

  override def getContext(aClass: Class[_]): ObjectMapper = scalaAwareMapper
}

View File

@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.celeborn.server.common.http.api
import org.glassfish.jersey.server.ResourceConfig
/**
 * Jersey configuration: scans the packages listed in the companion object and registers
 * the OpenAPI resource and the Scala-aware object mapper.
 */
class OpenAPIConfig extends ResourceConfig {
  this.packages(OpenAPIConfig.packages: _*)
  this.register(classOf[CelebornOpenApiResource])
  this.register(classOf[CelebornScalaObjectMapper])
}
object OpenAPIConfig {

  /** Packages scanned for REST resources: common, master and worker API packages. */
  val packages: Seq[String] = Seq(
    "org.apache.celeborn.server.common.http.api",
    "org.apache.celeborn.service.deploy.master.http.api",
    "org.apache.celeborn.service.deploy.worker.http.api")
}

View File

@ -0,0 +1,18 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
*.sink.prometheusServlet.class=org.apache.celeborn.common.metrics.sink.PrometheusServlet
*.sink.jsonServlet.class=org.apache.celeborn.common.metrics.sink.JsonServlet

View File

@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.celeborn.server.common.http
import javax.ws.rs.core.MediaType
import org.apache.celeborn.common.CelebornConf
/**
 * Shared REST API checks. Concrete suites supply the `httpService` to query; metrics are
 * enabled and configured from `metrics-api.properties` on the test classpath.
 */
abstract class ApiBaseResourceSuite extends HttpTestHelper {

  celebornConf.set(CelebornConf.METRICS_ENABLED.key, "true")
    .set(
      CelebornConf.METRICS_CONF.key,
      Thread.currentThread().getContextClassLoader.getResource("metrics-api.properties").getFile)

  /** Issues a GET for `path`, asking for `mediaType`. */
  private def fetchAs(path: String, mediaType: String) =
    webTarget.path(path).request(mediaType).get()

  test("ping") {
    val response = fetchAs("ping", MediaType.TEXT_PLAIN)
    assert(200 == response.getStatus)
    assert(response.readEntity(classOf[String]) == "pong")
  }

  test("conf") {
    assert(200 == fetchAs("conf", MediaType.TEXT_PLAIN).getStatus)
  }

  test("listDynamicConfigs") {
    val response = webTarget.path("listDynamicConfigs")
      .queryParam("LEVEL", "TENANT")
      .request(MediaType.TEXT_PLAIN)
      .get()
    assert(200 == response.getStatus)
  }

  // Plain-text endpoints for which only the status code is verified.
  Seq(
    "workerInfo",
    "threadDump",
    "shuffle",
    "applications",
    "listTopDiskUsedApps").foreach { endpoint =>
    test(endpoint) {
      assert(200 == fetchAs(endpoint, MediaType.TEXT_PLAIN).getStatus)
    }
  }

  test("openapi.json") {
    val response = fetchAs("openapi.json", MediaType.APPLICATION_JSON)
    assert(200 == response.getStatus)
    assert(response.readEntity(classOf[String]).contains("/conf"))
  }

  test("swagger") {
    Seq("swagger", "docs", "help").foreach { path =>
      val response = fetchAs(path, MediaType.TEXT_HTML)
      assert(200 == response.getStatus)
      assert(response.readEntity(classOf[String]).contains("swagger-ui"))
    }
  }

  test("metrics/prometheus") {
    val response = fetchAs("metrics/prometheus", MediaType.APPLICATION_JSON)
    assert(200 == response.getStatus)
    assert(response.readEntity(classOf[String]).contains("metrics_jvm_memory_heap_max_Value"))
  }

  test("metrics/json") {
    val response = fetchAs("metrics/json", MediaType.APPLICATION_JSON)
    assert(200 == response.getStatus)
    assert(response.readEntity(classOf[String]).contains("\"name\" : \"jvm.memory.heap.max\""))
  }
}

View File

@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.celeborn.server.common.http
import java.net.URI
import javax.ws.rs.client.WebTarget
import javax.ws.rs.core.{Application, UriBuilder}
import org.glassfish.jersey.client.ClientConfig
import org.glassfish.jersey.media.multipart.MultiPartFeature
import org.glassfish.jersey.server.ResourceConfig
import org.glassfish.jersey.test.JerseyTest
import org.glassfish.jersey.test.jetty.JettyTestContainerFactory
import org.glassfish.jersey.test.spi.TestContainerFactory
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
import org.scalatest.funsuite.AnyFunSuite
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.server.common.HttpService
import org.apache.celeborn.server.common.http.HttpTestHelper.RestApiBaseSuite
import org.apache.celeborn.server.common.http.api.CelebornScalaObjectMapper
object HttpTestHelper {

  /**
   * `JerseyTest` running on the Jetty test container; multipart support is registered on
   * both sides and the client additionally gets the Scala-aware object mapper.
   */
  class RestApiBaseSuite extends JerseyTest {

    override def configure: Application = {
      val resourceConfig = new ResourceConfig(getClass)
      resourceConfig.register(classOf[MultiPartFeature])
    }

    override def configureClient(config: ClientConfig): Unit = {
      config
        .register(classOf[CelebornScalaObjectMapper])
        .register(classOf[MultiPartFeature])
    }

    override def getTestContainerFactory: TestContainerFactory = {
      new JettyTestContainerFactory
    }
  }
}
/**
 * Test mixin that manages a Jersey client for the suite's lifetime and points it at the
 * `connectionUrl` of the `httpService` supplied by the concrete suite.
 */
trait HttpTestHelper extends AnyFunSuite
  with BeforeAndAfterAll
  with BeforeAndAfterEach
  with Logging {

  protected val celebornConf = new CelebornConf()

  /** Service under test; provided by the concrete suite. */
  protected def httpService: HttpService

  protected val restApiBaseSuite: JerseyTest = new RestApiBaseSuite

  override def beforeAll(): Unit = {
    super.beforeAll()
    restApiBaseSuite.setUp()
  }

  override def afterAll(): Unit = {
    restApiBaseSuite.tearDown()
    super.afterAll()
  }

  // Lazy: both values are only computed on first use.
  protected lazy val baseUri: URI = {
    val serviceUrl = s"http://${httpService.connectionUrl}/"
    UriBuilder.fromUri(serviceUrl).build()
  }

  protected lazy val webTarget: WebTarget = {
    restApiBaseSuite.client.target(baseUri)
  }
}

View File

@ -1,121 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.celeborn.server.common.http
import org.scalatest.funsuite.AnyFunSuite
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.server.common.Service
/**
 * Tests for `HttpUtils.parseUri` (path and query-parameter parsing) and `HttpUtils.help`
 * (the per-role listing of available endpoints).
 */
class HttpUtilsSuite extends AnyFunSuite with Logging {
// Parses `uri` and asserts both the resulting path and the parameter map.
def checkParseUri(
uri: String,
expectPath: String,
expectParameters: Map[String, String]): Unit = {
val (path, parameters) = HttpUtils.parseUri(uri)
assert(path == expectPath)
assert(parameters == expectParameters)
}
// Parameter names are expected upper-cased; values are expected unchanged.
test("CELEBORN-448: Support exclude worker manually") {
checkParseUri("/exclude", "/exclude", Map.empty)
checkParseUri(
"/exclude?add=localhost:1001:1002:1003:1004",
"/exclude",
Map("ADD" -> "localhost:1001:1002:1003:1004"))
checkParseUri(
"/exclude?remove=localhost:1001:1002:1003:1004",
"/exclude",
Map("REMOVE" -> "localhost:1001:1002:1003:1004"))
checkParseUri(
"/exclude?add=localhost:1001:1002:1003:1004&remove=localhost:2001:2002:2003:2004",
"/exclude",
Map("ADD" -> "localhost:1001:1002:1003:1004", "REMOVE" -> "localhost:2001:2002:2003:2004"))
}
test("CELEBORN-847: Support parse HTTP Restful API parameters") {
checkParseUri("/exit", "/exit", Map.empty)
checkParseUri("/exit?type=decommission", "/exit", Map("TYPE" -> "decommission"))
checkParseUri(
"/exit?type=decommission&foo=A",
"/exit",
Map("TYPE" -> "decommission", "FOO" -> "A"))
}
// Pins the exact help text for the master and the worker role.
test("CELEBORN-829: Improve response message of invalid HTTP request") {
assert(HttpUtils.help(Service.MASTER) ==
s"""Available API providers include:
|/applications List all running application's ids of the cluster.
|/conf List the conf setting of the master.
|/exclude Excluded workers of the master add or remove the worker manually given worker id. The parameter add or remove specifies the excluded workers to add or remove, which value is separated by commas.
|/excludedWorkers List all excluded workers of the master.
|/help List the available API providers of the master.
|/hostnames List all running application's LifecycleManager's hostnames of the cluster.
|/listDynamicConfigs List the dynamic configs of the master. The parameter level specifies the config level of dynamic configs. The parameter tenant specifies the tenant id of TENANT or TENANT_USER level. The parameter name specifies the user name of TENANT_USER level. Meanwhile, either none or all of the parameter tenant and name are specified for TENANT_USER level.
|/listTopDiskUsedApps List the top disk usage application ids. It will return the top disk usage application ids for the cluster.
|/lostWorkers List all lost workers of the master.
|/masterGroupInfo List master group information of the service. It will list all master's LEADER, FOLLOWER information.
|/sendWorkerEvent For Master(Leader) can send worker event to manager workers. Legal types are 'None', 'Immediately', 'Decommission', 'DecommissionThenIdle', 'Graceful', 'Recommission'
|/shuffles List all running shuffle keys of the service. It will return all running shuffle's key of the cluster.
|/shutdownWorkers List all shutdown workers of the master.
|/threadDump List the current thread dump of the master.
|/workerEventInfo List all worker event infos of the master.
|/workerInfo List worker information of the service. It will list all registered workers 's information.
|""".stripMargin)
assert(HttpUtils.help(Service.WORKER) ==
s"""Available API providers include:
|/applications List all running application's ids of the worker. It only return application ids running in that worker.
|/conf List the conf setting of the worker.
|/exit Trigger this worker to exit. Legal types are 'DECOMMISSION', 'GRACEFUL' and 'IMMEDIATELY'.
|/help List the available API providers of the worker.
|/isRegistered Show if the worker is registered to the master success.
|/isShutdown Show if the worker is during the process of shutdown.
|/listDynamicConfigs List the dynamic configs of the worker. The parameter level specifies the config level of dynamic configs. The parameter tenant specifies the tenant id of TENANT or TENANT_USER level. The parameter name specifies the user name of TENANT_USER level. Meanwhile, either none or all of the parameter tenant and name are specified for TENANT_USER level.
|/listPartitionLocationInfo List all the living PartitionLocation information in that worker.
|/listTopDiskUsedApps List the top disk usage application ids. It only return application ids running in that worker.
|/shuffles List all the running shuffle keys of the worker. It only return keys of shuffles running in that worker.
|/threadDump List the current thread dump of the worker.
|/unavailablePeers List the unavailable peers of the worker, this always means the worker connect to the peer failed.
|/workerInfo List the worker information of the worker.
|""".stripMargin)
}
test("CELEBORN-1245: Support Master manage workers") {
checkParseUri(
"/sendWorkerEvent?type=decommission&workers=localhost:1001:1002:1003:1004",
"/sendWorkerEvent",
Map("TYPE" -> "decommission", "WORKERS" -> "localhost:1001:1002:1003:1004"))
}
test("CELEBORN-1056: Introduce Rest API of listing dynamic configuration") {
checkParseUri("/listDynamicConfigs", "/listDynamicConfigs", Map.empty)
checkParseUri(
"/listDynamicConfigs?level=system",
"/listDynamicConfigs",
Map("LEVEL" -> "system"))
checkParseUri(
"/listDynamicConfigs?level=tenant&tenant=tenantId1",
"/listDynamicConfigs",
Map("LEVEL" -> "tenant", "TENANT" -> "tenantId1"))
checkParseUri(
"/listDynamicConfigs?level=tenant_user&tenant=tenantId1&name=user1",
"/listDynamicConfigs",
Map("LEVEL" -> "tenant_user", "TENANT" -> "tenantId1", "NAME" -> "user1"))
}
}

View File

@ -75,6 +75,14 @@
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.glassfish.jersey.inject</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.glassfish.jersey.core</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>

View File

@ -98,6 +98,12 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.celeborn</groupId>
<artifactId>celeborn-service_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.celeborn</groupId>
<artifactId>celeborn-master_${scala.binary.version}</artifactId>
@ -109,6 +115,16 @@
<artifactId>mockito-scala-scalatest_${scala.binary.version}</artifactId>
<scope>test</scope>
</dependency>
<!-- Jersey test framework core, available to tests only. No version is declared here.
NOTE(review): presumably managed by a parent dependencyManagement section, confirm. -->
<dependency>
<groupId>org.glassfish.jersey.test-framework</groupId>
<artifactId>jersey-test-framework-core</artifactId>
<scope>test</scope>
</dependency>
<!-- Jetty provider for the Jersey test framework, available to tests only. No version is
declared here. NOTE(review): presumably managed by a parent dependencyManagement section,
confirm. -->
<dependency>
<groupId>org.glassfish.jersey.test-framework.providers</groupId>
<artifactId>jersey-test-framework-provider-jetty</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>

View File

@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.celeborn.service.deploy.worker.http.api
import javax.ws.rs.{GET, Path, POST, QueryParam}
import javax.ws.rs.core.MediaType
import io.swagger.v3.oas.annotations.media.Content
import io.swagger.v3.oas.annotations.responses.ApiResponse
import org.apache.celeborn.server.common.http.api.ApiRequestContext
/**
 * JAX-RS resource rooted at "/" that exposes worker endpoints.
 *
 * Every handler returns the string produced by the corresponding call on `httpService`.
 * The `@ApiResponse` annotations document each endpoint's 200 response as `text/plain`.
 *
 * NOTE(review): `httpService` and `normalizeParam` are not defined in this class;
 * presumably they are inherited from [[ApiRequestContext]] -- confirm.
 */
@Path("/")
class ApiWorkerResource extends ApiRequestContext {

  /** GET /listPartitionLocationInfo: returns `httpService.listPartitionLocationInfo`. */
  @GET
  @Path("/listPartitionLocationInfo")
  @ApiResponse(
    responseCode = "200",
    description = "List all the living PartitionLocation information in that worker.",
    content = Array(new Content(mediaType = MediaType.TEXT_PLAIN)))
  def listPartitionLocationInfo: String = {
    httpService.listPartitionLocationInfo
  }

  /** GET /unavailablePeers: returns `httpService.getUnavailablePeers`. */
  @GET
  @Path("/unavailablePeers")
  @ApiResponse(
    responseCode = "200",
    description =
      "List the unavailable peers of the worker, this always means the worker connect to the peer failed.",
    content = Array(new Content(mediaType = MediaType.TEXT_PLAIN)))
  def unavailablePeers: String = {
    httpService.getUnavailablePeers
  }

  /** GET /isShutdown: returns `httpService.isShutdown`. */
  @GET
  @Path("/isShutdown")
  @ApiResponse(
    responseCode = "200",
    description = "Show if the worker is during the process of shutdown.",
    content = Array(new Content(mediaType = MediaType.TEXT_PLAIN)))
  def isShutdown: String = {
    httpService.isShutdown
  }

  /** GET /isRegistered: returns `httpService.isRegistered`. */
  @GET
  @Path("/isRegistered")
  @ApiResponse(
    responseCode = "200",
    description = "Show if the worker is registered to the master success.",
    content = Array(new Content(mediaType = MediaType.TEXT_PLAIN)))
  def isRegistered: String = {
    httpService.isRegistered
  }

  /**
   * POST /exit: passes the `TYPE` query parameter through `normalizeParam` and hands the
   * result to `httpService.exit`.
   *
   * @param exitType raw value of the `TYPE` query parameter
   * @return the string returned by `httpService.exit`
   */
  @POST
  @Path("/exit")
  @ApiResponse(
    responseCode = "200",
    description =
      "Trigger this worker to exit. Legal types are 'DECOMMISSION', 'GRACEFUL' and 'IMMEDIATELY'.",
    content = Array(new Content(mediaType = MediaType.TEXT_PLAIN)))
  def exit(@QueryParam("TYPE") exitType: String): String = {
    val normalizedType = normalizeParam(exitType)
    httpService.exit(normalizedType)
  }
}

View File

@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.celeborn.service.deploy.worker.storage
import javax.ws.rs.core.MediaType
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.util.{CelebornExitKind, Utils}
import org.apache.celeborn.server.common.HttpService
import org.apache.celeborn.server.common.http.ApiBaseResourceSuite
import org.apache.celeborn.service.deploy.worker.{Worker, WorkerArguments}
/**
 * Smoke tests for the worker HTTP endpoints: each test issues a GET that accepts
 * `text/plain` and checks that the response status is 200.
 *
 * NOTE(review): `celebornConf` and `webTarget` are not defined in this class; presumably
 * they are provided by [[ApiBaseResourceSuite]] -- confirm.
 */
class ApiWorkerResourceSuite extends ApiBaseResourceSuite {

  // Created in beforeAll; exposed to the base suite through httpService.
  private var worker: Worker = _

  override protected def httpService: HttpService = worker

  /** Builds the worker, starts its metrics system and HTTP server, then runs the base set-up. */
  override def beforeAll(): Unit = {
    val arguments = new WorkerArguments(Array(), celebornConf)
    worker = new Worker(celebornConf, arguments)
    worker.metricsSystem.start()
    worker.startHttpServer()
    super.beforeAll()
  }

  /** Runs the base tear-down first, then stops the metrics system, RPC env and worker. */
  override def afterAll(): Unit = {
    super.afterAll()
    worker.metricsSystem.stop()
    worker.rpcEnv.shutdown()
    worker.stop(CelebornExitKind.EXIT_IMMEDIATELY)
  }

  // One test per endpoint; the test name is the endpoint path.
  Seq(
    "listPartitionLocationInfo",
    "unavailablePeers",
    "isShutdown",
    "isRegistered").foreach { endpoint =>
    test(endpoint) {
      val response = webTarget.path(endpoint).request(MediaType.TEXT_PLAIN).get()
      assert(200 == response.getStatus)
    }
  }
}