address comments.

This commit is contained in:
Yin Huai 2015-08-14 10:13:11 -07:00
parent 51546868f4
commit d5c3104ec6
2 changed files with 29 additions and 10 deletions

View File

@ -85,6 +85,11 @@ abstract class Benchmark(
case "on" => sqlContext.setConf("spark.sql.unsafe.enabled", "true")
}
/**
 * Benchmark variation that switches the `spark.sql.tungsten.enabled` setting
 * between "true" ("on") and "false" ("off") on the shared `sqlContext`.
 */
// The variation label is "tungsten" so that it matches the configuration key it
// toggles; labelling it "unsafe" would attribute these runs to the wrong setting.
val tungsten = Variation("tungsten", Seq("on", "off")) {
  case "off" => sqlContext.setConf("spark.sql.tungsten.enabled", "false")
  case "on" => sqlContext.setConf("spark.sql.tungsten.enabled", "true")
}
/**
* Starts an experiment run with a given set of queries.
* @param queriesToRun a list of queries to be executed.
@ -335,17 +340,25 @@ abstract class Benchmark(
/** The mode in which a benchmark query is executed. */
trait ExecutionMode
/** The available execution modes; each overrides `toString` with a short label. */
object ExecutionMode {
// Benchmark run by collecting the query's results (e.g. rdd.collect()).
case object CollectResults extends ExecutionMode
case object CollectResults extends ExecutionMode {
override def toString: String = "collect"
}
// Benchmark run by iterating through the rows of the query's results
// (e.g. rdd.foreach(row => Unit)).
case object ForeachResults extends ExecutionMode
case object ForeachResults extends ExecutionMode {
override def toString: String = "foreach"
}
// Benchmark run by saving the output of each query as a parquet file at the specified location.
// Note that the label returned by toString does not include the location.
case class WriteParquet(location: String) extends ExecutionMode
case class WriteParquet(location: String) extends ExecutionMode {
override def toString: String = "saveToParquet"
}
// Benchmark run by calculating the sum of the hash values of all rows. This is used to check
// query results.
case object HashResults extends ExecutionMode
case object HashResults extends ExecutionMode {
override def toString: String = "hash"
}
}
/** Factory object for benchmark queries. */
@ -454,7 +467,7 @@ abstract class Benchmark(
// to scala.
// The executionTime for the entire query includes the time of type conversion
// from catalyst to scala.
var hashSum: Option[Long] = None
var result: Option[Long] = None
val executionTime = benchmarkMs {
executionMode match {
case ExecutionMode.CollectResults => dataFrame.rdd.collect()
@ -464,14 +477,14 @@ abstract class Benchmark(
case ExecutionMode.HashResults =>
val columnStr = dataFrame.schema.map(_.name).mkString(",")
// SELECT SUM(HASH(col1, col2, ...)) FROM (benchmark query)
val result =
val ret =
dataFrame
.selectExpr(s"hash($columnStr) as hashValue")
.groupBy()
.sum("hashValue")
.head()
.getLong(0)
hashSum = Some(result)
result = Some(ret)
}
}
@ -481,6 +494,7 @@ abstract class Benchmark(
BenchmarkResult(
name = name,
mode = executionMode.toString,
joinTypes = joinTypes,
tables = tablesInvolved,
parsingTime = parsingTime,
@ -488,13 +502,14 @@ abstract class Benchmark(
optimizationTime = optimizationTime,
planningTime = planningTime,
executionTime = executionTime,
hashSum = hashSum,
result = result,
queryExecution = dataFrame.queryExecution.toString,
breakDown = breakdownResults)
} catch {
case e: Exception =>
BenchmarkResult(
name = name,
mode = executionMode.toString,
failure = Failure(e.getClass.getName, e.getMessage))
}
}

View File

@ -48,6 +48,7 @@ case class BenchmarkConfiguration(
/**
* The result of a query.
* @param name The name of the query.
* @param mode The ExecutionMode of this run.
* @param joinTypes The type of join operations in the query.
* @param tables The tables involved in the query.
* @param parsingTime The time used to parse the query.
@ -55,11 +56,14 @@ case class BenchmarkConfiguration(
* @param optimizationTime The time used to optimize the query.
* @param planningTime The time used to plan the query.
* @param executionTime The time used to execute the query.
* @param hashSum sum of hash values calculated from result rows of the query.
* @param result The result of this run. It is not necessarily the result of the query.
* For example, it can be the number of rows generated by this query or
* the sum of hash values of rows generated by this query.
* @param breakDown The breakdown results of the query plan tree.
*/
case class BenchmarkResult(
name: String,
mode: String,
joinTypes: Seq[String] = Nil,
tables: Seq[String] = Nil,
parsingTime: Option[Double] = None,
@ -67,7 +71,7 @@ case class BenchmarkResult(
optimizationTime: Option[Double] = None,
planningTime: Option[Double] = None,
executionTime: Option[Double] = None,
hashSum: Option[Long] = None,
result: Option[Long] = None,
breakDown: Seq[BreakdownResult] = Nil,
queryExecution: Option[String] = None,
failure: Option[Failure] = None)