package com.databricks.spark.sql.perf.mllib.data

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.mllib.random.{PoissonGenerator, RandomDataGenerator}

/**
 * Generates random item sets (transactions) for FPGrowth-style benchmarks.
 *
 * Each generated set contains distinct items drawn from [0, numItems), encoded as strings.
 * Up to two extra "rule" items, computed deterministically from the first one or two sampled
 * items, are appended so that the generated data contains discoverable association rules.
 *
 * @param numItems       size of the item vocabulary; must be > 2
 * @param avgItemSetSize average item-set size; must be > 2 (the Poisson mean for the base
 *                       size is avgItemSetSize - 2 to leave room for the two rule items)
 */
class ItemSetGenerator(
    val numItems: Int,
    val avgItemSetSize: Int)
  extends RandomDataGenerator[Array[String]] {

  assert(avgItemSetSize > 2, s"avgItemSetSize must be > 2, got $avgItemSetSize")
  assert(numItems > 2, s"numItems must be > 2, got $numItems")

  // NOTE(review): rng is seeded below but never consumed by nextValue(); kept for
  // interface stability of setSeed.
  private val rng = new java.util.Random()
  // Mean shifted by -2 because up to two derived rule items are appended afterwards.
  private val itemSetSizeRng = new PoissonGenerator(avgItemSetSize - 2)
  private val itemRng = new PoissonGenerator(numItems / 2.0)

  /** Re-seeds all internal generators for reproducible output. */
  override def setSeed(seed: Long): Unit = {
    rng.setSeed(seed)
    itemSetSizeRng.setSeed(seed)
    itemRng.setSeed(seed)
  }

  /** Draws one item set as an array of distinct item strings. */
  override def nextValue(): Array[String] = {
    // 1. Draw the base size of the item set, truncated to [1, numItems].
    val size = DataGenUtil.nextPoisson(itemSetSizeRng, v => v >= 1 && v <= numItems).toInt
    val items = new ArrayBuffer[Int](size + 2)

    // 2. Draw `size` distinct items by rejection sampling over [0, numItems).
    var i = 0
    while (i < size) {
      val nextItem = DataGenUtil.nextPoisson(itemRng, (item: Double) => {
        item >= 0 && item < numItems && !items.contains(item)
      }).toInt
      items.append(nextItem)
      i += 1
    }

    // 3. Append derived items to embed deterministic association rules in the data.
    // 3.1 newItem1 = (firstItem + numItems / 2) % numItems
    val newItem1 = (items(0) + numItems / 2) % numItems
    if (!items.contains(newItem1)) {
      items.append(newItem1)
    }
    // 3.2 newItem2 = (firstItem + secondItem) % numItems
    if (items.size >= 2) {
      val newItem2 = (items(0) + items(1)) % numItems
      if (!items.contains(newItem2)) {
        items.append(newItem2)
      }
    }
    items.map(_.toString).toArray
  }

  override def copy(): ItemSetGenerator = new ItemSetGenerator(numItems, avgItemSetSize)
}
package com.databricks.spark.sql.perf.mllib.fpm

import org.apache.spark.ml
import org.apache.spark.ml.PipelineStage
import org.apache.spark.sql.DataFrame

import com.databricks.spark.sql.perf.mllib._
import com.databricks.spark.sql.perf.mllib.OptionImplicits._
import com.databricks.spark.sql.perf.mllib.data.DataGenerator


/** Performance-test definition for spark.ml frequent pattern mining (FPGrowth). */
object FPGrowth extends BenchmarkAlgorithm with TestFromTraining {

  /** Synthesizes the training transactions (arrays of item strings) for this benchmark. */
  def trainingDataSet(ctx: MLBenchContext): DataFrame = {
    import ctx.params._

    DataGenerator.generateItemSet(
      ctx.sqlContext, numExamples, ctx.seed(), numPartitions, numItems, itemSetSize)
  }

  /** The pipeline stage under test, reading transactions from the "items" column. */
  override def getPipelineStage(ctx: MLBenchContext): PipelineStage =
    new ml.fpm.FPGrowth().setItemsCol("items")
}
featureArity, + itemSetSize: Option[Int] = itemSetSize, k: Option[Int] = k, link: Option[String] = link, maxIter: Option[Int] = maxIter, @@ -181,8 +183,8 @@ class MLParams( new MLParams(randomSeed = randomSeed, numExamples = numExamples, numTestExamples = numTestExamples, numPartitions = numPartitions, bucketizerNumBuckets = bucketizerNumBuckets, depth = depth, docLength = docLength, - elasticNetParam = elasticNetParam, family = family, featureArity = featureArity, k = k, - link = link, maxIter = maxIter, + elasticNetParam = elasticNetParam, family = family, featureArity = featureArity, + itemSetSize = itemSetSize, k = k, link = link, maxIter = maxIter, numClasses = numClasses, numFeatures = numFeatures, numInputCols = numInputCols, numItems = numItems, numUsers = numUsers, optimizer = optimizer, regParam = regParam, rank = rank, smoothing = smoothing, tol = tol, vocabSize = vocabSize) diff --git a/src/main/scala/configs/mllib-small.yaml b/src/main/scala/configs/mllib-small.yaml index 111df0c..d01f2be 100644 --- a/src/main/scala/configs/mllib-small.yaml +++ b/src/main/scala/configs/mllib-small.yaml @@ -146,3 +146,8 @@ benchmarks: depth: 3 numFeatures: 5 maxIter: 3 + - name: fpm.FPGrowth + params: + numExamples: 200 + numItems: 6 + itemSetSize: 3