diff --git a/README.md b/README.md index 7a27871..794537b 100644 --- a/README.md +++ b/README.md @@ -34,6 +34,16 @@ The first run of `bin/run` will build the library. Use `sbt package` or `sbt assembly` to build the library jar. +## Local performance tests +The framework contains twelve benchmarks that can be executed in local mode. They are organized into three classes and target different components and functions of Spark: +* [DatasetPerformance](https://github.com/databricks/spark-sql-perf/blob/master/src/main/scala/com/databricks/spark/sql/perf/DatasetPerformance.scala) compares the performance of the old RDD API with the new DataFrame and Dataset APIs. +These benchmarks can be launched with the command `bin/run --benchmark DatasetPerformance` +* [JoinPerformance](https://github.com/databricks/spark-sql-perf/blob/master/src/main/scala/com/databricks/spark/sql/perf/JoinPerformance.scala) compares the performance of joining different table sizes and shapes with different join types. +These benchmarks can be launched with the command `bin/run --benchmark JoinPerformance` +* [AggregationPerformance](https://github.com/databricks/spark-sql-perf/blob/master/src/main/scala/com/databricks/spark/sql/perf/AggregationPerformance.scala) compares the performance of aggregating different table sizes using different aggregation types. 
+These benchmarks can be launched with the command `bin/run --benchmark AggregationPerformance` + + # MLlib tests To run MLlib tests, run `/bin/run-ml yamlfile`, where `yamlfile` is the path to a YAML configuration diff --git a/src/main/scala/com/databricks/spark/sql/perf/AggregationPerformance.scala b/src/main/scala/com/databricks/spark/sql/perf/AggregationPerformance.scala index f27c596..0ba3930 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/AggregationPerformance.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/AggregationPerformance.scala @@ -1,27 +1,55 @@ package com.databricks.spark.sql.perf -import org.apache.spark.sql.{Row, SQLContext} - -trait AggregationPerformance extends Benchmark { +class AggregationPerformance extends Benchmark { import sqlContext.implicits._ import ExecutionMode._ - val sizes = (1 to 6).map(math.pow(10, _).toInt).toSeq + val sizes = (1 to 6).map(math.pow(10, _).toInt) + + val x = Table( + "1milints", { + val df = sqlContext.range(0, 1000000).repartition(1) + df.createTempView("1milints") + df + }) + + val joinTables = Seq( + Table( + "100milints", { + val df = sqlContext.range(0, 100000000).repartition(10) + df.createTempView("100milints") + df + }), + + Table( + "1bilints", { + val df = sqlContext.range(0, 1000000000).repartition(10) + df.createTempView("1bilints") + df + } + ) + ) val variousCardinality = sizes.map { size => - Table(s"ints$size", - sparkContext.parallelize(1 to size).flatMap { group => + Table(s"ints$size", { + val df = sparkContext.parallelize(1 to size).flatMap { group => (1 to 10000).map(i => (group, i)) - }.toDF("a", "b")) + }.toDF("a", "b") + df.createTempView(s"ints$size") + df + }) } val lowCardinality = sizes.map { size => val fullSize = size * 10000L Table( - s"twoGroups$fullSize", - sqlContext.range(0, fullSize).select($"id" % 2 as 'a, $"id" as 'b)) + s"twoGroups$fullSize", { + val df = sqlContext.range(0, fullSize).select($"id" % 2 as 'a, $"id" as 'b) + 
df.createTempView(s"twoGroups$fullSize") + df + }) } val newAggreation = Variation("aggregationType", Seq("new", "old")) { @@ -29,7 +57,7 @@ trait AggregationPerformance extends Benchmark { case "new" => sqlContext.setConf("spark.sql.useAggregate2", "true") } - val varyNumGroupsAvg: Seq[Query] = variousCardinality.map(_.name).map { table => + val varyNumGroupsAvg: Seq[Benchmarkable] = variousCardinality.map(_.name).map { table => Query( s"avg-$table", s"SELECT AVG(b) FROM $table GROUP BY a", @@ -37,7 +65,7 @@ trait AggregationPerformance extends Benchmark { executionMode = ForeachResults) } - val twoGroupsAvg: Seq[Query] = lowCardinality.map(_.name).map { table => + val twoGroupsAvg: Seq[Benchmarkable] = lowCardinality.map(_.name).map { table => Query( s"avg-$table", s"SELECT AVG(b) FROM $table GROUP BY a", @@ -45,7 +73,7 @@ trait AggregationPerformance extends Benchmark { executionMode = ForeachResults) } - val complexInput = + val complexInput: Seq[Benchmarkable] = Seq("1milints", "100milints", "1bilints").map { table => Query( s"aggregation-complex-input-$table", @@ -54,7 +82,7 @@ trait AggregationPerformance extends Benchmark { executionMode = CollectResults) } - val aggregates = + val aggregates: Seq[Benchmarkable] = Seq("1milints", "100milints", "1bilints").flatMap { table => Seq("SUM", "AVG", "COUNT", "STDDEV").map { agg => Query( diff --git a/src/main/scala/com/databricks/spark/sql/perf/DatasetPerformance.scala b/src/main/scala/com/databricks/spark/sql/perf/DatasetPerformance.scala index c49da52..0aaa629 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/DatasetPerformance.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/DatasetPerformance.scala @@ -16,9 +16,31 @@ package com.databricks.spark.sql.perf -import org.apache.spark.sql.{Encoder, SQLContext} +import org.apache.spark.sql.Encoders import org.apache.spark.sql.expressions.Aggregator +object TypedAverage extends Aggregator[Long, SumAndCount, Double] { + override def zero: SumAndCount = 
SumAndCount(0L, 0) + + override def reduce(b: SumAndCount, a: Long): SumAndCount = { + b.count += 1 + b.sum += a + b + } + + override def bufferEncoder = Encoders.product + + override def outputEncoder = Encoders.scalaDouble + + override def finish(reduction: SumAndCount): Double = reduction.sum.toDouble / reduction.count + + override def merge(b1: SumAndCount, b2: SumAndCount): SumAndCount = { + b1.count += b2.count + b1.sum += b2.sum + b1 + } +} + case class Data(id: Long) case class SumAndCount(var sum: Long, var count: Int) @@ -32,7 +54,7 @@ class DatasetPerformance extends Benchmark { val rdd = sparkContext.range(1, numLongs) val smallNumLongs = 1000000 - val smallds = sqlContext.range(1, smallNumLongs) + val smallds = sqlContext.range(1, smallNumLongs).as[Long] val smallrdd = sparkContext.range(1, smallNumLongs) def allBenchmarks = range ++ backToBackFilters ++ backToBackMaps ++ computeAverage @@ -99,32 +121,10 @@ class DatasetPerformance extends Benchmark { .map(d => Data(d.id + 1L))) ) - val average = new Aggregator[Long, SumAndCount, Double] { - override def zero: SumAndCount = SumAndCount(0, 0) - - override def reduce(b: SumAndCount, a: Long): SumAndCount = { - b.count += 1 - b.sum += a - b - } - - override def bufferEncoder = implicitly[Encoder[SumAndCount]] - - override def outputEncoder = implicitly[Encoder[Double]] - - override def finish(reduction: SumAndCount): Double = reduction.sum.toDouble / reduction.count - - override def merge(b1: SumAndCount, b2: SumAndCount): SumAndCount = { - b1.count += b2.count - b1.sum += b2.sum - b1 - } - }.toColumn - val computeAverage = Seq( new Query( "DS: average", - smallds.as[Long].select(average).toDF(), + smallds.select(TypedAverage.toColumn).toDF(), executionMode = ExecutionMode.CollectResults), new Query( "DF: average", @@ -140,4 +140,4 @@ class DatasetPerformance extends Benchmark { sumAndCount._1.toDouble / sumAndCount._2 }) ) -} +} \ No newline at end of file diff --git 
a/src/main/scala/com/databricks/spark/sql/perf/JoinPerformance.scala b/src/main/scala/com/databricks/spark/sql/perf/JoinPerformance.scala index 05971b0..8c58706 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/JoinPerformance.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/JoinPerformance.scala @@ -3,8 +3,8 @@ package com.databricks.spark.sql.perf import org.apache.spark.sql.functions._ import org.apache.spark.sql.types._ -trait JoinPerformance extends Benchmark { - // 1.5 mb, 1 file +class JoinPerformance extends Benchmark { + import ExecutionMode._ import sqlContext.implicits._ @@ -12,22 +12,27 @@ trait JoinPerformance extends Benchmark { private val table = sqlContext.table _ val x = Table( - "1milints", - sqlContext.range(0, 1000000) - .repartition(1)) + "1milints", { // 1.5 mb, 1 file + val df = sqlContext.range(0, 1000000).repartition(1) + df.createTempView("1milints") + df + }) val joinTables = Seq( - // 143.542mb, 10 files Table( - "100milints", - sqlContext.range(0, 100000000) - .repartition(10)), + "100milints", { // 143.542mb, 10 files + val df = sqlContext.range(0, 100000000).repartition(10) + df.createTempView("100milints") + df + }), - // 1.4348gb, 10 files Table( - "1bilints", - sqlContext.range(0, 1000000000) - .repartition(10)) + "1bilints", { // 1.4348gb, 10 files + val df = sqlContext.range(0, 1000000000).repartition(10) + df.createTempView("1bilints") + df + } + ) ) val sortMergeJoin = Variation("sortMergeJoin", Seq("on", "off")) { @@ -35,7 +40,7 @@ trait JoinPerformance extends Benchmark { case "on" => sqlContext.setConf("spark.sql.planner.sortMergeJoin", "true") } - val singleKeyJoins = Seq("1milints", "100milints", "1bilints").flatMap { table1 => + val singleKeyJoins: Seq[Benchmarkable] = Seq("1milints", "100milints", "1bilints").flatMap { table1 => Seq("1milints", "100milints", "1bilints").flatMap { table2 => Seq("JOIN", "RIGHT JOIN", "LEFT JOIN", "FULL OUTER JOIN").map { join => Query( @@ -63,9 +68,9 @@ trait 
JoinPerformance extends Benchmark { val varyNumMatches = Seq(1, 2, 4, 8, 16).map { numCopies => val ints = table("100milints") - val copiedInts = Seq.fill(numCopies)(ints).reduce(_ unionAll _) + val copiedInts = Seq.fill(numCopies)(ints).reduce(_ union _) new Query( s"join - numMatches: $numCopies", copiedInts.as("a").join(ints.as("b"), $"a.id" === $"b.id")) } -} +} \ No newline at end of file