From 544adce70fafcc050f7e4c7c73fd2e1670228d49 Mon Sep 17 00:00:00 2001
From: Yin Huai
Date: Thu, 20 Aug 2015 13:37:29 -0700
Subject: [PATCH] Add methods to genData.

---
 .../spark/sql/perf/tpcds/Tables.scala         | 86 ++++++++++++++++++-
 1 file changed, 85 insertions(+), 1 deletion(-)

diff --git a/src/main/scala/com/databricks/spark/sql/perf/tpcds/Tables.scala b/src/main/scala/com/databricks/spark/sql/perf/tpcds/Tables.scala
index 011221e..aee846d 100644
--- a/src/main/scala/com/databricks/spark/sql/perf/tpcds/Tables.scala
+++ b/src/main/scala/com/databricks/spark/sql/perf/tpcds/Tables.scala
@@ -31,7 +31,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.mapreduce.{OutputCommitter, TaskAttemptContext, RecordWriter, Job}
 import org.apache.spark.SerializableWritable
-import org.apache.spark.sql.{Column, ColumnName, SQLContext}
+import org.apache.spark.sql.{SaveMode, Column, ColumnName, SQLContext}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.types.{StringType, StructField, StructType}
 import parquet.hadoop.ParquetOutputFormat
@@ -93,6 +93,90 @@ class Tables(sqlContext: SQLContext, dsdgenDir: String, scaleFactor: Int) extend
 
       convertedData
     }
+
+    /**
+     * Writes this table's DataFrame (`df`) to `location` using the data source `format`.
+     *
+     * @param location path the data is saved to
+     * @param format data source name handed to the DataFrame writer
+     * @param overwrite if true, save with SaveMode.Overwrite; otherwise save with
+     *                  SaveMode.Ignore, which leaves already existing data untouched
+     */
+    def genData(location: String, format: String, overwrite: Boolean): Unit = {
+      val mode = if (overwrite) SaveMode.Overwrite else SaveMode.Ignore
+
+      val withFormatAndMode = df.write.format(format).mode(mode)
+      val withPartitionColumns = if (partitionColumns.isEmpty) {
+        withFormatAndMode
+      } else {
+        withFormatAndMode.partitionBy(partitionColumns: _*)
+      }
+
+      withPartitionColumns.save(location)
+    }
+
+    /**
+     * Registers the data at `location` as the external table `databaseName.name`.
+     *
+     * @param location path of the table's data
+     * @param format data source name of the stored data
+     * @param databaseName database the table is created in
+     * @param overwrite if true, drop an existing table of the same name first
+     */
+    def createExternalTables(
+        location: String,
+        format: String,
+        databaseName: String,
+        overwrite: Boolean): Unit = {
+      val qualifiedTableName = s"$databaseName.$name"
+      if (overwrite) {
+        // Reuse the qualified name so the DROP and the CREATE always target the same table.
+        sqlContext.sql(s"DROP TABLE IF EXISTS $qualifiedTableName")
+      }
+      sqlContext.createExternalTable(qualifiedTableName, location, format)
+    }
+  }
+
+  /**
+   * Location of one table's data: `location/format/tableName`.
+   *
+   * The segments are joined with "/" rather than java.io.File.separator because the result
+   * is handed to Spark as a path/URI, where "/" is the separator on every platform.
+   */
+  private def tableLocation(location: String, format: String, tableName: String): String =
+    s"$location/$format/$tableName"
+
+  /**
+   * Generates the data of every table in `tables` under `location/format/<table name>`.
+   *
+   * @param location root directory for the generated data
+   * @param format data source name used to write the data
+   * @param overwrite forwarded to each table's genData
+   */
+  def genData(location: String, format: String, overwrite: Boolean): Unit = {
+    tables.foreach { table =>
+      table.genData(tableLocation(location, format, table.name), format, overwrite)
+    }
+  }
+
+  /**
+   * Creates an external table for every table in `tables`, reading from the same
+   * `location/format/<table name>` layout that genData writes.
+   *
+   * @param location root directory of the generated data
+   * @param format data source name of the stored data
+   * @param databaseName database the tables are created in
+   * @param overwrite if true, existing tables of the same name are dropped first
+   */
+  def createExternalTables(
+      location: String,
+      format: String,
+      databaseName: String,
+      overwrite: Boolean): Unit = {
+    tables.foreach { table =>
+      table.createExternalTables(
+        tableLocation(location, format, table.name), format, databaseName, overwrite)
+    }
   }
 
   val tables = Seq(