diff --git a/src/main/scala/com/databricks/spark/sql/perf/JoinPerformance.scala b/src/main/scala/com/databricks/spark/sql/perf/JoinPerformance.scala
index c7705a2..05971b0 100644
--- a/src/main/scala/com/databricks/spark/sql/perf/JoinPerformance.scala
+++ b/src/main/scala/com/databricks/spark/sql/perf/JoinPerformance.scala
@@ -1,11 +1,16 @@
 package com.databricks.spark.sql.perf
 
-import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.types._
 
 trait JoinPerformance extends Benchmark {
   // 1.5 mb, 1 file
 
   import ExecutionMode._
+  import sqlContext.implicits._
+
+  // Function value for sqlContext.table, so the queries below can write table("name").
+  private val table = sqlContext.table _
 
   val x = Table(
     "1milints",
@@ -15,7 +20,7 @@ trait JoinPerformance extends Benchmark {
   val joinTables = Seq(
     // 143.542mb, 10 files
     Table(
-      "1bilints",
+      "100milints",
       sqlContext.range(0, 100000000)
         .repartition(10)),
 
@@ -42,4 +47,39 @@ trait JoinPerformance extends Benchmark {
       }
     }
   }
-}
\ No newline at end of file
+
+  /**
+   * One self-join of "100milints" on `id` per payload width: every row carries a constant
+   * string column of `dataSize` asterisks, named "data" followed by the size.
+   */
+  val varyDataSize = Seq(1, 128, 256, 512, 1024).map { dataSize =>
+    val intsWithData = table("100milints").select($"id", lit("*" * dataSize).as(s"data$dataSize"))
+    new Query(
+      s"join - datasize: $dataSize",
+      intsWithData.as("a").join(intsWithData.as("b"), $"a.id" === $"b.id"))
+  }
+
+  /**
+   * One self-join of "100milints" on `id` per key type: the `id` column is cast to the
+   * given type (string, integer, long, double) on both sides before joining.
+   */
+  val varyKeyType = Seq(StringType, IntegerType, LongType, DoubleType).map { keyType =>
+    val convertedInts = table("100milints").select($"id".cast(keyType).as("id"))
+    new Query(
+      s"join - keytype: $keyType",
+      convertedInts.as("a").join(convertedInts.as("b"), $"a.id" === $"b.id"))
+  }
+
+  /**
+   * Joins `numCopies` unioned copies of "100milints" (left side) against a single copy
+   * (right side) on `id`, so each right-side id should match `numCopies` left-side rows.
+   * NOTE(review): assumes ids in the registered table are unique -- confirm against the table definition.
+   */
+  val varyNumMatches = Seq(1, 2, 4, 8, 16).map { numCopies =>
+    val ints = table("100milints")
+    val copiedInts = Seq.fill(numCopies)(ints).reduce(_ unionAll _)
+    new Query(
+      s"join - numMatches: $numCopies",
+      copiedInts.as("a").join(ints.as("b"), $"a.id" === $"b.id"))
+  }
+}