Updates for Spark 3.0 and Scala 2.12 compatibility (#176)

* Refactor deprecated `getOrCreate()` in spark 3
* Compile with scala 2.12
* Updated usage related to obsolete/deprecated features
* remove use of scala-logging replaced by using slf4j directly
This commit is contained in:
Luca Canali 2019-01-29 09:58:52 +01:00 committed by Nico Poggi
parent 85bbfd4ca2
commit e1e1365a87
13 changed files with 39 additions and 39 deletions

View File

@@ -34,7 +34,8 @@ The first run of `bin/run` will build the library.
## Build
Use `sbt package` or `sbt assembly` to build the library jar.
Use `sbt package` or `sbt assembly` to build the library jar.
Use `sbt +package` to build for scala 2.11 and 2.12.
## Local performance tests
The framework contains twelve benchmarks that can be executed in local mode. They are organized into three classes and target different components and functions of Spark:

View File

@@ -5,9 +5,9 @@ name := "spark-sql-perf"
organization := "com.databricks"
scalaVersion := "2.11.8"
scalaVersion := "2.11.12"
crossScalaVersions := Seq("2.11.8")
crossScalaVersions := Seq("2.11.12","2.12.8")
sparkPackageName := "databricks/spark-sql-perf"
@@ -32,17 +32,13 @@ initialCommands in console :=
|import sqlContext.implicits._
""".stripMargin
libraryDependencies += "org.slf4j" % "slf4j-api" % "1.7.5"
libraryDependencies += "com.github.scopt" %% "scopt" % "3.7.1"
libraryDependencies += "com.github.scopt" %% "scopt" % "3.3.0"
libraryDependencies += "com.twitter" %% "util-jvm" % "6.45.0" % "provided"
libraryDependencies += "com.twitter" %% "util-jvm" % "6.23.0" % "provided"
libraryDependencies += "org.scalatest" %% "scalatest" % "3.0.5" % "test"
libraryDependencies += "org.scalatest" %% "scalatest" % "2.2.1" % "test"
libraryDependencies += "org.yaml" % "snakeyaml" % "1.17"
libraryDependencies += "com.typesafe.scala-logging" %% "scala-logging-slf4j" % "2.1.2"
libraryDependencies += "org.yaml" % "snakeyaml" % "1.23"
fork := true

View File

@@ -1,2 +1,2 @@
// This file should only contain the version of sbt to use.
sbt.version=0.13.8
sbt.version=0.13.18

View File

@@ -25,7 +25,7 @@ import scala.util.{Success, Try, Failure => SFailure}
import scala.util.control.NonFatal
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, DataFrame, SQLContext}
import org.apache.spark.sql.{Dataset, DataFrame, SQLContext, SparkSession}
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.SparkContext
@@ -42,7 +42,7 @@ abstract class Benchmark(
import Benchmark._
def this() = this(SQLContext.getOrCreate(SparkContext.getOrCreate()))
def this() = this(SparkSession.builder.getOrCreate().sqlContext)
val resultsLocation =
sqlContext.getAllConfs.getOrElse(
@@ -476,14 +476,14 @@ object Benchmark {
/** Returns results from an actively running experiment. */
def getCurrentResults() = {
val tbl = sqlContext.createDataFrame(currentResults)
tbl.registerTempTable("currentResults")
tbl.createOrReplaceTempView("currentResults")
tbl
}
/** Returns full iterations from an actively running experiment. */
def getCurrentRuns() = {
val tbl = sqlContext.createDataFrame(currentRuns)
tbl.registerTempTable("currentRuns")
tbl.createOrReplaceTempView("currentRuns")
tbl
}

View File

@@ -18,23 +18,25 @@ package com.databricks.spark.sql.perf
import java.util.UUID
import com.typesafe.scalalogging.slf4j.{LazyLogging => Logging}
import org.slf4j.LoggerFactory
import scala.concurrent.duration._
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.{SQLContext,SparkSession}
import org.apache.spark.{SparkEnv, SparkContext}
/** A trait to describe things that can be benchmarked. */
trait Benchmarkable extends Logging {
@transient protected[this] val sqlContext = SQLContext.getOrCreate(SparkContext.getOrCreate())
@transient protected[this] val sparkContext = sqlContext.sparkContext
trait Benchmarkable {
@transient protected[this] val sqlSession = SparkSession.builder.getOrCreate()
@transient protected[this] val sqlContext = sqlSession.sqlContext
@transient protected[this] val sparkContext = sqlSession.sparkContext
val name: String
protected val executionMode: ExecutionMode
lazy val logger = LoggerFactory.getLogger(this.getClass.getName)
final def benchmark(
includeBreakdown: Boolean,

View File

@@ -104,7 +104,7 @@ package object cpu {
}
val counts = cpuLogs.groupBy($"stack").agg(count($"*")).collect().flatMap {
case Row(stackLines: Seq[String], count: Long) => stackLines.map(toStackElement) -> count :: Nil
case Row(stackLines: Array[String], count: Long) => stackLines.toSeq.map(toStackElement) -> count :: Nil
case other => println(s"Failed to parse $other"); Nil
}.toMap
val profile = new com.twitter.jvm.CpuProfile(counts, com.twitter.util.Duration.fromSeconds(10), cpuLogs.count().toInt, 0)

View File

@@ -122,7 +122,7 @@ class Query(
val executionTime = measureTimeMs {
executionMode match {
case ExecutionMode.CollectResults => dataFrame.collect()
case ExecutionMode.ForeachResults => dataFrame.foreach { row => Unit }
case ExecutionMode.ForeachResults => dataFrame.foreach { _ => ():Unit }
case ExecutionMode.WriteParquet(location) =>
dataFrame.write.parquet(s"$location/$name.parquet")
case ExecutionMode.HashResults =>

View File

@@ -18,7 +18,7 @@ package com.databricks.spark.sql.perf
import java.net.InetAddress
import java.io.File
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.{SparkContext, SparkConf}
import scala.util.Try
@@ -70,8 +70,9 @@ object RunBenchmark {
.setMaster(config.master)
.setAppName(getClass.getName)
val sc = SparkContext.getOrCreate(conf)
val sqlContext = SQLContext.getOrCreate(sc)
val sparkSession = SparkSession.builder.config(conf).getOrCreate()
val sc = sparkSession.sparkContext
val sqlContext = sparkSession.sqlContext
import sqlContext.implicits._
sqlContext.setConf("spark.sql.perf.results",

View File

@@ -1,6 +1,5 @@
package com.databricks.spark.sql.perf.mllib
import com.typesafe.scalalogging.slf4j.{LazyLogging => Logging}
import org.apache.spark.ml.attribute.{NominalAttribute, NumericAttribute}
import org.apache.spark.ml.{Estimator, PipelineStage, Transformer}
import org.apache.spark.ml.evaluation.Evaluator
@@ -21,7 +20,7 @@ import com.databricks.spark.sql.perf._
*
* It is assumed that the implementation is going to be an object.
*/
trait BenchmarkAlgorithm extends Logging {
trait BenchmarkAlgorithm {
def trainingDataSet(ctx: MLBenchContext): DataFrame

View File

@@ -2,7 +2,7 @@ package com.databricks.spark.sql.perf.mllib
import com.databricks.spark.sql.perf.mllib.classification.LogisticRegression
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.{SQLContext,SparkSession}
import com.databricks.spark.sql.perf.{MLParams}
import OptionImplicits._
@@ -27,8 +27,9 @@ object MLBenchmarks {
)
)
val context = SparkContext.getOrCreate()
val sqlContext: SQLContext = SQLContext.getOrCreate(context)
val sparkSession = SparkSession.builder.getOrCreate()
val sqlContext: SQLContext = sparkSession.sqlContext
val context = sqlContext.sparkContext
def benchmarkObjects: Seq[MLPipelineStageBenchmarkable] = benchmarks.map { mlb =>
new MLPipelineStageBenchmarkable(mlb.params, mlb.benchmark, sqlContext)

View File

@@ -4,7 +4,7 @@ package com.databricks.spark.sql.perf.mllib
import scala.io.Source
import scala.language.implicitConversions
import com.typesafe.scalalogging.slf4j.{LazyLogging => Logging}
import org.slf4j.LoggerFactory
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
@@ -18,7 +18,7 @@ class MLLib(sqlContext: SQLContext)
def this() = this(SQLContext.getOrCreate(SparkContext.getOrCreate()))
}
object MLLib extends Logging {
object MLLib {
/**
* Runs a set of preprogrammed experiments and blocks on completion.
@@ -26,6 +26,9 @@ object MLLib extends Logging {
* @param runConfig a configuration that is av
* @return
*/
lazy val logger = LoggerFactory.getLogger(this.getClass.getName)
def runDefault(runConfig: RunConfig): DataFrame = {
val ml = new MLLib()
val benchmarks = MLBenchmarks.benchmarkObjects

View File

@@ -3,8 +3,6 @@ package com.databricks.spark.sql.perf.mllib
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal
import com.typesafe.scalalogging.slf4j.{LazyLogging => Logging}
import org.apache.spark.ml.{Estimator, Transformer}
import org.apache.spark.sql._
import org.apache.spark.{SparkContext, SparkEnv}
@@ -15,7 +13,7 @@ class MLPipelineStageBenchmarkable(
params: MLParams,
test: BenchmarkAlgorithm,
sqlContext: SQLContext)
extends Benchmarkable with Serializable with Logging {
extends Benchmarkable with Serializable {
import MLPipelineStageBenchmarkable._

View File

@@ -17,10 +17,9 @@
package com.databricks.spark.sql.perf.tpcds
import scala.collection.mutable
import com.databricks.spark.sql.perf._
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.{SQLContext, SparkSession}
/**
* TPC-DS benchmark's dataset.
@@ -35,7 +34,7 @@ class TPCDS(@transient sqlContext: SQLContext)
with Tpcds_2_4_Queries
with Serializable {
def this() = this(SQLContext.getOrCreate(SparkContext.getOrCreate()))
def this() = this(SparkSession.builder.getOrCreate().sqlContext)
/*
def setupBroadcast(skipTables: Seq[String] = Seq("store_sales", "customer")) = {