Added decision tree, forest, GBT tests

This commit is contained in:
Joseph K. Bradley 2016-06-30 10:38:24 -07:00
parent 33a1e55366
commit ecf2eedbb8
10 changed files with 157 additions and 116 deletions

View File

@ -2,8 +2,8 @@ package com.databricks.spark.sql.perf.mllib
import com.typesafe.scalalogging.slf4j.Logging
import org.apache.spark.ml.Transformer
import org.apache.spark.ml.evaluation.{MulticlassClassificationEvaluator, Evaluator}
import org.apache.spark.ml.{Estimator, Transformer}
import org.apache.spark.ml.evaluation.Evaluator
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
@ -25,10 +25,10 @@ trait BenchmarkAlgorithm extends Logging {
def testDataSet(ctx: MLBenchContext): DataFrame
@throws[Exception]("if training fails")
def train(
ctx: MLBenchContext,
trainingSet: DataFrame): Transformer
/**
* Create an [[Estimator]] with params set from the given [[MLBenchContext]].
*/
def getEstimator(ctx: MLBenchContext): Estimator[_]
/**
* The unnormalized score of the training procedure on a dataset. The normalization is

View File

@ -2,10 +2,12 @@ package com.databricks.spark.sql.perf.mllib
import com.databricks.spark.sql.perf._
import com.typesafe.scalalogging.slf4j.Logging
import org.apache.spark.sql._
import org.apache.spark.sql._
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.ml.Transformer
class MLTransformerBenchmarkable(
params: MLParams,
test: BenchmarkAlgorithm,
@ -44,7 +46,12 @@ class MLTransformerBenchmarkable(
description: String,
messages: ArrayBuffer[String]): BenchmarkResult = {
try {
val (trainingTime, model) = measureTime(test.train(param, trainingData))
val (trainingTime, model: Transformer) = measureTime {
logger.info(s"$this: train: trainingSet=${trainingData.schema}")
val estimator = test.getEstimator(param)
estimator.fit(trainingData)
//test.train(param, trainingData)
}
logger.info(s"model: $model")
val (_, scoreTraining) = measureTime {
test.score(param, trainingData, model)

View File

@ -1,45 +1,46 @@
package com.databricks.spark.sql.perf.mllib.classification
import org.apache.spark.ml.ModelBuilder
import org.apache.spark.ml.Transformer
import org.apache.spark.ml.{Estimator, ModelBuilder, Transformer}
import org.apache.spark.ml.classification.DecisionTreeClassifier
import org.apache.spark.ml.evaluation.{MulticlassClassificationEvaluator, Evaluator}
import org.apache.spark.sql.DataFrame
import org.apache.spark.ml.evaluation.{Evaluator, MulticlassClassificationEvaluator}
import com.databricks.spark.sql.perf.mllib.OptionImplicits._
import com.databricks.spark.sql.perf.mllib._
import com.databricks.spark.sql.perf.mllib.data.DataGenerator
object DecisionTreeClassification extends BenchmarkAlgorithm
abstract class TreeOrForestClassification extends BenchmarkAlgorithm
with TestFromTraining with TrainingSetFromTransformer with ScoringWithEvaluator {
def getFeatureArity(ctx: MLBenchContext): Array[Int] = {
val numFeatures = ctx.params.numFeatures
val fourthFeatures = numFeatures / 4
Array.fill[Int](fourthFeatures)(2) ++ // low-arity categorical
Array.fill[Int](fourthFeatures)(20) ++ // high-arity categorical
Array.fill[Int](numFeatures - 2 * fourthFeatures)(0) // continuous
}
override protected def initialData(ctx: MLBenchContext) = {
import ctx.params._
DataGenerator.generateFeatures(ctx.sqlContext, numExamples, ctx.seed(), numPartitions,
numFeatures)
DataGenerator.generateMixedFeatures(ctx.sqlContext, numExamples, ctx.seed(), numPartitions,
getFeatureArity(ctx))
}
override protected def trueModel(ctx: MLBenchContext): Transformer = {
//val rng = ctx.newGenerator()
val numFeatures = ctx.params.numFeatures.get
val fourthFeatures = numFeatures / 4
val featureArity: Array[Int] =
Array.fill[Int](fourthFeatures)(2) ++ // low-arity categorical
Array.fill[Int](fourthFeatures)(20) ++ // high-arity categorical
Array.fill[Int](numFeatures - 2 * fourthFeatures)(0) // continuous
ModelBuilder.newDecisionTreeClassificationModel(ctx.params.depth.get, ctx.params.numClasses.get,
featureArity, ctx.seed())
}
override def train(ctx: MLBenchContext, trainingSet: DataFrame): Transformer = {
logger.info(s"$this: train: trainingSet=${trainingSet.schema}")
import ctx.params._
new DecisionTreeClassifier()
.setMaxDepth(depth.get)
.fit(trainingSet)
ModelBuilder.newDecisionTreeClassificationModel(ctx.params.depth, ctx.params.numClasses,
getFeatureArity(ctx), ctx.seed())
}
override protected def evaluator(ctx: MLBenchContext): Evaluator =
new MulticlassClassificationEvaluator()
}
object DecisionTreeClassification extends TreeOrForestClassification {
override def getEstimator(ctx: MLBenchContext): Estimator[_] = {
import ctx.params._
new DecisionTreeClassifier()
.setMaxDepth(depth)
.setSeed(ctx.seed())
}
}

View File

@ -0,0 +1,48 @@
package com.databricks.spark.sql.perf.mllib.classification
import org.apache.spark.ml.{Estimator, ModelBuilder, Transformer}
import org.apache.spark.ml.classification.GBTClassifier
import org.apache.spark.ml.evaluation.{Evaluator, MulticlassClassificationEvaluator}
import com.databricks.spark.sql.perf.mllib._
import com.databricks.spark.sql.perf.mllib.OptionImplicits._
import com.databricks.spark.sql.perf.mllib.data.DataGenerator
object GBTClassification extends BenchmarkAlgorithm
with TestFromTraining with TrainingSetFromTransformer with ScoringWithEvaluator {
def getFeatureArity(ctx: MLBenchContext): Array[Int] = {
val numFeatures = ctx.params.numFeatures
val fourthFeatures = numFeatures / 4
Array.fill[Int](fourthFeatures)(2) ++ // low-arity categorical
Array.fill[Int](fourthFeatures)(20) ++ // high-arity categorical
Array.fill[Int](numFeatures - 2 * fourthFeatures)(0) // continuous
}
override protected def initialData(ctx: MLBenchContext) = {
import ctx.params._
DataGenerator.generateMixedFeatures(ctx.sqlContext, numExamples, ctx.seed(), numPartitions,
getFeatureArity(ctx))
}
override protected def trueModel(ctx: MLBenchContext): Transformer = {
// We add +1 to the depth to make it more likely that many iterations of boosting are needed
// to model the true tree.
ModelBuilder.newDecisionTreeClassificationModel(ctx.params.depth + 1, ctx.params.numClasses,
getFeatureArity(ctx), ctx.seed())
}
override def getEstimator(ctx: MLBenchContext): Estimator[_] = {
import ctx.params._
// TODO: subsamplingRate, featureSubsetStrategy
// TODO: cacheNodeIds, checkpoint?
new GBTClassifier()
.setMaxDepth(depth)
.setMaxIter(maxIter)
.setSeed(ctx.seed())
}
override protected def evaluator(ctx: MLBenchContext): Evaluator =
new MulticlassClassificationEvaluator()
}

View File

@ -3,19 +3,19 @@ package com.databricks.spark.sql.perf.mllib.classification
import com.databricks.spark.sql.perf.mllib.OptionImplicits._
import com.databricks.spark.sql.perf.mllib._
import com.databricks.spark.sql.perf.mllib.data.DataGenerator
import org.apache.spark.ml.evaluation.{MulticlassClassificationEvaluator, Evaluator}
import org.apache.spark.ml.{Transformer, ModelBuilder}
import org.apache.spark.ml.evaluation.{Evaluator, MulticlassClassificationEvaluator}
import org.apache.spark.ml.{Estimator, ModelBuilder, Transformer}
import org.apache.spark.ml
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.DataFrame
object LogisticRegression extends BenchmarkAlgorithm
with TestFromTraining with TrainingSetFromTransformer with ScoringWithEvaluator {
override protected def initialData(ctx: MLBenchContext) = {
import ctx.params._
DataGenerator.generateFeatures(
DataGenerator.generateContinuousFeatures(
ctx.sqlContext,
numExamples,
ctx.seed(),
@ -32,15 +32,12 @@ object LogisticRegression extends BenchmarkAlgorithm
ModelBuilder.newLogisticRegressionModel(coefficients, intercept)
}
override def train(ctx: MLBenchContext,
trainingSet: DataFrame): Transformer = {
logger.info(s"$this: train: trainingSet=${trainingSet.schema}")
override def getEstimator(ctx: MLBenchContext): Estimator[_] = {
import ctx.params._
val lr = new ml.classification.LogisticRegression()
new ml.classification.LogisticRegression()
.setTol(tol)
.setMaxIter(maxIter)
.setRegParam(regParam)
lr.fit(trainingSet)
}
override protected def evaluator(ctx: MLBenchContext): Evaluator =

View File

@ -0,0 +1,20 @@
package com.databricks.spark.sql.perf.mllib.classification
import org.apache.spark.ml.Estimator
import org.apache.spark.ml.classification.RandomForestClassifier
import com.databricks.spark.sql.perf.mllib._
object RandomForestClassification extends TreeOrForestClassification {
override def getEstimator(ctx: MLBenchContext): Estimator[_] = {
import ctx.params._
// TODO: subsamplingRate, featureSubsetStrategy
// TODO: cacheNodeIds, checkpoint?
new RandomForestClassifier()
.setMaxDepth(depth.get)
.setNumTrees(maxIter.get)
.setSeed(ctx.seed())
}
}

View File

@ -1,13 +1,14 @@
package com.databricks.spark.sql.perf.mllib.clustering
import com.databricks.spark.sql.perf.mllib.{MLBenchContext, TestFromTraining, BenchmarkAlgorithm}
import com.databricks.spark.sql.perf.mllib.{BenchmarkAlgorithm, MLBenchContext, TestFromTraining}
import com.databricks.spark.sql.perf.mllib.OptionImplicits._
import org.apache.commons.math3.random.Well19937c
import org.apache.spark.ml.Transformer
import org.apache.spark.ml.Estimator
import org.apache.spark.ml
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.ml.linalg.{Vectors, Vector}
import org.apache.spark.ml.linalg.{Vector, Vectors}
import scala.collection.mutable.{HashMap => MHashMap}
object LDA extends BenchmarkAlgorithm with TestFromTraining {
@ -40,15 +41,13 @@ object LDA extends BenchmarkAlgorithm with TestFromTraining {
ctx.sqlContext.createDataFrame(data).toDF("docIndex", "features")
}
override def train(ctx: MLBenchContext,
trainingSet: DataFrame): Transformer = {
override def getEstimator(ctx: MLBenchContext): Estimator[_] = {
import ctx.params._
new ml.clustering.LDA()
.setK(k)
.setSeed(randomSeed.toLong)
.setMaxIter(maxIter)
.setOptimizer(optimizer)
.fit(trainingSet)
.setK(k)
.setSeed(randomSeed.toLong)
.setMaxIter(maxIter)
.setOptimizer(optimizer)
}
// TODO(?) add a scoring method here.

View File

@ -3,82 +3,65 @@ package com.databricks.spark.sql.perf.mllib.data
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.mllib.random._
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SQLContext, DataFrame}
object DataGenerator {
def generateFeatures(
def generateContinuousFeatures(
sql: SQLContext,
numExamples: Long,
seed: Long,
numPartitions: Int,
numFeatures: Int): DataFrame = {
val categoricalArities = Array.empty[Int]
val featureArity = Array.fill[Int](numFeatures)(0)
val rdd: RDD[Vector] = RandomRDDs.randomRDD(sql.sparkContext,
new FeaturesGenerator(categoricalArities, numFeatures),
numExamples, numPartitions, seed)
new FeaturesGenerator(featureArity), numExamples, numPartitions, seed)
sql.createDataFrame(rdd.map(Tuple1.apply)).toDF("features")
}
}
class BinaryLabeledDataGenerator(
private val numFeatures: Int,
private val threshold: Double) extends RandomDataGenerator[LabeledPoint] {
private val rng = new java.util.Random()
override def nextValue(): LabeledPoint = {
val y = if (rng.nextDouble() < threshold) 0.0 else 1.0
val x = Array.fill[Double](numFeatures) {
if (rng.nextDouble() < threshold) 0.0 else 1.0
}
???
// LabeledPoint(y, Vectors.dense(x))
def generateMixedFeatures(
sql: SQLContext,
numExamples: Long,
seed: Long,
numPartitions: Int,
featureArity: Array[Int]): DataFrame = {
val rdd: RDD[Vector] = RandomRDDs.randomRDD(sql.sparkContext,
new FeaturesGenerator(featureArity), numExamples, numPartitions, seed)
sql.createDataFrame(rdd.map(Tuple1.apply)).toDF("features")
}
override def setSeed(seed: Long) {
rng.setSeed(seed)
}
override def copy(): BinaryLabeledDataGenerator =
new BinaryLabeledDataGenerator(numFeatures, threshold)
}
/**
* Generator for a feature vector which can include a mix of categorical and continuous features.
* @param categoricalArities Specifies the number of categories for each categorical feature.
* @param numContinuous Number of continuous features. Feature values are in range [0,1].
* @param featureArity Length numFeatures, where 0 indicates continuous feature and > 0
* indicates a categorical feature of that arity.
*/
class FeaturesGenerator(val categoricalArities: Array[Int], val numContinuous: Int)
class FeaturesGenerator(val featureArity: Array[Int])
extends RandomDataGenerator[Vector] {
categoricalArities.foreach { arity =>
require(arity >= 2, s"FeaturesGenerator given categorical arity = $arity, " +
s"but arity should be >= 2.")
featureArity.foreach { arity =>
require(arity >= 0, s"FeaturesGenerator given categorical arity = $arity, " +
s"but arity should be >= 0.")
}
val numFeatures = categoricalArities.length + numContinuous
val numFeatures = featureArity.length
private val rng = new java.util.Random()
/**
* Generates vector with categorical features first, and continuous features in [0,1] second.
* Generates vector with features in the order given by [[featureArity]]
*/
override def nextValue(): Vector = {
// Feature ordering matches getCategoricalFeaturesInfo.
val arr = new Array[Double](numFeatures)
var j = 0
while (j < categoricalArities.length) {
arr(j) = rng.nextInt(categoricalArities(j))
j += 1
}
while (j < numFeatures) {
// Generating some centered data
arr(j) = 2 * rng.nextDouble() - 1
while (j < featureArity.length) {
if (featureArity(j) == 0)
arr(j) = 2 * rng.nextDouble() - 1 // centered uniform data
else
arr(j) = rng.nextInt(featureArity(j))
j += 1
}
Vectors.dense(arr)
@ -88,15 +71,5 @@ class FeaturesGenerator(val categoricalArities: Array[Int], val numContinuous: I
rng.setSeed(seed)
}
override def copy(): FeaturesGenerator = new FeaturesGenerator(categoricalArities, numContinuous)
/**
* @return categoricalFeaturesInfo Map storing arity of categorical features.
* E.g., an entry (n -> k) indicates that feature n is categorical
* with k categories indexed from 0: {0, 1, ..., k-1}.
*/
def getCategoricalFeaturesInfo: Map[Int, Int] = {
// Categorical features are indexed from 0 because of the implementation of nextValue().
categoricalArities.zipWithIndex.map(_.swap).toMap
}
}
override def copy(): FeaturesGenerator = new FeaturesGenerator(featureArity)
}

View File

@ -3,11 +3,11 @@ package com.databricks.spark.sql.perf.mllib.regression
import com.databricks.spark.sql.perf.mllib.OptionImplicits._
import com.databricks.spark.sql.perf.mllib._
import com.databricks.spark.sql.perf.mllib.data.DataGenerator
import org.apache.spark.ml.evaluation.{Evaluator, RegressionEvaluator}
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.regression.GeneralizedLinearRegression
import org.apache.spark.ml.{ModelBuilder, Transformer}
import org.apache.spark.sql._
import org.apache.spark.ml.{Estimator, ModelBuilder, Transformer}
object GLMRegression extends BenchmarkAlgorithm with TestFromTraining with
@ -15,7 +15,7 @@ object GLMRegression extends BenchmarkAlgorithm with TestFromTraining with
override protected def initialData(ctx: MLBenchContext) = {
import ctx.params._
DataGenerator.generateFeatures(
DataGenerator.generateContinuousFeatures(
ctx.sqlContext,
numExamples,
ctx.seed(),
@ -36,18 +36,14 @@ object GLMRegression extends BenchmarkAlgorithm with TestFromTraining with
m
}
override def train(
ctx: MLBenchContext,
trainingSet: DataFrame): Transformer = {
logger.info(s"$this: train: trainingSet=${trainingSet.schema}")
override def getEstimator(ctx: MLBenchContext): Estimator[_] = {
import ctx.params._
val glr = new GeneralizedLinearRegression()
new GeneralizedLinearRegression()
.setLink(link)
.setFamily(family)
.setRegParam(regParam)
.setMaxIter(maxIter)
.setTol(tol)
glr.fit(trainingSet)
}
override protected def evaluator(ctx: MLBenchContext): Evaluator =

View File

@ -190,7 +190,7 @@ object TreeBuilder {
// nCatsSplit is in {1,...,arity-1}.
val nCatsSplit = rng.nextInt(featureArity(feature) - 1) + 1
val splitCategories: Array[Double] =
rng.shuffle(Range(0,featureArity(feature))).map(_.toDouble).toArray.take(nCatsSplit)
rng.shuffle(Range(0,featureArity(feature)).toList).toArray.map(_.toDouble).take(nCatsSplit)
new CategoricalSplit(featureIndex = feature,
_leftCategories = splitCategories, numCategories = featureArity(feature))
}