diff --git a/src/main/scala/com/databricks/spark/sql/perf/mllib/data/RatingGenerator.scala b/src/main/scala/com/databricks/spark/sql/perf/mllib/data/RatingGenerator.scala
new file mode 100644
index 0000000..b1b197a
--- /dev/null
+++ b/src/main/scala/com/databricks/spark/sql/perf/mllib/data/RatingGenerator.scala
@@ -0,0 +1,35 @@
+package com.databricks.spark.sql.perf.mllib.data
+
+import org.apache.spark.ml.recommendation.ALS.Rating
+import org.apache.spark.mllib.random.RandomDataGenerator
+
+import scala.collection.mutable
+
+class RatingGenerator(
+    private val numUsers: Int,
+    private val numProducts: Int,
+    private val implicitPrefs: Boolean) extends RandomDataGenerator[Rating[Int]] {
+
+  private val rng = new java.util.Random()
+
+  private val observed = new mutable.HashMap[(Int, Int), Boolean]()
+
+  override def nextValue(): Rating[Int] = {
+    var tuple = (rng.nextInt(numUsers), rng.nextInt(numProducts))
+    while (observed.getOrElse(tuple, false)) {
+      tuple = (rng.nextInt(numUsers), rng.nextInt(numProducts))
+    }
+    observed += (tuple -> true)
+
+    val rating = if (implicitPrefs) rng.nextInt(2) * 1.0 else rng.nextDouble() * 5
+
+    new Rating(tuple._1, tuple._2, rating.toFloat)
+  }
+
+  override def setSeed(seed: Long): Unit = {
+    rng.setSeed(seed)
+  }
+
+  override def copy(): RatingGenerator =
+    new RatingGenerator(numUsers, numProducts, implicitPrefs)
+}
diff --git a/src/main/scala/com/databricks/spark/sql/perf/mllib/data/dataGeneration.scala b/src/main/scala/com/databricks/spark/sql/perf/mllib/data/dataGeneration.scala
index af10387..07a68a6 100644
--- a/src/main/scala/com/databricks/spark/sql/perf/mllib/data/dataGeneration.scala
+++ b/src/main/scala/com/databricks/spark/sql/perf/mllib/data/dataGeneration.scala
@@ -2,8 +2,9 @@ package com.databricks.spark.sql.perf.mllib.data
 
 import org.apache.spark.ml.linalg.Vectors
 import org.apache.spark.ml.linalg.Vector
+import org.apache.spark.ml.recommendation.ALS.Rating
 import org.apache.spark.mllib.random._
-import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.{PairRDDFunctions, RDD}
 
 import org.apache.spark.sql.{SQLContext, DataFrame}
 
@@ -53,11 +54,61 @@
       seed)
     sql.createDataFrame(rdd.map(Tuple1.apply)).toDF("features")
   }
+
+  def generateRatings(
+      sql: SQLContext,
+      numUsers: Int,
+      numProducts: Int,
+      numExamples: Long,
+      numTestExamples: Long,
+      implicitPrefs: Boolean,
+      numPartitions: Int,
+      seed: Long): (DataFrame, DataFrame) = {
+
+    val sc = sql.sparkContext
+    val train = RandomRDDs.randomRDD(sc,
+      new RatingGenerator(numUsers, numProducts, implicitPrefs),
+      numExamples, numPartitions, seed).cache()
+
+    val test = RandomRDDs.randomRDD(sc,
+      new RatingGenerator(numUsers, numProducts, implicitPrefs),
+      numTestExamples, numPartitions, seed + 24)
+
+    // Now get rid of duplicate ratings and remove non-existent userIDs
+    // and prodIDs from the test set
+    val commons: PairRDDFunctions[(Int, Int), Rating[Int]] =
+      new PairRDDFunctions(train.keyBy(rating => (rating.user, rating.item)).cache())
+
+    val exact = commons.join(test.keyBy(rating => (rating.user, rating.item)))
+
+    val trainPruned = commons.subtractByKey(exact).map(_._2).cache()
+
+    // Now get rid of users that don't exist in the train set
+    val trainUsers: RDD[(Int, Rating[Int])] = trainPruned.keyBy(rating => rating.user)
+    val testUsers: PairRDDFunctions[Int, Rating[Int]] =
+      new PairRDDFunctions(test.keyBy(rating => rating.user))
+    val testWithAdditionalUsers = testUsers.subtractByKey(trainUsers)
+
+    val userPrunedTestProds: RDD[(Int, Rating[Int])] =
+      testUsers.subtractByKey(testWithAdditionalUsers).map(_._2).keyBy(rating => rating.item)
+
+    val trainProds: RDD[(Int, Rating[Int])] = trainPruned.keyBy(rating => rating.item)
+
+    val testWithAdditionalProds =
+      new PairRDDFunctions[Int, Rating[Int]](userPrunedTestProds).subtractByKey(trainProds)
+    val finalTest =
+      new PairRDDFunctions[Int, Rating[Int]](userPrunedTestProds)
+        .subtractByKey(testWithAdditionalProds)
+        .map(_._2)
+
+    (sql.createDataFrame(trainPruned), sql.createDataFrame(finalTest))
+  }
 }
 
 /**
  * Generator for a feature vector which can include a mix of categorical and continuous features.
+ *
  * @param featureArity Length numFeatures, where 0 indicates continuous feature and > 0
  *                     indicates a categorical feature of that arity.
  */
diff --git a/src/main/scala/com/databricks/spark/sql/perf/mllib/recommendation/ALS.scala b/src/main/scala/com/databricks/spark/sql/perf/mllib/recommendation/ALS.scala
new file mode 100644
index 0000000..f6dcb7b
--- /dev/null
+++ b/src/main/scala/com/databricks/spark/sql/perf/mllib/recommendation/ALS.scala
@@ -0,0 +1,53 @@
+package com.databricks.spark.sql.perf.mllib.recommendation
+
+import org.apache.spark.ml
+import org.apache.spark.ml.evaluation.{RegressionEvaluator, Evaluator}
+import org.apache.spark.ml.{Transformer, Estimator}
+import org.apache.spark.sql._
+
+import com.databricks.spark.sql.perf.mllib.OptionImplicits._
+import com.databricks.spark.sql.perf.mllib.data.DataGenerator
+import com.databricks.spark.sql.perf.mllib.{ScoringWithEvaluator, BenchmarkAlgorithm, MLBenchContext}
+
+object ALS extends BenchmarkAlgorithm with ScoringWithEvaluator {
+
+  override def trainingDataSet(ctx: MLBenchContext): DataFrame = {
+    import ctx.params._
+    DataGenerator.generateRatings(
+      ctx.sqlContext,
+      numUsers,
+      numItems,
+      numExamples,
+      numTestExamples,
+      implicitPrefs = false,
+      numPartitions,
+      ctx.seed())._1
+  }
+
+  override def testDataSet(ctx: MLBenchContext): DataFrame = {
+    import ctx.params._
+    DataGenerator.generateRatings(
+      ctx.sqlContext,
+      numUsers,
+      numItems,
+      numExamples,
+      numTestExamples,
+      implicitPrefs = false,
+      numPartitions,
+      ctx.seed())._2
+  }
+
+  override def getEstimator(ctx: MLBenchContext): Estimator[_] = {
+    import ctx.params._
+    new ml.recommendation.ALS()
+      .setSeed(ctx.seed())
+      .setRegParam(regParam)
+      .setNumBlocks(numPartitions)
+      .setRank(rank)
+      .setMaxIter(maxIter)
+  }
+
+  override protected def evaluator(ctx: MLBenchContext): Evaluator = {
+    new RegressionEvaluator().setLabelCol("rating")
+  }
+}
diff --git a/src/main/scala/com/databricks/spark/sql/perf/results.scala b/src/main/scala/com/databricks/spark/sql/perf/results.scala
index 62a2435..f133ef6 100644
--- a/src/main/scala/com/databricks/spark/sql/perf/results.scala
+++ b/src/main/scala/com/databricks/spark/sql/perf/results.scala
@@ -123,8 +123,11 @@ case class MLParams(
     maxIter: Option[Int] = None,
     numClasses: Option[Int] = None,
     numFeatures: Option[Int] = None,
+    numItems: Option[Int] = None,
+    numUsers: Option[Int] = None,
     optimizer: Option[String] = None,
     regParam: Option[Double] = None,
+    rank: Option[Int] = None,
     tol: Option[Double] = None
 )
 
diff --git a/src/main/scala/configs/mllib-small.yaml b/src/main/scala/configs/mllib-small.yaml
index 24408f8..6beeedd 100644
--- a/src/main/scala/configs/mllib-small.yaml
+++ b/src/main/scala/configs/mllib-small.yaml
@@ -5,7 +5,7 @@ common:
   numExamples: [1, 3]
   numTestExamples: 100
   numPartitions: 3
-  randomSeed: [1, 2, 3]
+  randomSeed: [1]
 benchmarks:
   - name: classification.LogisticRegression
     params:
@@ -71,3 +71,12 @@ benchmarks:
       regParam: 0.1
       tol: [0.0]
       maxIter: 10
+  - name: recommendation.ALS
+    params:
+      numExamples: 100
+      numTestExamples: 100
+      numUsers: 100
+      numItems: 100
+      regParam: 0.1
+      rank: 10
+      maxIter: 6
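
A minimal smoke-test sketch of how the new pieces fit together outside the benchmark harness. The ALSSmokeTest object, the local-mode Spark setup, and the parameter values are illustrative only and are not part of this patch; the column names ("user", "item", "rating") follow from createDataFrame over ALS.Rating[Int], and the setters are the stock ml.recommendation.ALS and RegressionEvaluator APIs.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.recommendation.ALS
import org.apache.spark.sql.SQLContext

import com.databricks.spark.sql.perf.mllib.data.DataGenerator

object ALSSmokeTest {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("als-smoke-test"))
    val sql = new SQLContext(sc)

    // Disjoint train/test DataFrames with columns "user", "item", "rating";
    // generateRatings drops (user, item) pairs shared with test from train,
    // then keeps only test rows whose user and item still occur in train.
    val (train, test) = DataGenerator.generateRatings(sql,
      numUsers = 100, numProducts = 100, numExamples = 1000L,
      numTestExamples = 100L, implicitPrefs = false, numPartitions = 3,
      seed = 42L)

    val model = new ALS()
      .setUserCol("user").setItemCol("item").setRatingCol("rating")
      .setRank(10).setRegParam(0.1).setMaxIter(6).setSeed(42L)
      .fit(train)

    // Because every test user and item appears in train, transform() produces
    // no NaN predictions and the RMSE is well defined.
    val rmse = new RegressionEvaluator()
      .setLabelCol("rating")
      .setMetricName("rmse")
      .evaluate(model.transform(test))
    println(s"Test RMSE: $rmse")

    sc.stop()
  }
}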