Merge pull request #7 from marmbrus/fixBreakdown

Fixes to breakdown calculation and table creation.
Yin Huai 2015-08-13 18:39:53 -07:00
commit 1fe1729331
4 changed files with 46 additions and 10 deletions

View File

@@ -17,7 +17,10 @@
 package com.databricks.spark.sql.perf

 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.execution.SparkPlan
+
+import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 import scala.concurrent._
 import scala.concurrent.duration._
 import scala.concurrent.ExecutionContext.Implicits.global
@@ -171,7 +174,7 @@ abstract class Benchmark(@transient protected val sqlContext: SQLContext)
           currentPlan = q.newDataFrame().queryExecution.executedPlan.toString
           startTime = System.currentTimeMillis()
-          val singleResult = q.benchmark(includeBreakdown, setup)
+          val singleResult = q.benchmark(includeBreakdown, setup, currentMessages)
           singleResult.failure.foreach { f =>
             failures += 1
             currentMessages += s"Query '${q.name}' failed: ${f.message}"
@@ -393,7 +396,10 @@ abstract class Benchmark(@transient protected val sqlContext: SQLContext)
     (endTime - startTime).toDouble / 1000000
   }

-  def benchmark(includeBreakdown: Boolean, description: String = "") = {
+  def benchmark(
+      includeBreakdown: Boolean,
+      description: String = "",
+      messages: ArrayBuffer[String]) = {
     try {
       val dataFrame = buildDataFrame
       sparkContext.setJobDescription(s"Query: $name, $description")
@@ -413,27 +419,47 @@ abstract class Benchmark(@transient protected val sqlContext: SQLContext)
       }

       val breakdownResults = if (includeBreakdown) {
-        val depth = queryExecution.executedPlan.treeString.split("\n").size
+        val depth = queryExecution.executedPlan.collect { case p: SparkPlan => p }.size
         val physicalOperators = (0 until depth).map(i => (i, queryExecution.executedPlan(i)))
-        physicalOperators.map {
+        val indexMap = physicalOperators.map { case (index, op) => (op, index) }.toMap
+        val timeMap = new mutable.HashMap[Int, Double]
+
+        physicalOperators.reverse.map {
           case (index, node) =>
+            messages += s"Breakdown: ${node.simpleString}"
+            val newNode = buildDataFrame.queryExecution.executedPlan(index)
             val executionTime = benchmarkMs {
-              node.execute().map(_.copy()).foreach(row => Unit)
+              newNode.execute().foreach((row: Any) => Unit)
             }
-            BreakdownResult(node.nodeName, node.simpleString, index, executionTime)
+            timeMap += ((index, executionTime))
+
+            val childIndexes = node.children.map(indexMap)
+            val childTime = childIndexes.map(timeMap).sum
+            messages += s"Breakdown time: $executionTime (+${executionTime - childTime})"
+
+            BreakdownResult(
+              node.nodeName,
+              node.simpleString.replaceAll("#\\d+", ""),
+              index,
+              childIndexes,
+              executionTime,
+              executionTime - childTime)
         }
       } else {
         Seq.empty[BreakdownResult]
      }
-      // The executionTime for the entire query includes the time of type conversion from catalyst to scala.
+      // The executionTime for the entire query includes the time of type conversion
+      // from catalyst to scala.
       val executionTime = benchmarkMs {
         executionMode match {
           case ExecutionMode.CollectResults => dataFrame.rdd.collect()
           case ExecutionMode.ForeachResults => dataFrame.rdd.foreach { row => Unit }
-          case ExecutionMode.WriteParquet(location) => dataFrame.saveAsParquetFile(s"$location/$name.parquet")
+          case ExecutionMode.WriteParquet(location) =>
+            dataFrame.saveAsParquetFile(s"$location/$name.parquet")
         }
       }
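
For readers following the new breakdown logic: operators are indexed top-down (`executedPlan(i)`), then timed in reverse so every child's subtree time is recorded before its parent's, and the reported delta is a node's measured subtree time minus the sum of its children's, approximating the time spent in that operator alone. A self-contained sketch of the same bookkeeping, with a toy `Node` tree standing in for `SparkPlan` (all names here are illustrative, not from the patch):

```scala
import scala.collection.mutable

// Toy stand-in for a physical operator tree (hypothetical; not Spark's API).
case class Node(name: String, children: Seq[Node], selfCostMs: Double) {
  // Simulates "executing" the whole subtree, as node.execute() would.
  def runMs: Double = selfCostMs + children.map(_.runMs).sum
}

object BreakdownSketch {
  def main(args: Array[String]): Unit = {
    val scan   = Node("Scan", Nil, 40.0)
    val filter = Node("Filter", Seq(scan), 10.0)
    val agg    = Node("Aggregate", Seq(filter), 25.0)

    // Index operators top-down (index 0 = root), mirroring executedPlan(i).
    val ops = Seq(agg, filter, scan).zipWithIndex.map(_.swap)
    val indexMap = ops.map { case (i, op) => (op, i) }.toMap
    val timeMap = new mutable.HashMap[Int, Double]

    // Walk bottom-up so child subtree times exist before their parents need them.
    ops.reverse.foreach { case (i, node) =>
      val subtreeTime = node.runMs                // whole-subtree measurement
      timeMap += ((i, subtreeTime))
      val childTime = node.children.map(indexMap).map(timeMap).sum
      println(f"${node.name}%-10s subtree=$subtreeTime%5.1f ms  delta=${subtreeTime - childTime}%5.1f ms")
    }
  }
}
```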

View File

@@ -0,0 +1,8 @@
+package com.databricks.spark.sql
+
+import org.apache.spark.sql.functions._
+
+package object perf {
+  val runtime =
+    (col("result.analysisTime") + col("result.optimizationTime") + col("result.planningTime") + col("result.executionTime")).as("runtime")
+}
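
The new package object defines a reusable `runtime` column that sums the four recorded phases. A hedged usage sketch, assuming a spark-shell `sqlContext` and a saved results file whose rows carry the nested `result.*` timing fields (the path is hypothetical):

```scala
import com.databricks.spark.sql.perf.runtime  // the column defined above

// Hypothetical location of previously saved benchmark results.
val resultsDf = sqlContext.read.json("/tmp/perf-results")

// Total per-run time = analysis + optimization + planning + execution.
resultsDf.select(runtime).show()
```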

View File

@@ -83,6 +83,8 @@ case class BreakdownResult(
     nodeName: String,
     nodeNameWithArgs: String,
     index: Int,
-    executionTime: Double)
+    children: Seq[Int],
+    executionTime: Double,
+    delta: Double)

 case class Failure(className: String, message: String)
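
A worked example of the two new fields (values illustrative): if operator 1 (a scan) is the only child of operator 0 (a filter), and their measured subtree times are 40.0 ms and 50.0 ms, the filter's `delta` is 50.0 - 40.0 = 10.0 ms:

```scala
val scanResult   = BreakdownResult("PhysicalRDD", "PhysicalRDD [a,b]", 1, Seq(),  40.0, 40.0)
val filterResult = BreakdownResult("Filter",      "Filter (a > 0)",    0, Seq(1), 50.0, 10.0)
```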

View File

@@ -37,7 +37,7 @@ import org.apache.spark.sql.types.{StringType, StructField, StructType}
 import parquet.hadoop.ParquetOutputFormat
 import parquet.hadoop.util.ContextUtil

-class Tables(sqlContext: SQLContext, dsdgenDir: String, scaleFactor: Int) {
+class Tables(sqlContext: SQLContext, dsdgenDir: String, scaleFactor: Int) extends Serializable {
   import sqlContext.implicits._

   def sparkContext = sqlContext.sparkContext
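
On the `Tables` change: data-generation methods in this class build RDD closures that read instance fields such as `dsdgenDir`, and a closure over a field captures the whole enclosing instance, which Spark must serialize to ship tasks to executors; without `extends Serializable` the job fails with a `Task not serializable` error. A minimal sketch of the failure mode (names hypothetical, not the real `Tables` code):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Dropping `extends Serializable` here makes commands() throw
// org.apache.spark.SparkException: Task not serializable.
class Generator(dsdgenDir: String) extends Serializable {
  def commands(sc: SparkContext) =
    // The closure reads a field, so it captures `this` and Spark serializes it.
    sc.parallelize(1 to 4).map(part => s"$dsdgenDir/dsdgen -parallel 4 -child $part")
}

object SerializableDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("demo"))
    new Generator("/opt/tpcds-kit").commands(sc).collect().foreach(println)
    sc.stop()
  }
}
```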