diff --git a/dev/kyuubi-tpcds/README.md b/dev/kyuubi-tpcds/README.md index bcfba8502..adffb6726 100644 --- a/dev/kyuubi-tpcds/README.md +++ b/dev/kyuubi-tpcds/README.md @@ -47,12 +47,14 @@ $SPARK_HOME/bin/spark-submit \ Support options: -| key | default | description | -|------------|----------------------|--------------------------------------------------------| -| db | none(required) | the TPC-DS database | -| benchmark | tpcds-v2.4-benchmark | the name of application | -| iterations | 3 | the number of iterations to run | -| filter | a | filter on the name of the queries to run, e.g. q1-v2.4 | +| key | default | description | +|-------------|------------------------|---------------------------------------------------------------| +| db | none(required) | the TPC-DS database | +| benchmark | tpcds-v2.4-benchmark | the name of application | +| iterations | 3 | the number of iterations to run | +| breakdown | false | whether to record breakdown results of an execution | +| filter | a | filter on the name of the queries to run, e.g. q1-v2.4 | +| results-dir | /spark/sql/performance | dir to store benchmark results, e.g. hdfs://hdfs-nn:9870/perf | Example: the following command to benchmark TPC-DS sf10 with exists database `tpcds_sf10`. 
diff --git a/dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/benchmark/Benchmark.scala b/dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/benchmark/Benchmark.scala index 5645bd5a1..8071bca1b 100644 --- a/dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/benchmark/Benchmark.scala +++ b/dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/benchmark/Benchmark.scala @@ -36,10 +36,7 @@ abstract class Benchmark( import Benchmark._ - val resultsLocation: String = - sparkSession.conf.get( - "spark.sql.perf.results", - "/spark/sql/performance") + val resultsLocation: String = sparkSession.conf.get("spark.sql.perf.results") protected def sparkContext = sparkSession.sparkContext @@ -82,7 +79,7 @@ abstract class Benchmark( variations: Seq[Variation[_]] = Seq(Variation("StandardRun", Seq("true")) { _ => {} }), tags: Map[String, String] = Map.empty, timeout: Long = 0L, - resultLocation: String = resultsLocation, + resultsDir: String = resultsLocation, forkThread: Boolean = true): ExperimentStatus = { new ExperimentStatus( @@ -92,7 +89,7 @@ abstract class Benchmark( variations, tags, timeout, - resultLocation, + resultsDir, sparkSession, currentConfiguration, forkThread = forkThread) @@ -143,7 +140,7 @@ object Benchmark { variations: Seq[Variation[_]], tags: Map[String, String], timeout: Long, - resultsLocation: String, + resultsDir: String, sparkSession: SparkSession, currentConfiguration: BenchmarkConfiguration, forkThread: Boolean = true) { @@ -172,7 +169,7 @@ object Benchmark { } val timestamp: Long = System.currentTimeMillis() - val resultPath = s"$resultsLocation/timestamp=$timestamp" + val resultPath = s"$resultsDir/timestamp=$timestamp" val combinations: Seq[List[Int]] = cartesianProduct(variations.map(l => l.options.indices.toList).toList) val resultsFuture: Future[Unit] = Future { diff --git a/dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/benchmark/RunBenchmark.scala 
b/dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/benchmark/RunBenchmark.scala index 5e4b1c581..673c9c589 100644 --- a/dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/benchmark/RunBenchmark.scala +++ b/dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/benchmark/RunBenchmark.scala @@ -17,7 +17,6 @@ package org.apache.kyuubi.tpcds.benchmark -import java.io.File import java.net.InetAddress import org.apache.spark.SparkConf @@ -28,7 +27,9 @@ case class RunConfig( db: String = null, benchmarkName: String = "tpcds-v2.4-benchmark", filter: Option[String] = None, - iterations: Int = 3) + iterations: Int = 3, + breakdown: Boolean = false, + resultsDir: String = "/spark/sql/performance") // scalastyle:off /** @@ -55,9 +56,15 @@ object RunBenchmark { opt[String]('f', "filter") .action((x, c) => c.copy(filter = Some(x))) .text("a filter on the name of the queries to run") + opt[Boolean]('B', "breakdown") + .action((x, c) => c.copy(breakdown = x)) + .text("whether to record breakdown results of an execution") opt[Int]('i', "iterations") .action((x, c) => c.copy(iterations = x)) .text("the number of iterations to run") + opt[String]('r', "results-dir") + .action((x, c) => c.copy(resultsDir = x)) + .text("dir to store benchmark results, e.g. hdfs://hdfs-nn:9870/pref") help("help") .text("prints this usage text") } @@ -75,7 +82,7 @@ object RunBenchmark { val sparkSession = SparkSession.builder.config(conf).enableHiveSupport().getOrCreate() import sparkSession.implicits._ - sparkSession.conf.set("spark.sql.perf.results", new File("performance").toURI.toString) + sparkSession.conf.set("spark.sql.perf.results", config.resultsDir) val benchmark = new TPCDS(sparkSession) @@ -94,6 +101,7 @@ object RunBenchmark { val experiment = benchmark.runExperiment( executionsToRun = allQueries, + includeBreakdown = config.breakdown, iterations = config.iterations, tags = Map("host" -> InetAddress.getLocalHost.getHostName))