make number of partitions configurable

This commit is contained in:
Davies Liu 2016-05-24 10:40:51 -07:00
parent 375e116b1a
commit c087b68a5c

View File

@@ -34,7 +34,6 @@ class Tables(sqlContext: SQLContext, dsdgenDir: String, scaleFactor: Int) extend
case class Table(name: String, partitionColumns: Seq[String], fields: StructField*) {
val schema = StructType(fields)
val partitions = if (partitionColumns.isEmpty) 1 else 100
def nonPartitioned: Table = {
Table(name, Nil, fields : _*)
@@ -44,7 +43,8 @@ class Tables(sqlContext: SQLContext, dsdgenDir: String, scaleFactor: Int) extend
* If convertToSchema is true, the data from generator will be parsed into columns and
* converted to `schema`. Otherwise, it just outputs the raw data (as a single STRING column).
*/
def df(convertToSchema: Boolean) = {
def df(convertToSchema: Boolean, numPartition: Int) = {
val partitions = if (partitionColumns.isEmpty) 1 else numPartition
val generatedData = {
sparkContext.parallelize(1 to partitions, partitions).flatMap { i =>
val localToolsDir = if (new java.io.File(dsdgen).exists) {
@@ -121,10 +121,11 @@ class Tables(sqlContext: SQLContext, dsdgenDir: String, scaleFactor: Int) extend
format: String,
overwrite: Boolean,
clusterByPartitionColumns: Boolean,
filterOutNullPartitionValues: Boolean): Unit = {
filterOutNullPartitionValues: Boolean,
numPartitions: Int): Unit = {
val mode = if (overwrite) SaveMode.Overwrite else SaveMode.Ignore
val data = df(format != "text")
val data = df(format != "text", numPartitions)
val tempTableName = s"${name}_text"
data.registerTempTable(tempTableName)
@@ -199,7 +200,8 @@ class Tables(sqlContext: SQLContext, dsdgenDir: String, scaleFactor: Int) extend
useDoubleForDecimal: Boolean,
clusterByPartitionColumns: Boolean,
filterOutNullPartitionValues: Boolean,
tableFilter: String = ""): Unit = {
tableFilter: String = "",
numPartitions: Int = 100): Unit = {
var tablesToBeGenerated = if (partitionTables) {
tables
} else {
@@ -222,7 +224,7 @@ class Tables(sqlContext: SQLContext, dsdgenDir: String, scaleFactor: Int) extend
withSpecifiedDataType.foreach { table =>
val tableLocation = s"$location/${table.name}"
table.genData(tableLocation, format, overwrite, clusterByPartitionColumns,
filterOutNullPartitionValues)
filterOutNullPartitionValues, numPartitions)
}
}