From 7e38b77c507e4b1d0ca392f36820013772412a38 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Fri, 19 Feb 2016 13:02:29 -0800
Subject: [PATCH] Update to compile against Spark 2.0.0-SNAPSHOT and bump version to 0.4.0-SNAPSHOT

Author: Josh Rosen

Closes #51 from JoshRosen/spark-2.0.0.
---
 build.sbt                                      |  7 ++++++-
 .../databricks/spark/sql/perf/Benchmark.scala  |  2 +-
 .../spark/sql/perf/handleResults.scala         |  2 +-
 .../spark/sql/perf/tpcds/Tables.scala          | 17 ++++++++++-------
 version.sbt                                    |  2 +-
 5 files changed, 19 insertions(+), 11 deletions(-)

diff --git a/build.sbt b/build.sbt
index 6846224..f34eb63 100644
--- a/build.sbt
+++ b/build.sbt
@@ -12,7 +12,7 @@ sparkPackageName := "databricks/spark-sql-perf"
 // All Spark Packages need a license
 licenses := Seq("Apache-2.0" -> url("http://opensource.org/licenses/Apache-2.0"))
 
-sparkVersion := "1.6.0"
+sparkVersion := "2.0.0-SNAPSHOT"
 
 sparkComponents ++= Seq("sql", "hive")
 
@@ -29,6 +29,11 @@ initialCommands in console :=
     |import sqlContext.implicits._
   """.stripMargin
 
+// TODO: remove after Spark 2.0.0 is released:
+resolvers += "apache-snapshots" at "https://repository.apache.org/snapshots/"
+
+libraryDependencies += "org.slf4j" % "slf4j-api" % "1.7.5"
+
 libraryDependencies += "com.github.scopt" %% "scopt" % "3.3.0"
 
 libraryDependencies += "com.twitter" %% "util-jvm" % "6.23.0" % "provided"
diff --git a/src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala b/src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala
index 9e25e12..1676f14 100644
--- a/src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala
+++ b/src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala
@@ -603,7 +603,7 @@ abstract class Benchmark(
       case ExecutionMode.CollectResults => dataFrame.rdd.collect()
       case ExecutionMode.ForeachResults => dataFrame.rdd.foreach { row => Unit }
       case ExecutionMode.WriteParquet(location) =>
-        dataFrame.saveAsParquetFile(s"$location/$name.parquet")
+        dataFrame.write.parquet(s"$location/$name.parquet")
       case ExecutionMode.HashResults =>
         val columnStr = dataFrame.schema.map(_.name).mkString(",")
         // SELECT SUM(HASH(col1, col2, ...)) FROM (benchmark query)
diff --git a/src/main/scala/com/databricks/spark/sql/perf/handleResults.scala b/src/main/scala/com/databricks/spark/sql/perf/handleResults.scala
index 9d5be71..a1c07de 100644
--- a/src/main/scala/com/databricks/spark/sql/perf/handleResults.scala
+++ b/src/main/scala/com/databricks/spark/sql/perf/handleResults.scala
@@ -20,6 +20,6 @@ import org.apache.spark.sql.SQLContext
 
 case class Results(resultsLocation: String, @transient sqlContext: SQLContext) {
   def allResults =
-    sqlContext.jsonRDD(
+    sqlContext.read.json(
       sqlContext.sparkContext.textFile(s"$resultsLocation/*/"))
 }
diff --git a/src/main/scala/com/databricks/spark/sql/perf/tpcds/Tables.scala b/src/main/scala/com/databricks/spark/sql/perf/tpcds/Tables.scala
index 6807dba..e6c75ec 100644
--- a/src/main/scala/com/databricks/spark/sql/perf/tpcds/Tables.scala
+++ b/src/main/scala/com/databricks/spark/sql/perf/tpcds/Tables.scala
@@ -18,14 +18,17 @@ package com.databricks.spark.sql.perf.tpcds
 
 import scala.sys.process._
 
-import org.apache.spark.Logging
+import org.slf4j.LoggerFactory
+
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.{Row, SQLContext, SaveMode}
 
-class Tables(sqlContext: SQLContext, dsdgenDir: String, scaleFactor: Int) extends Serializable with Logging {
+class Tables(sqlContext: SQLContext, dsdgenDir: String, scaleFactor: Int) extends Serializable {
   import sqlContext.implicits._
 
+  private val log = LoggerFactory.getLogger(getClass)
+
   def sparkContext = sqlContext.sparkContext
 
   val dsdgen = s"$dsdgenDir/dsdgen"
@@ -149,7 +152,7 @@ class Tables(sqlContext: SQLContext, dsdgenDir: String, scaleFactor: Int) extend
           """.stripMargin
         val grouped = sqlContext.sql(query)
         println(s"Pre-clustering with partitioning columns with query $query.")
-        logInfo(s"Pre-clustering with partitioning columns with query $query.")
+        log.info(s"Pre-clustering with partitioning columns with query $query.")
         grouped.write
       } else {
         data.write
@@ -163,7 +166,7 @@ class Tables(sqlContext: SQLContext, dsdgenDir: String, scaleFactor: Int) extend
         writer.partitionBy(partitionColumns : _*)
       }
       println(s"Generating table $name in database to $location with save mode $mode.")
-      logInfo(s"Generating table $name in database to $location with save mode $mode.")
+      log.info(s"Generating table $name in database to $location with save mode $mode.")
       writer.save(location)
      sqlContext.dropTempTable(tempTableName)
     }
@@ -176,7 +179,7 @@ class Tables(sqlContext: SQLContext, dsdgenDir: String, scaleFactor: Int) extend
       }
      if (!tableExists || overwrite) {
        println(s"Creating external table $name in database $databaseName using data stored in $location.")
-       logInfo(s"Creating external table $name in database $databaseName using data stored in $location.")
+       log.info(s"Creating external table $name in database $databaseName using data stored in $location.")
        sqlContext.createExternalTable(qualifiedTableName, location, format)
      }
     }
 
     def createTemporaryTable(location: String, format: String): Unit = {
       println(s"Creating temporary table $name using data stored in $location.")
-      logInfo(s"Creating temporary table $name using data stored in $location.")
+      log.info(s"Creating temporary table $name using data stored in $location.")
       sqlContext.read.format(format).load(location).registerTempTable(name)
     }
   }
@@ -237,7 +240,7 @@ class Tables(sqlContext: SQLContext, dsdgenDir: String, scaleFactor: Int) extend
     }
     sqlContext.sql(s"USE $databaseName")
     println(s"The current database has been set to $databaseName.")
-    logInfo(s"The current database has been set to $databaseName.")
+    log.info(s"The current database has been set to $databaseName.")
   }
 
   def createTemporaryTables(location: String, format: String, tableFilter: String = ""): Unit = {
diff --git a/version.sbt b/version.sbt
index 8811f26..edb9965 100644
--- a/version.sbt
+++ b/version.sbt
@@ -1 +1 @@
-version in ThisBuild := "0.3.3-SNAPSHOT"
\ No newline at end of file
+version in ThisBuild := "0.4.0-SNAPSHOT"
\ No newline at end of file