diff --git a/src/main/scala/com/databricks/spark/sql/perf/bigdata/Queries.scala b/src/main/scala/com/databricks/spark/sql/perf/bigdata/Queries.scala
index 14b5720..5a639ba 100644
--- a/src/main/scala/com/databricks/spark/sql/perf/bigdata/Queries.scala
+++ b/src/main/scala/com/databricks/spark/sql/perf/bigdata/Queries.scala
@@ -16,6 +16,7 @@
 
 package com.databricks.spark.sql.perf.bigdata
 
+import com.databricks.spark.sql.perf.ExecutionMode.ForeachResults
 import com.databricks.spark.sql.perf.Query
 
 object Queries {
@@ -32,7 +33,7 @@ object Queries {
         | pageRank > 1000
       """.stripMargin,
       description = "",
-      collectResults = false),
+      executionMode = ForeachResults),
 
     Query(
       name = "q1B",
@@ -46,7 +47,7 @@ object Queries {
         | pageRank > 100
       """.stripMargin,
       description = "",
-      collectResults = false),
+      executionMode = ForeachResults),
 
     Query(
       name = "q1C",
@@ -60,7 +61,7 @@ object Queries {
         | pageRank > 10
       """.stripMargin,
       description = "",
-      collectResults = false),
+      executionMode = ForeachResults),
 
     Query(
       name = "q2A",
@@ -74,7 +75,7 @@ object Queries {
         | SUBSTR(sourceIP, 1, 8)
       """.stripMargin,
       description = "",
-      collectResults = false),
+      executionMode = ForeachResults),
 
     Query(
       name = "q2B",
@@ -88,7 +89,7 @@ object Queries {
         | SUBSTR(sourceIP, 1, 10)
       """.stripMargin,
       description = "",
-      collectResults = false),
+      executionMode = ForeachResults),
 
     Query(
       name = "q2C",
@@ -102,7 +103,7 @@ object Queries {
         | SUBSTR(sourceIP, 1, 12)
       """.stripMargin,
       description = "",
-      collectResults = false),
+      executionMode = ForeachResults),
 
     Query(
       name = "q3A",
@@ -121,7 +122,7 @@ object Queries {
        |ORDER BY totalRevenue DESC LIMIT 1
      """.stripMargin,
      description = "",
-      collectResults = false),
+      executionMode = ForeachResults),
 
     Query(
       name = "q3B",
@@ -140,7 +141,7 @@ object Queries {
        |ORDER BY totalRevenue DESC LIMIT 1
      """.stripMargin,
      description = "",
-      collectResults = false),
+      executionMode = ForeachResults),
 
     Query(
       name = "q3C",
@@ -158,6 +159,6 @@ object Queries {
        |ORDER BY totalRevenue DESC LIMIT 1
      """.stripMargin,
      description = "",
-      collectResults = false)
+      executionMode = ForeachResults)
   )
 }
diff --git a/src/main/scala/com/databricks/spark/sql/perf/query.scala b/src/main/scala/com/databricks/spark/sql/perf/query.scala
index c7191a3..e7e8400 100644
--- a/src/main/scala/com/databricks/spark/sql/perf/query.scala
+++ b/src/main/scala/com/databricks/spark/sql/perf/query.scala
@@ -16,10 +16,23 @@
 
 package com.databricks.spark.sql.perf
 
+import com.databricks.spark.sql.perf.ExecutionMode.{CollectResults, ForeachResults, WriteParquet}
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 
-case class Query(name: String, sqlText: String, description: String, collectResults: Boolean)
+sealed trait ExecutionMode
+object ExecutionMode {
+  // Benchmark run by collecting the query's results on the driver (e.g. rdd.collect())
+  case object CollectResults extends ExecutionMode
+
+  // Benchmark run by iterating through the query's result rows on the executors (e.g. rdd.foreach(_ => ()))
+  case object ForeachResults extends ExecutionMode
+
+  // Benchmark run by saving the output of each query as a Parquet file at the specified location
+  case class WriteParquet(location: String) extends ExecutionMode
+}
+
+case class Query(name: String, sqlText: String, description: String, executionMode: ExecutionMode)
 
 case class QueryForTest(
   query: Query,
@@ -59,11 +72,14 @@ case class QueryForTest(
       Seq.empty[BreakdownResult]
     }
 
-    // The executionTime for the entire query includes the time of type conversion from catalyst to scala.
-    val executionTime = if (query.collectResults) {
-      benchmarkMs { dataFrame.rdd.collect() }
-    } else {
-      benchmarkMs { dataFrame.rdd.foreach {row => Unit } }
-    }
+    // The executionTime for the entire query includes the time of type conversion
+    // from Catalyst to Scala.
+    val executionTime = benchmarkMs {
+      query.executionMode match {
+        case CollectResults => dataFrame.rdd.collect()
+        case ForeachResults => dataFrame.rdd.foreach(_ => ())
+        case WriteParquet(location) => dataFrame.saveAsParquetFile(s"$location/$name.parquet")
+      }
+    }
 
     val joinTypes = dataFrame.queryExecution.executedPlan.collect {
diff --git a/src/main/scala/com/databricks/spark/sql/perf/table.scala b/src/main/scala/com/databricks/spark/sql/perf/table.scala
index cb331e2..3c94557 100644
--- a/src/main/scala/com/databricks/spark/sql/perf/table.scala
+++ b/src/main/scala/com/databricks/spark/sql/perf/table.scala
@@ -16,21 +16,9 @@
 
 package com.databricks.spark.sql.perf
 
-import java.text.SimpleDateFormat
-import java.util.Date
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{Path, FileSystem}
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
-import org.apache.hadoop.mapreduce.{OutputCommitter, TaskAttemptContext, RecordWriter, Job}
-import org.apache.spark.SerializableWritable
-import org.apache.spark.sql.{SQLContext, Column}
-import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.functions._
-import org.apache.spark.sql.hive.HiveMetastoreTypes
 import org.apache.spark.sql.types._
-import parquet.hadoop.ParquetOutputFormat
-import parquet.hadoop.util.ContextUtil
 
 abstract class TableType
 case object UnpartitionedTable extends TableType
@@ -47,7 +35,7 @@ abstract class TableForTest(
 
   val name = table.name
 
-  val outputDir = s"$baseDir/parquet/${name}"
+  val outputDir = s"$baseDir/${name}"
 
   def fromCatalog = sqlContext.table(name)
 
diff --git a/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/ImpalaKitQueries.scala b/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/ImpalaKitQueries.scala
index c530232..248fbac 100644
--- a/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/ImpalaKitQueries.scala
+++ b/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/ImpalaKitQueries.scala
@@ -16,6 +16,7 @@
 
 package com.databricks.spark.sql.perf.tpcds.queries
 
+import com.databricks.spark.sql.perf.ExecutionMode.CollectResults
 import com.databricks.spark.sql.perf.Query
 
 object ImpalaKitQueries {
@@ -1024,7 +1025,7 @@ object ImpalaKitQueries {
       |from store_sales
     """.stripMargin)
   ).map {
-    case (name, sqlText) => Query(name, sqlText, description = "", collectResults = true)
+    case (name, sqlText) => Query(name, sqlText, description = "", executionMode = CollectResults)
   }
 
   val queriesMap = queries.map(q => q.name -> q).toMap
@@ -1462,8 +1463,8 @@ object ImpalaKitQueries {
       | max(ss_promo_sk) as max_ss_promo_sk
       |from store_sales
     """.stripMargin)
-  ).map {
-    case (name, sqlText) => Query(name, sqlText, description = "original query", collectResults = true)
+  ).map { case (name, sqlText) =>
+    Query(name, sqlText, description = "original query", executionMode = CollectResults)
   }
 
   val interactiveQueries =
diff --git a/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/SimpleQueries.scala b/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/SimpleQueries.scala
index fd4f4b6..514afc5 100644
--- a/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/SimpleQueries.scala
+++ b/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/SimpleQueries.scala
@@ -16,6 +16,7 @@
 
 package com.databricks.spark.sql.perf.tpcds.queries
 
+import com.databricks.spark.sql.perf.ExecutionMode.ForeachResults
 import com.databricks.spark.sql.perf.Query
 
 object SimpleQueries {
@@ -136,7 +137,7 @@ object SimpleQueries {
       |limit 100
       |-- end query 1 in stream 0 using template query7.tpl
     """.stripMargin)
-  ).map {
-    case (name, sqlText) => Query(name = name, sqlText = sqlText, description = "", collectResults = false)
+  ).map { case (name, sqlText) =>
+    Query(name = name, sqlText = sqlText, description = "", executionMode = ForeachResults)
   }
 }
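
Usage sketch of the new API introduced by this patch. This is an illustration only, not part of the patch: the query names, SQL texts, and the /tmp/perf-results path are made up for the example; store_sales is the TPC-DS table the benchmark queries above already reference.

  import com.databricks.spark.sql.perf.Query
  import com.databricks.spark.sql.perf.ExecutionMode.{CollectResults, ForeachResults, WriteParquet}

  // Each benchmark query now declares how its result is consumed,
  // replacing the old collectResults boolean.
  val examples = Seq(
    // Small aggregate: materializing on the driver is safe, and collect()
    // keeps the Catalyst-to-Scala conversion inside the timed work.
    Query(
      name = "exampleCollect",
      sqlText = "select count(*) from store_sales",
      description = "",
      executionMode = CollectResults),

    // Wide scan: iterate the rows on the executors so the driver never
    // holds the full result set.
    Query(
      name = "exampleForeach",
      sqlText = "select * from store_sales",
      description = "",
      executionMode = ForeachResults),

    // Persist the output; QueryForTest writes it to $location/$name.parquet.
    Query(
      name = "exampleWrite",
      sqlText = "select ss_item_sk, sum(ss_net_paid) from store_sales group by ss_item_sk",
      description = "",
      executionMode = WriteParquet("/tmp/perf-results")))

Because ExecutionMode is a sealed trait, the match in QueryForTest is checked for exhaustiveness at compile time, so adding a fourth mode without handling it there is flagged by the compiler.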