Fix the dsdgen RNG seed to 100 and use DISTRIBUTE BY instead of ORDER BY when pre-clustering partitioned tables.

This commit is contained in:
Yin Huai 2015-08-25 20:43:56 -07:00
parent 9936d49239
commit 06eb11f326
2 changed files with 32 additions and 9 deletions

View File

@ -16,7 +16,7 @@ import com.databricks.spark.sql.perf.tpcds.Tables
// dsdgenDir is the location of the dsdgen tool installed on your machines.
val tables = new Tables(sqlContext, dsdgenDir, scaleFactor)
// Generate data.
tables.genData(location, format, overwrite, partitionTables, useDoubleForDecimal, orderByPartitionColumns)
tables.genData(location, format, overwrite, partitionTables, useDoubleForDecimal, clusterByPartitionColumns)
// Create metastore tables in a specified database for your data.
// Once tables are created, the current database will be switched to the specified database.
tables.createExternalTables(location, format, databaseName, overwrite)

View File

@ -51,10 +51,11 @@ class Tables(sqlContext: SQLContext, dsdgenDir: String, scaleFactor: Int) extend
sys.error(s"Could not find dsdgen at $dsdgen or /$dsdgen. Run install")
}
// Note: RNGSEED is the RNG seed used by the data generator. Right now, it is fixed to 100.
val parallel = if (partitions > 1) s"-parallel $partitions -child $i" else ""
val commands = Seq(
"bash", "-c",
s"cd $localToolsDir && ./dsdgen -table $name -filter Y -scale $scaleFactor $parallel")
s"cd $localToolsDir && ./dsdgen -table $name -filter Y -scale $scaleFactor -RNGSEED 100 $parallel")
println(commands)
commands.lines
}
@ -103,18 +104,39 @@ class Tables(sqlContext: SQLContext, dsdgenDir: String, scaleFactor: Int) extend
Table(name, partitionColumns, newFields:_*)
}
def genData(location: String, format: String, overwrite: Boolean, orderByPartitionColumns: Boolean): Unit = {
def genData(location: String, format: String, overwrite: Boolean, clusterByPartitionColumns: Boolean): Unit = {
val mode = if (overwrite) SaveMode.Overwrite else SaveMode.Ignore
val data = df
val tempTableName = s"${name}_text"
data.registerTempTable(tempTableName)
val writer = if (partitionColumns.nonEmpty) {
if (orderByPartitionColumns) {
df.orderBy(partitionColumns.map(col): _*).write
if (clusterByPartitionColumns) {
val columnString = data.schema.fields.map { field =>
field.name
}.mkString(",")
val partitionColumnString = partitionColumns.mkString(",")
val query =
s"""
|SELECT
| $columnString
|FROM
| $tempTableName
|DISTRIBUTED BY
| $partitionColumnString
""".stripMargin
val grouped = sqlContext.sql(query)
println(s"Pre-clustering with partitioning columns with query $query.")
logInfo(s"Pre-clustering with partitioning columns with query $query.")
grouped.write
} else {
df.write
data.write
}
} else {
// If the table is not partitioned, coalesce the data to a single file.
df.coalesce(1).write
data.coalesce(1).write
}
writer.format(format).mode(mode)
if (partitionColumns.nonEmpty) {
@ -123,6 +145,7 @@ class Tables(sqlContext: SQLContext, dsdgenDir: String, scaleFactor: Int) extend
println(s"Generating table $name in database to $location with save mode $mode.")
logInfo(s"Generating table $name in database to $location with save mode $mode.")
writer.save(location)
sqlContext.dropTempTable(tempTableName)
}
def createExternalTable(location: String, format: String, databaseName: String, overwrite: Boolean): Unit = {
@ -151,7 +174,7 @@ class Tables(sqlContext: SQLContext, dsdgenDir: String, scaleFactor: Int) extend
overwrite: Boolean,
partitionTables: Boolean,
useDoubleForDecimal: Boolean,
orderByPartitionColumns: Boolean): Unit = {
clusterByPartitionColumns: Boolean): Unit = {
val tablesToBeGenerated = if (partitionTables) {
tables
} else {
@ -166,7 +189,7 @@ class Tables(sqlContext: SQLContext, dsdgenDir: String, scaleFactor: Int) extend
withSpecifiedDataType.foreach { table =>
val tableLocation = s"$location/${table.name}"
table.genData(tableLocation, format, overwrite, orderByPartitionColumns)
table.genData(tableLocation, format, overwrite, clusterByPartitionColumns)
}
}