diff --git a/src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala b/src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala index de8728d..be7fc8b 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala @@ -85,6 +85,11 @@ abstract class Benchmark( case "on" => sqlContext.setConf("spark.sql.unsafe.enabled", "true") } + val tungsten = Variation("unsafe", Seq("on", "off")) { + case "off" => sqlContext.setConf("spark.sql.tungsten.enabled", "false") + case "on" => sqlContext.setConf("spark.sql.tungsten.enabled", "true") + } + /** * Starts an experiment run with a given set of queries. * @param queriesToRun a list of queries to be executed. @@ -335,17 +340,25 @@ abstract class Benchmark( trait ExecutionMode object ExecutionMode { // Benchmark run by collecting queries results (e.g. rdd.collect()) - case object CollectResults extends ExecutionMode + case object CollectResults extends ExecutionMode { + override def toString: String = "collect" + } // Benchmark run by iterating through the queries results rows (e.g. rdd.foreach(row => Unit)) - case object ForeachResults extends ExecutionMode + case object ForeachResults extends ExecutionMode { + override def toString: String = "foreach" + } // Benchmark run by saving the output of each query as a parquet file at the specified location - case class WriteParquet(location: String) extends ExecutionMode + case class WriteParquet(location: String) extends ExecutionMode { + override def toString: String = "saveToParquet" + } // Benchmark run by calculating the sum of the hash value of all rows. This is used to check // query results. - case object HashResults extends ExecutionMode + case object HashResults extends ExecutionMode { + override def toString: String = "hash" + } } /** Factory object for benchmark queries. */ @@ -454,7 +467,7 @@ abstract class Benchmark( // to scala. // The executionTime for the entire query includes the time of type conversion // from catalyst to scala. - var hashSum: Option[Long] = None + var result: Option[Long] = None val executionTime = benchmarkMs { executionMode match { case ExecutionMode.CollectResults => dataFrame.rdd.collect() @@ -464,14 +477,14 @@ abstract class Benchmark( case ExecutionMode.HashResults => val columnStr = dataFrame.schema.map(_.name).mkString(",") // SELECT SUM(HASH(col1, col2, ...)) FROM (benchmark query) - val result = + val ret = dataFrame .selectExpr(s"hash($columnStr) as hashValue") .groupBy() .sum("hashValue") .head() .getLong(0) - hashSum = Some(result) + result = Some(ret) } } @@ -481,6 +494,7 @@ abstract class Benchmark( BenchmarkResult( name = name, + mode = executionMode.toString, joinTypes = joinTypes, tables = tablesInvolved, parsingTime = parsingTime, @@ -488,13 +502,14 @@ abstract class Benchmark( optimizationTime = optimizationTime, planningTime = planningTime, executionTime = executionTime, - hashSum = hashSum, + result = result, queryExecution = dataFrame.queryExecution.toString, breakDown = breakdownResults) } catch { case e: Exception => BenchmarkResult( name = name, + mode = executionMode.toString, failure = Failure(e.getClass.getName, e.getMessage)) } } diff --git a/src/main/scala/com/databricks/spark/sql/perf/results.scala b/src/main/scala/com/databricks/spark/sql/perf/results.scala index 434ece0..8cf4708 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/results.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/results.scala @@ -48,6 +48,7 @@ case class BenchmarkConfiguration( /** * The result of a query. * @param name The name of the query. + * @param mode The ExecutionMode of this run. * @param joinTypes The type of join operations in the query. * @param tables The tables involved in the query. * @param parsingTime The time used to parse the query. @@ -55,11 +56,14 @@ case class BenchmarkConfiguration( * @param optimizationTime The time used to optimize the query. * @param planningTime The time used to plan the query. * @param executionTime The time used to execute the query. - * @param hashSum sum of hash values calculated from result rows of the query. + * @param result the result of this run. It is not necessarily the result of the query. + * For example, it can be the number of rows generated by this query or + * the sum of hash values of rows generated by this query. * @param breakDown The breakdown results of the query plan tree. */ case class BenchmarkResult( name: String, + mode: String, joinTypes: Seq[String] = Nil, tables: Seq[String] = Nil, parsingTime: Option[Double] = None, @@ -67,7 +71,7 @@ case class BenchmarkResult( optimizationTime: Option[Double] = None, planningTime: Option[Double] = None, executionTime: Option[Double] = None, - hashSum: Option[Long] = None, + result: Option[Long] = None, breakDown: Seq[BreakdownResult] = Nil, queryExecution: Option[String] = None, failure: Option[Failure] = None)