[KYUUBI #4678] Improve FinalStageResourceManager kill executors

### _Why are the changes needed?_

This PR changes two things:
1. Add a config to skip killing executors if the plan contains table caches. It is not always safe to kill executors when the cache is referenced by two write-like plans.
2. Force `adjustTargetNumExecutors` when killing executors. `YarnAllocator` might re-request the original target number of executors if DRA has not updated the target yet. Note that DRA would re-adjust executors if there are more tasks to be executed, so we are safe. It is better to adjust the target number of executors once we kill executors.
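The race in item 2 can be illustrated with a minimal, self-contained model (toy types for illustration only, not Spark's `ExecutorAllocationClient`/`YarnAllocator` API): if the target is not lowered when executors are killed, an allocator that re-requests up to the stale target simply brings them back.

```scala
// Toy model of the kill/re-request race. `reRequest` mimics YarnAllocator-like
// behavior: it re-requests executors up to the current target.
case class AllocatorState(target: Int, active: Int) {
  def kill(n: Int, adjustTarget: Boolean): AllocatorState = {
    // Only lower the target when adjustTarget is set, as the fixed call does.
    val newTarget = if (adjustTarget) math.max(0, target - n) else target
    copy(target = newTarget, active = active - n)
  }
  def reRequest: AllocatorState = copy(active = math.max(active, target))
}

val start = AllocatorState(target = 10, active = 10)
// Without adjusting the target, the allocator restores the killed executors.
assert(start.kill(6, adjustTarget = false).reRequest.active == 10)
// With adjustTargetNumExecutors = true, the kill sticks.
assert(start.kill(6, adjustTarget = true).reRequest.active == 4)
```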

### _How was this patch tested?_
These issues were found during my POC.

Closes #4678 from ulysses-you/skip-cache.


b12620954 [ulysses-you] Improve kill executors

Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: ulyssesyou <ulyssesyou@apache.org>
ulysses-you 2023-04-10 11:41:37 +08:00 committed by ulyssesyou
parent a834ed3efb
commit 91a2ab3665
3 changed files with 34 additions and 2 deletions


@@ -84,6 +84,7 @@ Kyuubi provides some configs to make these feature easy to use.
 | spark.sql.optimizer.insertRepartitionBeforeWriteIfNoShuffle.enabled | false | When true, add repartition even if the original plan does not have shuffle. | 1.7.0 |
 | spark.sql.optimizer.finalStageConfigIsolationWriteOnly.enabled | true | When true, only enable final stage isolation for writing. | 1.7.0 |
 | spark.sql.finalWriteStage.eagerlyKillExecutors.enabled | false | When true, eagerly kill redundant executors before running final write stage. | 1.8.0 |
+| spark.sql.finalWriteStage.skipKillingExecutorsForTableCache | true | When true, skip killing executors if the plan has table caches. | 1.8.0 |
 | spark.sql.finalWriteStage.retainExecutorsFactor | 1.2 | If the target executors * factor < active executors, and target executors * factor > min executors, then inject kill executors or inject custom resource profile. | 1.8.0 |
 | spark.sql.finalWriteStage.resourceIsolation.enabled | false | When true, make final write stage resource isolation using custom RDD resource profile. | 1.2.0 |
 | spark.sql.finalWriteStageExecutorCores | fallback spark.executor.cores | Specify the executor core request for final write stage. It would be passed to the RDD resource profile. | 1.8.0 |
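The `retainExecutorsFactor` condition in the table above can be expressed as a small sketch (the helper name and signature here are illustrative, not Kyuubi's internal API): executors are reduced only when the padded target is still below the active count and above the configured minimum.

```scala
// Hedged sketch of the retainExecutorsFactor check described in the docs table.
// Returns true when it is worthwhile to kill executors (or inject a custom
// resource profile) for the final write stage.
def shouldReduceExecutors(
    targetExecutors: Int,
    activeExecutors: Int,
    minExecutors: Int,
    factor: Double = 1.2): Boolean = {
  val padded = targetExecutors * factor
  padded < activeExecutors && padded > minExecutors
}

// 10 * 1.2 = 12: below 20 active and above the minimum of 5, so reduce.
assert(shouldReduceExecutors(targetExecutors = 10, activeExecutors = 20, minExecutors = 5))
// 12 is not below 11 active executors, so keep them.
assert(!shouldReduceExecutors(targetExecutors = 10, activeExecutors = 11, minExecutors = 5))
```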


@@ -26,6 +26,7 @@ import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SortExec, SparkPlan}
 import org.apache.spark.sql.execution.adaptive._
+import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
 import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, ShuffleExchangeExec}

 import org.apache.kyuubi.sql.{KyuubiSQLConf, MarkNumOutputColumnsRule}
@@ -69,6 +70,13 @@ case class FinalStageResourceManager(session: SparkSession)
       return plan
     }

+    // It's not safe to kill executors if this plan contains table cache.
+    // If the executor loses then the rdd would re-compute those partition.
+    if (hasTableCache(plan) &&
+      conf.getConf(KyuubiSQLConf.FINAL_WRITE_STAGE_SKIP_KILLING_EXECUTORS_FOR_TABLE_CACHE)) {
+      return plan
+    }
+
     // TODO: move this to query stage optimizer when updating Spark to 3.5.x
     // Since we are in `prepareQueryStage`, the AQE shuffle read has not been applied.
     // So we need to apply it by self.
@@ -188,9 +196,18 @@ case class FinalStageResourceManager(session: SparkSession)
     // see `https://github.com/apache/spark/pull/20604`.
     // It may cause the status in `ExecutorAllocationManager` inconsistent with
     // `CoarseGrainedSchedulerBackend` for a while. But it should be synchronous finally.
+    //
+    // We should adjust target num executors, otherwise `YarnAllocator` might re-request original
+    // target executors if DRA has not updated target executors yet.
+    // Note, DRA would re-adjust executors if there are more tasks to be executed, so we are safe.
+    //
+    // * We kill executor
+    // * YarnAllocator re-request target executors
+    // * DRA can not release executors since they are new added
+    // ----------------------------------------------------------------> timeline
     executorAllocationClient.killExecutors(
       executorIds = executorsToKill,
-      adjustTargetNumExecutors = false,
+      adjustTargetNumExecutors = true,
       countFailures = false,
       force = false)
   }
@@ -201,7 +218,7 @@ case class FinalStageResourceManager(session: SparkSession)
       OptimizeShuffleWithLocalRead)
 }

-trait FinalRebalanceStageHelper {
+trait FinalRebalanceStageHelper extends AdaptiveSparkPlanHelper {
   @tailrec
   final protected def findFinalRebalanceStage(plan: SparkPlan): Option[ShuffleQueryStageExec] = {
     plan match {
@@ -216,4 +233,11 @@ trait FinalRebalanceStageHelper {
       case _ => None
     }
   }
+
+  final protected def hasTableCache(plan: SparkPlan): Boolean = {
+    find(plan) {
+      case _: InMemoryTableScanExec => true
+      case _ => false
+    }.isDefined
+  }
 }
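For readers unfamiliar with `AdaptiveSparkPlanHelper.find`, the `hasTableCache` helper added above behaves like this self-contained sketch over a toy plan tree (toy types for illustration; Spark's real `SparkPlan` and `find` differ in detail):

```scala
// Toy plan tree standing in for SparkPlan; InMemoryScan plays the role of
// InMemoryTableScanExec.
sealed trait Plan { def children: Seq[Plan] }
case class Node(children: Plan*) extends Plan
case object InMemoryScan extends Plan { val children: Seq[Plan] = Nil }
case object TableScan extends Plan { val children: Seq[Plan] = Nil }

// Pre-order search for the first node matching the predicate, mirroring the
// shape of AdaptiveSparkPlanHelper.find.
def find(plan: Plan)(f: Plan => Boolean): Option[Plan] =
  if (f(plan)) Some(plan)
  else plan.children.foldLeft(Option.empty[Plan])((acc, c) => acc.orElse(find(c)(f)))

def hasTableCache(plan: Plan): Boolean =
  find(plan) {
    case InMemoryScan => true
    case _ => false
  }.isDefined

// A cached scan anywhere in the tree is enough to skip killing executors.
assert(hasTableCache(Node(TableScan, Node(InMemoryScan))))
assert(!hasTableCache(Node(TableScan, TableScan)))
```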


@@ -198,6 +198,13 @@ object KyuubiSQLConf {
       .booleanConf
       .createWithDefault(false)

+  val FINAL_WRITE_STAGE_SKIP_KILLING_EXECUTORS_FOR_TABLE_CACHE =
+    buildConf("spark.sql.finalWriteStage.skipKillingExecutorsForTableCache")
+      .doc("When true, skip killing executors if the plan has table caches.")
+      .version("1.8.0")
+      .booleanConf
+      .createWithDefault(true)
+
   val FINAL_WRITE_STAGE_PARTITION_FACTOR =
     buildConf("spark.sql.finalWriteStage.retainExecutorsFactor")
       .doc("If the target executors * factor < active executors, and " +