diff --git a/src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala b/src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala index f354ce2..4ff08ea 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala @@ -108,7 +108,7 @@ abstract class Benchmark( forkThread: Boolean = true) = { new ExperimentStatus(executionsToRun, includeBreakdown, iterations, variations, tags, - timeout, resultLocation, sqlContext, allTables, currentConfiguration) + timeout, resultLocation, sqlContext, allTables, currentConfiguration, forkThread = forkThread) } diff --git a/src/main/scala/com/databricks/spark/sql/perf/Benchmarkable.scala b/src/main/scala/com/databricks/spark/sql/perf/Benchmarkable.scala index 53176f8..6303843 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/Benchmarkable.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/Benchmarkable.scala @@ -81,7 +81,13 @@ trait Benchmarkable extends Logging { } catch { case e: Throwable => logger.info(s"$that: failure in runBenchmark: $e") - throw e + println(s"$that: failure in runBenchmark: $e") + result = BenchmarkResult( + name = name, + mode = executionMode.toString, + parameters = Map.empty, + failure = Some(Failure(e.getClass.getSimpleName, + e.getMessage + ":\n" + e.getStackTraceString))) } } } diff --git a/src/main/scala/com/databricks/spark/sql/perf/mllib/BenchmarkAlgorithm.scala b/src/main/scala/com/databricks/spark/sql/perf/mllib/BenchmarkAlgorithm.scala index 7dc6dc4..ad714e5 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/mllib/BenchmarkAlgorithm.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/mllib/BenchmarkAlgorithm.scala @@ -30,11 +30,19 @@ trait BenchmarkAlgorithm extends Logging { ctx: MLBenchContext, trainingSet: DataFrame): Transformer + /** + * The unnormalized score of the training procedure on a dataset. The normalization is + * performed by the caller. + */ @throws[Exception]("if scoring fails") def score( ctx: MLBenchContext, testSet: DataFrame, model: Transformer): Double = -1.0 // Not putting NaN because it is not valid JSON. + + def name: String = { + this.getClass.getCanonicalName.replace("$", "") + } } /** @@ -82,7 +90,9 @@ trait TestFromTraining { // Copy the context with a new seed. val ctx2 = ctx.params.randomSeed match { case Some(x) => - val p = ctx.params.copy(randomSeed = Some(x + 1)) + // Also set the number of examples to the number of test examples. + assert(ctx.params.numTestExamples.nonEmpty, "You must specify test examples") + val p = ctx.params.copy(randomSeed = Some(x + 1), numExamples = ctx.params.numTestExamples) ctx.copy(params = p) case None => // Making a full copy to reset the internal seed. diff --git a/src/main/scala/com/databricks/spark/sql/perf/mllib/MLLib.scala b/src/main/scala/com/databricks/spark/sql/perf/mllib/MLLib.scala index 83d1873..e59ad01 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/mllib/MLLib.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/mllib/MLLib.scala @@ -54,9 +54,9 @@ object MLLib extends Logging { val benchmarks = benchmarksDescriptions.map { mlb => new MLTransformerBenchmarkable(mlb.params, mlb.benchmark, sqlContext) } - logger.info(s"${benchmarks.size} benchmarks identified:") + println(s"${benchmarks.size} benchmarks identified:") val str = benchmarks.map(_.prettyPrint).mkString("\n") - logger.info(str) + println(str) logger.info("Starting experiments") val e = b.runExperiment( executionsToRun = benchmarks, diff --git a/src/main/scala/com/databricks/spark/sql/perf/mllib/MLTransformerBenchmarkable.scala b/src/main/scala/com/databricks/spark/sql/perf/mllib/MLTransformerBenchmarkable.scala index 5c78784..8794c07 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/mllib/MLTransformerBenchmarkable.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/mllib/MLTransformerBenchmarkable.scala @@ -16,9 +16,10 @@ class MLTransformerBenchmarkable( private var testData: DataFrame = null private var trainingData: DataFrame = null + private var testDataCount: Option[Long] = None private val param = MLBenchContext(params, sqlContext) - override val name = test.getClass.getCanonicalName + override val name = test.name override protected val executionMode: ExecutionMode = ExecutionMode.SparkPerfResults @@ -27,7 +28,7 @@ class MLTransformerBenchmarkable( try { testData = test.testDataSet(param) testData.cache() - testData.count() + testDataCount = Some(testData.count()) trainingData = test.trainingDataSet(param) trainingData.cache() trainingData.count() @@ -57,7 +58,7 @@ class MLTransformerBenchmarkable( trainingTime = Some(trainingTime.toMillis), trainingMetric = Some(scoreTraining), testTime = Some(scoreTestTime.toMillis), - testMetric = Some(scoreTest)) + testMetric = Some(scoreTest / testDataCount.get)) BenchmarkResult( name = name, diff --git a/src/main/scala/com/databricks/spark/sql/perf/mllib/classification/LogisticRegression.scala b/src/main/scala/com/databricks/spark/sql/perf/mllib/classification/LogisticRegression.scala index 5e81496..284905d 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/mllib/classification/LogisticRegression.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/mllib/classification/LogisticRegression.scala @@ -38,6 +38,7 @@ object LogisticRegression extends BenchmarkAlgorithm import ctx.params._ val lr = new ml.classification.LogisticRegression() .setTol(tol) + .setMaxIter(maxIter) .setRegParam(regParam) lr.fit(trainingSet) } diff --git a/src/main/scala/com/databricks/spark/sql/perf/mllib/regression/GLMRegression.scala b/src/main/scala/com/databricks/spark/sql/perf/mllib/regression/GLMRegression.scala new file mode 100644 index 0000000..6797e50 --- /dev/null +++ b/src/main/scala/com/databricks/spark/sql/perf/mllib/regression/GLMRegression.scala @@ -0,0 +1,55 @@ +package com.databricks.spark.sql.perf.mllib.regression + +import com.databricks.spark.sql.perf.mllib.OptionImplicits._ +import com.databricks.spark.sql.perf.mllib._ +import com.databricks.spark.sql.perf.mllib.data.DataGenerator +import org.apache.spark.ml.evaluation.{Evaluator, RegressionEvaluator} +import org.apache.spark.ml.linalg.Vectors +import org.apache.spark.ml.regression.GeneralizedLinearRegression +import org.apache.spark.ml.{ModelBuilder, Transformer} +import org.apache.spark.sql._ + + +object GLMRegression extends BenchmarkAlgorithm with TestFromTraining with + TrainingSetFromTransformer with ScoringWithEvaluator { + + override protected def initialData(ctx: MLBenchContext) = { + import ctx.params._ + DataGenerator.generateFeatures( + ctx.sqlContext, + numExamples, + ctx.seed(), + numPartitions, + numFeatures) + } + + override protected def trueModel(ctx: MLBenchContext): Transformer = { + import ctx.params._ + val rng = ctx.newGenerator() + val coefficients = + Vectors.dense(Array.fill[Double](ctx.params.numFeatures)(2 * rng.nextDouble() - 1)) + // Small intercept to prevent some skew in the data. + val intercept = 0.01 * (2 * rng.nextDouble - 1) + val m = ModelBuilder.newGLR(coefficients, intercept) + m.set(m.link, link.get) + m.set(m.family, family.get) + m + } + + override def train( + ctx: MLBenchContext, + trainingSet: DataFrame): Transformer = { + logger.info(s"$this: train: trainingSet=${trainingSet.schema}") + import ctx.params._ + val glr = new GeneralizedLinearRegression() + .setLink(link) + .setFamily(family) + .setRegParam(regParam) + .setMaxIter(maxIter) + .setTol(tol) + glr.fit(trainingSet) + } + + override protected def evaluator(ctx: MLBenchContext): Evaluator = + new RegressionEvaluator() +} diff --git a/src/main/scala/com/databricks/spark/sql/perf/mllib/yaml.scala b/src/main/scala/com/databricks/spark/sql/perf/mllib/yaml.scala index ee40461..edd54a7 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/mllib/yaml.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/mllib/yaml.scala @@ -111,12 +111,11 @@ object YamlConfig { } // Some ugly internals to make simple constructs -package object ccFromMap { +object ccFromMap { // Builds a case class from a map. // (taken from stack overflow) // if strict, will report an error if some unknown arguments are passed to the constructor def fromMap[T: TypeTag: ClassTag](m: Map[String,_], strict: Boolean) = { - scala.reflect.runtime.universe val rm = runtimeMirror(classTag[T].runtimeClass.getClassLoader) val classTest = typeOf[T].typeSymbol.asClass @@ -144,7 +143,8 @@ package object ccFromMap { m.get(paramName).getOrElse(throw new IllegalArgumentException("Map is missing required parameter named " + paramName)) }) - constructorMirror(constructorArgs:_*).asInstanceOf[T] + val res = constructorMirror(constructorArgs:_*).asInstanceOf[T] + res } // TODO: handle scala.reflect.internal.MissingRequirementError diff --git a/src/main/scala/com/databricks/spark/sql/perf/results.scala b/src/main/scala/com/databricks/spark/sql/perf/results.scala index d9e93f0..1effa9c 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/results.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/results.scala @@ -113,9 +113,12 @@ case class MLParams( numTestExamples: Option[Long] = None, numPartitions: Option[Int] = None, // *** Specialized and sorted by name *** + elasticNetParam: Option[Double] = None, + family: Option[String] = None, + link: Option[String] = None, + k: Option[Int] = None, ldaDocLength: Option[Int] = None, ldaNumVocabulary: Option[Int] = None, - k: Option[Int] = None, maxIter: Option[Int] = None, numFeatures: Option[Int] = None, optimizer: Option[String] = None, diff --git a/src/main/scala/configs/mllib-small.yaml b/src/main/scala/configs/mllib-small.yaml index eea6c54..5d4fac7 100644 --- a/src/main/scala/configs/mllib-small.yaml +++ b/src/main/scala/configs/mllib-small.yaml @@ -3,6 +3,7 @@ timeoutSeconds: 1000 common: numFeatures: 10 numExamples: [1, 3] + numTestExamples: 100 numPartitions: 3 randomSeed: [1, 2, 3] benchmarks: @@ -14,6 +15,7 @@ benchmarks: - name: clustering.LDA params: numExamples: 10 + numTestExamples: 10 ldaDocLength: 20 ldaNumVocabulary: 4 k: 5 @@ -21,3 +23,13 @@ benchmarks: optimizer: - em - online + - name: regression.GLMRegression + params: + numExamples: 100 + numTestExamples: 10 + numFeatures: 5 + link: log + family: gaussian + tol: 0.0 + maxIter: 10 + regParam: 0.1 diff --git a/src/main/scala/org/apache/spark/ml/ModelBuilder.scala b/src/main/scala/org/apache/spark/ml/ModelBuilder.scala index 1f8174f..089376a 100644 --- a/src/main/scala/org/apache/spark/ml/ModelBuilder.scala +++ b/src/main/scala/org/apache/spark/ml/ModelBuilder.scala @@ -2,6 +2,7 @@ package org.apache.spark.ml import org.apache.spark.ml.classification.LogisticRegressionModel import org.apache.spark.ml.linalg.Vector +import org.apache.spark.ml.regression.GeneralizedLinearRegressionModel /** * Helper for creating MLlib models which have private constructors. @@ -13,4 +14,9 @@ object ModelBuilder { intercept: Double): LogisticRegressionModel = { new LogisticRegressionModel("lr", coefficients, intercept) } + + def newGLR( + coefficients: Vector, + intercept: Double): GeneralizedLinearRegressionModel = + new GeneralizedLinearRegressionModel("glr-uid", coefficients, intercept) } \ No newline at end of file diff --git a/version.sbt b/version.sbt index f4273b3..078e0fd 100644 --- a/version.sbt +++ b/version.sbt @@ -1 +1 @@ -version in ThisBuild := "0.4.9-SNAPSHOT" +version in ThisBuild := "0.4.10-SNAPSHOT"