[KYUUBI #4949] For operation getNextRowSet method, with operation lock required

### _Why are the changes needed?_

For the operation getNextRowSet method, we shall add lock for it.

For example, for spark operation, the result iterator is not thread-safe, it might throw exception(if the jdbc client to kyuubi server connection socket timeout).

For incremental collect mode, the fetchResult might trigger a spark task to collect the incremental result(`self.next().toIterator`).

The jdbc client to kyuubi gateway timeout, but the fetchResult request has been sent to engine.
Then the jdbc client re-send the fetchResult request.

And the getNextResultSet in spark engine side concurrent execute.

And the result iterator is not thread-safe and might cause NPE.

![image](https://github.com/apache/kyuubi/assets/6757692/03c369c7-dc12-40d7-aac3-c8f5e799d1cf)
![image](https://github.com/apache/kyuubi/assets/6757692/a3414f84-5112-4ea6-a611-f15e6288aba2)

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #4949 from turboFei/lock_next_rowset.

Closes #4949

8f18f3236 [fwang12] getNextRowSetInternal and withLockRequired

Authored-by: fwang12 <fwang12@ebay.com>
Signed-off-by: fwang12 <fwang12@ebay.com>
This commit is contained in:
fwang12 2023-06-12 13:31:36 +08:00
parent b10a8ef27d
commit 7d8e89c27f
12 changed files with 16 additions and 12 deletions

View File

@ -31,7 +31,7 @@ abstract class ChatOperation(session: Session) extends AbstractOperation(session
protected lazy val conf: KyuubiConf = session.sessionManager.getConf
override def getNextRowSet(order: FetchOrientation, rowSetSize: Int): TRowSet = {
override def getNextRowSetInternal(order: FetchOrientation, rowSetSize: Int): TRowSet = {
validateDefaultFetchOrientation(order)
assertState(OperationState.FINISHED)
setHasResultSet(true)

View File

@ -91,7 +91,7 @@ abstract class FlinkOperation(session: Session) extends AbstractOperation(sessio
resp
}
override def getNextRowSet(order: FetchOrientation, rowSetSize: Int): TRowSet = {
override def getNextRowSetInternal(order: FetchOrientation, rowSetSize: Int): TRowSet = {
validateDefaultFetchOrientation(order)
assertState(OperationState.FINISHED)
setHasResultSet(true)

View File

@ -92,7 +92,7 @@ abstract class HiveOperation(session: Session) extends AbstractOperation(session
resp
}
override def getNextRowSet(order: FetchOrientation, rowSetSize: Int): TRowSet = {
override def getNextRowSetInternal(order: FetchOrientation, rowSetSize: Int): TRowSet = {
val tOrder = FetchOrientation.toTFetchOrientation(order)
val hiveOrder = org.apache.hive.service.cli.FetchOrientation.getFetchOrientation(tOrder)
val rowSet = internalHiveOperation.getNextRowSet(hiveOrder, rowSetSize)

View File

@ -36,7 +36,7 @@ abstract class JdbcOperation(session: Session) extends AbstractOperation(session
protected lazy val dialect: JdbcDialect = JdbcDialects.get(conf)
override def getNextRowSet(order: FetchOrientation, rowSetSize: Int): TRowSet = {
override def getNextRowSetInternal(order: FetchOrientation, rowSetSize: Int): TRowSet = {
validateDefaultFetchOrientation(order)
assertState(OperationState.FINISHED)
setHasResultSet(true)

View File

@ -233,7 +233,7 @@ abstract class SparkOperation(session: Session)
resp
}
override def getNextRowSet(order: FetchOrientation, rowSetSize: Int): TRowSet = {
override def getNextRowSetInternal(order: FetchOrientation, rowSetSize: Int): TRowSet = {
var resultRowSet: TRowSet = null
try {
withLocalProperties {

View File

@ -82,7 +82,7 @@ class ExecuteStatement(
}
}
override def getNextRowSet(order: FetchOrientation, rowSetSize: Int): TRowSet = {
override def getNextRowSetInternal(order: FetchOrientation, rowSetSize: Int): TRowSet = {
validateDefaultFetchOrientation(order)
assertState(OperationState.FINISHED)
setHasResultSet(true)

View File

@ -54,7 +54,7 @@ abstract class TrinoOperation(session: Session) extends AbstractOperation(sessio
resp
}
override def getNextRowSet(order: FetchOrientation, rowSetSize: Int): TRowSet = {
override def getNextRowSetInternal(order: FetchOrientation, rowSetSize: Int): TRowSet = {
validateDefaultFetchOrientation(order)
assertState(OperationState.FINISHED)
setHasResultSet(true)

View File

@ -182,7 +182,11 @@ abstract class AbstractOperation(session: Session) extends Operation with Loggin
override def getResultSetMetadata: TGetResultSetMetadataResp
override def getNextRowSet(order: FetchOrientation, rowSetSize: Int): TRowSet
def getNextRowSetInternal(order: FetchOrientation, rowSetSize: Int): TRowSet
override def getNextRowSet(order: FetchOrientation, rowSetSize: Int): TRowSet = withLockRequired {
getNextRowSetInternal(order, rowSetSize)
}
/**
* convert SQL 'like' pattern to a Java regular expression.

View File

@ -76,7 +76,7 @@ class NoopOperation(session: Session, shouldFail: Boolean = false)
resp
}
override def getNextRowSet(order: FetchOrientation, rowSetSize: Int): TRowSet = {
override def getNextRowSetInternal(order: FetchOrientation, rowSetSize: Int): TRowSet = {
val col = TColumn.stringVal(new TStringColumn(Seq(opType).asJava, ByteBuffer.allocate(0)))
val tRowSet = ThriftUtils.newEmptyRowSet
tRowSet.addToColumns(col)

View File

@ -67,7 +67,7 @@ class ExecutedCommandExec(
if (!shouldRunAsync) getBackgroundHandle.get()
}
override def getNextRowSet(order: FetchOrientation, rowSetSize: Int): TRowSet = {
override def getNextRowSetInternal(order: FetchOrientation, rowSetSize: Int): TRowSet = {
validateDefaultFetchOrientation(order)
assertState(OperationState.FINISHED)
setHasResultSet(true)

View File

@ -54,7 +54,7 @@ abstract class KyuubiApplicationOperation(session: Session) extends KyuubiOperat
resp
}
override def getNextRowSet(order: FetchOrientation, rowSetSize: Int): TRowSet = {
override def getNextRowSetInternal(order: FetchOrientation, rowSetSize: Int): TRowSet = {
applicationInfoMap.map { state =>
val tRow = new TRowSet(0, new JArrayList[TRow](state.size))
Seq(state.keys, state.values.map(Option(_).getOrElse(""))).map(_.toSeq.asJava).foreach {

View File

@ -179,7 +179,7 @@ abstract class KyuubiOperation(session: Session) extends AbstractOperation(sessi
}
}
override def getNextRowSet(order: FetchOrientation, rowSetSize: Int): TRowSet = {
override def getNextRowSetInternal(order: FetchOrientation, rowSetSize: Int): TRowSet = {
validateDefaultFetchOrientation(order)
assertState(OperationState.FINISHED)
setHasResultSet(true)