Merge pull request #6 from marmbrus/refactor
Remove deprecated parquet writing code and add some micro benchmarks
commit ff19051b0e
33
README.md
@@ -1,55 +1,34 @@
# Spark SQL Performance Tests

This is a performance testing framework for [Spark SQL](https://spark.apache.org/sql/) in [Apache Spark](https://spark.apache.org/) 1.3+.
This is a performance testing framework for [Spark SQL](https://spark.apache.org/sql/) in [Apache Spark](https://spark.apache.org/) 1.4+.

**Note: This README is still under development. Please also check our source code for more information.**

## How to use it
The rest of this document uses the TPC-DS benchmark as an example. We will add content explaining how to use other benchmarks and how to add support for a new benchmark dataset in the future.

### Set up a dataset
Before running any query, a dataset needs to be set up by creating a `Dataset` object. Every benchmark supported in Spark SQL Perf needs to implement its own `Dataset` class. A `Dataset` object takes a few parameters that describe the needed tables, and its `setup` function creates those tables. For the TPC-DS benchmark, the class is `TPCDS` in the package `com.databricks.spark.sql.perf.tpcds`. For example, to set up a TPC-DS dataset, you can
### Set up a benchmark
Before running any query, a dataset needs to be set up by creating a `Benchmark` object.

```
import org.apache.spark.sql.parquet.Tables
// Tables in TPC-DS benchmark used by experiments.
val tables = Tables(sqlContext)
// Setup TPC-DS experiment
val tpcds =
  new TPCDS (
    sqlContext = sqlContext,
    sparkVersion = "1.3.1",
    dataLocation = <the location of data>,
    dsdgenDir = <the location of dsdgen in every worker>,
    tables = tables.tables,
    scaleFactor = <scale factor>)
val tpcds = new TPCDS (sqlContext = sqlContext)
```

After a `TPCDS` object is created, its tables can be set up by calling

```
tpcds.setup()
```

The `setup` function first checks whether the needed tables are stored at the location specified by `dataLocation`. If not, it creates the tables there by using the data generator tool `dsdgen` provided by the TPC-DS benchmark (this tool needs to be pre-installed at the location specified by `dsdgenDir` on every worker).

### Run benchmarking queries
After setup, use the `runExperiment` function to run benchmarking queries and record query execution times. Taking TPC-DS as an example, you can start an experiment with

```
tpcds.runExperiment(
  queries = <a Seq of Queries>,
  resultsLocation = <the root location of performance results>,
  includeBreakdown = <whether to measure the performance of every physical operator>,
  iterations = <the number of iterations>,
  variations = <variations used in the experiment>,
  tags = <tags of this experiment>)
val experiment = tpcds.runExperiment(queriesToRun = tpcds.interactiveQueries)
```

For every experiment run (i.e. every call of `runExperiment`), Spark SQL Perf uses the timestamp of the start time to identify the experiment. Performance results are stored in JSON format in a subdirectory of the given `resultsLocation` named after that timestamp (for example `results/1429213883272`).
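
As a rough illustration of this naming scheme, the sketch below builds the results path for a run from a root location and a start timestamp. The helper name `resultsPathFor` and the `results` root are assumptions made for this example; they are not part of Spark SQL Perf.

```scala
// Hypothetical helper, not part of Spark SQL Perf: joins the results root
// and the run's start timestamp with a slash, as in the example path above.
def resultsPathFor(resultsLocation: String, timestamp: Long): String =
  s"$resultsLocation/$timestamp"

// A run is identified by the time it started, in milliseconds since the epoch.
val startedAt = System.currentTimeMillis()
println(resultsPathFor("results", startedAt))

// With the timestamp from the example above:
println(resultsPathFor("results", 1429213883272L)) // prints results/1429213883272
```

Because each run writes to its own timestamped subdirectory, results from earlier runs are not overwritten.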

### Retrieve results
The following code can be used to retrieve results ...
While the experiment is running, you can use `experiment.html` to list its status. Once the experiment is complete, the results are saved in JSON format and can be queried through the `sqlPerformance` table.
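
To make the progress figures on the status page easier to interpret, here is a minimal plain-Scala sketch of how the expected number of query runs can be derived: iterations times the number of variation combinations times the number of queries. It mirrors the `cartesianProduct` helper in `Benchmark.scala`, which works on option indices rather than on the option values used here; the option lists and query names below are made up for illustration.

```scala
// Cross product of option lists, mirroring the helper in Benchmark.scala.
def cartesianProduct[T](xss: List[List[T]]): List[List[T]] = xss match {
  case Nil => List(Nil)
  case h :: t => for (xh <- h; xt <- cartesianProduct(t)) yield xh :: xt
}

// Illustrative inputs (assumptions, not taken from a real run).
val variationOptions = List(List("on", "off"), List("200", "2000"))
val queryNames = Seq("q1", "q2", "q3")
val iterations = 3

// Every query is run once per iteration for every combination of options.
val combinations = cartesianProduct(variationOptions)
val expectedRuns = iterations * combinations.size * queryNames.size

println(s"combinations: ${combinations.size}") // prints combinations: 4
println(s"expected query runs: $expectedRuns") // prints expected query runs: 36
```

Adding a variation multiplies the total by its number of options, so the run time of an experiment grows quickly with each additional variation.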

```
// Get experiments results.

15
build.sbt
@@ -3,22 +3,13 @@

scalaVersion := "2.10.4"

sparkVersion := "1.3.0"

sparkPackageName := "databricks/spark-sql-perf"

// Don't forget to set the version
version := "0.0.1-SNAPSHOT"
version := "0.0.4-SNAPSHOT"

// All Spark Packages need a license
licenses := Seq("Apache-2.0" -> url("http://opensource.org/licenses/Apache-2.0"))

sparkVersion := "1.4.0"

// Add Spark components this package depends on, e.g, "mllib", ....
sparkComponents ++= Seq("sql", "hive")

// uncomment and change the value below to change the directory where your zip artifact will be created
// spDistDirectory := target.value

// add any sparkPackageDependencies using sparkPackageDependencies.
// e.g. sparkPackageDependencies += "databricks/spark-avro:0.1"
sparkComponents ++= Seq("sql", "hive")
@@ -0,0 +1,67 @@
package com.databricks.spark.sql.perf

import org.apache.spark.sql.{Row, SQLContext}

trait AggregationPerformance extends Benchmark {

  import sqlContext.implicits._
  import ExecutionMode._


  val sizes = (1 to 6).map(math.pow(10, _).toInt).toSeq

  val variousCardinality = sizes.map { size =>
    Table(s"ints$size",
      sparkContext.parallelize(1 to size).flatMap { group =>
        (1 to 10000).map(i => (group, i))
      }.toDF("a", "b"))
  }

  val lowCardinality = sizes.map { size =>
    val fullSize = size * 10000L
    Table(
      s"twoGroups$fullSize",
      sqlContext.range(0, fullSize).select($"id" % 2 as 'a, $"id" as 'b))
  }

  val newAggreation = Variation("aggregationType", Seq("new", "old")) {
    case "old" => sqlContext.setConf("spark.sql.useAggregate2", "false")
    case "new" => sqlContext.setConf("spark.sql.useAggregate2", "true")
  }

  val varyNumGroupsAvg: Seq[Query] = variousCardinality.map(_.name).map { table =>
    Query(
      s"avg-$table",
      s"SELECT AVG(b) FROM $table GROUP BY a",
      "an average with a varying number of groups",
      executionMode = ForeachResults)
  }

  val twoGroupsAvg: Seq[Query] = lowCardinality.map(_.name).map { table =>
    Query(
      s"avg-$table",
      s"SELECT AVG(b) FROM $table GROUP BY a",
      "an average on an int column with only two groups",
      executionMode = ForeachResults)
  }

  val complexInput =
    Seq("1milints", "100milints", "1bilints").map { table =>
      Query(
        s"aggregation-complex-input-$table",
        s"SELECT SUM(id + id + id + id + id + id + id + id + id + id) FROM $table",
        "Sum of 10 columns added together",
        executionMode = CollectResults)
    }

  val aggregates =
    Seq("1milints", "100milints", "1bilints").flatMap { table =>
      Seq("SUM", "AVG", "COUNT", "STDDEV").map { agg =>
        Query(
          s"single-aggregate-$agg-$table",
          s"SELECT $agg(id) FROM $table",
          "aggregation of a single column",
          executionMode = CollectResults)
      }
    }
}
463
src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala
Normal file
@@ -0,0 +1,463 @@
/*
 * Copyright 2015 Databricks Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.databricks.spark.sql.perf

import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation

import scala.concurrent._
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, FileSystem}
import org.apache.spark.sql.{AnalysisException, DataFrame, SQLContext}

import org.apache.spark.sql.catalyst.plans.logical.Subquery

/**
 * A collection of queries that test a particular aspect of Spark SQL.
 *
 * @param sqlContext An existing SQLContext.
 */
abstract class Benchmark(@transient protected val sqlContext: SQLContext)
    extends Serializable {

  import sqlContext.implicits._

  val resultsLocation = "/spark/sql/performance"
  val resultsTableName = "sqlPerformance"

  def createResultsTable() = {
    sqlContext.sql(s"DROP TABLE $resultsTableName")
    sqlContext.createExternalTable(
      "sqlPerformance", "json", Map("path" -> (resultsLocation + "/*/")))
  }

  protected def sparkContext = sqlContext.sparkContext

  protected implicit def toOption[A](a: A) = Option(a)

  def currentConfiguration = BenchmarkConfiguration(
    sqlConf = sqlContext.getAllConfs,
    sparkConf = sparkContext.getConf.getAll.toMap,
    defaultParallelism = sparkContext.defaultParallelism)

  /**
   * A Variation represents a setting (e.g. the number of shuffle partitions or if tables
   * are cached in memory) that we want to change in an experiment run.
   * A Variation has three parts, `name`, `options`, and `setup`.
   * The `name` is the identifier of a Variation. `options` is a Seq of options that
   * will be used for a query. Basically, a query will be executed with every option
   * defined in the list of `options`. `setup` defines the needed action for every
   * option. For example, the following Variation is used to change the number of shuffle
   * partitions of a query. The name of the Variation is "shufflePartitions". There are
   * two options, 200 and 2000. The setup is used to set the value of property
   * "spark.sql.shuffle.partitions".
   *
   * {{{
   *   Variation("shufflePartitions", Seq("200", "2000")) {
   *     case num => sqlContext.setConf("spark.sql.shuffle.partitions", num)
   *   }
   * }}}
   */
  case class Variation[T](name: String, options: Seq[T])(val setup: T => Unit)

  val codegen = Variation("codegen", Seq("on", "off")) {
    case "off" => sqlContext.setConf("spark.sql.codegen", "false")
    case "on" => sqlContext.setConf("spark.sql.codegen", "true")
  }

  val unsafe = Variation("unsafe", Seq("on", "off")) {
    case "off" => sqlContext.setConf("spark.sql.unsafe.enabled", "false")
    case "on" => sqlContext.setConf("spark.sql.unsafe.enabled", "true")
  }

  /**
   * Starts an experiment run with a given set of queries.
   * @param queriesToRun a list of queries to be executed.
   * @param includeBreakdown If it is true, breakdown results of a query will be recorded.
   *                         Setting it to true may significantly increase the time used to
   *                         execute a query.
   * @param iterations The number of iterations to run of each query.
   * @param variations [[Variation]]s used in this run. The cross product of all variations will be
   *                   run for each query * iteration.
   * @param tags Tags of this run.
   * @return It returns an ExperimentStatus object that can be used to
   *         track the progress of this experiment run.
   */
  def runExperiment(
      queriesToRun: Seq[Query],
      includeBreakdown: Boolean = false,
      iterations: Int = 3,
      variations: Seq[Variation[_]] = Seq(Variation("StandardRun", Seq("true")) { _ => {} }),
      tags: Map[String, String] = Map.empty) = {

    class ExperimentStatus {
      val currentResults = new collection.mutable.ArrayBuffer[BenchmarkResult]()
      val currentRuns = new collection.mutable.ArrayBuffer[ExperimentRun]()
      val currentMessages = new collection.mutable.ArrayBuffer[String]()

      // Stats for HTML status message.
      @volatile var currentQuery = ""
      @volatile var currentPlan = ""
      @volatile var currentConfig = ""
      @volatile var failures = 0
      @volatile var startTime = 0L

      def cartesianProduct[T](xss: List[List[T]]): List[List[T]] = xss match {
        case Nil => List(Nil)
        case h :: t => for(xh <- h; xt <- cartesianProduct(t)) yield xh :: xt
      }

      val timestamp = System.currentTimeMillis()
      val combinations = cartesianProduct(variations.map(l => (0 until l.options.size).toList).toList)
      val resultsFuture = Future {
        queriesToRun.flatMap { query =>
          query.newDataFrame().queryExecution.logical.collect {
            case UnresolvedRelation(Seq(name), _) => name
          }
        }.distinct.foreach { name =>
          try {
            sqlContext.table(name)
            currentMessages += s"Table $name exists."
          } catch {
            case ae: AnalysisException =>
              val table = allTables
                .find(_.name == name)
                .getOrElse(sys.error(s"Couldn't read table $name and it's not defined as a Benchmark.Table."))

              currentMessages += s"Creating table: $name"
              table.data
                .write
                .mode("overwrite")
                .saveAsTable(name)
          }
        }


        val results = (1 to iterations).flatMap { i =>
          combinations.map { setup =>
            val currentOptions = variations.asInstanceOf[Seq[Variation[Any]]].zip(setup).map {
              case (v, idx) =>
                v.setup(v.options(idx))
                v.name -> v.options(idx).toString
            }
            currentConfig = currentOptions.map { case (k,v) => s"$k: $v" }.mkString(", ")

            val result = ExperimentRun(
              timestamp = timestamp,
              iteration = i,
              tags = currentOptions.toMap ++ tags,
              configuration = currentConfiguration,
              queriesToRun.flatMap { q =>
                val setup = s"iteration: $i, ${currentOptions.map { case (k, v) => s"$k=$v"}.mkString(", ")}"
                currentMessages += s"Running query ${q.name} $setup"

                currentQuery = q.name
                currentPlan = q.newDataFrame().queryExecution.executedPlan.toString
                startTime = System.currentTimeMillis()

                val singleResult = q.benchmark(includeBreakdown, setup)
                singleResult.failure.foreach { f =>
                  failures += 1
                  currentMessages += s"Query '${q.name}' failed: ${f.message}"
                }
                singleResult.executionTime.foreach(time =>
                  currentMessages += s"Exec time: ${time / 1000}s")
                currentResults += singleResult
                singleResult :: Nil
              })
            currentRuns += result

            result
          }
        }

        try {
          val resultsTable = sqlContext.createDataFrame(results)
          currentMessages += s"Results written to table: 'sqlPerformance' at $resultsLocation/$timestamp"
          results.toDF().write
            .format("json")
            .save(s"$resultsLocation/$timestamp")

          results.toDF()
        } catch {
          case e: Throwable => currentMessages += s"Failed to write data: $e"
        }
      }

      /** Waits for the finish of the experiment. */
      def waitForFinish(timeoutInSeconds: Int) = {
        Await.result(resultsFuture, timeoutInSeconds.seconds)
      }

      /** Returns results from an actively running experiment. */
      def getCurrentResults() = {
        val tbl = sqlContext.createDataFrame(currentResults)
        tbl.registerTempTable("currentResults")
        tbl
      }

      /** Returns full iterations from an actively running experiment. */
      def getCurrentRuns() = {
        val tbl = sqlContext.createDataFrame(currentRuns)
        tbl.registerTempTable("currentRuns")
        tbl
      }

      def tail(n: Int = 20) = {
        currentMessages.takeRight(n).mkString("\n")
      }

      def status =
        if (resultsFuture.isCompleted) {
          if (resultsFuture.value.get.isFailure) "Failed" else "Successful"
        } else {
          "Running"
        }

      override def toString =
        s"""Permalink: table("sqlPerformance").where('timestamp === ${timestamp}L)"""


      def html =
        s"""
           |<h2>$status Experiment</h2>
           |<b>Permalink:</b> <tt>table("$resultsTableName").where('timestamp === ${timestamp}L)</tt><br/>
           |<b>Iterations complete:</b> ${currentRuns.size / combinations.size} / $iterations<br/>
           |<b>Failures:</b> $failures<br/>
           |<b>Queries run:</b> ${currentResults.size} / ${iterations * combinations.size * queriesToRun.size}<br/>
           |<b>Run time:</b> ${(System.currentTimeMillis() - timestamp) / 1000}s<br/>
           |
           |<h2>Current Query: $currentQuery</h2>
           |Runtime: ${(System.currentTimeMillis() - startTime) / 1000}s<br/>
           |$currentConfig<br/>
           |<h3>QueryPlan</h3>
           |<pre>
           |${currentPlan.replaceAll("\n", "<br/>")}
           |</pre>
           |
           |<h2>Logs</h2>
           |<pre>
           |${tail()}
           |</pre>
         """.stripMargin
    }
    new ExperimentStatus
  }

  case class Table(
      name: String,
      data: DataFrame)

  import reflect.runtime._, universe._
  import reflect.runtime._
  import universe._

  private val runtimeMirror = universe.runtimeMirror(getClass.getClassLoader)
  val myType = runtimeMirror.classSymbol(getClass).toType

  def singleTables =
    myType.declarations
      .filter(m => m.isMethod)
      .map(_.asMethod)
      .filter(_.asMethod.returnType =:= typeOf[Table])
      .map(method => runtimeMirror.reflect(this).reflectMethod(method).apply().asInstanceOf[Table])

  def groupedTables =
    myType.declarations
      .filter(m => m.isMethod)
      .map(_.asMethod)
      .filter(_.asMethod.returnType =:= typeOf[Seq[Table]])
      .flatMap(method => runtimeMirror.reflect(this).reflectMethod(method).apply().asInstanceOf[Seq[Table]])

  lazy val allTables: Seq[Table] = (singleTables ++ groupedTables).toSeq

  def singleQueries =
    myType.declarations
      .filter(m => m.isMethod)
      .map(_.asMethod)
      .filter(_.asMethod.returnType =:= typeOf[Query])
      .map(method => runtimeMirror.reflect(this).reflectMethod(method).apply().asInstanceOf[Query])

  def groupedQueries =
    myType.declarations
      .filter(m => m.isMethod)
      .map(_.asMethod)
      .filter(_.asMethod.returnType =:= typeOf[Seq[Query]])
      .flatMap(method => runtimeMirror.reflect(this).reflectMethod(method).apply().asInstanceOf[Seq[Query]])

  lazy val allQueries = (singleQueries ++ groupedQueries).toSeq

  def html: String = {
    val singleQueries =
      myType.declarations
        .filter(m => m.isMethod)
        .map(_.asMethod)
        .filter(_.asMethod.returnType =:= typeOf[Query])
        .map(method => runtimeMirror.reflect(this).reflectMethod(method).apply().asInstanceOf[Query])
        .mkString(",")
    val queries =
      myType.declarations
        .filter(m => m.isMethod)
        .map(_.asMethod)
        .filter(_.asMethod.returnType =:= typeOf[Seq[Query]])
        .map { method =>
          val queries = runtimeMirror.reflect(this).reflectMethod(method).apply().asInstanceOf[Seq[Query]]
          val queryList = queries.map(_.name).mkString(", ")
          s"""
             |<h3>${method.name}</h3>
             |<ul>$queryList</ul>
           """.stripMargin
        }.mkString("\n")

    s"""
       |<h1>Spark SQL Performance Benchmarking</h1>
       |<h2>Available Queries</h2>
       |$singleQueries
       |$queries
     """.stripMargin
  }

  trait ExecutionMode
  object ExecutionMode {
    // Benchmark run by collecting queries results (e.g. rdd.collect())
    case object CollectResults extends ExecutionMode

    // Benchmark run by iterating through the queries results rows (e.g. rdd.foreach(row => Unit))
    case object ForeachResults extends ExecutionMode

    // Benchmark run by saving the output of each query as a parquet file at the specified location
    case class WriteParquet(location: String) extends ExecutionMode
  }

  /** Factory object for benchmark queries. */
  object Query {
    def apply(
        name: String,
        sqlText: String,
        description: String,
        executionMode: ExecutionMode = ExecutionMode.ForeachResults): Query = {
      new Query(name, sqlContext.sql(sqlText), description, Some(sqlText), executionMode)
    }

    def apply(
        name: String,
        dataFrameBuilder: => DataFrame,
        description: String): Query = {
      new Query(name, dataFrameBuilder, description, None, ExecutionMode.CollectResults)
    }
  }

  /** Holds one benchmark query and its metadata. */
  class Query(
      val name: String,
      buildDataFrame: => DataFrame,
      val description: String,
      val sqlText: Option[String],
      val executionMode: ExecutionMode) {

    override def toString =
      s"""
         |== Query: $name ==
         |${buildDataFrame.queryExecution.analyzed}
       """.stripMargin

    lazy val tablesInvolved = buildDataFrame.queryExecution.logical collect {
      case UnresolvedRelation(tableIdentifier, _) => {
        // We are ignoring the database name.
        tableIdentifier.last
      }
    }

    def newDataFrame() = buildDataFrame

    def benchmarkMs[A](f: => A): Double = {
      val startTime = System.nanoTime()
      val ret = f
      val endTime = System.nanoTime()
      (endTime - startTime).toDouble / 1000000
    }

    def benchmark(includeBreakdown: Boolean, description: String = "") = {
      try {
        val dataFrame = buildDataFrame
        sparkContext.setJobDescription(s"Query: $name, $description")
        val queryExecution = dataFrame.queryExecution
        // We are not counting the time of ScalaReflection.convertRowToScala.
        val parsingTime = benchmarkMs {
          queryExecution.logical
        }
        val analysisTime = benchmarkMs {
          queryExecution.analyzed
        }
        val optimizationTime = benchmarkMs {
          queryExecution.optimizedPlan
        }
        val planningTime = benchmarkMs {
          queryExecution.executedPlan
        }

        val breakdownResults = if (includeBreakdown) {
          val depth = queryExecution.executedPlan.treeString.split("\n").size
          val physicalOperators = (0 until depth).map(i => (i, queryExecution.executedPlan(i)))
          physicalOperators.map {
            case (index, node) =>
              val executionTime = benchmarkMs {
                node.execute().map(_.copy()).foreach(row => Unit)
              }
              BreakdownResult(node.nodeName, node.simpleString, index, executionTime)
          }
        } else {
          Seq.empty[BreakdownResult]
        }

        // The executionTime for the entire query includes the time of type conversion from catalyst to scala.
        // The executionTime for the entire query includes the time of type conversion
        // from catalyst to scala.
        val executionTime = benchmarkMs {
          executionMode match {
            case ExecutionMode.CollectResults => dataFrame.rdd.collect()
            case ExecutionMode.ForeachResults => dataFrame.rdd.foreach { row => Unit }
            case ExecutionMode.WriteParquet(location) => dataFrame.saveAsParquetFile(s"$location/$name.parquet")
          }
        }

        val joinTypes = dataFrame.queryExecution.executedPlan.collect {
          case k if k.nodeName contains "Join" => k.nodeName
        }

        BenchmarkResult(
          name = name,
          joinTypes = joinTypes,
          tables = tablesInvolved,
          parsingTime = parsingTime,
          analysisTime = analysisTime,
          optimizationTime = optimizationTime,
          planningTime = planningTime,
          executionTime = executionTime,
          queryExecution = dataFrame.queryExecution.toString,
          breakDown = breakdownResults)
      } catch {
        case e: Exception =>
          BenchmarkResult(
            name = name,
            failure = Failure(e.getClass.getName, e.getMessage))
      }
    }
  }
}
@@ -0,0 +1,45 @@
package com.databricks.spark.sql.perf

import org.apache.spark.sql.SQLContext

trait JoinPerformance extends Benchmark {
  // 1.5 mb, 1 file

  import ExecutionMode._

  val x = Table(
    "1milints",
    sqlContext.range(0, 1000000)
      .repartition(1))

  val joinTables = Seq(
    // 143.542mb, 10 files
    Table(
      "100milints",
      sqlContext.range(0, 100000000)
        .repartition(10)),

    // 1.4348gb, 10 files
    Table(
      "1bilints",
      sqlContext.range(0, 1000000000)
        .repartition(10))
  )

  val sortMergeJoin = Variation("sortMergeJoin", Seq("on", "off")) {
    case "off" => sqlContext.setConf("spark.sql.planner.sortMergeJoin", "false")
    case "on" => sqlContext.setConf("spark.sql.planner.sortMergeJoin", "true")
  }

  val singleKeyJoins = Seq("1milints", "100milints", "1bilints").flatMap { table1 =>
    Seq("1milints", "100milints", "1bilints").flatMap { table2 =>
      Seq("JOIN", "RIGHT JOIN", "LEFT JOIN", "FULL OUTER JOIN").map { join =>
        Query(
          s"singleKey-$join-$table1-$table2",
          s"SELECT COUNT(*) FROM $table1 a $join $table2 b ON a.id = b.id",
          "equi-inner join a small table with a big table using a single key.",
          executionMode = CollectResults)
      }
    }
  }
}
@@ -1,46 +0,0 @@
/*
 * Copyright 2015 Databricks Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.databricks.spark.sql.perf.bigdata

import com.databricks.spark.sql.perf._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.parquet.TPCDSTableForTest
import org.apache.spark.sql.{Column, SQLContext}

class BigData (
    @transient sqlContext: SQLContext,
    sparkVersion: String,
    dataLocation: String,
    tables: Seq[Table],
    scaleFactor: String)
  extends Dataset(
    sqlContext,
    sparkVersion,
    dataLocation,
    tables,
    scaleFactor) with Serializable {
  import sqlContext._
  import sqlContext.implicits._

  override val datasetName = "bigDataBenchmark"

  override def createTablesForTest(tables: Seq[Table]): Seq[TableForTest] = {
    tables.map(table =>
      BigDataTableForTest(table, dataLocation, scaleFactor, sqlContext))
  }
}

@@ -16,10 +16,12 @@

package com.databricks.spark.sql.perf.bigdata

import com.databricks.spark.sql.perf.ExecutionMode.ForeachResults
import com.databricks.spark.sql.perf.Query
import com.databricks.spark.sql.perf.Benchmark

trait Queries extends Benchmark {

  import ExecutionMode._

object Queries {
  val queries1to3 = Seq(
    Query(
      name = "q1A",

@@ -1,79 +0,0 @@
/*
 * Copyright 2015 Databricks Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.databricks.spark.sql.perf.bigdata

// This is a hack until parquet has better support for partitioning.

import java.text.SimpleDateFormat
import java.util.Date

import com.databricks.spark.sql.perf._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat}
import org.apache.hadoop.mapreduce.{Job, OutputCommitter, RecordWriter, TaskAttemptContext}
import org.apache.spark.SerializableWritable
import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Column, ColumnName, SQLContext}
import parquet.hadoop.ParquetOutputFormat
import parquet.hadoop.util.ContextUtil

import scala.sys.process._

case class BigDataTableForTest(
    table: Table,
    baseDir: String,
    scaleFactor: String,
    @transient sqlContext: SQLContext)
  extends TableForTest(table, baseDir, sqlContext) with Serializable {

  @transient val sparkContext = sqlContext.sparkContext

  override def generate(): Unit =
    throw new UnsupportedOperationException(
      "Generate data for BigDataBenchmark has not been implemented")
}

case class Tables(sqlContext: SQLContext) {
  import sqlContext.implicits._

  val tables = Seq(
    Table("rankings",
      UnpartitionedTable,
      'pageURL .string,
      'pageRank .int,
      'avgDuration .int),

    Table("uservisits",
      UnpartitionedTable,
      'sourceIP .string,
      'destURL .string,
      'visitDate .string,
      'adRevenue .double,
      'userAgent .string,
      'countryCode .string,
      'languageCode .string,
      'searchWord .string,
      'duration .int),

    Table("documents",
      UnpartitionedTable,
      'line .string)
  )
}
@@ -1,112 +0,0 @@
/*
 * Copyright 2015 Databricks Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.databricks.spark.sql.perf

import com.databricks.spark.sql.perf.ExecutionMode.{WriteParquet, ForeachResults, CollectResults}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation

trait ExecutionMode
object ExecutionMode {
  // Benchmark run by collecting queries results (e.g. rdd.collect())
  case object CollectResults extends ExecutionMode

  // Benchmark run by iterating through the queries results rows (e.g. rdd.foreach(row => Unit))
  case object ForeachResults extends ExecutionMode

  // Benchmark run by saving the output of each query as a parquet file at the specified location
  case class WriteParquet(location: String) extends ExecutionMode
}

case class Query(name: String, sqlText: String, description: String, executionMode: ExecutionMode)

case class QueryForTest(
    query: Query,
    includeBreakdown: Boolean,
    @transient sqlContext: SQLContext) {
  @transient val sparkContext = sqlContext.sparkContext

  val name = query.name

  def benchmarkMs[A](f: => A): Double = {
    val startTime = System.nanoTime()
    val ret = f
    val endTime = System.nanoTime()
    (endTime - startTime).toDouble / 1000000
  }

  def benchmark(description: String = "") = {
    try {
      sparkContext.setJobDescription(s"Query: ${query.name}, $description")
      val dataFrame = sqlContext.sql(query.sqlText)
      val queryExecution = dataFrame.queryExecution
      // We are not counting the time of ScalaReflection.convertRowToScala.
      val parsingTime = benchmarkMs { queryExecution.logical }
      val analysisTime = benchmarkMs { queryExecution.analyzed }
      val optimizationTime = benchmarkMs { queryExecution.optimizedPlan }
      val planningTime = benchmarkMs { queryExecution.executedPlan }

      val breakdownResults = if (includeBreakdown) {
        val depth = queryExecution.executedPlan.treeString.split("\n").size
        val physicalOperators = (0 until depth).map(i => (i, queryExecution.executedPlan(i)))
        physicalOperators.map {
          case (index, node) =>
|
||||
val executionTime = benchmarkMs { node.execute().map(_.copy()).foreach(row => Unit) }
|
||||
BreakdownResult(node.nodeName, node.simpleString, index, executionTime)
|
||||
}
|
||||
} else {
|
||||
Seq.empty[BreakdownResult]
|
||||
}
|
||||
|
||||
// The executionTime for the entire query includes the time of type conversion
|
||||
// from catalyst to scala.
|
||||
val executionTime = benchmarkMs {
|
||||
query.executionMode match {
|
||||
case CollectResults => dataFrame.rdd.collect()
|
||||
case ForeachResults => dataFrame.rdd.foreach { row => Unit }
|
||||
case WriteParquet(location) => dataFrame.saveAsParquetFile(s"$location/$name.parquet")
|
||||
}
|
||||
}
|
||||
|
||||
val joinTypes = dataFrame.queryExecution.executedPlan.collect {
|
||||
case k if k.nodeName contains "Join" => k.nodeName
|
||||
}
|
||||
|
||||
val tablesInvolved = dataFrame.queryExecution.logical collect {
|
||||
case UnresolvedRelation(tableIdentifier, _) => {
|
||||
// We are ignoring the database name.
|
||||
tableIdentifier.last
|
||||
}
|
||||
}
|
||||
|
||||
BenchmarkResult(
|
||||
name = query.name,
|
||||
joinTypes = joinTypes,
|
||||
tables = tablesInvolved,
|
||||
parsingTime = parsingTime,
|
||||
analysisTime = analysisTime,
|
||||
optimizationTime = optimizationTime,
|
||||
planningTime = planningTime,
|
||||
executionTime = executionTime,
|
||||
breakdownResults)
|
||||
} catch {
|
||||
case e: Exception =>
|
||||
throw new RuntimeException(
|
||||
s"Failed to benchmark query ${query.name}", e)
|
||||
}
|
||||
}
|
||||
}
|
||||
88
src/main/scala/com/databricks/spark/sql/perf/results.scala
Normal file
@ -0,0 +1,88 @@
/*
 * Copyright 2015 Databricks Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.databricks.spark.sql.perf

/**
 * The performance results of all given queries for a single iteration.
 * @param timestamp The timestamp indicates when the entire experiment is started.
 * @param iteration The index number of the current iteration.
 * @param tags Tags of this iteration (variations are stored here).
 * @param configuration Configuration properties of this iteration.
 * @param results The performance results of queries for this iteration.
 */
case class ExperimentRun(
    timestamp: Long,
    iteration: Int,
    tags: Map[String, String],
    configuration: BenchmarkConfiguration,
    results: Seq[BenchmarkResult])

/**
 * The configuration used for an iteration of an experiment.
 * @param sparkVersion The version of Spark.
 * @param sqlConf All configuration properties related to Spark SQL.
 * @param sparkConf All configuration properties of Spark.
 * @param defaultParallelism The default parallelism of the cluster.
 *                           Usually, it is the number of cores of the cluster.
 */
case class BenchmarkConfiguration(
    sparkVersion: String = org.apache.spark.SPARK_VERSION,
    sqlConf: Map[String, String],
    sparkConf: Map[String,String],
    defaultParallelism: Int)

/**
 * The result of a query.
 * @param name The name of the query.
 * @param joinTypes The type of join operations in the query.
 * @param tables The tables involved in the query.
 * @param parsingTime The time used to parse the query.
 * @param analysisTime The time used to analyze the query.
 * @param optimizationTime The time used to optimize the query.
 * @param planningTime The time used to plan the query.
 * @param executionTime The time used to execute the query.
 * @param breakDown The breakdown results of the query plan tree.
 */
case class BenchmarkResult(
    name: String,
    joinTypes: Seq[String] = Nil,
    tables: Seq[String] = Nil,
    parsingTime: Option[Double] = None,
    analysisTime: Option[Double] = None,
    optimizationTime: Option[Double] = None,
    planningTime: Option[Double] = None,
    executionTime: Option[Double] = None,
    breakDown: Seq[BreakdownResult] = Nil,
    queryExecution: Option[String] = None,
    failure: Option[Failure] = None)

/**
 * The execution time of a subtree of the query plan tree of a specific query.
 * @param nodeName The name of the top physical operator of the subtree.
 * @param nodeNameWithArgs The name and arguments of the top physical operator of the subtree.
 * @param index The index of the top physical operator of the subtree
 *              in the original query plan tree. The index starts from 0
 *              (0 represents the top physical operator of the original query plan tree).
 * @param executionTime The execution time of the subtree.
 */
case class BreakdownResult(
    nodeName: String,
    nodeNameWithArgs: String,
    index: Int,
    executionTime: Double)

case class Failure(className: String, message: String)
|
||||
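The new `BenchmarkResult` makes every timing optional and adds a `failure` field, which suggests that a failed query can still produce a result row. The sketch below shows one way a caller might fill those fields. It is only an illustration: the trimmed case classes, `ResultSketch`, and `run` are hypothetical stand-ins, not the code in this commit, and `benchmarkMs` is copied from the removed query file.

```scala
// Trimmed, hypothetical copies of the result classes in results.scala.
case class Failure(className: String, message: String)

case class BenchmarkResult(
    name: String,
    executionTime: Option[Double] = None,
    failure: Option[Failure] = None)

object ResultSketch {
  // Times a block in milliseconds, mirroring benchmarkMs from the removed code.
  def benchmarkMs[A](f: => A): Double = {
    val start = System.nanoTime()
    f
    (System.nanoTime() - start).toDouble / 1000000
  }

  // Runs `body`. On success the result carries a time; on failure it
  // carries the exception class and message and no time at all.
  def run(name: String)(body: => Unit): BenchmarkResult =
    try {
      BenchmarkResult(name, executionTime = Some(benchmarkMs(body)))
    } catch {
      case e: Exception =>
        BenchmarkResult(name, failure = Some(Failure(e.getClass.getName, e.getMessage)))
    }
}
```

With defaults of `None` and `Nil`, a failed run needs only the query name and the `Failure`, so successful and failed rows share one schema.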
@ -1,305 +0,0 @@
|
||||
/*
|
||||
* Copyright 2015 Databricks Inc.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.databricks.spark.sql.perf
|
||||
|
||||
import scala.concurrent._
|
||||
import scala.concurrent.duration._
|
||||
import scala.concurrent.ExecutionContext.Implicits.global
|
||||
|
||||
import org.apache.hadoop.conf.Configuration
|
||||
import org.apache.hadoop.fs.{Path, FileSystem}
|
||||
import org.apache.spark.sql.SQLContext
|
||||
|
||||
/**
|
||||
* The configuration used for an iteration of an experiment.
|
||||
* @param sparkVersion The version of Spark.
|
||||
* @param scaleFactor The scale factor of the dataset.
|
||||
* @param sqlConf All configuration properties related to Spark SQL.
|
||||
* @param sparkConf All configuration properties of Spark.
|
||||
* @param defaultParallelism The default parallelism of the cluster.
|
||||
* Usually, it is the number of cores of the cluster.
|
||||
*/
|
||||
case class BenchmarkConfiguration(
|
||||
sparkVersion: String,
|
||||
scaleFactor: String,
|
||||
sqlConf: Map[String, String],
|
||||
sparkConf: Map[String,String],
|
||||
defaultParallelism: Int)
|
||||
|
||||
/**
|
||||
* The execution time of a subtree of the query plan tree of a specific query.
|
||||
* @param nodeName The name of the top physical operator of the subtree.
|
||||
* @param nodeNameWithArgs The name and arguments of the top physical operator of the subtree.
|
||||
* @param index The index of the top physical operator of the subtree
|
||||
* in the original query plan tree. The index starts from 0
|
||||
* (0 represents the top physical operator of the original query plan tree).
|
||||
* @param executionTime The execution time of the subtree.
|
||||
*/
|
||||
case class BreakdownResult(
|
||||
nodeName: String,
|
||||
nodeNameWithArgs: String,
|
||||
index: Int,
|
||||
executionTime: Double)
|
||||
|
||||
/**
|
||||
* The result of a query.
|
||||
* @param name The name of the query.
|
||||
* @param joinTypes The type of join operations in the query.
|
||||
* @param tables The tables involved in the query.
|
||||
* @param parsingTime The time used to parse the query.
|
||||
* @param analysisTime The time used to analyze the query.
|
||||
* @param optimizationTime The time used to optimize the query.
|
||||
* @param planningTime The time used to plan the query.
|
||||
* @param executionTime The time used to execute the query.
|
||||
* @param breakDown The breakdown results of the query plan tree.
|
||||
*/
|
||||
case class BenchmarkResult(
|
||||
name: String,
|
||||
joinTypes: Seq[String],
|
||||
tables: Seq[String],
|
||||
parsingTime: Double,
|
||||
analysisTime: Double,
|
||||
optimizationTime: Double,
|
||||
planningTime: Double,
|
||||
executionTime: Double,
|
||||
breakDown: Seq[BreakdownResult])
|
||||
|
||||
/**
|
||||
* A Variation represents a setting (e.g. the number of shuffle partitions and if tables
|
||||
* are cached in memory) that we want to change in an experiment run.
|
||||
* A Variation has three parts, `name`, `options`, and `setup`.
|
||||
* The `name` is the identifier of a Variation. `options` is a Seq of options that
|
||||
* will be used for a query. Basically, a query will be executed with every option
|
||||
* defined in the list of `options`. `setup` defines the needed action for every
|
||||
* option. For example, the following Variation is used to change the number of shuffle
|
||||
* partitions of a query. The name of the Variation is "shufflePartitions". There are
|
||||
* two options, 200 and 2000. The setup is used to set the value of property
|
||||
* "spark.sql.shuffle.partitions".
|
||||
*
|
||||
* {{{
|
||||
* Variation("shufflePartitions", Seq("200", "2000")) {
|
||||
* case num => sqlContext.setConf("spark.sql.shuffle.partitions", num)
|
||||
* }
|
||||
* }}}
|
||||
*/
|
||||
case class Variation[T](name: String, options: Seq[T])(val setup: T => Unit)
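The comment above describes running every query once per option. When several variations are given, the removed `runExperiment` takes the Cartesian product of their option indices. A minimal sketch of that expansion follows, assuming a mutable map in place of `sqlContext.setConf`; `VariationSketch`, `conf`, `applyCombination`, and the `cached` variation are hypothetical names used only for illustration.

```scala
// Stand-in for the Variation class shown above.
case class Variation[T](name: String, options: Seq[T])(val setup: T => Unit)

object VariationSketch {
  // All ways of picking one element from each inner list.
  def cartesianProduct[T](xss: List[List[T]]): List[List[T]] = xss match {
    case Nil => List(Nil)
    case h :: t => for (xh <- h; xt <- cartesianProduct(t)) yield xh :: xt
  }

  // Assumption: a mutable map stands in for sqlContext.setConf.
  val conf = scala.collection.mutable.Map.empty[String, String]

  val variations: Seq[Variation[_]] = Seq(
    Variation("shufflePartitions", Seq("200", "2000")) { n =>
      conf("spark.sql.shuffle.partitions") = n
    },
    Variation("cached", Seq(true, false)) { c =>
      conf("tables.cached") = c.toString
    })

  // One option index per variation, for every combination of options.
  val combinations: List[List[Int]] =
    cartesianProduct(variations.map(v => v.options.indices.toList).toList)

  // Applies the setup of each chosen option and returns the tags for that run.
  def applyCombination(combo: List[Int]): Map[String, String] =
    variations.asInstanceOf[Seq[Variation[Any]]].zip(combo).map {
      case (v, idx) =>
        v.setup(v.options(idx))
        v.name -> v.options(idx).toString
    }.toMap
}
```

Two variations with two options each give four combinations, so each iteration runs every query four times.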
|
||||
|
||||
/**
|
||||
* The performance results of all given queries for a single iteration.
|
||||
* @param timestamp The timestamp indicates when the entire experiment is started.
|
||||
* @param datasetName The name of dataset.
|
||||
* @param iteration The index number of the current iteration.
|
||||
* @param tags Tags of this iteration (variations are stored here).
|
||||
* @param configuration Configuration properties of this iteration.
|
||||
* @param results The performance results of queries for this iteration.
|
||||
*/
|
||||
case class ExperimentRun(
|
||||
timestamp: Long,
|
||||
datasetName: String,
|
||||
iteration: Int,
|
||||
tags: Map[String, String],
|
||||
configuration: BenchmarkConfiguration,
|
||||
results: Seq[BenchmarkResult])
|
||||
|
||||
/**
|
||||
* The dataset of a benchmark.
|
||||
* @param sqlContext An existing SQLContext.
|
||||
* @param sparkVersion The version of Spark.
|
||||
* @param dataLocation The location of the dataset used by this experiment.
|
||||
* @param tables Tables that will be used in this experiment.
|
||||
* @param scaleFactor The scale factor of the dataset. For some benchmarks like TPC-H
|
||||
* and TPC-DS, the scale factor is a number roughly representing the
|
||||
* size of raw data files. For some other benchmarks, the scale factor
|
||||
* is a short string describing the scale of the dataset.
|
||||
*/
|
||||
abstract class Dataset(
|
||||
@transient sqlContext: SQLContext,
|
||||
sparkVersion: String,
|
||||
dataLocation: String,
|
||||
tables: Seq[Table],
|
||||
scaleFactor: String) extends Serializable {
|
||||
|
||||
val datasetName: String
|
||||
|
||||
@transient val sparkContext = sqlContext.sparkContext
|
||||
|
||||
def createTablesForTest(tables: Seq[Table]): Seq[TableForTest]
|
||||
|
||||
val tablesForTest: Seq[TableForTest] = createTablesForTest(tables)
|
||||
|
||||
def checkData(): Unit = {
|
||||
tablesForTest.foreach { table =>
|
||||
val fs = FileSystem.get(new java.net.URI(table.outputDir), sparkContext.hadoopConfiguration)
|
||||
val exists = fs.exists(new Path(table.outputDir))
|
||||
val wasSuccessful = fs.exists(new Path(s"${table.outputDir}/_SUCCESS"))
|
||||
|
||||
if (!wasSuccessful) {
|
||||
if (exists) {
|
||||
println(s"Table '${table.name}' not generated successfully, regenerating.")
|
||||
} else {
|
||||
println(s"Table '${table.name}' does not exist, generating.")
|
||||
}
|
||||
fs.delete(new Path(table.outputDir), true)
|
||||
table.generate()
|
||||
} else {
|
||||
println(s"Table ${table.name} already exists.")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def allStats = tablesForTest.map(_.stats).reduceLeft(_.unionAll(_))
|
||||
|
||||
/**
|
||||
* Does necessary setup work such as data generation and transformation. It needs to be
|
||||
* called before running any query.
|
||||
*/
|
||||
def setup(): Unit = {
|
||||
checkData()
|
||||
tablesForTest.foreach(_.createTempTable())
|
||||
}
|
||||
|
||||
def currentConfiguration = BenchmarkConfiguration(
|
||||
sparkVersion = sparkVersion,
|
||||
scaleFactor = scaleFactor,
|
||||
sqlConf = sqlContext.getAllConfs,
|
||||
sparkConf = sparkContext.getConf.getAll.toMap,
|
||||
defaultParallelism = sparkContext.defaultParallelism)
|
||||
|
||||
/**
|
||||
* Starts an experiment run with a given set of queries.
|
||||
* @param queries Queries to be executed.
|
||||
* @param resultsLocation The location of performance results.
|
||||
* @param includeBreakdown If it is true, breakdown results of a query will be recorded.
|
||||
* Setting it to true may significantly increase the time used to
|
||||
* execute a query.
|
||||
* @param iterations The number of iterations.
|
||||
* @param variations [[Variation]]s used in this run.
|
||||
* @param tags Tags of this run.
|
||||
* @return It returns an ExperimentStatus object that can be used to
|
||||
* track the progress of this experiment run.
|
||||
*/
|
||||
def runExperiment(
|
||||
queries: Seq[Query],
|
||||
resultsLocation: String,
|
||||
includeBreakdown: Boolean = false,
|
||||
iterations: Int = 3,
|
||||
variations: Seq[Variation[_]] = Seq(Variation("StandardRun", Seq("")) { _ => {} }),
|
||||
tags: Map[String, String] = Map.empty) = {
|
||||
|
||||
val queriesToRun = queries.map(query => QueryForTest(query, includeBreakdown, sqlContext))
|
||||
|
||||
class ExperimentStatus {
|
||||
val currentResults = new collection.mutable.ArrayBuffer[BenchmarkResult]()
|
||||
val currentRuns = new collection.mutable.ArrayBuffer[ExperimentRun]()
|
||||
val currentMessages = new collection.mutable.ArrayBuffer[String]()
|
||||
|
||||
@volatile
|
||||
var currentQuery = ""
|
||||
|
||||
def cartesianProduct[T](xss: List[List[T]]): List[List[T]] = xss match {
|
||||
case Nil => List(Nil)
|
||||
case h :: t => for(xh <- h; xt <- cartesianProduct(t)) yield xh :: xt
|
||||
}
|
||||
|
||||
val timestamp = System.currentTimeMillis()
|
||||
val combinations = cartesianProduct(variations.map(l => (0 until l.options.size).toList).toList)
|
||||
val resultsFuture = future {
|
||||
val results = (1 to iterations).flatMap { i =>
|
||||
combinations.map { setup =>
|
||||
val currentOptions = variations.asInstanceOf[Seq[Variation[Any]]].zip(setup).map {
|
||||
case (v, idx) =>
|
||||
v.setup(v.options(idx))
|
||||
v.name -> v.options(idx).toString
|
||||
}
|
||||
|
||||
val result = ExperimentRun(
|
||||
timestamp = timestamp,
|
||||
datasetName = datasetName,
|
||||
iteration = i,
|
||||
tags = currentOptions.toMap ++ tags,
|
||||
configuration = currentConfiguration,
|
||||
queriesToRun.flatMap { q =>
|
||||
val setup = s"iteration: $i, ${currentOptions.map { case (k, v) => s"$k=$v"}.mkString(", ")}"
|
||||
currentMessages += s"Running query ${q.name} $setup"
|
||||
|
||||
currentQuery = q.name
|
||||
val singleResult = try q.benchmark(setup) :: Nil catch {
|
||||
case e: Exception =>
|
||||
currentMessages += s"Failed to run query ${q.name}: $e"
|
||||
Nil
|
||||
}
|
||||
currentResults ++= singleResult
|
||||
singleResult
|
||||
})
|
||||
currentRuns += result
|
||||
|
||||
result
|
||||
}
|
||||
}
|
||||
|
||||
val resultsTable = sqlContext.createDataFrame(results)
|
||||
currentMessages += s"Results stored to: $resultsLocation/$timestamp"
|
||||
resultsTable.toJSON.coalesce(1).saveAsTextFile(s"$resultsLocation/$timestamp")
|
||||
resultsTable
|
||||
}
|
||||
|
||||
/** Waits for the finish of the experiment. */
|
||||
def waitForFinish(timeoutInSeconds: Int) = {
|
||||
Await.result(resultsFuture, timeoutInSeconds.seconds)
|
||||
}
|
||||
|
||||
/** Returns results from an actively running experiment. */
|
||||
def getCurrentResults() = {
|
||||
val tbl = sqlContext.createDataFrame(currentResults)
|
||||
tbl.registerTempTable("currentResults")
|
||||
tbl
|
||||
}
|
||||
|
||||
/** Returns full iterations from an actively running experiment. */
|
||||
def getCurrentRuns() = {
|
||||
val tbl = sqlContext.createDataFrame(currentRuns)
|
||||
tbl.registerTempTable("currentRuns")
|
||||
tbl
|
||||
}
|
||||
|
||||
def tail(n: Int = 5) = {
|
||||
currentMessages.takeRight(n).mkString("\n")
|
||||
}
|
||||
|
||||
def status =
|
||||
if (resultsFuture.isCompleted) {
|
||||
if (resultsFuture.value.get.isFailure) "Failed" else "Successful"
|
||||
} else {
|
||||
"Running"
|
||||
}
|
||||
|
||||
override def toString =
|
||||
s"""
|
||||
|=== $status Experiment ===
|
||||
|Permalink: table("allResults").where('timestamp === ${timestamp}L)
|
||||
|Queries: ${queriesToRun.map(_.name).map(n => if(n == currentQuery) s"|$n|" else n).mkString(" ")}
|
||||
|Iterations complete: ${currentRuns.size / combinations.size} / $iterations
|
||||
|Queries run: ${currentResults.size} / ${iterations * combinations.size * queriesToRun.size}
|
||||
|Run time: ${(System.currentTimeMillis() - timestamp) / 1000}s
|
||||
|
|
||||
|== Logs ==
|
||||
|${tail()}
|
||||
""".stripMargin
|
||||
}
|
||||
new ExperimentStatus
|
||||
}
|
||||
}
|
||||
@ -1,60 +0,0 @@
|
||||
/*
|
||||
* Copyright 2015 Databricks Inc.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.databricks.spark.sql.perf
|
||||
|
||||
import org.apache.spark.sql.SQLContext
|
||||
import org.apache.spark.sql.functions._
|
||||
import org.apache.spark.sql.types._
|
||||
|
||||
abstract class TableType
|
||||
case object UnpartitionedTable extends TableType
|
||||
case class PartitionedTable(partitionColumn: String) extends TableType
|
||||
|
||||
case class Table(name: String, tableType: TableType, fields: StructField*)
|
||||
|
||||
abstract class TableForTest(
|
||||
table: Table,
|
||||
baseDir: String,
|
||||
@transient sqlContext: SQLContext) extends Serializable {
|
||||
|
||||
val schema = StructType(table.fields)
|
||||
|
||||
val name = table.name
|
||||
|
||||
val outputDir = s"$baseDir/${name}"
|
||||
|
||||
def fromCatalog = sqlContext.table(name)
|
||||
|
||||
def stats =
|
||||
fromCatalog.select(
|
||||
lit(name) as "tableName",
|
||||
count("*") as "numRows",
|
||||
lit(fromCatalog.queryExecution.optimizedPlan.statistics.sizeInBytes.toLong) as "sizeInBytes")
|
||||
|
||||
def createTempTable(): Unit = {
|
||||
sqlContext.sql(
|
||||
s"""
|
||||
|CREATE TEMPORARY TABLE ${name}
|
||||
|USING org.apache.spark.sql.parquet
|
||||
|OPTIONS (
|
||||
| path '${outputDir}'
|
||||
|)
|
||||
""".stripMargin)
|
||||
}
|
||||
|
||||
def generate(): Unit
|
||||
}
|
||||
@ -14,12 +14,14 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.databricks.spark.sql.perf.tpcds.queries
|
||||
package com.databricks.spark.sql.perf.tpcds
|
||||
|
||||
import com.databricks.spark.sql.perf.ExecutionMode.CollectResults
|
||||
import com.databricks.spark.sql.perf.Query
|
||||
import com.databricks.spark.sql.perf.Benchmark
|
||||
|
||||
trait ImpalaKitQueries extends Benchmark {
|
||||
|
||||
import ExecutionMode._
|
||||
|
||||
object ImpalaKitQueries {
|
||||
// Queries are from
|
||||
// https://github.com/cloudera/impala-tpcds-kit/tree/master/queries-sql92-modified/queries
|
||||
val queries = Seq(
|
||||
@ -14,12 +14,14 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.databricks.spark.sql.perf.tpcds.queries
|
||||
package com.databricks.spark.sql.perf.tpcds
|
||||
|
||||
import com.databricks.spark.sql.perf.ExecutionMode.ForeachResults
|
||||
import com.databricks.spark.sql.perf.Query
|
||||
import com.databricks.spark.sql.perf.Benchmark
|
||||
|
||||
trait SimpleQueries extends Benchmark {
|
||||
|
||||
import ExecutionMode._
|
||||
|
||||
object SimpleQueries {
|
||||
val q7Derived = Seq(
|
||||
("q7-simpleScan",
|
||||
"""
|
||||
@ -17,55 +17,16 @@
|
||||
package com.databricks.spark.sql.perf.tpcds
|
||||
|
||||
import com.databricks.spark.sql.perf._
|
||||
import org.apache.spark.sql.functions._
|
||||
import org.apache.spark.sql.parquet.TPCDSTableForTest
|
||||
import org.apache.spark.sql.{Column, SQLContext}
|
||||
import org.apache.spark.sql.SQLContext
|
||||
|
||||
/**
|
||||
* TPC-DS benchmark's dataset.
|
||||
* @param sqlContext An existing SQLContext.
|
||||
* @param sparkVersion The version of Spark.
|
||||
* @param dataLocation The location of the dataset used by this experiment.
|
||||
* @param dsdgenDir The location of dsdgen in every worker machine.
|
||||
* @param resultsLocation The location of performance results.
|
||||
* @param tables Tables that will be used in this experiment.
|
||||
* @param scaleFactor The scale factor of the dataset. For some benchmarks like TPC-H
|
||||
* and TPC-DS, the scale factor is a number roughly representing the
|
||||
* size of raw data files. For some other benchmarks, the scale factor
|
||||
* is a short string describing the scale of the dataset.
|
||||
*/
|
||||
class TPCDS (
|
||||
@transient sqlContext: SQLContext,
|
||||
sparkVersion: String,
|
||||
dataLocation: String,
|
||||
dsdgenDir: String,
|
||||
tables: Seq[Table],
|
||||
scaleFactor: String,
|
||||
userSpecifiedBaseDir: Option[String] = None)
|
||||
extends Dataset(
|
||||
sqlContext,
|
||||
sparkVersion,
|
||||
dataLocation,
|
||||
tables,
|
||||
scaleFactor) with Serializable {
|
||||
import sqlContext._
|
||||
import sqlContext.implicits._
|
||||
|
||||
override val datasetName = "tpcds"
|
||||
|
||||
lazy val baseDir =
|
||||
userSpecifiedBaseDir.getOrElse(s"$dataLocation/scaleFactor=$scaleFactor/useDecimal=true")
|
||||
|
||||
override def createTablesForTest(tables: Seq[Table]): Seq[TableForTest] = {
|
||||
tables.map(table =>
|
||||
TPCDSTableForTest(table, baseDir, scaleFactor.toInt, dsdgenDir, sqlContext))
|
||||
}
|
||||
|
||||
override def setup(): Unit = {
|
||||
super.setup()
|
||||
setupBroadcast()
|
||||
}
|
||||
class TPCDS (@transient sqlContext: SQLContext)
|
||||
extends Benchmark(sqlContext) with ImpalaKitQueries with SimpleQueries with Serializable {
|
||||
|
||||
/*
|
||||
def setupBroadcast(skipTables: Seq[String] = Seq("store_sales", "customer")) = {
|
||||
val skipExpr = skipTables.map(t => !('tableName === t)).reduceLeft[Column](_ && _)
|
||||
val threshold =
|
||||
@ -79,5 +40,6 @@ class TPCDS (
|
||||
println(setQuery)
|
||||
sql(setQuery)
|
||||
}
|
||||
*/
|
||||
}
|
||||
|
||||
|
||||
@ -14,7 +14,7 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.spark.sql.parquet // This is a hack until parquet has better support for partitioning.
|
||||
package com.databricks.spark.sql.perf.tpcds
|
||||
|
||||
import java.text.SimpleDateFormat
|
||||
import java.util.Date
|
||||
@ -37,171 +37,63 @@ import org.apache.spark.sql.types.{StringType, StructField, StructType}
|
||||
import parquet.hadoop.ParquetOutputFormat
|
||||
import parquet.hadoop.util.ContextUtil
|
||||
|
||||
class Tables(sqlContext: SQLContext, dsdgenDir: String, scaleFactor: Int) {
|
||||
import sqlContext.implicits._
|
||||
|
||||
case class TPCDSTableForTest(
|
||||
table: Table,
|
||||
baseDir: String,
|
||||
scaleFactor: Int,
|
||||
dsdgenDir: String,
|
||||
@transient sqlContext: SQLContext,
|
||||
maxRowsPerPartitions: Int = 20 * 1000 * 1000)
|
||||
extends TableForTest(table, baseDir, sqlContext) with Serializable with SparkHadoopMapReduceUtil {
|
||||
|
||||
@transient val sparkContext = sqlContext.sparkContext
|
||||
|
||||
def sparkContext = sqlContext.sparkContext
|
||||
val dsdgen = s"$dsdgenDir/dsdgen"
|
||||
|
||||
override def generate(): Unit = {
|
||||
val partitions = table.tableType match {
|
||||
case PartitionedTable(_) => scaleFactor
|
||||
case _ => 1
|
||||
}
|
||||
case class Table(name: String, partitionColumns: Seq[String], fields: StructField*) {
|
||||
val schema = StructType(fields)
|
||||
val partitions = if (partitionColumns.isEmpty) 1 else 100
|
||||
|
||||
val generatedData = {
|
||||
sparkContext.parallelize(1 to partitions, partitions).flatMap { i =>
|
||||
val localToolsDir = if (new java.io.File(dsdgen).exists) {
|
||||
dsdgenDir
|
||||
} else if (new java.io.File(s"/$dsdgen").exists) {
|
||||
s"/$dsdgenDir"
|
||||
} else {
|
||||
sys.error(s"Could not find dsdgen at $dsdgen or /$dsdgen. Run install")
|
||||
}
|
||||
|
||||
val parallel = if (partitions > 1) s"-parallel $partitions -child $i" else ""
|
||||
val commands = Seq(
|
||||
"bash", "-c",
|
||||
s"cd $localToolsDir && ./dsdgen -table ${table.name} -filter Y -scale $scaleFactor $parallel")
|
||||
println(commands)
|
||||
commands.lines
|
||||
}
|
||||
}
|
||||
|
||||
generatedData.setName(s"${table.name}, sf=$scaleFactor, strings")
|
||||
|
||||
val rows = generatedData.mapPartitions { iter =>
|
||||
val currentRow = new GenericMutableRow(schema.fields.size)
|
||||
iter.map { l =>
|
||||
(0 until schema.fields.length).foreach(currentRow.setNullAt)
|
||||
l.split("\\|", -1).zipWithIndex.dropRight(1).foreach { case (f, i) => currentRow(i) = f}
|
||||
currentRow: Row
|
||||
}
|
||||
}
|
||||
|
||||
val stringData =
|
||||
sqlContext.createDataFrame(
|
||||
rows,
|
||||
StructType(schema.fields.map(f => StructField(f.name, StringType))))
|
||||
|
||||
val convertedData = {
|
||||
val columns = schema.fields.map { f =>
|
||||
val columnName = new ColumnName(f.name)
|
||||
columnName.cast(f.dataType).as(f.name)
|
||||
}
|
||||
stringData.select(columns: _*)
|
||||
}
|
||||
|
||||
table.tableType match {
|
||||
// This is an awful hack... spark sql parquet should support this natively.
|
||||
case PartitionedTable(partitioningColumn) =>
|
||||
sqlContext.setConf("spark.sql.planner.externalSort", "true")
|
||||
val output = convertedData.queryExecution.analyzed.output
|
||||
val job = new Job(sqlContext.sparkContext.hadoopConfiguration)
|
||||
|
||||
val writeSupport =
|
||||
if (schema.fields.map(_.dataType).forall(_.isPrimitive)) {
|
||||
classOf[org.apache.spark.sql.parquet.MutableRowWriteSupport]
|
||||
def df = {
|
||||
val generatedData = {
|
||||
sparkContext.parallelize(1 to partitions, partitions).flatMap { i =>
|
||||
val localToolsDir = if (new java.io.File(dsdgen).exists) {
|
||||
dsdgenDir
|
||||
} else if (new java.io.File(s"/$dsdgen").exists) {
|
||||
s"/$dsdgenDir"
|
||||
} else {
|
||||
classOf[org.apache.spark.sql.parquet.RowWriteSupport]
|
||||
sys.error(s"Could not find dsdgen at $dsdgen or /$dsdgen. Run install")
|
||||
}
|
||||
|
||||
ParquetOutputFormat.setWriteSupportClass(job, writeSupport)
|
||||
|
||||
val conf = new SerializableWritable(ContextUtil.getConfiguration(job))
|
||||
org.apache.spark.sql.parquet.RowWriteSupport.setSchema(schema.toAttributes, conf.value)
|
||||
|
||||
val partColumnAttr =
|
||||
BindReferences.bindReference[Expression](
|
||||
output.find(_.name == partitioningColumn).get,
|
||||
output)
|
||||
|
||||
|
||||
// TODO: clusterBy would be faster than orderBy
|
||||
val orderedConvertedData =
|
||||
convertedData.filter(new Column(partitioningColumn) isNotNull).orderBy(Column(partitioningColumn) asc)
|
||||
orderedConvertedData.queryExecution.toRdd.foreachPartition { iter =>
|
||||
var writer: RecordWriter[Void, Row] = null
|
||||
val getPartition = new InterpretedMutableProjection(Seq(partColumnAttr))
|
||||
var currentPartition: Row = null
|
||||
var hadoopContext: TaskAttemptContext = null
|
||||
var committer: OutputCommitter = null
|
||||
|
||||
var rowCount = 0
|
||||
var partition = 0
|
||||
|
||||
while (iter.hasNext) {
|
||||
val currentRow = iter.next()
|
||||
|
||||
rowCount += 1
|
||||
if (rowCount >= maxRowsPerPartitions) {
|
||||
rowCount = 0
|
||||
partition += 1
|
||||
println(s"Starting partition $partition")
|
||||
if (writer != null) {
|
||||
writer.close(hadoopContext)
|
||||
committer.commitTask(hadoopContext)
|
||||
}
|
||||
writer = null
|
||||
}
|
||||
|
||||
if ((getPartition(currentRow) != currentPartition || writer == null) &&
|
||||
!getPartition.currentValue.isNullAt(0)) {
|
||||
rowCount = 0
|
||||
currentPartition = getPartition.currentValue.copy()
|
||||
if (writer != null) {
|
||||
writer.close(hadoopContext)
|
||||
committer.commitTask(hadoopContext)
|
||||
}
|
||||
|
||||
val job = new Job(conf.value)
|
||||
val keyType = classOf[Void]
|
||||
job.setOutputKeyClass(keyType)
|
||||
job.setOutputValueClass(classOf[Row])
|
||||
NewFileOutputFormat.setOutputPath(
|
||||
job,
|
||||
new Path(s"$outputDir/$partitioningColumn=${currentPartition(0)}"))
|
||||
val wrappedConf = new SerializableWritable(job.getConfiguration)
|
||||
val formatter = new SimpleDateFormat("yyyyMMddHHmm")
|
||||
val jobtrackerID = formatter.format(new Date())
|
||||
val stageId = partition
|
||||
|
||||
val attemptNumber = 1
|
||||
/* "reduce task" <split #> <attempt # = spark task #> */
|
||||
val attemptId = newTaskAttemptID(jobtrackerID, partition, isMap = false, partition, attemptNumber)
|
||||
hadoopContext = newTaskAttemptContext(wrappedConf.value, attemptId)
|
||||
val format = new ParquetOutputFormat[Row]
|
||||
committer = format.getOutputCommitter(hadoopContext)
|
||||
committer.setupTask(hadoopContext)
|
||||
writer = format.getRecordWriter(hadoopContext)
|
||||
|
||||
}
|
||||
if (!getPartition.currentValue.isNullAt(0)) {
|
||||
writer.write(null, currentRow)
|
||||
}
|
||||
}
|
||||
if (writer != null) {
|
||||
writer.close(hadoopContext)
|
||||
committer.commitTask(hadoopContext)
|
||||
}
|
||||
val parallel = if (partitions > 1) s"-parallel $partitions -child $i" else ""
|
||||
val commands = Seq(
|
||||
"bash", "-c",
|
||||
s"cd $localToolsDir && ./dsdgen -table $name -filter Y -scale $scaleFactor $parallel")
|
||||
println(commands)
|
||||
commands.lines
|
||||
}
|
||||
val fs = FileSystem.get(new java.net.URI(outputDir), new Configuration())
|
||||
fs.create(new Path(s"$outputDir/_SUCCESS")).close()
|
||||
case _ => convertedData.saveAsParquetFile(outputDir)
|
||||
}
|
||||
|
||||
generatedData.setName(s"$name, sf=$scaleFactor, strings")
|
||||
|
||||
val rows = generatedData.mapPartitions { iter =>
|
||||
val currentRow = new GenericMutableRow(schema.fields.size)
|
||||
iter.map { l =>
|
||||
(0 until schema.fields.length).foreach(currentRow.setNullAt)
|
||||
l.split("\\|", -1).zipWithIndex.dropRight(1).foreach { case (f, i) => currentRow(i) = f}
|
||||
currentRow: Row
|
||||
}
|
||||
}
|
||||
|
||||
val stringData =
|
||||
sqlContext.createDataFrame(
|
||||
rows,
|
||||
StructType(schema.fields.map(f => StructField(f.name, StringType))))
|
||||
|
||||
val convertedData = {
|
||||
val columns = schema.fields.map { f =>
|
||||
val columnName = new ColumnName(f.name)
|
||||
columnName.cast(f.dataType).as(f.name)
|
||||
}
|
||||
stringData.select(columns: _*)
|
||||
}
|
||||
|
||||
convertedData
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
case class Tables(sqlContext: SQLContext) {
|
||||
import sqlContext.implicits._
|
||||
|
||||
val tables = Seq(
|
||||
/* This is another large table that we don't build yet.
|
||||
@ -212,7 +104,7 @@ case class Tables(sqlContext: SQLContext) {
|
||||
'inv_warehouse_sk .int,
|
||||
'inv_quantity_on_hand .int),*/
|
||||
Table("store_sales",
|
||||
PartitionedTable("ss_sold_date_sk"),
|
||||
partitionColumns = "ss_sold_date_sk" :: Nil,
|
||||
'ss_sold_date_sk .int,
|
||||
'ss_sold_time_sk .int,
|
||||
'ss_item_sk .int,
|
||||
@ -237,7 +129,7 @@ case class Tables(sqlContext: SQLContext) {
|
||||
'ss_net_paid_inc_tax .decimal(7,2),
|
||||
'ss_net_profit .decimal(7,2)),
|
||||
Table("customer",
|
||||
UnpartitionedTable,
|
||||
partitionColumns = Nil,
|
||||
'c_customer_sk .int,
|
||||
'c_customer_id .string,
|
||||
'c_current_cdemo_sk .int,
|
||||
@ -257,7 +149,7 @@ case class Tables(sqlContext: SQLContext) {
|
||||
'c_email_address .string,
|
||||
'c_last_review_date .string),
|
||||
Table("customer_address",
|
||||
UnpartitionedTable,
|
||||
partitionColumns = Nil,
|
||||
'ca_address_sk .int,
|
||||
'ca_address_id .string,
|
||||
'ca_street_number .string,
|
||||
@ -272,7 +164,7 @@ case class Tables(sqlContext: SQLContext) {
|
||||
'ca_gmt_offset .decimal(5,2),
|
||||
'ca_location_type .string),
|
||||
Table("customer_demographics",
|
||||
UnpartitionedTable,
|
||||
partitionColumns = Nil,
|
||||
'cd_demo_sk .int,
|
||||
'cd_gender .string,
|
||||
'cd_marital_status .string,
|
||||
@ -283,7 +175,7 @@ case class Tables(sqlContext: SQLContext) {
|
||||
'cd_dep_employed_count .int,
|
||||
'cd_dep_college_count .int),
|
||||
Table("date_dim",
|
||||
UnpartitionedTable,
|
||||
partitionColumns = Nil,
|
||||
'd_date_sk .int,
|
||||
'd_date_id .string,
|
||||
'd_date .string,
|
||||
@ -313,14 +205,14 @@ case class Tables(sqlContext: SQLContext) {
|
||||
'd_current_quarter .string,
|
||||
'd_current_year .string),
|
||||
Table("household_demographics",
|
||||
UnpartitionedTable,
|
||||
partitionColumns = Nil,
|
||||
'hd_demo_sk .int,
|
||||
'hd_income_band_sk .int,
|
||||
'hd_buy_potential .string,
|
||||
'hd_dep_count .int,
|
||||
'hd_vehicle_count .int),
|
||||
Table("item",
|
||||
UnpartitionedTable,
|
||||
partitionColumns = Nil,
|
||||
'i_item_sk .int,
|
||||
'i_item_id .string,
|
||||
'i_rec_start_date .string,
|
||||
@ -344,7 +236,7 @@ case class Tables(sqlContext: SQLContext) {
|
||||
'i_manager_id .int,
|
||||
'i_product_name .string),
|
||||
Table("promotion",
|
||||
UnpartitionedTable,
|
||||
partitionColumns = Nil,
|
||||
'p_promo_sk .int,
|
||||
'p_promo_id .string,
|
||||
'p_start_date_sk .int,
|
||||
@ -365,7 +257,7 @@ case class Tables(sqlContext: SQLContext) {
|
||||
'p_purpose .string,
|
||||
'p_discount_active .string),
|
||||
Table("store",
|
||||
UnpartitionedTable,
|
||||
partitionColumns = Nil,
|
||||
's_store_sk .int,
|
||||
's_store_id .string,
|
||||
's_rec_start_date .string,
|
||||
@ -396,7 +288,7 @@ case class Tables(sqlContext: SQLContext) {
|
||||
's_gmt_offset .decimal(5,2),
|
||||
's_tax_precentage .decimal(5,2)),
|
||||
Table("time_dim",
|
||||
UnpartitionedTable,
|
||||
partitionColumns = Nil,
|
||||
't_time_sk .int,
|
||||
't_time_id .string,
|
||||
't_time .int,
|
||||
|
||||
@ -1,22 +0,0 @@
|
||||
/*
|
||||
* Copyright 2015 Databricks Inc.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.databricks.spark.sql.perf.tpcds
|
||||
|
||||
package object queries {
|
||||
val impalaKitQueries = ImpalaKitQueries.impalaKitQueries
|
||||
val q7DerivedQueries = SimpleQueries.q7Derived
|
||||
}
|
||||