From 87dc42a466602adf8e0b9689979bd0dec6608377 Mon Sep 17 00:00:00 2001 From: Timothy Hunter Date: Thu, 23 Jun 2016 12:13:11 -0700 Subject: [PATCH] work on glm, and some notebooks --- .../spark/sql/perf/Benchmarkable.scala | 12 +++- .../sql/perf/mllib/BenchmarkAlgorithm.scala | 8 ++- .../spark/sql/perf/mllib/MLLib.scala | 4 +- .../mllib/MLTransformerBenchmarkable.scala | 2 +- .../perf/mllib/regression/GLMRegression.scala | 55 +++++++++++++++++++ .../spark/sql/perf/mllib/yaml.scala | 6 +- .../databricks/spark/sql/perf/results.scala | 5 +- src/main/scala/configs/mllib-small.yaml | 12 ++++ .../org/apache/spark/ml/ModelBuilder.scala | 6 ++ version.sbt | 2 +- 10 files changed, 100 insertions(+), 12 deletions(-) create mode 100644 src/main/scala/com/databricks/spark/sql/perf/mllib/regression/GLMRegression.scala diff --git a/src/main/scala/com/databricks/spark/sql/perf/Benchmarkable.scala b/src/main/scala/com/databricks/spark/sql/perf/Benchmarkable.scala index 53176f8..d8c7148 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/Benchmarkable.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/Benchmarkable.scala @@ -45,9 +45,9 @@ trait Benchmarkable extends Logging { sparkContext.setJobDescription(s"Execution: $name, $description") beforeBenchmark() val result = if (forkThread) { - runBenchmarkForked(includeBreakdown, description, messages, timeout) - } else { doBenchmark(includeBreakdown, description, messages) + } else { + runBenchmarkForked(includeBreakdown, description, messages, timeout) } afterBenchmark(sqlContext.sparkContext) result @@ -81,7 +81,13 @@ trait Benchmarkable extends Logging { } catch { case e: Throwable => logger.info(s"$that: failure in runBenchmark: $e") - throw e + println(s"$that: failure in runBenchmark: $e") + result = BenchmarkResult( + name = name, + mode = executionMode.toString, + parameters = Map.empty, + failure = Some(Failure(e.getClass.getSimpleName, + e.getMessage + ":\n" + e.getStackTraceString))) } } } diff --git 
a/src/main/scala/com/databricks/spark/sql/perf/mllib/BenchmarkAlgorithm.scala b/src/main/scala/com/databricks/spark/sql/perf/mllib/BenchmarkAlgorithm.scala index 7dc6dc4..3839ab5 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/mllib/BenchmarkAlgorithm.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/mllib/BenchmarkAlgorithm.scala @@ -35,6 +35,10 @@ trait BenchmarkAlgorithm extends Logging { ctx: MLBenchContext, testSet: DataFrame, model: Transformer): Double = -1.0 // Not putting NaN because it is not valid JSON. + + def name: String = { + this.getClass.getCanonicalName.replace("$", "") + } } /** @@ -82,7 +86,9 @@ trait TestFromTraining { // Copy the context with a new seed. val ctx2 = ctx.params.randomSeed match { case Some(x) => - val p = ctx.params.copy(randomSeed = Some(x + 1)) + // Also set the number of examples to the number of test examples. + assert(ctx.params.numTestExamples.nonEmpty, "You must specify test examples") + val p = ctx.params.copy(randomSeed = Some(x + 1), numExamples = ctx.params.numTestExamples) ctx.copy(params = p) case None => // Making a full copy to reset the internal seed. 
diff --git a/src/main/scala/com/databricks/spark/sql/perf/mllib/MLLib.scala b/src/main/scala/com/databricks/spark/sql/perf/mllib/MLLib.scala index 83d1873..e59ad01 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/mllib/MLLib.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/mllib/MLLib.scala @@ -54,9 +54,9 @@ object MLLib extends Logging { val benchmarks = benchmarksDescriptions.map { mlb => new MLTransformerBenchmarkable(mlb.params, mlb.benchmark, sqlContext) } - logger.info(s"${benchmarks.size} benchmarks identified:") + println(s"${benchmarks.size} benchmarks identified:") val str = benchmarks.map(_.prettyPrint).mkString("\n") - logger.info(str) + println(str) logger.info("Starting experiments") val e = b.runExperiment( executionsToRun = benchmarks, diff --git a/src/main/scala/com/databricks/spark/sql/perf/mllib/MLTransformerBenchmarkable.scala b/src/main/scala/com/databricks/spark/sql/perf/mllib/MLTransformerBenchmarkable.scala index 5c78784..951cd26 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/mllib/MLTransformerBenchmarkable.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/mllib/MLTransformerBenchmarkable.scala @@ -18,7 +18,7 @@ class MLTransformerBenchmarkable( private var trainingData: DataFrame = null private val param = MLBenchContext(params, sqlContext) - override val name = test.getClass.getCanonicalName + override val name = test.name override protected val executionMode: ExecutionMode = ExecutionMode.SparkPerfResults diff --git a/src/main/scala/com/databricks/spark/sql/perf/mllib/regression/GLMRegression.scala b/src/main/scala/com/databricks/spark/sql/perf/mllib/regression/GLMRegression.scala new file mode 100644 index 0000000..6797e50 --- /dev/null +++ b/src/main/scala/com/databricks/spark/sql/perf/mllib/regression/GLMRegression.scala @@ -0,0 +1,55 @@ +package com.databricks.spark.sql.perf.mllib.regression + +import com.databricks.spark.sql.perf.mllib.OptionImplicits._ +import com.databricks.spark.sql.perf.mllib._ 
+import com.databricks.spark.sql.perf.mllib.data.DataGenerator +import org.apache.spark.ml.evaluation.{Evaluator, RegressionEvaluator} +import org.apache.spark.ml.linalg.Vectors +import org.apache.spark.ml.regression.GeneralizedLinearRegression +import org.apache.spark.ml.{ModelBuilder, Transformer} +import org.apache.spark.sql._ + + +object GLMRegression extends BenchmarkAlgorithm with TestFromTraining with + TrainingSetFromTransformer with ScoringWithEvaluator { + + override protected def initialData(ctx: MLBenchContext) = { + import ctx.params._ + DataGenerator.generateFeatures( + ctx.sqlContext, + numExamples, + ctx.seed(), + numPartitions, + numFeatures) + } + + override protected def trueModel(ctx: MLBenchContext): Transformer = { + import ctx.params._ + val rng = ctx.newGenerator() + val coefficients = + Vectors.dense(Array.fill[Double](ctx.params.numFeatures)(2 * rng.nextDouble() - 1)) + // Small intercept to prevent some skew in the data. + val intercept = 0.01 * (2 * rng.nextDouble - 1) + val m = ModelBuilder.newGLR(coefficients, intercept) + m.set(m.link, link.get) + m.set(m.family, family.get) + m + } + + override def train( + ctx: MLBenchContext, + trainingSet: DataFrame): Transformer = { + logger.info(s"$this: train: trainingSet=${trainingSet.schema}") + import ctx.params._ + val glr = new GeneralizedLinearRegression() + .setLink(link) + .setFamily(family) + .setRegParam(regParam) + .setMaxIter(maxIter) + .setTol(tol) + glr.fit(trainingSet) + } + + override protected def evaluator(ctx: MLBenchContext): Evaluator = + new RegressionEvaluator() +} diff --git a/src/main/scala/com/databricks/spark/sql/perf/mllib/yaml.scala b/src/main/scala/com/databricks/spark/sql/perf/mllib/yaml.scala index ee40461..edd54a7 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/mllib/yaml.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/mllib/yaml.scala @@ -111,12 +111,11 @@ object YamlConfig { } // Some ugly internals to make simple constructs -package object 
ccFromMap { +object ccFromMap { // Builds a case class from a map. // (taken from stack overflow) // if strict, will report an error if some unknown arguments are passed to the constructor def fromMap[T: TypeTag: ClassTag](m: Map[String,_], strict: Boolean) = { - scala.reflect.runtime.universe val rm = runtimeMirror(classTag[T].runtimeClass.getClassLoader) val classTest = typeOf[T].typeSymbol.asClass @@ -144,7 +143,8 @@ package object ccFromMap { m.get(paramName).getOrElse(throw new IllegalArgumentException("Map is missing required parameter named " + paramName)) }) - constructorMirror(constructorArgs:_*).asInstanceOf[T] + val res = constructorMirror(constructorArgs:_*).asInstanceOf[T] + res } // TODO: handle scala.reflect.internal.MissingRequirementError diff --git a/src/main/scala/com/databricks/spark/sql/perf/results.scala b/src/main/scala/com/databricks/spark/sql/perf/results.scala index d9e93f0..1effa9c 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/results.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/results.scala @@ -113,9 +113,12 @@ case class MLParams( numTestExamples: Option[Long] = None, numPartitions: Option[Int] = None, // *** Specialized and sorted by name *** + elasticNetParam: Option[Double] = None, + family: Option[String] = None, + link: Option[String] = None, + k: Option[Int] = None, ldaDocLength: Option[Int] = None, ldaNumVocabulary: Option[Int] = None, - k: Option[Int] = None, maxIter: Option[Int] = None, numFeatures: Option[Int] = None, optimizer: Option[String] = None, diff --git a/src/main/scala/configs/mllib-small.yaml b/src/main/scala/configs/mllib-small.yaml index eea6c54..5d4fac7 100644 --- a/src/main/scala/configs/mllib-small.yaml +++ b/src/main/scala/configs/mllib-small.yaml @@ -3,6 +3,7 @@ timeoutSeconds: 1000 common: numFeatures: 10 numExamples: [1, 3] + numTestExamples: 100 numPartitions: 3 randomSeed: [1, 2, 3] benchmarks: @@ -14,6 +15,7 @@ benchmarks: - name: clustering.LDA params: numExamples: 10 + 
numTestExamples: 10 ldaDocLength: 20 ldaNumVocabulary: 4 k: 5 @@ -21,3 +23,13 @@ benchmarks: optimizer: - em - online + - name: regression.GLMRegression + params: + numExamples: 100 + numTestExamples: 10 + numFeatures: 5 + link: log + family: gaussian + tol: 0.0 + maxIter: 10 + regParam: 0.1 diff --git a/src/main/scala/org/apache/spark/ml/ModelBuilder.scala b/src/main/scala/org/apache/spark/ml/ModelBuilder.scala index 1f8174f..089376a 100644 --- a/src/main/scala/org/apache/spark/ml/ModelBuilder.scala +++ b/src/main/scala/org/apache/spark/ml/ModelBuilder.scala @@ -2,6 +2,7 @@ package org.apache.spark.ml import org.apache.spark.ml.classification.LogisticRegressionModel import org.apache.spark.ml.linalg.Vector +import org.apache.spark.ml.regression.GeneralizedLinearRegressionModel /** * Helper for creating MLlib models which have private constructors. @@ -13,4 +14,9 @@ object ModelBuilder { intercept: Double): LogisticRegressionModel = { new LogisticRegressionModel("lr", coefficients, intercept) } + + def newGLR( + coefficients: Vector, + intercept: Double): GeneralizedLinearRegressionModel = + new GeneralizedLinearRegressionModel("glr-uid", coefficients, intercept) } \ No newline at end of file diff --git a/version.sbt b/version.sbt index f4273b3..078e0fd 100644 --- a/version.sbt +++ b/version.sbt @@ -1 +1 @@ -version in ThisBuild := "0.4.9-SNAPSHOT" +version in ThisBuild := "0.4.10-SNAPSHOT"