Merge pull request #8 from yhuai/hashSum
Add a new ExecutionMode to calculate the sum of hash values of result rows.
This commit is contained in:
commit
d6f89c862d
@ -16,34 +16,30 @@
|
||||
|
||||
package com.databricks.spark.sql.perf
|
||||
|
||||
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
|
||||
import org.apache.spark.sql.execution.SparkPlan
|
||||
|
||||
import scala.collection.mutable
|
||||
import scala.collection.mutable.ArrayBuffer
|
||||
import scala.concurrent._
|
||||
import scala.concurrent.duration._
|
||||
import scala.concurrent.ExecutionContext.Implicits.global
|
||||
|
||||
import org.apache.hadoop.conf.Configuration
|
||||
import org.apache.hadoop.fs.{Path, FileSystem}
|
||||
import org.apache.spark.sql.{AnalysisException, DataFrame, SQLContext}
|
||||
|
||||
import org.apache.spark.sql.catalyst.plans.logical.Subquery
|
||||
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
|
||||
import org.apache.spark.sql.execution.SparkPlan
|
||||
import org.apache.spark.sql.functions._
|
||||
|
||||
/**
|
||||
* A collection of queries that test a particular aspect of Spark SQL.
|
||||
*
|
||||
* @param sqlContext An existing SQLContext.
|
||||
*/
|
||||
abstract class Benchmark(@transient protected val sqlContext: SQLContext)
|
||||
abstract class Benchmark(
|
||||
@transient protected val sqlContext: SQLContext,
|
||||
val resultsLocation: String = "/spark/sql/performance",
|
||||
val resultsTableName: String = "sqlPerformance")
|
||||
extends Serializable {
|
||||
|
||||
import sqlContext.implicits._
|
||||
|
||||
val resultsLocation = "/spark/sql/performance"
|
||||
val resultsTableName = "sqlPerformance"
|
||||
|
||||
def createResultsTable() = {
|
||||
sqlContext.sql(s"DROP TABLE $resultsTableName")
|
||||
sqlContext.createExternalTable(
|
||||
@ -89,6 +85,11 @@ abstract class Benchmark(@transient protected val sqlContext: SQLContext)
|
||||
case "on" => sqlContext.setConf("spark.sql.unsafe.enabled", "true")
|
||||
}
|
||||
|
||||
val tungsten = Variation("unsafe", Seq("on", "off")) {
|
||||
case "off" => sqlContext.setConf("spark.sql.tungsten.enabled", "false")
|
||||
case "on" => sqlContext.setConf("spark.sql.tungsten.enabled", "true")
|
||||
}
|
||||
|
||||
/**
|
||||
* Starts an experiment run with a given set of queries.
|
||||
* @param queriesToRun a list of queries to be executed.
|
||||
@ -339,13 +340,25 @@ abstract class Benchmark(@transient protected val sqlContext: SQLContext)
|
||||
trait ExecutionMode
|
||||
object ExecutionMode {
|
||||
// Benchmark run by collecting queries results (e.g. rdd.collect())
|
||||
case object CollectResults extends ExecutionMode
|
||||
case object CollectResults extends ExecutionMode {
|
||||
override def toString: String = "collect"
|
||||
}
|
||||
|
||||
// Benchmark run by iterating through the queries results rows (e.g. rdd.foreach(row => Unit))
|
||||
case object ForeachResults extends ExecutionMode
|
||||
case object ForeachResults extends ExecutionMode {
|
||||
override def toString: String = "foreach"
|
||||
}
|
||||
|
||||
// Benchmark run by saving the output of each query as a parquet file at the specified location
|
||||
case class WriteParquet(location: String) extends ExecutionMode
|
||||
case class WriteParquet(location: String) extends ExecutionMode {
|
||||
override def toString: String = "saveToParquet"
|
||||
}
|
||||
|
||||
// Benchmark run by calculating the sum of the hash value of all rows. This is used to check
|
||||
// query results.
|
||||
case object HashResults extends ExecutionMode {
|
||||
override def toString: String = "hash"
|
||||
}
|
||||
}
|
||||
|
||||
/** Factory object for benchmark queries. */
|
||||
@ -454,12 +467,24 @@ abstract class Benchmark(@transient protected val sqlContext: SQLContext)
|
||||
// to scala.
|
||||
// The executionTime for the entire query includes the time of type conversion
|
||||
// from catalyst to scala.
|
||||
var result: Option[Long] = None
|
||||
val executionTime = benchmarkMs {
|
||||
executionMode match {
|
||||
case ExecutionMode.CollectResults => dataFrame.rdd.collect()
|
||||
case ExecutionMode.ForeachResults => dataFrame.rdd.foreach { row => Unit }
|
||||
case ExecutionMode.WriteParquet(location) =>
|
||||
dataFrame.saveAsParquetFile(s"$location/$name.parquet")
|
||||
case ExecutionMode.HashResults =>
|
||||
val columnStr = dataFrame.schema.map(_.name).mkString(",")
|
||||
// SELECT SUM(HASH(col1, col2, ...)) FROM (benchmark query)
|
||||
val ret =
|
||||
dataFrame
|
||||
.selectExpr(s"hash($columnStr) as hashValue")
|
||||
.groupBy()
|
||||
.sum("hashValue")
|
||||
.head()
|
||||
.getLong(0)
|
||||
result = Some(ret)
|
||||
}
|
||||
}
|
||||
|
||||
@ -469,6 +494,7 @@ abstract class Benchmark(@transient protected val sqlContext: SQLContext)
|
||||
|
||||
BenchmarkResult(
|
||||
name = name,
|
||||
mode = executionMode.toString,
|
||||
joinTypes = joinTypes,
|
||||
tables = tablesInvolved,
|
||||
parsingTime = parsingTime,
|
||||
@ -476,14 +502,21 @@ abstract class Benchmark(@transient protected val sqlContext: SQLContext)
|
||||
optimizationTime = optimizationTime,
|
||||
planningTime = planningTime,
|
||||
executionTime = executionTime,
|
||||
result = result,
|
||||
queryExecution = dataFrame.queryExecution.toString,
|
||||
breakDown = breakdownResults)
|
||||
} catch {
|
||||
case e: Exception =>
|
||||
BenchmarkResult(
|
||||
name = name,
|
||||
mode = executionMode.toString,
|
||||
failure = Failure(e.getClass.getName, e.getMessage))
|
||||
}
|
||||
}
|
||||
|
||||
/** Change the ExecutionMode of this Query to HashResults, which is used to check the query result. */
|
||||
def checkResult: Query = {
|
||||
new Query(name, buildDataFrame, description, sqlText, ExecutionMode.HashResults)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -48,6 +48,7 @@ case class BenchmarkConfiguration(
|
||||
/**
|
||||
* The result of a query.
|
||||
* @param name The name of the query.
|
||||
* @param mode The ExecutionMode of this run.
|
||||
* @param joinTypes The type of join operations in the query.
|
||||
* @param tables The tables involved in the query.
|
||||
* @param parsingTime The time used to parse the query.
|
||||
@ -55,10 +56,14 @@ case class BenchmarkConfiguration(
|
||||
* @param optimizationTime The time used to optimize the query.
|
||||
* @param planningTime The time used to plan the query.
|
||||
* @param executionTime The time used to execute the query.
|
||||
* @param result the result of this run. It is not necessarily the result of the query.
|
||||
* For example, it can be the number of rows generated by this query or
|
||||
* the sum of hash values of rows generated by this query.
|
||||
* @param breakDown The breakdown results of the query plan tree.
|
||||
*/
|
||||
case class BenchmarkResult(
|
||||
name: String,
|
||||
mode: String,
|
||||
joinTypes: Seq[String] = Nil,
|
||||
tables: Seq[String] = Nil,
|
||||
parsingTime: Option[Double] = None,
|
||||
@ -66,6 +71,7 @@ case class BenchmarkResult(
|
||||
optimizationTime: Option[Double] = None,
|
||||
planningTime: Option[Double] = None,
|
||||
executionTime: Option[Double] = None,
|
||||
result: Option[Long] = None,
|
||||
breakDown: Seq[BreakdownResult] = Nil,
|
||||
queryExecution: Option[String] = None,
|
||||
failure: Option[Failure] = None)
|
||||
|
||||
@ -23,8 +23,12 @@ import org.apache.spark.sql.SQLContext
|
||||
* TPC-DS benchmark's dataset.
|
||||
* @param sqlContext An existing SQLContext.
|
||||
*/
|
||||
class TPCDS (@transient sqlContext: SQLContext)
|
||||
extends Benchmark(sqlContext) with ImpalaKitQueries with SimpleQueries with Serializable {
|
||||
class TPCDS (
|
||||
@transient sqlContext: SQLContext,
|
||||
resultsLocation: String = "/spark/sql/performance",
|
||||
resultsTableName: String = "sqlPerformance")
|
||||
extends Benchmark(sqlContext, resultsLocation, resultsTableName)
|
||||
with ImpalaKitQueries with SimpleQueries with Serializable {
|
||||
|
||||
/*
|
||||
def setupBroadcast(skipTables: Seq[String] = Seq("store_sales", "customer")) = {
|
||||
|
||||
Loading…
Reference in New Issue
Block a user