From 9febc34f66cb465d23ab59b2bd123ae3354aa7a0 Mon Sep 17 00:00:00 2001 From: Siddharth Murching Date: Mon, 28 Aug 2017 13:23:59 -0700 Subject: [PATCH] Refactor MLParams for spark-sql-perf (#114) A case class (MLParams) is currently used to store/access parameters for ML tests in spark-sql-perf. With the addition of new ML tests to spark-sql-perf (in this PR: #112), the number of ML-related test params will be > 22, but Scala only allows up to 22 params in a case class. This PR addresses the issue by: * Converting MLParams from a case class into a regular class (class MLParams) that provides access to the same parameters, with hand-written toMap and copy methods. * Replacing MLParams(...) constructions with new MLParams(...) * Storing the members of MLParams in BenchmarkResult.parameters for logging/persistence. Tested by running default performance tests in src/main/scala/configs/mllib-small.yaml. --- .../spark/sql/perf/mllib/MLBenchmarks.scala | 2 +- .../mllib/MLPipelineStageBenchmarkable.scala | 6 +- .../sql/perf/mllib/ReflectionUtils.scala | 39 +++++++ .../mllib/classification/NaiveBayes.scala | 2 +- .../spark/sql/perf/mllib/clustering/LDA.scala | 6 +- .../databricks/spark/sql/perf/results.scala | 108 +++++++++++++----- src/main/scala/configs/mllib-small.yaml | 6 +- 7 files changed, 129 insertions(+), 40 deletions(-) create mode 100644 src/main/scala/com/databricks/spark/sql/perf/mllib/ReflectionUtils.scala diff --git a/src/main/scala/com/databricks/spark/sql/perf/mllib/MLBenchmarks.scala b/src/main/scala/com/databricks/spark/sql/perf/mllib/MLBenchmarks.scala index b25453b..1103804 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/mllib/MLBenchmarks.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/mllib/MLBenchmarks.scala @@ -17,7 +17,7 @@ object MLBenchmarks { val benchmarks: Seq[MLTest] = List( MLTest( LogisticRegression, - MLParams( + new MLParams( numFeatures = 10, numExamples = 10, numTestExamples = 10, diff --git 
a/src/main/scala/com/databricks/spark/sql/perf/mllib/MLPipelineStageBenchmarkable.scala b/src/main/scala/com/databricks/spark/sql/perf/mllib/MLPipelineStageBenchmarkable.scala index ffef088..b9b2827 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/mllib/MLPipelineStageBenchmarkable.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/mllib/MLPipelineStageBenchmarkable.scala @@ -80,17 +80,15 @@ class MLPipelineStageBenchmarkable( BenchmarkResult( name = name, mode = executionMode.toString, - parameters = Map.empty, + parameters = params.toMap, executionTime = Some(trainingTime.toMillis), - mlParams = Some(params), mlResult = Some(ml)) } catch { case e: Exception => BenchmarkResult( name = name, mode = executionMode.toString, - parameters = Map.empty, - mlParams = Some(params), + parameters = params.toMap, failure = Some(Failure(e.getClass.getSimpleName, e.getMessage + ":\n" + e.getStackTraceString))) } finally { diff --git a/src/main/scala/com/databricks/spark/sql/perf/mllib/ReflectionUtils.scala b/src/main/scala/com/databricks/spark/sql/perf/mllib/ReflectionUtils.scala new file mode 100644 index 0000000..75a2949 --- /dev/null +++ b/src/main/scala/com/databricks/spark/sql/perf/mllib/ReflectionUtils.scala @@ -0,0 +1,39 @@ +package com.databricks.spark.sql.perf.mllib + +import scala.reflect.ClassTag +import scala.reflect.runtime.universe._ + +/** Exposes methods to simplify implementation of classes like MLParams. */ +private[perf] object ReflectionUtils { + + private def getConstructor[T: TypeTag: ClassTag](obj: T): MethodSymbol = { + typeOf[T].declaration(nme.CONSTRUCTOR).asMethod + } + + /** + * Given an instance [[obj]] of a class whose constructor arguments are all of type Option[Any], + * returns a map of key-value pairs (argName -> argValue) where argName is the name + * of a constructor argument with a defined (not None) value and argValue is the corresponding + * value. 
+ */ + def getConstructorArgs[T: TypeTag: ClassTag](obj: T): Map[String, Any] = { + // Get constructor of passed-in instance + val constructor = getConstructor(obj) + // Include each constructor argument not equal to None in the output map + constructor.paramss.flatten.flatMap { (param: Symbol) => + // Get name and value of the constructor argument + val paramName = param.name.toString + val getter = obj.getClass.getDeclaredField(paramName) + getter.setAccessible(true) + val paramValue = getter.get(obj) + // If the constructor argument is defined, include it in our output map + paramValue match { + case value: Option[Any] => if (value.isDefined) Seq(paramName -> paramValue) else Seq.empty + case _ => throw new UnsupportedOperationException("ReflectionUtils.getConstructorArgs " + + "can only be called on instances of classes whose constructor arguments are all of " + + s"type Option[Any]; constructor argument ${paramName} had invalid type.") + } + }.toMap + } + +} diff --git a/src/main/scala/com/databricks/spark/sql/perf/mllib/classification/NaiveBayes.scala b/src/main/scala/com/databricks/spark/sql/perf/mllib/classification/NaiveBayes.scala index c866635..0c0a9d9 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/mllib/classification/NaiveBayes.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/mllib/classification/NaiveBayes.scala @@ -57,7 +57,7 @@ object NaiveBayes extends BenchmarkAlgorithm override def getPipelineStage(ctx: MLBenchContext): PipelineStage = { import ctx.params._ new ml.classification.NaiveBayes() - .setSmoothing(naiveBayesSmoothing) + .setSmoothing(smoothing) } override protected def evaluator(ctx: MLBenchContext): Evaluator = diff --git a/src/main/scala/com/databricks/spark/sql/perf/mllib/clustering/LDA.scala b/src/main/scala/com/databricks/spark/sql/perf/mllib/clustering/LDA.scala index e4a836a..dfe9a2b 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/mllib/clustering/LDA.scala +++ 
b/src/main/scala/com/databricks/spark/sql/perf/mllib/clustering/LDA.scala @@ -24,14 +24,14 @@ object LDA extends BenchmarkAlgorithm with TestFromTraining { numPartitions ) val seed: Int = randomSeed - val docLength = ldaDocLength.get - val numVocab = ldaNumVocabulary.get + val docLen = docLength.get + val numVocab = vocabSize.get val data: RDD[(Long, Vector)] = rdd.mapPartitionsWithIndex { (idx, partition) => val rng = new Well19937c(seed ^ idx) partition.map { docIndex => var currentSize = 0 val entries = MHashMap[Int, Int]() - while (currentSize < docLength) { + while (currentSize < docLen) { val index = rng.nextInt(numVocab) entries(index) = entries.getOrElse(index, 0) + 1 currentSize += 1 diff --git a/src/main/scala/com/databricks/spark/sql/perf/results.scala b/src/main/scala/com/databricks/spark/sql/perf/results.scala index 5ad1762..11cfcde 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/results.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/results.scala @@ -16,6 +16,8 @@ package com.databricks.spark.sql.perf +import com.databricks.spark.sql.perf.mllib.ReflectionUtils + /** * The performance results of all given queries for a single iteration. * @@ -81,7 +83,6 @@ case class BenchmarkResult( breakDown: Seq[BreakdownResult] = Nil, queryExecution: Option[String] = None, failure: Option[Failure] = None, - mlParams: Option[MLParams] = None, mlResult: Option[MLResult] = None) /** @@ -104,37 +105,88 @@ case class BreakdownResult( case class Failure(className: String, message: String) -// KEEP ARGUMENTS SORTED BY NAME. -// It simplifies lookup when checking if a parameter is here already. -case class MLParams( +/** + * Class wrapping parameters for ML tests. + * + * KEEP CONSTRUCTOR ARGUMENTS SORTED BY NAME. + * It simplifies lookup when checking if a parameter is here already. 
+ */ +class MLParams( // *** Common to all algorithms *** - randomSeed: Option[Int] = Some(42), - numExamples: Option[Long] = None, - numTestExamples: Option[Long] = None, - numPartitions: Option[Int] = None, + val randomSeed: Option[Int] = Some(42), + val numExamples: Option[Long] = None, + val numTestExamples: Option[Long] = None, + val numPartitions: Option[Int] = None, // *** Specialized and sorted by name *** - bucketizerNumBuckets: Option[Int] = None, - depth: Option[Int] = None, - elasticNetParam: Option[Double] = None, - family: Option[String] = None, - k: Option[Int] = None, - ldaDocLength: Option[Int] = None, - ldaNumVocabulary: Option[Int] = None, - link: Option[String] = None, - maxIter: Option[Int] = None, - naiveBayesSmoothing: Option[Double] = None, - numClasses: Option[Int] = None, - numFeatures: Option[Int] = None, - numItems: Option[Int] = None, - numUsers: Option[Int] = None, - optimizer: Option[String] = None, - regParam: Option[Double] = None, - rank: Option[Int] = None, - tol: Option[Double] = None -) + val bucketizerNumBuckets: Option[Int] = None, + val depth: Option[Int] = None, + val docLength: Option[Int] = None, + val elasticNetParam: Option[Double] = None, + val family: Option[String] = None, + val k: Option[Int] = None, + val link: Option[String] = None, + val maxIter: Option[Int] = None, + val numClasses: Option[Int] = None, + val numFeatures: Option[Int] = None, + val numItems: Option[Int] = None, + val numUsers: Option[Int] = None, + val optimizer: Option[String] = None, + val regParam: Option[Double] = None, + val rank: Option[Int] = None, + val smoothing: Option[Double] = None, + val tol: Option[Double] = None, + val vocabSize: Option[Int] = None) { + + /** + * Returns a map of param names to string representations of their values. Only params that + * were defined (i.e., not equal to None) are included in the map. 
+ */ + def toMap: Map[String, String] = { + val allParams = ReflectionUtils.getConstructorArgs(this) + allParams.map { case (key: String, value: Any) => + key -> value.toString + } + } + + /** Returns a copy of the current MLParams instance */ + def copy( + // *** Common to all algorithms *** + randomSeed: Option[Int] = randomSeed, + numExamples: Option[Long] = numExamples, + numTestExamples: Option[Long] = numTestExamples, + numPartitions: Option[Int] = numPartitions, + // *** Specialized and sorted by name *** + bucketizerNumBuckets: Option[Int] = bucketizerNumBuckets, + depth: Option[Int] = depth, + docLength: Option[Int] = docLength, + elasticNetParam: Option[Double] = elasticNetParam, + family: Option[String] = family, + k: Option[Int] = k, + link: Option[String] = link, + maxIter: Option[Int] = maxIter, + numClasses: Option[Int] = numClasses, + numFeatures: Option[Int] = numFeatures, + numItems: Option[Int] = numItems, + numUsers: Option[Int] = numUsers, + vocabSize: Option[Int] = vocabSize, + optimizer: Option[String] = optimizer, + regParam: Option[Double] = regParam, + rank: Option[Int] = rank, + smoothing: Option[Double] = smoothing, + tol: Option[Double] = tol): MLParams = { + new MLParams(randomSeed = randomSeed, numExamples = numExamples, + numTestExamples = numTestExamples, numPartitions = numPartitions, + bucketizerNumBuckets = bucketizerNumBuckets, depth = depth, docLength = docLength, + elasticNetParam = elasticNetParam, family = family, k = k, link = link, maxIter = maxIter, + numClasses = numClasses, numFeatures = numFeatures, + numItems = numItems, numUsers = numUsers, optimizer = optimizer, regParam = regParam, + rank = rank, smoothing = smoothing, tol = tol, vocabSize = vocabSize) + } +} + object MLParams { - val empty = MLParams() + val empty = new MLParams() } /** diff --git a/src/main/scala/configs/mllib-small.yaml b/src/main/scala/configs/mllib-small.yaml index dcde90c..c0bb650 100644 --- a/src/main/scala/configs/mllib-small.yaml +++ 
b/src/main/scala/configs/mllib-small.yaml @@ -17,8 +17,8 @@ benchmarks: params: numExamples: 10 numTestExamples: 10 - ldaDocLength: 20 - ldaNumVocabulary: 4 + docLength: 20 + vocabSize: 4 k: 5 maxIter: 10 optimizer: @@ -87,6 +87,6 @@ benchmarks: - name: classification.NaiveBayes params: numExamples: 100 - naiveBayesSmoothing: 1.0 + smoothing: 1.0 numClasses: 10 numFeatures: [10]