Basic join performance tests

This commit is contained in:
Michael Armbrust 2015-07-13 16:20:24 -07:00
parent eb3dd30c35
commit eba8cea93c
13 changed files with 547 additions and 463 deletions

View File

@ -3,22 +3,13 @@
scalaVersion := "2.10.4"
sparkVersion := "1.3.0"
sparkPackageName := "databricks/spark-sql-perf"
// Don't forget to set the version
version := "0.0.1-SNAPSHOT"
version := "0.0.4-SNAPSHOT"
// All Spark Packages need a license
licenses := Seq("Apache-2.0" -> url("http://opensource.org/licenses/Apache-2.0"))
sparkVersion := "1.4.0"
// Add Spark components this package depends on, e.g, "mllib", ....
sparkComponents ++= Seq("sql", "hive")
// uncomment and change the value below to change the directory where your zip artifact will be created
// spDistDirectory := target.value
// add any sparkPackageDependencies using sparkPackageDependencies.
// e.g. sparkPackageDependencies += "databricks/spark-avro:0.1"
sparkComponents ++= Seq("sql", "hive")

View File

@ -0,0 +1,343 @@
/*
* Copyright 2015 Databricks Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.databricks.spark.sql.perf
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import scala.concurrent._
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, FileSystem}
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.catalyst.plans.logical.Subquery
/**
* A collection of queries that test a particular aspect of Spark SQL.
*
* @param sqlContext An existing SQLContext.
*/
abstract class Benchmark(@transient protected val sqlContext: SQLContext)
extends Serializable {
import sqlContext.implicits._
val resultsLocation = "/spark/sql/performance"
val resultsTableName = "sqlPerformance"
/**
 * (Re)creates the external JSON table named `resultsTableName` over every
 * sub-directory of `resultsLocation`.
 *
 * Any previously registered table of the same name is dropped first so that the
 * definition always reflects the current values of `resultsTableName` and
 * `resultsLocation`.
 *
 * @return whatever `sqlContext.createExternalTable` returns for the new table.
 */
def createResultsTable() = {
  // IF EXISTS: do not depend on the table already being registered (first-time use).
  sqlContext.sql(s"DROP TABLE IF EXISTS $resultsTableName")
  // Use the same name that was just dropped rather than a hard-coded literal, so the two
  // statements cannot drift apart when `resultsTableName` is overridden.
  sqlContext.createExternalTable(
    resultsTableName, "json", Map("path" -> (resultsLocation + "/*/")))
}
protected def sparkContext = sqlContext.sparkContext
/**
 * Implicitly lifts a plain value into an `Option`, mapping `null` to `None`.
 *
 * The result type is spelled out explicitly: an implicit conversion whose type is
 * inferred is fragile (and is rejected by newer compilers).
 *
 * @param a the value to wrap; may be `null`.
 * @return `Some(a)` for a non-null value, `None` for `null`.
 */
implicit def toOption[A](a: A): Option[A] = Option(a)
/**
 * Captures the settings in effect right now: all SQL confs of `sqlContext`, all
 * entries of the SparkContext's conf, and the context's default parallelism.
 */
def currentConfiguration = {
  val sqlSettings = sqlContext.getAllConfs
  val sparkSettings = sparkContext.getConf.getAll.toMap
  val parallelism = sparkContext.defaultParallelism
  BenchmarkConfiguration(
    sqlConf = sqlSettings,
    sparkConf = sparkSettings,
    defaultParallelism = parallelism)
}
/**
* A Variation represents a setting (e.g. the number of shuffle partitions or whether tables
* are cached in memory) that we want to change in an experiment run.
* A Variation has three parts: `name`, `options`, and `setup`.
* The `name` is the identifier of a Variation. `options` is a Seq of options that
* will be used for a query. Basically, a query will be executed with every option
* defined in the list of `options`. `setup` defines the action needed to apply an
* option. For example, the following Variation is used to change the number of shuffle
* partitions of a query. The name of the Variation is "shufflePartitions". There are
* two options, 200 and 2000. The setup is used to set the value of property
* "spark.sql.shuffle.partitions".
*
* {{{
* Variation("shufflePartitions", Seq("200", "2000")) {
* case num => sqlContext.setConf("spark.sql.shuffle.partitions", num)
* }
* }}}
*
* Because `setup` lives in a second parameter list, it is not part of the case class's
* generated equality, hash code or `toString`; only `name` and `options` are.
*
* @param name The identifier of this variation.
* @param options The values to try, one at a time.
* @param setup The action that applies a single option.
* @tparam T The type of an option.
*/
case class Variation[T](name: String, options: Seq[T])(val setup: T => Unit)
/**
 * Starts an experiment run with a given set of queries.
 *
 * The run is executed asynchronously inside a `Future`. For every iteration, every
 * combination of variation options is applied in turn and all queries are benchmarked
 * under it. When everything has run, the results are written as JSON to
 * `resultsLocation/<timestamp>`.
 *
 * @param queriesToRun Queries to be executed.
 * @param includeBreakdown If it is true, breakdown results of a query will be recorded.
 *                         Setting it to true may significantly increase the time used to
 *                         execute a query.
 * @param iterations The number of iterations.
 * @param variations [[Variation]]s used in this run.
 * @param tags Tags of this run.
 * @return An ExperimentStatus object that can be used to track the progress of this
 *         experiment run.
 */
def runExperiment(
    queriesToRun: Seq[Query],
    includeBreakdown: Boolean = false,
    iterations: Int = 3,
    variations: Seq[Variation[_]] = Seq(Variation("StandardRun", Seq("true")) { _ => {} }),
    tags: Map[String, String] = Map.empty) = {

  class ExperimentStatus {
    // NOTE(review): these buffers are appended to by the experiment's Future and read by the
    // accessor methods below without synchronization -- confirm that is acceptable.
    val currentResults = new collection.mutable.ArrayBuffer[BenchmarkResult]()
    val currentRuns = new collection.mutable.ArrayBuffer[ExperimentRun]()
    val currentMessages = new collection.mutable.ArrayBuffer[String]()

    // Stats for HTML status message.
    @volatile var currentQuery = ""
    @volatile var currentPlan = ""
    @volatile var currentConfig = ""
    @volatile var failures = 0
    @volatile var startTime = 0L

    /** All ways of picking one element from each of the given lists. */
    def cartesianProduct[T](xss: List[List[T]]): List[List[T]] = xss match {
      case Nil => List(Nil)
      case h :: t => for (xh <- h; xt <- cartesianProduct(t)) yield xh :: xt
    }

    val timestamp = System.currentTimeMillis()

    // Each combination holds, per variation, the index of the option to use.
    val combinations =
      cartesianProduct(variations.map(l => (0 until l.options.size).toList).toList)

    val resultsFuture = Future {
      val results = (1 to iterations).flatMap { i =>
        combinations.map { optionIndexes =>
          // Apply the chosen option of every variation and remember it as a (name, value) tag.
          val currentOptions =
            variations.asInstanceOf[Seq[Variation[Any]]].zip(optionIndexes).map {
              case (v, idx) =>
                v.setup(v.options(idx))
                v.name -> v.options(idx).toString
            }
          currentConfig = currentOptions.map { case (k, v) => s"$k: $v" }.mkString(", ")

          val result = ExperimentRun(
            timestamp = timestamp,
            iteration = i,
            tags = currentOptions.toMap ++ tags,
            configuration = currentConfiguration,
            results = queriesToRun.map { q =>
              val setup =
                s"iteration: $i, ${currentOptions.map { case (k, v) => s"$k=$v"}.mkString(", ")}"
              currentMessages += s"Running query ${q.name} $setup"

              currentQuery = q.name
              // The plan is only shown on the status page. A query that cannot be planned must
              // not abort the whole experiment here; `benchmark` below records the failure.
              currentPlan =
                try {
                  q.newDataFrame().queryExecution.executedPlan.toString
                } catch {
                  case scala.util.control.NonFatal(e) => s"Unable to plan query: $e"
                }
              startTime = System.currentTimeMillis()

              val singleResult = q.benchmark(includeBreakdown, setup)
              singleResult.failure.foreach { f =>
                failures += 1
                currentMessages += s"Query '${q.name}' failed: ${f.message}"
              }
              singleResult.executionTime.foreach { time =>
                currentMessages += s"Exec time: $time"
              }
              currentResults += singleResult
              singleResult
            })
          currentRuns += result
          result
        }
      }

      try {
        val resultsTable = results.toDF()
        resultsTable.write
          .format("json")
          .save(s"$resultsLocation/$timestamp")
        // Only report success once the data has actually been written.
        currentMessages +=
          s"Results written to table: '$resultsTableName' at $resultsLocation/$timestamp"
        resultsTable
      } catch {
        // Writing is best effort, but fatal errors (e.g. OutOfMemoryError) must propagate.
        case scala.util.control.NonFatal(e) => currentMessages += s"Failed to write data: $e"
      }
    }

    /** Waits for the finish of the experiment. */
    def waitForFinish(timeoutInSeconds: Int) = {
      Await.result(resultsFuture, timeoutInSeconds.seconds)
    }

    /** Returns results from an actively running experiment. */
    def getCurrentResults() = {
      val tbl = sqlContext.createDataFrame(currentResults)
      tbl.registerTempTable("currentResults")
      tbl
    }

    /** Returns full iterations from an actively running experiment. */
    def getCurrentRuns() = {
      val tbl = sqlContext.createDataFrame(currentRuns)
      tbl.registerTempTable("currentRuns")
      tbl
    }

    /** Returns the last `n` log messages, one per line. */
    def tail(n: Int = 5) = {
      currentMessages.takeRight(n).mkString("\n")
    }

    /** "Running" until the future completes, then "Failed" or "Successful". */
    def status = resultsFuture.value match {
      case None => "Running"
      case Some(scala.util.Failure(_)) => "Failed"
      case Some(_) => "Successful"
    }

    override def toString =
      s"""Permalink: table("$resultsTableName").where('timestamp === ${timestamp}L)"""

    /** Renders the current progress of the experiment as an HTML fragment. */
    def html = {
      // A variation without options yields no combinations at all; do not divide by zero.
      val iterationsComplete =
        if (combinations.isEmpty) 0 else currentRuns.size / combinations.size
      s"""
         |<h2>$status Experiment</h2>
         |<b>Permalink:</b> <tt>table("$resultsTableName").where('timestamp === ${timestamp}L)</tt><br/>
         |<b>Iterations complete:</b> $iterationsComplete / $iterations<br/>
         |<b>Failures:</b> $failures<br/>
         |<b>Queries run:</b> ${currentResults.size} / ${iterations * combinations.size * queriesToRun.size}<br/>
         |<b>Run time:</b> ${(System.currentTimeMillis() - timestamp) / 1000}s<br/>
         |
         |<h2>Current Query: $currentQuery</h2>
         |Runtime: ${(System.currentTimeMillis() - startTime) / 1000}s
         |$currentConfig<br/>
         |<h3>QueryPlan</h3>
         |<pre>
         |${currentPlan.replaceAll("\n", "<br/>")}
         |</pre>
         |
         |<h2>Logs</h2>
         |<pre>
         |${tail()}
         |</pre>
         |""".stripMargin
    }
  }

  new ExperimentStatus
}
/** Factory object for benchmark queries. */
object Query {

  /**
   * Creates a query from SQL text. The text is kept as `sqlText` and is turned into a
   * DataFrame through `sqlContext.sql` each time the query needs one.
   */
  def apply(
      name: String,
      sqlText: String,
      description: String,
      collectResults: Boolean = true): Query = {
    val originalText = Some(sqlText)
    new Query(name, sqlContext.sql(sqlText), description, collectResults, originalText)
  }

  /**
   * Creates a query from an arbitrary DataFrame expression. Such a query has no SQL text
   * and always collects its results.
   */
  def apply(
      name: String,
      dataFrameBuilder: => DataFrame,
      description: String): Query = {
    val alwaysCollect = true
    new Query(name, dataFrameBuilder, description, alwaysCollect, None)
  }
}
/**
 * Holds one benchmark query and its metadata.
 *
 * @param name The name under which results of this query are reported.
 * @param buildDataFrame By-name expression producing the DataFrame to run. It is evaluated
 *                       again every time a DataFrame is needed, including once at
 *                       construction time to compute `tablesInvolved`.
 * @param description A human readable description of the query.
 * @param collectResults If true the result is collected to the driver when benchmarking,
 *                       otherwise every row is only iterated over.
 * @param sqlText The SQL text of the query, if it was defined through SQL.
 */
class Query(
    val name: String,
    buildDataFrame: => DataFrame,
    val description: String,
    val collectResults: Boolean,
    val sqlText: Option[String]) {

  override def toString =
    s"""
       |== Query: $name ==
       |${buildDataFrame.queryExecution.analyzed}
       |""".stripMargin

  /** Names of the unresolved relations referenced by the query's logical plan. */
  val tablesInvolved = buildDataFrame.queryExecution.logical collect {
    case UnresolvedRelation(tableIdentifier, _) =>
      // We are ignoring the database name.
      tableIdentifier.last
  }

  /** Builds a fresh DataFrame for this query. */
  def newDataFrame() = buildDataFrame

  /** Evaluates `f` once and returns the elapsed wall-clock time in milliseconds. */
  def benchmarkMs[A](f: => A): Double = {
    val startTime = System.nanoTime()
    f // evaluated only for its duration; the value itself is not needed
    val endTime = System.nanoTime()
    (endTime - startTime).toDouble / 1000000
  }

  /**
   * Runs the query once and measures each phase.
   *
   * @param includeBreakdown If true, every operator of the physical plan is additionally
   *                         executed and timed on its own.
   * @param description Extra text appended to the Spark job description.
   * @return A BenchmarkResult with the timings, or one carrying only the name and the
   *         failure if an Exception was thrown.
   */
  def benchmark(includeBreakdown: Boolean, description: String = "") = {
    try {
      val dataFrame = buildDataFrame
      sparkContext.setJobDescription(s"Query: $name, $description")
      val queryExecution = dataFrame.queryExecution

      // We are not counting the time of ScalaReflection.convertRowToScala.
      // Each phase is forced in order, so a later phase does not include an earlier one.
      val parsingTime = benchmarkMs {
        queryExecution.logical
      }
      val analysisTime = benchmarkMs {
        queryExecution.analyzed
      }
      val optimizationTime = benchmarkMs {
        queryExecution.optimizedPlan
      }
      val planningTime = benchmarkMs {
        queryExecution.executedPlan
      }

      val breakdownResults = if (includeBreakdown) {
        // Walk the plan once, top-down. This yields the operators in the same order in which
        // `executedPlan(i)` numbers them, without counting lines of `treeString` and without
        // re-walking the tree for every index.
        val physicalOperators =
          queryExecution.executedPlan.collect { case node => node }.zipWithIndex
        physicalOperators.map {
          case (node, index) =>
            val executionTime = benchmarkMs {
              node.execute().map(_.copy()).foreach(_ => ())
            }
            BreakdownResult(node.nodeName, node.simpleString, index, executionTime)
        }
      } else {
        Seq.empty[BreakdownResult]
      }

      // The executionTime for the entire query includes the time of type conversion from
      // catalyst to scala.
      val executionTime = if (collectResults) {
        benchmarkMs {
          dataFrame.rdd.collect()
        }
      } else {
        benchmarkMs {
          dataFrame.rdd.foreach(_ => ())
        }
      }

      val joinTypes = dataFrame.queryExecution.executedPlan.collect {
        case k if k.nodeName contains "Join" => k.nodeName
      }

      BenchmarkResult(
        name = name,
        joinTypes = joinTypes,
        tables = tablesInvolved,
        parsingTime = parsingTime,
        analysisTime = analysisTime,
        optimizationTime = optimizationTime,
        planningTime = planningTime,
        executionTime = executionTime,
        queryExecution = dataFrame.queryExecution.toString,
        breakDown = breakdownResults)
    } catch {
      // A failing query is reported as a result instead of aborting the caller.
      case e: Exception =>
        BenchmarkResult(
          name = name,
          failure = Failure(e.getClass.getName, e.getMessage))
    }
  }
}
}

View File

@ -0,0 +1,57 @@
package com.databricks.spark.sql.perf
import org.apache.spark.sql.SQLContext
/**
 * Join and aggregation micro-benchmarks over generated single-column (`id`) integer tables.
 *
 * The query collections are lazy: building a Query runs `sqlContext.sql` on its text, which
 * needs the tables to be resolvable. Keeping them lazy lets an instance be constructed first
 * so that `buildTables()` can be called before any query is touched.
 */
class JoinPerformance(sqlContext: SQLContext) extends Benchmark(sqlContext) {

  /** Names of the generated tables, smallest first. */
  private def tableNames = Seq("1milints", "100milints", "1bilints")

  /** Saves `0 until rows` as table `name` in `files` partitions; an existing table is kept. */
  private def buildTable(name: String, rows: Long, files: Int): Unit = {
    sqlContext.range(0, rows)
      .repartition(files)
      .write.mode("ignore")
      .saveAsTable(name)
  }

  /** Generates the tables used by the queries below. Tables that already exist are kept. */
  def buildTables() = {
    // 1.5 mb, 1 file
    buildTable("1milints", 1000000, 1)
    // 143.542mb, 10 files
    buildTable("100milints", 100000000, 10)
    // 1.4348gb, 10 files
    buildTable("1bilints", 1000000000, 10)
  }

  /** Every pairing of tables under every join type, joined on the single `id` key. */
  lazy val singleKeyJoins: Seq[Query] = tableNames.flatMap { table1 =>
    tableNames.flatMap { table2 =>
      Seq("JOIN", "RIGHT JOIN", "LEFT JOIN", "FULL OUTER JOIN").map { join =>
        Query(
          s"singleKey-$join-$table1-$table2",
          s"SELECT COUNT(*) FROM $table1 a $join $table2 b ON a.id = b.id",
          // Describe the join actually issued instead of calling all of them "inner".
          s"equi-join ($join) of $table1 with $table2 using a single key.",
          collectResults = true)
      }
    }
  }.filterNot(_.name contains "FULL OUTER JOIN-1milints-1bilints")

  /** A single aggregate over an arithmetic expression, once per table. */
  lazy val complexInput: Seq[Query] =
    tableNames.map { table =>
      Query(
        // The table is part of the name so results for different tables stay distinguishable.
        s"aggregation-complex-input-$table",
        s"SELECT SUM(id + id + id + id + id + id + id + id + id + id) FROM $table",
        "Sum of a single column added to itself (ten operands)",
        collectResults = true)
    }

  /** Each simple aggregate function over the `id` column, once per table. */
  lazy val aggregates: Seq[Query] =
    tableNames.flatMap { table =>
      Seq("SUM", "AVG", "COUNT", "STDDEV").map { agg =>
        Query(
          // The table is part of the name so results for different tables stay distinguishable.
          s"single-aggregate-$agg-$table",
          s"SELECT $agg(id) FROM $table",
          "aggregation of a single column",
          collectResults = true)
      }
    }
}

View File

@ -21,23 +21,15 @@ import org.apache.spark.sql.functions._
import org.apache.spark.sql.parquet.TPCDSTableForTest
import org.apache.spark.sql.{Column, SQLContext}
class BigData (
class BigDataBenchmark (
@transient sqlContext: SQLContext,
sparkVersion: String,
dataLocation: String,
tables: Seq[Table],
val tables: Seq[Table],
scaleFactor: String)
extends Dataset(
sqlContext,
sparkVersion,
dataLocation,
tables,
scaleFactor) with Serializable {
extends Benchmark(sqlContext) with Serializable with TableCreator {
import sqlContext._
import sqlContext.implicits._
override val datasetName = "bigDataBenchmark"
override def createTablesForTest(tables: Seq[Table]): Seq[TableForTest] = {
tables.map(table =>
BigDataTableForTest(table, dataLocation, scaleFactor, sqlContext))

View File

@ -16,9 +16,11 @@
package com.databricks.spark.sql.perf.bigdata
import com.databricks.spark.sql.perf.QuerySet
import com.databricks.spark.sql.perf.Benchmark
trait Queries {
self: Benchmark =>
trait Queries extends QuerySet {
val queries1to3 = Seq(
Query(
name = "q1A",

View File

@ -19,117 +19,11 @@ package com.databricks.spark.sql.perf
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
object x {
trait QuerySet {
val sqlContext: SQLContext
def sparkContext = sqlContext.sparkContext
object Query {
def apply(
name: String,
sqlText: String,
description: String,
collectResults: Boolean = true): Query = {
new Query(name, sqlContext.sql(sqlText), description, collectResults, Some(sqlText))
}
def apply(
name: String,
dataFrameBuilder: => DataFrame,
description: String): Query = {
new Query(name, dataFrameBuilder, description, true, None)
}
}
class Query(
val name: String,
dataFrameBuilder: => DataFrame,
val description: String,
val collectResults: Boolean,
val sqlText: Option[String]) {
val tablesInvolved = dataFrameBuilder.queryExecution.logical collect {
case UnresolvedRelation(tableIdentifier, _) => {
// We are ignoring the database name.
tableIdentifier.last
}
}
def benchmarkMs[A](f: => A): Double = {
val startTime = System.nanoTime()
val ret = f
val endTime = System.nanoTime()
(endTime - startTime).toDouble / 1000000
}
def benchmark(includeBreakdown: Boolean, description: String = "") = {
try {
val dataFrame = dataFrameBuilder
sparkContext.setJobDescription(s"Query: $name, $description")
val queryExecution = dataFrame.queryExecution
// We are not counting the time of ScalaReflection.convertRowToScala.
val parsingTime = benchmarkMs {
queryExecution.logical
}
val analysisTime = benchmarkMs {
queryExecution.analyzed
}
val optimizationTime = benchmarkMs {
queryExecution.optimizedPlan
}
val planningTime = benchmarkMs {
queryExecution.executedPlan
}
val breakdownResults = if (includeBreakdown) {
val depth = queryExecution.executedPlan.treeString.split("\n").size
val physicalOperators = (0 until depth).map(i => (i, queryExecution.executedPlan(i)))
physicalOperators.map {
case (index, node) =>
val executionTime = benchmarkMs {
node.execute().map(_.copy()).foreach(row => Unit)
}
BreakdownResult(node.nodeName, node.simpleString, index, executionTime)
}
} else {
Seq.empty[BreakdownResult]
}
// The executionTime for the entire query includes the time of type conversion from catalyst to scala.
val executionTime = if (collectResults) {
benchmarkMs {
dataFrame.rdd.collect()
}
} else {
benchmarkMs {
dataFrame.rdd.foreach { row => Unit }
}
}
val joinTypes = dataFrame.queryExecution.executedPlan.collect {
case k if k.nodeName contains "Join" => k.nodeName
}
BenchmarkResult(
name = name,
joinTypes = joinTypes,
tables = tablesInvolved,
parsingTime = parsingTime,
analysisTime = analysisTime,
optimizationTime = optimizationTime,
planningTime = planningTime,
executionTime = executionTime,
breakdownResults)
} catch {
case e: Exception =>
throw new RuntimeException(
s"Failed to benchmark query $name", e)
}
}
}
}

View File

@ -0,0 +1,88 @@
/*
* Copyright 2015 Databricks Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.databricks.spark.sql.perf
/**
* The performance results of all given queries for a single iteration.
* @param timestamp The time at which the entire experiment was started.
* @param iteration The index number of the current iteration.
* @param tags Tags of this iteration (the variation settings in effect are stored here too).
* @param configuration Configuration properties of this iteration.
* @param results The performance results of queries for this iteration.
*/
case class ExperimentRun(
timestamp: Long,
iteration: Int,
tags: Map[String, String],
configuration: BenchmarkConfiguration,
results: Seq[BenchmarkResult])
/**
* The configuration used for an iteration of an experiment.
* @param sparkVersion The version of Spark. Defaults to the `SPARK_VERSION` constant of the
*                     Spark build on the classpath.
* @param sqlConf All configuration properties related to Spark SQL.
* @param sparkConf All configuration properties of Spark.
* @param defaultParallelism The default parallelism of the cluster.
* Usually, it is the number of cores of the cluster.
*/
case class BenchmarkConfiguration(
sparkVersion: String = org.apache.spark.SPARK_VERSION,
sqlConf: Map[String, String],
sparkConf: Map[String,String],
defaultParallelism: Int)
/**
* The result of a query.
*
* Only `name` is mandatory; every other field has an empty default (`Nil` / `None`), so a
* result can be created for a query that produced no measurements.
*
* @param name The name of the query.
* @param joinTypes The type of join operations in the query.
* @param tables The tables involved in the query.
* @param parsingTime The time used to parse the query.
* @param analysisTime The time used to analyze the query.
* @param optimizationTime The time used to optimize the query.
* @param planningTime The time used to plan the query.
* @param executionTime The time used to execute the query.
* @param breakDown The breakdown results of the query plan tree.
* @param queryExecution A textual rendering of the query's execution, if available.
* @param failure Details of the error, if the query failed.
*/
case class BenchmarkResult(
name: String,
joinTypes: Seq[String] = Nil,
tables: Seq[String] = Nil,
parsingTime: Option[Double] = None,
analysisTime: Option[Double] = None,
optimizationTime: Option[Double] = None,
planningTime: Option[Double] = None,
executionTime: Option[Double] = None,
breakDown: Seq[BreakdownResult] = Nil,
queryExecution: Option[String] = None,
failure: Option[Failure] = None)
/**
* The execution time of a subtree of the query plan tree of a specific query.
* @param nodeName The name of the top physical operator of the subtree.
* @param nodeNameWithArgs The name and arguments of the top physical operator of the subtree.
* @param index The index of the top physical operator of the subtree
* in the original query plan tree. The index starts from 0
* (0 represents the top physical operator of the original query plan tree).
* @param executionTime The execution time of the subtree, i.e. of the operator at `index`
* together with everything below it.
*/
case class BreakdownResult(
nodeName: String,
nodeNameWithArgs: String,
index: Int,
executionTime: Double)
/**
* Describes why a query could not be benchmarked.
*
* Note that in this package the simple name `Failure` refers to this class, not to
* `scala.util.Failure`.
*
* @param className The name of the class of the error that occurred.
* @param message The message of the error.
*/
case class Failure(className: String, message: String)

View File

@ -1,302 +0,0 @@
/*
* Copyright 2015 Databricks Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.databricks.spark.sql.perf
import scala.concurrent._
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, FileSystem}
import org.apache.spark.sql.SQLContext
/**
* The configuration used for an iteration of an experiment.
* @param sparkVersion The version of Spark.
* @param scaleFactor The scale factor of the dataset.
* @param sqlConf All configuration properties related to Spark SQL.
* @param sparkConf All configuration properties of Spark.
* @param defaultParallelism The default parallelism of the cluster.
* Usually, it is the number of cores of the cluster.
*/
case class BenchmarkConfiguration(
sparkVersion: String,
scaleFactor: String,
sqlConf: Map[String, String],
sparkConf: Map[String,String],
defaultParallelism: Int)
/**
* The execution time of a subtree of the query plan tree of a specific query.
* @param nodeName The name of the top physical operator of the subtree.
* @param nodeNameWithArgs The name and arguments of the top physical operator of the subtree.
* @param index The index of the top physical operator of the subtree
* in the original query plan tree. The index starts from 0
* (0 represents the top physical operator of the original query plan tree).
* @param executionTime The execution time of the subtree.
*/
case class BreakdownResult(
nodeName: String,
nodeNameWithArgs: String,
index: Int,
executionTime: Double)
/**
* The result of a query.
* @param name The name of the query.
* @param joinTypes The type of join operations in the query.
* @param tables The tables involved in the query.
* @param parsingTime The time used to parse the query.
* @param analysisTime The time used to analyze the query.
* @param optimizationTime The time used to optimize the query.
* @param planningTime The time used to plan the query.
* @param executionTime The time used to execute the query.
* @param breakDown The breakdown results of the query plan tree.
*/
case class BenchmarkResult(
name: String,
joinTypes: Seq[String],
tables: Seq[String],
parsingTime: Double,
analysisTime: Double,
optimizationTime: Double,
planningTime: Double,
executionTime: Double,
breakDown: Seq[BreakdownResult])
/**
* A Variation represents a setting (e.g. the number of shuffle partitions or whether tables
* are cached in memory) that we want to change in an experiment run.
* A Variation has three parts, `name`, `options`, and `setup`.
* The `name` is the identifier of a Variation. `options` is a Seq of options that
* will be used for a query. Basically, a query will be executed with every option
* defined in the list of `options`. `setup` defines the needed action for every
* option. For example, the following Variation is used to change the number of shuffle
* partitions of a query. The name of the Variation is "shufflePartitions". There are
* two options, 200 and 2000. The setup is used to set the value of property
* "spark.sql.shuffle.partitions".
*
* {{{
* Variation("shufflePartitions", Seq("200", "2000")) {
* case num => sqlContext.setConf("spark.sql.shuffle.partitions", num)
* }
* }}}
*/
case class Variation[T](name: String, options: Seq[T])(val setup: T => Unit)
/**
* The performance results of all given queries for a single iteration.
* @param timestamp The timestamp indicates when the entire experiment is started.
* @param datasetName The name of dataset.
* @param iteration The index number of the current iteration.
* @param tags Tags of this iteration (variations are stored here).
* @param configuration Configuration properties of this iteration.
* @param results The performance results of queries for this iteration.
*/
case class ExperimentRun(
timestamp: Long,
datasetName: String,
iteration: Int,
tags: Map[String, String],
configuration: BenchmarkConfiguration,
results: Seq[BenchmarkResult])
/**
* The dataset of a benchmark.
* @param sqlContext An existing SQLContext.
* @param sparkVersion The version of Spark.
* @param dataLocation The location of the dataset used by this experiment.
* @param tables Tables that will be used in this experiment.
* @param scaleFactor The scale factor of the dataset. For some benchmarks like TPC-H
* and TPC-DS, the scale factor is a number roughly representing the
* size of raw data files. For some other benchmarks, the scale factor
* is a short string describing the scale of the dataset.
*/
abstract class Dataset(
@transient val sqlContext: SQLContext,
sparkVersion: String,
dataLocation: String,
tables: Seq[Table],
scaleFactor: String) extends Serializable with QuerySet {
val datasetName: String
def createTablesForTest(tables: Seq[Table]): Seq[TableForTest]
val tablesForTest: Seq[TableForTest] = createTablesForTest(tables)
def checkData(): Unit = {
tablesForTest.foreach { table =>
val fs = FileSystem.get(new java.net.URI(table.outputDir), new Configuration())
val exists = fs.exists(new Path(table.outputDir))
val wasSuccessful = fs.exists(new Path(s"${table.outputDir}/_SUCCESS"))
if (!wasSuccessful) {
if (exists) {
println(s"Table '${table.name}' not generated successfully, regenerating.")
} else {
println(s"Table '${table.name}' does not exist, generating.")
}
fs.delete(new Path(table.outputDir), true)
table.generate()
} else {
println(s"Table ${table.name} already exists.")
}
}
}
def allStats = tablesForTest.map(_.stats).reduceLeft(_.unionAll(_))
/**
* Does necessary setup work such as data generation and transformation. It needs to be
* called before running any query.
*/
def setup(): Unit = {
checkData()
tablesForTest.foreach(_.createTempTable())
}
def currentConfiguration = BenchmarkConfiguration(
sparkVersion = sparkVersion,
scaleFactor = scaleFactor,
sqlConf = sqlContext.getAllConfs,
sparkConf = sparkContext.getConf.getAll.toMap,
defaultParallelism = sparkContext.defaultParallelism)
/**
* Starts an experiment run with a given set of queries.
* @param queriesToRun Queries to be executed.
* @param resultsLocation The location of performance results.
* @param includeBreakdown If it is true, breakdown results of a query will be recorded.
* Setting it to true may significantly increase the time used to
* execute a query.
* @param iterations The number of iterations.
* @param variations [[Variation]]s used in this run.
* @param tags Tags of this run.
* @return It returns a ExperimentStatus object that can be used to
* track the progress of this experiment run.
*/
def runExperiment(
queriesToRun: Seq[Query],
resultsLocation: String,
includeBreakdown: Boolean = false,
iterations: Int = 3,
variations: Seq[Variation[_]] = Seq(Variation("StandardRun", Seq("")) { _ => {} }),
tags: Map[String, String] = Map.empty) = {
class ExperimentStatus {
val currentResults = new collection.mutable.ArrayBuffer[BenchmarkResult]()
val currentRuns = new collection.mutable.ArrayBuffer[ExperimentRun]()
val currentMessages = new collection.mutable.ArrayBuffer[String]()
@volatile
var currentQuery = ""
def cartesianProduct[T](xss: List[List[T]]): List[List[T]] = xss match {
case Nil => List(Nil)
case h :: t => for(xh <- h; xt <- cartesianProduct(t)) yield xh :: xt
}
val timestamp = System.currentTimeMillis()
val combinations = cartesianProduct(variations.map(l => (0 until l.options.size).toList).toList)
val resultsFuture = future {
val results = (1 to iterations).flatMap { i =>
combinations.map { setup =>
val currentOptions = variations.asInstanceOf[Seq[Variation[Any]]].zip(setup).map {
case (v, idx) =>
v.setup(v.options(idx))
v.name -> v.options(idx).toString
}
val result = ExperimentRun(
timestamp = timestamp,
datasetName = datasetName,
iteration = i,
tags = currentOptions.toMap ++ tags,
configuration = currentConfiguration,
queriesToRun.flatMap { q =>
val setup = s"iteration: $i, ${currentOptions.map { case (k, v) => s"$k=$v"}.mkString(", ")}"
currentMessages += s"Running query ${q.name} $setup"
currentQuery = q.name
val singleResult = try q.benchmark(includeBreakdown, setup) :: Nil catch {
case e: Exception =>
currentMessages += s"Failed to run query ${q.name}: $e"
Nil
}
currentResults ++= singleResult
singleResult
})
currentRuns += result
result
}
}
val resultsTable = sqlContext.createDataFrame(results)
currentMessages += s"Results stored to: $resultsLocation/$timestamp"
resultsTable.toJSON.coalesce(1).saveAsTextFile(s"$resultsLocation/$timestamp")
resultsTable
}
/** Waits for the finish of the experiment. */
def waitForFinish(timeoutInSeconds: Int) = {
Await.result(resultsFuture, timeoutInSeconds.seconds)
}
/** Returns results from an actively running experiment. */
def getCurrentResults() = {
val tbl = sqlContext.createDataFrame(currentResults)
tbl.registerTempTable("currentResults")
tbl
}
/** Returns full iterations from an actively running experiment. */
def getCurrentRuns() = {
val tbl = sqlContext.createDataFrame(currentRuns)
tbl.registerTempTable("currentRuns")
tbl
}
def tail(n: Int = 5) = {
currentMessages.takeRight(n).mkString("\n")
}
def status =
if (resultsFuture.isCompleted) {
if (resultsFuture.value.get.isFailure) "Failed" else "Successful"
} else {
"Running"
}
override def toString =
s"""
|=== $status Experiment ===
|Permalink: table("allResults").where('timestamp === ${timestamp}L)
|Queries: ${queriesToRun.map(_.name).map(n => if(n == currentQuery) s"|$n|" else n).mkString(" ")}
|Iterations complete: ${currentRuns.size / combinations.size} / $iterations
|Queries run: ${currentResults.size} / ${iterations * combinations.size * queriesToRun.size}
|Run time: ${(System.currentTimeMillis() - timestamp) / 1000}s
|
|== Logs ==
|${tail()}
""".stripMargin
}
new ExperimentStatus
}
}

View File

@ -32,6 +32,35 @@ import org.apache.spark.sql.types._
import parquet.hadoop.ParquetOutputFormat
import parquet.hadoop.util.ContextUtil
/**
 * Mix-in for benchmarks whose input tables are generated on demand.
 *
 * Implementors supply the table definitions and the way to turn them into
 * TableForTest instances; `checkData()` then (re)generates whatever is missing.
 */
trait TableCreator {

  /** The tables this benchmark works on. */
  def tables: Seq[Table]

  /** Turns table definitions into their testable counterparts. */
  def createTablesForTest(tables: Seq[Table]): Seq[TableForTest]

  // Evaluated while the trait is initialised, from `tables`.
  val tablesForTest: Seq[TableForTest] = createTablesForTest(tables)

  /**
   * Makes sure every table has been generated completely.
   *
   * A table counts as complete when its output directory contains a `_SUCCESS` file.
   * Otherwise the output directory is deleted recursively and the table is generated.
   */
  def checkData(): Unit = {
    for (table <- tablesForTest) {
      val fs = FileSystem.get(new java.net.URI(table.outputDir), new Configuration())
      val outputPath = new Path(table.outputDir)
      val exists = fs.exists(outputPath)
      val wasSuccessful = fs.exists(new Path(s"${table.outputDir}/_SUCCESS"))

      if (wasSuccessful) {
        println(s"Table ${table.name} already exists.")
      } else {
        // Tell apart a half-written table from one that was never generated.
        if (!exists) {
          println(s"Table '${table.name}' does not exist, generating.")
        } else {
          println(s"Table '${table.name}' not generated successfully, regenerating.")
        }
        fs.delete(outputPath, true)
        table.generate()
      }
    }
  }
}
abstract class TableType
case object UnpartitionedTable extends TableType
case class PartitionedTable(partitionColumn: String) extends TableType
@ -47,7 +76,7 @@ abstract class TableForTest(
val name = table.name
val outputDir = s"$baseDir/parquet/${name}"
val outputDir = s"$baseDir/parquet/$name"
def fromCatalog = sqlContext.table(name)
@ -57,16 +86,5 @@ abstract class TableForTest(
count("*") as "numRows",
lit(fromCatalog.queryExecution.optimizedPlan.statistics.sizeInBytes.toLong) as "sizeInBytes")
def createTempTable(): Unit = {
sqlContext.sql(
s"""
|CREATE TEMPORARY TABLE ${name}
|USING org.apache.spark.sql.parquet
|OPTIONS (
| path '${outputDir}'
|)
""".stripMargin)
}
def generate(): Unit
def generate()
}

View File

@ -39,20 +39,13 @@ class TPCDS (
sparkVersion: String,
dataLocation: String,
dsdgenDir: String,
tables: Seq[Table],
val tables: Seq[Table],
scaleFactor: String,
userSpecifiedBaseDir: Option[String] = None)
extends Dataset(
sqlContext,
sparkVersion,
dataLocation,
tables,
scaleFactor) with Serializable {
extends Benchmark(sqlContext) with TableCreator with Serializable {
import sqlContext._
import sqlContext.implicits._
override val datasetName = "tpcds"
lazy val baseDir =
userSpecifiedBaseDir.getOrElse(s"$dataLocation/scaleFactor=$scaleFactor/useDecimal=true")
@ -61,6 +54,7 @@ class TPCDS (
TPCDSTableForTest(table, baseDir, scaleFactor.toInt, dsdgenDir, sqlContext))
}
/*
override def setup(): Unit = {
super.setup()
setupBroadcast()
@ -79,5 +73,6 @@ class TPCDS (
println(setQuery)
sql(setQuery)
}
*/
}

View File

@ -107,12 +107,14 @@ case class TPCDSTableForTest(
val output = convertedData.queryExecution.analyzed.output
val job = new Job(sqlContext.sparkContext.hadoopConfiguration)
//HAX
val writeSupport =
if (schema.fields.map(_.dataType).forall(_.isPrimitive)) {
classOf[org.apache.spark.sql.parquet.MutableRowWriteSupport]
} else {
// if (schema.fields.map(_.dataType).forall(_.isPrimitive)) {
// classOf[org.apache.spark.sql.parquet.MutableRowWriteSupport]
// } else {
classOf[org.apache.spark.sql.parquet.RowWriteSupport]
}
// }
ParquetOutputFormat.setWriteSupportClass(job, writeSupport)

View File

@ -16,9 +16,11 @@
package com.databricks.spark.sql.perf.tpcds.queries
import com.databricks.spark.sql.perf.QuerySet
import com.databricks.spark.sql.perf.Benchmark
trait ImpalaKitQueries {
self: Benchmark =>
trait ImpalaKitQueries extends QuerySet {
// Queries are from
// https://github.com/cloudera/impala-tpcds-kit/tree/master/queries-sql92-modified/queries
val queries = Seq(

View File

@ -16,9 +16,11 @@
package com.databricks.spark.sql.perf.tpcds.queries
import com.databricks.spark.sql.perf.QuerySet
import com.databricks.spark.sql.perf.Benchmark
trait SimpleQueries {
self: Benchmark =>
trait SimpleQueries extends QuerySet{
val q7Derived = Seq(
("q7-simpleScan",
"""