diff --git a/build.sbt b/build.sbt index 3d224cd..cbc582a 100644 --- a/build.sbt +++ b/build.sbt @@ -3,22 +3,13 @@ scalaVersion := "2.10.4" -sparkVersion := "1.3.0" - sparkPackageName := "databricks/spark-sql-perf" -// Don't forget to set the version -version := "0.0.1-SNAPSHOT" +version := "0.0.4-SNAPSHOT" // All Spark Packages need a license licenses := Seq("Apache-2.0" -> url("http://opensource.org/licenses/Apache-2.0")) +sparkVersion := "1.4.0" -// Add Spark components this package depends on, e.g, "mllib", .... -sparkComponents ++= Seq("sql", "hive") - -// uncomment and change the value below to change the directory where your zip artifact will be created -// spDistDirectory := target.value - -// add any sparkPackageDependencies using sparkPackageDependencies. -// e.g. sparkPackageDependencies += "databricks/spark-avro:0.1" +sparkComponents ++= Seq("sql", "hive") \ No newline at end of file diff --git a/src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala b/src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala new file mode 100644 index 0000000..d087b99 --- /dev/null +++ b/src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala @@ -0,0 +1,343 @@ +/* + * Copyright 2015 Databricks Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.databricks.spark.sql.perf + +import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation + +import scala.concurrent._ +import scala.concurrent.duration._ +import scala.concurrent.ExecutionContext.Implicits.global + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{Path, FileSystem} +import org.apache.spark.sql.{DataFrame, SQLContext} + +import org.apache.spark.sql.catalyst.plans.logical.Subquery + +/** + * A collection of queries that test a particular aspect of Spark SQL. + * + * @param sqlContext An existing SQLContext. + */ +abstract class Benchmark(@transient protected val sqlContext: SQLContext) + extends Serializable { + + import sqlContext.implicits._ + + val resultsLocation = "/spark/sql/performance" + val resultsTableName = "sqlPerformance" + + def createResultsTable() = { + sqlContext.sql(s"DROP TABLE $resultsTableName") + sqlContext.createExternalTable( + "sqlPerformance", "json", Map("path" -> (resultsLocation + "/*/"))) + } + + protected def sparkContext = sqlContext.sparkContext + + implicit def toOption[A](a: A) = Option(a) + + def currentConfiguration = BenchmarkConfiguration( + sqlConf = sqlContext.getAllConfs, + sparkConf = sparkContext.getConf.getAll.toMap, + defaultParallelism = sparkContext.defaultParallelism) + + /** + * A Variation represents a setting (e.g. the number of shuffle partitions or if tables + * are cached in memory) that we want to change in a experiment run. + * A Variation has three parts, `name`, `options`, and `setup`. + * The `name` is the identifier of a Variation. `options` is a Seq of options that + * will be used for a query. Basically, a query will be executed with every option + * defined in the list of `options`. `setup` defines the needed action for every + * option. For example, the following Variation is used to change the number of shuffle + * partitions of a query. The name of the Variation is "shufflePartitions". There are + * two options, 200 and 2000. The setup is used to set the value of property + * "spark.sql.shuffle.partitions". + * + * {{{ + * Variation("shufflePartitions", Seq("200", "2000")) { + * case num => sqlContext.setConf("spark.sql.shuffle.partitions", num) + * } + * }}} + */ + case class Variation[T](name: String, options: Seq[T])(val setup: T => Unit) + + /** + * Starts an experiment run with a given set of queries. + * @param queriesToRun Queries to be executed. + * @param includeBreakdown If it is true, breakdown results of a query will be recorded. + * Setting it to true may significantly increase the time used to + * execute a query. + * @param iterations The number of iterations. + * @param variations [[Variation]]s used in this run. + * @param tags Tags of this run. + * @return It returns a ExperimentStatus object that can be used to + * track the progress of this experiment run. + */ + def runExperiment( + queriesToRun: Seq[Query], + includeBreakdown: Boolean = false, + iterations: Int = 3, + variations: Seq[Variation[_]] = Seq(Variation("StandardRun", Seq("true")) { _ => {} }), + tags: Map[String, String] = Map.empty) = { + + class ExperimentStatus { + val currentResults = new collection.mutable.ArrayBuffer[BenchmarkResult]() + val currentRuns = new collection.mutable.ArrayBuffer[ExperimentRun]() + val currentMessages = new collection.mutable.ArrayBuffer[String]() + + // Stats for HTML status message. + @volatile var currentQuery = "" + @volatile var currentPlan = "" + @volatile var currentConfig = "" + @volatile var failures = 0 + @volatile var startTime = 0L + + def cartesianProduct[T](xss: List[List[T]]): List[List[T]] = xss match { + case Nil => List(Nil) + case h :: t => for(xh <- h; xt <- cartesianProduct(t)) yield xh :: xt + } + + val timestamp = System.currentTimeMillis() + val combinations = cartesianProduct(variations.map(l => (0 until l.options.size).toList).toList) + val resultsFuture = Future { + val results = (1 to iterations).flatMap { i => + combinations.map { setup => + val currentOptions = variations.asInstanceOf[Seq[Variation[Any]]].zip(setup).map { + case (v, idx) => + v.setup(v.options(idx)) + v.name -> v.options(idx).toString + } + currentConfig = currentOptions.map { case (k,v) => s"$k: $v" }.mkString(", ") + + val result = ExperimentRun( + timestamp = timestamp, + iteration = i, + tags = currentOptions.toMap ++ tags, + configuration = currentConfiguration, + queriesToRun.flatMap { q => + val setup = s"iteration: $i, ${currentOptions.map { case (k, v) => s"$k=$v"}.mkString(", ")}" + currentMessages += s"Running query ${q.name} $setup" + + currentQuery = q.name + currentPlan = q.newDataFrame().queryExecution.executedPlan.toString + startTime = System.currentTimeMillis() + + val singleResult = q.benchmark(includeBreakdown, setup) + singleResult.failure.foreach { f => + failures += 1 + currentMessages += s"Query '${q.name}' failed: ${f.message}" + } + singleResult.executionTime.foreach(time => currentMessages += s"Exec time: $time") + currentResults += singleResult + singleResult :: Nil + }) + currentRuns += result + + result + } + } + + try { + val resultsTable = sqlContext.createDataFrame(results) + currentMessages += s"Results written to table: 'sqlPerformance' at $resultsLocation/$timestamp" + results.toDF().write + .format("json") + .save(s"$resultsLocation/$timestamp") + + results.toDF() + } catch { + case e: Throwable => currentMessages += s"Failed to write data: $e" + } + } + + /** Waits for the finish of the experiment. */ + def waitForFinish(timeoutInSeconds: Int) = { + Await.result(resultsFuture, timeoutInSeconds.seconds) + } + + /** Returns results from an actively running experiment. */ + def getCurrentResults() = { + val tbl = sqlContext.createDataFrame(currentResults) + tbl.registerTempTable("currentResults") + tbl + } + + /** Returns full iterations from an actively running experiment. */ + def getCurrentRuns() = { + val tbl = sqlContext.createDataFrame(currentRuns) + tbl.registerTempTable("currentRuns") + tbl + } + + def tail(n: Int = 5) = { + currentMessages.takeRight(n).mkString("\n") + } + + def status = + if (resultsFuture.isCompleted) { + if (resultsFuture.value.get.isFailure) "Failed" else "Successful" + } else { + "Running" + } + + override def toString = + s"""Permalink: table("sqlPerformance").where('timestamp === ${timestamp}L)""" + + + def html = + s""" + |
+ |${currentPlan.replaceAll("\n", "
")}
+ |
+ |
+ |
+ |${tail()}
+ |
+ """.stripMargin
+ }
+ new ExperimentStatus
+ }
+
+ /** Factory object for benchmark queries. */
+ object Query {
+ def apply(
+ name: String,
+ sqlText: String,
+ description: String,
+ collectResults: Boolean = true): Query = {
+ new Query(name, sqlContext.sql(sqlText), description, collectResults, Some(sqlText))
+ }
+
+ def apply(
+ name: String,
+ dataFrameBuilder: => DataFrame,
+ description: String): Query = {
+ new Query(name, dataFrameBuilder, description, true, None)
+ }
+ }
+
+ /** Holds one benchmark query and its metadata. */
+ class Query(
+ val name: String,
+ buildDataFrame: => DataFrame,
+ val description: String,
+ val collectResults: Boolean,
+ val sqlText: Option[String]) {
+
+ override def toString =
+ s"""
+ |== Query: $name ==
+ |${buildDataFrame.queryExecution.analyzed}
+ """.stripMargin
+
+ val tablesInvolved = buildDataFrame.queryExecution.logical collect {
+ case UnresolvedRelation(tableIdentifier, _) => {
+ // We are ignoring the database name.
+ tableIdentifier.last
+ }
+ }
+
+ def newDataFrame() = buildDataFrame
+
+ def benchmarkMs[A](f: => A): Double = {
+ val startTime = System.nanoTime()
+ val ret = f
+ val endTime = System.nanoTime()
+ (endTime - startTime).toDouble / 1000000
+ }
+
+ def benchmark(includeBreakdown: Boolean, description: String = "") = {
+ try {
+ val dataFrame = buildDataFrame
+ sparkContext.setJobDescription(s"Query: $name, $description")
+ val queryExecution = dataFrame.queryExecution
+ // We are not counting the time of ScalaReflection.convertRowToScala.
+ val parsingTime = benchmarkMs {
+ queryExecution.logical
+ }
+ val analysisTime = benchmarkMs {
+ queryExecution.analyzed
+ }
+ val optimizationTime = benchmarkMs {
+ queryExecution.optimizedPlan
+ }
+ val planningTime = benchmarkMs {
+ queryExecution.executedPlan
+ }
+
+ val breakdownResults = if (includeBreakdown) {
+ val depth = queryExecution.executedPlan.treeString.split("\n").size
+ val physicalOperators = (0 until depth).map(i => (i, queryExecution.executedPlan(i)))
+ physicalOperators.map {
+ case (index, node) =>
+ val executionTime = benchmarkMs {
+ node.execute().map(_.copy()).foreach(row => Unit)
+ }
+ BreakdownResult(node.nodeName, node.simpleString, index, executionTime)
+ }
+ } else {
+ Seq.empty[BreakdownResult]
+ }
+
+ // The executionTime for the entire query includes the time of type conversion from catalyst to scala.
+ val executionTime = if (collectResults) {
+ benchmarkMs {
+ dataFrame.rdd.collect()
+ }
+ } else {
+ benchmarkMs {
+ dataFrame.rdd.foreach { row => Unit }
+ }
+ }
+
+ val joinTypes = dataFrame.queryExecution.executedPlan.collect {
+ case k if k.nodeName contains "Join" => k.nodeName
+ }
+
+ BenchmarkResult(
+ name = name,
+ joinTypes = joinTypes,
+ tables = tablesInvolved,
+ parsingTime = parsingTime,
+ analysisTime = analysisTime,
+ optimizationTime = optimizationTime,
+ planningTime = planningTime,
+ executionTime = executionTime,
+ queryExecution = dataFrame.queryExecution.toString,
+ breakDown = breakdownResults)
+ } catch {
+ case e: Exception =>
+ BenchmarkResult(
+ name = name,
+ failure = Failure(e.getClass.getName, e.getMessage))
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/main/scala/com/databricks/spark/sql/perf/JoinPerformance.scala b/src/main/scala/com/databricks/spark/sql/perf/JoinPerformance.scala
new file mode 100644
index 0000000..0da49e3
--- /dev/null
+++ b/src/main/scala/com/databricks/spark/sql/perf/JoinPerformance.scala
@@ -0,0 +1,57 @@
+package com.databricks.spark.sql.perf
+
+import org.apache.spark.sql.SQLContext
+
+class JoinPerformance(sqlContext: SQLContext) extends Benchmark(sqlContext) {
+ def buildTables() = {
+ // 1.5 mb, 1 file
+ sqlContext.range(0, 1000000)
+ .repartition(1)
+ .write.mode("ignore")
+ .saveAsTable("1milints")
+
+ // 143.542mb, 10 files
+ sqlContext.range(0, 100000000)
+ .repartition(10)
+ .write.mode("ignore")
+ .saveAsTable("100milints")
+
+ // 1.4348gb, 10 files
+ sqlContext.range(0, 1000000000)
+ .repartition(10)
+ .write.mode("ignore")
+ .saveAsTable("1bilints")
+ }
+
+ val singleKeyJoins = Seq("1milints", "100milints", "1bilints").flatMap { table1 =>
+ Seq("1milints", "100milints", "1bilints").flatMap { table2 =>
+ Seq("JOIN", "RIGHT JOIN", "LEFT JOIN", "FULL OUTER JOIN").map { join =>
+ Query(
+ s"singleKey-$join-$table1-$table2",
+ s"SELECT COUNT(*) FROM $table1 a $join $table2 b ON a.id = b.id",
+ "equi-inner join a small table with a big table using a single key.",
+ collectResults = true)
+ }
+ }
+ }.filterNot(_.name contains "FULL OUTER JOIN-1milints-1bilints")
+
+ val complexInput =
+ Seq("1milints", "100milints", "1bilints").map { table =>
+ Query(
+ "aggregation-complex-input",
+ s"SELECT SUM(id + id + id + id + id + id + id + id + id + id) FROM $table",
+ "Sum of 9 columns added together",
+ collectResults = true)
+ }
+
+ val aggregates =
+ Seq("1milints", "100milints", "1bilints").flatMap { table =>
+ Seq("SUM", "AVG", "COUNT", "STDDEV").map { agg =>
+ Query(
+ s"single-aggregate-$agg",
+ s"SELECT $agg(id) FROM $table",
+ "aggregation of a single column",
+ collectResults = true)
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/main/scala/com/databricks/spark/sql/perf/bigdata/BigData.scala b/src/main/scala/com/databricks/spark/sql/perf/bigdata/BigData.scala
index 16d0b1a..c723382 100644
--- a/src/main/scala/com/databricks/spark/sql/perf/bigdata/BigData.scala
+++ b/src/main/scala/com/databricks/spark/sql/perf/bigdata/BigData.scala
@@ -21,23 +21,15 @@ import org.apache.spark.sql.functions._
import org.apache.spark.sql.parquet.TPCDSTableForTest
import org.apache.spark.sql.{Column, SQLContext}
-class BigData (
+class BigDataBenchmark (
@transient sqlContext: SQLContext,
- sparkVersion: String,
dataLocation: String,
- tables: Seq[Table],
+ val tables: Seq[Table],
scaleFactor: String)
- extends Dataset(
- sqlContext,
- sparkVersion,
- dataLocation,
- tables,
- scaleFactor) with Serializable {
+ extends Benchmark(sqlContext) with Serializable with TableCreator {
import sqlContext._
import sqlContext.implicits._
- override val datasetName = "bigDataBenchmark"
-
override def createTablesForTest(tables: Seq[Table]): Seq[TableForTest] = {
tables.map(table =>
BigDataTableForTest(table, dataLocation, scaleFactor, sqlContext))
diff --git a/src/main/scala/com/databricks/spark/sql/perf/bigdata/Queries.scala b/src/main/scala/com/databricks/spark/sql/perf/bigdata/Queries.scala
index 12103b5..5541a02 100644
--- a/src/main/scala/com/databricks/spark/sql/perf/bigdata/Queries.scala
+++ b/src/main/scala/com/databricks/spark/sql/perf/bigdata/Queries.scala
@@ -16,9 +16,11 @@
package com.databricks.spark.sql.perf.bigdata
-import com.databricks.spark.sql.perf.QuerySet
+import com.databricks.spark.sql.perf.Benchmark
+
+trait Queries {
+ self: Benchmark =>
-trait Queries extends QuerySet {
val queries1to3 = Seq(
Query(
name = "q1A",
diff --git a/src/main/scala/com/databricks/spark/sql/perf/query.scala b/src/main/scala/com/databricks/spark/sql/perf/query.scala
index 8f2fd1e..690acd6 100644
--- a/src/main/scala/com/databricks/spark/sql/perf/query.scala
+++ b/src/main/scala/com/databricks/spark/sql/perf/query.scala
@@ -19,117 +19,11 @@ package com.databricks.spark.sql.perf
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
-object x {
-
-
-
trait QuerySet {
val sqlContext: SQLContext
def sparkContext = sqlContext.sparkContext
- object Query {
- def apply(
- name: String,
- sqlText: String,
- description: String,
- collectResults: Boolean = true): Query = {
- new Query(name, sqlContext.sql(sqlText), description, collectResults, Some(sqlText))
- }
- def apply(
- name: String,
- dataFrameBuilder: => DataFrame,
- description: String): Query = {
- new Query(name, dataFrameBuilder, description, true, None)
- }
- }
-
- class Query(
- val name: String,
- dataFrameBuilder: => DataFrame,
- val description: String,
- val collectResults: Boolean,
- val sqlText: Option[String]) {
-
- val tablesInvolved = dataFrameBuilder.queryExecution.logical collect {
- case UnresolvedRelation(tableIdentifier, _) => {
- // We are ignoring the database name.
- tableIdentifier.last
- }
- }
-
- def benchmarkMs[A](f: => A): Double = {
- val startTime = System.nanoTime()
- val ret = f
- val endTime = System.nanoTime()
- (endTime - startTime).toDouble / 1000000
- }
-
- def benchmark(includeBreakdown: Boolean, description: String = "") = {
- try {
- val dataFrame = dataFrameBuilder
- sparkContext.setJobDescription(s"Query: $name, $description")
- val queryExecution = dataFrame.queryExecution
- // We are not counting the time of ScalaReflection.convertRowToScala.
- val parsingTime = benchmarkMs {
- queryExecution.logical
- }
- val analysisTime = benchmarkMs {
- queryExecution.analyzed
- }
- val optimizationTime = benchmarkMs {
- queryExecution.optimizedPlan
- }
- val planningTime = benchmarkMs {
- queryExecution.executedPlan
- }
-
- val breakdownResults = if (includeBreakdown) {
- val depth = queryExecution.executedPlan.treeString.split("\n").size
- val physicalOperators = (0 until depth).map(i => (i, queryExecution.executedPlan(i)))
- physicalOperators.map {
- case (index, node) =>
- val executionTime = benchmarkMs {
- node.execute().map(_.copy()).foreach(row => Unit)
- }
- BreakdownResult(node.nodeName, node.simpleString, index, executionTime)
- }
- } else {
- Seq.empty[BreakdownResult]
- }
-
- // The executionTime for the entire query includes the time of type conversion from catalyst to scala.
- val executionTime = if (collectResults) {
- benchmarkMs {
- dataFrame.rdd.collect()
- }
- } else {
- benchmarkMs {
- dataFrame.rdd.foreach { row => Unit }
- }
- }
-
- val joinTypes = dataFrame.queryExecution.executedPlan.collect {
- case k if k.nodeName contains "Join" => k.nodeName
- }
-
- BenchmarkResult(
- name = name,
- joinTypes = joinTypes,
- tables = tablesInvolved,
- parsingTime = parsingTime,
- analysisTime = analysisTime,
- optimizationTime = optimizationTime,
- planningTime = planningTime,
- executionTime = executionTime,
- breakdownResults)
- } catch {
- case e: Exception =>
- throw new RuntimeException(
- s"Failed to benchmark query $name", e)
- }
- }
- }
}
\ No newline at end of file
diff --git a/src/main/scala/com/databricks/spark/sql/perf/results.scala b/src/main/scala/com/databricks/spark/sql/perf/results.scala
new file mode 100644
index 0000000..7d31ed2
--- /dev/null
+++ b/src/main/scala/com/databricks/spark/sql/perf/results.scala
@@ -0,0 +1,88 @@
+/*
+ * Copyright 2015 Databricks Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.databricks.spark.sql.perf
+
+/**
+ * The performance results of all given queries for a single iteration.
+ * @param timestamp The timestamp indicates when the entire experiment is started.
+ * @param iteration The index number of the current iteration.
+ * @param tags Tags of this iteration (variations are stored at here).
+ * @param configuration Configuration properties of this iteration.
+ * @param results The performance results of queries for this iteration.
+ */
+case class ExperimentRun(
+ timestamp: Long,
+ iteration: Int,
+ tags: Map[String, String],
+ configuration: BenchmarkConfiguration,
+ results: Seq[BenchmarkResult])
+
+/**
+ * The configuration used for an iteration of an experiment.
+ * @param sparkVersion The version of Spark.
+ * @param sqlConf All configuration properties related to Spark SQL.
+ * @param sparkConf All configuration properties of Spark.
+ * @param defaultParallelism The default parallelism of the cluster.
+ * Usually, it is the number of cores of the cluster.
+ */
+case class BenchmarkConfiguration(
+ sparkVersion: String = org.apache.spark.SPARK_VERSION,
+ sqlConf: Map[String, String],
+ sparkConf: Map[String,String],
+ defaultParallelism: Int)
+
+/**
+ * The result of a query.
+ * @param name The name of the query.
+ * @param joinTypes The type of join operations in the query.
+ * @param tables The tables involved in the query.
+ * @param parsingTime The time used to parse the query.
+ * @param analysisTime The time used to analyze the query.
+ * @param optimizationTime The time used to optimize the query.
+ * @param planningTime The time used to plan the query.
+ * @param executionTime The time used to execute the query.
+ * @param breakDown The breakdown results of the query plan tree.
+ */
+case class BenchmarkResult(
+ name: String,
+ joinTypes: Seq[String] = Nil,
+ tables: Seq[String] = Nil,
+ parsingTime: Option[Double] = None,
+ analysisTime: Option[Double] = None,
+ optimizationTime: Option[Double] = None,
+ planningTime: Option[Double] = None,
+ executionTime: Option[Double] = None,
+ breakDown: Seq[BreakdownResult] = Nil,
+ queryExecution: Option[String] = None,
+ failure: Option[Failure] = None)
+
+/**
+ * The execution time of a subtree of the query plan tree of a specific query.
+ * @param nodeName The name of the top physical operator of the subtree.
+ * @param nodeNameWithArgs The name and arguments of the top physical operator of the subtree.
+ * @param index The index of the top physical operator of the subtree
+ * in the original query plan tree. The index starts from 0
+ * (0 represents the top physical operator of the original query plan tree).
+ * @param executionTime The execution time of the subtree.
+ */
+case class BreakdownResult(
+ nodeName: String,
+ nodeNameWithArgs: String,
+ index: Int,
+ executionTime: Double)
+
+case class Failure(className: String, message: String)
\ No newline at end of file
diff --git a/src/main/scala/com/databricks/spark/sql/perf/runBenchmarks.scala b/src/main/scala/com/databricks/spark/sql/perf/runBenchmarks.scala
deleted file mode 100644
index 3920c6f..0000000
--- a/src/main/scala/com/databricks/spark/sql/perf/runBenchmarks.scala
+++ /dev/null
@@ -1,302 +0,0 @@
-/*
- * Copyright 2015 Databricks Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.databricks.spark.sql.perf
-
-import scala.concurrent._
-import scala.concurrent.duration._
-import scala.concurrent.ExecutionContext.Implicits.global
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{Path, FileSystem}
-import org.apache.spark.sql.SQLContext
-
-/**
- * The configuration used for an iteration of an experiment.
- * @param sparkVersion The version of Spark.
- * @param scaleFactor The scale factor of the dataset.
- * @param sqlConf All configuration properties related to Spark SQL.
- * @param sparkConf All configuration properties of Spark.
- * @param defaultParallelism The default parallelism of the cluster.
- * Usually, it is the number of cores of the cluster.
- */
-case class BenchmarkConfiguration(
- sparkVersion: String,
- scaleFactor: String,
- sqlConf: Map[String, String],
- sparkConf: Map[String,String],
- defaultParallelism: Int)
-
-/**
- * The execution time of a subtree of the query plan tree of a specific query.
- * @param nodeName The name of the top physical operator of the subtree.
- * @param nodeNameWithArgs The name and arguments of the top physical operator of the subtree.
- * @param index The index of the top physical operator of the subtree
- * in the original query plan tree. The index starts from 0
- * (0 represents the top physical operator of the original query plan tree).
- * @param executionTime The execution time of the subtree.
- */
-case class BreakdownResult(
- nodeName: String,
- nodeNameWithArgs: String,
- index: Int,
- executionTime: Double)
-
-/**
- * The result of a query.
- * @param name The name of the query.
- * @param joinTypes The type of join operations in the query.
- * @param tables The tables involved in the query.
- * @param parsingTime The time used to parse the query.
- * @param analysisTime The time used to analyze the query.
- * @param optimizationTime The time used to optimize the query.
- * @param planningTime The time used to plan the query.
- * @param executionTime The time used to execute the query.
- * @param breakDown The breakdown results of the query plan tree.
- */
-case class BenchmarkResult(
- name: String,
- joinTypes: Seq[String],
- tables: Seq[String],
- parsingTime: Double,
- analysisTime: Double,
- optimizationTime: Double,
- planningTime: Double,
- executionTime: Double,
- breakDown: Seq[BreakdownResult])
-
-/**
- * A Variation represents a setting (e.g. the number of shuffle partitions and if tables
- * are cached in memory) that we want to change in a experiment run.
- * A Variation has three parts, `name`, `options`, and `setup`.
- * The `name` is the identifier of a Variation. `options` is a Seq of options that
- * will be used for a query. Basically, a query will be executed with every option
- * defined in the list of `options`. `setup` defines the needed action for every
- * option. For example, the following Variation is used to change the number of shuffle
- * partitions of a query. The name of the Variation is "shufflePartitions". There are
- * two options, 200 and 2000. The setup is used to set the value of property
- * "spark.sql.shuffle.partitions".
- *
- * {{{
- * Variation("shufflePartitions", Seq("200", "2000")) {
- * case num => sqlContext.setConf("spark.sql.shuffle.partitions", num)
- * }
- * }}}
- */
-case class Variation[T](name: String, options: Seq[T])(val setup: T => Unit)
-
-/**
- * The performance results of all given queries for a single iteration.
- * @param timestamp The timestamp indicates when the entire experiment is started.
- * @param datasetName The name of dataset.
- * @param iteration The index number of the current iteration.
- * @param tags Tags of this iteration (variations are stored at here).
- * @param configuration Configuration properties of this iteration.
- * @param results The performance results of queries for this iteration.
- */
-case class ExperimentRun(
- timestamp: Long,
- datasetName: String,
- iteration: Int,
- tags: Map[String, String],
- configuration: BenchmarkConfiguration,
- results: Seq[BenchmarkResult])
-
-/**
- * The dataset of a benchmark.
- * @param sqlContext An existing SQLContext.
- * @param sparkVersion The version of Spark.
- * @param dataLocation The location of the dataset used by this experiment.
- * @param tables Tables that will be used in this experiment.
- * @param scaleFactor The scale factor of the dataset. For some benchmarks like TPC-H
- * and TPC-DS, the scale factor is a number roughly representing the
- * size of raw data files. For some other benchmarks, the scale factor
- * is a short string describing the scale of the dataset.
- */
-abstract class Dataset(
- @transient val sqlContext: SQLContext,
- sparkVersion: String,
- dataLocation: String,
- tables: Seq[Table],
- scaleFactor: String) extends Serializable with QuerySet {
-
- val datasetName: String
-
- def createTablesForTest(tables: Seq[Table]): Seq[TableForTest]
-
- val tablesForTest: Seq[TableForTest] = createTablesForTest(tables)
-
- def checkData(): Unit = {
- tablesForTest.foreach { table =>
- val fs = FileSystem.get(new java.net.URI(table.outputDir), new Configuration())
- val exists = fs.exists(new Path(table.outputDir))
- val wasSuccessful = fs.exists(new Path(s"${table.outputDir}/_SUCCESS"))
-
- if (!wasSuccessful) {
- if (exists) {
- println(s"Table '${table.name}' not generated successfully, regenerating.")
- } else {
- println(s"Table '${table.name}' does not exist, generating.")
- }
- fs.delete(new Path(table.outputDir), true)
- table.generate()
- } else {
- println(s"Table ${table.name} already exists.")
- }
- }
- }
-
- def allStats = tablesForTest.map(_.stats).reduceLeft(_.unionAll(_))
-
- /**
- * Does necessary setup work such as data generation and transformation. It needs to be
- * called before running any query.
- */
- def setup(): Unit = {
- checkData()
- tablesForTest.foreach(_.createTempTable())
- }
-
- def currentConfiguration = BenchmarkConfiguration(
- sparkVersion = sparkVersion,
- scaleFactor = scaleFactor,
- sqlConf = sqlContext.getAllConfs,
- sparkConf = sparkContext.getConf.getAll.toMap,
- defaultParallelism = sparkContext.defaultParallelism)
-
- /**
- * Starts an experiment run with a given set of queries.
- * @param queriesToRun Queries to be executed.
- * @param resultsLocation The location of performance results.
- * @param includeBreakdown If it is true, breakdown results of a query will be recorded.
- * Setting it to true may significantly increase the time used to
- * execute a query.
- * @param iterations The number of iterations.
- * @param variations [[Variation]]s used in this run.
- * @param tags Tags of this run.
- * @return It returns a ExperimentStatus object that can be used to
- * track the progress of this experiment run.
- */
- def runExperiment(
- queriesToRun: Seq[Query],
- resultsLocation: String,
- includeBreakdown: Boolean = false,
- iterations: Int = 3,
- variations: Seq[Variation[_]] = Seq(Variation("StandardRun", Seq("")) { _ => {} }),
- tags: Map[String, String] = Map.empty) = {
-
- class ExperimentStatus {
- val currentResults = new collection.mutable.ArrayBuffer[BenchmarkResult]()
- val currentRuns = new collection.mutable.ArrayBuffer[ExperimentRun]()
- val currentMessages = new collection.mutable.ArrayBuffer[String]()
-
- @volatile
- var currentQuery = ""
-
- def cartesianProduct[T](xss: List[List[T]]): List[List[T]] = xss match {
- case Nil => List(Nil)
- case h :: t => for(xh <- h; xt <- cartesianProduct(t)) yield xh :: xt
- }
-
- val timestamp = System.currentTimeMillis()
- val combinations = cartesianProduct(variations.map(l => (0 until l.options.size).toList).toList)
- val resultsFuture = future {
- val results = (1 to iterations).flatMap { i =>
- combinations.map { setup =>
- val currentOptions = variations.asInstanceOf[Seq[Variation[Any]]].zip(setup).map {
- case (v, idx) =>
- v.setup(v.options(idx))
- v.name -> v.options(idx).toString
- }
-
- val result = ExperimentRun(
- timestamp = timestamp,
- datasetName = datasetName,
- iteration = i,
- tags = currentOptions.toMap ++ tags,
- configuration = currentConfiguration,
- queriesToRun.flatMap { q =>
- val setup = s"iteration: $i, ${currentOptions.map { case (k, v) => s"$k=$v"}.mkString(", ")}"
- currentMessages += s"Running query ${q.name} $setup"
-
- currentQuery = q.name
- val singleResult = try q.benchmark(includeBreakdown, setup) :: Nil catch {
- case e: Exception =>
- currentMessages += s"Failed to run query ${q.name}: $e"
- Nil
- }
- currentResults ++= singleResult
- singleResult
- })
- currentRuns += result
-
- result
- }
- }
-
- val resultsTable = sqlContext.createDataFrame(results)
- currentMessages += s"Results stored to: $resultsLocation/$timestamp"
- resultsTable.toJSON.coalesce(1).saveAsTextFile(s"$resultsLocation/$timestamp")
- resultsTable
- }
-
- /** Waits for the finish of the experiment. */
- def waitForFinish(timeoutInSeconds: Int) = {
- Await.result(resultsFuture, timeoutInSeconds.seconds)
- }
-
- /** Returns results from an actively running experiment. */
- def getCurrentResults() = {
- val tbl = sqlContext.createDataFrame(currentResults)
- tbl.registerTempTable("currentResults")
- tbl
- }
-
- /** Returns full iterations from an actively running experiment. */
- def getCurrentRuns() = {
- val tbl = sqlContext.createDataFrame(currentRuns)
- tbl.registerTempTable("currentRuns")
- tbl
- }
-
- def tail(n: Int = 5) = {
- currentMessages.takeRight(n).mkString("\n")
- }
-
- def status =
- if (resultsFuture.isCompleted) {
- if (resultsFuture.value.get.isFailure) "Failed" else "Successful"
- } else {
- "Running"
- }
-
-
- override def toString =
- s"""
- |=== $status Experiment ===
- |Permalink: table("allResults").where('timestamp === ${timestamp}L)
- |Queries: ${queriesToRun.map(_.name).map(n => if(n == currentQuery) s"|$n|" else n).mkString(" ")}
- |Iterations complete: ${currentRuns.size / combinations.size} / $iterations
- |Queries run: ${currentResults.size} / ${iterations * combinations.size * queriesToRun.size}
- |Run time: ${(System.currentTimeMillis() - timestamp) / 1000}s
- |
- |== Logs ==
- |${tail()}
- """.stripMargin
- }
- new ExperimentStatus
- }
-}
\ No newline at end of file
diff --git a/src/main/scala/com/databricks/spark/sql/perf/table.scala b/src/main/scala/com/databricks/spark/sql/perf/table.scala
index cb331e2..2eca42e 100644
--- a/src/main/scala/com/databricks/spark/sql/perf/table.scala
+++ b/src/main/scala/com/databricks/spark/sql/perf/table.scala
@@ -32,6 +32,35 @@ import org.apache.spark.sql.types._
import parquet.hadoop.ParquetOutputFormat
import parquet.hadoop.util.ContextUtil
+trait TableCreator {
+
+ def tables: Seq[Table]
+
+ def createTablesForTest(tables: Seq[Table]): Seq[TableForTest]
+
+ val tablesForTest: Seq[TableForTest] = createTablesForTest(tables)
+
+ def checkData(): Unit = {
+ tablesForTest.foreach { table =>
+ val fs = FileSystem.get(new java.net.URI(table.outputDir), new Configuration())
+ val exists = fs.exists(new Path(table.outputDir))
+ val wasSuccessful = fs.exists(new Path(s"${table.outputDir}/_SUCCESS"))
+
+ if (!wasSuccessful) {
+ if (exists) {
+ println(s"Table '${table.name}' not generated successfully, regenerating.")
+ } else {
+ println(s"Table '${table.name}' does not exist, generating.")
+ }
+ fs.delete(new Path(table.outputDir), true)
+ table.generate()
+ } else {
+ println(s"Table ${table.name} already exists.")
+ }
+ }
+ }
+}
+
abstract class TableType
case object UnpartitionedTable extends TableType
case class PartitionedTable(partitionColumn: String) extends TableType
@@ -47,7 +76,7 @@ abstract class TableForTest(
val name = table.name
- val outputDir = s"$baseDir/parquet/${name}"
+ val outputDir = s"$baseDir/parquet/$name"
def fromCatalog = sqlContext.table(name)
@@ -57,16 +86,5 @@ abstract class TableForTest(
count("*") as "numRows",
lit(fromCatalog.queryExecution.optimizedPlan.statistics.sizeInBytes.toLong) as "sizeInBytes")
- def createTempTable(): Unit = {
- sqlContext.sql(
- s"""
- |CREATE TEMPORARY TABLE ${name}
- |USING org.apache.spark.sql.parquet
- |OPTIONS (
- | path '${outputDir}'
- |)
- """.stripMargin)
- }
-
- def generate(): Unit
+ def generate()
}
diff --git a/src/main/scala/com/databricks/spark/sql/perf/tpcds/TPCDS.scala b/src/main/scala/com/databricks/spark/sql/perf/tpcds/TPCDS.scala
index 363a2d4..a685739 100644
--- a/src/main/scala/com/databricks/spark/sql/perf/tpcds/TPCDS.scala
+++ b/src/main/scala/com/databricks/spark/sql/perf/tpcds/TPCDS.scala
@@ -39,20 +39,13 @@ class TPCDS (
sparkVersion: String,
dataLocation: String,
dsdgenDir: String,
- tables: Seq[Table],
+ val tables: Seq[Table],
scaleFactor: String,
userSpecifiedBaseDir: Option[String] = None)
- extends Dataset(
- sqlContext,
- sparkVersion,
- dataLocation,
- tables,
- scaleFactor) with Serializable {
+ extends Benchmark(sqlContext) with TableCreator with Serializable {
import sqlContext._
import sqlContext.implicits._
- override val datasetName = "tpcds"
-
lazy val baseDir =
userSpecifiedBaseDir.getOrElse(s"$dataLocation/scaleFactor=$scaleFactor/useDecimal=true")
@@ -61,6 +54,7 @@ class TPCDS (
TPCDSTableForTest(table, baseDir, scaleFactor.toInt, dsdgenDir, sqlContext))
}
+ /*
override def setup(): Unit = {
super.setup()
setupBroadcast()
@@ -79,5 +73,6 @@ class TPCDS (
println(setQuery)
sql(setQuery)
}
+ */
}
diff --git a/src/main/scala/com/databricks/spark/sql/perf/tpcds/Tables.scala b/src/main/scala/com/databricks/spark/sql/perf/tpcds/Tables.scala
index e216b31..a6b180e 100644
--- a/src/main/scala/com/databricks/spark/sql/perf/tpcds/Tables.scala
+++ b/src/main/scala/com/databricks/spark/sql/perf/tpcds/Tables.scala
@@ -107,12 +107,14 @@ case class TPCDSTableForTest(
val output = convertedData.queryExecution.analyzed.output
val job = new Job(sqlContext.sparkContext.hadoopConfiguration)
+
+ //HAX
val writeSupport =
- if (schema.fields.map(_.dataType).forall(_.isPrimitive)) {
- classOf[org.apache.spark.sql.parquet.MutableRowWriteSupport]
- } else {
+ // if (schema.fields.map(_.dataType).forall(_.isPrimitive)) {
+ // classOf[org.apache.spark.sql.parquet.MutableRowWriteSupport]
+ // } else {
classOf[org.apache.spark.sql.parquet.RowWriteSupport]
- }
+ // }
ParquetOutputFormat.setWriteSupportClass(job, writeSupport)
diff --git a/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/ImpalaKitQueries.scala b/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/ImpalaKitQueries.scala
index df621bf..31e969a 100644
--- a/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/ImpalaKitQueries.scala
+++ b/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/ImpalaKitQueries.scala
@@ -16,9 +16,11 @@
package com.databricks.spark.sql.perf.tpcds.queries
-import com.databricks.spark.sql.perf.QuerySet
+import com.databricks.spark.sql.perf.Benchmark
+
+trait ImpalaKitQueries {
+ self: Benchmark =>
-trait ImpalaKitQueries extends QuerySet {
// Queries are from
// https://github.com/cloudera/impala-tpcds-kit/tree/master/queries-sql92-modified/queries
val queries = Seq(
diff --git a/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/SimpleQueries.scala b/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/SimpleQueries.scala
index 98aec64..24b2007 100644
--- a/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/SimpleQueries.scala
+++ b/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/SimpleQueries.scala
@@ -16,9 +16,11 @@
package com.databricks.spark.sql.perf.tpcds.queries
-import com.databricks.spark.sql.perf.QuerySet
+import com.databricks.spark.sql.perf.Benchmark
+
+trait SimpleQueries {
+ self: Benchmark =>
-trait SimpleQueries extends QuerySet{
val q7Derived = Seq(
("q7-simpleScan",
"""