Main Class for running Benchmarks from the command line
This PR adds the ability to run performance tests locally as a standalone program that reports the results to the console:
```
$ bin/run --help
spark-sql-perf 0.2.0
Usage: spark-sql-perf [options]
-b <value> | --benchmark <value>
the name of the benchmark to run
-f <value> | --filter <value>
a filter on the name of the queries to run
-i <value> | --iterations <value>
the number of iterations to run
--help
prints this usage text
$ bin/run --benchmark DatasetPerformance
```
Author: Michael Armbrust <michael@databricks.com>
Closes #47 from marmbrus/MainClass.
This commit is contained in:
parent
5c93fff323
commit
663ca7560e
24
README.md
24
README.md
@ -2,10 +2,32 @@
|
||||
|
||||
[](https://travis-ci.org/databricks/spark-sql-perf)
|
||||
|
||||
This is a performance testing framework for [Spark SQL](https://spark.apache.org/sql/) in [Apache Spark](https://spark.apache.org/) 1.4+.
|
||||
This is a performance testing framework for [Spark SQL](https://spark.apache.org/sql/) in [Apache Spark](https://spark.apache.org/) 1.6+.
|
||||
|
||||
**Note: This README is still under development. Please also check our source code for more information.**
|
||||
|
||||
# Quick Start
|
||||
|
||||
```
|
||||
$ bin/run --help
|
||||
|
||||
spark-sql-perf 0.2.0
|
||||
Usage: spark-sql-perf [options]
|
||||
|
||||
-b <value> | --benchmark <value>
|
||||
the name of the benchmark to run
|
||||
-f <value> | --filter <value>
|
||||
a filter on the name of the queries to run
|
||||
-i <value> | --iterations <value>
|
||||
the number of iterations to run
|
||||
--help
|
||||
prints this usage text
|
||||
|
||||
$ bin/run --benchmark DatasetPerformance
|
||||
```
|
||||
|
||||
# TPC-DS
|
||||
|
||||
## How to use it
|
||||
The rest of this document will use the TPC-DS benchmark as an example. We will add content to explain how to use other benchmarks and how to add support for a new benchmark dataset in the future.
|
||||
|
||||
|
||||
4
bin/run
Executable file
4
bin/run
Executable file
@ -0,0 +1,4 @@
|
||||
#!/bin/bash
|
||||
|
||||
ARGS="runMain com.databricks.spark.sql.perf.RunBenchmark $@"
|
||||
build/sbt "$ARGS"
|
||||
10
build.sbt
10
build.sbt
@ -14,19 +14,25 @@ licenses := Seq("Apache-2.0" -> url("http://opensource.org/licenses/Apache-2.0")
|
||||
|
||||
sparkVersion := "1.6.0"
|
||||
|
||||
resolvers += "Apache Preview Repo" at "https://repository.apache.org/content/repositories/orgapachespark-1156/"
|
||||
|
||||
sparkComponents ++= Seq("sql", "hive")
|
||||
|
||||
initialCommands in console :=
|
||||
"""
|
||||
|import org.apache.spark.sql._
|
||||
|import org.apache.spark.sql.functions._
|
||||
|import org.apache.spark.sql.types._
|
||||
|import org.apache.spark.sql.hive.test.TestHive
|
||||
|import TestHive.implicits
|
||||
|import TestHive.sql
|
||||
|
|
||||
|val sqlContext = TestHive
|
||||
|import sqlContext.implicits._
|
||||
""".stripMargin
|
||||
|
||||
libraryDependencies += "org.apache.spark" %% "spark-sql" % sparkVersion.value
|
||||
|
||||
libraryDependencies += "com.github.scopt" %% "scopt" % "3.3.0"
|
||||
|
||||
libraryDependencies += "com.twitter" %% "util-jvm" % "6.23.0" % "provided"
|
||||
|
||||
libraryDependencies += "org.scalatest" %% "scalatest" % "2.2.1" % "test"
|
||||
|
||||
@ -39,22 +39,21 @@ import com.databricks.spark.sql.perf.cpu._
|
||||
* @param sqlContext An existing SQLContext.
|
||||
*/
|
||||
abstract class Benchmark(
|
||||
@transient protected val sqlContext: SQLContext,
|
||||
val resultsLocation: String = "/spark/sql/performance",
|
||||
val resultsTableName: String = "sqlPerformance")
|
||||
@transient val sqlContext: SQLContext)
|
||||
extends Serializable {
|
||||
|
||||
import sqlContext.implicits._
|
||||
|
||||
def createResultsTable() = {
|
||||
sqlContext.sql(s"DROP TABLE $resultsTableName")
|
||||
sqlContext.createExternalTable(
|
||||
"sqlPerformance", "json", Map("path" -> (resultsLocation + "/*/")))
|
||||
}
|
||||
def this() = this(SQLContext.getOrCreate(SparkContext.getOrCreate()))
|
||||
|
||||
val resultsLocation =
|
||||
sqlContext.getAllConfs.getOrElse(
|
||||
"spark.sql.perf.results",
|
||||
"/spark/sql/performance")
|
||||
|
||||
protected def sparkContext = sqlContext.sparkContext
|
||||
|
||||
protected implicit def toOption[A](a: A) = Option(a)
|
||||
protected implicit def toOption[A](a: A): Option[A] = Option(a)
|
||||
|
||||
val buildInfo = Try(getClass.getClassLoader.loadClass("org.apache.spark.BuildInfo")).map { cls =>
|
||||
cls.getMethods
|
||||
@ -132,6 +131,11 @@ abstract class Benchmark(
|
||||
val currentRuns = new collection.mutable.ArrayBuffer[ExperimentRun]()
|
||||
val currentMessages = new collection.mutable.ArrayBuffer[String]()
|
||||
|
||||
def logMessage(msg: String) = {
|
||||
println(msg)
|
||||
currentMessages += msg
|
||||
}
|
||||
|
||||
// Stats for HTML status message.
|
||||
@volatile var currentExecution = ""
|
||||
@volatile var currentPlan = "" // for queries only
|
||||
@ -149,6 +153,7 @@ abstract class Benchmark(
|
||||
}
|
||||
|
||||
val timestamp = System.currentTimeMillis()
|
||||
val resultPath = s"$resultsLocation/timestamp=$timestamp"
|
||||
val combinations = cartesianProduct(variations.map(l => (0 until l.options.size).toList).toList)
|
||||
val resultsFuture = Future {
|
||||
|
||||
@ -169,20 +174,20 @@ abstract class Benchmark(
|
||||
.foreach { name =>
|
||||
try {
|
||||
sqlContext.table(name)
|
||||
currentMessages += s"Table $name exists."
|
||||
logMessage(s"Table $name exists.")
|
||||
} catch {
|
||||
case ae: Exception =>
|
||||
val table = allTables
|
||||
.find(_.name == name)
|
||||
if (table.isDefined) {
|
||||
currentMessages += s"Creating table: $name"
|
||||
logMessage(s"Creating table: $name")
|
||||
table.get.data
|
||||
.write
|
||||
.mode("overwrite")
|
||||
.saveAsTable(name)
|
||||
} else {
|
||||
// the table could be subquery
|
||||
println(s"Couldn't read table $name and its not defined as a Benchmark.Table.")
|
||||
logMessage(s"Couldn't read table $name and its not defined as a Benchmark.Table.")
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -205,7 +210,7 @@ abstract class Benchmark(
|
||||
|
||||
executionsToRun.flatMap { q =>
|
||||
val setup = s"iteration: $i, ${currentOptions.map { case (k, v) => s"$k=$v"}.mkString(", ")}"
|
||||
currentMessages += s"Running execution ${q.name} $setup"
|
||||
logMessage(s"Running execution ${q.name} $setup")
|
||||
|
||||
currentExecution = q.name
|
||||
currentPlan = q match {
|
||||
@ -220,13 +225,15 @@ abstract class Benchmark(
|
||||
}
|
||||
startTime = System.currentTimeMillis()
|
||||
|
||||
val singleResult = q.benchmark(includeBreakdown, setup, currentMessages, timeout)
|
||||
val singleResult =
|
||||
q.benchmark(includeBreakdown, setup, currentMessages, timeout)
|
||||
|
||||
singleResult.failure.foreach { f =>
|
||||
failures += 1
|
||||
currentMessages += s"Execution '${q.name}' failed: ${f.message}"
|
||||
logMessage(s"Execution '${q.name}' failed: ${f.message}")
|
||||
}
|
||||
singleResult.executionTime.foreach { time =>
|
||||
currentMessages += s"Execution time: ${time / 1000}s"
|
||||
logMessage(s"Execution time: ${time / 1000}s")
|
||||
}
|
||||
currentResults += singleResult
|
||||
singleResult :: Nil
|
||||
@ -240,16 +247,16 @@ abstract class Benchmark(
|
||||
|
||||
try {
|
||||
val resultsTable = sqlContext.createDataFrame(results)
|
||||
currentMessages += s"Results written to table: 'sqlPerformance' at $resultsLocation/$timestamp"
|
||||
logMessage(s"Results written to table: 'sqlPerformance' at $resultPath")
|
||||
results.toDF()
|
||||
.coalesce(1)
|
||||
.write
|
||||
.format("json")
|
||||
.save(s"$resultsLocation/$timestamp")
|
||||
.save(resultPath)
|
||||
|
||||
results.toDF()
|
||||
} catch {
|
||||
case e: Throwable => currentMessages += s"Failed to write data: $e"
|
||||
case e: Throwable => logMessage(s"Failed to write data: $e")
|
||||
}
|
||||
|
||||
logCollection()
|
||||
@ -257,13 +264,13 @@ abstract class Benchmark(
|
||||
|
||||
def scheduleCpuCollection(fs: FS) = {
|
||||
logCollection = () => {
|
||||
currentMessages += s"Begining CPU log collection"
|
||||
logMessage(s"Begining CPU log collection")
|
||||
try {
|
||||
val location = cpu.collectLogs(sqlContext, fs, timestamp)
|
||||
currentMessages += s"cpu results recorded to $location"
|
||||
logMessage(s"cpu results recorded to $location")
|
||||
} catch {
|
||||
case e: Throwable =>
|
||||
currentMessages += s"Error collecting logs: $e"
|
||||
logMessage(s"Error collecting logs: $e")
|
||||
throw e
|
||||
}
|
||||
}
|
||||
@ -327,7 +334,7 @@ abstract class Benchmark(
|
||||
}
|
||||
s"""
|
||||
|<h2>$status Experiment</h2>
|
||||
|<b>Permalink:</b> <tt>table("$resultsTableName").where('timestamp === ${timestamp}L)</tt><br/>
|
||||
|<b>Permalink:</b> <tt>sqlContext.read.json("$resultPath")</tt><br/>
|
||||
|<b>Iterations complete:</b> ${currentRuns.size / combinations.size} / $iterations<br/>
|
||||
|<b>Failures:</b> $failures<br/>
|
||||
|<b>Executions run:</b> ${currentResults.size} / ${iterations * combinations.size * executionsToRun.size}
|
||||
@ -383,15 +390,15 @@ abstract class Benchmark(
|
||||
myType.declarations
|
||||
.filter(m => m.isMethod)
|
||||
.map(_.asMethod)
|
||||
.filter(_.asMethod.returnType =:= typeOf[Query])
|
||||
.map(method => runtimeMirror.reflect(this).reflectMethod(method).apply().asInstanceOf[Query])
|
||||
.filter(_.asMethod.returnType =:= typeOf[Benchmarkable])
|
||||
.map(method => runtimeMirror.reflect(this).reflectMethod(method).apply().asInstanceOf[Benchmarkable])
|
||||
|
||||
def groupedQueries =
|
||||
myType.declarations
|
||||
.filter(m => m.isMethod)
|
||||
.map(_.asMethod)
|
||||
.filter(_.asMethod.returnType =:= typeOf[Seq[Query]])
|
||||
.flatMap(method => runtimeMirror.reflect(this).reflectMethod(method).apply().asInstanceOf[Seq[Query]])
|
||||
.filter(_.asMethod.returnType =:= typeOf[Seq[Benchmarkable]])
|
||||
.flatMap(method => runtimeMirror.reflect(this).reflectMethod(method).apply().asInstanceOf[Seq[Benchmarkable]])
|
||||
|
||||
@transient
|
||||
lazy val allQueries = (singleQueries ++ groupedQueries).toSeq
|
||||
@ -426,35 +433,6 @@ abstract class Benchmark(
|
||||
""".stripMargin
|
||||
}
|
||||
|
||||
trait ExecutionMode extends Serializable
|
||||
case object ExecutionMode {
|
||||
// Benchmark run by collecting queries results (e.g. rdd.collect())
|
||||
case object CollectResults extends ExecutionMode {
|
||||
override def toString: String = "collect"
|
||||
}
|
||||
|
||||
// Benchmark run by iterating through the queries results rows (e.g. rdd.foreach(row => Unit))
|
||||
case object ForeachResults extends ExecutionMode {
|
||||
override def toString: String = "foreach"
|
||||
}
|
||||
|
||||
// Benchmark run by saving the output of each query as a parquet file at the specified location
|
||||
case class WriteParquet(location: String) extends ExecutionMode {
|
||||
override def toString: String = "saveToParquet"
|
||||
}
|
||||
|
||||
// Benchmark run by calculating the sum of the hash value of all rows. This is used to check
|
||||
// query results.
|
||||
case object HashResults extends ExecutionMode {
|
||||
override def toString: String = "hash"
|
||||
}
|
||||
|
||||
// Results from Spark perf
|
||||
case object SparkPerfResults extends ExecutionMode {
|
||||
override def toString: String = "sparkPerf"
|
||||
}
|
||||
}
|
||||
|
||||
/** Factory object for benchmark queries. */
|
||||
case object Query {
|
||||
def apply(
|
||||
@ -473,80 +451,16 @@ abstract class Benchmark(
|
||||
}
|
||||
}
|
||||
|
||||
/** A trait to describe things that can be benchmarked. */
|
||||
trait Benchmarkable {
|
||||
val name: String
|
||||
protected val executionMode: ExecutionMode
|
||||
|
||||
final def benchmark(
|
||||
includeBreakdown: Boolean,
|
||||
description: String = "",
|
||||
messages: ArrayBuffer[String],
|
||||
timeout: Long): BenchmarkResult = {
|
||||
sparkContext.setJobDescription(s"Execution: $name, $description")
|
||||
beforeBenchmark()
|
||||
val result = runBenchmark(includeBreakdown, description, messages, timeout)
|
||||
afterBenchmark(sqlContext.sparkContext)
|
||||
result
|
||||
}
|
||||
|
||||
protected def beforeBenchmark(): Unit = { }
|
||||
|
||||
private def afterBenchmark(sc: SparkContext): Unit = {
|
||||
// Best-effort clean up of weakly referenced RDDs, shuffles, and broadcasts
|
||||
System.gc()
|
||||
// Remove any leftover blocks that still exist
|
||||
sc.getExecutorStorageStatus
|
||||
.flatMap { status => status.blocks.map { case (bid, _) => bid } }
|
||||
.foreach { bid => SparkEnv.get.blockManager.master.removeBlock(bid) }
|
||||
}
|
||||
|
||||
private def runBenchmark(
|
||||
includeBreakdown: Boolean,
|
||||
description: String = "",
|
||||
messages: ArrayBuffer[String],
|
||||
timeout: Long): BenchmarkResult = {
|
||||
val jobgroup = UUID.randomUUID().toString
|
||||
var result: BenchmarkResult = null
|
||||
val thread = new Thread("benchmark runner") {
|
||||
override def run(): Unit = {
|
||||
sparkContext.setJobGroup(jobgroup, s"benchmark $name", true)
|
||||
result = doBenchmark(includeBreakdown, description, messages)
|
||||
}
|
||||
}
|
||||
thread.setDaemon(true)
|
||||
thread.start()
|
||||
thread.join(timeout)
|
||||
if (thread.isAlive) {
|
||||
sparkContext.cancelJobGroup(jobgroup)
|
||||
thread.interrupt()
|
||||
result = BenchmarkResult(
|
||||
name = name,
|
||||
mode = executionMode.toString,
|
||||
failure = Some(Failure("Timeout", s"timeout after ${timeout / 1000} seconds"))
|
||||
)
|
||||
}
|
||||
result
|
||||
}
|
||||
|
||||
protected def doBenchmark(
|
||||
includeBreakdown: Boolean,
|
||||
description: String = "",
|
||||
messages: ArrayBuffer[String]): BenchmarkResult
|
||||
|
||||
protected def measureTimeMs[A](f: => A): Double = {
|
||||
val startTime = System.nanoTime()
|
||||
f
|
||||
val endTime = System.nanoTime()
|
||||
(endTime - startTime).toDouble / 1000000
|
||||
}
|
||||
}
|
||||
|
||||
object RDDCount {
|
||||
def apply(
|
||||
name: String,
|
||||
rdd: RDD[_]) = {
|
||||
new SparkPerfExecution(name, Map.empty, () => Unit, () => rdd.count())
|
||||
new SparkPerfExecution(
|
||||
name,
|
||||
Map.empty,
|
||||
() => Unit,
|
||||
() => rdd.count(),
|
||||
rdd.toDebugString)
|
||||
}
|
||||
}
|
||||
|
||||
@ -555,9 +469,16 @@ abstract class Benchmark(
|
||||
override val name: String,
|
||||
parameters: Map[String, String],
|
||||
prepare: () => Unit,
|
||||
run: () => Unit)
|
||||
run: () => Unit,
|
||||
description: String = "")
|
||||
extends Benchmarkable {
|
||||
|
||||
override def toString: String =
|
||||
s"""
|
||||
|== $name ==
|
||||
|$description
|
||||
""".stripMargin
|
||||
|
||||
protected override val executionMode: ExecutionMode = ExecutionMode.SparkPerfResults
|
||||
|
||||
protected override def beforeBenchmark(): Unit = { prepare() }
|
||||
@ -593,7 +514,7 @@ abstract class Benchmark(
|
||||
override val executionMode: ExecutionMode = ExecutionMode.ForeachResults)
|
||||
extends Benchmarkable with Serializable {
|
||||
|
||||
override def toString = {
|
||||
override def toString: String = {
|
||||
try {
|
||||
s"""
|
||||
|== Query: $name ==
|
||||
@ -606,7 +527,7 @@ abstract class Benchmark(
|
||||
| Can't be analyzed: $e
|
||||
|
|
||||
| $description
|
||||
""".stripMargin
|
||||
""".stripMargin
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -0,0 +1,96 @@
|
||||
/*
|
||||
* Copyright 2015 Databricks Inc.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.databricks.spark.sql.perf
|
||||
|
||||
import java.util.UUID
|
||||
|
||||
import org.apache.spark.sql.SQLContext
|
||||
import org.apache.spark.{SparkEnv, SparkContext}
|
||||
|
||||
import scala.collection.mutable.ArrayBuffer
|
||||
|
||||
/** A trait to describe things that can be benchmarked. */
|
||||
trait Benchmarkable {
|
||||
val sqlContext = SQLContext.getOrCreate(SparkContext.getOrCreate())
|
||||
val sparkContext = sqlContext.sparkContext
|
||||
|
||||
val name: String
|
||||
protected val executionMode: ExecutionMode
|
||||
|
||||
final def benchmark(
|
||||
includeBreakdown: Boolean,
|
||||
description: String = "",
|
||||
messages: ArrayBuffer[String],
|
||||
timeout: Long): BenchmarkResult = {
|
||||
sparkContext.setJobDescription(s"Execution: $name, $description")
|
||||
beforeBenchmark()
|
||||
val result = runBenchmark(includeBreakdown, description, messages, timeout)
|
||||
afterBenchmark(sqlContext.sparkContext)
|
||||
result
|
||||
}
|
||||
|
||||
protected def beforeBenchmark(): Unit = { }
|
||||
|
||||
private def afterBenchmark(sc: SparkContext): Unit = {
|
||||
// Best-effort clean up of weakly referenced RDDs, shuffles, and broadcasts
|
||||
System.gc()
|
||||
// Remove any leftover blocks that still exist
|
||||
sc.getExecutorStorageStatus
|
||||
.flatMap { status => status.blocks.map { case (bid, _) => bid } }
|
||||
.foreach { bid => SparkEnv.get.blockManager.master.removeBlock(bid) }
|
||||
}
|
||||
|
||||
private def runBenchmark(
|
||||
includeBreakdown: Boolean,
|
||||
description: String = "",
|
||||
messages: ArrayBuffer[String],
|
||||
timeout: Long): BenchmarkResult = {
|
||||
val jobgroup = UUID.randomUUID().toString
|
||||
var result: BenchmarkResult = null
|
||||
val thread = new Thread("benchmark runner") {
|
||||
override def run(): Unit = {
|
||||
sparkContext.setJobGroup(jobgroup, s"benchmark $name", true)
|
||||
result = doBenchmark(includeBreakdown, description, messages)
|
||||
}
|
||||
}
|
||||
thread.setDaemon(true)
|
||||
thread.start()
|
||||
thread.join(timeout)
|
||||
if (thread.isAlive) {
|
||||
sparkContext.cancelJobGroup(jobgroup)
|
||||
thread.interrupt()
|
||||
result = BenchmarkResult(
|
||||
name = name,
|
||||
mode = executionMode.toString,
|
||||
failure = Some(Failure("Timeout", s"timeout after ${timeout / 1000} seconds"))
|
||||
)
|
||||
}
|
||||
result
|
||||
}
|
||||
|
||||
protected def doBenchmark(
|
||||
includeBreakdown: Boolean,
|
||||
description: String = "",
|
||||
messages: ArrayBuffer[String]): BenchmarkResult
|
||||
|
||||
protected def measureTimeMs[A](f: => A): Double = {
|
||||
val startTime = System.nanoTime()
|
||||
f
|
||||
val endTime = System.nanoTime()
|
||||
(endTime - startTime).toDouble / 1000000
|
||||
}
|
||||
}
|
||||
@ -1,12 +1,29 @@
|
||||
/*
|
||||
* Copyright 2015 Databricks Inc.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.databricks.spark.sql.perf
|
||||
|
||||
import org.apache.spark.sql.SQLContext
|
||||
import org.apache.spark.sql.expressions.Aggregator
|
||||
|
||||
case class Data(id: Long)
|
||||
|
||||
case class SumAndCount(var sum: Long, var count: Int)
|
||||
|
||||
trait DatasetPerformance extends Benchmark {
|
||||
class DatasetPerformance extends Benchmark {
|
||||
|
||||
import sqlContext.implicits._
|
||||
|
||||
@ -31,7 +48,7 @@ trait DatasetPerformance extends Benchmark {
|
||||
executionMode = ExecutionMode.ForeachResults),
|
||||
RDDCount(
|
||||
"RDD: range",
|
||||
rdd.map(Data))
|
||||
rdd.map(Data(_)))
|
||||
)
|
||||
|
||||
val backToBackFilters = Seq(
|
||||
@ -51,7 +68,7 @@ trait DatasetPerformance extends Benchmark {
|
||||
.filter("id % 103 != 0")),
|
||||
RDDCount(
|
||||
"RDD: back-to-back filters",
|
||||
rdd.map(Data)
|
||||
rdd.map(Data(_))
|
||||
.filter(_.id % 100 != 0)
|
||||
.filter(_.id % 101 != 0)
|
||||
.filter(_.id % 102 != 0)
|
||||
|
||||
@ -0,0 +1,52 @@
|
||||
/*
|
||||
* Copyright 2015 Databricks Inc.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.databricks.spark.sql.perf
|
||||
|
||||
/**
|
||||
* Describes how a given Spark benchmark should be run (i.e. should the results be collected to
|
||||
 * the driver or just computed on the executors).
|
||||
*/
|
||||
trait ExecutionMode extends Serializable
|
||||
case object ExecutionMode {
|
||||
/** Benchmark run by collecting queries results (e.g. rdd.collect()) */
|
||||
case object CollectResults extends ExecutionMode {
|
||||
override def toString: String = "collect"
|
||||
}
|
||||
|
||||
/** Benchmark run by iterating through the queries results rows (e.g. rdd.foreach(row => Unit)) */
|
||||
case object ForeachResults extends ExecutionMode {
|
||||
override def toString: String = "foreach"
|
||||
}
|
||||
|
||||
/** Benchmark run by saving the output of each query as a parquet file. */
|
||||
case class WriteParquet(location: String) extends ExecutionMode {
|
||||
override def toString: String = "saveToParquet"
|
||||
}
|
||||
|
||||
/**
|
||||
* Benchmark run by calculating the sum of the hash value of all rows. This is used to check
|
||||
* query results do not change.
|
||||
*/
|
||||
case object HashResults extends ExecutionMode {
|
||||
override def toString: String = "hash"
|
||||
}
|
||||
|
||||
/** Results from Spark perf */
|
||||
case object SparkPerfResults extends ExecutionMode {
|
||||
override def toString: String = "sparkPerf"
|
||||
}
|
||||
}
|
||||
112
src/main/scala/com/databricks/spark/sql/perf/RunBenchmark.scala
Normal file
112
src/main/scala/com/databricks/spark/sql/perf/RunBenchmark.scala
Normal file
@ -0,0 +1,112 @@
|
||||
/*
|
||||
* Copyright 2015 Databricks Inc.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.databricks.spark.sql.perf
|
||||
|
||||
import java.net.InetAddress
|
||||
|
||||
import org.apache.spark.sql.SQLContext
|
||||
import org.apache.spark.sql.functions._
|
||||
import org.apache.spark.{SparkContext, SparkConf}
|
||||
|
||||
import scala.util.Try
|
||||
|
||||
case class RunConfig(
|
||||
benchmarkName: String = null,
|
||||
filter: Option[String] = None,
|
||||
iterations: Int = 3)
|
||||
|
||||
/**
|
||||
* Runs a benchmark locally and prints the results to the screen.
|
||||
*/
|
||||
object RunBenchmark {
|
||||
def main(args: Array[String]): Unit = {
|
||||
val parser = new scopt.OptionParser[RunConfig]("spark-sql-perf") {
|
||||
head("spark-sql-perf", "0.2.0")
|
||||
opt[String]('b', "benchmark")
|
||||
.action { (x, c) => c.copy(benchmarkName = x) }
|
||||
.text("the name of the benchmark to run")
|
||||
.required()
|
||||
opt[String]('f', "filter")
|
||||
.action((x, c) => c.copy(filter = Some(x)))
|
||||
.text("a filter on the name of the queries to run")
|
||||
opt[Int]('i', "iterations")
|
||||
.action((x, c) => c.copy(iterations = x))
|
||||
.text("the number of iterations to run")
|
||||
help("help")
|
||||
.text("prints this usage text")
|
||||
}
|
||||
|
||||
parser.parse(args, RunConfig()) match {
|
||||
case Some(config) =>
|
||||
run(config)
|
||||
case None =>
|
||||
System.exit(1)
|
||||
}
|
||||
}
|
||||
|
||||
def run(config: RunConfig): Unit = {
|
||||
val conf = new SparkConf()
|
||||
.setMaster("local[*]")
|
||||
.setAppName(getClass.getName)
|
||||
|
||||
val sc = SparkContext.getOrCreate(conf)
|
||||
val sqlContext = SQLContext.getOrCreate(sc)
|
||||
import sqlContext.implicits._
|
||||
|
||||
sqlContext.setConf("spark.sql.perf.results", new java.io.File("performance").toURI.toString)
|
||||
val benchmark = Try {
|
||||
Class.forName(config.benchmarkName)
|
||||
.newInstance()
|
||||
.asInstanceOf[Benchmark]
|
||||
} getOrElse {
|
||||
Class.forName("com.databricks.spark.sql.perf." + config.benchmarkName)
|
||||
.newInstance()
|
||||
.asInstanceOf[Benchmark]
|
||||
}
|
||||
|
||||
val allQueries = config.filter.map { f =>
|
||||
benchmark.allQueries.filter(_.name contains f)
|
||||
} getOrElse {
|
||||
benchmark.allQueries
|
||||
}
|
||||
|
||||
println("== QUERY LIST ==")
|
||||
allQueries.foreach(println)
|
||||
|
||||
val experiment = benchmark.runExperiment(
|
||||
executionsToRun = allQueries,
|
||||
iterations = config.iterations,
|
||||
tags = Map(
|
||||
"runtype" -> "local",
|
||||
"host" -> InetAddress.getLocalHost().getHostName()))
|
||||
|
||||
println("== STARTING EXPERIMENT ==")
|
||||
experiment.waitForFinish(1000 * 60 * 30)
|
||||
experiment.getCurrentRuns()
|
||||
.withColumn("result", explode($"results"))
|
||||
.select("result.*")
|
||||
.groupBy("name")
|
||||
.agg(
|
||||
min($"executionTime") as 'minTimeMs,
|
||||
max($"executionTime") as 'maxTimeMs,
|
||||
avg($"executionTime") as 'avgTimeMs,
|
||||
stddev($"executionTime") as 'stdDev)
|
||||
.orderBy("name")
|
||||
.show()
|
||||
println(s"""Results: sqlContext.read.json("${experiment.resultPath}")""")
|
||||
}
|
||||
}
|
||||
@ -16,7 +16,7 @@
|
||||
|
||||
package com.databricks.spark.sql.perf.bigdata
|
||||
|
||||
import com.databricks.spark.sql.perf.Benchmark
|
||||
import com.databricks.spark.sql.perf.{ExecutionMode, Benchmark}
|
||||
|
||||
trait Queries extends Benchmark {
|
||||
|
||||
|
||||
@ -16,7 +16,7 @@
|
||||
|
||||
package com.databricks.spark.sql.perf.tpcds
|
||||
|
||||
import com.databricks.spark.sql.perf.Benchmark
|
||||
import com.databricks.spark.sql.perf.{ExecutionMode, Benchmark}
|
||||
|
||||
trait ImpalaKitQueries extends Benchmark {
|
||||
|
||||
|
||||
@ -16,7 +16,7 @@
|
||||
|
||||
package com.databricks.spark.sql.perf.tpcds
|
||||
|
||||
import com.databricks.spark.sql.perf.Benchmark
|
||||
import com.databricks.spark.sql.perf.{ExecutionMode, Benchmark}
|
||||
|
||||
trait SimpleQueries extends Benchmark {
|
||||
|
||||
|
||||
@ -25,12 +25,12 @@ import org.apache.spark.sql.SQLContext
|
||||
* TPC-DS benchmark's dataset.
|
||||
* @param sqlContext An existing SQLContext.
|
||||
*/
|
||||
class TPCDS (
|
||||
@transient sqlContext: SQLContext,
|
||||
resultsLocation: String = "/spark/sql/performance",
|
||||
resultsTableName: String = "sqlPerformance")
|
||||
extends Benchmark(sqlContext, resultsLocation, resultsTableName)
|
||||
with ImpalaKitQueries with SimpleQueries with Tpcds_1_4_Queries with Serializable {
|
||||
class TPCDS
|
||||
extends Benchmark
|
||||
with ImpalaKitQueries
|
||||
with SimpleQueries
|
||||
with Tpcds_1_4_Queries
|
||||
with Serializable {
|
||||
|
||||
/*
|
||||
def setupBroadcast(skipTables: Seq[String] = Seq("store_sales", "customer")) = {
|
||||
|
||||
@ -16,7 +16,7 @@
|
||||
|
||||
package com.databricks.spark.sql.perf.tpcds
|
||||
|
||||
import com.databricks.spark.sql.perf.Benchmark
|
||||
import com.databricks.spark.sql.perf.{ExecutionMode, Benchmark}
|
||||
|
||||
/**
|
||||
* This implements the official TPCDS v1.4 queries with only cosmetic modifications
|
||||
|
||||
@ -6,7 +6,8 @@ import org.scalatest.FunSuite
|
||||
|
||||
class DatasetPerformanceSuite extends FunSuite {
|
||||
test("run benchmark") {
|
||||
val benchmark = new Benchmark(TestHive) with DatasetPerformance {
|
||||
TestHive // Init HiveContext
|
||||
val benchmark = new DatasetPerformance() {
|
||||
override val numLongs = 100
|
||||
}
|
||||
import benchmark._
|
||||
|
||||
Loading…
Reference in New Issue
Block a user