[KYUUBI #1620] Implement api: /${version}/operations/${operation_identifier}/log

### _Why are the changes needed?_
closes #1620
 Implement api: /${version}/operations/${operation_identifier}/log

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [ ] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #1621 from simon824/1223.

Closes #1620

5f211473 [simon] rm
f6d0ce7c [simon] fix
346df798 [simon] operationLog
c60d3447 [simon] init

Authored-by: simon <zhangshiming@cvte.com>
Signed-off-by: Kent Yao <yao@apache.org>
This commit is contained in:
simon 2021-12-24 17:02:05 +08:00 committed by Kent Yao
parent fbccba90f4
commit c42f31e211
No known key found for this signature in database
GPG Key ID: F7051850A0AF904D
4 changed files with 54 additions and 4 deletions

View File

@ -17,11 +17,13 @@
package org.apache.kyuubi.operation
import org.apache.hive.service.rpc.thrift.TRowSet
import java.nio.ByteBuffer
import java.util
import org.apache.hive.service.rpc.thrift.{TColumn, TRow, TRowSet, TStringColumn}
import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
import org.apache.kyuubi.session.Session
import org.apache.kyuubi.util.ThriftUtils
class NoopOperationManager extends OperationManager("noop") {
private val invalid = "invalid"
@ -91,5 +93,12 @@ class NoopOperationManager extends OperationManager("noop") {
override def getOperationLogRowSet(
opHandle: OperationHandle,
order: FetchOrientation,
maxRows: Int): TRowSet = ThriftUtils.EMPTY_ROW_SET
maxRows: Int): TRowSet = {
val logs = new util.ArrayList[String]()
logs.add("test")
val tColumn = TColumn.stringVal(new TStringColumn(logs, ByteBuffer.allocate(0)))
val tRow = new TRowSet(0, new util.ArrayList[TRow](logs.size()))
tRow.addToColumns(tColumn)
tRow
}
}

View File

@ -30,7 +30,7 @@ import org.apache.hive.service.rpc.thrift.TTypeQualifierValue
import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.events.KyuubiOperationEvent
import org.apache.kyuubi.operation.KyuubiOperation
import org.apache.kyuubi.operation.{FetchOrientation, KyuubiOperation}
import org.apache.kyuubi.operation.OperationHandle.parseOperationHandle
import org.apache.kyuubi.server.api.ApiRequestContext
@ -121,4 +121,29 @@ private[v1] class OperationsResource extends ApiRequestContext {
s"Error getting result set metadata for operation handle $operationHandleStr")
}
}
@ApiResponse(
responseCode = "200",
content = Array(new Content(
mediaType = MediaType.APPLICATION_JSON)),
description =
"get operation log")
@GET
@Path("{operationHandle}/log")
def getOperationLog(
@PathParam("operationHandle") operationHandleStr: String,
@QueryParam("maxrows") maxRows: Int): OperationLog = {
try {
val rowSet = backendService.sessionManager.operationManager.getOperationLogRowSet(
parseOperationHandle(operationHandleStr),
FetchOrientation.FETCH_NEXT,
maxRows)
val logRowSet = rowSet.getColumns.get(0).getStringVal.getValues.asScala
OperationLog(logRowSet, logRowSet.size)
} catch {
case NonFatal(_) =>
throw new NotFoundException(
s"Error getting operation log for operation handle $operationHandleStr")
}
}
}

View File

@ -79,3 +79,5 @@ case class ColumnDesc(
precision: Int,
scale: Int,
comment: String)
case class OperationLog(logRowSet: Seq[String], rowCount: Int)

View File

@ -92,6 +92,20 @@ class OperationsResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper
}
}
test("test get operation log") {
withKyuubiRestServer { (fe, _, _, webTarget: WebTarget) =>
val opHandleStr = getOpHandleStr(fe, OperationType.EXECUTE_STATEMENT)
val response = webTarget.path(
s"api/v1/operations/$opHandleStr/log")
.queryParam("maxrows", "10")
.request(MediaType.APPLICATION_JSON).get()
assert(200 == response.getStatus)
val logRowSet = response.readEntity(classOf[OperationLog])
assert(logRowSet.logRowSet.head.equals("test"))
assert(logRowSet.rowCount == 1)
}
}
def getOpHandleStr(fe: KyuubiRestFrontendService, typ: OperationType): String = {
val sessionManager = fe.be.sessionManager
val sessionHandle = sessionManager.openSession(