[CELEBORN-1572] Celeborn CLI initial REST API support

### What changes were proposed in this pull request?
Introduces the Celeborn CLI (based on [CIP-7](https://cwiki.apache.org/confluence/display/CELEBORN/CIP-7+Celeborn+CLI)). This first iteration adds support for querying the existing REST API endpoints.
A follow-up will add a layer for external cluster manager support. Further improvements, such as pretty-printing, can be added in subsequent PRs.

### Why are the changes needed?
See [CIP-7](https://cwiki.apache.org/confluence/display/CELEBORN/CIP-7+Celeborn+CLI).

### Does this PR introduce _any_ user-facing change?
yes, new CLI tool.

### How was this patch tested?
added UTs and also tested internally.

Closes #2699 from akpatnam25/cli-CELEBORN-1572.

Lead-authored-by: Aravind Patnam <apatnam@linkedin.com>
Co-authored-by: Aravind Patnam <akpatnam25@gmail.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
This commit is contained in:
Aravind Patnam 2024-09-05 11:15:16 -05:00 committed by Mridul Muralidharan
parent 60dbb3e9f8
commit cc26131f88
20 changed files with 1340 additions and 2 deletions

View File

@ -46,6 +46,9 @@ do
if [ "$i" == "org.apache.celeborn.service.deploy.worker.Worker" ] ; then
LAUNCH_CLASS=org.apache.celeborn.service.deploy.worker.Worker
fi
if [ "$i" == "org.apache.celeborn.cli.CelebornCli" ] ; then
LAUNCH_CLASS=org.apache.celeborn.cli.CelebornCli
fi
done
if [ "${LAUNCH_CLASS}" == "org.apache.celeborn.service.deploy.master.Master" ] ; then
@ -64,6 +67,14 @@ if [ "${LAUNCH_CLASS}" == "org.apache.celeborn.service.deploy.worker.Worker" ] ;
fi
fi
if [ "${LAUNCH_CLASS}" == "org.apache.celeborn.cli.CelebornCli" ] ; then
if [ -d "${CELEBORN_HOME}/cli-jars" ]; then
CELEBORN_JARS_DIR="${CELEBORN_HOME}/cli-jars"
else
CELEBORN_JARS_DIR="${CELEBORN_HOME}/cli/target"
fi
fi
if [ ! -d "$CELEBORN_JARS_DIR" ]; then
echo "Failed to find CELEBORN jars directory ($CELEBORN_JARS_DIR)." 1>&2
echo "You need to build CELEBORN with the target \"package\" before running this program." 1>&2

View File

@ -138,7 +138,7 @@ function build_service {
# Store the command as an array because $MVN variable might have spaces in it.
# Normal quoting tricks don't work.
# See: http://mywiki.wooledge.org/BashFAQ/050
BUILD_COMMAND=("$MVN" clean package $MVN_DIST_OPT -pl master,worker -am $@)
BUILD_COMMAND=("$MVN" clean package $MVN_DIST_OPT -pl master,worker,cli -am $@)
# Actually build the jar
echo -e "\nBuilding with..."
@ -149,6 +149,7 @@ function build_service {
mkdir -p "$DIST_DIR/jars"
mkdir -p "$DIST_DIR/master-jars"
mkdir -p "$DIST_DIR/worker-jars"
mkdir -p "$DIST_DIR/cli-jars"
## Copy master jars
cp "$PROJECT_DIR"/master/target/celeborn-master_$SCALA_VERSION-$VERSION.jar "$DIST_DIR/master-jars/"
@ -162,6 +163,12 @@ function build_service {
for jar in $(ls "$PROJECT_DIR/worker/target/scala-$SCALA_VERSION/jars"); do
(cd $DIST_DIR/worker-jars; ln -snf "../jars/$jar" .)
done
## Copy cli jars
cp "$PROJECT_DIR"/cli/target/celeborn-cli_$SCALA_VERSION-$VERSION.jar "$DIST_DIR/cli-jars/"
cp "$PROJECT_DIR"/cli/target/scala-$SCALA_VERSION/jars/*.jar "$DIST_DIR/jars/"
for jar in $(ls "$PROJECT_DIR/cli/target/scala-$SCALA_VERSION/jars"); do
(cd $DIST_DIR/cli-jars; ln -snf "../jars/$jar" .)
done
}
function build_spark_client {
@ -273,6 +280,7 @@ function sbt_build_service {
mkdir -p "$DIST_DIR/jars"
mkdir -p "$DIST_DIR/master-jars"
mkdir -p "$DIST_DIR/worker-jars"
mkdir -p "$DIST_DIR/cli-jars"
## Copy master jars
cp "$PROJECT_DIR"/master/target/scala-$SCALA_VERSION/celeborn-master_$SCALA_VERSION-$VERSION.jar "$DIST_DIR/master-jars/"
@ -286,6 +294,12 @@ function sbt_build_service {
for jar in $(ls "$PROJECT_DIR/worker/target/scala-$SCALA_VERSION/jars"); do
(cd $DIST_DIR/worker-jars; ln -snf "../jars/$jar" .)
done
## Copy cli jars
cp "$PROJECT_DIR"/cli/target/scala-$SCALA_VERSION/celeborn-cli_$SCALA_VERSION-$VERSION.jar "$DIST_DIR/cli-jars/"
cp "$PROJECT_DIR"/cli/target/scala-$SCALA_VERSION/jars/*.jar "$DIST_DIR/jars/"
for jar in $(ls "$PROJECT_DIR/cli/target/scala-$SCALA_VERSION/jars"); do
(cd $DIST_DIR/cli-jars; ln -snf "../jars/$jar" .)
done
}
function sbt_build_client {

View File

@ -133,6 +133,9 @@ upload_nexus_staging() {
echo "Deploying celeborn-spi"
${PROJECT_DIR}/build/sbt "clean;celeborn-spi/publishSigned"
echo "Deploying celeborn-cli"
${PROJECT_DIR}/build/sbt "clean;celeborn-cli/publishSigned"
}
finalize_svn() {

83
cli/pom.xml Normal file
View File

@ -0,0 +1,83 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<!-- Maven module descriptor for the Celeborn CLI (artifact celeborn-cli_<scala binary version>). -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.celeborn</groupId>
<artifactId>celeborn-parent_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<relativePath>../pom.xml</relativePath>
</parent>
<artifactId>celeborn-cli_${scala.binary.version}</artifactId>
<packaging>jar</packaging>
<name>Celeborn CLI</name>
<dependencies>
<!-- Command-line parsing framework; no version is given here, so it is presumably managed by the parent POM (TODO confirm). -->
<dependency>
<groupId>info.picocli</groupId>
<artifactId>picocli</artifactId>
</dependency>
<!-- Generated REST client used by the CLI subcommands to call the master/worker HTTP endpoints. -->
<dependency>
<groupId>org.apache.celeborn</groupId>
<artifactId>celeborn-openapi-client_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<!-- Worker module (main jar and test-jar) is needed for tests only. -->
<dependency>
<groupId>org.apache.celeborn</groupId>
<artifactId>celeborn-worker_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.celeborn</groupId>
<artifactId>celeborn-worker_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<!-- Common module is a compile-time dependency (the CLI sources import org.apache.celeborn.common.util.Utils); its test-jar is test-only. -->
<dependency>
<groupId>org.apache.celeborn</groupId>
<artifactId>celeborn-common_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.celeborn</groupId>
<artifactId>celeborn-common_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<!-- Master module is needed for tests only. -->
<dependency>
<groupId>org.apache.celeborn</groupId>
<artifactId>celeborn-master_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Declared without local configuration; presumably inherits the parent's settings that copy runtime jars
     into target/scala-<version>/jars, which the distribution script then links into cli-jars (TODO confirm). -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.celeborn.cli
import picocli.CommandLine
import picocli.CommandLine.Command
import org.apache.celeborn.cli.common.{CliLogging, CliVersionProvider}
import org.apache.celeborn.cli.master.MasterSubcommandImpl
import org.apache.celeborn.cli.worker.WorkerSubcommandImpl
// Root picocli command of the Celeborn CLI.
// It carries no options of its own besides the standard help/version ones (mixinStandardHelpOptions);
// the real work is done by the registered `master` and `worker` subcommands.
// The version string is produced by CliVersionProvider.
@Command(
name = "celeborn-cli",
versionProvider = classOf[CliVersionProvider],
mixinStandardHelpOptions = true,
description = Array("@|bold Scala|@ Celeborn CLI"),
subcommands = Array(
classOf[MasterSubcommandImpl],
classOf[WorkerSubcommandImpl]))
class CelebornCli extends Runnable with CliLogging {
// Invoked when the root command itself is executed (presumably when no subcommand was given, as the
// message below indicates): nothing can be done at this level, so point the user at the usage help.
override def run(): Unit = {
logError(
"Master or Worker subcommand needs to be provided. Please run -h to see the usage info.")
}
}
object CelebornCli {

  /**
   * Entry point of the Celeborn CLI.
   *
   * Builds the picocli command line for [[CelebornCli]], forces option and subcommand names to be
   * matched case-sensitively, and executes it with the given arguments.
   *
   * @param args raw command-line arguments as passed by the launcher script
   */
  def main(args: Array[String]): Unit = {
    val cmd = new CommandLine(new CelebornCli())
    cmd.setOptionsCaseInsensitive(false)
    cmd.setSubcommandsCaseInsensitive(false)
    // Execute on the instance configured above. Previously a second, unconfigured CommandLine was
    // created here, so the settings applied to `cmd` were silently discarded.
    cmd.execute(args: _*)
  }
}

View File

@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.celeborn.cli.common
import Console.{GREEN, RED, RESET}
/**
 * Minimal console logging helpers for CLI commands.
 *
 * All methods write to `Console.out`. `logInfo` prints in green and `logError` in red; both reset
 * the terminal attributes after the message so that the colour does not bleed into whatever is
 * printed next (for example a response object printed through `log`).
 */
trait CliLogging {

  /** Prints a plain message followed by a newline. */
  def log(msg: String): Unit = {
    Console.println(msg)
  }

  /** Prints the string form of an arbitrary object followed by a newline. */
  def log(obj: Any): Unit = {
    Console.println(obj)
  }

  /** Prints an informational message in green. */
  def logInfo(msg: String): Unit = {
    // Trailing RESET restores the default colour; without it all later output stays green.
    Console.println(s"${RESET}${GREEN}${msg}${RESET}")
  }

  /** Prints an error message in red. */
  def logError(msg: String): Unit = {
    // Trailing RESET restores the default colour; without it all later output stays red.
    Console.println(s"${RESET}${RED}${msg}${RESET}")
  }
}

View File

@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.celeborn.cli.common
import java.nio.file.{Files, Paths}
import scala.io.Source
import picocli.CommandLine.IVersionProvider
import org.apache.celeborn.common.util.Utils
/**
 * picocli version provider for the Celeborn CLI.
 *
 * Looks for a `RELEASE` file directly under `$CELEBORN_HOME` and reports the first
 * `Celeborn <version>` token found in it. When the environment variable is unset, the file is
 * missing, or the file carries no version line, only the bare prefix is reported.
 */
class CliVersionProvider extends IVersionProvider with CliLogging {

  // Matches e.g. "Celeborn 0.6.0" anywhere in a line.
  private val versionPattern = """Celeborn\s+\S+""".r
  private val prefix = "Celeborn CLI"

  /**
   * @return a single-element array: "Celeborn CLI - Celeborn <version>" when the version can be
   *         resolved, otherwise just "Celeborn CLI"
   */
  override def getVersion: Array[String] = {
    // Only probe the filesystem when CELEBORN_HOME is actually set; an unset variable used to
    // resolve to "/RELEASE" at the filesystem root.
    val releaseFile = sys.env.get("CELEBORN_HOME")
      .filter(_.nonEmpty)
      .map(home => Paths.get(home, "RELEASE"))
      .filter(path => Files.exists(path))

    releaseFile match {
      case Some(path) =>
        Utils.tryWithResources(Source.fromFile(path.toFile)) { source =>
          // Evaluate the regex once per line and stop at the first hit.
          val version = source.getLines()
            .map(line => versionPattern.findFirstIn(line))
            .collectFirst { case Some(found) => found }
          version match {
            case Some(found) =>
              Array(s"$prefix - $found")
            case None =>
              logInfo("Could not resolve version of Celeborn since RELEASE file did not contain version info.")
              Array(prefix)
          }
        }
      case None =>
        logInfo(
          "Could not resolve version of Celeborn since no RELEASE file was found in $CELEBORN_HOME.")
        Array(prefix)
    }
  }
}

View File

@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.celeborn.cli.common
import picocli.CommandLine.{Command, Option, Spec}
import picocli.CommandLine.Model.CommandSpec
// Options shared between CLI subcommands. MasterSubcommand pulls this class in through picocli's
// @Mixin, so every option declared here is available on the `master` subcommand.
// All fields are populated reflectively by picocli and stay null when the option is not given.
@Command
class CommonOptions {
@Spec var spec: CommandSpec = _ // injected by picocli
// Target endpoint as host:port. MasterSubcommand prefers this over --cluster and prefixes it with
// "http://" to build the REST base path.
@Option(
names = Array("--hostport"),
paramLabel = "host:port",
description = Array("The host and http port"))
private[cli] var hostPort: String = _
// Comma-separated host list. The master subcommand stores it as the value of a cluster alias when
// --add-cluster-alias is used.
@Option(
names = Array("--host-list"),
paramLabel = "h1,h2,h3...",
description = Array("List of hosts to pass to the command"))
private[cli] var hostList: String = _
// Comma-separated worker ids, each host:rpcPort:pushPort:fetchPort:replicatePort. Required by the
// master commands that act on workers (exclude, remove-excluded, send-worker-event).
@Option(
names = Array("--worker-ids"),
paramLabel = "w1,w2,w3...",
description =
Array("List of workerIds to pass to the command. Each worker should be in the format" +
" host:rpcPort:pushPort:fetchPort:replicatePort."))
private[cli] var workerIds: String = _
// Alias previously saved with --add-cluster-alias; resolved through CliConfigManager when
// --hostport is not given.
@Option(
names = Array("--cluster"),
paramLabel = "cluster_alias",
description = Array("The alias of the cluster to use to query masters"))
private[cli] var cluster: String = _
// Required for getting dynamic config info
@Option(
names = Array("--config-level"),
paramLabel = "level",
description = Array("The config level of the dynamic configs"))
private[cli] var configLevel: String = _
// Required for getting dynamic config info
@Option(
names = Array("--config-tenant"),
paramLabel = "tenant_id",
description = Array("The tenant id of TENANT or TENANT_USER level."))
private[cli] var configTenant: String = _
// Required for getting dynamic config info
@Option(
names = Array("--config-name"),
paramLabel = "username",
description = Array("The username of the TENANT_USER level."))
private[cli] var configName: String = _
}

View File

@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.celeborn.cli.config
import java.io.{File, FileInputStream, FileOutputStream}
import java.time.LocalDateTime
import java.util.Properties
import org.apache.celeborn.cli.common.CliLogging
import org.apache.celeborn.common.util.Utils
// Immutable snapshot of the CLI configuration file: property key -> value.
// The master subcommand uses it to map a cluster alias to its comma-separated host list.
case class CliConfig(cliConfigData: Map[String, String])
object CliConfigManager {

  /**
   * Absolute path of the per-user CLI configuration file (`~/.celeborn-cli.conf`).
   *
   * The home directory is taken from the `HOME` environment variable; when that is not set
   * (e.g. on Windows or in a stripped-down service environment) the JVM's `user.home` system
   * property is used instead of failing with a `NoSuchElementException` at class initialisation.
   */
  val cliConfigFilePath: String = {
    val homeDir = sys.env.getOrElse("HOME", System.getProperty("user.home"))
    s"$homeDir/.celeborn-cli.conf"
  }
}
/**
 * Reads and writes the CLI configuration file at [[CliConfigManager.cliConfigFilePath]], stored
 * in Java `Properties` format.
 *
 * Every operation works on a fresh `Properties` instance, so the result always reflects what is
 * currently on disk and no state is carried over between calls.
 */
class CliConfigManager extends CliLogging {

  /**
   * Loads the whole configuration file.
   *
   * @return `None` when the file does not exist, otherwise all key/value pairs found in it
   */
  def loadConfig(): Option[CliConfig] = {
    val file = new File(CliConfigManager.cliConfigFilePath)
    if (!file.exists()) {
      None
    } else {
      Utils.tryWithResources(new FileInputStream(file)) { inputStream =>
        // Use a local Properties object: a shared one would keep keys from earlier loads even
        // after they have been removed from the file.
        val properties = new Properties()
        properties.load(inputStream)
        val entries = properties.stringPropertyNames().toArray.map(_.toString).map { key =>
          key -> properties.getProperty(key)
        }.toMap
        Some(CliConfig(entries))
      }
    }
  }

  /**
   * Overwrites the configuration file with the given entries. Failures are reported through
   * `logError` and otherwise swallowed (best effort).
   */
  private def saveConfig(cliConfig: CliConfig): Unit = {
    try {
      val file = new File(CliConfigManager.cliConfigFilePath)
      // FileOutputStream creates the file itself; only the parent directory has to exist.
      Option(file.getParentFile).foreach(_.mkdirs())
      val properties = new Properties()
      cliConfig.cliConfigData.foreach { case (key, value) =>
        properties.setProperty(key, value)
      }
      Utils.tryWithResources(new FileOutputStream(file)) { os =>
        properties.store(os, s"Last updated conf at ${LocalDateTime.now()}")
      }
    } catch {
      case e: Exception =>
        logError(s"Error saving config: ${e.getMessage}")
    }
  }

  /** Adds or replaces `key` with `value` and persists the result. */
  def add(key: String, value: String): Unit = {
    val config = loadConfig().getOrElse(CliConfig(Map()))
    val updatedConfig = config.copy(cliConfigData = config.cliConfigData + (key -> value))
    saveConfig(updatedConfig)
  }

  /** Removes `key` (a no-op on the data if it is absent) and persists the result. */
  def remove(key: String): Unit = {
    val config = loadConfig().getOrElse(CliConfig(Map()))
    val updatedConfig = config.copy(cliConfigData = config.cliConfigData - key)
    saveConfig(updatedConfig)
  }

  /** @return the value stored under `key`, or `None` if the file or the key does not exist */
  def get(key: String): Option[String] = {
    loadConfig().flatMap(config => config.cliConfigData.get(key))
  }
}

View File

@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.celeborn.cli.master
import picocli.CommandLine.Option
// Action selectors of the `master` subcommand.
// MasterSubcommand binds this class as an exclusive @ArgGroup with multiplicity "1", i.e. exactly
// one of the options below must be supplied per invocation. Fields are set reflectively by picocli;
// Boolean flags default to false and String options to null when absent.
final class MasterOptions {
@Option(names = Array("--show-masters-info"), description = Array("Show master group info"))
private[master] var showMastersInfo: Boolean = _
@Option(names = Array("--show-cluster-apps"), description = Array("Show cluster applications"))
private[master] var showClusterApps: Boolean = _
@Option(names = Array("--show-cluster-shuffles"), description = Array("Show cluster shuffles"))
private[master] var showClusterShuffles: Boolean = _
@Option(
names = Array("--show-top-disk-used-apps"),
description = Array("Show top disk used apps"))
private[master] var showTopDiskUsedApps: Boolean = _
// Flag only: the workers to exclude are taken from the shared --worker-ids option.
@Option(names = Array("--exclude-worker"), description = Array("Exclude workers by ID"))
private[master] var excludeWorkers: Boolean = _
// Flag only: the workers to re-admit are taken from the shared --worker-ids option.
@Option(
names = Array("--remove-excluded-worker"),
description = Array("Remove excluded workers by ID"))
private[master] var removeExcludedWorkers: Boolean = _
// Takes the event type as its value; the target workers come from --worker-ids.
@Option(
names = Array("--send-worker-event"),
paramLabel = "IMMEDIATELY | DECOMMISSION | DECOMMISSION_THEN_IDLE | GRACEFUL",
description = Array("Send an event to a worker"))
private[master] var sendWorkerEvent: String = _
@Option(
names = Array("--show-worker-event-info"),
description = Array("Show worker event information"))
private[master] var showWorkerEventInfo: Boolean = _
@Option(names = Array("--show-lost-workers"), description = Array("Show lost workers"))
private[master] var showLostWorkers: Boolean = _
@Option(names = Array("--show-excluded-workers"), description = Array("Show excluded workers"))
private[master] var showExcludedWorkers: Boolean = _
@Option(names = Array("--show-shutdown-workers"), description = Array("Show shutdown workers"))
private[master] var showShutdownWorkers: Boolean = _
@Option(
names = Array("--show-lifecycle-managers"),
description = Array("Show lifecycle managers"))
private[master] var showLifecycleManagers: Boolean = _
@Option(names = Array("--show-workers"), description = Array("Show registered workers"))
private[master] var showWorkers: Boolean = _
@Option(names = Array("--show-conf"), description = Array("Show master conf"))
private[master] var showConf: Boolean = _
// Uses the shared --config-level / --config-tenant / --config-name options as query parameters.
@Option(names = Array("--show-dynamic-conf"), description = Array("Show dynamic master conf"))
private[master] var showDynamicConf: Boolean = _
@Option(names = Array("--show-thread-dump"), description = Array("Show master thread dump"))
private[master] var showThreadDump: Boolean = _
// Takes the alias name as its value; the hosts it maps to must be supplied via --host-list.
@Option(
names = Array("--add-cluster-alias"),
paramLabel = "alias",
description = Array("Add alias to use in the cli for the given set of masters"))
private[master] var addClusterAlias: String = _
// Takes the alias name to delete from the local CLI config file.
@Option(
names = Array("--remove-cluster-alias"),
paramLabel = "alias",
description = Array("Remove alias to use in the cli for the given set of masters"))
private[master] var removeClusterAlias: String = _
}

View File

@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.celeborn.cli.master
import java.util
import picocli.CommandLine.{ArgGroup, Mixin, ParameterException, ParentCommand, Spec}
import picocli.CommandLine.Model.CommandSpec
import org.apache.celeborn.cli.CelebornCli
import org.apache.celeborn.cli.common.{CliLogging, CommonOptions}
import org.apache.celeborn.cli.config.CliConfigManager
import org.apache.celeborn.rest.v1.master.{ApplicationApi, ConfApi, DefaultApi, MasterApi, ShuffleApi, WorkerApi}
import org.apache.celeborn.rest.v1.master.invoker.ApiClient
import org.apache.celeborn.rest.v1.model._
/**
 * Contract and shared plumbing of the `master` subcommand.
 *
 * Holds the picocli-injected option objects, resolves the master endpoint to talk to, exposes one
 * REST API facade per endpoint group, and declares one abstract `run*` method per supported action.
 */
trait MasterSubcommand extends CliLogging {

  @ParentCommand
  private var celebornCli: CelebornCli = _

  // Exactly one action option has to be present on the command line.
  @ArgGroup(exclusive = true, multiplicity = "1")
  private[master] var masterOptions: MasterOptions = _

  @Mixin
  private[master] var commonOptions: CommonOptions = _

  @Spec
  private[master] var spec: CommandSpec = _

  private[master] val cliConfigManager = new CliConfigManager

  /**
   * Builds an API client for the master endpoint selected on the command line.
   *
   * `--hostport` wins when given; otherwise the `--cluster` alias is looked up in the CLI config
   * file and the first non-blank entry of its comma-separated host list is used.
   *
   * @throws ParameterException when neither source yields a usable endpoint
   */
  private def apiClient: ApiClient = {
    val explicitHostPort = Option(commonOptions.hostPort).map(_.trim).filter(_.nonEmpty)
    val connectionUrl = explicitHostPort.orElse {
      // Guard against a missing --cluster (null) and against alias values such as "" or ",",
      // for which String.split returns an empty array and indexing (0) would throw.
      Option(commonOptions.cluster)
        .flatMap(alias => cliConfigManager.get(alias))
        .flatMap(endpoints => endpoints.split(",").map(_.trim).find(_.nonEmpty))
    }
    connectionUrl match {
      case Some(url) =>
        log(s"Using connectionUrl: $url")
        new ApiClient().setBasePath(s"http://$url")
      case None =>
        throw new ParameterException(
          spec.commandLine(),
          "No valid connection url found, please provide either --hostport or valid cluster alias.")
    }
  }

  // Each accessor creates a fresh client bound to the resolved endpoint.
  private[master] def applicationApi: ApplicationApi = new ApplicationApi(apiClient)
  private[master] def confApi: ConfApi = new ConfApi(apiClient)
  private[master] def defaultApi: DefaultApi = new DefaultApi(apiClient)
  private[master] def masterApi: MasterApi = new MasterApi(apiClient)
  private[master] def shuffleApi: ShuffleApi = new ShuffleApi(apiClient)
  private[master] def workerApi: WorkerApi = new WorkerApi(apiClient)

  // One action per option declared in MasterOptions.
  private[master] def runShowMastersInfo: MasterInfoResponse
  private[master] def runShowClusterApps: ApplicationsHeartbeatResponse
  private[master] def runShowClusterShuffles: ShufflesResponse
  private[master] def runShowTopDiskUsedApps: AppDiskUsageSnapshotsResponse
  private[master] def runExcludeWorkers: HandleResponse
  private[master] def runRemoveExcludedWorkers: HandleResponse
  private[master] def runSendWorkerEvent: HandleResponse
  private[master] def runShowWorkerEventInfo: WorkerEventsResponse
  private[master] def runShowLostWorkers: Seq[WorkerTimestampData]
  private[master] def runShowExcludedWorkers: Seq[WorkerData]
  private[master] def runShowShutdownWorkers: Seq[WorkerData]
  private[master] def runShowLifecycleManagers: HostnamesResponse
  private[master] def runShowWorkers: WorkersResponse
  private[master] def getWorkerIds: util.List[WorkerId]
  private[master] def runShowConf: ConfResponse
  private[master] def runShowDynamicConf: DynamicConfigResponse
  private[master] def runShowThreadDump: ThreadStackResponse
}

View File

@ -0,0 +1,187 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.celeborn.cli.master
import java.util
import scala.collection.JavaConverters._
import picocli.CommandLine.{Command, ParameterException}
import org.apache.celeborn.cli.config.CliConfigManager
import org.apache.celeborn.rest.v1.model._
import org.apache.celeborn.rest.v1.model.SendWorkerEventRequest.EventTypeEnum
/**
 * Implementation of the `master` subcommand: dispatches on the single action option selected in
 * [[MasterOptions]] and prints the REST response (or a confirmation message) to the console.
 */
@Command(name = "master", mixinStandardHelpOptions = true)
class MasterSubcommandImpl extends Runnable with MasterSubcommand {

  /** Runs the action whose option was supplied; the option group is exclusive, so at most one fires. */
  override def run(): Unit = {
    if (masterOptions.showMastersInfo) log(runShowMastersInfo)
    if (masterOptions.showClusterApps) log(runShowClusterApps)
    if (masterOptions.showClusterShuffles) log(runShowClusterShuffles)
    if (masterOptions.showTopDiskUsedApps) log(runShowTopDiskUsedApps)
    if (masterOptions.excludeWorkers) log(runExcludeWorkers)
    if (masterOptions.removeExcludedWorkers) log(runRemoveExcludedWorkers)
    if (masterOptions.sendWorkerEvent != null && masterOptions.sendWorkerEvent.nonEmpty)
      log(runSendWorkerEvent)
    if (masterOptions.showWorkerEventInfo) log(runShowWorkerEventInfo)
    if (masterOptions.showLostWorkers) log(runShowLostWorkers)
    if (masterOptions.showExcludedWorkers) log(runShowExcludedWorkers)
    if (masterOptions.showShutdownWorkers) log(runShowShutdownWorkers)
    if (masterOptions.showLifecycleManagers) log(runShowLifecycleManagers)
    if (masterOptions.showWorkers) log(runShowWorkers)
    if (masterOptions.showConf) log(runShowConf)
    if (masterOptions.showDynamicConf) log(runShowDynamicConf)
    if (masterOptions.showThreadDump) log(runShowThreadDump)
    if (masterOptions.addClusterAlias != null && masterOptions.addClusterAlias.nonEmpty)
      runAddClusterAlias
    if (masterOptions.removeClusterAlias != null && masterOptions.removeClusterAlias.nonEmpty)
      runRemoveClusterAlias
  }

  private[master] def runShowMastersInfo: MasterInfoResponse = masterApi.getMasterGroupInfo

  private[master] def runShowClusterApps: ApplicationsHeartbeatResponse =
    applicationApi.getApplications

  private[master] def runShowClusterShuffles: ShufflesResponse = shuffleApi.getShuffles

  private[master] def runShowTopDiskUsedApps: AppDiskUsageSnapshotsResponse =
    applicationApi.getApplicationsDiskUsageSnapshots

  /** Asks the master to add the workers given via --worker-ids to the exclusion list. */
  private[master] def runExcludeWorkers: HandleResponse = {
    val workerIds = getWorkerIds
    val excludeWorkerRequest = new ExcludeWorkerRequest().add(workerIds)
    logInfo(s"Sending exclude worker requests to workers: $workerIds")
    workerApi.excludeWorker(excludeWorkerRequest)
  }

  /** Asks the master to take the workers given via --worker-ids off the exclusion list. */
  private[master] def runRemoveExcludedWorkers: HandleResponse = {
    val workerIds = getWorkerIds
    val removeExcludeWorkerRequest = new ExcludeWorkerRequest().remove(workerIds)
    logInfo(s"Sending remove exclude worker requests to workers: $workerIds")
    workerApi.excludeWorker(removeExcludeWorkerRequest)
  }

  /**
   * Sends the event named by --send-worker-event to the workers given via --worker-ids.
   *
   * @throws ParameterException when the event name is not a known event type
   */
  private[master] def runSendWorkerEvent: HandleResponse = {
    val eventType = {
      try {
        // Locale.ROOT keeps the upper-casing independent of the user's locale (e.g. Turkish "i").
        EventTypeEnum.valueOf(masterOptions.sendWorkerEvent.toUpperCase(util.Locale.ROOT))
      } catch {
        case _: IllegalArgumentException => throw new ParameterException(
            spec.commandLine(),
            "Worker event type must be " +
              "IMMEDIATELY, DECOMMISSION, DECOMMISSION_THEN_IDLE, or GRACEFUL")
      }
    }
    val workerIds = getWorkerIds
    val sendWorkerEventRequest =
      new SendWorkerEventRequest().workers(workerIds).eventType(eventType)
    logInfo(s"Sending workerEvent $eventType to workers: $workerIds")
    workerApi.sendWorkerEvent(sendWorkerEventRequest)
  }

  private[master] def runShowWorkerEventInfo: WorkerEventsResponse = workerApi.getWorkerEvents

  /** Lost workers reported by the master, sorted by host; prints a notice when there are none. */
  private[master] def runShowLostWorkers: Seq[WorkerTimestampData] = {
    val lostWorkers = runShowWorkers.getLostWorkers.asScala.toSeq
    if (lostWorkers.isEmpty) {
      log("No lost workers found.")
      Seq.empty[WorkerTimestampData]
    } else {
      lostWorkers.sortBy(_.getWorker.getHost)
    }
  }

  /** Excluded workers reported by the master, sorted by host; prints a notice when there are none. */
  private[master] def runShowExcludedWorkers: Seq[WorkerData] = {
    val excludedWorkers = runShowWorkers.getExcludedWorkers.asScala.toSeq
    if (excludedWorkers.isEmpty) {
      log("No excluded workers found.")
      Seq.empty[WorkerData]
    } else {
      excludedWorkers.sortBy(_.getHost)
    }
  }

  /** Shutdown workers reported by the master, sorted by host; prints a notice when there are none. */
  private[master] def runShowShutdownWorkers: Seq[WorkerData] = {
    val shutdownWorkers = runShowWorkers.getShutdownWorkers.asScala.toSeq
    if (shutdownWorkers.isEmpty) {
      log("No shutdown workers found.")
      Seq.empty[WorkerData]
    } else {
      shutdownWorkers.sortBy(_.getHost)
    }
  }

  private[master] def runShowLifecycleManagers: HostnamesResponse =
    applicationApi.getApplicationHostNames

  private[master] def runShowWorkers: WorkersResponse = workerApi.getWorkers

  /**
   * Parses the comma-separated --worker-ids value into REST `WorkerId` objects.
   *
   * @return one `WorkerId` per non-blank entry, in the order given
   * @throws ParameterException when --worker-ids is missing/blank or an entry is malformed
   */
  private[master] def getWorkerIds: util.List[WorkerId] = {
    val rawIds = Option(commonOptions.workerIds)
      .map(_.trim.split(",").map(_.trim).filter(_.nonEmpty))
      .getOrElse(Array.empty[String])
    if (rawIds.isEmpty) {
      throw new ParameterException(
        spec.commandLine(),
        "Worker ids must be provided via --worker-ids for this command.")
    }
    rawIds.map(parseWorkerId).toList.asJava
  }

  /** Converts a single `host:rpcPort:pushPort:fetchPort:replicatePort` token into a `WorkerId`. */
  private def parseWorkerId(workerId: String): WorkerId = {
    def invalid(): ParameterException = new ParameterException(
      spec.commandLine(),
      s"Invalid worker id '$workerId'. Expected format " +
        "host:rpcPort:pushPort:fetchPort:replicatePort.")

    val parts = workerId.split(":")
    // Fail with a usage error instead of an ArrayIndexOutOfBoundsException on short input.
    if (parts.length < 5) throw invalid()
    try {
      val host = parts(0)
      val rpcPort = parts(1).toInt
      val pushPort = parts(2).toInt
      val fetchPort = parts(3).toInt
      val replicatePort = parts(4).toInt
      new WorkerId().host(host).rpcPort(rpcPort).pushPort(pushPort).fetchPort(
        fetchPort).replicatePort(replicatePort)
    } catch {
      // Non-numeric port: report it as a usage error rather than a raw stack trace.
      case _: NumberFormatException => throw invalid()
    }
  }

  private[master] def runShowConf: ConfResponse = confApi.getConf

  /** Queries dynamic configs, filtered by the --config-level/--config-tenant/--config-name options. */
  private[master] def runShowDynamicConf: DynamicConfigResponse =
    confApi.getDynamicConf(
      commonOptions.configLevel,
      commonOptions.configTenant,
      commonOptions.configName)

  private[master] def runShowThreadDump: ThreadStackResponse = defaultApi.getThreadDump

  /**
   * Stores the --host-list value under the alias given by --add-cluster-alias in the CLI config.
   *
   * @throws ParameterException when --host-list is missing or empty
   */
  private[master] def runAddClusterAlias: Unit = {
    val aliasToAdd = masterOptions.addClusterAlias
    val hosts = commonOptions.hostList
    if (hosts == null || hosts.isEmpty) {
      throw new ParameterException(
        spec.commandLine(),
        "Host list must be supplied via --host-list to add to alias.")
    }
    cliConfigManager.add(aliasToAdd, hosts)
    logInfo(s"Cluster alias $aliasToAdd added to ${CliConfigManager.cliConfigFilePath}. You can now use the --cluster" +
      s" command with this alias.")
  }

  /** Deletes the alias given by --remove-cluster-alias from the CLI config. */
  private[master] def runRemoveClusterAlias: Unit = {
    val aliasToRemove = masterOptions.removeClusterAlias
    cliConfigManager.remove(aliasToRemove)
    logInfo(s"Cluster alias $aliasToRemove removed.")
  }
}

View File

@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.celeborn.cli.worker
import picocli.CommandLine.Option
/**
 * Command line options accepted by the `worker` subcommand.
 *
 * Each field is populated by picocli from the option named in its `@Option`
 * annotation. All options are boolean flags except `--exit`, which takes a
 * string argument.
 */
final class WorkerOptions {

  // ---- worker / application / shuffle queries ----

  @Option(
    names = Array("--show-worker-info"),
    description = Array("Show worker info"))
  private[worker] var showWorkerInfo: Boolean = _

  @Option(
    names = Array("--show-apps-on-worker"),
    description = Array("Show applications running on the worker"))
  private[worker] var showAppsOnWorker: Boolean = _

  @Option(
    names = Array("--show-shuffles-on-worker"),
    description = Array("Show shuffles running on the worker"))
  private[worker] var showShufflesOnWorker: Boolean = _

  @Option(
    names = Array("--show-top-disk-used-apps"),
    description = Array("Show top disk used applications"))
  private[worker] var showTopDiskUsedApps: Boolean = _

  @Option(
    names = Array("--show-partition-location-info"),
    description = Array("Show partition location information"))
  private[worker] var showPartitionLocationInfo: Boolean = _

  @Option(
    names = Array("--show-unavailable-peers"),
    description = Array("Show unavailable peers"))
  private[worker] var showUnavailablePeers: Boolean = _

  // ---- worker state checks ----

  @Option(
    names = Array("--is-shutdown"),
    description = Array("Check if the system is shutdown"))
  private[worker] var isShutdown: Boolean = _

  @Option(
    names = Array("--is-registered"),
    description = Array("Check if the system is registered"))
  private[worker] var isRegistered: Boolean = _

  // ---- worker control ----

  // The only non-flag option: its argument is shown as `exit_type` in usage help.
  @Option(
    names = Array("--exit"),
    paramLabel = "exit_type",
    description = Array("Exit the application with a specified type"))
  private[worker] var exitType: String = _

  // ---- configuration and diagnostics ----

  @Option(
    names = Array("--show-conf"),
    description = Array("Show worker conf"))
  private[worker] var showConf: Boolean = _

  @Option(
    names = Array("--show-dynamic-conf"),
    description = Array("Show dynamic worker conf"))
  private[worker] var showDynamicConf: Boolean = _

  @Option(
    names = Array("--show-thread-dump"),
    description = Array("Show worker thread dump"))
  private[worker] var showThreadDump: Boolean = _
}

View File

@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.celeborn.cli.worker
import picocli.CommandLine.{ArgGroup, Mixin, ParameterException, ParentCommand, Spec}
import picocli.CommandLine.Model.CommandSpec
import org.apache.celeborn.cli.CelebornCli
import org.apache.celeborn.cli.common.{CliLogging, CommonOptions}
import org.apache.celeborn.rest.v1.model._
import org.apache.celeborn.rest.v1.worker.{ApplicationApi, ConfApi, DefaultApi, ShuffleApi, WorkerApi}
import org.apache.celeborn.rest.v1.worker.invoker.ApiClient
/**
 * Contract and shared wiring for the `worker` CLI subcommand.
 *
 * The annotated fields are injected by picocli. The API accessors build REST
 * clients against the address given with `--hostport`, and the abstract `run*`
 * methods define one operation per worker option.
 */
trait WorkerSubcommand extends CliLogging {

  @ParentCommand
  private var celebornCli: CelebornCli = _

  // Exactly one worker option must be supplied per invocation.
  @ArgGroup(exclusive = true, multiplicity = "1")
  private[worker] var workerOptions: WorkerOptions = _

  @Mixin
  private[worker] var commonOptions: CommonOptions = _

  @Spec
  private[worker] var spec: CommandSpec = _

  /**
   * Builds an API client whose base path is `http://<hostport>`.
   *
   * A new client is created on every call. Throws a `ParameterException` when
   * `--hostport` is null or empty.
   */
  private[worker] def apiClient = {
    val hostPort = commonOptions.hostPort
    if (hostPort == null || hostPort.isEmpty) {
      throw new ParameterException(
        spec.commandLine(),
        "No valid connection url found, please provide --hostport.")
    }
    log(s"Using connectionUrl: ${hostPort}")
    new ApiClient().setBasePath(s"http://${hostPort}")
  }

  // One accessor per REST API group; each wraps a freshly built `apiClient`.
  private[worker] def applicationApi = new ApplicationApi(apiClient)

  private[worker] def confApi = new ConfApi(apiClient)

  private[worker] def defaultApi = new DefaultApi(apiClient)

  private[worker] def shuffleApi = new ShuffleApi(apiClient)

  private[worker] def workerApi = new WorkerApi(apiClient)

  // Operations implemented by the concrete subcommand, one per worker option.
  private[worker] def runShowWorkerInfo: WorkerInfoResponse

  private[worker] def runShowAppsOnWorker: ApplicationsResponse

  private[worker] def runShowShufflesOnWorker: ShufflesResponse

  private[worker] def runShowTopDiskUsedApps: AppDiskUsagesResponse

  private[worker] def runShowPartitionLocationInfo: ShufflePartitionsResponse

  private[worker] def runShowUnavailablePeers: UnAvailablePeersResponse

  private[worker] def runIsShutdown: Boolean

  private[worker] def runIsRegistered: Boolean

  private[worker] def runExit: HandleResponse

  private[worker] def runShowConf: ConfResponse

  private[worker] def runShowDynamicConf: DynamicConfigResponse

  private[worker] def runShowThreadDump: ThreadStackResponse
}

View File

@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.celeborn.cli.worker
import picocli.CommandLine.Command
import org.apache.celeborn.rest.v1.model._
import org.apache.celeborn.rest.v1.model.WorkerExitRequest.TypeEnum
/**
 * Concrete `worker` subcommand: dispatches on whichever worker option was set
 * and logs the result of the matching REST call.
 */
@Command(name = "worker", mixinStandardHelpOptions = true)
class WorkerSubcommandImpl extends Runnable with WorkerSubcommand {

  /** Runs the operation for each option that is set and logs its result. */
  override def run(): Unit = {
    if (workerOptions.showWorkerInfo) log(runShowWorkerInfo)
    if (workerOptions.showAppsOnWorker) log(runShowAppsOnWorker)
    if (workerOptions.showShufflesOnWorker) log(runShowShufflesOnWorker)
    if (workerOptions.showTopDiskUsedApps) log(runShowTopDiskUsedApps)
    if (workerOptions.showPartitionLocationInfo) log(runShowPartitionLocationInfo)
    if (workerOptions.showUnavailablePeers) log(runShowUnavailablePeers)
    if (workerOptions.isShutdown) log(runIsShutdown)
    if (workerOptions.isRegistered) log(runIsRegistered)
    if (workerOptions.exitType != null && workerOptions.exitType.nonEmpty) log(runExit)
    if (workerOptions.showConf) log(runShowConf)
    if (workerOptions.showDynamicConf) log(runShowDynamicConf)
    if (workerOptions.showThreadDump) log(runShowThreadDump)
  }

  private[worker] def runShowWorkerInfo: WorkerInfoResponse = workerApi.getWorkerInfo

  private[worker] def runShowAppsOnWorker: ApplicationsResponse = applicationApi.getApplicationList

  private[worker] def runShowShufflesOnWorker: ShufflesResponse = shuffleApi.getShuffles

  private[worker] def runShowTopDiskUsedApps: AppDiskUsagesResponse =
    applicationApi.getApplicationsDiskUsage

  private[worker] def runShowPartitionLocationInfo: ShufflePartitionsResponse =
    shuffleApi.getShufflePartitions

  private[worker] def runShowUnavailablePeers: UnAvailablePeersResponse =
    workerApi.unavailablePeers()

  // Both checks are derived from the worker info response rather than a dedicated endpoint.
  private[worker] def runIsShutdown: Boolean = runShowWorkerInfo.getIsShutdown

  private[worker] def runIsRegistered: Boolean = runShowWorkerInfo.getIsRegistered

  /**
   * Sends a worker exit request of the type given with `--exit`.
   *
   * The option value must match a `TypeEnum` constant name. An unknown value is
   * reported as a picocli `ParameterException` listing the accepted values,
   * instead of surfacing the raw `IllegalArgumentException` from `valueOf`.
   */
  private[worker] def runExit: HandleResponse = {
    val requestedType = workerOptions.exitType
    val workerExitType: TypeEnum =
      try {
        TypeEnum.valueOf(requestedType)
      } catch {
        case _: IllegalArgumentException =>
          // Fully qualified so this block needs no additional import.
          throw new picocli.CommandLine.ParameterException(
            spec.commandLine(),
            s"Invalid exit type '$requestedType', valid values are: " +
              s"${TypeEnum.values().mkString(", ")}.")
      }
    val workerExitRequest: WorkerExitRequest = new WorkerExitRequest().`type`(workerExitType)
    logInfo(s"Sending worker exit type: ${workerExitType.getValue}")
    workerApi.workerExit(workerExitRequest)
  }

  private[worker] def runShowConf: ConfResponse = confApi.getConf

  /** Queries dynamic configuration using the level, tenant and name from the common options. */
  private[worker] def runShowDynamicConf: DynamicConfigResponse =
    confApi.getDynamicConf(
      commonOptions.configLevel,
      commonOptions.configTenant,
      commonOptions.configName)

  private[worker] def runShowThreadDump: ThreadStackResponse = defaultApi.getThreadDump
}

View File

@ -0,0 +1,247 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.celeborn.cli
import java.io.{ByteArrayOutputStream, File, PrintStream}
import java.nio.file.{Files, Paths}
import org.apache.celeborn.CelebornFunSuite
import org.apache.celeborn.cli.config.CliConfigManager
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.service.deploy.MiniClusterFeature
import org.apache.celeborn.service.deploy.master.Master
import org.apache.celeborn.service.deploy.worker.Worker
/**
 * End-to-end tests for the Celeborn CLI.
 *
 * A mini cluster with one worker is started once for the suite. Master commands
 * address it through the `unit-test` cluster alias registered in `beforeAll`;
 * worker commands pass the worker's connection url with `--hostport`. Every test
 * runs `CelebornCli.main` and checks that the captured stdout contains an
 * expected substring.
 */
class TestCelebornCliCommands extends CelebornFunSuite with MiniClusterFeature {

  private val celebornConf = new CelebornConf()
  protected var master: Master = _
  protected var worker: Worker = _

  override def beforeAll(): Unit = {
    logInfo("test initialized, setup celeborn mini cluster")
    val (startedMaster, startedWorkers) =
      setupMiniClusterWithRandomPorts(workerConf = celebornConf.getAll.toMap, workerNum = 1)
    master = startedMaster
    worker = startedWorkers.head
    super.beforeAll()
    // Register the alias that prepareMasterArgs() relies on.
    captureOutputAndValidateResponse(
      Array(
        "master",
        "--add-cluster-alias",
        "unit-test",
        "--host-list",
        master.connectionUrl),
      s"Cluster alias unit-test added to ${CliConfigManager.cliConfigFilePath}")
  }

  override def afterAll(): Unit = {
    super.afterAll()
    logInfo("all test complete, stop celeborn mini cluster")
    shutdownMiniCluster()
    // Remove the alias through the CLI, then confirm it is gone from the stored config.
    captureOutputAndValidateResponse(
      Array("master", "--remove-cluster-alias", "unit-test"),
      s"Cluster alias unit-test removed.")
    val configManager = new CliConfigManager
    val aliasStillPresent =
      configManager.loadConfig().exists(_.cliConfigData.contains("unit-test"))
    assert(!aliasStillPresent)
    // Clean up the CLI config file written by this suite.
    val configFile = new File(CliConfigManager.cliConfigFilePath)
    if (configFile.exists()) {
      Files.delete(Paths.get(CliConfigManager.cliConfigFilePath))
    }
  }

  // ---------------------------------------------------------------------------
  // worker subcommand
  // ---------------------------------------------------------------------------

  test("worker --show-worker-info") {
    captureOutputAndValidateResponse(
      prepareWorkerArgs() :+ "--show-worker-info",
      "WorkerInfoResponse")
  }

  test("worker --show-apps-on-worker") {
    captureOutputAndValidateResponse(
      prepareWorkerArgs() :+ "--show-apps-on-worker",
      "ApplicationsResponse")
  }

  test("worker --show-shuffles-on-worker") {
    captureOutputAndValidateResponse(
      prepareWorkerArgs() :+ "--show-shuffles-on-worker",
      "ShufflesResponse")
  }

  test("worker --show-top-disk-used-apps") {
    captureOutputAndValidateResponse(
      prepareWorkerArgs() :+ "--show-top-disk-used-apps",
      "AppDiskUsagesResponse")
  }

  test("worker --show-partition-location-info") {
    captureOutputAndValidateResponse(
      prepareWorkerArgs() :+ "--show-partition-location-info",
      "ShufflePartitionsResponse")
  }

  test("worker --show-unavailable-peers") {
    captureOutputAndValidateResponse(
      prepareWorkerArgs() :+ "--show-unavailable-peers",
      "UnAvailablePeersResponse")
  }

  test("worker --is-shutdown") {
    captureOutputAndValidateResponse(prepareWorkerArgs() :+ "--is-shutdown", "false")
  }

  test("worker --is-registered") {
    captureOutputAndValidateResponse(prepareWorkerArgs() :+ "--is-registered", "true")
  }

  test("worker --show-conf") {
    captureOutputAndValidateResponse(prepareWorkerArgs() :+ "--show-conf", "ConfResponse")
  }

  test("worker --show-dynamic-conf") {
    // cancel() aborts the test here; the statements below are kept for when it is re-enabled.
    cancel("This test is temporarily disabled since dynamic conf is not enabled in unit tests.")
    captureOutputAndValidateResponse(prepareWorkerArgs() :+ "--show-dynamic-conf", "")
  }

  test("worker --show-thread-dump") {
    captureOutputAndValidateResponse(
      prepareWorkerArgs() :+ "--show-thread-dump",
      "ThreadStackResponse")
  }

  // ---------------------------------------------------------------------------
  // master subcommand
  // ---------------------------------------------------------------------------

  test("master --show-masters-info") {
    // cancel() aborts the test here; the statements below are kept for when it is re-enabled.
    cancel("This test is temporarily disabled since HA is not enabled in the unit tests.")
    captureOutputAndValidateResponse(prepareMasterArgs() :+ "--show-masters-info", "")
  }

  test("master --show-cluster-apps") {
    captureOutputAndValidateResponse(
      prepareMasterArgs() :+ "--show-cluster-apps",
      "ApplicationsHeartbeatResponse")
  }

  test("master --show-cluster-shuffles") {
    captureOutputAndValidateResponse(
      prepareMasterArgs() :+ "--show-cluster-shuffles",
      "ShufflesResponse")
  }

  test("master --show-top-disk-used-apps") {
    captureOutputAndValidateResponse(
      prepareMasterArgs() :+ "--show-top-disk-used-apps",
      "AppDiskUsageSnapshotsResponse")
  }

  test("master --show-worker-event-info") {
    captureOutputAndValidateResponse(
      prepareMasterArgs() :+ "--show-worker-event-info",
      "WorkerEventsResponse")
  }

  test("master --show-lost-workers") {
    captureOutputAndValidateResponse(
      prepareMasterArgs() :+ "--show-lost-workers",
      "No lost workers found.")
  }

  test("master --show-excluded-workers") {
    captureOutputAndValidateResponse(
      prepareMasterArgs() :+ "--show-excluded-workers",
      "No excluded workers found.")
  }

  test("master --show-shutdown-workers") {
    captureOutputAndValidateResponse(
      prepareMasterArgs() :+ "--show-shutdown-workers",
      "No shutdown workers found.")
  }

  test("master --show-lifecycle-managers") {
    captureOutputAndValidateResponse(
      prepareMasterArgs() :+ "--show-lifecycle-managers",
      "HostnamesResponse")
  }

  test("master --show-workers") {
    captureOutputAndValidateResponse(prepareMasterArgs() :+ "--show-workers", "WorkersResponse")
  }

  test("master --show-conf") {
    captureOutputAndValidateResponse(prepareMasterArgs() :+ "--show-conf", "ConfResponse")
  }

  test("master --show-dynamic-conf") {
    // cancel() aborts the test here; the statements below are kept for when it is re-enabled.
    cancel("This test is temporarily disabled since dynamic conf is not enabled in unit tests.")
    captureOutputAndValidateResponse(prepareMasterArgs() :+ "--show-dynamic-conf", "")
  }

  test("master --show-thread-dump") {
    captureOutputAndValidateResponse(
      prepareMasterArgs() :+ "--show-thread-dump",
      "ThreadStackResponse")
  }

  test("master --exclude-worker and --remove-excluded-worker") {
    // Exclude the worker first, then undo the exclusion.
    val excludeCommand =
      prepareMasterArgs() ++ Array("--exclude-worker", "--worker-ids", getWorkerId())
    captureOutputAndValidateResponse(excludeCommand, "success: true")

    val undoExcludeCommand =
      prepareMasterArgs() ++ Array("--remove-excluded-worker", "--worker-ids", getWorkerId())
    captureOutputAndValidateResponse(undoExcludeCommand, "success: true")
  }

  test("master --send-worker-event") {
    val eventCommand = prepareMasterArgs() ++ Array(
      "--send-worker-event",
      "DECOMMISSION",
      "--worker-ids",
      getWorkerId())
    captureOutputAndValidateResponse(eventCommand, "success: true")
  }

  // ---------------------------------------------------------------------------
  // helpers
  // ---------------------------------------------------------------------------

  /** Leading arguments for master commands, addressing the cluster by the `unit-test` alias. */
  private def prepareMasterArgs(): Array[String] =
    Array("master", "--cluster", "unit-test")

  /** Leading arguments for worker commands, addressing the worker by its connection url. */
  private def prepareWorkerArgs(): Array[String] =
    Array("worker", "--hostport", worker.connectionUrl)

  /**
   * Runs the CLI with `args` while stdout is redirected to an in-memory buffer, then
   * asserts that the output is non-empty and contains `stdoutValidationString`.
   */
  private def captureOutputAndValidateResponse(
      args: Array[String],
      stdoutValidationString: String): Unit = {
    val buffer = new ByteArrayOutputStream()
    val capturingStream = new PrintStream(buffer)
    Console.withOut(capturingStream) {
      CelebornCli.main(args)
    }
    val captured = buffer.toString
    assert(captured.nonEmpty && captured.contains(stdoutValidationString))
  }

  /** Formats the worker id as `host:rpcPort:pushPort:fetchPort:replicatePort`. */
  private def getWorkerId(): String = {
    val hostAndRpcPort = s"${worker.workerArgs.host}:${worker.rpcEnv.address.port}"
    val pushAndFetchPorts =
      s"${worker.getPushFetchServerPort._1}:${worker.getPushFetchServerPort._2}"
    val replicatePort = s"${worker.replicateServer.getPort}"
    s"$hostAndRpcPort:$pushAndFetchPorts:$replicatePort"
  }
}

View File

@ -990,6 +990,17 @@ object Utils extends Logging {
}
}
/**
 * Runs `func` against a resource and closes the resource afterwards, whether
 * `func` returns normally or throws.
 *
 * @param f    by-name expression producing the resource; evaluated exactly once
 * @param func computation to run with the resource
 * @return the value returned by `func`
 */
def tryWithResources[R <: Closeable, U](f: => R)(func: R => U): U = {
  // `f` is a by-name parameter, so every reference re-evaluates it. Capture the
  // resource once and use that same instance for both `func` and `close()`.
  val res = f
  try {
    // Pass `res`, not `f`: calling `func(f)` would build a second resource that
    // is never closed, while the one closed in `finally` would go unused.
    func(res)
  } finally {
    if (null != res) {
      res.close()
    }
  }
}
def toTransportMessage(message: Any): Any = {
message match {
case legacy: Message =>

View File

@ -38,6 +38,7 @@
<module>service</module>
<module>master</module>
<module>worker</module>
<module>cli</module>
</modules>
<distributionManagement>
@ -104,6 +105,7 @@
<jackson.version>2.15.3</jackson.version>
<snappy.version>1.1.10.5</snappy.version>
<ap.loader.version>3.0-8</ap.loader.version>
<picocli.version>4.7.6</picocli.version>
<!-- Db dependencies -->
<mybatis.version>3.5.15</mybatis.version>
@ -708,6 +710,11 @@
<version>${bouncycastle.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>info.picocli</groupId>
<artifactId>picocli</artifactId>
<version>${picocli.version}</version>
</dependency>
</dependencies>
</dependencyManagement>

View File

@ -80,6 +80,7 @@ object Dependencies {
val httpClient5Version = "5.3.1"
val httpCore5Version = "5.2.4"
val javaxAnnotationApiVersion = "1.3.2"
val picocliVersion = "4.7.6"
// For SSL support
val bouncycastleVersion = "1.77"
@ -203,6 +204,8 @@ object Dependencies {
// SSL support
val bouncycastleBcprovJdk18on = "org.bouncycastle" % "bcprov-jdk18on" % bouncycastleVersion % "test"
val bouncycastleBcpkixJdk18on = "org.bouncycastle" % "bcpkix-jdk18on" % bouncycastleVersion % "test"
val picocli = "info.picocli" % "picocli" % picocliVersion
}
object CelebornCommonSettings {
@ -355,7 +358,8 @@ object CelebornBuild extends sbt.internal.BuildDef {
CelebornClient.client,
CelebornService.service,
CelebornWorker.worker,
CelebornMaster.master) ++ maybeSparkClientModules ++ maybeFlinkClientModules ++ maybeMRClientModules ++ maybeWebModules
CelebornMaster.master,
CelebornCli.cli) ++ maybeSparkClientModules ++ maybeFlinkClientModules ++ maybeMRClientModules ++ maybeWebModules
}
// ThisBuild / parallelExecution := false
@ -455,6 +459,20 @@ object Utils {
}
}
// sbt project definition for the Celeborn CLI module, located in the `cli` directory.
object CelebornCli {
  // Both compile and test classpaths of each upstream project are made available.
  private val bothScopes = "test->test;compile->compile"

  lazy val cli = Project("celeborn-cli", file("cli"))
    .dependsOn(
      CelebornCommon.common % bothScopes,
      CelebornMaster.master % bothScopes,
      CelebornWorker.worker % bothScopes,
      CelebornOpenApi.openApiClient % bothScopes)
    .settings(
      commonSettings,
      libraryDependencies ++= Seq(Dependencies.picocli) ++ commonUnitTestDependencies
    )
}
object CelebornSpi {
lazy val spi = Project("celeborn-spi", file("spi"))
.settings(

24
sbin/celeborn-cli Executable file
View File

@ -0,0 +1,24 @@
#!/usr/bin/env bash
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Launcher for the Celeborn CLI: resolves CELEBORN_HOME, loads the Celeborn
# environment, then hands all arguments to the CLI main class.

# Default CELEBORN_HOME to the parent of this script's directory unless the
# caller has already set it.
if [ -z "${CELEBORN_HOME}" ]; then
  CELEBORN_HOME="$(cd "$(dirname "$0")"/..; pwd)"
  export CELEBORN_HOME
fi

. "${CELEBORN_HOME}/sbin/load-celeborn-env.sh"

# exec replaces this shell with the launched process.
exec "${CELEBORN_HOME}"/bin/celeborn-class org.apache.celeborn.cli.CelebornCli "$@"