try to run all TPC-DS queries in benchmark (even those that can't be parsed)

This commit is contained in:
Davies Liu 2016-01-08 15:03:44 -08:00
parent 3105219fb0
commit cec648ac0f
4 changed files with 130 additions and 36 deletions

View File

@ -35,7 +35,7 @@ val tpcds = new TPCDS (sqlContext = sqlContext)
After setup, users can use `runExperiment` function to run benchmarking queries and record query execution time. Taking TPC-DS as an example, you can start an experiment by using
```
val experiment = tpcds.runExperiment(queriesToRun = tpcds.interactiveQueries)
val experiment = tpcds.runExperiment(tpcds.interactiveQueries)
```
For every experiment run (i.e. every call of `runExperiment`), Spark SQL Perf will use the timestamp of the start time to identify this experiment. Performance results will be stored in the sub-dir named by the timestamp in the given `resultsLocation` (for example `results/1429213883272`). The performance results are stored in the JSON format.

View File

@ -16,6 +16,8 @@
package com.databricks.spark.sql.perf
import java.util.UUID
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.concurrent._
@ -25,7 +27,7 @@ import scala.util.Try
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, AnalysisException, DataFrame, SQLContext}
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, UnresolvedRelation}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.{SparkContext, SparkEnv}
@ -113,6 +115,7 @@ abstract class Benchmark(
* @param variations [[Variation]]s used in this run. The cross product of all variations will be
* run for each execution * iteration.
* @param tags Tags of this run.
* @param timeout wait at most timeout milliseconds for each query, 0 means wait forever
* @return It returns a ExperimentStatus object that can be used to
* track the progress of this experiment run.
*/
@ -121,7 +124,8 @@ abstract class Benchmark(
includeBreakdown: Boolean = false,
iterations: Int = 3,
variations: Seq[Variation[_]] = Seq(Variation("StandardRun", Seq("true")) { _ => {} }),
tags: Map[String, String] = Map.empty) = {
tags: Map[String, String] = Map.empty,
timeout: Long = 0L) = {
class ExperimentStatus {
val currentResults = new collection.mutable.ArrayBuffer[BenchmarkResult]()
@ -152,8 +156,13 @@ abstract class Benchmark(
executionsToRun
.collect { case query: Query => query }
.flatMap { query =>
query.newDataFrame().queryExecution.logical.collect {
case UnresolvedRelation(t, _) => t.table
try {
query.newDataFrame().queryExecution.logical.collect {
case UnresolvedRelation(t, _) => t.table
}
} catch {
// ignore the queries that can't be parsed
case e: Exception => Seq()
}
}
.distinct
@ -162,16 +171,19 @@ abstract class Benchmark(
sqlContext.table(name)
currentMessages += s"Table $name exists."
} catch {
case ae: AnalysisException =>
case ae: Exception =>
val table = allTables
.find(_.name == name)
.getOrElse(sys.error(s"Couldn't read table $name and its not defined as a Benchmark.Table."))
currentMessages += s"Creating table: $name"
table.data
.write
.mode("overwrite")
.saveAsTable(name)
if (table.isDefined) {
currentMessages += s"Creating table: $name"
table.get.data
.write
.mode("overwrite")
.saveAsTable(name)
} else {
// the table could be a subquery
println(s"Couldn't read table $name and its not defined as a Benchmark.Table.")
}
}
}
@ -197,12 +209,18 @@ abstract class Benchmark(
currentExecution = q.name
currentPlan = q match {
case query: Query => query.newDataFrame().queryExecution.executedPlan.toString()
case query: Query =>
try {
query.newDataFrame().queryExecution.executedPlan.toString()
} catch {
case e: Exception =>
s"failed to parse: $e"
}
case _ => ""
}
startTime = System.currentTimeMillis()
val singleResult = q.benchmark(includeBreakdown, setup, currentMessages)
val singleResult = q.benchmark(includeBreakdown, setup, currentMessages, timeout)
singleResult.failure.foreach { f =>
failures += 1
currentMessages += s"Execution '${q.name}' failed: ${f.message}"
@ -463,10 +481,11 @@ abstract class Benchmark(
final def benchmark(
includeBreakdown: Boolean,
description: String = "",
messages: ArrayBuffer[String]): BenchmarkResult = {
messages: ArrayBuffer[String],
timeout: Long): BenchmarkResult = {
sparkContext.setJobDescription(s"Execution: $name, $description")
beforeBenchmark()
val result = doBenchmark(includeBreakdown, description, messages)
val result = runBenchmark(includeBreakdown, description, messages, timeout)
afterBenchmark(sqlContext.sparkContext)
result
}
@ -482,6 +501,34 @@ abstract class Benchmark(
.foreach { bid => SparkEnv.get.blockManager.master.removeBlock(bid) }
}
private def runBenchmark(
includeBreakdown: Boolean,
description: String = "",
messages: ArrayBuffer[String],
timeout: Long): BenchmarkResult = {
val jobgroup = UUID.randomUUID().toString
var result: BenchmarkResult = null
val thread = new Thread("benchmark runner") {
override def run(): Unit = {
sparkContext.setJobGroup(jobgroup, s"benchmark $name", true)
result = doBenchmark(includeBreakdown, description, messages)
}
}
thread.setDaemon(true)
thread.start()
thread.join(timeout)
if (thread.isAlive) {
sparkContext.cancelJobGroup(jobgroup)
thread.interrupt()
result = BenchmarkResult(
name = name,
mode = executionMode.toString,
failure = Some(Failure("Timeout", s"timeout after ${timeout / 1000} seconds"))
)
}
result
}
protected def doBenchmark(
includeBreakdown: Boolean,
description: String = "",
@ -546,11 +593,22 @@ abstract class Benchmark(
override val executionMode: ExecutionMode = ExecutionMode.ForeachResults)
extends Benchmarkable with Serializable {
override def toString =
s"""
|== Query: $name ==
|${buildDataFrame.queryExecution.analyzed}
override def toString = {
try {
s"""
|== Query: $name ==
|${buildDataFrame.queryExecution.analyzed}
""".stripMargin
} catch {
case e: Exception =>
s"""
|== Query: $name ==
| Can't be analyzed: $e
|
| $description
""".stripMargin
}
}
lazy val tablesInvolved = buildDataFrame.queryExecution.logical collect {
case UnresolvedRelation(tableIdentifier, _) => {

View File

@ -72,20 +72,39 @@ class TPCDS (
println(succeeded.map("\"" + _ + "\""))
}
def run(queries: Seq[Query], numRows: Int = 1): Unit = {
def run(queries: Seq[Query], numRows: Int = 1, timeout: Int = 0): Unit = {
val succeeded = mutable.ArrayBuffer.empty[String]
queries.foreach { q =>
println(s"Query: ${q.name}")
try {
val start = System.currentTimeMillis()
val df = sqlContext.sql(q.sqlText.get)
df.show(numRows)
succeeded += q.name
println(s" Took: ${System.currentTimeMillis() - start} ms")
println("------------------------------------------------------------------")
} catch {
case e: Exception =>
println("Failed to run: " + e)
val start = System.currentTimeMillis()
val df = sqlContext.sql(q.sqlText.get)
var failed = false
val jobgroup = s"benchmark ${q.name}"
val t = new Thread("query runner") {
override def run(): Unit = {
try {
sqlContext.sparkContext.setJobGroup(jobgroup, jobgroup, true)
df.show(numRows)
} catch {
case e: Exception =>
println("Failed to run: " + e)
failed = true
}
}
}
t.setDaemon(true)
t.start()
t.join(timeout)
if (t.isAlive) {
println(s"Timeout after $timeout seconds")
sqlContext.sparkContext.cancelJobGroup(jobgroup)
t.interrupt()
} else {
if (!failed) {
succeeded += q.name
println(s" Took: ${System.currentTimeMillis() - start} ms")
println("------------------------------------------------------------------")
}
}
}
println(s"Ran ${succeeded.size} out of ${queries.size}")

View File

@ -3840,7 +3840,24 @@ trait Tpcds_1_4_Queries extends Benchmark {
| substr(w_warehouse_name,1,20), sm_type, cc_name
| order by substr(w_warehouse_name,1,20), sm_type, cc_name
| limit 100
""".stripMargin)
""".stripMargin),
("qSsMax",
"""
|select
| count(*) as total,
| count(ss_sold_date_sk) as not_null_total,
| count(distinct ss_sold_date_sk) as unique_days,
| max(ss_sold_date_sk) as max_ss_sold_date_sk,
| max(ss_sold_time_sk) as max_ss_sold_time_sk,
| max(ss_item_sk) as max_ss_item_sk,
| max(ss_customer_sk) as max_ss_customer_sk,
| max(ss_cdemo_sk) as max_ss_cdemo_sk,
| max(ss_hdemo_sk) as max_ss_hdemo_sk,
| max(ss_addr_sk) as max_ss_addr_sk,
| max(ss_store_sk) as max_ss_store_sk,
| max(ss_promo_sk) as max_ss_promo_sk
|from store_sales
""".stripMargin)
).map { case (name, sqlText) =>
Query(name + "-v1.4", sqlText, description = "TPCDS 1.4 Query", executionMode = CollectResults)
}
@ -3851,14 +3868,14 @@ trait Tpcds_1_4_Queries extends Benchmark {
"q19", "q21", "q25", "q26", "q28", "q29", "q31", "q34", "q37", "q38", "q39a", "q39b", "q40",
"q42", "q43", "q46", "q48", "q52", "q55", "q59", "q64", "q65", "q66", "q68", "q71", "q72",
"q73", "q74", "q75", "q76", "q78", "q79", "q82", "q84", "q85", "q87", "q88", "q90", "q91",
"q93", "q96", "q97").map(tpcds1_4QueriesMap)
"q93", "q96", "q97", "qSsMax").map(tpcds1_4QueriesMap)
// Queries that are plannable using HiveQL
val hiveDialectPlannableQueries = Seq("q3", "q4", "q7", "q11", "q13", "q15", "q17", "q19", "q21",
"q25", "q26", "q28", "q29", "q31", "q34", "q37", "q39a", "q39b", "q40", "q42", "q43", "q46",
"q47", "q48", "q49", "q51", "q52", "q53", "q55", "q57", "q59", "q63", "q64", "q65", "q68",
"q71", "q72", "q73", "q74", "q75", "q76", "q78", "q79", "q82", "q84", "q85", "q88", "q89",
"q90", "q91", "q93", "q96", "q97").map(tpcds1_4QueriesMap)
"q90", "q91", "q93", "q96", "q97", "qSsMax").map(tpcds1_4QueriesMap)
// check results q17, q25, q91
// OOM: q4, q11, q13, q48, q65, q78, q85
@ -3868,5 +3885,5 @@ trait Tpcds_1_4_Queries extends Benchmark {
val sqlDialectRunnable: Seq[Query] = Seq("q2", "q3", "q7", "q8", "q15", "q17", "q19", "q21",
"q25", "q26", "q28", "q29", "q31", "q34", "q37", "q38", "q39a", "q39b", "q40", "q42", "q43",
"q46", "q52", "q55", "q59", "q66", "q68", "q71", "q72", "q73", "q74", "q75", "q76", "q79",
"q82", "q84", "q87", "q88", "q90", "q91", "q93", "q96", "q97").map(tpcds1_4QueriesMap)
"q82", "q84", "q87", "q88", "q90", "q91", "q93", "q96", "q97", "qSsMax").map(tpcds1_4QueriesMap)
}