From eb3dd30c350ede36d7ee11a5a2289f07f4e99387 Mon Sep 17 00:00:00 2001 From: Michael Armbrust Date: Fri, 3 Jul 2015 11:26:06 -0700 Subject: [PATCH 1/4] Refactor to work in notebooks --- .../spark/sql/perf/bigdata/Queries.scala | 4 +- .../com/databricks/spark/sql/perf/query.scala | 175 +++++++++++------- .../spark/sql/perf/runBenchmarks.scala | 15 +- .../perf/tpcds/queries/ImpalaKitQueries.scala | 4 +- .../perf/tpcds/queries/SimpleQueries.scala | 4 +- .../sql/perf/tpcds/queries/package.scala | 5 - 6 files changed, 119 insertions(+), 88 deletions(-) diff --git a/src/main/scala/com/databricks/spark/sql/perf/bigdata/Queries.scala b/src/main/scala/com/databricks/spark/sql/perf/bigdata/Queries.scala index 14b5720..12103b5 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/bigdata/Queries.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/bigdata/Queries.scala @@ -16,9 +16,9 @@ package com.databricks.spark.sql.perf.bigdata -import com.databricks.spark.sql.perf.Query +import com.databricks.spark.sql.perf.QuerySet -object Queries { +trait Queries extends QuerySet { val queries1to3 = Seq( Query( name = "q1A", diff --git a/src/main/scala/com/databricks/spark/sql/perf/query.scala b/src/main/scala/com/databricks/spark/sql/perf/query.scala index c7191a3..8f2fd1e 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/query.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/query.scala @@ -16,81 +16,120 @@ package com.databricks.spark.sql.perf -import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.{DataFrame, SQLContext} import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation -case class Query(name: String, sqlText: String, description: String, collectResults: Boolean) +object x { -case class QueryForTest( - query: Query, - includeBreakdown: Boolean, - @transient sqlContext: SQLContext) { - @transient val sparkContext = sqlContext.sparkContext - val name = query.name - def benchmarkMs[A](f: => A): Double = { - val startTime = 
System.nanoTime() - val ret = f - val endTime = System.nanoTime() - (endTime - startTime).toDouble / 1000000 - } +trait QuerySet { + val sqlContext: SQLContext + def sparkContext = sqlContext.sparkContext - def benchmark(description: String = "") = { - try { - sparkContext.setJobDescription(s"Query: ${query.name}, $description") - val dataFrame = sqlContext.sql(query.sqlText) - val queryExecution = dataFrame.queryExecution - // We are not counting the time of ScalaReflection.convertRowToScala. - val parsingTime = benchmarkMs { queryExecution.logical } - val analysisTime = benchmarkMs { queryExecution.analyzed } - val optimizationTime = benchmarkMs { queryExecution.optimizedPlan } - val planningTime = benchmarkMs { queryExecution.executedPlan } - val breakdownResults = if (includeBreakdown) { - val depth = queryExecution.executedPlan.treeString.split("\n").size - val physicalOperators = (0 until depth).map(i => (i, queryExecution.executedPlan(i))) - physicalOperators.map { - case (index, node) => - val executionTime = benchmarkMs { node.execute().map(_.copy()).foreach(row => Unit) } - BreakdownResult(node.nodeName, node.simpleString, index, executionTime) - } - } else { - Seq.empty[BreakdownResult] - } + object Query { + def apply( + name: String, + sqlText: String, + description: String, + collectResults: Boolean = true): Query = { + new Query(name, sqlContext.sql(sqlText), description, collectResults, Some(sqlText)) + } - // The executionTime for the entire query includes the time of type conversion from catalyst to scala. - val executionTime = if (query.collectResults) { - benchmarkMs { dataFrame.rdd.collect() } - } else { - benchmarkMs { dataFrame.rdd.foreach {row => Unit } } - } - - val joinTypes = dataFrame.queryExecution.executedPlan.collect { - case k if k.nodeName contains "Join" => k.nodeName - } - - val tablesInvolved = dataFrame.queryExecution.logical collect { - case UnresolvedRelation(tableIdentifier, _) => { - // We are ignoring the database name. 
- tableIdentifier.last - } - } - - BenchmarkResult( - name = query.name, - joinTypes = joinTypes, - tables = tablesInvolved, - parsingTime = parsingTime, - analysisTime = analysisTime, - optimizationTime = optimizationTime, - planningTime = planningTime, - executionTime = executionTime, - breakdownResults) - } catch { - case e: Exception => - throw new RuntimeException( - s"Failed to benchmark query ${query.name}", e) + def apply( + name: String, + dataFrameBuilder: => DataFrame, + description: String): Query = { + new Query(name, dataFrameBuilder, description, true, None) } } -} + + class Query( + val name: String, + dataFrameBuilder: => DataFrame, + val description: String, + val collectResults: Boolean, + val sqlText: Option[String]) { + + val tablesInvolved = dataFrameBuilder.queryExecution.logical collect { + case UnresolvedRelation(tableIdentifier, _) => { + // We are ignoring the database name. + tableIdentifier.last + } + } + + def benchmarkMs[A](f: => A): Double = { + val startTime = System.nanoTime() + val ret = f + val endTime = System.nanoTime() + (endTime - startTime).toDouble / 1000000 + } + + def benchmark(includeBreakdown: Boolean, description: String = "") = { + try { + val dataFrame = dataFrameBuilder + sparkContext.setJobDescription(s"Query: $name, $description") + val queryExecution = dataFrame.queryExecution + // We are not counting the time of ScalaReflection.convertRowToScala. 
+ val parsingTime = benchmarkMs { + queryExecution.logical + } + val analysisTime = benchmarkMs { + queryExecution.analyzed + } + val optimizationTime = benchmarkMs { + queryExecution.optimizedPlan + } + val planningTime = benchmarkMs { + queryExecution.executedPlan + } + + val breakdownResults = if (includeBreakdown) { + val depth = queryExecution.executedPlan.treeString.split("\n").size + val physicalOperators = (0 until depth).map(i => (i, queryExecution.executedPlan(i))) + physicalOperators.map { + case (index, node) => + val executionTime = benchmarkMs { + node.execute().map(_.copy()).foreach(row => Unit) + } + BreakdownResult(node.nodeName, node.simpleString, index, executionTime) + } + } else { + Seq.empty[BreakdownResult] + } + + // The executionTime for the entire query includes the time of type conversion from catalyst to scala. + val executionTime = if (collectResults) { + benchmarkMs { + dataFrame.rdd.collect() + } + } else { + benchmarkMs { + dataFrame.rdd.foreach { row => Unit } + } + } + + val joinTypes = dataFrame.queryExecution.executedPlan.collect { + case k if k.nodeName contains "Join" => k.nodeName + } + + BenchmarkResult( + name = name, + joinTypes = joinTypes, + tables = tablesInvolved, + parsingTime = parsingTime, + analysisTime = analysisTime, + optimizationTime = optimizationTime, + planningTime = planningTime, + executionTime = executionTime, + breakdownResults) + } catch { + case e: Exception => + throw new RuntimeException( + s"Failed to benchmark query $name", e) + } + } + } + +} \ No newline at end of file diff --git a/src/main/scala/com/databricks/spark/sql/perf/runBenchmarks.scala b/src/main/scala/com/databricks/spark/sql/perf/runBenchmarks.scala index fb4d69a..3920c6f 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/runBenchmarks.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/runBenchmarks.scala @@ -127,16 +127,14 @@ case class ExperimentRun( * is a short string describing the scale of the dataset. 
*/ abstract class Dataset( - @transient sqlContext: SQLContext, + @transient val sqlContext: SQLContext, sparkVersion: String, dataLocation: String, tables: Seq[Table], - scaleFactor: String) extends Serializable { + scaleFactor: String) extends Serializable with QuerySet { val datasetName: String - @transient val sparkContext = sqlContext.sparkContext - def createTablesForTest(tables: Seq[Table]): Seq[TableForTest] val tablesForTest: Seq[TableForTest] = createTablesForTest(tables) @@ -181,7 +179,7 @@ abstract class Dataset( /** * Starts an experiment run with a given set of queries. - * @param queries Queries to be executed. + * @param queriesToRun Queries to be executed. * @param resultsLocation The location of performance results. * @param includeBreakdown If it is true, breakdown results of a query will be recorded. * Setting it to true may significantly increase the time used to @@ -193,15 +191,13 @@ abstract class Dataset( * track the progress of this experiment run. */ def runExperiment( - queries: Seq[Query], + queriesToRun: Seq[Query], resultsLocation: String, includeBreakdown: Boolean = false, iterations: Int = 3, variations: Seq[Variation[_]] = Seq(Variation("StandardRun", Seq("")) { _ => {} }), tags: Map[String, String] = Map.empty) = { - val queriesToRun = queries.map(query => QueryForTest(query, includeBreakdown, sqlContext)) - class ExperimentStatus { val currentResults = new collection.mutable.ArrayBuffer[BenchmarkResult]() val currentRuns = new collection.mutable.ArrayBuffer[ExperimentRun]() @@ -237,7 +233,7 @@ abstract class Dataset( currentMessages += s"Running query ${q.name} $setup" currentQuery = q.name - val singleResult = try q.benchmark(setup) :: Nil catch { + val singleResult = try q.benchmark(includeBreakdown, setup) :: Nil catch { case e: Exception => currentMessages += s"Failed to run query ${q.name}: $e" Nil @@ -287,6 +283,7 @@ abstract class Dataset( "Running" } + override def toString = s""" |=== $status Experiment === diff --git 
a/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/ImpalaKitQueries.scala b/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/ImpalaKitQueries.scala index c530232..df621bf 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/ImpalaKitQueries.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/ImpalaKitQueries.scala @@ -16,9 +16,9 @@ package com.databricks.spark.sql.perf.tpcds.queries -import com.databricks.spark.sql.perf.Query +import com.databricks.spark.sql.perf.QuerySet -object ImpalaKitQueries { +trait ImpalaKitQueries extends QuerySet { // Queries are from // https://github.com/cloudera/impala-tpcds-kit/tree/master/queries-sql92-modified/queries val queries = Seq( diff --git a/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/SimpleQueries.scala b/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/SimpleQueries.scala index fd4f4b6..98aec64 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/SimpleQueries.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/SimpleQueries.scala @@ -16,9 +16,9 @@ package com.databricks.spark.sql.perf.tpcds.queries -import com.databricks.spark.sql.perf.Query +import com.databricks.spark.sql.perf.QuerySet -object SimpleQueries { +trait SimpleQueries extends QuerySet{ val q7Derived = Seq( ("q7-simpleScan", """ diff --git a/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/package.scala b/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/package.scala index 4b154ac..65f72fe 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/package.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/package.scala @@ -15,8 +15,3 @@ */ package com.databricks.spark.sql.perf.tpcds - -package object queries { - val impalaKitQueries = ImpalaKitQueries.impalaKitQueries - val q7DerivedQueries = SimpleQueries.q7Derived -} From eba8cea93c39a3a4190222c2c60495814fe8e0c6 Mon Sep 17 00:00:00 2001 
From: Michael Armbrust Date: Mon, 13 Jul 2015 16:20:24 -0700 Subject: [PATCH 2/4] Basic join performance tests --- build.sbt | 15 +- .../databricks/spark/sql/perf/Benchmark.scala | 343 ++++++++++++++++++ .../spark/sql/perf/JoinPerformance.scala | 57 +++ .../spark/sql/perf/bigdata/BigData.scala | 14 +- .../spark/sql/perf/bigdata/Queries.scala | 6 +- .../com/databricks/spark/sql/perf/query.scala | 106 ------ .../databricks/spark/sql/perf/results.scala | 88 +++++ .../spark/sql/perf/runBenchmarks.scala | 302 --------------- .../com/databricks/spark/sql/perf/table.scala | 44 ++- .../spark/sql/perf/tpcds/TPCDS.scala | 13 +- .../spark/sql/perf/tpcds/Tables.scala | 10 +- .../perf/tpcds/queries/ImpalaKitQueries.scala | 6 +- .../perf/tpcds/queries/SimpleQueries.scala | 6 +- 13 files changed, 547 insertions(+), 463 deletions(-) create mode 100644 src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala create mode 100644 src/main/scala/com/databricks/spark/sql/perf/JoinPerformance.scala create mode 100644 src/main/scala/com/databricks/spark/sql/perf/results.scala delete mode 100644 src/main/scala/com/databricks/spark/sql/perf/runBenchmarks.scala diff --git a/build.sbt b/build.sbt index 3d224cd..cbc582a 100644 --- a/build.sbt +++ b/build.sbt @@ -3,22 +3,13 @@ scalaVersion := "2.10.4" -sparkVersion := "1.3.0" - sparkPackageName := "databricks/spark-sql-perf" -// Don't forget to set the version -version := "0.0.1-SNAPSHOT" +version := "0.0.4-SNAPSHOT" // All Spark Packages need a license licenses := Seq("Apache-2.0" -> url("http://opensource.org/licenses/Apache-2.0")) +sparkVersion := "1.4.0" -// Add Spark components this package depends on, e.g, "mllib", .... -sparkComponents ++= Seq("sql", "hive") - -// uncomment and change the value below to change the directory where your zip artifact will be created -// spDistDirectory := target.value - -// add any sparkPackageDependencies using sparkPackageDependencies. -// e.g. 
sparkPackageDependencies += "databricks/spark-avro:0.1" +sparkComponents ++= Seq("sql", "hive") \ No newline at end of file diff --git a/src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala b/src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala new file mode 100644 index 0000000..d087b99 --- /dev/null +++ b/src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala @@ -0,0 +1,343 @@ +/* + * Copyright 2015 Databricks Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.databricks.spark.sql.perf + +import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation + +import scala.concurrent._ +import scala.concurrent.duration._ +import scala.concurrent.ExecutionContext.Implicits.global + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{Path, FileSystem} +import org.apache.spark.sql.{DataFrame, SQLContext} + +import org.apache.spark.sql.catalyst.plans.logical.Subquery + +/** + * A collection of queries that test a particular aspect of Spark SQL. + * + * @param sqlContext An existing SQLContext. 
+ */ +abstract class Benchmark(@transient protected val sqlContext: SQLContext) + extends Serializable { + + import sqlContext.implicits._ + + val resultsLocation = "/spark/sql/performance" + val resultsTableName = "sqlPerformance" + + def createResultsTable() = { + sqlContext.sql(s"DROP TABLE $resultsTableName") + sqlContext.createExternalTable( + "sqlPerformance", "json", Map("path" -> (resultsLocation + "/*/"))) + } + + protected def sparkContext = sqlContext.sparkContext + + implicit def toOption[A](a: A) = Option(a) + + def currentConfiguration = BenchmarkConfiguration( + sqlConf = sqlContext.getAllConfs, + sparkConf = sparkContext.getConf.getAll.toMap, + defaultParallelism = sparkContext.defaultParallelism) + + /** + * A Variation represents a setting (e.g. the number of shuffle partitions or if tables + * are cached in memory) that we want to change in an experiment run. + * A Variation has three parts, `name`, `options`, and `setup`. + * The `name` is the identifier of a Variation. `options` is a Seq of options that + * will be used for a query. Basically, a query will be executed with every option + * defined in the list of `options`. `setup` defines the needed action for every + * option. For example, the following Variation is used to change the number of shuffle + * partitions of a query. The name of the Variation is "shufflePartitions". There are + * two options, 200 and 2000. The setup is used to set the value of property + * "spark.sql.shuffle.partitions". + * + * {{{ + * Variation("shufflePartitions", Seq("200", "2000")) { + * case num => sqlContext.setConf("spark.sql.shuffle.partitions", num) + * } + * }}} + */ + case class Variation[T](name: String, options: Seq[T])(val setup: T => Unit) + + /** + * Starts an experiment run with a given set of queries. + * @param queriesToRun Queries to be executed. + * @param includeBreakdown If it is true, breakdown results of a query will be recorded. 
+ * Setting it to true may significantly increase the time used to + * execute a query. + * @param iterations The number of iterations. + * @param variations [[Variation]]s used in this run. + * @param tags Tags of this run. + * @return It returns an ExperimentStatus object that can be used to + * track the progress of this experiment run. + */ + def runExperiment( + queriesToRun: Seq[Query], + includeBreakdown: Boolean = false, + iterations: Int = 3, + variations: Seq[Variation[_]] = Seq(Variation("StandardRun", Seq("true")) { _ => {} }), + tags: Map[String, String] = Map.empty) = { + + class ExperimentStatus { + val currentResults = new collection.mutable.ArrayBuffer[BenchmarkResult]() + val currentRuns = new collection.mutable.ArrayBuffer[ExperimentRun]() + val currentMessages = new collection.mutable.ArrayBuffer[String]() + + // Stats for HTML status message. + @volatile var currentQuery = "" + @volatile var currentPlan = "" + @volatile var currentConfig = "" + @volatile var failures = 0 + @volatile var startTime = 0L + + def cartesianProduct[T](xss: List[List[T]]): List[List[T]] = xss match { + case Nil => List(Nil) + case h :: t => for(xh <- h; xt <- cartesianProduct(t)) yield xh :: xt + } + + val timestamp = System.currentTimeMillis() + val combinations = cartesianProduct(variations.map(l => (0 until l.options.size).toList).toList) + val resultsFuture = Future { + val results = (1 to iterations).flatMap { i => + combinations.map { setup => + val currentOptions = variations.asInstanceOf[Seq[Variation[Any]]].zip(setup).map { + case (v, idx) => + v.setup(v.options(idx)) + v.name -> v.options(idx).toString + } + currentConfig = currentOptions.map { case (k,v) => s"$k: $v" }.mkString(", ") + + val result = ExperimentRun( + timestamp = timestamp, + iteration = i, + tags = currentOptions.toMap ++ tags, + configuration = currentConfiguration, + queriesToRun.flatMap { q => + val setup = s"iteration: $i, ${currentOptions.map { case (k, v) => s"$k=$v"}.mkString(", ")}" 
+ currentMessages += s"Running query ${q.name} $setup" + + currentQuery = q.name + currentPlan = q.newDataFrame().queryExecution.executedPlan.toString + startTime = System.currentTimeMillis() + + val singleResult = q.benchmark(includeBreakdown, setup) + singleResult.failure.foreach { f => + failures += 1 + currentMessages += s"Query '${q.name}' failed: ${f.message}" + } + singleResult.executionTime.foreach(time => currentMessages += s"Exec time: $time") + currentResults += singleResult + singleResult :: Nil + }) + currentRuns += result + + result + } + } + + try { + val resultsTable = sqlContext.createDataFrame(results) + currentMessages += s"Results written to table: 'sqlPerformance' at $resultsLocation/$timestamp" + results.toDF().write + .format("json") + .save(s"$resultsLocation/$timestamp") + + results.toDF() + } catch { + case e: Throwable => currentMessages += s"Failed to write data: $e" + } + } + + /** Waits for the finish of the experiment. */ + def waitForFinish(timeoutInSeconds: Int) = { + Await.result(resultsFuture, timeoutInSeconds.seconds) + } + + /** Returns results from an actively running experiment. */ + def getCurrentResults() = { + val tbl = sqlContext.createDataFrame(currentResults) + tbl.registerTempTable("currentResults") + tbl + } + + /** Returns full iterations from an actively running experiment. */ + def getCurrentRuns() = { + val tbl = sqlContext.createDataFrame(currentRuns) + tbl.registerTempTable("currentRuns") + tbl + } + + def tail(n: Int = 5) = { + currentMessages.takeRight(n).mkString("\n") + } + + def status = + if (resultsFuture.isCompleted) { + if (resultsFuture.value.get.isFailure) "Failed" else "Successful" + } else { + "Running" + } + + override def toString = + s"""Permalink: table("sqlPerformance").where('timestamp === ${timestamp}L)""" + + + def html = + s""" + |

$status Experiment

+ |Permalink: table("$resultsTableName").where('timestamp === ${timestamp}L)
+ |Iterations complete: ${currentRuns.size / combinations.size} / $iterations
+ |Failures: $failures
+ |Queries run: ${currentResults.size} / ${iterations * combinations.size * queriesToRun.size}
+ |Run time: ${(System.currentTimeMillis() - timestamp) / 1000}s
+ | + |

Current Query: $currentQuery

+ |Runtime: ${(System.currentTimeMillis() - startTime) / 1000}s + |$currentConfig
+ |

QueryPlan

+ |
+           |${currentPlan.replaceAll("\n", "
")} + |
+ | + |

Logs

+ |
+           |${tail()}
+           |
+ """.stripMargin + } + new ExperimentStatus + } + + /** Factory object for benchmark queries. */ + object Query { + def apply( + name: String, + sqlText: String, + description: String, + collectResults: Boolean = true): Query = { + new Query(name, sqlContext.sql(sqlText), description, collectResults, Some(sqlText)) + } + + def apply( + name: String, + dataFrameBuilder: => DataFrame, + description: String): Query = { + new Query(name, dataFrameBuilder, description, true, None) + } + } + + /** Holds one benchmark query and its metadata. */ + class Query( + val name: String, + buildDataFrame: => DataFrame, + val description: String, + val collectResults: Boolean, + val sqlText: Option[String]) { + + override def toString = + s""" + |== Query: $name == + |${buildDataFrame.queryExecution.analyzed} + """.stripMargin + + val tablesInvolved = buildDataFrame.queryExecution.logical collect { + case UnresolvedRelation(tableIdentifier, _) => { + // We are ignoring the database name. + tableIdentifier.last + } + } + + def newDataFrame() = buildDataFrame + + def benchmarkMs[A](f: => A): Double = { + val startTime = System.nanoTime() + val ret = f + val endTime = System.nanoTime() + (endTime - startTime).toDouble / 1000000 + } + + def benchmark(includeBreakdown: Boolean, description: String = "") = { + try { + val dataFrame = buildDataFrame + sparkContext.setJobDescription(s"Query: $name, $description") + val queryExecution = dataFrame.queryExecution + // We are not counting the time of ScalaReflection.convertRowToScala. 
+ val parsingTime = benchmarkMs { + queryExecution.logical + } + val analysisTime = benchmarkMs { + queryExecution.analyzed + } + val optimizationTime = benchmarkMs { + queryExecution.optimizedPlan + } + val planningTime = benchmarkMs { + queryExecution.executedPlan + } + + val breakdownResults = if (includeBreakdown) { + val depth = queryExecution.executedPlan.treeString.split("\n").size + val physicalOperators = (0 until depth).map(i => (i, queryExecution.executedPlan(i))) + physicalOperators.map { + case (index, node) => + val executionTime = benchmarkMs { + node.execute().map(_.copy()).foreach(row => Unit) + } + BreakdownResult(node.nodeName, node.simpleString, index, executionTime) + } + } else { + Seq.empty[BreakdownResult] + } + + // The executionTime for the entire query includes the time of type conversion from catalyst to scala. + val executionTime = if (collectResults) { + benchmarkMs { + dataFrame.rdd.collect() + } + } else { + benchmarkMs { + dataFrame.rdd.foreach { row => Unit } + } + } + + val joinTypes = dataFrame.queryExecution.executedPlan.collect { + case k if k.nodeName contains "Join" => k.nodeName + } + + BenchmarkResult( + name = name, + joinTypes = joinTypes, + tables = tablesInvolved, + parsingTime = parsingTime, + analysisTime = analysisTime, + optimizationTime = optimizationTime, + planningTime = planningTime, + executionTime = executionTime, + queryExecution = dataFrame.queryExecution.toString, + breakDown = breakdownResults) + } catch { + case e: Exception => + BenchmarkResult( + name = name, + failure = Failure(e.getClass.getName, e.getMessage)) + } + } + } +} \ No newline at end of file diff --git a/src/main/scala/com/databricks/spark/sql/perf/JoinPerformance.scala b/src/main/scala/com/databricks/spark/sql/perf/JoinPerformance.scala new file mode 100644 index 0000000..0da49e3 --- /dev/null +++ b/src/main/scala/com/databricks/spark/sql/perf/JoinPerformance.scala @@ -0,0 +1,57 @@ +package com.databricks.spark.sql.perf + +import 
org.apache.spark.sql.SQLContext + +class JoinPerformance(sqlContext: SQLContext) extends Benchmark(sqlContext) { + def buildTables() = { + // 1.5 mb, 1 file + sqlContext.range(0, 1000000) + .repartition(1) + .write.mode("ignore") + .saveAsTable("1milints") + + // 143.542mb, 10 files + sqlContext.range(0, 100000000) + .repartition(10) + .write.mode("ignore") + .saveAsTable("100milints") + + // 1.4348gb, 10 files + sqlContext.range(0, 1000000000) + .repartition(10) + .write.mode("ignore") + .saveAsTable("1bilints") + } + + val singleKeyJoins = Seq("1milints", "100milints", "1bilints").flatMap { table1 => + Seq("1milints", "100milints", "1bilints").flatMap { table2 => + Seq("JOIN", "RIGHT JOIN", "LEFT JOIN", "FULL OUTER JOIN").map { join => + Query( + s"singleKey-$join-$table1-$table2", + s"SELECT COUNT(*) FROM $table1 a $join $table2 b ON a.id = b.id", + "equi-inner join a small table with a big table using a single key.", + collectResults = true) + } + } + }.filterNot(_.name contains "FULL OUTER JOIN-1milints-1bilints") + + val complexInput = + Seq("1milints", "100milints", "1bilints").map { table => + Query( + "aggregation-complex-input", + s"SELECT SUM(id + id + id + id + id + id + id + id + id + id) FROM $table", + "Sum of 9 columns added together", + collectResults = true) + } + + val aggregates = + Seq("1milints", "100milints", "1bilints").flatMap { table => + Seq("SUM", "AVG", "COUNT", "STDDEV").map { agg => + Query( + s"single-aggregate-$agg", + s"SELECT $agg(id) FROM $table", + "aggregation of a single column", + collectResults = true) + } + } +} \ No newline at end of file diff --git a/src/main/scala/com/databricks/spark/sql/perf/bigdata/BigData.scala b/src/main/scala/com/databricks/spark/sql/perf/bigdata/BigData.scala index 16d0b1a..c723382 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/bigdata/BigData.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/bigdata/BigData.scala @@ -21,23 +21,15 @@ import org.apache.spark.sql.functions._ import 
org.apache.spark.sql.parquet.TPCDSTableForTest import org.apache.spark.sql.{Column, SQLContext} -class BigData ( +class BigDataBenchmark ( @transient sqlContext: SQLContext, - sparkVersion: String, dataLocation: String, - tables: Seq[Table], + val tables: Seq[Table], scaleFactor: String) - extends Dataset( - sqlContext, - sparkVersion, - dataLocation, - tables, - scaleFactor) with Serializable { + extends Benchmark(sqlContext) with Serializable with TableCreator { import sqlContext._ import sqlContext.implicits._ - override val datasetName = "bigDataBenchmark" - override def createTablesForTest(tables: Seq[Table]): Seq[TableForTest] = { tables.map(table => BigDataTableForTest(table, dataLocation, scaleFactor, sqlContext)) diff --git a/src/main/scala/com/databricks/spark/sql/perf/bigdata/Queries.scala b/src/main/scala/com/databricks/spark/sql/perf/bigdata/Queries.scala index 12103b5..5541a02 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/bigdata/Queries.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/bigdata/Queries.scala @@ -16,9 +16,11 @@ package com.databricks.spark.sql.perf.bigdata -import com.databricks.spark.sql.perf.QuerySet +import com.databricks.spark.sql.perf.Benchmark + +trait Queries { + self: Benchmark => -trait Queries extends QuerySet { val queries1to3 = Seq( Query( name = "q1A", diff --git a/src/main/scala/com/databricks/spark/sql/perf/query.scala b/src/main/scala/com/databricks/spark/sql/perf/query.scala index 8f2fd1e..690acd6 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/query.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/query.scala @@ -19,117 +19,11 @@ package com.databricks.spark.sql.perf import org.apache.spark.sql.{DataFrame, SQLContext} import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation -object x { - - - trait QuerySet { val sqlContext: SQLContext def sparkContext = sqlContext.sparkContext - object Query { - def apply( - name: String, - sqlText: String, - description: String, - 
collectResults: Boolean = true): Query = { - new Query(name, sqlContext.sql(sqlText), description, collectResults, Some(sqlText)) - } - def apply( - name: String, - dataFrameBuilder: => DataFrame, - description: String): Query = { - new Query(name, dataFrameBuilder, description, true, None) - } - } - - class Query( - val name: String, - dataFrameBuilder: => DataFrame, - val description: String, - val collectResults: Boolean, - val sqlText: Option[String]) { - - val tablesInvolved = dataFrameBuilder.queryExecution.logical collect { - case UnresolvedRelation(tableIdentifier, _) => { - // We are ignoring the database name. - tableIdentifier.last - } - } - - def benchmarkMs[A](f: => A): Double = { - val startTime = System.nanoTime() - val ret = f - val endTime = System.nanoTime() - (endTime - startTime).toDouble / 1000000 - } - - def benchmark(includeBreakdown: Boolean, description: String = "") = { - try { - val dataFrame = dataFrameBuilder - sparkContext.setJobDescription(s"Query: $name, $description") - val queryExecution = dataFrame.queryExecution - // We are not counting the time of ScalaReflection.convertRowToScala. - val parsingTime = benchmarkMs { - queryExecution.logical - } - val analysisTime = benchmarkMs { - queryExecution.analyzed - } - val optimizationTime = benchmarkMs { - queryExecution.optimizedPlan - } - val planningTime = benchmarkMs { - queryExecution.executedPlan - } - - val breakdownResults = if (includeBreakdown) { - val depth = queryExecution.executedPlan.treeString.split("\n").size - val physicalOperators = (0 until depth).map(i => (i, queryExecution.executedPlan(i))) - physicalOperators.map { - case (index, node) => - val executionTime = benchmarkMs { - node.execute().map(_.copy()).foreach(row => Unit) - } - BreakdownResult(node.nodeName, node.simpleString, index, executionTime) - } - } else { - Seq.empty[BreakdownResult] - } - - // The executionTime for the entire query includes the time of type conversion from catalyst to scala. 
- val executionTime = if (collectResults) { - benchmarkMs { - dataFrame.rdd.collect() - } - } else { - benchmarkMs { - dataFrame.rdd.foreach { row => Unit } - } - } - - val joinTypes = dataFrame.queryExecution.executedPlan.collect { - case k if k.nodeName contains "Join" => k.nodeName - } - - BenchmarkResult( - name = name, - joinTypes = joinTypes, - tables = tablesInvolved, - parsingTime = parsingTime, - analysisTime = analysisTime, - optimizationTime = optimizationTime, - planningTime = planningTime, - executionTime = executionTime, - breakdownResults) - } catch { - case e: Exception => - throw new RuntimeException( - s"Failed to benchmark query $name", e) - } - } - } } \ No newline at end of file diff --git a/src/main/scala/com/databricks/spark/sql/perf/results.scala b/src/main/scala/com/databricks/spark/sql/perf/results.scala new file mode 100644 index 0000000..7d31ed2 --- /dev/null +++ b/src/main/scala/com/databricks/spark/sql/perf/results.scala @@ -0,0 +1,88 @@ +/* + * Copyright 2015 Databricks Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.databricks.spark.sql.perf + +/** + * The performance results of all given queries for a single iteration. + * @param timestamp The timestamp indicates when the entire experiment is started. + * @param iteration The index number of the current iteration. + * @param tags Tags of this iteration (variations are stored at here). 
+ * @param configuration Configuration properties of this iteration. + * @param results The performance results of queries for this iteration. + */ +case class ExperimentRun( + timestamp: Long, + iteration: Int, + tags: Map[String, String], + configuration: BenchmarkConfiguration, + results: Seq[BenchmarkResult]) + +/** + * The configuration used for an iteration of an experiment. + * @param sparkVersion The version of Spark. + * @param sqlConf All configuration properties related to Spark SQL. + * @param sparkConf All configuration properties of Spark. + * @param defaultParallelism The default parallelism of the cluster. + * Usually, it is the number of cores of the cluster. + */ +case class BenchmarkConfiguration( + sparkVersion: String = org.apache.spark.SPARK_VERSION, + sqlConf: Map[String, String], + sparkConf: Map[String,String], + defaultParallelism: Int) + +/** + * The result of a query. + * @param name The name of the query. + * @param joinTypes The type of join operations in the query. + * @param tables The tables involved in the query. + * @param parsingTime The time used to parse the query. + * @param analysisTime The time used to analyze the query. + * @param optimizationTime The time used to optimize the query. + * @param planningTime The time used to plan the query. + * @param executionTime The time used to execute the query. + * @param breakDown The breakdown results of the query plan tree. + */ +case class BenchmarkResult( + name: String, + joinTypes: Seq[String] = Nil, + tables: Seq[String] = Nil, + parsingTime: Option[Double] = None, + analysisTime: Option[Double] = None, + optimizationTime: Option[Double] = None, + planningTime: Option[Double] = None, + executionTime: Option[Double] = None, + breakDown: Seq[BreakdownResult] = Nil, + queryExecution: Option[String] = None, + failure: Option[Failure] = None) + +/** + * The execution time of a subtree of the query plan tree of a specific query. 
+ * @param nodeName The name of the top physical operator of the subtree. + * @param nodeNameWithArgs The name and arguments of the top physical operator of the subtree. + * @param index The index of the top physical operator of the subtree + * in the original query plan tree. The index starts from 0 + * (0 represents the top physical operator of the original query plan tree). + * @param executionTime The execution time of the subtree. + */ +case class BreakdownResult( + nodeName: String, + nodeNameWithArgs: String, + index: Int, + executionTime: Double) + +case class Failure(className: String, message: String) \ No newline at end of file diff --git a/src/main/scala/com/databricks/spark/sql/perf/runBenchmarks.scala b/src/main/scala/com/databricks/spark/sql/perf/runBenchmarks.scala deleted file mode 100644 index 3920c6f..0000000 --- a/src/main/scala/com/databricks/spark/sql/perf/runBenchmarks.scala +++ /dev/null @@ -1,302 +0,0 @@ -/* - * Copyright 2015 Databricks Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.databricks.spark.sql.perf - -import scala.concurrent._ -import scala.concurrent.duration._ -import scala.concurrent.ExecutionContext.Implicits.global - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{Path, FileSystem} -import org.apache.spark.sql.SQLContext - -/** - * The configuration used for an iteration of an experiment. - * @param sparkVersion The version of Spark. 
- * @param scaleFactor The scale factor of the dataset. - * @param sqlConf All configuration properties related to Spark SQL. - * @param sparkConf All configuration properties of Spark. - * @param defaultParallelism The default parallelism of the cluster. - * Usually, it is the number of cores of the cluster. - */ -case class BenchmarkConfiguration( - sparkVersion: String, - scaleFactor: String, - sqlConf: Map[String, String], - sparkConf: Map[String,String], - defaultParallelism: Int) - -/** - * The execution time of a subtree of the query plan tree of a specific query. - * @param nodeName The name of the top physical operator of the subtree. - * @param nodeNameWithArgs The name and arguments of the top physical operator of the subtree. - * @param index The index of the top physical operator of the subtree - * in the original query plan tree. The index starts from 0 - * (0 represents the top physical operator of the original query plan tree). - * @param executionTime The execution time of the subtree. - */ -case class BreakdownResult( - nodeName: String, - nodeNameWithArgs: String, - index: Int, - executionTime: Double) - -/** - * The result of a query. - * @param name The name of the query. - * @param joinTypes The type of join operations in the query. - * @param tables The tables involved in the query. - * @param parsingTime The time used to parse the query. - * @param analysisTime The time used to analyze the query. - * @param optimizationTime The time used to optimize the query. - * @param planningTime The time used to plan the query. - * @param executionTime The time used to execute the query. - * @param breakDown The breakdown results of the query plan tree. 
- */ -case class BenchmarkResult( - name: String, - joinTypes: Seq[String], - tables: Seq[String], - parsingTime: Double, - analysisTime: Double, - optimizationTime: Double, - planningTime: Double, - executionTime: Double, - breakDown: Seq[BreakdownResult]) - -/** - * A Variation represents a setting (e.g. the number of shuffle partitions and if tables - * are cached in memory) that we want to change in a experiment run. - * A Variation has three parts, `name`, `options`, and `setup`. - * The `name` is the identifier of a Variation. `options` is a Seq of options that - * will be used for a query. Basically, a query will be executed with every option - * defined in the list of `options`. `setup` defines the needed action for every - * option. For example, the following Variation is used to change the number of shuffle - * partitions of a query. The name of the Variation is "shufflePartitions". There are - * two options, 200 and 2000. The setup is used to set the value of property - * "spark.sql.shuffle.partitions". - * - * {{{ - * Variation("shufflePartitions", Seq("200", "2000")) { - * case num => sqlContext.setConf("spark.sql.shuffle.partitions", num) - * } - * }}} - */ -case class Variation[T](name: String, options: Seq[T])(val setup: T => Unit) - -/** - * The performance results of all given queries for a single iteration. - * @param timestamp The timestamp indicates when the entire experiment is started. - * @param datasetName The name of dataset. - * @param iteration The index number of the current iteration. - * @param tags Tags of this iteration (variations are stored at here). - * @param configuration Configuration properties of this iteration. - * @param results The performance results of queries for this iteration. - */ -case class ExperimentRun( - timestamp: Long, - datasetName: String, - iteration: Int, - tags: Map[String, String], - configuration: BenchmarkConfiguration, - results: Seq[BenchmarkResult]) - -/** - * The dataset of a benchmark. 
- * @param sqlContext An existing SQLContext. - * @param sparkVersion The version of Spark. - * @param dataLocation The location of the dataset used by this experiment. - * @param tables Tables that will be used in this experiment. - * @param scaleFactor The scale factor of the dataset. For some benchmarks like TPC-H - * and TPC-DS, the scale factor is a number roughly representing the - * size of raw data files. For some other benchmarks, the scale factor - * is a short string describing the scale of the dataset. - */ -abstract class Dataset( - @transient val sqlContext: SQLContext, - sparkVersion: String, - dataLocation: String, - tables: Seq[Table], - scaleFactor: String) extends Serializable with QuerySet { - - val datasetName: String - - def createTablesForTest(tables: Seq[Table]): Seq[TableForTest] - - val tablesForTest: Seq[TableForTest] = createTablesForTest(tables) - - def checkData(): Unit = { - tablesForTest.foreach { table => - val fs = FileSystem.get(new java.net.URI(table.outputDir), new Configuration()) - val exists = fs.exists(new Path(table.outputDir)) - val wasSuccessful = fs.exists(new Path(s"${table.outputDir}/_SUCCESS")) - - if (!wasSuccessful) { - if (exists) { - println(s"Table '${table.name}' not generated successfully, regenerating.") - } else { - println(s"Table '${table.name}' does not exist, generating.") - } - fs.delete(new Path(table.outputDir), true) - table.generate() - } else { - println(s"Table ${table.name} already exists.") - } - } - } - - def allStats = tablesForTest.map(_.stats).reduceLeft(_.unionAll(_)) - - /** - * Does necessary setup work such as data generation and transformation. It needs to be - * called before running any query. 
- */ - def setup(): Unit = { - checkData() - tablesForTest.foreach(_.createTempTable()) - } - - def currentConfiguration = BenchmarkConfiguration( - sparkVersion = sparkVersion, - scaleFactor = scaleFactor, - sqlConf = sqlContext.getAllConfs, - sparkConf = sparkContext.getConf.getAll.toMap, - defaultParallelism = sparkContext.defaultParallelism) - - /** - * Starts an experiment run with a given set of queries. - * @param queriesToRun Queries to be executed. - * @param resultsLocation The location of performance results. - * @param includeBreakdown If it is true, breakdown results of a query will be recorded. - * Setting it to true may significantly increase the time used to - * execute a query. - * @param iterations The number of iterations. - * @param variations [[Variation]]s used in this run. - * @param tags Tags of this run. - * @return It returns a ExperimentStatus object that can be used to - * track the progress of this experiment run. - */ - def runExperiment( - queriesToRun: Seq[Query], - resultsLocation: String, - includeBreakdown: Boolean = false, - iterations: Int = 3, - variations: Seq[Variation[_]] = Seq(Variation("StandardRun", Seq("")) { _ => {} }), - tags: Map[String, String] = Map.empty) = { - - class ExperimentStatus { - val currentResults = new collection.mutable.ArrayBuffer[BenchmarkResult]() - val currentRuns = new collection.mutable.ArrayBuffer[ExperimentRun]() - val currentMessages = new collection.mutable.ArrayBuffer[String]() - - @volatile - var currentQuery = "" - - def cartesianProduct[T](xss: List[List[T]]): List[List[T]] = xss match { - case Nil => List(Nil) - case h :: t => for(xh <- h; xt <- cartesianProduct(t)) yield xh :: xt - } - - val timestamp = System.currentTimeMillis() - val combinations = cartesianProduct(variations.map(l => (0 until l.options.size).toList).toList) - val resultsFuture = future { - val results = (1 to iterations).flatMap { i => - combinations.map { setup => - val currentOptions = 
variations.asInstanceOf[Seq[Variation[Any]]].zip(setup).map { - case (v, idx) => - v.setup(v.options(idx)) - v.name -> v.options(idx).toString - } - - val result = ExperimentRun( - timestamp = timestamp, - datasetName = datasetName, - iteration = i, - tags = currentOptions.toMap ++ tags, - configuration = currentConfiguration, - queriesToRun.flatMap { q => - val setup = s"iteration: $i, ${currentOptions.map { case (k, v) => s"$k=$v"}.mkString(", ")}" - currentMessages += s"Running query ${q.name} $setup" - - currentQuery = q.name - val singleResult = try q.benchmark(includeBreakdown, setup) :: Nil catch { - case e: Exception => - currentMessages += s"Failed to run query ${q.name}: $e" - Nil - } - currentResults ++= singleResult - singleResult - }) - currentRuns += result - - result - } - } - - val resultsTable = sqlContext.createDataFrame(results) - currentMessages += s"Results stored to: $resultsLocation/$timestamp" - resultsTable.toJSON.coalesce(1).saveAsTextFile(s"$resultsLocation/$timestamp") - resultsTable - } - - /** Waits for the finish of the experiment. */ - def waitForFinish(timeoutInSeconds: Int) = { - Await.result(resultsFuture, timeoutInSeconds.seconds) - } - - /** Returns results from an actively running experiment. */ - def getCurrentResults() = { - val tbl = sqlContext.createDataFrame(currentResults) - tbl.registerTempTable("currentResults") - tbl - } - - /** Returns full iterations from an actively running experiment. 
*/ - def getCurrentRuns() = { - val tbl = sqlContext.createDataFrame(currentRuns) - tbl.registerTempTable("currentRuns") - tbl - } - - def tail(n: Int = 5) = { - currentMessages.takeRight(n).mkString("\n") - } - - def status = - if (resultsFuture.isCompleted) { - if (resultsFuture.value.get.isFailure) "Failed" else "Successful" - } else { - "Running" - } - - - override def toString = - s""" - |=== $status Experiment === - |Permalink: table("allResults").where('timestamp === ${timestamp}L) - |Queries: ${queriesToRun.map(_.name).map(n => if(n == currentQuery) s"|$n|" else n).mkString(" ")} - |Iterations complete: ${currentRuns.size / combinations.size} / $iterations - |Queries run: ${currentResults.size} / ${iterations * combinations.size * queriesToRun.size} - |Run time: ${(System.currentTimeMillis() - timestamp) / 1000}s - | - |== Logs == - |${tail()} - """.stripMargin - } - new ExperimentStatus - } -} \ No newline at end of file diff --git a/src/main/scala/com/databricks/spark/sql/perf/table.scala b/src/main/scala/com/databricks/spark/sql/perf/table.scala index cb331e2..2eca42e 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/table.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/table.scala @@ -32,6 +32,35 @@ import org.apache.spark.sql.types._ import parquet.hadoop.ParquetOutputFormat import parquet.hadoop.util.ContextUtil +trait TableCreator { + + def tables: Seq[Table] + + def createTablesForTest(tables: Seq[Table]): Seq[TableForTest] + + val tablesForTest: Seq[TableForTest] = createTablesForTest(tables) + + def checkData(): Unit = { + tablesForTest.foreach { table => + val fs = FileSystem.get(new java.net.URI(table.outputDir), new Configuration()) + val exists = fs.exists(new Path(table.outputDir)) + val wasSuccessful = fs.exists(new Path(s"${table.outputDir}/_SUCCESS")) + + if (!wasSuccessful) { + if (exists) { + println(s"Table '${table.name}' not generated successfully, regenerating.") + } else { + println(s"Table '${table.name}' does not 
exist, generating.") + } + fs.delete(new Path(table.outputDir), true) + table.generate() + } else { + println(s"Table ${table.name} already exists.") + } + } + } +} + abstract class TableType case object UnpartitionedTable extends TableType case class PartitionedTable(partitionColumn: String) extends TableType @@ -47,7 +76,7 @@ abstract class TableForTest( val name = table.name - val outputDir = s"$baseDir/parquet/${name}" + val outputDir = s"$baseDir/parquet/$name" def fromCatalog = sqlContext.table(name) @@ -57,16 +86,5 @@ abstract class TableForTest( count("*") as "numRows", lit(fromCatalog.queryExecution.optimizedPlan.statistics.sizeInBytes.toLong) as "sizeInBytes") - def createTempTable(): Unit = { - sqlContext.sql( - s""" - |CREATE TEMPORARY TABLE ${name} - |USING org.apache.spark.sql.parquet - |OPTIONS ( - | path '${outputDir}' - |) - """.stripMargin) - } - - def generate(): Unit + def generate() } diff --git a/src/main/scala/com/databricks/spark/sql/perf/tpcds/TPCDS.scala b/src/main/scala/com/databricks/spark/sql/perf/tpcds/TPCDS.scala index 363a2d4..a685739 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/tpcds/TPCDS.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/tpcds/TPCDS.scala @@ -39,20 +39,13 @@ class TPCDS ( sparkVersion: String, dataLocation: String, dsdgenDir: String, - tables: Seq[Table], + val tables: Seq[Table], scaleFactor: String, userSpecifiedBaseDir: Option[String] = None) - extends Dataset( - sqlContext, - sparkVersion, - dataLocation, - tables, - scaleFactor) with Serializable { + extends Benchmark(sqlContext) with TableCreator with Serializable { import sqlContext._ import sqlContext.implicits._ - override val datasetName = "tpcds" - lazy val baseDir = userSpecifiedBaseDir.getOrElse(s"$dataLocation/scaleFactor=$scaleFactor/useDecimal=true") @@ -61,6 +54,7 @@ class TPCDS ( TPCDSTableForTest(table, baseDir, scaleFactor.toInt, dsdgenDir, sqlContext)) } + /* override def setup(): Unit = { super.setup() setupBroadcast() @@ 
-79,5 +73,6 @@ class TPCDS ( println(setQuery) sql(setQuery) } + */ } diff --git a/src/main/scala/com/databricks/spark/sql/perf/tpcds/Tables.scala b/src/main/scala/com/databricks/spark/sql/perf/tpcds/Tables.scala index e216b31..a6b180e 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/tpcds/Tables.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/tpcds/Tables.scala @@ -107,12 +107,14 @@ case class TPCDSTableForTest( val output = convertedData.queryExecution.analyzed.output val job = new Job(sqlContext.sparkContext.hadoopConfiguration) + + //HAX val writeSupport = - if (schema.fields.map(_.dataType).forall(_.isPrimitive)) { - classOf[org.apache.spark.sql.parquet.MutableRowWriteSupport] - } else { + // if (schema.fields.map(_.dataType).forall(_.isPrimitive)) { + // classOf[org.apache.spark.sql.parquet.MutableRowWriteSupport] + // } else { classOf[org.apache.spark.sql.parquet.RowWriteSupport] - } + // } ParquetOutputFormat.setWriteSupportClass(job, writeSupport) diff --git a/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/ImpalaKitQueries.scala b/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/ImpalaKitQueries.scala index df621bf..31e969a 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/ImpalaKitQueries.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/ImpalaKitQueries.scala @@ -16,9 +16,11 @@ package com.databricks.spark.sql.perf.tpcds.queries -import com.databricks.spark.sql.perf.QuerySet +import com.databricks.spark.sql.perf.Benchmark + +trait ImpalaKitQueries { + self: Benchmark => -trait ImpalaKitQueries extends QuerySet { // Queries are from // https://github.com/cloudera/impala-tpcds-kit/tree/master/queries-sql92-modified/queries val queries = Seq( diff --git a/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/SimpleQueries.scala b/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/SimpleQueries.scala index 98aec64..24b2007 100644 --- 
a/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/SimpleQueries.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/SimpleQueries.scala @@ -16,9 +16,11 @@ package com.databricks.spark.sql.perf.tpcds.queries -import com.databricks.spark.sql.perf.QuerySet +import com.databricks.spark.sql.perf.Benchmark + +trait SimpleQueries { + self: Benchmark => -trait SimpleQueries extends QuerySet{ val q7Derived = Seq( ("q7-simpleScan", """ From f00ad77985049bcf7072264841c8b09817295fe7 Mon Sep 17 00:00:00 2001 From: Michael Armbrust Date: Wed, 22 Jul 2015 00:29:58 -0700 Subject: [PATCH 3/4] with data generation --- .../sql/perf/AggregationPerformance.scala | 65 +++++++++ .../databricks/spark/sql/perf/Benchmark.scala | 124 ++++++++++++++++-- .../spark/sql/perf/JoinPerformance.scala | 58 ++++---- .../spark/sql/perf/bigdata/BigData.scala | 4 +- .../spark/sql/perf/bigdata/Tables.scala | 8 +- .../com/databricks/spark/sql/perf/table.scala | 8 +- .../spark/sql/perf/tpcds/TPCDS.scala | 4 +- .../spark/sql/perf/tpcds/Tables.scala | 22 ++-- 8 files changed, 226 insertions(+), 67 deletions(-) create mode 100644 src/main/scala/com/databricks/spark/sql/perf/AggregationPerformance.scala diff --git a/src/main/scala/com/databricks/spark/sql/perf/AggregationPerformance.scala b/src/main/scala/com/databricks/spark/sql/perf/AggregationPerformance.scala new file mode 100644 index 0000000..def479b --- /dev/null +++ b/src/main/scala/com/databricks/spark/sql/perf/AggregationPerformance.scala @@ -0,0 +1,65 @@ +package com.databricks.spark.sql.perf + +import org.apache.spark.sql.{Row, SQLContext} + +trait AggregationPerformance extends Benchmark { + + import sqlContext.implicits._ + + val sizes = (1 to 6).map(math.pow(10, _).toInt).toSeq + + val variousCardinality = sizes.map { size => + Table(s"ints$size", + sparkContext.parallelize(1 to size).flatMap { group => + (1 to 10000).map(i => (group, i)) + }.toDF("a", "b")) + } + + val lowCardinality = sizes.map { size => + val 
fullSize = size * 10000L + Table( + s"twoGroups$fullSize", + sqlContext.range(0, fullSize).select($"id" % 2 as 'a, $"id" as 'b)) + } + + val newAggreation = Variation("aggregationType", Seq("new", "old")) { + case "old" => sqlContext.setConf("spark.sql.useAggregate2", "false") + case "new" => sqlContext.setConf("spark.sql.useAggregate2", "true") + } + + val varyNumGroupsAvg: Seq[Query] = variousCardinality.map(_.name).map { table => + Query( + s"avg-$table", + s"SELECT AVG(b) FROM $table GROUP BY a", + "an average with a varying number of groups", + collectResults = false) + } + + val twoGroupsAvg: Seq[Query] = lowCardinality.map(_.name).map { table => + Query( + s"avg-$table", + s"SELECT AVG(b) FROM $table GROUP BY a", + "an average on an int column with only two groups", + collectResults = false) + } + + val complexInput = + Seq("1milints", "100milints", "1bilints").map { table => + Query( + s"aggregation-complex-input-$table", + s"SELECT SUM(id + id + id + id + id + id + id + id + id + id) FROM $table", + "Sum of 9 columns added together", + collectResults = true) + } + + val aggregates = + Seq("1milints", "100milints", "1bilints").flatMap { table => + Seq("SUM", "AVG", "COUNT", "STDDEV").map { agg => + Query( + s"single-aggregate-$agg-$table", + s"SELECT $agg(id) FROM $table", + "aggregation of a single column", + collectResults = true) + } + } +} \ No newline at end of file diff --git a/src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala b/src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala index d087b99..f1073b6 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala @@ -24,7 +24,7 @@ import scala.concurrent.ExecutionContext.Implicits.global import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{Path, FileSystem} -import org.apache.spark.sql.{DataFrame, SQLContext} +import org.apache.spark.sql.{AnalysisException, DataFrame, SQLContext} 
import org.apache.spark.sql.catalyst.plans.logical.Subquery @@ -49,7 +49,7 @@ abstract class Benchmark(@transient protected val sqlContext: SQLContext) protected def sparkContext = sqlContext.sparkContext - implicit def toOption[A](a: A) = Option(a) + protected implicit def toOption[A](a: A) = Option(a) def currentConfiguration = BenchmarkConfiguration( sqlConf = sqlContext.getAllConfs, @@ -76,14 +76,25 @@ abstract class Benchmark(@transient protected val sqlContext: SQLContext) */ case class Variation[T](name: String, options: Seq[T])(val setup: T => Unit) + val codegen = Variation("codegen", Seq("on", "off")) { + case "off" => sqlContext.setConf("spark.sql.codegen", "false") + case "on" => sqlContext.setConf("spark.sql.codegen", "true") + } + + val unsafe = Variation("unsafe", Seq("on", "off")) { + case "off" => sqlContext.setConf("spark.sql.unsafe.enabled", "false") + case "on" => sqlContext.setConf("spark.sql.unsafe.enabled", "true") + } + /** * Starts an experiment run with a given set of queries. - * @param queriesToRun Queries to be executed. + * @param queriesToRun a list of queries to be executed. * @param includeBreakdown If it is true, breakdown results of a query will be recorded. * Setting it to true may significantly increase the time used to * execute a query. - * @param iterations The number of iterations. - * @param variations [[Variation]]s used in this run. + * @param iterations The number of iterations to run of each query. + * @param variations [[Variation]]s used in this run. The cross product of all variations will be + * run for each query * iteration. * @param tags Tags of this run. * @return It returns a ExperimentStatus object that can be used to * track the progress of this experiment run. 
@@ -115,6 +126,29 @@ abstract class Benchmark(@transient protected val sqlContext: SQLContext) val timestamp = System.currentTimeMillis() val combinations = cartesianProduct(variations.map(l => (0 until l.options.size).toList).toList) val resultsFuture = Future { + queriesToRun.flatMap { query => + query.newDataFrame().queryExecution.logical.collect { + case UnresolvedRelation(Seq(name), _) => name + } + }.distinct.foreach { name => + try { + sqlContext.table(name) + currentMessages += s"Table $name exists." + } catch { + case ae: AnalysisException => + val table = allTables + .find(_.name == name) + .getOrElse(sys.error(s"Couldn't read table $name and its not defined as a Benchmark.Table.")) + + currentMessages += s"Creating table: $name" + table.data + .write + .mode("overwrite") + .saveAsTable(name) + } + } + + val results = (1 to iterations).flatMap { i => combinations.map { setup => val currentOptions = variations.asInstanceOf[Seq[Variation[Any]]].zip(setup).map { @@ -142,7 +176,8 @@ abstract class Benchmark(@transient protected val sqlContext: SQLContext) failures += 1 currentMessages += s"Query '${q.name}' failed: ${f.message}" } - singleResult.executionTime.foreach(time => currentMessages += s"Exec time: $time") + singleResult.executionTime.foreach(time => + currentMessages += s"Exec time: ${time / 1000}s") currentResults += singleResult singleResult :: Nil }) @@ -184,7 +219,7 @@ abstract class Benchmark(@transient protected val sqlContext: SQLContext) tbl } - def tail(n: Int = 5) = { + def tail(n: Int = 20) = { currentMessages.takeRight(n).mkString("\n") } @@ -209,7 +244,7 @@ abstract class Benchmark(@transient protected val sqlContext: SQLContext) |Run time: ${(System.currentTimeMillis() - timestamp) / 1000}s
| |

Current Query: $currentQuery

- |Runtime: ${(System.currentTimeMillis() - startTime) / 1000}s + |Runtime: ${(System.currentTimeMillis() - startTime) / 1000}s
|$currentConfig
|

QueryPlan

|
@@ -225,6 +260,79 @@ abstract class Benchmark(@transient protected val sqlContext: SQLContext)
     new ExperimentStatus
   }
 
+  case class Table( // a named dataset the benchmark can materialize when it is missing
+      name: String, // name queries refer to; also the saveAsTable target
+      data: DataFrame) // rows written (mode "overwrite") when the table cannot be read
+
+  import reflect.runtime._, universe._
+  import reflect.runtime._
+  import universe._
+
+  private val runtimeMirror = universe.runtimeMirror(getClass.getClassLoader) // reflection mirror
+  val myType = runtimeMirror.classSymbol(getClass).toType // type of this instance's runtime class
+
+  /** Reflectively calls each method in myType.declarations whose return type is exactly Table. */
+  def singleTables =
+    myType.declarations.collect {
+      case member if member.isMethod && (member.asMethod.returnType =:= typeOf[Table]) =>
+        runtimeMirror.reflect(this).reflectMethod(member.asMethod).apply().asInstanceOf[Table]
+    }
+
+  /** Calls each method in myType.declarations returning Seq[Table] and concatenates the results. */
+  def groupedTables =
+    myType.declarations.collect {
+      case member if member.isMethod && (member.asMethod.returnType =:= typeOf[Seq[Table]]) =>
+        runtimeMirror.reflect(this).reflectMethod(member.asMethod).apply().asInstanceOf[Seq[Table]]
+    }.flatten
+
+  lazy val allTables: Seq[Table] = (singleTables ++ groupedTables).toSeq // computed on first use
+
+  /** Reflectively calls each method in myType.declarations whose return type is exactly Query. */
+  def singleQueries =
+    myType.declarations.collect {
+      case member if member.isMethod && (member.asMethod.returnType =:= typeOf[Query]) =>
+        runtimeMirror.reflect(this).reflectMethod(member.asMethod).apply().asInstanceOf[Query]
+    }
+
+  /** Calls each method in myType.declarations returning Seq[Query] and concatenates the results. */
+  def groupedQueries =
+    myType.declarations.collect {
+      case member if member.isMethod && (member.asMethod.returnType =:= typeOf[Seq[Query]]) =>
+        runtimeMirror.reflect(this).reflectMethod(member.asMethod).apply().asInstanceOf[Seq[Query]]
+    }.flatten
+
+  lazy val allQueries: Seq[Query] = (singleQueries ++ groupedQueries).toSeq // computed on first use
+
+  def html: String = {
+    val singleQueries =
+      myType.declarations
+        .filter(m => m.isMethod)
+        .map(_.asMethod)
+        .filter(_.asMethod.returnType =:= typeOf[Query])
+        .map(method => runtimeMirror.reflect(this).reflectMethod(method).apply().asInstanceOf[Query])
+        .mkString(",")
+    val queries =
+      myType.declarations
+      .filter(m => m.isMethod)
+      .map(_.asMethod)
+      .filter(_.asMethod.returnType =:= typeOf[Seq[Query]])
+      .map { method =>
+        val queries = runtimeMirror.reflect(this).reflectMethod(method).apply().asInstanceOf[Seq[Query]]
+        val queryList = queries.map(_.name).mkString(", ")
+        s"""
+          |

${method.name}

+ |
    $queryList
+ """.stripMargin + }.mkString("\n") + + s""" + |

Spark SQL Performance Benchmarking

+ |

Available Queries

+ |$singleQueries + |$queries + """.stripMargin + } + /** Factory object for benchmark queries. */ object Query { def apply( diff --git a/src/main/scala/com/databricks/spark/sql/perf/JoinPerformance.scala b/src/main/scala/com/databricks/spark/sql/perf/JoinPerformance.scala index 0da49e3..f2c541b 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/JoinPerformance.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/JoinPerformance.scala @@ -2,25 +2,31 @@ package com.databricks.spark.sql.perf import org.apache.spark.sql.SQLContext -class JoinPerformance(sqlContext: SQLContext) extends Benchmark(sqlContext) { - def buildTables() = { - // 1.5 mb, 1 file - sqlContext.range(0, 1000000) - .repartition(1) - .write.mode("ignore") - .saveAsTable("1milints") +trait JoinPerformance extends Benchmark { + // 1.5 mb, 1 file + val x = Table( + "1milints", + sqlContext.range(0, 1000000) + .repartition(1)) + + val joinTables = Seq( // 143.542mb, 10 files - sqlContext.range(0, 100000000) - .repartition(10) - .write.mode("ignore") - .saveAsTable("100milints") + Table( + "1bilints", + sqlContext.range(0, 100000000) + .repartition(10)), // 1.4348gb, 10 files - sqlContext.range(0, 1000000000) - .repartition(10) - .write.mode("ignore") - .saveAsTable("1bilints") + Table( + "1bilints", + sqlContext.range(0, 1000000000) + .repartition(10)) + ) + + val sortMergeJoin = Variation("sortMergeJoin", Seq("on", "off")) { + case "off" => sqlContext.setConf("spark.sql.planner.sortMergeJoin", "false") + case "on" => sqlContext.setConf("spark.sql.planner.sortMergeJoin", "true") } val singleKeyJoins = Seq("1milints", "100milints", "1bilints").flatMap { table1 => @@ -33,25 +39,5 @@ class JoinPerformance(sqlContext: SQLContext) extends Benchmark(sqlContext) { collectResults = true) } } - }.filterNot(_.name contains "FULL OUTER JOIN-1milints-1bilints") - - val complexInput = - Seq("1milints", "100milints", "1bilints").map { table => - Query( - "aggregation-complex-input", - s"SELECT SUM(id + id + 
id + id + id + id + id + id + id + id) FROM $table", - "Sum of 9 columns added together", - collectResults = true) - } - - val aggregates = - Seq("1milints", "100milints", "1bilints").flatMap { table => - Seq("SUM", "AVG", "COUNT", "STDDEV").map { agg => - Query( - s"single-aggregate-$agg", - s"SELECT $agg(id) FROM $table", - "aggregation of a single column", - collectResults = true) - } - } + } } \ No newline at end of file diff --git a/src/main/scala/com/databricks/spark/sql/perf/bigdata/BigData.scala b/src/main/scala/com/databricks/spark/sql/perf/bigdata/BigData.scala index c723382..c30cf7b 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/bigdata/BigData.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/bigdata/BigData.scala @@ -24,13 +24,13 @@ import org.apache.spark.sql.{Column, SQLContext} class BigDataBenchmark ( @transient sqlContext: SQLContext, dataLocation: String, - val tables: Seq[Table], + val tables: Seq[Table2], scaleFactor: String) extends Benchmark(sqlContext) with Serializable with TableCreator { import sqlContext._ import sqlContext.implicits._ - override def createTablesForTest(tables: Seq[Table]): Seq[TableForTest] = { + override def createTablesForTest(tables: Seq[Table2]): Seq[TableForTest] = { tables.map(table => BigDataTableForTest(table, dataLocation, scaleFactor, sqlContext)) } diff --git a/src/main/scala/com/databricks/spark/sql/perf/bigdata/Tables.scala b/src/main/scala/com/databricks/spark/sql/perf/bigdata/Tables.scala index 8301f59..9b6f657 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/bigdata/Tables.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/bigdata/Tables.scala @@ -37,7 +37,7 @@ import parquet.hadoop.util.ContextUtil import scala.sys.process._ case class BigDataTableForTest( - table: Table, + table: Table2, baseDir: String, scaleFactor: String, @transient sqlContext: SQLContext) @@ -54,13 +54,13 @@ case class Tables(sqlContext: SQLContext) { import sqlContext.implicits._ val tables = Seq( - 
Table("rankings", + Table2("rankings", UnpartitionedTable, 'pageURL .string, 'pageRank .int, 'avgDuration .int), - Table("uservisits", + Table2("uservisits", UnpartitionedTable, 'sourceIP .string, 'destURL .string, @@ -72,7 +72,7 @@ case class Tables(sqlContext: SQLContext) { 'searchWord .string, 'duration .int), - Table("documents", + Table2("documents", UnpartitionedTable, 'line .string) ) diff --git a/src/main/scala/com/databricks/spark/sql/perf/table.scala b/src/main/scala/com/databricks/spark/sql/perf/table.scala index 2eca42e..4fd4381 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/table.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/table.scala @@ -34,9 +34,9 @@ import parquet.hadoop.util.ContextUtil trait TableCreator { - def tables: Seq[Table] + def tables: Seq[Table2] - def createTablesForTest(tables: Seq[Table]): Seq[TableForTest] + def createTablesForTest(tables: Seq[Table2]): Seq[TableForTest] val tablesForTest: Seq[TableForTest] = createTablesForTest(tables) @@ -65,10 +65,10 @@ abstract class TableType case object UnpartitionedTable extends TableType case class PartitionedTable(partitionColumn: String) extends TableType -case class Table(name: String, tableType: TableType, fields: StructField*) +case class Table2(name: String, tableType: TableType, fields: StructField*) abstract class TableForTest( - table: Table, + table: Table2, baseDir: String, @transient sqlContext: SQLContext) extends Serializable { diff --git a/src/main/scala/com/databricks/spark/sql/perf/tpcds/TPCDS.scala b/src/main/scala/com/databricks/spark/sql/perf/tpcds/TPCDS.scala index a685739..8c61a33 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/tpcds/TPCDS.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/tpcds/TPCDS.scala @@ -39,7 +39,7 @@ class TPCDS ( sparkVersion: String, dataLocation: String, dsdgenDir: String, - val tables: Seq[Table], + val tables: Seq[Table2], scaleFactor: String, userSpecifiedBaseDir: Option[String] = None) extends 
Benchmark(sqlContext) with TableCreator with Serializable { @@ -49,7 +49,7 @@ class TPCDS ( lazy val baseDir = userSpecifiedBaseDir.getOrElse(s"$dataLocation/scaleFactor=$scaleFactor/useDecimal=true") - override def createTablesForTest(tables: Seq[Table]): Seq[TableForTest] = { + override def createTablesForTest(tables: Seq[Table2]): Seq[TableForTest] = { tables.map(table => TPCDSTableForTest(table, baseDir, scaleFactor.toInt, dsdgenDir, sqlContext)) } diff --git a/src/main/scala/com/databricks/spark/sql/perf/tpcds/Tables.scala b/src/main/scala/com/databricks/spark/sql/perf/tpcds/Tables.scala index a6b180e..f559347 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/tpcds/Tables.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/tpcds/Tables.scala @@ -39,7 +39,7 @@ import parquet.hadoop.util.ContextUtil case class TPCDSTableForTest( - table: Table, + table: Table2, baseDir: String, scaleFactor: Int, dsdgenDir: String, @@ -213,7 +213,7 @@ case class Tables(sqlContext: SQLContext) { 'inv_item_sk .int, 'inv_warehouse_sk .int, 'inv_quantity_on_hand .int),*/ - Table("store_sales", + Table2("store_sales", PartitionedTable("ss_sold_date_sk"), 'ss_sold_date_sk .int, 'ss_sold_time_sk .int, @@ -238,7 +238,7 @@ case class Tables(sqlContext: SQLContext) { 'ss_net_paid .decimal(7,2), 'ss_net_paid_inc_tax .decimal(7,2), 'ss_net_profit .decimal(7,2)), - Table("customer", + Table2("customer", UnpartitionedTable, 'c_customer_sk .int, 'c_customer_id .string, @@ -258,7 +258,7 @@ case class Tables(sqlContext: SQLContext) { 'c_login .string, 'c_email_address .string, 'c_last_review_date .string), - Table("customer_address", + Table2("customer_address", UnpartitionedTable, 'ca_address_sk .int, 'ca_address_id .string, @@ -273,7 +273,7 @@ case class Tables(sqlContext: SQLContext) { 'ca_country .string, 'ca_gmt_offset .decimal(5,2), 'ca_location_type .string), - Table("customer_demographics", + Table2("customer_demographics", UnpartitionedTable, 'cd_demo_sk .int, 'cd_gender 
.string, @@ -284,7 +284,7 @@ case class Tables(sqlContext: SQLContext) { 'cd_dep_count .int, 'cd_dep_employed_count .int, 'cd_dep_college_count .int), - Table("date_dim", + Table2("date_dim", UnpartitionedTable, 'd_date_sk .int, 'd_date_id .string, @@ -314,14 +314,14 @@ case class Tables(sqlContext: SQLContext) { 'd_current_month .string, 'd_current_quarter .string, 'd_current_year .string), - Table("household_demographics", + Table2("household_demographics", UnpartitionedTable, 'hd_demo_sk .int, 'hd_income_band_sk .int, 'hd_buy_potential .string, 'hd_dep_count .int, 'hd_vehicle_count .int), - Table("item", + Table2("item", UnpartitionedTable, 'i_item_sk .int, 'i_item_id .string, @@ -345,7 +345,7 @@ case class Tables(sqlContext: SQLContext) { 'i_container .string, 'i_manager_id .int, 'i_product_name .string), - Table("promotion", + Table2("promotion", UnpartitionedTable, 'p_promo_sk .int, 'p_promo_id .string, @@ -366,7 +366,7 @@ case class Tables(sqlContext: SQLContext) { 'p_channel_details .string, 'p_purpose .string, 'p_discount_active .string), - Table("store", + Table2("store", UnpartitionedTable, 's_store_sk .int, 's_store_id .string, @@ -397,7 +397,7 @@ case class Tables(sqlContext: SQLContext) { 's_country .string, 's_gmt_offset .decimal(5,2), 's_tax_precentage .decimal(5,2)), - Table("time_dim", + Table2("time_dim", UnpartitionedTable, 't_time_sk .int, 't_time_id .string, From a239da90a22e7402de125786669bb696f2fb73b7 Mon Sep 17 00:00:00 2001 From: Michael Armbrust Date: Tue, 11 Aug 2015 15:51:34 -0700 Subject: [PATCH 4/4] more cleanup, update readme --- README.md | 33 ++++--------------- .../databricks/spark/sql/perf/Benchmark.scala | 2 +- .../com/databricks/spark/sql/perf/query.scala | 0 .../spark/sql/perf/runBenchmarks.scala | 0 .../com/databricks/spark/sql/perf/table.scala | 0 .../{queries => }/ImpalaKitQueries.scala | 2 +- .../tpcds/{queries => }/SimpleQueries.scala | 2 +- .../spark/sql/perf/tpcds/TPCDS.scala | 22 ++----------- 
.../sql/perf/tpcds/queries/package.scala | 17 ---------- 9 files changed, 11 insertions(+), 67 deletions(-) delete mode 100644 src/main/scala/com/databricks/spark/sql/perf/query.scala delete mode 100644 src/main/scala/com/databricks/spark/sql/perf/runBenchmarks.scala delete mode 100644 src/main/scala/com/databricks/spark/sql/perf/table.scala rename src/main/scala/com/databricks/spark/sql/perf/tpcds/{queries => }/ImpalaKitQueries.scala (99%) rename src/main/scala/com/databricks/spark/sql/perf/tpcds/{queries => }/SimpleQueries.scala (99%) delete mode 100644 src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/package.scala diff --git a/README.md b/README.md index c188a55..94308f2 100644 --- a/README.md +++ b/README.md @@ -1,55 +1,34 @@ # Spark SQL Performance Tests -This is a performance testing framework for [Spark SQL](https://spark.apache.org/sql/) in [Apache Spark](https://spark.apache.org/) 1.3+. +This is a performance testing framework for [Spark SQL](https://spark.apache.org/sql/) in [Apache Spark](https://spark.apache.org/) 1.4+. **Note: This README is still under development. Please also check our source code for more information.** ## How to use it The rest of document will use TPC-DS benchmark as an example. We will add contents to explain how to use other benchmarks add the support of a new benchmark dataset in future. -### Setup a dataset -Before running any query, a dataset needs to be setup by creating a `Dataset` object. Every benchmark support in Spark SQL Perf needs to implement its own `Dataset` class. A `Dataset` object takes a few parameters that will be used to setup the needed tables and its `setup` function is used to setup needed tables. For TPC-DS benchmark, the class is `TPCDS` in the package of `com.databricks.spark.sql.perf.tpcds`. For example, to setup a TPC-DS dataset, you can +### Setup a benchmark +Before running any query, a dataset needs to be setup by creating a `Benchmark` object. 
``` import org.apache.spark.sql.parquet.Tables // Tables in TPC-DS benchmark used by experiments. val tables = Tables(sqlContext) // Setup TPC-DS experiment -val tpcds = - new TPCDS ( - sqlContext = sqlContext, - sparkVersion = "1.3.1", - dataLocation = , - dsdgenDir = , - tables = tables.tables, - scaleFactor = ) +val tpcds = new TPCDS (sqlContext = sqlContext) ``` -After a `TPCDS` object is created, tables of it can be setup by calling - -``` -tpcds.setup() -``` - -The `setup` function will first check if needed tables are stored at the location specified by `dataLocation`. If not, it will creates tables at there by using the data generator tool `dsdgen` provided by TPC-DS benchmark (This tool needs to be pre-installed at the location specified by `dsdgenDir` in every worker). - ### Run benchmarking queries After setup, users can use `runExperiment` function to run benchmarking queries and record query execution time. Taking TPC-DS as an example, you can start an experiment by using ``` -tpcds.runExperiment( - queries = , - resultsLocation = , - includeBreakdown = , - iterations = , - variations = , - tags = ) +val experiment = tpcds.runExperiment(queriesToRun = tpcds.interactiveQueries) ``` For every experiment run (i.e.\ every call of `runExperiment`), Spark SQL Perf will use the timestamp of the start time to identify this experiment. Performance results will be stored in the sub-dir named by the timestamp in the given `resultsLocation` (for example `results/1429213883272`). The performance results are stored in the JSON format. ### Retrieve results -The follow code can be used to retrieve results ... +While the experiment is running you can use `experiment.html` to list the status. Once the experiment is complete, the results will be saved to the table sqlPerformance in json. ``` // Get experiments results. 
diff --git a/src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala b/src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala index 1bc0955..5b1119c 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala @@ -377,7 +377,7 @@ abstract class Benchmark(@transient protected val sqlContext: SQLContext) |${buildDataFrame.queryExecution.analyzed} """.stripMargin - val tablesInvolved = buildDataFrame.queryExecution.logical collect { + lazy val tablesInvolved = buildDataFrame.queryExecution.logical collect { case UnresolvedRelation(tableIdentifier, _) => { // We are ignoring the database name. tableIdentifier.last diff --git a/src/main/scala/com/databricks/spark/sql/perf/query.scala b/src/main/scala/com/databricks/spark/sql/perf/query.scala deleted file mode 100644 index e69de29..0000000 diff --git a/src/main/scala/com/databricks/spark/sql/perf/runBenchmarks.scala b/src/main/scala/com/databricks/spark/sql/perf/runBenchmarks.scala deleted file mode 100644 index e69de29..0000000 diff --git a/src/main/scala/com/databricks/spark/sql/perf/table.scala b/src/main/scala/com/databricks/spark/sql/perf/table.scala deleted file mode 100644 index e69de29..0000000 diff --git a/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/ImpalaKitQueries.scala b/src/main/scala/com/databricks/spark/sql/perf/tpcds/ImpalaKitQueries.scala similarity index 99% rename from src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/ImpalaKitQueries.scala rename to src/main/scala/com/databricks/spark/sql/perf/tpcds/ImpalaKitQueries.scala index 063ea55..4f4c3e8 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/ImpalaKitQueries.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/tpcds/ImpalaKitQueries.scala @@ -14,7 +14,7 @@ * limitations under the License. 
*/ -package com.databricks.spark.sql.perf.tpcds.queries +package com.databricks.spark.sql.perf.tpcds import com.databricks.spark.sql.perf.Benchmark diff --git a/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/SimpleQueries.scala b/src/main/scala/com/databricks/spark/sql/perf/tpcds/SimpleQueries.scala similarity index 99% rename from src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/SimpleQueries.scala rename to src/main/scala/com/databricks/spark/sql/perf/tpcds/SimpleQueries.scala index cb26820..383c68f 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/SimpleQueries.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/tpcds/SimpleQueries.scala @@ -14,7 +14,7 @@ * limitations under the License. */ -package com.databricks.spark.sql.perf.tpcds.queries +package com.databricks.spark.sql.perf.tpcds import com.databricks.spark.sql.perf.Benchmark diff --git a/src/main/scala/com/databricks/spark/sql/perf/tpcds/TPCDS.scala b/src/main/scala/com/databricks/spark/sql/perf/tpcds/TPCDS.scala index 2c618ac..dab3fbd 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/tpcds/TPCDS.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/tpcds/TPCDS.scala @@ -22,27 +22,9 @@ import org.apache.spark.sql.SQLContext /** * TPC-DS benchmark's dataset. * @param sqlContext An existing SQLContext. - * @param sparkVersion The version of Spark. - * @param dataLocation The location of the dataset used by this experiment. - * @param dsdgenDir The location of dsdgen in every worker machine. - * @param scaleFactor The scale factor of the dataset. For some benchmarks like TPC-H - * and TPC-DS, the scale factor is a number roughly representing the - * size of raw data files. For some other benchmarks, the scale factor - * is a short string describing the scale of the dataset. 
*/ -class TPCDS ( - @transient sqlContext: SQLContext, - sparkVersion: String, - dataLocation: String, - dsdgenDir: String, - scaleFactor: String, - userSpecifiedBaseDir: Option[String] = None) - extends Benchmark(sqlContext) with Serializable { - import sqlContext._ - import sqlContext.implicits._ - - lazy val baseDir = - userSpecifiedBaseDir.getOrElse(s"$dataLocation/scaleFactor=$scaleFactor/useDecimal=true") +class TPCDS (@transient sqlContext: SQLContext) + extends Benchmark(sqlContext) with ImpalaKitQueries with SimpleQueries with Serializable { /* def setupBroadcast(skipTables: Seq[String] = Seq("store_sales", "customer")) = { diff --git a/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/package.scala b/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/package.scala deleted file mode 100644 index 65f72fe..0000000 --- a/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/package.scala +++ /dev/null @@ -1,17 +0,0 @@ -/* - * Copyright 2015 Databricks Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.databricks.spark.sql.perf.tpcds