Merge remote-tracking branch 'origin/master' into refactor
Conflicts: src/main/scala/com/databricks/spark/sql/perf/bigdata/Queries.scala src/main/scala/com/databricks/spark/sql/perf/query.scala src/main/scala/com/databricks/spark/sql/perf/runBenchmarks.scala src/main/scala/com/databricks/spark/sql/perf/table.scala src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/ImpalaKitQueries.scala src/main/scala/com/databricks/spark/sql/perf/tpcds/queries/SimpleQueries.scala
This commit is contained in:
commit
51b9dcb5b5
@ -5,6 +5,8 @@ import org.apache.spark.sql.{Row, SQLContext}
|
||||
trait AggregationPerformance extends Benchmark {
|
||||
|
||||
import sqlContext.implicits._
|
||||
import ExecutionMode._
|
||||
|
||||
|
||||
val sizes = (1 to 6).map(math.pow(10, _).toInt).toSeq
|
||||
|
||||
@ -32,7 +34,7 @@ trait AggregationPerformance extends Benchmark {
|
||||
s"avg-$table",
|
||||
s"SELECT AVG(b) FROM $table GROUP BY a",
|
||||
"an average with a varying number of groups",
|
||||
collectResults = false)
|
||||
executionMode = ForeachResults)
|
||||
}
|
||||
|
||||
val twoGroupsAvg: Seq[Query] = lowCardinality.map(_.name).map { table =>
|
||||
@ -40,7 +42,7 @@ trait AggregationPerformance extends Benchmark {
|
||||
s"avg-$table",
|
||||
s"SELECT AVG(b) FROM $table GROUP BY a",
|
||||
"an average on an int column with only two groups",
|
||||
collectResults = false)
|
||||
executionMode = ForeachResults)
|
||||
}
|
||||
|
||||
val complexInput =
|
||||
@ -49,7 +51,7 @@ trait AggregationPerformance extends Benchmark {
|
||||
s"aggregation-complex-input-$table",
|
||||
s"SELECT SUM(id + id + id + id + id + id + id + id + id + id) FROM $table",
|
||||
"Sum of 9 columns added together",
|
||||
collectResults = true)
|
||||
executionMode = CollectResults)
|
||||
}
|
||||
|
||||
val aggregates =
|
||||
@ -59,7 +61,7 @@ trait AggregationPerformance extends Benchmark {
|
||||
s"single-aggregate-$agg-$table",
|
||||
s"SELECT $agg(id) FROM $table",
|
||||
"aggregation of a single column",
|
||||
collectResults = true)
|
||||
executionMode = CollectResults)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -333,21 +333,33 @@ abstract class Benchmark(@transient protected val sqlContext: SQLContext)
|
||||
""".stripMargin
|
||||
}
|
||||
|
||||
trait ExecutionMode
|
||||
object ExecutionMode {
|
||||
// Benchmark run by collecting queries results (e.g. rdd.collect())
|
||||
case object CollectResults extends ExecutionMode
|
||||
|
||||
// Benchmark run by iterating through the queries results rows (e.g. rdd.foreach(row => Unit))
|
||||
case object ForeachResults extends ExecutionMode
|
||||
|
||||
// Benchmark run by saving the output of each query as a parquet file at the specified location
|
||||
case class WriteParquet(location: String) extends ExecutionMode
|
||||
}
|
||||
|
||||
/** Factory object for benchmark queries. */
|
||||
object Query {
|
||||
def apply(
|
||||
name: String,
|
||||
sqlText: String,
|
||||
description: String,
|
||||
collectResults: Boolean = true): Query = {
|
||||
new Query(name, sqlContext.sql(sqlText), description, collectResults, Some(sqlText))
|
||||
executionMode: ExecutionMode = ExecutionMode.ForeachResults): Query = {
|
||||
new Query(name, sqlContext.sql(sqlText), description, Some(sqlText), executionMode)
|
||||
}
|
||||
|
||||
def apply(
|
||||
name: String,
|
||||
dataFrameBuilder: => DataFrame,
|
||||
description: String): Query = {
|
||||
new Query(name, dataFrameBuilder, description, true, None)
|
||||
new Query(name, dataFrameBuilder, description, None, ExecutionMode.CollectResults)
|
||||
}
|
||||
}
|
||||
|
||||
@ -356,8 +368,8 @@ abstract class Benchmark(@transient protected val sqlContext: SQLContext)
|
||||
val name: String,
|
||||
buildDataFrame: => DataFrame,
|
||||
val description: String,
|
||||
val collectResults: Boolean,
|
||||
val sqlText: Option[String]) {
|
||||
val sqlText: Option[String],
|
||||
val executionMode: ExecutionMode) {
|
||||
|
||||
override def toString =
|
||||
s"""
|
||||
@ -415,13 +427,13 @@ abstract class Benchmark(@transient protected val sqlContext: SQLContext)
|
||||
}
|
||||
|
||||
// The executionTime for the entire query includes the time of type conversion from catalyst to scala.
|
||||
val executionTime = if (collectResults) {
|
||||
benchmarkMs {
|
||||
dataFrame.rdd.collect()
|
||||
}
|
||||
} else {
|
||||
benchmarkMs {
|
||||
dataFrame.rdd.foreach { row => Unit }
|
||||
// The executionTime for the entire query includes the time of type conversion
|
||||
// from catalyst to scala.
|
||||
val executionTime = benchmarkMs {
|
||||
executionMode match {
|
||||
case ExecutionMode.CollectResults => dataFrame.rdd.collect()
|
||||
case ExecutionMode.ForeachResults => dataFrame.rdd.foreach { row => Unit }
|
||||
case ExecutionMode.WriteParquet(location) => dataFrame.saveAsParquetFile(s"$location/$name.parquet")
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -5,6 +5,8 @@ import org.apache.spark.sql.SQLContext
|
||||
trait JoinPerformance extends Benchmark {
|
||||
// 1.5 mb, 1 file
|
||||
|
||||
import ExecutionMode._
|
||||
|
||||
val x = Table(
|
||||
"1milints",
|
||||
sqlContext.range(0, 1000000)
|
||||
@ -36,7 +38,7 @@ trait JoinPerformance extends Benchmark {
|
||||
s"singleKey-$join-$table1-$table2",
|
||||
s"SELECT COUNT(*) FROM $table1 a $join $table2 b ON a.id = b.id",
|
||||
"equi-inner join a small table with a big table using a single key.",
|
||||
collectResults = true)
|
||||
executionMode = CollectResults)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,38 +0,0 @@
|
||||
/*
|
||||
* Copyright 2015 Databricks Inc.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.databricks.spark.sql.perf.bigdata
|
||||
|
||||
import com.databricks.spark.sql.perf._
|
||||
import org.apache.spark.sql.functions._
|
||||
import org.apache.spark.sql.parquet.TPCDSTableForTest
|
||||
import org.apache.spark.sql.{Column, SQLContext}
|
||||
|
||||
class BigDataBenchmark (
|
||||
@transient sqlContext: SQLContext,
|
||||
dataLocation: String,
|
||||
val tables: Seq[Table2],
|
||||
scaleFactor: String)
|
||||
extends Benchmark(sqlContext) with Serializable with TableCreator {
|
||||
import sqlContext._
|
||||
import sqlContext.implicits._
|
||||
|
||||
override def createTablesForTest(tables: Seq[Table2]): Seq[TableForTest] = {
|
||||
tables.map(table =>
|
||||
BigDataTableForTest(table, dataLocation, scaleFactor, sqlContext))
|
||||
}
|
||||
}
|
||||
|
||||
@ -18,8 +18,9 @@ package com.databricks.spark.sql.perf.bigdata
|
||||
|
||||
import com.databricks.spark.sql.perf.Benchmark
|
||||
|
||||
trait Queries {
|
||||
self: Benchmark =>
|
||||
trait Queries extends Benchmark {
|
||||
|
||||
import ExecutionMode._
|
||||
|
||||
val queries1to3 = Seq(
|
||||
Query(
|
||||
@ -34,7 +35,7 @@ trait Queries {
|
||||
| pageRank > 1000
|
||||
""".stripMargin,
|
||||
description = "",
|
||||
collectResults = false),
|
||||
executionMode = ForeachResults),
|
||||
|
||||
Query(
|
||||
name = "q1B",
|
||||
@ -48,7 +49,7 @@ trait Queries {
|
||||
| pageRank > 100
|
||||
""".stripMargin,
|
||||
description = "",
|
||||
collectResults = false),
|
||||
executionMode = ForeachResults),
|
||||
|
||||
Query(
|
||||
name = "q1C",
|
||||
@ -62,7 +63,7 @@ trait Queries {
|
||||
| pageRank > 10
|
||||
""".stripMargin,
|
||||
description = "",
|
||||
collectResults = false),
|
||||
executionMode = ForeachResults),
|
||||
|
||||
Query(
|
||||
name = "q2A",
|
||||
@ -76,7 +77,7 @@ trait Queries {
|
||||
| SUBSTR(sourceIP, 1, 8)
|
||||
""".stripMargin,
|
||||
description = "",
|
||||
collectResults = false),
|
||||
executionMode = ForeachResults),
|
||||
|
||||
Query(
|
||||
name = "q2B",
|
||||
@ -90,7 +91,7 @@ trait Queries {
|
||||
| SUBSTR(sourceIP, 1, 10)
|
||||
""".stripMargin,
|
||||
description = "",
|
||||
collectResults = false),
|
||||
executionMode = ForeachResults),
|
||||
|
||||
Query(
|
||||
name = "q2C",
|
||||
@ -104,7 +105,7 @@ trait Queries {
|
||||
| SUBSTR(sourceIP, 1, 12)
|
||||
""".stripMargin,
|
||||
description = "",
|
||||
collectResults = false),
|
||||
executionMode = ForeachResults),
|
||||
|
||||
Query(
|
||||
name = "q3A",
|
||||
@ -123,7 +124,7 @@ trait Queries {
|
||||
|ORDER BY totalRevenue DESC LIMIT 1
|
||||
""".stripMargin,
|
||||
description = "",
|
||||
collectResults = false),
|
||||
executionMode = ForeachResults),
|
||||
|
||||
Query(
|
||||
name = "q3B",
|
||||
@ -142,7 +143,7 @@ trait Queries {
|
||||
|ORDER BY totalRevenue DESC LIMIT 1
|
||||
""".stripMargin,
|
||||
description = "",
|
||||
collectResults = false),
|
||||
executionMode = ForeachResults),
|
||||
|
||||
Query(
|
||||
name = "q3C",
|
||||
@ -160,6 +161,6 @@ trait Queries {
|
||||
|ORDER BY totalRevenue DESC LIMIT 1
|
||||
""".stripMargin,
|
||||
description = "",
|
||||
collectResults = false)
|
||||
executionMode = ForeachResults)
|
||||
)
|
||||
}
|
||||
|
||||
@ -1,79 +0,0 @@
|
||||
/*
|
||||
* Copyright 2015 Databricks Inc.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.databricks.spark.sql.perf.bigdata
|
||||
|
||||
// This is a hack until parquet has better support for partitioning.
|
||||
|
||||
import java.text.SimpleDateFormat
|
||||
import java.util.Date
|
||||
|
||||
import com.databricks.spark.sql.perf._
|
||||
import org.apache.hadoop.conf.Configuration
|
||||
import org.apache.hadoop.fs.{FileSystem, Path}
|
||||
import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat}
|
||||
import org.apache.hadoop.mapreduce.{Job, OutputCommitter, RecordWriter, TaskAttemptContext}
|
||||
import org.apache.spark.SerializableWritable
|
||||
import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
|
||||
import org.apache.spark.sql.catalyst.expressions._
|
||||
import org.apache.spark.sql.types.{StringType, StructField, StructType}
|
||||
import org.apache.spark.sql.{Column, ColumnName, SQLContext}
|
||||
import parquet.hadoop.ParquetOutputFormat
|
||||
import parquet.hadoop.util.ContextUtil
|
||||
|
||||
import scala.sys.process._
|
||||
|
||||
case class BigDataTableForTest(
|
||||
table: Table2,
|
||||
baseDir: String,
|
||||
scaleFactor: String,
|
||||
@transient sqlContext: SQLContext)
|
||||
extends TableForTest(table, baseDir, sqlContext) with Serializable {
|
||||
|
||||
@transient val sparkContext = sqlContext.sparkContext
|
||||
|
||||
override def generate(): Unit =
|
||||
throw new UnsupportedOperationException(
|
||||
"Generate data for BigDataBenchmark has not been implemented")
|
||||
}
|
||||
|
||||
case class Tables(sqlContext: SQLContext) {
|
||||
import sqlContext.implicits._
|
||||
|
||||
val tables = Seq(
|
||||
Table2("rankings",
|
||||
UnpartitionedTable,
|
||||
'pageURL .string,
|
||||
'pageRank .int,
|
||||
'avgDuration .int),
|
||||
|
||||
Table2("uservisits",
|
||||
UnpartitionedTable,
|
||||
'sourceIP .string,
|
||||
'destURL .string,
|
||||
'visitDate .string,
|
||||
'adRevenue .double,
|
||||
'userAgent .string,
|
||||
'countryCode .string,
|
||||
'languageCode .string,
|
||||
'searchWord .string,
|
||||
'duration .int),
|
||||
|
||||
Table2("documents",
|
||||
UnpartitionedTable,
|
||||
'line .string)
|
||||
)
|
||||
}
|
||||
@ -1,29 +0,0 @@
|
||||
/*
|
||||
* Copyright 2015 Databricks Inc.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.databricks.spark.sql.perf
|
||||
|
||||
import org.apache.spark.sql.{DataFrame, SQLContext}
|
||||
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
|
||||
|
||||
trait QuerySet {
|
||||
val sqlContext: SQLContext
|
||||
def sparkContext = sqlContext.sparkContext
|
||||
|
||||
|
||||
|
||||
|
||||
}
|
||||
@ -1,90 +0,0 @@
|
||||
/*
|
||||
* Copyright 2015 Databricks Inc.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.databricks.spark.sql.perf
|
||||
|
||||
import java.text.SimpleDateFormat
|
||||
import java.util.Date
|
||||
|
||||
import org.apache.hadoop.conf.Configuration
|
||||
import org.apache.hadoop.fs.{Path, FileSystem}
|
||||
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
|
||||
import org.apache.hadoop.mapreduce.{OutputCommitter, TaskAttemptContext, RecordWriter, Job}
|
||||
import org.apache.spark.SerializableWritable
|
||||
import org.apache.spark.sql.{SQLContext, Column}
|
||||
import org.apache.spark.sql.catalyst.expressions._
|
||||
import org.apache.spark.sql.functions._
|
||||
import org.apache.spark.sql.hive.HiveMetastoreTypes
|
||||
import org.apache.spark.sql.types._
|
||||
import parquet.hadoop.ParquetOutputFormat
|
||||
import parquet.hadoop.util.ContextUtil
|
||||
|
||||
trait TableCreator {
|
||||
|
||||
def tables: Seq[Table2]
|
||||
|
||||
def createTablesForTest(tables: Seq[Table2]): Seq[TableForTest]
|
||||
|
||||
val tablesForTest: Seq[TableForTest] = createTablesForTest(tables)
|
||||
|
||||
def checkData(): Unit = {
|
||||
tablesForTest.foreach { table =>
|
||||
val fs = FileSystem.get(new java.net.URI(table.outputDir), new Configuration())
|
||||
val exists = fs.exists(new Path(table.outputDir))
|
||||
val wasSuccessful = fs.exists(new Path(s"${table.outputDir}/_SUCCESS"))
|
||||
|
||||
if (!wasSuccessful) {
|
||||
if (exists) {
|
||||
println(s"Table '${table.name}' not generated successfully, regenerating.")
|
||||
} else {
|
||||
println(s"Table '${table.name}' does not exist, generating.")
|
||||
}
|
||||
fs.delete(new Path(table.outputDir), true)
|
||||
table.generate()
|
||||
} else {
|
||||
println(s"Table ${table.name} already exists.")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
abstract class TableType
|
||||
case object UnpartitionedTable extends TableType
|
||||
case class PartitionedTable(partitionColumn: String) extends TableType
|
||||
|
||||
case class Table2(name: String, tableType: TableType, fields: StructField*)
|
||||
|
||||
abstract class TableForTest(
|
||||
table: Table2,
|
||||
baseDir: String,
|
||||
@transient sqlContext: SQLContext) extends Serializable {
|
||||
|
||||
val schema = StructType(table.fields)
|
||||
|
||||
val name = table.name
|
||||
|
||||
val outputDir = s"$baseDir/parquet/$name"
|
||||
|
||||
def fromCatalog = sqlContext.table(name)
|
||||
|
||||
def stats =
|
||||
fromCatalog.select(
|
||||
lit(name) as "tableName",
|
||||
count("*") as "numRows",
|
||||
lit(fromCatalog.queryExecution.optimizedPlan.statistics.sizeInBytes.toLong) as "sizeInBytes")
|
||||
|
||||
def generate()
|
||||
}
|
||||
@ -17,9 +17,7 @@
|
||||
package com.databricks.spark.sql.perf.tpcds
|
||||
|
||||
import com.databricks.spark.sql.perf._
|
||||
import org.apache.spark.sql.functions._
|
||||
import org.apache.spark.sql.parquet.TPCDSTableForTest
|
||||
import org.apache.spark.sql.{Column, SQLContext}
|
||||
import org.apache.spark.sql.SQLContext
|
||||
|
||||
/**
|
||||
* TPC-DS benchmark's dataset.
|
||||
@ -27,8 +25,6 @@ import org.apache.spark.sql.{Column, SQLContext}
|
||||
* @param sparkVersion The version of Spark.
|
||||
* @param dataLocation The location of the dataset used by this experiment.
|
||||
* @param dsdgenDir The location of dsdgen in every worker machine.
|
||||
* @param resultsLocation The location of performance results.
|
||||
* @param tables Tables that will be used in this experiment.
|
||||
* @param scaleFactor The scale factor of the dataset. For some benchmarks like TPC-H
|
||||
* and TPC-DS, the scale factor is a number roughly representing the
|
||||
* size of raw data files. For some other benchmarks, the scale factor
|
||||
@ -39,27 +35,16 @@ class TPCDS (
|
||||
sparkVersion: String,
|
||||
dataLocation: String,
|
||||
dsdgenDir: String,
|
||||
val tables: Seq[Table2],
|
||||
scaleFactor: String,
|
||||
userSpecifiedBaseDir: Option[String] = None)
|
||||
extends Benchmark(sqlContext) with TableCreator with Serializable {
|
||||
extends Benchmark(sqlContext) with Serializable {
|
||||
import sqlContext._
|
||||
import sqlContext.implicits._
|
||||
|
||||
lazy val baseDir =
|
||||
userSpecifiedBaseDir.getOrElse(s"$dataLocation/scaleFactor=$scaleFactor/useDecimal=true")
|
||||
|
||||
override def createTablesForTest(tables: Seq[Table2]): Seq[TableForTest] = {
|
||||
tables.map(table =>
|
||||
TPCDSTableForTest(table, baseDir, scaleFactor.toInt, dsdgenDir, sqlContext))
|
||||
}
|
||||
|
||||
/*
|
||||
override def setup(): Unit = {
|
||||
super.setup()
|
||||
setupBroadcast()
|
||||
}
|
||||
|
||||
def setupBroadcast(skipTables: Seq[String] = Seq("store_sales", "customer")) = {
|
||||
val skipExpr = skipTables.map(t => !('tableName === t)).reduceLeft[Column](_ && _)
|
||||
val threshold =
|
||||
|
||||
@ -14,7 +14,7 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.spark.sql.parquet // This is a hack until parquet has better support for partitioning.
|
||||
package com.databricks.spark.sql.perf.tpcds
|
||||
|
||||
import java.text.SimpleDateFormat
|
||||
import java.util.Date
|
||||
@ -37,173 +37,63 @@ import org.apache.spark.sql.types.{StringType, StructField, StructType}
|
||||
import parquet.hadoop.ParquetOutputFormat
|
||||
import parquet.hadoop.util.ContextUtil
|
||||
|
||||
class Tables(sqlContext: SQLContext, dsdgenDir: String, scaleFactor: Int) {
|
||||
import sqlContext.implicits._
|
||||
|
||||
case class TPCDSTableForTest(
|
||||
table: Table2,
|
||||
baseDir: String,
|
||||
scaleFactor: Int,
|
||||
dsdgenDir: String,
|
||||
@transient sqlContext: SQLContext,
|
||||
maxRowsPerPartitions: Int = 20 * 1000 * 1000)
|
||||
extends TableForTest(table, baseDir, sqlContext) with Serializable with SparkHadoopMapReduceUtil {
|
||||
|
||||
@transient val sparkContext = sqlContext.sparkContext
|
||||
|
||||
def sparkContext = sqlContext.sparkContext
|
||||
val dsdgen = s"$dsdgenDir/dsdgen"
|
||||
|
||||
override def generate(): Unit = {
|
||||
val partitions = table.tableType match {
|
||||
case PartitionedTable(_) => scaleFactor
|
||||
case _ => 1
|
||||
}
|
||||
case class Table(name: String, partitionColumns: Seq[String], fields: StructField*) {
|
||||
val schema = StructType(fields)
|
||||
val partitions = if (partitionColumns.isEmpty) 1 else 100
|
||||
|
||||
val generatedData = {
|
||||
sparkContext.parallelize(1 to partitions, partitions).flatMap { i =>
|
||||
val localToolsDir = if (new java.io.File(dsdgen).exists) {
|
||||
dsdgenDir
|
||||
} else if (new java.io.File(s"/$dsdgen").exists) {
|
||||
s"/$dsdgenDir"
|
||||
} else {
|
||||
sys.error(s"Could not find dsdgen at $dsdgen or /$dsdgen. Run install")
|
||||
}
|
||||
|
||||
val parallel = if (partitions > 1) s"-parallel $partitions -child $i" else ""
|
||||
val commands = Seq(
|
||||
"bash", "-c",
|
||||
s"cd $localToolsDir && ./dsdgen -table ${table.name} -filter Y -scale $scaleFactor $parallel")
|
||||
println(commands)
|
||||
commands.lines
|
||||
}
|
||||
}
|
||||
|
||||
generatedData.setName(s"${table.name}, sf=$scaleFactor, strings")
|
||||
|
||||
val rows = generatedData.mapPartitions { iter =>
|
||||
val currentRow = new GenericMutableRow(schema.fields.size)
|
||||
iter.map { l =>
|
||||
(0 until schema.fields.length).foreach(currentRow.setNullAt)
|
||||
l.split("\\|", -1).zipWithIndex.dropRight(1).foreach { case (f, i) => currentRow(i) = f}
|
||||
currentRow: Row
|
||||
}
|
||||
}
|
||||
|
||||
val stringData =
|
||||
sqlContext.createDataFrame(
|
||||
rows,
|
||||
StructType(schema.fields.map(f => StructField(f.name, StringType))))
|
||||
|
||||
val convertedData = {
|
||||
val columns = schema.fields.map { f =>
|
||||
val columnName = new ColumnName(f.name)
|
||||
columnName.cast(f.dataType).as(f.name)
|
||||
}
|
||||
stringData.select(columns: _*)
|
||||
}
|
||||
|
||||
table.tableType match {
|
||||
// This is an awful hack... spark sql parquet should support this natively.
|
||||
case PartitionedTable(partitioningColumn) =>
|
||||
sqlContext.setConf("spark.sql.planner.externalSort", "true")
|
||||
val output = convertedData.queryExecution.analyzed.output
|
||||
val job = new Job(sqlContext.sparkContext.hadoopConfiguration)
|
||||
|
||||
|
||||
//HAX
|
||||
val writeSupport =
|
||||
// if (schema.fields.map(_.dataType).forall(_.isPrimitive)) {
|
||||
// classOf[org.apache.spark.sql.parquet.MutableRowWriteSupport]
|
||||
// } else {
|
||||
classOf[org.apache.spark.sql.parquet.RowWriteSupport]
|
||||
// }
|
||||
|
||||
ParquetOutputFormat.setWriteSupportClass(job, writeSupport)
|
||||
|
||||
val conf = new SerializableWritable(ContextUtil.getConfiguration(job))
|
||||
org.apache.spark.sql.parquet.RowWriteSupport.setSchema(schema.toAttributes, conf.value)
|
||||
|
||||
val partColumnAttr =
|
||||
BindReferences.bindReference[Expression](
|
||||
output.find(_.name == partitioningColumn).get,
|
||||
output)
|
||||
|
||||
|
||||
// TODO: clusterBy would be faster than orderBy
|
||||
val orderedConvertedData =
|
||||
convertedData.filter(new Column(partitioningColumn) isNotNull).orderBy(Column(partitioningColumn) asc)
|
||||
orderedConvertedData.queryExecution.toRdd.foreachPartition { iter =>
|
||||
var writer: RecordWriter[Void, Row] = null
|
||||
val getPartition = new InterpretedMutableProjection(Seq(partColumnAttr))
|
||||
var currentPartition: Row = null
|
||||
var hadoopContext: TaskAttemptContext = null
|
||||
var committer: OutputCommitter = null
|
||||
|
||||
var rowCount = 0
|
||||
var partition = 0
|
||||
|
||||
while (iter.hasNext) {
|
||||
val currentRow = iter.next()
|
||||
|
||||
rowCount += 1
|
||||
if (rowCount >= maxRowsPerPartitions) {
|
||||
rowCount = 0
|
||||
partition += 1
|
||||
println(s"Starting partition $partition")
|
||||
if (writer != null) {
|
||||
writer.close(hadoopContext)
|
||||
committer.commitTask(hadoopContext)
|
||||
}
|
||||
writer = null
|
||||
}
|
||||
|
||||
if ((getPartition(currentRow) != currentPartition || writer == null) &&
|
||||
!getPartition.currentValue.isNullAt(0)) {
|
||||
rowCount = 0
|
||||
currentPartition = getPartition.currentValue.copy()
|
||||
if (writer != null) {
|
||||
writer.close(hadoopContext)
|
||||
committer.commitTask(hadoopContext)
|
||||
}
|
||||
|
||||
val job = new Job(conf.value)
|
||||
val keyType = classOf[Void]
|
||||
job.setOutputKeyClass(keyType)
|
||||
job.setOutputValueClass(classOf[Row])
|
||||
NewFileOutputFormat.setOutputPath(
|
||||
job,
|
||||
new Path(s"$outputDir/$partitioningColumn=${currentPartition(0)}"))
|
||||
val wrappedConf = new SerializableWritable(job.getConfiguration)
|
||||
val formatter = new SimpleDateFormat("yyyyMMddHHmm")
|
||||
val jobtrackerID = formatter.format(new Date())
|
||||
val stageId = partition
|
||||
|
||||
val attemptNumber = 1
|
||||
/* "reduce task" <split #> <attempt # = spark task #> */
|
||||
val attemptId = newTaskAttemptID(jobtrackerID, partition, isMap = false, partition, attemptNumber)
|
||||
hadoopContext = newTaskAttemptContext(wrappedConf.value, attemptId)
|
||||
val format = new ParquetOutputFormat[Row]
|
||||
committer = format.getOutputCommitter(hadoopContext)
|
||||
committer.setupTask(hadoopContext)
|
||||
writer = format.getRecordWriter(hadoopContext)
|
||||
|
||||
}
|
||||
if (!getPartition.currentValue.isNullAt(0)) {
|
||||
writer.write(null, currentRow)
|
||||
}
|
||||
}
|
||||
if (writer != null) {
|
||||
writer.close(hadoopContext)
|
||||
committer.commitTask(hadoopContext)
|
||||
def df = {
|
||||
val generatedData = {
|
||||
sparkContext.parallelize(1 to partitions, partitions).flatMap { i =>
|
||||
val localToolsDir = if (new java.io.File(dsdgen).exists) {
|
||||
dsdgenDir
|
||||
} else if (new java.io.File(s"/$dsdgen").exists) {
|
||||
s"/$dsdgenDir"
|
||||
} else {
|
||||
sys.error(s"Could not find dsdgen at $dsdgen or /$dsdgen. Run install")
|
||||
}
|
||||
|
||||
val parallel = if (partitions > 1) s"-parallel $partitions -child $i" else ""
|
||||
val commands = Seq(
|
||||
"bash", "-c",
|
||||
s"cd $localToolsDir && ./dsdgen -table $name -filter Y -scale $scaleFactor $parallel")
|
||||
println(commands)
|
||||
commands.lines
|
||||
}
|
||||
val fs = FileSystem.get(new java.net.URI(outputDir), new Configuration())
|
||||
fs.create(new Path(s"$outputDir/_SUCCESS")).close()
|
||||
case _ => convertedData.saveAsParquetFile(outputDir)
|
||||
}
|
||||
|
||||
generatedData.setName(s"$name, sf=$scaleFactor, strings")
|
||||
|
||||
val rows = generatedData.mapPartitions { iter =>
|
||||
val currentRow = new GenericMutableRow(schema.fields.size)
|
||||
iter.map { l =>
|
||||
(0 until schema.fields.length).foreach(currentRow.setNullAt)
|
||||
l.split("\\|", -1).zipWithIndex.dropRight(1).foreach { case (f, i) => currentRow(i) = f}
|
||||
currentRow: Row
|
||||
}
|
||||
}
|
||||
|
||||
val stringData =
|
||||
sqlContext.createDataFrame(
|
||||
rows,
|
||||
StructType(schema.fields.map(f => StructField(f.name, StringType))))
|
||||
|
||||
val convertedData = {
|
||||
val columns = schema.fields.map { f =>
|
||||
val columnName = new ColumnName(f.name)
|
||||
columnName.cast(f.dataType).as(f.name)
|
||||
}
|
||||
stringData.select(columns: _*)
|
||||
}
|
||||
|
||||
convertedData
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
case class Tables(sqlContext: SQLContext) {
|
||||
import sqlContext.implicits._
|
||||
|
||||
val tables = Seq(
|
||||
/* This is another large table that we don't build yet.
|
||||
@ -213,8 +103,8 @@ case class Tables(sqlContext: SQLContext) {
|
||||
'inv_item_sk .int,
|
||||
'inv_warehouse_sk .int,
|
||||
'inv_quantity_on_hand .int),*/
|
||||
Table2("store_sales",
|
||||
PartitionedTable("ss_sold_date_sk"),
|
||||
Table("store_sales",
|
||||
partitionColumns = "ss_sold_date_sk" :: Nil,
|
||||
'ss_sold_date_sk .int,
|
||||
'ss_sold_time_sk .int,
|
||||
'ss_item_sk .int,
|
||||
@ -238,8 +128,8 @@ case class Tables(sqlContext: SQLContext) {
|
||||
'ss_net_paid .decimal(7,2),
|
||||
'ss_net_paid_inc_tax .decimal(7,2),
|
||||
'ss_net_profit .decimal(7,2)),
|
||||
Table2("customer",
|
||||
UnpartitionedTable,
|
||||
Table("customer",
|
||||
partitionColumns = Nil,
|
||||
'c_customer_sk .int,
|
||||
'c_customer_id .string,
|
||||
'c_current_cdemo_sk .int,
|
||||
@ -258,8 +148,8 @@ case class Tables(sqlContext: SQLContext) {
|
||||
'c_login .string,
|
||||
'c_email_address .string,
|
||||
'c_last_review_date .string),
|
||||
Table2("customer_address",
|
||||
UnpartitionedTable,
|
||||
Table("customer_address",
|
||||
partitionColumns = Nil,
|
||||
'ca_address_sk .int,
|
||||
'ca_address_id .string,
|
||||
'ca_street_number .string,
|
||||
@ -273,8 +163,8 @@ case class Tables(sqlContext: SQLContext) {
|
||||
'ca_country .string,
|
||||
'ca_gmt_offset .decimal(5,2),
|
||||
'ca_location_type .string),
|
||||
Table2("customer_demographics",
|
||||
UnpartitionedTable,
|
||||
Table("customer_demographics",
|
||||
partitionColumns = Nil,
|
||||
'cd_demo_sk .int,
|
||||
'cd_gender .string,
|
||||
'cd_marital_status .string,
|
||||
@ -284,8 +174,8 @@ case class Tables(sqlContext: SQLContext) {
|
||||
'cd_dep_count .int,
|
||||
'cd_dep_employed_count .int,
|
||||
'cd_dep_college_count .int),
|
||||
Table2("date_dim",
|
||||
UnpartitionedTable,
|
||||
Table("date_dim",
|
||||
partitionColumns = Nil,
|
||||
'd_date_sk .int,
|
||||
'd_date_id .string,
|
||||
'd_date .string,
|
||||
@ -314,15 +204,15 @@ case class Tables(sqlContext: SQLContext) {
|
||||
'd_current_month .string,
|
||||
'd_current_quarter .string,
|
||||
'd_current_year .string),
|
||||
Table2("household_demographics",
|
||||
UnpartitionedTable,
|
||||
Table("household_demographics",
|
||||
partitionColumns = Nil,
|
||||
'hd_demo_sk .int,
|
||||
'hd_income_band_sk .int,
|
||||
'hd_buy_potential .string,
|
||||
'hd_dep_count .int,
|
||||
'hd_vehicle_count .int),
|
||||
Table2("item",
|
||||
UnpartitionedTable,
|
||||
Table("item",
|
||||
partitionColumns = Nil,
|
||||
'i_item_sk .int,
|
||||
'i_item_id .string,
|
||||
'i_rec_start_date .string,
|
||||
@ -345,8 +235,8 @@ case class Tables(sqlContext: SQLContext) {
|
||||
'i_container .string,
|
||||
'i_manager_id .int,
|
||||
'i_product_name .string),
|
||||
Table2("promotion",
|
||||
UnpartitionedTable,
|
||||
Table("promotion",
|
||||
partitionColumns = Nil,
|
||||
'p_promo_sk .int,
|
||||
'p_promo_id .string,
|
||||
'p_start_date_sk .int,
|
||||
@ -366,8 +256,8 @@ case class Tables(sqlContext: SQLContext) {
|
||||
'p_channel_details .string,
|
||||
'p_purpose .string,
|
||||
'p_discount_active .string),
|
||||
Table2("store",
|
||||
UnpartitionedTable,
|
||||
Table("store",
|
||||
partitionColumns = Nil,
|
||||
's_store_sk .int,
|
||||
's_store_id .string,
|
||||
's_rec_start_date .string,
|
||||
@ -397,8 +287,8 @@ case class Tables(sqlContext: SQLContext) {
|
||||
's_country .string,
|
||||
's_gmt_offset .decimal(5,2),
|
||||
's_tax_precentage .decimal(5,2)),
|
||||
Table2("time_dim",
|
||||
UnpartitionedTable,
|
||||
Table("time_dim",
|
||||
partitionColumns = Nil,
|
||||
't_time_sk .int,
|
||||
't_time_id .string,
|
||||
't_time .int,
|
||||
|
||||
@ -18,8 +18,9 @@ package com.databricks.spark.sql.perf.tpcds.queries
|
||||
|
||||
import com.databricks.spark.sql.perf.Benchmark
|
||||
|
||||
trait ImpalaKitQueries {
|
||||
self: Benchmark =>
|
||||
trait ImpalaKitQueries extends Benchmark {
|
||||
|
||||
import ExecutionMode._
|
||||
|
||||
// Queries are from
|
||||
// https://github.com/cloudera/impala-tpcds-kit/tree/master/queries-sql92-modified/queries
|
||||
@ -1026,7 +1027,7 @@ trait ImpalaKitQueries {
|
||||
|from store_sales
|
||||
""".stripMargin)
|
||||
).map {
|
||||
case (name, sqlText) => Query(name, sqlText, description = "", collectResults = true)
|
||||
case (name, sqlText) => Query(name, sqlText, description = "", executionMode = CollectResults)
|
||||
}
|
||||
val queriesMap = queries.map(q => q.name -> q).toMap
|
||||
|
||||
@ -1464,8 +1465,8 @@ trait ImpalaKitQueries {
|
||||
| max(ss_promo_sk) as max_ss_promo_sk
|
||||
|from store_sales
|
||||
""".stripMargin)
|
||||
).map {
|
||||
case (name, sqlText) => Query(name, sqlText, description = "original query", collectResults = true)
|
||||
).map { case (name, sqlText) =>
|
||||
Query(name, sqlText, description = "original query", executionMode = CollectResults)
|
||||
}
|
||||
|
||||
val interactiveQueries =
|
||||
|
||||
@ -18,8 +18,9 @@ package com.databricks.spark.sql.perf.tpcds.queries
|
||||
|
||||
import com.databricks.spark.sql.perf.Benchmark
|
||||
|
||||
trait SimpleQueries {
|
||||
self: Benchmark =>
|
||||
trait SimpleQueries extends Benchmark {
|
||||
|
||||
import ExecutionMode._
|
||||
|
||||
val q7Derived = Seq(
|
||||
("q7-simpleScan",
|
||||
@ -138,7 +139,7 @@ trait SimpleQueries {
|
||||
|limit 100
|
||||
|-- end query 1 in stream 0 using template query7.tpl
|
||||
""".stripMargin)
|
||||
).map {
|
||||
case (name, sqlText) => Query(name = name, sqlText = sqlText, description = "", collectResults = false)
|
||||
).map { case (name, sqlText) =>
|
||||
Query(name = name, sqlText = sqlText, description = "", executionMode = ForeachResults)
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user