From 03c84f8342d2af2c5b95c68d212511ca80a319e5 Mon Sep 17 00:00:00 2001 From: Paul Lin Date: Wed, 9 Feb 2022 11:31:09 +0800 Subject: [PATCH] [KYUUBI #1838] Clean up query results after query operations finish Query results would be cached in memory, and we should clean it up when all rows are fetched. ### _Why are the changes needed?_ This is a sub-task of KPIP-2 #1322. ### _How was this patch tested?_ - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request Closes #1859 from link3280/feature/KYUUBI-1838. Closes #1838 3cef0766 [Paul Lin] [KYUUBI #1838] Improve logging message syntax c6253de8 [Paul Lin] [KYUUBI #1838] Log cleanup exceptions e0b1866f [Paul Lin] [KYUUBI #1838] Clean up query results after query operations finish Authored-by: Paul Lin Signed-off-by: yanghua --- .../flink/operation/ExecuteStatement.scala | 62 ++++++++++++------- 1 file changed, 40 insertions(+), 22 deletions(-) diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala index 51e84156c..06ae1fa94 100644 --- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala +++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala @@ -119,33 +119,42 @@ class ExecuteStatement( } private def runQueryOperation(operation: QueryOperation): Unit = { - val resultDescriptor = executor.executeQuery(sessionId, operation) + var resultId: String = null + try { + val resultDescriptor = executor.executeQuery(sessionId, operation) - val resultID = resultDescriptor.getResultId + resultId = resultDescriptor.getResultId - val rows = new ArrayBuffer[Row]() - var loop = true - while (loop) { - Thread.sleep(50) // slow the processing down + val rows = new ArrayBuffer[Row]() + var loop = true - val result = executor.snapshotResult(sessionId, resultID, 2) - result.getType match { - case TypedResult.ResultType.PAYLOAD => - rows.clear() - (1 to result.getPayload).foreach { page => - rows ++= executor.retrieveResultPage(resultID, page).asScala - } - case TypedResult.ResultType.EOS => loop = false - case TypedResult.ResultType.EMPTY => + while (loop) { + Thread.sleep(50) // slow the processing down + + val result = executor.snapshotResult(sessionId, resultId, 2) + result.getType match { + case TypedResult.ResultType.PAYLOAD => + rows.clear() + (1 to result.getPayload).foreach { page => + rows ++= executor.retrieveResultPage(resultId, page).asScala + } + case TypedResult.ResultType.EOS => loop = false + case TypedResult.ResultType.EMPTY => + } + } + + resultSet = ResultSet.builder + .resultKind(ResultKind.SUCCESS_WITH_CONTENT) + .columns(resultDescriptor.getResultSchema.getColumns) + .data(rows.toArray[Row]) + .build + setState(OperationState.FINISHED) + + } finally { + if (resultId != null) { + cleanupQueryResult(resultId) } } - - resultSet = ResultSet.builder - .resultKind(ResultKind.SUCCESS_WITH_CONTENT) - .columns(resultDescriptor.getResultSchema.getColumns) - .data(rows.toArray[Row]) - .build - setState(OperationState.FINISHED) } private def runSetOperation(setOperation: SetOperation): Unit = { @@ -213,6 +222,15 @@ class ExecuteStatement( setState(OperationState.FINISHED) } + private def cleanupQueryResult(resultId: String): Unit = { + try { + executor.cancelQuery(sessionId, resultId) + } catch { + case t: Throwable => + warn(s"Failed to clean result set $resultId in session $sessionId", t) + } + } + private def addTimeoutMonitor(): Unit = { if (queryTimeout > 0) { val timeoutExecutor =