ALS algorithm for spark-sql-perf

This has been tested locally with a small amount of data.

I have not bothered to reimplement a more robust version of the ALS synthetic data generation, so it will still require some manual parameter tweaking as before.

Author: Timothy Hunter <timhunter@databricks.com>

Closes #76 from thunterdb/1607-als.
commit 2672bcd5b7 (parent 93c0407bbe)
Timothy Hunter, 2016-07-05 15:54:08 -07:00

5 changed files with 153 additions and 2 deletions


@@ -0,0 +1,35 @@
package com.databricks.spark.sql.perf.mllib.data

import scala.collection.mutable

import org.apache.spark.ml.recommendation.ALS.Rating
import org.apache.spark.mllib.random.RandomDataGenerator

class RatingGenerator(
    private val numUsers: Int,
    private val numProducts: Int,
    private val implicitPrefs: Boolean) extends RandomDataGenerator[Rating[Int]] {

  private val rng = new java.util.Random()
  private val observed = new mutable.HashMap[(Int, Int), Boolean]()

  override def nextValue(): Rating[Int] = {
    // Rejection-sample (user, product) pairs until an unseen pair is found,
    // so a given generator instance never emits a duplicate rating.
    var tuple = (rng.nextInt(numUsers), rng.nextInt(numProducts))
    while (observed.getOrElse(tuple, false)) {
      tuple = (rng.nextInt(numUsers), rng.nextInt(numProducts))
    }
    observed += (tuple -> true)
    // Implicit preferences are 0/1 indicators; explicit ratings are uniform in [0, 5).
    val rating = if (implicitPrefs) rng.nextInt(2) * 1.0 else rng.nextDouble() * 5
    new Rating(tuple._1, tuple._2, rating.toFloat)
  }

  override def setSeed(seed: Long): Unit = {
    rng.setSeed(seed)
  }

  override def copy(): RatingGenerator =
    new RatingGenerator(numUsers, numProducts, implicitPrefs)
}
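For orientation, here is a hedged sketch of how such a generator plugs into RandomRDDs.randomRDD; the `sc`, sizes, and seed below are illustrative, not part of this commit. Note that each partition receives its own copy() of the generator, so the no-duplicate guarantee holds per partition, not globally:

import org.apache.spark.ml.recommendation.ALS.Rating
import org.apache.spark.mllib.random.RandomRDDs
import org.apache.spark.rdd.RDD

// Illustrative usage; assumes a SparkContext `sc` is in scope.
val ratings: RDD[Rating[Int]] = RandomRDDs.randomRDD(
  sc,
  new RatingGenerator(numUsers = 100, numProducts = 100, implicitPrefs = false),
  1000L,  // number of ratings to draw
  4,      // number of partitions
  42L)    // seed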


@@ -2,8 +2,9 @@ package com.databricks.spark.sql.perf.mllib.data
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.ml.recommendation.ALS.Rating
import org.apache.spark.mllib.random._
import org.apache.spark.rdd.{PairRDDFunctions, RDD}
import org.apache.spark.sql.{SQLContext, DataFrame}
@@ -53,11 +54,61 @@ object DataGenerator {
      seed)
    sql.createDataFrame(rdd.map(Tuple1.apply)).toDF("features")
  }

  def generateRatings(
      sql: SQLContext,
      numUsers: Int,
      numProducts: Int,
      numExamples: Long,
      numTestExamples: Long,
      implicitPrefs: Boolean,
      numPartitions: Int,
      seed: Long): (DataFrame, DataFrame) = {
    val sc = sql.sparkContext
    val train = RandomRDDs.randomRDD(sc,
      new RatingGenerator(numUsers, numProducts, implicitPrefs),
      numExamples, numPartitions, seed).cache()
    val test = RandomRDDs.randomRDD(sc,
      new RatingGenerator(numUsers, numProducts, implicitPrefs),
      numTestExamples, numPartitions, seed + 24)

    // Drop (user, product) pairs that appear in both sets from the training set,
    // then remove non-existent user IDs and product IDs from the test set.
    val commons: PairRDDFunctions[(Int, Int), Rating[Int]] =
      new PairRDDFunctions(train.keyBy(rating => (rating.user, rating.item)).cache())
    val exact = commons.join(test.keyBy(rating => (rating.user, rating.item)))
    val trainPruned = commons.subtractByKey(exact).map(_._2).cache()

    // Remove test ratings whose users do not appear in the training set.
    val trainUsers: RDD[(Int, Rating[Int])] = trainPruned.keyBy(rating => rating.user)
    val testUsers: PairRDDFunctions[Int, Rating[Int]] =
      new PairRDDFunctions(test.keyBy(rating => rating.user))
    val testWithAdditionalUsers = testUsers.subtractByKey(trainUsers)
    val userPrunedTestProds: RDD[(Int, Rating[Int])] =
      testUsers.subtractByKey(testWithAdditionalUsers).map(_._2).keyBy(rating => rating.item)

    // Likewise, remove test ratings whose products do not appear in the training set.
    val trainProds: RDD[(Int, Rating[Int])] = trainPruned.keyBy(rating => rating.item)
    val testWithAdditionalProds =
      new PairRDDFunctions[Int, Rating[Int]](userPrunedTestProds).subtractByKey(trainProds)
    val finalTest =
      new PairRDDFunctions[Int, Rating[Int]](userPrunedTestProds)
        .subtractByKey(testWithAdditionalProds)
        .map(_._2)

    (sql.createDataFrame(trainPruned), sql.createDataFrame(finalTest))
  }
}
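A minimal usage sketch for generateRatings follows; the `sqlContext` and parameter values are illustrative, not from this commit. Both returned DataFrames carry user, item, and rating columns, and the test split only references users and items present in the training split:

// Hypothetical driver code; assumes a SQLContext `sqlContext` is in scope.
val (trainDF, testDF) = DataGenerator.generateRatings(
  sqlContext,
  numUsers = 100,
  numProducts = 100,
  numExamples = 1000L,
  numTestExamples = 100L,
  implicitPrefs = false,
  numPartitions = 4,
  seed = 42L)
trainDF.show(5)  // columns: user, item, rating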
/**
* Generator for a feature vector which can include a mix of categorical and continuous features.
*
* @param featureArity Length numFeatures, where 0 indicates continuous feature and > 0
* indicates a categorical feature of that arity.
*/


@@ -0,0 +1,53 @@
package com.databricks.spark.sql.perf.mllib.recommendation

import org.apache.spark.ml
import org.apache.spark.ml.evaluation.{Evaluator, RegressionEvaluator}
import org.apache.spark.ml.{Estimator, Transformer}
import org.apache.spark.sql._

import com.databricks.spark.sql.perf.mllib.OptionImplicits._
import com.databricks.spark.sql.perf.mllib.data.DataGenerator
import com.databricks.spark.sql.perf.mllib.{BenchmarkAlgorithm, MLBenchContext, ScoringWithEvaluator}

object ALS extends BenchmarkAlgorithm with ScoringWithEvaluator {

  override def trainingDataSet(ctx: MLBenchContext): DataFrame = {
    import ctx.params._
    DataGenerator.generateRatings(
      ctx.sqlContext,
      numUsers,
      numItems,
      numExamples,
      numTestExamples,
      implicitPrefs = false,
      numPartitions,
      ctx.seed())._1
  }

  override def testDataSet(ctx: MLBenchContext): DataFrame = {
    import ctx.params._
    // Same parameters and seed as trainingDataSet, so the regenerated
    // train/test splits are consistent with each other.
    DataGenerator.generateRatings(
      ctx.sqlContext,
      numUsers,
      numItems,
      numExamples,
      numTestExamples,
      implicitPrefs = false,
      numPartitions,
      ctx.seed())._2
  }

  override def getEstimator(ctx: MLBenchContext): Estimator[_] = {
    import ctx.params._
    new ml.recommendation.ALS()
      .setSeed(ctx.seed())
      .setRegParam(regParam)
      .setNumBlocks(numPartitions)
      .setRank(rank)
      .setMaxIter(maxIter)
  }

  override protected def evaluator(ctx: MLBenchContext): Evaluator =
    new RegressionEvaluator().setLabelCol("rating")
}
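Putting the pieces together, this is roughly what the harness ends up doing with the estimator and evaluator above; a sketch assuming trainDF and testDF come from generateRatings, not code from this commit:

import org.apache.spark.ml.evaluation.RegressionEvaluator
// Renamed on import to avoid clashing with the benchmark object above.
import org.apache.spark.ml.recommendation.{ALS => SparkALS}

// Parameter values mirror the YAML config added below.
val als = new SparkALS()
  .setRegParam(0.1)
  .setRank(10)
  .setMaxIter(6)
val model = als.fit(trainDF)  // expects user, item, rating columns

// Because generateRatings pruned unseen users/items from the test split,
// transform() should not yield NaN predictions here.
val rmse = new RegressionEvaluator()
  .setLabelCol("rating")
  .evaluate(model.transform(testDF))
println(s"RMSE: $rmse")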


@@ -123,8 +123,11 @@ case class MLParams(
    maxIter: Option[Int] = None,
    numClasses: Option[Int] = None,
    numFeatures: Option[Int] = None,
    numItems: Option[Int] = None,
    numUsers: Option[Int] = None,
    optimizer: Option[String] = None,
    regParam: Option[Double] = None,
    rank: Option[Int] = None,
    tol: Option[Double] = None
)


@@ -5,7 +5,7 @@ common:
  numExamples: [1, 3]
  numTestExamples: 100
  numPartitions: 3
  randomSeed: [1]
benchmarks:
  - name: classification.LogisticRegression
    params:
@@ -71,3 +71,12 @@ benchmarks:
      regParam: 0.1
      tol: [0.0]
      maxIter: 10
  - name: recommendation.ALS
    params:
      numExamples: 100
      numTestExamples: 100
      numUsers: 100
      numItems: 100
      regParam: 0.1
      rank: 10
      maxIter: 6