diff --git a/src/main/scala/com/databricks/spark/sql/perf/Benchmarkable.scala b/src/main/scala/com/databricks/spark/sql/perf/Benchmarkable.scala index e5a2996..4c53b3a 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/Benchmarkable.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/Benchmarkable.scala @@ -55,15 +55,8 @@ trait Benchmarkable extends Logging { protected def beforeBenchmark(): Unit = { } - private def afterBenchmark(sc: SparkContext): Unit = { - // Best-effort clean up of weakly referenced RDDs, shuffles, and broadcasts + protected def afterBenchmark(sc: SparkContext): Unit = { System.gc() - if (sparkContext.getConf.getBoolean("spark.databricks.benchmark.cleanBlocksAfter", true)) { - // Remove any leftover blocks that still exist - sc.getExecutorStorageStatus - .flatMap { status => status.blocks.map { case (bid, _) => bid } } - .foreach { bid => SparkEnv.get.blockManager.master.removeBlock(bid) } - } } private def runBenchmarkForked( diff --git a/src/main/scala/com/databricks/spark/sql/perf/mllib/MLPipelineStageBenchmarkable.scala b/src/main/scala/com/databricks/spark/sql/perf/mllib/MLPipelineStageBenchmarkable.scala index b9b2827..a1712f0 100644 --- a/src/main/scala/com/databricks/spark/sql/perf/mllib/MLPipelineStageBenchmarkable.scala +++ b/src/main/scala/com/databricks/spark/sql/perf/mllib/MLPipelineStageBenchmarkable.scala @@ -6,6 +6,7 @@ import com.typesafe.scalalogging.slf4j.{LazyLogging => Logging} import org.apache.spark.ml.{Estimator, Transformer} import org.apache.spark.sql._ +import org.apache.spark.{SparkContext, SparkEnv} import com.databricks.spark.sql.perf._ @@ -42,6 +43,15 @@ class MLPipelineStageBenchmarkable( } } + override protected def afterBenchmark(sc: SparkContext): Unit = { + // Best-effort clean up of weakly referenced RDDs, shuffles, and broadcasts + // Remove any leftover blocks that still exist + sc.getExecutorStorageStatus + .flatMap { status => status.blocks.map { case (bid, _) => bid } } + .foreach { bid => SparkEnv.get.blockManager.master.removeBlock(bid) } + super.afterBenchmark(sc) + } + override protected def doBenchmark( includeBreakdown: Boolean, description: String,