From f00ad77985049bcf7072264841c8b09817295fe7 Mon Sep 17 00:00:00 2001
From: Michael Armbrust
Date: Wed, 22 Jul 2015 00:29:58 -0700
Subject: [PATCH] with data generation

---
 .../sql/perf/AggregationPerformance.scala      |  65 ++++++++++
 .../databricks/spark/sql/perf/Benchmark.scala  | 124 +++++++++++++++---
 .../spark/sql/perf/JoinPerformance.scala       |  58 ++++----
 .../spark/sql/perf/bigdata/BigData.scala       |   4 +-
 .../spark/sql/perf/bigdata/Tables.scala        |   8 +-
 .../com/databricks/spark/sql/perf/table.scala  |   8 +-
 .../spark/sql/perf/tpcds/TPCDS.scala           |   4 +-
 .../spark/sql/perf/tpcds/Tables.scala          |  22 ++--
 8 files changed, 226 insertions(+), 67 deletions(-)
 create mode 100644 src/main/scala/com/databricks/spark/sql/perf/AggregationPerformance.scala

diff --git a/src/main/scala/com/databricks/spark/sql/perf/AggregationPerformance.scala b/src/main/scala/com/databricks/spark/sql/perf/AggregationPerformance.scala
new file mode 100644
index 0000000..def479b
--- /dev/null
+++ b/src/main/scala/com/databricks/spark/sql/perf/AggregationPerformance.scala
@@ -0,0 +1,65 @@
+package com.databricks.spark.sql.perf
+
+import org.apache.spark.sql.{Row, SQLContext}
+
+trait AggregationPerformance extends Benchmark {
+
+  import sqlContext.implicits._
+
+  val sizes = (1 to 6).map(math.pow(10, _).toInt).toSeq
+
+  val variousCardinality = sizes.map { size =>
+    Table(s"ints$size",
+      sparkContext.parallelize(1 to size).flatMap { group =>
+        (1 to 10000).map(i => (group, i))
+      }.toDF("a", "b"))
+  }
+
+  val lowCardinality = sizes.map { size =>
+    val fullSize = size * 10000L
+    Table(
+      s"twoGroups$fullSize",
+      sqlContext.range(0, fullSize).select($"id" % 2 as 'a, $"id" as 'b))
+  }
+
+  val newAggregation = Variation("aggregationType", Seq("new", "old")) {
+    case "old" => sqlContext.setConf("spark.sql.useAggregate2", "false")
+    case "new" => sqlContext.setConf("spark.sql.useAggregate2", "true")
+  }
+
+  val varyNumGroupsAvg: Seq[Query] = variousCardinality.map(_.name).map { table =>
+    Query(
+      s"avg-$table",
+      s"SELECT AVG(b) FROM $table GROUP BY a",
+      "an average with a varying number of groups",
+      collectResults = false)
+  }
+
+  val twoGroupsAvg: Seq[Query] = lowCardinality.map(_.name).map { table =>
+    Query(
+      s"avg-$table",
+      s"SELECT AVG(b) FROM $table GROUP BY a",
+      "an average on an int column with only two groups",
+      collectResults = false)
+  }
+
+  val complexInput =
+    Seq("1milints", "100milints", "1bilints").map { table =>
+      Query(
+        s"aggregation-complex-input-$table",
+        s"SELECT SUM(id + id + id + id + id + id + id + id + id + id) FROM $table",
+        "sum of ten copies of the id column added together",
+        collectResults = true)
+    }
+
+  val aggregates =
+    Seq("1milints", "100milints", "1bilints").flatMap { table =>
+      Seq("SUM", "AVG", "COUNT", "STDDEV").map { agg =>
+        Query(
+          s"single-aggregate-$agg-$table",
+          s"SELECT $agg(id) FROM $table",
+          "aggregation of a single column",
+          collectResults = true)
+      }
+    }
+}
\ No newline at end of file
diff --git a/src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala b/src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala
index d087b99..f1073b6 100644
--- a/src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala
+++ b/src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala
@@ -24,7 +24,7 @@ import scala.concurrent.ExecutionContext.Implicits.global
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{Path, FileSystem}
 
-import org.apache.spark.sql.{DataFrame, SQLContext}
+import org.apache.spark.sql.{AnalysisException, DataFrame, SQLContext}
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.plans.logical.Subquery
 
@@ -49,7 +49,7 @@ abstract class Benchmark(@transient protected val sqlContext: SQLContext)
 
   protected def sparkContext = sqlContext.sparkContext
 
-  implicit def toOption[A](a: A) = Option(a)
+  protected implicit def toOption[A](a: A) = Option(a)
 
   def currentConfiguration = BenchmarkConfiguration(
     sqlConf = sqlContext.getAllConfs,
@@ -76,14 +76,25 @@ abstract class Benchmark(@transient protected val sqlContext: SQLContext)
    */
   case class Variation[T](name: String, options: Seq[T])(val setup: T => Unit)
 
+  val codegen = Variation("codegen", Seq("on", "off")) {
+    case "off" => sqlContext.setConf("spark.sql.codegen", "false")
+    case "on" => sqlContext.setConf("spark.sql.codegen", "true")
+  }
+
+  val unsafe = Variation("unsafe", Seq("on", "off")) {
+    case "off" => sqlContext.setConf("spark.sql.unsafe.enabled", "false")
+    case "on" => sqlContext.setConf("spark.sql.unsafe.enabled", "true")
+  }
+
   /**
    * Starts an experiment run with a given set of queries.
-   * @param queriesToRun Queries to be executed.
+   * @param queriesToRun a list of queries to be executed.
    * @param includeBreakdown If it is true, breakdown results of a query will be recorded.
    *                         Setting it to true may significantly increase the time used to
    *                         execute a query.
-   * @param iterations The number of iterations.
-   * @param variations [[Variation]]s used in this run.
+   * @param iterations The number of iterations to run for each query.
+   * @param variations [[Variation]]s used in this run. The cross product of all variation
+   *                   options is run once per query, per iteration.
    * @param tags Tags of this run.
    * @return It returns a ExperimentStatus object that can be used to
    *         track the progress of this experiment run.
    */
@@ -115,6 +126,29 @@ abstract class Benchmark(@transient protected val sqlContext: SQLContext)
     val timestamp = System.currentTimeMillis()
     val combinations = cartesianProduct(variations.map(l => (0 until l.options.size).toList).toList)
     val resultsFuture = Future {
+      // Before running any queries, make sure every relation they reference exists:
+      // look each one up in the catalog and, if it is missing, create it from its
+      // Benchmark.Table definition.
+      queriesToRun.flatMap { query =>
+        query.newDataFrame().queryExecution.logical.collect {
+          case UnresolvedRelation(Seq(name), _) => name
+        }
+      }.distinct.foreach { name =>
+        try {
+          sqlContext.table(name)
+          currentMessages += s"Table $name exists."
+        } catch {
+          case ae: AnalysisException =>
+            val table = allTables
+              .find(_.name == name)
+              .getOrElse(sys.error(
+                s"Couldn't read table $name and it's not defined as a Benchmark.Table."))
+
+            currentMessages += s"Creating table: $name"
+            table.data
+              .write
+              .mode("overwrite")
+              .saveAsTable(name)
+        }
+      }
+
       val results = (1 to iterations).flatMap { i =>
         combinations.map { setup =>
           val currentOptions = variations.asInstanceOf[Seq[Variation[Any]]].zip(setup).map {
@@ -142,7 +176,8 @@ abstract class Benchmark(@transient protected val sqlContext: SQLContext)
               failures += 1
               currentMessages += s"Query '${q.name}' failed: ${f.message}"
             }
-            singleResult.executionTime.foreach(time => currentMessages += s"Exec time: $time")
+            singleResult.executionTime.foreach(time =>
+              currentMessages += s"Exec time: ${time / 1000}s")
             currentResults += singleResult
             singleResult :: Nil
           })
@@ -184,7 +219,7 @@ abstract class Benchmark(@transient protected val sqlContext: SQLContext)
       tbl
     }
 
-    def tail(n: Int = 5) = {
+    def tail(n: Int = 20) = {
       currentMessages.takeRight(n).mkString("\n")
     }
 
@@ -209,7 +244,7 @@ abstract class Benchmark(@transient protected val sqlContext: SQLContext)
           |Run time: ${(System.currentTimeMillis() - timestamp) / 1000}s
           |<br/>
           |<b>Current Query:</b> $currentQuery
-          |Runtime: ${(System.currentTimeMillis() - startTime) / 1000}s
+          |Runtime: ${(System.currentTimeMillis() - startTime) / 1000}s<br/>
           |$currentConfig
           |<h3>QueryPlan</h3>
           |
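(Not shown in this patch: `runExperiment` expands the `variations` argument with a pre-existing `cartesianProduct` helper. For reference, a minimal sketch of what such a helper can look like; this is an illustration, not the file's actual code:

    def cartesianProduct[T](lists: List[List[T]]): List[List[T]] = lists match {
      case Nil => List(Nil)
      case head :: tail =>
        // Prepend each option of the head list to every combination of the tail.
        for (option <- head; rest <- cartesianProduct(tail)) yield option :: rest
    }

With the `codegen` and `unsafe` variations above, each query would therefore run 2 x 2 = 4 times per iteration.)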
@@ -225,6 +260,79 @@ abstract class Benchmark(@transient protected val sqlContext: SQLContext)
     new ExperimentStatus
   }
 
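+  /**
+   * A table used by this benchmark's queries. Tables declared on a benchmark are
+   * created automatically by runExperiment when they are missing from the catalog.
+   */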
+  case class Table(
+      name: String,
+      data: DataFrame)
+
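+  // Runtime reflection is used below to discover every Table and Query member
+  // declared on the concrete benchmark class, so subclasses only need to define
+  // them as vals (or defs).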
+  import reflect.runtime._, universe._
+
+  private val runtimeMirror = universe.runtimeMirror(getClass.getClassLoader)
+  val myType = runtimeMirror.classSymbol(getClass).toType
+
+  def singleTables =
+    myType.declarations
+      .filter(_.isMethod)
+      .map(_.asMethod)
+      .filter(_.returnType =:= typeOf[Table])
+      .map(method => runtimeMirror.reflect(this).reflectMethod(method).apply().asInstanceOf[Table])
+
+  def groupedTables =
+    myType.declarations
+      .filter(_.isMethod)
+      .map(_.asMethod)
+      .filter(_.returnType =:= typeOf[Seq[Table]])
+      .flatMap(method =>
+        runtimeMirror.reflect(this).reflectMethod(method).apply().asInstanceOf[Seq[Table]])
+
+  lazy val allTables: Seq[Table] = (singleTables ++ groupedTables).toSeq
+
+  def singleQueries =
+    myType.declarations
+      .filter(_.isMethod)
+      .map(_.asMethod)
+      .filter(_.returnType =:= typeOf[Query])
+      .map(method => runtimeMirror.reflect(this).reflectMethod(method).apply().asInstanceOf[Query])
+
+  def groupedQueries =
+    myType.declarations
+      .filter(_.isMethod)
+      .map(_.asMethod)
+      .filter(_.returnType =:= typeOf[Seq[Query]])
+      .flatMap(method =>
+        runtimeMirror.reflect(this).reflectMethod(method).apply().asInstanceOf[Seq[Query]])
+
+  lazy val allQueries: Seq[Query] = (singleQueries ++ groupedQueries).toSeq
+
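+  /** An HTML listing of all of the queries defined by this benchmark. */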
+  def html: String = {
+    val singleQueries =
+      myType.declarations
+        .filter(_.isMethod)
+        .map(_.asMethod)
+        .filter(_.returnType =:= typeOf[Query])
+        .map(method => runtimeMirror.reflect(this).reflectMethod(method).apply().asInstanceOf[Query])
+        .mkString(",")
+    val queries =
+      myType.declarations
+        .filter(_.isMethod)
+        .map(_.asMethod)
+        .filter(_.returnType =:= typeOf[Seq[Query]])
+        .map { method =>
+          val queries =
+            runtimeMirror.reflect(this).reflectMethod(method).apply().asInstanceOf[Seq[Query]]
+          val queryList = queries.map(_.name).mkString(", ")
+        s"""
+          |

${method.name}

+ | + """.stripMargin + }.mkString("\n") + + s""" + |

Spark SQL Performance Benchmarking

+ |

Available Queries

+ |$singleQueries + |$queries + """.stripMargin + } + /** Factory object for benchmark queries. */ object Query { def apply( diff --git a/src/main/scala/com/databricks/spark/sql/perf/JoinPerformance.scala b/src/main/scala/com/databricks/spark/sql/perf/JoinPerformance.scala index 0da49e3..f2c541b 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/JoinPerformance.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/JoinPerformance.scala @@ -2,25 +2,31 @@ package com.databricks.spark.sql.perf import org.apache.spark.sql.SQLContext -class JoinPerformance(sqlContext: SQLContext) extends Benchmark(sqlContext) { - def buildTables() = { - // 1.5 mb, 1 file - sqlContext.range(0, 1000000) - .repartition(1) - .write.mode("ignore") - .saveAsTable("1milints") +trait JoinPerformance extends Benchmark { + // 1.5 mb, 1 file + val x = Table( + "1milints", + sqlContext.range(0, 1000000) + .repartition(1)) + + val joinTables = Seq( // 143.542mb, 10 files - sqlContext.range(0, 100000000) - .repartition(10) - .write.mode("ignore") - .saveAsTable("100milints") + Table( + "1bilints", + sqlContext.range(0, 100000000) + .repartition(10)), // 1.4348gb, 10 files - sqlContext.range(0, 1000000000) - .repartition(10) - .write.mode("ignore") - .saveAsTable("1bilints") + Table( + "1bilints", + sqlContext.range(0, 1000000000) + .repartition(10)) + ) + + val sortMergeJoin = Variation("sortMergeJoin", Seq("on", "off")) { + case "off" => sqlContext.setConf("spark.sql.planner.sortMergeJoin", "false") + case "on" => sqlContext.setConf("spark.sql.planner.sortMergeJoin", "true") } val singleKeyJoins = Seq("1milints", "100milints", "1bilints").flatMap { table1 => @@ -33,25 +39,5 @@ class JoinPerformance(sqlContext: SQLContext) extends Benchmark(sqlContext) { collectResults = true) } } - }.filterNot(_.name contains "FULL OUTER JOIN-1milints-1bilints") - - val complexInput = - Seq("1milints", "100milints", "1bilints").map { table => - Query( - "aggregation-complex-input", - s"SELECT SUM(id + id + id + id + id + id + id + id + id + id) FROM $table", - "Sum of 9 columns added together", - collectResults = true) - } - - val aggregates = - Seq("1milints", "100milints", "1bilints").flatMap { table => - Seq("SUM", "AVG", "COUNT", "STDDEV").map { agg => - Query( - s"single-aggregate-$agg", - s"SELECT $agg(id) FROM $table", - "aggregation of a single column", - collectResults = true) - } - } + } } \ No newline at end of file diff --git a/src/main/scala/com/databricks/spark/sql/perf/bigdata/BigData.scala b/src/main/scala/com/databricks/spark/sql/perf/bigdata/BigData.scala index c723382..c30cf7b 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/bigdata/BigData.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/bigdata/BigData.scala @@ -24,13 +24,13 @@ import org.apache.spark.sql.{Column, SQLContext} class BigDataBenchmark ( @transient sqlContext: SQLContext, dataLocation: String, - val tables: Seq[Table], + val tables: Seq[Table2], scaleFactor: String) extends Benchmark(sqlContext) with Serializable with TableCreator { import sqlContext._ import sqlContext.implicits._ - override def createTablesForTest(tables: Seq[Table]): Seq[TableForTest] = { + override def createTablesForTest(tables: Seq[Table2]): Seq[TableForTest] = { tables.map(table => BigDataTableForTest(table, dataLocation, scaleFactor, sqlContext)) } diff --git a/src/main/scala/com/databricks/spark/sql/perf/bigdata/Tables.scala b/src/main/scala/com/databricks/spark/sql/perf/bigdata/Tables.scala index 8301f59..9b6f657 100644 --- 
diff --git a/src/main/scala/com/databricks/spark/sql/perf/bigdata/Tables.scala b/src/main/scala/com/databricks/spark/sql/perf/bigdata/Tables.scala
index 8301f59..9b6f657 100644
--- a/src/main/scala/com/databricks/spark/sql/perf/bigdata/Tables.scala
+++ b/src/main/scala/com/databricks/spark/sql/perf/bigdata/Tables.scala
@@ -37,7 +37,7 @@ import parquet.hadoop.util.ContextUtil
 import scala.sys.process._
 
 case class BigDataTableForTest(
-    table: Table,
+    table: Table2,
     baseDir: String,
    scaleFactor: String,
    @transient sqlContext: SQLContext)
@@ -54,13 +54,13 @@ case class Tables(sqlContext: SQLContext) {
   import sqlContext.implicits._
 
   val tables = Seq(
-    Table("rankings",
+    Table2("rankings",
       UnpartitionedTable,
       'pageURL .string,
      'pageRank .int,
      'avgDuration .int),
 
-    Table("uservisits",
+    Table2("uservisits",
      UnpartitionedTable,
      'sourceIP .string,
      'destURL .string,
@@ -72,7 +72,7 @@ case class Tables(sqlContext: SQLContext) {
      'searchWord .string,
      'duration .int),
 
-    Table("documents",
+    Table2("documents",
      UnpartitionedTable,
      'line .string)
   )
 
diff --git a/src/main/scala/com/databricks/spark/sql/perf/table.scala b/src/main/scala/com/databricks/spark/sql/perf/table.scala
index 2eca42e..4fd4381 100644
--- a/src/main/scala/com/databricks/spark/sql/perf/table.scala
+++ b/src/main/scala/com/databricks/spark/sql/perf/table.scala
@@ -34,9 +34,9 @@ import parquet.hadoop.util.ContextUtil
 
 trait TableCreator {
 
-  def tables: Seq[Table]
+  def tables: Seq[Table2]
 
-  def createTablesForTest(tables: Seq[Table]): Seq[TableForTest]
+  def createTablesForTest(tables: Seq[Table2]): Seq[TableForTest]
 
   val tablesForTest: Seq[TableForTest] = createTablesForTest(tables)
 
@@ -65,10 +65,10 @@ abstract class TableType
 case object UnpartitionedTable extends TableType
 case class PartitionedTable(partitionColumn: String) extends TableType
 
-case class Table(name: String, tableType: TableType, fields: StructField*)
+case class Table2(name: String, tableType: TableType, fields: StructField*)
 
 abstract class TableForTest(
-    table: Table,
+    table: Table2,
     baseDir: String,
     @transient sqlContext: SQLContext)
   extends Serializable {
 
diff --git a/src/main/scala/com/databricks/spark/sql/perf/tpcds/TPCDS.scala b/src/main/scala/com/databricks/spark/sql/perf/tpcds/TPCDS.scala
index a685739..8c61a33 100644
--- a/src/main/scala/com/databricks/spark/sql/perf/tpcds/TPCDS.scala
+++ b/src/main/scala/com/databricks/spark/sql/perf/tpcds/TPCDS.scala
@@ -39,7 +39,7 @@ class TPCDS (
     sparkVersion: String,
     dataLocation: String,
     dsdgenDir: String,
-    val tables: Seq[Table],
+    val tables: Seq[Table2],
     scaleFactor: String,
     userSpecifiedBaseDir: Option[String] = None)
   extends Benchmark(sqlContext) with TableCreator with Serializable {
@@ -49,7 +49,7 @@ class TPCDS (
 
   lazy val baseDir = userSpecifiedBaseDir.getOrElse(s"$dataLocation/scaleFactor=$scaleFactor/useDecimal=true")
 
-  override def createTablesForTest(tables: Seq[Table]): Seq[TableForTest] = {
+  override def createTablesForTest(tables: Seq[Table2]): Seq[TableForTest] = {
     tables.map(table => TPCDSTableForTest(table, baseDir, scaleFactor.toInt, dsdgenDir, sqlContext))
   }
 
diff --git a/src/main/scala/com/databricks/spark/sql/perf/tpcds/Tables.scala b/src/main/scala/com/databricks/spark/sql/perf/tpcds/Tables.scala
index a6b180e..f559347 100644
--- a/src/main/scala/com/databricks/spark/sql/perf/tpcds/Tables.scala
+++ b/src/main/scala/com/databricks/spark/sql/perf/tpcds/Tables.scala
@@ -39,7 +39,7 @@ import parquet.hadoop.util.ContextUtil
 
 case class TPCDSTableForTest(
-    table: Table,
+    table: Table2,
     baseDir: String,
     scaleFactor: Int,
     dsdgenDir: String,
@@ -213,7 +213,7 @@ case class Tables(sqlContext: SQLContext) {
       'inv_item_sk .int,
      'inv_warehouse_sk .int,
      'inv_quantity_on_hand .int),*/
Table("store_sales", + Table2("store_sales", PartitionedTable("ss_sold_date_sk"), 'ss_sold_date_sk .int, 'ss_sold_time_sk .int, @@ -238,7 +238,7 @@ case class Tables(sqlContext: SQLContext) { 'ss_net_paid .decimal(7,2), 'ss_net_paid_inc_tax .decimal(7,2), 'ss_net_profit .decimal(7,2)), - Table("customer", + Table2("customer", UnpartitionedTable, 'c_customer_sk .int, 'c_customer_id .string, @@ -258,7 +258,7 @@ case class Tables(sqlContext: SQLContext) { 'c_login .string, 'c_email_address .string, 'c_last_review_date .string), - Table("customer_address", + Table2("customer_address", UnpartitionedTable, 'ca_address_sk .int, 'ca_address_id .string, @@ -273,7 +273,7 @@ case class Tables(sqlContext: SQLContext) { 'ca_country .string, 'ca_gmt_offset .decimal(5,2), 'ca_location_type .string), - Table("customer_demographics", + Table2("customer_demographics", UnpartitionedTable, 'cd_demo_sk .int, 'cd_gender .string, @@ -284,7 +284,7 @@ case class Tables(sqlContext: SQLContext) { 'cd_dep_count .int, 'cd_dep_employed_count .int, 'cd_dep_college_count .int), - Table("date_dim", + Table2("date_dim", UnpartitionedTable, 'd_date_sk .int, 'd_date_id .string, @@ -314,14 +314,14 @@ case class Tables(sqlContext: SQLContext) { 'd_current_month .string, 'd_current_quarter .string, 'd_current_year .string), - Table("household_demographics", + Table2("household_demographics", UnpartitionedTable, 'hd_demo_sk .int, 'hd_income_band_sk .int, 'hd_buy_potential .string, 'hd_dep_count .int, 'hd_vehicle_count .int), - Table("item", + Table2("item", UnpartitionedTable, 'i_item_sk .int, 'i_item_id .string, @@ -345,7 +345,7 @@ case class Tables(sqlContext: SQLContext) { 'i_container .string, 'i_manager_id .int, 'i_product_name .string), - Table("promotion", + Table2("promotion", UnpartitionedTable, 'p_promo_sk .int, 'p_promo_id .string, @@ -366,7 +366,7 @@ case class Tables(sqlContext: SQLContext) { 'p_channel_details .string, 'p_purpose .string, 'p_discount_active .string), - Table("store", + Table2("store", UnpartitionedTable, 's_store_sk .int, 's_store_id .string, @@ -397,7 +397,7 @@ case class Tables(sqlContext: SQLContext) { 's_country .string, 's_gmt_offset .decimal(5,2), 's_tax_precentage .decimal(5,2)), - Table("time_dim", + Table2("time_dim", UnpartitionedTable, 't_time_sk .int, 't_time_id .string,