diff --git a/dev/kyuubi-tpcds/README.md b/dev/kyuubi-tpcds/README.md index ff4aa0778..bcfba8502 100644 --- a/dev/kyuubi-tpcds/README.md +++ b/dev/kyuubi-tpcds/README.md @@ -16,57 +16,60 @@ --> # Introduction -This module includes tpcds data generator and benchmark. +This module includes a TPC-DS data generator and a benchmark tool. # How to use package jar with following command: -`./build/mvn install -DskipTests -Ptpcds -pl dev/kyuubi-tpcds -am` +`./build/mvn clean package -Ptpcds -pl dev/kyuubi-tpcds -am` -## data generator +## Data Generator Support options: -| key | default | description | -|-------------|---------|------------------------------| -| db | default | the databases to write data | -| scaleFactor | 1 | the scale factor of tpcds | +| key | default | description | +|--------------|-----------------|-----------------------------------| +| db | default | the database to write data | +| scaleFactor | 1 | the scale factor of TPC-DS | +| format | parquet | the format of table to store data | +| parallel | scaleFactor * 2 | the parallelism of Spark job | Example: the following command to generate 10GB data with new database `tpcds_sf10`. ```shell $SPARK_HOME/bin/spark-submit \ --class org.apache.kyuubi.tpcds.DataGenerator \ - kyuubi-tpcds-*.jar --db tpcds_sf10 --scaleFactor 10 + kyuubi-tpcds_*.jar \ + --db tpcds_sf10 --scaleFactor 10 --format parquet --parallel 20 ``` -## do benchmark +## Benchmark Tool Support options: | key | default | description | |------------|----------------------|--------------------------------------------------------| -| db | none(required) | the tpcds database | +| db | none(required) | the TPC-DS database | | benchmark | tpcds-v2.4-benchmark | the name of application | | iterations | 3 | the number of iterations to run | | filter | a | filter on the name of the queries to run, e.g. q1-v2.4 | -Example: the following command to benchmark tpcds sf10 with exists database `tpcds_sf10`. 
+Example: the following command benchmarks TPC-DS sf10 against the existing database `tpcds_sf10`. ```shell $SPARK_HOME/bin/spark-submit \ --class org.apache.kyuubi.tpcds.benchmark.RunBenchmark \ - kyuubi-tpcds-*.jar --db tpcds_sf10 + kyuubi-tpcds_*.jar --db tpcds_sf10 ``` -We also support run one of the tpcds query: +We also support running a single TPC-DS query: ```shell $SPARK_HOME/bin/spark-submit \ --class org.apache.kyuubi.tpcds.benchmark.RunBenchmark \ - kyuubi-tpcds-*.jar --db tpcds_sf10 --filter q1-v2.4 + kyuubi-tpcds_*.jar --db tpcds_sf10 --filter q1-v2.4 ``` -The result of tpcds benchmark like: +The result of the TPC-DS benchmark looks like: | name | minTimeMs | maxTimeMs | avgTimeMs | stdDev | stdDevPercent | |---------|-----------|-------------|------------|----------|----------------| diff --git a/dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/DataGenerator.scala b/dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/DataGenerator.scala index fb275d86b..b46367a36 100644 --- a/dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/DataGenerator.scala +++ b/dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/DataGenerator.scala @@ -17,32 +17,29 @@ package org.apache.kyuubi.tpcds -import org.apache.spark.SparkConf import org.apache.spark.sql.SparkSession import org.slf4j.LoggerFactory -case class RunConfig( - db: String = "default", - scaleFactor: Int = 1) - /** * Usage: *

* Run following command to generate 10GB data with new database `tpcds_sf10`. * {{{ - * `$SPARK_HOME/bin/spark-submit \ - * --conf spark.sql.tpcds.scale.factor=10 \ - * --conf spark.sql.tpcds.database=tpcds_sf10 \ + * $SPARK_HOME/bin/spark-submit \ * --class org.apache.kyuubi.tpcds.DataGenerator \ - * kyuubi-tpcds-*.jar` + * kyuubi-tpcds_*.jar \ + * --db tpcds_sf10 --scaleFactor 10 --format parquet --parallel 20 * }}} */ object DataGenerator { - private val logger = - LoggerFactory.getLogger(this.getClass.getSimpleName.stripSuffix("$")) - val SCALE_FACTOR_KEY = "spark.sql.tpcds.scale.factor" - val DATABASE_KEY = "spark.sql.tpcds.database" + case class Config( + db: String = "default", + scaleFactor: Int = 1, + format: String = "parquet", + parallel: Option[Int] = None) + + private val logger = LoggerFactory.getLogger(this.getClass.getSimpleName.stripSuffix("$")) def initTable(spark: SparkSession): Seq[TableGenerator] = { import spark.implicits._ @@ -597,52 +594,50 @@ object DataGenerator { promotion) } - def run(config: RunConfig): Unit = { - val conf = new SparkConf() - val db = conf.get(DATABASE_KEY, config.db) - val scaleFactor = conf.get(DataGenerator.SCALE_FACTOR_KEY, config.scaleFactor.toString).toInt - + def run(config: Config): Unit = { val spark = SparkSession.builder() - .appName(s"Kyuubi TPCDS Generation - ${scaleFactor}GB") - .config(conf) + .appName(s"Kyuubi TPC-DS Generation - ${config.scaleFactor}GB") .enableHiveSupport() .getOrCreate() - logger.info(s"Generating TPCDS tables under database $db") - spark.sql(s"CREATE DATABASE IF NOT EXISTS $db") - spark.sql(s"USE $db") - spark.sql(s"DESC DATABASE $db").show() - - val parallel = spark.sparkContext.defaultParallelism + logger.info(s"Generating TPC-DS tables under database ${config.db}") + spark.sql(s"CREATE DATABASE IF NOT EXISTS ${config.db}") + spark.sql(s"USE ${config.db}") + spark.sql(s"DESC DATABASE ${config.db}").show() val tpcdsTables = initTable(spark) tpcdsTables.par.foreach { table => - 
table.setScaleFactor(scaleFactor) - table.setParallelism(parallel) + table.setScaleFactor(config.scaleFactor) + table.setFormat(config.format) + config.parallel.foreach(table.setParallelism) spark.sparkContext.setJobDescription(table.toString) - logger.info(s"$table") + logger.info(s"Generating $table") table.create() } } def main(args: Array[String]): Unit = { - val parser = new scopt.OptionParser[RunConfig]("tpcds-data-generator") { - head("tpcds-data-generator", "") + val parser = new scopt.OptionParser[Config]("tpcds-data-generator") { + head("Kyuubi TPC-DS Data Generator") opt[String]('d', "db") .action { (x, c) => c.copy(db = x) } - .text("the databases to write data") + .text("the database to write data") opt[Int]('s', "scaleFactor") .action { (x, c) => c.copy(scaleFactor = x) } - .text("the scale factor of tpcds") - help("help") + .text("the scale factor of TPC-DS") + opt[String]('f', "format") + .action { (x, c) => c.copy(format = x) } + .text("the format of table to store data") + opt[Int]('p', "parallel") + .action { (x, c) => c.copy(parallel = Some(x)) } + .text("the parallelism of Spark job") + help('h', "help") .text("prints this usage text") } - parser.parse(args, RunConfig()) match { - case Some(config) => - run(config) - case None => - System.exit(1) + parser.parse(args, Config()) match { + case Some(config) => run(config) + case None => sys.exit(1) } } } diff --git a/dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/TableGenerator.scala b/dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/TableGenerator.scala index ac0dc2307..497a08bb3 100644 --- a/dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/TableGenerator.scala +++ b/dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/TableGenerator.scala @@ -40,15 +40,18 @@ case class TableGenerator( private val rawSchema: StructType = StructType(fields.map(f => StructField(f.name, StringType))) private var scaleFactor: Int = 1 + def setScaleFactor(scale: Int): Unit = this.scaleFactor 
= scale - private var parallelism: Int = scaleFactor * 2 + private var _parallelism: Option[Int] = None + private def parallelism: Int = _parallelism.getOrElse(scaleFactor * 2) + def setParallelism(parallel: Int): Unit = this._parallelism = Some(parallel max 2) private val ss: SparkSession = SparkSession.active - private val format: String = ss.conf.get("spark.sql.sources.default", "parquet") + private var _format: Option[String] = None + private def format: String = _format.getOrElse(ss.conf.get("spark.sql.sources.default")) + def setFormat(format: String): Unit = this._format = Some(format) - private def radix: Int = { - math.min(math.max(5, scaleFactor / 100), parallelism) - } + private def radix: Int = (scaleFactor / 100) max 5 min parallelism private def toDF: DataFrame = { val rawRDD = ss.sparkContext.parallelize(1 to parallelism, parallelism).flatMap { i => @@ -101,8 +104,8 @@ case class TableGenerator( val data = Paths.get(tempDir.toString, s"${name}_${i}_$parallelism.dat") val iterator = if (Files.exists(data)) { - // ... realized that when opening the dat files I should use the “Cp1252” encoding. - // https://github.com/databricks/spark-sql-perf/pull/104 + // The data generated by `dsdgen` encoding in "Cp1252". 
+ // See detail at https://github.com/databricks/spark-sql-perf/pull/104 // noinspection SourceNotClosed Source.fromFile(data.toFile, "cp1252", 8192).getLines } else { @@ -123,14 +126,6 @@ case class TableGenerator( ss.createDataFrame(rowRDD, rawSchema).select(columns: _*) } - def setScaleFactor(scale: Int): Unit = { - this.scaleFactor = scale - } - - def setParallelism(parallel: Int): Unit = { - this.parallelism = math.max(2, parallel) - } - def create(): Unit = { val data = if (partitionCols.isEmpty) { diff --git a/dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/benchmark/Benchmark.scala b/dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/benchmark/Benchmark.scala index d215f843b..5645bd5a1 100644 --- a/dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/benchmark/Benchmark.scala +++ b/dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/benchmark/Benchmark.scala @@ -36,7 +36,7 @@ abstract class Benchmark( import Benchmark._ - val resultsLocation = + val resultsLocation: String = sparkSession.conf.get( "spark.sql.perf.results", "/spark/sql/performance") @@ -45,15 +45,16 @@ abstract class Benchmark( implicit protected def toOption[A](a: A): Option[A] = Option(a) - val buildInfo = Try(getClass.getClassLoader.loadClass("org.apache.spark.BuildInfo")).map { cls => - cls.getMethods - .filter(_.getReturnType == classOf[String]) - .filterNot(_.getName == "toString") - .map(m => m.getName -> m.invoke(cls).asInstanceOf[String]) - .toMap - }.getOrElse(Map.empty) + val buildInfo: Map[String, String] = + Try(getClass.getClassLoader.loadClass("org.apache.spark.BuildInfo")).map { cls => + cls.getMethods + .filter(_.getReturnType == classOf[String]) + .filterNot(_.getName == "toString") + .map(m => m.getName -> m.invoke(cls).asInstanceOf[String]) + .toMap + }.getOrElse(Map.empty) - def currentConfiguration = BenchmarkConfiguration( + def currentConfiguration: BenchmarkConfiguration = BenchmarkConfiguration( sqlConf = sparkSession.conf.getAll, sparkConf = 
sparkContext.getConf.getAll.toMap, defaultParallelism = sparkContext.defaultParallelism, @@ -82,7 +83,7 @@ abstract class Benchmark( tags: Map[String, String] = Map.empty, timeout: Long = 0L, resultLocation: String = resultsLocation, - forkThread: Boolean = true) = { + forkThread: Boolean = true): ExperimentStatus = { new ExperimentStatus( executionsToRun, @@ -150,7 +151,7 @@ object Benchmark { val currentRuns = new collection.mutable.ArrayBuffer[ExperimentRun]() val currentMessages = new collection.mutable.ArrayBuffer[String]() - def logMessage(msg: String) = { + def logMessage(msg: String): Unit = { println(msg) currentMessages += msg } @@ -170,10 +171,11 @@ object Benchmark { case h :: t => for (xh <- h; xt <- cartesianProduct(t)) yield xh :: xt } - val timestamp = System.currentTimeMillis() + val timestamp: Long = System.currentTimeMillis() val resultPath = s"$resultsLocation/timestamp=$timestamp" - val combinations = cartesianProduct(variations.map(l => (0 until l.options.size).toList).toList) - val resultsFuture = Future { + val combinations: Seq[List[Int]] = + cartesianProduct(variations.map(l => l.options.indices.toList).toList) + val resultsFuture: Future[Unit] = Future { // Run the benchmarks! val results: Seq[ExperimentRun] = (1 to iterations).flatMap { i => combinations.map { setup => @@ -260,7 +262,7 @@ object Benchmark { } /** Waits for the finish of the experiment. 
*/ - def waitForFinish(timeoutInSeconds: Int) = { + def waitForFinish(timeoutInSeconds: Int): Unit = { Await.result(resultsFuture, timeoutInSeconds.seconds) } diff --git a/dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/benchmark/Query.scala b/dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/benchmark/Query.scala index e4ffa9884..5eba308de 100644 --- a/dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/benchmark/Query.scala +++ b/dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/benchmark/Query.scala @@ -53,7 +53,7 @@ class Query( } } - lazy val tablesInvolved = buildDataFrame.queryExecution.logical collect { + lazy val tablesInvolved: Seq[String] = buildDataFrame.queryExecution.logical collect { case r: UnresolvedRelation => r.tableName } diff --git a/dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/benchmark/RunBenchmark.scala b/dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/benchmark/RunBenchmark.scala index 407c3d1de..5e4b1c581 100644 --- a/dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/benchmark/RunBenchmark.scala +++ b/dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/benchmark/RunBenchmark.scala @@ -34,9 +34,11 @@ case class RunConfig( /** * Usage: *

- * Run following command to benchmark tpcds sf10 with exists database `tpcds_sf10`. + * Run the following command to benchmark TPC-DS sf10 against the existing database `tpcds_sf10`. * {{{ - * $SPARK_HOME/bin/spark-submit --class org.apache.kyuubi.tpcds.benchmark.RunBenchmark kyuubi-tpcds-*.jar --db tpcds_sf10 + * $SPARK_HOME/bin/spark-submit \ + * --class org.apache.kyuubi.tpcds.benchmark.RunBenchmark \ + * kyuubi-tpcds_*.jar --db tpcds_sf10 * }}} */ object RunBenchmark { @@ -61,10 +63,8 @@ object RunBenchmark { } parser.parse(args, RunConfig()) match { - case Some(config) => - run(config) - case None => - System.exit(1) + case Some(config) => run(config) + case None => sys.exit(1) } } @@ -95,7 +95,7 @@ object RunBenchmark { val experiment = benchmark.runExperiment( executionsToRun = allQueries, iterations = config.iterations, - tags = Map("host" -> InetAddress.getLocalHost().getHostName())) + tags = Map("host" -> InetAddress.getLocalHost.getHostName)) println("== STARTING EXPERIMENT ==") experiment.waitForFinish(1000 * 60 * 30) diff --git a/dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/benchmark/TPCDS.scala b/dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/benchmark/TPCDS.scala index e4904ef1b..ba25ec0f8 100644 --- a/dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/benchmark/TPCDS.scala +++ b/dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/benchmark/TPCDS.scala @@ -24,5 +24,5 @@ import org.apache.spark.sql.SparkSession */ class TPCDS(@transient sparkSession: SparkSession) extends Benchmark(sparkSession) - with Tpcds_2_4_Queries + with TPCDS_2_4_Queries with Serializable {} diff --git a/dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/benchmark/TPCDS_2_4_Queries.scala b/dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/benchmark/TPCDS_2_4_Queries.scala index 7b06ca8d1..24cac4c67 100644 --- a/dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/benchmark/TPCDS_2_4_Queries.scala +++ 
b/dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/benchmark/TPCDS_2_4_Queries.scala @@ -17,12 +17,14 @@ package org.apache.kyuubi.tpcds.benchmark +import java.nio.charset.StandardCharsets + import org.apache.commons.io.IOUtils /** * This implements the official TPCDS v2.4 queries with only cosmetic modifications. */ -trait Tpcds_2_4_Queries extends Benchmark { +trait TPCDS_2_4_Queries extends Benchmark { import ExecutionMode._ @@ -132,15 +134,17 @@ trait Tpcds_2_4_Queries extends Benchmark { "q99", "ss_max") - val tpcds2_4Queries = queryNames.map { queryName => + val tpcds2_4Queries: Seq[Query] = queryNames.map { queryName => val queryContent: String = IOUtils.toString( - getClass().getClassLoader().getResourceAsStream(s"tpcds_2_4/$queryName.sql")) + getClass.getClassLoader.getResourceAsStream(s"tpcds_2_4/$queryName.sql"), + StandardCharsets.UTF_8) Query( queryName + "-v2.4", queryContent, - description = "TPCDS 2.4 Query", + description = "TPC-DS 2.4 Query", executionMode = CollectResults) } - val tpcds2_4QueriesMap = tpcds2_4Queries.map(q => q.name.split("-").get(0) -> q).toMap + val tpcds2_4QueriesMap: Map[String, Query] = + tpcds2_4Queries.map(q => q.name.split("-").get(0) -> q).toMap }