From 9640cd8c1e86a8a974a7c75d0929efc57db49906 Mon Sep 17 00:00:00 2001 From: Jean-Yves Stephan Date: Tue, 21 Jul 2015 13:23:11 -0700 Subject: [PATCH] The execution mode (collect results / foreach results / writeparquet) is now specified as an argument to Query. --- .../spark/sql/perf/bigdata/Queries.scala | 20 ++++++++-------- .../com/databricks/spark/sql/perf/query.scala | 24 +++++++++++++++---- .../perf/tpcds/queries/ImpalaKitQueries.scala | 6 ++--- .../perf/tpcds/queries/SimpleQueries.scala | 4 ++-- 4 files changed, 34 insertions(+), 20 deletions(-) diff --git a/src/main/scala/com/databricks/spark/sql/perf/bigdata/Queries.scala b/src/main/scala/com/databricks/spark/sql/perf/bigdata/Queries.scala index 14b5720..2c8805f 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/bigdata/Queries.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/bigdata/Queries.scala @@ -16,7 +16,7 @@ package com.databricks.spark.sql.perf.bigdata -import com.databricks.spark.sql.perf.Query +import com.databricks.spark.sql.perf.{ForeachResults, Query} object Queries { val queries1to3 = Seq( @@ -32,7 +32,7 @@ object Queries { | pageRank > 1000 """.stripMargin, description = "", - collectResults = false), + executionMode = ForeachResults), Query( name = "q1B", @@ -46,7 +46,7 @@ object Queries { | pageRank > 100 """.stripMargin, description = "", - collectResults = false), + executionMode = ForeachResults), Query( name = "q1C", @@ -60,7 +60,7 @@ object Queries { | pageRank > 10 """.stripMargin, description = "", - collectResults = false), + executionMode = ForeachResults), Query( name = "q2A", @@ -74,7 +74,7 @@ object Queries { | SUBSTR(sourceIP, 1, 8) """.stripMargin, description = "", - collectResults = false), + executionMode = ForeachResults), Query( name = "q2B", @@ -88,7 +88,7 @@ object Queries { | SUBSTR(sourceIP, 1, 10) """.stripMargin, description = "", - collectResults = false), + executionMode = ForeachResults), Query( name = "q2C", @@ -102,7 +102,7 @@ object Queries { | SUBSTR(sourceIP, 1, 12) """.stripMargin, description = "", - collectResults = false), + executionMode = ForeachResults), Query( name = "q3A", @@ -121,7 +121,7 @@ object Queries { |ORDER BY totalRevenue DESC LIMIT 1 """.stripMargin, description = "", - collectResults = false), + executionMode = ForeachResults), Query( name = "q3B", @@ -140,7 +140,7 @@ object Queries { |ORDER BY totalRevenue DESC LIMIT 1 """.stripMargin, description = "", - collectResults = false), + executionMode = ForeachResults), Query( name = "q3C", @@ -158,6 +158,6 @@ object Queries { |ORDER BY totalRevenue DESC LIMIT 1 """.stripMargin, description = "", - collectResults = false) + executionMode = ForeachResults) ) } diff --git a/src/main/scala/com/databricks/spark/sql/perf/query.scala b/src/main/scala/com/databricks/spark/sql/perf/query.scala index 78a410c..c593ada 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/query.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/query.scala @@ -19,7 +19,21 @@ package com.databricks.spark.sql.perf import org.apache.spark.sql.SQLContext import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation -case class Query(name: String, sqlText: String, description: String, collectResults: Boolean) +/** + * The execution mode of a benchmark: + * - CollectResults: Benchmark run by collecting queries results + * (e.g. rdd.collect()) + * - ForeachResults: Benchmark run by iterating through the queries results rows + * (e.g. rdd.foreach(row => Unit)) + * - WriteParquet(location): Benchmark run by saving the output of each query as a + * parquet file at the specified location + */ +abstract class ExecutionMode +case object CollectResults extends ExecutionMode +case object ForeachResults extends ExecutionMode +case class WriteParquet(location: String) extends ExecutionMode + +case class Query(name: String, sqlText: String, description: String, executionMode: ExecutionMode) case class QueryForTest( query: Query, @@ -60,10 +74,10 @@ case class QueryForTest( } // The executionTime for the entire query includes the time of type conversion from catalyst to scala. - val executionTime = if (query.collectResults) { - benchmarkMs { dataFrame.rdd.collect() } - } else { - benchmarkMs { dataFrame.rdd.foreach {row => Unit } } + val executionTime = query.executionMode match { + case CollectResults => benchmarkMs { dataFrame.rdd.collect() } + case ForeachResults => benchmarkMs { dataFrame.rdd.foreach { row => Unit } } + case WriteParquet(location) => benchmarkMs { dataFrame.saveAsParquetFile(s"$location/${query.name}.parquet") } } val joinTypes = dataFrame.queryExecution.executedPlan.collect { diff --git a/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/ImpalaKitQueries.scala b/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/ImpalaKitQueries.scala index c530232..7380cb7 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/ImpalaKitQueries.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/ImpalaKitQueries.scala @@ -16,7 +16,7 @@ package com.databricks.spark.sql.perf.tpcds.queries -import com.databricks.spark.sql.perf.Query +import com.databricks.spark.sql.perf.{CollectResults, Query} object ImpalaKitQueries { // Queries are from @@ -1024,7 +1024,7 @@ object ImpalaKitQueries { |from store_sales """.stripMargin) ).map { - case (name, sqlText) => Query(name, sqlText, description = "", collectResults = true) + case (name, sqlText) => Query(name, sqlText, description = "", executionMode = CollectResults) } val queriesMap = queries.map(q => q.name -> q).toMap @@ -1463,7 +1463,7 @@ object ImpalaKitQueries { |from store_sales """.stripMargin) ).map { - case (name, sqlText) => Query(name, sqlText, description = "original query", collectResults = true) + case (name, sqlText) => Query(name, sqlText, description = "original query", executionMode = CollectResults) } val interactiveQueries = diff --git a/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/SimpleQueries.scala b/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/SimpleQueries.scala index fd4f4b6..dac12a2 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/SimpleQueries.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/SimpleQueries.scala @@ -16,7 +16,7 @@ package com.databricks.spark.sql.perf.tpcds.queries -import com.databricks.spark.sql.perf.Query +import com.databricks.spark.sql.perf.{ForeachResults, Query} object SimpleQueries { val q7Derived = Seq( @@ -137,6 +137,6 @@ object SimpleQueries { |-- end query 1 in stream 0 using template query7.tpl """.stripMargin) ).map { - case (name, sqlText) => Query(name = name, sqlText = sqlText, description = "", collectResults = false) + case (name, sqlText) => Query(name = name, sqlText = sqlText, description = "", executionMode = ForeachResults) } }