From 06eb11f326019eb085782329def28cb3c47f41de Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Tue, 25 Aug 2015 20:43:56 -0700 Subject: [PATCH] Fix the seed to 100 and use distribute by instead of order by. --- README.md | 2 +- .../spark/sql/perf/tpcds/Tables.scala | 39 +++++++++++++++---- 2 files changed, 32 insertions(+), 9 deletions(-) diff --git a/README.md b/README.md index b6e0aee..198cd63 100644 --- a/README.md +++ b/README.md @@ -16,7 +16,7 @@ import com.databricks.spark.sql.perf.tpcds.Tables // dsdgenDir is the location of dsdgen tool installed in your machines. val tables = new Tables(sqlContext, dsdgenDir, scaleFactor) // Generate data. -tables.genData(location, format, overwrite, partitionTables, useDoubleForDecimal, orderByPartitionColumns) +tables.genData(location, format, overwrite, partitionTables, useDoubleForDecimal, clusterByPartitionColumns) // Create metastore tables in a specified database for your data. // Once tables are created, the current database will be switched to the specified database. tables.createExternalTables(location, format, databaseName, overwrite) diff --git a/src/main/scala/com/databricks/spark/sql/perf/tpcds/Tables.scala b/src/main/scala/com/databricks/spark/sql/perf/tpcds/Tables.scala index 702e8b1..620b077 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/tpcds/Tables.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/tpcds/Tables.scala @@ -51,10 +51,11 @@ class Tables(sqlContext: SQLContext, dsdgenDir: String, scaleFactor: Int) extend sys.error(s"Could not find dsdgen at $dsdgen or /$dsdgen. Run install") } + // Note: RNGSEED is the RNG seed used by the data generator. Right now, it is fixed to 100. 
val parallel = if (partitions > 1) s"-parallel $partitions -child $i" else "" val commands = Seq( "bash", "-c", - s"cd $localToolsDir && ./dsdgen -table $name -filter Y -scale $scaleFactor $parallel") + s"cd $localToolsDir && ./dsdgen -table $name -filter Y -scale $scaleFactor -RNGSEED 100 $parallel") println(commands) commands.lines } @@ -103,18 +104,39 @@ class Tables(sqlContext: SQLContext, dsdgenDir: String, scaleFactor: Int) extend Table(name, partitionColumns, newFields:_*) } - def genData(location: String, format: String, overwrite: Boolean, orderByPartitionColumns: Boolean): Unit = { + def genData(location: String, format: String, overwrite: Boolean, clusterByPartitionColumns: Boolean): Unit = { val mode = if (overwrite) SaveMode.Overwrite else SaveMode.Ignore + val data = df + val tempTableName = s"${name}_text" + data.registerTempTable(tempTableName) + val writer = if (partitionColumns.nonEmpty) { - if (orderByPartitionColumns) { - df.orderBy(partitionColumns.map(col): _*).write + if (clusterByPartitionColumns) { + val columnString = data.schema.fields.map { field => + field.name + }.mkString(",") + val partitionColumnString = partitionColumns.mkString(",") + + val query = + s""" + |SELECT + | $columnString + |FROM + | $tempTableName + |DISTRIBUTE BY + | $partitionColumnString + """.stripMargin + val grouped = sqlContext.sql(query) + println(s"Pre-clustering with partitioning columns with query $query.") + logInfo(s"Pre-clustering with partitioning columns with query $query.") + grouped.write } else { - df.write + data.write } } else { // If the table is not partitioned, coalesce the data to a single file. 
- df.coalesce(1).write + data.coalesce(1).write } writer.format(format).mode(mode) if (partitionColumns.nonEmpty) { @@ -123,6 +145,7 @@ class Tables(sqlContext: SQLContext, dsdgenDir: String, scaleFactor: Int) extend println(s"Generating table $name in database to $location with save mode $mode.") logInfo(s"Generating table $name in database to $location with save mode $mode.") writer.save(location) + sqlContext.dropTempTable(tempTableName) } def createExternalTable(location: String, format: String, databaseName: String, overwrite: Boolean): Unit = { @@ -151,7 +174,7 @@ class Tables(sqlContext: SQLContext, dsdgenDir: String, scaleFactor: Int) extend overwrite: Boolean, partitionTables: Boolean, useDoubleForDecimal: Boolean, - orderByPartitionColumns: Boolean): Unit = { + clusterByPartitionColumns: Boolean): Unit = { val tablesToBeGenerated = if (partitionTables) { tables } else { @@ -166,7 +189,7 @@ class Tables(sqlContext: SQLContext, dsdgenDir: String, scaleFactor: Int) extend withSpecifiedDataType.foreach { table => val tableLocation = s"$location/${table.name}" - table.genData(tableLocation, format, overwrite, orderByPartitionColumns) + table.genData(tableLocation, format, overwrite, clusterByPartitionColumns) } }