From 91a2ab3665f44ade8aa768a9bf125bcd8a71478f Mon Sep 17 00:00:00 2001 From: ulysses-you Date: Mon, 10 Apr 2023 11:41:37 +0800 Subject: [PATCH] [KYUUBI #4678] Improve FinalStageResourceManager kill executors ### _Why are the changes needed?_ This pr change two things: 1. add a config to kill executors if the plan contains table caches. It's not always safe to kill executors if the cache is referenced by two write-like plan. 2. force adjustTargetNumExecutors when killing executors. YarnAllocator` might re-request original target executors if DRA has not updated target executors yet. Note, DRA would re-adjust executors if there are more tasks to be executed, so we are safe. It's better to adjuest target num executor once we kill executors. ### _How was this patch tested?_ These issues are found during my POC Closes #4678 from ulysses-you/skip-cache. Closes #4678 b12620954 [ulysses-you] Improve kill executors Authored-by: ulysses-you Signed-off-by: ulyssesyou --- docs/extensions/engines/spark/rules.md | 1 + .../spark/sql/FinalStageResourceManager.scala | 28 +++++++++++++++++-- .../org/apache/kyuubi/sql/KyuubiSQLConf.scala | 7 +++++ 3 files changed, 34 insertions(+), 2 deletions(-) diff --git a/docs/extensions/engines/spark/rules.md b/docs/extensions/engines/spark/rules.md index a4bda5d53..46e8dd3d1 100644 --- a/docs/extensions/engines/spark/rules.md +++ b/docs/extensions/engines/spark/rules.md @@ -84,6 +84,7 @@ Kyuubi provides some configs to make these feature easy to use. | spark.sql.optimizer.insertRepartitionBeforeWriteIfNoShuffle.enabled | false | When true, add repartition even if the original plan does not have shuffle. | 1.7.0 | | spark.sql.optimizer.finalStageConfigIsolationWriteOnly.enabled | true | When true, only enable final stage isolation for writing. | 1.7.0 | | spark.sql.finalWriteStage.eagerlyKillExecutors.enabled | false | When true, eagerly kill redundant executors before running final write stage. | 1.8.0 | +| spark.sql.finalWriteStage.skipKillingExecutorsForTableCache | true | When true, skip killing executors if the plan has table caches. | 1.8.0 | | spark.sql.finalWriteStage.retainExecutorsFactor | 1.2 | If the target executors * factor < active executors, and target executors * factor > min executors, then inject kill executors or inject custom resource profile. | 1.8.0 | | spark.sql.finalWriteStage.resourceIsolation.enabled | false | When true, make final write stage resource isolation using custom RDD resource profile. | 1.2.0 | | spark.sql.finalWriteStageExecutorCores | fallback spark.executor.cores | Specify the executor core request for final write stage. It would be passed to the RDD resource profile. | 1.8.0 | diff --git a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/spark/sql/FinalStageResourceManager.scala b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/spark/sql/FinalStageResourceManager.scala index ca3f762e1..7a0ae1592 100644 --- a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/spark/sql/FinalStageResourceManager.scala +++ b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/spark/sql/FinalStageResourceManager.scala @@ -26,6 +26,7 @@ import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SortExec, SparkPlan} import org.apache.spark.sql.execution.adaptive._ +import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, ShuffleExchangeExec} import org.apache.kyuubi.sql.{KyuubiSQLConf, MarkNumOutputColumnsRule} @@ -69,6 +70,13 @@ case class FinalStageResourceManager(session: SparkSession) return plan } + // It's not safe to kill executors if this plan contains table cache. + // If the executor loses then the rdd would re-compute those partition. + if (hasTableCache(plan) && + conf.getConf(KyuubiSQLConf.FINAL_WRITE_STAGE_SKIP_KILLING_EXECUTORS_FOR_TABLE_CACHE)) { + return plan + } + // TODO: move this to query stage optimizer when updating Spark to 3.5.x // Since we are in `prepareQueryStage`, the AQE shuffle read has not been applied. // So we need to apply it by self. @@ -188,9 +196,18 @@ case class FinalStageResourceManager(session: SparkSession) // see `https://github.com/apache/spark/pull/20604`. // It may cause the status in `ExecutorAllocationManager` inconsistent with // `CoarseGrainedSchedulerBackend` for a while. But it should be synchronous finally. + // + // We should adjust target num executors, otherwise `YarnAllocator` might re-request original + // target executors if DRA has not updated target executors yet. + // Note, DRA would re-adjust executors if there are more tasks to be executed, so we are safe. + // + // * We kill executor + // * YarnAllocator re-request target executors + // * DRA can not release executors since they are new added + // ----------------------------------------------------------------> timeline executorAllocationClient.killExecutors( executorIds = executorsToKill, - adjustTargetNumExecutors = false, + adjustTargetNumExecutors = true, countFailures = false, force = false) } @@ -201,7 +218,7 @@ case class FinalStageResourceManager(session: SparkSession) OptimizeShuffleWithLocalRead) } -trait FinalRebalanceStageHelper { +trait FinalRebalanceStageHelper extends AdaptiveSparkPlanHelper { @tailrec final protected def findFinalRebalanceStage(plan: SparkPlan): Option[ShuffleQueryStageExec] = { plan match { @@ -216,4 +233,11 @@ trait FinalRebalanceStageHelper { case _ => None } } + + final protected def hasTableCache(plan: SparkPlan): Boolean = { + find(plan) { + case _: InMemoryTableScanExec => true + case _ => false + }.isDefined + } } diff --git a/extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala b/extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala index 4df924b51..aeee45869 100644 --- a/extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala +++ b/extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala @@ -198,6 +198,13 @@ object KyuubiSQLConf { .booleanConf .createWithDefault(false) + val FINAL_WRITE_STAGE_SKIP_KILLING_EXECUTORS_FOR_TABLE_CACHE = + buildConf("spark.sql.finalWriteStage.skipKillingExecutorsForTableCache") + .doc("When true, skip killing executors if the plan has table caches.") + .version("1.8.0") + .booleanConf + .createWithDefault(true) + val FINAL_WRITE_STAGE_PARTITION_FACTOR = buildConf("spark.sql.finalWriteStage.retainExecutorsFactor") .doc("If the target executors * factor < active executors, and " +