Update to compile against Spark 2.0.0-SNAPSHOT and bump version to 0.4.0-SNAPSHOT

Author: Josh Rosen <rosenville@gmail.com>

Closes #51 from JoshRosen/spark-2.0.0.
This commit is contained in:
Josh Rosen 2016-02-19 13:02:29 -08:00 committed by Michael Armbrust
parent 685ed9e488
commit 7e38b77c50
5 changed files with 19 additions and 11 deletions

View File

@@ -12,7 +12,7 @@ sparkPackageName := "databricks/spark-sql-perf"
// All Spark Packages need a license
licenses := Seq("Apache-2.0" -> url("http://opensource.org/licenses/Apache-2.0"))
sparkVersion := "1.6.0"
sparkVersion := "2.0.0-SNAPSHOT"
sparkComponents ++= Seq("sql", "hive")
@@ -29,6 +29,11 @@ initialCommands in console :=
|import sqlContext.implicits._
""".stripMargin
// TODO: remove after Spark 2.0.0 is released:
resolvers += "apache-snapshots" at "https://repository.apache.org/snapshots/"
libraryDependencies += "org.slf4j" % "slf4j-api" % "1.7.5"
libraryDependencies += "com.github.scopt" %% "scopt" % "3.3.0"
libraryDependencies += "com.twitter" %% "util-jvm" % "6.23.0" % "provided"

View File

@@ -603,7 +603,7 @@ abstract class Benchmark(
case ExecutionMode.CollectResults => dataFrame.rdd.collect()
case ExecutionMode.ForeachResults => dataFrame.rdd.foreach { row => Unit }
case ExecutionMode.WriteParquet(location) =>
dataFrame.saveAsParquetFile(s"$location/$name.parquet")
dataFrame.write.parquet(s"$location/$name.parquet")
case ExecutionMode.HashResults =>
val columnStr = dataFrame.schema.map(_.name).mkString(",")
// SELECT SUM(HASH(col1, col2, ...)) FROM (benchmark query)

View File

@@ -20,6 +20,6 @@ import org.apache.spark.sql.SQLContext
case class Results(resultsLocation: String, @transient sqlContext: SQLContext) {
def allResults =
sqlContext.jsonRDD(
sqlContext.read.json(
sqlContext.sparkContext.textFile(s"$resultsLocation/*/"))
}

View File

@@ -18,14 +18,17 @@ package com.databricks.spark.sql.perf.tpcds
import scala.sys.process._
import org.apache.spark.Logging
import org.slf4j.LoggerFactory
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SQLContext, SaveMode}
class Tables(sqlContext: SQLContext, dsdgenDir: String, scaleFactor: Int) extends Serializable with Logging {
class Tables(sqlContext: SQLContext, dsdgenDir: String, scaleFactor: Int) extends Serializable {
import sqlContext.implicits._
private val log = LoggerFactory.getLogger(getClass)
def sparkContext = sqlContext.sparkContext
val dsdgen = s"$dsdgenDir/dsdgen"
@@ -149,7 +152,7 @@ class Tables(sqlContext: SQLContext, dsdgenDir: String, scaleFactor: Int) extend
""".stripMargin
val grouped = sqlContext.sql(query)
println(s"Pre-clustering with partitioning columns with query $query.")
logInfo(s"Pre-clustering with partitioning columns with query $query.")
log.info(s"Pre-clustering with partitioning columns with query $query.")
grouped.write
} else {
data.write
@@ -163,7 +166,7 @@ class Tables(sqlContext: SQLContext, dsdgenDir: String, scaleFactor: Int) extend
writer.partitionBy(partitionColumns : _*)
}
println(s"Generating table $name in database to $location with save mode $mode.")
logInfo(s"Generating table $name in database to $location with save mode $mode.")
log.info(s"Generating table $name in database to $location with save mode $mode.")
writer.save(location)
sqlContext.dropTempTable(tempTableName)
}
@@ -176,14 +179,14 @@ class Tables(sqlContext: SQLContext, dsdgenDir: String, scaleFactor: Int) extend
}
if (!tableExists || overwrite) {
println(s"Creating external table $name in database $databaseName using data stored in $location.")
logInfo(s"Creating external table $name in database $databaseName using data stored in $location.")
log.info(s"Creating external table $name in database $databaseName using data stored in $location.")
sqlContext.createExternalTable(qualifiedTableName, location, format)
}
}
def createTemporaryTable(location: String, format: String): Unit = {
println(s"Creating temporary table $name using data stored in $location.")
logInfo(s"Creating temporary table $name using data stored in $location.")
log.info(s"Creating temporary table $name using data stored in $location.")
sqlContext.read.format(format).load(location).registerTempTable(name)
}
}
@@ -237,7 +240,7 @@ class Tables(sqlContext: SQLContext, dsdgenDir: String, scaleFactor: Int) extend
}
sqlContext.sql(s"USE $databaseName")
println(s"The current database has been set to $databaseName.")
logInfo(s"The current database has been set to $databaseName.")
log.info(s"The current database has been set to $databaseName.")
}
def createTemporaryTables(location: String, format: String, tableFilter: String = ""): Unit = {

View File

@@ -1 +1 @@
version in ThisBuild := "0.3.3-SNAPSHOT"
version in ThisBuild := "0.4.0-SNAPSHOT"