[KYUUBI #5160] Refactor getNextRowSetInternal to support fetch streaming data

### _Why are the changes needed?_
Currently, `getNextRowSetInternal` returns `TRowSet`, which does not support signaling an explicit end-of-stream (EOS) during streaming result fetches.

This PR changes the return type to `TFetchResultsResp`, allowing the engines to signal EOS explicitly (via the `hasMoreRows` flag).

### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before making a pull request

Closes #5160 from link3280/refactor_result.

Closes #5160

09822f2ee [Paul Lin] Fix hasMoreRows missing
c94907e2b [Paul Lin] Explicitly set `resp.setHasMoreRows(false)` for operations
4d193fb1d [Paul Lin] Revert unrelated changes in FlinkOperation
ffd0367b3 [Paul Lin] Refactor getNextRowSetInternal to support fetch streaming data

Authored-by: Paul Lin <paullin3280@gmail.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
This commit is contained in:
Paul Lin 2023-08-15 16:06:10 +08:00 committed by Cheng Pan
parent 938384c225
commit 14d0dab697
No known key found for this signature in database
GPG Key ID: 8001952629BCC75D
30 changed files with 166 additions and 83 deletions

View File

@ -31,7 +31,9 @@ abstract class ChatOperation(session: Session) extends AbstractOperation(session
protected lazy val conf: KyuubiConf = session.sessionManager.getConf
override def getNextRowSetInternal(order: FetchOrientation, rowSetSize: Int): TRowSet = {
override def getNextRowSetInternal(
order: FetchOrientation,
rowSetSize: Int): TFetchResultsResp = {
validateDefaultFetchOrientation(order)
assertState(OperationState.FINISHED)
setHasResultSet(true)
@ -47,7 +49,10 @@ abstract class ChatOperation(session: Session) extends AbstractOperation(session
val taken = iter.take(rowSetSize)
val resultRowSet = RowSet.toTRowSet(taken.toSeq, 1, getProtocolVersion)
resultRowSet.setStartRowOffset(iter.getPosition)
resultRowSet
val resp = new TFetchResultsResp(OK_STATUS)
resp.setResults(resultRowSet)
resp.setHasMoreRows(false)
resp
}
override def cancel(): Unit = {

View File

@ -25,7 +25,7 @@ import scala.collection.JavaConverters.collectionAsScalaIterableConverter
import org.apache.flink.configuration.Configuration
import org.apache.flink.table.gateway.service.context.SessionContext
import org.apache.flink.table.gateway.service.operation.OperationExecutor
import org.apache.hive.service.rpc.thrift.{TGetResultSetMetadataResp, TRowSet, TTableSchema}
import org.apache.hive.service.rpc.thrift.{TFetchResultsResp, TGetResultSetMetadataResp, TTableSchema}
import org.apache.kyuubi.{KyuubiSQLException, Utils}
import org.apache.kyuubi.engine.flink.result.ResultSet
@ -91,7 +91,9 @@ abstract class FlinkOperation(session: Session) extends AbstractOperation(sessio
resp
}
override def getNextRowSetInternal(order: FetchOrientation, rowSetSize: Int): TRowSet = {
override def getNextRowSetInternal(
order: FetchOrientation,
rowSetSize: Int): TFetchResultsResp = {
validateDefaultFetchOrientation(order)
assertState(OperationState.FINISHED)
setHasResultSet(true)
@ -112,7 +114,10 @@ abstract class FlinkOperation(session: Session) extends AbstractOperation(sessio
zoneId,
getProtocolVersion)
resultRowSet.setStartRowOffset(resultSet.getData.getPosition)
resultRowSet
val resp = new TFetchResultsResp(OK_STATUS)
resp.setResults(resultRowSet)
resp.setHasMoreRows(false)
resp
}
override def shouldRunAsync: Boolean = false

View File

@ -21,7 +21,7 @@ import java.util.concurrent.Future
import org.apache.hive.service.cli.operation.{Operation, OperationManager}
import org.apache.hive.service.cli.session.{HiveSession, SessionManager => HiveSessionManager}
import org.apache.hive.service.rpc.thrift.{TGetResultSetMetadataResp, TRowSet}
import org.apache.hive.service.rpc.thrift.{TFetchResultsResp, TGetResultSetMetadataResp}
import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY
@ -95,22 +95,31 @@ abstract class HiveOperation(session: Session) extends AbstractOperation(session
resp
}
override def getNextRowSetInternal(order: FetchOrientation, rowSetSize: Int): TRowSet = {
override def getNextRowSetInternal(
order: FetchOrientation,
rowSetSize: Int): TFetchResultsResp = {
val tOrder = FetchOrientation.toTFetchOrientation(order)
val hiveOrder = org.apache.hive.service.cli.FetchOrientation.getFetchOrientation(tOrder)
val rowSet = internalHiveOperation.getNextRowSet(hiveOrder, rowSetSize)
rowSet.toTRowSet
val resp = new TFetchResultsResp(OK_STATUS)
resp.setResults(rowSet.toTRowSet)
resp.setHasMoreRows(false)
resp
}
def getOperationLogRowSet(order: FetchOrientation, rowSetSize: Int): TRowSet = {
def getOperationLogRowSet(order: FetchOrientation, rowSetSize: Int): TFetchResultsResp = {
val tOrder = FetchOrientation.toTFetchOrientation(order)
val hiveOrder = org.apache.hive.service.cli.FetchOrientation.getFetchOrientation(tOrder)
val handle = internalHiveOperation.getHandle
delegatedOperationManager.getOperationLogRowSet(
val rowSet = delegatedOperationManager.getOperationLogRowSet(
handle,
hiveOrder,
rowSetSize,
hive.getHiveConf).toTRowSet
val resp = new TFetchResultsResp(OK_STATUS)
resp.setResults(rowSet)
resp.setHasMoreRows(false)
resp
}
override def isTimedOut: Boolean = internalHiveOperation.isTimedOut(System.currentTimeMillis)

View File

@ -20,7 +20,7 @@ package org.apache.kyuubi.engine.hive.operation
import java.util.List
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hive.service.rpc.thrift.TRowSet
import org.apache.hive.service.rpc.thrift.TFetchResultsResp
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.engine.hive.session.HiveSessionImpl
@ -154,7 +154,7 @@ class HiveOperationManager() extends OperationManager("HiveOperationManager") {
override def getOperationLogRowSet(
opHandle: OperationHandle,
order: FetchOrientation,
maxRows: Int): TRowSet = {
maxRows: Int): TFetchResultsResp = {
val operation = getOperation(opHandle).asInstanceOf[HiveOperation]
operation.getOperationLogRowSet(order, maxRows)
}

View File

@ -16,7 +16,7 @@
*/
package org.apache.kyuubi.engine.jdbc.operation
import org.apache.hive.service.rpc.thrift.{TGetResultSetMetadataResp, TRowSet}
import org.apache.hive.service.rpc.thrift.{TFetchResultsResp, TGetResultSetMetadataResp, TRowSet}
import org.apache.kyuubi.{KyuubiSQLException, Utils}
import org.apache.kyuubi.config.KyuubiConf
@ -36,7 +36,9 @@ abstract class JdbcOperation(session: Session) extends AbstractOperation(session
protected lazy val dialect: JdbcDialect = JdbcDialects.get(conf)
override def getNextRowSetInternal(order: FetchOrientation, rowSetSize: Int): TRowSet = {
override def getNextRowSetInternal(
order: FetchOrientation,
rowSetSize: Int): TFetchResultsResp = {
validateDefaultFetchOrientation(order)
assertState(OperationState.FINISHED)
setHasResultSet(true)
@ -51,7 +53,10 @@ abstract class JdbcOperation(session: Session) extends AbstractOperation(session
val taken = iter.take(rowSetSize)
val resultRowSet = toTRowSet(taken)
resultRowSet.setStartRowOffset(iter.getPosition)
resultRowSet
val resp = new TFetchResultsResp(OK_STATUS)
resp.setResults(resultRowSet)
resp.setHasMoreRows(false)
resp
}
override def cancel(): Unit = {

View File

@ -20,7 +20,7 @@ package org.apache.kyuubi.engine.spark.operation
import java.io.IOException
import java.time.ZoneId
import org.apache.hive.service.rpc.thrift.{TGetResultSetMetadataResp, TProgressUpdateResp, TRowSet}
import org.apache.hive.service.rpc.thrift.{TFetchResultsResp, TGetResultSetMetadataResp, TProgressUpdateResp, TRowSet}
import org.apache.spark.kyuubi.{SparkProgressMonitor, SQLOperationListener}
import org.apache.spark.kyuubi.SparkUtilsHelper.redact
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
@ -233,7 +233,9 @@ abstract class SparkOperation(session: Session)
resp
}
override def getNextRowSetInternal(order: FetchOrientation, rowSetSize: Int): TRowSet = {
override def getNextRowSetInternal(
order: FetchOrientation,
rowSetSize: Int): TFetchResultsResp = {
var resultRowSet: TRowSet = null
try {
withLocalProperties {
@ -264,7 +266,10 @@ abstract class SparkOperation(session: Session)
}
} catch onError(cancel = true)
resultRowSet
val resp = new TFetchResultsResp(OK_STATUS)
resp.setResults(resultRowSet)
resp.setHasMoreRows(false)
resp
}
override def shouldRunAsync: Boolean = false

View File

@ -19,7 +19,7 @@ package org.apache.kyuubi.engine.trino.operation
import java.util.concurrent.RejectedExecutionException
import org.apache.hive.service.rpc.thrift.TRowSet
import org.apache.hive.service.rpc.thrift.TFetchResultsResp
import org.apache.kyuubi.{KyuubiSQLException, Logging}
import org.apache.kyuubi.engine.trino.TrinoStatement
@ -82,7 +82,9 @@ class ExecuteStatement(
}
}
override def getNextRowSetInternal(order: FetchOrientation, rowSetSize: Int): TRowSet = {
override def getNextRowSetInternal(
order: FetchOrientation,
rowSetSize: Int): TFetchResultsResp = {
validateDefaultFetchOrientation(order)
assertState(OperationState.FINISHED)
setHasResultSet(true)
@ -97,7 +99,10 @@ class ExecuteStatement(
val taken = iter.take(rowSetSize)
val resultRowSet = RowSet.toTRowSet(taken.toList, schema, getProtocolVersion)
resultRowSet.setStartRowOffset(iter.getPosition)
resultRowSet
val fetchResultsResp = new TFetchResultsResp(OK_STATUS)
fetchResultsResp.setResults(resultRowSet)
fetchResultsResp.setHasMoreRows(false)
fetchResultsResp
}
private def executeStatement(trinoStatement: TrinoStatement): Unit = {

View File

@ -21,7 +21,7 @@ import java.io.IOException
import io.trino.client.Column
import io.trino.client.StatementClient
import org.apache.hive.service.rpc.thrift.{TGetResultSetMetadataResp, TRowSet}
import org.apache.hive.service.rpc.thrift.{TFetchResultsResp, TGetResultSetMetadataResp}
import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.Utils
@ -54,7 +54,9 @@ abstract class TrinoOperation(session: Session) extends AbstractOperation(sessio
resp
}
override def getNextRowSetInternal(order: FetchOrientation, rowSetSize: Int): TRowSet = {
override def getNextRowSetInternal(
order: FetchOrientation,
rowSetSize: Int): TFetchResultsResp = {
validateDefaultFetchOrientation(order)
assertState(OperationState.FINISHED)
setHasResultSet(true)
@ -66,7 +68,10 @@ abstract class TrinoOperation(session: Session) extends AbstractOperation(sessio
val taken = iter.take(rowSetSize)
val resultRowSet = RowSet.toTRowSet(taken.toList, schema, getProtocolVersion)
resultRowSet.setStartRowOffset(iter.getPosition)
resultRowSet
val resp = new TFetchResultsResp(OK_STATUS)
resp.setResults(resultRowSet)
resp.setHasMoreRows(false)
resp
}
override protected def beforeRun(): Unit = {

View File

@ -23,7 +23,7 @@ import java.util.concurrent.locks.ReentrantLock
import scala.collection.JavaConverters._
import org.apache.commons.lang3.StringUtils
import org.apache.hive.service.rpc.thrift.{TGetResultSetMetadataResp, TProgressUpdateResp, TProtocolVersion, TRowSet, TStatus, TStatusCode}
import org.apache.hive.service.rpc.thrift.{TFetchResultsResp, TGetResultSetMetadataResp, TProgressUpdateResp, TProtocolVersion, TStatus, TStatusCode}
import org.apache.kyuubi.{KyuubiSQLException, Logging, Utils}
import org.apache.kyuubi.config.KyuubiConf.OPERATION_IDLE_TIMEOUT
@ -182,11 +182,12 @@ abstract class AbstractOperation(session: Session) extends Operation with Loggin
override def getResultSetMetadata: TGetResultSetMetadataResp
def getNextRowSetInternal(order: FetchOrientation, rowSetSize: Int): TRowSet
def getNextRowSetInternal(order: FetchOrientation, rowSetSize: Int): TFetchResultsResp
override def getNextRowSet(order: FetchOrientation, rowSetSize: Int): TRowSet = withLockRequired {
getNextRowSetInternal(order, rowSetSize)
}
override def getNextRowSet(order: FetchOrientation, rowSetSize: Int): TFetchResultsResp =
withLockRequired {
getNextRowSetInternal(order, rowSetSize)
}
/**
* convert SQL 'like' pattern to a Java regular expression.

View File

@ -20,7 +20,7 @@ package org.apache.kyuubi.operation
/**
* Borrowed from Apache Spark, see SPARK-33655
*/
sealed trait FetchIterator[A] extends Iterator[A] {
trait FetchIterator[A] extends Iterator[A] {
/**
* Begin a fetch block, forward from the current position.

View File

@ -19,7 +19,7 @@ package org.apache.kyuubi.operation
import java.util.concurrent.Future
import org.apache.hive.service.rpc.thrift.{TGetResultSetMetadataResp, TRowSet}
import org.apache.hive.service.rpc.thrift.{TFetchResultsResp, TGetResultSetMetadataResp}
import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
import org.apache.kyuubi.operation.log.OperationLog
@ -32,7 +32,7 @@ trait Operation {
def close(): Unit
def getResultSetMetadata: TGetResultSetMetadataResp
def getNextRowSet(order: FetchOrientation, rowSetSize: Int): TRowSet
def getNextRowSet(order: FetchOrientation, rowSetSize: Int): TFetchResultsResp
def getSession: Session
def getHandle: OperationHandle

View File

@ -137,18 +137,22 @@ abstract class OperationManager(name: String) extends AbstractService(name) {
final def getOperationNextRowSet(
opHandle: OperationHandle,
order: FetchOrientation,
maxRows: Int): TRowSet = {
maxRows: Int): TFetchResultsResp = {
getOperation(opHandle).getNextRowSet(order, maxRows)
}
def getOperationLogRowSet(
opHandle: OperationHandle,
order: FetchOrientation,
maxRows: Int): TRowSet = {
maxRows: Int): TFetchResultsResp = {
val operationLog = getOperation(opHandle).getOperationLog
operationLog.map(_.read(order, maxRows)).getOrElse {
val rowSet = operationLog.map(_.read(order, maxRows)).getOrElse {
throw KyuubiSQLException(s"$opHandle failed to generate operation log")
}
val resp = new TFetchResultsResp(new TStatus(TStatusCode.SUCCESS_STATUS))
resp.setResults(rowSet)
resp.setHasMoreRows(false)
resp
}
final def removeExpiredOperations(handles: Seq[OperationHandle]): Seq[Operation] = synchronized {

View File

@ -201,7 +201,7 @@ abstract class AbstractBackendService(name: String)
operationHandle: OperationHandle,
orientation: FetchOrientation,
maxRows: Int,
fetchLog: Boolean): TRowSet = {
fetchLog: Boolean): TFetchResultsResp = {
maxRowsLimit.foreach(limit =>
if (maxRows > limit) {
throw new IllegalArgumentException(s"Max rows for fetching results " +

View File

@ -101,7 +101,7 @@ trait BackendService {
operationHandle: OperationHandle,
orientation: FetchOrientation,
maxRows: Int,
fetchLog: Boolean): TRowSet
fetchLog: Boolean): TFetchResultsResp
def sessionManager: SessionManager
}

View File

@ -520,23 +520,20 @@ abstract class TFrontendService(name: String)
override def FetchResults(req: TFetchResultsReq): TFetchResultsResp = {
debug(req.toString)
val resp = new TFetchResultsResp
try {
val operationHandle = OperationHandle(req.getOperationHandle)
val orientation = FetchOrientation.getFetchOrientation(req.getOrientation)
// 1 means fetching log
val fetchLog = req.getFetchType == 1
val maxRows = req.getMaxRows.toInt
val rowSet = be.fetchResults(operationHandle, orientation, maxRows, fetchLog)
resp.setResults(rowSet)
resp.setHasMoreRows(false)
resp.setStatus(OK_STATUS)
be.fetchResults(operationHandle, orientation, maxRows, fetchLog)
} catch {
case e: Exception =>
error("Error fetching results: ", e)
val resp = new TFetchResultsResp
resp.setStatus(KyuubiSQLException.toTStatus(e))
resp
}
resp
}
protected def notSupportTokenErrorStatus = {

View File

@ -233,7 +233,7 @@ abstract class AbstractSession(
operationHandle: OperationHandle,
orientation: FetchOrientation,
maxRows: Int,
fetchLog: Boolean): TRowSet = {
fetchLog: Boolean): TFetchResultsResp = {
if (fetchLog) {
sessionManager.operationManager.getOperationLogRowSet(operationHandle, orientation, maxRows)
} else {

View File

@ -17,7 +17,7 @@
package org.apache.kyuubi.session
import org.apache.hive.service.rpc.thrift.{TGetInfoType, TGetInfoValue, TGetResultSetMetadataResp, TProtocolVersion, TRowSet}
import org.apache.hive.service.rpc.thrift.{TFetchResultsResp, TGetInfoType, TGetInfoValue, TGetResultSetMetadataResp, TProtocolVersion}
import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
import org.apache.kyuubi.operation.OperationHandle
@ -91,7 +91,7 @@ trait Session {
operationHandle: OperationHandle,
orientation: FetchOrientation,
maxRows: Int,
fetchLog: Boolean): TRowSet
fetchLog: Boolean): TFetchResultsResp
def closeExpiredOperations(): Unit
}

View File

@ -21,7 +21,7 @@ import java.nio.ByteBuffer
import scala.collection.JavaConverters._
import org.apache.hive.service.rpc.thrift.{TColumn, TColumnDesc, TGetResultSetMetadataResp, TPrimitiveTypeEntry, TRowSet, TStringColumn, TTableSchema, TTypeDesc, TTypeEntry, TTypeId}
import org.apache.hive.service.rpc.thrift.{TColumn, TColumnDesc, TFetchResultsResp, TGetResultSetMetadataResp, TPrimitiveTypeEntry, TStringColumn, TTableSchema, TTypeDesc, TTypeEntry, TTypeId}
import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
@ -76,11 +76,16 @@ class NoopOperation(session: Session, shouldFail: Boolean = false)
resp
}
override def getNextRowSetInternal(order: FetchOrientation, rowSetSize: Int): TRowSet = {
override def getNextRowSetInternal(
order: FetchOrientation,
rowSetSize: Int): TFetchResultsResp = {
val col = TColumn.stringVal(new TStringColumn(Seq(opType).asJava, ByteBuffer.allocate(0)))
val tRowSet = ThriftUtils.newEmptyRowSet
tRowSet.addToColumns(col)
tRowSet
val resp = new TFetchResultsResp(OK_STATUS)
resp.setResults(tRowSet)
resp.setHasMoreRows(false)
resp
}
override def shouldRunAsync: Boolean = false

View File

@ -20,7 +20,7 @@ package org.apache.kyuubi.operation
import java.nio.ByteBuffer
import java.util
import org.apache.hive.service.rpc.thrift.{TColumn, TRow, TRowSet, TStringColumn}
import org.apache.hive.service.rpc.thrift.{TColumn, TFetchResultsResp, TRow, TRowSet, TStatus, TStatusCode, TStringColumn}
import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
import org.apache.kyuubi.session.Session
@ -136,13 +136,16 @@ class NoopOperationManager extends OperationManager("noop") {
override def getOperationLogRowSet(
opHandle: OperationHandle,
order: FetchOrientation,
maxRows: Int): TRowSet = {
maxRows: Int): TFetchResultsResp = {
val logs = new util.ArrayList[String]()
logs.add("test")
val tColumn = TColumn.stringVal(new TStringColumn(logs, ByteBuffer.allocate(0)))
val tRow = new TRowSet(0, new util.ArrayList[TRow](logs.size()))
tRow.addToColumns(tColumn)
tRow
val resp = new TFetchResultsResp(new TStatus(TStatusCode.SUCCESS_STATUS))
resp.setResults(tRow)
resp.setHasMoreRows(false)
resp
}
override def getQueryId(operation: Operation): String = {

View File

@ -26,6 +26,7 @@ import org.apache.hive.service.rpc.thrift.*;
import org.apache.kyuubi.jdbc.hive.cli.RowSet;
import org.apache.kyuubi.jdbc.hive.cli.RowSetFactory;
import org.apache.kyuubi.jdbc.hive.common.HiveDecimal;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -47,6 +48,7 @@ public class KyuubiQueryResultSet extends KyuubiBaseResultSet {
private boolean emptyResultSet = false;
private boolean isScrollable = false;
private boolean fetchFirst = false;
private boolean hasMoreToFetch = false;
private final TProtocolVersion protocol;
@ -317,25 +319,20 @@ public class KyuubiQueryResultSet extends KyuubiBaseResultSet {
try {
TFetchOrientation orientation = TFetchOrientation.FETCH_NEXT;
if (fetchFirst) {
// If we are asked to start from begining, clear the current fetched resultset
// If we are asked to start from beginning, clear the current fetched resultset
orientation = TFetchOrientation.FETCH_FIRST;
fetchedRows = null;
fetchedRowsItr = null;
fetchFirst = false;
}
if (fetchedRows == null || !fetchedRowsItr.hasNext()) {
TFetchResultsReq fetchReq = new TFetchResultsReq(stmtHandle, orientation, fetchSize);
TFetchResultsResp fetchResp;
fetchResp = client.FetchResults(fetchReq);
Utils.verifySuccessWithInfo(fetchResp.getStatus());
TRowSet results = fetchResp.getResults();
fetchedRows = RowSetFactory.create(results, protocol);
fetchedRowsItr = fetchedRows.iterator();
fetchResult(orientation);
}
if (fetchedRowsItr.hasNext()) {
row = fetchedRowsItr.next();
} else if (hasMoreToFetch) {
fetchResult(orientation);
} else {
return false;
}
@ -350,6 +347,18 @@ public class KyuubiQueryResultSet extends KyuubiBaseResultSet {
return true;
}
private void fetchResult(TFetchOrientation orientation) throws SQLException, TException {
TFetchResultsReq fetchReq = new TFetchResultsReq(stmtHandle, orientation, fetchSize);
TFetchResultsResp fetchResp;
fetchResp = client.FetchResults(fetchReq);
Utils.verifySuccessWithInfo(fetchResp.getStatus());
hasMoreToFetch = fetchResp.isSetHasMoreRows() && fetchResp.isHasMoreRows();
TRowSet results = fetchResp.getResults();
fetchedRows = RowSetFactory.create(results, protocol);
fetchedRowsItr = fetchedRows.iterator();
}
@Override
public ResultSetMetaData getMetaData() throws SQLException {
if (isClosed) {

View File

@ -17,7 +17,7 @@
package org.apache.kyuubi.operation
import org.apache.hive.service.rpc.thrift.{TGetResultSetMetadataResp, TRowSet}
import org.apache.hive.service.rpc.thrift.{TFetchResultsResp, TGetResultSetMetadataResp}
import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
import org.apache.kyuubi.operation.log.OperationLog
@ -67,11 +67,17 @@ class ExecutedCommandExec(
if (!shouldRunAsync) getBackgroundHandle.get()
}
override def getNextRowSetInternal(order: FetchOrientation, rowSetSize: Int): TRowSet = {
override def getNextRowSetInternal(
order: FetchOrientation,
rowSetSize: Int): TFetchResultsResp = {
validateDefaultFetchOrientation(order)
assertState(OperationState.FINISHED)
setHasResultSet(true)
command.getNextRowSet(order, rowSetSize, getProtocolVersion)
val rowSet = command.getNextRowSet(order, rowSetSize, getProtocolVersion)
val resp = new TFetchResultsResp(OK_STATUS)
resp.setResults(rowSet)
resp.setHasMoreRows(false)
resp
}
override def getResultSetMetadata: TGetResultSetMetadataResp = {

View File

@ -22,7 +22,7 @@ import java.util.{ArrayList => JArrayList}
import scala.collection.JavaConverters._
import org.apache.hive.service.rpc.thrift.{TColumn, TColumnDesc, TGetResultSetMetadataResp, TPrimitiveTypeEntry, TRow, TRowSet, TStringColumn, TTableSchema, TTypeDesc, TTypeEntry, TTypeId}
import org.apache.hive.service.rpc.thrift.{TColumn, TColumnDesc, TFetchResultsResp, TGetResultSetMetadataResp, TPrimitiveTypeEntry, TRow, TRowSet, TStringColumn, TTableSchema, TTypeDesc, TTypeEntry, TTypeId}
import org.apache.kyuubi.engine.ApplicationInfo
import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
@ -54,8 +54,11 @@ abstract class KyuubiApplicationOperation(session: Session) extends KyuubiOperat
resp
}
override def getNextRowSetInternal(order: FetchOrientation, rowSetSize: Int): TRowSet = {
applicationInfoMap.map { state =>
override def getNextRowSetInternal(
order: FetchOrientation,
rowSetSize: Int): TFetchResultsResp = {
val resp = new TFetchResultsResp(OK_STATUS)
val rowSet = applicationInfoMap.map { state =>
val tRow = new TRowSet(0, new JArrayList[TRow](state.size))
Seq(state.keys, state.values.map(Option(_).getOrElse(""))).map(_.toSeq.asJava).foreach {
col =>
@ -64,5 +67,8 @@ abstract class KyuubiApplicationOperation(session: Session) extends KyuubiOperat
}
tRow
}.getOrElse(ThriftUtils.EMPTY_ROW_SET)
resp.setResults(rowSet)
resp.setHasMoreRows(false)
resp
}
}

View File

@ -179,11 +179,17 @@ abstract class KyuubiOperation(session: Session) extends AbstractOperation(sessi
}
}
override def getNextRowSetInternal(order: FetchOrientation, rowSetSize: Int): TRowSet = {
override def getNextRowSetInternal(
order: FetchOrientation,
rowSetSize: Int): TFetchResultsResp = {
validateDefaultFetchOrientation(order)
assertState(OperationState.FINISHED)
setHasResultSet(true)
client.fetchResults(_remoteOpHandle, order, rowSetSize, fetchLog = false)
val rowset = client.fetchResults(_remoteOpHandle, order, rowSetSize, fetchLog = false)
val resp = new TFetchResultsResp(OK_STATUS)
resp.setResults(rowset)
resp.setHasMoreRows(false)
resp
}
override def shouldRunAsync: Boolean = false

View File

@ -19,7 +19,7 @@ package org.apache.kyuubi.operation
import java.util.concurrent.TimeUnit
import org.apache.hive.service.rpc.thrift.TRowSet
import org.apache.hive.service.rpc.thrift.{TFetchResultsResp, TStatus, TStatusCode}
import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.config.KyuubiConf
@ -214,11 +214,11 @@ class KyuubiOperationManager private (name: String) extends OperationManager(nam
override def getOperationLogRowSet(
opHandle: OperationHandle,
order: FetchOrientation,
maxRows: Int): TRowSet = {
maxRows: Int): TFetchResultsResp = {
val resp = new TFetchResultsResp(new TStatus(TStatusCode.SUCCESS_STATUS))
val operation = getOperation(opHandle).asInstanceOf[KyuubiOperation]
val operationLog = operation.getOperationLog
operationLog match {
val rowSet = operationLog match {
case Some(log) => log.read(order, maxRows)
case None =>
val remoteHandle = operation.remoteOpHandle()
@ -229,6 +229,9 @@ class KyuubiOperationManager private (name: String) extends OperationManager(nam
ThriftUtils.EMPTY_ROW_SET
}
}
resp.setResults(rowSet)
resp.setHasMoreRows(false)
resp
}
override def start(): Unit = synchronized {

View File

@ -183,9 +183,10 @@ trait BackendServiceMetric extends BackendService {
operationHandle: OperationHandle,
orientation: FetchOrientation,
maxRows: Int,
fetchLog: Boolean): TRowSet = {
fetchLog: Boolean): TFetchResultsResp = {
MetricsSystem.timerTracing(MetricsConstants.BS_FETCH_RESULTS) {
val rowSet = super.fetchResults(operationHandle, orientation, maxRows, fetchLog)
val fetchResultsResp = super.fetchResults(operationHandle, orientation, maxRows, fetchLog)
val rowSet = fetchResultsResp.getResults
// TODO: the statistics are wrong when we enabled the arrow.
val rowsSize =
if (rowSet.getColumnsSize > 0) {
@ -217,7 +218,7 @@ trait BackendServiceMetric extends BackendService {
operation.increaseFetchResultsCount(rowsSize)
}
rowSet
fetchResultsResp
}
}

View File

@ -145,10 +145,11 @@ private[v1] class OperationsResource extends ApiRequestContext with Logging {
if (fetchOrientation != "FETCH_NEXT" && fetchOrientation != "FETCH_FIRST") {
throw new BadRequestException(s"$fetchOrientation in operation log is not supported")
}
val rowSet = fe.be.sessionManager.operationManager.getOperationLogRowSet(
val fetchResultsResp = fe.be.sessionManager.operationManager.getOperationLogRowSet(
OperationHandle(operationHandleStr),
FetchOrientation.withName(fetchOrientation),
maxRows)
val rowSet = fetchResultsResp.getResults
val logRowSet = rowSet.getColumns.get(0).getStringVal.getValues.asScala
new OperationLog(logRowSet.asJava, logRowSet.size)
} catch {
@ -175,11 +176,12 @@ private[v1] class OperationsResource extends ApiRequestContext with Logging {
@QueryParam("fetchorientation") @DefaultValue("FETCH_NEXT")
fetchOrientation: String): ResultRowSet = {
try {
val rowSet = fe.be.fetchResults(
val fetchResultsResp = fe.be.fetchResults(
OperationHandle(operationHandleStr),
FetchOrientation.withName(fetchOrientation),
maxRows,
fetchLog = false)
val rowSet = fetchResultsResp.getResults
val rows = rowSet.getRows.asScala.map(i => {
new Row(i.getColVals.asScala.map(i => {
new Field(

View File

@ -196,11 +196,12 @@ class MySQLCommandHandler(
.getOrElse(KyuubiSQLException(s"Error operator state ${opStatus.state}"))
}
val resultSetMetadata = be.getResultSetMetadata(opHandle)
val rowSet = be.fetchResults(
val fetchResultResp = be.fetchResults(
opHandle,
FetchOrientation.FETCH_NEXT,
Int.MaxValue,
fetchLog = false)
val rowSet = fetchResultResp.getResults
MySQLQueryResult(resultSetMetadata.getSchema, rowSet)
} catch {
case rethrow: Exception =>

View File

@ -69,7 +69,7 @@ case class Query(
queryId.operationHandle,
defaultFetchOrientation,
defaultMaxRows,
false)
false).getResults
TrinoContext.createQueryResults(
queryId.getQueryId,
nextUri,

View File

@ -151,7 +151,7 @@ class KyuubiOperationYarnClusterSuite extends WithKyuubiServerOnYarn with HiveJD
}
val resultColumns = batchJobSubmissionOp.getNextRowSet(FetchOrientation.FETCH_NEXT, 10)
.getColumns.asScala
.getResults.getColumns.asScala
val keys = resultColumns.head.getStringVal.getValues.asScala
val values = resultColumns.apply(1).getStringVal.getValues.asScala

View File

@ -84,7 +84,7 @@ class TrinoContextSuite extends KyuubiFunSuite with RestFrontendTestHelper {
checkOpState(opHandleStr, FINISHED)
val metadataResp = fe.be.getResultSetMetadata(opHandle)
val tRowSet = fe.be.fetchResults(opHandle, FetchOrientation.FETCH_NEXT, 1000, false)
val tRowSet = fe.be.fetchResults(opHandle, FetchOrientation.FETCH_NEXT, 1000, false).getResults
val status = fe.be.getOperationStatus(opHandle, Some(0))
val uri = new URI("sfdsfsdfdsf")
@ -111,7 +111,7 @@ class TrinoContextSuite extends KyuubiFunSuite with RestFrontendTestHelper {
checkOpState(opHandleStr, FINISHED)
val metadataResp = fe.be.getResultSetMetadata(opHandle)
val tRowSet = fe.be.fetchResults(opHandle, FetchOrientation.FETCH_NEXT, 1000, false)
val tRowSet = fe.be.fetchResults(opHandle, FetchOrientation.FETCH_NEXT, 1000, false).getResults
val status = fe.be.getOperationStatus(opHandle, Some(0))
val uri = new URI("sfdsfsdfdsf")