diff --git a/src/main/scala/yaooqinn/kyuubi/operation/KyuubiSQLOperation.scala b/src/main/scala/yaooqinn/kyuubi/operation/KyuubiSQLOperation.scala index 06014243b..e020a0be2 100644 --- a/src/main/scala/yaooqinn/kyuubi/operation/KyuubiSQLOperation.scala +++ b/src/main/scala/yaooqinn/kyuubi/operation/KyuubiSQLOperation.scala @@ -45,18 +45,19 @@ class KyuubiSQLOperation(parentSession: KyuubiSession, statement: String) extend private[this] var state = OperationState.INITIALIZED private[this] val opHandle: OperationHandle = new OperationHandle(OperationType.EXECUTE_STATEMENT, parentSession.getProtocolVersion) - protected var operationException: HiveSQLException = _ - protected var backgroundHandle: Future[_] = _ - protected var operationLog: OperationLog = _ - protected var isOperationLogEnabled: Boolean = false - private var result: DataFrame = _ - private var iter: Iterator[Row] = _ - private var iterHeader: Iterator[Row] = _ - private var dataTypes: Array[DataType] = _ - private var statementId: String = _ + private[this] var hasResultSet = false + private[this] var operationException: HiveSQLException = _ + private[this] var backgroundHandle: Future[_] = _ + private[this] var operationLog: OperationLog = _ + private[this] var isOperationLogEnabled: Boolean = false - private lazy val resultSchema: TableSchema = { + private[this] var result: DataFrame = _ + private[this] var iter: Iterator[Row] = _ + private[this] var dataTypes: Array[DataType] = _ + private[this] var statementId: String = _ + + private[this] lazy val resultSchema: TableSchema = { if (result == null || result.schema.isEmpty) { new TableSchema(Arrays.asList(new FieldSchema("Result", "string", ""))) } else { @@ -175,8 +176,7 @@ class KyuubiSQLOperation(parentSession: KyuubiSession, statement: String) extend createOperationLog() try { runInternal() - } - finally { + } finally { unregisterOperationLog() } } @@ -187,7 +187,6 @@ class KyuubiSQLOperation(parentSession: KyuubiSession, statement: String) extend error("Operation [ " + opHandle.getHandleIdentifier + " ] " + "logging is enabled, but its OperationLog object cannot be found.") } else { - unregisterOperationLog() operationLog.close() } } @@ -198,7 +197,7 @@ class KyuubiSQLOperation(parentSession: KyuubiSession, statement: String) extend debug(s"CLOSING $statementId") cleanup(OperationState.CLOSED) cleanupOperationLog() - parentSession.sparkSession.sparkContext.clearJobGroup() + parentSession.sparkSession().sparkContext.clearJobGroup() } def cancel(): Unit = { @@ -243,15 +242,9 @@ class KyuubiSQLOperation(parentSession: KyuubiSession, statement: String) extend def getNextRowSet(order: FetchOrientation, maxRowsL: Long): RowSet = { validateDefaultFetchOrientation(order) assertState(OperationState.FINISHED) + setHasResultSet(true) val resultRowSet: RowSet = RowSetFactory.create(getResultSetSchema, getProtocolVersion) - // Reset iter to header when fetching start from first row - if (order.equals(FetchOrientation.FETCH_FIRST)) { - val (ita, itb) = iterHeader.duplicate - iter = ita - iterHeader = itb - } - if (!iter.hasNext) { resultRowSet } else { @@ -277,6 +270,11 @@ class KyuubiSQLOperation(parentSession: KyuubiSession, statement: String) extend } } + private[this] def setHasResultSet(hasResultSet: Boolean): Unit = { + this.hasResultSet = hasResultSet + opHandle.setHasResultSet(hasResultSet) + } + /** * Verify if the given fetch orientation is part of the default orientation types. * @@ -285,7 +283,7 @@ class KyuubiSQLOperation(parentSession: KyuubiSession, statement: String) extend * @throws HiveSQLException */ @throws[HiveSQLException] - protected def validateDefaultFetchOrientation(orientation: FetchOrientation): Unit = { + private[this] def validateDefaultFetchOrientation(orientation: FetchOrientation): Unit = { validateFetchOrientation(orientation, DEFAULT_FETCH_ORIENTATION_SET) } @@ -309,36 +307,33 @@ class KyuubiSQLOperation(parentSession: KyuubiSession, statement: String) extend private[this] def runInternal(): Unit = { setState(OperationState.PENDING) - val sparkServiceUGI = parentSession.getSessionUgi + setHasResultSet(true) - // Runnable impl to call runInternal asynchronously, - // from a different thread + // Runnable impl to call runInternal asynchronously, from a different thread val backgroundOperation = new Runnable() { override def run(): Unit = { - val doAsAction = new PrivilegedExceptionAction[Unit]() { - registerCurrentOperationLog() - override def run(): Unit = { - try { - execute() - } catch { - case e: HiveSQLException => - setOperationException(e) - error("Error running hive query: ", e) - } finally { - unregisterOperationLog() - } - } - } try { - sparkServiceUGI.doAs(doAsAction) + parentSession.getSessionUgi.doAs(new PrivilegedExceptionAction[Unit]() { + registerCurrentOperationLog() + override def run(): Unit = { + try { + execute() + } catch { + case e: HiveSQLException => + setOperationException(e) + error("Error running hive query: ", e) + } + } + }) } catch { case e: Exception => setOperationException(new HiveSQLException(e)) error("Error running hive query as user : " + - sparkServiceUGI.getShortUserName, e) + parentSession.getUserName, e) } } } + try { // This submit blocks if no background threads are available to run this operation val backgroundHandle = @@ -374,9 +369,6 @@ class KyuubiSQLOperation(parentSession: KyuubiSession, statement: String) extend .onStatementParsed(statementId, result.queryExecution.toString()) debug(result.queryExecution.toString()) iter = result.collect().iterator - val (itra, itrb) = iter.duplicate - iterHeader = itra - iter = itrb dataTypes = result.queryExecution.analyzed.output.map(_.dataType).toArray } catch { case e: HiveSQLException => @@ -421,7 +413,7 @@ class KyuubiSQLOperation(parentSession: KyuubiSession, statement: String) extend backgroundHandle.cancel(true) } if (statementId != null) { - parentSession.sparkSession.sparkContext.cancelJobGroup(statementId) + parentSession.sparkSession().sparkContext.cancelJobGroup(statementId) } } }