diff --git a/README.md b/README.md index 3765bdc..a7384b2 100644 --- a/README.md +++ b/README.md @@ -2,10 +2,32 @@ [![Build Status](https://travis-ci.org/databricks/spark-sql-perf.svg)](https://travis-ci.org/databricks/spark-sql-perf) -This is a performance testing framework for [Spark SQL](https://spark.apache.org/sql/) in [Apache Spark](https://spark.apache.org/) 1.4+. +This is a performance testing framework for [Spark SQL](https://spark.apache.org/sql/) in [Apache Spark](https://spark.apache.org/) 1.6+. **Note: This README is still under development. Please also check our source code for more information.** +# Quick Start + +``` +$ bin/run --help + +spark-sql-perf 0.2.0 +Usage: spark-sql-perf [options] + + -b | --benchmark + the name of the benchmark to run + -f | --filter + a filter on the name of the queries to run + -i | --iterations + the number of iterations to run + --help + prints this usage text + +$ bin/run --benchmark DatasetPerformance +``` + +# TPC-DS + ## How to use it The rest of document will use TPC-DS benchmark as an example. We will add contents to explain how to use other benchmarks add the support of a new benchmark dataset in future. 
diff --git a/bin/run b/bin/run new file mode 100755 index 0000000..52a192c --- /dev/null +++ b/bin/run @@ -0,0 +1,4 @@ +#!/bin/bash + +ARGS="runMain com.databricks.spark.sql.perf.RunBenchmark $@" +build/sbt "$ARGS" \ No newline at end of file diff --git a/build.sbt b/build.sbt index bc2b5a1..7000df9 100644 --- a/build.sbt +++ b/build.sbt @@ -14,19 +14,25 @@ licenses := Seq("Apache-2.0" -> url("http://opensource.org/licenses/Apache-2.0") sparkVersion := "1.6.0" -resolvers += "Apache Preview Repo" at "https://repository.apache.org/content/repositories/orgapachespark-1156/" - sparkComponents ++= Seq("sql", "hive") initialCommands in console := """ + |import org.apache.spark.sql._ + |import org.apache.spark.sql.functions._ + |import org.apache.spark.sql.types._ |import org.apache.spark.sql.hive.test.TestHive |import TestHive.implicits |import TestHive.sql + | + |val sqlContext = TestHive + |import sqlContext.implicits._ """.stripMargin libraryDependencies += "org.apache.spark" %% "spark-sql" % sparkVersion.value +libraryDependencies += "com.github.scopt" %% "scopt" % "3.3.0" + libraryDependencies += "com.twitter" %% "util-jvm" % "6.23.0" % "provided" libraryDependencies += "org.scalatest" %% "scalatest" % "2.2.1" % "test" diff --git a/src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala b/src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala index a5912c9..9e25e12 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala @@ -39,22 +39,21 @@ import com.databricks.spark.sql.perf.cpu._ * @param sqlContext An existing SQLContext. 
*/ abstract class Benchmark( - @transient protected val sqlContext: SQLContext, - val resultsLocation: String = "/spark/sql/performance", - val resultsTableName: String = "sqlPerformance") + @transient val sqlContext: SQLContext) extends Serializable { import sqlContext.implicits._ - def createResultsTable() = { - sqlContext.sql(s"DROP TABLE $resultsTableName") - sqlContext.createExternalTable( - "sqlPerformance", "json", Map("path" -> (resultsLocation + "/*/"))) - } + def this() = this(SQLContext.getOrCreate(SparkContext.getOrCreate())) + + val resultsLocation = + sqlContext.getAllConfs.getOrElse( + "spark.sql.perf.results", + "/spark/sql/performance") protected def sparkContext = sqlContext.sparkContext - protected implicit def toOption[A](a: A) = Option(a) + protected implicit def toOption[A](a: A): Option[A] = Option(a) val buildInfo = Try(getClass.getClassLoader.loadClass("org.apache.spark.BuildInfo")).map { cls => cls.getMethods @@ -132,6 +131,11 @@ abstract class Benchmark( val currentRuns = new collection.mutable.ArrayBuffer[ExperimentRun]() val currentMessages = new collection.mutable.ArrayBuffer[String]() + def logMessage(msg: String) = { + println(msg) + currentMessages += msg + } + // Stats for HTML status message. @volatile var currentExecution = "" @volatile var currentPlan = "" // for queries only @@ -149,6 +153,7 @@ abstract class Benchmark( } val timestamp = System.currentTimeMillis() + val resultPath = s"$resultsLocation/timestamp=$timestamp" val combinations = cartesianProduct(variations.map(l => (0 until l.options.size).toList).toList) val resultsFuture = Future { @@ -169,20 +174,20 @@ abstract class Benchmark( .foreach { name => try { sqlContext.table(name) - currentMessages += s"Table $name exists." 
+ logMessage(s"Table $name exists.") } catch { case ae: Exception => val table = allTables .find(_.name == name) if (table.isDefined) { - currentMessages += s"Creating table: $name" + logMessage(s"Creating table: $name") table.get.data .write .mode("overwrite") .saveAsTable(name) } else { // the table could be subquery - println(s"Couldn't read table $name and its not defined as a Benchmark.Table.") + logMessage(s"Couldn't read table $name and its not defined as a Benchmark.Table.") } } } @@ -205,7 +210,7 @@ abstract class Benchmark( executionsToRun.flatMap { q => val setup = s"iteration: $i, ${currentOptions.map { case (k, v) => s"$k=$v"}.mkString(", ")}" - currentMessages += s"Running execution ${q.name} $setup" + logMessage(s"Running execution ${q.name} $setup") currentExecution = q.name currentPlan = q match { @@ -220,13 +225,15 @@ abstract class Benchmark( } startTime = System.currentTimeMillis() - val singleResult = q.benchmark(includeBreakdown, setup, currentMessages, timeout) + val singleResult = + q.benchmark(includeBreakdown, setup, currentMessages, timeout) + singleResult.failure.foreach { f => failures += 1 - currentMessages += s"Execution '${q.name}' failed: ${f.message}" + logMessage(s"Execution '${q.name}' failed: ${f.message}") } singleResult.executionTime.foreach { time => - currentMessages += s"Execution time: ${time / 1000}s" + logMessage(s"Execution time: ${time / 1000}s") } currentResults += singleResult singleResult :: Nil @@ -240,16 +247,16 @@ abstract class Benchmark( try { val resultsTable = sqlContext.createDataFrame(results) - currentMessages += s"Results written to table: 'sqlPerformance' at $resultsLocation/$timestamp" + logMessage(s"Results written to table: 'sqlPerformance' at $resultPath") results.toDF() .coalesce(1) .write .format("json") - .save(s"$resultsLocation/$timestamp") + .save(resultPath) results.toDF() } catch { - case e: Throwable => currentMessages += s"Failed to write data: $e" + case e: Throwable => logMessage(s"Failed 
to write data: $e") } logCollection() @@ -257,13 +264,13 @@ def scheduleCpuCollection(fs: FS) = { logCollection = () => { - currentMessages += s"Begining CPU log collection" + logMessage(s"Beginning CPU log collection") try { val location = cpu.collectLogs(sqlContext, fs, timestamp) - currentMessages += s"cpu results recorded to $location" + logMessage(s"cpu results recorded to $location") } catch { case e: Throwable => - currentMessages += s"Error collecting logs: $e" + logMessage(s"Error collecting logs: $e") } } @@ -327,7 +334,7 @@ } s""" |

$status Experiment

- |Permalink: table("$resultsTableName").where('timestamp === ${timestamp}L)
+ |Permalink: sqlContext.read.json("$resultPath")
|Iterations complete: ${currentRuns.size / combinations.size} / $iterations
|Failures: $failures
|Executions run: ${currentResults.size} / ${iterations * combinations.size * executionsToRun.size} @@ -383,15 +390,15 @@ abstract class Benchmark( myType.declarations .filter(m => m.isMethod) .map(_.asMethod) - .filter(_.asMethod.returnType =:= typeOf[Query]) - .map(method => runtimeMirror.reflect(this).reflectMethod(method).apply().asInstanceOf[Query]) + .filter(_.asMethod.returnType =:= typeOf[Benchmarkable]) + .map(method => runtimeMirror.reflect(this).reflectMethod(method).apply().asInstanceOf[Benchmarkable]) def groupedQueries = myType.declarations .filter(m => m.isMethod) .map(_.asMethod) - .filter(_.asMethod.returnType =:= typeOf[Seq[Query]]) - .flatMap(method => runtimeMirror.reflect(this).reflectMethod(method).apply().asInstanceOf[Seq[Query]]) + .filter(_.asMethod.returnType =:= typeOf[Seq[Benchmarkable]]) + .flatMap(method => runtimeMirror.reflect(this).reflectMethod(method).apply().asInstanceOf[Seq[Benchmarkable]]) @transient lazy val allQueries = (singleQueries ++ groupedQueries).toSeq @@ -426,35 +433,6 @@ abstract class Benchmark( """.stripMargin } - trait ExecutionMode extends Serializable - case object ExecutionMode { - // Benchmark run by collecting queries results (e.g. rdd.collect()) - case object CollectResults extends ExecutionMode { - override def toString: String = "collect" - } - - // Benchmark run by iterating through the queries results rows (e.g. rdd.foreach(row => Unit)) - case object ForeachResults extends ExecutionMode { - override def toString: String = "foreach" - } - - // Benchmark run by saving the output of each query as a parquet file at the specified location - case class WriteParquet(location: String) extends ExecutionMode { - override def toString: String = "saveToParquet" - } - - // Benchmark run by calculating the sum of the hash value of all rows. This is used to check - // query results. 
- case object HashResults extends ExecutionMode { - override def toString: String = "hash" - } - - // Results from Spark perf - case object SparkPerfResults extends ExecutionMode { - override def toString: String = "sparkPerf" - } - } - /** Factory object for benchmark queries. */ case object Query { def apply( @@ -473,80 +451,16 @@ abstract class Benchmark( } } - /** A trait to describe things that can be benchmarked. */ - trait Benchmarkable { - val name: String - protected val executionMode: ExecutionMode - - final def benchmark( - includeBreakdown: Boolean, - description: String = "", - messages: ArrayBuffer[String], - timeout: Long): BenchmarkResult = { - sparkContext.setJobDescription(s"Execution: $name, $description") - beforeBenchmark() - val result = runBenchmark(includeBreakdown, description, messages, timeout) - afterBenchmark(sqlContext.sparkContext) - result - } - - protected def beforeBenchmark(): Unit = { } - - private def afterBenchmark(sc: SparkContext): Unit = { - // Best-effort clean up of weakly referenced RDDs, shuffles, and broadcasts - System.gc() - // Remove any leftover blocks that still exist - sc.getExecutorStorageStatus - .flatMap { status => status.blocks.map { case (bid, _) => bid } } - .foreach { bid => SparkEnv.get.blockManager.master.removeBlock(bid) } - } - - private def runBenchmark( - includeBreakdown: Boolean, - description: String = "", - messages: ArrayBuffer[String], - timeout: Long): BenchmarkResult = { - val jobgroup = UUID.randomUUID().toString - var result: BenchmarkResult = null - val thread = new Thread("benchmark runner") { - override def run(): Unit = { - sparkContext.setJobGroup(jobgroup, s"benchmark $name", true) - result = doBenchmark(includeBreakdown, description, messages) - } - } - thread.setDaemon(true) - thread.start() - thread.join(timeout) - if (thread.isAlive) { - sparkContext.cancelJobGroup(jobgroup) - thread.interrupt() - result = BenchmarkResult( - name = name, - mode = executionMode.toString, - failure 
= Some(Failure("Timeout", s"timeout after ${timeout / 1000} seconds")) - ) - } - result - } - - protected def doBenchmark( - includeBreakdown: Boolean, - description: String = "", - messages: ArrayBuffer[String]): BenchmarkResult - - protected def measureTimeMs[A](f: => A): Double = { - val startTime = System.nanoTime() - f - val endTime = System.nanoTime() - (endTime - startTime).toDouble / 1000000 - } - } - object RDDCount { def apply( name: String, rdd: RDD[_]) = { - new SparkPerfExecution(name, Map.empty, () => Unit, () => rdd.count()) + new SparkPerfExecution( + name, + Map.empty, + () => Unit, + () => rdd.count(), + rdd.toDebugString) } } @@ -555,9 +469,16 @@ abstract class Benchmark( override val name: String, parameters: Map[String, String], prepare: () => Unit, - run: () => Unit) + run: () => Unit, + description: String = "") extends Benchmarkable { + override def toString: String = + s""" + |== $name == + |$description + """.stripMargin + protected override val executionMode: ExecutionMode = ExecutionMode.SparkPerfResults protected override def beforeBenchmark(): Unit = { prepare() } @@ -593,7 +514,7 @@ abstract class Benchmark( override val executionMode: ExecutionMode = ExecutionMode.ForeachResults) extends Benchmarkable with Serializable { - override def toString = { + override def toString: String = { try { s""" |== Query: $name == @@ -606,7 +527,7 @@ abstract class Benchmark( | Can't be analyzed: $e | | $description - """.stripMargin + """.stripMargin } } diff --git a/src/main/scala/com/databricks/spark/sql/perf/Benchmarkable.scala b/src/main/scala/com/databricks/spark/sql/perf/Benchmarkable.scala new file mode 100644 index 0000000..2271a97 --- /dev/null +++ b/src/main/scala/com/databricks/spark/sql/perf/Benchmarkable.scala @@ -0,0 +1,96 @@ +/* + * Copyright 2015 Databricks Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.databricks.spark.sql.perf + +import java.util.UUID + +import org.apache.spark.sql.SQLContext +import org.apache.spark.{SparkEnv, SparkContext} + +import scala.collection.mutable.ArrayBuffer + +/** A trait to describe things that can be benchmarked. */ +trait Benchmarkable { + val sqlContext = SQLContext.getOrCreate(SparkContext.getOrCreate()) + val sparkContext = sqlContext.sparkContext + + val name: String + protected val executionMode: ExecutionMode + + final def benchmark( + includeBreakdown: Boolean, + description: String = "", + messages: ArrayBuffer[String], + timeout: Long): BenchmarkResult = { + sparkContext.setJobDescription(s"Execution: $name, $description") + beforeBenchmark() + val result = runBenchmark(includeBreakdown, description, messages, timeout) + afterBenchmark(sqlContext.sparkContext) + result + } + + protected def beforeBenchmark(): Unit = { } + + private def afterBenchmark(sc: SparkContext): Unit = { + // Best-effort clean up of weakly referenced RDDs, shuffles, and broadcasts + System.gc() + // Remove any leftover blocks that still exist + sc.getExecutorStorageStatus + .flatMap { status => status.blocks.map { case (bid, _) => bid } } + .foreach { bid => SparkEnv.get.blockManager.master.removeBlock(bid) } + } + + private def runBenchmark( + includeBreakdown: Boolean, + description: String = "", + messages: ArrayBuffer[String], + timeout: Long): BenchmarkResult = { + val jobgroup = UUID.randomUUID().toString + var result: BenchmarkResult = null + val thread = new Thread("benchmark runner") { + override def 
run(): Unit = { + sparkContext.setJobGroup(jobgroup, s"benchmark $name", true) + result = doBenchmark(includeBreakdown, description, messages) + } + } + thread.setDaemon(true) + thread.start() + thread.join(timeout) + if (thread.isAlive) { + sparkContext.cancelJobGroup(jobgroup) + thread.interrupt() + result = BenchmarkResult( + name = name, + mode = executionMode.toString, + failure = Some(Failure("Timeout", s"timeout after ${timeout / 1000} seconds")) + ) + } + result + } + + protected def doBenchmark( + includeBreakdown: Boolean, + description: String = "", + messages: ArrayBuffer[String]): BenchmarkResult + + protected def measureTimeMs[A](f: => A): Double = { + val startTime = System.nanoTime() + f + val endTime = System.nanoTime() + (endTime - startTime).toDouble / 1000000 + } +} \ No newline at end of file diff --git a/src/main/scala/com/databricks/spark/sql/perf/DatasetPerformance.scala b/src/main/scala/com/databricks/spark/sql/perf/DatasetPerformance.scala index d58ecfc..7c97348 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/DatasetPerformance.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/DatasetPerformance.scala @@ -1,12 +1,29 @@ +/* + * Copyright 2015 Databricks Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package com.databricks.spark.sql.perf +import org.apache.spark.sql.SQLContext import org.apache.spark.sql.expressions.Aggregator case class Data(id: Long) case class SumAndCount(var sum: Long, var count: Int) -trait DatasetPerformance extends Benchmark { +class DatasetPerformance extends Benchmark { import sqlContext.implicits._ @@ -31,7 +48,7 @@ trait DatasetPerformance extends Benchmark { executionMode = ExecutionMode.ForeachResults), RDDCount( "RDD: range", - rdd.map(Data)) + rdd.map(Data(_))) ) val backToBackFilters = Seq( @@ -51,7 +68,7 @@ trait DatasetPerformance extends Benchmark { .filter("id % 103 != 0")), RDDCount( "RDD: back-to-back filters", - rdd.map(Data) + rdd.map(Data(_)) .filter(_.id % 100 != 0) .filter(_.id % 101 != 0) .filter(_.id % 102 != 0) diff --git a/src/main/scala/com/databricks/spark/sql/perf/ExecutionMode.scala b/src/main/scala/com/databricks/spark/sql/perf/ExecutionMode.scala new file mode 100644 index 0000000..e44bd87 --- /dev/null +++ b/src/main/scala/com/databricks/spark/sql/perf/ExecutionMode.scala @@ -0,0 +1,52 @@ +/* + * Copyright 2015 Databricks Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.databricks.spark.sql.perf + +/** + * Describes how a given Spark benchmark should be run (i.e. should the results be collected to + * the driver or just computed on the executors. 
+ */ +trait ExecutionMode extends Serializable +case object ExecutionMode { + /** Benchmark run by collecting queries results (e.g. rdd.collect()) */ + case object CollectResults extends ExecutionMode { + override def toString: String = "collect" + } + + /** Benchmark run by iterating through the queries results rows (e.g. rdd.foreach(row => Unit)) */ + case object ForeachResults extends ExecutionMode { + override def toString: String = "foreach" + } + + /** Benchmark run by saving the output of each query as a parquet file. */ + case class WriteParquet(location: String) extends ExecutionMode { + override def toString: String = "saveToParquet" + } + + /** + * Benchmark run by calculating the sum of the hash value of all rows. This is used to check + * query results do not change. + */ + case object HashResults extends ExecutionMode { + override def toString: String = "hash" + } + + /** Results from Spark perf */ + case object SparkPerfResults extends ExecutionMode { + override def toString: String = "sparkPerf" + } +} \ No newline at end of file diff --git a/src/main/scala/com/databricks/spark/sql/perf/RunBenchmark.scala b/src/main/scala/com/databricks/spark/sql/perf/RunBenchmark.scala new file mode 100644 index 0000000..4069167 --- /dev/null +++ b/src/main/scala/com/databricks/spark/sql/perf/RunBenchmark.scala @@ -0,0 +1,112 @@ +/* + * Copyright 2015 Databricks Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.databricks.spark.sql.perf + +import java.net.InetAddress + +import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.functions._ +import org.apache.spark.{SparkContext, SparkConf} + +import scala.util.Try + +case class RunConfig( + benchmarkName: String = null, + filter: Option[String] = None, + iterations: Int = 3) + +/** + * Runs a benchmark locally and prints the results to the screen. + */ +object RunBenchmark { + def main(args: Array[String]): Unit = { + val parser = new scopt.OptionParser[RunConfig]("spark-sql-perf") { + head("spark-sql-perf", "0.2.0") + opt[String]('b', "benchmark") + .action { (x, c) => c.copy(benchmarkName = x) } + .text("the name of the benchmark to run") + .required() + opt[String]('f', "filter") + .action((x, c) => c.copy(filter = Some(x))) + .text("a filter on the name of the queries to run") + opt[Int]('i', "iterations") + .action((x, c) => c.copy(iterations = x)) + .text("the number of iterations to run") + help("help") + .text("prints this usage text") + } + + parser.parse(args, RunConfig()) match { + case Some(config) => + run(config) + case None => + System.exit(1) + } + } + + def run(config: RunConfig): Unit = { + val conf = new SparkConf() + .setMaster("local[*]") + .setAppName(getClass.getName) + + val sc = SparkContext.getOrCreate(conf) + val sqlContext = SQLContext.getOrCreate(sc) + import sqlContext.implicits._ + + sqlContext.setConf("spark.sql.perf.results", new java.io.File("performance").toURI.toString) + val benchmark = Try { + Class.forName(config.benchmarkName) + .newInstance() + .asInstanceOf[Benchmark] + } getOrElse { + Class.forName("com.databricks.spark.sql.perf." 
+ config.benchmarkName) + .newInstance() + .asInstanceOf[Benchmark] + } + + val allQueries = config.filter.map { f => + benchmark.allQueries.filter(_.name contains f) + } getOrElse { + benchmark.allQueries + } + + println("== QUERY LIST ==") + allQueries.foreach(println) + + val experiment = benchmark.runExperiment( + executionsToRun = allQueries, + iterations = config.iterations, + tags = Map( + "runtype" -> "local", + "host" -> InetAddress.getLocalHost().getHostName())) + + println("== STARTING EXPERIMENT ==") + experiment.waitForFinish(1000 * 60 * 30) + experiment.getCurrentRuns() + .withColumn("result", explode($"results")) + .select("result.*") + .groupBy("name") + .agg( + min($"executionTime") as 'minTimeMs, + max($"executionTime") as 'maxTimeMs, + avg($"executionTime") as 'avgTimeMs, + stddev($"executionTime") as 'stdDev) + .orderBy("name") + .show() + println(s"""Results: sqlContext.read.json("${experiment.resultPath}")""") + } +} \ No newline at end of file diff --git a/src/main/scala/com/databricks/spark/sql/perf/bigdata/Queries.scala b/src/main/scala/com/databricks/spark/sql/perf/bigdata/Queries.scala index b9fb016..e1b1c69 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/bigdata/Queries.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/bigdata/Queries.scala @@ -16,7 +16,7 @@ package com.databricks.spark.sql.perf.bigdata -import com.databricks.spark.sql.perf.Benchmark +import com.databricks.spark.sql.perf.{ExecutionMode, Benchmark} trait Queries extends Benchmark { diff --git a/src/main/scala/com/databricks/spark/sql/perf/tpcds/ImpalaKitQueries.scala b/src/main/scala/com/databricks/spark/sql/perf/tpcds/ImpalaKitQueries.scala index 752dacf..4765c3a 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/tpcds/ImpalaKitQueries.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/tpcds/ImpalaKitQueries.scala @@ -16,7 +16,7 @@ package com.databricks.spark.sql.perf.tpcds -import com.databricks.spark.sql.perf.Benchmark +import 
com.databricks.spark.sql.perf.{ExecutionMode, Benchmark} trait ImpalaKitQueries extends Benchmark { diff --git a/src/main/scala/com/databricks/spark/sql/perf/tpcds/SimpleQueries.scala b/src/main/scala/com/databricks/spark/sql/perf/tpcds/SimpleQueries.scala index 2842f00..1f7f355 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/tpcds/SimpleQueries.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/tpcds/SimpleQueries.scala @@ -16,7 +16,7 @@ package com.databricks.spark.sql.perf.tpcds -import com.databricks.spark.sql.perf.Benchmark +import com.databricks.spark.sql.perf.{ExecutionMode, Benchmark} trait SimpleQueries extends Benchmark { diff --git a/src/main/scala/com/databricks/spark/sql/perf/tpcds/TPCDS.scala b/src/main/scala/com/databricks/spark/sql/perf/tpcds/TPCDS.scala index b2a07cf..85ab4f8 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/tpcds/TPCDS.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/tpcds/TPCDS.scala @@ -25,12 +25,12 @@ import org.apache.spark.sql.SQLContext * TPC-DS benchmark's dataset. * @param sqlContext An existing SQLContext. 
*/ -class TPCDS ( - @transient sqlContext: SQLContext, - resultsLocation: String = "/spark/sql/performance", - resultsTableName: String = "sqlPerformance") - extends Benchmark(sqlContext, resultsLocation, resultsTableName) - with ImpalaKitQueries with SimpleQueries with Tpcds_1_4_Queries with Serializable { +class TPCDS + extends Benchmark + with ImpalaKitQueries + with SimpleQueries + with Tpcds_1_4_Queries + with Serializable { /* def setupBroadcast(skipTables: Seq[String] = Seq("store_sales", "customer")) = { diff --git a/src/main/scala/com/databricks/spark/sql/perf/tpcds/TPCDS_1_4_Queries.scala b/src/main/scala/com/databricks/spark/sql/perf/tpcds/TPCDS_1_4_Queries.scala index bf88672..9042eec 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/tpcds/TPCDS_1_4_Queries.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/tpcds/TPCDS_1_4_Queries.scala @@ -16,7 +16,7 @@ package com.databricks.spark.sql.perf.tpcds -import com.databricks.spark.sql.perf.Benchmark +import com.databricks.spark.sql.perf.{ExecutionMode, Benchmark} /** * This implements the official TPCDS v1.4 queries with only cosmetic modifications diff --git a/src/test/scala/DatasetPerformanceSuite.scala b/src/test/scala/DatasetPerformanceSuite.scala index b5179c2..d0c10c1 100644 --- a/src/test/scala/DatasetPerformanceSuite.scala +++ b/src/test/scala/DatasetPerformanceSuite.scala @@ -6,7 +6,8 @@ import org.scalatest.FunSuite class DatasetPerformanceSuite extends FunSuite { test("run benchmark") { - val benchmark = new Benchmark(TestHive) with DatasetPerformance { + TestHive // Init HiveContext + val benchmark = new DatasetPerformance() { override val numLongs = 100 } import benchmark._