Add an ExecutionMode to check query results.

Yin Huai 2015-08-13 13:22:46 -07:00
parent 1fe1729331
commit 11bfdc7c5a
2 changed files with 27 additions and 7 deletions
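The new HashResults mode reduces a query's output to a single Long: every result row is hashed and the hashes are summed, so two runs of the same query can be checked against each other by comparing one number instead of full result sets. Below is a minimal standalone sketch of the same aggregation; the SparkSession entry point, the object name, and the tiny DataFrame are illustrative stand-ins (the commit itself targets the Spark 1.x SQLContext API), not part of this change.

import org.apache.spark.sql.SparkSession

object HashSumSketch {
  def main(args: Array[String]): Unit = {
    // Local session purely for illustration; the benchmark runs against a shared context.
    val spark = SparkSession.builder().master("local[*]").appName("hash-sum-sketch").getOrCreate()
    import spark.implicits._

    // Stand-in for the output of a benchmark query.
    val df = Seq((1, "a"), (2, "b"), (3, "c")).toDF("id", "label")

    // Same shape as the HashResults branch added in the diff below:
    //   SELECT SUM(HASH(col1, col2, ...)) FROM (benchmark query)
    val columnStr = df.schema.map(_.name).mkString(",")
    val hashSum = df
      .selectExpr(s"hash($columnStr) as hashValue")
      .groupBy()
      .sum("hashValue")
      .head()
      .getLong(0)

    println(s"hashSum = $hashSum")
    spark.stop()
  }
}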

@@ -16,20 +16,16 @@
package com.databricks.spark.sql.perf
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.execution.SparkPlan
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.concurrent._
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, FileSystem}
import org.apache.spark.sql.{AnalysisException, DataFrame, SQLContext}
import org.apache.spark.sql.catalyst.plans.logical.Subquery
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.functions._
/**
* A collection of queries that test a particular aspect of Spark SQL.
@@ -346,6 +342,10 @@ abstract class Benchmark(@transient protected val sqlContext: SQLContext)
// Benchmark run by saving the output of each query as a parquet file at the specified location
case class WriteParquet(location: String) extends ExecutionMode
// Benchmark run by calculating the sum of the hash values of all result rows. This is used to
// check query results.
case object HashResults extends ExecutionMode
}
/** Factory object for benchmark queries. */
@@ -454,12 +454,24 @@ abstract class Benchmark(@transient protected val sqlContext: SQLContext)
// to scala.
// The executionTime for the entire query includes the time of type conversion
// from catalyst to scala.
var hashSum: Option[Long] = None
val executionTime = benchmarkMs {
executionMode match {
case ExecutionMode.CollectResults => dataFrame.rdd.collect()
case ExecutionMode.ForeachResults => dataFrame.rdd.foreach { row => Unit }
case ExecutionMode.WriteParquet(location) =>
dataFrame.saveAsParquetFile(s"$location/$name.parquet")
case ExecutionMode.HashResults =>
val columnStr = dataFrame.schema.map(_.name).mkString(",")
// SELECT SUM(HASH(col1, col2, ...)) FROM (benchmark query)
val result =
dataFrame
.selectExpr(s"hash($columnStr) as hashValue")
.groupBy()
.sum("hashValue")
.head()
.getLong(0)
hashSum = Some(result)
}
}
@@ -476,6 +488,7 @@ abstract class Benchmark(@transient protected val sqlContext: SQLContext)
optimizationTime = optimizationTime,
planningTime = planningTime,
executionTime = executionTime,
hashSum = hashSum,
queryExecution = dataFrame.queryExecution.toString,
breakDown = breakdownResults)
} catch {
@@ -485,5 +498,10 @@ abstract class Benchmark(@transient protected val sqlContext: SQLContext)
failure = Failure(e.getClass.getName, e.getMessage))
}
}
/** Returns a copy of this Query whose ExecutionMode is HashResults, which is used to check the query result. */
def checkResult: Query = {
new Query(name, buildDataFrame, description, sqlText, ExecutionMode.HashResults)
}
}
}
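checkResult follows a copy-with-a-different-mode pattern: it leaves the original Query untouched and returns a new one whose ExecutionMode is HashResults. A simplified, self-contained sketch of that pattern follows; Mode and MiniQuery are stand-ins invented for illustration, not the real classes in this commit.

sealed trait Mode
case object CollectResults extends Mode
case object HashResults extends Mode

// Simplified stand-in for Query; only the pieces needed to show the pattern.
final case class MiniQuery(name: String, sqlText: String, mode: Mode = CollectResults) {
  // Same query, but executed in hash-checking mode (mirrors Query.checkResult above).
  def checkResult: MiniQuery = copy(mode = HashResults)
}

object CheckResultSketch {
  def main(args: Array[String]): Unit = {
    val q = MiniQuery("q3", "SELECT 1")
    println(q.mode)             // CollectResults
    println(q.checkResult.mode) // HashResults
  }
}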

@@ -55,6 +55,7 @@ case class BenchmarkConfiguration(
* @param optimizationTime The time used to optimize the query.
* @param planningTime The time used to plan the query.
* @param executionTime The time used to execute the query.
* @param hashSum The sum of the hash values calculated from the result rows of the query.
* @param breakDown The breakdown results of the query plan tree.
*/
case class BenchmarkResult(
@@ -66,6 +67,7 @@ case class BenchmarkResult(
optimizationTime: Option[Double] = None,
planningTime: Option[Double] = None,
executionTime: Option[Double] = None,
hashSum: Option[Long] = None,
breakDown: Seq[BreakdownResult] = Nil,
queryExecution: Option[String] = None,
failure: Option[Failure] = None)
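With the new field, result checking becomes a comparison of two Option[Long] values: two runs of the same query produced the same rows (up to hash collisions) exactly when both hash sums are defined and equal. The sketch below uses a trimmed-down mirror of BenchmarkResult invented only to illustrate that comparison; the real case class has many more fields.

// Trimmed-down mirror of BenchmarkResult for illustration; not the real class.
case class MiniResult(executionTime: Option[Double] = None, hashSum: Option[Long] = None)

object HashSumCheck {
  // Two runs agree only if both produced a hash sum and the sums match.
  def sameResults(a: MiniResult, b: MiniResult): Boolean =
    (a.hashSum, b.hashSum) match {
      case (Some(x), Some(y)) => x == y
      case _                  => false
    }

  def main(args: Array[String]): Unit = {
    val run1 = MiniResult(executionTime = Some(1234.0), hashSum = Some(42L))
    val run2 = MiniResult(executionTime = Some(1180.0), hashSum = Some(42L))
    println(sameResults(run1, run2)) // true
  }
}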