Added optional parameters to runBenchmark to specify a location to save query outputs as Parquet files.
+ Removed the hardcoded baseDir/parquet/ structure
This commit is contained in:
parent
3eca8d2947
commit
8e62e4fdbd
@ -36,7 +36,7 @@ case class QueryForTest(
|
||||
(endTime - startTime).toDouble / 1000000
|
||||
}
|
||||
|
||||
def benchmark(description: String = "") = {
|
||||
def benchmark(description: String = "", queryOutputLocation: Option[String]) = {
|
||||
try {
|
||||
sparkContext.setJobDescription(s"Query: ${query.name}, $description")
|
||||
val dataFrame = sqlContext.sql(query.sqlText)
|
||||
@ -77,6 +77,8 @@ case class QueryForTest(
|
||||
}
|
||||
}
|
||||
|
||||
queryOutputLocation.foreach(dir => dataFrame.saveAsParquetFile(s"$dir/$name.parquet"))
|
||||
|
||||
BenchmarkResult(
|
||||
name = query.name,
|
||||
joinTypes = joinTypes,
|
||||
|
||||
@ -183,6 +183,7 @@ abstract class Dataset(
|
||||
* Starts an experiment run with a given set of queries.
|
||||
* @param queries Queries to be executed.
|
||||
* @param resultsLocation The location of performance results.
|
||||
* @param queryOutputLocation If defined, the location where query results should be saved as Parquet files.
|
||||
* @param includeBreakdown If it is true, breakdown results of a query will be recorded.
|
||||
* Setting it to true may significantly increase the time used to
|
||||
* execute a query.
|
||||
@ -195,6 +196,7 @@ abstract class Dataset(
|
||||
def runExperiment(
|
||||
queries: Seq[Query],
|
||||
resultsLocation: String,
|
||||
queryOutputLocation: Option[String] = None,
|
||||
includeBreakdown: Boolean = false,
|
||||
iterations: Int = 3,
|
||||
variations: Seq[Variation[_]] = Seq(Variation("StandardRun", Seq("")) { _ => {} }),
|
||||
@ -237,7 +239,7 @@ abstract class Dataset(
|
||||
currentMessages += s"Running query ${q.name} $setup"
|
||||
|
||||
currentQuery = q.name
|
||||
val singleResult = try q.benchmark(setup) :: Nil catch {
|
||||
val singleResult = try q.benchmark(setup, queryOutputLocation) :: Nil catch {
|
||||
case e: Exception =>
|
||||
currentMessages += s"Failed to run query ${q.name}: $e"
|
||||
Nil
|
||||
|
||||
@ -16,21 +16,9 @@
|
||||
|
||||
package com.databricks.spark.sql.perf
|
||||
|
||||
import java.text.SimpleDateFormat
|
||||
import java.util.Date
|
||||
|
||||
import org.apache.hadoop.conf.Configuration
|
||||
import org.apache.hadoop.fs.{Path, FileSystem}
|
||||
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
|
||||
import org.apache.hadoop.mapreduce.{OutputCommitter, TaskAttemptContext, RecordWriter, Job}
|
||||
import org.apache.spark.SerializableWritable
|
||||
import org.apache.spark.sql.{SQLContext, Column}
|
||||
import org.apache.spark.sql.catalyst.expressions._
|
||||
import org.apache.spark.sql.SQLContext
|
||||
import org.apache.spark.sql.functions._
|
||||
import org.apache.spark.sql.hive.HiveMetastoreTypes
|
||||
import org.apache.spark.sql.types._
|
||||
import parquet.hadoop.ParquetOutputFormat
|
||||
import parquet.hadoop.util.ContextUtil
|
||||
|
||||
abstract class TableType
|
||||
case object UnpartitionedTable extends TableType
|
||||
@ -47,7 +35,7 @@ abstract class TableForTest(
|
||||
|
||||
val name = table.name
|
||||
|
||||
val outputDir = s"$baseDir/parquet/${name}"
|
||||
val outputDir = s"$baseDir/${name}"
|
||||
|
||||
def fromCatalog = sqlContext.table(name)
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user