From 11bfdc7c5a1e9f49393b5e36aefc06d180d94f73 Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Thu, 13 Aug 2015 13:22:46 -0700 Subject: [PATCH] Add an ExecutionMode to check query results. --- .../databricks/spark/sql/perf/Benchmark.scala | 32 +++++++++++++++---- .../databricks/spark/sql/perf/results.scala | 2 ++ 2 files changed, 27 insertions(+), 7 deletions(-) diff --git a/src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala b/src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala index 17088ab..bffdec6 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala @@ -16,20 +16,16 @@ package com.databricks.spark.sql.perf -import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation -import org.apache.spark.sql.execution.SparkPlan - import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import scala.concurrent._ import scala.concurrent.duration._ import scala.concurrent.ExecutionContext.Implicits.global -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{Path, FileSystem} import org.apache.spark.sql.{AnalysisException, DataFrame, SQLContext} - -import org.apache.spark.sql.catalyst.plans.logical.Subquery +import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.functions._ /** * A collection of queries that test a particular aspect of Spark SQL. @@ -346,6 +342,10 @@ abstract class Benchmark(@transient protected val sqlContext: SQLContext) // Benchmark run by saving the output of each query as a parquet file at the specified location case class WriteParquet(location: String) extends ExecutionMode + + // Benchmark run by calculating the sum of the hash value of all rows. This is used to check + // query results. + case object HashResults extends ExecutionMode } /** Factory object for benchmark queries. 
*/ @@ -454,12 +454,24 @@ abstract class Benchmark(@transient protected val sqlContext: SQLContext) // to scala. // The executionTime for the entire query includes the time of type conversion // from catalyst to scala. + var hashSum: Option[Long] = None val executionTime = benchmarkMs { executionMode match { case ExecutionMode.CollectResults => dataFrame.rdd.collect() case ExecutionMode.ForeachResults => dataFrame.rdd.foreach { row => Unit } case ExecutionMode.WriteParquet(location) => dataFrame.saveAsParquetFile(s"$location/$name.parquet") + case ExecutionMode.HashResults => + val columnStr = dataFrame.schema.map(_.name).mkString(",") + // SELECT SUM(HASH(col1, col2, ...)) FROM (benchmark query) + val result = + dataFrame + .selectExpr(s"hash($columnStr) as hashValue") + .groupBy() + .sum("hashValue") + .head() + .getLong(0) + hashSum = Some(result) } } @@ -476,6 +488,7 @@ abstract class Benchmark(@transient protected val sqlContext: SQLContext) optimizationTime = optimizationTime, planningTime = planningTime, executionTime = executionTime, + hashSum = hashSum, queryExecution = dataFrame.queryExecution.toString, breakDown = breakdownResults) } catch { @@ -485,5 +498,10 @@ abstract class Benchmark(@transient protected val sqlContext: SQLContext) failure = Failure(e.getClass.getName, e.getMessage)) } } + + /** Change the ExecutionMode of this Query to HashResults, which is used to check the query result. */ + def checkResult: Query = { + new Query(name, buildDataFrame, description, sqlText, ExecutionMode.HashResults) + } } } \ No newline at end of file diff --git a/src/main/scala/com/databricks/spark/sql/perf/results.scala b/src/main/scala/com/databricks/spark/sql/perf/results.scala index cd1f491..434ece0 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/results.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/results.scala @@ -55,6 +55,7 @@ case class BenchmarkConfiguration( * @param optimizationTime The time used to optimize the query. 
  * @param planningTime The time used to plan the query.
  * @param executionTime The time used to execute the query.
+ * @param hashSum Sum of the hash values calculated from the result rows of the query.
  * @param breakDown The breakdown results of the query plan tree.
  */
 case class BenchmarkResult(
@@ -66,6 +67,7 @@ case class BenchmarkResult(
     optimizationTime: Option[Double] = None,
     planningTime: Option[Double] = None,
     executionTime: Option[Double] = None,
+    hashSum: Option[Long] = None,
     breakDown: Seq[BreakdownResult] = Nil,
     queryExecution: Option[String] = None,
     failure: Option[Failure] = None)