[KYUUBI #1838] Clean up query results after query operations finish

<!--
Thanks for sending a pull request!

Here are some tips for you:
  1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html
  2. If the PR is related to an issue in https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
  3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'.
-->

Query results would be cached in memory, and we should clean it up when all rows are fetched.

### _Why are the changes needed?_
<!--
Please clarify why the changes are needed. For instance,
  1. If you add a feature, you can talk about the use case of it.
  2. If you fix a bug, you can clarify why it is a bug.
-->

This is a sub-task of KPIP-2 #1322.

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #1859 from link3280/feature/KYUUBI-1838.

Closes #1838

3cef0766 [Paul Lin] [KYUUBI #1838] Improve logging message syntax
c6253de8 [Paul Lin] [KYUUBI #1838] Log cleanup exceptions
e0b1866f [Paul Lin] [KYUUBI #1838] Clean up query results after query operations finish

Authored-by: Paul Lin <paullin3280@gmail.com>
Signed-off-by: yanghua <yanghua1127@gmail.com>
This commit is contained in:
Paul Lin 2022-02-09 11:31:09 +08:00 committed by yanghua
parent 73c84d42e7
commit 03c84f8342

View File

@ -119,33 +119,42 @@ class ExecuteStatement(
}
private def runQueryOperation(operation: QueryOperation): Unit = {
val resultDescriptor = executor.executeQuery(sessionId, operation)
var resultId: String = null
try {
val resultDescriptor = executor.executeQuery(sessionId, operation)
val resultID = resultDescriptor.getResultId
resultId = resultDescriptor.getResultId
val rows = new ArrayBuffer[Row]()
var loop = true
while (loop) {
Thread.sleep(50) // slow the processing down
val rows = new ArrayBuffer[Row]()
var loop = true
val result = executor.snapshotResult(sessionId, resultID, 2)
result.getType match {
case TypedResult.ResultType.PAYLOAD =>
rows.clear()
(1 to result.getPayload).foreach { page =>
rows ++= executor.retrieveResultPage(resultID, page).asScala
}
case TypedResult.ResultType.EOS => loop = false
case TypedResult.ResultType.EMPTY =>
while (loop) {
Thread.sleep(50) // slow the processing down
val result = executor.snapshotResult(sessionId, resultId, 2)
result.getType match {
case TypedResult.ResultType.PAYLOAD =>
rows.clear()
(1 to result.getPayload).foreach { page =>
rows ++= executor.retrieveResultPage(resultId, page).asScala
}
case TypedResult.ResultType.EOS => loop = false
case TypedResult.ResultType.EMPTY =>
}
}
resultSet = ResultSet.builder
.resultKind(ResultKind.SUCCESS_WITH_CONTENT)
.columns(resultDescriptor.getResultSchema.getColumns)
.data(rows.toArray[Row])
.build
setState(OperationState.FINISHED)
} finally {
if (resultId != null) {
cleanupQueryResult(resultId)
}
}
resultSet = ResultSet.builder
.resultKind(ResultKind.SUCCESS_WITH_CONTENT)
.columns(resultDescriptor.getResultSchema.getColumns)
.data(rows.toArray[Row])
.build
setState(OperationState.FINISHED)
}
private def runSetOperation(setOperation: SetOperation): Unit = {
@ -213,6 +222,15 @@ class ExecuteStatement(
setState(OperationState.FINISHED)
}
private def cleanupQueryResult(resultId: String): Unit = {
try {
executor.cancelQuery(sessionId, resultId)
} catch {
case t: Throwable =>
warn(s"Failed to clean result set $resultId in session $sessionId", t)
}
}
private def addTimeoutMonitor(): Unit = {
if (queryTimeout > 0) {
val timeoutExecutor =