From cc26131f88e845e306ff35435e10e506a2c058f3 Mon Sep 17 00:00:00 2001 From: Aravind Patnam Date: Thu, 5 Sep 2024 11:15:16 -0500 Subject: [PATCH] [CELEBORN-1572] Celeborn CLI initial REST API support ### What changes were proposed in this pull request? Introducing the Celeborn CLI (based on this [CPIP](https://cwiki.apache.org/confluence/display/CELEBORN/CIP-7+Celeborn+CLI)). For the first iteration, adding support for querying the existing REST api endpoints. After this will add a layer for external cluster manager support. Further improvements are needed such as pretty print, which can be added in subsequent PRs. ### Why are the changes needed? see [CPIP](https://cwiki.apache.org/confluence/display/CELEBORN/CIP-7+Celeborn+CLI) ### Does this PR introduce _any_ user-facing change? yes, new CLI tool. ### How was this patch tested? added UTs and also tested internally. Closes #2699 from akpatnam25/cli-CELEBORN-1572. Lead-authored-by: Aravind Patnam Co-authored-by: Aravind Patnam Signed-off-by: Mridul Muralidharan gmail.com> --- bin/celeborn-class | 11 + build/make-distribution.sh | 16 +- build/release/release.sh | 3 + cli/pom.xml | 83 ++++++ .../org/apache/celeborn/cli/CelebornCli.scala | 47 ++++ .../celeborn/cli/common/CliLogging.scala | 40 +++ .../cli/common/CliVersionProvider.scala | 52 ++++ .../celeborn/cli/common/CommonOptions.scala | 74 ++++++ .../cli/config/CliConfigManager.scala | 87 ++++++ .../celeborn/cli/master/MasterOptions.scala | 94 +++++++ .../cli/master/MasterSubcommand.scala | 105 ++++++++ .../cli/master/MasterSubcommandImpl.scala | 187 +++++++++++++ .../celeborn/cli/worker/WorkerOptions.scala | 73 ++++++ .../cli/worker/WorkerSubcommand.scala | 83 ++++++ .../cli/worker/WorkerSubcommandImpl.scala | 78 ++++++ .../cli/TestCelebornCliCommands.scala | 247 ++++++++++++++++++ .../apache/celeborn/common/util/Utils.scala | 11 + pom.xml | 7 + project/CelebornBuild.scala | 20 +- sbin/celeborn-cli | 24 ++ 20 files changed, 1340 insertions(+), 2 deletions(-) create mode 100644 cli/pom.xml create mode 100644 cli/src/main/scala/org/apache/celeborn/cli/CelebornCli.scala create mode 100644 cli/src/main/scala/org/apache/celeborn/cli/common/CliLogging.scala create mode 100644 cli/src/main/scala/org/apache/celeborn/cli/common/CliVersionProvider.scala create mode 100644 cli/src/main/scala/org/apache/celeborn/cli/common/CommonOptions.scala create mode 100644 cli/src/main/scala/org/apache/celeborn/cli/config/CliConfigManager.scala create mode 100644 cli/src/main/scala/org/apache/celeborn/cli/master/MasterOptions.scala create mode 100644 cli/src/main/scala/org/apache/celeborn/cli/master/MasterSubcommand.scala create mode 100644 cli/src/main/scala/org/apache/celeborn/cli/master/MasterSubcommandImpl.scala create mode 100644 cli/src/main/scala/org/apache/celeborn/cli/worker/WorkerOptions.scala create mode 100644 cli/src/main/scala/org/apache/celeborn/cli/worker/WorkerSubcommand.scala create mode 100644 cli/src/main/scala/org/apache/celeborn/cli/worker/WorkerSubcommandImpl.scala create mode 100644 cli/src/test/scala/org/apache/celeborn/cli/TestCelebornCliCommands.scala create mode 100755 sbin/celeborn-cli diff --git a/bin/celeborn-class b/bin/celeborn-class index 5bb3791ff..32db4fe9d 100755 --- a/bin/celeborn-class +++ b/bin/celeborn-class @@ -46,6 +46,9 @@ do if [ "$i" == "org.apache.celeborn.service.deploy.worker.Worker" ] ; then LAUNCH_CLASS=org.apache.celeborn.service.deploy.worker.Worker fi + if [ "$i" == "org.apache.celeborn.cli.CelebornCli" ] ; then + LAUNCH_CLASS=org.apache.celeborn.cli.CelebornCli + fi done if [ "${LAUNCH_CLASS}" == "org.apache.celeborn.service.deploy.master.Master" ] ; then @@ -64,6 +67,14 @@ if [ "${LAUNCH_CLASS}" == "org.apache.celeborn.service.deploy.worker.Worker" ] ; fi fi +if [ "${LAUNCH_CLASS}" == "org.apache.celeborn.cli.CelebornCli" ] ; then + if [ -d "${CELEBORN_HOME}/cli-jars" ]; then + CELEBORN_JARS_DIR="${CELEBORN_HOME}/cli-jars" + else + CELEBORN_JARS_DIR="${CELEBORN_HOME}/cli/target" + fi +fi + if [ ! -d "$CELEBORN_JARS_DIR" ]; then echo "Failed to find CELEBORN jars directory ($CELEBORN_JARS_DIR)." 1>&2 echo "You need to build CELEBORN with the target \"package\" before running this program." 1>&2 diff --git a/build/make-distribution.sh b/build/make-distribution.sh index 2f0c9aa29..3133c2f3c 100755 --- a/build/make-distribution.sh +++ b/build/make-distribution.sh @@ -138,7 +138,7 @@ function build_service { # Store the command as an array because $MVN variable might have spaces in it. # Normal quoting tricks don't work. # See: http://mywiki.wooledge.org/BashFAQ/050 - BUILD_COMMAND=("$MVN" clean package $MVN_DIST_OPT -pl master,worker -am $@) + BUILD_COMMAND=("$MVN" clean package $MVN_DIST_OPT -pl master,worker,cli -am $@) # Actually build the jar echo -e "\nBuilding with..." @@ -149,6 +149,7 @@ function build_service { mkdir -p "$DIST_DIR/jars" mkdir -p "$DIST_DIR/master-jars" mkdir -p "$DIST_DIR/worker-jars" + mkdir -p "$DIST_DIR/cli-jars" ## Copy master jars cp "$PROJECT_DIR"/master/target/celeborn-master_$SCALA_VERSION-$VERSION.jar "$DIST_DIR/master-jars/" @@ -162,6 +163,12 @@ function build_service { for jar in $(ls "$PROJECT_DIR/worker/target/scala-$SCALA_VERSION/jars"); do (cd $DIST_DIR/worker-jars; ln -snf "../jars/$jar" .) done + ## Copy cli jars + cp "$PROJECT_DIR"/cli/target/celeborn-cli_$SCALA_VERSION-$VERSION.jar "$DIST_DIR/cli-jars/" + cp "$PROJECT_DIR"/cli/target/scala-$SCALA_VERSION/jars/*.jar "$DIST_DIR/jars/" + for jar in $(ls "$PROJECT_DIR/cli/target/scala-$SCALA_VERSION/jars"); do + (cd $DIST_DIR/cli-jars; ln -snf "../jars/$jar" .) + done } function build_spark_client { @@ -273,6 +280,7 @@ function sbt_build_service { mkdir -p "$DIST_DIR/jars" mkdir -p "$DIST_DIR/master-jars" mkdir -p "$DIST_DIR/worker-jars" + mkdir -p "$DIST_DIR/cli-jars" ## Copy master jars cp "$PROJECT_DIR"/master/target/scala-$SCALA_VERSION/celeborn-master_$SCALA_VERSION-$VERSION.jar "$DIST_DIR/master-jars/" @@ -286,6 +294,12 @@ function sbt_build_service { for jar in $(ls "$PROJECT_DIR/worker/target/scala-$SCALA_VERSION/jars"); do (cd $DIST_DIR/worker-jars; ln -snf "../jars/$jar" .) done + ## Copy cli jars + cp "$PROJECT_DIR"/cli/target/scala-$SCALA_VERSION/celeborn-cli_$SCALA_VERSION-$VERSION.jar "$DIST_DIR/cli-jars/" + cp "$PROJECT_DIR"/cli/target/scala-$SCALA_VERSION/jars/*.jar "$DIST_DIR/jars/" + for jar in $(ls "$PROJECT_DIR/cli/target/scala-$SCALA_VERSION/jars"); do + (cd $DIST_DIR/cli-jars; ln -snf "../jars/$jar" .) + done } function sbt_build_client { diff --git a/build/release/release.sh b/build/release/release.sh index ae381fedd..f12342b57 100755 --- a/build/release/release.sh +++ b/build/release/release.sh @@ -133,6 +133,9 @@ upload_nexus_staging() { echo "Deploying celeborn-spi" ${PROJECT_DIR}/build/sbt "clean;celeborn-spi/publishSigned" + + echo "Deploying celeborn-cli" + ${PROJECT_DIR}/build/sbt "clean;celeborn-cli/publishSigned" } finalize_svn() { diff --git a/cli/pom.xml b/cli/pom.xml new file mode 100644 index 000000000..b8773149a --- /dev/null +++ b/cli/pom.xml @@ -0,0 +1,83 @@ + + + + 4.0.0 + + + org.apache.celeborn + celeborn-parent_${scala.binary.version} + ${project.version} + ../pom.xml + + + celeborn-cli_${scala.binary.version} + jar + Celeborn CLI + + + + info.picocli + picocli + + + org.apache.celeborn + celeborn-openapi-client_${scala.binary.version} + ${project.version} + + + org.apache.celeborn + celeborn-worker_${scala.binary.version} + ${project.version} + test + + + org.apache.celeborn + celeborn-worker_${scala.binary.version} + ${project.version} + test-jar + test + + + org.apache.celeborn + celeborn-common_${scala.binary.version} + ${project.version} + + + org.apache.celeborn + celeborn-common_${scala.binary.version} + ${project.version} + test-jar + test + + + org.apache.celeborn + celeborn-master_${scala.binary.version} + ${project.version} + test + + + + + + + org.apache.maven.plugins + maven-dependency-plugin + + + + diff --git a/cli/src/main/scala/org/apache/celeborn/cli/CelebornCli.scala b/cli/src/main/scala/org/apache/celeborn/cli/CelebornCli.scala new file mode 100644 index 000000000..fb2081d7f --- /dev/null +++ b/cli/src/main/scala/org/apache/celeborn/cli/CelebornCli.scala @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.celeborn.cli +import picocli.CommandLine +import picocli.CommandLine.Command + +import org.apache.celeborn.cli.common.{CliLogging, CliVersionProvider} +import org.apache.celeborn.cli.master.MasterSubcommandImpl +import org.apache.celeborn.cli.worker.WorkerSubcommandImpl +@Command( + name = "celeborn-cli", + versionProvider = classOf[CliVersionProvider], + mixinStandardHelpOptions = true, + description = Array("@|bold Scala|@ Celeborn CLI"), + subcommands = Array( + classOf[MasterSubcommandImpl], + classOf[WorkerSubcommandImpl])) +class CelebornCli extends Runnable with CliLogging { + override def run(): Unit = { + logError( + "Master or Worker subcommand needs to be provided. Please run -h to see the usage info.") + } +} + +object CelebornCli { + def main(args: Array[String]): Unit = { + val cmd = new CommandLine(new CelebornCli()) + cmd.setOptionsCaseInsensitive(false) + cmd.setSubcommandsCaseInsensitive(false) + new CommandLine(new CelebornCli()).execute(args: _*) + } +} diff --git a/cli/src/main/scala/org/apache/celeborn/cli/common/CliLogging.scala b/cli/src/main/scala/org/apache/celeborn/cli/common/CliLogging.scala new file mode 100644 index 000000000..d389a1452 --- /dev/null +++ b/cli/src/main/scala/org/apache/celeborn/cli/common/CliLogging.scala @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.celeborn.cli.common + +import Console.{GREEN, RED, RESET} + +trait CliLogging { + + def log(msg: String): Unit = { + Console.println(msg) + } + + def log(obj: Any): Unit = { + Console.println(obj) + } + + def logInfo(msg: String): Unit = { + Console.println(s"${RESET}${GREEN}${msg}") + } + + def logError(msg: String): Unit = { + Console.println(s"${RESET}${RED}${msg}") + } + +} diff --git a/cli/src/main/scala/org/apache/celeborn/cli/common/CliVersionProvider.scala b/cli/src/main/scala/org/apache/celeborn/cli/common/CliVersionProvider.scala new file mode 100644 index 000000000..2dd48267b --- /dev/null +++ b/cli/src/main/scala/org/apache/celeborn/cli/common/CliVersionProvider.scala @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.celeborn.cli.common + +import java.nio.file.{Files, Paths} + +import scala.io.Source + +import picocli.CommandLine.IVersionProvider + +import org.apache.celeborn.common.util.Utils + +class CliVersionProvider extends IVersionProvider with CliLogging { + + private val versionPattern = """Celeborn\s+\S+""".r + private val prefix = "Celeborn CLI" + + override def getVersion: Array[String] = { + val versionFile = Paths.get(sys.env.getOrElse("CELEBORN_HOME", "") + "/RELEASE") + + if (Files.exists(versionFile)) { + Utils.tryWithResources(Source.fromFile(versionFile.toFile)) { source => + source.getLines().find(line => versionPattern.findFirstIn(line).isDefined) match { + case Some(matchingLine) => + Array(s"$prefix - ${versionPattern.findFirstIn(matchingLine).get}") + case _ => + logInfo("Could not resolve version of Celeborn since RELEASE file did not contain version info.") + Array(prefix) + } + } + } else { + logInfo( + "Could not resolve version of Celeborn since no RELEASE file was found in $CELEBORN_HOME.") + Array(prefix) + } + } +} diff --git a/cli/src/main/scala/org/apache/celeborn/cli/common/CommonOptions.scala b/cli/src/main/scala/org/apache/celeborn/cli/common/CommonOptions.scala new file mode 100644 index 000000000..507d5374f --- /dev/null +++ b/cli/src/main/scala/org/apache/celeborn/cli/common/CommonOptions.scala @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.celeborn.cli.common + +import picocli.CommandLine.{Command, Option, Spec} +import picocli.CommandLine.Model.CommandSpec + +@Command +class CommonOptions { + + @Spec var spec: CommandSpec = _ // injected by picocli + + @Option( + names = Array("--hostport"), + paramLabel = "host:port", + description = Array("The host and http port")) + private[cli] var hostPort: String = _ + + @Option( + names = Array("--host-list"), + paramLabel = "h1,h2,h3...", + description = Array("List of hosts to pass to the command")) + private[cli] var hostList: String = _ + + @Option( + names = Array("--worker-ids"), + paramLabel = "w1,w2,w3...", + description = + Array("List of workerIds to pass to the command. Each worker should be in the format" + + " host:rpcPort:pushPort:fetchPort:replicatePort.")) + private[cli] var workerIds: String = _ + + @Option( + names = Array("--cluster"), + paramLabel = "cluster_alias", + description = Array("The alias of the cluster to use to query masters")) + private[cli] var cluster: String = _ + + // Required for getting dynamic config info + @Option( + names = Array("--config-level"), + paramLabel = "level", + description = Array("The config level of the dynamic configs")) + private[cli] var configLevel: String = _ + + // Required for getting dynamic config info + @Option( + names = Array("--config-tenant"), + paramLabel = "tenant_id", + description = Array("The tenant id of TENANT or TENANT_USER level.")) + private[cli] var configTenant: String = _ + + // Required for getting dynamic config info + @Option( + names = Array("--config-name"), + paramLabel = "username", + description = Array("The username of the TENANT_USER level.")) + private[cli] var configName: String = _ +} diff --git a/cli/src/main/scala/org/apache/celeborn/cli/config/CliConfigManager.scala b/cli/src/main/scala/org/apache/celeborn/cli/config/CliConfigManager.scala new file mode 100644 index 000000000..873a5cbf1 --- /dev/null +++ b/cli/src/main/scala/org/apache/celeborn/cli/config/CliConfigManager.scala @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.celeborn.cli.config + +import java.io.{File, FileInputStream, FileOutputStream} +import java.time.LocalDateTime +import java.util.Properties + +import org.apache.celeborn.cli.common.CliLogging +import org.apache.celeborn.common.util.Utils + +case class CliConfig(cliConfigData: Map[String, String]) + +object CliConfigManager { + val cliConfigFilePath: String = s"${sys.env("HOME")}/.celeborn-cli.conf" +} + +class CliConfigManager extends CliLogging { + + private val properties = new Properties() + + def loadConfig(): Option[CliConfig] = { + val file = new File(CliConfigManager.cliConfigFilePath) + if (!file.exists()) { + None + } else { + Utils.tryWithResources(new FileInputStream(file)) { inputStream => + properties.load(inputStream) + Some(CliConfig(properties.stringPropertyNames().toArray.map(_.toString).map { key => + key -> properties.getProperty(key) + }.toMap)) + } + } + } + + private def saveConfig(cliConfig: CliConfig): Unit = { + try { + val file = new File(CliConfigManager.cliConfigFilePath) + if (!file.exists()) { + file.getParentFile.mkdirs() + file.createNewFile() + } + properties.clear() + val outputStream = new FileOutputStream(file) + Utils.tryWithResources(outputStream) { os => + cliConfig.cliConfigData.foreach { case (key, value) => + properties.setProperty(key, value) + } + properties.store(os, s"Last updated conf at ${LocalDateTime.now()}") + } + } catch { + case e: Exception => + logError(s"Error saving config: ${e.getMessage}") + } + } + + def add(key: String, value: String): Unit = { + val config = loadConfig().getOrElse(CliConfig(Map())) + val updatedConfig = config.copy(cliConfigData = config.cliConfigData + (key -> value)) + saveConfig(updatedConfig) + } + + def remove(key: String): Unit = { + val config = loadConfig().getOrElse(CliConfig(Map())) + val updatedConfig = config.copy(cliConfigData = config.cliConfigData - key) + saveConfig(updatedConfig) + } + + def get(key: String): Option[String] = { + loadConfig().flatMap(config => config.cliConfigData.get(key)) + } +} diff --git a/cli/src/main/scala/org/apache/celeborn/cli/master/MasterOptions.scala b/cli/src/main/scala/org/apache/celeborn/cli/master/MasterOptions.scala new file mode 100644 index 000000000..b22f0bb3f --- /dev/null +++ b/cli/src/main/scala/org/apache/celeborn/cli/master/MasterOptions.scala @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.celeborn.cli.master + +import picocli.CommandLine.Option + +final class MasterOptions { + + @Option(names = Array("--show-masters-info"), description = Array("Show master group info")) + private[master] var showMastersInfo: Boolean = _ + + @Option(names = Array("--show-cluster-apps"), description = Array("Show cluster applications")) + private[master] var showClusterApps: Boolean = _ + + @Option(names = Array("--show-cluster-shuffles"), description = Array("Show cluster shuffles")) + private[master] var showClusterShuffles: Boolean = _ + + @Option( + names = Array("--show-top-disk-used-apps"), + description = Array("Show top disk used apps")) + private[master] var showTopDiskUsedApps: Boolean = _ + + @Option(names = Array("--exclude-worker"), description = Array("Exclude workers by ID")) + private[master] var excludeWorkers: Boolean = _ + + @Option( + names = Array("--remove-excluded-worker"), + description = Array("Remove excluded workers by ID")) + private[master] var removeExcludedWorkers: Boolean = _ + + @Option( + names = Array("--send-worker-event"), + paramLabel = "IMMEDIATELY | DECOMMISSION | DECOMMISSION_THEN_IDLE | GRACEFUL", + description = Array("Send an event to a worker")) + private[master] var sendWorkerEvent: String = _ + + @Option( + names = Array("--show-worker-event-info"), + description = Array("Show worker event information")) + private[master] var showWorkerEventInfo: Boolean = _ + + @Option(names = Array("--show-lost-workers"), description = Array("Show lost workers")) + private[master] var showLostWorkers: Boolean = _ + + @Option(names = Array("--show-excluded-workers"), description = Array("Show excluded workers")) + private[master] var showExcludedWorkers: Boolean = _ + + @Option(names = Array("--show-shutdown-workers"), description = Array("Show shutdown workers")) + private[master] var showShutdownWorkers: Boolean = _ + + @Option( + names = Array("--show-lifecycle-managers"), + description = Array("Show lifecycle managers")) + private[master] var showLifecycleManagers: Boolean = _ + + @Option(names = Array("--show-workers"), description = Array("Show registered workers")) + private[master] var showWorkers: Boolean = _ + + @Option(names = Array("--show-conf"), description = Array("Show master conf")) + private[master] var showConf: Boolean = _ + + @Option(names = Array("--show-dynamic-conf"), description = Array("Show dynamic master conf")) + private[master] var showDynamicConf: Boolean = _ + + @Option(names = Array("--show-thread-dump"), description = Array("Show master thread dump")) + private[master] var showThreadDump: Boolean = _ + + @Option( + names = Array("--add-cluster-alias"), + paramLabel = "alias", + description = Array("Add alias to use in the cli for the given set of masters")) + private[master] var addClusterAlias: String = _ + + @Option( + names = Array("--remove-cluster-alias"), + paramLabel = "alias", + description = Array("Remove alias to use in the cli for the given set of masters")) + private[master] var removeClusterAlias: String = _ +} diff --git a/cli/src/main/scala/org/apache/celeborn/cli/master/MasterSubcommand.scala b/cli/src/main/scala/org/apache/celeborn/cli/master/MasterSubcommand.scala new file mode 100644 index 000000000..756ebb89d --- /dev/null +++ b/cli/src/main/scala/org/apache/celeborn/cli/master/MasterSubcommand.scala @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.celeborn.cli.master + +import java.util + +import picocli.CommandLine.{ArgGroup, Mixin, ParameterException, ParentCommand, Spec} +import picocli.CommandLine.Model.CommandSpec + +import org.apache.celeborn.cli.CelebornCli +import org.apache.celeborn.cli.common.{CliLogging, CommonOptions} +import org.apache.celeborn.cli.config.CliConfigManager +import org.apache.celeborn.rest.v1.master.{ApplicationApi, ConfApi, DefaultApi, MasterApi, ShuffleApi, WorkerApi} +import org.apache.celeborn.rest.v1.master.invoker.ApiClient +import org.apache.celeborn.rest.v1.model._ + +trait MasterSubcommand extends CliLogging { + + @ParentCommand + private var celebornCli: CelebornCli = _ + + @ArgGroup(exclusive = true, multiplicity = "1") + private[master] var masterOptions: MasterOptions = _ + + @Mixin + private[master] var commonOptions: CommonOptions = _ + + @Spec + private[master] var spec: CommandSpec = _ + + private[master] val cliConfigManager = new CliConfigManager + private def apiClient: ApiClient = { + val connectionUrl = + if (commonOptions.hostPort != null && commonOptions.hostPort.nonEmpty) { + commonOptions.hostPort + } else { + val endpoints = cliConfigManager.get(commonOptions.cluster) + endpoints.getOrElse("").split(",")(0) + } + if (connectionUrl != null && connectionUrl.nonEmpty) { + log(s"Using connectionUrl: $connectionUrl") + new ApiClient().setBasePath(s"http://$connectionUrl") + } else { + throw new ParameterException( + spec.commandLine(), + "No valid connection url found, please provide either --hostport or " + "valid cluster alias.") + } + } + private[master] def applicationApi: ApplicationApi = new ApplicationApi(apiClient) + private[master] def confApi: ConfApi = new ConfApi(apiClient) + private[master] def defaultApi: DefaultApi = new DefaultApi(apiClient) + private[master] def masterApi: MasterApi = new MasterApi(apiClient) + private[master] def shuffleApi: ShuffleApi = new ShuffleApi(apiClient) + private[master] def workerApi: WorkerApi = new WorkerApi(apiClient) + + private[master] def runShowMastersInfo: MasterInfoResponse + + private[master] def runShowClusterApps: ApplicationsHeartbeatResponse + + private[master] def runShowClusterShuffles: ShufflesResponse + + private[master] def runShowTopDiskUsedApps: AppDiskUsageSnapshotsResponse + + private[master] def runExcludeWorkers: HandleResponse + + private[master] def runRemoveExcludedWorkers: HandleResponse + + private[master] def runSendWorkerEvent: HandleResponse + + private[master] def runShowWorkerEventInfo: WorkerEventsResponse + + private[master] def runShowLostWorkers: Seq[WorkerTimestampData] + + private[master] def runShowExcludedWorkers: Seq[WorkerData] + + private[master] def runShowShutdownWorkers: Seq[WorkerData] + + private[master] def runShowLifecycleManagers: HostnamesResponse + + private[master] def runShowWorkers: WorkersResponse + + private[master] def getWorkerIds: util.List[WorkerId] + + private[master] def runShowConf: ConfResponse + + private[master] def runShowDynamicConf: DynamicConfigResponse + + private[master] def runShowThreadDump: ThreadStackResponse + +} diff --git a/cli/src/main/scala/org/apache/celeborn/cli/master/MasterSubcommandImpl.scala b/cli/src/main/scala/org/apache/celeborn/cli/master/MasterSubcommandImpl.scala new file mode 100644 index 000000000..50a643ca8 --- /dev/null +++ b/cli/src/main/scala/org/apache/celeborn/cli/master/MasterSubcommandImpl.scala @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.celeborn.cli.master + +import java.util + +import scala.collection.JavaConverters._ + +import picocli.CommandLine.{Command, ParameterException} + +import org.apache.celeborn.cli.config.CliConfigManager +import org.apache.celeborn.rest.v1.model._ +import org.apache.celeborn.rest.v1.model.SendWorkerEventRequest.EventTypeEnum + +@Command(name = "master", mixinStandardHelpOptions = true) +class MasterSubcommandImpl extends Runnable with MasterSubcommand { + override def run(): Unit = { + if (masterOptions.showMastersInfo) log(runShowMastersInfo) + if (masterOptions.showClusterApps) log(runShowClusterApps) + if (masterOptions.showClusterShuffles) log(runShowClusterShuffles) + if (masterOptions.showTopDiskUsedApps) log(runShowTopDiskUsedApps) + if (masterOptions.excludeWorkers) log(runExcludeWorkers) + if (masterOptions.removeExcludedWorkers) log(runRemoveExcludedWorkers) + if (masterOptions.sendWorkerEvent != null && masterOptions.sendWorkerEvent.nonEmpty) + log(runSendWorkerEvent) + if (masterOptions.showWorkerEventInfo) log(runShowWorkerEventInfo) + if (masterOptions.showLostWorkers) log(runShowLostWorkers) + if (masterOptions.showExcludedWorkers) log(runShowExcludedWorkers) + if (masterOptions.showShutdownWorkers) log(runShowShutdownWorkers) + if (masterOptions.showLifecycleManagers) log(runShowLifecycleManagers) + if (masterOptions.showWorkers) log(runShowWorkers) + if (masterOptions.showConf) log(runShowConf) + if (masterOptions.showDynamicConf) log(runShowDynamicConf) + if (masterOptions.showThreadDump) log(runShowThreadDump) + if (masterOptions.addClusterAlias != null && masterOptions.addClusterAlias.nonEmpty) + runAddClusterAlias + if (masterOptions.removeClusterAlias != null && masterOptions.removeClusterAlias.nonEmpty) + runRemoveClusterAlias + } + + private[master] def runShowMastersInfo: MasterInfoResponse = masterApi.getMasterGroupInfo + + private[master] def runShowClusterApps: ApplicationsHeartbeatResponse = + applicationApi.getApplications + + private[master] def runShowClusterShuffles: ShufflesResponse = shuffleApi.getShuffles + + private[master] def runShowTopDiskUsedApps: AppDiskUsageSnapshotsResponse = + applicationApi.getApplicationsDiskUsageSnapshots + + private[master] def runExcludeWorkers: HandleResponse = { + val workerIds = getWorkerIds + val excludeWorkerRequest = new ExcludeWorkerRequest().add(workerIds) + logInfo(s"Sending exclude worker requests to workers: $workerIds") + workerApi.excludeWorker(excludeWorkerRequest) + } + + private[master] def runRemoveExcludedWorkers: HandleResponse = { + val workerIds = getWorkerIds + val removeExcludeWorkerRequest = new ExcludeWorkerRequest().remove(workerIds) + logInfo(s"Sending remove exclude worker requests to workers: $workerIds") + workerApi.excludeWorker(removeExcludeWorkerRequest) + } + + private[master] def runSendWorkerEvent: HandleResponse = { + val eventType = { + try { + EventTypeEnum.valueOf(masterOptions.sendWorkerEvent.toUpperCase) + } catch { + case _: IllegalArgumentException => throw new ParameterException( + spec.commandLine(), + "Worker event type must be " + + "IMMEDIATELY, DECOMMISSION, DECOMMISSION_THEN_IDLE, or GRACEFUL") + } + } + val workerIds = getWorkerIds + val sendWorkerEventRequest = + new SendWorkerEventRequest().workers(workerIds).eventType(eventType) + logInfo(s"Sending workerEvent $eventType to workers: $workerIds") + workerApi.sendWorkerEvent(sendWorkerEventRequest) + } + + private[master] def runShowWorkerEventInfo: WorkerEventsResponse = workerApi.getWorkerEvents + + private[master] def runShowLostWorkers: Seq[WorkerTimestampData] = { + val lostWorkers = runShowWorkers.getLostWorkers.asScala.toSeq + if (lostWorkers.isEmpty) { + log("No lost workers found.") + Seq.empty[WorkerTimestampData] + } else { + lostWorkers.sortBy(_.getWorker.getHost) + } + } + + private[master] def runShowExcludedWorkers: Seq[WorkerData] = { + val excludedWorkers = runShowWorkers.getExcludedWorkers.asScala.toSeq + if (excludedWorkers.isEmpty) { + log("No excluded workers found.") + Seq.empty[WorkerData] + } else { + excludedWorkers.sortBy(_.getHost) + } + } + + private[master] def runShowShutdownWorkers: Seq[WorkerData] = { + val shutdownWorkers = runShowWorkers.getShutdownWorkers.asScala.toSeq + if (shutdownWorkers.isEmpty) { + log("No shutdown workers found.") + Seq.empty[WorkerData] + } else { + shutdownWorkers.sortBy(_.getHost) + } + } + + private[master] def runShowLifecycleManagers: HostnamesResponse = + applicationApi.getApplicationHostNames + + private[master] def runShowWorkers: WorkersResponse = workerApi.getWorkers + + private[master] def getWorkerIds: util.List[WorkerId] = { + val workerIds = commonOptions.workerIds + if (workerIds == null || workerIds.isEmpty) { + throw new ParameterException( + spec.commandLine(), + "Host list must be provided for this command.") + } + workerIds + .trim + .split(",") + .map(workerId => { + val splitWorkerId = workerId.split(":") + val host = splitWorkerId(0) + val rpcPort = splitWorkerId(1).toInt + val pushPort = splitWorkerId(2).toInt + val fetchPort = splitWorkerId(3).toInt + val replicatePort = splitWorkerId(4).toInt + new WorkerId().host(host).rpcPort(rpcPort).pushPort(pushPort).fetchPort( + fetchPort).replicatePort(replicatePort) + }) + .toList + .asJava + } + + private[master] def runShowConf: ConfResponse = confApi.getConf + + private[master] def runShowDynamicConf: DynamicConfigResponse = + confApi.getDynamicConf( + commonOptions.configLevel, + commonOptions.configTenant, + commonOptions.configName) + + private[master] def runShowThreadDump: ThreadStackResponse = defaultApi.getThreadDump + + private[master] def runAddClusterAlias: Unit = { + val aliasToAdd = masterOptions.addClusterAlias + val hosts = commonOptions.hostList + if (hosts == null || hosts.isEmpty) { + throw new ParameterException( + spec.commandLine(), + "Host list must be supplied via --host-list to add to alias.") + } + cliConfigManager.add(aliasToAdd, hosts) + logInfo(s"Cluster alias $aliasToAdd added to ${CliConfigManager.cliConfigFilePath}. You can now use the --cluster" + + s" command with this alias.") + } + + private[master] def runRemoveClusterAlias: Unit = { + val aliasToRemove = masterOptions.removeClusterAlias + cliConfigManager.remove(aliasToRemove) + logInfo(s"Cluster alias $aliasToRemove removed.") + } +} diff --git a/cli/src/main/scala/org/apache/celeborn/cli/worker/WorkerOptions.scala b/cli/src/main/scala/org/apache/celeborn/cli/worker/WorkerOptions.scala new file mode 100644 index 000000000..a0a1c25a7 --- /dev/null +++ b/cli/src/main/scala/org/apache/celeborn/cli/worker/WorkerOptions.scala @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.celeborn.cli.worker + +import picocli.CommandLine.Option + +final class WorkerOptions { + + @Option(names = Array("--show-worker-info"), description = Array("Show worker info")) + private[worker] var showWorkerInfo: Boolean = _ + + @Option( + names = Array("--show-apps-on-worker"), + description = Array("Show applications running on the worker")) + private[worker] var showAppsOnWorker: Boolean = _ + + @Option( + names = Array("--show-shuffles-on-worker"), + description = Array("Show shuffles running on the worker")) + private[worker] var showShufflesOnWorker: Boolean = _ + + @Option( + names = Array("--show-top-disk-used-apps"), + description = Array("Show top disk used applications")) + private[worker] var showTopDiskUsedApps: Boolean = _ + + @Option( + names = Array("--show-partition-location-info"), + description = Array("Show partition location information")) + private[worker] var showPartitionLocationInfo: Boolean = _ + + @Option(names = Array("--show-unavailable-peers"), description = Array("Show unavailable peers")) + private[worker] var showUnavailablePeers: Boolean = _ + + @Option(names = Array("--is-shutdown"), description = Array("Check if the system is shutdown")) + private[worker] var isShutdown: Boolean = _ + + @Option( + names = Array("--is-registered"), + description = Array("Check if the system is registered")) + private[worker] var isRegistered: Boolean = _ + + @Option( + names = Array("--exit"), + paramLabel = "exit_type", + description = Array("Exit the application with a specified type")) + private[worker] var exitType: String = _ + + @Option(names = Array("--show-conf"), description = Array("Show worker conf")) + private[worker] var showConf: Boolean = _ + + @Option(names = Array("--show-dynamic-conf"), description = Array("Show dynamic worker conf")) + private[worker] var showDynamicConf: Boolean = _ + + @Option(names = Array("--show-thread-dump"), description = Array("Show worker thread dump")) + private[worker] var showThreadDump: Boolean = _ + +} diff --git a/cli/src/main/scala/org/apache/celeborn/cli/worker/WorkerSubcommand.scala b/cli/src/main/scala/org/apache/celeborn/cli/worker/WorkerSubcommand.scala new file mode 100644 index 000000000..7690c364e --- /dev/null +++ b/cli/src/main/scala/org/apache/celeborn/cli/worker/WorkerSubcommand.scala @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.celeborn.cli.worker + +import picocli.CommandLine.{ArgGroup, Mixin, ParameterException, ParentCommand, Spec} +import picocli.CommandLine.Model.CommandSpec + +import org.apache.celeborn.cli.CelebornCli +import org.apache.celeborn.cli.common.{CliLogging, CommonOptions} +import org.apache.celeborn.rest.v1.model._ +import org.apache.celeborn.rest.v1.worker.{ApplicationApi, ConfApi, DefaultApi, ShuffleApi, WorkerApi} +import org.apache.celeborn.rest.v1.worker.invoker.ApiClient + +trait WorkerSubcommand extends CliLogging { + + @ParentCommand + private var celebornCli: CelebornCli = _ + + @ArgGroup(exclusive = true, multiplicity = "1") + private[worker] var workerOptions: WorkerOptions = _ + + @Mixin + private[worker] var commonOptions: CommonOptions = _ + + @Spec + private[worker] var spec: CommandSpec = _ + + private[worker] def apiClient = { + if (commonOptions.hostPort != null && commonOptions.hostPort.nonEmpty) { + log(s"Using connectionUrl: ${commonOptions.hostPort}") + new ApiClient().setBasePath(s"http://${commonOptions.hostPort}") + } else { + throw new ParameterException( + spec.commandLine(), + "No valid connection url found, please provide --hostport.") + } + } + private[worker] def applicationApi = new ApplicationApi(apiClient) + private[worker] def confApi = new ConfApi(apiClient) + private[worker] def defaultApi = new DefaultApi(apiClient) + private[worker] def shuffleApi = new ShuffleApi(apiClient) + private[worker] def workerApi = new WorkerApi(apiClient) + + private[worker] def runShowWorkerInfo: WorkerInfoResponse + + private[worker] def runShowAppsOnWorker: ApplicationsResponse + + private[worker] def runShowShufflesOnWorker: ShufflesResponse + + private[worker] def runShowTopDiskUsedApps: AppDiskUsagesResponse + + private[worker] def runShowPartitionLocationInfo: ShufflePartitionsResponse + + private[worker] def runShowUnavailablePeers: UnAvailablePeersResponse + + private[worker] def runIsShutdown: Boolean + + private[worker] def runIsRegistered: Boolean + + private[worker] def runExit: HandleResponse + + private[worker] def runShowConf: ConfResponse + + private[worker] def runShowDynamicConf: DynamicConfigResponse + + private[worker] def runShowThreadDump: ThreadStackResponse + +} diff --git a/cli/src/main/scala/org/apache/celeborn/cli/worker/WorkerSubcommandImpl.scala b/cli/src/main/scala/org/apache/celeborn/cli/worker/WorkerSubcommandImpl.scala new file mode 100644 index 000000000..eee9e56dd --- /dev/null +++ b/cli/src/main/scala/org/apache/celeborn/cli/worker/WorkerSubcommandImpl.scala @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.celeborn.cli.worker + +import picocli.CommandLine.Command + +import org.apache.celeborn.rest.v1.model._ +import org.apache.celeborn.rest.v1.model.WorkerExitRequest.TypeEnum + +@Command(name = "worker", mixinStandardHelpOptions = true) +class WorkerSubcommandImpl extends Runnable with WorkerSubcommand { + + override def run(): Unit = { + if (workerOptions.showWorkerInfo) log(runShowWorkerInfo) + if (workerOptions.showAppsOnWorker) log(runShowAppsOnWorker) + if (workerOptions.showShufflesOnWorker) log(runShowShufflesOnWorker) + if (workerOptions.showTopDiskUsedApps) log(runShowTopDiskUsedApps) + if (workerOptions.showPartitionLocationInfo) log(runShowPartitionLocationInfo) + if (workerOptions.showUnavailablePeers) log(runShowUnavailablePeers) + if (workerOptions.isShutdown) log(runIsShutdown) + if (workerOptions.isRegistered) log(runIsRegistered) + if (workerOptions.exitType != null && workerOptions.exitType.nonEmpty) log(runExit) + if (workerOptions.showConf) log(runShowConf) + if (workerOptions.showDynamicConf) log(runShowDynamicConf) + if (workerOptions.showThreadDump) log(runShowThreadDump) + } + + private[worker] def runShowWorkerInfo: WorkerInfoResponse = workerApi.getWorkerInfo + + private[worker] def runShowAppsOnWorker: ApplicationsResponse = applicationApi.getApplicationList + + private[worker] def runShowShufflesOnWorker: ShufflesResponse = shuffleApi.getShuffles + + private[worker] def runShowTopDiskUsedApps: AppDiskUsagesResponse = + applicationApi.getApplicationsDiskUsage + + private[worker] def runShowPartitionLocationInfo: ShufflePartitionsResponse = + shuffleApi.getShufflePartitions + + private[worker] def runShowUnavailablePeers: UnAvailablePeersResponse = + workerApi.unavailablePeers() + + private[worker] def runIsShutdown: Boolean = runShowWorkerInfo.getIsShutdown + + private[worker] def runIsRegistered: Boolean = runShowWorkerInfo.getIsRegistered + + private[worker] def runExit: HandleResponse = { + val workerExitType: TypeEnum = TypeEnum.valueOf(workerOptions.exitType) + val workerExitRequest: WorkerExitRequest = new WorkerExitRequest().`type`(workerExitType) + logInfo(s"Sending worker exit type: ${workerExitType.getValue}") + workerApi.workerExit(workerExitRequest) + } + + private[worker] def runShowConf: ConfResponse = confApi.getConf + + private[worker] def runShowDynamicConf: DynamicConfigResponse = + confApi.getDynamicConf( + commonOptions.configLevel, + commonOptions.configTenant, + commonOptions.configName) + + private[worker] def runShowThreadDump: ThreadStackResponse = defaultApi.getThreadDump +} diff --git a/cli/src/test/scala/org/apache/celeborn/cli/TestCelebornCliCommands.scala b/cli/src/test/scala/org/apache/celeborn/cli/TestCelebornCliCommands.scala new file mode 100644 index 000000000..6e90650ac --- /dev/null +++ b/cli/src/test/scala/org/apache/celeborn/cli/TestCelebornCliCommands.scala @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.celeborn.cli + +import java.io.{ByteArrayOutputStream, File, PrintStream} +import java.nio.file.{Files, Paths} + +import org.apache.celeborn.CelebornFunSuite +import org.apache.celeborn.cli.config.CliConfigManager +import org.apache.celeborn.common.CelebornConf +import org.apache.celeborn.service.deploy.MiniClusterFeature +import org.apache.celeborn.service.deploy.master.Master +import org.apache.celeborn.service.deploy.worker.Worker + +class TestCelebornCliCommands extends CelebornFunSuite with MiniClusterFeature { + + private val celebornConf = new CelebornConf() + protected var master: Master = _ + protected var worker: Worker = _ + + override def beforeAll(): Unit = { + logInfo("test initialized, setup celeborn mini cluster") + val (m, w) = + setupMiniClusterWithRandomPorts(workerConf = celebornConf.getAll.toMap, workerNum = 1) + master = m + worker = w.head + super.beforeAll() + val aliasCommand = Array( + "master", + "--add-cluster-alias", + "unit-test", + "--host-list", + master.connectionUrl) + captureOutputAndValidateResponse( + aliasCommand, + s"Cluster alias unit-test added to ${CliConfigManager.cliConfigFilePath}") + } + + override def afterAll(): Unit = { + super.afterAll() + logInfo("all test complete, stop celeborn mini cluster") + shutdownMiniCluster() + val removeAliasCommand = Array( + "master", + "--remove-cluster-alias", + "unit-test") + captureOutputAndValidateResponse(removeAliasCommand, s"Cluster alias unit-test removed.") + val cliConfigManager = new CliConfigManager + val aliasExists = cliConfigManager.loadConfig().exists(_.cliConfigData.contains("unit-test")) + assert(!aliasExists) + if (new File(CliConfigManager.cliConfigFilePath).exists()) { + Files.delete(Paths.get(CliConfigManager.cliConfigFilePath)) + } + + } + + test("worker --show-worker-info") { + val args = prepareWorkerArgs() :+ "--show-worker-info" + captureOutputAndValidateResponse(args, "WorkerInfoResponse") + } + + test("worker --show-apps-on-worker") { + val args = prepareWorkerArgs() :+ "--show-apps-on-worker" + captureOutputAndValidateResponse(args, "ApplicationsResponse") + } + + test("worker --show-shuffles-on-worker") { + val args = prepareWorkerArgs() :+ "--show-shuffles-on-worker" + captureOutputAndValidateResponse(args, "ShufflesResponse") + } + + test("worker --show-top-disk-used-apps") { + val args = prepareWorkerArgs() :+ "--show-top-disk-used-apps" + captureOutputAndValidateResponse(args, "AppDiskUsagesResponse") + } + + test("worker --show-partition-location-info") { + val args = prepareWorkerArgs() :+ "--show-partition-location-info" + captureOutputAndValidateResponse(args, "ShufflePartitionsResponse") + } + + test("worker --show-unavailable-peers") { + val args = prepareWorkerArgs() :+ "--show-unavailable-peers" + captureOutputAndValidateResponse(args, "UnAvailablePeersResponse") + } + + test("worker --is-shutdown") { + val args = prepareWorkerArgs() :+ "--is-shutdown" + captureOutputAndValidateResponse(args, "false") + } + + test("worker --is-registered") { + val args = prepareWorkerArgs() :+ "--is-registered" + captureOutputAndValidateResponse(args, "true") + } + + test("worker --show-conf") { + val args = prepareWorkerArgs() :+ "--show-conf" + captureOutputAndValidateResponse(args, "ConfResponse") + } + + test("worker --show-dynamic-conf") { + cancel("This test is temporarily disabled since dynamic conf is not enabled in unit tests.") + val args = prepareWorkerArgs() :+ "--show-dynamic-conf" + captureOutputAndValidateResponse(args, "") + } + + test("worker --show-thread-dump") { + val args = prepareWorkerArgs() :+ "--show-thread-dump" + captureOutputAndValidateResponse(args, "ThreadStackResponse") + } + + test("master --show-masters-info") { + cancel("This test is temporarily disabled since HA is not enabled in the unit tests.") + val args = prepareMasterArgs() :+ "--show-masters-info" + captureOutputAndValidateResponse(args, "") + } + + test("master --show-cluster-apps") { + val args = prepareMasterArgs() :+ "--show-cluster-apps" + captureOutputAndValidateResponse(args, "ApplicationsHeartbeatResponse") + } + + test("master --show-cluster-shuffles") { + val args = prepareMasterArgs() :+ "--show-cluster-shuffles" + captureOutputAndValidateResponse(args, "ShufflesResponse") + } + + test("master --show-top-disk-used-apps") { + val args = prepareMasterArgs() :+ "--show-top-disk-used-apps" + captureOutputAndValidateResponse(args, "AppDiskUsageSnapshotsResponse") + } + + test("master --show-worker-event-info") { + val args = prepareMasterArgs() :+ "--show-worker-event-info" + captureOutputAndValidateResponse(args, "WorkerEventsResponse") + } + + test("master --show-lost-workers") { + val args = prepareMasterArgs() :+ "--show-lost-workers" + captureOutputAndValidateResponse(args, "No lost workers found.") + } + + test("master --show-excluded-workers") { + val args = prepareMasterArgs() :+ "--show-excluded-workers" + captureOutputAndValidateResponse(args, "No excluded workers found.") + } + + test("master --show-shutdown-workers") { + val args = prepareMasterArgs() :+ "--show-shutdown-workers" + captureOutputAndValidateResponse(args, "No shutdown workers found.") + } + + test("master --show-lifecycle-managers") { + val args = prepareMasterArgs() :+ "--show-lifecycle-managers" + captureOutputAndValidateResponse(args, "HostnamesResponse") + } + + test("master --show-workers") { + val args = prepareMasterArgs() :+ "--show-workers" + captureOutputAndValidateResponse(args, "WorkersResponse") + } + + test("master --show-conf") { + val args = prepareMasterArgs() :+ "--show-conf" + captureOutputAndValidateResponse(args, "ConfResponse") + } + + test("master --show-dynamic-conf") { + cancel("This test is temporarily disabled since dynamic conf is not enabled in unit tests.") + val args = prepareMasterArgs() :+ "--show-dynamic-conf" + captureOutputAndValidateResponse(args, "") + } + + test("master --show-thread-dump") { + val args = prepareMasterArgs() :+ "--show-thread-dump" + captureOutputAndValidateResponse(args, "ThreadStackResponse") + } + + test("master --exclude-worker and --remove-excluded-worker") { + val excludeArgs = prepareMasterArgs() ++ Array( + "--exclude-worker", + "--worker-ids", + getWorkerId()) + captureOutputAndValidateResponse(excludeArgs, "success: true") + val removeExcludedArgs = prepareMasterArgs() ++ Array( + "--remove-excluded-worker", + "--worker-ids", + getWorkerId()) + captureOutputAndValidateResponse(removeExcludedArgs, "success: true") + } + + test("master --send-worker-event") { + val args = prepareMasterArgs() ++ Array( + "--send-worker-event", + "DECOMMISSION", + "--worker-ids", + getWorkerId()) + captureOutputAndValidateResponse(args, "success: true") + } + + private def prepareMasterArgs(): Array[String] = { + Array( + "master", + "--cluster", + "unit-test") + } + + private def prepareWorkerArgs(): Array[String] = { + Array( + "worker", + "--hostport", + worker.connectionUrl) + } + + private def captureOutputAndValidateResponse( + args: Array[String], + stdoutValidationString: String): Unit = { + val stdoutStream = new ByteArrayOutputStream() + val stdoutPrintStream = new PrintStream(stdoutStream) + Console.withOut(stdoutPrintStream) { + CelebornCli.main(args) + } + val stdout = stdoutStream.toString + assert(stdout.nonEmpty && stdout.contains(stdoutValidationString)) + } + + private def getWorkerId(): String = { + s"${worker.workerArgs.host}:${worker.rpcEnv.address.port}:${worker.getPushFetchServerPort._1}" + + s":${worker.getPushFetchServerPort._2}:${worker.replicateServer.getPort}" + } +} diff --git a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala index 02650dc1b..c452cbc0c 100644 --- a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala +++ b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala @@ -990,6 +990,17 @@ object Utils extends Logging { } } + def tryWithResources[R <: Closeable, U](f: => R)(func: R => U): U = { + val res = f + try { + func(f) + } finally { + if (null != res) { + res.close() + } + } + } + def toTransportMessage(message: Any): Any = { message match { case legacy: Message => diff --git a/pom.xml b/pom.xml index 2e06cf2b2..9f91c1552 100644 --- a/pom.xml +++ b/pom.xml @@ -38,6 +38,7 @@ service master worker + cli @@ -104,6 +105,7 @@ 2.15.3 1.1.10.5 3.0-8 + 4.7.6 3.5.15 @@ -708,6 +710,11 @@ ${bouncycastle.version} test + + info.picocli + picocli + ${picocli.version} + diff --git a/project/CelebornBuild.scala b/project/CelebornBuild.scala index 31816fe2c..ddddcf366 100644 --- a/project/CelebornBuild.scala +++ b/project/CelebornBuild.scala @@ -80,6 +80,7 @@ object Dependencies { val httpClient5Version = "5.3.1" val httpCore5Version = "5.2.4" val javaxAnnotationApiVersion = "1.3.2" + val picocliVersion = "4.7.6" // For SSL support val bouncycastleVersion = "1.77" @@ -203,6 +204,8 @@ object Dependencies { // SSL support val bouncycastleBcprovJdk18on = "org.bouncycastle" % "bcprov-jdk18on" % bouncycastleVersion % "test" val bouncycastleBcpkixJdk18on = "org.bouncycastle" % "bcpkix-jdk18on" % bouncycastleVersion % "test" + + val picocli = "info.picocli" % "picocli" % picocliVersion } object CelebornCommonSettings { @@ -355,7 +358,8 @@ object CelebornBuild extends sbt.internal.BuildDef { CelebornClient.client, CelebornService.service, CelebornWorker.worker, - CelebornMaster.master) ++ maybeSparkClientModules ++ maybeFlinkClientModules ++ maybeMRClientModules ++ maybeWebModules + CelebornMaster.master, + CelebornCli.cli) ++ maybeSparkClientModules ++ maybeFlinkClientModules ++ maybeMRClientModules ++ maybeWebModules } // ThisBuild / parallelExecution := false @@ -455,6 +459,20 @@ object Utils { } } +object CelebornCli { + lazy val cli = Project("celeborn-cli", file("cli")) + .dependsOn(CelebornCommon.common % "test->test;compile->compile") + .dependsOn(CelebornMaster.master % "test->test;compile->compile") + .dependsOn(CelebornWorker.worker % "test->test;compile->compile") + .dependsOn(CelebornOpenApi.openApiClient % "test->test;compile->compile") + .settings ( + commonSettings, + libraryDependencies ++= Seq( + Dependencies.picocli + ) ++ commonUnitTestDependencies + ) +} + object CelebornSpi { lazy val spi = Project("celeborn-spi", file("spi")) .settings( diff --git a/sbin/celeborn-cli b/sbin/celeborn-cli new file mode 100755 index 000000000..ed89f22f8 --- /dev/null +++ b/sbin/celeborn-cli @@ -0,0 +1,24 @@ +#!/usr/bin/env bash +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +if [ -z "${CELEBORN_HOME}" ]; then + export CELEBORN_HOME="$(cd "`dirname "$0"`"/..; pwd)" +fi + +. "${CELEBORN_HOME}/sbin/load-celeborn-env.sh" +exec "${CELEBORN_HOME}"/bin/celeborn-class org.apache.celeborn.cli.CelebornCli "$@"