diff --git a/src/main/scala/com/databricks/spark/sql/perf/AggregationPerformance.scala b/src/main/scala/com/databricks/spark/sql/perf/AggregationPerformance.scala index def479b..f27c596 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/AggregationPerformance.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/AggregationPerformance.scala @@ -5,6 +5,8 @@ import org.apache.spark.sql.{Row, SQLContext} trait AggregationPerformance extends Benchmark { import sqlContext.implicits._ + import ExecutionMode._ + val sizes = (1 to 6).map(math.pow(10, _).toInt).toSeq @@ -32,7 +34,7 @@ trait AggregationPerformance extends Benchmark { s"avg-$table", s"SELECT AVG(b) FROM $table GROUP BY a", "an average with a varying number of groups", - collectResults = false) + executionMode = ForeachResults) } val twoGroupsAvg: Seq[Query] = lowCardinality.map(_.name).map { table => @@ -40,7 +42,7 @@ trait AggregationPerformance extends Benchmark { s"avg-$table", s"SELECT AVG(b) FROM $table GROUP BY a", "an average on an int column with only two groups", - collectResults = false) + executionMode = ForeachResults) } val complexInput = @@ -49,7 +51,7 @@ trait AggregationPerformance extends Benchmark { s"aggregation-complex-input-$table", s"SELECT SUM(id + id + id + id + id + id + id + id + id + id) FROM $table", "Sum of 9 columns added together", - collectResults = true) + executionMode = CollectResults) } val aggregates = @@ -59,7 +61,7 @@ trait AggregationPerformance extends Benchmark { s"single-aggregate-$agg-$table", s"SELECT $agg(id) FROM $table", "aggregation of a single column", - collectResults = true) + executionMode = CollectResults) } } } \ No newline at end of file diff --git a/src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala b/src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala index f1073b6..1bc0955 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala @@ -333,21 
+333,33 @@ abstract class Benchmark(@transient protected val sqlContext: SQLContext) """.stripMargin } + sealed trait ExecutionMode + object ExecutionMode { + // Benchmark run by collecting queries results (e.g. rdd.collect()) + case object CollectResults extends ExecutionMode + + // Benchmark run by iterating through the queries results rows (e.g. rdd.foreach(row => Unit)) + case object ForeachResults extends ExecutionMode + + // Benchmark run by saving the output of each query as a parquet file at the specified location + case class WriteParquet(location: String) extends ExecutionMode + } + /** Factory object for benchmark queries. */ object Query { def apply( name: String, sqlText: String, description: String, - collectResults: Boolean = true): Query = { - new Query(name, sqlContext.sql(sqlText), description, collectResults, Some(sqlText)) + executionMode: ExecutionMode = ExecutionMode.ForeachResults): Query = { + new Query(name, sqlContext.sql(sqlText), description, Some(sqlText), executionMode) } def apply( name: String, dataFrameBuilder: => DataFrame, description: String): Query = { - new Query(name, dataFrameBuilder, description, true, None) + new Query(name, dataFrameBuilder, description, None, ExecutionMode.CollectResults) } } @@ -356,8 +368,8 @@ abstract class Benchmark(@transient protected val sqlContext: SQLContext) val name: String, buildDataFrame: => DataFrame, val description: String, - val collectResults: Boolean, - val sqlText: Option[String]) { + val sqlText: Option[String], + val executionMode: ExecutionMode) { override def toString = s""" @@ -415,13 +427,13 @@ abstract class Benchmark(@transient protected val sqlContext: SQLContext) } // The executionTime for the entire query includes the time of type conversion from catalyst to scala. 
- val executionTime = if (collectResults) { - benchmarkMs { - dataFrame.rdd.collect() - } - } else { - benchmarkMs { - dataFrame.rdd.foreach { row => Unit } + // The executionTime for the entire query includes the time of type conversion + // from catalyst to scala. + val executionTime = benchmarkMs { + executionMode match { + case ExecutionMode.CollectResults => dataFrame.rdd.collect() + case ExecutionMode.ForeachResults => dataFrame.rdd.foreach { _ => () } + case ExecutionMode.WriteParquet(location) => dataFrame.saveAsParquetFile(s"$location/$name.parquet") } } diff --git a/src/main/scala/com/databricks/spark/sql/perf/JoinPerformance.scala b/src/main/scala/com/databricks/spark/sql/perf/JoinPerformance.scala index f2c541b..c7705a2 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/JoinPerformance.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/JoinPerformance.scala @@ -5,6 +5,8 @@ import org.apache.spark.sql.SQLContext trait JoinPerformance extends Benchmark { // 1.5 mb, 1 file + import ExecutionMode._ + val x = Table( "1milints", sqlContext.range(0, 1000000) @@ -36,7 +38,7 @@ trait JoinPerformance extends Benchmark { s"singleKey-$join-$table1-$table2", s"SELECT COUNT(*) FROM $table1 a $join $table2 b ON a.id = b.id", "equi-inner join a small table with a big table using a single key.", - collectResults = true) + executionMode = CollectResults) } } } diff --git a/src/main/scala/com/databricks/spark/sql/perf/bigdata/BigData.scala b/src/main/scala/com/databricks/spark/sql/perf/bigdata/BigData.scala index c30cf7b..e69de29 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/bigdata/BigData.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/bigdata/BigData.scala @@ -1,38 +0,0 @@ -/* - * Copyright 2015 Databricks Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.databricks.spark.sql.perf.bigdata - -import com.databricks.spark.sql.perf._ -import org.apache.spark.sql.functions._ -import org.apache.spark.sql.parquet.TPCDSTableForTest -import org.apache.spark.sql.{Column, SQLContext} - -class BigDataBenchmark ( - @transient sqlContext: SQLContext, - dataLocation: String, - val tables: Seq[Table2], - scaleFactor: String) - extends Benchmark(sqlContext) with Serializable with TableCreator { - import sqlContext._ - import sqlContext.implicits._ - - override def createTablesForTest(tables: Seq[Table2]): Seq[TableForTest] = { - tables.map(table => - BigDataTableForTest(table, dataLocation, scaleFactor, sqlContext)) - } -} - diff --git a/src/main/scala/com/databricks/spark/sql/perf/bigdata/Queries.scala b/src/main/scala/com/databricks/spark/sql/perf/bigdata/Queries.scala index 5541a02..b9fb016 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/bigdata/Queries.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/bigdata/Queries.scala @@ -18,8 +18,9 @@ package com.databricks.spark.sql.perf.bigdata import com.databricks.spark.sql.perf.Benchmark -trait Queries { - self: Benchmark => +trait Queries extends Benchmark { + + import ExecutionMode._ val queries1to3 = Seq( Query( @@ -34,7 +35,7 @@ trait Queries { | pageRank > 1000 """.stripMargin, description = "", - collectResults = false), + executionMode = ForeachResults), Query( name = "q1B", @@ -48,7 +49,7 @@ trait Queries { | pageRank > 100 """.stripMargin, description = "", - collectResults = false), + executionMode = ForeachResults), 
Query( name = "q1C", @@ -62,7 +63,7 @@ trait Queries { | pageRank > 10 """.stripMargin, description = "", - collectResults = false), + executionMode = ForeachResults), Query( name = "q2A", @@ -76,7 +77,7 @@ trait Queries { | SUBSTR(sourceIP, 1, 8) """.stripMargin, description = "", - collectResults = false), + executionMode = ForeachResults), Query( name = "q2B", @@ -90,7 +91,7 @@ trait Queries { | SUBSTR(sourceIP, 1, 10) """.stripMargin, description = "", - collectResults = false), + executionMode = ForeachResults), Query( name = "q2C", @@ -104,7 +105,7 @@ trait Queries { | SUBSTR(sourceIP, 1, 12) """.stripMargin, description = "", - collectResults = false), + executionMode = ForeachResults), Query( name = "q3A", @@ -123,7 +124,7 @@ trait Queries { |ORDER BY totalRevenue DESC LIMIT 1 """.stripMargin, description = "", - collectResults = false), + executionMode = ForeachResults), Query( name = "q3B", @@ -142,7 +143,7 @@ trait Queries { |ORDER BY totalRevenue DESC LIMIT 1 """.stripMargin, description = "", - collectResults = false), + executionMode = ForeachResults), Query( name = "q3C", @@ -160,6 +161,6 @@ trait Queries { |ORDER BY totalRevenue DESC LIMIT 1 """.stripMargin, description = "", - collectResults = false) + executionMode = ForeachResults) ) } diff --git a/src/main/scala/com/databricks/spark/sql/perf/bigdata/Tables.scala b/src/main/scala/com/databricks/spark/sql/perf/bigdata/Tables.scala index 9b6f657..e69de29 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/bigdata/Tables.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/bigdata/Tables.scala @@ -1,79 +0,0 @@ -/* - * Copyright 2015 Databricks Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.databricks.spark.sql.perf.bigdata - -// This is a hack until parquet has better support for partitioning. - -import java.text.SimpleDateFormat -import java.util.Date - -import com.databricks.spark.sql.perf._ -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileSystem, Path} -import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat} -import org.apache.hadoop.mapreduce.{Job, OutputCommitter, RecordWriter, TaskAttemptContext} -import org.apache.spark.SerializableWritable -import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil -import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.types.{StringType, StructField, StructType} -import org.apache.spark.sql.{Column, ColumnName, SQLContext} -import parquet.hadoop.ParquetOutputFormat -import parquet.hadoop.util.ContextUtil - -import scala.sys.process._ - -case class BigDataTableForTest( - table: Table2, - baseDir: String, - scaleFactor: String, - @transient sqlContext: SQLContext) - extends TableForTest(table, baseDir, sqlContext) with Serializable { - - @transient val sparkContext = sqlContext.sparkContext - - override def generate(): Unit = - throw new UnsupportedOperationException( - "Generate data for BigDataBenchmark has not been implemented") -} - -case class Tables(sqlContext: SQLContext) { - import sqlContext.implicits._ - - val tables = Seq( - Table2("rankings", - UnpartitionedTable, - 'pageURL .string, - 'pageRank .int, - 'avgDuration .int), - - Table2("uservisits", - UnpartitionedTable, - 
'sourceIP .string, - 'destURL .string, - 'visitDate .string, - 'adRevenue .double, - 'userAgent .string, - 'countryCode .string, - 'languageCode .string, - 'searchWord .string, - 'duration .int), - - Table2("documents", - UnpartitionedTable, - 'line .string) - ) -} diff --git a/src/main/scala/com/databricks/spark/sql/perf/query.scala b/src/main/scala/com/databricks/spark/sql/perf/query.scala index 690acd6..e69de29 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/query.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/query.scala @@ -1,29 +0,0 @@ -/* - * Copyright 2015 Databricks Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package com.databricks.spark.sql.perf - -import org.apache.spark.sql.{DataFrame, SQLContext} -import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation - -trait QuerySet { - val sqlContext: SQLContext - def sparkContext = sqlContext.sparkContext - - - - -} \ No newline at end of file diff --git a/src/main/scala/com/databricks/spark/sql/perf/runBenchmarks.scala b/src/main/scala/com/databricks/spark/sql/perf/runBenchmarks.scala new file mode 100644 index 0000000..e69de29 diff --git a/src/main/scala/com/databricks/spark/sql/perf/table.scala b/src/main/scala/com/databricks/spark/sql/perf/table.scala index 4fd4381..e69de29 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/table.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/table.scala @@ -1,90 +0,0 @@ -/* - * Copyright 2015 Databricks Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package com.databricks.spark.sql.perf - -import java.text.SimpleDateFormat -import java.util.Date - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{Path, FileSystem} -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat -import org.apache.hadoop.mapreduce.{OutputCommitter, TaskAttemptContext, RecordWriter, Job} -import org.apache.spark.SerializableWritable -import org.apache.spark.sql.{SQLContext, Column} -import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.functions._ -import org.apache.spark.sql.hive.HiveMetastoreTypes -import org.apache.spark.sql.types._ -import parquet.hadoop.ParquetOutputFormat -import parquet.hadoop.util.ContextUtil - -trait TableCreator { - - def tables: Seq[Table2] - - def createTablesForTest(tables: Seq[Table2]): Seq[TableForTest] - - val tablesForTest: Seq[TableForTest] = createTablesForTest(tables) - - def checkData(): Unit = { - tablesForTest.foreach { table => - val fs = FileSystem.get(new java.net.URI(table.outputDir), new Configuration()) - val exists = fs.exists(new Path(table.outputDir)) - val wasSuccessful = fs.exists(new Path(s"${table.outputDir}/_SUCCESS")) - - if (!wasSuccessful) { - if (exists) { - println(s"Table '${table.name}' not generated successfully, regenerating.") - } else { - println(s"Table '${table.name}' does not exist, generating.") - } - fs.delete(new Path(table.outputDir), true) - table.generate() - } else { - println(s"Table ${table.name} already exists.") - } - } - } -} - -abstract class TableType -case object UnpartitionedTable extends TableType -case class PartitionedTable(partitionColumn: String) extends TableType - -case class Table2(name: String, tableType: TableType, fields: StructField*) - -abstract class TableForTest( - table: Table2, - baseDir: String, - @transient sqlContext: SQLContext) extends Serializable { - - val schema = StructType(table.fields) - - val name = table.name - - val outputDir = s"$baseDir/parquet/$name" - - 
def fromCatalog = sqlContext.table(name) - - def stats = - fromCatalog.select( - lit(name) as "tableName", - count("*") as "numRows", - lit(fromCatalog.queryExecution.optimizedPlan.statistics.sizeInBytes.toLong) as "sizeInBytes") - - def generate() -} diff --git a/src/main/scala/com/databricks/spark/sql/perf/tpcds/TPCDS.scala b/src/main/scala/com/databricks/spark/sql/perf/tpcds/TPCDS.scala index 8c61a33..2c618ac 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/tpcds/TPCDS.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/tpcds/TPCDS.scala @@ -17,9 +17,7 @@ package com.databricks.spark.sql.perf.tpcds import com.databricks.spark.sql.perf._ -import org.apache.spark.sql.functions._ -import org.apache.spark.sql.parquet.TPCDSTableForTest -import org.apache.spark.sql.{Column, SQLContext} +import org.apache.spark.sql.SQLContext /** * TPC-DS benchmark's dataset. @@ -27,8 +25,6 @@ import org.apache.spark.sql.{Column, SQLContext} * @param sparkVersion The version of Spark. * @param dataLocation The location of the dataset used by this experiment. * @param dsdgenDir The location of dsdgen in every worker machine. - * @param resultsLocation The location of performance results. - * @param tables Tables that will be used in this experiment. * @param scaleFactor The scale factor of the dataset. For some benchmarks like TPC-H * and TPC-DS, the scale factor is a number roughly representing the * size of raw data files. 
For some other benchmarks, the scale factor @@ -39,27 +35,16 @@ class TPCDS ( sparkVersion: String, dataLocation: String, dsdgenDir: String, - val tables: Seq[Table2], scaleFactor: String, userSpecifiedBaseDir: Option[String] = None) - extends Benchmark(sqlContext) with TableCreator with Serializable { + extends Benchmark(sqlContext) with Serializable { import sqlContext._ import sqlContext.implicits._ lazy val baseDir = userSpecifiedBaseDir.getOrElse(s"$dataLocation/scaleFactor=$scaleFactor/useDecimal=true") - override def createTablesForTest(tables: Seq[Table2]): Seq[TableForTest] = { - tables.map(table => - TPCDSTableForTest(table, baseDir, scaleFactor.toInt, dsdgenDir, sqlContext)) - } - /* - override def setup(): Unit = { - super.setup() - setupBroadcast() - } - def setupBroadcast(skipTables: Seq[String] = Seq("store_sales", "customer")) = { val skipExpr = skipTables.map(t => !('tableName === t)).reduceLeft[Column](_ && _) val threshold = diff --git a/src/main/scala/com/databricks/spark/sql/perf/tpcds/Tables.scala b/src/main/scala/com/databricks/spark/sql/perf/tpcds/Tables.scala index f559347..0466bf9 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/tpcds/Tables.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/tpcds/Tables.scala @@ -14,7 +14,7 @@ * limitations under the License. */ -package org.apache.spark.sql.parquet // This is a hack until parquet has better support for partitioning. 
+package com.databricks.spark.sql.perf.tpcds import java.text.SimpleDateFormat import java.util.Date @@ -37,173 +37,63 @@ import org.apache.spark.sql.types.{StringType, StructField, StructType} import parquet.hadoop.ParquetOutputFormat import parquet.hadoop.util.ContextUtil +class Tables(sqlContext: SQLContext, dsdgenDir: String, scaleFactor: Int) { + import sqlContext.implicits._ -case class TPCDSTableForTest( - table: Table2, - baseDir: String, - scaleFactor: Int, - dsdgenDir: String, - @transient sqlContext: SQLContext, - maxRowsPerPartitions: Int = 20 * 1000 * 1000) - extends TableForTest(table, baseDir, sqlContext) with Serializable with SparkHadoopMapReduceUtil { - - @transient val sparkContext = sqlContext.sparkContext - + def sparkContext = sqlContext.sparkContext val dsdgen = s"$dsdgenDir/dsdgen" - override def generate(): Unit = { - val partitions = table.tableType match { - case PartitionedTable(_) => scaleFactor - case _ => 1 - } + case class Table(name: String, partitionColumns: Seq[String], fields: StructField*) { + val schema = StructType(fields) + val partitions = if (partitionColumns.isEmpty) 1 else 100 - val generatedData = { - sparkContext.parallelize(1 to partitions, partitions).flatMap { i => - val localToolsDir = if (new java.io.File(dsdgen).exists) { - dsdgenDir - } else if (new java.io.File(s"/$dsdgen").exists) { - s"/$dsdgenDir" - } else { - sys.error(s"Could not find dsdgen at $dsdgen or /$dsdgen. 
Run install") - } - - val parallel = if (partitions > 1) s"-parallel $partitions -child $i" else "" - val commands = Seq( - "bash", "-c", - s"cd $localToolsDir && ./dsdgen -table ${table.name} -filter Y -scale $scaleFactor $parallel") - println(commands) - commands.lines - } - } - - generatedData.setName(s"${table.name}, sf=$scaleFactor, strings") - - val rows = generatedData.mapPartitions { iter => - val currentRow = new GenericMutableRow(schema.fields.size) - iter.map { l => - (0 until schema.fields.length).foreach(currentRow.setNullAt) - l.split("\\|", -1).zipWithIndex.dropRight(1).foreach { case (f, i) => currentRow(i) = f} - currentRow: Row - } - } - - val stringData = - sqlContext.createDataFrame( - rows, - StructType(schema.fields.map(f => StructField(f.name, StringType)))) - - val convertedData = { - val columns = schema.fields.map { f => - val columnName = new ColumnName(f.name) - columnName.cast(f.dataType).as(f.name) - } - stringData.select(columns: _*) - } - - table.tableType match { - // This is an awful hack... spark sql parquet should support this natively. 
- case PartitionedTable(partitioningColumn) => - sqlContext.setConf("spark.sql.planner.externalSort", "true") - val output = convertedData.queryExecution.analyzed.output - val job = new Job(sqlContext.sparkContext.hadoopConfiguration) - - - //HAX - val writeSupport = - // if (schema.fields.map(_.dataType).forall(_.isPrimitive)) { - // classOf[org.apache.spark.sql.parquet.MutableRowWriteSupport] - // } else { - classOf[org.apache.spark.sql.parquet.RowWriteSupport] - // } - - ParquetOutputFormat.setWriteSupportClass(job, writeSupport) - - val conf = new SerializableWritable(ContextUtil.getConfiguration(job)) - org.apache.spark.sql.parquet.RowWriteSupport.setSchema(schema.toAttributes, conf.value) - - val partColumnAttr = - BindReferences.bindReference[Expression]( - output.find(_.name == partitioningColumn).get, - output) - - - // TODO: clusterBy would be faster than orderBy - val orderedConvertedData = - convertedData.filter(new Column(partitioningColumn) isNotNull).orderBy(Column(partitioningColumn) asc) - orderedConvertedData.queryExecution.toRdd.foreachPartition { iter => - var writer: RecordWriter[Void, Row] = null - val getPartition = new InterpretedMutableProjection(Seq(partColumnAttr)) - var currentPartition: Row = null - var hadoopContext: TaskAttemptContext = null - var committer: OutputCommitter = null - - var rowCount = 0 - var partition = 0 - - while (iter.hasNext) { - val currentRow = iter.next() - - rowCount += 1 - if (rowCount >= maxRowsPerPartitions) { - rowCount = 0 - partition += 1 - println(s"Starting partition $partition") - if (writer != null) { - writer.close(hadoopContext) - committer.commitTask(hadoopContext) - } - writer = null - } - - if ((getPartition(currentRow) != currentPartition || writer == null) && - !getPartition.currentValue.isNullAt(0)) { - rowCount = 0 - currentPartition = getPartition.currentValue.copy() - if (writer != null) { - writer.close(hadoopContext) - committer.commitTask(hadoopContext) - } - - val job = new 
Job(conf.value) - val keyType = classOf[Void] - job.setOutputKeyClass(keyType) - job.setOutputValueClass(classOf[Row]) - NewFileOutputFormat.setOutputPath( - job, - new Path(s"$outputDir/$partitioningColumn=${currentPartition(0)}")) - val wrappedConf = new SerializableWritable(job.getConfiguration) - val formatter = new SimpleDateFormat("yyyyMMddHHmm") - val jobtrackerID = formatter.format(new Date()) - val stageId = partition - - val attemptNumber = 1 - /* "reduce task" */ - val attemptId = newTaskAttemptID(jobtrackerID, partition, isMap = false, partition, attemptNumber) - hadoopContext = newTaskAttemptContext(wrappedConf.value, attemptId) - val format = new ParquetOutputFormat[Row] - committer = format.getOutputCommitter(hadoopContext) - committer.setupTask(hadoopContext) - writer = format.getRecordWriter(hadoopContext) - - } - if (!getPartition.currentValue.isNullAt(0)) { - writer.write(null, currentRow) - } - } - if (writer != null) { - writer.close(hadoopContext) - committer.commitTask(hadoopContext) + def df = { + val generatedData = { + sparkContext.parallelize(1 to partitions, partitions).flatMap { i => + val localToolsDir = if (new java.io.File(dsdgen).exists) { + dsdgenDir + } else if (new java.io.File(s"/$dsdgen").exists) { + s"/$dsdgenDir" + } else { + sys.error(s"Could not find dsdgen at $dsdgen or /$dsdgen. 
Run install") } + + val parallel = if (partitions > 1) s"-parallel $partitions -child $i" else "" + val commands = Seq( + "bash", "-c", + s"cd $localToolsDir && ./dsdgen -table $name -filter Y -scale $scaleFactor $parallel") + println(commands) + commands.lines } - val fs = FileSystem.get(new java.net.URI(outputDir), new Configuration()) - fs.create(new Path(s"$outputDir/_SUCCESS")).close() - case _ => convertedData.saveAsParquetFile(outputDir) + } + + generatedData.setName(s"$name, sf=$scaleFactor, strings") + + val rows = generatedData.mapPartitions { iter => + val currentRow = new GenericMutableRow(schema.fields.size) + iter.map { l => + (0 until schema.fields.length).foreach(currentRow.setNullAt) + l.split("\\|", -1).zipWithIndex.dropRight(1).foreach { case (f, i) => currentRow(i) = f} + currentRow: Row + } + } + + val stringData = + sqlContext.createDataFrame( + rows, + StructType(schema.fields.map(f => StructField(f.name, StringType)))) + + val convertedData = { + val columns = schema.fields.map { f => + val columnName = new ColumnName(f.name) + columnName.cast(f.dataType).as(f.name) + } + stringData.select(columns: _*) + } + + convertedData } } -} - -case class Tables(sqlContext: SQLContext) { - import sqlContext.implicits._ val tables = Seq( /* This is another large table that we don't build yet. 
@@ -213,8 +103,8 @@ case class Tables(sqlContext: SQLContext) { 'inv_item_sk .int, 'inv_warehouse_sk .int, 'inv_quantity_on_hand .int),*/ - Table2("store_sales", - PartitionedTable("ss_sold_date_sk"), + Table("store_sales", + partitionColumns = "ss_sold_date_sk" :: Nil, 'ss_sold_date_sk .int, 'ss_sold_time_sk .int, 'ss_item_sk .int, @@ -238,8 +128,8 @@ case class Tables(sqlContext: SQLContext) { 'ss_net_paid .decimal(7,2), 'ss_net_paid_inc_tax .decimal(7,2), 'ss_net_profit .decimal(7,2)), - Table2("customer", - UnpartitionedTable, + Table("customer", + partitionColumns = Nil, 'c_customer_sk .int, 'c_customer_id .string, 'c_current_cdemo_sk .int, @@ -258,8 +148,8 @@ case class Tables(sqlContext: SQLContext) { 'c_login .string, 'c_email_address .string, 'c_last_review_date .string), - Table2("customer_address", - UnpartitionedTable, + Table("customer_address", + partitionColumns = Nil, 'ca_address_sk .int, 'ca_address_id .string, 'ca_street_number .string, @@ -273,8 +163,8 @@ case class Tables(sqlContext: SQLContext) { 'ca_country .string, 'ca_gmt_offset .decimal(5,2), 'ca_location_type .string), - Table2("customer_demographics", - UnpartitionedTable, + Table("customer_demographics", + partitionColumns = Nil, 'cd_demo_sk .int, 'cd_gender .string, 'cd_marital_status .string, @@ -284,8 +174,8 @@ case class Tables(sqlContext: SQLContext) { 'cd_dep_count .int, 'cd_dep_employed_count .int, 'cd_dep_college_count .int), - Table2("date_dim", - UnpartitionedTable, + Table("date_dim", + partitionColumns = Nil, 'd_date_sk .int, 'd_date_id .string, 'd_date .string, @@ -314,15 +204,15 @@ case class Tables(sqlContext: SQLContext) { 'd_current_month .string, 'd_current_quarter .string, 'd_current_year .string), - Table2("household_demographics", - UnpartitionedTable, + Table("household_demographics", + partitionColumns = Nil, 'hd_demo_sk .int, 'hd_income_band_sk .int, 'hd_buy_potential .string, 'hd_dep_count .int, 'hd_vehicle_count .int), - Table2("item", - UnpartitionedTable, + 
Table("item", + partitionColumns = Nil, 'i_item_sk .int, 'i_item_id .string, 'i_rec_start_date .string, @@ -345,8 +235,8 @@ case class Tables(sqlContext: SQLContext) { 'i_container .string, 'i_manager_id .int, 'i_product_name .string), - Table2("promotion", - UnpartitionedTable, + Table("promotion", + partitionColumns = Nil, 'p_promo_sk .int, 'p_promo_id .string, 'p_start_date_sk .int, @@ -366,8 +256,8 @@ case class Tables(sqlContext: SQLContext) { 'p_channel_details .string, 'p_purpose .string, 'p_discount_active .string), - Table2("store", - UnpartitionedTable, + Table("store", + partitionColumns = Nil, 's_store_sk .int, 's_store_id .string, 's_rec_start_date .string, @@ -397,8 +287,8 @@ case class Tables(sqlContext: SQLContext) { 's_country .string, 's_gmt_offset .decimal(5,2), 's_tax_precentage .decimal(5,2)), - Table2("time_dim", - UnpartitionedTable, + Table("time_dim", + partitionColumns = Nil, 't_time_sk .int, 't_time_id .string, 't_time .int, diff --git a/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/ImpalaKitQueries.scala b/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/ImpalaKitQueries.scala index 31e969a..063ea55 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/ImpalaKitQueries.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/ImpalaKitQueries.scala @@ -18,8 +18,9 @@ package com.databricks.spark.sql.perf.tpcds.queries import com.databricks.spark.sql.perf.Benchmark -trait ImpalaKitQueries { - self: Benchmark => +trait ImpalaKitQueries extends Benchmark { + + import ExecutionMode._ // Queries are from // https://github.com/cloudera/impala-tpcds-kit/tree/master/queries-sql92-modified/queries @@ -1026,7 +1027,7 @@ trait ImpalaKitQueries { |from store_sales """.stripMargin) ).map { - case (name, sqlText) => Query(name, sqlText, description = "", collectResults = true) + case (name, sqlText) => Query(name, sqlText, description = "", executionMode = CollectResults) } val queriesMap = 
queries.map(q => q.name -> q).toMap @@ -1464,8 +1465,8 @@ trait ImpalaKitQueries { | max(ss_promo_sk) as max_ss_promo_sk |from store_sales """.stripMargin) - ).map { - case (name, sqlText) => Query(name, sqlText, description = "original query", collectResults = true) + ).map { case (name, sqlText) => + Query(name, sqlText, description = "original query", executionMode = CollectResults) } val interactiveQueries = diff --git a/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/SimpleQueries.scala b/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/SimpleQueries.scala index 24b2007..cb26820 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/SimpleQueries.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/SimpleQueries.scala @@ -18,8 +18,9 @@ package com.databricks.spark.sql.perf.tpcds.queries import com.databricks.spark.sql.perf.Benchmark -trait SimpleQueries { - self: Benchmark => +trait SimpleQueries extends Benchmark { + + import ExecutionMode._ val q7Derived = Seq( ("q7-simpleScan", @@ -138,7 +139,7 @@ trait SimpleQueries { |limit 100 |-- end query 1 in stream 0 using template query7.tpl """.stripMargin) - ).map { - case (name, sqlText) => Query(name = name, sqlText = sqlText, description = "", collectResults = false) + ).map { case (name, sqlText) => + Query(name = name, sqlText = sqlText, description = "", executionMode = ForeachResults) } }