set has row set to be true to avoid null row set

This commit is contained in:
Kent Yao 2018-01-08 15:15:10 +08:00
parent 79010ea2ce
commit 2024477419

View File

@ -45,18 +45,19 @@ class KyuubiSQLOperation(parentSession: KyuubiSession, statement: String) extend
private[this] var state = OperationState.INITIALIZED
private[this] val opHandle: OperationHandle =
new OperationHandle(OperationType.EXECUTE_STATEMENT, parentSession.getProtocolVersion)
protected var operationException: HiveSQLException = _
protected var backgroundHandle: Future[_] = _
protected var operationLog: OperationLog = _
protected var isOperationLogEnabled: Boolean = false
private var result: DataFrame = _
private var iter: Iterator[Row] = _
private var iterHeader: Iterator[Row] = _
private var dataTypes: Array[DataType] = _
private var statementId: String = _
private[this] var hasResultSet = false
private[this] var operationException: HiveSQLException = _
private[this] var backgroundHandle: Future[_] = _
private[this] var operationLog: OperationLog = _
private[this] var isOperationLogEnabled: Boolean = false
private lazy val resultSchema: TableSchema = {
private[this] var result: DataFrame = _
private[this] var iter: Iterator[Row] = _
private[this] var dataTypes: Array[DataType] = _
private[this] var statementId: String = _
private[this] lazy val resultSchema: TableSchema = {
if (result == null || result.schema.isEmpty) {
new TableSchema(Arrays.asList(new FieldSchema("Result", "string", "")))
} else {
@ -175,8 +176,7 @@ class KyuubiSQLOperation(parentSession: KyuubiSession, statement: String) extend
createOperationLog()
try {
runInternal()
}
finally {
} finally {
unregisterOperationLog()
}
}
@ -187,7 +187,6 @@ class KyuubiSQLOperation(parentSession: KyuubiSession, statement: String) extend
error("Operation [ " + opHandle.getHandleIdentifier + " ] " +
"logging is enabled, but its OperationLog object cannot be found.")
} else {
unregisterOperationLog()
operationLog.close()
}
}
@ -198,7 +197,7 @@ class KyuubiSQLOperation(parentSession: KyuubiSession, statement: String) extend
debug(s"CLOSING $statementId")
cleanup(OperationState.CLOSED)
cleanupOperationLog()
parentSession.sparkSession.sparkContext.clearJobGroup()
parentSession.sparkSession().sparkContext.clearJobGroup()
}
def cancel(): Unit = {
@ -243,15 +242,9 @@ class KyuubiSQLOperation(parentSession: KyuubiSession, statement: String) extend
def getNextRowSet(order: FetchOrientation, maxRowsL: Long): RowSet = {
validateDefaultFetchOrientation(order)
assertState(OperationState.FINISHED)
setHasResultSet(true)
val resultRowSet: RowSet = RowSetFactory.create(getResultSetSchema, getProtocolVersion)
// Reset iter to header when fetching start from first row
if (order.equals(FetchOrientation.FETCH_FIRST)) {
val (ita, itb) = iterHeader.duplicate
iter = ita
iterHeader = itb
}
if (!iter.hasNext) {
resultRowSet
} else {
@ -277,6 +270,11 @@ class KyuubiSQLOperation(parentSession: KyuubiSession, statement: String) extend
}
}
private[this] def setHasResultSet(hasResultSet: Boolean): Unit = {
this.hasResultSet = hasResultSet
opHandle.setHasResultSet(hasResultSet)
}
/**
* Verify if the given fetch orientation is part of the default orientation types.
*
@ -285,7 +283,7 @@ class KyuubiSQLOperation(parentSession: KyuubiSession, statement: String) extend
* @throws HiveSQLException
*/
@throws[HiveSQLException]
protected def validateDefaultFetchOrientation(orientation: FetchOrientation): Unit = {
private[this] def validateDefaultFetchOrientation(orientation: FetchOrientation): Unit = {
validateFetchOrientation(orientation, DEFAULT_FETCH_ORIENTATION_SET)
}
@ -309,36 +307,33 @@ class KyuubiSQLOperation(parentSession: KyuubiSession, statement: String) extend
private[this] def runInternal(): Unit = {
setState(OperationState.PENDING)
val sparkServiceUGI = parentSession.getSessionUgi
setHasResultSet(true)
// Runnable impl to call runInternal asynchronously,
// from a different thread
// Runnable impl to call runInternal asynchronously, from a different thread
val backgroundOperation = new Runnable() {
override def run(): Unit = {
val doAsAction = new PrivilegedExceptionAction[Unit]() {
registerCurrentOperationLog()
override def run(): Unit = {
try {
execute()
} catch {
case e: HiveSQLException =>
setOperationException(e)
error("Error running hive query: ", e)
} finally {
unregisterOperationLog()
}
}
}
try {
sparkServiceUGI.doAs(doAsAction)
parentSession.getSessionUgi.doAs(new PrivilegedExceptionAction[Unit]() {
registerCurrentOperationLog()
override def run(): Unit = {
try {
execute()
} catch {
case e: HiveSQLException =>
setOperationException(e)
error("Error running hive query: ", e)
}
}
})
} catch {
case e: Exception =>
setOperationException(new HiveSQLException(e))
error("Error running hive query as user : " +
sparkServiceUGI.getShortUserName, e)
parentSession.getUserName, e)
}
}
}
try {
// This submit blocks if no background threads are available to run this operation
val backgroundHandle =
@ -374,9 +369,6 @@ class KyuubiSQLOperation(parentSession: KyuubiSession, statement: String) extend
.onStatementParsed(statementId, result.queryExecution.toString())
debug(result.queryExecution.toString())
iter = result.collect().iterator
val (itra, itrb) = iter.duplicate
iterHeader = itra
iter = itrb
dataTypes = result.queryExecution.analyzed.output.map(_.dataType).toArray
} catch {
case e: HiveSQLException =>
@ -421,7 +413,7 @@ class KyuubiSQLOperation(parentSession: KyuubiSession, statement: String) extend
backgroundHandle.cancel(true)
}
if (statementId != null) {
parentSession.sparkSession.sparkContext.cancelJobGroup(statementId)
parentSession.sparkSession().sparkContext.cancelJobGroup(statementId)
}
}
}