Don't clean blocks after every run in Benchmarkable (#119)
Do not clean blocks after each run in the generic Benchmarkable trait. It seems to have been there since #33, and an option spark.databricks.benchmark.cleanBlocksAfter to turn it off was added to it in #98, specifically to allow parallel TPCDS streams to not wipe each other's blocks. But that option is quite well hidden and obscure, and as a SparkContext config option can only be set during cluster creation, so it's not friendly to use. Cleaning up the blocks doesn't seem necessary for the Query Benchmarkables used for TPCDS and TPCH. Remove it from there, and leave it only for MLPipelineStageBenchmarkable.
This commit is contained in:
parent
fdd0e38717
commit
7bf2d45b0f
@ -55,15 +55,8 @@ trait Benchmarkable extends Logging {
|
||||
|
||||
protected def beforeBenchmark(): Unit = { }
|
||||
|
||||
private def afterBenchmark(sc: SparkContext): Unit = {
|
||||
// Best-effort clean up of weakly referenced RDDs, shuffles, and broadcasts
|
||||
protected def afterBenchmark(sc: SparkContext): Unit = {
|
||||
System.gc()
|
||||
if (sparkContext.getConf.getBoolean("spark.databricks.benchmark.cleanBlocksAfter", true)) {
|
||||
// Remove any leftover blocks that still exist
|
||||
sc.getExecutorStorageStatus
|
||||
.flatMap { status => status.blocks.map { case (bid, _) => bid } }
|
||||
.foreach { bid => SparkEnv.get.blockManager.master.removeBlock(bid) }
|
||||
}
|
||||
}
|
||||
|
||||
private def runBenchmarkForked(
|
||||
|
||||
@ -6,6 +6,7 @@ import com.typesafe.scalalogging.slf4j.{LazyLogging => Logging}
|
||||
|
||||
import org.apache.spark.ml.{Estimator, Transformer}
|
||||
import org.apache.spark.sql._
|
||||
import org.apache.spark.{SparkContext, SparkEnv}
|
||||
|
||||
import com.databricks.spark.sql.perf._
|
||||
|
||||
@ -42,6 +43,15 @@ class MLPipelineStageBenchmarkable(
|
||||
}
|
||||
}
|
||||
|
||||
override protected def afterBenchmark(sc: SparkContext): Unit = {
|
||||
// Best-effort clean up of weakly referenced RDDs, shuffles, and broadcasts
|
||||
// Remove any leftover blocks that still exist
|
||||
sc.getExecutorStorageStatus
|
||||
.flatMap { status => status.blocks.map { case (bid, _) => bid } }
|
||||
.foreach { bid => SparkEnv.get.blockManager.master.removeBlock(bid) }
|
||||
super.afterBenchmark(sc)
|
||||
}
|
||||
|
||||
override protected def doBenchmark(
|
||||
includeBreakdown: Boolean,
|
||||
description: String,
|
||||
|
||||
Loading…
Reference in New Issue
Block a user