From 4101a1e96864e7120cb27b1217a57ffa8c8c62d7 Mon Sep 17 00:00:00 2001 From: Michael Armbrust Date: Thu, 13 Aug 2015 15:47:01 -0700 Subject: [PATCH 1/2] Fixes to breakdown calculation and table creation. --- .../databricks/spark/sql/perf/Benchmark.scala | 42 +++++++++++++++---- .../databricks/spark/sql/perf/package.scala | 8 ++++ .../databricks/spark/sql/perf/results.scala | 4 +- .../spark/sql/perf/tpcds/Tables.scala | 2 +- 4 files changed, 46 insertions(+), 10 deletions(-) create mode 100644 src/main/scala/com/databricks/spark/sql/perf/package.scala diff --git a/src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala b/src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala index 5b1119c..9a62bee 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala @@ -17,7 +17,10 @@ package com.databricks.spark.sql.perf import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation +import org.apache.spark.sql.execution.SparkPlan +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer import scala.concurrent._ import scala.concurrent.duration._ import scala.concurrent.ExecutionContext.Implicits.global @@ -171,7 +174,7 @@ abstract class Benchmark(@transient protected val sqlContext: SQLContext) currentPlan = q.newDataFrame().queryExecution.executedPlan.toString startTime = System.currentTimeMillis() - val singleResult = q.benchmark(includeBreakdown, setup) + val singleResult = q.benchmark(includeBreakdown, setup, currentMessages) singleResult.failure.foreach { f => failures += 1 currentMessages += s"Query '${q.name}' failed: ${f.message}" @@ -393,7 +396,10 @@ abstract class Benchmark(@transient protected val sqlContext: SQLContext) (endTime - startTime).toDouble / 1000000 } - def benchmark(includeBreakdown: Boolean, description: String = "") = { + def benchmark( + includeBreakdown: Boolean, + description: String = "", + messages: ArrayBuffer[String]) = { try { 
val dataFrame = buildDataFrame sparkContext.setJobDescription(s"Query: $name, $description") @@ -413,27 +419,47 @@ abstract class Benchmark(@transient protected val sqlContext: SQLContext) } val breakdownResults = if (includeBreakdown) { - val depth = queryExecution.executedPlan.treeString.split("\n").size + val depth = queryExecution.executedPlan.collect { case p: SparkPlan => p }.size val physicalOperators = (0 until depth).map(i => (i, queryExecution.executedPlan(i))) - physicalOperators.map { + val indexMap = physicalOperators.map { case (index, op) => (op, index) }.toMap + val timeMap = new mutable.HashMap[Int, Double] + + physicalOperators.reverse.map { case (index, node) => + messages += s"Breakdown: ${node.simpleString}" + val newNode = buildDataFrame.queryExecution.executedPlan(index) val executionTime = benchmarkMs { - node.execute().map(_.copy()).foreach(row => Unit) + node.execute().foreach((row: Any) => Unit) } - BreakdownResult(node.nodeName, node.simpleString, index, executionTime) + timeMap += ((index, executionTime)) + + val childIndexes = node.children.map(indexMap) + val childTime = childIndexes.map(timeMap).sum + + messages += s"Breakdown time: $executionTime (+${executionTime - childTime})" + + BreakdownResult( + node.nodeName, + node.simpleString.replaceAll("#\\d+", ""), + index, + childIndexes, + executionTime, + executionTime - childTime) } } else { Seq.empty[BreakdownResult] } - // The executionTime for the entire query includes the time of type conversion from catalyst to scala. + // The executionTime for the entire query includes the time of type conversion from catalyst + // to scala. // The executionTime for the entire query includes the time of type conversion // from catalyst to scala. 
val executionTime = benchmarkMs { executionMode match { case ExecutionMode.CollectResults => dataFrame.rdd.collect() case ExecutionMode.ForeachResults => dataFrame.rdd.foreach { row => Unit } - case ExecutionMode.WriteParquet(location) => dataFrame.saveAsParquetFile(s"$location/$name.parquet") + case ExecutionMode.WriteParquet(location) => + dataFrame.saveAsParquetFile(s"$location/$name.parquet") } } diff --git a/src/main/scala/com/databricks/spark/sql/perf/package.scala b/src/main/scala/com/databricks/spark/sql/perf/package.scala new file mode 100644 index 0000000..080d024 --- /dev/null +++ b/src/main/scala/com/databricks/spark/sql/perf/package.scala @@ -0,0 +1,8 @@ +package com.databricks.spark.sql + +import org.apache.spark.sql.functions._ + +package object perf { + val runtime = + (col("result.analysisTime") + col("result.optimizationTime") + col("result.planningTime") + col("result.executionTime")).as("runtime") +} \ No newline at end of file diff --git a/src/main/scala/com/databricks/spark/sql/perf/results.scala b/src/main/scala/com/databricks/spark/sql/perf/results.scala index 7d31ed2..cd1f491 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/results.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/results.scala @@ -83,6 +83,8 @@ case class BreakdownResult( nodeName: String, nodeNameWithArgs: String, index: Int, - executionTime: Double) + children: Seq[Int], + executionTime: Double, + delta: Double) case class Failure(className: String, message: String) \ No newline at end of file diff --git a/src/main/scala/com/databricks/spark/sql/perf/tpcds/Tables.scala b/src/main/scala/com/databricks/spark/sql/perf/tpcds/Tables.scala index 0466bf9..011221e 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/tpcds/Tables.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/tpcds/Tables.scala @@ -37,7 +37,7 @@ import org.apache.spark.sql.types.{StringType, StructField, StructType} import parquet.hadoop.ParquetOutputFormat import 
parquet.hadoop.util.ContextUtil -class Tables(sqlContext: SQLContext, dsdgenDir: String, scaleFactor: Int) { +class Tables(sqlContext: SQLContext, dsdgenDir: String, scaleFactor: Int) extends Serializable { import sqlContext.implicits._ def sparkContext = sqlContext.sparkContext From ed8ddfedcd78d8df47301ed00710fd6ae9668e61 Mon Sep 17 00:00:00 2001 From: Michael Armbrust Date: Thu, 13 Aug 2015 17:54:00 -0700 Subject: [PATCH 2/2] Address Yin's review comments --- src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala b/src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala index 9a62bee..17088ab 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala @@ -429,7 +429,7 @@ abstract class Benchmark(@transient protected val sqlContext: SQLContext) messages += s"Breakdown: ${node.simpleString}" val newNode = buildDataFrame.queryExecution.executedPlan(index) val executionTime = benchmarkMs { - node.execute().foreach((row: Any) => Unit) + newNode.execute().foreach((row: Any) => Unit) } timeMap += ((index, executionTime))