From eb98f1e2339aba6b02af1af89195f89b52034ebc Mon Sep 17 00:00:00 2001 From: fwang12 Date: Tue, 13 Dec 2022 17:59:05 +0800 Subject: [PATCH] [KYUUBI #3950][FOLLOWUP] Check whether the state is INITIALIZED instead of builder process launched ### _Why are the changes needed?_ Check the whether the state is INITIALIZED instead of builder process launched ### _How was this patch tested?_ - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request Closes #3964 from turboFei/batch_op_close. Closes #3950 d841086b [fwang12] comments e51f4920 [fwang12] comment d64c88de [fwang12] check INITIALIZED 81aae3ce [fwang12] check background handle 30dda80c [fwang12] check state Authored-by: fwang12 Signed-off-by: ulysses-you --- .../kyuubi/operation/BatchJobSubmission.scala | 33 +++++++++---------- 1 file changed, 15 insertions(+), 18 deletions(-) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala index 6c9d5e3d3..5912e0b00 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala @@ -298,16 +298,6 @@ class BatchJobSubmission( MetricsSystem.tracing(_.decCount(MetricRegistry.name(OPERATION_OPEN, opType))) - if (!builder.processLaunched) { - builder.close() - if (recoveryMetadata.isDefined) { - killMessage = killBatchApplication() - } - setState(OperationState.CANCELED) - updateBatchMetadata() - return - } - // fast fail if (isTerminalState(state)) { killMessage = (false, s"batch $batchId is already terminal so can not kill it.") @@ -319,16 +309,23 @@ class BatchJobSubmission( killMessage = killBatchApplication() builder.close() } finally { - if (killMessage._1 && !isTerminalState(state)) { - // kill success and we can change state safely - // note that, the batch operation state should never be closed + if (state == OperationState.INITIALIZED) { + // if state is INITIALIZED, it means that the batch submission has not started to run, set + // the state to CANCELED manually and regardless of kill result setState(OperationState.CANCELED) updateBatchMetadata() - } else if (killMessage._1) { - // we can not change state safely - killMessage = (false, s"batch $batchId is already terminal so can not kill it.") - } else if (!isTerminalState(state)) { - // failed to kill, the kill message is enough + } else { + if (killMessage._1 && !isTerminalState(state)) { + // kill success and we can change state safely + // note that, the batch operation state should never be closed + setState(OperationState.CANCELED) + updateBatchMetadata() + } else if (killMessage._1) { + // we can not change state safely + killMessage = (false, s"batch $batchId is already terminal so can not kill it.") + } else if (!isTerminalState(state)) { + // failed to kill, the kill message is enough + } } } }