diff --git a/src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala b/src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala index 17088ab..be7fc8b 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala @@ -16,34 +16,30 @@ package com.databricks.spark.sql.perf -import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation -import org.apache.spark.sql.execution.SparkPlan - import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import scala.concurrent._ import scala.concurrent.duration._ import scala.concurrent.ExecutionContext.Implicits.global -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{Path, FileSystem} import org.apache.spark.sql.{AnalysisException, DataFrame, SQLContext} - -import org.apache.spark.sql.catalyst.plans.logical.Subquery +import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.functions._ /** * A collection of queries that test a particular aspect of Spark SQL. * * @param sqlContext An existing SQLContext. 
*/ -abstract class Benchmark(@transient protected val sqlContext: SQLContext) +abstract class Benchmark( + @transient protected val sqlContext: SQLContext, + val resultsLocation: String = "/spark/sql/performance", + val resultsTableName: String = "sqlPerformance") extends Serializable { import sqlContext.implicits._ - val resultsLocation = "/spark/sql/performance" - val resultsTableName = "sqlPerformance" - def createResultsTable() = { sqlContext.sql(s"DROP TABLE $resultsTableName") sqlContext.createExternalTable( @@ -89,6 +85,11 @@ abstract class Benchmark(@transient protected val sqlContext: SQLContext) case "on" => sqlContext.setConf("spark.sql.unsafe.enabled", "true") } + val tungsten = Variation("tungsten", Seq("on", "off")) { + case "off" => sqlContext.setConf("spark.sql.tungsten.enabled", "false") + case "on" => sqlContext.setConf("spark.sql.tungsten.enabled", "true") + } + /** * Starts an experiment run with a given set of queries. * @param queriesToRun a list of queries to be executed. @@ -339,13 +340,25 @@ abstract class Benchmark(@transient protected val sqlContext: SQLContext) trait ExecutionMode object ExecutionMode { // Benchmark run by collecting queries results (e.g. rdd.collect()) - case object CollectResults extends ExecutionMode + case object CollectResults extends ExecutionMode { + override def toString: String = "collect" + } // Benchmark run by iterating through the queries results rows (e.g. rdd.foreach(row => Unit)) - case object ForeachResults extends ExecutionMode + case object ForeachResults extends ExecutionMode { + override def toString: String = "foreach" + } // Benchmark run by saving the output of each query as a parquet file at the specified location - case class WriteParquet(location: String) extends ExecutionMode + case class WriteParquet(location: String) extends ExecutionMode { + override def toString: String = "saveToParquet" + } + + // Benchmark run by calculating the sum of the hash value of all rows.
This is used to check + // query results. + case object HashResults extends ExecutionMode { + override def toString: String = "hash" + } } /** Factory object for benchmark queries. */ @@ -454,12 +467,24 @@ abstract class Benchmark(@transient protected val sqlContext: SQLContext) // to scala. // The executionTime for the entire query includes the time of type conversion // from catalyst to scala. + var result: Option[Long] = None val executionTime = benchmarkMs { executionMode match { case ExecutionMode.CollectResults => dataFrame.rdd.collect() case ExecutionMode.ForeachResults => dataFrame.rdd.foreach { row => Unit } case ExecutionMode.WriteParquet(location) => dataFrame.saveAsParquetFile(s"$location/$name.parquet") + case ExecutionMode.HashResults => + val columnStr = dataFrame.schema.map(_.name).mkString(",") + // SELECT SUM(HASH(col1, col2, ...)) FROM (benchmark query) + val ret = + dataFrame + .selectExpr(s"hash($columnStr) as hashValue") + .groupBy() + .sum("hashValue") + .head() + .getLong(0) + result = Some(ret) } } @@ -469,6 +494,7 @@ abstract class Benchmark(@transient protected val sqlContext: SQLContext) BenchmarkResult( name = name, + mode = executionMode.toString, joinTypes = joinTypes, tables = tablesInvolved, parsingTime = parsingTime, @@ -476,14 +502,21 @@ abstract class Benchmark(@transient protected val sqlContext: SQLContext) optimizationTime = optimizationTime, planningTime = planningTime, executionTime = executionTime, + result = result, queryExecution = dataFrame.queryExecution.toString, breakDown = breakdownResults) } catch { case e: Exception => BenchmarkResult( name = name, + mode = executionMode.toString, failure = Failure(e.getClass.getName, e.getMessage)) } } + + /** Change the ExecutionMode of this Query to HashResults, which is used to check the query result. 
*/ + def checkResult: Query = { + new Query(name, buildDataFrame, description, sqlText, ExecutionMode.HashResults) + } } } \ No newline at end of file diff --git a/src/main/scala/com/databricks/spark/sql/perf/results.scala b/src/main/scala/com/databricks/spark/sql/perf/results.scala index cd1f491..8cf4708 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/results.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/results.scala @@ -48,6 +48,7 @@ case class BenchmarkConfiguration( /** * The result of a query. * @param name The name of the query. + * @param mode The ExecutionMode of this run. * @param joinTypes The type of join operations in the query. * @param tables The tables involved in the query. * @param parsingTime The time used to parse the query. @@ -55,10 +56,14 @@ case class BenchmarkConfiguration( * @param optimizationTime The time used to optimize the query. * @param planningTime The time used to plan the query. * @param executionTime The time used to execute the query. + * @param result the result of this run. It is not necessarily the result of the query. + * For example, it can be the number of rows generated by this query or + * the sum of hash values of rows generated by this query. * @param breakDown The breakdown results of the query plan tree. 
*/ case class BenchmarkResult( name: String, + mode: String, joinTypes: Seq[String] = Nil, tables: Seq[String] = Nil, parsingTime: Option[Double] = None, @@ -66,6 +71,7 @@ case class BenchmarkResult( optimizationTime: Option[Double] = None, planningTime: Option[Double] = None, executionTime: Option[Double] = None, + result: Option[Long] = None, breakDown: Seq[BreakdownResult] = Nil, queryExecution: Option[String] = None, failure: Option[Failure] = None) diff --git a/src/main/scala/com/databricks/spark/sql/perf/tpcds/TPCDS.scala b/src/main/scala/com/databricks/spark/sql/perf/tpcds/TPCDS.scala index dab3fbd..fdda692 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/tpcds/TPCDS.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/tpcds/TPCDS.scala @@ -23,8 +23,12 @@ import org.apache.spark.sql.SQLContext * TPC-DS benchmark's dataset. * @param sqlContext An existing SQLContext. */ -class TPCDS (@transient sqlContext: SQLContext) - extends Benchmark(sqlContext) with ImpalaKitQueries with SimpleQueries with Serializable { +class TPCDS ( + @transient sqlContext: SQLContext, + resultsLocation: String = "/spark/sql/performance", + resultsTableName: String = "sqlPerformance") + extends Benchmark(sqlContext, resultsLocation, resultsTableName) + with ImpalaKitQueries with SimpleQueries with Serializable { /* def setupBroadcast(skipTables: Seq[String] = Seq("store_sales", "customer")) = {