diff --git a/README.md b/README.md index c188a55..94308f2 100644 --- a/README.md +++ b/README.md @@ -1,55 +1,34 @@ # Spark SQL Performance Tests -This is a performance testing framework for [Spark SQL](https://spark.apache.org/sql/) in [Apache Spark](https://spark.apache.org/) 1.3+. +This is a performance testing framework for [Spark SQL](https://spark.apache.org/sql/) in [Apache Spark](https://spark.apache.org/) 1.4+. **Note: This README is still under development. Please also check our source code for more information.** ## How to use it The rest of document will use TPC-DS benchmark as an example. We will add contents to explain how to use other benchmarks add the support of a new benchmark dataset in future. -### Setup a dataset -Before running any query, a dataset needs to be setup by creating a `Dataset` object. Every benchmark support in Spark SQL Perf needs to implement its own `Dataset` class. A `Dataset` object takes a few parameters that will be used to setup the needed tables and its `setup` function is used to setup needed tables. For TPC-DS benchmark, the class is `TPCDS` in the package of `com.databricks.spark.sql.perf.tpcds`. For example, to setup a TPC-DS dataset, you can +### Setup a benchmark +Before running any query, a dataset needs to be setup by creating a `Benchmark` object. ``` import org.apache.spark.sql.parquet.Tables // Tables in TPC-DS benchmark used by experiments. val tables = Tables(sqlContext) // Setup TPC-DS experiment -val tpcds = - new TPCDS ( - sqlContext = sqlContext, - sparkVersion = "1.3.1", - dataLocation = , - dsdgenDir = , - tables = tables.tables, - scaleFactor = ) +val tpcds = new TPCDS (sqlContext = sqlContext) ``` -After a `TPCDS` object is created, tables of it can be setup by calling - -``` -tpcds.setup() -``` - -The `setup` function will first check if needed tables are stored at the location specified by `dataLocation`. If not, it will creates tables at there by using the data generator tool `dsdgen` provided by TPC-DS benchmark (This tool needs to be pre-installed at the location specified by `dsdgenDir` in every worker). - ### Run benchmarking queries After setup, users can use `runExperiment` function to run benchmarking queries and record query execution time. Taking TPC-DS as an example, you can start an experiment by using ``` -tpcds.runExperiment( - queries = , - resultsLocation = , - includeBreakdown = , - iterations = , - variations = , - tags = ) +val experiment = tpcds.runExperiment(queriesToRun = tpcds.interactiveQueries) ``` For every experiment run (i.e.\ every call of `runExperiment`), Spark SQL Perf will use the timestamp of the start time to identify this experiment. Performance results will be stored in the sub-dir named by the timestamp in the given `resultsLocation` (for example `results/1429213883272`). The performance results are stored in the JSON format. ### Retrieve results -The follow code can be used to retrieve results ... +While the experiment is running you can use `experiment.html` to list the status. Once the experiment is complete, the results will be saved to the table sqlPerformance in json. ``` // Get experiments results. diff --git a/build.sbt b/build.sbt index 3d224cd..cbc582a 100644 --- a/build.sbt +++ b/build.sbt @@ -3,22 +3,13 @@ scalaVersion := "2.10.4" -sparkVersion := "1.3.0" - sparkPackageName := "databricks/spark-sql-perf" -// Don't forget to set the version -version := "0.0.1-SNAPSHOT" +version := "0.0.4-SNAPSHOT" // All Spark Packages need a license licenses := Seq("Apache-2.0" -> url("http://opensource.org/licenses/Apache-2.0")) +sparkVersion := "1.4.0" -// Add Spark components this package depends on, e.g, "mllib", .... -sparkComponents ++= Seq("sql", "hive") - -// uncomment and change the value below to change the directory where your zip artifact will be created -// spDistDirectory := target.value - -// add any sparkPackageDependencies using sparkPackageDependencies. -// e.g. sparkPackageDependencies += "databricks/spark-avro:0.1" +sparkComponents ++= Seq("sql", "hive") \ No newline at end of file diff --git a/src/main/scala/com/databricks/spark/sql/perf/AggregationPerformance.scala b/src/main/scala/com/databricks/spark/sql/perf/AggregationPerformance.scala new file mode 100644 index 0000000..f27c596 --- /dev/null +++ b/src/main/scala/com/databricks/spark/sql/perf/AggregationPerformance.scala @@ -0,0 +1,67 @@ +package com.databricks.spark.sql.perf + +import org.apache.spark.sql.{Row, SQLContext} + +trait AggregationPerformance extends Benchmark { + + import sqlContext.implicits._ + import ExecutionMode._ + + + val sizes = (1 to 6).map(math.pow(10, _).toInt).toSeq + + val variousCardinality = sizes.map { size => + Table(s"ints$size", + sparkContext.parallelize(1 to size).flatMap { group => + (1 to 10000).map(i => (group, i)) + }.toDF("a", "b")) + } + + val lowCardinality = sizes.map { size => + val fullSize = size * 10000L + Table( + s"twoGroups$fullSize", + sqlContext.range(0, fullSize).select($"id" % 2 as 'a, $"id" as 'b)) + } + + val newAggreation = Variation("aggregationType", Seq("new", "old")) { + case "old" => sqlContext.setConf("spark.sql.useAggregate2", "false") + case "new" => sqlContext.setConf("spark.sql.useAggregate2", "true") + } + + val varyNumGroupsAvg: Seq[Query] = variousCardinality.map(_.name).map { table => + Query( + s"avg-$table", + s"SELECT AVG(b) FROM $table GROUP BY a", + "an average with a varying number of groups", + executionMode = ForeachResults) + } + + val twoGroupsAvg: Seq[Query] = lowCardinality.map(_.name).map { table => + Query( + s"avg-$table", + s"SELECT AVG(b) FROM $table GROUP BY a", + "an average on an int column with only two groups", + executionMode = ForeachResults) + } + + val complexInput = + Seq("1milints", "100milints", "1bilints").map { table => + Query( + s"aggregation-complex-input-$table", + s"SELECT SUM(id + id + id + id + id + id + id + id + id + id) FROM $table", + "Sum of 9 columns added together", + executionMode = CollectResults) + } + + val aggregates = + Seq("1milints", "100milints", "1bilints").flatMap { table => + Seq("SUM", "AVG", "COUNT", "STDDEV").map { agg => + Query( + s"single-aggregate-$agg-$table", + s"SELECT $agg(id) FROM $table", + "aggregation of a single column", + executionMode = CollectResults) + } + } +} \ No newline at end of file diff --git a/src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala b/src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala new file mode 100644 index 0000000..5b1119c --- /dev/null +++ b/src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala @@ -0,0 +1,463 @@ +/* + * Copyright 2015 Databricks Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.databricks.spark.sql.perf + +import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation + +import scala.concurrent._ +import scala.concurrent.duration._ +import scala.concurrent.ExecutionContext.Implicits.global + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{Path, FileSystem} +import org.apache.spark.sql.{AnalysisException, DataFrame, SQLContext} + +import org.apache.spark.sql.catalyst.plans.logical.Subquery + +/** + * A collection of queries that test a particular aspect of Spark SQL. + * + * @param sqlContext An existing SQLContext. + */ +abstract class Benchmark(@transient protected val sqlContext: SQLContext) + extends Serializable { + + import sqlContext.implicits._ + + val resultsLocation = "/spark/sql/performance" + val resultsTableName = "sqlPerformance" + + def createResultsTable() = { + sqlContext.sql(s"DROP TABLE $resultsTableName") + sqlContext.createExternalTable( + "sqlPerformance", "json", Map("path" -> (resultsLocation + "/*/"))) + } + + protected def sparkContext = sqlContext.sparkContext + + protected implicit def toOption[A](a: A) = Option(a) + + def currentConfiguration = BenchmarkConfiguration( + sqlConf = sqlContext.getAllConfs, + sparkConf = sparkContext.getConf.getAll.toMap, + defaultParallelism = sparkContext.defaultParallelism) + + /** + * A Variation represents a setting (e.g. the number of shuffle partitions or if tables + * are cached in memory) that we want to change in a experiment run. + * A Variation has three parts, `name`, `options`, and `setup`. + * The `name` is the identifier of a Variation. `options` is a Seq of options that + * will be used for a query. Basically, a query will be executed with every option + * defined in the list of `options`. `setup` defines the needed action for every + * option. For example, the following Variation is used to change the number of shuffle + * partitions of a query. The name of the Variation is "shufflePartitions". There are + * two options, 200 and 2000. The setup is used to set the value of property + * "spark.sql.shuffle.partitions". + * + * {{{ + * Variation("shufflePartitions", Seq("200", "2000")) { + * case num => sqlContext.setConf("spark.sql.shuffle.partitions", num) + * } + * }}} + */ + case class Variation[T](name: String, options: Seq[T])(val setup: T => Unit) + + val codegen = Variation("codegen", Seq("on", "off")) { + case "off" => sqlContext.setConf("spark.sql.codegen", "false") + case "on" => sqlContext.setConf("spark.sql.codegen", "true") + } + + val unsafe = Variation("unsafe", Seq("on", "off")) { + case "off" => sqlContext.setConf("spark.sql.unsafe.enabled", "false") + case "on" => sqlContext.setConf("spark.sql.unsafe.enabled", "true") + } + + /** + * Starts an experiment run with a given set of queries. + * @param queriesToRun a list of queries to be executed. + * @param includeBreakdown If it is true, breakdown results of a query will be recorded. + * Setting it to true may significantly increase the time used to + * execute a query. + * @param iterations The number of iterations to run of each query. + * @param variations [[Variation]]s used in this run. The cross product of all variations will be + * run for each query * iteration. + * @param tags Tags of this run. + * @return It returns a ExperimentStatus object that can be used to + * track the progress of this experiment run. + */ + def runExperiment( + queriesToRun: Seq[Query], + includeBreakdown: Boolean = false, + iterations: Int = 3, + variations: Seq[Variation[_]] = Seq(Variation("StandardRun", Seq("true")) { _ => {} }), + tags: Map[String, String] = Map.empty) = { + + class ExperimentStatus { + val currentResults = new collection.mutable.ArrayBuffer[BenchmarkResult]() + val currentRuns = new collection.mutable.ArrayBuffer[ExperimentRun]() + val currentMessages = new collection.mutable.ArrayBuffer[String]() + + // Stats for HTML status message. + @volatile var currentQuery = "" + @volatile var currentPlan = "" + @volatile var currentConfig = "" + @volatile var failures = 0 + @volatile var startTime = 0L + + def cartesianProduct[T](xss: List[List[T]]): List[List[T]] = xss match { + case Nil => List(Nil) + case h :: t => for(xh <- h; xt <- cartesianProduct(t)) yield xh :: xt + } + + val timestamp = System.currentTimeMillis() + val combinations = cartesianProduct(variations.map(l => (0 until l.options.size).toList).toList) + val resultsFuture = Future { + queriesToRun.flatMap { query => + query.newDataFrame().queryExecution.logical.collect { + case UnresolvedRelation(Seq(name), _) => name + } + }.distinct.foreach { name => + try { + sqlContext.table(name) + currentMessages += s"Table $name exists." + } catch { + case ae: AnalysisException => + val table = allTables + .find(_.name == name) + .getOrElse(sys.error(s"Couldn't read table $name and its not defined as a Benchmark.Table.")) + + currentMessages += s"Creating table: $name" + table.data + .write + .mode("overwrite") + .saveAsTable(name) + } + } + + + val results = (1 to iterations).flatMap { i => + combinations.map { setup => + val currentOptions = variations.asInstanceOf[Seq[Variation[Any]]].zip(setup).map { + case (v, idx) => + v.setup(v.options(idx)) + v.name -> v.options(idx).toString + } + currentConfig = currentOptions.map { case (k,v) => s"$k: $v" }.mkString(", ") + + val result = ExperimentRun( + timestamp = timestamp, + iteration = i, + tags = currentOptions.toMap ++ tags, + configuration = currentConfiguration, + queriesToRun.flatMap { q => + val setup = s"iteration: $i, ${currentOptions.map { case (k, v) => s"$k=$v"}.mkString(", ")}" + currentMessages += s"Running query ${q.name} $setup" + + currentQuery = q.name + currentPlan = q.newDataFrame().queryExecution.executedPlan.toString + startTime = System.currentTimeMillis() + + val singleResult = q.benchmark(includeBreakdown, setup) + singleResult.failure.foreach { f => + failures += 1 + currentMessages += s"Query '${q.name}' failed: ${f.message}" + } + singleResult.executionTime.foreach(time => + currentMessages += s"Exec time: ${time / 1000}s") + currentResults += singleResult + singleResult :: Nil + }) + currentRuns += result + + result + } + } + + try { + val resultsTable = sqlContext.createDataFrame(results) + currentMessages += s"Results written to table: 'sqlPerformance' at $resultsLocation/$timestamp" + results.toDF().write + .format("json") + .save(s"$resultsLocation/$timestamp") + + results.toDF() + } catch { + case e: Throwable => currentMessages += s"Failed to write data: $e" + } + } + + /** Waits for the finish of the experiment. */ + def waitForFinish(timeoutInSeconds: Int) = { + Await.result(resultsFuture, timeoutInSeconds.seconds) + } + + /** Returns results from an actively running experiment. */ + def getCurrentResults() = { + val tbl = sqlContext.createDataFrame(currentResults) + tbl.registerTempTable("currentResults") + tbl + } + + /** Returns full iterations from an actively running experiment. */ + def getCurrentRuns() = { + val tbl = sqlContext.createDataFrame(currentRuns) + tbl.registerTempTable("currentRuns") + tbl + } + + def tail(n: Int = 20) = { + currentMessages.takeRight(n).mkString("\n") + } + + def status = + if (resultsFuture.isCompleted) { + if (resultsFuture.value.get.isFailure) "Failed" else "Successful" + } else { + "Running" + } + + override def toString = + s"""Permalink: table("sqlPerformance").where('timestamp === ${timestamp}L)""" + + + def html = + s""" + |

$status Experiment

+ |Permalink: table("$resultsTableName").where('timestamp === ${timestamp}L)
+ |Iterations complete: ${currentRuns.size / combinations.size} / $iterations
+ |Failures: $failures
+ |Queries run: ${currentResults.size} / ${iterations * combinations.size * queriesToRun.size}
+ |Run time: ${(System.currentTimeMillis() - timestamp) / 1000}s
+ | + |

Current Query: $currentQuery

+ |Runtime: ${(System.currentTimeMillis() - startTime) / 1000}s
+ |$currentConfig
+ |

QueryPlan

+ |
+           |${currentPlan.replaceAll("\n", "
")} + |
+ | + |

Logs

+ |
+           |${tail()}
+           |
+ """.stripMargin + } + new ExperimentStatus + } + + case class Table( + name: String, + data: DataFrame) + + import reflect.runtime._, universe._ + import reflect.runtime._ + import universe._ + + private val runtimeMirror = universe.runtimeMirror(getClass.getClassLoader) + val myType = runtimeMirror.classSymbol(getClass).toType + + def singleTables = + myType.declarations + .filter(m => m.isMethod) + .map(_.asMethod) + .filter(_.asMethod.returnType =:= typeOf[Table]) + .map(method => runtimeMirror.reflect(this).reflectMethod(method).apply().asInstanceOf[Table]) + + def groupedTables = + myType.declarations + .filter(m => m.isMethod) + .map(_.asMethod) + .filter(_.asMethod.returnType =:= typeOf[Seq[Table]]) + .flatMap(method => runtimeMirror.reflect(this).reflectMethod(method).apply().asInstanceOf[Seq[Table]]) + + lazy val allTables: Seq[Table] = (singleTables ++ groupedTables).toSeq + + def singleQueries = + myType.declarations + .filter(m => m.isMethod) + .map(_.asMethod) + .filter(_.asMethod.returnType =:= typeOf[Query]) + .map(method => runtimeMirror.reflect(this).reflectMethod(method).apply().asInstanceOf[Query]) + + def groupedQueries = + myType.declarations + .filter(m => m.isMethod) + .map(_.asMethod) + .filter(_.asMethod.returnType =:= typeOf[Seq[Query]]) + .flatMap(method => runtimeMirror.reflect(this).reflectMethod(method).apply().asInstanceOf[Seq[Query]]) + + lazy val allQueries = (singleQueries ++ groupedQueries).toSeq + + def html: String = { + val singleQueries = + myType.declarations + .filter(m => m.isMethod) + .map(_.asMethod) + .filter(_.asMethod.returnType =:= typeOf[Query]) + .map(method => runtimeMirror.reflect(this).reflectMethod(method).apply().asInstanceOf[Query]) + .mkString(",") + val queries = + myType.declarations + .filter(m => m.isMethod) + .map(_.asMethod) + .filter(_.asMethod.returnType =:= typeOf[Seq[Query]]) + .map { method => + val queries = runtimeMirror.reflect(this).reflectMethod(method).apply().asInstanceOf[Seq[Query]] + val queryList = queries.map(_.name).mkString(", ") + s""" + |

${method.name}

+ |
    $queryList
+ """.stripMargin + }.mkString("\n") + + s""" + |

Spark SQL Performance Benchmarking

+ |

Available Queries

+ |$singleQueries + |$queries + """.stripMargin + } + + trait ExecutionMode + object ExecutionMode { + // Benchmark run by collecting queries results (e.g. rdd.collect()) + case object CollectResults extends ExecutionMode + + // Benchmark run by iterating through the queries results rows (e.g. rdd.foreach(row => Unit)) + case object ForeachResults extends ExecutionMode + + // Benchmark run by saving the output of each query as a parquet file at the specified location + case class WriteParquet(location: String) extends ExecutionMode + } + + /** Factory object for benchmark queries. */ + object Query { + def apply( + name: String, + sqlText: String, + description: String, + executionMode: ExecutionMode = ExecutionMode.ForeachResults): Query = { + new Query(name, sqlContext.sql(sqlText), description, Some(sqlText), executionMode) + } + + def apply( + name: String, + dataFrameBuilder: => DataFrame, + description: String): Query = { + new Query(name, dataFrameBuilder, description, None, ExecutionMode.CollectResults) + } + } + + /** Holds one benchmark query and its metadata. */ + class Query( + val name: String, + buildDataFrame: => DataFrame, + val description: String, + val sqlText: Option[String], + val executionMode: ExecutionMode) { + + override def toString = + s""" + |== Query: $name == + |${buildDataFrame.queryExecution.analyzed} + """.stripMargin + + lazy val tablesInvolved = buildDataFrame.queryExecution.logical collect { + case UnresolvedRelation(tableIdentifier, _) => { + // We are ignoring the database name. + tableIdentifier.last + } + } + + def newDataFrame() = buildDataFrame + + def benchmarkMs[A](f: => A): Double = { + val startTime = System.nanoTime() + val ret = f + val endTime = System.nanoTime() + (endTime - startTime).toDouble / 1000000 + } + + def benchmark(includeBreakdown: Boolean, description: String = "") = { + try { + val dataFrame = buildDataFrame + sparkContext.setJobDescription(s"Query: $name, $description") + val queryExecution = dataFrame.queryExecution + // We are not counting the time of ScalaReflection.convertRowToScala. + val parsingTime = benchmarkMs { + queryExecution.logical + } + val analysisTime = benchmarkMs { + queryExecution.analyzed + } + val optimizationTime = benchmarkMs { + queryExecution.optimizedPlan + } + val planningTime = benchmarkMs { + queryExecution.executedPlan + } + + val breakdownResults = if (includeBreakdown) { + val depth = queryExecution.executedPlan.treeString.split("\n").size + val physicalOperators = (0 until depth).map(i => (i, queryExecution.executedPlan(i))) + physicalOperators.map { + case (index, node) => + val executionTime = benchmarkMs { + node.execute().map(_.copy()).foreach(row => Unit) + } + BreakdownResult(node.nodeName, node.simpleString, index, executionTime) + } + } else { + Seq.empty[BreakdownResult] + } + + // The executionTime for the entire query includes the time of type conversion from catalyst to scala. + // The executionTime for the entire query includes the time of type conversion + // from catalyst to scala. + val executionTime = benchmarkMs { + executionMode match { + case ExecutionMode.CollectResults => dataFrame.rdd.collect() + case ExecutionMode.ForeachResults => dataFrame.rdd.foreach { row => Unit } + case ExecutionMode.WriteParquet(location) => dataFrame.saveAsParquetFile(s"$location/$name.parquet") + } + } + + val joinTypes = dataFrame.queryExecution.executedPlan.collect { + case k if k.nodeName contains "Join" => k.nodeName + } + + BenchmarkResult( + name = name, + joinTypes = joinTypes, + tables = tablesInvolved, + parsingTime = parsingTime, + analysisTime = analysisTime, + optimizationTime = optimizationTime, + planningTime = planningTime, + executionTime = executionTime, + queryExecution = dataFrame.queryExecution.toString, + breakDown = breakdownResults) + } catch { + case e: Exception => + BenchmarkResult( + name = name, + failure = Failure(e.getClass.getName, e.getMessage)) + } + } + } +} \ No newline at end of file diff --git a/src/main/scala/com/databricks/spark/sql/perf/JoinPerformance.scala b/src/main/scala/com/databricks/spark/sql/perf/JoinPerformance.scala new file mode 100644 index 0000000..c7705a2 --- /dev/null +++ b/src/main/scala/com/databricks/spark/sql/perf/JoinPerformance.scala @@ -0,0 +1,45 @@ +package com.databricks.spark.sql.perf + +import org.apache.spark.sql.SQLContext + +trait JoinPerformance extends Benchmark { + // 1.5 mb, 1 file + + import ExecutionMode._ + + val x = Table( + "1milints", + sqlContext.range(0, 1000000) + .repartition(1)) + + val joinTables = Seq( + // 143.542mb, 10 files + Table( + "1bilints", + sqlContext.range(0, 100000000) + .repartition(10)), + + // 1.4348gb, 10 files + Table( + "1bilints", + sqlContext.range(0, 1000000000) + .repartition(10)) + ) + + val sortMergeJoin = Variation("sortMergeJoin", Seq("on", "off")) { + case "off" => sqlContext.setConf("spark.sql.planner.sortMergeJoin", "false") + case "on" => sqlContext.setConf("spark.sql.planner.sortMergeJoin", "true") + } + + val singleKeyJoins = Seq("1milints", "100milints", "1bilints").flatMap { table1 => + Seq("1milints", "100milints", "1bilints").flatMap { table2 => + Seq("JOIN", "RIGHT JOIN", "LEFT JOIN", "FULL OUTER JOIN").map { join => + Query( + s"singleKey-$join-$table1-$table2", + s"SELECT COUNT(*) FROM $table1 a $join $table2 b ON a.id = b.id", + "equi-inner join a small table with a big table using a single key.", + executionMode = CollectResults) + } + } + } +} \ No newline at end of file diff --git a/src/main/scala/com/databricks/spark/sql/perf/bigdata/BigData.scala b/src/main/scala/com/databricks/spark/sql/perf/bigdata/BigData.scala index 16d0b1a..e69de29 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/bigdata/BigData.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/bigdata/BigData.scala @@ -1,46 +0,0 @@ -/* - * Copyright 2015 Databricks Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.databricks.spark.sql.perf.bigdata - -import com.databricks.spark.sql.perf._ -import org.apache.spark.sql.functions._ -import org.apache.spark.sql.parquet.TPCDSTableForTest -import org.apache.spark.sql.{Column, SQLContext} - -class BigData ( - @transient sqlContext: SQLContext, - sparkVersion: String, - dataLocation: String, - tables: Seq[Table], - scaleFactor: String) - extends Dataset( - sqlContext, - sparkVersion, - dataLocation, - tables, - scaleFactor) with Serializable { - import sqlContext._ - import sqlContext.implicits._ - - override val datasetName = "bigDataBenchmark" - - override def createTablesForTest(tables: Seq[Table]): Seq[TableForTest] = { - tables.map(table => - BigDataTableForTest(table, dataLocation, scaleFactor, sqlContext)) - } -} - diff --git a/src/main/scala/com/databricks/spark/sql/perf/bigdata/Queries.scala b/src/main/scala/com/databricks/spark/sql/perf/bigdata/Queries.scala index 5a639ba..b9fb016 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/bigdata/Queries.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/bigdata/Queries.scala @@ -16,10 +16,12 @@ package com.databricks.spark.sql.perf.bigdata -import com.databricks.spark.sql.perf.ExecutionMode.ForeachResults -import com.databricks.spark.sql.perf.Query +import com.databricks.spark.sql.perf.Benchmark + +trait Queries extends Benchmark { + + import ExecutionMode._ -object Queries { val queries1to3 = Seq( Query( name = "q1A", diff --git a/src/main/scala/com/databricks/spark/sql/perf/bigdata/Tables.scala b/src/main/scala/com/databricks/spark/sql/perf/bigdata/Tables.scala index 8301f59..e69de29 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/bigdata/Tables.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/bigdata/Tables.scala @@ -1,79 +0,0 @@ -/* - * Copyright 2015 Databricks Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.databricks.spark.sql.perf.bigdata - -// This is a hack until parquet has better support for partitioning. - -import java.text.SimpleDateFormat -import java.util.Date - -import com.databricks.spark.sql.perf._ -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileSystem, Path} -import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat} -import org.apache.hadoop.mapreduce.{Job, OutputCommitter, RecordWriter, TaskAttemptContext} -import org.apache.spark.SerializableWritable -import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil -import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.types.{StringType, StructField, StructType} -import org.apache.spark.sql.{Column, ColumnName, SQLContext} -import parquet.hadoop.ParquetOutputFormat -import parquet.hadoop.util.ContextUtil - -import scala.sys.process._ - -case class BigDataTableForTest( - table: Table, - baseDir: String, - scaleFactor: String, - @transient sqlContext: SQLContext) - extends TableForTest(table, baseDir, sqlContext) with Serializable { - - @transient val sparkContext = sqlContext.sparkContext - - override def generate(): Unit = - throw new UnsupportedOperationException( - "Generate data for BigDataBenchmark has not been implemented") -} - -case class Tables(sqlContext: SQLContext) { - import sqlContext.implicits._ - - val tables = Seq( - Table("rankings", - UnpartitionedTable, - 'pageURL .string, - 'pageRank .int, - 'avgDuration .int), - - Table("uservisits", - UnpartitionedTable, - 'sourceIP .string, - 'destURL .string, - 'visitDate .string, - 'adRevenue .double, - 'userAgent .string, - 'countryCode .string, - 'languageCode .string, - 'searchWord .string, - 'duration .int), - - Table("documents", - UnpartitionedTable, - 'line .string) - ) -} diff --git a/src/main/scala/com/databricks/spark/sql/perf/query.scala b/src/main/scala/com/databricks/spark/sql/perf/query.scala deleted file mode 100644 index 5a469cc..0000000 --- a/src/main/scala/com/databricks/spark/sql/perf/query.scala +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Copyright 2015 Databricks Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.databricks.spark.sql.perf - -import com.databricks.spark.sql.perf.ExecutionMode.{WriteParquet, ForeachResults, CollectResults} -import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation - -trait ExecutionMode -object ExecutionMode { - // Benchmark run by collecting queries results (e.g. rdd.collect()) - case object CollectResults extends ExecutionMode - - // Benchmark run by iterating through the queries results rows (e.g. rdd.foreach(row => Unit)) - case object ForeachResults extends ExecutionMode - - // Benchmark run by saving the output of each query as a parquet file at the specified location - case class WriteParquet(location: String) extends ExecutionMode -} - -case class Query(name: String, sqlText: String, description: String, executionMode: ExecutionMode) - -case class QueryForTest( - query: Query, - includeBreakdown: Boolean, - @transient sqlContext: SQLContext) { - @transient val sparkContext = sqlContext.sparkContext - - val name = query.name - - def benchmarkMs[A](f: => A): Double = { - val startTime = System.nanoTime() - val ret = f - val endTime = System.nanoTime() - (endTime - startTime).toDouble / 1000000 - } - - def benchmark(description: String = "") = { - try { - sparkContext.setJobDescription(s"Query: ${query.name}, $description") - val dataFrame = sqlContext.sql(query.sqlText) - val queryExecution = dataFrame.queryExecution - // We are not counting the time of ScalaReflection.convertRowToScala. - val parsingTime = benchmarkMs { queryExecution.logical } - val analysisTime = benchmarkMs { queryExecution.analyzed } - val optimizationTime = benchmarkMs { queryExecution.optimizedPlan } - val planningTime = benchmarkMs { queryExecution.executedPlan } - - val breakdownResults = if (includeBreakdown) { - val depth = queryExecution.executedPlan.treeString.split("\n").size - val physicalOperators = (0 until depth).map(i => (i, queryExecution.executedPlan(i))) - physicalOperators.map { - case (index, node) => - val executionTime = benchmarkMs { node.execute().map(_.copy()).foreach(row => Unit) } - BreakdownResult(node.nodeName, node.simpleString, index, executionTime) - } - } else { - Seq.empty[BreakdownResult] - } - - // The executionTime for the entire query includes the time of type conversion - // from catalyst to scala. - val executionTime = benchmarkMs { - query.executionMode match { - case CollectResults => dataFrame.rdd.collect() - case ForeachResults => dataFrame.rdd.foreach { row => Unit } - case WriteParquet(location) => dataFrame.saveAsParquetFile(s"$location/$name.parquet") - } - } - - val joinTypes = dataFrame.queryExecution.executedPlan.collect { - case k if k.nodeName contains "Join" => k.nodeName - } - - val tablesInvolved = dataFrame.queryExecution.logical collect { - case UnresolvedRelation(tableIdentifier, _) => { - // We are ignoring the database name. - tableIdentifier.last - } - } - - BenchmarkResult( - name = query.name, - joinTypes = joinTypes, - tables = tablesInvolved, - parsingTime = parsingTime, - analysisTime = analysisTime, - optimizationTime = optimizationTime, - planningTime = planningTime, - executionTime = executionTime, - breakdownResults) - } catch { - case e: Exception => - throw new RuntimeException( - s"Failed to benchmark query ${query.name}", e) - } - } -} diff --git a/src/main/scala/com/databricks/spark/sql/perf/results.scala b/src/main/scala/com/databricks/spark/sql/perf/results.scala new file mode 100644 index 0000000..7d31ed2 --- /dev/null +++ b/src/main/scala/com/databricks/spark/sql/perf/results.scala @@ -0,0 +1,88 @@ +/* + * Copyright 2015 Databricks Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.databricks.spark.sql.perf + +/** + * The performance results of all given queries for a single iteration. + * @param timestamp The timestamp indicates when the entire experiment is started. + * @param iteration The index number of the current iteration. + * @param tags Tags of this iteration (variations are stored at here). + * @param configuration Configuration properties of this iteration. + * @param results The performance results of queries for this iteration. + */ +case class ExperimentRun( + timestamp: Long, + iteration: Int, + tags: Map[String, String], + configuration: BenchmarkConfiguration, + results: Seq[BenchmarkResult]) + +/** + * The configuration used for an iteration of an experiment. + * @param sparkVersion The version of Spark. + * @param sqlConf All configuration properties related to Spark SQL. + * @param sparkConf All configuration properties of Spark. + * @param defaultParallelism The default parallelism of the cluster. + * Usually, it is the number of cores of the cluster. + */ +case class BenchmarkConfiguration( + sparkVersion: String = org.apache.spark.SPARK_VERSION, + sqlConf: Map[String, String], + sparkConf: Map[String,String], + defaultParallelism: Int) + +/** + * The result of a query. + * @param name The name of the query. + * @param joinTypes The type of join operations in the query. + * @param tables The tables involved in the query. + * @param parsingTime The time used to parse the query. + * @param analysisTime The time used to analyze the query. + * @param optimizationTime The time used to optimize the query. + * @param planningTime The time used to plan the query. + * @param executionTime The time used to execute the query. + * @param breakDown The breakdown results of the query plan tree. + */ +case class BenchmarkResult( + name: String, + joinTypes: Seq[String] = Nil, + tables: Seq[String] = Nil, + parsingTime: Option[Double] = None, + analysisTime: Option[Double] = None, + optimizationTime: Option[Double] = None, + planningTime: Option[Double] = None, + executionTime: Option[Double] = None, + breakDown: Seq[BreakdownResult] = Nil, + queryExecution: Option[String] = None, + failure: Option[Failure] = None) + +/** + * The execution time of a subtree of the query plan tree of a specific query. + * @param nodeName The name of the top physical operator of the subtree. + * @param nodeNameWithArgs The name and arguments of the top physical operator of the subtree. + * @param index The index of the top physical operator of the subtree + * in the original query plan tree. The index starts from 0 + * (0 represents the top physical operator of the original query plan tree). + * @param executionTime The execution time of the subtree. + */ +case class BreakdownResult( + nodeName: String, + nodeNameWithArgs: String, + index: Int, + executionTime: Double) + +case class Failure(className: String, message: String) \ No newline at end of file diff --git a/src/main/scala/com/databricks/spark/sql/perf/runBenchmarks.scala b/src/main/scala/com/databricks/spark/sql/perf/runBenchmarks.scala deleted file mode 100644 index 7a3c7be..0000000 --- a/src/main/scala/com/databricks/spark/sql/perf/runBenchmarks.scala +++ /dev/null @@ -1,305 +0,0 @@ -/* - * Copyright 2015 Databricks Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.databricks.spark.sql.perf - -import scala.concurrent._ -import scala.concurrent.duration._ -import scala.concurrent.ExecutionContext.Implicits.global - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{Path, FileSystem} -import org.apache.spark.sql.SQLContext - -/** - * The configuration used for an iteration of an experiment. - * @param sparkVersion The version of Spark. - * @param scaleFactor The scale factor of the dataset. - * @param sqlConf All configuration properties related to Spark SQL. - * @param sparkConf All configuration properties of Spark. - * @param defaultParallelism The default parallelism of the cluster. - * Usually, it is the number of cores of the cluster. - */ -case class BenchmarkConfiguration( - sparkVersion: String, - scaleFactor: String, - sqlConf: Map[String, String], - sparkConf: Map[String,String], - defaultParallelism: Int) - -/** - * The execution time of a subtree of the query plan tree of a specific query. - * @param nodeName The name of the top physical operator of the subtree. - * @param nodeNameWithArgs The name and arguments of the top physical operator of the subtree. - * @param index The index of the top physical operator of the subtree - * in the original query plan tree. The index starts from 0 - * (0 represents the top physical operator of the original query plan tree). - * @param executionTime The execution time of the subtree. - */ -case class BreakdownResult( - nodeName: String, - nodeNameWithArgs: String, - index: Int, - executionTime: Double) - -/** - * The result of a query. - * @param name The name of the query. - * @param joinTypes The type of join operations in the query. - * @param tables The tables involved in the query. - * @param parsingTime The time used to parse the query. - * @param analysisTime The time used to analyze the query. - * @param optimizationTime The time used to optimize the query. - * @param planningTime The time used to plan the query. - * @param executionTime The time used to execute the query. - * @param breakDown The breakdown results of the query plan tree. - */ -case class BenchmarkResult( - name: String, - joinTypes: Seq[String], - tables: Seq[String], - parsingTime: Double, - analysisTime: Double, - optimizationTime: Double, - planningTime: Double, - executionTime: Double, - breakDown: Seq[BreakdownResult]) - -/** - * A Variation represents a setting (e.g. the number of shuffle partitions and if tables - * are cached in memory) that we want to change in a experiment run. - * A Variation has three parts, `name`, `options`, and `setup`. - * The `name` is the identifier of a Variation. `options` is a Seq of options that - * will be used for a query. Basically, a query will be executed with every option - * defined in the list of `options`. `setup` defines the needed action for every - * option. For example, the following Variation is used to change the number of shuffle - * partitions of a query. The name of the Variation is "shufflePartitions". There are - * two options, 200 and 2000. The setup is used to set the value of property - * "spark.sql.shuffle.partitions". - * - * {{{ - * Variation("shufflePartitions", Seq("200", "2000")) { - * case num => sqlContext.setConf("spark.sql.shuffle.partitions", num) - * } - * }}} - */ -case class Variation[T](name: String, options: Seq[T])(val setup: T => Unit) - -/** - * The performance results of all given queries for a single iteration. - * @param timestamp The timestamp indicates when the entire experiment is started. - * @param datasetName The name of dataset. - * @param iteration The index number of the current iteration. - * @param tags Tags of this iteration (variations are stored at here). - * @param configuration Configuration properties of this iteration. - * @param results The performance results of queries for this iteration. - */ -case class ExperimentRun( - timestamp: Long, - datasetName: String, - iteration: Int, - tags: Map[String, String], - configuration: BenchmarkConfiguration, - results: Seq[BenchmarkResult]) - -/** - * The dataset of a benchmark. - * @param sqlContext An existing SQLContext. - * @param sparkVersion The version of Spark. - * @param dataLocation The location of the dataset used by this experiment. - * @param tables Tables that will be used in this experiment. - * @param scaleFactor The scale factor of the dataset. For some benchmarks like TPC-H - * and TPC-DS, the scale factor is a number roughly representing the - * size of raw data files. For some other benchmarks, the scale factor - * is a short string describing the scale of the dataset. - */ -abstract class Dataset( - @transient sqlContext: SQLContext, - sparkVersion: String, - dataLocation: String, - tables: Seq[Table], - scaleFactor: String) extends Serializable { - - val datasetName: String - - @transient val sparkContext = sqlContext.sparkContext - - def createTablesForTest(tables: Seq[Table]): Seq[TableForTest] - - val tablesForTest: Seq[TableForTest] = createTablesForTest(tables) - - def checkData(): Unit = { - tablesForTest.foreach { table => - val fs = FileSystem.get(new java.net.URI(table.outputDir), sparkContext.hadoopConfiguration) - val exists = fs.exists(new Path(table.outputDir)) - val wasSuccessful = fs.exists(new Path(s"${table.outputDir}/_SUCCESS")) - - if (!wasSuccessful) { - if (exists) { - println(s"Table '${table.name}' not generated successfully, regenerating.") - } else { - println(s"Table '${table.name}' does not exist, generating.") - } - fs.delete(new Path(table.outputDir), true) - table.generate() - } else { - println(s"Table ${table.name} already exists.") - } - } - } - - def allStats = tablesForTest.map(_.stats).reduceLeft(_.unionAll(_)) - - /** - * Does necessary setup work such as data generation and transformation. It needs to be - * called before running any query. - */ - def setup(): Unit = { - checkData() - tablesForTest.foreach(_.createTempTable()) - } - - def currentConfiguration = BenchmarkConfiguration( - sparkVersion = sparkVersion, - scaleFactor = scaleFactor, - sqlConf = sqlContext.getAllConfs, - sparkConf = sparkContext.getConf.getAll.toMap, - defaultParallelism = sparkContext.defaultParallelism) - - /** - * Starts an experiment run with a given set of queries. - * @param queries Queries to be executed. - * @param resultsLocation The location of performance results. - * @param includeBreakdown If it is true, breakdown results of a query will be recorded. - * Setting it to true may significantly increase the time used to - * execute a query. - * @param iterations The number of iterations. - * @param variations [[Variation]]s used in this run. - * @param tags Tags of this run. - * @return It returns a ExperimentStatus object that can be used to - * track the progress of this experiment run. - */ - def runExperiment( - queries: Seq[Query], - resultsLocation: String, - includeBreakdown: Boolean = false, - iterations: Int = 3, - variations: Seq[Variation[_]] = Seq(Variation("StandardRun", Seq("")) { _ => {} }), - tags: Map[String, String] = Map.empty) = { - - val queriesToRun = queries.map(query => QueryForTest(query, includeBreakdown, sqlContext)) - - class ExperimentStatus { - val currentResults = new collection.mutable.ArrayBuffer[BenchmarkResult]() - val currentRuns = new collection.mutable.ArrayBuffer[ExperimentRun]() - val currentMessages = new collection.mutable.ArrayBuffer[String]() - - @volatile - var currentQuery = "" - - def cartesianProduct[T](xss: List[List[T]]): List[List[T]] = xss match { - case Nil => List(Nil) - case h :: t => for(xh <- h; xt <- cartesianProduct(t)) yield xh :: xt - } - - val timestamp = System.currentTimeMillis() - val combinations = cartesianProduct(variations.map(l => (0 until l.options.size).toList).toList) - val resultsFuture = future { - val results = (1 to iterations).flatMap { i => - combinations.map { setup => - val currentOptions = variations.asInstanceOf[Seq[Variation[Any]]].zip(setup).map { - case (v, idx) => - v.setup(v.options(idx)) - v.name -> v.options(idx).toString - } - - val result = ExperimentRun( - timestamp = timestamp, - datasetName = datasetName, - iteration = i, - tags = currentOptions.toMap ++ tags, - configuration = currentConfiguration, - queriesToRun.flatMap { q => - val setup = s"iteration: $i, ${currentOptions.map { case (k, v) => s"$k=$v"}.mkString(", ")}" - currentMessages += s"Running query ${q.name} $setup" - - currentQuery = q.name - val singleResult = try q.benchmark(setup) :: Nil catch { - case e: Exception => - currentMessages += s"Failed to run query ${q.name}: $e" - Nil - } - currentResults ++= singleResult - singleResult - }) - currentRuns += result - - result - } - } - - val resultsTable = sqlContext.createDataFrame(results) - currentMessages += s"Results stored to: $resultsLocation/$timestamp" - resultsTable.toJSON.coalesce(1).saveAsTextFile(s"$resultsLocation/$timestamp") - resultsTable - } - - /** Waits for the finish of the experiment. */ - def waitForFinish(timeoutInSeconds: Int) = { - Await.result(resultsFuture, timeoutInSeconds.seconds) - } - - /** Returns results from an actively running experiment. */ - def getCurrentResults() = { - val tbl = sqlContext.createDataFrame(currentResults) - tbl.registerTempTable("currentResults") - tbl - } - - /** Returns full iterations from an actively running experiment. */ - def getCurrentRuns() = { - val tbl = sqlContext.createDataFrame(currentRuns) - tbl.registerTempTable("currentRuns") - tbl - } - - def tail(n: Int = 5) = { - currentMessages.takeRight(n).mkString("\n") - } - - def status = - if (resultsFuture.isCompleted) { - if (resultsFuture.value.get.isFailure) "Failed" else "Successful" - } else { - "Running" - } - - override def toString = - s""" - |=== $status Experiment === - |Permalink: table("allResults").where('timestamp === ${timestamp}L) - |Queries: ${queriesToRun.map(_.name).map(n => if(n == currentQuery) s"|$n|" else n).mkString(" ")} - |Iterations complete: ${currentRuns.size / combinations.size} / $iterations - |Queries run: ${currentResults.size} / ${iterations * combinations.size * queriesToRun.size} - |Run time: ${(System.currentTimeMillis() - timestamp) / 1000}s - | - |== Logs == - |${tail()} - """.stripMargin - } - new ExperimentStatus - } -} diff --git a/src/main/scala/com/databricks/spark/sql/perf/table.scala b/src/main/scala/com/databricks/spark/sql/perf/table.scala deleted file mode 100644 index 3c94557..0000000 --- a/src/main/scala/com/databricks/spark/sql/perf/table.scala +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Copyright 2015 Databricks Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.databricks.spark.sql.perf - -import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.functions._ -import org.apache.spark.sql.types._ - -abstract class TableType -case object UnpartitionedTable extends TableType -case class PartitionedTable(partitionColumn: String) extends TableType - -case class Table(name: String, tableType: TableType, fields: StructField*) - -abstract class TableForTest( - table: Table, - baseDir: String, - @transient sqlContext: SQLContext) extends Serializable { - - val schema = StructType(table.fields) - - val name = table.name - - val outputDir = s"$baseDir/${name}" - - def fromCatalog = sqlContext.table(name) - - def stats = - fromCatalog.select( - lit(name) as "tableName", - count("*") as "numRows", - lit(fromCatalog.queryExecution.optimizedPlan.statistics.sizeInBytes.toLong) as "sizeInBytes") - - def createTempTable(): Unit = { - sqlContext.sql( - s""" - |CREATE TEMPORARY TABLE ${name} - |USING org.apache.spark.sql.parquet - |OPTIONS ( - | path '${outputDir}' - |) - """.stripMargin) - } - - def generate(): Unit -} diff --git a/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/ImpalaKitQueries.scala b/src/main/scala/com/databricks/spark/sql/perf/tpcds/ImpalaKitQueries.scala similarity index 99% rename from src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/ImpalaKitQueries.scala rename to src/main/scala/com/databricks/spark/sql/perf/tpcds/ImpalaKitQueries.scala index 248fbac..4f4c3e8 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/ImpalaKitQueries.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/tpcds/ImpalaKitQueries.scala @@ -14,12 +14,14 @@ * limitations under the License. */ -package com.databricks.spark.sql.perf.tpcds.queries +package com.databricks.spark.sql.perf.tpcds -import com.databricks.spark.sql.perf.ExecutionMode.CollectResults -import com.databricks.spark.sql.perf.Query +import com.databricks.spark.sql.perf.Benchmark + +trait ImpalaKitQueries extends Benchmark { + + import ExecutionMode._ -object ImpalaKitQueries { // Queries are from // https://github.com/cloudera/impala-tpcds-kit/tree/master/queries-sql92-modified/queries val queries = Seq( diff --git a/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/SimpleQueries.scala b/src/main/scala/com/databricks/spark/sql/perf/tpcds/SimpleQueries.scala similarity index 97% rename from src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/SimpleQueries.scala rename to src/main/scala/com/databricks/spark/sql/perf/tpcds/SimpleQueries.scala index 514afc5..383c68f 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/SimpleQueries.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/tpcds/SimpleQueries.scala @@ -14,12 +14,14 @@ * limitations under the License. */ -package com.databricks.spark.sql.perf.tpcds.queries +package com.databricks.spark.sql.perf.tpcds -import com.databricks.spark.sql.perf.ExecutionMode.ForeachResults -import com.databricks.spark.sql.perf.Query +import com.databricks.spark.sql.perf.Benchmark + +trait SimpleQueries extends Benchmark { + + import ExecutionMode._ -object SimpleQueries { val q7Derived = Seq( ("q7-simpleScan", """ diff --git a/src/main/scala/com/databricks/spark/sql/perf/tpcds/TPCDS.scala b/src/main/scala/com/databricks/spark/sql/perf/tpcds/TPCDS.scala index 363a2d4..dab3fbd 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/tpcds/TPCDS.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/tpcds/TPCDS.scala @@ -17,55 +17,16 @@ package com.databricks.spark.sql.perf.tpcds import com.databricks.spark.sql.perf._ -import org.apache.spark.sql.functions._ -import org.apache.spark.sql.parquet.TPCDSTableForTest -import org.apache.spark.sql.{Column, SQLContext} +import org.apache.spark.sql.SQLContext /** * TPC-DS benchmark's dataset. * @param sqlContext An existing SQLContext. - * @param sparkVersion The version of Spark. - * @param dataLocation The location of the dataset used by this experiment. - * @param dsdgenDir The location of dsdgen in every worker machine. - * @param resultsLocation The location of performance results. - * @param tables Tables that will be used in this experiment. - * @param scaleFactor The scale factor of the dataset. For some benchmarks like TPC-H - * and TPC-DS, the scale factor is a number roughly representing the - * size of raw data files. For some other benchmarks, the scale factor - * is a short string describing the scale of the dataset. */ -class TPCDS ( - @transient sqlContext: SQLContext, - sparkVersion: String, - dataLocation: String, - dsdgenDir: String, - tables: Seq[Table], - scaleFactor: String, - userSpecifiedBaseDir: Option[String] = None) - extends Dataset( - sqlContext, - sparkVersion, - dataLocation, - tables, - scaleFactor) with Serializable { - import sqlContext._ - import sqlContext.implicits._ - - override val datasetName = "tpcds" - - lazy val baseDir = - userSpecifiedBaseDir.getOrElse(s"$dataLocation/scaleFactor=$scaleFactor/useDecimal=true") - - override def createTablesForTest(tables: Seq[Table]): Seq[TableForTest] = { - tables.map(table => - TPCDSTableForTest(table, baseDir, scaleFactor.toInt, dsdgenDir, sqlContext)) - } - - override def setup(): Unit = { - super.setup() - setupBroadcast() - } +class TPCDS (@transient sqlContext: SQLContext) + extends Benchmark(sqlContext) with ImpalaKitQueries with SimpleQueries with Serializable { + /* def setupBroadcast(skipTables: Seq[String] = Seq("store_sales", "customer")) = { val skipExpr = skipTables.map(t => !('tableName === t)).reduceLeft[Column](_ && _) val threshold = @@ -79,5 +40,6 @@ class TPCDS ( println(setQuery) sql(setQuery) } + */ } diff --git a/src/main/scala/com/databricks/spark/sql/perf/tpcds/Tables.scala b/src/main/scala/com/databricks/spark/sql/perf/tpcds/Tables.scala index e216b31..0466bf9 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/tpcds/Tables.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/tpcds/Tables.scala @@ -14,7 +14,7 @@ * limitations under the License. */ -package org.apache.spark.sql.parquet // This is a hack until parquet has better support for partitioning. +package com.databricks.spark.sql.perf.tpcds import java.text.SimpleDateFormat import java.util.Date @@ -37,171 +37,63 @@ import org.apache.spark.sql.types.{StringType, StructField, StructType} import parquet.hadoop.ParquetOutputFormat import parquet.hadoop.util.ContextUtil +class Tables(sqlContext: SQLContext, dsdgenDir: String, scaleFactor: Int) { + import sqlContext.implicits._ -case class TPCDSTableForTest( - table: Table, - baseDir: String, - scaleFactor: Int, - dsdgenDir: String, - @transient sqlContext: SQLContext, - maxRowsPerPartitions: Int = 20 * 1000 * 1000) - extends TableForTest(table, baseDir, sqlContext) with Serializable with SparkHadoopMapReduceUtil { - - @transient val sparkContext = sqlContext.sparkContext - + def sparkContext = sqlContext.sparkContext val dsdgen = s"$dsdgenDir/dsdgen" - override def generate(): Unit = { - val partitions = table.tableType match { - case PartitionedTable(_) => scaleFactor - case _ => 1 - } + case class Table(name: String, partitionColumns: Seq[String], fields: StructField*) { + val schema = StructType(fields) + val partitions = if (partitionColumns.isEmpty) 1 else 100 - val generatedData = { - sparkContext.parallelize(1 to partitions, partitions).flatMap { i => - val localToolsDir = if (new java.io.File(dsdgen).exists) { - dsdgenDir - } else if (new java.io.File(s"/$dsdgen").exists) { - s"/$dsdgenDir" - } else { - sys.error(s"Could not find dsdgen at $dsdgen or /$dsdgen. Run install") - } - - val parallel = if (partitions > 1) s"-parallel $partitions -child $i" else "" - val commands = Seq( - "bash", "-c", - s"cd $localToolsDir && ./dsdgen -table ${table.name} -filter Y -scale $scaleFactor $parallel") - println(commands) - commands.lines - } - } - - generatedData.setName(s"${table.name}, sf=$scaleFactor, strings") - - val rows = generatedData.mapPartitions { iter => - val currentRow = new GenericMutableRow(schema.fields.size) - iter.map { l => - (0 until schema.fields.length).foreach(currentRow.setNullAt) - l.split("\\|", -1).zipWithIndex.dropRight(1).foreach { case (f, i) => currentRow(i) = f} - currentRow: Row - } - } - - val stringData = - sqlContext.createDataFrame( - rows, - StructType(schema.fields.map(f => StructField(f.name, StringType)))) - - val convertedData = { - val columns = schema.fields.map { f => - val columnName = new ColumnName(f.name) - columnName.cast(f.dataType).as(f.name) - } - stringData.select(columns: _*) - } - - table.tableType match { - // This is an awful hack... spark sql parquet should support this natively. - case PartitionedTable(partitioningColumn) => - sqlContext.setConf("spark.sql.planner.externalSort", "true") - val output = convertedData.queryExecution.analyzed.output - val job = new Job(sqlContext.sparkContext.hadoopConfiguration) - - val writeSupport = - if (schema.fields.map(_.dataType).forall(_.isPrimitive)) { - classOf[org.apache.spark.sql.parquet.MutableRowWriteSupport] + def df = { + val generatedData = { + sparkContext.parallelize(1 to partitions, partitions).flatMap { i => + val localToolsDir = if (new java.io.File(dsdgen).exists) { + dsdgenDir + } else if (new java.io.File(s"/$dsdgen").exists) { + s"/$dsdgenDir" } else { - classOf[org.apache.spark.sql.parquet.RowWriteSupport] + sys.error(s"Could not find dsdgen at $dsdgen or /$dsdgen. Run install") } - ParquetOutputFormat.setWriteSupportClass(job, writeSupport) - - val conf = new SerializableWritable(ContextUtil.getConfiguration(job)) - org.apache.spark.sql.parquet.RowWriteSupport.setSchema(schema.toAttributes, conf.value) - - val partColumnAttr = - BindReferences.bindReference[Expression]( - output.find(_.name == partitioningColumn).get, - output) - - - // TODO: clusterBy would be faster than orderBy - val orderedConvertedData = - convertedData.filter(new Column(partitioningColumn) isNotNull).orderBy(Column(partitioningColumn) asc) - orderedConvertedData.queryExecution.toRdd.foreachPartition { iter => - var writer: RecordWriter[Void, Row] = null - val getPartition = new InterpretedMutableProjection(Seq(partColumnAttr)) - var currentPartition: Row = null - var hadoopContext: TaskAttemptContext = null - var committer: OutputCommitter = null - - var rowCount = 0 - var partition = 0 - - while (iter.hasNext) { - val currentRow = iter.next() - - rowCount += 1 - if (rowCount >= maxRowsPerPartitions) { - rowCount = 0 - partition += 1 - println(s"Starting partition $partition") - if (writer != null) { - writer.close(hadoopContext) - committer.commitTask(hadoopContext) - } - writer = null - } - - if ((getPartition(currentRow) != currentPartition || writer == null) && - !getPartition.currentValue.isNullAt(0)) { - rowCount = 0 - currentPartition = getPartition.currentValue.copy() - if (writer != null) { - writer.close(hadoopContext) - committer.commitTask(hadoopContext) - } - - val job = new Job(conf.value) - val keyType = classOf[Void] - job.setOutputKeyClass(keyType) - job.setOutputValueClass(classOf[Row]) - NewFileOutputFormat.setOutputPath( - job, - new Path(s"$outputDir/$partitioningColumn=${currentPartition(0)}")) - val wrappedConf = new SerializableWritable(job.getConfiguration) - val formatter = new SimpleDateFormat("yyyyMMddHHmm") - val jobtrackerID = formatter.format(new Date()) - val stageId = partition - - val attemptNumber = 1 - /* "reduce task" */ - val attemptId = newTaskAttemptID(jobtrackerID, partition, isMap = false, partition, attemptNumber) - hadoopContext = newTaskAttemptContext(wrappedConf.value, attemptId) - val format = new ParquetOutputFormat[Row] - committer = format.getOutputCommitter(hadoopContext) - committer.setupTask(hadoopContext) - writer = format.getRecordWriter(hadoopContext) - - } - if (!getPartition.currentValue.isNullAt(0)) { - writer.write(null, currentRow) - } - } - if (writer != null) { - writer.close(hadoopContext) - committer.commitTask(hadoopContext) - } + val parallel = if (partitions > 1) s"-parallel $partitions -child $i" else "" + val commands = Seq( + "bash", "-c", + s"cd $localToolsDir && ./dsdgen -table $name -filter Y -scale $scaleFactor $parallel") + println(commands) + commands.lines } - val fs = FileSystem.get(new java.net.URI(outputDir), new Configuration()) - fs.create(new Path(s"$outputDir/_SUCCESS")).close() - case _ => convertedData.saveAsParquetFile(outputDir) + } + + generatedData.setName(s"$name, sf=$scaleFactor, strings") + + val rows = generatedData.mapPartitions { iter => + val currentRow = new GenericMutableRow(schema.fields.size) + iter.map { l => + (0 until schema.fields.length).foreach(currentRow.setNullAt) + l.split("\\|", -1).zipWithIndex.dropRight(1).foreach { case (f, i) => currentRow(i) = f} + currentRow: Row + } + } + + val stringData = + sqlContext.createDataFrame( + rows, + StructType(schema.fields.map(f => StructField(f.name, StringType)))) + + val convertedData = { + val columns = schema.fields.map { f => + val columnName = new ColumnName(f.name) + columnName.cast(f.dataType).as(f.name) + } + stringData.select(columns: _*) + } + + convertedData } } -} - -case class Tables(sqlContext: SQLContext) { - import sqlContext.implicits._ val tables = Seq( /* This is another large table that we don't build yet. @@ -212,7 +104,7 @@ case class Tables(sqlContext: SQLContext) { 'inv_warehouse_sk .int, 'inv_quantity_on_hand .int),*/ Table("store_sales", - PartitionedTable("ss_sold_date_sk"), + partitionColumns = "ss_sold_date_sk" :: Nil, 'ss_sold_date_sk .int, 'ss_sold_time_sk .int, 'ss_item_sk .int, @@ -237,7 +129,7 @@ case class Tables(sqlContext: SQLContext) { 'ss_net_paid_inc_tax .decimal(7,2), 'ss_net_profit .decimal(7,2)), Table("customer", - UnpartitionedTable, + partitionColumns = Nil, 'c_customer_sk .int, 'c_customer_id .string, 'c_current_cdemo_sk .int, @@ -257,7 +149,7 @@ case class Tables(sqlContext: SQLContext) { 'c_email_address .string, 'c_last_review_date .string), Table("customer_address", - UnpartitionedTable, + partitionColumns = Nil, 'ca_address_sk .int, 'ca_address_id .string, 'ca_street_number .string, @@ -272,7 +164,7 @@ case class Tables(sqlContext: SQLContext) { 'ca_gmt_offset .decimal(5,2), 'ca_location_type .string), Table("customer_demographics", - UnpartitionedTable, + partitionColumns = Nil, 'cd_demo_sk .int, 'cd_gender .string, 'cd_marital_status .string, @@ -283,7 +175,7 @@ case class Tables(sqlContext: SQLContext) { 'cd_dep_employed_count .int, 'cd_dep_college_count .int), Table("date_dim", - UnpartitionedTable, + partitionColumns = Nil, 'd_date_sk .int, 'd_date_id .string, 'd_date .string, @@ -313,14 +205,14 @@ case class Tables(sqlContext: SQLContext) { 'd_current_quarter .string, 'd_current_year .string), Table("household_demographics", - UnpartitionedTable, + partitionColumns = Nil, 'hd_demo_sk .int, 'hd_income_band_sk .int, 'hd_buy_potential .string, 'hd_dep_count .int, 'hd_vehicle_count .int), Table("item", - UnpartitionedTable, + partitionColumns = Nil, 'i_item_sk .int, 'i_item_id .string, 'i_rec_start_date .string, @@ -344,7 +236,7 @@ case class Tables(sqlContext: SQLContext) { 'i_manager_id .int, 'i_product_name .string), Table("promotion", - UnpartitionedTable, + partitionColumns = Nil, 'p_promo_sk .int, 'p_promo_id .string, 'p_start_date_sk .int, @@ -365,7 +257,7 @@ case class Tables(sqlContext: SQLContext) { 'p_purpose .string, 'p_discount_active .string), Table("store", - UnpartitionedTable, + partitionColumns = Nil, 's_store_sk .int, 's_store_id .string, 's_rec_start_date .string, @@ -396,7 +288,7 @@ case class Tables(sqlContext: SQLContext) { 's_gmt_offset .decimal(5,2), 's_tax_precentage .decimal(5,2)), Table("time_dim", - UnpartitionedTable, + partitionColumns = Nil, 't_time_sk .int, 't_time_id .string, 't_time .int, diff --git a/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/package.scala b/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/package.scala deleted file mode 100644 index 4b154ac..0000000 --- a/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/package.scala +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Copyright 2015 Databricks Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.databricks.spark.sql.perf.tpcds - -package object queries { - val impalaKitQueries = ImpalaKitQueries.impalaKitQueries - val q7DerivedQueries = SimpleQueries.q7Derived -}