diff --git a/build.sbt b/build.sbt index f34eb63..361529e 100644 --- a/build.sbt +++ b/build.sbt @@ -14,7 +14,8 @@ licenses := Seq("Apache-2.0" -> url("http://opensource.org/licenses/Apache-2.0") sparkVersion := "2.0.0-SNAPSHOT" -sparkComponents ++= Seq("sql", "hive") +sparkComponents ++= Seq("sql", "hive", "mllib") + initialCommands in console := """ @@ -40,22 +41,26 @@ libraryDependencies += "com.twitter" %% "util-jvm" % "6.23.0" % "provided" libraryDependencies += "org.scalatest" %% "scalatest" % "2.2.1" % "test" +libraryDependencies += "org.yaml" % "snakeyaml" % "1.17" + +libraryDependencies += "com.typesafe" %% "scalalogging-slf4j" % "1.1.0" + fork := true // Your username to login to Databricks Cloud -dbcUsername := sys.env.getOrElse("DBC_USERNAME", sys.error("Please set DBC_USERNAME")) +dbcUsername := sys.env.getOrElse("DBC_USERNAME", "") // Your password (Can be set as an environment variable) -dbcPassword := sys.env.getOrElse("DBC_PASSWORD", sys.error("Please set DBC_PASSWORD")) +dbcPassword := sys.env.getOrElse("DBC_PASSWORD", "") // The URL to the Databricks Cloud DB Api. Don't forget to set the port number to 34563! dbcApiUrl := sys.env.getOrElse ("DBC_URL", sys.error("Please set DBC_URL")) // Add any clusters that you would like to deploy your work to. e.g. "My Cluster" // or run dbcExecuteCommand -dbcClusters += sys.env.getOrElse("DBC_USERNAME", sys.error("Please set DBC_USERNAME")) +dbcClusters += sys.env.getOrElse("DBC_USERNAME", "") -dbcLibraryPath := s"/Users/${sys.env.getOrElse("DBC_USERNAME", sys.error("Please set DBC_USERNAME"))}/lib" +dbcLibraryPath := s"/Users/${sys.env.getOrElse("DBC_USERNAME", "")}/lib" val runBenchmark = inputKey[Unit]("runs a benchmark") @@ -64,7 +69,8 @@ runBenchmark := { val args = spaceDelimited("[args]").parsed val scalaRun = (runner in run).value val classpath = (fullClasspath in Compile).value - scalaRun.run("com.databricks.spark.sql.perf.RunBenchmark", classpath.map(_.data), args, streams.value.log) + scalaRun.run("com.databricks.spark.sql.perf.RunBenchmark", classpath.map(_.data), args, + streams.value.log) } import ReleaseTransformations._ diff --git a/src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala b/src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala index 9a7cc67..f354ce2 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala @@ -21,7 +21,7 @@ import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent._ import scala.concurrent.duration._ import scala.language.implicitConversions -import scala.util.Try +import scala.util.{Success, Try, Failure => SFailure} import org.apache.spark.rdd.RDD import org.apache.spark.sql.{Dataset, DataFrame, SQLContext} @@ -39,7 +39,7 @@ abstract class Benchmark( @transient val sqlContext: SQLContext) extends Serializable { - import sqlContext.implicits._ + import Benchmark._ def this() = this(SQLContext.getOrCreate(SparkContext.getOrCreate())) @@ -66,25 +66,6 @@ abstract class Benchmark( defaultParallelism = sparkContext.defaultParallelism, buildInfo = buildInfo) - /** - * A Variation represents a setting (e.g. the number of shuffle partitions or if tables - * are cached in memory) that we want to change in a experiment run. - * A Variation has three parts, `name`, `options`, and `setup`. - * The `name` is the identifier of a Variation. `options` is a Seq of options that - * will be used for a query. Basically, a query will be executed with every option - * defined in the list of `options`. `setup` defines the needed action for every - * option. For example, the following Variation is used to change the number of shuffle - * partitions of a query. The name of the Variation is "shufflePartitions". There are - * two options, 200 and 2000. The setup is used to set the value of property - * "spark.sql.shuffle.partitions". - * - * {{{ - * Variation("shufflePartitions", Seq("200", "2000")) { - * case num => sqlContext.setConf("spark.sql.shuffle.partitions", num) - * } - * }}} - */ - case class Variation[T](name: String, options: Seq[T])(val setup: T => Unit) val codegen = Variation("codegen", Seq("on", "off")) { case "off" => sqlContext.setConf("spark.sql.codegen", "false") @@ -122,238 +103,14 @@ abstract class Benchmark( iterations: Int = 3, variations: Seq[Variation[_]] = Seq(Variation("StandardRun", Seq("true")) { _ => {} }), tags: Map[String, String] = Map.empty, - timeout: Long = 0L) = { + timeout: Long = 0L, + resultLocation: String = resultsLocation, + forkThread: Boolean = true) = { - class ExperimentStatus { - val currentResults = new collection.mutable.ArrayBuffer[BenchmarkResult]() - val currentRuns = new collection.mutable.ArrayBuffer[ExperimentRun]() - val currentMessages = new collection.mutable.ArrayBuffer[String]() - - def logMessage(msg: String) = { - println(msg) - currentMessages += msg - } - - // Stats for HTML status message. - @volatile var currentExecution = "" - @volatile var currentPlan = "" // for queries only - @volatile var currentConfig = "" - @volatile var failures = 0 - @volatile var startTime = 0L - - /** An optional log collection task that will run after the experiment. */ - @volatile var logCollection: () => Unit = () => {} - - - def cartesianProduct[T](xss: List[List[T]]): List[List[T]] = xss match { - case Nil => List(Nil) - case h :: t => for(xh <- h; xt <- cartesianProduct(t)) yield xh :: xt - } - - val timestamp = System.currentTimeMillis() - val resultPath = s"$resultsLocation/timestamp=$timestamp" - val combinations = cartesianProduct(variations.map(l => (0 until l.options.size).toList).toList) - val resultsFuture = Future { - - // If we're running queries, create tables for them - executionsToRun - .collect { case query: Query => query } - .flatMap { query => - try { - query.newDataFrame().queryExecution.logical.collect { - case UnresolvedRelation(t, _) => t.table - } - } catch { - // ignore the queries that can't be parsed - case e: Exception => Seq() - } - } - .distinct - .foreach { name => - try { - sqlContext.table(name) - logMessage(s"Table $name exists.") - } catch { - case ae: Exception => - val table = allTables - .find(_.name == name) - if (table.isDefined) { - logMessage(s"Creating table: $name") - table.get.data - .write - .mode("overwrite") - .saveAsTable(name) - } else { - // the table could be subquery - logMessage(s"Couldn't read table $name and its not defined as a Benchmark.Table.") - } - } - } - - // Run the benchmarks! - val results: Seq[ExperimentRun] = (1 to iterations).flatMap { i => - combinations.map { setup => - val currentOptions = variations.asInstanceOf[Seq[Variation[Any]]].zip(setup).map { - case (v, idx) => - v.setup(v.options(idx)) - v.name -> v.options(idx).toString - } - currentConfig = currentOptions.map { case (k,v) => s"$k: $v" }.mkString(", ") - - val result = ExperimentRun( - timestamp = timestamp, - iteration = i, - tags = currentOptions.toMap ++ tags, - configuration = currentConfiguration, - - executionsToRun.flatMap { q => - val setup = s"iteration: $i, ${currentOptions.map { case (k, v) => s"$k=$v"}.mkString(", ")}" - logMessage(s"Running execution ${q.name} $setup") - - currentExecution = q.name - currentPlan = q match { - case query: Query => - try { - query.newDataFrame().queryExecution.executedPlan.toString() - } catch { - case e: Exception => - s"failed to parse: $e" - } - case _ => "" - } - startTime = System.currentTimeMillis() - - val singleResult = - q.benchmark(includeBreakdown, setup, currentMessages, timeout) - - singleResult.failure.foreach { f => - failures += 1 - logMessage(s"Execution '${q.name}' failed: ${f.message}") - } - singleResult.executionTime.foreach { time => - logMessage(s"Execution time: ${time / 1000}s") - } - currentResults += singleResult - singleResult :: Nil - }) - - currentRuns += result - - result - } - } - - try { - val resultsTable = sqlContext.createDataFrame(results) - logMessage(s"Results written to table: 'sqlPerformance' at $resultPath") - resultsTable - .coalesce(1) - .write - .format("json") - .save(resultPath) - } catch { - case e: Throwable => logMessage(s"Failed to write data: $e") - } - - logCollection() - } - - def scheduleCpuCollection(fs: FS) = { - logCollection = () => { - logMessage(s"Begining CPU log collection") - try { - val location = cpu.collectLogs(sqlContext, fs, timestamp) - logMessage(s"cpu results recorded to $location") - } catch { - case e: Throwable => - logMessage(s"Error collecting logs: $e") - throw e - } - } - } - - def cpuProfile = new Profile(sqlContext, sqlContext.read.json(getCpuLocation(timestamp))) - - def cpuProfileHtml(fs: FS) = { - s""" - |

CPU Profile

- |Permalink: sqlContext.read.json("${getCpuLocation(timestamp)}")
- |${cpuProfile.buildGraph(fs)} - """.stripMargin - } - - /** Waits for the finish of the experiment. */ - def waitForFinish(timeoutInSeconds: Int) = { - Await.result(resultsFuture, timeoutInSeconds.seconds) - } - - /** Returns results from an actively running experiment. */ - def getCurrentResults() = { - val tbl = sqlContext.createDataFrame(currentResults) - tbl.registerTempTable("currentResults") - tbl - } - - /** Returns full iterations from an actively running experiment. */ - def getCurrentRuns() = { - val tbl = sqlContext.createDataFrame(currentRuns) - tbl.registerTempTable("currentRuns") - tbl - } - - def tail(n: Int = 20) = { - currentMessages.takeRight(n).mkString("\n") - } - - def status = - if (resultsFuture.isCompleted) { - if (resultsFuture.value.get.isFailure) "Failed" else "Successful" - } else { - "Running" - } - - override def toString = - s"""Permalink: table("sqlPerformance").where('timestamp === ${timestamp}L)""" - - - def html: String = { - val maybeQueryPlan: String = - if (currentPlan.nonEmpty) { - s""" - |

QueryPlan

- |
-              |${currentPlan.replaceAll("\n", "
")} - |
- """.stripMargin - } else { - "" - } - s""" - |

$status Experiment

- |Permalink: sqlContext.read.json("$resultPath")
- |Iterations complete: ${currentRuns.size / combinations.size} / $iterations
- |Failures: $failures
- |Executions run: ${currentResults.size} / ${iterations * combinations.size * executionsToRun.size} - |
- |Run time: ${(System.currentTimeMillis() - timestamp) / 1000}s
- | - |

Current Execution: $currentExecution

- |Runtime: ${(System.currentTimeMillis() - startTime) / 1000}s
- |$currentConfig
- |$maybeQueryPlan - |

Logs

- |
-           |${tail()}
-           |
- """.stripMargin - } - } - new ExperimentStatus + new ExperimentStatus(executionsToRun, includeBreakdown, iterations, variations, tags, + timeout, resultLocation, sqlContext, allTables, currentConfiguration) } - case class Table( - name: String, - data: Dataset[_]) import reflect.runtime._, universe._ import reflect.runtime._ @@ -500,5 +257,278 @@ abstract class Benchmark( } } } - +} + +/** + * A Variation represents a setting (e.g. the number of shuffle partitions or if tables + * are cached in memory) that we want to change in a experiment run. + * A Variation has three parts, `name`, `options`, and `setup`. + * The `name` is the identifier of a Variation. `options` is a Seq of options that + * will be used for a query. Basically, a query will be executed with every option + * defined in the list of `options`. `setup` defines the needed action for every + * option. For example, the following Variation is used to change the number of shuffle + * partitions of a query. The name of the Variation is "shufflePartitions". There are + * two options, 200 and 2000. The setup is used to set the value of property + * "spark.sql.shuffle.partitions". + * + * {{{ + * Variation("shufflePartitions", Seq("200", "2000")) { + * case num => sqlContext.setConf("spark.sql.shuffle.partitions", num) + * } + * }}} + */ +case class Variation[T](name: String, options: Seq[T])(val setup: T => Unit) + +case class Table( + name: String, + data: Dataset[_]) + + +object Benchmark { + + class ExperimentStatus( + executionsToRun: Seq[Benchmarkable], + includeBreakdown: Boolean, + iterations: Int, + variations: Seq[Variation[_]], + tags: Map[String, String], + timeout: Long, + resultsLocation: String, + sqlContext: SQLContext, + allTables: Seq[Table], + currentConfiguration: BenchmarkConfiguration, + forkThread: Boolean = true) { + val currentResults = new collection.mutable.ArrayBuffer[BenchmarkResult]() + val currentRuns = new collection.mutable.ArrayBuffer[ExperimentRun]() + val currentMessages = new collection.mutable.ArrayBuffer[String]() + + def logMessage(msg: String) = { + println(msg) + currentMessages += msg + } + + // Stats for HTML status message. + @volatile var currentExecution = "" + @volatile var currentPlan = "" // for queries only + @volatile var currentConfig = "" + @volatile var failures = 0 + @volatile var startTime = 0L + + /** An optional log collection task that will run after the experiment. */ + @volatile var logCollection: () => Unit = () => {} + + + def cartesianProduct[T](xss: List[List[T]]): List[List[T]] = xss match { + case Nil => List(Nil) + case h :: t => for(xh <- h; xt <- cartesianProduct(t)) yield xh :: xt + } + + val timestamp = System.currentTimeMillis() + val resultPath = s"$resultsLocation/timestamp=$timestamp" + val combinations = cartesianProduct(variations.map(l => (0 until l.options.size).toList).toList) + val resultsFuture = Future { + + // If we're running queries, create tables for them + executionsToRun + .collect { case query: Query => query } + .flatMap { query => + try { + query.newDataFrame().queryExecution.logical.collect { + case UnresolvedRelation(t, _) => t.table + } + } catch { + // ignore the queries that can't be parsed + case e: Exception => Seq() + } + } + .distinct + .foreach { name => + try { + sqlContext.table(name) + logMessage(s"Table $name exists.") + } catch { + case ae: Exception => + val table = allTables + .find(_.name == name) + if (table.isDefined) { + logMessage(s"Creating table: $name") + table.get.data + .write + .mode("overwrite") + .saveAsTable(name) + } else { + // the table could be subquery + logMessage(s"Couldn't read table $name and its not defined as a Benchmark.Table.") + } + } + } + + // Run the benchmarks! + val results: Seq[ExperimentRun] = (1 to iterations).flatMap { i => + combinations.map { setup => + val currentOptions = variations.asInstanceOf[Seq[Variation[Any]]].zip(setup).map { + case (v, idx) => + v.setup(v.options(idx)) + v.name -> v.options(idx).toString + } + currentConfig = currentOptions.map { case (k,v) => s"$k: $v" }.mkString(", ") + + val res = executionsToRun.flatMap { q => + val setup = s"iteration: $i, ${currentOptions.map { case (k, v) => s"$k=$v"}.mkString(", ")}" + logMessage(s"Running execution ${q.name} $setup") + + currentExecution = q.name + currentPlan = q match { + case query: Query => + try { + query.newDataFrame().queryExecution.executedPlan.toString() + } catch { + case e: Exception => + s"failed to parse: $e" + } + case _ => "" + } + startTime = System.currentTimeMillis() + + val singleResultT = Try { + q.benchmark(includeBreakdown, setup, currentMessages, timeout, + forkThread=forkThread) + } + + singleResultT match { + case Success(singleResult) => + singleResult.failure.foreach { f => + failures += 1 + logMessage(s"Execution '${q.name}' failed: ${f.message}") + } + singleResult.executionTime.foreach { time => + logMessage(s"Execution time: ${time / 1000}s") + } + currentResults += singleResult + singleResult :: Nil + case SFailure(e) => + failures += 1 + logMessage(s"Execution '${q.name}' failed: ${e}") + Nil + } + } + + val result = ExperimentRun( + timestamp = timestamp, + iteration = i, + tags = currentOptions.toMap ++ tags, + configuration = currentConfiguration, + res) + + currentRuns += result + + result + } + } + + try { + val resultsTable = sqlContext.createDataFrame(results) + logMessage(s"Results written to table: 'sqlPerformance' at $resultPath") + resultsTable + .coalesce(1) + .write + .format("json") + .save(resultPath) + } catch { + case e: Throwable => logMessage(s"Failed to write data: $e") + } + + logCollection() + } + + def scheduleCpuCollection(fs: FS) = { + logCollection = () => { + logMessage(s"Begining CPU log collection") + try { + val location = cpu.collectLogs(sqlContext, fs, timestamp) + logMessage(s"cpu results recorded to $location") + } catch { + case e: Throwable => + logMessage(s"Error collecting logs: $e") + throw e + } + } + } + + def cpuProfile = new Profile(sqlContext, sqlContext.read.json(getCpuLocation(timestamp))) + + def cpuProfileHtml(fs: FS) = { + s""" + |

CPU Profile

+ |Permalink: sqlContext.read.json("${getCpuLocation(timestamp)}")
+ |${cpuProfile.buildGraph(fs)} + """.stripMargin + } + + /** Waits for the finish of the experiment. */ + def waitForFinish(timeoutInSeconds: Int) = { + Await.result(resultsFuture, timeoutInSeconds.seconds) + } + + /** Returns results from an actively running experiment. */ + def getCurrentResults() = { + val tbl = sqlContext.createDataFrame(currentResults) + tbl.registerTempTable("currentResults") + tbl + } + + /** Returns full iterations from an actively running experiment. */ + def getCurrentRuns() = { + val tbl = sqlContext.createDataFrame(currentRuns) + tbl.registerTempTable("currentRuns") + tbl + } + + def tail(n: Int = 20) = { + currentMessages.takeRight(n).mkString("\n") + } + + def status = + if (resultsFuture.isCompleted) { + if (resultsFuture.value.get.isFailure) "Failed" else "Successful" + } else { + "Running" + } + + override def toString = + s"""Permalink: table("sqlPerformance").where('timestamp === ${timestamp}L)""" + + + def html: String = { + val maybeQueryPlan: String = + if (currentPlan.nonEmpty) { + s""" + |

QueryPlan

+ |
+             |${currentPlan.replaceAll("\n", "
")} + |
+ """.stripMargin + } else { + "" + } + s""" + |

$status Experiment

+ |Permalink: sqlContext.read.json("$resultPath")
+ |Iterations complete: ${currentRuns.size / combinations.size} / $iterations
+ |Failures: $failures
+ |Executions run: ${currentResults.size} / ${iterations * combinations.size * executionsToRun.size} + |
+ |Run time: ${(System.currentTimeMillis() - timestamp) / 1000}s
+ | + |

Current Execution: $currentExecution

+ |Runtime: ${(System.currentTimeMillis() - startTime) / 1000}s
+ |$currentConfig
+ |$maybeQueryPlan + |

Logs

+ |
+         |${tail()}
+         |
+ """.stripMargin + } + } } diff --git a/src/main/scala/com/databricks/spark/sql/perf/Benchmarkable.scala b/src/main/scala/com/databricks/spark/sql/perf/Benchmarkable.scala index 142ff3f..53176f8 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/Benchmarkable.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/Benchmarkable.scala @@ -18,13 +18,17 @@ package com.databricks.spark.sql.perf import java.util.UUID +import com.typesafe.scalalogging.slf4j.Logging + +import scala.concurrent.duration._ +import scala.collection.mutable.ArrayBuffer + import org.apache.spark.sql.SQLContext import org.apache.spark.{SparkEnv, SparkContext} -import scala.collection.mutable.ArrayBuffer /** A trait to describe things that can be benchmarked. */ -trait Benchmarkable { +trait Benchmarkable extends Logging { @transient protected[this] val sqlContext = SQLContext.getOrCreate(SparkContext.getOrCreate()) @transient protected[this] val sparkContext = sqlContext.sparkContext @@ -35,10 +39,16 @@ trait Benchmarkable { includeBreakdown: Boolean, description: String = "", messages: ArrayBuffer[String], - timeout: Long): BenchmarkResult = { + timeout: Long, + forkThread: Boolean = true): BenchmarkResult = { + logger.info(s"$this: benchmark") sparkContext.setJobDescription(s"Execution: $name, $description") beforeBenchmark() - val result = runBenchmark(includeBreakdown, description, messages, timeout) + val result = if (forkThread) { + runBenchmarkForked(includeBreakdown, description, messages, timeout) + } else { + doBenchmark(includeBreakdown, description, messages) + } afterBenchmark(sqlContext.sparkContext) result } @@ -54,17 +64,25 @@ trait Benchmarkable { .foreach { bid => SparkEnv.get.blockManager.master.removeBlock(bid) } } - private def runBenchmark( + private def runBenchmarkForked( includeBreakdown: Boolean, description: String = "", messages: ArrayBuffer[String], timeout: Long): BenchmarkResult = { val jobgroup = UUID.randomUUID().toString + val that = this var result: BenchmarkResult = null val thread = new Thread("benchmark runner") { override def run(): Unit = { + logger.info(s"$that running $this") sparkContext.setJobGroup(jobgroup, s"benchmark $name", true) - result = doBenchmark(includeBreakdown, description, messages) + try { + result = doBenchmark(includeBreakdown, description, messages) + } catch { + case e: Throwable => + logger.info(s"$that: failure in runBenchmark: $e") + throw e + } } } thread.setDaemon(true) @@ -93,4 +111,11 @@ trait Benchmarkable { val endTime = System.nanoTime() (endTime - startTime).toDouble / 1000000 } -} \ No newline at end of file + + protected def measureTime[A](f: => A): (Duration, A) = { + val startTime = System.nanoTime() + val res = f + val endTime = System.nanoTime() + (endTime - startTime).nanos -> res + } +} diff --git a/src/main/scala/com/databricks/spark/sql/perf/mllib/BenchmarkAlgorithm.scala b/src/main/scala/com/databricks/spark/sql/perf/mllib/BenchmarkAlgorithm.scala new file mode 100644 index 0000000..7dc6dc4 --- /dev/null +++ b/src/main/scala/com/databricks/spark/sql/perf/mllib/BenchmarkAlgorithm.scala @@ -0,0 +1,94 @@ +package com.databricks.spark.sql.perf.mllib + +import com.typesafe.scalalogging.slf4j.Logging + +import org.apache.spark.ml.Transformer +import org.apache.spark.ml.evaluation.{MulticlassClassificationEvaluator, Evaluator} +import org.apache.spark.sql._ +import org.apache.spark.sql.functions._ + +/** + * The description of a benchmark for an ML algorithm. It follows a simple, standard proceduce: + * - generate some test and training data + * - generate a model against the training data + * - score the model against the training data + * - score the model against the test data + * + * You should not assume that your implementation can carry state around. If some state is needed, + * consider adding it to the context. + * + * It is assumed that the implementation is going to be an object. + */ +trait BenchmarkAlgorithm extends Logging { + + def trainingDataSet(ctx: MLBenchContext): DataFrame + + def testDataSet(ctx: MLBenchContext): DataFrame + + @throws[Exception]("if training fails") + def train( + ctx: MLBenchContext, + trainingSet: DataFrame): Transformer + + @throws[Exception]("if scoring fails") + def score( + ctx: MLBenchContext, + testSet: DataFrame, + model: Transformer): Double = -1.0 // Not putting NaN because it is not valid JSON. +} + +/** + * Uses an evaluator to perform the scoring. + */ +trait ScoringWithEvaluator { + self: BenchmarkAlgorithm => + + protected def evaluator(ctx: MLBenchContext): Evaluator + + final override def score( + ctx: MLBenchContext, + testSet: DataFrame, + model: Transformer): Double = { + val eval = model.transform(testSet) + evaluator(ctx).evaluate(eval) + } +} + +/** + * Builds the training set for an initial dataset and an initial model. Useful for validating a + * trained model against a given model. + */ +trait TrainingSetFromTransformer { + self: BenchmarkAlgorithm => + + protected def initialData(ctx: MLBenchContext): DataFrame + + protected def trueModel(ctx: MLBenchContext): Transformer + + final override def trainingDataSet(ctx: MLBenchContext): DataFrame = { + val initial = initialData(ctx) + val model = trueModel(ctx) + model.transform(initial).select(col("features"), col("prediction").as("label")) + } +} + +/** + * The test data is the same as the training data. + */ +trait TestFromTraining { + self: BenchmarkAlgorithm => + + final override def testDataSet(ctx: MLBenchContext): DataFrame = { + // Copy the context with a new seed. + val ctx2 = ctx.params.randomSeed match { + case Some(x) => + val p = ctx.params.copy(randomSeed = Some(x + 1)) + ctx.copy(params = p) + case None => + // Making a full copy to reset the internal seed. + ctx.copy() + } + self.trainingDataSet(ctx2) + } +} + diff --git a/src/main/scala/com/databricks/spark/sql/perf/mllib/MLBenchContext.scala b/src/main/scala/com/databricks/spark/sql/perf/mllib/MLBenchContext.scala new file mode 100644 index 0000000..b8971fe --- /dev/null +++ b/src/main/scala/com/databricks/spark/sql/perf/mllib/MLBenchContext.scala @@ -0,0 +1,39 @@ +package com.databricks.spark.sql.perf.mllib + +import java.util.Random + +import com.databricks.spark.sql.perf.{MLParams} +import org.apache.spark.sql.SQLContext + + +/** + * All the information required to run a test. + * + * @param params + * @param sqlContext + */ +case class MLBenchContext( + params: MLParams, + sqlContext: SQLContext) { + + // Some seed fixed for the context. + private val internalSeed: Long = { + params.randomSeed.map(_.toLong).getOrElse { + throw new Exception("You need te specify the random seed") + } + } + + /** + * A fixed seed for this class. This function will always return the same value. + * + * @return + */ + def seed(): Long = internalSeed + + /** + * Creates a new generator. The generator will always start with the same state. + * + * @return + */ + def newGenerator(): Random = new Random(seed()) +} diff --git a/src/main/scala/com/databricks/spark/sql/perf/mllib/MLBenchmarks.scala b/src/main/scala/com/databricks/spark/sql/perf/mllib/MLBenchmarks.scala new file mode 100644 index 0000000..632940e --- /dev/null +++ b/src/main/scala/com/databricks/spark/sql/perf/mllib/MLBenchmarks.scala @@ -0,0 +1,37 @@ +package com.databricks.spark.sql.perf.mllib + +import com.databricks.spark.sql.perf.mllib.classification.LogisticRegression +import org.apache.spark.SparkContext +import org.apache.spark.sql.SQLContext + +import com.databricks.spark.sql.perf.{MLParams} +import OptionImplicits._ + +case class MLTest( + benchmark: BenchmarkAlgorithm, + params: MLParams) + +// Example on how to create benchmarks using the API. +object MLBenchmarks { + // The list of standard benchmarks that we are going to run for ML. + val benchmarks: Seq[MLTest] = List( + MLTest( + LogisticRegression, + MLParams( + numFeatures = 10, + numExamples = 10, + numTestExamples = 10, + numPartitions = 3, + regParam = 1, + tol = 0.2) + ) + ) + + val context = SparkContext.getOrCreate() + val sqlContext: SQLContext = SQLContext.getOrCreate(context) + + def benchmarkObjects: Seq[MLTransformerBenchmarkable] = benchmarks.map { mlb => + new MLTransformerBenchmarkable(mlb.params, mlb.benchmark, sqlContext) + } + +} diff --git a/src/main/scala/com/databricks/spark/sql/perf/mllib/MLLib.scala b/src/main/scala/com/databricks/spark/sql/perf/mllib/MLLib.scala new file mode 100644 index 0000000..83d1873 --- /dev/null +++ b/src/main/scala/com/databricks/spark/sql/perf/mllib/MLLib.scala @@ -0,0 +1,70 @@ +package com.databricks.spark.sql.perf.mllib + +import scala.language.implicitConversions + +import com.databricks.spark.sql.perf._ + +import com.typesafe.scalalogging.slf4j.Logging + +import org.apache.spark.SparkContext +import org.apache.spark.sql.{DataFrame, SQLContext} + + +class MLLib(@transient sqlContext: SQLContext) + extends Benchmark(sqlContext) with Serializable { + + def this() = this(SQLContext.getOrCreate(SparkContext.getOrCreate())) +} + +object MLLib extends Logging { + + /** + * Runs a set of preprogrammed experiments and blocks on completion. + * + * @param runConfig a configuration that is av + * @return + */ + def runDefault(runConfig: RunConfig): DataFrame = { + val ml = new MLLib() + val benchmarks = MLBenchmarks.benchmarkObjects + val e = ml.runExperiment( + executionsToRun = benchmarks) + e.waitForFinish(1000 * 60 * 30) + logger.info("Run finished") + e.getCurrentResults() + } + + /** + * Runs all the experiments and blocks on completion + * + * @param yamlFile a file name + * @return + */ + def run(yamlFile: String = null, yamlConfig: String = null): DataFrame = { + logger.info("Starting run") + val conf: YamlConfig = Option(yamlFile).map(YamlConfig.readFile).getOrElse { + require(yamlConfig != null) + YamlConfig.readString(yamlConfig) + } + val sc = SparkContext.getOrCreate() + sc.setLogLevel("INFO") + val b = new com.databricks.spark.sql.perf.mllib.MLLib() + val sqlContext = com.databricks.spark.sql.perf.mllib.MLBenchmarks.sqlContext + val benchmarksDescriptions = conf.runnableBenchmarks + val benchmarks = benchmarksDescriptions.map { mlb => + new MLTransformerBenchmarkable(mlb.params, mlb.benchmark, sqlContext) + } + logger.info(s"${benchmarks.size} benchmarks identified:") + val str = benchmarks.map(_.prettyPrint).mkString("\n") + logger.info(str) + logger.info("Starting experiments") + val e = b.runExperiment( + executionsToRun = benchmarks, + iterations = 1, // If you want to increase the number of iterations, add more seeds + resultLocation = conf.output, + forkThread = false) + e.waitForFinish(conf.timeout.toSeconds.toInt) + logger.info("Run finished") + e.getCurrentResults() + } +} diff --git a/src/main/scala/com/databricks/spark/sql/perf/mllib/MLTransformerBenchmarkable.scala b/src/main/scala/com/databricks/spark/sql/perf/mllib/MLTransformerBenchmarkable.scala new file mode 100644 index 0000000..5c78784 --- /dev/null +++ b/src/main/scala/com/databricks/spark/sql/perf/mllib/MLTransformerBenchmarkable.scala @@ -0,0 +1,107 @@ +package com.databricks.spark.sql.perf.mllib + +import com.databricks.spark.sql.perf._ +import com.typesafe.scalalogging.slf4j.Logging +import org.apache.spark.sql._ + +import scala.collection.mutable.ArrayBuffer + +class MLTransformerBenchmarkable( + params: MLParams, + test: BenchmarkAlgorithm, + sqlContext: SQLContext) + extends Benchmarkable with Serializable with Logging { + + import MLTransformerBenchmarkable._ + + private var testData: DataFrame = null + private var trainingData: DataFrame = null + private val param = MLBenchContext(params, sqlContext) + + override val name = test.getClass.getCanonicalName + + override protected val executionMode: ExecutionMode = ExecutionMode.SparkPerfResults + + override protected def beforeBenchmark(): Unit = { + logger.info(s"$this beforeBenchmark") + try { + testData = test.testDataSet(param) + testData.cache() + testData.count() + trainingData = test.trainingDataSet(param) + trainingData.cache() + trainingData.count() + } catch { + case e: Throwable => + println(s"$this error in beforeBenchmark: ${e.getStackTraceString}") + throw e + } + } + + override protected def doBenchmark( + includeBreakdown: Boolean, + description: String, + messages: ArrayBuffer[String]): BenchmarkResult = { + try { + val (trainingTime, model) = measureTime(test.train(param, trainingData)) + logger.info(s"model: $model") + val (_, scoreTraining) = measureTime { + test.score(param, trainingData, model) + } + val (scoreTestTime, scoreTest) = measureTime { + test.score(param, testData, model) + } + + + val ml = MLResult( + trainingTime = Some(trainingTime.toMillis), + trainingMetric = Some(scoreTraining), + testTime = Some(scoreTestTime.toMillis), + testMetric = Some(scoreTest)) + + BenchmarkResult( + name = name, + mode = executionMode.toString, + parameters = Map.empty, + executionTime = Some(trainingTime.toMillis), + mlParams = Some(params), + mlResult = Some(ml)) + } catch { + case e: Exception => + BenchmarkResult( + name = name, + mode = executionMode.toString, + parameters = Map.empty, + failure = Some(Failure(e.getClass.getSimpleName, + e.getMessage + ":\n" + e.getStackTraceString))) + } finally { + Option(testData).map(_.unpersist()) + Option(trainingData).map(_.unpersist()) + } + } + + def prettyPrint: String = { + val paramString = pprint(params).mkString("\n") + s"$test\n$paramString" + } + + +} + +object MLTransformerBenchmarkable { + private def pprint(p: AnyRef): Seq[String] = { + val m = getCCParams(p) + m.flatMap { + case (key, Some(value: Any)) => Some(s" $key=$value") + case _ => None + } .toSeq + } + + // From http://stackoverflow.com/questions/1226555/case-class-to-map-in-scala + private def getCCParams(cc: AnyRef) = + (Map[String, Any]() /: cc.getClass.getDeclaredFields) {(a, f) => + f.setAccessible(true) + a + (f.getName -> f.get(cc)) + } +} + diff --git a/src/main/scala/com/databricks/spark/sql/perf/mllib/OptionImplicits.scala b/src/main/scala/com/databricks/spark/sql/perf/mllib/OptionImplicits.scala new file mode 100644 index 0000000..ef90525 --- /dev/null +++ b/src/main/scala/com/databricks/spark/sql/perf/mllib/OptionImplicits.scala @@ -0,0 +1,45 @@ +package com.databricks.spark.sql.perf.mllib + +import scala.language.implicitConversions + +/** + * Implicits to transparently convert some Option[X] to X and vice-versa. + * + * This is usually dangerous to do, but in our case, the config is expressed through Options and + * it alleviates the need to manually box values. + */ +object OptionImplicits { + // The following implicits are unrolled for safety: + private def oX2X[A](x: Option[A]): A = x.get + + def checkLong(x: Option[Long]): Option[Long] = { + x.asInstanceOf[Option[Any]] match { + case Some(u: java.lang.Integer) => Some(u.toLong) + case Some(u: java.lang.Long) => Some(u.toLong) + case _ => x + } + } + + def checkDouble(x: Option[Double]): Option[Double] = { + x.asInstanceOf[Option[Any]] match { + case Some(u: java.lang.Integer) => Some(u.toDouble) + case Some(u: java.lang.Long) => Some(u.toDouble) + case Some(u: java.lang.Double) => Some(u.toDouble) + case _ => x + } + } + + implicit def oD2D(x: Option[Double]): Double = oX2X(x) + + implicit def oS2S(x: Option[String]): String = oX2X(x) + + implicit def oI2I(x: Option[Int]): Int = oX2X(x) + + implicit def oL2L(x: Option[Long]): Long = oX2X(x) + + implicit def l2lo(x: Long): Option[Long] = checkLong(Option(x)) + implicit def i2lo(x: Int): Option[Long] = Option(x.toLong) + implicit def i2io(x: Int): Option[Int] = Option(x) + implicit def d2do(x: Double): Option[Double] = Option(x) + implicit def i2do(x: Int): Option[Double] = Option(x) +} \ No newline at end of file diff --git a/src/main/scala/com/databricks/spark/sql/perf/mllib/classification/LogisticRegression.scala b/src/main/scala/com/databricks/spark/sql/perf/mllib/classification/LogisticRegression.scala new file mode 100644 index 0000000..5e81496 --- /dev/null +++ b/src/main/scala/com/databricks/spark/sql/perf/mllib/classification/LogisticRegression.scala @@ -0,0 +1,48 @@ +package com.databricks.spark.sql.perf.mllib.classification + +import com.databricks.spark.sql.perf.mllib.OptionImplicits._ +import com.databricks.spark.sql.perf.mllib._ +import com.databricks.spark.sql.perf.mllib.data.DataGenerator +import org.apache.spark.ml.evaluation.{MulticlassClassificationEvaluator, Evaluator} + +import org.apache.spark.ml.{Transformer, ModelBuilder} +import org.apache.spark.ml +import org.apache.spark.ml.linalg.Vectors +import org.apache.spark.sql.DataFrame + +object LogisticRegression extends BenchmarkAlgorithm + with TestFromTraining with TrainingSetFromTransformer with ScoringWithEvaluator { + + override protected def initialData(ctx: MLBenchContext) = { + import ctx.params._ + DataGenerator.generateFeatures( + ctx.sqlContext, + numExamples, + ctx.seed(), + numPartitions, + numFeatures) + } + + override protected def trueModel(ctx: MLBenchContext): Transformer = { + val rng = ctx.newGenerator() + val coefficients = + Vectors.dense(Array.fill[Double](ctx.params.numFeatures)(2 * rng.nextDouble() - 1)) + // Small intercept to prevent some skew in the data. + val intercept = 0.01 * (2 * rng.nextDouble - 1) + ModelBuilder.newLogisticRegressionModel(coefficients, intercept) + } + + override def train(ctx: MLBenchContext, + trainingSet: DataFrame): Transformer = { + logger.info(s"$this: train: trainingSet=${trainingSet.schema}") + import ctx.params._ + val lr = new ml.classification.LogisticRegression() + .setTol(tol) + .setRegParam(regParam) + lr.fit(trainingSet) + } + + override protected def evaluator(ctx: MLBenchContext): Evaluator = + new MulticlassClassificationEvaluator() +} + diff --git a/src/main/scala/com/databricks/spark/sql/perf/mllib/clustering/LDA.scala b/src/main/scala/com/databricks/spark/sql/perf/mllib/clustering/LDA.scala new file mode 100644 index 0000000..3fe5688 --- /dev/null +++ b/src/main/scala/com/databricks/spark/sql/perf/mllib/clustering/LDA.scala @@ -0,0 +1,55 @@ +package com.databricks.spark.sql.perf.mllib.clustering + +import com.databricks.spark.sql.perf.mllib.{MLBenchContext, TestFromTraining, BenchmarkAlgorithm} +import com.databricks.spark.sql.perf.mllib.OptionImplicits._ +import org.apache.commons.math3.random.Well19937c +import org.apache.spark.ml.Transformer +import org.apache.spark.ml +import org.apache.spark.rdd.RDD +import org.apache.spark.sql._ +import org.apache.spark.ml.linalg.{Vectors, Vector} +import scala.collection.mutable.{HashMap => MHashMap} + +object LDA extends BenchmarkAlgorithm with TestFromTraining { + // The LDA model is package private, no need to expose it. + + override def trainingDataSet(ctx: MLBenchContext): DataFrame = { + import ctx.params._ + val rdd = ctx.sqlContext.sparkContext.parallelize( + 0L until numExamples, + numPartitions + ) + val seed: Int = randomSeed + val docLength = ldaDocLength.get + val numVocab = ldaNumVocabulary.get + val data: RDD[(Long, Vector)] = rdd.mapPartitionsWithIndex { (idx, partition) => + val rng = new Well19937c(seed ^ idx) + partition.map { docIndex => + var currentSize = 0 + val entries = MHashMap[Int, Int]() + while (currentSize < docLength) { + val index = rng.nextInt(numVocab) + entries(index) = entries.getOrElse(index, 0) + 1 + currentSize += 1 + } + + val iter = entries.toSeq.map(v => (v._1, v._2.toDouble)) + (docIndex, Vectors.sparse(numVocab, iter)) + } + } + ctx.sqlContext.createDataFrame(data).toDF("docIndex", "features") + } + + override def train(ctx: MLBenchContext, + trainingSet: DataFrame): Transformer = { + import ctx.params._ + new ml.clustering.LDA() + .setK(k) + .setSeed(randomSeed.toLong) + .setMaxIter(maxIter) + .setOptimizer(optimizer) + .fit(trainingSet) + } + + // TODO(?) add a scoring method here. +} \ No newline at end of file diff --git a/src/main/scala/com/databricks/spark/sql/perf/mllib/data/data_generation.scala b/src/main/scala/com/databricks/spark/sql/perf/mllib/data/data_generation.scala new file mode 100644 index 0000000..ca36704 --- /dev/null +++ b/src/main/scala/com/databricks/spark/sql/perf/mllib/data/data_generation.scala @@ -0,0 +1,102 @@ +package com.databricks.spark.sql.perf.mllib.data + +import org.apache.spark.ml.linalg.Vectors +import org.apache.spark.ml.linalg.Vector +import org.apache.spark.mllib.random._ +import org.apache.spark.mllib.regression.LabeledPoint +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{SQLContext, DataFrame} + +object DataGenerator { + + def generateFeatures( + sql: SQLContext, + numExamples: Long, + seed: Long, + numPartitions: Int, + numFeatures: Int): DataFrame = { + val categoricalArities = Array.empty[Int] + val rdd: RDD[Vector] = RandomRDDs.randomRDD(sql.sparkContext, + new FeaturesGenerator(categoricalArities, numFeatures), + numExamples, numPartitions, seed) + sql.createDataFrame(rdd.map(Tuple1.apply)).toDF("features") + } +} + +class BinaryLabeledDataGenerator( + private val numFeatures: Int, + private val threshold: Double) extends RandomDataGenerator[LabeledPoint] { + + private val rng = new java.util.Random() + + override def nextValue(): LabeledPoint = { + val y = if (rng.nextDouble() < threshold) 0.0 else 1.0 + val x = Array.fill[Double](numFeatures) { + if (rng.nextDouble() < threshold) 0.0 else 1.0 + } + ??? +// LabeledPoint(y, Vectors.dense(x)) + } + + override def setSeed(seed: Long) { + rng.setSeed(seed) + } + + override def copy(): BinaryLabeledDataGenerator = + new BinaryLabeledDataGenerator(numFeatures, threshold) + +} + + +/** + * Generator for a feature vector which can include a mix of categorical and continuous features. + * @param categoricalArities Specifies the number of categories for each categorical feature. + * @param numContinuous Number of continuous features. Feature values are in range [0,1]. + */ +class FeaturesGenerator(val categoricalArities: Array[Int], val numContinuous: Int) + extends RandomDataGenerator[Vector] { + + categoricalArities.foreach { arity => + require(arity >= 2, s"FeaturesGenerator given categorical arity = $arity, " + + s"but arity should be >= 2.") + } + + val numFeatures = categoricalArities.length + numContinuous + + private val rng = new java.util.Random() + + /** + * Generates vector with categorical features first, and continuous features in [0,1] second. + */ + override def nextValue(): Vector = { + // Feature ordering matches getCategoricalFeaturesInfo. + val arr = new Array[Double](numFeatures) + var j = 0 + while (j < categoricalArities.length) { + arr(j) = rng.nextInt(categoricalArities(j)) + j += 1 + } + while (j < numFeatures) { + // Generating some centered data + arr(j) = 2 * rng.nextDouble() - 1 + j += 1 + } + Vectors.dense(arr) + } + + override def setSeed(seed: Long) { + rng.setSeed(seed) + } + + override def copy(): FeaturesGenerator = new FeaturesGenerator(categoricalArities, numContinuous) + + /** + * @return categoricalFeaturesInfo Map storing arity of categorical features. + * E.g., an entry (n -> k) indicates that feature n is categorical + * with k categories indexed from 0: {0, 1, ..., k-1}. + */ + def getCategoricalFeaturesInfo: Map[Int, Int] = { + // Categorical features are indexed from 0 because of the implementation of nextValue(). + categoricalArities.zipWithIndex.map(_.swap).toMap + } +} \ No newline at end of file diff --git a/src/main/scala/com/databricks/spark/sql/perf/mllib/yaml.scala b/src/main/scala/com/databricks/spark/sql/perf/mllib/yaml.scala new file mode 100644 index 0000000..ee40461 --- /dev/null +++ b/src/main/scala/com/databricks/spark/sql/perf/mllib/yaml.scala @@ -0,0 +1,176 @@ +package com.databricks.spark.sql.perf.mllib + +import java.util.{ArrayList => AL} + +import scala.concurrent.duration._ +import scala.collection.JavaConverters._ +import scala.io.Source + +import scala.reflect._ +import scala.reflect.runtime.universe._ +import scala.util.{Try => STry, Success, Failure} + +import org.yaml.snakeyaml.Yaml + +import com.databricks.spark.sql.perf.{MLParams} + + +/** + * The configuration information generated from reading a YAML file. + * + * @param output the output direct + */ +case class YamlConfig( + output: String = "/tmp/result", + timeout: Duration = 20.minutes, + runnableBenchmarks: Seq[MLTest]) + +object YamlConfig { + + /** + * Reads a string (assumed to contain a yaml description) and returns the configuration. + */ + def readString(s: String): YamlConfig = { + println(s) + val yaml = new Yaml() + val m = dict(yaml.load(s)) + val common = m.get("common").map(dict).getOrElse(Map.empty) + println("common") + println(m) + val exps = m("benchmarks") + .asInstanceOf[AL[Map[String, Any]]].asScala.map(dict).toSeq + println("exps:") + println(exps) + val experiments = exps.flatMap { sd => + val name = sd("name").toString + val params = sd.get("params").map(dict).getOrElse(Map.empty) + val expParams = cartesian(common ++ params) + for (c <- expParams) yield name -> c + } + println("exp parsed") + println(experiments) + val e2 = experiments.map { case (n, e) => + val e2 = ccFromMap.fromMap[MLParams](e, strict=true) + val s = ccFromMap.loadExperiment(n).getOrElse { + throw new Exception(s"Cannot find algorithm $n in the standard benchmark algorithms") + } + MLTest(s, e2) + } + var c = YamlConfig(runnableBenchmarks = e2) + for (output <- m.get("output")) { + c = c.copy(output = output.toString) + } + for (x <- m.get("timeoutSeconds")) { + c = c.copy(timeout = x.toString.toInt.seconds) + } + c + } + + /** + * Reads a file (assumed to contain a yaml config). + */ + def readFile(filename: String): YamlConfig = { + readString(Source.fromFile(filename).mkString) + } + + // Converts a java dictionary to a scala map. + private def dict[T](d: T): Map[String, Any] = { + d.asInstanceOf[java.util.Map[String, Any]].asScala.toMap + } + + /** + * Given keys that may be lists, builds the cartesian product of all the values into defined + * options. + * + * For example: {a: [1,2], b: [3,4]} -> {a: 1, b: 3}, {a: 1, b:4}, {a:2, b:3}, ... + * + * @return + */ + private def cartesian(m: Map[String, Any]): Seq[Map[String, Any]] = { + if (m.isEmpty) { + Seq(m) + } else { + val k = m.keys.head + val sub = m - k + val l = cartesian(sub) + m(k) match { + case a: AL[_] => + for { + x <- a.asScala.toSeq + m2 <- l + } yield { + m2 ++ Map(k -> x.asInstanceOf[Any]) + } + case _ => + val v = m(k) + l.map { m => m ++ Map(k -> v) } + } + } + } + +} + +// Some ugly internals to make simple constructs +package object ccFromMap { + // Builds a case class from a map. + // (taken from stack overflow) + // if strict, will report an error if some unknown arguments are passed to the constructor + def fromMap[T: TypeTag: ClassTag](m: Map[String,_], strict: Boolean) = { + + scala.reflect.runtime.universe + val rm = runtimeMirror(classTag[T].runtimeClass.getClassLoader) + val classTest = typeOf[T].typeSymbol.asClass + val classMirror = rm.reflectClass(classTest) + val constructor = typeOf[T].declaration(nme.CONSTRUCTOR).asMethod + val constructorMirror = classMirror.reflectConstructor(constructor) + + val constructorArgNames = constructor.paramss.flatten.map(_.name.toString).toSet + val extraElements = m.keySet -- constructorArgNames + if (extraElements.nonEmpty) { + throw new Exception(s"Found extra arguments when instantiating an object of " + + s"class ${classTest.asClass.toString}:" + + s" ${extraElements.toSeq.sorted}") + } + + val constructorArgs = constructor.paramss.flatten.map( (param: Symbol) => { + val paramName = param.name.toString + if(param.typeSignature <:< typeOf[Option[Long]]) + OptionImplicits.checkLong(m.get(paramName).asInstanceOf[Option[Long]]) + else if(param.typeSignature <:< typeOf[Option[Double]]) + OptionImplicits.checkDouble(m.get(paramName).asInstanceOf[Option[Double]]) + else if(param.typeSignature <:< typeOf[Option[Any]]) + m.get(paramName) + else + m.get(paramName).getOrElse(throw new IllegalArgumentException("Map is missing required parameter named " + paramName)) + }) + + constructorMirror(constructorArgs:_*).asInstanceOf[T] + } + + // TODO: handle scala.reflect.internal.MissingRequirementError + private def load(name: String): STry[BenchmarkAlgorithm] = { + val rm = runtimeMirror(getClass.getClassLoader) + try { + val module = rm.staticModule("com.databricks.spark.sql.perf.mllib." + name) + val obj = rm.reflectModule(module) + Success(obj.instance.asInstanceOf[BenchmarkAlgorithm]) + } catch { + case x: scala.reflect.internal.MissingRequirementError => + Failure(x) + } + } + + val defaultPackages = Seq( + "", + "com.databricks.spark.sql.perf.mllib" + ) + + def loadExperiment( + name: String, + searchPackages: Seq[String] = defaultPackages): Option[BenchmarkAlgorithm] = { + searchPackages.view.flatMap { p => + val n = if (p.isEmpty) name else s"$p.$name" + load(n).toOption + } .headOption + } +} diff --git a/src/main/scala/com/databricks/spark/sql/perf/results.scala b/src/main/scala/com/databricks/spark/sql/perf/results.scala index ebd5e41..d9e93f0 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/results.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/results.scala @@ -18,6 +18,7 @@ package com.databricks.spark.sql.perf /** * The performance results of all given queries for a single iteration. + * * @param timestamp The timestamp indicates when the entire experiment is started. * @param iteration The index number of the current iteration. * @param tags Tags of this iteration (variations are stored at here). @@ -33,6 +34,7 @@ case class ExperimentRun( /** * The configuration used for an iteration of an experiment. + * * @param sparkVersion The version of Spark. * @param sqlConf All configuration properties related to Spark SQL. * @param sparkConf All configuration properties of Spark. @@ -48,6 +50,7 @@ case class BenchmarkConfiguration( /** * The result of a query. + * * @param name The name of the query. * @param mode The ExecutionMode of this run. * @param parameters Additional parameters that describe this query. @@ -77,10 +80,13 @@ case class BenchmarkResult( result: Option[Long] = None, breakDown: Seq[BreakdownResult] = Nil, queryExecution: Option[String] = None, - failure: Option[Failure] = None) + failure: Option[Failure] = None, + mlParams: Option[MLParams] = None, + mlResult: Option[MLResult] = None) /** * The execution time of a subtree of the query plan tree of a specific query. + * * @param nodeName The name of the top physical operator of the subtree. * @param nodeNameWithArgs The name and arguments of the top physical operator of the subtree. * @param index The index of the top physical operator of the subtree @@ -97,3 +103,42 @@ case class BreakdownResult( delta: Double) case class Failure(className: String, message: String) + +// KEEP ARGUMENTS SORTED BY NAME. +// It simplifies lookup when checking if a parameter is here already. +case class MLParams( + // *** Common to all algorithms *** + randomSeed: Option[Int] = Some(42), + numExamples: Option[Long] = None, + numTestExamples: Option[Long] = None, + numPartitions: Option[Int] = None, + // *** Specialized and sorted by name *** + ldaDocLength: Option[Int] = None, + ldaNumVocabulary: Option[Int] = None, + k: Option[Int] = None, + maxIter: Option[Int] = None, + numFeatures: Option[Int] = None, + optimizer: Option[String] = None, + regParam: Option[Double] = None, + tol: Option[Double] = None +) + +object MLParams { + val empty = MLParams() +} + +/** + * Result information specific to MLlib. + * + * @param trainingTime (MLlib) Training time. + * executionTime is set to the same value to match Spark Core tests. + * @param trainingMetric (MLlib) Training metric, such as accuracy + * @param testTime (MLlib) Test time (for prediction on test set, or on training set if there + * is no test set). + * @param testMetric (MLlib) Test metric, such as accuracy + */ +case class MLResult( + trainingTime: Option[Double] = None, + trainingMetric: Option[Double] = None, + testTime: Option[Double] = None, + testMetric: Option[Double] = None) diff --git a/src/main/scala/configs/mllib-small.yaml b/src/main/scala/configs/mllib-small.yaml new file mode 100644 index 0000000..eea6c54 --- /dev/null +++ b/src/main/scala/configs/mllib-small.yaml @@ -0,0 +1,23 @@ +output: /tmp/results2 +timeoutSeconds: 1000 +common: + numFeatures: 10 + numExamples: [1, 3] + numPartitions: 3 + randomSeed: [1, 2, 3] +benchmarks: + - name: classification.LogisticRegression + params: + numFeatures: 100 + regParam: 0.1 + tol: [0.2, 0.1] + - name: clustering.LDA + params: + numExamples: 10 + ldaDocLength: 20 + ldaNumVocabulary: 4 + k: 5 + maxIter: 10 + optimizer: + - em + - online diff --git a/src/main/scala/org/apache/spark/ml/ModelBuilder.scala b/src/main/scala/org/apache/spark/ml/ModelBuilder.scala new file mode 100644 index 0000000..1f8174f --- /dev/null +++ b/src/main/scala/org/apache/spark/ml/ModelBuilder.scala @@ -0,0 +1,16 @@ +package org.apache.spark.ml + +import org.apache.spark.ml.classification.LogisticRegressionModel +import org.apache.spark.ml.linalg.Vector + +/** + * Helper for creating MLlib models which have private constructors. + */ +object ModelBuilder { + + def newLogisticRegressionModel( + coefficients: Vector, + intercept: Double): LogisticRegressionModel = { + new LogisticRegressionModel("lr", coefficients, intercept) + } +} \ No newline at end of file diff --git a/src/test/scala/com/databricks/spark/sql/perf/DatasetPerformanceSuite.scala b/src/test/scala/com/databricks/spark/sql/perf/DatasetPerformanceSuite.scala index cd934fa..3ea68ea 100644 --- a/src/test/scala/com/databricks/spark/sql/perf/DatasetPerformanceSuite.scala +++ b/src/test/scala/com/databricks/spark/sql/perf/DatasetPerformanceSuite.scala @@ -4,7 +4,7 @@ import org.apache.spark.sql.hive.test.TestHive import org.scalatest.FunSuite class DatasetPerformanceSuite extends FunSuite { - test("run benchmark") { + ignore("run benchmark") { TestHive // Init HiveContext val benchmark = new DatasetPerformance() { override val numLongs = 100