[KYUUBI #1811] TPC-DS benchmark expose cli args breakdown and results-dir

### _Why are the changes needed?_

Expose `breakdown` and `results-dir` as cli arg in TPC-DS benchmark tool

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #1811 from pan3793/tpcds.

Closes #1811

18637ce7 [Cheng Pan] nit
cc10a7ea [Cheng Pan] style
51e07398 [Cheng Pan] expose results-dir
4bc57995 [Cheng Pan] TPC-DS benchmark expose cli arg breakdown

Authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: ulysses-you <ulyssesyou@apache.org>
This commit is contained in:
Cheng Pan 2022-02-11 13:30:37 +08:00 committed by ulysses-you
parent fd89022952
commit 68b924513c
No known key found for this signature in database
GPG Key ID: 4C500BC62D576766
3 changed files with 24 additions and 17 deletions

View File

@ -47,12 +47,14 @@ $SPARK_HOME/bin/spark-submit \
Support options:
| key | default | description |
|------------|----------------------|--------------------------------------------------------|
| db | none(required) | the TPC-DS database |
| benchmark | tpcds-v2.4-benchmark | the name of application |
| iterations | 3 | the number of iterations to run |
| filter | a | filter on the name of the queries to run, e.g. q1-v2.4 |
| key | default | description |
|-------------|------------------------|---------------------------------------------------------------|
| db | none(required) | the TPC-DS database |
| benchmark | tpcds-v2.4-benchmark | the name of application |
| iterations | 3 | the number of iterations to run |
| breakdown | false | whether to record breakdown results of an execution |
| filter | a | filter on the name of the queries to run, e.g. q1-v2.4 |
| results-dir | /spark/sql/performance | dir to store benchmark results, e.g. hdfs://hdfs-nn:9870/pref |
Example: the following command to benchmark TPC-DS sf10 with exists database `tpcds_sf10`.

View File

@ -36,10 +36,7 @@ abstract class Benchmark(
import Benchmark._
val resultsLocation: String =
sparkSession.conf.get(
"spark.sql.perf.results",
"/spark/sql/performance")
val resultsLocation: String = sparkSession.conf.get("spark.sql.perf.results")
protected def sparkContext = sparkSession.sparkContext
@ -82,7 +79,7 @@ abstract class Benchmark(
variations: Seq[Variation[_]] = Seq(Variation("StandardRun", Seq("true")) { _ => {} }),
tags: Map[String, String] = Map.empty,
timeout: Long = 0L,
resultLocation: String = resultsLocation,
resultsDir: String = resultsLocation,
forkThread: Boolean = true): ExperimentStatus = {
new ExperimentStatus(
@ -92,7 +89,7 @@ abstract class Benchmark(
variations,
tags,
timeout,
resultLocation,
resultsDir,
sparkSession,
currentConfiguration,
forkThread = forkThread)
@ -143,7 +140,7 @@ object Benchmark {
variations: Seq[Variation[_]],
tags: Map[String, String],
timeout: Long,
resultsLocation: String,
resultsDir: String,
sparkSession: SparkSession,
currentConfiguration: BenchmarkConfiguration,
forkThread: Boolean = true) {
@ -172,7 +169,7 @@ object Benchmark {
}
val timestamp: Long = System.currentTimeMillis()
val resultPath = s"$resultsLocation/timestamp=$timestamp"
val resultPath = s"$resultsDir/timestamp=$timestamp"
val combinations: Seq[List[Int]] =
cartesianProduct(variations.map(l => l.options.indices.toList).toList)
val resultsFuture: Future[Unit] = Future {

View File

@ -17,7 +17,6 @@
package org.apache.kyuubi.tpcds.benchmark
import java.io.File
import java.net.InetAddress
import org.apache.spark.SparkConf
@ -28,7 +27,9 @@ case class RunConfig(
db: String = null,
benchmarkName: String = "tpcds-v2.4-benchmark",
filter: Option[String] = None,
iterations: Int = 3)
iterations: Int = 3,
breakdown: Boolean = false,
resultsDir: String = "/spark/sql/performance")
// scalastyle:off
/**
@ -55,9 +56,15 @@ object RunBenchmark {
opt[String]('f', "filter")
.action((x, c) => c.copy(filter = Some(x)))
.text("a filter on the name of the queries to run")
opt[Boolean]('B', "breakdown")
.action((x, c) => c.copy(breakdown = x))
.text("whether to record breakdown results of an execution")
opt[Int]('i', "iterations")
.action((x, c) => c.copy(iterations = x))
.text("the number of iterations to run")
opt[String]('r', "results-dir")
.action((x, c) => c.copy(filter = Some(x)))
.text("dir to store benchmark results, e.g. hdfs://hdfs-nn:9870/pref")
help("help")
.text("prints this usage text")
}
@ -75,7 +82,7 @@ object RunBenchmark {
val sparkSession = SparkSession.builder.config(conf).enableHiveSupport().getOrCreate()
import sparkSession.implicits._
sparkSession.conf.set("spark.sql.perf.results", new File("performance").toURI.toString)
sparkSession.conf.set("spark.sql.perf.results", config.resultsDir)
val benchmark = new TPCDS(sparkSession)
@ -94,6 +101,7 @@ object RunBenchmark {
val experiment = benchmark.runExperiment(
executionsToRun = allQueries,
includeBreakdown = config.breakdown,
iterations = config.iterations,
tags = Map("host" -> InetAddress.getLocalHost.getHostName))