Refactoring and doc.

This commit is contained in:
Yin Huai 2015-04-16 18:10:57 -07:00
parent 930751810e
commit 6c5657b609
10 changed files with 435 additions and 249 deletions

View File

@ -1,42 +1,63 @@
## Spark SQL Performance Tests (WIP)
# Spark SQL Performance Tests
This is a performance testing framework for [Spark SQL](https://spark.apache.org/sql/) in [Apache Spark](https://spark.apache.org/) 1.3+.
**Note: This README is still under development. Please also check our source code for more information.**
## How to use it
The rest of this document uses the TPC-DS benchmark as an example. We will add content explaining how to use other benchmarks and how to add support for a new benchmark dataset in the future.
### Set up a dataset
Before running any query, a dataset needs to be set up by creating a `Dataset` object. Every benchmark supported by Spark SQL Perf needs to implement its own `Dataset` class. A `Dataset` object takes a few parameters describing the needed tables, and its `setup` function creates those tables. For the TPC-DS benchmark, the class is `TPCDS` in the package `com.databricks.spark.sql.perf.tpcds`. For example, to set up a TPC-DS dataset, you can
### TPC-DS
```
import com.databricks.spark.sql.perf.tpcds.TPCDS
import org.apache.spark.sql.parquet.Tables
// Tables used for TPC-DS.
// Tables in TPC-DS benchmark used by experiments.
val tables = Tables(sqlContext)
// Setup TPC-DS experiment
val tpcds =
new TPCDS (
sqlContext = sqlContext,
sparkVersion = "1.3.1",
dataLocation = <the location of data>,
dsdgenDir = <the location of dsdgen in every worker>,
tables = tables.tables,
scaleFactor = <scale factor>)
```
After a `TPCDS` object is created, its tables can be set up by calling
```
tpcds.setup()
```
You can then inspect the tables and run queries, for example:
```
// Take a look at the size of every table.
tpcds.allStats.show
// Get all of the queries.
import com.databricks.spark.sql.perf.tpcds.Queries
// Just pick a single query as an example.
val oneQuery = Seq(Queries.q7Derived.head)
// Start the experiment.
val runningExp = tpcds.runExperiment(queries = oneQuery, iterations = 1)
```
The `setup` function first checks whether the needed tables are stored at the location specified by `dataLocation`. If not, it creates them there using `dsdgen`, the data generator tool provided by the TPC-DS benchmark (this tool needs to be pre-installed at the location specified by `dsdgenDir` on every worker).
### Run benchmarking queries
After setup, users can use the `runExperiment` function to run benchmarking queries and record query execution times. Taking TPC-DS as an example, you can start an experiment with
```
tpcds.runExperiment(
queries = <a Seq of Queries>,
resultsLocation = <the root location of performance results>,
includeBreakdown = <whether to measure the performance of every physical operator>,
iterations = <the number of iterations>,
variations = <variations used in the experiment>,
tags = <tags of this experiment>)
```
For every experiment run (i.e., every call of `runExperiment`), Spark SQL Perf uses the timestamp of the start time to identify the experiment. Performance results are stored in JSON format in a sub-directory named by that timestamp under the given `resultsLocation` (for example, `results/1429213883272`).
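Because each run is stored as plain JSON under its timestamp sub-directory, the raw files can also be loaded directly; a minimal sketch, assuming Spark 1.3's `jsonFile` API and an illustrative timestamp value:
```
// Hypothetical path; substitute a timestamp produced by your own run.
val rawRun = sqlContext.jsonFile("results/1429213883272")
rawRun.printSchema()
```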
### Retrieve results
The following code can be used to retrieve results.
```
// Get experiment results.
import com.databricks.spark.sql.perf.Results
val results = Results(resultsLocation = <the root location of performance results>, sqlContext = sqlContext)
// Get the DataFrame representing all results stored in the dir specified by resultsLocation.
val allResults = results.allResults
allResults.registerTempTable("results")
// Use the DataFrame API to get the results of a single run, identified by the
// timestamp of its start time (1429132621024 = 2015-04-15 14:17:01.024).
allResults.filter("timestamp = 1429132621024")
```
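Once registered as a temp table, results can also be queried with SQL; a minimal sketch (it selects only top-level `ExperimentRun` fields such as `timestamp` and `iteration`, since per-query results are nested under the `results` field):
```
// List every recorded iteration of every experiment run.
sqlContext.sql("SELECT timestamp, iteration FROM results ORDER BY timestamp").show()
```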

View File

@ -25,22 +25,18 @@ class BigData (
@transient sqlContext: SQLContext,
sparkVersion: String,
dataLocation: String,
resultsLocation: String,
tables: Seq[Table],
scaleFactor: String,
collectResults: Boolean)
extends Experiment(
scaleFactor: String)
extends Dataset(
sqlContext,
sparkVersion,
dataLocation,
resultsLocation,
tables,
scaleFactor,
collectResults) with Serializable {
scaleFactor) with Serializable {
import sqlContext._
import sqlContext.implicits._
override val experiment = "bigDataBenchmark"
override val datasetName = "bigDataBenchmark"
override def createTablesForTest(tables: Seq[Table]): Seq[TableForTest] = {
tables.map(table =>

View File

@ -20,68 +20,94 @@ import com.databricks.spark.sql.perf.Query
object Queries {
val queries1to3 = Seq(
Query("q1A",
"""
Query(
name = "q1A",
sqlText =
"""
|SELECT
| pageURL,
| pageRank
|FROM rankings
|WHERE
| pageRank > 1000
""".stripMargin),
""".stripMargin,
description = "",
collectResults = false),
Query("q1B",
"""
Query(
name = "q1B",
sqlText =
"""
|SELECT
| pageURL,
| pageRank
|FROM rankings
|WHERE
| pageRank > 100
""".stripMargin),
""".stripMargin,
description = "",
collectResults = false),
Query("q1C",
"""
Query(
name = "q1C",
sqlText =
"""
|SELECT
| pageURL,
| pageRank
|FROM rankings
|WHERE
| pageRank > 10
""".stripMargin),
""".stripMargin,
description = "",
collectResults = false),
Query("q2A",
"""
Query(
name = "q2A",
sqlText =
"""
|SELECT
| SUBSTR(sourceIP, 1, 8),
| SUM(adRevenue)
|FROM uservisits
|GROUP BY
| SUBSTR(sourceIP, 1, 8)
""".stripMargin),
""".stripMargin,
description = "",
collectResults = false),
Query("q2B",
"""
Query(
name = "q2B",
sqlText =
"""
|SELECT
| SUBSTR(sourceIP, 1, 10),
| SUM(adRevenue)
|FROM uservisits
|GROUP BY
| SUBSTR(sourceIP, 1, 10)
""".stripMargin),
""".stripMargin,
description = "",
collectResults = false),
Query("q2C",
"""
Query(
name = "q2C",
sqlText =
"""
|SELECT
| SUBSTR(sourceIP, 1, 12),
| SUM(adRevenue)
|FROM uservisits
|GROUP BY
| SUBSTR(sourceIP, 1, 12)
""".stripMargin),
""".stripMargin,
description = "",
collectResults = false),
Query("q3A",
"""
Query(
name = "q3A",
sqlText =
"""
|SELECT sourceIP, totalRevenue, avgPageRank
|FROM
| (SELECT sourceIP,
@ -93,10 +119,14 @@ object Queries {
| AND UV.visitDate < "1980-04-01"
| GROUP BY UV.sourceIP) tmp
|ORDER BY totalRevenue DESC LIMIT 1
""".stripMargin),
""".stripMargin,
description = "",
collectResults = false),
Query("q3B",
"""
Query(
name = "q3B",
sqlText =
"""
|SELECT sourceIP, totalRevenue, avgPageRank
|FROM
| (SELECT sourceIP,
@ -108,10 +138,13 @@ object Queries {
| AND UV.visitDate < "1983-01-01"
| GROUP BY UV.sourceIP) tmp
|ORDER BY totalRevenue DESC LIMIT 1
""".stripMargin),
""".stripMargin,
description = "",
collectResults = false),
Query("q3C",
"""
Query(
name = "q3C",
sqlText = """
|SELECT sourceIP, totalRevenue, avgPageRank
|FROM
| (SELECT sourceIP,
@ -123,6 +156,8 @@ object Queries {
| AND UV.visitDate < "2010-01-01"
| GROUP BY UV.sourceIP) tmp
|ORDER BY totalRevenue DESC LIMIT 1
""".stripMargin)
""".stripMargin,
description = "",
collectResults = false)
)
}

View File

@ -45,7 +45,9 @@ case class BigDataTableForTest(
@transient val sparkContext = sqlContext.sparkContext
override def generate(): Unit = ???
override def generate(): Unit =
throw new UnsupportedOperationException(
"Generating data for BigDataBenchmark has not been implemented")
}
case class Tables(sqlContext: SQLContext) {

View File

@ -19,11 +19,11 @@ package com.databricks.spark.sql.perf
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
case class Query(name: String, sqlText: String)
case class Query(name: String, sqlText: String, description: String, collectResults: Boolean)
case class QueryForTest(
query: Query,
collectResults: Boolean,
includeBreakdown: Boolean,
@transient sqlContext: SQLContext) {
@transient val sparkContext = sqlContext.sparkContext
@ -54,21 +54,40 @@ case class QueryForTest(
sparkContext.setJobDescription(s"Query: ${query.name}, $description")
val queryExecution = dataFrame.queryExecution
// We are not counting the time of ScalaReflection.convertRowToScala.
val execution = if (collectResults) {
benchmarkMs { queryExecution.toRdd.map(_.copy()).collect() }
val parsingTime = benchmarkMs { queryExecution.logical }
val analysisTime = benchmarkMs { queryExecution.analyzed }
val optimizationTime = benchmarkMs { queryExecution.optimizedPlan }
val planningTime = benchmarkMs { queryExecution.executedPlan }
val breakdownResults = if (includeBreakdown) {
val depth = queryExecution.executedPlan.treeString.split("\n").size
val physicalOperators = (0 until depth).map(i => (i, queryExecution.executedPlan(i)))
physicalOperators.map {
case (index, node) =>
val executionTime = benchmarkMs { node.execute().map(_.copy()).foreach(row => Unit) }
BreakdownResult(node.nodeName, node.simpleString, index, executionTime)
}
} else {
benchmarkMs { queryExecution.toRdd.map(_.copy()).foreach {row => Unit } }
Seq.empty[BreakdownResult]
}
// The executionTime for the entire query includes the time of type conversion from Catalyst rows to Scala values.
val executionTime = if (query.collectResults) {
benchmarkMs { dataFrame.rdd.collect() }
} else {
benchmarkMs { dataFrame.rdd.foreach {row => Unit } }
}
BenchmarkResult(
name = query.name,
joinTypes = joinTypes,
tables = tablesInvolved,
parsingTime = benchmarkMs { queryExecution.logical },
analysisTime = benchmarkMs { queryExecution.analyzed },
optimizationTime = benchmarkMs { queryExecution.optimizedPlan },
planningTime = benchmarkMs { queryExecution.executedPlan },
executionTime = execution)
parsingTime = parsingTime,
analysisTime = analysisTime,
optimizationTime = optimizationTime,
planningTime = planningTime,
executionTime = executionTime,
breakdownResults)
} catch {
case e: Exception =>
throw new RuntimeException(

View File

@ -23,15 +23,49 @@ import org.apache.spark.sql.SQLContext
import scala.concurrent._
import scala.concurrent.ExecutionContext.Implicits.global
/**
* The configuration used for an iteration of an experiment.
* @param sparkVersion The version of Spark.
* @param scaleFactor The scale factor of the dataset.
* @param sqlConf All configuration properties related to Spark SQL.
* @param sparkConf All configuration properties of Spark.
* @param defaultParallelism The default parallelism of the cluster.
* Usually, it is the number of cores of the cluster.
*/
case class BenchmarkConfiguration(
sparkVersion: String,
scaleFactor: String,
useDecimal: Boolean,
sqlConf: Map[String, String],
sparkConf: Map[String,String],
cores: Int,
collectResults: Boolean)
defaultParallelism: Int)
/**
* The execution time of a subtree of the query plan tree of a specific query.
* @param nodeName The name of the top physical operator of the subtree.
* @param nodeNameWithArgs The name and arguments of the top physical operator of the subtree.
* @param index The index of the top physical operator of the subtree
* in the original query plan tree. The index starts from 0
* (0 represents the top physical operator of the original query plan tree).
* @param executionTime The execution time of the subtree.
*/
case class BreakdownResult(
nodeName: String,
nodeNameWithArgs: String,
index: Int,
executionTime: Double)
/**
* The result of a query.
* @param name The name of the query.
* @param joinTypes The type of join operations in the query.
* @param tables The tables involved in the query.
* @param parsingTime The time used to parse the query.
* @param analysisTime The time used to analyze the query.
* @param optimizationTime The time used to optimize the query.
* @param planningTime The time used to plan the query.
* @param executionTime The time used to execute the query.
* @param breakDown The breakdown results of the query plan tree.
*/
case class BenchmarkResult(
name: String,
joinTypes: Seq[String],
@ -40,30 +74,65 @@ case class BenchmarkResult(
analysisTime: Double,
optimizationTime: Double,
planningTime: Double,
executionTime: Double)
executionTime: Double,
breakDown: Seq[BreakdownResult])
/**
* A Variation represents a setting (e.g. the number of shuffle partitions or whether tables
* are cached in memory) that we want to change in an experiment run.
* A Variation has three parts, `name`, `options`, and `setup`.
* The `name` is the identifier of a Variation. `options` is a Seq of options that
* will be used for a query. Basically, a query will be executed with every option
* defined in the list of `options`. `setup` defines the needed action for every
* option. For example, the following Variation is used to change the number of shuffle
* partitions of a query. The name of the Variation is "shufflePartitions". There are
* two options, 200 and 2000. The setup is used to set the value of property
* "spark.sql.shuffle.partitions".
*
* {{{
* Variation("shufflePartitions", Seq("200", "2000")) {
* case num => sqlContext.setConf("spark.sql.shuffle.partitions", num)
* }
* }}}
*/
case class Variation[T](name: String, options: Seq[T])(val setup: T => Unit)
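As a usage sketch tying this to `runExperiment` (the `resultsLocation` value and the query sequence are illustrative assumptions, not part of this commit), a query is executed once per option of the variation:
```
val shufflePartitions =
  Variation("shufflePartitions", Seq("200", "2000")) {
    case num => sqlContext.setConf("spark.sql.shuffle.partitions", num)
  }
// Every query runs once with 200 and once with 2000 shuffle partitions.
tpcds.runExperiment(
  queries = queries,
  resultsLocation = "/perf-results",
  variations = Seq(shufflePartitions))
```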
/**
* The performance results of all given queries for a single iteration.
* @param timestamp The timestamp indicates when the entire experiment is started.
* @param datasetName The name of the dataset.
* @param iteration The index number of the current iteration.
* @param tags Tags of this iteration (variations are stored here).
* @param configuration Configuration properties of this iteration.
* @param results The performance results of queries for this iteration.
*/
case class ExperimentRun(
timestamp: Long,
experiment: String,
datasetName: String,
iteration: Int,
tags: Map[String, String],
configuration: BenchmarkConfiguration,
results: Seq[BenchmarkResult])
case class Benchmark(tables: Seq[Table])
abstract class Experiment(
/**
* The dataset of a benchmark.
* @param sqlContext An existing SQLContext.
* @param sparkVersion The version of Spark.
* @param dataLocation The location of the dataset used by this experiment.
* @param tables Tables that will be used in this experiment.
* @param scaleFactor The scale factor of the dataset. For some benchmarks like TPC-H
* and TPC-DS, the scale factor is a number roughly representing the
* size of raw data files. For some other benchmarks, the scale factor
* is a short string describing the scale of the dataset.
*/
abstract class Dataset(
@transient sqlContext: SQLContext,
sparkVersion: String,
dataLocation: String,
resultsLocation: String,
tables: Seq[Table],
scaleFactor: String,
collectResults: Boolean) extends Serializable {
scaleFactor: String) extends Serializable {
val experiment: String
val datasetName: String
@transient val sparkContext = sqlContext.sparkContext
@ -93,7 +162,11 @@ abstract class Experiment(
def allStats = tablesForTest.map(_.stats).reduceLeft(_.unionAll(_))
def setupExperiment(): Unit = {
/**
* Does necessary setup work such as data generation and transformation. It needs to be
* called before running any query.
*/
def setup(): Unit = {
checkData()
tablesForTest.foreach(_.createTempTable())
}
@ -101,19 +174,32 @@ abstract class Experiment(
def currentConfiguration = BenchmarkConfiguration(
sparkVersion = sparkVersion,
scaleFactor = scaleFactor,
useDecimal = true,
sqlConf = sqlContext.getAllConfs,
sparkConf = sparkContext.getConf.getAll.toMap,
cores = sparkContext.defaultMinPartitions,
collectResults = collectResults)
defaultParallelism = sparkContext.defaultParallelism)
/**
* Starts an experiment run with a given set of queries.
* @param queries Queries to be executed.
* @param resultsLocation The location of performance results.
* @param includeBreakdown If it is true, breakdown results of a query will be recorded.
* Setting it to true may significantly increase the time used to
* execute a query.
* @param iterations The number of iterations.
* @param variations [[Variation]]s used in this run.
* @param tags Tags of this run.
* @return An ExperimentStatus object that can be used to
* track the progress of this experiment run.
*/
def runExperiment(
queries: Seq[Query],
resultsLocation: String,
includeBreakdown: Boolean = false,
iterations: Int = 3,
variations: Seq[Variation[_]] = Seq(Variation("StandardRun", Seq("")) { _ => {} }),
tags: Map[String, String] = Map.empty) = {
val queriesToRun = queries.map(query => QueryForTest(query, collectResults, sqlContext))
val queriesToRun = queries.map(query => QueryForTest(query, includeBreakdown, sqlContext))
class ExperimentStatus {
val currentResults = new collection.mutable.ArrayBuffer[BenchmarkResult]()
@ -141,7 +227,7 @@ abstract class Experiment(
val result = ExperimentRun(
timestamp = timestamp,
experiment = experiment,
datasetName = datasetName,
iteration = i,
tags = currentOptions.toMap ++ tags,
configuration = currentConfiguration,

View File

@ -21,27 +21,36 @@ import org.apache.spark.sql.functions._
import org.apache.spark.sql.parquet.TPCDSTableForTest
import org.apache.spark.sql.{Column, SQLContext}
/**
* TPC-DS benchmark's dataset.
* @param sqlContext An existing SQLContext.
* @param sparkVersion The version of Spark.
* @param dataLocation The location of the dataset used by this experiment.
* @param dsdgenDir The location of dsdgen in every worker machine.
* @param resultsLocation The location of performance results.
* @param tables Tables that will be used in this experiment.
* @param scaleFactor The scale factor of the dataset. For some benchmarks like TPC-H
* and TPC-DS, the scale factor is a number roughly representing the
* size of raw data files. For some other benchmarks, the scale factor
* is a short string describing the scale of the dataset.
*/
class TPCDS (
@transient sqlContext: SQLContext,
sparkVersion: String,
dataLocation: String,
dsdgenDir: String,
resultsLocation: String,
tables: Seq[Table],
scaleFactor: String,
collectResults: Boolean)
extends Experiment(
scaleFactor: String)
extends Dataset(
sqlContext,
sparkVersion,
dataLocation,
resultsLocation,
tables,
scaleFactor,
collectResults) with Serializable {
scaleFactor) with Serializable {
import sqlContext._
import sqlContext.implicits._
override val experiment = "tpcds"
override val datasetName = "tpcds"
def baseDir = s"$dataLocation/scaleFactor=$scaleFactor/useDecimal=true"
@ -50,8 +59,8 @@ class TPCDS (
TPCDSTableForTest(table, baseDir, scaleFactor.toInt, dsdgenDir, sqlContext))
}
override def setupExperiment(): Unit = {
super.setupExperiment()
override def setup(): Unit = {
super.setup()
setupBroadcast()
}

View File

@ -14,132 +14,11 @@
* limitations under the License.
*/
package com.databricks.spark.sql.perf.tpcds
package com.databricks.spark.sql.perf.tpcds.queries
import com.databricks.spark.sql.perf.Query
object Queries {
val q7Derived = Seq(
("q7-simpleScan",
"""
|select
| ss_quantity,
| ss_list_price,
| ss_coupon_amt,
| ss_coupon_amt,
| ss_cdemo_sk,
| ss_item_sk,
| ss_promo_sk,
| ss_sold_date_sk
|from store_sales
|where
| ss_sold_date_sk between 2450815 and 2451179
""".stripMargin),
("q7-twoMapJoins", """
|select
| i_item_id,
| ss_quantity,
| ss_list_price,
| ss_coupon_amt,
| ss_sales_price,
| ss_promo_sk,
| ss_sold_date_sk
|from
| store_sales
| join customer_demographics on (store_sales.ss_cdemo_sk = customer_demographics.cd_demo_sk)
| join item on (store_sales.ss_item_sk = item.i_item_sk)
|where
| cd_gender = 'F'
| and cd_marital_status = 'W'
| and cd_education_status = 'Primary'
| and ss_sold_date_sk between 2450815 and 2451179 -- partition key filter
""".stripMargin),
("q7-fourMapJoins", """
|select
| i_item_id,
| ss_quantity,
| ss_list_price,
| ss_coupon_amt,
| ss_sales_price
|from
| store_sales
| join customer_demographics on (store_sales.ss_cdemo_sk = customer_demographics.cd_demo_sk)
| join item on (store_sales.ss_item_sk = item.i_item_sk)
| join promotion on (store_sales.ss_promo_sk = promotion.p_promo_sk)
| join date_dim on (ss_sold_date_sk = d_date_sk)
|where
| cd_gender = 'F'
| and cd_marital_status = 'W'
| and cd_education_status = 'Primary'
| and (p_channel_email = 'N'
| or p_channel_event = 'N')
| and d_year = 1998
| -- and ss_date between '1998-01-01' and '1998-12-31'
| and ss_sold_date_sk between 2450815 and 2451179 -- partition key filter
""".stripMargin),
("q7-noOrderBy", """
|select
| i_item_id,
| avg(ss_quantity) agg1,
| avg(ss_list_price) agg2,
| avg(ss_coupon_amt) agg3,
| avg(ss_sales_price) agg4
|from
| store_sales
| join customer_demographics on (store_sales.ss_cdemo_sk = customer_demographics.cd_demo_sk)
| join item on (store_sales.ss_item_sk = item.i_item_sk)
| join promotion on (store_sales.ss_promo_sk = promotion.p_promo_sk)
| join date_dim on (ss_sold_date_sk = d_date_sk)
|where
| cd_gender = 'F'
| and cd_marital_status = 'W'
| and cd_education_status = 'Primary'
| and (p_channel_email = 'N'
| or p_channel_event = 'N')
| and d_year = 1998
| -- and ss_date between '1998-01-01' and '1998-12-31'
| and ss_sold_date_sk between 2450815 and 2451179 -- partition key filter
|group by
| i_item_id
""".stripMargin),
("q7", """
|-- start query 1 in stream 0 using template query7.tpl
|select
| i_item_id,
| avg(ss_quantity) agg1,
| avg(ss_list_price) agg2,
| avg(ss_coupon_amt) agg3,
| avg(ss_sales_price) agg4
|from
| store_sales
| join customer_demographics on (store_sales.ss_cdemo_sk = customer_demographics.cd_demo_sk)
| join item on (store_sales.ss_item_sk = item.i_item_sk)
| join promotion on (store_sales.ss_promo_sk = promotion.p_promo_sk)
| join date_dim on (ss_sold_date_sk = d_date_sk)
|where
| cd_gender = 'F'
| and cd_marital_status = 'W'
| and cd_education_status = 'Primary'
| and (p_channel_email = 'N'
| or p_channel_event = 'N')
| and d_year = 1998
| -- and ss_date between '1998-01-01' and '1998-12-31'
| and ss_sold_date_sk between 2450815 and 2451179 -- partition key filter
|group by
| i_item_id
|order by
| i_item_id
|limit 100
|-- end query 1 in stream 0 using template query7.tpl
""".stripMargin)
).map {
case (name, sqlText) => Query(name, sqlText)
}
object ImpalaKitQueries {
// Queries are from
// https://github.com/cloudera/impala-tpcds-kit/tree/master/queries-sql92-modified/queries
val queries = Seq(
@ -1145,7 +1024,7 @@ object Queries {
|from store_sales
""".stripMargin)
).map {
case (name, sqlText) => Query(name, sqlText)
case (name, sqlText) => Query(name, sqlText, description = "", collectResults = true)
}
val queriesMap = queries.map(q => q.name -> q).toMap
@ -1215,31 +1094,6 @@ object Queries {
,i_manufact
limit 100"""),
/* WHERE IS ss_sold_date?
("q27partitioned", """
select i_item_id,
s_state,
avg(ss_quantity) agg1,
avg(ss_list_price) agg2,
avg(ss_coupon_amt) agg3,
avg(ss_sales_price) agg4
from store_sales
JOIN customer_demographics ON store_sales.ss_cdemo_sk = customer_demographics.cd_demo_sk
JOIN date_dim ON store_sales.ss_sold_date_sk = date_dim.d_date_sk
JOIN store ON store_sales.ss_store_sk = store.s_store_sk
JOIN item ON store_sales.ss_item_sk = item.i_item_sk
where
cd_gender = 'F' and
cd_marital_status = 'W' and
cd_education_status = 'Primary' and
d_year = 1998 and
s_state = 'TN' and
ss_sold_date between '1998-01-01' and '1998-12-31'
group by i_item_id, s_state
order by i_item_id
,s_state
limit 100"""), */
("q27", """
select i_item_id,
s_state,
@ -1609,12 +1463,12 @@ object Queries {
|from store_sales
""".stripMargin)
).map {
case (name, sqlText) => Query(name, sqlText)
case (name, sqlText) => Query(name, sqlText, description = "original query", collectResults = true)
}
val interactiveQueries =
Seq("q19", "q42", "q52", "q55", "q63", "q68", "q73", "q98").map(queriesMap)
val reportingQueries = Seq("q3","q7","q27","q43", "q53", "q89").map(queriesMap)
val deepAnalyticQueries = Seq("q34", "q46", "q59", "q65", "q79", "ss_max").map(queriesMap)
val allClouderaQueries = interactiveQueries ++ reportingQueries ++ deepAnalyticQueries
val impalaKitQueries = interactiveQueries ++ reportingQueries ++ deepAnalyticQueries
}

View File

@ -0,0 +1,142 @@
/*
* Copyright 2015 Databricks Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.databricks.spark.sql.perf.tpcds.queries
import com.databricks.spark.sql.perf.Query
object SimpleQueries {
val q7Derived = Seq(
("q7-simpleScan",
"""
|select
| ss_quantity,
| ss_list_price,
| ss_coupon_amt,
| ss_coupon_amt,
| ss_cdemo_sk,
| ss_item_sk,
| ss_promo_sk,
| ss_sold_date_sk
|from store_sales
|where
| ss_sold_date_sk between 2450815 and 2451179
""".stripMargin),
("q7-twoMapJoins", """
|select
| i_item_id,
| ss_quantity,
| ss_list_price,
| ss_coupon_amt,
| ss_sales_price,
| ss_promo_sk,
| ss_sold_date_sk
|from
| store_sales
| join customer_demographics on (store_sales.ss_cdemo_sk = customer_demographics.cd_demo_sk)
| join item on (store_sales.ss_item_sk = item.i_item_sk)
|where
| cd_gender = 'F'
| and cd_marital_status = 'W'
| and cd_education_status = 'Primary'
| and ss_sold_date_sk between 2450815 and 2451179 -- partition key filter
""".stripMargin),
("q7-fourMapJoins", """
|select
| i_item_id,
| ss_quantity,
| ss_list_price,
| ss_coupon_amt,
| ss_sales_price
|from
| store_sales
| join customer_demographics on (store_sales.ss_cdemo_sk = customer_demographics.cd_demo_sk)
| join item on (store_sales.ss_item_sk = item.i_item_sk)
| join promotion on (store_sales.ss_promo_sk = promotion.p_promo_sk)
| join date_dim on (ss_sold_date_sk = d_date_sk)
|where
| cd_gender = 'F'
| and cd_marital_status = 'W'
| and cd_education_status = 'Primary'
| and (p_channel_email = 'N'
| or p_channel_event = 'N')
| and d_year = 1998
| -- and ss_date between '1998-01-01' and '1998-12-31'
| and ss_sold_date_sk between 2450815 and 2451179 -- partition key filter
""".stripMargin),
("q7-noOrderBy", """
|select
| i_item_id,
| avg(ss_quantity) agg1,
| avg(ss_list_price) agg2,
| avg(ss_coupon_amt) agg3,
| avg(ss_sales_price) agg4
|from
| store_sales
| join customer_demographics on (store_sales.ss_cdemo_sk = customer_demographics.cd_demo_sk)
| join item on (store_sales.ss_item_sk = item.i_item_sk)
| join promotion on (store_sales.ss_promo_sk = promotion.p_promo_sk)
| join date_dim on (ss_sold_date_sk = d_date_sk)
|where
| cd_gender = 'F'
| and cd_marital_status = 'W'
| and cd_education_status = 'Primary'
| and (p_channel_email = 'N'
| or p_channel_event = 'N')
| and d_year = 1998
| -- and ss_date between '1998-01-01' and '1998-12-31'
| and ss_sold_date_sk between 2450815 and 2451179 -- partition key filter
|group by
| i_item_id
""".stripMargin),
("q7", """
|-- start query 1 in stream 0 using template query7.tpl
|select
| i_item_id,
| avg(ss_quantity) agg1,
| avg(ss_list_price) agg2,
| avg(ss_coupon_amt) agg3,
| avg(ss_sales_price) agg4
|from
| store_sales
| join customer_demographics on (store_sales.ss_cdemo_sk = customer_demographics.cd_demo_sk)
| join item on (store_sales.ss_item_sk = item.i_item_sk)
| join promotion on (store_sales.ss_promo_sk = promotion.p_promo_sk)
| join date_dim on (ss_sold_date_sk = d_date_sk)
|where
| cd_gender = 'F'
| and cd_marital_status = 'W'
| and cd_education_status = 'Primary'
| and (p_channel_email = 'N'
| or p_channel_event = 'N')
| and d_year = 1998
| -- and ss_date between '1998-01-01' and '1998-12-31'
| and ss_sold_date_sk between 2450815 and 2451179 -- partition key filter
|group by
| i_item_id
|order by
| i_item_id
|limit 100
|-- end query 1 in stream 0 using template query7.tpl
""".stripMargin)
).map {
case (name, sqlText) => Query(name = name, sqlText = sqlText, description = "", collectResults = false)
}
}

View File

@ -0,0 +1,22 @@
/*
* Copyright 2015 Databricks Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.databricks.spark.sql.perf.tpcds
package object queries {
val impalaKitQueries = ImpalaKitQueries.impalaKitQueries
val q7DerivedQueries = SimpleQueries.q7Derived
}