Took Aaron's comments
This commit is contained in:
parent
d866cce1a1
commit
a4a53b8a73
@ -16,7 +16,8 @@
|
||||
|
||||
package com.databricks.spark.sql.perf.bigdata
|
||||
|
||||
import com.databricks.spark.sql.perf.{ForeachResults, Query}
|
||||
import com.databricks.spark.sql.perf.ExecutionMode.ForeachResults
|
||||
import com.databricks.spark.sql.perf.Query
|
||||
|
||||
object Queries {
|
||||
val queries1to3 = Seq(
|
||||
|
||||
@ -16,22 +16,21 @@
|
||||
|
||||
package com.databricks.spark.sql.perf
|
||||
|
||||
import com.databricks.spark.sql.perf.ExecutionMode.{WriteParquet, ForeachResults, CollectResults}
|
||||
import org.apache.spark.sql.SQLContext
|
||||
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
|
||||
|
||||
/**
|
||||
* The execution mode of a benchmark:
|
||||
* - CollectResults: Benchmark run by collecting queries results
|
||||
* (e.g. rdd.collect())
|
||||
* - ForeachResults: Benchmark run by iterating through the queries results rows
|
||||
* (e.g. rdd.foreach(row => Unit))
|
||||
* - WriteParquet(location): Benchmark run by saving the output of each query as a
|
||||
* parquet file at the specified location
|
||||
*/
|
||||
abstract class ExecutionMode
|
||||
case object CollectResults extends ExecutionMode
|
||||
case object ForeachResults extends ExecutionMode
|
||||
case class WriteParquet(location: String) extends ExecutionMode
|
||||
/**
 * The execution mode of a benchmark, i.e. how the results of each query are consumed.
 *
 * The trait is sealed so that the compiler can check matches on the mode for
 * exhaustiveness; every mode is declared in the companion object.
 */
sealed trait ExecutionMode

object ExecutionMode {
  /** Benchmark run by collecting the query results (e.g. rdd.collect()). */
  case object CollectResults extends ExecutionMode

  /**
   * Benchmark run by iterating through the rows of the query results
   * (e.g. rdd.foreach(row => ())).
   */
  case object ForeachResults extends ExecutionMode

  /**
   * Benchmark run by saving the output of each query as a parquet file.
   *
   * @param location the location under which the parquet output is written
   */
  final case class WriteParquet(location: String) extends ExecutionMode
}
|
||||
|
||||
/**
 * A single benchmark query.
 *
 * @param name the name of the query
 * @param sqlText the SQL text of the query
 * @param description a textual description of the query (may be empty)
 * @param executionMode the [[ExecutionMode]] attached to the query
 */
case class Query(
    name: String,
    sqlText: String,
    description: String,
    executionMode: ExecutionMode)
|
||||
|
||||
@ -73,11 +72,17 @@ case class QueryForTest(
|
||||
Seq.empty[BreakdownResult]
|
||||
}
|
||||
|
||||
// The executionTime for the entire query includes the time of type conversion from catalyst to scala.
|
||||
val executionTime = query.executionMode match {
|
||||
case CollectResults => benchmarkMs { dataFrame.rdd.collect() }
|
||||
case ForeachResults => benchmarkMs { dataFrame.rdd.foreach { row => Unit } }
|
||||
case WriteParquet(location) => benchmarkMs { dataFrame.saveAsParquetFile(s"$location/$name.parquet") }
|
||||
// The executionTime for the entire query includes the time of type conversion
|
||||
// from catalyst to scala.
|
||||
// Run the query once, in the mode it asks for, inside the timed region.
val executionTime = benchmarkMs {
  query.executionMode match {
    case CollectResults => dataFrame.rdd.collect()
    // `()` is the unit value; a bare `Unit` would refer to the companion object instead.
    case ForeachResults => dataFrame.rdd.foreach { _ => () }
    // Only the write is timed: collecting first would execute the query a second time
    // and bring every result row back to the driver.
    case WriteParquet(location) =>
      dataFrame.saveAsParquetFile(s"$location/$name.parquet")
  }
}
|
||||
|
||||
val joinTypes = dataFrame.queryExecution.executedPlan.collect {
|
||||
|
||||
@ -16,7 +16,8 @@
|
||||
|
||||
package com.databricks.spark.sql.perf.tpcds.queries
|
||||
|
||||
import com.databricks.spark.sql.perf.{CollectResults, Query}
|
||||
import com.databricks.spark.sql.perf.ExecutionMode.CollectResults
|
||||
import com.databricks.spark.sql.perf.Query
|
||||
|
||||
object ImpalaKitQueries {
|
||||
// Queries are from
|
||||
@ -1462,8 +1463,8 @@ object ImpalaKitQueries {
|
||||
| max(ss_promo_sk) as max_ss_promo_sk
|
||||
|from store_sales
|
||||
""".stripMargin)
|
||||
).map {
|
||||
case (name, sqlText) => Query(name, sqlText, description = "original query", executionMode = CollectResults)
|
||||
).map { case (name, sqlText) =>
|
||||
Query(name, sqlText, description = "original query", executionMode = CollectResults)
|
||||
}
|
||||
|
||||
val interactiveQueries =
|
||||
|
||||
@ -16,7 +16,8 @@
|
||||
|
||||
package com.databricks.spark.sql.perf.tpcds.queries
|
||||
|
||||
import com.databricks.spark.sql.perf.{ForeachResults, Query}
|
||||
import com.databricks.spark.sql.perf.ExecutionMode.ForeachResults
|
||||
import com.databricks.spark.sql.perf.Query
|
||||
|
||||
object SimpleQueries {
|
||||
val q7Derived = Seq(
|
||||
@ -136,7 +137,7 @@ object SimpleQueries {
|
||||
|limit 100
|
||||
|-- end query 1 in stream 0 using template query7.tpl
|
||||
""".stripMargin)
|
||||
).map {
|
||||
case (name, sqlText) => Query(name = name, sqlText = sqlText, description = "", executionMode = ForeachResults)
|
||||
).map { case (name, sqlText) =>
|
||||
Query(name = name, sqlText = sqlText, description = "", executionMode = ForeachResults)
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user