diff --git a/src/main/scala/com/databricks/spark/sql/perf/query.scala b/src/main/scala/com/databricks/spark/sql/perf/query.scala
index c7191a3..78a410c 100644
--- a/src/main/scala/com/databricks/spark/sql/perf/query.scala
+++ b/src/main/scala/com/databricks/spark/sql/perf/query.scala
@@ -36,7 +36,7 @@ case class QueryForTest(
     (endTime - startTime).toDouble / 1000000
   }
 
-  def benchmark(description: String = "") = {
+  def benchmark(description: String = "", queryOutputLocation: Option[String]) = {
     try {
       sparkContext.setJobDescription(s"Query: ${query.name}, $description")
       val dataFrame = sqlContext.sql(query.sqlText)
@@ -77,6 +77,8 @@ case class QueryForTest(
       }
     }
 
+    queryOutputLocation.foreach(dir => dataFrame.saveAsParquetFile(s"$dir/$name.parquet"))
+
     BenchmarkResult(
       name = query.name,
       joinTypes = joinTypes,
diff --git a/src/main/scala/com/databricks/spark/sql/perf/runBenchmarks.scala b/src/main/scala/com/databricks/spark/sql/perf/runBenchmarks.scala
index fb4d69a..72130f9 100644
--- a/src/main/scala/com/databricks/spark/sql/perf/runBenchmarks.scala
+++ b/src/main/scala/com/databricks/spark/sql/perf/runBenchmarks.scala
@@ -183,6 +183,7 @@ abstract class Dataset(
   * Starts an experiment run with a given set of queries.
   * @param queries Queries to be executed.
   * @param resultsLocation The location of performance results.
+   * @param queryOutputLocation If defined, location where query results should be saved as parquet files.
   * @param includeBreakdown If it is true, breakdown results of a query will be recorded.
   *                         Setting it to true may significantly increase the time used to
   *                         execute a query.
@@ -195,6 +196,7 @@ abstract class Dataset(
   def runExperiment(
       queries: Seq[Query],
       resultsLocation: String,
+      queryOutputLocation: Option[String] = None,
       includeBreakdown: Boolean = false,
       iterations: Int = 3,
       variations: Seq[Variation[_]] = Seq(Variation("StandardRun", Seq("")) { _ => {} }),
@@ -237,7 +239,7 @@ abstract class Dataset(
        currentMessages += s"Running query ${q.name} $setup"
        currentQuery = q.name
 
-        val singleResult = try q.benchmark(setup) :: Nil catch {
+        val singleResult = try q.benchmark(setup, queryOutputLocation) :: Nil catch {
          case e: Exception =>
            currentMessages += s"Failed to run query ${q.name}: $e"
            Nil
diff --git a/src/main/scala/com/databricks/spark/sql/perf/table.scala b/src/main/scala/com/databricks/spark/sql/perf/table.scala
index cb331e2..3c94557 100644
--- a/src/main/scala/com/databricks/spark/sql/perf/table.scala
+++ b/src/main/scala/com/databricks/spark/sql/perf/table.scala
@@ -16,21 +16,9 @@
 
 package com.databricks.spark.sql.perf
 
-import java.text.SimpleDateFormat
-import java.util.Date
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{Path, FileSystem}
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
-import org.apache.hadoop.mapreduce.{OutputCommitter, TaskAttemptContext, RecordWriter, Job}
-import org.apache.spark.SerializableWritable
-import org.apache.spark.sql.{SQLContext, Column}
-import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.functions._
-import org.apache.spark.sql.hive.HiveMetastoreTypes
 import org.apache.spark.sql.types._
-import parquet.hadoop.ParquetOutputFormat
-import parquet.hadoop.util.ContextUtil
 
 abstract class TableType
 case object UnpartitionedTable extends TableType
@@ -47,7 +35,7 @@ abstract class TableForTest(
 
   val name = table.name
 
-  val outputDir = s"$baseDir/parquet/${name}"
+  val outputDir = s"$baseDir/${name}"
 
   def fromCatalog = sqlContext.table(name)