diff --git a/README.md b/README.md index f3cb072..0a3a1cc 100644 --- a/README.md +++ b/README.md @@ -1,42 +1,63 @@ -## Spark SQL Performance Tests (WIP) +# Spark SQL Performance Tests + +This is a performance testing framework for [Spark SQL](https://spark.apache.org/sql/) in [Apache Spark](https://spark.apache.org/) 1.3+. + +**Note: This README is still under development. Please also check our source code for more information.** + +## How to use it +The rest of this document uses the TPC-DS benchmark as an example. We will add content explaining how to use other benchmarks and how to add support for a new benchmark dataset in the future. + +### Setup a dataset +Before running any query, a dataset needs to be set up by creating a `Dataset` object. Every benchmark supported in Spark SQL Perf needs to implement its own `Dataset` class. A `Dataset` object takes a few parameters that will be used to set up the needed tables, and its `setup` function is used to set up those tables. For the TPC-DS benchmark, the class is `TPCDS` in the package `com.databricks.spark.sql.perf.tpcds`. For example, to set up a TPC-DS dataset, you can use -### TPC-DS ``` -import com.databricks.spark.sql.perf.tpcds.TPCDS import org.apache.spark.sql.parquet.Tables - -// Tables used for TPC-DS. +// Tables in TPC-DS benchmark used by experiments. val tables = Tables(sqlContext) - // Setup TPC-DS experiment val tpcds = -new TPCDS ( + new TPCDS ( sqlContext = sqlContext, sparkVersion = "1.3.1", dataLocation = , dsdgenDir = , - resultsLocation = , tables = tables.tables, - scaleFactor = "2", - collectResults = true) -tpcds.setupExperiment() + scaleFactor = , + includeBreakdown = false) +``` -// Take a look at the size of every table. -tpcds.allStats.show +After a `TPCDS` object is created, its tables can be set up by calling -// Get all of the queries. -import com.databricks.spark.sql.perf.tpcds.Queries -// Just pick a single query as an example. -val oneQuery = Seq(Queries.q7Derived.head) -// Start the experiment. 
-val runningExp = tpcds.runExperiment(queries = oneQuery, iterations = 1) +``` +tpcds.setup() +``` +The `setup` function will first check whether the needed tables are stored at the location specified by `dataLocation`. If not, it will create the tables there by using the data generator tool `dsdgen` provided by the TPC-DS benchmark (this tool needs to be pre-installed at the location specified by `dsdgenDir` on every worker). + +### Run benchmarking queries +After setup, users can use the `runExperiment` function to run benchmarking queries and record query execution time. Taking TPC-DS as an example, you can start an experiment by using + +``` +tpcds.runExperiment( + queries = , + resultsLocation = , + includeBreakdown = , + iterations = , + variations = , + tags = ) +``` + +For every experiment run (i.e., every call of `runExperiment`), Spark SQL Perf will use the timestamp of the start time to identify this experiment. Performance results will be stored in the sub-dir named by the timestamp in the given `resultsLocation` (for example `results/1429213883272`). The performance results are stored in the JSON format. + +### Retrieve results +The following code can be used to retrieve results: + +``` // Get experiments results. import com.databricks.spark.sql.perf.Results -val results = Results(resultsLocation = , sqlContext = sqlContext) -// This is all results. +val results = Results(resultsLocation = , sqlContext = sqlContext) +// Get the DataFrame representing all results stored in the dir specified by resultsLocation. val allResults = results.allResults -allResults.registerTempTable("results") -// This is the result for a single experiment started at the timestamp represented by 1429132621024 (2015-04-15 14:17:01.024). +// Use DataFrame API to get results of a single run. 
allResults.filter("timestamp = 1429132621024") -``` +``` \ No newline at end of file diff --git a/src/main/scala/com/databricks/spark/sql/perf/bigdata/BigData.scala b/src/main/scala/com/databricks/spark/sql/perf/bigdata/BigData.scala index cebe2d8..16d0b1a 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/bigdata/BigData.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/bigdata/BigData.scala @@ -25,22 +25,18 @@ class BigData ( @transient sqlContext: SQLContext, sparkVersion: String, dataLocation: String, - resultsLocation: String, tables: Seq[Table], - scaleFactor: String, - collectResults: Boolean) - extends Experiment( + scaleFactor: String) + extends Dataset( sqlContext, sparkVersion, dataLocation, - resultsLocation, tables, - scaleFactor, - collectResults) with Serializable { + scaleFactor) with Serializable { import sqlContext._ import sqlContext.implicits._ - override val experiment = "bigDataBenchmark" + override val datasetName = "bigDataBenchmark" override def createTablesForTest(tables: Seq[Table]): Seq[TableForTest] = { tables.map(table => diff --git a/src/main/scala/com/databricks/spark/sql/perf/bigdata/Queries.scala b/src/main/scala/com/databricks/spark/sql/perf/bigdata/Queries.scala index 9372dcb..14b5720 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/bigdata/Queries.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/bigdata/Queries.scala @@ -20,68 +20,94 @@ import com.databricks.spark.sql.perf.Query object Queries { val queries1to3 = Seq( - Query("q1A", - """ + Query( + name = "q1A", + sqlText = + """ |SELECT | pageURL, | pageRank |FROM rankings |WHERE | pageRank > 1000 - """.stripMargin), + """.stripMargin, + description = "", + collectResults = false), - Query("q1B", - """ + Query( + name = "q1B", + sqlText = + """ |SELECT | pageURL, | pageRank |FROM rankings |WHERE | pageRank > 100 - """.stripMargin), + """.stripMargin, + description = "", + collectResults = false), - Query("q1C", - """ + Query( + name = "q1C", + 
sqlText = + """ |SELECT | pageURL, | pageRank |FROM rankings |WHERE | pageRank > 10 - """.stripMargin), + """.stripMargin, + description = "", + collectResults = false), - Query("q2A", - """ + Query( + name = "q2A", + sqlText = + """ |SELECT | SUBSTR(sourceIP, 1, 8), | SUM(adRevenue) |FROM uservisits |GROUP BY | SUBSTR(sourceIP, 1, 8) - """.stripMargin), + """.stripMargin, + description = "", + collectResults = false), - Query("q2B", - """ + Query( + name = "q2B", + sqlText = + """ |SELECT | SUBSTR(sourceIP, 1, 10), | SUM(adRevenue) |FROM uservisits |GROUP BY | SUBSTR(sourceIP, 1, 10) - """.stripMargin), + """.stripMargin, + description = "", + collectResults = false), - Query("q2C", - """ + Query( + name = "q2C", + sqlText = + """ |SELECT | SUBSTR(sourceIP, 1, 12), | SUM(adRevenue) |FROM uservisits |GROUP BY | SUBSTR(sourceIP, 1, 12) - """.stripMargin), + """.stripMargin, + description = "", + collectResults = false), - Query("q3A", - """ + Query( + name = "q3A", + sqlText = + """ |SELECT sourceIP, totalRevenue, avgPageRank |FROM | (SELECT sourceIP, @@ -93,10 +119,14 @@ object Queries { | AND UV.visitDate < "1980-04-01" | GROUP BY UV.sourceIP) tmp |ORDER BY totalRevenue DESC LIMIT 1 - """.stripMargin), + """.stripMargin, + description = "", + collectResults = false), - Query("q3B", - """ + Query( + name = "q3B", + sqlText = + """ |SELECT sourceIP, totalRevenue, avgPageRank |FROM | (SELECT sourceIP, @@ -108,10 +138,13 @@ object Queries { | AND UV.visitDate < "1983-01-01" | GROUP BY UV.sourceIP) tmp |ORDER BY totalRevenue DESC LIMIT 1 - """.stripMargin), + """.stripMargin, + description = "", + collectResults = false), - Query("q3C", - """ + Query( + name = "q3C", + sqlText = """ |SELECT sourceIP, totalRevenue, avgPageRank |FROM | (SELECT sourceIP, @@ -123,6 +156,8 @@ object Queries { | AND UV.visitDate < "2010-01-01" | GROUP BY UV.sourceIP) tmp |ORDER BY totalRevenue DESC LIMIT 1 - """.stripMargin) + """.stripMargin, + description = "", + collectResults = false) ) 
} diff --git a/src/main/scala/com/databricks/spark/sql/perf/bigdata/Tables.scala b/src/main/scala/com/databricks/spark/sql/perf/bigdata/Tables.scala index 3a062ed..8301f59 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/bigdata/Tables.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/bigdata/Tables.scala @@ -45,7 +45,9 @@ case class BigDataTableForTest( @transient val sparkContext = sqlContext.sparkContext - override def generate(): Unit = ??? + override def generate(): Unit = + throw new UnsupportedOperationException( + "Generate data for BigDataBenchmark has not been implemented") } case class Tables(sqlContext: SQLContext) { diff --git a/src/main/scala/com/databricks/spark/sql/perf/query.scala b/src/main/scala/com/databricks/spark/sql/perf/query.scala index beeeed8..d83fbb8 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/query.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/query.scala @@ -19,11 +19,11 @@ package com.databricks.spark.sql.perf import org.apache.spark.sql.SQLContext import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation -case class Query(name: String, sqlText: String) +case class Query(name: String, sqlText: String, description: String, collectResults: Boolean) case class QueryForTest( query: Query, - collectResults: Boolean, + includeBreakdown: Boolean, @transient sqlContext: SQLContext) { @transient val sparkContext = sqlContext.sparkContext @@ -54,21 +54,40 @@ case class QueryForTest( sparkContext.setJobDescription(s"Query: ${query.name}, $description") val queryExecution = dataFrame.queryExecution // We are not counting the time of ScalaReflection.convertRowToScala. 
- val execution = if (collectResults) { - benchmarkMs { queryExecution.toRdd.map(_.copy()).collect() } + val parsingTime = benchmarkMs { queryExecution.logical } + val analysisTime = benchmarkMs { queryExecution.analyzed } + val optimizationTime = benchmarkMs { queryExecution.optimizedPlan } + val planningTime = benchmarkMs { queryExecution.executedPlan } + + val breakdownResults = if (includeBreakdown) { + val depth = queryExecution.executedPlan.treeString.split("\n").size + val physicalOperators = (0 until depth).map(i => (i, queryExecution.executedPlan(i))) + physicalOperators.map { + case (index, node) => + val executionTime = benchmarkMs { node.execute().map(_.copy()).foreach(row => Unit) } + BreakdownResult(node.nodeName, node.simpleString, index, executionTime) + } } else { - benchmarkMs { queryExecution.toRdd.map(_.copy()).foreach {row => Unit } } + Seq.empty[BreakdownResult] + } + + // The executionTime for the entire query includes the time of type conversion from catalyst to scala. 
+ val executionTime = if (query.collectResults) { + benchmarkMs { dataFrame.rdd.collect() } + } else { + benchmarkMs { dataFrame.rdd.foreach {row => Unit } } } BenchmarkResult( name = query.name, joinTypes = joinTypes, tables = tablesInvolved, - parsingTime = benchmarkMs { queryExecution.logical }, - analysisTime = benchmarkMs { queryExecution.analyzed }, - optimizationTime = benchmarkMs { queryExecution.optimizedPlan }, - planningTime = benchmarkMs { queryExecution.executedPlan }, - executionTime = execution) + parsingTime = parsingTime, + analysisTime = analysisTime, + optimizationTime = optimizationTime, + planningTime = planningTime, + executionTime = executionTime, + breakdownResults) } catch { case e: Exception => throw new RuntimeException( diff --git a/src/main/scala/com/databricks/spark/sql/perf/runBenchmarks.scala b/src/main/scala/com/databricks/spark/sql/perf/runBenchmarks.scala index 01a82b7..41d09c9 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/runBenchmarks.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/runBenchmarks.scala @@ -23,15 +23,49 @@ import org.apache.spark.sql.SQLContext import scala.concurrent._ import scala.concurrent.ExecutionContext.Implicits.global +/** + * The configuration used for an iteration of an experiment. + * @param sparkVersion The version of Spark. + * @param scaleFactor The scale factor of the dataset. + * @param sqlConf All configuration properties related to Spark SQL. + * @param sparkConf All configuration properties of Spark. + * @param defaultParallelism The default parallelism of the cluster. + * Usually, it is the number of cores of the cluster. + */ case class BenchmarkConfiguration( sparkVersion: String, scaleFactor: String, - useDecimal: Boolean, sqlConf: Map[String, String], sparkConf: Map[String,String], - cores: Int, - collectResults: Boolean) + defaultParallelism: Int) +/** + * The execution time of a subtree of the query plan tree of a specific query. 
+ * @param nodeName The name of the top physical operator of the subtree. + * @param nodeNameWithArgs The name and arguments of the top physical operator of the subtree. + * @param index The index of the top physical operator of the subtree + * in the original query plan tree. The index starts from 0 + * (0 represents the top physical operator of the original query plan tree). + * @param executionTime The execution time of the subtree. + */ +case class BreakdownResult( + nodeName: String, + nodeNameWithArgs: String, + index: Int, + executionTime: Double) + +/** + * The result of a query. + * @param name The name of the query. + * @param joinTypes The types of join operations in the query. + * @param tables The tables involved in the query. + * @param parsingTime The time used to parse the query. + * @param analysisTime The time used to analyze the query. + * @param optimizationTime The time used to optimize the query. + * @param planningTime The time used to plan the query. + * @param executionTime The time used to execute the query. + * @param breakDown The breakdown results of the query plan tree. + */ case class BenchmarkResult( name: String, joinTypes: Seq[String], @@ -40,30 +74,65 @@ case class BenchmarkResult( analysisTime: Double, optimizationTime: Double, planningTime: Double, - executionTime: Double) + executionTime: Double, + breakDown: Seq[BreakdownResult]) +/** + * A Variation represents a setting (e.g. the number of shuffle partitions and whether tables + * are cached in memory) that we want to change in an experiment run. + * A Variation has three parts, `name`, `options`, and `setup`. + * The `name` is the identifier of a Variation. `options` is a Seq of options that + * will be used for a query. Basically, a query will be executed with every option + * defined in the list of `options`. `setup` defines the needed action for every + * option. For example, the following Variation is used to change the number of shuffle + * partitions of a query. 
The name of the Variation is "shufflePartitions". There are + * two options, 200 and 2000. The setup is used to set the value of the property + * "spark.sql.shuffle.partitions". + * + * {{{ + * Variation("shufflePartitions", Seq("200", "2000")) { + * case num => sqlContext.setConf("spark.sql.shuffle.partitions", num) + * } + * }}} + */ case class Variation[T](name: String, options: Seq[T])(val setup: T => Unit) +/** + * The performance results of all given queries for a single iteration. + * @param timestamp The timestamp indicating when the entire experiment was started. + * @param datasetName The name of the dataset. + * @param iteration The index number of the current iteration. + * @param tags Tags of this iteration (variations are stored here). + * @param configuration Configuration properties of this iteration. + * @param results The performance results of queries for this iteration. + */ case class ExperimentRun( timestamp: Long, - experiment: String, + datasetName: String, iteration: Int, tags: Map[String, String], configuration: BenchmarkConfiguration, results: Seq[BenchmarkResult]) -case class Benchmark(tables: Seq[Table]) - -abstract class Experiment( +/** + * The dataset of a benchmark. + * @param sqlContext An existing SQLContext. + * @param sparkVersion The version of Spark. + * @param dataLocation The location of the dataset used by this experiment. + * @param tables Tables that will be used in this experiment. + * @param scaleFactor The scale factor of the dataset. For some benchmarks like TPC-H + * and TPC-DS, the scale factor is a number roughly representing the + * size of raw data files. For some other benchmarks, the scale factor + * is a short string describing the scale of the dataset. 
+ */ +abstract class Dataset( @transient sqlContext: SQLContext, sparkVersion: String, dataLocation: String, - resultsLocation: String, tables: Seq[Table], - scaleFactor: String, - collectResults: Boolean) extends Serializable { + scaleFactor: String) extends Serializable { - val experiment: String + val datasetName: String @transient val sparkContext = sqlContext.sparkContext @@ -93,7 +162,11 @@ abstract class Experiment( def allStats = tablesForTest.map(_.stats).reduceLeft(_.unionAll(_)) - def setupExperiment(): Unit = { + /** + * Performs the necessary setup work such as data generation and transformation. It needs to be + * called before running any query. + */ + def setup(): Unit = { checkData() tablesForTest.foreach(_.createTempTable()) } @@ -101,19 +174,32 @@ abstract class Experiment( def currentConfiguration = BenchmarkConfiguration( sparkVersion = sparkVersion, scaleFactor = scaleFactor, - useDecimal = true, sqlConf = sqlContext.getAllConfs, sparkConf = sparkContext.getConf.getAll.toMap, - cores = sparkContext.defaultMinPartitions, - collectResults = collectResults) + defaultParallelism = sparkContext.defaultParallelism) + /** + * Starts an experiment run with a given set of queries. + * @param queries Queries to be executed. + * @param resultsLocation The location of performance results. + * @param includeBreakdown If it is true, breakdown results of a query will be recorded. + * Setting it to true may significantly increase the time used to + * execute a query. + * @param iterations The number of iterations. + * @param variations [[Variation]]s used in this run. + * @param tags Tags of this run. + * @return An ExperimentStatus object that can be used to + * track the progress of this experiment run. 
+ */ def runExperiment( queries: Seq[Query], + resultsLocation: String, + includeBreakdown: Boolean = false, iterations: Int = 3, variations: Seq[Variation[_]] = Seq(Variation("StandardRun", Seq("")) { _ => {} }), tags: Map[String, String] = Map.empty) = { - val queriesToRun = queries.map(query => QueryForTest(query, collectResults, sqlContext)) + val queriesToRun = queries.map(query => QueryForTest(query, includeBreakdown, sqlContext)) class ExperimentStatus { val currentResults = new collection.mutable.ArrayBuffer[BenchmarkResult]() @@ -141,7 +227,7 @@ abstract class Experiment( val result = ExperimentRun( timestamp = timestamp, - experiment = experiment, + datasetName = datasetName, iteration = i, tags = currentOptions.toMap ++ tags, configuration = currentConfiguration, diff --git a/src/main/scala/com/databricks/spark/sql/perf/tpcds/TPCDS.scala b/src/main/scala/com/databricks/spark/sql/perf/tpcds/TPCDS.scala index b6d7172..eac4328 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/tpcds/TPCDS.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/tpcds/TPCDS.scala @@ -21,27 +21,36 @@ import org.apache.spark.sql.functions._ import org.apache.spark.sql.parquet.TPCDSTableForTest import org.apache.spark.sql.{Column, SQLContext} +/** + * TPC-DS benchmark's dataset. + * @param sqlContext An existing SQLContext. + * @param sparkVersion The version of Spark. + * @param dataLocation The location of the dataset used by this experiment. + * @param dsdgenDir The location of dsdgen in every worker machine. + * @param resultsLocation The location of performance results. + * @param tables Tables that will be used in this experiment. + * @param scaleFactor The scale factor of the dataset. For some benchmarks like TPC-H + * and TPC-DS, the scale factor is a number roughly representing the + * size of raw data files. For some other benchmarks, the scale factor + * is a short string describing the scale of the dataset. 
+ */ class TPCDS ( @transient sqlContext: SQLContext, sparkVersion: String, dataLocation: String, dsdgenDir: String, - resultsLocation: String, tables: Seq[Table], - scaleFactor: String, - collectResults: Boolean) - extends Experiment( + scaleFactor: String) + extends Dataset( sqlContext, sparkVersion, dataLocation, - resultsLocation, tables, - scaleFactor, - collectResults) with Serializable { + scaleFactor) with Serializable { import sqlContext._ import sqlContext.implicits._ - override val experiment = "tpcds" + override val datasetName = "tpcds" def baseDir = s"$dataLocation/scaleFactor=$scaleFactor/useDecimal=true" @@ -50,8 +59,8 @@ class TPCDS ( TPCDSTableForTest(table, baseDir, scaleFactor.toInt, dsdgenDir, sqlContext)) } - override def setupExperiment(): Unit = { - super.setupExperiment() + override def setup(): Unit = { + super.setup() setupBroadcast() } diff --git a/src/main/scala/com/databricks/spark/sql/perf/tpcds/Queries.scala b/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/ImpalaKitQueries.scala similarity index 91% rename from src/main/scala/com/databricks/spark/sql/perf/tpcds/Queries.scala rename to src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/ImpalaKitQueries.scala index e2a2e3a..c530232 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/tpcds/Queries.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/ImpalaKitQueries.scala @@ -14,132 +14,11 @@ * limitations under the License. 
*/ -package com.databricks.spark.sql.perf.tpcds +package com.databricks.spark.sql.perf.tpcds.queries import com.databricks.spark.sql.perf.Query -object Queries { - val q7Derived = Seq( - ("q7-simpleScan", - """ - |select - | ss_quantity, - | ss_list_price, - | ss_coupon_amt, - | ss_coupon_amt, - | ss_cdemo_sk, - | ss_item_sk, - | ss_promo_sk, - | ss_sold_date_sk - |from store_sales - |where - | ss_sold_date_sk between 2450815 and 2451179 - """.stripMargin), - - ("q7-twoMapJoins", """ - |select - | i_item_id, - | ss_quantity, - | ss_list_price, - | ss_coupon_amt, - | ss_sales_price, - | ss_promo_sk, - | ss_sold_date_sk - |from - | store_sales - | join customer_demographics on (store_sales.ss_cdemo_sk = customer_demographics.cd_demo_sk) - | join item on (store_sales.ss_item_sk = item.i_item_sk) - |where - | cd_gender = 'F' - | and cd_marital_status = 'W' - | and cd_education_status = 'Primary' - | and ss_sold_date_sk between 2450815 and 2451179 -- partition key filter - """.stripMargin), - - ("q7-fourMapJoins", """ - |select - | i_item_id, - | ss_quantity, - | ss_list_price, - | ss_coupon_amt, - | ss_sales_price - |from - | store_sales - | join customer_demographics on (store_sales.ss_cdemo_sk = customer_demographics.cd_demo_sk) - | join item on (store_sales.ss_item_sk = item.i_item_sk) - | join promotion on (store_sales.ss_promo_sk = promotion.p_promo_sk) - | join date_dim on (ss_sold_date_sk = d_date_sk) - |where - | cd_gender = 'F' - | and cd_marital_status = 'W' - | and cd_education_status = 'Primary' - | and (p_channel_email = 'N' - | or p_channel_event = 'N') - | and d_year = 1998 - | -- and ss_date between '1998-01-01' and '1998-12-31' - | and ss_sold_date_sk between 2450815 and 2451179 -- partition key filter - """.stripMargin), - - ("q7-noOrderBy", """ - |select - | i_item_id, - | avg(ss_quantity) agg1, - | avg(ss_list_price) agg2, - | avg(ss_coupon_amt) agg3, - | avg(ss_sales_price) agg4 - |from - | store_sales - | join customer_demographics on 
(store_sales.ss_cdemo_sk = customer_demographics.cd_demo_sk) - | join item on (store_sales.ss_item_sk = item.i_item_sk) - | join promotion on (store_sales.ss_promo_sk = promotion.p_promo_sk) - | join date_dim on (ss_sold_date_sk = d_date_sk) - |where - | cd_gender = 'F' - | and cd_marital_status = 'W' - | and cd_education_status = 'Primary' - | and (p_channel_email = 'N' - | or p_channel_event = 'N') - | and d_year = 1998 - | -- and ss_date between '1998-01-01' and '1998-12-31' - | and ss_sold_date_sk between 2450815 and 2451179 -- partition key filter - |group by - | i_item_id - """.stripMargin), - - ("q7", """ - |-- start query 1 in stream 0 using template query7.tpl - |select - | i_item_id, - | avg(ss_quantity) agg1, - | avg(ss_list_price) agg2, - | avg(ss_coupon_amt) agg3, - | avg(ss_sales_price) agg4 - |from - | store_sales - | join customer_demographics on (store_sales.ss_cdemo_sk = customer_demographics.cd_demo_sk) - | join item on (store_sales.ss_item_sk = item.i_item_sk) - | join promotion on (store_sales.ss_promo_sk = promotion.p_promo_sk) - | join date_dim on (ss_sold_date_sk = d_date_sk) - |where - | cd_gender = 'F' - | and cd_marital_status = 'W' - | and cd_education_status = 'Primary' - | and (p_channel_email = 'N' - | or p_channel_event = 'N') - | and d_year = 1998 - | -- and ss_date between '1998-01-01' and '1998-12-31' - | and ss_sold_date_sk between 2450815 and 2451179 -- partition key filter - |group by - | i_item_id - |order by - | i_item_id - |limit 100 - |-- end query 1 in stream 0 using template query7.tpl - """.stripMargin) - ).map { - case (name, sqlText) => Query(name, sqlText) - } - +object ImpalaKitQueries { // Queries are from // https://github.com/cloudera/impala-tpcds-kit/tree/master/queries-sql92-modified/queries val queries = Seq( @@ -1145,7 +1024,7 @@ object Queries { |from store_sales """.stripMargin) ).map { - case (name, sqlText) => Query(name, sqlText) + case (name, sqlText) => Query(name, sqlText, description = "", 
collectResults = true) } val queriesMap = queries.map(q => q.name -> q).toMap @@ -1215,31 +1094,6 @@ object Queries { ,i_manufact limit 100"""), - /* WHERE IS ss_sold_date? - ("q27partitioned", """ - select i_item_id, - s_state, - avg(ss_quantity) agg1, - avg(ss_list_price) agg2, - avg(ss_coupon_amt) agg3, - avg(ss_sales_price) agg4 - from store_sales - JOIN customer_demographics ON store_sales.ss_cdemo_sk = customer_demographics.cd_demo_sk - JOIN date_dim ON store_sales.ss_sold_date_sk = date_dim.d_date_sk - JOIN store ON store_sales.ss_store_sk = store.s_store_sk - JOIN item ON store_sales.ss_item_sk = item.i_item_sk - where - cd_gender = 'F' and - cd_marital_status = 'W' and - cd_education_status = 'Primary' and - d_year = 1998 and - s_state = 'TN' and - ss_sold_date between '1998-01-01' and '1998-12-31' - group by i_item_id, s_state - order by i_item_id - ,s_state - limit 100"""), */ - ("q27", """ select i_item_id, s_state, @@ -1609,12 +1463,12 @@ object Queries { |from store_sales """.stripMargin) ).map { - case (name, sqlText) => Query(name, sqlText) + case (name, sqlText) => Query(name, sqlText, description = "original query", collectResults = true) } val interactiveQueries = Seq("q19", "q42", "q52", "q55", "q63", "q68", "q73", "q98").map(queriesMap) val reportingQueries = Seq("q3","q7","q27","q43", "q53", "q89").map(queriesMap) val deepAnalyticQueries = Seq("q34", "q46", "q59", "q65", "q79", "ss_max").map(queriesMap) - val allClouderaQueries = interactiveQueries ++ reportingQueries ++ deepAnalyticQueries + val impalaKitQueries = interactiveQueries ++ reportingQueries ++ deepAnalyticQueries } diff --git a/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/SimpleQueries.scala b/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/SimpleQueries.scala new file mode 100644 index 0000000..fd4f4b6 --- /dev/null +++ b/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/SimpleQueries.scala @@ -0,0 +1,142 @@ +/* + * Copyright 2015 Databricks Inc. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.databricks.spark.sql.perf.tpcds.queries + +import com.databricks.spark.sql.perf.Query + +object SimpleQueries { + val q7Derived = Seq( + ("q7-simpleScan", + """ + |select + | ss_quantity, + | ss_list_price, + | ss_coupon_amt, + | ss_coupon_amt, + | ss_cdemo_sk, + | ss_item_sk, + | ss_promo_sk, + | ss_sold_date_sk + |from store_sales + |where + | ss_sold_date_sk between 2450815 and 2451179 + """.stripMargin), + + ("q7-twoMapJoins", """ + |select + | i_item_id, + | ss_quantity, + | ss_list_price, + | ss_coupon_amt, + | ss_sales_price, + | ss_promo_sk, + | ss_sold_date_sk + |from + | store_sales + | join customer_demographics on (store_sales.ss_cdemo_sk = customer_demographics.cd_demo_sk) + | join item on (store_sales.ss_item_sk = item.i_item_sk) + |where + | cd_gender = 'F' + | and cd_marital_status = 'W' + | and cd_education_status = 'Primary' + | and ss_sold_date_sk between 2450815 and 2451179 -- partition key filter + """.stripMargin), + + ("q7-fourMapJoins", """ + |select + | i_item_id, + | ss_quantity, + | ss_list_price, + | ss_coupon_amt, + | ss_sales_price + |from + | store_sales + | join customer_demographics on (store_sales.ss_cdemo_sk = customer_demographics.cd_demo_sk) + | join item on (store_sales.ss_item_sk = item.i_item_sk) + | join promotion on (store_sales.ss_promo_sk = promotion.p_promo_sk) + | join date_dim on (ss_sold_date_sk = d_date_sk) + |where + | cd_gender = 'F' + | 
and cd_marital_status = 'W' + | and cd_education_status = 'Primary' + | and (p_channel_email = 'N' + | or p_channel_event = 'N') + | and d_year = 1998 + | -- and ss_date between '1998-01-01' and '1998-12-31' + | and ss_sold_date_sk between 2450815 and 2451179 -- partition key filter + """.stripMargin), + + ("q7-noOrderBy", """ + |select + | i_item_id, + | avg(ss_quantity) agg1, + | avg(ss_list_price) agg2, + | avg(ss_coupon_amt) agg3, + | avg(ss_sales_price) agg4 + |from + | store_sales + | join customer_demographics on (store_sales.ss_cdemo_sk = customer_demographics.cd_demo_sk) + | join item on (store_sales.ss_item_sk = item.i_item_sk) + | join promotion on (store_sales.ss_promo_sk = promotion.p_promo_sk) + | join date_dim on (ss_sold_date_sk = d_date_sk) + |where + | cd_gender = 'F' + | and cd_marital_status = 'W' + | and cd_education_status = 'Primary' + | and (p_channel_email = 'N' + | or p_channel_event = 'N') + | and d_year = 1998 + | -- and ss_date between '1998-01-01' and '1998-12-31' + | and ss_sold_date_sk between 2450815 and 2451179 -- partition key filter + |group by + | i_item_id + """.stripMargin), + + ("q7", """ + |-- start query 1 in stream 0 using template query7.tpl + |select + | i_item_id, + | avg(ss_quantity) agg1, + | avg(ss_list_price) agg2, + | avg(ss_coupon_amt) agg3, + | avg(ss_sales_price) agg4 + |from + | store_sales + | join customer_demographics on (store_sales.ss_cdemo_sk = customer_demographics.cd_demo_sk) + | join item on (store_sales.ss_item_sk = item.i_item_sk) + | join promotion on (store_sales.ss_promo_sk = promotion.p_promo_sk) + | join date_dim on (ss_sold_date_sk = d_date_sk) + |where + | cd_gender = 'F' + | and cd_marital_status = 'W' + | and cd_education_status = 'Primary' + | and (p_channel_email = 'N' + | or p_channel_event = 'N') + | and d_year = 1998 + | -- and ss_date between '1998-01-01' and '1998-12-31' + | and ss_sold_date_sk between 2450815 and 2451179 -- partition key filter + |group by + | i_item_id + |order by + 
| i_item_id + |limit 100 + |-- end query 1 in stream 0 using template query7.tpl + """.stripMargin) + ).map { + case (name, sqlText) => Query(name = name, sqlText = sqlText, description = "", collectResults = false) + } +} diff --git a/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/package.scala b/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/package.scala new file mode 100644 index 0000000..4b154ac --- /dev/null +++ b/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/package.scala @@ -0,0 +1,22 @@ +/* + * Copyright 2015 Databricks Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.databricks.spark.sql.perf.tpcds + +package object queries { + val impalaKitQueries = ImpalaKitQueries.impalaKitQueries + val q7DerivedQueries = SimpleQueries.q7Derived +}