From 8e62e4fdbd74226b2c688105021ea6115fd07df5 Mon Sep 17 00:00:00 2001 From: Jean-Yves Stephan Date: Mon, 20 Jul 2015 17:09:20 -0700 Subject: [PATCH 1/6] Added optional parameters to runBenchmark to specify a location to save queries outputs as parquet files. + Removed the hardcoded baseDir/parquet/ structure --- .../com/databricks/spark/sql/perf/query.scala | 4 +++- .../spark/sql/perf/runBenchmarks.scala | 4 +++- .../com/databricks/spark/sql/perf/table.scala | 16 ++-------------- 3 files changed, 8 insertions(+), 16 deletions(-) diff --git a/src/main/scala/com/databricks/spark/sql/perf/query.scala b/src/main/scala/com/databricks/spark/sql/perf/query.scala index c7191a3..78a410c 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/query.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/query.scala @@ -36,7 +36,7 @@ case class QueryForTest( (endTime - startTime).toDouble / 1000000 } - def benchmark(description: String = "") = { + def benchmark(description: String = "", queryOutputLocation: Option[String]) = { try { sparkContext.setJobDescription(s"Query: ${query.name}, $description") val dataFrame = sqlContext.sql(query.sqlText) @@ -77,6 +77,8 @@ case class QueryForTest( } } + queryOutputLocation.foreach(dir => dataFrame.saveAsParquetFile(s"$dir/$name.parquet")) + BenchmarkResult( name = query.name, joinTypes = joinTypes, diff --git a/src/main/scala/com/databricks/spark/sql/perf/runBenchmarks.scala b/src/main/scala/com/databricks/spark/sql/perf/runBenchmarks.scala index fb4d69a..72130f9 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/runBenchmarks.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/runBenchmarks.scala @@ -183,6 +183,7 @@ abstract class Dataset( * Starts an experiment run with a given set of queries. * @param queries Queries to be executed. * @param resultsLocation The location of performance results. + * @param queryOutputLocation If defined, location where queries results should be saved as parquet files * @param includeBreakdown If it is true, breakdown results of a query will be recorded. * Setting it to true may significantly increase the time used to * execute a query. @@ -195,6 +196,7 @@ abstract class Dataset( def runExperiment( queries: Seq[Query], resultsLocation: String, + queryOutputLocation: Option[String] = None, includeBreakdown: Boolean = false, iterations: Int = 3, variations: Seq[Variation[_]] = Seq(Variation("StandardRun", Seq("")) { _ => {} }), @@ -237,7 +239,7 @@ abstract class Dataset( currentMessages += s"Running query ${q.name} $setup" currentQuery = q.name - val singleResult = try q.benchmark(setup) :: Nil catch { + val singleResult = try q.benchmark(setup, queryOutputLocation) :: Nil catch { case e: Exception => currentMessages += s"Failed to run query ${q.name}: $e" Nil diff --git a/src/main/scala/com/databricks/spark/sql/perf/table.scala b/src/main/scala/com/databricks/spark/sql/perf/table.scala index cb331e2..3c94557 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/table.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/table.scala @@ -16,21 +16,9 @@ package com.databricks.spark.sql.perf -import java.text.SimpleDateFormat -import java.util.Date - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{Path, FileSystem} -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat -import org.apache.hadoop.mapreduce.{OutputCommitter, TaskAttemptContext, RecordWriter, Job} -import org.apache.spark.SerializableWritable -import org.apache.spark.sql.{SQLContext, Column} -import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.SQLContext import org.apache.spark.sql.functions._ -import org.apache.spark.sql.hive.HiveMetastoreTypes import org.apache.spark.sql.types._ -import parquet.hadoop.ParquetOutputFormat -import parquet.hadoop.util.ContextUtil abstract class TableType case object UnpartitionedTable extends TableType @@ -47,7 +35,7 @@ abstract class TableForTest( val name = table.name - val outputDir = s"$baseDir/parquet/${name}" + val outputDir = s"$baseDir/${name}" def fromCatalog = sqlContext.table(name) From 9640cd8c1e86a8a974a7c75d0929efc57db49906 Mon Sep 17 00:00:00 2001 From: Jean-Yves Stephan Date: Tue, 21 Jul 2015 13:23:11 -0700 Subject: [PATCH 2/6] The execution mode (collect results / foreach results / writeparquet) is now specified as an argument to Query. --- .../spark/sql/perf/bigdata/Queries.scala | 20 ++++++++-------- .../com/databricks/spark/sql/perf/query.scala | 24 +++++++++++++++---- .../perf/tpcds/queries/ImpalaKitQueries.scala | 6 ++--- .../perf/tpcds/queries/SimpleQueries.scala | 4 ++-- 4 files changed, 34 insertions(+), 20 deletions(-) diff --git a/src/main/scala/com/databricks/spark/sql/perf/bigdata/Queries.scala b/src/main/scala/com/databricks/spark/sql/perf/bigdata/Queries.scala index 14b5720..2c8805f 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/bigdata/Queries.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/bigdata/Queries.scala @@ -16,7 +16,7 @@ package com.databricks.spark.sql.perf.bigdata -import com.databricks.spark.sql.perf.Query +import com.databricks.spark.sql.perf.{ForeachResults, Query} object Queries { val queries1to3 = Seq( @@ -32,7 +32,7 @@ object Queries { | pageRank > 1000 """.stripMargin, description = "", - collectResults = false), + executionMode = ForeachResults), Query( name = "q1B", @@ -46,7 +46,7 @@ object Queries { | pageRank > 100 """.stripMargin, description = "", - collectResults = false), + executionMode = ForeachResults), Query( name = "q1C", @@ -60,7 +60,7 @@ object Queries { | pageRank > 10 """.stripMargin, description = "", - collectResults = false), + executionMode = ForeachResults), Query( name = "q2A", @@ -74,7 +74,7 @@ object Queries { | SUBSTR(sourceIP, 1, 8) """.stripMargin, description = "", - collectResults = false), + executionMode = ForeachResults), Query( name = "q2B", @@ -88,7 +88,7 @@ object Queries { | SUBSTR(sourceIP, 1, 10) """.stripMargin, description = "", - collectResults = false), + executionMode = ForeachResults), Query( name = "q2C", @@ -102,7 +102,7 @@ object Queries { | SUBSTR(sourceIP, 1, 12) """.stripMargin, description = "", - collectResults = false), + executionMode = ForeachResults), Query( name = "q3A", @@ -121,7 +121,7 @@ object Queries { |ORDER BY totalRevenue DESC LIMIT 1 """.stripMargin, description = "", - collectResults = false), + executionMode = ForeachResults), Query( name = "q3B", @@ -140,7 +140,7 @@ object Queries { |ORDER BY totalRevenue DESC LIMIT 1 """.stripMargin, description = "", - collectResults = false), + executionMode = ForeachResults), Query( name = "q3C", @@ -158,6 +158,6 @@ object Queries { |ORDER BY totalRevenue DESC LIMIT 1 """.stripMargin, description = "", - collectResults = false) + executionMode = ForeachResults) ) } diff --git a/src/main/scala/com/databricks/spark/sql/perf/query.scala b/src/main/scala/com/databricks/spark/sql/perf/query.scala index 78a410c..c593ada 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/query.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/query.scala @@ -19,7 +19,21 @@ package com.databricks.spark.sql.perf import org.apache.spark.sql.SQLContext import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation -case class Query(name: String, sqlText: String, description: String, collectResults: Boolean) +/** + * The execution mode of a benchmark: + * - CollectResults: Benchmark run by collecting queries results + * (e.g. rdd.collect()) + * - ForeachResults: Benchmark run by iterating through the queries results rows + * (e.g. rdd.foreach(row => Unit)) + * - WriteParquet(location): Benchmark run by saving the output of each query as a + * parquet file at the specified location + */ +abstract class ExecutionMode +case object CollectResults extends ExecutionMode +case object ForeachResults extends ExecutionMode +case class WriteParquet(location: String) extends ExecutionMode + +case class Query(name: String, sqlText: String, description: String, executionMode: ExecutionMode) case class QueryForTest( query: Query, @@ -60,10 +74,10 @@ case class QueryForTest( } // The executionTime for the entire query includes the time of type conversion from catalyst to scala. - val executionTime = if (query.collectResults) { - benchmarkMs { dataFrame.rdd.collect() } - } else { - benchmarkMs { dataFrame.rdd.foreach {row => Unit } } + val executionTime = query.executionMode match { + case CollectResults => benchmarkMs { dataFrame.rdd.collect() } + case ForeachResults => benchmarkMs { dataFrame.rdd.foreach { row => Unit } } + case WriteParquet(location) => benchmarkMs { dataFrame.saveAsParquetFile(s"$location/${query.name}.parquet") } } val joinTypes = dataFrame.queryExecution.executedPlan.collect { diff --git a/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/ImpalaKitQueries.scala b/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/ImpalaKitQueries.scala index c530232..7380cb7 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/ImpalaKitQueries.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/ImpalaKitQueries.scala @@ -16,7 +16,7 @@ package com.databricks.spark.sql.perf.tpcds.queries -import com.databricks.spark.sql.perf.Query +import com.databricks.spark.sql.perf.{CollectResults, Query} object ImpalaKitQueries { // Queries are from @@ -1024,7 +1024,7 @@ object ImpalaKitQueries { |from store_sales """.stripMargin) ).map { - case (name, sqlText) => Query(name, sqlText, description = "", collectResults = true) + case (name, sqlText) => Query(name, sqlText, description = "", executionMode = CollectResults) } val queriesMap = queries.map(q => q.name -> q).toMap @@ -1463,7 +1463,7 @@ object ImpalaKitQueries { |from store_sales """.stripMargin) ).map { - case (name, sqlText) => Query(name, sqlText, description = "original query", collectResults = true) + case (name, sqlText) => Query(name, sqlText, description = "original query", executionMode = CollectResults) } val interactiveQueries = diff --git a/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/SimpleQueries.scala b/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/SimpleQueries.scala index fd4f4b6..dac12a2 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/SimpleQueries.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/SimpleQueries.scala @@ -16,7 +16,7 @@ package com.databricks.spark.sql.perf.tpcds.queries -import com.databricks.spark.sql.perf.Query +import com.databricks.spark.sql.perf.{ForeachResults, Query} object SimpleQueries { val q7Derived = Seq( @@ -137,6 +137,6 @@ object SimpleQueries { |-- end query 1 in stream 0 using template query7.tpl """.stripMargin) ).map { - case (name, sqlText) => Query(name = name, sqlText = sqlText, description = "", collectResults = false) + case (name, sqlText) => Query(name = name, sqlText = sqlText, description = "", executionMode = ForeachResults) } } From 933f3f0bb5c6d611b50764d83c7282716cfb668f Mon Sep 17 00:00:00 2001 From: Jean-Yves Stephan Date: Tue, 21 Jul 2015 13:26:50 -0700 Subject: [PATCH 3/6] Removed queryOutputLocation parameter --- src/main/scala/com/databricks/spark/sql/perf/query.scala | 8 +++----- .../com/databricks/spark/sql/perf/runBenchmarks.scala | 3 +-- 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/src/main/scala/com/databricks/spark/sql/perf/query.scala b/src/main/scala/com/databricks/spark/sql/perf/query.scala index c593ada..5828805 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/query.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/query.scala @@ -50,7 +50,7 @@ case class QueryForTest( (endTime - startTime).toDouble / 1000000 } - def benchmark(description: String = "", queryOutputLocation: Option[String]) = { + def benchmark(description: String = "") = { try { sparkContext.setJobDescription(s"Query: ${query.name}, $description") val dataFrame = sqlContext.sql(query.sqlText) @@ -77,7 +77,7 @@ case class QueryForTest( val executionTime = query.executionMode match { case CollectResults => benchmarkMs { dataFrame.rdd.collect() } case ForeachResults => benchmarkMs { dataFrame.rdd.foreach { row => Unit } } - case WriteParquet(location) => benchmarkMs { dataFrame.saveAsParquetFile(s"$location/${query.name}.parquet") } + case WriteParquet(location) => benchmarkMs { dataFrame.saveAsParquetFile(s"$location/$name.parquet") } } val joinTypes = dataFrame.queryExecution.executedPlan.collect { @@ -90,9 +90,7 @@ case class QueryForTest( tableIdentifier.last } } - - queryOutputLocation.foreach(dir => dataFrame.saveAsParquetFile(s"$dir/$name.parquet")) - + BenchmarkResult( name = query.name, joinTypes = joinTypes, diff --git a/src/main/scala/com/databricks/spark/sql/perf/runBenchmarks.scala b/src/main/scala/com/databricks/spark/sql/perf/runBenchmarks.scala index 72130f9..878d609 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/runBenchmarks.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/runBenchmarks.scala @@ -196,7 +196,6 @@ abstract class Dataset( def runExperiment( queries: Seq[Query], resultsLocation: String, - queryOutputLocation: Option[String] = None, includeBreakdown: Boolean = false, iterations: Int = 3, variations: Seq[Variation[_]] = Seq(Variation("StandardRun", Seq("")) { _ => {} }), @@ -239,7 +238,7 @@ abstract class Dataset( currentMessages += s"Running query ${q.name} $setup" currentQuery = q.name - val singleResult = try q.benchmark(setup, queryOutputLocation) :: Nil catch { + val singleResult = try q.benchmark(setup) :: Nil catch { case e: Exception => currentMessages += s"Failed to run query ${q.name}: $e" Nil From d866cce1a14d7926fb90b9b97b841e2ac65e3202 Mon Sep 17 00:00:00 2001 From: Jean-Yves Stephan Date: Tue, 21 Jul 2015 13:27:50 -0700 Subject: [PATCH 4/6] Format --- src/main/scala/com/databricks/spark/sql/perf/query.scala | 2 +- .../scala/com/databricks/spark/sql/perf/runBenchmarks.scala | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/src/main/scala/com/databricks/spark/sql/perf/query.scala b/src/main/scala/com/databricks/spark/sql/perf/query.scala index 5828805..d2541f9 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/query.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/query.scala @@ -90,7 +90,7 @@ case class QueryForTest( tableIdentifier.last } } - + BenchmarkResult( name = query.name, joinTypes = joinTypes, diff --git a/src/main/scala/com/databricks/spark/sql/perf/runBenchmarks.scala b/src/main/scala/com/databricks/spark/sql/perf/runBenchmarks.scala index 878d609..fb4d69a 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/runBenchmarks.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/runBenchmarks.scala @@ -183,7 +183,6 @@ abstract class Dataset( * Starts an experiment run with a given set of queries. * @param queries Queries to be executed. * @param resultsLocation The location of performance results. - * @param queryOutputLocation If defined, location where queries results should be saved as parquet files * @param includeBreakdown If it is true, breakdown results of a query will be recorded. * Setting it to true may significantly increase the time used to * execute a query. From a4a53b8a73f55f8f90c0c811cd7962b22975c510 Mon Sep 17 00:00:00 2001 From: Jean-Yves Stephan Date: Tue, 21 Jul 2015 20:05:53 -0700 Subject: [PATCH 5/6] Took Aaron's comments --- .../spark/sql/perf/bigdata/Queries.scala | 3 +- .../com/databricks/spark/sql/perf/query.scala | 41 +++++++++++-------- .../perf/tpcds/queries/ImpalaKitQueries.scala | 7 ++-- .../perf/tpcds/queries/SimpleQueries.scala | 7 ++-- 4 files changed, 33 insertions(+), 25 deletions(-) diff --git a/src/main/scala/com/databricks/spark/sql/perf/bigdata/Queries.scala b/src/main/scala/com/databricks/spark/sql/perf/bigdata/Queries.scala index 2c8805f..5a639ba 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/bigdata/Queries.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/bigdata/Queries.scala @@ -16,7 +16,8 @@ package com.databricks.spark.sql.perf.bigdata -import com.databricks.spark.sql.perf.{ForeachResults, Query} +import com.databricks.spark.sql.perf.ExecutionMode.ForeachResults +import com.databricks.spark.sql.perf.Query object Queries { val queries1to3 = Seq( diff --git a/src/main/scala/com/databricks/spark/sql/perf/query.scala b/src/main/scala/com/databricks/spark/sql/perf/query.scala index d2541f9..b39095a 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/query.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/query.scala @@ -16,22 +16,21 @@ package com.databricks.spark.sql.perf +import com.databricks.spark.sql.perf.ExecutionMode.{WriteParquet, ForeachResults, CollectResults} import org.apache.spark.sql.SQLContext import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation -/** - * The execution mode of a benchmark: - * - CollectResults: Benchmark run by collecting queries results - * (e.g. rdd.collect()) - * - ForeachResults: Benchmark run by iterating through the queries results rows - * (e.g. rdd.foreach(row => Unit)) - * - WriteParquet(location): Benchmark run by saving the output of each query as a - * parquet file at the specified location - */ -abstract class ExecutionMode -case object CollectResults extends ExecutionMode -case object ForeachResults extends ExecutionMode -case class WriteParquet(location: String) extends ExecutionMode +trait ExecutionMode +object ExecutionMode { + // Benchmark run by collecting queries results (e.g. rdd.collect()) + case object CollectResults extends ExecutionMode + + // Benchmark run by iterating through the queries results rows (e.g. rdd.foreach(row => Unit)) + case object ForeachResults extends ExecutionMode + + // Benchmark run by saving the output of each query as a parquet file at the specified location + case class WriteParquet(location: String) extends ExecutionMode +} case class Query(name: String, sqlText: String, description: String, executionMode: ExecutionMode) @@ -73,11 +72,17 @@ case class QueryForTest( Seq.empty[BreakdownResult] } - // The executionTime for the entire query includes the time of type conversion from catalyst to scala. - val executionTime = query.executionMode match { - case CollectResults => benchmarkMs { dataFrame.rdd.collect() } - case ForeachResults => benchmarkMs { dataFrame.rdd.foreach { row => Unit } } - case WriteParquet(location) => benchmarkMs { dataFrame.saveAsParquetFile(s"$location/$name.parquet") } + // The executionTime for the entire query includes the time of type conversion + // from catalyst to scala. + val executionTime = benchmarkMs { + query.executionMode match { + case CollectResults => dataFrame.rdd.collect() + case ForeachResults => dataFrame.rdd.foreach { row => Unit } + case WriteParquet(location) => { + dataFrame.rdd.collect() + dataFrame.saveAsParquetFile(s"$location/$name.parquet") + } + } } val joinTypes = dataFrame.queryExecution.executedPlan.collect { diff --git a/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/ImpalaKitQueries.scala b/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/ImpalaKitQueries.scala index 7380cb7..248fbac 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/ImpalaKitQueries.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/ImpalaKitQueries.scala @@ -16,7 +16,8 @@ package com.databricks.spark.sql.perf.tpcds.queries -import com.databricks.spark.sql.perf.{CollectResults, Query} +import com.databricks.spark.sql.perf.ExecutionMode.CollectResults +import com.databricks.spark.sql.perf.Query object ImpalaKitQueries { // Queries are from @@ -1462,8 +1463,8 @@ object ImpalaKitQueries { | max(ss_promo_sk) as max_ss_promo_sk |from store_sales """.stripMargin) - ).map { - case (name, sqlText) => Query(name, sqlText, description = "original query", executionMode = CollectResults) + ).map { case (name, sqlText) => + Query(name, sqlText, description = "original query", executionMode = CollectResults) } val interactiveQueries = diff --git a/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/SimpleQueries.scala b/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/SimpleQueries.scala index dac12a2..514afc5 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/SimpleQueries.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/SimpleQueries.scala @@ -16,7 +16,8 @@ package com.databricks.spark.sql.perf.tpcds.queries -import com.databricks.spark.sql.perf.{ForeachResults, Query} +import com.databricks.spark.sql.perf.ExecutionMode.ForeachResults +import com.databricks.spark.sql.perf.Query object SimpleQueries { val q7Derived = Seq( @@ -136,7 +137,7 @@ object SimpleQueries { |limit 100 |-- end query 1 in stream 0 using template query7.tpl """.stripMargin) - ).map { - case (name, sqlText) => Query(name = name, sqlText = sqlText, description = "", executionMode = ForeachResults) + ).map { case (name, sqlText) => + Query(name = name, sqlText = sqlText, description = "", executionMode = ForeachResults) } } From 653d82134de31580b8d6e5b0690fb08ee6702855 Mon Sep 17 00:00:00 2001 From: Jean-Yves Stephan Date: Wed, 22 Jul 2015 13:30:40 -0700 Subject: [PATCH 6/6] No collect before saveAsParquet --- src/main/scala/com/databricks/spark/sql/perf/query.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/main/scala/com/databricks/spark/sql/perf/query.scala b/src/main/scala/com/databricks/spark/sql/perf/query.scala index b39095a..e7e8400 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/query.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/query.scala @@ -78,9 +78,7 @@ case class QueryForTest( query.executionMode match { case CollectResults => dataFrame.rdd.collect() case ForeachResults => dataFrame.rdd.foreach { row => Unit } - case WriteParquet(location) => { - dataFrame.rdd.collect() - dataFrame.saveAsParquetFile(s"$location/$name.parquet") + case WriteParquet(location) => dataFrame.saveAsParquetFile(s"$location/$name.parquet") } } }