The execution mode (CollectResults / ForeachResults / WriteParquet) is now specified as an argument to Query.

This commit is contained in:
Jean-Yves Stephan 2015-07-21 13:23:11 -07:00
parent 8e62e4fdbd
commit 9640cd8c1e
4 changed files with 34 additions and 20 deletions

View File

@ -16,7 +16,7 @@
package com.databricks.spark.sql.perf.bigdata
import com.databricks.spark.sql.perf.Query
import com.databricks.spark.sql.perf.{ForeachResults, Query}
object Queries {
val queries1to3 = Seq(
@ -32,7 +32,7 @@ object Queries {
| pageRank > 1000
""".stripMargin,
description = "",
collectResults = false),
executionMode = ForeachResults),
Query(
name = "q1B",
@ -46,7 +46,7 @@ object Queries {
| pageRank > 100
""".stripMargin,
description = "",
collectResults = false),
executionMode = ForeachResults),
Query(
name = "q1C",
@ -60,7 +60,7 @@ object Queries {
| pageRank > 10
""".stripMargin,
description = "",
collectResults = false),
executionMode = ForeachResults),
Query(
name = "q2A",
@ -74,7 +74,7 @@ object Queries {
| SUBSTR(sourceIP, 1, 8)
""".stripMargin,
description = "",
collectResults = false),
executionMode = ForeachResults),
Query(
name = "q2B",
@ -88,7 +88,7 @@ object Queries {
| SUBSTR(sourceIP, 1, 10)
""".stripMargin,
description = "",
collectResults = false),
executionMode = ForeachResults),
Query(
name = "q2C",
@ -102,7 +102,7 @@ object Queries {
| SUBSTR(sourceIP, 1, 12)
""".stripMargin,
description = "",
collectResults = false),
executionMode = ForeachResults),
Query(
name = "q3A",
@ -121,7 +121,7 @@ object Queries {
|ORDER BY totalRevenue DESC LIMIT 1
""".stripMargin,
description = "",
collectResults = false),
executionMode = ForeachResults),
Query(
name = "q3B",
@ -140,7 +140,7 @@ object Queries {
|ORDER BY totalRevenue DESC LIMIT 1
""".stripMargin,
description = "",
collectResults = false),
executionMode = ForeachResults),
Query(
name = "q3C",
@ -158,6 +158,6 @@ object Queries {
|ORDER BY totalRevenue DESC LIMIT 1
""".stripMargin,
description = "",
collectResults = false)
executionMode = ForeachResults)
)
}

View File

@ -19,7 +19,21 @@ package com.databricks.spark.sql.perf
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
case class Query(name: String, sqlText: String, description: String, collectResults: Boolean)
/**
* The execution mode of a benchmark:
* - CollectResults: Benchmark run by collecting queries results
* (e.g. rdd.collect())
* - ForeachResults: Benchmark run by iterating through the queries results rows
* (e.g. rdd.foreach(row => Unit))
* - WriteParquet(location): Benchmark run by saving the output of each query as a
* parquet file at the specified location
*/
abstract class ExecutionMode
case object CollectResults extends ExecutionMode
case object ForeachResults extends ExecutionMode
case class WriteParquet(location: String) extends ExecutionMode
case class Query(name: String, sqlText: String, description: String, executionMode: ExecutionMode)
case class QueryForTest(
query: Query,
@ -60,10 +74,10 @@ case class QueryForTest(
}
// The executionTime for the entire query includes the time of type conversion from catalyst to scala.
val executionTime = if (query.collectResults) {
benchmarkMs { dataFrame.rdd.collect() }
} else {
benchmarkMs { dataFrame.rdd.foreach {row => Unit } }
val executionTime = query.executionMode match {
case CollectResults => benchmarkMs { dataFrame.rdd.collect() }
case ForeachResults => benchmarkMs { dataFrame.rdd.foreach { row => Unit } }
case WriteParquet(location) => benchmarkMs { dataFrame.saveAsParquetFile(s"$location/${query.name}.parquet") }
}
val joinTypes = dataFrame.queryExecution.executedPlan.collect {

View File

@ -16,7 +16,7 @@
package com.databricks.spark.sql.perf.tpcds.queries
import com.databricks.spark.sql.perf.Query
import com.databricks.spark.sql.perf.{CollectResults, Query}
object ImpalaKitQueries {
// Queries are from
@ -1024,7 +1024,7 @@ object ImpalaKitQueries {
|from store_sales
""".stripMargin)
).map {
case (name, sqlText) => Query(name, sqlText, description = "", collectResults = true)
case (name, sqlText) => Query(name, sqlText, description = "", executionMode = CollectResults)
}
val queriesMap = queries.map(q => q.name -> q).toMap
@ -1463,7 +1463,7 @@ object ImpalaKitQueries {
|from store_sales
""".stripMargin)
).map {
case (name, sqlText) => Query(name, sqlText, description = "original query", collectResults = true)
case (name, sqlText) => Query(name, sqlText, description = "original query", executionMode = CollectResults)
}
val interactiveQueries =

View File

@ -16,7 +16,7 @@
package com.databricks.spark.sql.perf.tpcds.queries
import com.databricks.spark.sql.perf.Query
import com.databricks.spark.sql.perf.{ForeachResults, Query}
object SimpleQueries {
val q7Derived = Seq(
@ -137,6 +137,6 @@ object SimpleQueries {
|-- end query 1 in stream 0 using template query7.tpl
""".stripMargin)
).map {
case (name, sqlText) => Query(name = name, sqlText = sqlText, description = "", collectResults = false)
case (name, sqlText) => Query(name = name, sqlText = sqlText, description = "", executionMode = ForeachResults)
}
}