From e1e1365a8795b76a796dab3a51259be9343c80de Mon Sep 17 00:00:00 2001
From: Luca Canali
Date: Tue, 29 Jan 2019 09:58:52 +0100
Subject: [PATCH] Updates for Spark 3.0 and Scala 2.12 compatibility (#176)

* Refactor deprecated `getOrCreate()` usage for Spark 3
* Compile with Scala 2.12
* Update usage of obsolete/deprecated features
* Remove scala-logging; use slf4j directly instead
---
 README.md                                         |  3 ++-
 build.sbt                                         | 16 ++++++----------
 project/build.properties                          |  2 +-
 .../databricks/spark/sql/perf/Benchmark.scala     |  8 ++++----
 .../spark/sql/perf/Benchmarkable.scala            | 12 +++++++-----
 .../databricks/spark/sql/perf/CpuProfile.scala    |  2 +-
 .../com/databricks/spark/sql/perf/Query.scala     |  2 +-
 .../databricks/spark/sql/perf/RunBenchmark.scala  |  7 ++++---
 .../sql/perf/mllib/BenchmarkAlgorithm.scala       |  3 +--
 .../spark/sql/perf/mllib/MLBenchmarks.scala       |  7 ++++---
 .../databricks/spark/sql/perf/mllib/MLLib.scala   |  7 +++++--
 .../mllib/MLPipelineStageBenchmarkable.scala      |  4 +---
 .../databricks/spark/sql/perf/tpcds/TPCDS.scala   |  5 ++---
 13 files changed, 39 insertions(+), 39 deletions(-)

diff --git a/README.md b/README.md
index 833a80a..335b0e6 100644
--- a/README.md
+++ b/README.md
@@ -34,7 +34,8 @@ The first run of `bin/run` will build the library.
 
 ## Build
 
-Use `sbt package` or `sbt assembly` to build the library jar.
+Use `sbt package` or `sbt assembly` to build the library jar.
+Use `sbt +package` to build for Scala 2.11 and 2.12.
 
 ## Local performance tests
 The framework contains twelve benchmarks that can be executed in local mode. They are organized into three classes and target different components and functions of Spark:
diff --git a/build.sbt b/build.sbt
index 31b4f50..13a091c 100644
--- a/build.sbt
+++ b/build.sbt
@@ -5,9 +5,9 @@ name := "spark-sql-perf"
 
 organization := "com.databricks"
 
-scalaVersion := "2.11.8"
+scalaVersion := "2.11.12"
 
-crossScalaVersions := Seq("2.11.8")
+crossScalaVersions := Seq("2.11.12", "2.12.8")
 
 sparkPackageName := "databricks/spark-sql-perf"
 
@@ -32,17 +32,13 @@ initialCommands in console :=
     |import sqlContext.implicits._
   """.stripMargin
 
-libraryDependencies += "org.slf4j" % "slf4j-api" % "1.7.5"
+libraryDependencies += "com.github.scopt" %% "scopt" % "3.7.1"
 
-libraryDependencies += "com.github.scopt" %% "scopt" % "3.3.0"
+libraryDependencies += "com.twitter" %% "util-jvm" % "6.45.0" % "provided"
 
-libraryDependencies += "com.twitter" %% "util-jvm" % "6.23.0" % "provided"
+libraryDependencies += "org.scalatest" %% "scalatest" % "3.0.5" % "test"
 
-libraryDependencies += "org.scalatest" %% "scalatest" % "2.2.1" % "test"
-
-libraryDependencies += "org.yaml" % "snakeyaml" % "1.17"
-
-libraryDependencies += "com.typesafe.scala-logging" %% "scala-logging-slf4j" % "2.1.2"
+libraryDependencies += "org.yaml" % "snakeyaml" % "1.23"
 
 fork := true
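For reference, the cross-build mechanics behind the README note above: in sbt, prefixing a task with `+` runs it once per entry in `crossScalaVersions`. A minimal sketch, with values mirroring the build.sbt change in this patch:

// build.sbt (excerpt)
scalaVersion := "2.11.12"                       // default version for plain `sbt package`
crossScalaVersions := Seq("2.11.12", "2.12.8")  // `sbt +package` builds once per entry

`sbt +package` then emits one jar per Scala version, under target/scala-2.11/ and target/scala-2.12/ respectively.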
diff --git a/project/build.properties b/project/build.properties
index be414cd..5c4bcd9 100644
--- a/project/build.properties
+++ b/project/build.properties
@@ -1,2 +1,2 @@
 // This file should only contain the version of sbt to use.
-sbt.version=0.13.8
+sbt.version=0.13.18
diff --git a/src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala b/src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala
index f2ce663..7d85595 100644
--- a/src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala
+++ b/src/main/scala/com/databricks/spark/sql/perf/Benchmark.scala
@@ -25,7 +25,7 @@ import scala.util.{Success, Try, Failure => SFailure}
 import scala.util.control.NonFatal
 
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{Dataset, DataFrame, SQLContext}
+import org.apache.spark.sql.{Dataset, DataFrame, SQLContext, SparkSession}
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.SparkContext
 
@@ -42,7 +42,7 @@ abstract class Benchmark(
 
   import Benchmark._
 
-  def this() = this(SQLContext.getOrCreate(SparkContext.getOrCreate()))
+  def this() = this(SparkSession.builder.getOrCreate().sqlContext)
 
   val resultsLocation =
     sqlContext.getAllConfs.getOrElse(
@@ -476,14 +476,14 @@ object Benchmark {
     /** Returns results from an actively running experiment. */
     def getCurrentResults() = {
       val tbl = sqlContext.createDataFrame(currentResults)
-      tbl.registerTempTable("currentResults")
+      tbl.createOrReplaceTempView("currentResults")
       tbl
     }
 
     /** Returns full iterations from an actively running experiment. */
     def getCurrentRuns() = {
       val tbl = sqlContext.createDataFrame(currentRuns)
-      tbl.registerTempTable("currentRuns")
+      tbl.createOrReplaceTempView("currentRuns")
       tbl
     }
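The two Benchmark.scala changes above recur throughout this patch, so a minimal standalone sketch of the pattern may help (the app name and master below are illustrative, not from the patch): `SQLContext.getOrCreate(SparkContext.getOrCreate())` becomes a `SparkSession` lookup, and the deprecated `registerTempTable` becomes `createOrReplaceTempView`.

import org.apache.spark.sql.SparkSession

object SessionPattern {
  def main(args: Array[String]): Unit = {
    // Replaces SQLContext.getOrCreate(SparkContext.getOrCreate()):
    // reuses the active session or creates one.
    val spark = SparkSession.builder
      .appName("session-pattern")   // illustrative
      .master("local[*]")           // illustrative, for a local run
      .getOrCreate()
    val sqlContext = spark.sqlContext  // legacy handle kept for old APIs

    val tbl = spark.range(3).toDF("id")
    tbl.createOrReplaceTempView("ids") // replaces deprecated registerTempTable
    spark.sql("SELECT count(*) FROM ids").show()
    spark.stop()
  }
}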
diff --git a/src/main/scala/com/databricks/spark/sql/perf/Benchmarkable.scala b/src/main/scala/com/databricks/spark/sql/perf/Benchmarkable.scala
index 62b12ea..9df2a1d 100644
--- a/src/main/scala/com/databricks/spark/sql/perf/Benchmarkable.scala
+++ b/src/main/scala/com/databricks/spark/sql/perf/Benchmarkable.scala
@@ -18,23 +18,25 @@ package com.databricks.spark.sql.perf
 
 import java.util.UUID
 
-import com.typesafe.scalalogging.slf4j.{LazyLogging => Logging}
+import org.slf4j.LoggerFactory
 
 import scala.concurrent.duration._
 import scala.collection.mutable.ArrayBuffer
 import scala.util.control.NonFatal
 
-import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.{SQLContext, SparkSession}
 import org.apache.spark.{SparkEnv, SparkContext}
 
 /** A trait to describe things that can be benchmarked.
  */
-trait Benchmarkable extends Logging {
-  @transient protected[this] val sqlContext = SQLContext.getOrCreate(SparkContext.getOrCreate())
-  @transient protected[this] val sparkContext = sqlContext.sparkContext
+trait Benchmarkable {
+  @transient protected[this] val sqlSession = SparkSession.builder.getOrCreate()
+  @transient protected[this] val sqlContext = sqlSession.sqlContext
+  @transient protected[this] val sparkContext = sqlSession.sparkContext
 
   val name: String
   protected val executionMode: ExecutionMode
+  lazy val logger = LoggerFactory.getLogger(this.getClass.getName)
 
   final def benchmark(
       includeBreakdown: Boolean,
diff --git a/src/main/scala/com/databricks/spark/sql/perf/CpuProfile.scala b/src/main/scala/com/databricks/spark/sql/perf/CpuProfile.scala
index 744d057..901563a 100644
--- a/src/main/scala/com/databricks/spark/sql/perf/CpuProfile.scala
+++ b/src/main/scala/com/databricks/spark/sql/perf/CpuProfile.scala
@@ -104,7 +104,7 @@ package object cpu {
     }
 
     val counts = cpuLogs.groupBy($"stack").agg(count($"*")).collect().flatMap {
-      case Row(stackLines: Seq[String], count: Long) => stackLines.map(toStackElement) -> count :: Nil
+      case Row(stackLines: Array[String], count: Long) => stackLines.toSeq.map(toStackElement) -> count :: Nil
       case other => println(s"Failed to parse $other"); Nil
     }.toMap
     val profile = new com.twitter.jvm.CpuProfile(counts, com.twitter.util.Duration.fromSeconds(10), cpuLogs.count().toInt, 0)
diff --git a/src/main/scala/com/databricks/spark/sql/perf/Query.scala b/src/main/scala/com/databricks/spark/sql/perf/Query.scala
index b3b7255..16cd907 100644
--- a/src/main/scala/com/databricks/spark/sql/perf/Query.scala
+++ b/src/main/scala/com/databricks/spark/sql/perf/Query.scala
@@ -122,7 +122,7 @@ class Query(
     val executionTime = measureTimeMs {
       executionMode match {
         case ExecutionMode.CollectResults => dataFrame.collect()
-        case ExecutionMode.ForeachResults => dataFrame.foreach { row => Unit }
+        case ExecutionMode.ForeachResults => dataFrame.foreach { _ => (): Unit }
         case ExecutionMode.WriteParquet(location) =>
           dataFrame.write.parquet(s"$location/$name.parquet")
         case ExecutionMode.HashResults =>
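A note on the Query.scala change above, as a hedged sketch (object name and data below are illustrative, not from the repo): in the old body `row => Unit`, `Unit` refers to the companion object, so the lambda returns a value that is silently discarded rather than doing nothing by design; and under Scala 2.12 an untyped lambda can be reported as ambiguous between `Dataset.foreach(f: T => Unit)` and the Java `ForeachFunction[T]` overload. The explicitly Unit-typed body avoids both:

import org.apache.spark.sql.SparkSession

object ForeachPattern {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local[*]").getOrCreate()
    val df = spark.range(5).toDF("id")

    // Old form: `df.foreach { row => Unit }` returns the Unit *companion
    // object* and relies on value discarding; Scala warns about it, and the
    // twin foreach overloads can make it ambiguous under Scala 2.12.

    // Patched form: an explicitly Unit-typed no-op, pinned to the Scala overload.
    df.foreach { _ => (): Unit }

    spark.stop()
  }
}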
diff --git a/src/main/scala/com/databricks/spark/sql/perf/RunBenchmark.scala b/src/main/scala/com/databricks/spark/sql/perf/RunBenchmark.scala
index f5c5a93..ed367e7 100644
--- a/src/main/scala/com/databricks/spark/sql/perf/RunBenchmark.scala
+++ b/src/main/scala/com/databricks/spark/sql/perf/RunBenchmark.scala
@@ -18,7 +18,7 @@ package com.databricks.spark.sql.perf
 import java.net.InetAddress
 import java.io.File
 
-import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.{SQLContext, SparkSession}
 import org.apache.spark.sql.functions._
 import org.apache.spark.{SparkContext, SparkConf}
 import scala.util.Try
 
@@ -70,8 +70,9 @@ object RunBenchmark {
       .setMaster(config.master)
       .setAppName(getClass.getName)
 
-    val sc = SparkContext.getOrCreate(conf)
-    val sqlContext = SQLContext.getOrCreate(sc)
+    val sparkSession = SparkSession.builder.config(conf).getOrCreate()
+    val sc = sparkSession.sparkContext
+    val sqlContext = sparkSession.sqlContext
     import sqlContext.implicits._
 
     sqlContext.setConf("spark.sql.perf.results",
diff --git a/src/main/scala/com/databricks/spark/sql/perf/mllib/BenchmarkAlgorithm.scala b/src/main/scala/com/databricks/spark/sql/perf/mllib/BenchmarkAlgorithm.scala
index 986b1a8..9e00a45 100644
--- a/src/main/scala/com/databricks/spark/sql/perf/mllib/BenchmarkAlgorithm.scala
+++ b/src/main/scala/com/databricks/spark/sql/perf/mllib/BenchmarkAlgorithm.scala
@@ -1,6 +1,5 @@
 package com.databricks.spark.sql.perf.mllib
 
-import com.typesafe.scalalogging.slf4j.{LazyLogging => Logging}
 import org.apache.spark.ml.attribute.{NominalAttribute, NumericAttribute}
 import org.apache.spark.ml.{Estimator, PipelineStage, Transformer}
 import org.apache.spark.ml.evaluation.Evaluator
@@ -21,7 +20,7 @@ import com.databricks.spark.sql.perf._
  *
  * It is assumed that the implementation is going to be an object.
  */
-trait BenchmarkAlgorithm extends Logging {
+trait BenchmarkAlgorithm {
 
   def trainingDataSet(ctx: MLBenchContext): DataFrame
diff --git a/src/main/scala/com/databricks/spark/sql/perf/mllib/MLBenchmarks.scala b/src/main/scala/com/databricks/spark/sql/perf/mllib/MLBenchmarks.scala
index 1103804..13b5d14 100644
--- a/src/main/scala/com/databricks/spark/sql/perf/mllib/MLBenchmarks.scala
+++ b/src/main/scala/com/databricks/spark/sql/perf/mllib/MLBenchmarks.scala
@@ -2,7 +2,7 @@ package com.databricks.spark.sql.perf.mllib
 
 import com.databricks.spark.sql.perf.mllib.classification.LogisticRegression
 import org.apache.spark.SparkContext
-import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.{SQLContext, SparkSession}
 import com.databricks.spark.sql.perf.{MLParams}
 import OptionImplicits._
 
@@ -27,8 +27,9 @@ object MLBenchmarks {
     )
   )
 
-  val context = SparkContext.getOrCreate()
-  val sqlContext: SQLContext = SQLContext.getOrCreate(context)
+  val sparkSession = SparkSession.builder.getOrCreate()
+  val sqlContext: SQLContext = sparkSession.sqlContext
+  val context = sqlContext.sparkContext
 
   def benchmarkObjects: Seq[MLPipelineStageBenchmarkable] = benchmarks.map { mlb =>
     new MLPipelineStageBenchmarkable(mlb.params, mlb.benchmark, sqlContext)
diff --git a/src/main/scala/com/databricks/spark/sql/perf/mllib/MLLib.scala b/src/main/scala/com/databricks/spark/sql/perf/mllib/MLLib.scala
index d325327..c0bf70e 100644
--- a/src/main/scala/com/databricks/spark/sql/perf/mllib/MLLib.scala
+++ b/src/main/scala/com/databricks/spark/sql/perf/mllib/MLLib.scala
@@ -4,7 +4,7 @@ package com.databricks.spark.sql.perf.mllib
 import scala.io.Source
 import scala.language.implicitConversions
 
-import com.typesafe.scalalogging.slf4j.{LazyLogging => Logging}
+import org.slf4j.LoggerFactory
 
 import org.apache.spark.sql.{DataFrame, SQLContext}
 import org.apache.spark.{SparkConf, SparkContext}
 
@@ -18,7 +18,7 @@ class MLLib(sqlContext: SQLContext)
   def this() = this(SQLContext.getOrCreate(SparkContext.getOrCreate()))
 }
 
-object MLLib extends Logging {
+object MLLib {
 
   /**
    * Runs a set of preprogrammed experiments and blocks on completion.
@@ -26,6 +26,9 @@ object MLLib extends Logging {
    * @param runConfig a configuration that is av
    * @return
    */
+
+  lazy val logger = LoggerFactory.getLogger(this.getClass.getName)
+
   def runDefault(runConfig: RunConfig): DataFrame = {
     val ml = new MLLib()
     val benchmarks = MLBenchmarks.benchmarkObjects
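The logging migration in Benchmarkable.scala and MLLib.scala above follows one pattern, sketched here standalone (object name is illustrative): drop the scala-logging `LazyLogging` mixin and declare a lazy slf4j `Logger` obtained directly from `LoggerFactory`.

import org.slf4j.{Logger, LoggerFactory}

object LoggingPattern {
  // Before: extends com.typesafe.scalalogging.slf4j.LazyLogging
  // After: declare the logger explicitly, as the patch does.
  lazy val logger: Logger = LoggerFactory.getLogger(this.getClass.getName)

  def main(args: Array[String]): Unit = {
    logger.info("slf4j logger in use: {}", logger.getName)
    // Unlike scala-logging, plain slf4j calls are not macro-guarded, so guard
    // expensive message construction explicitly:
    if (logger.isDebugEnabled) logger.debug(s"expensive: ${(1 to 3).sum}")
  }
}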
diff --git a/src/main/scala/com/databricks/spark/sql/perf/mllib/MLPipelineStageBenchmarkable.scala b/src/main/scala/com/databricks/spark/sql/perf/mllib/MLPipelineStageBenchmarkable.scala
index 2ccd78d..8296f46 100644
--- a/src/main/scala/com/databricks/spark/sql/perf/mllib/MLPipelineStageBenchmarkable.scala
+++ b/src/main/scala/com/databricks/spark/sql/perf/mllib/MLPipelineStageBenchmarkable.scala
@@ -3,8 +3,6 @@ package com.databricks.spark.sql.perf.mllib
 import scala.collection.mutable.ArrayBuffer
 import scala.util.control.NonFatal
 
-import com.typesafe.scalalogging.slf4j.{LazyLogging => Logging}
-
 import org.apache.spark.ml.{Estimator, Transformer}
 import org.apache.spark.sql._
 import org.apache.spark.{SparkContext, SparkEnv}
 
@@ -15,7 +13,7 @@ class MLPipelineStageBenchmarkable(
     params: MLParams,
     test: BenchmarkAlgorithm,
     sqlContext: SQLContext)
-  extends Benchmarkable with Serializable with Logging {
+  extends Benchmarkable with Serializable {
 
   import MLPipelineStageBenchmarkable._
 
diff --git a/src/main/scala/com/databricks/spark/sql/perf/tpcds/TPCDS.scala b/src/main/scala/com/databricks/spark/sql/perf/tpcds/TPCDS.scala
index 8d134b1..2f173f0 100644
--- a/src/main/scala/com/databricks/spark/sql/perf/tpcds/TPCDS.scala
+++ b/src/main/scala/com/databricks/spark/sql/perf/tpcds/TPCDS.scala
@@ -17,10 +17,9 @@ package com.databricks.spark.sql.perf.tpcds
 
 import scala.collection.mutable
-
 import com.databricks.spark.sql.perf._
 import org.apache.spark.SparkContext
-import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.{SQLContext, SparkSession}
 
 /**
  * TPC-DS benchmark's dataset.
@@ -35,7 +34,7 @@ class TPCDS(@transient sqlContext: SQLContext)
   with Tpcds_2_4_Queries
   with Serializable {
 
-  def this() = this(SQLContext.getOrCreate(SparkContext.getOrCreate()))
+  def this() = this(SparkSession.builder.getOrCreate().sqlContext)
 
   /*
   def setupBroadcast(skipTables: Seq[String] = Seq("store_sales", "customer")) = {
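Finally, a hedged usage sketch of the refactored zero-argument constructor above (the master, app name, and smoke-test shape are illustrative, not from the patch): `new TPCDS()` now resolves its `SQLContext` from the active `SparkSession` instead of the removed `SQLContext.getOrCreate`.

import org.apache.spark.sql.SparkSession
import com.databricks.spark.sql.perf.tpcds.TPCDS

object TpcdsSmoke {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .master("local[*]")      // illustrative, for a local run
      .appName("tpcds-smoke")  // illustrative
      .getOrCreate()
    // Picks up `spark` via SparkSession.builder.getOrCreate(), per the patch.
    val tpcds = new TPCDS()
    // tpcds.tpcds2_4Queries then lists the bundled TPC-DS 2.4 queries,
    // assuming the mixed-in Tpcds_2_4_Queries trait shown in the diff.
    spark.stop()
  }
}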