diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala index 51e84156c..06ae1fa94 100644 --- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala +++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala @@ -119,33 +119,42 @@ class ExecuteStatement( } private def runQueryOperation(operation: QueryOperation): Unit = { - val resultDescriptor = executor.executeQuery(sessionId, operation) + var resultId: String = null + try { + val resultDescriptor = executor.executeQuery(sessionId, operation) - val resultID = resultDescriptor.getResultId + resultId = resultDescriptor.getResultId - val rows = new ArrayBuffer[Row]() - var loop = true - while (loop) { - Thread.sleep(50) // slow the processing down + val rows = new ArrayBuffer[Row]() + var loop = true - val result = executor.snapshotResult(sessionId, resultID, 2) - result.getType match { - case TypedResult.ResultType.PAYLOAD => - rows.clear() - (1 to result.getPayload).foreach { page => - rows ++= executor.retrieveResultPage(resultID, page).asScala - } - case TypedResult.ResultType.EOS => loop = false - case TypedResult.ResultType.EMPTY => + while (loop) { + Thread.sleep(50) // slow the processing down + + val result = executor.snapshotResult(sessionId, resultId, 2) + result.getType match { + case TypedResult.ResultType.PAYLOAD => + rows.clear() + (1 to result.getPayload).foreach { page => + rows ++= executor.retrieveResultPage(resultId, page).asScala + } + case TypedResult.ResultType.EOS => loop = false + case TypedResult.ResultType.EMPTY => + } + } + + resultSet = ResultSet.builder + .resultKind(ResultKind.SUCCESS_WITH_CONTENT) + .columns(resultDescriptor.getResultSchema.getColumns) + .data(rows.toArray[Row]) + .build + setState(OperationState.FINISHED) + + } finally { + if (resultId != null) { + cleanupQueryResult(resultId) } } - - resultSet = ResultSet.builder - .resultKind(ResultKind.SUCCESS_WITH_CONTENT) - .columns(resultDescriptor.getResultSchema.getColumns) - .data(rows.toArray[Row]) - .build - setState(OperationState.FINISHED) } private def runSetOperation(setOperation: SetOperation): Unit = { @@ -213,6 +222,15 @@ class ExecuteStatement( setState(OperationState.FINISHED) } + private def cleanupQueryResult(resultId: String): Unit = { + try { + executor.cancelQuery(sessionId, resultId) + } catch { + case t: Throwable => + warn(s"Failed to clean result set $resultId in session $sessionId", t) + } + } + private def addTimeoutMonitor(): Unit = { if (queryTimeout > 0) { val timeoutExecutor =