Merge pull request #2 from jystephan/master

Allow saving benchmark query results as Parquet files
Yin Huai 2015-07-22 13:40:39 -07:00
commit a50fedd5bc
5 changed files with 42 additions and 34 deletions

View File

@@ -16,6 +16,7 @@
 package com.databricks.spark.sql.perf.bigdata
+import com.databricks.spark.sql.perf.ExecutionMode.ForeachResults
 import com.databricks.spark.sql.perf.Query
 object Queries {
@@ -32,7 +33,7 @@ object Queries {
 | pageRank > 1000
 """.stripMargin,
 description = "",
-collectResults = false),
+executionMode = ForeachResults),
 Query(
 name = "q1B",
@@ -46,7 +47,7 @@ object Queries {
 | pageRank > 100
 """.stripMargin,
 description = "",
-collectResults = false),
+executionMode = ForeachResults),
 Query(
 name = "q1C",
@@ -60,7 +61,7 @@ object Queries {
 | pageRank > 10
 """.stripMargin,
 description = "",
-collectResults = false),
+executionMode = ForeachResults),
 Query(
 name = "q2A",
@@ -74,7 +75,7 @@ object Queries {
 | SUBSTR(sourceIP, 1, 8)
 """.stripMargin,
 description = "",
-collectResults = false),
+executionMode = ForeachResults),
 Query(
 name = "q2B",
@@ -88,7 +89,7 @@ object Queries {
 | SUBSTR(sourceIP, 1, 10)
 """.stripMargin,
 description = "",
-collectResults = false),
+executionMode = ForeachResults),
 Query(
 name = "q2C",
@@ -102,7 +103,7 @@ object Queries {
 | SUBSTR(sourceIP, 1, 12)
 """.stripMargin,
 description = "",
-collectResults = false),
+executionMode = ForeachResults),
 Query(
 name = "q3A",
@@ -121,7 +122,7 @@ object Queries {
 |ORDER BY totalRevenue DESC LIMIT 1
 """.stripMargin,
 description = "",
-collectResults = false),
+executionMode = ForeachResults),
 Query(
 name = "q3B",
@@ -140,7 +141,7 @@ object Queries {
 |ORDER BY totalRevenue DESC LIMIT 1
 """.stripMargin,
 description = "",
-collectResults = false),
+executionMode = ForeachResults),
 Query(
 name = "q3C",
@@ -158,6 +159,6 @@ object Queries {
 |ORDER BY totalRevenue DESC LIMIT 1
 """.stripMargin,
 description = "",
-collectResults = false)
+executionMode = ForeachResults)
 )
 }
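
All nine queries in this file switch from collectResults = false to the equivalent ForeachResults mode. The distinction matters for what gets timed; here is a minimal sketch, assuming a Spark 1.x SQLContext named sqlContext (the rankings table and pageURL column are assumptions based on the pageRank filters above, not part of this diff):

// Sketch only: the two measurement styles this benchmark chooses between.
val df = sqlContext.sql("SELECT pageURL, pageRank FROM rankings WHERE pageRank > 1000")

// CollectResults-style: ships every result row to the driver, so the measured
// time includes serializing and transferring the full result set.
val rows = df.rdd.collect()

// ForeachResults-style (used in this file): materializes every row on the
// executors and discards it, timing execution without driver-side collection.
df.rdd.foreach { row => () }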

View File

@@ -16,10 +16,23 @@
 package com.databricks.spark.sql.perf
+import com.databricks.spark.sql.perf.ExecutionMode.{WriteParquet, ForeachResults, CollectResults}
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
-case class Query(name: String, sqlText: String, description: String, collectResults: Boolean)
+trait ExecutionMode
+object ExecutionMode {
+  // Benchmark run by collecting queries results (e.g. rdd.collect())
+  case object CollectResults extends ExecutionMode
+  // Benchmark run by iterating through the queries results rows (e.g. rdd.foreach(row => Unit))
+  case object ForeachResults extends ExecutionMode
+  // Benchmark run by saving the output of each query as a parquet file at the specified location
+  case class WriteParquet(location: String) extends ExecutionMode
+}
+case class Query(name: String, sqlText: String, description: String, executionMode: ExecutionMode)
 case class QueryForTest(
 query: Query,
@@ -59,11 +72,15 @@ case class QueryForTest(
 Seq.empty[BreakdownResult]
 }
-// The executionTime for the entire query includes the time of type conversion from catalyst to scala.
-val executionTime = if (query.collectResults) {
-  benchmarkMs { dataFrame.rdd.collect() }
-} else {
-  benchmarkMs { dataFrame.rdd.foreach {row => Unit } }
+// The executionTime for the entire query includes the time of type conversion
+// from catalyst to scala.
+val executionTime = benchmarkMs {
+  query.executionMode match {
+    case CollectResults => dataFrame.rdd.collect()
+    case ForeachResults => dataFrame.rdd.foreach { row => Unit }
+    case WriteParquet(location) => dataFrame.saveAsParquetFile(s"$location/$name.parquet")
+  }
+}
 }
 val joinTypes = dataFrame.queryExecution.executedPlan.collect {
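
The boolean flag is thus replaced by a small set of execution modes, so a new measurement strategy such as WriteParquet can be added without widening the Query signature further. A minimal usage sketch against the case classes above; the SQL text and the output path are hypothetical:

import com.databricks.spark.sql.perf.ExecutionMode.{CollectResults, ForeachResults, WriteParquet}
import com.databricks.spark.sql.perf.Query

// Time the query while collecting its rows to the driver.
val q = Query(
  name = "countAll",
  sqlText = "SELECT COUNT(*) FROM store_sales",
  description = "",
  executionMode = CollectResults)

// Same query, but rows are materialized on the executors and never collected.
val qForeach = q.copy(name = "countAllForeach", executionMode = ForeachResults)

// Same query, but its output lands at /tmp/perf-results/countAllParquet.parquet.
val qParquet = q.copy(name = "countAllParquet", executionMode = WriteParquet("/tmp/perf-results"))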

View File

@@ -16,21 +16,9 @@
 package com.databricks.spark.sql.perf
 import java.text.SimpleDateFormat
 import java.util.Date
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{Path, FileSystem}
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
-import org.apache.hadoop.mapreduce.{OutputCommitter, TaskAttemptContext, RecordWriter, Job}
-import org.apache.spark.SerializableWritable
-import org.apache.spark.sql.{SQLContext, Column}
-import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.functions._
-import org.apache.spark.sql.hive.HiveMetastoreTypes
-import org.apache.spark.sql.types._
-import parquet.hadoop.ParquetOutputFormat
-import parquet.hadoop.util.ContextUtil
 abstract class TableType
 case object UnpartitionedTable extends TableType
@@ -47,7 +35,7 @@ abstract class TableForTest(
 val name = table.name
-val outputDir = s"$baseDir/parquet/${name}"
+val outputDir = s"$baseDir/${name}"
 def fromCatalog = sqlContext.table(name)

View File

@@ -16,6 +16,7 @@
 package com.databricks.spark.sql.perf.tpcds.queries
+import com.databricks.spark.sql.perf.ExecutionMode.CollectResults
 import com.databricks.spark.sql.perf.Query
 object ImpalaKitQueries {
@@ -1024,7 +1025,7 @@ object ImpalaKitQueries {
 |from store_sales
 """.stripMargin)
 ).map {
-case (name, sqlText) => Query(name, sqlText, description = "", collectResults = true)
+case (name, sqlText) => Query(name, sqlText, description = "", executionMode = CollectResults)
 }
 val queriesMap = queries.map(q => q.name -> q).toMap
@@ -1462,8 +1463,8 @@ object ImpalaKitQueries {
 | max(ss_promo_sk) as max_ss_promo_sk
 |from store_sales
 """.stripMargin)
-).map {
-case (name, sqlText) => Query(name, sqlText, description = "original query", collectResults = true)
+).map { case (name, sqlText) =>
+  Query(name, sqlText, description = "original query", executionMode = CollectResults)
 }
 val interactiveQueries =

View File

@@ -16,6 +16,7 @@
 package com.databricks.spark.sql.perf.tpcds.queries
+import com.databricks.spark.sql.perf.ExecutionMode.ForeachResults
 import com.databricks.spark.sql.perf.Query
 object SimpleQueries {
@@ -136,7 +137,7 @@ object SimpleQueries {
 |limit 100
 |-- end query 1 in stream 0 using template query7.tpl
 """.stripMargin)
-).map {
-case (name, sqlText) => Query(name = name, sqlText = sqlText, description = "", collectResults = false)
+).map { case (name, sqlText) =>
+  Query(name = name, sqlText = sqlText, description = "", executionMode = ForeachResults)
 }
 }
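
For completeness, a sketch of the round trip that WriteParquet enables, using the Spark 1.x DataFrame API that appears in this diff (the location directory is hypothetical; q1B and its pageRank filter come from the bigdata queries above, with the table name assumed):

// Persist one query's output and read it back after the benchmark run.
val location = "/tmp/perf-results"   // hypothetical output directory
val name = "q1B"
val dataFrame = sqlContext.sql("SELECT pageURL, pageRank FROM rankings WHERE pageRank > 100")

// The call the WriteParquet(location) branch performs:
dataFrame.saveAsParquetFile(s"$location/$name.parquet")

// Unlike CollectResults and ForeachResults, the results survive the run
// and can be reloaded for inspection or cross-run comparison:
val reloaded = sqlContext.parquetFile(s"$location/$name.parquet")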