with data generation

This commit is contained in:
Michael Armbrust 2015-07-22 00:29:58 -07:00
parent eba8cea93c
commit f00ad77985
8 changed files with 226 additions and 67 deletions

View File

@ -0,0 +1,65 @@
package com.databricks.spark.sql.perf
import org.apache.spark.sql.{Row, SQLContext}
/**
 * Benchmark queries that measure aggregation performance under varying group
 * cardinality, input size, and aggregate function.
 *
 * Mixes into [[Benchmark]], which supplies `sqlContext`, `sparkContext`,
 * `Table`, `Query`, and `Variation`.
 */
trait AggregationPerformance extends Benchmark {
  import sqlContext.implicits._

  // Group-column cardinalities to generate: 10^1 through 10^6.
  val sizes = (1 to 6).map(math.pow(10, _).toInt).toSeq

  // One table per size, named "ints<size>": `size` distinct values in column
  // `a`, each paired with 10,000 values of `b` (size * 10,000 rows total).
  // Used to measure aggregation as the number of groups grows.
  val variousCardinality = sizes.map { size =>
    Table(
      s"ints$size",
      sparkContext.parallelize(1 to size).flatMap { group =>
        (1 to 10000).map(i => (group, i))
      }.toDF("a", "b"))
  }

  // Tables with only two distinct groups (id % 2) but size * 10,000 rows,
  // for measuring aggregation when group cardinality is very low.
  val lowCardinality = sizes.map { size =>
    val fullSize = size * 10000L
    Table(
      s"twoGroups$fullSize",
      sqlContext.range(0, fullSize).select($"id" % 2 as 'a, $"id" as 'b))
  }

  // Toggles between the new and old aggregate operator implementations.
  // NOTE(review): the name misspells "Aggregation"; kept as-is because
  // renaming a public trait member would break external references.
  val newAggreation = Variation("aggregationType", Seq("new", "old")) {
    case "old" => sqlContext.setConf("spark.sql.useAggregate2", "false")
    case "new" => sqlContext.setConf("spark.sql.useAggregate2", "true")
  }

  // AVG with a varying number of groups, one query per variousCardinality
  // table. Results are not collected so the driver does not become the
  // bottleneck.
  val varyNumGroupsAvg: Seq[Query] = variousCardinality.map(_.name).map { table =>
    Query(
      s"avg-$table",
      s"SELECT AVG(b) FROM $table GROUP BY a",
      "an average with a varying number of groups",
      collectResults = false)
  }

  // AVG over tables that contain only two groups, one query per
  // lowCardinality table.
  val twoGroupsAvg: Seq[Query] = lowCardinality.map(_.name).map { table =>
    Query(
      s"avg-$table",
      s"SELECT AVG(b) FROM $table GROUP BY a",
      "an average on an int column with only two groups",
      collectResults = false)
  }

  // Aggregation over a complex (many-term) input expression.
  // NOTE(review): the referenced tables ("1milints" etc.) are expected to be
  // defined by another trait mixed into the same benchmark — confirm.
  val complexInput =
    Seq("1milints", "100milints", "1bilints").map { table =>
      Query(
        s"aggregation-complex-input-$table",
        s"SELECT SUM(id + id + id + id + id + id + id + id + id + id) FROM $table",
        // Fixed description: the expression sums ten copies of `id`, not 9 columns.
        "Sum of 10 copies of the id column added together",
        collectResults = true)
    }

  // Each single-column aggregate function over each input-size table
  // (the cross product of three tables and four aggregates).
  val aggregates =
    Seq("1milints", "100milints", "1bilints").flatMap { table =>
      Seq("SUM", "AVG", "COUNT", "STDDEV").map { agg =>
        Query(
          s"single-aggregate-$agg-$table",
          s"SELECT $agg(id) FROM $table",
          "aggregation of a single column",
          collectResults = true)
      }
    }
}

View File

@ -24,7 +24,7 @@ import scala.concurrent.ExecutionContext.Implicits.global
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, FileSystem}
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.{AnalysisException, DataFrame, SQLContext}
import org.apache.spark.sql.catalyst.plans.logical.Subquery
@ -49,7 +49,7 @@ abstract class Benchmark(@transient protected val sqlContext: SQLContext)
protected def sparkContext = sqlContext.sparkContext
implicit def toOption[A](a: A) = Option(a)
protected implicit def toOption[A](a: A) = Option(a)
def currentConfiguration = BenchmarkConfiguration(
sqlConf = sqlContext.getAllConfs,
@ -76,14 +76,25 @@ abstract class Benchmark(@transient protected val sqlContext: SQLContext)
*/
case class Variation[T](name: String, options: Seq[T])(val setup: T => Unit)
val codegen = Variation("codegen", Seq("on", "off")) {
case "off" => sqlContext.setConf("spark.sql.codegen", "false")
case "on" => sqlContext.setConf("spark.sql.codegen", "true")
}
val unsafe = Variation("unsafe", Seq("on", "off")) {
case "off" => sqlContext.setConf("spark.sql.unsafe.enabled", "false")
case "on" => sqlContext.setConf("spark.sql.unsafe.enabled", "true")
}
/**
* Starts an experiment run with a given set of queries.
* @param queriesToRun Queries to be executed.
* @param queriesToRun a list of queries to be executed.
* @param includeBreakdown If it is true, breakdown results of a query will be recorded.
* Setting it to true may significantly increase the time used to
* execute a query.
* @param iterations The number of iterations.
* @param variations [[Variation]]s used in this run.
* @param iterations The number of iterations to run of each query.
* @param variations [[Variation]]s used in this run. The cross product of all variations will be
* run for each query * iteration.
* @param tags Tags of this run.
* @return It returns a ExperimentStatus object that can be used to
* track the progress of this experiment run.
@ -115,6 +126,29 @@ abstract class Benchmark(@transient protected val sqlContext: SQLContext)
val timestamp = System.currentTimeMillis()
val combinations = cartesianProduct(variations.map(l => (0 until l.options.size).toList).toList)
val resultsFuture = Future {
queriesToRun.flatMap { query =>
query.newDataFrame().queryExecution.logical.collect {
case UnresolvedRelation(Seq(name), _) => name
}
}.distinct.foreach { name =>
try {
sqlContext.table(name)
currentMessages += s"Table $name exists."
} catch {
case ae: AnalysisException =>
val table = allTables
.find(_.name == name)
.getOrElse(sys.error(s"Couldn't read table $name and its not defined as a Benchmark.Table."))
currentMessages += s"Creating table: $name"
table.data
.write
.mode("overwrite")
.saveAsTable(name)
}
}
val results = (1 to iterations).flatMap { i =>
combinations.map { setup =>
val currentOptions = variations.asInstanceOf[Seq[Variation[Any]]].zip(setup).map {
@ -142,7 +176,8 @@ abstract class Benchmark(@transient protected val sqlContext: SQLContext)
failures += 1
currentMessages += s"Query '${q.name}' failed: ${f.message}"
}
singleResult.executionTime.foreach(time => currentMessages += s"Exec time: $time")
singleResult.executionTime.foreach(time =>
currentMessages += s"Exec time: ${time / 1000}s")
currentResults += singleResult
singleResult :: Nil
})
@ -184,7 +219,7 @@ abstract class Benchmark(@transient protected val sqlContext: SQLContext)
tbl
}
def tail(n: Int = 5) = {
def tail(n: Int = 20) = {
currentMessages.takeRight(n).mkString("\n")
}
@ -209,7 +244,7 @@ abstract class Benchmark(@transient protected val sqlContext: SQLContext)
|<b>Run time:</b> ${(System.currentTimeMillis() - timestamp) / 1000}s<br/>
|
|<h2>Current Query: $currentQuery</h2>
|Runtime: ${(System.currentTimeMillis() - startTime) / 1000}s
|Runtime: ${(System.currentTimeMillis() - startTime) / 1000}s<br/>
|$currentConfig<br/>
|<h3>QueryPlan</h3>
|<pre>
@ -225,6 +260,79 @@ abstract class Benchmark(@transient protected val sqlContext: SQLContext)
new ExperimentStatus
}
case class Table(
name: String,
data: DataFrame)
import reflect.runtime._, universe._
import reflect.runtime._
import universe._
private val runtimeMirror = universe.runtimeMirror(getClass.getClassLoader)
val myType = runtimeMirror.classSymbol(getClass).toType
def singleTables =
myType.declarations
.filter(m => m.isMethod)
.map(_.asMethod)
.filter(_.asMethod.returnType =:= typeOf[Table])
.map(method => runtimeMirror.reflect(this).reflectMethod(method).apply().asInstanceOf[Table])
def groupedTables =
myType.declarations
.filter(m => m.isMethod)
.map(_.asMethod)
.filter(_.asMethod.returnType =:= typeOf[Seq[Table]])
.flatMap(method => runtimeMirror.reflect(this).reflectMethod(method).apply().asInstanceOf[Seq[Table]])
lazy val allTables: Seq[Table] = (singleTables ++ groupedTables).toSeq
def singleQueries =
myType.declarations
.filter(m => m.isMethod)
.map(_.asMethod)
.filter(_.asMethod.returnType =:= typeOf[Query])
.map(method => runtimeMirror.reflect(this).reflectMethod(method).apply().asInstanceOf[Query])
def groupedQueries =
myType.declarations
.filter(m => m.isMethod)
.map(_.asMethod)
.filter(_.asMethod.returnType =:= typeOf[Seq[Query]])
.flatMap(method => runtimeMirror.reflect(this).reflectMethod(method).apply().asInstanceOf[Seq[Query]])
lazy val allQueries = (singleQueries ++ groupedQueries).toSeq
def html: String = {
val singleQueries =
myType.declarations
.filter(m => m.isMethod)
.map(_.asMethod)
.filter(_.asMethod.returnType =:= typeOf[Query])
.map(method => runtimeMirror.reflect(this).reflectMethod(method).apply().asInstanceOf[Query])
.mkString(",")
val queries =
myType.declarations
.filter(m => m.isMethod)
.map(_.asMethod)
.filter(_.asMethod.returnType =:= typeOf[Seq[Query]])
.map { method =>
val queries = runtimeMirror.reflect(this).reflectMethod(method).apply().asInstanceOf[Seq[Query]]
val queryList = queries.map(_.name).mkString(", ")
s"""
|<h3>${method.name}</h3>
|<ul>$queryList</ul>
""".stripMargin
}.mkString("\n")
s"""
|<h1>Spark SQL Performance Benchmarking</h1>
|<h2>Available Queries</h2>
|$singleQueries
|$queries
""".stripMargin
}
/** Factory object for benchmark queries. */
object Query {
def apply(

View File

@ -2,25 +2,31 @@ package com.databricks.spark.sql.perf
import org.apache.spark.sql.SQLContext
class JoinPerformance(sqlContext: SQLContext) extends Benchmark(sqlContext) {
def buildTables() = {
// 1.5 mb, 1 file
sqlContext.range(0, 1000000)
.repartition(1)
.write.mode("ignore")
.saveAsTable("1milints")
trait JoinPerformance extends Benchmark {
// 1.5 mb, 1 file
val x = Table(
"1milints",
sqlContext.range(0, 1000000)
.repartition(1))
val joinTables = Seq(
// 143.542mb, 10 files
sqlContext.range(0, 100000000)
.repartition(10)
.write.mode("ignore")
.saveAsTable("100milints")
Table(
"1bilints",
sqlContext.range(0, 100000000)
.repartition(10)),
// 1.4348gb, 10 files
sqlContext.range(0, 1000000000)
.repartition(10)
.write.mode("ignore")
.saveAsTable("1bilints")
Table(
"1bilints",
sqlContext.range(0, 1000000000)
.repartition(10))
)
val sortMergeJoin = Variation("sortMergeJoin", Seq("on", "off")) {
case "off" => sqlContext.setConf("spark.sql.planner.sortMergeJoin", "false")
case "on" => sqlContext.setConf("spark.sql.planner.sortMergeJoin", "true")
}
val singleKeyJoins = Seq("1milints", "100milints", "1bilints").flatMap { table1 =>
@ -33,25 +39,5 @@ class JoinPerformance(sqlContext: SQLContext) extends Benchmark(sqlContext) {
collectResults = true)
}
}
}.filterNot(_.name contains "FULL OUTER JOIN-1milints-1bilints")
val complexInput =
Seq("1milints", "100milints", "1bilints").map { table =>
Query(
"aggregation-complex-input",
s"SELECT SUM(id + id + id + id + id + id + id + id + id + id) FROM $table",
"Sum of 9 columns added together",
collectResults = true)
}
val aggregates =
Seq("1milints", "100milints", "1bilints").flatMap { table =>
Seq("SUM", "AVG", "COUNT", "STDDEV").map { agg =>
Query(
s"single-aggregate-$agg",
s"SELECT $agg(id) FROM $table",
"aggregation of a single column",
collectResults = true)
}
}
}
}

View File

@ -24,13 +24,13 @@ import org.apache.spark.sql.{Column, SQLContext}
class BigDataBenchmark (
@transient sqlContext: SQLContext,
dataLocation: String,
val tables: Seq[Table],
val tables: Seq[Table2],
scaleFactor: String)
extends Benchmark(sqlContext) with Serializable with TableCreator {
import sqlContext._
import sqlContext.implicits._
override def createTablesForTest(tables: Seq[Table]): Seq[TableForTest] = {
override def createTablesForTest(tables: Seq[Table2]): Seq[TableForTest] = {
tables.map(table =>
BigDataTableForTest(table, dataLocation, scaleFactor, sqlContext))
}

View File

@ -37,7 +37,7 @@ import parquet.hadoop.util.ContextUtil
import scala.sys.process._
case class BigDataTableForTest(
table: Table,
table: Table2,
baseDir: String,
scaleFactor: String,
@transient sqlContext: SQLContext)
@ -54,13 +54,13 @@ case class Tables(sqlContext: SQLContext) {
import sqlContext.implicits._
val tables = Seq(
Table("rankings",
Table2("rankings",
UnpartitionedTable,
'pageURL .string,
'pageRank .int,
'avgDuration .int),
Table("uservisits",
Table2("uservisits",
UnpartitionedTable,
'sourceIP .string,
'destURL .string,
@ -72,7 +72,7 @@ case class Tables(sqlContext: SQLContext) {
'searchWord .string,
'duration .int),
Table("documents",
Table2("documents",
UnpartitionedTable,
'line .string)
)

View File

@ -34,9 +34,9 @@ import parquet.hadoop.util.ContextUtil
trait TableCreator {
def tables: Seq[Table]
def tables: Seq[Table2]
def createTablesForTest(tables: Seq[Table]): Seq[TableForTest]
def createTablesForTest(tables: Seq[Table2]): Seq[TableForTest]
val tablesForTest: Seq[TableForTest] = createTablesForTest(tables)
@ -65,10 +65,10 @@ abstract class TableType
case object UnpartitionedTable extends TableType
case class PartitionedTable(partitionColumn: String) extends TableType
case class Table(name: String, tableType: TableType, fields: StructField*)
case class Table2(name: String, tableType: TableType, fields: StructField*)
abstract class TableForTest(
table: Table,
table: Table2,
baseDir: String,
@transient sqlContext: SQLContext) extends Serializable {

View File

@ -39,7 +39,7 @@ class TPCDS (
sparkVersion: String,
dataLocation: String,
dsdgenDir: String,
val tables: Seq[Table],
val tables: Seq[Table2],
scaleFactor: String,
userSpecifiedBaseDir: Option[String] = None)
extends Benchmark(sqlContext) with TableCreator with Serializable {
@ -49,7 +49,7 @@ class TPCDS (
lazy val baseDir =
userSpecifiedBaseDir.getOrElse(s"$dataLocation/scaleFactor=$scaleFactor/useDecimal=true")
override def createTablesForTest(tables: Seq[Table]): Seq[TableForTest] = {
override def createTablesForTest(tables: Seq[Table2]): Seq[TableForTest] = {
tables.map(table =>
TPCDSTableForTest(table, baseDir, scaleFactor.toInt, dsdgenDir, sqlContext))
}

View File

@ -39,7 +39,7 @@ import parquet.hadoop.util.ContextUtil
case class TPCDSTableForTest(
table: Table,
table: Table2,
baseDir: String,
scaleFactor: Int,
dsdgenDir: String,
@ -213,7 +213,7 @@ case class Tables(sqlContext: SQLContext) {
'inv_item_sk .int,
'inv_warehouse_sk .int,
'inv_quantity_on_hand .int),*/
Table("store_sales",
Table2("store_sales",
PartitionedTable("ss_sold_date_sk"),
'ss_sold_date_sk .int,
'ss_sold_time_sk .int,
@ -238,7 +238,7 @@ case class Tables(sqlContext: SQLContext) {
'ss_net_paid .decimal(7,2),
'ss_net_paid_inc_tax .decimal(7,2),
'ss_net_profit .decimal(7,2)),
Table("customer",
Table2("customer",
UnpartitionedTable,
'c_customer_sk .int,
'c_customer_id .string,
@ -258,7 +258,7 @@ case class Tables(sqlContext: SQLContext) {
'c_login .string,
'c_email_address .string,
'c_last_review_date .string),
Table("customer_address",
Table2("customer_address",
UnpartitionedTable,
'ca_address_sk .int,
'ca_address_id .string,
@ -273,7 +273,7 @@ case class Tables(sqlContext: SQLContext) {
'ca_country .string,
'ca_gmt_offset .decimal(5,2),
'ca_location_type .string),
Table("customer_demographics",
Table2("customer_demographics",
UnpartitionedTable,
'cd_demo_sk .int,
'cd_gender .string,
@ -284,7 +284,7 @@ case class Tables(sqlContext: SQLContext) {
'cd_dep_count .int,
'cd_dep_employed_count .int,
'cd_dep_college_count .int),
Table("date_dim",
Table2("date_dim",
UnpartitionedTable,
'd_date_sk .int,
'd_date_id .string,
@ -314,14 +314,14 @@ case class Tables(sqlContext: SQLContext) {
'd_current_month .string,
'd_current_quarter .string,
'd_current_year .string),
Table("household_demographics",
Table2("household_demographics",
UnpartitionedTable,
'hd_demo_sk .int,
'hd_income_band_sk .int,
'hd_buy_potential .string,
'hd_dep_count .int,
'hd_vehicle_count .int),
Table("item",
Table2("item",
UnpartitionedTable,
'i_item_sk .int,
'i_item_id .string,
@ -345,7 +345,7 @@ case class Tables(sqlContext: SQLContext) {
'i_container .string,
'i_manager_id .int,
'i_product_name .string),
Table("promotion",
Table2("promotion",
UnpartitionedTable,
'p_promo_sk .int,
'p_promo_id .string,
@ -366,7 +366,7 @@ case class Tables(sqlContext: SQLContext) {
'p_channel_details .string,
'p_purpose .string,
'p_discount_active .string),
Table("store",
Table2("store",
UnpartitionedTable,
's_store_sk .int,
's_store_id .string,
@ -397,7 +397,7 @@ case class Tables(sqlContext: SQLContext) {
's_country .string,
's_gmt_offset .decimal(5,2),
's_tax_precentage .decimal(5,2)),
Table("time_dim",
Table2("time_dim",
UnpartitionedTable,
't_time_sk .int,
't_time_id .string,