Fix 3 local benchmark classes (#165)

* Fix AggregationPerformance
* Fix JoinPerformance
* Fix average computation for datasets
* Add explanation and usage for local benchmarks
This commit is contained in:
Phil 2018-09-17 13:08:56 +01:00 committed by Nico Poggi
parent aac7eb54c1
commit d9a41a1204
4 changed files with 98 additions and 55 deletions

View File

@ -34,6 +34,16 @@ The first run of `bin/run` will build the library.
Use `sbt package` or `sbt assembly` to build the library jar.
## Local performance tests
The framework contains twelve benchmarks that can be executed in local mode. They are organized into three classes and target different components and functions of Spark:
* [DatasetPerformance](https://github.com/databricks/spark-sql-perf/blob/master/src/main/scala/com/databricks/spark/sql/perf/DatasetPerformance.scala) compares the performance of the old RDD API with the new DataFrame and Dataset APIs.
These benchmarks can be launched with the command `bin/run --benchmark DatasetPerformance`
* [JoinPerformance](https://github.com/databricks/spark-sql-perf/blob/master/src/main/scala/com/databricks/spark/sql/perf/JoinPerformance.scala) compares the performance of joining different table sizes and shapes with different join types.
These benchmarks can be launched with the command `bin/run --benchmark JoinPerformance`
* [AggregationPerformance](https://github.com/databricks/spark-sql-perf/blob/master/src/main/scala/com/databricks/spark/sql/perf/AggregationPerformance.scala) compares the performance of aggregating different table sizes using different aggregation types.
These benchmarks can be launched with the command `bin/run --benchmark AggregationPerformance`
# MLlib tests
To run MLlib tests, run `bin/run-ml yamlfile`, where `yamlfile` is the path to a YAML configuration

View File

@ -1,27 +1,55 @@
package com.databricks.spark.sql.perf
import org.apache.spark.sql.{Row, SQLContext}
trait AggregationPerformance extends Benchmark {
class AggregationPerformance extends Benchmark {
import sqlContext.implicits._
import ExecutionMode._
val sizes = (1 to 6).map(math.pow(10, _).toInt).toSeq
val sizes = (1 to 6).map(math.pow(10, _).toInt)
val x = Table(
"1milints", {
val df = sqlContext.range(0, 1000000).repartition(1)
df.createTempView("1milints")
df
})
val joinTables = Seq(
Table(
"100milints", {
val df = sqlContext.range(0, 100000000).repartition(10)
df.createTempView("100milints")
df
}),
Table(
"1bilints", {
val df = sqlContext.range(0, 1000000000).repartition(10)
df.createTempView("1bilints")
df
}
)
)
val variousCardinality = sizes.map { size =>
Table(s"ints$size",
sparkContext.parallelize(1 to size).flatMap { group =>
Table(s"ints$size", {
val df = sparkContext.parallelize(1 to size).flatMap { group =>
(1 to 10000).map(i => (group, i))
}.toDF("a", "b"))
}.toDF("a", "b")
df.createTempView(s"ints$size")
df
})
}
val lowCardinality = sizes.map { size =>
val fullSize = size * 10000L
Table(
s"twoGroups$fullSize",
sqlContext.range(0, fullSize).select($"id" % 2 as 'a, $"id" as 'b))
s"twoGroups$fullSize", {
val df = sqlContext.range(0, fullSize).select($"id" % 2 as 'a, $"id" as 'b)
df.createTempView(s"twoGroups$fullSize")
df
})
}
val newAggreation = Variation("aggregationType", Seq("new", "old")) {
@ -29,7 +57,7 @@ trait AggregationPerformance extends Benchmark {
case "new" => sqlContext.setConf("spark.sql.useAggregate2", "true")
}
val varyNumGroupsAvg: Seq[Query] = variousCardinality.map(_.name).map { table =>
val varyNumGroupsAvg: Seq[Benchmarkable] = variousCardinality.map(_.name).map { table =>
Query(
s"avg-$table",
s"SELECT AVG(b) FROM $table GROUP BY a",
@ -37,7 +65,7 @@ trait AggregationPerformance extends Benchmark {
executionMode = ForeachResults)
}
val twoGroupsAvg: Seq[Query] = lowCardinality.map(_.name).map { table =>
val twoGroupsAvg: Seq[Benchmarkable] = lowCardinality.map(_.name).map { table =>
Query(
s"avg-$table",
s"SELECT AVG(b) FROM $table GROUP BY a",
@ -45,7 +73,7 @@ trait AggregationPerformance extends Benchmark {
executionMode = ForeachResults)
}
val complexInput =
val complexInput: Seq[Benchmarkable] =
Seq("1milints", "100milints", "1bilints").map { table =>
Query(
s"aggregation-complex-input-$table",
@ -54,7 +82,7 @@ trait AggregationPerformance extends Benchmark {
executionMode = CollectResults)
}
val aggregates =
val aggregates: Seq[Benchmarkable] =
Seq("1milints", "100milints", "1bilints").flatMap { table =>
Seq("SUM", "AVG", "COUNT", "STDDEV").map { agg =>
Query(

View File

@ -16,9 +16,31 @@
package com.databricks.spark.sql.perf
import org.apache.spark.sql.{Encoder, SQLContext}
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.expressions.Aggregator
// Typed Dataset aggregator computing the arithmetic mean of Long inputs.
// Accumulates a running (sum, count) pair in a SumAndCount buffer and
// divides in finish(). The buffer is mutated in place in reduce/merge
// (presumably to avoid per-row allocation — standard for Aggregator buffers).
object TypedAverage extends Aggregator[Long, SumAndCount, Double] {
// Initial buffer: sum = 0, count = 0.
override def zero: SumAndCount = SumAndCount(0L, 0)
// Fold one input value into the buffer; mutates and returns b.
override def reduce(b: SumAndCount, a: Long): SumAndCount = {
b.count += 1
b.sum += a
b
}
// Encoders for the intermediate buffer (product type) and the Double result.
override def bufferEncoder = Encoders.product
override def outputEncoder = Encoders.scalaDouble
// Final result: sum / count. Note: an empty partition chain (count == 0)
// yields 0.0 / 0 == NaN, which is acceptable for a benchmark workload.
override def finish(reduction: SumAndCount): Double = reduction.sum.toDouble / reduction.count
// Combine two partial buffers; mutates and returns b1.
override def merge(b1: SumAndCount, b2: SumAndCount): SumAndCount = {
b1.count += b2.count
b1.sum += b2.sum
b1
}
}
// Single-column row type (one Long id) used by the Dataset benchmarks.
case class Data(id: Long)
// Mutable aggregation buffer: running sum of values and element count.
// Fields are vars because the aggregator updates the buffer in place.
case class SumAndCount(var sum: Long, var count: Int)
@ -32,7 +54,7 @@ class DatasetPerformance extends Benchmark {
val rdd = sparkContext.range(1, numLongs)
val smallNumLongs = 1000000
val smallds = sqlContext.range(1, smallNumLongs)
val smallds = sqlContext.range(1, smallNumLongs).as[Long]
val smallrdd = sparkContext.range(1, smallNumLongs)
def allBenchmarks = range ++ backToBackFilters ++ backToBackMaps ++ computeAverage
@ -99,32 +121,10 @@ class DatasetPerformance extends Benchmark {
.map(d => Data(d.id + 1L)))
)
val average = new Aggregator[Long, SumAndCount, Double] {
override def zero: SumAndCount = SumAndCount(0, 0)
override def reduce(b: SumAndCount, a: Long): SumAndCount = {
b.count += 1
b.sum += a
b
}
override def bufferEncoder = implicitly[Encoder[SumAndCount]]
override def outputEncoder = implicitly[Encoder[Double]]
override def finish(reduction: SumAndCount): Double = reduction.sum.toDouble / reduction.count
override def merge(b1: SumAndCount, b2: SumAndCount): SumAndCount = {
b1.count += b2.count
b1.sum += b2.sum
b1
}
}.toColumn
val computeAverage = Seq(
new Query(
"DS: average",
smallds.as[Long].select(average).toDF(),
smallds.select(TypedAverage.toColumn).toDF(),
executionMode = ExecutionMode.CollectResults),
new Query(
"DF: average",
@ -140,4 +140,4 @@ class DatasetPerformance extends Benchmark {
sumAndCount._1.toDouble / sumAndCount._2
})
)
}
}

View File

@ -3,8 +3,8 @@ package com.databricks.spark.sql.perf
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
trait JoinPerformance extends Benchmark {
// 1.5 mb, 1 file
class JoinPerformance extends Benchmark {
import ExecutionMode._
import sqlContext.implicits._
@ -12,22 +12,27 @@ trait JoinPerformance extends Benchmark {
private val table = sqlContext.table _
val x = Table(
"1milints",
sqlContext.range(0, 1000000)
.repartition(1))
"1milints", { // 1.5 mb, 1 file
val df = sqlContext.range(0, 1000000).repartition(1)
df.createTempView("1milints")
df
})
val joinTables = Seq(
// 143.542mb, 10 files
Table(
"100milints",
sqlContext.range(0, 100000000)
.repartition(10)),
"100milints", { // 143.542mb, 10 files
val df = sqlContext.range(0, 100000000).repartition(10)
df.createTempView("100milints")
df
}),
// 1.4348gb, 10 files
Table(
"1bilints",
sqlContext.range(0, 1000000000)
.repartition(10))
"1bilints", { // 1.4348gb, 10 files
val df = sqlContext.range(0, 1000000000).repartition(10)
df.createTempView("1bilints")
df
}
)
)
val sortMergeJoin = Variation("sortMergeJoin", Seq("on", "off")) {
@ -35,7 +40,7 @@ trait JoinPerformance extends Benchmark {
case "on" => sqlContext.setConf("spark.sql.planner.sortMergeJoin", "true")
}
val singleKeyJoins = Seq("1milints", "100milints", "1bilints").flatMap { table1 =>
val singleKeyJoins: Seq[Benchmarkable] = Seq("1milints", "100milints", "1bilints").flatMap { table1 =>
Seq("1milints", "100milints", "1bilints").flatMap { table2 =>
Seq("JOIN", "RIGHT JOIN", "LEFT JOIN", "FULL OUTER JOIN").map { join =>
Query(
@ -63,9 +68,9 @@ trait JoinPerformance extends Benchmark {
val varyNumMatches = Seq(1, 2, 4, 8, 16).map { numCopies =>
val ints = table("100milints")
val copiedInts = Seq.fill(numCopies)(ints).reduce(_ unionAll _)
val copiedInts = Seq.fill(numCopies)(ints).reduce(_ union _)
new Query(
s"join - numMatches: $numCopies",
copiedInts.as("a").join(ints.as("b"), $"a.id" === $"b.id"))
}
}
}