[KYUUBI #1743] Fix parallelism of DataGenerator and other enhancements

### _Why are the changes needed?_

The parallelism of DataGenerator always is `spark.sparkContext.defaultParallelism`, it does not make sense for generating large scale data.

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [ ] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #1743 from pan3793/tpcds.

Closes #1743

62f7c866 [Cheng Pan] nit
fdcf8329 [Cheng Pan] nit
a52ff489 [Cheng Pan] Fix parallelism of DataGenerator and other enhancements

Authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: Cheng Pan <chengpan@apache.org>
This commit is contained in:
Cheng Pan 2022-01-13 11:41:22 +08:00
parent ab4184dbe3
commit 01f0ea8609
No known key found for this signature in database
GPG Key ID: 8001952629BCC75D
8 changed files with 97 additions and 98 deletions

View File

@ -16,57 +16,60 @@
-->
# Introduction
This module includes tpcds data generator and benchmark.
This module includes TPC-DS data generator and benchmark tool.
# How to use
package jar with following command:
`./build/mvn install -DskipTests -Ptpcds -pl dev/kyuubi-tpcds -am`
`./build/mvn clean package -Ptpcds -pl dev/kyuubi-tpcds -am`
## data generator
## Data Generator
Support options:
| key | default | description |
|-------------|---------|------------------------------|
| db | default | the databases to write data |
| scaleFactor | 1 | the scale factor of tpcds |
| key | default | description |
|--------------|-----------------|-----------------------------------|
| db | default | the database to write data |
| scaleFactor | 1 | the scale factor of TPC-DS |
| format | parquet | the format of table to store data |
| parallel | scaleFactor * 2 | the parallelism of Spark job |
Example: the following command to generate 10GB data with new database `tpcds_sf10`.
```shell
$SPARK_HOME/bin/spark-submit \
--class org.apache.kyuubi.tpcds.DataGenerator \
kyuubi-tpcds-*.jar --db tpcds_sf10 --scaleFactor 10
kyuubi-tpcds_*.jar \
--db tpcds_sf10 --scaleFactor 10 --format parquet --parallel 20
```
## do benchmark
## Benchmark Tool
Support options:
| key | default | description |
|------------|----------------------|--------------------------------------------------------|
| db | none(required) | the tpcds database |
| db | none(required) | the TPC-DS database |
| benchmark | tpcds-v2.4-benchmark | the name of application |
| iterations | 3 | the number of iterations to run |
| filter | a | filter on the name of the queries to run, e.g. q1-v2.4 |
Example: the following command to benchmark tpcds sf10 with exists database `tpcds_sf10`.
Example: the following command to benchmark TPC-DS sf10 with exists database `tpcds_sf10`.
```shell
$SPARK_HOME/bin/spark-submit \
--class org.apache.kyuubi.tpcds.benchmark.RunBenchmark \
kyuubi-tpcds-*.jar --db tpcds_sf10
kyuubi-tpcds_*.jar --db tpcds_sf10
```
We also support run one of the tpcds query:
We also support run one of the TPC-DS query:
```shell
$SPARK_HOME/bin/spark-submit \
--class org.apache.kyuubi.tpcds.benchmark.RunBenchmark \
kyuubi-tpcds-*.jar --db tpcds_sf10 --filter q1-v2.4
kyuubi-tpcds_*.jar --db tpcds_sf10 --filter q1-v2.4
```
The result of tpcds benchmark like:
The result of TPC-DS benchmark like:
| name | minTimeMs | maxTimeMs | avgTimeMs | stdDev | stdDevPercent |
|---------|-----------|-------------|------------|----------|----------------|

View File

@ -17,32 +17,29 @@
package org.apache.kyuubi.tpcds
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.slf4j.LoggerFactory
case class RunConfig(
db: String = "default",
scaleFactor: Int = 1)
/**
* Usage:
* <p>
* Run following command to generate 10GB data with new database `tpcds_sf10`.
* {{{
* `$SPARK_HOME/bin/spark-submit \
* --conf spark.sql.tpcds.scale.factor=10 \
* --conf spark.sql.tpcds.database=tpcds_sf10 \
* $SPARK_HOME/bin/spark-submit \
* --class org.apache.kyuubi.tpcds.DataGenerator \
* kyuubi-tpcds-*.jar`
* kyuubi-tpcds_*.jar \
* --db tpcds_sf10 --scaleFactor 10 --format parquet --parallel 20
* }}}
*/
object DataGenerator {
private val logger =
LoggerFactory.getLogger(this.getClass.getSimpleName.stripSuffix("$"))
val SCALE_FACTOR_KEY = "spark.sql.tpcds.scale.factor"
val DATABASE_KEY = "spark.sql.tpcds.database"
case class Config(
db: String = "default",
scaleFactor: Int = 1,
format: String = "parquet",
parallel: Option[Int] = None)
private val logger = LoggerFactory.getLogger(this.getClass.getSimpleName.stripSuffix("$"))
def initTable(spark: SparkSession): Seq[TableGenerator] = {
import spark.implicits._
@ -597,52 +594,50 @@ object DataGenerator {
promotion)
}
def run(config: RunConfig): Unit = {
val conf = new SparkConf()
val db = conf.get(DATABASE_KEY, config.db)
val scaleFactor = conf.get(DataGenerator.SCALE_FACTOR_KEY, config.scaleFactor.toString).toInt
def run(config: Config): Unit = {
val spark = SparkSession.builder()
.appName(s"Kyuubi TPCDS Generation - ${scaleFactor}GB")
.config(conf)
.appName(s"Kyuubi TPC-DS Generation - ${config.scaleFactor}GB")
.enableHiveSupport()
.getOrCreate()
logger.info(s"Generating TPCDS tables under database $db")
spark.sql(s"CREATE DATABASE IF NOT EXISTS $db")
spark.sql(s"USE $db")
spark.sql(s"DESC DATABASE $db").show()
val parallel = spark.sparkContext.defaultParallelism
logger.info(s"Generating TPC-DS tables under database ${config.db}")
spark.sql(s"CREATE DATABASE IF NOT EXISTS ${config.db}")
spark.sql(s"USE ${config.db}")
spark.sql(s"DESC DATABASE ${config.db}").show()
val tpcdsTables = initTable(spark)
tpcdsTables.par.foreach { table =>
table.setScaleFactor(scaleFactor)
table.setParallelism(parallel)
table.setScaleFactor(config.scaleFactor)
table.setFormat(config.format)
config.parallel.foreach(table.setParallelism)
spark.sparkContext.setJobDescription(table.toString)
logger.info(s"$table")
logger.info(s"Generating $table")
table.create()
}
}
def main(args: Array[String]): Unit = {
val parser = new scopt.OptionParser[RunConfig]("tpcds-data-generator") {
head("tpcds-data-generator", "")
val parser = new scopt.OptionParser[Config]("tpcds-data-generator") {
head("Kyuubi TPC-DS Data Generator")
opt[String]('d', "db")
.action { (x, c) => c.copy(db = x) }
.text("the databases to write data")
.text("the database to write data")
opt[Int]('s', "scaleFactor")
.action { (x, c) => c.copy(scaleFactor = x) }
.text("the scale factor of tpcds")
help("help")
.text("the scale factor of TPC-DS")
opt[String]('f', "format")
.action { (x, c) => c.copy(format = x) }
.text("the format of table to store data")
opt[Int]('p', "parallel")
.action { (x, c) => c.copy(parallel = Some(x)) }
.text("the parallelism of Spark job")
help('h', "help")
.text("prints this usage text")
}
parser.parse(args, RunConfig()) match {
case Some(config) =>
run(config)
case None =>
System.exit(1)
parser.parse(args, Config()) match {
case Some(config) => run(config)
case None => sys.exit(1)
}
}
}

View File

@ -40,15 +40,18 @@ case class TableGenerator(
private val rawSchema: StructType = StructType(fields.map(f => StructField(f.name, StringType)))
private var scaleFactor: Int = 1
def setScaleFactor(scale: Int): Unit = this.scaleFactor = scale
private var parallelism: Int = scaleFactor * 2
private var _parallelism: Option[Int] = None
private def parallelism: Int = _parallelism.getOrElse(scaleFactor * 2)
def setParallelism(parallel: Int): Unit = this._parallelism = Some(parallel max 2)
private val ss: SparkSession = SparkSession.active
private val format: String = ss.conf.get("spark.sql.sources.default", "parquet")
private var _format: Option[String] = None
private def format: String = _format.getOrElse(ss.conf.get("spark.sql.sources.default"))
def setFormat(format: String): Unit = this._format = Some(format)
private def radix: Int = {
math.min(math.max(5, scaleFactor / 100), parallelism)
}
private def radix: Int = (scaleFactor / 100) max 5 min parallelism
private def toDF: DataFrame = {
val rawRDD = ss.sparkContext.parallelize(1 to parallelism, parallelism).flatMap { i =>
@ -101,8 +104,8 @@ case class TableGenerator(
val data = Paths.get(tempDir.toString, s"${name}_${i}_$parallelism.dat")
val iterator =
if (Files.exists(data)) {
// ... realized that when opening the dat files I should use the Cp1252 encoding.
// https://github.com/databricks/spark-sql-perf/pull/104
// The data generated by `dsdgen` encoding in "Cp1252".
// See detail at https://github.com/databricks/spark-sql-perf/pull/104
// noinspection SourceNotClosed
Source.fromFile(data.toFile, "cp1252", 8192).getLines
} else {
@ -123,14 +126,6 @@ case class TableGenerator(
ss.createDataFrame(rowRDD, rawSchema).select(columns: _*)
}
def setScaleFactor(scale: Int): Unit = {
this.scaleFactor = scale
}
def setParallelism(parallel: Int): Unit = {
this.parallelism = math.max(2, parallel)
}
def create(): Unit = {
val data =
if (partitionCols.isEmpty) {

View File

@ -36,7 +36,7 @@ abstract class Benchmark(
import Benchmark._
val resultsLocation =
val resultsLocation: String =
sparkSession.conf.get(
"spark.sql.perf.results",
"/spark/sql/performance")
@ -45,15 +45,16 @@ abstract class Benchmark(
implicit protected def toOption[A](a: A): Option[A] = Option(a)
val buildInfo = Try(getClass.getClassLoader.loadClass("org.apache.spark.BuildInfo")).map { cls =>
cls.getMethods
.filter(_.getReturnType == classOf[String])
.filterNot(_.getName == "toString")
.map(m => m.getName -> m.invoke(cls).asInstanceOf[String])
.toMap
}.getOrElse(Map.empty)
val buildInfo: Map[String, String] =
Try(getClass.getClassLoader.loadClass("org.apache.spark.BuildInfo")).map { cls =>
cls.getMethods
.filter(_.getReturnType == classOf[String])
.filterNot(_.getName == "toString")
.map(m => m.getName -> m.invoke(cls).asInstanceOf[String])
.toMap
}.getOrElse(Map.empty)
def currentConfiguration = BenchmarkConfiguration(
def currentConfiguration: BenchmarkConfiguration = BenchmarkConfiguration(
sqlConf = sparkSession.conf.getAll,
sparkConf = sparkContext.getConf.getAll.toMap,
defaultParallelism = sparkContext.defaultParallelism,
@ -82,7 +83,7 @@ abstract class Benchmark(
tags: Map[String, String] = Map.empty,
timeout: Long = 0L,
resultLocation: String = resultsLocation,
forkThread: Boolean = true) = {
forkThread: Boolean = true): ExperimentStatus = {
new ExperimentStatus(
executionsToRun,
@ -150,7 +151,7 @@ object Benchmark {
val currentRuns = new collection.mutable.ArrayBuffer[ExperimentRun]()
val currentMessages = new collection.mutable.ArrayBuffer[String]()
def logMessage(msg: String) = {
def logMessage(msg: String): Unit = {
println(msg)
currentMessages += msg
}
@ -170,10 +171,11 @@ object Benchmark {
case h :: t => for (xh <- h; xt <- cartesianProduct(t)) yield xh :: xt
}
val timestamp = System.currentTimeMillis()
val timestamp: Long = System.currentTimeMillis()
val resultPath = s"$resultsLocation/timestamp=$timestamp"
val combinations = cartesianProduct(variations.map(l => (0 until l.options.size).toList).toList)
val resultsFuture = Future {
val combinations: Seq[List[Int]] =
cartesianProduct(variations.map(l => l.options.indices.toList).toList)
val resultsFuture: Future[Unit] = Future {
// Run the benchmarks!
val results: Seq[ExperimentRun] = (1 to iterations).flatMap { i =>
combinations.map { setup =>
@ -260,7 +262,7 @@ object Benchmark {
}
/** Waits for the finish of the experiment. */
def waitForFinish(timeoutInSeconds: Int) = {
def waitForFinish(timeoutInSeconds: Int): Unit = {
Await.result(resultsFuture, timeoutInSeconds.seconds)
}

View File

@ -53,7 +53,7 @@ class Query(
}
}
lazy val tablesInvolved = buildDataFrame.queryExecution.logical collect {
lazy val tablesInvolved: Seq[String] = buildDataFrame.queryExecution.logical collect {
case r: UnresolvedRelation => r.tableName
}

View File

@ -34,9 +34,11 @@ case class RunConfig(
/**
* Usage:
* <p>
* Run following command to benchmark tpcds sf10 with exists database `tpcds_sf10`.
* Run following command to benchmark TPC-DS sf10 with exists database `tpcds_sf10`.
* {{{
* $SPARK_HOME/bin/spark-submit --class org.apache.kyuubi.tpcds.benchmark.RunBenchmark kyuubi-tpcds-*.jar --db tpcds_sf10
* $SPARK_HOME/bin/spark-submit \
* --class org.apache.kyuubi.tpcds.benchmark.RunBenchmark \
* kyuubi-tpcds_*.jar --db tpcds_sf10
* }}}
*/
object RunBenchmark {
@ -61,10 +63,8 @@ object RunBenchmark {
}
parser.parse(args, RunConfig()) match {
case Some(config) =>
run(config)
case None =>
System.exit(1)
case Some(config) => run(config)
case None => sys.exit(1)
}
}
@ -95,7 +95,7 @@ object RunBenchmark {
val experiment = benchmark.runExperiment(
executionsToRun = allQueries,
iterations = config.iterations,
tags = Map("host" -> InetAddress.getLocalHost().getHostName()))
tags = Map("host" -> InetAddress.getLocalHost.getHostName))
println("== STARTING EXPERIMENT ==")
experiment.waitForFinish(1000 * 60 * 30)

View File

@ -24,5 +24,5 @@ import org.apache.spark.sql.SparkSession
*/
class TPCDS(@transient sparkSession: SparkSession)
extends Benchmark(sparkSession)
with Tpcds_2_4_Queries
with TPCDS_2_4_Queries
with Serializable {}

View File

@ -17,12 +17,14 @@
package org.apache.kyuubi.tpcds.benchmark
import java.nio.charset.StandardCharsets
import org.apache.commons.io.IOUtils
/**
* This implements the official TPCDS v2.4 queries with only cosmetic modifications.
*/
trait Tpcds_2_4_Queries extends Benchmark {
trait TPCDS_2_4_Queries extends Benchmark {
import ExecutionMode._
@ -132,15 +134,17 @@ trait Tpcds_2_4_Queries extends Benchmark {
"q99",
"ss_max")
val tpcds2_4Queries = queryNames.map { queryName =>
val tpcds2_4Queries: Seq[Query] = queryNames.map { queryName =>
val queryContent: String = IOUtils.toString(
getClass().getClassLoader().getResourceAsStream(s"tpcds_2_4/$queryName.sql"))
getClass.getClassLoader.getResourceAsStream(s"tpcds_2_4/$queryName.sql"),
StandardCharsets.UTF_8)
Query(
queryName + "-v2.4",
queryContent,
description = "TPCDS 2.4 Query",
description = "TPC-DS 2.4 Query",
executionMode = CollectResults)
}
val tpcds2_4QueriesMap = tpcds2_4Queries.map(q => q.name.split("-").get(0) -> q).toMap
val tpcds2_4QueriesMap: Map[String, Query] =
tpcds2_4Queries.map(q => q.name.split("-").get(0) -> q).toMap
}