diff --git a/LICENSE-binary b/LICENSE-binary index a33986495..034141317 100644 --- a/LICENSE-binary +++ b/LICENSE-binary @@ -248,6 +248,11 @@ io.netty:netty-transport-native-unix-common io.netty:netty-transport-rxtx io.netty:netty-transport-sctp io.netty:netty-transport-udt +io.swagger.core.v3:swagger-annotations +io.swagger.core.v3:swagger-core +io.swagger.core.v3:swagger-integration +io.swagger.core.v3:swagger-jaxrs2 +io.swagger.core.v3:swagger-models org.apache.commons:commons-crypto org.apache.commons:commons-lang3 org.apache.hadoop:hadoop-client-api @@ -267,6 +272,13 @@ org.apache.ratis:ratis-server org.apache.ratis:ratis-server-api org.apache.ratis:ratis-shell org.apache.ratis:ratis-thirdparty-misc +org.eclipse.jetty:jetty-http +org.eclipse.jetty:jetty-io +org.eclipse.jetty:jetty-security +org.eclipse.jetty:jetty-server +org.eclipse.jetty:jetty-servlet +org.eclipse.jetty:jetty-util-ajax +org.eclipse.jetty:jetty-util org.javassist:javassist org.reflections:reflections org.roaringbitmap:RoaringBitmap @@ -275,6 +287,7 @@ org.rocksdb:rocksdbjni org.scala-lang:scala-library org.scala-lang:scala-reflect org.slf4j:jcl-over-slf4j +org.webjars:swagger-ui org.xerial.snappy:snappy-java org.yaml:snakeyaml @@ -308,3 +321,15 @@ org.slf4j:slf4j-api ------------ See licenses/LICENSE-javassist.txt for detail. 
org.javassist:javassist + +Eclipse Public License (EPL) 2.0 +-------------------------------- +jakarta.annotation:jakarta.annotation-api +jakarta.servlet:jakarta.servlet-api +jakarta.ws.rs:jakarta.ws.rs-api +org.glassfish.hk2:hk2-api +org.glassfish.hk2:hk2-locator +org.glassfish.hk2:hk2-utils +org.glassfish.hk2.external:aopalliance-repackaged +org.glassfish.hk2.external:jakarta.inject +org.glassfish.hk2:osgi-resource-locator diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala index 905cfad57..2e4bd2d5a 100644 --- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala @@ -586,6 +586,10 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se def masterHttpPort: Int = get(MASTER_HTTP_PORT) + def masterHttpMaxWorkerThreads: Int = get(MASTER_HTTP_MAX_WORKER_THREADS) + + def masterHttpStopTimeout: Long = get(MASTER_HTTP_STOP_TIMEOUT) + def haEnabled: Boolean = get(HA_ENABLED) def haMasterNodeId: Option[String] = get(HA_MASTER_NODE_ID) @@ -676,6 +680,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se def workerHttpHost: String = get(WORKER_HTTP_HOST).replace("", Utils.localHostName(this)) def workerHttpPort: Int = get(WORKER_HTTP_PORT) + def workerHttpMaxWorkerThreads: Int = get(WORKER_HTTP_MAX_WORKER_THREADS) + def workerHttpStopTimeout: Long = get(WORKER_HTTP_STOP_TIMEOUT) def workerRpcPort: Int = get(WORKER_RPC_PORT) def workerPushPort: Int = get(WORKER_PUSH_PORT) def workerFetchPort: Int = get(WORKER_FETCH_PORT) @@ -1976,6 +1982,23 @@ object CelebornConf extends Logging { .checkValue(p => p >= 1024 && p < 65535, "Invalid port") .createWithDefault(9098) + val MASTER_HTTP_MAX_WORKER_THREADS: ConfigEntry[Int] = + buildConf("celeborn.master.http.maxWorkerThreads") + .categories("master") + .version("0.5.0") + .doc("Maximum 
number of threads in the master http worker thread pool.") + .intConf + .checkValue(_ > 0, "Must be positive.") + .createWithDefault(999) + + val MASTER_HTTP_STOP_TIMEOUT: ConfigEntry[Long] = + buildConf("celeborn.master.http.stopTimeout") + .categories("master") + .version("0.5.0") + .doc("Master http server stop timeout.") + .timeConf(TimeUnit.MILLISECONDS) + .createWithDefaultString("5s") + val HA_ENABLED: ConfigEntry[Boolean] = buildConf("celeborn.master.ha.enabled") .withAlternative("celeborn.ha.enabled") @@ -2534,6 +2557,23 @@ object CelebornConf extends Logging { .checkValue(p => p >= 1024 && p < 65535, "Invalid port") .createWithDefault(9096) + val WORKER_HTTP_MAX_WORKER_THREADS: ConfigEntry[Int] = + buildConf("celeborn.worker.http.maxWorkerThreads") + .categories("worker") + .version("0.5.0") + .doc("Maximum number of threads in the worker http worker thread pool.") + .intConf + .checkValue(_ > 0, "Must be positive.") + .createWithDefault(999) + + val WORKER_HTTP_STOP_TIMEOUT: ConfigEntry[Long] = + buildConf("celeborn.worker.http.stopTimeout") + .categories("worker") + .version("0.5.0") + .doc("Worker http server stop timeout.") + .timeConf(TimeUnit.MILLISECONDS) + .createWithDefaultString("5s") + val WORKER_RPC_PORT: ConfigEntry[Int] = buildConf("celeborn.worker.rpc.port") .categories("worker") diff --git a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpServerInitializer.scala b/common/src/main/scala/org/apache/celeborn/common/metrics/MetricsUtils.scala similarity index 57% rename from service/src/main/scala/org/apache/celeborn/server/common/http/HttpServerInitializer.scala rename to common/src/main/scala/org/apache/celeborn/common/metrics/MetricsUtils.scala index 2619606b5..0d5086bbb 100644 --- a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpServerInitializer.scala +++ b/common/src/main/scala/org/apache/celeborn/common/metrics/MetricsUtils.scala @@ -15,19 +15,19 @@ * limitations under the License. 
*/ -package org.apache.celeborn.server.common.http +package org.apache.celeborn.common.metrics -import io.netty.channel.{ChannelInitializer, SimpleChannelInboundHandler} -import io.netty.channel.socket.SocketChannel -import io.netty.handler.codec.http.{HttpObjectAggregator, HttpServerCodec} +import java.util.concurrent.TimeUnit -class HttpServerInitializer( - handlers: SimpleChannelInboundHandler[_]) extends ChannelInitializer[SocketChannel] { +object MetricsUtils { + private[this] val MINIMAL_POLL_UNIT = TimeUnit.SECONDS + private[this] val MINIMAL_POLL_PERIOD = 1 - override def initChannel(channel: SocketChannel): Unit = { - val pipeline = channel.pipeline() - pipeline.addLast(new HttpServerCodec()) - .addLast("httpAggregator", new HttpObjectAggregator(512 * 1024)) - .addLast(handlers) + def checkMinimalPollingPeriod(pollUnit: TimeUnit, pollPeriod: Int): Unit = { + val period = MINIMAL_POLL_UNIT.convert(pollPeriod, pollUnit) + if (period < MINIMAL_POLL_PERIOD) { + throw new IllegalArgumentException("Polling period " + pollPeriod + " " + pollUnit + + " is below the minimal polling period of " + MINIMAL_POLL_PERIOD + " " + MINIMAL_POLL_UNIT) + } } } diff --git a/common/src/main/scala/org/apache/celeborn/common/metrics/sink/CsvSink.scala b/common/src/main/scala/org/apache/celeborn/common/metrics/sink/CsvSink.scala index 34097b34d..52ccf4638 100644 --- a/common/src/main/scala/org/apache/celeborn/common/metrics/sink/CsvSink.scala +++ b/common/src/main/scala/org/apache/celeborn/common/metrics/sink/CsvSink.scala @@ -23,7 +23,7 @@ import java.util.concurrent.TimeUnit import com.codahale.metrics.{CsvReporter, MetricRegistry} -import org.apache.celeborn.common.metrics.MetricsSystem +import org.apache.celeborn.common.metrics.MetricsUtils class CsvSink(val property: Properties, val registry: MetricRegistry) extends Sink { val CSV_KEY_PERIOD = "period" @@ -44,7 +44,7 @@ class CsvSink(val property: Properties, val registry: MetricRegistry) extends Si case None => TimeUnit.valueOf(CSV_DEFAULT_UNIT) } -
MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod) + MetricsUtils.checkMinimalPollingPeriod(pollUnit, pollPeriod) val pollDir = Option(property.getProperty(CSV_KEY_DIR)) match { case Some(s) => s diff --git a/common/src/main/scala/org/apache/celeborn/common/metrics/sink/GraphiteSink.scala b/common/src/main/scala/org/apache/celeborn/common/metrics/sink/GraphiteSink.scala index cc97b9bef..b2be72fff 100644 --- a/common/src/main/scala/org/apache/celeborn/common/metrics/sink/GraphiteSink.scala +++ b/common/src/main/scala/org/apache/celeborn/common/metrics/sink/GraphiteSink.scala @@ -23,7 +23,7 @@ import java.util.concurrent.TimeUnit import com.codahale.metrics.MetricRegistry import com.codahale.metrics.graphite.{Graphite, GraphiteReporter, GraphiteUDP} -import org.apache.celeborn.common.metrics.MetricsSystem +import org.apache.celeborn.common.metrics.MetricsUtils private class GraphiteSink(val property: Properties, val registry: MetricRegistry) extends Sink { val GRAPHITE_DEFAULT_PERIOD = 10 @@ -62,7 +62,7 @@ private class GraphiteSink(val property: Properties, val registry: MetricRegistr val prefix = propertyToOption(GRAPHITE_KEY_PREFIX).getOrElse(GRAPHITE_DEFAULT_PREFIX) - MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod) + MetricsUtils.checkMinimalPollingPeriod(pollUnit, pollPeriod) val graphite = propertyToOption(GRAPHITE_KEY_PROTOCOL).map(_.toLowerCase(Locale.ROOT)) match { case Some("udp") => new GraphiteUDP(host, port) diff --git a/dev/deps/dependencies-server b/dev/deps/dependencies-server index eadf37b17..ca5a32e7e 100644 --- a/dev/deps/dependencies-server +++ b/dev/deps/dependencies-server @@ -17,7 +17,9 @@ HikariCP/4.0.3//HikariCP-4.0.3.jar RoaringBitmap/0.9.32//RoaringBitmap-0.9.32.jar +aopalliance-repackaged/2.6.1//aopalliance-repackaged-2.6.1.jar ap-loader-all/3.0-8//ap-loader-all-3.0-8.jar +classgraph/4.8.138//classgraph-4.8.138.jar commons-cli/1.5.0//commons-cli-1.5.0.jar commons-crypto/1.0.0//commons-crypto-1.0.0.jar 
commons-io/2.13.0//commons-io-2.13.0.jar @@ -27,13 +29,43 @@ failureaccess/1.0.1//failureaccess-1.0.1.jar guava/32.1.3-jre//guava-32.1.3-jre.jar hadoop-client-api/3.3.6//hadoop-client-api-3.3.6.jar hadoop-client-runtime/3.3.6//hadoop-client-runtime-3.3.6.jar +hk2-api/2.6.1//hk2-api-2.6.1.jar +hk2-locator/2.6.1//hk2-locator-2.6.1.jar +hk2-utils/2.6.1//hk2-utils-2.6.1.jar jackson-annotations/2.15.3//jackson-annotations-2.15.3.jar jackson-core/2.15.3//jackson-core-2.15.3.jar jackson-databind/2.15.3//jackson-databind-2.15.3.jar +jackson-dataformat-yaml/2.13.2//jackson-dataformat-yaml-2.13.2.jar +jackson-datatype-jsr310/2.13.2//jackson-datatype-jsr310-2.13.2.jar +jackson-jaxrs-base/2.13.2//jackson-jaxrs-base-2.13.2.jar +jackson-jaxrs-json-provider/2.13.2//jackson-jaxrs-json-provider-2.13.2.jar +jackson-module-jaxb-annotations/2.14.1//jackson-module-jaxb-annotations-2.14.1.jar jackson-module-scala_2.12/2.15.3//jackson-module-scala_2.12-2.15.3.jar -javassist/3.28.0-GA//javassist-3.28.0-GA.jar -javax.servlet-api/3.1.0//javax.servlet-api-3.1.0.jar +jakarta.annotation-api/1.3.5//jakarta.annotation-api-1.3.5.jar +jakarta.inject/2.6.1//jakarta.inject-2.6.1.jar +jakarta.servlet-api/4.0.4//jakarta.servlet-api-4.0.4.jar +jakarta.validation-api/2.0.2//jakarta.validation-api-2.0.2.jar +jakarta.ws.rs-api/2.1.6//jakarta.ws.rs-api-2.1.6.jar +jakarta.xml.bind-api/2.3.3//jakarta.xml.bind-api-2.3.3.jar +javassist/3.29.0-GA//javassist-3.29.0-GA.jar jcl-over-slf4j/1.7.36//jcl-over-slf4j-1.7.36.jar +jersey-client/2.39.1//jersey-client-2.39.1.jar +jersey-common/2.39.1//jersey-common-2.39.1.jar +jersey-container-servlet-core/2.39.1//jersey-container-servlet-core-2.39.1.jar +jersey-entity-filtering/2.39.1//jersey-entity-filtering-2.39.1.jar +jersey-hk2/2.39.1//jersey-hk2-2.39.1.jar +jersey-media-json-jackson/2.39.1//jersey-media-json-jackson-2.39.1.jar +jersey-media-multipart/2.39.1//jersey-media-multipart-2.39.1.jar +jersey-server/2.39.1//jersey-server-2.39.1.jar 
+jetty-client/9.4.52.v20230823//jetty-client-9.4.52.v20230823.jar +jetty-http/9.4.52.v20230823//jetty-http-9.4.52.v20230823.jar +jetty-io/9.4.52.v20230823//jetty-io-9.4.52.v20230823.jar +jetty-proxy/9.4.52.v20230823//jetty-proxy-9.4.52.v20230823.jar +jetty-security/9.4.52.v20230823//jetty-security-9.4.52.v20230823.jar +jetty-server/9.4.52.v20230823//jetty-server-9.4.52.v20230823.jar +jetty-servlet/9.4.52.v20230823//jetty-servlet-9.4.52.v20230823.jar +jetty-util-ajax/9.4.52.v20230823//jetty-util-ajax-9.4.52.v20230823.jar +jetty-util/9.4.52.v20230823//jetty-util-9.4.52.v20230823.jar jsr305/1.3.9//jsr305-1.3.9.jar jul-to-slf4j/1.7.36//jul-to-slf4j-1.7.36.jar leveldbjni-all/1.8//leveldbjni-all-1.8.jar @@ -46,6 +78,7 @@ maven-jdk-tools-wrapper/0.1//maven-jdk-tools-wrapper-0.1.jar metrics-core/3.2.6//metrics-core-3.2.6.jar metrics-graphite/3.2.6//metrics-graphite-3.2.6.jar metrics-jvm/3.2.6//metrics-jvm-3.2.6.jar +mimepull/1.9.15//mimepull-1.9.15.jar mybatis/3.5.15//mybatis-3.5.15.jar netty-all/4.1.107.Final//netty-all-4.1.107.Final.jar netty-buffer/4.1.107.Final//netty-buffer-4.1.107.Final.jar @@ -81,6 +114,7 @@ netty-transport-rxtx/4.1.107.Final//netty-transport-rxtx-4.1.107.Final.jar netty-transport-sctp/4.1.107.Final//netty-transport-sctp-4.1.107.Final.jar netty-transport-udt/4.1.107.Final//netty-transport-udt-4.1.107.Final.jar netty-transport/4.1.107.Final//netty-transport-4.1.107.Final.jar +osgi-resource-locator/1.0.3//osgi-resource-locator-1.0.3.jar paranamer/2.8//paranamer-2.8.jar protobuf-java/3.21.7//protobuf-java-3.21.7.jar ratis-client/2.5.1//ratis-client-2.5.1.jar @@ -101,4 +135,10 @@ shims/0.9.32//shims-0.9.32.jar slf4j-api/1.7.36//slf4j-api-1.7.36.jar snakeyaml/2.2//snakeyaml-2.2.jar snappy-java/1.1.10.5//snappy-java-1.1.10.5.jar +swagger-annotations/2.2.1//swagger-annotations-2.2.1.jar +swagger-core/2.2.1//swagger-core-2.2.1.jar +swagger-integration/2.2.1//swagger-integration-2.2.1.jar +swagger-jaxrs2/2.2.1//swagger-jaxrs2-2.2.1.jar 
+swagger-models/2.2.1//swagger-models-2.2.1.jar +swagger-ui/4.9.1//swagger-ui-4.9.1.jar zstd-jni/1.5.2-1//zstd-jni-1.5.2-1.jar diff --git a/docs/configuration/master.md b/docs/configuration/master.md index 7a504441d..13f1d5c40 100644 --- a/docs/configuration/master.md +++ b/docs/configuration/master.md @@ -43,7 +43,9 @@ license: | | celeborn.master.heartbeat.worker.timeout | 120s | false | Worker heartbeat timeout. | 0.3.0 | celeborn.worker.heartbeat.timeout | | celeborn.master.host | <localhost> | false | Hostname for master to bind. | 0.2.0 | | | celeborn.master.http.host | <localhost> | false | Master's http host. | 0.4.0 | celeborn.metrics.master.prometheus.host,celeborn.master.metrics.prometheus.host | +| celeborn.master.http.maxWorkerThreads | 999 | false | Maximum number of threads in the master http worker thread pool. | 0.5.0 | | | celeborn.master.http.port | 9098 | false | Master's http port. | 0.4.0 | celeborn.metrics.master.prometheus.port,celeborn.master.metrics.prometheus.port | +| celeborn.master.http.stopTimeout | 5s | false | Master http server stop timeout. | 0.5.0 | | | celeborn.master.internal.port | 8097 | false | Internal port on the master where both workers and other master nodes connect. | 0.5.0 | | | celeborn.master.port | 9097 | false | Port for master to bind. | 0.2.0 | | | celeborn.master.rackResolver.refresh.interval | 30s | false | Interval for refreshing the node rack information periodically. | 0.5.0 | | diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md index 68ad3f494..484cf1b6d 100644 --- a/docs/configuration/worker.md +++ b/docs/configuration/worker.md @@ -83,7 +83,9 @@ license: | | celeborn.worker.graceful.shutdown.saveCommittedFileInfo.sync | false | false | Whether to call sync method to save committed file infos into Level DB to handle OS crash. | 0.3.1 | | | celeborn.worker.graceful.shutdown.timeout | 600s | false | The worker's graceful shutdown timeout time. 
| 0.2.0 | | | celeborn.worker.http.host | <localhost> | false | Worker's http host. | 0.4.0 | celeborn.metrics.worker.prometheus.host,celeborn.worker.metrics.prometheus.host | +| celeborn.worker.http.maxWorkerThreads | 999 | false | Maximum number of threads in the worker http worker thread pool. | 0.5.0 | | | celeborn.worker.http.port | 9096 | false | Worker's http port. | 0.4.0 | celeborn.metrics.worker.prometheus.port,celeborn.worker.metrics.prometheus.port | +| celeborn.worker.http.stopTimeout | 5s | false | Worker http server stop timeout. | 0.5.0 | | | celeborn.worker.internal.port | 0 | false | Internal server port on the Worker where the master nodes connect. | 0.5.0 | | | celeborn.worker.jvmProfiler.enabled | false | false | Turn on code profiling via async_profiler in workers. | 0.5.0 | | | celeborn.worker.jvmProfiler.localDir | . | false | Local file system path on worker where profiler output is saved. Defaults to the working directory of the worker process. | 0.5.0 | | diff --git a/master/pom.xml b/master/pom.xml index a5260a1ce..4dd3ed11c 100644 --- a/master/pom.xml +++ b/master/pom.xml @@ -83,11 +83,27 @@ log4j-1.2-api + + org.apache.celeborn + celeborn-service_${scala.binary.version} + ${project.version} + test-jar + org.mockito mockito-core test + + org.glassfish.jersey.test-framework + jersey-test-framework-core + test + + + org.glassfish.jersey.test-framework.providers + jersey-test-framework-provider-jetty + test + diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/ApiMasterResource.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/ApiMasterResource.scala new file mode 100644 index 000000000..2093dee03 --- /dev/null +++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/ApiMasterResource.scala @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.celeborn.service.deploy.master.http.api + +import javax.ws.rs.{GET, Path, POST, QueryParam} +import javax.ws.rs.core.MediaType + +import io.swagger.v3.oas.annotations.media.Content +import io.swagger.v3.oas.annotations.responses.ApiResponse + +import org.apache.celeborn.server.common.http.api.ApiRequestContext + +@Path("/") +class ApiMasterResource extends ApiRequestContext { + + @Path("/masterGroupInfo") + @ApiResponse( + responseCode = "200", + content = Array(new Content( + mediaType = MediaType.TEXT_PLAIN)), + description = + "List master group information of the service. 
It will list all master's LEADER, FOLLOWER information.") + @GET + def masterGroupInfo: String = httpService.getMasterGroupInfo + + @Path("/lostWorkers") + @ApiResponse( + responseCode = "200", + content = Array(new Content( + mediaType = MediaType.TEXT_PLAIN)), + description = "List all lost workers of the master.") + @GET + def lostWorkers: String = httpService.getLostWorkers + + @Path("/excludedWorkers") + @ApiResponse( + responseCode = "200", + content = Array(new Content( + mediaType = MediaType.TEXT_PLAIN)), + description = "List all excluded workers of the master.") + @GET + def excludedWorkers: String = httpService.getExcludedWorkers + + @Path("/shutdownWorkers") + @ApiResponse( + responseCode = "200", + content = Array(new Content( + mediaType = MediaType.TEXT_PLAIN)), + description = "List all shutdown workers of the master.") + @GET + def shutdownWorkers: String = httpService.getShutdownWorkers + + @Path("/hostnames") + @ApiResponse( + responseCode = "200", + content = Array(new Content( + mediaType = MediaType.TEXT_PLAIN)), + description = "List all running application's LifecycleManager's hostnames of the cluster.") + @GET + def hostnames: String = httpService.getHostnameList + + @Path("/sendWorkerEvent") + @ApiResponse( + responseCode = "200", + content = Array(new Content( + mediaType = MediaType.TEXT_PLAIN)), + description = + "For Master(Leader) can send worker event to manager workers. 
Legal types are 'None', 'Immediately', 'Decommission', 'DecommissionThenIdle', 'Graceful', 'Recommission'") + @POST + def sendWorkerEvent( + @QueryParam("TYPE") eventType: String, + @QueryParam("WORKERS") workers: String): String = { + httpService.handleWorkerEvent(normalizeParam(eventType), normalizeParam(workers)) + } + + @Path("/workerEventInfo") + @ApiResponse( + responseCode = "200", + content = Array(new Content( + mediaType = MediaType.TEXT_PLAIN)), + description = "List all worker event infos of the master.") + @GET + def workerEventInfo: String = httpService.getWorkerEventInfo() + + @Path("/exclude") + @ApiResponse( + responseCode = "200", + content = Array(new Content( + mediaType = MediaType.TEXT_PLAIN)), + description = "Add workers to or remove workers from the excluded workers of the master.") + @POST + def excludeWorkers( + @QueryParam("ADD") addWorkers: String, + @QueryParam("REMOVE") removeWorkers: String): String = { + httpService.exclude(normalizeParam(addWorkers), normalizeParam(removeWorkers)) + } +} diff --git a/master/src/test/scala/org/apache/celeborn/service/deploy/master/ApiMasterResourceSuite.scala b/master/src/test/scala/org/apache/celeborn/service/deploy/master/ApiMasterResourceSuite.scala new file mode 100644 index 000000000..f5e06e6a5 --- /dev/null +++ b/master/src/test/scala/org/apache/celeborn/service/deploy/master/ApiMasterResourceSuite.scala @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.celeborn.service.deploy.master + +import javax.ws.rs.core.MediaType + +import com.google.common.io.Files + +import org.apache.celeborn.common.CelebornConf +import org.apache.celeborn.common.util.{CelebornExitKind, Utils} +import org.apache.celeborn.server.common.HttpService +import org.apache.celeborn.server.common.http.ApiBaseResourceSuite + +class ApiMasterResourceSuite extends ApiBaseResourceSuite { + private var master: Master = _ + + override protected def httpService: HttpService = master + + def getTmpDir(): String = { + val tmpDir = Files.createTempDir() + tmpDir.deleteOnExit() + tmpDir.getAbsolutePath + } + + override def beforeAll(): Unit = { + val randomMasterPort = Utils.selectRandomPort(1024, 65535) + val randomHttpPort = randomMasterPort + 1 + celebornConf.set(CelebornConf.HA_ENABLED.key, "false") + celebornConf.set(CelebornConf.HA_MASTER_RATIS_STORAGE_DIR.key, getTmpDir()) + celebornConf.set(CelebornConf.WORKER_STORAGE_DIRS.key, getTmpDir()) + celebornConf.set(CelebornConf.MASTER_HTTP_HOST.key, "127.0.0.1") + celebornConf.set(CelebornConf.MASTER_HTTP_PORT.key, randomHttpPort.toString) + + val args = Array("-h", "localhost", "-p", randomMasterPort.toString) + + val masterArgs = new MasterArguments(args, celebornConf) + master = new Master(celebornConf, masterArgs) + new Thread() { + override def run(): Unit = { + master.initialize() + } + }.start() + super.beforeAll() + } + + override def afterAll(): Unit = { + super.afterAll() + master.stop(CelebornExitKind.EXIT_IMMEDIATELY) + master.rpcEnv.shutdown() + } + + 
test("masterGroupInfo") { + val response = webTarget.path("masterGroupInfo").request(MediaType.TEXT_PLAIN).get() + assert(200 == response.getStatus) + } + + test("lostWorkers") { + val response = webTarget.path("lostWorkers").request(MediaType.TEXT_PLAIN).get() + assert(200 == response.getStatus) + } + + test("excludedWorkers") { + val response = webTarget.path("excludedWorkers").request(MediaType.TEXT_PLAIN).get() + assert(200 == response.getStatus) + } + + test("shutdownWorkers") { + val response = webTarget.path("shutdownWorkers").request(MediaType.TEXT_PLAIN).get() + assert(200 == response.getStatus) + } + + test("hostnames") { + val response = webTarget.path("hostnames").request(MediaType.TEXT_PLAIN).get() + assert(200 == response.getStatus) + } + + test("sendWorkerEvent") { + val response = webTarget.path("sendWorkerEvent") + .request(MediaType.TEXT_PLAIN) + .post(null) + assert(200 == response.getStatus) + } + + test("workerEventInfo") { + val response = webTarget.path("workerEventInfo").request(MediaType.TEXT_PLAIN).get() + assert(200 == response.getStatus) + } + + test("exclude") { + val response = webTarget.path("exclude").request(MediaType.TEXT_PLAIN).post(null) + assert(200 == response.getStatus) + } +} diff --git a/pom.xml b/pom.xml index fdd60e69f..00a718a4c 100644 --- a/pom.xml +++ b/pom.xml @@ -80,7 +80,6 @@ 1.3.9 1.44.0 32.1.3-jre - 3.1.0 4.13.2 1.8 2.17.2 @@ -107,6 +106,13 @@ 4.0.3 2.2.224 + + 2.2.1 + 4.9.1 + 2.39.1 + 9.4.52.v20230823 + 4.0.4 + org.apache.celeborn.shaded 3.0.0 @@ -355,11 +361,6 @@ protobuf-java ${protobuf.version} - - javax.servlet - javax.servlet-api - ${javaxservlet.version} - io.netty netty-all @@ -457,6 +458,105 @@ test + + + jakarta.servlet + jakarta.servlet-api + ${jakarta.servlet-api.version} + + + + org.glassfish.jersey.core + jersey-server + ${jersey.version} + + + jakarta.xml.bind + jakarta.xml.bind-api + + + + + + org.glassfish.jersey.containers + jersey-container-servlet-core + ${jersey.version} + + + + 
org.glassfish.jersey.inject + jersey-hk2 + ${jersey.version} + + + + org.glassfish.jersey.media + jersey-media-json-jackson + ${jersey.version} + + + + org.glassfish.jersey.media + jersey-media-multipart + ${jersey.version} + + + + org.glassfish.jersey.test-framework + jersey-test-framework-core + ${jersey.version} + + + jakarta.activation + jakarta.activation-api + + + + + + org.glassfish.jersey.test-framework.providers + jersey-test-framework-provider-jetty + ${jersey.version} + + + org.eclipse.jetty + jetty-util + + + org.eclipse.jetty + jetty-continuation + + + + + + io.swagger.core.v3 + swagger-jaxrs2 + ${swagger.version} + + + com.sun.activation + jakarta.activation + + + org.javassist + javassist + + + + + + + org.webjars + swagger-ui + ${swagger-ui.version} + + junit junit diff --git a/project/CelebornBuild.scala b/project/CelebornBuild.scala index 0cd12ac3e..906ca112b 100644 --- a/project/CelebornBuild.scala +++ b/project/CelebornBuild.scala @@ -45,7 +45,6 @@ object Dependencies { val findbugsVersion = "1.3.9" val guavaVersion = "32.1.3-jre" val hadoopVersion = "3.3.6" - val javaxServletVersion = "3.1.0" val junitInterfaceVersion = "0.13.3" // don't forget update `junitInterfaceVersion` when we upgrade junit val junitVersion = "4.13.2" @@ -67,6 +66,11 @@ object Dependencies { val mybatisVersion = "3.5.15" val hikaricpVersion = "4.0.3" val h2Version = "2.2.224" + val swaggerVersion = "2.2.1" + val swaggerUiVersion = "4.9.1" + val jerseyVersion = "2.39.1" + val jettyVersion = "9.4.52.v20230823" + val jakartaServeletApiVersion = "4.0.4" // For SSL support val bouncycastleVersion = "1.77" @@ -105,7 +109,6 @@ object Dependencies { val ioDropwizardMetricsJvm = "io.dropwizard.metrics" % "metrics-jvm" % metricsVersion val ioNetty = "io.netty" % "netty-all" % nettyVersion excludeAll( ExclusionRule("io.netty", "netty-handler-ssl-ocsp")) - val javaxServletApi = "javax.servlet" % "javax.servlet-api" % javaxServletVersion val leveldbJniAll = "org.fusesource.leveldbjni" 
% "leveldbjni-all" % leveldbJniVersion val log4j12Api = "org.apache.logging.log4j" % "log4j-1.2-api" % log4j2Version val log4jSlf4jImpl = "org.apache.logging.log4j" % "log4j-slf4j-impl" % log4j2Version @@ -133,6 +136,23 @@ object Dependencies { val zstdJni = "com.github.luben" % "zstd-jni" % zstdJniVersion val mybatis = "org.mybatis" % "mybatis" % mybatisVersion val hikaricp = "com.zaxxer" % "HikariCP" % hikaricpVersion + val jettyServer = "org.eclipse.jetty" % "jetty-server" % jettyVersion excludeAll( + ExclusionRule("javax.servlet", "javax.servlet-api")) + val jettyServlet = "org.eclipse.jetty" % "jetty-servlet" % jettyVersion excludeAll( + ExclusionRule("javax.servlet", "javax.servlet-api")) + val jettyProxy = "org.eclipse.jetty" % "jetty-proxy" % jettyVersion + val jakartaServletApi = "jakarta.servlet" % "jakarta.servlet-api" % jakartaServeletApiVersion + val jerseyServer = "org.glassfish.jersey.core" % "jersey-server" % jerseyVersion excludeAll( + ExclusionRule("jakarta.xml.bind", "jakarta.xml.bind-api")) + val jerseyContainerServletCore = "org.glassfish.jersey.containers" % "jersey-container-servlet-core" % jerseyVersion + val jerseyHk2 = "org.glassfish.jersey.inject" % "jersey-hk2" % jerseyVersion + val jerseyMediaJsonJackson = "org.glassfish.jersey.media" % "jersey-media-json-jackson" % jerseyVersion + val jerseyMediaMultipart = "org.glassfish.jersey.media" % "jersey-media-multipart" % jerseyVersion + val swaggerJaxrs2 = "io.swagger.core.v3" % "swagger-jaxrs2" %swaggerVersion excludeAll( + ExclusionRule("com.sun.activation", "jakarta.activation"), + ExclusionRule("org.javassist", "javassist"), + ExclusionRule("jakarta.activation", "jakarta.activation-api")) + val swaggerUi = "org.webjars" % "swagger-ui" % swaggerUiVersion // Test dependencies // https://www.scala-sbt.org/1.x/docs/Testing.html @@ -143,6 +163,10 @@ object Dependencies { val scalatestMockito = "org.mockito" %% "mockito-scala-scalatest" % scalatestMockitoVersion val scalatest = "org.scalatest" 
%% "scalatest" % scalatestVersion val h2 = "com.h2database" % "h2" % h2Version + val jerseyTestFrameworkCore = "org.glassfish.jersey.test-framework" % "jersey-test-framework-core" % jerseyVersion + val jerseyTestFrameworkProviderJetty = "org.glassfish.jersey.test-framework.providers" % "jersey-test-framework-provider-jetty" % jerseyVersion excludeAll( + ExclusionRule("org.eclipse.jetty", "jetty-util"), + ExclusionRule("org.eclipse.jetty", "jetty-continuation")) // SSL support val bouncycastleBcprovJdk18on = "org.bouncycastle" % "bcprov-jdk18on" % bouncycastleVersion % "test" @@ -458,21 +482,34 @@ object CelebornService { Dependencies.findbugsJsr305, Dependencies.commonsIo, Dependencies.ioNetty, - Dependencies.javaxServletApi, Dependencies.commonsCrypto, Dependencies.slf4jApi, Dependencies.mybatis, Dependencies.hikaricp, + Dependencies.swaggerJaxrs2, + Dependencies.swaggerUi, + Dependencies.jakartaServletApi, + Dependencies.jerseyServer, + Dependencies.jerseyContainerServletCore, + Dependencies.jerseyHk2, + Dependencies.jerseyMediaJsonJackson, + Dependencies.jerseyMediaMultipart, + Dependencies.jettyServer, + Dependencies.jettyServlet, + Dependencies.jettyProxy, Dependencies.log4jSlf4jImpl % "test", Dependencies.log4j12Api % "test", - Dependencies.h2 % "test" + Dependencies.h2 % "test", + Dependencies.jerseyTestFrameworkCore % "test", + Dependencies.jerseyTestFrameworkProviderJetty % "test" ) ++ commonUnitTestDependencies ) } object CelebornMaster { lazy val master = Project("celeborn-master", file("master")) - .dependsOn(CelebornCommon.common, CelebornService.service) + .dependsOn(CelebornCommon.common) + .dependsOn(CelebornService.service % "test->test;compile->compile") .settings ( commonSettings, protoSettings, @@ -497,6 +534,7 @@ object CelebornWorker { lazy val worker = Project("celeborn-worker", file("worker")) .dependsOn(CelebornService.service) .dependsOn(CelebornCommon.common % "test->test;compile->compile") + .dependsOn(CelebornService.service % 
"test->test;compile->compile") .dependsOn(CelebornClient.client % "test->compile") .dependsOn(CelebornMaster.master % "test->compile") .settings ( @@ -516,7 +554,9 @@ object CelebornWorker { Dependencies.leveldbJniAll, Dependencies.roaringBitmap, Dependencies.rocksdbJni, - Dependencies.scalatestMockito % "test" + Dependencies.scalatestMockito % "test", + Dependencies.jerseyTestFrameworkCore % "test", + Dependencies.jerseyTestFrameworkProviderJetty % "test" ) ++ commonUnitTestDependencies ) } @@ -724,7 +764,9 @@ trait SparkClientProjects { libraryDependencies ++= Seq( "org.apache.spark" %% "spark-core" % sparkVersion % "test", "org.apache.spark" %% "spark-sql" % sparkVersion % "test", - "org.apache.spark" %% "spark-core" % sparkVersion % "test" classifier "tests", + "org.apache.spark" %% "spark-core" % sparkVersion % "test" classifier "tests" excludeAll( + ExclusionRule("org.glassfish.jersey.inject", "*"), + ExclusionRule("org.glassfish.jersey.core", "*")), "org.apache.spark" %% "spark-sql" % sparkVersion % "test" classifier "tests" ) ++ commonUnitTestDependencies ) diff --git a/service/pom.xml b/service/pom.xml index faf80a142..b6d0da739 100644 --- a/service/pom.xml +++ b/service/pom.xml @@ -29,6 +29,36 @@ jar Celeborn Service + + + + + org.eclipse.jetty + jetty-server + ${jetty.version} + + + + javax.servlet + javax.servlet-api + + + + + org.eclipse.jetty + jetty-servlet + ${jetty.version} + + + org.eclipse.jetty + jetty-proxy + ${jetty.version} + + + + org.apache.celeborn @@ -43,10 +73,6 @@ io.netty netty-all - - javax.servlet - javax.servlet-api - commons-io commons-io @@ -75,6 +101,62 @@ test + + + io.swagger.core.v3 + swagger-jaxrs2 + + + + org.webjars + swagger-ui + + + + jakarta.servlet + jakarta.servlet-api + + + + org.glassfish.jersey.core + jersey-server + + + + org.glassfish.jersey.containers + jersey-container-servlet-core + + + + org.glassfish.jersey.inject + jersey-hk2 + + + + org.glassfish.jersey.media + jersey-media-json-jackson + + + + 
org.glassfish.jersey.media + jersey-media-multipart + + + + org.eclipse.jetty + jetty-server + + + + org.eclipse.jetty + jetty-servlet + + + + org.eclipse.jetty + jetty-proxy + + org.mockito @@ -91,5 +173,15 @@ log4j-1.2-api test + + org.glassfish.jersey.test-framework + jersey-test-framework-core + test + + + org.glassfish.jersey.test-framework.providers + jersey-test-framework-provider-jetty + test + diff --git a/service/src/main/resources/org/apache/celeborn/swagger/index.html b/service/src/main/resources/org/apache/celeborn/swagger/index.html new file mode 100644 index 000000000..2e0c4c687 --- /dev/null +++ b/service/src/main/resources/org/apache/celeborn/swagger/index.html @@ -0,0 +1,73 @@ + + + + + + + Apache Celeborn(Incubating) REST API Documentation + + + + + + +
+ + + + + + diff --git a/common/src/main/scala/org/apache/celeborn/common/metrics/MetricsSystem.scala b/service/src/main/scala/org/apache/celeborn/common/metrics/MetricsSystem.scala similarity index 96% rename from common/src/main/scala/org/apache/celeborn/common/metrics/MetricsSystem.scala rename to service/src/main/scala/org/apache/celeborn/common/metrics/MetricsSystem.scala index 82311bea9..ab919bb14 100644 --- a/common/src/main/scala/org/apache/celeborn/common/metrics/MetricsSystem.scala +++ b/service/src/main/scala/org/apache/celeborn/common/metrics/MetricsSystem.scala @@ -25,11 +25,12 @@ import scala.collection.mutable.ArrayBuffer import scala.util.matching.Regex import com.codahale.metrics.{Metric, MetricFilter, MetricRegistry} +import org.eclipse.jetty.servlet.ServletContextHandler import org.apache.celeborn.common.CelebornConf import org.apache.celeborn.common.CelebornConf.{METRICS_JSON_PATH, METRICS_PROMETHEUS_PATH} import org.apache.celeborn.common.internal.Logging -import org.apache.celeborn.common.metrics.sink.{JsonServlet, PrometheusServlet, ServletHttpRequestHandler, Sink} +import org.apache.celeborn.common.metrics.sink.{JsonServlet, PrometheusServlet, Sink} import org.apache.celeborn.common.metrics.source.Source import org.apache.celeborn.common.util.Utils @@ -51,7 +52,7 @@ class MetricsSystem( metricsConfig.initialize() - def getServletHandlers: Array[ServletHttpRequestHandler] = { + def getServletContextHandlers: Array[ServletContextHandler] = { require(running, "Can only call getServletHandlers on a running MetricsSystem") prometheusServlet.map(_.getHandlers(conf)).getOrElse(Array()) ++ jsonServlet.map(_.getHandlers(conf)).getOrElse(Array()) @@ -139,7 +140,7 @@ class MetricsSystem( prometheusServlet = Some(servlet.newInstance( kv._2, registry, - sources.asScala, + sources.asScala.toSeq, prometheusServletPath).asInstanceOf[PrometheusServlet]) } else if (kv._1 == "jsonServlet") { val servlet = Utils.classForName(classPath) @@ -152,7 +153,7 @@ class 
MetricsSystem( jsonServlet = Some(servlet.newInstance( kv._2, registry, - sources.asScala, + sources.asScala.toSeq, jsonServletPath, conf.metricsJsonPrettyEnabled.asInstanceOf[Object]).asInstanceOf[JsonServlet]) } else { diff --git a/common/src/main/scala/org/apache/celeborn/common/metrics/sink/AbstractServlet.scala b/service/src/main/scala/org/apache/celeborn/common/metrics/sink/AbstractServlet.scala similarity index 77% rename from common/src/main/scala/org/apache/celeborn/common/metrics/sink/AbstractServlet.scala rename to service/src/main/scala/org/apache/celeborn/common/metrics/sink/AbstractServlet.scala index 325826164..9b9ba7e12 100644 --- a/common/src/main/scala/org/apache/celeborn/common/metrics/sink/AbstractServlet.scala +++ b/service/src/main/scala/org/apache/celeborn/common/metrics/sink/AbstractServlet.scala @@ -16,17 +16,19 @@ */ package org.apache.celeborn.common.metrics.sink +import org.eclipse.jetty.servlet.ServletContextHandler + import org.apache.celeborn.common.CelebornConf import org.apache.celeborn.common.internal.Logging import org.apache.celeborn.common.metrics.source.Source abstract class AbstractServlet(sources: Seq[Source]) extends Sink with Logging { - def getHandlers(conf: CelebornConf): Array[ServletHttpRequestHandler] = { - Array[ServletHttpRequestHandler]( - createHttpRequestHandler()) + def getHandlers(conf: CelebornConf): Array[ServletContextHandler] = { + Array[ServletContextHandler]( + createServletHandler()) } - def createHttpRequestHandler(): ServletHttpRequestHandler + def createServletHandler(): ServletContextHandler def getMetricsSnapshot: String = { sources.map(_.getMetrics).mkString @@ -38,11 +40,3 @@ abstract class AbstractServlet(sources: Seq[Source]) extends Sink with Logging { override def report(): Unit = {} } - -abstract class ServletHttpRequestHandler(path: String) extends Logging { - - def handleRequest(uri: String): String - - def getServletPath(): String = path - -} diff --git 
a/common/src/main/scala/org/apache/celeborn/common/metrics/sink/JsonServlet.scala b/service/src/main/scala/org/apache/celeborn/common/metrics/sink/JsonServlet.scala similarity index 95% rename from common/src/main/scala/org/apache/celeborn/common/metrics/sink/JsonServlet.scala rename to service/src/main/scala/org/apache/celeborn/common/metrics/sink/JsonServlet.scala index 7a2b8b52c..4ed2c5a3a 100644 --- a/common/src/main/scala/org/apache/celeborn/common/metrics/sink/JsonServlet.scala +++ b/service/src/main/scala/org/apache/celeborn/common/metrics/sink/JsonServlet.scala @@ -24,10 +24,12 @@ import scala.collection.mutable.ArrayBuffer import com.codahale.metrics.MetricRegistry import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.scala.{ClassTagExtensions, DefaultScalaModule} -import io.netty.channel.ChannelHandler.Sharable +import org.eclipse.jetty.servlet.ServletContextHandler import org.apache.celeborn.common.metrics.{CelebornHistogram, CelebornTimer, ResettableSlidingWindowReservoir} -import org.apache.celeborn.common.metrics.source.{AbstractSource, NamedCounter, NamedGauge, NamedHistogram, NamedTimer, Source} +import org.apache.celeborn.common.metrics.source._ +import org.apache.celeborn.server.common.http.HttpUtils +import org.apache.celeborn.server.common.http.HttpUtils.ServletParams object JsonConverter { val mapper = new ObjectMapper() with ClassTagExtensions @@ -72,8 +74,10 @@ class JsonServlet( } } - override def createHttpRequestHandler(): ServletHttpRequestHandler = { - new JsonHttpRequestHandler(servletPath, this) + override def createServletHandler(): ServletContextHandler = { + HttpUtils.createServletHandler( + servletPath, + new ServletParams(_ => getMetricsSnapshot, "text/json")) } override def stop(): Unit = {} @@ -344,12 +348,3 @@ class JsonServlet( } } } - -@Sharable -class JsonHttpRequestHandler(path: String, jsonServlet: JsonServlet) - extends ServletHttpRequestHandler(path) { - - override def handleRequest(uri: 
String): String = { - jsonServlet.getMetricsSnapshot - } -} diff --git a/common/src/main/scala/org/apache/celeborn/common/metrics/sink/PrometheusServlet.scala b/service/src/main/scala/org/apache/celeborn/common/metrics/sink/PrometheusServlet.scala similarity index 73% rename from common/src/main/scala/org/apache/celeborn/common/metrics/sink/PrometheusServlet.scala rename to service/src/main/scala/org/apache/celeborn/common/metrics/sink/PrometheusServlet.scala index 4b4548421..27797c8fc 100644 --- a/common/src/main/scala/org/apache/celeborn/common/metrics/sink/PrometheusServlet.scala +++ b/service/src/main/scala/org/apache/celeborn/common/metrics/sink/PrometheusServlet.scala @@ -20,9 +20,11 @@ package org.apache.celeborn.common.metrics.sink import java.util.Properties import com.codahale.metrics.MetricRegistry -import io.netty.channel.ChannelHandler.Sharable +import org.eclipse.jetty.servlet.ServletContextHandler import org.apache.celeborn.common.metrics.source.Source +import org.apache.celeborn.server.common.http.HttpUtils +import org.apache.celeborn.server.common.http.HttpUtils.ServletParams class PrometheusServlet( val property: Properties, @@ -30,17 +32,9 @@ class PrometheusServlet( val sources: Seq[Source], val servletPath: String) extends AbstractServlet(sources) { - override def createHttpRequestHandler(): ServletHttpRequestHandler = { - new PrometheusHttpRequestHandler(servletPath, this) - } -} - -@Sharable -class PrometheusHttpRequestHandler( - path: String, - prometheusServlet: PrometheusServlet) extends ServletHttpRequestHandler(path) { - - override def handleRequest(uri: String): String = { - prometheusServlet.getMetricsSnapshot + override def createServletHandler(): ServletContextHandler = { + HttpUtils.createServletHandler( + servletPath, + new ServletParams(_ => getMetricsSnapshot, "text/plain")) } } diff --git a/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala 
b/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala index f13752b81..18e016187 100644 --- a/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala +++ b/service/src/main/scala/org/apache/celeborn/server/common/HttpService.scala @@ -23,7 +23,8 @@ import scala.collection.JavaConverters._ import org.apache.celeborn.common.CelebornConf import org.apache.celeborn.common.internal.Logging -import org.apache.celeborn.server.common.http.{HttpRequestHandler, HttpServer, HttpServerInitializer} +import org.apache.celeborn.server.common.http.HttpServer +import org.apache.celeborn.server.common.http.api.ApiRootResource import org.apache.celeborn.server.common.service.config.ConfigLevel abstract class HttpService extends Service with Logging { @@ -173,18 +174,20 @@ abstract class HttpService extends Service with Logging { def getWorkerEventInfo(): String = throw new UnsupportedOperationException() def startHttpServer(): Unit = { - val handlers = - if (metricsSystem.running) { - new HttpRequestHandler(this, metricsSystem.getServletHandlers) - } else { - new HttpRequestHandler(this, null) - } - httpServer = new HttpServer( + httpServer = HttpServer( serviceName, httpHost(), httpPort(), - new HttpServerInitializer(handlers)) + httpMaxWorkerThreads(), + httpStopTimeout()) httpServer.start() + startInternal() + // block until the HTTP server is started, otherwise, we may get + // the wrong HTTP server port -1 + while (httpServer.getState != "STARTED") { + logInfo(s"Waiting for $serviceName's HTTP server getting started") + Thread.sleep(1000) + } } private def httpHost(): String = { @@ -205,6 +208,41 @@ abstract class HttpService extends Service with Logging { } } + private def httpMaxWorkerThreads(): Int = { + serviceName match { + case Service.MASTER => + conf.masterHttpMaxWorkerThreads + case Service.WORKER => + conf.workerHttpMaxWorkerThreads + } + } + + private def httpStopTimeout(): Long = { + serviceName match { + case Service.MASTER 
=> + conf.masterHttpStopTimeout + case Service.WORKER => + conf.workerHttpStopTimeout + } + } + + def connectionUrl: String = { + httpServer.getServerUri + } + + protected def startInternal(): Unit = { + httpServer.addHandler(ApiRootResource.getServletHandler(this)) + httpServer.addStaticHandler("META-INF/resources/webjars/swagger-ui/4.9.1/", "/swagger-static/") + httpServer.addStaticHandler("org/apache/celeborn/swagger", "/swagger") + httpServer.addRedirectHandler("/help", "/swagger") + httpServer.addRedirectHandler("/docs", "/swagger") + if (metricsSystem.running) { + metricsSystem.getServletContextHandlers.foreach { handler => + httpServer.addHandler(handler) + } + } + } + override def initialize(): Unit = { super.initialize() startHttpServer() diff --git a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpEndpoint.scala b/service/src/main/scala/org/apache/celeborn/server/common/http/HttpEndpoint.scala deleted file mode 100644 index 778355873..000000000 --- a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpEndpoint.scala +++ /dev/null @@ -1,269 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.celeborn.server.common.http - -import org.apache.celeborn.server.common.{HttpService, Service} -import org.apache.celeborn.server.common.service.config.ConfigLevel - -/** - * HTTP endpoints of Rest API providers. - */ -trait HttpEndpoint { - def path: String - - def description(service: String): String - - def handle(service: HttpService, parameters: Map[String, String]): String -} - -case object Conf extends HttpEndpoint { - override def path: String = "/conf" - - override def description(service: String): String = s"List the conf setting of the $service." - - override def handle(service: HttpService, parameters: Map[String, String]): String = - service.getConf -} - -case object ListDynamicConfigs extends HttpEndpoint { - override def path: String = "/listDynamicConfigs" - - override def description(service: String): String = s"List the dynamic configs of the $service. " + - s"The parameter level specifies the config level of dynamic configs. " + - s"The parameter tenant specifies the tenant id of ${ConfigLevel.TENANT.name()} or ${ConfigLevel.TENANT_USER.name()} level. " + - s"The parameter name specifies the user name of ${ConfigLevel.TENANT_USER.name()} level. " + - s"Meanwhile, either none or all of the parameter tenant and name are specified for ${ConfigLevel.TENANT_USER.name()} level." - - override def handle(service: HttpService, parameters: Map[String, String]): String = - service.getDynamicConfigs( - parameters.getOrElse("LEVEL", "").trim, - parameters.getOrElse("TENANT", "").trim, - parameters.getOrElse("NAME", "").trim) -} - -case object WorkerInfo extends HttpEndpoint { - override def path: String = "/workerInfo" - - override def description(service: String): String = { - if (service == Service.MASTER) - "List worker information of the service. It will list all registered workers 's information." - else "List the worker information of the worker." 
- } - - override def handle(service: HttpService, parameters: Map[String, String]): String = - service.getWorkerInfo -} - -case object ThreadDump extends HttpEndpoint { - override def path: String = "/threadDump" - - override def description(service: String): String = - s"List the current thread dump of the $service." - - override def handle(service: HttpService, parameters: Map[String, String]): String = - service.getThreadDump -} - -case object Shuffles extends HttpEndpoint { - override def path: String = "/shuffles" - - override def description(service: String): String = { - if (service == Service.MASTER) - "List all running shuffle keys of the service. It will return all running shuffle's key of the cluster." - else - "List all the running shuffle keys of the worker. It only return keys of shuffles running in that worker." - } - - override def handle(service: HttpService, parameters: Map[String, String]): String = - service.getShuffleList -} - -case object Applications extends HttpEndpoint { - override def path: String = "/applications" - - override def description(service: String): String = - if (service == Service.MASTER) - "List all running application's ids of the cluster." - else - "List all running application's ids of the worker. It only return application ids running in that worker." - - override def handle(service: HttpService, parameters: Map[String, String]): String = - service.getApplicationList -} - -case object ListTopDiskUsedApps extends HttpEndpoint { - override def path: String = "/listTopDiskUsedApps" - - override def description(service: String): String = { - if (service == Service.MASTER) - "List the top disk usage application ids. It will return the top disk usage application ids for the cluster." - else - "List the top disk usage application ids. It only return application ids running in that worker." 
- } - - override def handle(service: HttpService, parameters: Map[String, String]): String = - service.listTopDiskUseApps -} - -case object Help extends HttpEndpoint { - override def path: String = "/help" - - override def description(service: String): String = - s"List the available API providers of the $service." - - override def handle(service: HttpService, parameters: Map[String, String]): String = - HttpUtils.help(service.serviceName) -} - -case object Invalid extends HttpEndpoint { - - val invalid = "invalid" - - override def path: String = None.toString - - override def description(service: String): String = s"Invalid uri of the $service." - - override def handle(service: HttpService, parameters: Map[String, String]): String = invalid -} - -case object MasterGroupInfo extends HttpEndpoint { - override def path: String = "/masterGroupInfo" - - override def description(service: String): String = - "List master group information of the service. It will list all master's LEADER, FOLLOWER information." - - override def handle(service: HttpService, parameters: Map[String, String]): String = - service.getMasterGroupInfo -} - -case object LostWorkers extends HttpEndpoint { - override def path: String = "/lostWorkers" - - override def description(service: String): String = "List all lost workers of the master." - - override def handle(service: HttpService, parameters: Map[String, String]): String = - service.getLostWorkers -} - -case object ExcludedWorkers extends HttpEndpoint { - override def path: String = "/excludedWorkers" - - override def description(service: String): String = "List all excluded workers of the master." - - override def handle(service: HttpService, parameters: Map[String, String]): String = - service.getExcludedWorkers -} - -case object ShutdownWorkers extends HttpEndpoint { - override def path: String = "/shutdownWorkers" - - override def description(service: String): String = "List all shutdown workers of the master." 
- - override def handle(service: HttpService, parameters: Map[String, String]): String = - service.getShutdownWorkers -} - -case object Hostnames extends HttpEndpoint { - override def path: String = "/hostnames" - - override def description(service: String): String = - "List all running application's LifecycleManager's hostnames of the cluster." - - override def handle(service: HttpService, parameters: Map[String, String]): String = - service.getHostnameList -} - -case object Exclude extends HttpEndpoint { - override def path: String = "/exclude" - - override def description(service: String): String = - "Excluded workers of the master add or remove the worker manually given worker id. The parameter add or remove specifies the excluded workers to add or remove, which value is separated by commas." - - override def handle(service: HttpService, parameters: Map[String, String]): String = - service.exclude(parameters.getOrElse("ADD", "").trim, parameters.getOrElse("REMOVE", "").trim) -} - -case object ListPartitionLocationInfo extends HttpEndpoint { - override def path: String = "/listPartitionLocationInfo" - - override def description(service: String): String = - "List all the living PartitionLocation information in that worker." - - override def handle(service: HttpService, parameters: Map[String, String]): String = - service.listPartitionLocationInfo -} - -case object UnavailablePeers extends HttpEndpoint { - override def path: String = "/unavailablePeers" - - override def description(service: String): String = - "List the unavailable peers of the worker, this always means the worker connect to the peer failed." - - override def handle(service: HttpService, parameters: Map[String, String]): String = - service.getUnavailablePeers -} - -case object IsShutdown extends HttpEndpoint { - override def path: String = "/isShutdown" - - override def description(service: String): String = - "Show if the worker is during the process of shutdown." 
- - override def handle(service: HttpService, parameters: Map[String, String]): String = - service.isShutdown -} - -case object IsRegistered extends HttpEndpoint { - override def path: String = "/isRegistered" - - override def description(service: String): String = - "Show if the worker is registered to the master success." - - override def handle(service: HttpService, parameters: Map[String, String]): String = - service.isRegistered -} - -case object Exit extends HttpEndpoint { - override def path: String = "/exit" - - override def description(service: String): String = - "Trigger this worker to exit. Legal types are 'DECOMMISSION', 'GRACEFUL' and 'IMMEDIATELY'." - - override def handle(service: HttpService, parameters: Map[String, String]): String = - service.exit(parameters.getOrElse("TYPE", "")) -} - -case object SendWorkerEvent extends HttpEndpoint { - override def path: String = "/sendWorkerEvent" - - override def description(service: String): String = - "For Master(Leader) can send worker event to manager workers. Legal types are 'None', 'Immediately', 'Decommission', 'DecommissionThenIdle', 'Graceful', 'Recommission'" - - override def handle(service: HttpService, parameters: Map[String, String]): String = - service.handleWorkerEvent(parameters.getOrElse("TYPE", ""), parameters.getOrElse("WORKERS", "")) -} - -case object WorkerEventInfo extends HttpEndpoint { - override def path: String = "/workerEventInfo" - - override def description(service: String): String = - "List all worker event infos of the master." 
- - override def handle(service: HttpService, parameters: Map[String, String]): String = - service.getWorkerEventInfo() -} diff --git a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpRequestHandler.scala b/service/src/main/scala/org/apache/celeborn/server/common/http/HttpRequestHandler.scala deleted file mode 100644 index 531f19454..000000000 --- a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpRequestHandler.scala +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.celeborn.server.common.http - -import io.netty.buffer.Unpooled -import io.netty.channel.{ChannelFutureListener, ChannelHandlerContext, SimpleChannelInboundHandler} -import io.netty.channel.ChannelHandler.Sharable -import io.netty.handler.codec.http._ -import io.netty.util.CharsetUtil - -import org.apache.celeborn.common.internal.Logging -import org.apache.celeborn.common.metrics.sink.{JsonHttpRequestHandler, ServletHttpRequestHandler} -import org.apache.celeborn.server.common.HttpService - -/** - * A handler for the REST API that defines how to handle the HTTP request given a message. - * - * @param service The service of HTTP server. 
- * @param uri The uri of HTTP request. - */ -@Sharable -class HttpRequestHandler( - service: HttpService, - servletHttpRequestHandlers: Array[ServletHttpRequestHandler]) - extends SimpleChannelInboundHandler[FullHttpRequest] with Logging { - - override def channelReadComplete(ctx: ChannelHandlerContext): Unit = { - ctx.flush() - } - - override def channelRead0(ctx: ChannelHandlerContext, req: FullHttpRequest): Unit = { - val uri = req.uri() - val (path, parameters) = HttpUtils.parseUri(uri) - val msg = HttpUtils.handleRequest(service, path, parameters) - val textType = "text/plain; charset=UTF-8" - val jsonType = "application/json" - val (response, contentType) = msg match { - case Invalid.invalid => - if (servletHttpRequestHandlers != null) { - servletHttpRequestHandlers.find(servlet => - uri == servlet.getServletPath()).map { - case jsonHandler: JsonHttpRequestHandler => - (jsonHandler.handleRequest(uri), jsonType) - case handler: ServletHttpRequestHandler => - (handler.handleRequest(uri), textType) - }.getOrElse((s"Unknown path $uri!", textType)) - } else { - ( - s"${Invalid.description(service.serviceName)} ${HttpUtils.help(service.serviceName)}", - textType) - } - case _ => (msg, textType) - } - - val res = new DefaultFullHttpResponse( - HttpVersion.HTTP_1_1, - HttpResponseStatus.OK, - Unpooled.copiedBuffer(response, CharsetUtil.UTF_8)) - res.headers().set(HttpHeaderNames.CONTENT_TYPE, contentType) - ctx.writeAndFlush(res).addListener(ChannelFutureListener.CLOSE) - } -} diff --git a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpServer.scala b/service/src/main/scala/org/apache/celeborn/server/common/http/HttpServer.scala index a45ff109a..336132576 100644 --- a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpServer.scala +++ b/service/src/main/scala/org/apache/celeborn/server/common/http/HttpServer.scala @@ -17,75 +17,108 @@ package org.apache.celeborn.server.common.http -import java.net.InetSocketAddress -import 
java.util.concurrent.TimeUnit - -import io.netty.bootstrap.ServerBootstrap -import io.netty.channel.{ChannelFuture, ChannelInitializer} -import io.netty.channel.socket.nio.NioServerSocketChannel -import io.netty.handler.logging.{LoggingHandler, LogLevel} +import org.apache.commons.lang3.SystemUtils +import org.eclipse.jetty.server.{Handler, HttpConfiguration, HttpConnectionFactory, Server, ServerConnector} +import org.eclipse.jetty.server.handler.{ContextHandlerCollection, ErrorHandler} +import org.eclipse.jetty.util.component.LifeCycle +import org.eclipse.jetty.util.thread.{QueuedThreadPool, ScheduledExecutorScheduler} import org.apache.celeborn.common.internal.Logging -import org.apache.celeborn.common.network.util.{IOMode, NettyUtils} -import org.apache.celeborn.common.util.{CelebornExitKind, Utils} +import org.apache.celeborn.common.util.CelebornExitKind -class HttpServer( +private[celeborn] case class HttpServer( role: String, - host: String, - port: Int, - channelInitializer: ChannelInitializer[_]) extends Logging { + server: Server, + connector: ServerConnector, + rootHandler: ContextHandlerCollection) extends Logging { - private var bootstrap: ServerBootstrap = _ - private var bindFuture: ChannelFuture = _ @volatile private var isStarted = false @throws[Exception] def start(): Unit = synchronized { - val boss = NettyUtils.createEventLoop(IOMode.NIO, 1, role + "-http-boss") - val worker = NettyUtils.createEventLoop(IOMode.NIO, 2, role + "-http-worker") - bootstrap = new ServerBootstrap - bootstrap - .group(boss, worker) - .handler(new LoggingHandler(LogLevel.DEBUG)) - .channel(classOf[NioServerSocketChannel]) - .childHandler(channelInitializer) - - val address = new InetSocketAddress(host, port) - bindFuture = bootstrap.bind(address).sync - bindFuture.syncUninterruptibly() - logInfo(s"$role: HttpServer started on ${address.getHostString}:$port.") - isStarted = true + try { + server.start() + connector.start() + server.addConnector(connector) + 
logInfo(s"$role: HttpServer started on ${connector.getHost}:${connector.getPort}.") + isStarted = true + } catch { + case e: Exception => + stop(CelebornExitKind.EXIT_IMMEDIATELY) + throw e + } } def stop(exitCode: Int): Unit = synchronized { if (isStarted) { + if (exitCode == CelebornExitKind.EXIT_IMMEDIATELY) { + server.setStopTimeout(0) + } logInfo(s"$role: Stopping HttpServer") - if (bindFuture != null) { - // close is a local operation and should finish within milliseconds; timeout just to be safe - bindFuture.channel.close.awaitUninterruptibly(10, TimeUnit.SECONDS) - bindFuture = null + server.stop() + connector.stop() + server.getThreadPool match { + case lifeCycle: LifeCycle => lifeCycle.stop() + case _ => } - if (bootstrap != null && bootstrap.config.group != null) { - Utils.tryLogNonFatalError { - if (exitCode == CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN) { - bootstrap.config.group.shutdownGracefully(3, 5, TimeUnit.SECONDS) - } else { - bootstrap.config.group.shutdownGracefully(0, 0, TimeUnit.SECONDS) - } - } - } - if (bootstrap != null && bootstrap.config.childGroup != null) { - Utils.tryLogNonFatalError { - if (exitCode == CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN) { - bootstrap.config.childGroup.shutdownGracefully(3, 5, TimeUnit.SECONDS) - } else { - bootstrap.config.childGroup.shutdownGracefully(0, 0, TimeUnit.SECONDS) - } - } - } - bootstrap = null logInfo(s"$role: HttpServer stopped.") isStarted = false } } + def getServerUri: String = connector.getHost + ":" + connector.getLocalPort + + def addHandler(handler: Handler): Unit = synchronized { + rootHandler.addHandler(handler) + if (!handler.isStarted) handler.start() + } + + def addStaticHandler( + resourceBase: String, + contextPath: String): Unit = { + addHandler(HttpUtils.createStaticHandler(resourceBase, contextPath)) + } + + def addRedirectHandler( + src: String, + dest: String): Unit = { + addHandler(HttpUtils.createRedirectHandler(src, dest)) + } + + def getState: String = server.getState +} 
+ +object HttpServer { + + def apply(role: String, host: String, port: Int, poolSize: Int, stopTimeout: Long): HttpServer = { + val pool = new QueuedThreadPool(poolSize) + pool.setName(s"$role-JettyThreadPool") + pool.setDaemon(true) + val server = new Server(pool) + server.setStopTimeout(stopTimeout) + + val errorHandler = new ErrorHandler() + errorHandler.setShowStacks(true) + errorHandler.setServer(server) + server.addBean(errorHandler) + + val collection = new ContextHandlerCollection + server.setHandler(collection) + + val serverExecutor = new ScheduledExecutorScheduler(s"$role-JettyScheduler", true) + val httpConf = new HttpConfiguration() + val connector = new ServerConnector( + server, + null, + serverExecutor, + null, + -1, + -1, + new HttpConnectionFactory(httpConf)) + connector.setHost(host) + connector.setPort(port) + connector.setReuseAddress(!SystemUtils.IS_OS_WINDOWS) + connector.setAcceptQueueSize(math.min(connector.getAcceptors, 8)) + + new HttpServer(role, server, connector, collection) + } } diff --git a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpUtils.scala b/service/src/main/scala/org/apache/celeborn/server/common/http/HttpUtils.scala index 4ccb127e5..b9b6a3ec2 100644 --- a/service/src/main/scala/org/apache/celeborn/server/common/http/HttpUtils.scala +++ b/service/src/main/scala/org/apache/celeborn/server/common/http/HttpUtils.scala @@ -18,78 +18,111 @@ package org.apache.celeborn.server.common.http import java.net.URL -import java.util.Locale +import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse} -import org.apache.celeborn.server.common.{HttpService, Service} +import org.eclipse.jetty.servlet.{DefaultServlet, ServletContextHandler, ServletHolder} -object HttpUtils { +import org.apache.celeborn.common.exception.CelebornException +import org.apache.celeborn.common.internal.Logging - private val baseEndpoints: List[HttpEndpoint] = - List( - Conf, - ListDynamicConfigs, - WorkerInfo, - ThreadDump, 
- Shuffles, - Applications, - ListTopDiskUsedApps, - Help) - private val masterEndpoints: List[HttpEndpoint] = List( - MasterGroupInfo, - LostWorkers, - ExcludedWorkers, - ShutdownWorkers, - Hostnames, - SendWorkerEvent, - WorkerEventInfo, - Exclude) ++ baseEndpoints - private val workerEndpoints: List[HttpEndpoint] = - List( - ListPartitionLocationInfo, - UnavailablePeers, - IsShutdown, - IsRegistered, - Exit) ++ baseEndpoints +private[celeborn] object HttpUtils extends Logging { + // Base type for a function that returns something based on an HTTP request. Allows for + // implicit conversion from many types of functions to jetty Handlers. + type Responder[T] = HttpServletRequest => T - def parseUri(uri: String): (String, Map[String, String]) = { - val url = new URL(s"https://127.0.0.1:9000$uri") - val parameter = - if (url.getQuery == null) { - Map.empty[String, String] - } else { - url.getQuery - .split("&") - .map(_.split("=")) - .map(arr => arr(0).toUpperCase(Locale.ROOT) -> arr(1)).toMap - } - (url.getPath, parameter) - } + class ServletParams[T <: AnyRef]( + val responder: Responder[T], + val contentType: String, + val extractFn: T => String = (in: Any) => in.toString) {} - def handleRequest( - service: HttpService, + /** Create a context handler that responds to a request with the given path prefix */ + def createServletHandler[T <: AnyRef]( path: String, - parameters: Map[String, String]): String = { - endpoints(service.serviceName).find(endpoint => endpoint.path == path).orElse( - Some(Invalid)).get.handle( - service, - parameters) + servletParams: ServletParams[T]): ServletContextHandler = { + createServletHandler(path, createServlet(servletParams)) } - def help(service: String): String = { - val sb = new StringBuilder - sb.append("Available API providers include:\n") - val httpEndpoints: List[HttpEndpoint] = endpoints(service) - val maxLength = httpEndpoints.map(_.path.length).max - httpEndpoints.sortBy(_.path).foreach(endpoint => - sb.append( - 
s"${endpoint.path.padTo(maxLength, " ").mkString} ${endpoint.description(service)}\n")) - sb.toString + private def createServlet[T <: AnyRef](servletParams: ServletParams[T]): HttpServlet = { + new HttpServlet { + override def doGet(request: HttpServletRequest, response: HttpServletResponse): Unit = { + try { + response.setContentType("%s;charset=utf-8".format(servletParams.contentType)) + response.setStatus(HttpServletResponse.SC_OK) + val result = servletParams.responder(request) + response.getWriter.print(servletParams.extractFn(result)) + } catch { + case e: IllegalArgumentException => + response.sendError(HttpServletResponse.SC_BAD_REQUEST, e.getMessage) + case e: Exception => + logWarning(s"GET ${request.getRequestURI} failed: $e", e) + throw e + } + } + + override protected def doTrace(req: HttpServletRequest, res: HttpServletResponse): Unit = { + res.sendError(HttpServletResponse.SC_METHOD_NOT_ALLOWED) + } + } } - private def endpoints(service: String): List[HttpEndpoint] = { - if (service == Service.MASTER) - masterEndpoints - else - workerEndpoints + /** + * Create a handler for serving files from a static directory + * + * @param resourceBase the resource directory contains static resource files + * @param contextPath the content path to set for the handler + * @return a static [[ServletContextHandler]] + */ + def createStaticHandler( + resourceBase: String, + contextPath: String): ServletContextHandler = { + val contextHandler = new ServletContextHandler() + val holder = new ServletHolder(classOf[DefaultServlet]) + Option(Thread.currentThread().getContextClassLoader.getResource(resourceBase)) match { + case Some(res) => + holder.setInitParameter("resourceBase", res.toString) + case None => + throw new CelebornException("Could not find resource path for Web UI: " + resourceBase) + } + contextHandler.setContextPath(contextPath) + contextHandler.addServlet(holder, "/") + contextHandler + } + + def createServletHandler(contextPath: String, servlet: 
HttpServlet): ServletContextHandler = { + val handler = new ServletContextHandler() + val holder = new ServletHolder(servlet) + handler.setContextPath(contextPath) + handler.addServlet(holder, "/") + handler + } + + def createRedirectHandler(src: String, dest: String): ServletContextHandler = { + val redirectedServlet = new HttpServlet { + private def doReq(req: HttpServletRequest, resp: HttpServletResponse): Unit = { + val newURL = new URL(new URL(req.getRequestURL.toString), dest).toString + resp.sendRedirect(newURL) + } + override def doGet(req: HttpServletRequest, resp: HttpServletResponse): Unit = { + doReq(req, resp) + } + + override def doPut(req: HttpServletRequest, resp: HttpServletResponse): Unit = { + doReq(req, resp) + } + + override def doPost(req: HttpServletRequest, resp: HttpServletResponse): Unit = { + doReq(req, resp) + } + + override def doDelete(req: HttpServletRequest, resp: HttpServletResponse): Unit = { + doReq(req, resp) + } + + override protected def doTrace(req: HttpServletRequest, resp: HttpServletResponse): Unit = { + resp.sendError(HttpServletResponse.SC_METHOD_NOT_ALLOWED) + } + } + + createServletHandler(src, redirectedServlet) } } diff --git a/service/src/main/scala/org/apache/celeborn/server/common/http/api/ApiBaseResource.scala b/service/src/main/scala/org/apache/celeborn/server/common/http/api/ApiBaseResource.scala new file mode 100644 index 000000000..45a5d2796 --- /dev/null +++ b/service/src/main/scala/org/apache/celeborn/server/common/http/api/ApiBaseResource.scala @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.celeborn.server.common.http.api + +import javax.ws.rs.{GET, Path, Produces, QueryParam} +import javax.ws.rs.core.MediaType + +import io.swagger.v3.oas.annotations.media.Content +import io.swagger.v3.oas.annotations.responses.ApiResponse + +@Path("/") +private[api] class ApiBaseResource extends ApiRequestContext { + def service: String = httpService.serviceName + + @GET + @Path("ping") + @Produces(Array(MediaType.TEXT_PLAIN)) + def ping(): String = "pong" + + @Path("/conf") + @ApiResponse( + responseCode = "200", + content = Array(new Content( + mediaType = MediaType.TEXT_PLAIN)), + description = "List the conf setting.") + @GET + def conf: String = httpService.getConf + + @Path("/listDynamicConfigs") + @ApiResponse( + responseCode = "200", + content = Array(new Content( + mediaType = MediaType.TEXT_PLAIN)), + description = "List the dynamic configs. " + + "The parameter level specifies the config level of dynamic configs. " + + "The parameter tenant specifies the tenant id of TENANT or TENANT_USER level. " + + "The parameter name specifies the user name of TENANT_USER level. 
" + + "Meanwhile, either none or all of the parameter tenant and name are specified for TENANT_USER level.") + @GET + def listDynamicConfigs( + @QueryParam("LEVEL") level: String, + @QueryParam("TENANT") tenant: String, + @QueryParam("NAME") name: String): String = { + httpService.getDynamicConfigs( + normalizeParam(level), + normalizeParam(tenant), + normalizeParam(name)) + } + + @Path("/workerInfo") + @ApiResponse( + responseCode = "200", + content = Array(new Content( + mediaType = MediaType.TEXT_PLAIN)), + description = + "For MASTER: List worker information of the service. It will list all registered workers 's information.\n" + + "For WORKER: List the worker information of the worker.") + @GET + def workerInfo(): String = { + httpService.getWorkerInfo + } + + @Path("/threadDump") + @ApiResponse( + responseCode = "200", + content = Array(new Content( + mediaType = MediaType.TEXT_PLAIN)), + description = "List the current thread dump.") + @GET + def threadDump(): String = { + httpService.getThreadDump + } + + @Path("shuffle") + @ApiResponse( + responseCode = "200", + content = Array(new Content( + mediaType = MediaType.TEXT_PLAIN)), + description = + "For MASTER: List all running shuffle keys of the service. It will return all running shuffle's key of the cluster.\n" + + "For WORKER: List all the running shuffle keys of the worker. It only return keys of shuffles running in that worker.") + @GET + def shuffles(): String = { + httpService.getShuffleList + } + + @Path("applications") + @ApiResponse( + responseCode = "200", + content = Array(new Content( + mediaType = MediaType.TEXT_PLAIN)), + description = + "For MASTER: List all running application's ids of the cluster.\n" + + "For WORKER: List all running application's ids of the worker. 
It only return application ids running in that worker.") + @GET + def applications(): String = { + httpService.getApplicationList + } + + @Path("listTopDiskUsedApps") + @ApiResponse( + responseCode = "200", + content = Array(new Content( + mediaType = MediaType.TEXT_PLAIN)), + description = + "For MASTER: List the top disk usage application ids. It will return the top disk usage application ids for the cluster.\n" + + "For WORKER: List the top disk usage application ids. It only return application ids running in that worker.") + @GET + def listTopDiskUsedApps(): String = { + httpService.listTopDiskUseApps + } +} diff --git a/service/src/main/scala/org/apache/celeborn/server/common/http/api/ApiRequestContext.scala b/service/src/main/scala/org/apache/celeborn/server/common/http/api/ApiRequestContext.scala new file mode 100644 index 000000000..2c293bf29 --- /dev/null +++ b/service/src/main/scala/org/apache/celeborn/server/common/http/api/ApiRequestContext.scala @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.celeborn.server.common.http.api + +import javax.servlet.ServletContext +import javax.servlet.http.HttpServletRequest +import javax.ws.rs.core.Context + +import org.eclipse.jetty.server.handler.ContextHandler + +import org.apache.celeborn.server.common.HttpService + +private[celeborn] trait ApiRequestContext { + @Context + protected var servletContext: ServletContext = _ + + @Context + protected var httpRequest: HttpServletRequest = _ + + final protected def httpService: HttpService = HttpServiceContext.get(servletContext) + + protected def normalizeParam(param: String): String = Option(param).map(_.trim).getOrElse("") +} + +private[celeborn] object HttpServiceContext { + private val attribute = getClass.getCanonicalName + + def set(contextHandler: ContextHandler, rs: HttpService): Unit = { + contextHandler.setAttribute(attribute, rs) + } + + def get(context: ServletContext): HttpService = { + context.getAttribute(attribute).asInstanceOf[HttpService] + } +} diff --git a/service/src/main/scala/org/apache/celeborn/server/common/http/api/ApiRootResource.scala b/service/src/main/scala/org/apache/celeborn/server/common/http/api/ApiRootResource.scala new file mode 100644 index 000000000..6d69c1b58 --- /dev/null +++ b/service/src/main/scala/org/apache/celeborn/server/common/http/api/ApiRootResource.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.celeborn.server.common.http.api + +import org.eclipse.jetty.servlet.{ServletContextHandler, ServletHolder} +import org.glassfish.jersey.server.ResourceConfig +import org.glassfish.jersey.servlet.ServletContainer + +import org.apache.celeborn.server.common.HttpService + +private[celeborn] object ApiRootResource { + def getServletHandler(rs: HttpService): ServletContextHandler = { + val openapiConf: ResourceConfig = new OpenAPIConfig + val holder = new ServletHolder(new ServletContainer(openapiConf)) + val handler = new ServletContextHandler(ServletContextHandler.NO_SESSIONS) + handler.setContextPath("/") + HttpServiceContext.set(handler, rs) + handler.addServlet(holder, "/*") + handler + } +} diff --git a/service/src/main/scala/org/apache/celeborn/server/common/http/api/CelebornOpenApiResource.scala b/service/src/main/scala/org/apache/celeborn/server/common/http/api/CelebornOpenApiResource.scala new file mode 100644 index 000000000..f0e9cc40a --- /dev/null +++ b/service/src/main/scala/org/apache/celeborn/server/common/http/api/CelebornOpenApiResource.scala @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.celeborn.server.common.http.api + +import javax.servlet.ServletConfig +import javax.ws.rs.{GET, Path, PathParam, Produces} +import javax.ws.rs.core.{Application, Context, HttpHeaders, MediaType, Response, UriInfo} + +import scala.collection.JavaConverters._ + +import com.fasterxml.jackson.core.util.DefaultPrettyPrinter +import io.swagger.v3.jaxrs2.integration.JaxrsOpenApiContextBuilder +import io.swagger.v3.jaxrs2.integration.resources.BaseOpenApiResource +import io.swagger.v3.oas.annotations.Operation +import io.swagger.v3.oas.integration.api.OpenApiContext +import io.swagger.v3.oas.models.OpenAPI +import io.swagger.v3.oas.models.info.{Info, License} +import io.swagger.v3.oas.models.servers.Server +import org.apache.commons.lang3.StringUtils + +@Path("/openapi.{type:json|yaml}") +class CelebornOpenApiResource extends BaseOpenApiResource with ApiRequestContext { + @Context + protected var config: ServletConfig = _ + + @Context + protected var app: Application = _ + + @GET + @Produces(Array(MediaType.APPLICATION_JSON, "application/yaml")) + @Operation(hidden = true) + def getOpenApi( + @Context headers: HttpHeaders, + @Context uriInfo: UriInfo, + @PathParam("type") tpe: String): Response = { + + val ctxId = getContextId(config) + val ctx: OpenApiContext = new CelebornJaxrsOpenApiContextBuilder() + .servletConfig(config) + .application(app) + .resourcePackages(OpenAPIConfig.packages.toSet.asJava) + .configLocation(configLocation) + .openApiConfiguration(openApiConfiguration) + .ctxId(ctxId) + .buildContext(true) + + val openApi 
= setCelebornOpenAPIDefinition(ctx.read()) + + if (StringUtils.isNotBlank(tpe) && tpe.trim().equalsIgnoreCase("yaml")) { + Response.status(Response.Status.OK) + .entity( + ctx.getOutputYamlMapper() + .writer(new DefaultPrettyPrinter()) + .writeValueAsString(openApi)) + .`type`("application/yaml") + .build() + } else { + Response.status(Response.Status.OK) + .entity( + ctx.getOutputJsonMapper + .writer(new DefaultPrettyPrinter()) + .writeValueAsString(openApi)) + .`type`(MediaType.APPLICATION_JSON_TYPE) + .build() + } + } + + private def setCelebornOpenAPIDefinition(openApi: OpenAPI): OpenAPI = { + // TODO: to improve when https is enabled. + val apiUrl = s"http://${httpService.connectionUrl}/" + openApi.info( + new Info().title( + s"Apache Celeborn (Incubating) REST API Documentation") + .description(s"Role: ${httpService.serviceName}") + .license( + new License().name("Apache License 2.0") + .url("https://www.apache.org/licenses/LICENSE-2.0.txt"))) + .servers(List(new Server().url(apiUrl)).asJava) + } +} + +class CelebornJaxrsOpenApiContextBuilder + extends JaxrsOpenApiContextBuilder[CelebornJaxrsOpenApiContextBuilder] diff --git a/service/src/main/scala/org/apache/celeborn/server/common/http/api/CelebornScalaObjectMapper.scala b/service/src/main/scala/org/apache/celeborn/server/common/http/api/CelebornScalaObjectMapper.scala new file mode 100644 index 000000000..37375e2c1 --- /dev/null +++ b/service/src/main/scala/org/apache/celeborn/server/common/http/api/CelebornScalaObjectMapper.scala @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.celeborn.server.common.http.api + +import javax.ws.rs.ext.ContextResolver + +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.scala.DefaultScalaModule + +class CelebornScalaObjectMapper extends ContextResolver[ObjectMapper] { + private val mapper = new ObjectMapper().registerModule(DefaultScalaModule) + + override def getContext(aClass: Class[_]): ObjectMapper = mapper +} diff --git a/service/src/main/scala/org/apache/celeborn/server/common/http/api/OpenAPIConfig.scala b/service/src/main/scala/org/apache/celeborn/server/common/http/api/OpenAPIConfig.scala new file mode 100644 index 000000000..07053de4c --- /dev/null +++ b/service/src/main/scala/org/apache/celeborn/server/common/http/api/OpenAPIConfig.scala @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.celeborn.server.common.http.api + +import org.glassfish.jersey.server.ResourceConfig + +class OpenAPIConfig extends ResourceConfig { + packages(OpenAPIConfig.packages: _*) + register(classOf[CelebornOpenApiResource]) + register(classOf[CelebornScalaObjectMapper]) +} + +object OpenAPIConfig { + val packages = Seq( + "org.apache.celeborn.server.common.http.api", + "org.apache.celeborn.service.deploy.master.http.api", + "org.apache.celeborn.service.deploy.worker.http.api") +} diff --git a/service/src/test/resources/metrics-api.properties b/service/src/test/resources/metrics-api.properties new file mode 100644 index 000000000..b1b2e6725 --- /dev/null +++ b/service/src/test/resources/metrics-api.properties @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +*.sink.prometheusServlet.class=org.apache.celeborn.common.metrics.sink.PrometheusServlet +*.sink.jsonServlet.class=org.apache.celeborn.common.metrics.sink.JsonServlet diff --git a/service/src/test/scala/org/apache/celeborn/server/common/http/ApiBaseResourceSuite.scala b/service/src/test/scala/org/apache/celeborn/server/common/http/ApiBaseResourceSuite.scala new file mode 100644 index 000000000..632d06c79 --- /dev/null +++ b/service/src/test/scala/org/apache/celeborn/server/common/http/ApiBaseResourceSuite.scala @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.celeborn.server.common.http + +import javax.ws.rs.core.MediaType + +import org.apache.celeborn.common.CelebornConf + +abstract class ApiBaseResourceSuite extends HttpTestHelper { + celebornConf.set(CelebornConf.METRICS_ENABLED.key, "true") + .set( + CelebornConf.METRICS_CONF.key, + Thread.currentThread().getContextClassLoader.getResource("metrics-api.properties").getFile) + + test("ping") { + val response = webTarget.path("ping").request(MediaType.TEXT_PLAIN).get() + assert(200 == response.getStatus) + assert(response.readEntity(classOf[String]) == "pong") + } + + test("conf") { + val response = webTarget.path("conf").request(MediaType.TEXT_PLAIN).get() + assert(200 == response.getStatus) + } + + test("listDynamicConfigs") { + val response = webTarget.path("listDynamicConfigs") + .queryParam("LEVEL", "TENANT") + .request(MediaType.TEXT_PLAIN) + .get() + assert(200 == response.getStatus) + } + + test("workerInfo") { + val response = webTarget.path("workerInfo").request(MediaType.TEXT_PLAIN).get() + assert(200 == response.getStatus) + } + + test("threadDump") { + val response = webTarget.path("threadDump").request(MediaType.TEXT_PLAIN).get() + assert(200 == response.getStatus) + } + + test("shuffle") { + val response = webTarget.path("shuffle").request(MediaType.TEXT_PLAIN).get() + assert(200 == response.getStatus) + } + + test("applications") { + val response = webTarget.path("applications").request(MediaType.TEXT_PLAIN).get() + assert(200 == response.getStatus) + } + + test("listTopDiskUsedApps") { + val response = webTarget.path("listTopDiskUsedApps").request(MediaType.TEXT_PLAIN).get() + assert(200 == response.getStatus) + } + + test("openapi.json") { + val response = webTarget.path("openapi.json").request(MediaType.APPLICATION_JSON).get() + assert(200 == response.getStatus) + assert(response.readEntity(classOf[String]).contains("/conf")) + } + + test("swagger") { + Seq("swagger", "docs", "help").foreach { path => + val response = 
webTarget.path(path).request(MediaType.TEXT_HTML).get() + assert(200 == response.getStatus) + assert(response.readEntity(classOf[String]).contains("swagger-ui")) + } + } + + test("metrics/prometheus") { + val response = webTarget.path("metrics/prometheus").request(MediaType.APPLICATION_JSON).get() + assert(200 == response.getStatus) + assert(response.readEntity(classOf[String]).contains("metrics_jvm_memory_heap_max_Value")) + } + + test("metrics/json") { + val response = webTarget.path("metrics/json").request(MediaType.APPLICATION_JSON).get() + assert(200 == response.getStatus) + assert(response.readEntity(classOf[String]).contains("\"name\" : \"jvm.memory.heap.max\"")) + } +} diff --git a/service/src/test/scala/org/apache/celeborn/server/common/http/HttpTestHelper.scala b/service/src/test/scala/org/apache/celeborn/server/common/http/HttpTestHelper.scala new file mode 100644 index 000000000..8674fc39c --- /dev/null +++ b/service/src/test/scala/org/apache/celeborn/server/common/http/HttpTestHelper.scala @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.celeborn.server.common.http + +import java.net.URI +import javax.ws.rs.client.WebTarget +import javax.ws.rs.core.{Application, UriBuilder} + +import org.glassfish.jersey.client.ClientConfig +import org.glassfish.jersey.media.multipart.MultiPartFeature +import org.glassfish.jersey.server.ResourceConfig +import org.glassfish.jersey.test.JerseyTest +import org.glassfish.jersey.test.jetty.JettyTestContainerFactory +import org.glassfish.jersey.test.spi.TestContainerFactory +import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} +import org.scalatest.funsuite.AnyFunSuite + +import org.apache.celeborn.common.CelebornConf +import org.apache.celeborn.common.internal.Logging +import org.apache.celeborn.server.common.HttpService +import org.apache.celeborn.server.common.http.HttpTestHelper.RestApiBaseSuite +import org.apache.celeborn.server.common.http.api.CelebornScalaObjectMapper + +object HttpTestHelper { + class RestApiBaseSuite extends JerseyTest { + + override def configure: Application = new ResourceConfig(getClass) + .register(classOf[MultiPartFeature]) + + override def configureClient(config: ClientConfig): Unit = { + config.register(classOf[CelebornScalaObjectMapper]) + .register(classOf[MultiPartFeature]) + } + + override def getTestContainerFactory: TestContainerFactory = new JettyTestContainerFactory + } +} + +trait HttpTestHelper extends AnyFunSuite + with BeforeAndAfterAll + with BeforeAndAfterEach + with Logging { + + protected val celebornConf = new CelebornConf() + protected def httpService: HttpService + + protected val restApiBaseSuite: JerseyTest = new RestApiBaseSuite + + override def beforeAll(): Unit = { + super.beforeAll() + restApiBaseSuite.setUp() + } + + override def afterAll(): Unit = { + restApiBaseSuite.tearDown() + super.afterAll() + } + + protected lazy val baseUri: URI = + UriBuilder.fromUri(s"http://${httpService.connectionUrl}/").build() + + protected lazy val webTarget: WebTarget = 
restApiBaseSuite.client.target(baseUri) +} diff --git a/service/src/test/scala/org/apache/celeborn/server/common/http/HttpUtilsSuite.scala b/service/src/test/scala/org/apache/celeborn/server/common/http/HttpUtilsSuite.scala deleted file mode 100644 index cbb5d18ef..000000000 --- a/service/src/test/scala/org/apache/celeborn/server/common/http/HttpUtilsSuite.scala +++ /dev/null @@ -1,121 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.celeborn.server.common.http - -import org.scalatest.funsuite.AnyFunSuite - -import org.apache.celeborn.common.internal.Logging -import org.apache.celeborn.server.common.Service - -class HttpUtilsSuite extends AnyFunSuite with Logging { - - def checkParseUri( - uri: String, - expectPath: String, - expectParameters: Map[String, String]): Unit = { - val (path, parameters) = HttpUtils.parseUri(uri) - assert(path == expectPath) - assert(parameters == expectParameters) - } - - test("CELEBORN-448: Support exclude worker manually") { - checkParseUri("/exclude", "/exclude", Map.empty) - checkParseUri( - "/exclude?add=localhost:1001:1002:1003:1004", - "/exclude", - Map("ADD" -> "localhost:1001:1002:1003:1004")) - checkParseUri( - "/exclude?remove=localhost:1001:1002:1003:1004", - "/exclude", - Map("REMOVE" -> "localhost:1001:1002:1003:1004")) - checkParseUri( - "/exclude?add=localhost:1001:1002:1003:1004&remove=localhost:2001:2002:2003:2004", - "/exclude", - Map("ADD" -> "localhost:1001:1002:1003:1004", "REMOVE" -> "localhost:2001:2002:2003:2004")) - } - - test("CELEBORN-847: Support parse HTTP Restful API parameters") { - checkParseUri("/exit", "/exit", Map.empty) - checkParseUri("/exit?type=decommission", "/exit", Map("TYPE" -> "decommission")) - checkParseUri( - "/exit?type=decommission&foo=A", - "/exit", - Map("TYPE" -> "decommission", "FOO" -> "A")) - } - - test("CELEBORN-829: Improve response message of invalid HTTP request") { - assert(HttpUtils.help(Service.MASTER) == - s"""Available API providers include: - |/applications List all running application's ids of the cluster. - |/conf List the conf setting of the master. - |/exclude Excluded workers of the master add or remove the worker manually given worker id. The parameter add or remove specifies the excluded workers to add or remove, which value is separated by commas. - |/excludedWorkers List all excluded workers of the master. - |/help List the available API providers of the master. 
- |/hostnames List all running application's LifecycleManager's hostnames of the cluster. - |/listDynamicConfigs List the dynamic configs of the master. The parameter level specifies the config level of dynamic configs. The parameter tenant specifies the tenant id of TENANT or TENANT_USER level. The parameter name specifies the user name of TENANT_USER level. Meanwhile, either none or all of the parameter tenant and name are specified for TENANT_USER level. - |/listTopDiskUsedApps List the top disk usage application ids. It will return the top disk usage application ids for the cluster. - |/lostWorkers List all lost workers of the master. - |/masterGroupInfo List master group information of the service. It will list all master's LEADER, FOLLOWER information. - |/sendWorkerEvent For Master(Leader) can send worker event to manager workers. Legal types are 'None', 'Immediately', 'Decommission', 'DecommissionThenIdle', 'Graceful', 'Recommission' - |/shuffles List all running shuffle keys of the service. It will return all running shuffle's key of the cluster. - |/shutdownWorkers List all shutdown workers of the master. - |/threadDump List the current thread dump of the master. - |/workerEventInfo List all worker event infos of the master. - |/workerInfo List worker information of the service. It will list all registered workers 's information. - |""".stripMargin) - assert(HttpUtils.help(Service.WORKER) == - s"""Available API providers include: - |/applications List all running application's ids of the worker. It only return application ids running in that worker. - |/conf List the conf setting of the worker. - |/exit Trigger this worker to exit. Legal types are 'DECOMMISSION', 'GRACEFUL' and 'IMMEDIATELY'. - |/help List the available API providers of the worker. - |/isRegistered Show if the worker is registered to the master success. - |/isShutdown Show if the worker is during the process of shutdown. - |/listDynamicConfigs List the dynamic configs of the worker. 
The parameter level specifies the config level of dynamic configs. The parameter tenant specifies the tenant id of TENANT or TENANT_USER level. The parameter name specifies the user name of TENANT_USER level. Meanwhile, either none or all of the parameter tenant and name are specified for TENANT_USER level. - |/listPartitionLocationInfo List all the living PartitionLocation information in that worker. - |/listTopDiskUsedApps List the top disk usage application ids. It only return application ids running in that worker. - |/shuffles List all the running shuffle keys of the worker. It only return keys of shuffles running in that worker. - |/threadDump List the current thread dump of the worker. - |/unavailablePeers List the unavailable peers of the worker, this always means the worker connect to the peer failed. - |/workerInfo List the worker information of the worker. - |""".stripMargin) - } - - test("CELEBORN-1245: Support Master manage workers") { - checkParseUri( - "/sendWorkerEvent?type=decommission&workers=localhost:1001:1002:1003:1004", - "/sendWorkerEvent", - Map("TYPE" -> "decommission", "WORKERS" -> "localhost:1001:1002:1003:1004")) - } - - test("CELEBORN-1056: Introduce Rest API of listing dynamic configuration") { - checkParseUri("/listDynamicConfigs", "/listDynamicConfigs", Map.empty) - checkParseUri( - "/listDynamicConfigs?level=system", - "/listDynamicConfigs", - Map("LEVEL" -> "system")) - checkParseUri( - "/listDynamicConfigs?level=tenant&tenant=tenantId1", - "/listDynamicConfigs", - Map("LEVEL" -> "tenant", "TENANT" -> "tenantId1")) - checkParseUri( - "/listDynamicConfigs?level=tenant_user&tenant=tenantId1&name=user1", - "/listDynamicConfigs", - Map("LEVEL" -> "tenant_user", "TENANT" -> "tenantId1", "NAME" -> "user1")) - } -} diff --git a/tests/spark-it/pom.xml b/tests/spark-it/pom.xml index 75e50221b..842d7a8ed 100644 --- a/tests/spark-it/pom.xml +++ b/tests/spark-it/pom.xml @@ -75,6 +75,14 @@ log4j log4j + + org.glassfish.jersey.inject + * + + + 
org.glassfish.jersey.core + * +
diff --git a/worker/pom.xml b/worker/pom.xml index c4a7c913d..3821fe9da 100644 --- a/worker/pom.xml +++ b/worker/pom.xml @@ -98,6 +98,12 @@ ${project.version} test + + org.apache.celeborn + celeborn-service_${scala.binary.version} + ${project.version} + test-jar + org.apache.celeborn celeborn-master_${scala.binary.version} @@ -109,6 +115,16 @@ mockito-scala-scalatest_${scala.binary.version} test + + org.glassfish.jersey.test-framework + jersey-test-framework-core + test + + + org.glassfish.jersey.test-framework.providers + jersey-test-framework-provider-jetty + test + diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/http/api/ApiWorkerResource.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/http/api/ApiWorkerResource.scala new file mode 100644 index 000000000..8573b7152 --- /dev/null +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/http/api/ApiWorkerResource.scala @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.celeborn.service.deploy.worker.http.api
+
+import javax.ws.rs.{GET, Path, POST, QueryParam}
+import javax.ws.rs.core.MediaType
+
+import io.swagger.v3.oas.annotations.media.Content
+import io.swagger.v3.oas.annotations.responses.ApiResponse
+
+import org.apache.celeborn.server.common.http.api.ApiRequestContext
+
+/** Worker-specific REST endpoints rooted at "/"; every handler forwards to `httpService`. */
+@Path("/")
+class ApiWorkerResource extends ApiRequestContext {
+  // GET /listPartitionLocationInfo: returns httpService.listPartitionLocationInfo unchanged.
+  @GET
+  @Path("/listPartitionLocationInfo")
+  @ApiResponse(
+    responseCode = "200",
+    content = Array(new Content(mediaType = MediaType.TEXT_PLAIN)),
+    description = "List all the living PartitionLocation information in that worker.")
+  def listPartitionLocationInfo: String = httpService.listPartitionLocationInfo
+
+  // GET /unavailablePeers: returns httpService.getUnavailablePeers unchanged.
+  @GET
+  @Path("/unavailablePeers")
+  @ApiResponse(
+    responseCode = "200",
+    content = Array(new Content(mediaType = MediaType.TEXT_PLAIN)),
+    description =
+      "List the unavailable peers of the worker, this always means the worker connect to the peer failed.")
+  def unavailablePeers: String = httpService.getUnavailablePeers
+
+  // GET /isShutdown: returns httpService.isShutdown unchanged.
+  @GET
+  @Path("/isShutdown")
+  @ApiResponse(
+    responseCode = "200",
+    content = Array(new Content(mediaType = MediaType.TEXT_PLAIN)),
+    description = "Show if the worker is during the process of shutdown.")
+  def isShutdown: String = httpService.isShutdown
+
+  // GET /isRegistered: returns httpService.isRegistered unchanged.
+  @GET
+  @Path("/isRegistered")
+  @ApiResponse(
+    responseCode = "200",
+    content = Array(new Content(mediaType = MediaType.TEXT_PLAIN)),
+    description = "Show if the worker is registered to the master success.")
+  def isRegistered: String = httpService.isRegistered
+
+  // POST /exit: the TYPE query parameter goes through normalizeParam, then to httpService.exit.
+  @POST
+  @Path("/exit")
+  @ApiResponse(
+    responseCode = "200",
+    content = Array(new Content(mediaType = MediaType.TEXT_PLAIN)),
+    description =
+      "Trigger this worker to exit. Legal types are 'DECOMMISSION', 'GRACEFUL' and 'IMMEDIATELY'.")
+  def exit(@QueryParam("TYPE") exitType: String): String =
+    httpService.exit(normalizeParam(exitType))
+}
diff --git a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/ApiWorkerResourceSuite.scala b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/ApiWorkerResourceSuite.scala
new file mode 100644
index 000000000..cc09764d0
--- /dev/null
+++ b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/ApiWorkerResourceSuite.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.worker.storage
+
+import javax.ws.rs.core.MediaType
+
+import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.util.{CelebornExitKind, Utils}
+import org.apache.celeborn.server.common.HttpService
+import org.apache.celeborn.server.common.http.ApiBaseResourceSuite
+import org.apache.celeborn.service.deploy.worker.{Worker, WorkerArguments}
+
+/**
+ * Smoke tests for the worker-only HTTP GET endpoints, run against a Worker started in beforeAll.
+ */
+class ApiWorkerResourceSuite extends ApiBaseResourceSuite {
+  private var worker: Worker = _
+  override protected def httpService: HttpService = worker
+
+  // The worker's metrics system and HTTP server are started before the parent suite's setup.
+  override def beforeAll(): Unit = {
+    val workerArgs = new WorkerArguments(Array(), celebornConf)
+    worker = new Worker(celebornConf, workerArgs)
+    worker.metricsSystem.start()
+    worker.startHttpServer()
+    super.beforeAll()
+  }
+
+  // Parent suite teardown runs first; then the metrics system, RPC env and worker are stopped.
+  override def afterAll(): Unit = {
+    super.afterAll()
+    worker.metricsSystem.stop()
+    worker.rpcEnv.shutdown()
+    worker.stop(CelebornExitKind.EXIT_IMMEDIATELY)
+  }
+
+  /**
+   * Issues a GET for `endpoint` requesting text/plain and asserts a 200 status.
+   * The response is closed in `finally`, so a failing assertion does not leave it open.
+   */
+  private def assertGetOk(endpoint: String): Unit = {
+    val response = webTarget.path(endpoint).request(MediaType.TEXT_PLAIN).get()
+    try assert(200 == response.getStatus)
+    finally response.close()
+  }
+
+  // Registers one test per endpoint; each test is named after the endpoint path it requests.
+  Seq("listPartitionLocationInfo", "unavailablePeers", "isShutdown", "isRegistered").foreach {
+    endpoint => test(endpoint)(assertGetOk(endpoint))
+  }
+}