diff --git a/src/main/scala/com/databricks/spark/sql/perf/bigdata/Queries.scala b/src/main/scala/com/databricks/spark/sql/perf/bigdata/Queries.scala
index 2c8805f..5a639ba 100644
--- a/src/main/scala/com/databricks/spark/sql/perf/bigdata/Queries.scala
+++ b/src/main/scala/com/databricks/spark/sql/perf/bigdata/Queries.scala
@@ -16,7 +16,8 @@
 
 package com.databricks.spark.sql.perf.bigdata
 
-import com.databricks.spark.sql.perf.{ForeachResults, Query}
+import com.databricks.spark.sql.perf.ExecutionMode.ForeachResults
+import com.databricks.spark.sql.perf.Query
 
 object Queries {
   val queries1to3 = Seq(
diff --git a/src/main/scala/com/databricks/spark/sql/perf/query.scala b/src/main/scala/com/databricks/spark/sql/perf/query.scala
index d2541f9..b39095a 100644
--- a/src/main/scala/com/databricks/spark/sql/perf/query.scala
+++ b/src/main/scala/com/databricks/spark/sql/perf/query.scala
@@ -16,22 +16,21 @@
 
 package com.databricks.spark.sql.perf
 
+import com.databricks.spark.sql.perf.ExecutionMode.{WriteParquet, ForeachResults, CollectResults}
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 
-/**
- * The execution mode of a benchmark:
- *  - CollectResults: Benchmark run by collecting queries results
- *    (e.g. rdd.collect())
- *  - ForeachResults: Benchmark run by iterating through the queries results rows
- *    (e.g. rdd.foreach(row => Unit))
- *  - WriteParquet(location): Benchmark run by saving the output of each query as a
- *    parquet file at the specified location
- */
-abstract class ExecutionMode
-case object CollectResults extends ExecutionMode
-case object ForeachResults extends ExecutionMode
-case class WriteParquet(location: String) extends ExecutionMode
+trait ExecutionMode
+object ExecutionMode {
+  // Benchmark run by collecting queries results (e.g. rdd.collect())
+  case object CollectResults extends ExecutionMode
+
+  // Benchmark run by iterating through the queries results rows (e.g. rdd.foreach(row => Unit))
+  case object ForeachResults extends ExecutionMode
+
+  // Benchmark run by saving the output of each query as a parquet file at the specified location
+  case class WriteParquet(location: String) extends ExecutionMode
+}
 
 case class Query(name: String, sqlText: String, description: String, executionMode: ExecutionMode)
 
@@ -73,11 +72,17 @@ case class QueryForTest(
       Seq.empty[BreakdownResult]
     }
 
-    // The executionTime for the entire query includes the time of type conversion from catalyst to scala.
-    val executionTime = query.executionMode match {
-      case CollectResults => benchmarkMs { dataFrame.rdd.collect() }
-      case ForeachResults => benchmarkMs { dataFrame.rdd.foreach { row => Unit } }
-      case WriteParquet(location) => benchmarkMs { dataFrame.saveAsParquetFile(s"$location/$name.parquet") }
+    // The executionTime for the entire query includes the time of type conversion
+    // from catalyst to scala.
+    val executionTime = benchmarkMs {
+      query.executionMode match {
+        case CollectResults => dataFrame.rdd.collect()
+        case ForeachResults => dataFrame.rdd.foreach { row => Unit }
+        case WriteParquet(location) => {
+          dataFrame.rdd.collect()
+          dataFrame.saveAsParquetFile(s"$location/$name.parquet")
+        }
+      }
     }
 
     val joinTypes = dataFrame.queryExecution.executedPlan.collect {
diff --git a/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/ImpalaKitQueries.scala b/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/ImpalaKitQueries.scala
index 7380cb7..248fbac 100644
--- a/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/ImpalaKitQueries.scala
+++ b/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/ImpalaKitQueries.scala
@@ -16,7 +16,8 @@
 
 package com.databricks.spark.sql.perf.tpcds.queries
 
-import com.databricks.spark.sql.perf.{CollectResults, Query}
+import com.databricks.spark.sql.perf.ExecutionMode.CollectResults
+import com.databricks.spark.sql.perf.Query
 
 object ImpalaKitQueries {
   // Queries are from
@@ -1462,8 +1463,8 @@
       | max(ss_promo_sk) as max_ss_promo_sk
      |from store_sales
    """.stripMargin)
-  ).map {
-    case (name, sqlText) => Query(name, sqlText, description = "original query", executionMode = CollectResults)
+  ).map { case (name, sqlText) =>
+    Query(name, sqlText, description = "original query", executionMode = CollectResults)
   }
 
   val interactiveQueries =
diff --git a/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/SimpleQueries.scala b/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/SimpleQueries.scala
index dac12a2..514afc5 100644
--- a/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/SimpleQueries.scala
+++ b/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/SimpleQueries.scala
@@ -16,7 +16,8 @@
 
 package com.databricks.spark.sql.perf.tpcds.queries
 
-import com.databricks.spark.sql.perf.{ForeachResults, Query}
+import com.databricks.spark.sql.perf.ExecutionMode.ForeachResults
+import com.databricks.spark.sql.perf.Query
 
 object SimpleQueries {
   val q7Derived = Seq(
@@ -136,7 +137,7 @@
      |limit 100
      |-- end query 1 in stream 0 using template query7.tpl
    """.stripMargin)
-  ).map {
-    case (name, sqlText) => Query(name = name, sqlText = sqlText, description = "", executionMode = ForeachResults)
+  ).map { case (name, sqlText) =>
+    Query(name = name, sqlText = sqlText, description = "", executionMode = ForeachResults)
   }
 }
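A minimal usage sketch of the refactored API, assuming only the Query case class and the nested ExecutionMode members shown in the patch above; the object name, query name, SQL text, and output location are hypothetical placeholders, not part of the change.

import com.databricks.spark.sql.perf.ExecutionMode.{CollectResults, WriteParquet}
import com.databricks.spark.sql.perf.Query

object ExampleQueries {
  // Benchmarked by collecting the query results on the driver (CollectResults).
  val ssCount = Query(
    name = "ss_count",
    sqlText = "select count(*) from store_sales",
    description = "illustrative query",
    executionMode = CollectResults)

  // Same query, benchmarked by writing its output as parquet to a hypothetical location.
  val ssCountParquet = ssCount.copy(
    name = "ss_count_parquet",
    executionMode = WriteParquet("/tmp/spark-sql-perf-output"))
}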