From 7bf2d45b0f9234e120ec8d645f9929013188cf2f Mon Sep 17 00:00:00 2001 From: Juliusz Sompolski Date: Mon, 18 Sep 2017 11:51:12 +0200 Subject: [PATCH] Don't clean blocks after every run in Benchmarkable (#119) Do not clean blocks after each run in the generic Benchmarkable trait. It seems to have been there since #33, and an option spark.databricks.benchmark.cleanBlocksAfter to turn it off was added to it in #98, specifically to allow parallel TPCDS streams to not wipe each other's blocks. But that option is quite well hidden and obscure, and as a SparkContext config option can only be set during cluster creation, so it's not friendly to use. Cleaning up the blocks doesn't seem necessary for the Query Benchmarkables used for TPCDS and TPCH. Remove it from there, and leave it only for MLPipelineStageBenchmarkable. --- .../com/databricks/spark/sql/perf/Benchmarkable.scala | 9 +-------- .../sql/perf/mllib/MLPipelineStageBenchmarkable.scala | 10 ++++++++++ 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/src/main/scala/com/databricks/spark/sql/perf/Benchmarkable.scala b/src/main/scala/com/databricks/spark/sql/perf/Benchmarkable.scala index e5a2996..4c53b3a 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/Benchmarkable.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/Benchmarkable.scala @@ -55,15 +55,8 @@ trait Benchmarkable extends Logging { protected def beforeBenchmark(): Unit = { } - private def afterBenchmark(sc: SparkContext): Unit = { - // Best-effort clean up of weakly referenced RDDs, shuffles, and broadcasts + protected def afterBenchmark(sc: SparkContext): Unit = { System.gc() - if (sparkContext.getConf.getBoolean("spark.databricks.benchmark.cleanBlocksAfter", true)) { - // Remove any leftover blocks that still exist - sc.getExecutorStorageStatus - .flatMap { status => status.blocks.map { case (bid, _) => bid } } - .foreach { bid => SparkEnv.get.blockManager.master.removeBlock(bid) } - } } private def runBenchmarkForked( diff --git a/src/main/scala/com/databricks/spark/sql/perf/mllib/MLPipelineStageBenchmarkable.scala b/src/main/scala/com/databricks/spark/sql/perf/mllib/MLPipelineStageBenchmarkable.scala index b9b2827..a1712f0 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/mllib/MLPipelineStageBenchmarkable.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/mllib/MLPipelineStageBenchmarkable.scala @@ -6,6 +6,7 @@ import com.typesafe.scalalogging.slf4j.{LazyLogging => Logging} import org.apache.spark.ml.{Estimator, Transformer} import org.apache.spark.sql._ +import org.apache.spark.{SparkContext, SparkEnv} import com.databricks.spark.sql.perf._ @@ -42,6 +43,15 @@ class MLPipelineStageBenchmarkable( } } + override protected def afterBenchmark(sc: SparkContext): Unit = { + // Best-effort clean up of weakly referenced RDDs, shuffles, and broadcasts + // Remove any leftover blocks that still exist + sc.getExecutorStorageStatus + .flatMap { status => status.blocks.map { case (bid, _) => bid } } + .foreach { bid => SparkEnv.get.blockManager.master.removeBlock(bid) } + super.afterBenchmark(sc) + } + override protected def doBenchmark( includeBreakdown: Boolean, description: String,