From 544adce70fafcc050f7e4c7c73fd2e1670228d49 Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Thu, 20 Aug 2015 13:37:29 -0700 Subject: [PATCH 01/10] Add methods to genData. --- .../spark/sql/perf/tpcds/Tables.scala | 38 ++++++++++++++++++- 1 file changed, 37 insertions(+), 1 deletion(-) diff --git a/src/main/scala/com/databricks/spark/sql/perf/tpcds/Tables.scala b/src/main/scala/com/databricks/spark/sql/perf/tpcds/Tables.scala index 011221e..aee846d 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/tpcds/Tables.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/tpcds/Tables.scala @@ -16,6 +16,7 @@ package com.databricks.spark.sql.perf.tpcds +import java.io.File import java.text.SimpleDateFormat import java.util.Date @@ -31,7 +32,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.mapreduce.{OutputCommitter, TaskAttemptContext, RecordWriter, Job} import org.apache.spark.SerializableWritable -import org.apache.spark.sql.{Column, ColumnName, SQLContext} +import org.apache.spark.sql.{SaveMode, Column, ColumnName, SQLContext} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.types.{StringType, StructField, StructType} import parquet.hadoop.ParquetOutputFormat @@ -93,6 +94,41 @@ class Tables(sqlContext: SQLContext, dsdgenDir: String, scaleFactor: Int) extend convertedData } + + def genData(location: String, format: String, overwrite: Boolean): Unit = { + val mode = if (overwrite) SaveMode.Overwrite else SaveMode.Ignore + + val withFormatAndMode = df.write.format(format).mode(mode) + val withPartitionColumns = if (partitionColumns.isEmpty) { + withFormatAndMode + } else { + withFormatAndMode.partitionBy(partitionColumns : _*) + } + + withPartitionColumns.save(location) + } + + def createExternalTables(location: String, format: String, databaseName: String, overwrite: Boolean): Unit = { + val qualifiedTableName = databaseName + "." + name + if (overwrite) { + sqlContext.sql(s"DROP TABLE IF EXISTS $databaseName.$name") + } + sqlContext.createExternalTable(qualifiedTableName, location, format) + } + } + + def genData(location: String, format: String, overwrite: Boolean): Unit = { + tables.foreach { table => + val tableLocation = location + File.separator + format + File.separator + table.name + table.genData(tableLocation, format, overwrite) + } + } + + def createExternalTables(location: String, format: String, databaseName: String, overwrite: Boolean): Unit = { + tables.foreach { table => + val tableLocation = location + File.separator + format + File.separator + table.name + table.createExternalTables(tableLocation, format, databaseName, overwrite) + } } val tables = Seq( From edb4daba80602b3cf2d991a38f442bfa0c5b37af Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Thu, 20 Aug 2015 14:31:54 -0700 Subject: [PATCH 02/10] Bug fix. --- .../spark/sql/perf/tpcds/Tables.scala | 53 +++++++++---------- 1 file changed, 26 insertions(+), 27 deletions(-) diff --git a/src/main/scala/com/databricks/spark/sql/perf/tpcds/Tables.scala b/src/main/scala/com/databricks/spark/sql/perf/tpcds/Tables.scala index aee846d..a2646f1 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/tpcds/Tables.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/tpcds/Tables.scala @@ -17,28 +17,16 @@ package com.databricks.spark.sql.perf.tpcds import java.io.File -import java.text.SimpleDateFormat -import java.util.Date - -import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil import scala.sys.process._ - -import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat} - -import com.databricks.spark.sql.perf._ -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileSystem, Path} -import org.apache.hadoop.mapreduce.{OutputCommitter, TaskAttemptContext, RecordWriter, Job} -import org.apache.spark.SerializableWritable -import org.apache.spark.sql.{SaveMode, Column, ColumnName, SQLContext} -import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.Logging +import org.apache.spark.sql.{SaveMode, SQLContext} +import org.apache.spark.sql.Row +import org.apache.spark.sql.functions._ import org.apache.spark.sql.types.{StringType, StructField, StructType} -import parquet.hadoop.ParquetOutputFormat -import parquet.hadoop.util.ContextUtil -class Tables(sqlContext: SQLContext, dsdgenDir: String, scaleFactor: Int) extends Serializable { +class Tables(sqlContext: SQLContext, dsdgenDir: String, scaleFactor: Int) extends Serializable with Logging { import sqlContext.implicits._ def sparkContext = sqlContext.sparkContext @@ -71,11 +59,16 @@ class Tables(sqlContext: SQLContext, dsdgenDir: String, scaleFactor: Int) extend generatedData.setName(s"$name, sf=$scaleFactor, strings") val rows = generatedData.mapPartitions { iter => - val currentRow = new GenericMutableRow(schema.fields.size) iter.map { l => - (0 until schema.fields.length).foreach(currentRow.setNullAt) - l.split("\\|", -1).zipWithIndex.dropRight(1).foreach { case (f, i) => currentRow(i) = f} - currentRow: Row + val values = l.split("\\|", -1).dropRight(1).map { v => + if (v.equals("")) { + // If the string value is an empty string, we turn it to a null + null + } else { + v + } + } + Row.fromSeq(values) } } @@ -86,8 +79,7 @@ class Tables(sqlContext: SQLContext, dsdgenDir: String, scaleFactor: Int) extend val convertedData = { val columns = schema.fields.map { f => - val columnName = new ColumnName(f.name) - columnName.cast(f.dataType).as(f.name) + col(f.name).cast(f.dataType).as(f.name) } stringData.select(columns: _*) } @@ -104,29 +96,36 @@ class Tables(sqlContext: SQLContext, dsdgenDir: String, scaleFactor: Int) extend } else { withFormatAndMode.partitionBy(partitionColumns : _*) } - + logInfo(s"Generating table $name in database to $location with save mode $mode.") withPartitionColumns.save(location) } def createExternalTables(location: String, format: String, databaseName: String, overwrite: Boolean): Unit = { val qualifiedTableName = databaseName + "." + name + val tableExists = sqlContext.tableNames(databaseName).contains(name) if (overwrite) { sqlContext.sql(s"DROP TABLE IF EXISTS $databaseName.$name") } - sqlContext.createExternalTable(qualifiedTableName, location, format) + if (!tableExists || overwrite) { + logInfo(s"Creating external table $name in database $databaseName.") + sqlContext.createExternalTable(qualifiedTableName, location, format) + } } } def genData(location: String, format: String, overwrite: Boolean): Unit = { tables.foreach { table => - val tableLocation = location + File.separator + format + File.separator + table.name + val tableLocation = + location + File.separator + format + File.separator + "sf" + scaleFactor + File.separator + table.name table.genData(tableLocation, format, overwrite) } } def createExternalTables(location: String, format: String, databaseName: String, overwrite: Boolean): Unit = { + sqlContext.sql(s"CREATE DATABASE IF NOT EXISTS $databaseName") tables.foreach { table => - val tableLocation = location + File.separator + format + File.separator + table.name + val tableLocation = + location + File.separator + format + File.separator + "sf" + scaleFactor + File.separator + table.name table.createExternalTables(tableLocation, format, databaseName, overwrite) } } From 97093a45cd6389aaa64003eaa94239ceafbe218c Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Thu, 20 Aug 2015 14:50:53 -0700 Subject: [PATCH 03/10] Update readme and register temp tables. --- README.md | 25 ++++++++++++------- .../spark/sql/perf/tpcds/Tables.scala | 21 +++++++++++++--- 2 files changed, 34 insertions(+), 12 deletions(-) diff --git a/README.md b/README.md index 94308f2..134ca23 100644 --- a/README.md +++ b/README.md @@ -11,10 +11,19 @@ The rest of document will use TPC-DS benchmark as an example. We will add conten Before running any query, a dataset needs to be setup by creating a `Benchmark` object. ``` -import org.apache.spark.sql.parquet.Tables +import com.databricks.spark.sql.perf.tpcds.Tables // Tables in TPC-DS benchmark used by experiments. -val tables = Tables(sqlContext) +// dsdgenDir is the location of dsdgen tool installed in your machines. +val tables = Tables(sqlContext, dsdgenDir, scaleFactor) +// Generate data. +tables.genData(location, format, overwrite) +// Create metastore tables in a specified database for your data. +// Once tables are created, the current database will be switched to the specified database. +tables.createExternalTables(location, format, databaseName, overwrite) +// Or, if you want to create temporary tables +tables.createTemporaryTables(location, format) // Setup TPC-DS experiment +import com.databricks.spark.sql.perf.tpcds.TPCDS val tpcds = new TPCDS (sqlContext = sqlContext) ``` @@ -31,11 +40,9 @@ For every experiment run (i.e.\ every call of `runExperiment`), Spark SQL Perf w While the experiment is running you can use `experiment.html` to list the status. Once the experiment is complete, the results will be saved to the table sqlPerformance in json. ``` -// Get experiments results. -import com.databricks.spark.sql.perf.Results -val results = Results(resultsLocation = , sqlContext = sqlContext) -// Get the DataFrame representing all results stored in the dir specified by resultsLocation. -val allResults = results.allResults -// Use DataFrame API to get results of a single run. -allResults.filter("timestamp = 1429132621024") +// Get all experiments results. +tpcds.createResultsTable() +sqlContext.sql("sqlPerformance") +// Get the result of a particular run by specifying the timestamp of that run. +sqlContext.sql("sqlPerformance").filter("timestamp = 1429132621024") ``` \ No newline at end of file diff --git a/src/main/scala/com/databricks/spark/sql/perf/tpcds/Tables.scala b/src/main/scala/com/databricks/spark/sql/perf/tpcds/Tables.scala index a2646f1..cce504f 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/tpcds/Tables.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/tpcds/Tables.scala @@ -100,17 +100,22 @@ class Tables(sqlContext: SQLContext, dsdgenDir: String, scaleFactor: Int) extend withPartitionColumns.save(location) } - def createExternalTables(location: String, format: String, databaseName: String, overwrite: Boolean): Unit = { + def createExternalTable(location: String, format: String, databaseName: String, overwrite: Boolean): Unit = { val qualifiedTableName = databaseName + "." + name val tableExists = sqlContext.tableNames(databaseName).contains(name) if (overwrite) { sqlContext.sql(s"DROP TABLE IF EXISTS $databaseName.$name") } if (!tableExists || overwrite) { - logInfo(s"Creating external table $name in database $databaseName.") + logInfo(s"Creating external table $name in database $databaseName using data stored in $location.") sqlContext.createExternalTable(qualifiedTableName, location, format) } } + + def createTemporaryTable(location: String, format: String): Unit = { + logInfo(s"Creating temporary table $name using data stored in $location.") + sqlContext.read.format(format).load(location).registerTempTable(name) + } } def genData(location: String, format: String, overwrite: Boolean): Unit = { @@ -126,7 +131,17 @@ class Tables(sqlContext: SQLContext, dsdgenDir: String, scaleFactor: Int) extend tables.foreach { table => val tableLocation = location + File.separator + format + File.separator + "sf" + scaleFactor + File.separator + table.name - table.createExternalTables(tableLocation, format, databaseName, overwrite) + table.createExternalTable(tableLocation, format, databaseName, overwrite) + } + sqlContext.sql(s"USE $databaseName") + logInfo(s"The current database has been set to $databaseName.") + } + + def createTemporaryTables(location: String, format: String): Unit = { + tables.foreach { table => + val tableLocation = + location + File.separator + format + File.separator + "sf" + scaleFactor + File.separator + table.name + table.createTemporaryTable(tableLocation, format) } } From 77fbe22b7b40d3cbf7926c38240e32deddddeb76 Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Thu, 20 Aug 2015 16:19:04 -0700 Subject: [PATCH 04/10] address comments. --- .../spark/sql/perf/tpcds/Tables.scala | 40 +++++++++++++------ 1 file changed, 27 insertions(+), 13 deletions(-) diff --git a/src/main/scala/com/databricks/spark/sql/perf/tpcds/Tables.scala b/src/main/scala/com/databricks/spark/sql/perf/tpcds/Tables.scala index cce504f..7bdb978 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/tpcds/Tables.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/tpcds/Tables.scala @@ -36,6 +36,10 @@ class Tables(sqlContext: SQLContext, dsdgenDir: String, scaleFactor: Int) extend val schema = StructType(fields) val partitions = if (partitionColumns.isEmpty) 1 else 100 + def nonPartitioned: Table = { + Table(name, Nil, fields : _*) + } + def df = { val generatedData = { sparkContext.parallelize(1 to partitions, partitions).flatMap { i => @@ -90,14 +94,19 @@ class Tables(sqlContext: SQLContext, dsdgenDir: String, scaleFactor: Int) extend def genData(location: String, format: String, overwrite: Boolean): Unit = { val mode = if (overwrite) SaveMode.Overwrite else SaveMode.Ignore - val withFormatAndMode = df.write.format(format).mode(mode) - val withPartitionColumns = if (partitionColumns.isEmpty) { - withFormatAndMode + val writer = if (!partitionColumns.isEmpty) { + df.write } else { - withFormatAndMode.partitionBy(partitionColumns : _*) + // If the table is not partitioned, coalesce the data to a single file. + df.coalesce(1).write } + writer.format(format).mode(mode) + if (!partitionColumns.isEmpty) { + writer.partitionBy(partitionColumns : _*) + } + println(s"Generating table $name in database to $location with save mode $mode.") logInfo(s"Generating table $name in database to $location with save mode $mode.") - withPartitionColumns.save(location) + writer.save(location) } def createExternalTable(location: String, format: String, databaseName: String, overwrite: Boolean): Unit = { @@ -107,21 +116,27 @@ class Tables(sqlContext: SQLContext, dsdgenDir: String, scaleFactor: Int) extend sqlContext.sql(s"DROP TABLE IF EXISTS $databaseName.$name") } if (!tableExists || overwrite) { + println(s"Creating external table $name in database $databaseName using data stored in $location.") logInfo(s"Creating external table $name in database $databaseName using data stored in $location.") sqlContext.createExternalTable(qualifiedTableName, location, format) } } def createTemporaryTable(location: String, format: String): Unit = { + println(s"Creating temporary table $name using data stored in $location.") logInfo(s"Creating temporary table $name using data stored in $location.") sqlContext.read.format(format).load(location).registerTempTable(name) } } - def genData(location: String, format: String, overwrite: Boolean): Unit = { - tables.foreach { table => - val tableLocation = - location + File.separator + format + File.separator + "sf" + scaleFactor + File.separator + table.name + def genData(location: String, format: String, overwrite: Boolean, partitionTables: Boolean): Unit = { + val tablesToBeGenerated = if (partitionTables) { + tables + } else { + tables.map(_.nonPartitioned) + } + tablesToBeGenerated.foreach { table => + val tableLocation = s"$location/${table.name}" table.genData(tableLocation, format, overwrite) } } @@ -129,18 +144,17 @@ class Tables(sqlContext: SQLContext, dsdgenDir: String, scaleFactor: Int) extend def createExternalTables(location: String, format: String, databaseName: String, overwrite: Boolean): Unit = { sqlContext.sql(s"CREATE DATABASE IF NOT EXISTS $databaseName") tables.foreach { table => - val tableLocation = - location + File.separator + format + File.separator + "sf" + scaleFactor + File.separator + table.name + val tableLocation = s"$location/${table.name}" table.createExternalTable(tableLocation, format, databaseName, overwrite) } sqlContext.sql(s"USE $databaseName") + println(s"The current database has been set to $databaseName.") logInfo(s"The current database has been set to $databaseName.") } def createTemporaryTables(location: String, format: String): Unit = { tables.foreach { table => - val tableLocation = - location + File.separator + format + File.separator + "sf" + scaleFactor + File.separator + table.name + val tableLocation = s"$location/${table.name}" table.createTemporaryTable(tableLocation, format) } } From 88aadb45a41d1be16b1f7ae76494268f90b70a36 Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Mon, 24 Aug 2015 11:39:59 -0700 Subject: [PATCH 05/10] Update README. --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 134ca23..4f7d887 100644 --- a/README.md +++ b/README.md @@ -16,7 +16,7 @@ import com.databricks.spark.sql.perf.tpcds.Tables // dsdgenDir is the location of dsdgen tool installed in your machines. val tables = Tables(sqlContext, dsdgenDir, scaleFactor) // Generate data. -tables.genData(location, format, overwrite) +tables.genData(location, format, overwrite, partitionTables) // Create metastore tables in a specified database for your data. // Once tables are created, the current database will be switched to the specified database. tables.createExternalTables(location, format, databaseName, overwrite) From 58188c671104f6b54171a1333dbae12e704f0f89 Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Mon, 24 Aug 2015 12:17:25 -0700 Subject: [PATCH 06/10] Allow users to use double instead of decimal for generated tables. --- README.md | 4 +-- .../spark/sql/perf/tpcds/Tables.scala | 30 +++++++++++++++++-- 2 files changed, 29 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index 4f7d887..a2f41e2 100644 --- a/README.md +++ b/README.md @@ -14,9 +14,9 @@ Before running any query, a dataset needs to be setup by creating a `Benchmark` import com.databricks.spark.sql.perf.tpcds.Tables // Tables in TPC-DS benchmark used by experiments. // dsdgenDir is the location of dsdgen tool installed in your machines. -val tables = Tables(sqlContext, dsdgenDir, scaleFactor) +val tables = new Tables(sqlContext, dsdgenDir, scaleFactor) // Generate data. -tables.genData(location, format, overwrite, partitionTables) +tables.genData(location, format, overwrite, partitionTables, useDoubleForDecimal) // Create metastore tables in a specified database for your data. // Once tables are created, the current database will be switched to the specified database. tables.createExternalTables(location, format, databaseName, overwrite) diff --git a/src/main/scala/com/databricks/spark/sql/perf/tpcds/Tables.scala b/src/main/scala/com/databricks/spark/sql/perf/tpcds/Tables.scala index 7bdb978..51b687b 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/tpcds/Tables.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/tpcds/Tables.scala @@ -24,7 +24,7 @@ import org.apache.spark.Logging import org.apache.spark.sql.{SaveMode, SQLContext} import org.apache.spark.sql.Row import org.apache.spark.sql.functions._ -import org.apache.spark.sql.types.{StringType, StructField, StructType} +import org.apache.spark.sql.types._ class Tables(sqlContext: SQLContext, dsdgenDir: String, scaleFactor: Int) extends Serializable with Logging { import sqlContext.implicits._ @@ -91,6 +91,18 @@ class Tables(sqlContext: SQLContext, dsdgenDir: String, scaleFactor: Int) extend convertedData } + def useDoubleForDecimal(): Table = { + val newFields = fields.map { field => + val newDataType = field.dataType match { + case decimal: DecimalType => DoubleType + case other => other + } + field.copy(dataType = newDataType) + } + + Table(name, partitionColumns, newFields:_*) + } + def genData(location: String, format: String, overwrite: Boolean): Unit = { val mode = if (overwrite) SaveMode.Overwrite else SaveMode.Ignore @@ -129,13 +141,25 @@ class Tables(sqlContext: SQLContext, dsdgenDir: String, scaleFactor: Int) extend } } - def genData(location: String, format: String, overwrite: Boolean, partitionTables: Boolean): Unit = { + def genData( + location: String, + format: String, + overwrite: Boolean, + partitionTables: Boolean, + useDoubleForDecimal: Boolean): Unit = { val tablesToBeGenerated = if (partitionTables) { tables } else { tables.map(_.nonPartitioned) } - tablesToBeGenerated.foreach { table => + + val withSpecifiedDataType = if (useDoubleForDecimal) { + tablesToBeGenerated.map(_.useDoubleForDecimal()) + } else { + tablesToBeGenerated + } + + withSpecifiedDataType.foreach { table => val tableLocation = s"$location/${table.name}" table.genData(tableLocation, format, overwrite) } From 9936d4923903171c53d0413e312b02c094254d46 Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Mon, 24 Aug 2015 15:32:47 -0700 Subject: [PATCH 07/10] Add a option to orderBy partition columns. --- README.md | 2 +- .../spark/sql/perf/tpcds/Tables.scala | 17 +++++++++++------ 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index a2f41e2..b6e0aee 100644 --- a/README.md +++ b/README.md @@ -16,7 +16,7 @@ import com.databricks.spark.sql.perf.tpcds.Tables // dsdgenDir is the location of dsdgen tool installed in your machines. val tables = new Tables(sqlContext, dsdgenDir, scaleFactor) // Generate data. -tables.genData(location, format, overwrite, partitionTables, useDoubleForDecimal) +tables.genData(location, format, overwrite, partitionTables, useDoubleForDecimal, orderByPartitionColumns) // Create metastore tables in a specified database for your data. // Once tables are created, the current database will be switched to the specified database. tables.createExternalTables(location, format, databaseName, overwrite) diff --git a/src/main/scala/com/databricks/spark/sql/perf/tpcds/Tables.scala b/src/main/scala/com/databricks/spark/sql/perf/tpcds/Tables.scala index 51b687b..702e8b1 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/tpcds/Tables.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/tpcds/Tables.scala @@ -103,17 +103,21 @@ class Tables(sqlContext: SQLContext, dsdgenDir: String, scaleFactor: Int) extend Table(name, partitionColumns, newFields:_*) } - def genData(location: String, format: String, overwrite: Boolean): Unit = { + def genData(location: String, format: String, overwrite: Boolean, orderByPartitionColumns: Boolean): Unit = { val mode = if (overwrite) SaveMode.Overwrite else SaveMode.Ignore - val writer = if (!partitionColumns.isEmpty) { - df.write + val writer = if (partitionColumns.nonEmpty) { + if (orderByPartitionColumns) { + df.orderBy(partitionColumns.map(col): _*).write + } else { + df.write + } } else { // If the table is not partitioned, coalesce the data to a single file. df.coalesce(1).write } writer.format(format).mode(mode) - if (!partitionColumns.isEmpty) { + if (partitionColumns.nonEmpty) { writer.partitionBy(partitionColumns : _*) } println(s"Generating table $name in database to $location with save mode $mode.") @@ -146,7 +150,8 @@ class Tables(sqlContext: SQLContext, dsdgenDir: String, scaleFactor: Int) extend format: String, overwrite: Boolean, partitionTables: Boolean, - useDoubleForDecimal: Boolean): Unit = { + useDoubleForDecimal: Boolean, + orderByPartitionColumns: Boolean): Unit = { val tablesToBeGenerated = if (partitionTables) { tables } else { @@ -161,7 +166,7 @@ class Tables(sqlContext: SQLContext, dsdgenDir: String, scaleFactor: Int) extend withSpecifiedDataType.foreach { table => val tableLocation = s"$location/${table.name}" - table.genData(tableLocation, format, overwrite) + table.genData(tableLocation, format, overwrite, orderByPartitionColumns) } } From 06eb11f326019eb085782329def28cb3c47f41de Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Tue, 25 Aug 2015 20:43:56 -0700 Subject: [PATCH 08/10] Fix the seed to 100 and use distribute by instead of order by. --- README.md | 2 +- .../spark/sql/perf/tpcds/Tables.scala | 39 +++++++++++++++---- 2 files changed, 32 insertions(+), 9 deletions(-) diff --git a/README.md b/README.md index b6e0aee..198cd63 100644 --- a/README.md +++ b/README.md @@ -16,7 +16,7 @@ import com.databricks.spark.sql.perf.tpcds.Tables // dsdgenDir is the location of dsdgen tool installed in your machines. val tables = new Tables(sqlContext, dsdgenDir, scaleFactor) // Generate data. -tables.genData(location, format, overwrite, partitionTables, useDoubleForDecimal, orderByPartitionColumns) +tables.genData(location, format, overwrite, partitionTables, useDoubleForDecimal, clusterByPartitionColumns) // Create metastore tables in a specified database for your data. // Once tables are created, the current database will be switched to the specified database. tables.createExternalTables(location, format, databaseName, overwrite) diff --git a/src/main/scala/com/databricks/spark/sql/perf/tpcds/Tables.scala b/src/main/scala/com/databricks/spark/sql/perf/tpcds/Tables.scala index 702e8b1..620b077 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/tpcds/Tables.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/tpcds/Tables.scala @@ -51,10 +51,11 @@ class Tables(sqlContext: SQLContext, dsdgenDir: String, scaleFactor: Int) extend sys.error(s"Could not find dsdgen at $dsdgen or /$dsdgen. Run install") } + // Note: RNGSEED is the RNG seed used by the data generator. Right now, it is fixed to 100. val parallel = if (partitions > 1) s"-parallel $partitions -child $i" else "" val commands = Seq( "bash", "-c", - s"cd $localToolsDir && ./dsdgen -table $name -filter Y -scale $scaleFactor $parallel") + s"cd $localToolsDir && ./dsdgen -table $name -filter Y -scale $scaleFactor -RNGSEED 100 $parallel") println(commands) commands.lines } @@ -103,18 +104,39 @@ class Tables(sqlContext: SQLContext, dsdgenDir: String, scaleFactor: Int) extend Table(name, partitionColumns, newFields:_*) } - def genData(location: String, format: String, overwrite: Boolean, orderByPartitionColumns: Boolean): Unit = { + def genData(location: String, format: String, overwrite: Boolean, clusterByPartitionColumns: Boolean): Unit = { val mode = if (overwrite) SaveMode.Overwrite else SaveMode.Ignore + val data = df + val tempTableName = s"${name}_text" + data.registerTempTable(tempTableName) + val writer = if (partitionColumns.nonEmpty) { - if (orderByPartitionColumns) { - df.orderBy(partitionColumns.map(col): _*).write + if (clusterByPartitionColumns) { + val columnString = data.schema.fields.map { field => + field.name + }.mkString(",") + val partitionColumnString = partitionColumns.mkString(",") + + val query = + s""" + |SELECT + | $columnString + |FROM + | $tempTableName + |DISTRIBUTED BY + | $partitionColumnString + """.stripMargin + val grouped = sqlContext.sql(query) + println(s"Pre-clustering with partitioning columns with query $query.") + logInfo(s"Pre-clustering with partitioning columns with query $query.") + grouped.write } else { - df.write + data.write } } else { // If the table is not partitioned, coalesce the data to a single file. - df.coalesce(1).write + data.coalesce(1).write } writer.format(format).mode(mode) if (partitionColumns.nonEmpty) { @@ -123,6 +145,7 @@ class Tables(sqlContext: SQLContext, dsdgenDir: String, scaleFactor: Int) extend println(s"Generating table $name in database to $location with save mode $mode.") logInfo(s"Generating table $name in database to $location with save mode $mode.") writer.save(location) + sqlContext.dropTempTable(tempTableName) } def createExternalTable(location: String, format: String, databaseName: String, overwrite: Boolean): Unit = { @@ -151,7 +174,7 @@ class Tables(sqlContext: SQLContext, dsdgenDir: String, scaleFactor: Int) extend overwrite: Boolean, partitionTables: Boolean, useDoubleForDecimal: Boolean, - orderByPartitionColumns: Boolean): Unit = { + clusterByPartitionColumns: Boolean): Unit = { val tablesToBeGenerated = if (partitionTables) { tables } else { @@ -166,7 +189,7 @@ class Tables(sqlContext: SQLContext, dsdgenDir: String, scaleFactor: Int) extend withSpecifiedDataType.foreach { table => val tableLocation = s"$location/${table.name}" - table.genData(tableLocation, format, overwrite, orderByPartitionColumns) + table.genData(tableLocation, format, overwrite, clusterByPartitionColumns) } } From f4e20af10764067e68c79df4dbf48768be4810d5 Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Tue, 25 Aug 2015 23:31:50 -0700 Subject: [PATCH 09/10] fix typo --- src/main/scala/com/databricks/spark/sql/perf/tpcds/Tables.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/scala/com/databricks/spark/sql/perf/tpcds/Tables.scala b/src/main/scala/com/databricks/spark/sql/perf/tpcds/Tables.scala index 620b077..1749440 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/tpcds/Tables.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/tpcds/Tables.scala @@ -124,7 +124,7 @@ class Tables(sqlContext: SQLContext, dsdgenDir: String, scaleFactor: Int) extend | $columnString |FROM | $tempTableName - |DISTRIBUTED BY + |DISTRIBUTE BY | $partitionColumnString """.stripMargin val grouped = sqlContext.sql(query) From 34f66a0a1022b4400abae1556ebcd1d7239f557d Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Wed, 26 Aug 2015 11:14:19 -0700 Subject: [PATCH 10/10] Add a option of filter rows with null partition column values. --- README.md | 2 +- .../spark/sql/perf/tpcds/Tables.scala | 18 +++++++++++++++--- 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 198cd63..520049c 100644 --- a/README.md +++ b/README.md @@ -16,7 +16,7 @@ import com.databricks.spark.sql.perf.tpcds.Tables // dsdgenDir is the location of dsdgen tool installed in your machines. val tables = new Tables(sqlContext, dsdgenDir, scaleFactor) // Generate data. -tables.genData(location, format, overwrite, partitionTables, useDoubleForDecimal, clusterByPartitionColumns) +tables.genData(location, format, overwrite, partitionTables, useDoubleForDecimal, clusterByPartitionColumns, filterOutNullPartitionValues) // Create metastore tables in a specified database for your data. // Once tables are created, the current database will be switched to the specified database. tables.createExternalTables(location, format, databaseName, overwrite) diff --git a/src/main/scala/com/databricks/spark/sql/perf/tpcds/Tables.scala b/src/main/scala/com/databricks/spark/sql/perf/tpcds/Tables.scala index 1749440..ee470f8 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/tpcds/Tables.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/tpcds/Tables.scala @@ -104,7 +104,12 @@ class Tables(sqlContext: SQLContext, dsdgenDir: String, scaleFactor: Int) extend Table(name, partitionColumns, newFields:_*) } - def genData(location: String, format: String, overwrite: Boolean, clusterByPartitionColumns: Boolean): Unit = { + def genData( + location: String, + format: String, + overwrite: Boolean, + clusterByPartitionColumns: Boolean, + filterOutNullPartitionValues: Boolean): Unit = { val mode = if (overwrite) SaveMode.Overwrite else SaveMode.Ignore val data = df @@ -117,6 +122,11 @@ class Tables(sqlContext: SQLContext, dsdgenDir: String, scaleFactor: Int) extend field.name }.mkString(",") val partitionColumnString = partitionColumns.mkString(",") + val predicates = if (filterOutNullPartitionValues) { + partitionColumns.map(col => s"$col IS NOT NULL").mkString("WHERE ", " AND ", "") + } else { + "" + } val query = s""" @@ -124,6 +134,7 @@ class Tables(sqlContext: SQLContext, dsdgenDir: String, scaleFactor: Int) extend | $columnString |FROM | $tempTableName + |$predicates |DISTRIBUTE BY | $partitionColumnString """.stripMargin @@ -174,7 +185,8 @@ class Tables(sqlContext: SQLContext, dsdgenDir: String, scaleFactor: Int) extend overwrite: Boolean, partitionTables: Boolean, useDoubleForDecimal: Boolean, - clusterByPartitionColumns: Boolean): Unit = { + clusterByPartitionColumns: Boolean, + filterOutNullPartitionValues: Boolean): Unit = { val tablesToBeGenerated = if (partitionTables) { tables } else { @@ -189,7 +201,7 @@ class Tables(sqlContext: SQLContext, dsdgenDir: String, scaleFactor: Int) extend withSpecifiedDataType.foreach { table => val tableLocation = s"$location/${table.name}" - table.genData(tableLocation, format, overwrite, clusterByPartitionColumns) + table.genData(tableLocation, format, overwrite, clusterByPartitionColumns, filterOutNullPartitionValues) } }