diff --git a/src/main/scala/com/databricks/spark/sql/perf/AggregationPerformance.scala b/src/main/scala/com/databricks/spark/sql/perf/AggregationPerformance.scala
new file mode 100644
index 0000000..def479b
--- /dev/null
+++ b/src/main/scala/com/databricks/spark/sql/perf/AggregationPerformance.scala
@@ -0,0 +1,65 @@
+package com.databricks.spark.sql.perf
+
+import org.apache.spark.sql.{Row, SQLContext}
+
+trait AggregationPerformance extends Benchmark {
+
+ import sqlContext.implicits._
+
+ val sizes = (1 to 6).map(math.pow(10, _).toInt).toSeq
+
+ val variousCardinality = sizes.map { size =>
+ Table(s"ints$size",
+ sparkContext.parallelize(1 to size).flatMap { group =>
+ (1 to 10000).map(i => (group, i))
+ }.toDF("a", "b"))
+ }
+
+ val lowCardinality = sizes.map { size =>
+ val fullSize = size * 10000L
+ Table(
+ s"twoGroups$fullSize",
+ sqlContext.range(0, fullSize).select($"id" % 2 as 'a, $"id" as 'b))
+ }
+
+ val newAggregation = Variation("aggregationType", Seq("new", "old")) {
+ case "old" => sqlContext.setConf("spark.sql.useAggregate2", "false")
+ case "new" => sqlContext.setConf("spark.sql.useAggregate2", "true")
+ }
+
+ val varyNumGroupsAvg: Seq[Query] = variousCardinality.map(_.name).map { table =>
+ Query(
+ s"avg-$table",
+ s"SELECT AVG(b) FROM $table GROUP BY a",
+ "an average with a varying number of groups",
+ collectResults = false)
+ }
+
+ val twoGroupsAvg: Seq[Query] = lowCardinality.map(_.name).map { table =>
+ Query(
+ s"avg-$table",
+ s"SELECT AVG(b) FROM $table GROUP BY a",
+ "an average on an int column with only two groups",
+ collectResults = false)
+ }
+
+ val complexInput =
+ Seq("1milints", "100milints", "1bilints").map { table =>
+ Query(
+ s"aggregation-complex-input-$table",
+ s"SELECT SUM(id + id + id + id + id + id + id + id + id + id) FROM $table",
+ "Sum of 10 copies of the id column added together",
+ collectResults = true)
+ }
+
+ val aggregates =
+ Seq("1milints", "100milints", "1bilints").flatMap { table =>
+ Seq("SUM", "AVG", "COUNT", "STDDEV").map { agg =>
+ Query(
+ s"single-aggregate-$agg-$table",
+ s"SELECT $agg(id) FROM $table",
+ "aggregation of a single column",
+ collectResults = true)
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala b/src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala
index d087b99..f1073b6 100644
--- a/src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala
+++ b/src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala
@@ -24,7 +24,7 @@ import scala.concurrent.ExecutionContext.Implicits.global
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, FileSystem}
-import org.apache.spark.sql.{DataFrame, SQLContext}
+import org.apache.spark.sql.{AnalysisException, DataFrame, SQLContext}
import org.apache.spark.sql.catalyst.plans.logical.Subquery
@@ -49,7 +49,7 @@ abstract class Benchmark(@transient protected val sqlContext: SQLContext)
protected def sparkContext = sqlContext.sparkContext
- implicit def toOption[A](a: A) = Option(a)
+ protected implicit def toOption[A](a: A) = Option(a)
def currentConfiguration = BenchmarkConfiguration(
sqlConf = sqlContext.getAllConfs,
@@ -76,14 +76,25 @@ abstract class Benchmark(@transient protected val sqlContext: SQLContext)
*/
case class Variation[T](name: String, options: Seq[T])(val setup: T => Unit)
+ val codegen = Variation("codegen", Seq("on", "off")) {
+ case "off" => sqlContext.setConf("spark.sql.codegen", "false")
+ case "on" => sqlContext.setConf("spark.sql.codegen", "true")
+ }
+
+ val unsafe = Variation("unsafe", Seq("on", "off")) {
+ case "off" => sqlContext.setConf("spark.sql.unsafe.enabled", "false")
+ case "on" => sqlContext.setConf("spark.sql.unsafe.enabled", "true")
+ }
+
/**
* Starts an experiment run with a given set of queries.
- * @param queriesToRun Queries to be executed.
+ * @param queriesToRun a list of queries to be executed.
* @param includeBreakdown If it is true, breakdown results of a query will be recorded.
* Setting it to true may significantly increase the time used to
* execute a query.
- * @param iterations The number of iterations.
- * @param variations [[Variation]]s used in this run.
+ * @param iterations The number of iterations to run of each query.
+ * @param variations [[Variation]]s used in this run. The cross product of all variations will be
+ * run for each query * iteration.
* @param tags Tags of this run.
* @return It returns a ExperimentStatus object that can be used to
* track the progress of this experiment run.
@@ -115,6 +126,29 @@ abstract class Benchmark(@transient protected val sqlContext: SQLContext)
val timestamp = System.currentTimeMillis()
val combinations = cartesianProduct(variations.map(l => (0 until l.options.size).toList).toList)
val resultsFuture = Future {
+ queriesToRun.flatMap { query =>
+ query.newDataFrame().queryExecution.logical.collect {
+ case UnresolvedRelation(Seq(name), _) => name
+ }
+ }.distinct.foreach { name =>
+ try {
+ sqlContext.table(name)
+ currentMessages += s"Table $name exists."
+ } catch {
+ case ae: AnalysisException =>
+ val table = allTables
+ .find(_.name == name)
+ .getOrElse(sys.error(s"Couldn't read table $name and it's not defined as a Benchmark.Table."))
+
+ currentMessages += s"Creating table: $name"
+ table.data
+ .write
+ .mode("overwrite")
+ .saveAsTable(name)
+ }
+ }
+
+
val results = (1 to iterations).flatMap { i =>
combinations.map { setup =>
val currentOptions = variations.asInstanceOf[Seq[Variation[Any]]].zip(setup).map {
@@ -142,7 +176,8 @@ abstract class Benchmark(@transient protected val sqlContext: SQLContext)
failures += 1
currentMessages += s"Query '${q.name}' failed: ${f.message}"
}
- singleResult.executionTime.foreach(time => currentMessages += s"Exec time: $time")
+ singleResult.executionTime.foreach(time =>
+ currentMessages += s"Exec time: ${time / 1000}s")
currentResults += singleResult
singleResult :: Nil
})
@@ -184,7 +219,7 @@ abstract class Benchmark(@transient protected val sqlContext: SQLContext)
tbl
}
- def tail(n: Int = 5) = {
+ def tail(n: Int = 20) = {
currentMessages.takeRight(n).mkString("\n")
}
@@ -209,7 +244,7 @@ abstract class Benchmark(@transient protected val sqlContext: SQLContext)
|Run time: ${(System.currentTimeMillis() - timestamp) / 1000}s
|
|
@@ -225,6 +260,79 @@ abstract class Benchmark(@transient protected val sqlContext: SQLContext)
new ExperimentStatus
}
+ case class Table(
+ name: String,
+ data: DataFrame)
+
+ import reflect.runtime._, universe._
+
+
+
+ private val runtimeMirror = universe.runtimeMirror(getClass.getClassLoader)
+ val myType = runtimeMirror.classSymbol(getClass).toType
+
+ def singleTables =
+ myType.declarations
+ .filter(m => m.isMethod)
+ .map(_.asMethod)
+ .filter(_.asMethod.returnType =:= typeOf[Table])
+ .map(method => runtimeMirror.reflect(this).reflectMethod(method).apply().asInstanceOf[Table])
+
+ def groupedTables =
+ myType.declarations
+ .filter(m => m.isMethod)
+ .map(_.asMethod)
+ .filter(_.asMethod.returnType =:= typeOf[Seq[Table]])
+ .flatMap(method => runtimeMirror.reflect(this).reflectMethod(method).apply().asInstanceOf[Seq[Table]])
+
+ lazy val allTables: Seq[Table] = (singleTables ++ groupedTables).toSeq
+
+ def singleQueries =
+ myType.declarations
+ .filter(m => m.isMethod)
+ .map(_.asMethod)
+ .filter(_.asMethod.returnType =:= typeOf[Query])
+ .map(method => runtimeMirror.reflect(this).reflectMethod(method).apply().asInstanceOf[Query])
+
+ def groupedQueries =
+ myType.declarations
+ .filter(m => m.isMethod)
+ .map(_.asMethod)
+ .filter(_.asMethod.returnType =:= typeOf[Seq[Query]])
+ .flatMap(method => runtimeMirror.reflect(this).reflectMethod(method).apply().asInstanceOf[Seq[Query]])
+
+ lazy val allQueries = (singleQueries ++ groupedQueries).toSeq
+
+ def html: String = {
+ val singleQueries =
+ myType.declarations
+ .filter(m => m.isMethod)
+ .map(_.asMethod)
+ .filter(_.asMethod.returnType =:= typeOf[Query])
+ .map(method => runtimeMirror.reflect(this).reflectMethod(method).apply().asInstanceOf[Query])
+ .mkString(",")
+ val queries =
+ myType.declarations
+ .filter(m => m.isMethod)
+ .map(_.asMethod)
+ .filter(_.asMethod.returnType =:= typeOf[Seq[Query]])
+ .map { method =>
+ val queries = runtimeMirror.reflect(this).reflectMethod(method).apply().asInstanceOf[Seq[Query]]
+ val queryList = queries.map(_.name).mkString(", ")
+ s"""
+ |${method.name}
+ |