Initial commit for adding MLlib reporting in spark-sql-perf
This PR adds basic MLlib infrastructure to run some benchmarks against ML pipelines. There are 2 ways to describe and run ML pipelines: - programatically, in scala (see MLBenchmarks.scala) - using a simple YAML file (see mllib-small.yaml for an example) The YAML approach is preferred because it generates programmatically the cartesian product of all the experiments to run and validates the types of the objects in the yaml file. In both cases, all the ML experiments are standard benchmarks. This PR also moves some code in `Benchmark.scala` : the current code generates path-dependent structural signatures and confuses intellij. It does not include tests, but some small benchmarks can be run locally against a spark 2 installation: ``` $SPARK_HOME/bin/spark-shell --jars $PWD/target/scala-2.10/spark-sql-perf-assembly-0.4.9-SNAPSHOT.jar ``` and then: ```scala com.databricks.spark.sql.perf.mllib.MLLib.run(yamlFile="src/main/scala/configs/mllib-small.yaml") ``` Author: Timothy Hunter <timhunter@databricks.com> Closes #69 from thunterdb/1605-mllib2.
This commit is contained in:
parent
ea342c6165
commit
1388722b81
18
build.sbt
18
build.sbt
@ -14,7 +14,8 @@ licenses := Seq("Apache-2.0" -> url("http://opensource.org/licenses/Apache-2.0")
|
||||
|
||||
sparkVersion := "2.0.0-SNAPSHOT"
|
||||
|
||||
sparkComponents ++= Seq("sql", "hive")
|
||||
sparkComponents ++= Seq("sql", "hive", "mllib")
|
||||
|
||||
|
||||
initialCommands in console :=
|
||||
"""
|
||||
@ -40,22 +41,26 @@ libraryDependencies += "com.twitter" %% "util-jvm" % "6.23.0" % "provided"
|
||||
|
||||
libraryDependencies += "org.scalatest" %% "scalatest" % "2.2.1" % "test"
|
||||
|
||||
libraryDependencies += "org.yaml" % "snakeyaml" % "1.17"
|
||||
|
||||
libraryDependencies += "com.typesafe" %% "scalalogging-slf4j" % "1.1.0"
|
||||
|
||||
fork := true
|
||||
|
||||
// Your username to login to Databricks Cloud
|
||||
dbcUsername := sys.env.getOrElse("DBC_USERNAME", sys.error("Please set DBC_USERNAME"))
|
||||
dbcUsername := sys.env.getOrElse("DBC_USERNAME", "")
|
||||
|
||||
// Your password (Can be set as an environment variable)
|
||||
dbcPassword := sys.env.getOrElse("DBC_PASSWORD", sys.error("Please set DBC_PASSWORD"))
|
||||
dbcPassword := sys.env.getOrElse("DBC_PASSWORD", "")
|
||||
|
||||
// The URL to the Databricks Cloud DB Api. Don't forget to set the port number to 34563!
|
||||
dbcApiUrl := sys.env.getOrElse ("DBC_URL", sys.error("Please set DBC_URL"))
|
||||
|
||||
// Add any clusters that you would like to deploy your work to. e.g. "My Cluster"
|
||||
// or run dbcExecuteCommand
|
||||
dbcClusters += sys.env.getOrElse("DBC_USERNAME", sys.error("Please set DBC_USERNAME"))
|
||||
dbcClusters += sys.env.getOrElse("DBC_USERNAME", "")
|
||||
|
||||
dbcLibraryPath := s"/Users/${sys.env.getOrElse("DBC_USERNAME", sys.error("Please set DBC_USERNAME"))}/lib"
|
||||
dbcLibraryPath := s"/Users/${sys.env.getOrElse("DBC_USERNAME", "")}/lib"
|
||||
|
||||
val runBenchmark = inputKey[Unit]("runs a benchmark")
|
||||
|
||||
@ -64,7 +69,8 @@ runBenchmark := {
|
||||
val args = spaceDelimited("[args]").parsed
|
||||
val scalaRun = (runner in run).value
|
||||
val classpath = (fullClasspath in Compile).value
|
||||
scalaRun.run("com.databricks.spark.sql.perf.RunBenchmark", classpath.map(_.data), args, streams.value.log)
|
||||
scalaRun.run("com.databricks.spark.sql.perf.RunBenchmark", classpath.map(_.data), args,
|
||||
streams.value.log)
|
||||
}
|
||||
|
||||
import ReleaseTransformations._
|
||||
|
||||
@ -21,7 +21,7 @@ import scala.concurrent.ExecutionContext.Implicits.global
|
||||
import scala.concurrent._
|
||||
import scala.concurrent.duration._
|
||||
import scala.language.implicitConversions
|
||||
import scala.util.Try
|
||||
import scala.util.{Success, Try, Failure => SFailure}
|
||||
|
||||
import org.apache.spark.rdd.RDD
|
||||
import org.apache.spark.sql.{Dataset, DataFrame, SQLContext}
|
||||
@ -39,7 +39,7 @@ abstract class Benchmark(
|
||||
@transient val sqlContext: SQLContext)
|
||||
extends Serializable {
|
||||
|
||||
import sqlContext.implicits._
|
||||
import Benchmark._
|
||||
|
||||
def this() = this(SQLContext.getOrCreate(SparkContext.getOrCreate()))
|
||||
|
||||
@ -66,25 +66,6 @@ abstract class Benchmark(
|
||||
defaultParallelism = sparkContext.defaultParallelism,
|
||||
buildInfo = buildInfo)
|
||||
|
||||
/**
|
||||
* A Variation represents a setting (e.g. the number of shuffle partitions or if tables
|
||||
* are cached in memory) that we want to change in a experiment run.
|
||||
* A Variation has three parts, `name`, `options`, and `setup`.
|
||||
* The `name` is the identifier of a Variation. `options` is a Seq of options that
|
||||
* will be used for a query. Basically, a query will be executed with every option
|
||||
* defined in the list of `options`. `setup` defines the needed action for every
|
||||
* option. For example, the following Variation is used to change the number of shuffle
|
||||
* partitions of a query. The name of the Variation is "shufflePartitions". There are
|
||||
* two options, 200 and 2000. The setup is used to set the value of property
|
||||
* "spark.sql.shuffle.partitions".
|
||||
*
|
||||
* {{{
|
||||
* Variation("shufflePartitions", Seq("200", "2000")) {
|
||||
* case num => sqlContext.setConf("spark.sql.shuffle.partitions", num)
|
||||
* }
|
||||
* }}}
|
||||
*/
|
||||
case class Variation[T](name: String, options: Seq[T])(val setup: T => Unit)
|
||||
|
||||
val codegen = Variation("codegen", Seq("on", "off")) {
|
||||
case "off" => sqlContext.setConf("spark.sql.codegen", "false")
|
||||
@ -122,238 +103,14 @@ abstract class Benchmark(
|
||||
iterations: Int = 3,
|
||||
variations: Seq[Variation[_]] = Seq(Variation("StandardRun", Seq("true")) { _ => {} }),
|
||||
tags: Map[String, String] = Map.empty,
|
||||
timeout: Long = 0L) = {
|
||||
timeout: Long = 0L,
|
||||
resultLocation: String = resultsLocation,
|
||||
forkThread: Boolean = true) = {
|
||||
|
||||
class ExperimentStatus {
|
||||
val currentResults = new collection.mutable.ArrayBuffer[BenchmarkResult]()
|
||||
val currentRuns = new collection.mutable.ArrayBuffer[ExperimentRun]()
|
||||
val currentMessages = new collection.mutable.ArrayBuffer[String]()
|
||||
|
||||
def logMessage(msg: String) = {
|
||||
println(msg)
|
||||
currentMessages += msg
|
||||
}
|
||||
|
||||
// Stats for HTML status message.
|
||||
@volatile var currentExecution = ""
|
||||
@volatile var currentPlan = "" // for queries only
|
||||
@volatile var currentConfig = ""
|
||||
@volatile var failures = 0
|
||||
@volatile var startTime = 0L
|
||||
|
||||
/** An optional log collection task that will run after the experiment. */
|
||||
@volatile var logCollection: () => Unit = () => {}
|
||||
|
||||
|
||||
def cartesianProduct[T](xss: List[List[T]]): List[List[T]] = xss match {
|
||||
case Nil => List(Nil)
|
||||
case h :: t => for(xh <- h; xt <- cartesianProduct(t)) yield xh :: xt
|
||||
}
|
||||
|
||||
val timestamp = System.currentTimeMillis()
|
||||
val resultPath = s"$resultsLocation/timestamp=$timestamp"
|
||||
val combinations = cartesianProduct(variations.map(l => (0 until l.options.size).toList).toList)
|
||||
val resultsFuture = Future {
|
||||
|
||||
// If we're running queries, create tables for them
|
||||
executionsToRun
|
||||
.collect { case query: Query => query }
|
||||
.flatMap { query =>
|
||||
try {
|
||||
query.newDataFrame().queryExecution.logical.collect {
|
||||
case UnresolvedRelation(t, _) => t.table
|
||||
}
|
||||
} catch {
|
||||
// ignore the queries that can't be parsed
|
||||
case e: Exception => Seq()
|
||||
}
|
||||
}
|
||||
.distinct
|
||||
.foreach { name =>
|
||||
try {
|
||||
sqlContext.table(name)
|
||||
logMessage(s"Table $name exists.")
|
||||
} catch {
|
||||
case ae: Exception =>
|
||||
val table = allTables
|
||||
.find(_.name == name)
|
||||
if (table.isDefined) {
|
||||
logMessage(s"Creating table: $name")
|
||||
table.get.data
|
||||
.write
|
||||
.mode("overwrite")
|
||||
.saveAsTable(name)
|
||||
} else {
|
||||
// the table could be subquery
|
||||
logMessage(s"Couldn't read table $name and its not defined as a Benchmark.Table.")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Run the benchmarks!
|
||||
val results: Seq[ExperimentRun] = (1 to iterations).flatMap { i =>
|
||||
combinations.map { setup =>
|
||||
val currentOptions = variations.asInstanceOf[Seq[Variation[Any]]].zip(setup).map {
|
||||
case (v, idx) =>
|
||||
v.setup(v.options(idx))
|
||||
v.name -> v.options(idx).toString
|
||||
}
|
||||
currentConfig = currentOptions.map { case (k,v) => s"$k: $v" }.mkString(", ")
|
||||
|
||||
val result = ExperimentRun(
|
||||
timestamp = timestamp,
|
||||
iteration = i,
|
||||
tags = currentOptions.toMap ++ tags,
|
||||
configuration = currentConfiguration,
|
||||
|
||||
executionsToRun.flatMap { q =>
|
||||
val setup = s"iteration: $i, ${currentOptions.map { case (k, v) => s"$k=$v"}.mkString(", ")}"
|
||||
logMessage(s"Running execution ${q.name} $setup")
|
||||
|
||||
currentExecution = q.name
|
||||
currentPlan = q match {
|
||||
case query: Query =>
|
||||
try {
|
||||
query.newDataFrame().queryExecution.executedPlan.toString()
|
||||
} catch {
|
||||
case e: Exception =>
|
||||
s"failed to parse: $e"
|
||||
}
|
||||
case _ => ""
|
||||
}
|
||||
startTime = System.currentTimeMillis()
|
||||
|
||||
val singleResult =
|
||||
q.benchmark(includeBreakdown, setup, currentMessages, timeout)
|
||||
|
||||
singleResult.failure.foreach { f =>
|
||||
failures += 1
|
||||
logMessage(s"Execution '${q.name}' failed: ${f.message}")
|
||||
}
|
||||
singleResult.executionTime.foreach { time =>
|
||||
logMessage(s"Execution time: ${time / 1000}s")
|
||||
}
|
||||
currentResults += singleResult
|
||||
singleResult :: Nil
|
||||
})
|
||||
|
||||
currentRuns += result
|
||||
|
||||
result
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
val resultsTable = sqlContext.createDataFrame(results)
|
||||
logMessage(s"Results written to table: 'sqlPerformance' at $resultPath")
|
||||
resultsTable
|
||||
.coalesce(1)
|
||||
.write
|
||||
.format("json")
|
||||
.save(resultPath)
|
||||
} catch {
|
||||
case e: Throwable => logMessage(s"Failed to write data: $e")
|
||||
}
|
||||
|
||||
logCollection()
|
||||
}
|
||||
|
||||
def scheduleCpuCollection(fs: FS) = {
|
||||
logCollection = () => {
|
||||
logMessage(s"Begining CPU log collection")
|
||||
try {
|
||||
val location = cpu.collectLogs(sqlContext, fs, timestamp)
|
||||
logMessage(s"cpu results recorded to $location")
|
||||
} catch {
|
||||
case e: Throwable =>
|
||||
logMessage(s"Error collecting logs: $e")
|
||||
throw e
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def cpuProfile = new Profile(sqlContext, sqlContext.read.json(getCpuLocation(timestamp)))
|
||||
|
||||
def cpuProfileHtml(fs: FS) = {
|
||||
s"""
|
||||
|<h1>CPU Profile</h1>
|
||||
|<b>Permalink:</b> <tt>sqlContext.read.json("${getCpuLocation(timestamp)}")</tt></br>
|
||||
|${cpuProfile.buildGraph(fs)}
|
||||
""".stripMargin
|
||||
}
|
||||
|
||||
/** Waits for the finish of the experiment. */
|
||||
def waitForFinish(timeoutInSeconds: Int) = {
|
||||
Await.result(resultsFuture, timeoutInSeconds.seconds)
|
||||
}
|
||||
|
||||
/** Returns results from an actively running experiment. */
|
||||
def getCurrentResults() = {
|
||||
val tbl = sqlContext.createDataFrame(currentResults)
|
||||
tbl.registerTempTable("currentResults")
|
||||
tbl
|
||||
}
|
||||
|
||||
/** Returns full iterations from an actively running experiment. */
|
||||
def getCurrentRuns() = {
|
||||
val tbl = sqlContext.createDataFrame(currentRuns)
|
||||
tbl.registerTempTable("currentRuns")
|
||||
tbl
|
||||
}
|
||||
|
||||
def tail(n: Int = 20) = {
|
||||
currentMessages.takeRight(n).mkString("\n")
|
||||
}
|
||||
|
||||
def status =
|
||||
if (resultsFuture.isCompleted) {
|
||||
if (resultsFuture.value.get.isFailure) "Failed" else "Successful"
|
||||
} else {
|
||||
"Running"
|
||||
}
|
||||
|
||||
override def toString =
|
||||
s"""Permalink: table("sqlPerformance").where('timestamp === ${timestamp}L)"""
|
||||
|
||||
|
||||
def html: String = {
|
||||
val maybeQueryPlan: String =
|
||||
if (currentPlan.nonEmpty) {
|
||||
s"""
|
||||
|<h3>QueryPlan</h3>
|
||||
|<pre>
|
||||
|${currentPlan.replaceAll("\n", "<br/>")}
|
||||
|</pre>
|
||||
""".stripMargin
|
||||
} else {
|
||||
""
|
||||
}
|
||||
s"""
|
||||
|<h2>$status Experiment</h2>
|
||||
|<b>Permalink:</b> <tt>sqlContext.read.json("$resultPath")</tt><br/>
|
||||
|<b>Iterations complete:</b> ${currentRuns.size / combinations.size} / $iterations<br/>
|
||||
|<b>Failures:</b> $failures<br/>
|
||||
|<b>Executions run:</b> ${currentResults.size} / ${iterations * combinations.size * executionsToRun.size}
|
||||
|<br/>
|
||||
|<b>Run time:</b> ${(System.currentTimeMillis() - timestamp) / 1000}s<br/>
|
||||
|
|
||||
|<h2>Current Execution: $currentExecution</h2>
|
||||
|Runtime: ${(System.currentTimeMillis() - startTime) / 1000}s<br/>
|
||||
|$currentConfig<br/>
|
||||
|$maybeQueryPlan
|
||||
|<h2>Logs</h2>
|
||||
|<pre>
|
||||
|${tail()}
|
||||
|</pre>
|
||||
""".stripMargin
|
||||
}
|
||||
}
|
||||
new ExperimentStatus
|
||||
new ExperimentStatus(executionsToRun, includeBreakdown, iterations, variations, tags,
|
||||
timeout, resultLocation, sqlContext, allTables, currentConfiguration)
|
||||
}
|
||||
|
||||
case class Table(
|
||||
name: String,
|
||||
data: Dataset[_])
|
||||
|
||||
import reflect.runtime._, universe._
|
||||
import reflect.runtime._
|
||||
@ -500,5 +257,278 @@ abstract class Benchmark(
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* A Variation represents a setting (e.g. the number of shuffle partitions or if tables
|
||||
* are cached in memory) that we want to change in a experiment run.
|
||||
* A Variation has three parts, `name`, `options`, and `setup`.
|
||||
* The `name` is the identifier of a Variation. `options` is a Seq of options that
|
||||
* will be used for a query. Basically, a query will be executed with every option
|
||||
* defined in the list of `options`. `setup` defines the needed action for every
|
||||
* option. For example, the following Variation is used to change the number of shuffle
|
||||
* partitions of a query. The name of the Variation is "shufflePartitions". There are
|
||||
* two options, 200 and 2000. The setup is used to set the value of property
|
||||
* "spark.sql.shuffle.partitions".
|
||||
*
|
||||
* {{{
|
||||
* Variation("shufflePartitions", Seq("200", "2000")) {
|
||||
* case num => sqlContext.setConf("spark.sql.shuffle.partitions", num)
|
||||
* }
|
||||
* }}}
|
||||
*/
|
||||
case class Variation[T](name: String, options: Seq[T])(val setup: T => Unit)
|
||||
|
||||
case class Table(
|
||||
name: String,
|
||||
data: Dataset[_])
|
||||
|
||||
|
||||
object Benchmark {
|
||||
|
||||
class ExperimentStatus(
|
||||
executionsToRun: Seq[Benchmarkable],
|
||||
includeBreakdown: Boolean,
|
||||
iterations: Int,
|
||||
variations: Seq[Variation[_]],
|
||||
tags: Map[String, String],
|
||||
timeout: Long,
|
||||
resultsLocation: String,
|
||||
sqlContext: SQLContext,
|
||||
allTables: Seq[Table],
|
||||
currentConfiguration: BenchmarkConfiguration,
|
||||
forkThread: Boolean = true) {
|
||||
val currentResults = new collection.mutable.ArrayBuffer[BenchmarkResult]()
|
||||
val currentRuns = new collection.mutable.ArrayBuffer[ExperimentRun]()
|
||||
val currentMessages = new collection.mutable.ArrayBuffer[String]()
|
||||
|
||||
def logMessage(msg: String) = {
|
||||
println(msg)
|
||||
currentMessages += msg
|
||||
}
|
||||
|
||||
// Stats for HTML status message.
|
||||
@volatile var currentExecution = ""
|
||||
@volatile var currentPlan = "" // for queries only
|
||||
@volatile var currentConfig = ""
|
||||
@volatile var failures = 0
|
||||
@volatile var startTime = 0L
|
||||
|
||||
/** An optional log collection task that will run after the experiment. */
|
||||
@volatile var logCollection: () => Unit = () => {}
|
||||
|
||||
|
||||
def cartesianProduct[T](xss: List[List[T]]): List[List[T]] = xss match {
|
||||
case Nil => List(Nil)
|
||||
case h :: t => for(xh <- h; xt <- cartesianProduct(t)) yield xh :: xt
|
||||
}
|
||||
|
||||
val timestamp = System.currentTimeMillis()
|
||||
val resultPath = s"$resultsLocation/timestamp=$timestamp"
|
||||
val combinations = cartesianProduct(variations.map(l => (0 until l.options.size).toList).toList)
|
||||
val resultsFuture = Future {
|
||||
|
||||
// If we're running queries, create tables for them
|
||||
executionsToRun
|
||||
.collect { case query: Query => query }
|
||||
.flatMap { query =>
|
||||
try {
|
||||
query.newDataFrame().queryExecution.logical.collect {
|
||||
case UnresolvedRelation(t, _) => t.table
|
||||
}
|
||||
} catch {
|
||||
// ignore the queries that can't be parsed
|
||||
case e: Exception => Seq()
|
||||
}
|
||||
}
|
||||
.distinct
|
||||
.foreach { name =>
|
||||
try {
|
||||
sqlContext.table(name)
|
||||
logMessage(s"Table $name exists.")
|
||||
} catch {
|
||||
case ae: Exception =>
|
||||
val table = allTables
|
||||
.find(_.name == name)
|
||||
if (table.isDefined) {
|
||||
logMessage(s"Creating table: $name")
|
||||
table.get.data
|
||||
.write
|
||||
.mode("overwrite")
|
||||
.saveAsTable(name)
|
||||
} else {
|
||||
// the table could be subquery
|
||||
logMessage(s"Couldn't read table $name and its not defined as a Benchmark.Table.")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Run the benchmarks!
|
||||
val results: Seq[ExperimentRun] = (1 to iterations).flatMap { i =>
|
||||
combinations.map { setup =>
|
||||
val currentOptions = variations.asInstanceOf[Seq[Variation[Any]]].zip(setup).map {
|
||||
case (v, idx) =>
|
||||
v.setup(v.options(idx))
|
||||
v.name -> v.options(idx).toString
|
||||
}
|
||||
currentConfig = currentOptions.map { case (k,v) => s"$k: $v" }.mkString(", ")
|
||||
|
||||
val res = executionsToRun.flatMap { q =>
|
||||
val setup = s"iteration: $i, ${currentOptions.map { case (k, v) => s"$k=$v"}.mkString(", ")}"
|
||||
logMessage(s"Running execution ${q.name} $setup")
|
||||
|
||||
currentExecution = q.name
|
||||
currentPlan = q match {
|
||||
case query: Query =>
|
||||
try {
|
||||
query.newDataFrame().queryExecution.executedPlan.toString()
|
||||
} catch {
|
||||
case e: Exception =>
|
||||
s"failed to parse: $e"
|
||||
}
|
||||
case _ => ""
|
||||
}
|
||||
startTime = System.currentTimeMillis()
|
||||
|
||||
val singleResultT = Try {
|
||||
q.benchmark(includeBreakdown, setup, currentMessages, timeout,
|
||||
forkThread=forkThread)
|
||||
}
|
||||
|
||||
singleResultT match {
|
||||
case Success(singleResult) =>
|
||||
singleResult.failure.foreach { f =>
|
||||
failures += 1
|
||||
logMessage(s"Execution '${q.name}' failed: ${f.message}")
|
||||
}
|
||||
singleResult.executionTime.foreach { time =>
|
||||
logMessage(s"Execution time: ${time / 1000}s")
|
||||
}
|
||||
currentResults += singleResult
|
||||
singleResult :: Nil
|
||||
case SFailure(e) =>
|
||||
failures += 1
|
||||
logMessage(s"Execution '${q.name}' failed: ${e}")
|
||||
Nil
|
||||
}
|
||||
}
|
||||
|
||||
val result = ExperimentRun(
|
||||
timestamp = timestamp,
|
||||
iteration = i,
|
||||
tags = currentOptions.toMap ++ tags,
|
||||
configuration = currentConfiguration,
|
||||
res)
|
||||
|
||||
currentRuns += result
|
||||
|
||||
result
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
val resultsTable = sqlContext.createDataFrame(results)
|
||||
logMessage(s"Results written to table: 'sqlPerformance' at $resultPath")
|
||||
resultsTable
|
||||
.coalesce(1)
|
||||
.write
|
||||
.format("json")
|
||||
.save(resultPath)
|
||||
} catch {
|
||||
case e: Throwable => logMessage(s"Failed to write data: $e")
|
||||
}
|
||||
|
||||
logCollection()
|
||||
}
|
||||
|
||||
def scheduleCpuCollection(fs: FS) = {
|
||||
logCollection = () => {
|
||||
logMessage(s"Begining CPU log collection")
|
||||
try {
|
||||
val location = cpu.collectLogs(sqlContext, fs, timestamp)
|
||||
logMessage(s"cpu results recorded to $location")
|
||||
} catch {
|
||||
case e: Throwable =>
|
||||
logMessage(s"Error collecting logs: $e")
|
||||
throw e
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def cpuProfile = new Profile(sqlContext, sqlContext.read.json(getCpuLocation(timestamp)))
|
||||
|
||||
def cpuProfileHtml(fs: FS) = {
|
||||
s"""
|
||||
|<h1>CPU Profile</h1>
|
||||
|<b>Permalink:</b> <tt>sqlContext.read.json("${getCpuLocation(timestamp)}")</tt></br>
|
||||
|${cpuProfile.buildGraph(fs)}
|
||||
""".stripMargin
|
||||
}
|
||||
|
||||
/** Waits for the finish of the experiment. */
|
||||
def waitForFinish(timeoutInSeconds: Int) = {
|
||||
Await.result(resultsFuture, timeoutInSeconds.seconds)
|
||||
}
|
||||
|
||||
/** Returns results from an actively running experiment. */
|
||||
def getCurrentResults() = {
|
||||
val tbl = sqlContext.createDataFrame(currentResults)
|
||||
tbl.registerTempTable("currentResults")
|
||||
tbl
|
||||
}
|
||||
|
||||
/** Returns full iterations from an actively running experiment. */
|
||||
def getCurrentRuns() = {
|
||||
val tbl = sqlContext.createDataFrame(currentRuns)
|
||||
tbl.registerTempTable("currentRuns")
|
||||
tbl
|
||||
}
|
||||
|
||||
def tail(n: Int = 20) = {
|
||||
currentMessages.takeRight(n).mkString("\n")
|
||||
}
|
||||
|
||||
def status =
|
||||
if (resultsFuture.isCompleted) {
|
||||
if (resultsFuture.value.get.isFailure) "Failed" else "Successful"
|
||||
} else {
|
||||
"Running"
|
||||
}
|
||||
|
||||
override def toString =
|
||||
s"""Permalink: table("sqlPerformance").where('timestamp === ${timestamp}L)"""
|
||||
|
||||
|
||||
def html: String = {
|
||||
val maybeQueryPlan: String =
|
||||
if (currentPlan.nonEmpty) {
|
||||
s"""
|
||||
|<h3>QueryPlan</h3>
|
||||
|<pre>
|
||||
|${currentPlan.replaceAll("\n", "<br/>")}
|
||||
|</pre>
|
||||
""".stripMargin
|
||||
} else {
|
||||
""
|
||||
}
|
||||
s"""
|
||||
|<h2>$status Experiment</h2>
|
||||
|<b>Permalink:</b> <tt>sqlContext.read.json("$resultPath")</tt><br/>
|
||||
|<b>Iterations complete:</b> ${currentRuns.size / combinations.size} / $iterations<br/>
|
||||
|<b>Failures:</b> $failures<br/>
|
||||
|<b>Executions run:</b> ${currentResults.size} / ${iterations * combinations.size * executionsToRun.size}
|
||||
|<br/>
|
||||
|<b>Run time:</b> ${(System.currentTimeMillis() - timestamp) / 1000}s<br/>
|
||||
|
|
||||
|<h2>Current Execution: $currentExecution</h2>
|
||||
|Runtime: ${(System.currentTimeMillis() - startTime) / 1000}s<br/>
|
||||
|$currentConfig<br/>
|
||||
|$maybeQueryPlan
|
||||
|<h2>Logs</h2>
|
||||
|<pre>
|
||||
|${tail()}
|
||||
|</pre>
|
||||
""".stripMargin
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -18,13 +18,17 @@ package com.databricks.spark.sql.perf
|
||||
|
||||
import java.util.UUID
|
||||
|
||||
import com.typesafe.scalalogging.slf4j.Logging
|
||||
|
||||
import scala.concurrent.duration._
|
||||
import scala.collection.mutable.ArrayBuffer
|
||||
|
||||
import org.apache.spark.sql.SQLContext
|
||||
import org.apache.spark.{SparkEnv, SparkContext}
|
||||
|
||||
import scala.collection.mutable.ArrayBuffer
|
||||
|
||||
/** A trait to describe things that can be benchmarked. */
|
||||
trait Benchmarkable {
|
||||
trait Benchmarkable extends Logging {
|
||||
@transient protected[this] val sqlContext = SQLContext.getOrCreate(SparkContext.getOrCreate())
|
||||
@transient protected[this] val sparkContext = sqlContext.sparkContext
|
||||
|
||||
@ -35,10 +39,16 @@ trait Benchmarkable {
|
||||
includeBreakdown: Boolean,
|
||||
description: String = "",
|
||||
messages: ArrayBuffer[String],
|
||||
timeout: Long): BenchmarkResult = {
|
||||
timeout: Long,
|
||||
forkThread: Boolean = true): BenchmarkResult = {
|
||||
logger.info(s"$this: benchmark")
|
||||
sparkContext.setJobDescription(s"Execution: $name, $description")
|
||||
beforeBenchmark()
|
||||
val result = runBenchmark(includeBreakdown, description, messages, timeout)
|
||||
val result = if (forkThread) {
|
||||
runBenchmarkForked(includeBreakdown, description, messages, timeout)
|
||||
} else {
|
||||
doBenchmark(includeBreakdown, description, messages)
|
||||
}
|
||||
afterBenchmark(sqlContext.sparkContext)
|
||||
result
|
||||
}
|
||||
@ -54,17 +64,25 @@ trait Benchmarkable {
|
||||
.foreach { bid => SparkEnv.get.blockManager.master.removeBlock(bid) }
|
||||
}
|
||||
|
||||
private def runBenchmark(
|
||||
private def runBenchmarkForked(
|
||||
includeBreakdown: Boolean,
|
||||
description: String = "",
|
||||
messages: ArrayBuffer[String],
|
||||
timeout: Long): BenchmarkResult = {
|
||||
val jobgroup = UUID.randomUUID().toString
|
||||
val that = this
|
||||
var result: BenchmarkResult = null
|
||||
val thread = new Thread("benchmark runner") {
|
||||
override def run(): Unit = {
|
||||
logger.info(s"$that running $this")
|
||||
sparkContext.setJobGroup(jobgroup, s"benchmark $name", true)
|
||||
result = doBenchmark(includeBreakdown, description, messages)
|
||||
try {
|
||||
result = doBenchmark(includeBreakdown, description, messages)
|
||||
} catch {
|
||||
case e: Throwable =>
|
||||
logger.info(s"$that: failure in runBenchmark: $e")
|
||||
throw e
|
||||
}
|
||||
}
|
||||
}
|
||||
thread.setDaemon(true)
|
||||
@ -93,4 +111,11 @@ trait Benchmarkable {
|
||||
val endTime = System.nanoTime()
|
||||
(endTime - startTime).toDouble / 1000000
|
||||
}
|
||||
}
|
||||
|
||||
protected def measureTime[A](f: => A): (Duration, A) = {
|
||||
val startTime = System.nanoTime()
|
||||
val res = f
|
||||
val endTime = System.nanoTime()
|
||||
(endTime - startTime).nanos -> res
|
||||
}
|
||||
}
|
||||
|
||||
@ -0,0 +1,94 @@
|
||||
package com.databricks.spark.sql.perf.mllib
|
||||
|
||||
import com.typesafe.scalalogging.slf4j.Logging
|
||||
|
||||
import org.apache.spark.ml.Transformer
|
||||
import org.apache.spark.ml.evaluation.{MulticlassClassificationEvaluator, Evaluator}
|
||||
import org.apache.spark.sql._
|
||||
import org.apache.spark.sql.functions._
|
||||
|
||||
/**
|
||||
* The description of a benchmark for an ML algorithm. It follows a simple, standard proceduce:
|
||||
* - generate some test and training data
|
||||
* - generate a model against the training data
|
||||
* - score the model against the training data
|
||||
* - score the model against the test data
|
||||
*
|
||||
* You should not assume that your implementation can carry state around. If some state is needed,
|
||||
* consider adding it to the context.
|
||||
*
|
||||
* It is assumed that the implementation is going to be an object.
|
||||
*/
|
||||
trait BenchmarkAlgorithm extends Logging {
|
||||
|
||||
def trainingDataSet(ctx: MLBenchContext): DataFrame
|
||||
|
||||
def testDataSet(ctx: MLBenchContext): DataFrame
|
||||
|
||||
@throws[Exception]("if training fails")
|
||||
def train(
|
||||
ctx: MLBenchContext,
|
||||
trainingSet: DataFrame): Transformer
|
||||
|
||||
@throws[Exception]("if scoring fails")
|
||||
def score(
|
||||
ctx: MLBenchContext,
|
||||
testSet: DataFrame,
|
||||
model: Transformer): Double = -1.0 // Not putting NaN because it is not valid JSON.
|
||||
}
|
||||
|
||||
/**
|
||||
* Uses an evaluator to perform the scoring.
|
||||
*/
|
||||
trait ScoringWithEvaluator {
|
||||
self: BenchmarkAlgorithm =>
|
||||
|
||||
protected def evaluator(ctx: MLBenchContext): Evaluator
|
||||
|
||||
final override def score(
|
||||
ctx: MLBenchContext,
|
||||
testSet: DataFrame,
|
||||
model: Transformer): Double = {
|
||||
val eval = model.transform(testSet)
|
||||
evaluator(ctx).evaluate(eval)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Builds the training set for an initial dataset and an initial model. Useful for validating a
|
||||
* trained model against a given model.
|
||||
*/
|
||||
trait TrainingSetFromTransformer {
|
||||
self: BenchmarkAlgorithm =>
|
||||
|
||||
protected def initialData(ctx: MLBenchContext): DataFrame
|
||||
|
||||
protected def trueModel(ctx: MLBenchContext): Transformer
|
||||
|
||||
final override def trainingDataSet(ctx: MLBenchContext): DataFrame = {
|
||||
val initial = initialData(ctx)
|
||||
val model = trueModel(ctx)
|
||||
model.transform(initial).select(col("features"), col("prediction").as("label"))
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* The test data is the same as the training data.
|
||||
*/
|
||||
trait TestFromTraining {
|
||||
self: BenchmarkAlgorithm =>
|
||||
|
||||
final override def testDataSet(ctx: MLBenchContext): DataFrame = {
|
||||
// Copy the context with a new seed.
|
||||
val ctx2 = ctx.params.randomSeed match {
|
||||
case Some(x) =>
|
||||
val p = ctx.params.copy(randomSeed = Some(x + 1))
|
||||
ctx.copy(params = p)
|
||||
case None =>
|
||||
// Making a full copy to reset the internal seed.
|
||||
ctx.copy()
|
||||
}
|
||||
self.trainingDataSet(ctx2)
|
||||
}
|
||||
}
|
||||
|
||||
@ -0,0 +1,39 @@
|
||||
package com.databricks.spark.sql.perf.mllib
|
||||
|
||||
import java.util.Random
|
||||
|
||||
import com.databricks.spark.sql.perf.{MLParams}
|
||||
import org.apache.spark.sql.SQLContext
|
||||
|
||||
|
||||
/**
|
||||
* All the information required to run a test.
|
||||
*
|
||||
* @param params
|
||||
* @param sqlContext
|
||||
*/
|
||||
case class MLBenchContext(
|
||||
params: MLParams,
|
||||
sqlContext: SQLContext) {
|
||||
|
||||
// Some seed fixed for the context.
|
||||
private val internalSeed: Long = {
|
||||
params.randomSeed.map(_.toLong).getOrElse {
|
||||
throw new Exception("You need te specify the random seed")
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A fixed seed for this class. This function will always return the same value.
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
def seed(): Long = internalSeed
|
||||
|
||||
/**
|
||||
* Creates a new generator. The generator will always start with the same state.
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
def newGenerator(): Random = new Random(seed())
|
||||
}
|
||||
@ -0,0 +1,37 @@
|
||||
package com.databricks.spark.sql.perf.mllib
|
||||
|
||||
import com.databricks.spark.sql.perf.mllib.classification.LogisticRegression
|
||||
import org.apache.spark.SparkContext
|
||||
import org.apache.spark.sql.SQLContext
|
||||
|
||||
import com.databricks.spark.sql.perf.{MLParams}
|
||||
import OptionImplicits._
|
||||
|
||||
case class MLTest(
|
||||
benchmark: BenchmarkAlgorithm,
|
||||
params: MLParams)
|
||||
|
||||
// Example on how to create benchmarks using the API.
|
||||
object MLBenchmarks {
|
||||
// The list of standard benchmarks that we are going to run for ML.
|
||||
val benchmarks: Seq[MLTest] = List(
|
||||
MLTest(
|
||||
LogisticRegression,
|
||||
MLParams(
|
||||
numFeatures = 10,
|
||||
numExamples = 10,
|
||||
numTestExamples = 10,
|
||||
numPartitions = 3,
|
||||
regParam = 1,
|
||||
tol = 0.2)
|
||||
)
|
||||
)
|
||||
|
||||
val context = SparkContext.getOrCreate()
|
||||
val sqlContext: SQLContext = SQLContext.getOrCreate(context)
|
||||
|
||||
def benchmarkObjects: Seq[MLTransformerBenchmarkable] = benchmarks.map { mlb =>
|
||||
new MLTransformerBenchmarkable(mlb.params, mlb.benchmark, sqlContext)
|
||||
}
|
||||
|
||||
}
|
||||
@ -0,0 +1,70 @@
|
||||
package com.databricks.spark.sql.perf.mllib
|
||||
|
||||
import scala.language.implicitConversions
|
||||
|
||||
import com.databricks.spark.sql.perf._
|
||||
|
||||
import com.typesafe.scalalogging.slf4j.Logging
|
||||
|
||||
import org.apache.spark.SparkContext
|
||||
import org.apache.spark.sql.{DataFrame, SQLContext}
|
||||
|
||||
|
||||
class MLLib(@transient sqlContext: SQLContext)
|
||||
extends Benchmark(sqlContext) with Serializable {
|
||||
|
||||
def this() = this(SQLContext.getOrCreate(SparkContext.getOrCreate()))
|
||||
}
|
||||
|
||||
object MLLib extends Logging {
|
||||
|
||||
/**
|
||||
* Runs a set of preprogrammed experiments and blocks on completion.
|
||||
*
|
||||
* @param runConfig a configuration that is av
|
||||
* @return
|
||||
*/
|
||||
def runDefault(runConfig: RunConfig): DataFrame = {
|
||||
val ml = new MLLib()
|
||||
val benchmarks = MLBenchmarks.benchmarkObjects
|
||||
val e = ml.runExperiment(
|
||||
executionsToRun = benchmarks)
|
||||
e.waitForFinish(1000 * 60 * 30)
|
||||
logger.info("Run finished")
|
||||
e.getCurrentResults()
|
||||
}
|
||||
|
||||
/**
|
||||
* Runs all the experiments and blocks on completion
|
||||
*
|
||||
* @param yamlFile a file name
|
||||
* @return
|
||||
*/
|
||||
def run(yamlFile: String = null, yamlConfig: String = null): DataFrame = {
|
||||
logger.info("Starting run")
|
||||
val conf: YamlConfig = Option(yamlFile).map(YamlConfig.readFile).getOrElse {
|
||||
require(yamlConfig != null)
|
||||
YamlConfig.readString(yamlConfig)
|
||||
}
|
||||
val sc = SparkContext.getOrCreate()
|
||||
sc.setLogLevel("INFO")
|
||||
val b = new com.databricks.spark.sql.perf.mllib.MLLib()
|
||||
val sqlContext = com.databricks.spark.sql.perf.mllib.MLBenchmarks.sqlContext
|
||||
val benchmarksDescriptions = conf.runnableBenchmarks
|
||||
val benchmarks = benchmarksDescriptions.map { mlb =>
|
||||
new MLTransformerBenchmarkable(mlb.params, mlb.benchmark, sqlContext)
|
||||
}
|
||||
logger.info(s"${benchmarks.size} benchmarks identified:")
|
||||
val str = benchmarks.map(_.prettyPrint).mkString("\n")
|
||||
logger.info(str)
|
||||
logger.info("Starting experiments")
|
||||
val e = b.runExperiment(
|
||||
executionsToRun = benchmarks,
|
||||
iterations = 1, // If you want to increase the number of iterations, add more seeds
|
||||
resultLocation = conf.output,
|
||||
forkThread = false)
|
||||
e.waitForFinish(conf.timeout.toSeconds.toInt)
|
||||
logger.info("Run finished")
|
||||
e.getCurrentResults()
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,107 @@
|
||||
package com.databricks.spark.sql.perf.mllib
|
||||
|
||||
import com.databricks.spark.sql.perf._
|
||||
import com.typesafe.scalalogging.slf4j.Logging
|
||||
import org.apache.spark.sql._
|
||||
|
||||
import scala.collection.mutable.ArrayBuffer
|
||||
|
||||
class MLTransformerBenchmarkable(
|
||||
params: MLParams,
|
||||
test: BenchmarkAlgorithm,
|
||||
sqlContext: SQLContext)
|
||||
extends Benchmarkable with Serializable with Logging {
|
||||
|
||||
import MLTransformerBenchmarkable._
|
||||
|
||||
private var testData: DataFrame = null
|
||||
private var trainingData: DataFrame = null
|
||||
private val param = MLBenchContext(params, sqlContext)
|
||||
|
||||
override val name = test.getClass.getCanonicalName
|
||||
|
||||
override protected val executionMode: ExecutionMode = ExecutionMode.SparkPerfResults
|
||||
|
||||
override protected def beforeBenchmark(): Unit = {
|
||||
logger.info(s"$this beforeBenchmark")
|
||||
try {
|
||||
testData = test.testDataSet(param)
|
||||
testData.cache()
|
||||
testData.count()
|
||||
trainingData = test.trainingDataSet(param)
|
||||
trainingData.cache()
|
||||
trainingData.count()
|
||||
} catch {
|
||||
case e: Throwable =>
|
||||
println(s"$this error in beforeBenchmark: ${e.getStackTraceString}")
|
||||
throw e
|
||||
}
|
||||
}
|
||||
|
||||
override protected def doBenchmark(
|
||||
includeBreakdown: Boolean,
|
||||
description: String,
|
||||
messages: ArrayBuffer[String]): BenchmarkResult = {
|
||||
try {
|
||||
val (trainingTime, model) = measureTime(test.train(param, trainingData))
|
||||
logger.info(s"model: $model")
|
||||
val (_, scoreTraining) = measureTime {
|
||||
test.score(param, trainingData, model)
|
||||
}
|
||||
val (scoreTestTime, scoreTest) = measureTime {
|
||||
test.score(param, testData, model)
|
||||
}
|
||||
|
||||
|
||||
val ml = MLResult(
|
||||
trainingTime = Some(trainingTime.toMillis),
|
||||
trainingMetric = Some(scoreTraining),
|
||||
testTime = Some(scoreTestTime.toMillis),
|
||||
testMetric = Some(scoreTest))
|
||||
|
||||
BenchmarkResult(
|
||||
name = name,
|
||||
mode = executionMode.toString,
|
||||
parameters = Map.empty,
|
||||
executionTime = Some(trainingTime.toMillis),
|
||||
mlParams = Some(params),
|
||||
mlResult = Some(ml))
|
||||
} catch {
|
||||
case e: Exception =>
|
||||
BenchmarkResult(
|
||||
name = name,
|
||||
mode = executionMode.toString,
|
||||
parameters = Map.empty,
|
||||
failure = Some(Failure(e.getClass.getSimpleName,
|
||||
e.getMessage + ":\n" + e.getStackTraceString)))
|
||||
} finally {
|
||||
Option(testData).map(_.unpersist())
|
||||
Option(trainingData).map(_.unpersist())
|
||||
}
|
||||
}
|
||||
|
||||
def prettyPrint: String = {
|
||||
val paramString = pprint(params).mkString("\n")
|
||||
s"$test\n$paramString"
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
object MLTransformerBenchmarkable {
|
||||
private def pprint(p: AnyRef): Seq[String] = {
|
||||
val m = getCCParams(p)
|
||||
m.flatMap {
|
||||
case (key, Some(value: Any)) => Some(s" $key=$value")
|
||||
case _ => None
|
||||
} .toSeq
|
||||
}
|
||||
|
||||
// From http://stackoverflow.com/questions/1226555/case-class-to-map-in-scala
|
||||
private def getCCParams(cc: AnyRef) =
|
||||
(Map[String, Any]() /: cc.getClass.getDeclaredFields) {(a, f) =>
|
||||
f.setAccessible(true)
|
||||
a + (f.getName -> f.get(cc))
|
||||
}
|
||||
}
|
||||
|
||||
@ -0,0 +1,45 @@
|
||||
package com.databricks.spark.sql.perf.mllib
|
||||
|
||||
import scala.language.implicitConversions
|
||||
|
||||
/**
|
||||
* Implicits to transparently convert some Option[X] to X and vice-versa.
|
||||
*
|
||||
* This is usually dangerous to do, but in our case, the config is expressed through Options and
|
||||
* it alleviates the need to manually box values.
|
||||
*/
|
||||
object OptionImplicits {
|
||||
// The following implicits are unrolled for safety:
|
||||
private def oX2X[A](x: Option[A]): A = x.get
|
||||
|
||||
def checkLong(x: Option[Long]): Option[Long] = {
|
||||
x.asInstanceOf[Option[Any]] match {
|
||||
case Some(u: java.lang.Integer) => Some(u.toLong)
|
||||
case Some(u: java.lang.Long) => Some(u.toLong)
|
||||
case _ => x
|
||||
}
|
||||
}
|
||||
|
||||
def checkDouble(x: Option[Double]): Option[Double] = {
|
||||
x.asInstanceOf[Option[Any]] match {
|
||||
case Some(u: java.lang.Integer) => Some(u.toDouble)
|
||||
case Some(u: java.lang.Long) => Some(u.toDouble)
|
||||
case Some(u: java.lang.Double) => Some(u.toDouble)
|
||||
case _ => x
|
||||
}
|
||||
}
|
||||
|
||||
implicit def oD2D(x: Option[Double]): Double = oX2X(x)
|
||||
|
||||
implicit def oS2S(x: Option[String]): String = oX2X(x)
|
||||
|
||||
implicit def oI2I(x: Option[Int]): Int = oX2X(x)
|
||||
|
||||
implicit def oL2L(x: Option[Long]): Long = oX2X(x)
|
||||
|
||||
implicit def l2lo(x: Long): Option[Long] = checkLong(Option(x))
|
||||
implicit def i2lo(x: Int): Option[Long] = Option(x.toLong)
|
||||
implicit def i2io(x: Int): Option[Int] = Option(x)
|
||||
implicit def d2do(x: Double): Option[Double] = Option(x)
|
||||
implicit def i2do(x: Int): Option[Double] = Option(x)
|
||||
}
|
||||
@ -0,0 +1,48 @@
|
||||
package com.databricks.spark.sql.perf.mllib.classification
|
||||
|
||||
import com.databricks.spark.sql.perf.mllib.OptionImplicits._
|
||||
import com.databricks.spark.sql.perf.mllib._
|
||||
import com.databricks.spark.sql.perf.mllib.data.DataGenerator
|
||||
import org.apache.spark.ml.evaluation.{MulticlassClassificationEvaluator, Evaluator}
|
||||
|
||||
import org.apache.spark.ml.{Transformer, ModelBuilder}
|
||||
import org.apache.spark.ml
|
||||
import org.apache.spark.ml.linalg.Vectors
|
||||
import org.apache.spark.sql.DataFrame
|
||||
|
||||
object LogisticRegression extends BenchmarkAlgorithm
|
||||
with TestFromTraining with TrainingSetFromTransformer with ScoringWithEvaluator {
|
||||
|
||||
override protected def initialData(ctx: MLBenchContext) = {
|
||||
import ctx.params._
|
||||
DataGenerator.generateFeatures(
|
||||
ctx.sqlContext,
|
||||
numExamples,
|
||||
ctx.seed(),
|
||||
numPartitions,
|
||||
numFeatures)
|
||||
}
|
||||
|
||||
override protected def trueModel(ctx: MLBenchContext): Transformer = {
|
||||
val rng = ctx.newGenerator()
|
||||
val coefficients =
|
||||
Vectors.dense(Array.fill[Double](ctx.params.numFeatures)(2 * rng.nextDouble() - 1))
|
||||
// Small intercept to prevent some skew in the data.
|
||||
val intercept = 0.01 * (2 * rng.nextDouble - 1)
|
||||
ModelBuilder.newLogisticRegressionModel(coefficients, intercept)
|
||||
}
|
||||
|
||||
override def train(ctx: MLBenchContext,
|
||||
trainingSet: DataFrame): Transformer = {
|
||||
logger.info(s"$this: train: trainingSet=${trainingSet.schema}")
|
||||
import ctx.params._
|
||||
val lr = new ml.classification.LogisticRegression()
|
||||
.setTol(tol)
|
||||
.setRegParam(regParam)
|
||||
lr.fit(trainingSet)
|
||||
}
|
||||
|
||||
override protected def evaluator(ctx: MLBenchContext): Evaluator =
|
||||
new MulticlassClassificationEvaluator()
|
||||
}
|
||||
|
||||
@ -0,0 +1,55 @@
|
||||
package com.databricks.spark.sql.perf.mllib.clustering
|
||||
|
||||
import com.databricks.spark.sql.perf.mllib.{MLBenchContext, TestFromTraining, BenchmarkAlgorithm}
|
||||
import com.databricks.spark.sql.perf.mllib.OptionImplicits._
|
||||
import org.apache.commons.math3.random.Well19937c
|
||||
import org.apache.spark.ml.Transformer
|
||||
import org.apache.spark.ml
|
||||
import org.apache.spark.rdd.RDD
|
||||
import org.apache.spark.sql._
|
||||
import org.apache.spark.ml.linalg.{Vectors, Vector}
|
||||
import scala.collection.mutable.{HashMap => MHashMap}
|
||||
|
||||
object LDA extends BenchmarkAlgorithm with TestFromTraining {
|
||||
// The LDA model is package private, no need to expose it.
|
||||
|
||||
override def trainingDataSet(ctx: MLBenchContext): DataFrame = {
|
||||
import ctx.params._
|
||||
val rdd = ctx.sqlContext.sparkContext.parallelize(
|
||||
0L until numExamples,
|
||||
numPartitions
|
||||
)
|
||||
val seed: Int = randomSeed
|
||||
val docLength = ldaDocLength.get
|
||||
val numVocab = ldaNumVocabulary.get
|
||||
val data: RDD[(Long, Vector)] = rdd.mapPartitionsWithIndex { (idx, partition) =>
|
||||
val rng = new Well19937c(seed ^ idx)
|
||||
partition.map { docIndex =>
|
||||
var currentSize = 0
|
||||
val entries = MHashMap[Int, Int]()
|
||||
while (currentSize < docLength) {
|
||||
val index = rng.nextInt(numVocab)
|
||||
entries(index) = entries.getOrElse(index, 0) + 1
|
||||
currentSize += 1
|
||||
}
|
||||
|
||||
val iter = entries.toSeq.map(v => (v._1, v._2.toDouble))
|
||||
(docIndex, Vectors.sparse(numVocab, iter))
|
||||
}
|
||||
}
|
||||
ctx.sqlContext.createDataFrame(data).toDF("docIndex", "features")
|
||||
}
|
||||
|
||||
override def train(ctx: MLBenchContext,
|
||||
trainingSet: DataFrame): Transformer = {
|
||||
import ctx.params._
|
||||
new ml.clustering.LDA()
|
||||
.setK(k)
|
||||
.setSeed(randomSeed.toLong)
|
||||
.setMaxIter(maxIter)
|
||||
.setOptimizer(optimizer)
|
||||
.fit(trainingSet)
|
||||
}
|
||||
|
||||
// TODO(?) add a scoring method here.
|
||||
}
|
||||
@ -0,0 +1,102 @@
|
||||
package com.databricks.spark.sql.perf.mllib.data
|
||||
|
||||
import org.apache.spark.ml.linalg.Vectors
|
||||
import org.apache.spark.ml.linalg.Vector
|
||||
import org.apache.spark.mllib.random._
|
||||
import org.apache.spark.mllib.regression.LabeledPoint
|
||||
import org.apache.spark.rdd.RDD
|
||||
import org.apache.spark.sql.{SQLContext, DataFrame}
|
||||
|
||||
object DataGenerator {
|
||||
|
||||
def generateFeatures(
|
||||
sql: SQLContext,
|
||||
numExamples: Long,
|
||||
seed: Long,
|
||||
numPartitions: Int,
|
||||
numFeatures: Int): DataFrame = {
|
||||
val categoricalArities = Array.empty[Int]
|
||||
val rdd: RDD[Vector] = RandomRDDs.randomRDD(sql.sparkContext,
|
||||
new FeaturesGenerator(categoricalArities, numFeatures),
|
||||
numExamples, numPartitions, seed)
|
||||
sql.createDataFrame(rdd.map(Tuple1.apply)).toDF("features")
|
||||
}
|
||||
}
|
||||
|
||||
class BinaryLabeledDataGenerator(
|
||||
private val numFeatures: Int,
|
||||
private val threshold: Double) extends RandomDataGenerator[LabeledPoint] {
|
||||
|
||||
private val rng = new java.util.Random()
|
||||
|
||||
override def nextValue(): LabeledPoint = {
|
||||
val y = if (rng.nextDouble() < threshold) 0.0 else 1.0
|
||||
val x = Array.fill[Double](numFeatures) {
|
||||
if (rng.nextDouble() < threshold) 0.0 else 1.0
|
||||
}
|
||||
???
|
||||
// LabeledPoint(y, Vectors.dense(x))
|
||||
}
|
||||
|
||||
override def setSeed(seed: Long) {
|
||||
rng.setSeed(seed)
|
||||
}
|
||||
|
||||
override def copy(): BinaryLabeledDataGenerator =
|
||||
new BinaryLabeledDataGenerator(numFeatures, threshold)
|
||||
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Generator for a feature vector which can include a mix of categorical and continuous features.
|
||||
* @param categoricalArities Specifies the number of categories for each categorical feature.
|
||||
* @param numContinuous Number of continuous features. Feature values are in range [0,1].
|
||||
*/
|
||||
class FeaturesGenerator(val categoricalArities: Array[Int], val numContinuous: Int)
|
||||
extends RandomDataGenerator[Vector] {
|
||||
|
||||
categoricalArities.foreach { arity =>
|
||||
require(arity >= 2, s"FeaturesGenerator given categorical arity = $arity, " +
|
||||
s"but arity should be >= 2.")
|
||||
}
|
||||
|
||||
val numFeatures = categoricalArities.length + numContinuous
|
||||
|
||||
private val rng = new java.util.Random()
|
||||
|
||||
/**
|
||||
* Generates vector with categorical features first, and continuous features in [0,1] second.
|
||||
*/
|
||||
override def nextValue(): Vector = {
|
||||
// Feature ordering matches getCategoricalFeaturesInfo.
|
||||
val arr = new Array[Double](numFeatures)
|
||||
var j = 0
|
||||
while (j < categoricalArities.length) {
|
||||
arr(j) = rng.nextInt(categoricalArities(j))
|
||||
j += 1
|
||||
}
|
||||
while (j < numFeatures) {
|
||||
// Generating some centered data
|
||||
arr(j) = 2 * rng.nextDouble() - 1
|
||||
j += 1
|
||||
}
|
||||
Vectors.dense(arr)
|
||||
}
|
||||
|
||||
override def setSeed(seed: Long) {
|
||||
rng.setSeed(seed)
|
||||
}
|
||||
|
||||
override def copy(): FeaturesGenerator = new FeaturesGenerator(categoricalArities, numContinuous)
|
||||
|
||||
/**
|
||||
* @return categoricalFeaturesInfo Map storing arity of categorical features.
|
||||
* E.g., an entry (n -> k) indicates that feature n is categorical
|
||||
* with k categories indexed from 0: {0, 1, ..., k-1}.
|
||||
*/
|
||||
def getCategoricalFeaturesInfo: Map[Int, Int] = {
|
||||
// Categorical features are indexed from 0 because of the implementation of nextValue().
|
||||
categoricalArities.zipWithIndex.map(_.swap).toMap
|
||||
}
|
||||
}
|
||||
176
src/main/scala/com/databricks/spark/sql/perf/mllib/yaml.scala
Normal file
176
src/main/scala/com/databricks/spark/sql/perf/mllib/yaml.scala
Normal file
@ -0,0 +1,176 @@
|
||||
package com.databricks.spark.sql.perf.mllib
|
||||
|
||||
import java.util.{ArrayList => AL}
|
||||
|
||||
import scala.concurrent.duration._
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.io.Source
|
||||
|
||||
import scala.reflect._
|
||||
import scala.reflect.runtime.universe._
|
||||
import scala.util.{Try => STry, Success, Failure}
|
||||
|
||||
import org.yaml.snakeyaml.Yaml
|
||||
|
||||
import com.databricks.spark.sql.perf.{MLParams}
|
||||
|
||||
|
||||
/**
|
||||
* The configuration information generated from reading a YAML file.
|
||||
*
|
||||
* @param output the output direct
|
||||
*/
|
||||
case class YamlConfig(
|
||||
output: String = "/tmp/result",
|
||||
timeout: Duration = 20.minutes,
|
||||
runnableBenchmarks: Seq[MLTest])
|
||||
|
||||
object YamlConfig {
|
||||
|
||||
/**
|
||||
* Reads a string (assumed to contain a yaml description) and returns the configuration.
|
||||
*/
|
||||
def readString(s: String): YamlConfig = {
|
||||
println(s)
|
||||
val yaml = new Yaml()
|
||||
val m = dict(yaml.load(s))
|
||||
val common = m.get("common").map(dict).getOrElse(Map.empty)
|
||||
println("common")
|
||||
println(m)
|
||||
val exps = m("benchmarks")
|
||||
.asInstanceOf[AL[Map[String, Any]]].asScala.map(dict).toSeq
|
||||
println("exps:")
|
||||
println(exps)
|
||||
val experiments = exps.flatMap { sd =>
|
||||
val name = sd("name").toString
|
||||
val params = sd.get("params").map(dict).getOrElse(Map.empty)
|
||||
val expParams = cartesian(common ++ params)
|
||||
for (c <- expParams) yield name -> c
|
||||
}
|
||||
println("exp parsed")
|
||||
println(experiments)
|
||||
val e2 = experiments.map { case (n, e) =>
|
||||
val e2 = ccFromMap.fromMap[MLParams](e, strict=true)
|
||||
val s = ccFromMap.loadExperiment(n).getOrElse {
|
||||
throw new Exception(s"Cannot find algorithm $n in the standard benchmark algorithms")
|
||||
}
|
||||
MLTest(s, e2)
|
||||
}
|
||||
var c = YamlConfig(runnableBenchmarks = e2)
|
||||
for (output <- m.get("output")) {
|
||||
c = c.copy(output = output.toString)
|
||||
}
|
||||
for (x <- m.get("timeoutSeconds")) {
|
||||
c = c.copy(timeout = x.toString.toInt.seconds)
|
||||
}
|
||||
c
|
||||
}
|
||||
|
||||
/**
|
||||
* Reads a file (assumed to contain a yaml config).
|
||||
*/
|
||||
def readFile(filename: String): YamlConfig = {
|
||||
readString(Source.fromFile(filename).mkString)
|
||||
}
|
||||
|
||||
// Converts a java dictionary to a scala map.
|
||||
private def dict[T](d: T): Map[String, Any] = {
|
||||
d.asInstanceOf[java.util.Map[String, Any]].asScala.toMap
|
||||
}
|
||||
|
||||
/**
|
||||
* Given keys that may be lists, builds the cartesian product of all the values into defined
|
||||
* options.
|
||||
*
|
||||
* For example: {a: [1,2], b: [3,4]} -> {a: 1, b: 3}, {a: 1, b:4}, {a:2, b:3}, ...
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
private def cartesian(m: Map[String, Any]): Seq[Map[String, Any]] = {
|
||||
if (m.isEmpty) {
|
||||
Seq(m)
|
||||
} else {
|
||||
val k = m.keys.head
|
||||
val sub = m - k
|
||||
val l = cartesian(sub)
|
||||
m(k) match {
|
||||
case a: AL[_] =>
|
||||
for {
|
||||
x <- a.asScala.toSeq
|
||||
m2 <- l
|
||||
} yield {
|
||||
m2 ++ Map(k -> x.asInstanceOf[Any])
|
||||
}
|
||||
case _ =>
|
||||
val v = m(k)
|
||||
l.map { m => m ++ Map(k -> v) }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// Some ugly internals to make simple constructs
|
||||
package object ccFromMap {
|
||||
// Builds a case class from a map.
|
||||
// (taken from stack overflow)
|
||||
// if strict, will report an error if some unknown arguments are passed to the constructor
|
||||
def fromMap[T: TypeTag: ClassTag](m: Map[String,_], strict: Boolean) = {
|
||||
|
||||
scala.reflect.runtime.universe
|
||||
val rm = runtimeMirror(classTag[T].runtimeClass.getClassLoader)
|
||||
val classTest = typeOf[T].typeSymbol.asClass
|
||||
val classMirror = rm.reflectClass(classTest)
|
||||
val constructor = typeOf[T].declaration(nme.CONSTRUCTOR).asMethod
|
||||
val constructorMirror = classMirror.reflectConstructor(constructor)
|
||||
|
||||
val constructorArgNames = constructor.paramss.flatten.map(_.name.toString).toSet
|
||||
val extraElements = m.keySet -- constructorArgNames
|
||||
if (extraElements.nonEmpty) {
|
||||
throw new Exception(s"Found extra arguments when instantiating an object of " +
|
||||
s"class ${classTest.asClass.toString}:" +
|
||||
s" ${extraElements.toSeq.sorted}")
|
||||
}
|
||||
|
||||
val constructorArgs = constructor.paramss.flatten.map( (param: Symbol) => {
|
||||
val paramName = param.name.toString
|
||||
if(param.typeSignature <:< typeOf[Option[Long]])
|
||||
OptionImplicits.checkLong(m.get(paramName).asInstanceOf[Option[Long]])
|
||||
else if(param.typeSignature <:< typeOf[Option[Double]])
|
||||
OptionImplicits.checkDouble(m.get(paramName).asInstanceOf[Option[Double]])
|
||||
else if(param.typeSignature <:< typeOf[Option[Any]])
|
||||
m.get(paramName)
|
||||
else
|
||||
m.get(paramName).getOrElse(throw new IllegalArgumentException("Map is missing required parameter named " + paramName))
|
||||
})
|
||||
|
||||
constructorMirror(constructorArgs:_*).asInstanceOf[T]
|
||||
}
|
||||
|
||||
// TODO: handle scala.reflect.internal.MissingRequirementError
|
||||
private def load(name: String): STry[BenchmarkAlgorithm] = {
|
||||
val rm = runtimeMirror(getClass.getClassLoader)
|
||||
try {
|
||||
val module = rm.staticModule("com.databricks.spark.sql.perf.mllib." + name)
|
||||
val obj = rm.reflectModule(module)
|
||||
Success(obj.instance.asInstanceOf[BenchmarkAlgorithm])
|
||||
} catch {
|
||||
case x: scala.reflect.internal.MissingRequirementError =>
|
||||
Failure(x)
|
||||
}
|
||||
}
|
||||
|
||||
val defaultPackages = Seq(
|
||||
"",
|
||||
"com.databricks.spark.sql.perf.mllib"
|
||||
)
|
||||
|
||||
def loadExperiment(
|
||||
name: String,
|
||||
searchPackages: Seq[String] = defaultPackages): Option[BenchmarkAlgorithm] = {
|
||||
searchPackages.view.flatMap { p =>
|
||||
val n = if (p.isEmpty) name else s"$p.$name"
|
||||
load(n).toOption
|
||||
} .headOption
|
||||
}
|
||||
}
|
||||
@ -18,6 +18,7 @@ package com.databricks.spark.sql.perf
|
||||
|
||||
/**
|
||||
* The performance results of all given queries for a single iteration.
|
||||
*
|
||||
* @param timestamp The timestamp indicates when the entire experiment is started.
|
||||
* @param iteration The index number of the current iteration.
|
||||
* @param tags Tags of this iteration (variations are stored at here).
|
||||
@ -33,6 +34,7 @@ case class ExperimentRun(
|
||||
|
||||
/**
|
||||
* The configuration used for an iteration of an experiment.
|
||||
*
|
||||
* @param sparkVersion The version of Spark.
|
||||
* @param sqlConf All configuration properties related to Spark SQL.
|
||||
* @param sparkConf All configuration properties of Spark.
|
||||
@ -48,6 +50,7 @@ case class BenchmarkConfiguration(
|
||||
|
||||
/**
|
||||
* The result of a query.
|
||||
*
|
||||
* @param name The name of the query.
|
||||
* @param mode The ExecutionMode of this run.
|
||||
* @param parameters Additional parameters that describe this query.
|
||||
@ -77,10 +80,13 @@ case class BenchmarkResult(
|
||||
result: Option[Long] = None,
|
||||
breakDown: Seq[BreakdownResult] = Nil,
|
||||
queryExecution: Option[String] = None,
|
||||
failure: Option[Failure] = None)
|
||||
failure: Option[Failure] = None,
|
||||
mlParams: Option[MLParams] = None,
|
||||
mlResult: Option[MLResult] = None)
|
||||
|
||||
/**
|
||||
* The execution time of a subtree of the query plan tree of a specific query.
|
||||
*
|
||||
* @param nodeName The name of the top physical operator of the subtree.
|
||||
* @param nodeNameWithArgs The name and arguments of the top physical operator of the subtree.
|
||||
* @param index The index of the top physical operator of the subtree
|
||||
@ -97,3 +103,42 @@ case class BreakdownResult(
|
||||
delta: Double)
|
||||
|
||||
case class Failure(className: String, message: String)
|
||||
|
||||
// KEEP ARGUMENTS SORTED BY NAME.
|
||||
// It simplifies lookup when checking if a parameter is here already.
|
||||
case class MLParams(
|
||||
// *** Common to all algorithms ***
|
||||
randomSeed: Option[Int] = Some(42),
|
||||
numExamples: Option[Long] = None,
|
||||
numTestExamples: Option[Long] = None,
|
||||
numPartitions: Option[Int] = None,
|
||||
// *** Specialized and sorted by name ***
|
||||
ldaDocLength: Option[Int] = None,
|
||||
ldaNumVocabulary: Option[Int] = None,
|
||||
k: Option[Int] = None,
|
||||
maxIter: Option[Int] = None,
|
||||
numFeatures: Option[Int] = None,
|
||||
optimizer: Option[String] = None,
|
||||
regParam: Option[Double] = None,
|
||||
tol: Option[Double] = None
|
||||
)
|
||||
|
||||
object MLParams {
|
||||
val empty = MLParams()
|
||||
}
|
||||
|
||||
/**
|
||||
* Result information specific to MLlib.
|
||||
*
|
||||
* @param trainingTime (MLlib) Training time.
|
||||
* executionTime is set to the same value to match Spark Core tests.
|
||||
* @param trainingMetric (MLlib) Training metric, such as accuracy
|
||||
* @param testTime (MLlib) Test time (for prediction on test set, or on training set if there
|
||||
* is no test set).
|
||||
* @param testMetric (MLlib) Test metric, such as accuracy
|
||||
*/
|
||||
case class MLResult(
|
||||
trainingTime: Option[Double] = None,
|
||||
trainingMetric: Option[Double] = None,
|
||||
testTime: Option[Double] = None,
|
||||
testMetric: Option[Double] = None)
|
||||
|
||||
23
src/main/scala/configs/mllib-small.yaml
Normal file
23
src/main/scala/configs/mllib-small.yaml
Normal file
@ -0,0 +1,23 @@
|
||||
output: /tmp/results2
|
||||
timeoutSeconds: 1000
|
||||
common:
|
||||
numFeatures: 10
|
||||
numExamples: [1, 3]
|
||||
numPartitions: 3
|
||||
randomSeed: [1, 2, 3]
|
||||
benchmarks:
|
||||
- name: classification.LogisticRegression
|
||||
params:
|
||||
numFeatures: 100
|
||||
regParam: 0.1
|
||||
tol: [0.2, 0.1]
|
||||
- name: clustering.LDA
|
||||
params:
|
||||
numExamples: 10
|
||||
ldaDocLength: 20
|
||||
ldaNumVocabulary: 4
|
||||
k: 5
|
||||
maxIter: 10
|
||||
optimizer:
|
||||
- em
|
||||
- online
|
||||
16
src/main/scala/org/apache/spark/ml/ModelBuilder.scala
Normal file
16
src/main/scala/org/apache/spark/ml/ModelBuilder.scala
Normal file
@ -0,0 +1,16 @@
|
||||
package org.apache.spark.ml
|
||||
|
||||
import org.apache.spark.ml.classification.LogisticRegressionModel
|
||||
import org.apache.spark.ml.linalg.Vector
|
||||
|
||||
/**
|
||||
* Helper for creating MLlib models which have private constructors.
|
||||
*/
|
||||
object ModelBuilder {
|
||||
|
||||
def newLogisticRegressionModel(
|
||||
coefficients: Vector,
|
||||
intercept: Double): LogisticRegressionModel = {
|
||||
new LogisticRegressionModel("lr", coefficients, intercept)
|
||||
}
|
||||
}
|
||||
@ -4,7 +4,7 @@ import org.apache.spark.sql.hive.test.TestHive
|
||||
import org.scalatest.FunSuite
|
||||
|
||||
class DatasetPerformanceSuite extends FunSuite {
|
||||
test("run benchmark") {
|
||||
ignore("run benchmark") {
|
||||
TestHive // Init HiveContext
|
||||
val benchmark = new DatasetPerformance() {
|
||||
override val numLongs = 100
|
||||
|
||||
Loading…
Reference in New Issue
Block a user