diff --git a/README.md b/README.md index 4d2a97d..3765bdc 100644 --- a/README.md +++ b/README.md @@ -35,7 +35,7 @@ val tpcds = new TPCDS (sqlContext = sqlContext) After setup, users can use `runExperiment` function to run benchmarking queries and record query execution time. Taking TPC-DS as an example, you can start an experiment by using ``` -val experiment = tpcds.runExperiment(queriesToRun = tpcds.interactiveQueries) +val experiment = tpcds.runExperiment(tpcds.interactiveQueries) ``` For every experiment run (i.e. every call of `runExperiment`), Spark SQL Perf will use the timestamp of the start time to identify this experiment. Performance results will be stored in the sub-dir named by the timestamp in the given `resultsLocation` (for example `results/1429213883272`). The performance results are stored in the JSON format. diff --git a/src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala b/src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala index 233706f..a5912c9 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala @@ -16,6 +16,8 @@ package com.databricks.spark.sql.perf +import java.util.UUID + import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import scala.concurrent._ @@ -25,7 +27,7 @@ import scala.util.Try import org.apache.spark.rdd.RDD import org.apache.spark.sql.{Dataset, AnalysisException, DataFrame, SQLContext} -import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation +import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, UnresolvedRelation} import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.{SparkContext, SparkEnv} @@ -113,6 +115,7 @@ abstract class Benchmark( * @param variations [[Variation]]s used in this run. The cross product of all variations will be * run for each execution * iteration. * @param tags Tags of this run. + * @param timeout wait at most timeout milliseconds for each query, 0 means wait forever * @return It returns a ExperimentStatus object that can be used to * track the progress of this experiment run. */ @@ -121,7 +124,8 @@ abstract class Benchmark( includeBreakdown: Boolean = false, iterations: Int = 3, variations: Seq[Variation[_]] = Seq(Variation("StandardRun", Seq("true")) { _ => {} }), - tags: Map[String, String] = Map.empty) = { + tags: Map[String, String] = Map.empty, + timeout: Long = 0L) = { class ExperimentStatus { val currentResults = new collection.mutable.ArrayBuffer[BenchmarkResult]() @@ -152,8 +156,13 @@ abstract class Benchmark( executionsToRun .collect { case query: Query => query } .flatMap { query => - query.newDataFrame().queryExecution.logical.collect { - case UnresolvedRelation(t, _) => t.table + try { + query.newDataFrame().queryExecution.logical.collect { + case UnresolvedRelation(t, _) => t.table + } + } catch { + // ignore the queries that can't be parsed + case e: Exception => Seq() } } .distinct @@ -162,16 +171,19 @@ abstract class Benchmark( sqlContext.table(name) currentMessages += s"Table $name exists." } catch { - case ae: AnalysisException => + case ae: Exception => val table = allTables .find(_.name == name) - .getOrElse(sys.error(s"Couldn't read table $name and its not defined as a Benchmark.Table.")) - - currentMessages += s"Creating table: $name" - table.data - .write - .mode("overwrite") - .saveAsTable(name) + if (table.isDefined) { + currentMessages += s"Creating table: $name" + table.get.data + .write + .mode("overwrite") + .saveAsTable(name) + } else { + // the table could be subquery + println(s"Couldn't read table $name and its not defined as a Benchmark.Table.") + } } } @@ -197,12 +209,18 @@ abstract class Benchmark( currentExecution = q.name currentPlan = q match { - case query: Query => query.newDataFrame().queryExecution.executedPlan.toString() + case query: Query => + try { + query.newDataFrame().queryExecution.executedPlan.toString() + } catch { + case e: Exception => + s"failed to parse: $e" + } case _ => "" } startTime = System.currentTimeMillis() - val singleResult = q.benchmark(includeBreakdown, setup, currentMessages) + val singleResult = q.benchmark(includeBreakdown, setup, currentMessages, timeout) singleResult.failure.foreach { f => failures += 1 currentMessages += s"Execution '${q.name}' failed: ${f.message}" @@ -463,10 +481,11 @@ abstract class Benchmark( final def benchmark( includeBreakdown: Boolean, description: String = "", - messages: ArrayBuffer[String]): BenchmarkResult = { + messages: ArrayBuffer[String], + timeout: Long): BenchmarkResult = { sparkContext.setJobDescription(s"Execution: $name, $description") beforeBenchmark() - val result = doBenchmark(includeBreakdown, description, messages) + val result = runBenchmark(includeBreakdown, description, messages, timeout) afterBenchmark(sqlContext.sparkContext) result } @@ -482,6 +501,34 @@ abstract class Benchmark( .foreach { bid => SparkEnv.get.blockManager.master.removeBlock(bid) } } + private def runBenchmark( + includeBreakdown: Boolean, + description: String = "", + messages: ArrayBuffer[String], + timeout: Long): BenchmarkResult = { + val jobgroup = UUID.randomUUID().toString + var result: BenchmarkResult = null + val thread = new Thread("benchmark runner") { + override def run(): Unit = { + sparkContext.setJobGroup(jobgroup, s"benchmark $name", true) + result = doBenchmark(includeBreakdown, description, messages) + } + } + thread.setDaemon(true) + thread.start() + thread.join(timeout) + if (thread.isAlive) { + sparkContext.cancelJobGroup(jobgroup) + thread.interrupt() + result = BenchmarkResult( + name = name, + mode = executionMode.toString, + failure = Some(Failure("Timeout", s"timeout after ${timeout / 1000} seconds")) + ) + } + result + } + protected def doBenchmark( includeBreakdown: Boolean, description: String = "", @@ -546,11 +593,22 @@ abstract class Benchmark( override val executionMode: ExecutionMode = ExecutionMode.ForeachResults) extends Benchmarkable with Serializable { - override def toString = - s""" - |== Query: $name == - |${buildDataFrame.queryExecution.analyzed} + override def toString = { + try { + s""" + |== Query: $name == + |${buildDataFrame.queryExecution.analyzed} """.stripMargin + } catch { + case e: Exception => + s""" + |== Query: $name == + | Can't be analyzed: $e + | + | $description + """.stripMargin + } + } lazy val tablesInvolved = buildDataFrame.queryExecution.logical collect { case UnresolvedRelation(tableIdentifier, _) => { diff --git a/src/main/scala/com/databricks/spark/sql/perf/tpcds/TPCDS.scala b/src/main/scala/com/databricks/spark/sql/perf/tpcds/TPCDS.scala index 8f6beaf..b2a07cf 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/tpcds/TPCDS.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/tpcds/TPCDS.scala @@ -72,20 +72,39 @@ class TPCDS ( println(succeeded.map("\"" + _ + "\"")) } - def run(queries: Seq[Query], numRows: Int = 1): Unit = { + def run(queries: Seq[Query], numRows: Int = 1, timeout: Int = 0): Unit = { val succeeded = mutable.ArrayBuffer.empty[String] queries.foreach { q => println(s"Query: ${q.name}") - try { - val start = System.currentTimeMillis() - val df = sqlContext.sql(q.sqlText.get) - df.show(numRows) - succeeded += q.name - println(s" Took: ${System.currentTimeMillis() - start} ms") - println("------------------------------------------------------------------") - } catch { - case e: Exception => - println("Failed to run: " + e) + val start = System.currentTimeMillis() + val df = sqlContext.sql(q.sqlText.get) + var failed = false + val jobgroup = s"benchmark ${q.name}" + val t = new Thread("query runner") { + override def run(): Unit = { + try { + sqlContext.sparkContext.setJobGroup(jobgroup, jobgroup, true) + df.show(numRows) + } catch { + case e: Exception => + println("Failed to run: " + e) + failed = true + } + } + } + t.setDaemon(true) + t.start() + t.join(timeout) + if (t.isAlive) { + println(s"Timeout after $timeout seconds") + sqlContext.sparkContext.cancelJobGroup(jobgroup) + t.interrupt() + } else { + if (!failed) { + succeeded += q.name + println(s" Took: ${System.currentTimeMillis() - start} ms") + println("------------------------------------------------------------------") + } } } println(s"Ran ${succeeded.size} out of ${queries.size}") diff --git a/src/main/scala/com/databricks/spark/sql/perf/tpcds/TPCDS_1_4_Queries.scala b/src/main/scala/com/databricks/spark/sql/perf/tpcds/TPCDS_1_4_Queries.scala index 1cdd272..bf88672 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/tpcds/TPCDS_1_4_Queries.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/tpcds/TPCDS_1_4_Queries.scala @@ -3840,7 +3840,24 @@ trait Tpcds_1_4_Queries extends Benchmark { | substr(w_warehouse_name,1,20), sm_type, cc_name | order by substr(w_warehouse_name,1,20), sm_type, cc_name | limit 100 - """.stripMargin) + """.stripMargin), + ("qSsMax", + """ + |select + | count(*) as total, + | count(ss_sold_date_sk) as not_null_total, + | count(distinct ss_sold_date_sk) as unique_days, + | max(ss_sold_date_sk) as max_ss_sold_date_sk, + | max(ss_sold_time_sk) as max_ss_sold_time_sk, + | max(ss_item_sk) as max_ss_item_sk, + | max(ss_customer_sk) as max_ss_customer_sk, + | max(ss_cdemo_sk) as max_ss_cdemo_sk, + | max(ss_hdemo_sk) as max_ss_hdemo_sk, + | max(ss_addr_sk) as max_ss_addr_sk, + | max(ss_store_sk) as max_ss_store_sk, + | max(ss_promo_sk) as max_ss_promo_sk + |from store_sales + """.stripMargin) ).map { case (name, sqlText) => Query(name + "-v1.4", sqlText, description = "TPCDS 1.4 Query", executionMode = CollectResults) } @@ -3851,14 +3868,14 @@ trait Tpcds_1_4_Queries extends Benchmark { "q19", "q21", "q25", "q26", "q28", "q29", "q31", "q34", "q37", "q38", "q39a", "q39b", "q40", "q42", "q43", "q46", "q48", "q52", "q55", "q59", "q64", "q65", "q66", "q68", "q71", "q72", "q73", "q74", "q75", "q76", "q78", "q79", "q82", "q84", "q85", "q87", "q88", "q90", "q91", - "q93", "q96", "q97").map(tpcds1_4QueriesMap) + "q93", "q96", "q97", "qSsMax").map(tpcds1_4QueriesMap) // Queries that are plannable using HiveQL val hiveDialectPlannableQueries = Seq("q3", "q4", "q7", "q11", "q13", "q15", "q17", "q19", "q21", "q25", "q26", "q28", "q29", "q31", "q34", "q37", "q39a", "q39b", "q40", "q42", "q43", "q46", "q47", "q48", "q49", "q51", "q52", "q53", "q55", "q57", "q59", "q63", "q64", "q65", "q68", "q71", "q72", "q73", "q74", "q75", "q76", "q78", "q79", "q82", "q84", "q85", "q88", "q89", - "q90", "q91", "q93", "q96", "q97").map(tpcds1_4QueriesMap) + "q90", "q91", "q93", "q96", "q97", "qSsMax").map(tpcds1_4QueriesMap) // check results q17, q25, q91 // OOM: q4, q11, q13, q48, q65, q78, q85 @@ -3868,5 +3885,5 @@ trait Tpcds_1_4_Queries extends Benchmark { val sqlDialectRunnable: Seq[Query] = Seq("q2", "q3", "q7", "q8", "q15", "q17", "q19", "q21", "q25", "q26", "q28", "q29", "q31", "q34", "q37", "q38", "q39a", "q39b", "q40", "q42", "q43", "q46", "q52", "q55", "q59", "q66", "q68", "q71", "q72", "q73", "q74", "q75", "q76", "q79", - "q82", "q84", "q87", "q88", "q90", "q91", "q93", "q96", "q97").map(tpcds1_4QueriesMap) + "q82", "q84", "q87", "q88", "q90", "q91", "q93", "q96", "q97", "qSsMax").map(tpcds1_4QueriesMap) }