From eba8cea93c39a3a4190222c2c60495814fe8e0c6 Mon Sep 17 00:00:00 2001 From: Michael Armbrust Date: Mon, 13 Jul 2015 16:20:24 -0700 Subject: [PATCH] Basic join performance tests --- build.sbt | 15 +- .../databricks/spark/sql/perf/Benchmark.scala | 343 ++++++++++++++++++ .../spark/sql/perf/JoinPerformance.scala | 57 +++ .../spark/sql/perf/bigdata/BigData.scala | 14 +- .../spark/sql/perf/bigdata/Queries.scala | 6 +- .../com/databricks/spark/sql/perf/query.scala | 106 ------ .../databricks/spark/sql/perf/results.scala | 88 +++++ .../spark/sql/perf/runBenchmarks.scala | 302 --------------- .../com/databricks/spark/sql/perf/table.scala | 44 ++- .../spark/sql/perf/tpcds/TPCDS.scala | 13 +- .../spark/sql/perf/tpcds/Tables.scala | 10 +- .../perf/tpcds/queries/ImpalaKitQueries.scala | 6 +- .../perf/tpcds/queries/SimpleQueries.scala | 6 +- 13 files changed, 547 insertions(+), 463 deletions(-) create mode 100644 src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala create mode 100644 src/main/scala/com/databricks/spark/sql/perf/JoinPerformance.scala create mode 100644 src/main/scala/com/databricks/spark/sql/perf/results.scala delete mode 100644 src/main/scala/com/databricks/spark/sql/perf/runBenchmarks.scala diff --git a/build.sbt b/build.sbt index 3d224cd..cbc582a 100644 --- a/build.sbt +++ b/build.sbt @@ -3,22 +3,13 @@ scalaVersion := "2.10.4" -sparkVersion := "1.3.0" - sparkPackageName := "databricks/spark-sql-perf" -// Don't forget to set the version -version := "0.0.1-SNAPSHOT" +version := "0.0.4-SNAPSHOT" // All Spark Packages need a license licenses := Seq("Apache-2.0" -> url("http://opensource.org/licenses/Apache-2.0")) +sparkVersion := "1.4.0" -// Add Spark components this package depends on, e.g, "mllib", .... -sparkComponents ++= Seq("sql", "hive") - -// uncomment and change the value below to change the directory where your zip artifact will be created -// spDistDirectory := target.value - -// add any sparkPackageDependencies using sparkPackageDependencies. -// e.g. sparkPackageDependencies += "databricks/spark-avro:0.1" +sparkComponents ++= Seq("sql", "hive") \ No newline at end of file diff --git a/src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala b/src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala new file mode 100644 index 0000000..d087b99 --- /dev/null +++ b/src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala @@ -0,0 +1,343 @@ +/* + * Copyright 2015 Databricks Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.databricks.spark.sql.perf + +import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation + +import scala.concurrent._ +import scala.concurrent.duration._ +import scala.concurrent.ExecutionContext.Implicits.global + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{Path, FileSystem} +import org.apache.spark.sql.{DataFrame, SQLContext} + +import org.apache.spark.sql.catalyst.plans.logical.Subquery + +/** + * A collection of queries that test a particular aspect of Spark SQL. + * + * @param sqlContext An existing SQLContext. + */ +abstract class Benchmark(@transient protected val sqlContext: SQLContext) + extends Serializable { + + import sqlContext.implicits._ + + val resultsLocation = "/spark/sql/performance" + val resultsTableName = "sqlPerformance" + + def createResultsTable() = { + sqlContext.sql(s"DROP TABLE $resultsTableName") + sqlContext.createExternalTable( + "sqlPerformance", "json", Map("path" -> (resultsLocation + "/*/"))) + } + + protected def sparkContext = sqlContext.sparkContext + + implicit def toOption[A](a: A) = Option(a) + + def currentConfiguration = BenchmarkConfiguration( + sqlConf = sqlContext.getAllConfs, + sparkConf = sparkContext.getConf.getAll.toMap, + defaultParallelism = sparkContext.defaultParallelism) + + /** + * A Variation represents a setting (e.g. the number of shuffle partitions or if tables + * are cached in memory) that we want to change in a experiment run. + * A Variation has three parts, `name`, `options`, and `setup`. + * The `name` is the identifier of a Variation. `options` is a Seq of options that + * will be used for a query. Basically, a query will be executed with every option + * defined in the list of `options`. `setup` defines the needed action for every + * option. For example, the following Variation is used to change the number of shuffle + * partitions of a query. The name of the Variation is "shufflePartitions". There are + * two options, 200 and 2000. The setup is used to set the value of property + * "spark.sql.shuffle.partitions". + * + * {{{ + * Variation("shufflePartitions", Seq("200", "2000")) { + * case num => sqlContext.setConf("spark.sql.shuffle.partitions", num) + * } + * }}} + */ + case class Variation[T](name: String, options: Seq[T])(val setup: T => Unit) + + /** + * Starts an experiment run with a given set of queries. + * @param queriesToRun Queries to be executed. + * @param includeBreakdown If it is true, breakdown results of a query will be recorded. + * Setting it to true may significantly increase the time used to + * execute a query. + * @param iterations The number of iterations. + * @param variations [[Variation]]s used in this run. + * @param tags Tags of this run. + * @return It returns a ExperimentStatus object that can be used to + * track the progress of this experiment run. + */ + def runExperiment( + queriesToRun: Seq[Query], + includeBreakdown: Boolean = false, + iterations: Int = 3, + variations: Seq[Variation[_]] = Seq(Variation("StandardRun", Seq("true")) { _ => {} }), + tags: Map[String, String] = Map.empty) = { + + class ExperimentStatus { + val currentResults = new collection.mutable.ArrayBuffer[BenchmarkResult]() + val currentRuns = new collection.mutable.ArrayBuffer[ExperimentRun]() + val currentMessages = new collection.mutable.ArrayBuffer[String]() + + // Stats for HTML status message. + @volatile var currentQuery = "" + @volatile var currentPlan = "" + @volatile var currentConfig = "" + @volatile var failures = 0 + @volatile var startTime = 0L + + def cartesianProduct[T](xss: List[List[T]]): List[List[T]] = xss match { + case Nil => List(Nil) + case h :: t => for(xh <- h; xt <- cartesianProduct(t)) yield xh :: xt + } + + val timestamp = System.currentTimeMillis() + val combinations = cartesianProduct(variations.map(l => (0 until l.options.size).toList).toList) + val resultsFuture = Future { + val results = (1 to iterations).flatMap { i => + combinations.map { setup => + val currentOptions = variations.asInstanceOf[Seq[Variation[Any]]].zip(setup).map { + case (v, idx) => + v.setup(v.options(idx)) + v.name -> v.options(idx).toString + } + currentConfig = currentOptions.map { case (k,v) => s"$k: $v" }.mkString(", ") + + val result = ExperimentRun( + timestamp = timestamp, + iteration = i, + tags = currentOptions.toMap ++ tags, + configuration = currentConfiguration, + queriesToRun.flatMap { q => + val setup = s"iteration: $i, ${currentOptions.map { case (k, v) => s"$k=$v"}.mkString(", ")}" + currentMessages += s"Running query ${q.name} $setup" + + currentQuery = q.name + currentPlan = q.newDataFrame().queryExecution.executedPlan.toString + startTime = System.currentTimeMillis() + + val singleResult = q.benchmark(includeBreakdown, setup) + singleResult.failure.foreach { f => + failures += 1 + currentMessages += s"Query '${q.name}' failed: ${f.message}" + } + singleResult.executionTime.foreach(time => currentMessages += s"Exec time: $time") + currentResults += singleResult + singleResult :: Nil + }) + currentRuns += result + + result + } + } + + try { + val resultsTable = sqlContext.createDataFrame(results) + currentMessages += s"Results written to table: 'sqlPerformance' at $resultsLocation/$timestamp" + results.toDF().write + .format("json") + .save(s"$resultsLocation/$timestamp") + + results.toDF() + } catch { + case e: Throwable => currentMessages += s"Failed to write data: $e" + } + } + + /** Waits for the finish of the experiment. */ + def waitForFinish(timeoutInSeconds: Int) = { + Await.result(resultsFuture, timeoutInSeconds.seconds) + } + + /** Returns results from an actively running experiment. */ + def getCurrentResults() = { + val tbl = sqlContext.createDataFrame(currentResults) + tbl.registerTempTable("currentResults") + tbl + } + + /** Returns full iterations from an actively running experiment. */ + def getCurrentRuns() = { + val tbl = sqlContext.createDataFrame(currentRuns) + tbl.registerTempTable("currentRuns") + tbl + } + + def tail(n: Int = 5) = { + currentMessages.takeRight(n).mkString("\n") + } + + def status = + if (resultsFuture.isCompleted) { + if (resultsFuture.value.get.isFailure) "Failed" else "Successful" + } else { + "Running" + } + + override def toString = + s"""Permalink: table("sqlPerformance").where('timestamp === ${timestamp}L)""" + + + def html = + s""" + |

$status Experiment

+ |Permalink: table("$resultsTableName").where('timestamp === ${timestamp}L)
+ |Iterations complete: ${currentRuns.size / combinations.size} / $iterations
+ |Failures: $failures
+ |Queries run: ${currentResults.size} / ${iterations * combinations.size * queriesToRun.size}
+ |Run time: ${(System.currentTimeMillis() - timestamp) / 1000}s
+ | + |

Current Query: $currentQuery

+ |Runtime: ${(System.currentTimeMillis() - startTime) / 1000}s + |$currentConfig
+ |

QueryPlan

+ |
+           |${currentPlan.replaceAll("\n", "
")} + |
+ | + |

Logs

+ |
+           |${tail()}
+           |
+ """.stripMargin + } + new ExperimentStatus + } + + /** Factory object for benchmark queries. */ + object Query { + def apply( + name: String, + sqlText: String, + description: String, + collectResults: Boolean = true): Query = { + new Query(name, sqlContext.sql(sqlText), description, collectResults, Some(sqlText)) + } + + def apply( + name: String, + dataFrameBuilder: => DataFrame, + description: String): Query = { + new Query(name, dataFrameBuilder, description, true, None) + } + } + + /** Holds one benchmark query and its metadata. */ + class Query( + val name: String, + buildDataFrame: => DataFrame, + val description: String, + val collectResults: Boolean, + val sqlText: Option[String]) { + + override def toString = + s""" + |== Query: $name == + |${buildDataFrame.queryExecution.analyzed} + """.stripMargin + + val tablesInvolved = buildDataFrame.queryExecution.logical collect { + case UnresolvedRelation(tableIdentifier, _) => { + // We are ignoring the database name. + tableIdentifier.last + } + } + + def newDataFrame() = buildDataFrame + + def benchmarkMs[A](f: => A): Double = { + val startTime = System.nanoTime() + val ret = f + val endTime = System.nanoTime() + (endTime - startTime).toDouble / 1000000 + } + + def benchmark(includeBreakdown: Boolean, description: String = "") = { + try { + val dataFrame = buildDataFrame + sparkContext.setJobDescription(s"Query: $name, $description") + val queryExecution = dataFrame.queryExecution + // We are not counting the time of ScalaReflection.convertRowToScala. + val parsingTime = benchmarkMs { + queryExecution.logical + } + val analysisTime = benchmarkMs { + queryExecution.analyzed + } + val optimizationTime = benchmarkMs { + queryExecution.optimizedPlan + } + val planningTime = benchmarkMs { + queryExecution.executedPlan + } + + val breakdownResults = if (includeBreakdown) { + val depth = queryExecution.executedPlan.treeString.split("\n").size + val physicalOperators = (0 until depth).map(i => (i, queryExecution.executedPlan(i))) + physicalOperators.map { + case (index, node) => + val executionTime = benchmarkMs { + node.execute().map(_.copy()).foreach(row => Unit) + } + BreakdownResult(node.nodeName, node.simpleString, index, executionTime) + } + } else { + Seq.empty[BreakdownResult] + } + + // The executionTime for the entire query includes the time of type conversion from catalyst to scala. + val executionTime = if (collectResults) { + benchmarkMs { + dataFrame.rdd.collect() + } + } else { + benchmarkMs { + dataFrame.rdd.foreach { row => Unit } + } + } + + val joinTypes = dataFrame.queryExecution.executedPlan.collect { + case k if k.nodeName contains "Join" => k.nodeName + } + + BenchmarkResult( + name = name, + joinTypes = joinTypes, + tables = tablesInvolved, + parsingTime = parsingTime, + analysisTime = analysisTime, + optimizationTime = optimizationTime, + planningTime = planningTime, + executionTime = executionTime, + queryExecution = dataFrame.queryExecution.toString, + breakDown = breakdownResults) + } catch { + case e: Exception => + BenchmarkResult( + name = name, + failure = Failure(e.getClass.getName, e.getMessage)) + } + } + } +} \ No newline at end of file diff --git a/src/main/scala/com/databricks/spark/sql/perf/JoinPerformance.scala b/src/main/scala/com/databricks/spark/sql/perf/JoinPerformance.scala new file mode 100644 index 0000000..0da49e3 --- /dev/null +++ b/src/main/scala/com/databricks/spark/sql/perf/JoinPerformance.scala @@ -0,0 +1,57 @@ +package com.databricks.spark.sql.perf + +import org.apache.spark.sql.SQLContext + +class JoinPerformance(sqlContext: SQLContext) extends Benchmark(sqlContext) { + def buildTables() = { + // 1.5 mb, 1 file + sqlContext.range(0, 1000000) + .repartition(1) + .write.mode("ignore") + .saveAsTable("1milints") + + // 143.542mb, 10 files + sqlContext.range(0, 100000000) + .repartition(10) + .write.mode("ignore") + .saveAsTable("100milints") + + // 1.4348gb, 10 files + sqlContext.range(0, 1000000000) + .repartition(10) + .write.mode("ignore") + .saveAsTable("1bilints") + } + + val singleKeyJoins = Seq("1milints", "100milints", "1bilints").flatMap { table1 => + Seq("1milints", "100milints", "1bilints").flatMap { table2 => + Seq("JOIN", "RIGHT JOIN", "LEFT JOIN", "FULL OUTER JOIN").map { join => + Query( + s"singleKey-$join-$table1-$table2", + s"SELECT COUNT(*) FROM $table1 a $join $table2 b ON a.id = b.id", + "equi-inner join a small table with a big table using a single key.", + collectResults = true) + } + } + }.filterNot(_.name contains "FULL OUTER JOIN-1milints-1bilints") + + val complexInput = + Seq("1milints", "100milints", "1bilints").map { table => + Query( + "aggregation-complex-input", + s"SELECT SUM(id + id + id + id + id + id + id + id + id + id) FROM $table", + "Sum of 9 columns added together", + collectResults = true) + } + + val aggregates = + Seq("1milints", "100milints", "1bilints").flatMap { table => + Seq("SUM", "AVG", "COUNT", "STDDEV").map { agg => + Query( + s"single-aggregate-$agg", + s"SELECT $agg(id) FROM $table", + "aggregation of a single column", + collectResults = true) + } + } +} \ No newline at end of file diff --git a/src/main/scala/com/databricks/spark/sql/perf/bigdata/BigData.scala b/src/main/scala/com/databricks/spark/sql/perf/bigdata/BigData.scala index 16d0b1a..c723382 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/bigdata/BigData.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/bigdata/BigData.scala @@ -21,23 +21,15 @@ import org.apache.spark.sql.functions._ import org.apache.spark.sql.parquet.TPCDSTableForTest import org.apache.spark.sql.{Column, SQLContext} -class BigData ( +class BigDataBenchmark ( @transient sqlContext: SQLContext, - sparkVersion: String, dataLocation: String, - tables: Seq[Table], + val tables: Seq[Table], scaleFactor: String) - extends Dataset( - sqlContext, - sparkVersion, - dataLocation, - tables, - scaleFactor) with Serializable { + extends Benchmark(sqlContext) with Serializable with TableCreator { import sqlContext._ import sqlContext.implicits._ - override val datasetName = "bigDataBenchmark" - override def createTablesForTest(tables: Seq[Table]): Seq[TableForTest] = { tables.map(table => BigDataTableForTest(table, dataLocation, scaleFactor, sqlContext)) diff --git a/src/main/scala/com/databricks/spark/sql/perf/bigdata/Queries.scala b/src/main/scala/com/databricks/spark/sql/perf/bigdata/Queries.scala index 12103b5..5541a02 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/bigdata/Queries.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/bigdata/Queries.scala @@ -16,9 +16,11 @@ package com.databricks.spark.sql.perf.bigdata -import com.databricks.spark.sql.perf.QuerySet +import com.databricks.spark.sql.perf.Benchmark + +trait Queries { + self: Benchmark => -trait Queries extends QuerySet { val queries1to3 = Seq( Query( name = "q1A", diff --git a/src/main/scala/com/databricks/spark/sql/perf/query.scala b/src/main/scala/com/databricks/spark/sql/perf/query.scala index 8f2fd1e..690acd6 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/query.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/query.scala @@ -19,117 +19,11 @@ package com.databricks.spark.sql.perf import org.apache.spark.sql.{DataFrame, SQLContext} import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation -object x { - - - trait QuerySet { val sqlContext: SQLContext def sparkContext = sqlContext.sparkContext - object Query { - def apply( - name: String, - sqlText: String, - description: String, - collectResults: Boolean = true): Query = { - new Query(name, sqlContext.sql(sqlText), description, collectResults, Some(sqlText)) - } - def apply( - name: String, - dataFrameBuilder: => DataFrame, - description: String): Query = { - new Query(name, dataFrameBuilder, description, true, None) - } - } - - class Query( - val name: String, - dataFrameBuilder: => DataFrame, - val description: String, - val collectResults: Boolean, - val sqlText: Option[String]) { - - val tablesInvolved = dataFrameBuilder.queryExecution.logical collect { - case UnresolvedRelation(tableIdentifier, _) => { - // We are ignoring the database name. - tableIdentifier.last - } - } - - def benchmarkMs[A](f: => A): Double = { - val startTime = System.nanoTime() - val ret = f - val endTime = System.nanoTime() - (endTime - startTime).toDouble / 1000000 - } - - def benchmark(includeBreakdown: Boolean, description: String = "") = { - try { - val dataFrame = dataFrameBuilder - sparkContext.setJobDescription(s"Query: $name, $description") - val queryExecution = dataFrame.queryExecution - // We are not counting the time of ScalaReflection.convertRowToScala. - val parsingTime = benchmarkMs { - queryExecution.logical - } - val analysisTime = benchmarkMs { - queryExecution.analyzed - } - val optimizationTime = benchmarkMs { - queryExecution.optimizedPlan - } - val planningTime = benchmarkMs { - queryExecution.executedPlan - } - - val breakdownResults = if (includeBreakdown) { - val depth = queryExecution.executedPlan.treeString.split("\n").size - val physicalOperators = (0 until depth).map(i => (i, queryExecution.executedPlan(i))) - physicalOperators.map { - case (index, node) => - val executionTime = benchmarkMs { - node.execute().map(_.copy()).foreach(row => Unit) - } - BreakdownResult(node.nodeName, node.simpleString, index, executionTime) - } - } else { - Seq.empty[BreakdownResult] - } - - // The executionTime for the entire query includes the time of type conversion from catalyst to scala. - val executionTime = if (collectResults) { - benchmarkMs { - dataFrame.rdd.collect() - } - } else { - benchmarkMs { - dataFrame.rdd.foreach { row => Unit } - } - } - - val joinTypes = dataFrame.queryExecution.executedPlan.collect { - case k if k.nodeName contains "Join" => k.nodeName - } - - BenchmarkResult( - name = name, - joinTypes = joinTypes, - tables = tablesInvolved, - parsingTime = parsingTime, - analysisTime = analysisTime, - optimizationTime = optimizationTime, - planningTime = planningTime, - executionTime = executionTime, - breakdownResults) - } catch { - case e: Exception => - throw new RuntimeException( - s"Failed to benchmark query $name", e) - } - } - } } \ No newline at end of file diff --git a/src/main/scala/com/databricks/spark/sql/perf/results.scala b/src/main/scala/com/databricks/spark/sql/perf/results.scala new file mode 100644 index 0000000..7d31ed2 --- /dev/null +++ b/src/main/scala/com/databricks/spark/sql/perf/results.scala @@ -0,0 +1,88 @@ +/* + * Copyright 2015 Databricks Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.databricks.spark.sql.perf + +/** + * The performance results of all given queries for a single iteration. + * @param timestamp The timestamp indicates when the entire experiment is started. + * @param iteration The index number of the current iteration. + * @param tags Tags of this iteration (variations are stored at here). + * @param configuration Configuration properties of this iteration. + * @param results The performance results of queries for this iteration. + */ +case class ExperimentRun( + timestamp: Long, + iteration: Int, + tags: Map[String, String], + configuration: BenchmarkConfiguration, + results: Seq[BenchmarkResult]) + +/** + * The configuration used for an iteration of an experiment. + * @param sparkVersion The version of Spark. + * @param sqlConf All configuration properties related to Spark SQL. + * @param sparkConf All configuration properties of Spark. + * @param defaultParallelism The default parallelism of the cluster. + * Usually, it is the number of cores of the cluster. + */ +case class BenchmarkConfiguration( + sparkVersion: String = org.apache.spark.SPARK_VERSION, + sqlConf: Map[String, String], + sparkConf: Map[String,String], + defaultParallelism: Int) + +/** + * The result of a query. + * @param name The name of the query. + * @param joinTypes The type of join operations in the query. + * @param tables The tables involved in the query. + * @param parsingTime The time used to parse the query. + * @param analysisTime The time used to analyze the query. + * @param optimizationTime The time used to optimize the query. + * @param planningTime The time used to plan the query. + * @param executionTime The time used to execute the query. + * @param breakDown The breakdown results of the query plan tree. + */ +case class BenchmarkResult( + name: String, + joinTypes: Seq[String] = Nil, + tables: Seq[String] = Nil, + parsingTime: Option[Double] = None, + analysisTime: Option[Double] = None, + optimizationTime: Option[Double] = None, + planningTime: Option[Double] = None, + executionTime: Option[Double] = None, + breakDown: Seq[BreakdownResult] = Nil, + queryExecution: Option[String] = None, + failure: Option[Failure] = None) + +/** + * The execution time of a subtree of the query plan tree of a specific query. + * @param nodeName The name of the top physical operator of the subtree. + * @param nodeNameWithArgs The name and arguments of the top physical operator of the subtree. + * @param index The index of the top physical operator of the subtree + * in the original query plan tree. The index starts from 0 + * (0 represents the top physical operator of the original query plan tree). + * @param executionTime The execution time of the subtree. + */ +case class BreakdownResult( + nodeName: String, + nodeNameWithArgs: String, + index: Int, + executionTime: Double) + +case class Failure(className: String, message: String) \ No newline at end of file diff --git a/src/main/scala/com/databricks/spark/sql/perf/runBenchmarks.scala b/src/main/scala/com/databricks/spark/sql/perf/runBenchmarks.scala deleted file mode 100644 index 3920c6f..0000000 --- a/src/main/scala/com/databricks/spark/sql/perf/runBenchmarks.scala +++ /dev/null @@ -1,302 +0,0 @@ -/* - * Copyright 2015 Databricks Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.databricks.spark.sql.perf - -import scala.concurrent._ -import scala.concurrent.duration._ -import scala.concurrent.ExecutionContext.Implicits.global - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{Path, FileSystem} -import org.apache.spark.sql.SQLContext - -/** - * The configuration used for an iteration of an experiment. - * @param sparkVersion The version of Spark. - * @param scaleFactor The scale factor of the dataset. - * @param sqlConf All configuration properties related to Spark SQL. - * @param sparkConf All configuration properties of Spark. - * @param defaultParallelism The default parallelism of the cluster. - * Usually, it is the number of cores of the cluster. - */ -case class BenchmarkConfiguration( - sparkVersion: String, - scaleFactor: String, - sqlConf: Map[String, String], - sparkConf: Map[String,String], - defaultParallelism: Int) - -/** - * The execution time of a subtree of the query plan tree of a specific query. - * @param nodeName The name of the top physical operator of the subtree. - * @param nodeNameWithArgs The name and arguments of the top physical operator of the subtree. - * @param index The index of the top physical operator of the subtree - * in the original query plan tree. The index starts from 0 - * (0 represents the top physical operator of the original query plan tree). - * @param executionTime The execution time of the subtree. - */ -case class BreakdownResult( - nodeName: String, - nodeNameWithArgs: String, - index: Int, - executionTime: Double) - -/** - * The result of a query. - * @param name The name of the query. - * @param joinTypes The type of join operations in the query. - * @param tables The tables involved in the query. - * @param parsingTime The time used to parse the query. - * @param analysisTime The time used to analyze the query. - * @param optimizationTime The time used to optimize the query. - * @param planningTime The time used to plan the query. - * @param executionTime The time used to execute the query. - * @param breakDown The breakdown results of the query plan tree. - */ -case class BenchmarkResult( - name: String, - joinTypes: Seq[String], - tables: Seq[String], - parsingTime: Double, - analysisTime: Double, - optimizationTime: Double, - planningTime: Double, - executionTime: Double, - breakDown: Seq[BreakdownResult]) - -/** - * A Variation represents a setting (e.g. the number of shuffle partitions and if tables - * are cached in memory) that we want to change in a experiment run. - * A Variation has three parts, `name`, `options`, and `setup`. - * The `name` is the identifier of a Variation. `options` is a Seq of options that - * will be used for a query. Basically, a query will be executed with every option - * defined in the list of `options`. `setup` defines the needed action for every - * option. For example, the following Variation is used to change the number of shuffle - * partitions of a query. The name of the Variation is "shufflePartitions". There are - * two options, 200 and 2000. The setup is used to set the value of property - * "spark.sql.shuffle.partitions". - * - * {{{ - * Variation("shufflePartitions", Seq("200", "2000")) { - * case num => sqlContext.setConf("spark.sql.shuffle.partitions", num) - * } - * }}} - */ -case class Variation[T](name: String, options: Seq[T])(val setup: T => Unit) - -/** - * The performance results of all given queries for a single iteration. - * @param timestamp The timestamp indicates when the entire experiment is started. - * @param datasetName The name of dataset. - * @param iteration The index number of the current iteration. - * @param tags Tags of this iteration (variations are stored at here). - * @param configuration Configuration properties of this iteration. - * @param results The performance results of queries for this iteration. - */ -case class ExperimentRun( - timestamp: Long, - datasetName: String, - iteration: Int, - tags: Map[String, String], - configuration: BenchmarkConfiguration, - results: Seq[BenchmarkResult]) - -/** - * The dataset of a benchmark. - * @param sqlContext An existing SQLContext. - * @param sparkVersion The version of Spark. - * @param dataLocation The location of the dataset used by this experiment. - * @param tables Tables that will be used in this experiment. - * @param scaleFactor The scale factor of the dataset. For some benchmarks like TPC-H - * and TPC-DS, the scale factor is a number roughly representing the - * size of raw data files. For some other benchmarks, the scale factor - * is a short string describing the scale of the dataset. - */ -abstract class Dataset( - @transient val sqlContext: SQLContext, - sparkVersion: String, - dataLocation: String, - tables: Seq[Table], - scaleFactor: String) extends Serializable with QuerySet { - - val datasetName: String - - def createTablesForTest(tables: Seq[Table]): Seq[TableForTest] - - val tablesForTest: Seq[TableForTest] = createTablesForTest(tables) - - def checkData(): Unit = { - tablesForTest.foreach { table => - val fs = FileSystem.get(new java.net.URI(table.outputDir), new Configuration()) - val exists = fs.exists(new Path(table.outputDir)) - val wasSuccessful = fs.exists(new Path(s"${table.outputDir}/_SUCCESS")) - - if (!wasSuccessful) { - if (exists) { - println(s"Table '${table.name}' not generated successfully, regenerating.") - } else { - println(s"Table '${table.name}' does not exist, generating.") - } - fs.delete(new Path(table.outputDir), true) - table.generate() - } else { - println(s"Table ${table.name} already exists.") - } - } - } - - def allStats = tablesForTest.map(_.stats).reduceLeft(_.unionAll(_)) - - /** - * Does necessary setup work such as data generation and transformation. It needs to be - * called before running any query. - */ - def setup(): Unit = { - checkData() - tablesForTest.foreach(_.createTempTable()) - } - - def currentConfiguration = BenchmarkConfiguration( - sparkVersion = sparkVersion, - scaleFactor = scaleFactor, - sqlConf = sqlContext.getAllConfs, - sparkConf = sparkContext.getConf.getAll.toMap, - defaultParallelism = sparkContext.defaultParallelism) - - /** - * Starts an experiment run with a given set of queries. - * @param queriesToRun Queries to be executed. - * @param resultsLocation The location of performance results. - * @param includeBreakdown If it is true, breakdown results of a query will be recorded. - * Setting it to true may significantly increase the time used to - * execute a query. - * @param iterations The number of iterations. - * @param variations [[Variation]]s used in this run. - * @param tags Tags of this run. - * @return It returns a ExperimentStatus object that can be used to - * track the progress of this experiment run. - */ - def runExperiment( - queriesToRun: Seq[Query], - resultsLocation: String, - includeBreakdown: Boolean = false, - iterations: Int = 3, - variations: Seq[Variation[_]] = Seq(Variation("StandardRun", Seq("")) { _ => {} }), - tags: Map[String, String] = Map.empty) = { - - class ExperimentStatus { - val currentResults = new collection.mutable.ArrayBuffer[BenchmarkResult]() - val currentRuns = new collection.mutable.ArrayBuffer[ExperimentRun]() - val currentMessages = new collection.mutable.ArrayBuffer[String]() - - @volatile - var currentQuery = "" - - def cartesianProduct[T](xss: List[List[T]]): List[List[T]] = xss match { - case Nil => List(Nil) - case h :: t => for(xh <- h; xt <- cartesianProduct(t)) yield xh :: xt - } - - val timestamp = System.currentTimeMillis() - val combinations = cartesianProduct(variations.map(l => (0 until l.options.size).toList).toList) - val resultsFuture = future { - val results = (1 to iterations).flatMap { i => - combinations.map { setup => - val currentOptions = variations.asInstanceOf[Seq[Variation[Any]]].zip(setup).map { - case (v, idx) => - v.setup(v.options(idx)) - v.name -> v.options(idx).toString - } - - val result = ExperimentRun( - timestamp = timestamp, - datasetName = datasetName, - iteration = i, - tags = currentOptions.toMap ++ tags, - configuration = currentConfiguration, - queriesToRun.flatMap { q => - val setup = s"iteration: $i, ${currentOptions.map { case (k, v) => s"$k=$v"}.mkString(", ")}" - currentMessages += s"Running query ${q.name} $setup" - - currentQuery = q.name - val singleResult = try q.benchmark(includeBreakdown, setup) :: Nil catch { - case e: Exception => - currentMessages += s"Failed to run query ${q.name}: $e" - Nil - } - currentResults ++= singleResult - singleResult - }) - currentRuns += result - - result - } - } - - val resultsTable = sqlContext.createDataFrame(results) - currentMessages += s"Results stored to: $resultsLocation/$timestamp" - resultsTable.toJSON.coalesce(1).saveAsTextFile(s"$resultsLocation/$timestamp") - resultsTable - } - - /** Waits for the finish of the experiment. */ - def waitForFinish(timeoutInSeconds: Int) = { - Await.result(resultsFuture, timeoutInSeconds.seconds) - } - - /** Returns results from an actively running experiment. */ - def getCurrentResults() = { - val tbl = sqlContext.createDataFrame(currentResults) - tbl.registerTempTable("currentResults") - tbl - } - - /** Returns full iterations from an actively running experiment. */ - def getCurrentRuns() = { - val tbl = sqlContext.createDataFrame(currentRuns) - tbl.registerTempTable("currentRuns") - tbl - } - - def tail(n: Int = 5) = { - currentMessages.takeRight(n).mkString("\n") - } - - def status = - if (resultsFuture.isCompleted) { - if (resultsFuture.value.get.isFailure) "Failed" else "Successful" - } else { - "Running" - } - - - override def toString = - s""" - |=== $status Experiment === - |Permalink: table("allResults").where('timestamp === ${timestamp}L) - |Queries: ${queriesToRun.map(_.name).map(n => if(n == currentQuery) s"|$n|" else n).mkString(" ")} - |Iterations complete: ${currentRuns.size / combinations.size} / $iterations - |Queries run: ${currentResults.size} / ${iterations * combinations.size * queriesToRun.size} - |Run time: ${(System.currentTimeMillis() - timestamp) / 1000}s - | - |== Logs == - |${tail()} - """.stripMargin - } - new ExperimentStatus - } -} \ No newline at end of file diff --git a/src/main/scala/com/databricks/spark/sql/perf/table.scala b/src/main/scala/com/databricks/spark/sql/perf/table.scala index cb331e2..2eca42e 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/table.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/table.scala @@ -32,6 +32,35 @@ import org.apache.spark.sql.types._ import parquet.hadoop.ParquetOutputFormat import parquet.hadoop.util.ContextUtil +trait TableCreator { + + def tables: Seq[Table] + + def createTablesForTest(tables: Seq[Table]): Seq[TableForTest] + + val tablesForTest: Seq[TableForTest] = createTablesForTest(tables) + + def checkData(): Unit = { + tablesForTest.foreach { table => + val fs = FileSystem.get(new java.net.URI(table.outputDir), new Configuration()) + val exists = fs.exists(new Path(table.outputDir)) + val wasSuccessful = fs.exists(new Path(s"${table.outputDir}/_SUCCESS")) + + if (!wasSuccessful) { + if (exists) { + println(s"Table '${table.name}' not generated successfully, regenerating.") + } else { + println(s"Table '${table.name}' does not exist, generating.") + } + fs.delete(new Path(table.outputDir), true) + table.generate() + } else { + println(s"Table ${table.name} already exists.") + } + } + } +} + abstract class TableType case object UnpartitionedTable extends TableType case class PartitionedTable(partitionColumn: String) extends TableType @@ -47,7 +76,7 @@ abstract class TableForTest( val name = table.name - val outputDir = s"$baseDir/parquet/${name}" + val outputDir = s"$baseDir/parquet/$name" def fromCatalog = sqlContext.table(name) @@ -57,16 +86,5 @@ abstract class TableForTest( count("*") as "numRows", lit(fromCatalog.queryExecution.optimizedPlan.statistics.sizeInBytes.toLong) as "sizeInBytes") - def createTempTable(): Unit = { - sqlContext.sql( - s""" - |CREATE TEMPORARY TABLE ${name} - |USING org.apache.spark.sql.parquet - |OPTIONS ( - | path '${outputDir}' - |) - """.stripMargin) - } - - def generate(): Unit + def generate() } diff --git a/src/main/scala/com/databricks/spark/sql/perf/tpcds/TPCDS.scala b/src/main/scala/com/databricks/spark/sql/perf/tpcds/TPCDS.scala index 363a2d4..a685739 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/tpcds/TPCDS.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/tpcds/TPCDS.scala @@ -39,20 +39,13 @@ class TPCDS ( sparkVersion: String, dataLocation: String, dsdgenDir: String, - tables: Seq[Table], + val tables: Seq[Table], scaleFactor: String, userSpecifiedBaseDir: Option[String] = None) - extends Dataset( - sqlContext, - sparkVersion, - dataLocation, - tables, - scaleFactor) with Serializable { + extends Benchmark(sqlContext) with TableCreator with Serializable { import sqlContext._ import sqlContext.implicits._ - override val datasetName = "tpcds" - lazy val baseDir = userSpecifiedBaseDir.getOrElse(s"$dataLocation/scaleFactor=$scaleFactor/useDecimal=true") @@ -61,6 +54,7 @@ class TPCDS ( TPCDSTableForTest(table, baseDir, scaleFactor.toInt, dsdgenDir, sqlContext)) } + /* override def setup(): Unit = { super.setup() setupBroadcast() @@ -79,5 +73,6 @@ class TPCDS ( println(setQuery) sql(setQuery) } + */ } diff --git a/src/main/scala/com/databricks/spark/sql/perf/tpcds/Tables.scala b/src/main/scala/com/databricks/spark/sql/perf/tpcds/Tables.scala index e216b31..a6b180e 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/tpcds/Tables.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/tpcds/Tables.scala @@ -107,12 +107,14 @@ case class TPCDSTableForTest( val output = convertedData.queryExecution.analyzed.output val job = new Job(sqlContext.sparkContext.hadoopConfiguration) + + //HAX val writeSupport = - if (schema.fields.map(_.dataType).forall(_.isPrimitive)) { - classOf[org.apache.spark.sql.parquet.MutableRowWriteSupport] - } else { + // if (schema.fields.map(_.dataType).forall(_.isPrimitive)) { + // classOf[org.apache.spark.sql.parquet.MutableRowWriteSupport] + // } else { classOf[org.apache.spark.sql.parquet.RowWriteSupport] - } + // } ParquetOutputFormat.setWriteSupportClass(job, writeSupport) diff --git a/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/ImpalaKitQueries.scala b/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/ImpalaKitQueries.scala index df621bf..31e969a 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/ImpalaKitQueries.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/ImpalaKitQueries.scala @@ -16,9 +16,11 @@ package com.databricks.spark.sql.perf.tpcds.queries -import com.databricks.spark.sql.perf.QuerySet +import com.databricks.spark.sql.perf.Benchmark + +trait ImpalaKitQueries { + self: Benchmark => -trait ImpalaKitQueries extends QuerySet { // Queries are from // https://github.com/cloudera/impala-tpcds-kit/tree/master/queries-sql92-modified/queries val queries = Seq( diff --git a/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/SimpleQueries.scala b/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/SimpleQueries.scala index 98aec64..24b2007 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/SimpleQueries.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/SimpleQueries.scala @@ -16,9 +16,11 @@ package com.databricks.spark.sql.perf.tpcds.queries -import com.databricks.spark.sql.perf.QuerySet +import com.databricks.spark.sql.perf.Benchmark + +trait SimpleQueries { + self: Benchmark => -trait SimpleQueries extends QuerySet{ val q7Derived = Seq( ("q7-simpleScan", """