diff --git a/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala b/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala index e9e570ca6..87774dd83 100644 --- a/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala +++ b/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala @@ -29,11 +29,13 @@ import scala.util.control.NonFatal import org.apache.hadoop.hive.conf.HiveConf import org.apache.hadoop.hive.metastore.api.FieldSchema +import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAccessControlException import org.apache.hadoop.hive.ql.session.OperationLog import org.apache.hive.service.cli._ import org.apache.hive.service.cli.thrift.TProtocolVersion import org.apache.spark.SparkUtils -import org.apache.spark.sql.{DataFrame, Row, SparkSQLUtils} +import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SparkSQLUtils} +import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.types._ import yaooqinn.kyuubi.Logging @@ -95,6 +97,14 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging this.state = newState } + private[this] def checkState(state: OperationState): Boolean = { + this.state == state + } + + private[this] def isClosedOrCanceled: Boolean = { + checkState(OperationState.CLOSED) || checkState(OperationState.CANCELED) + } + @throws[HiveSQLException] private[this] def assertState(state: OperationState): Unit = { if (this.state ne state) { @@ -319,17 +329,12 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging try { execute() } catch { - case e: HiveSQLException => - setOperationException(e) - error("Error running hive query: ", e) + case e: HiveSQLException => setOperationException(e) } } }) } catch { - case e: Exception => - setOperationException(new HiveSQLException(e)) - error("Error running hive query as user : " + - session.getUserName, e) + case e: Exception => setOperationException(new HiveSQLException(e)) } } 
} @@ -352,60 +357,74 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging } private def execute(): Unit = { - statementId = UUID.randomUUID().toString - info(s"Running query '$statement' with $statementId") - setState(OperationState.RUNNING) - // Always use the latest class loader provided by executionHive's state. - KyuubiServerMonitor.getListener(session.getUserName).onStatementStart( - statementId, - session.getSessionHandle.getSessionId.toString, - statement, - statementId, - session.getUserName) - session.sparkSession().sparkContext.setJobGroup(statementId, statement) try { + statementId = UUID.randomUUID().toString + info(s"Running query '$statement' with $statementId") + setState(OperationState.RUNNING) + KyuubiServerMonitor.getListener(session.getUserName).onStatementStart( + statementId, + session.getSessionHandle.getSessionId.toString, + statement, + statementId, + session.getUserName) + session.sparkSession().sparkContext.setJobGroup(statementId, statement) result = session.sparkSession().sql(statement) KyuubiServerMonitor.getListener(session.getUserName) .onStatementParsed(statementId, result.queryExecution.toString()) debug(result.queryExecution.toString()) iter = result.collect().iterator dataTypes = result.queryExecution.analyzed.output.map(_.dataType).toArray + setState(OperationState.FINISHED) + KyuubiServerMonitor.getListener(session.getUserName).onStatementFinish(statementId) } catch { case e: HiveSQLException => - if (getStatus.getState == OperationState.CANCELED - || getStatus.getState == OperationState.CLOSED) { - return - } else { - setState(OperationState.ERROR) - KyuubiServerMonitor.getListener(session.getUserName).onStatementError( - statementId, e.getMessage, SparkUtils.exceptionString(e)) + if (!isClosedOrCanceled) { + onStatementError(statementId, e.getMessage, SparkUtils.exceptionString(e)) throw e } - // Actually do need to catch Throwable as some failures don't inherit from Exception and - // HiveServer 
will silently swallow them. + case e: ParseException => + if (!isClosedOrCanceled) { + onStatementError( + statementId, e.withCommand(statement).getMessage, SparkUtils.exceptionString(e)) + throw new HiveSQLException( + e.withCommand(statement).getMessage, "ParseException", 2000, e) + } + case e: AnalysisException => + if (!isClosedOrCanceled) { + onStatementError(statementId, e.getMessage, SparkUtils.exceptionString(e)) + throw new HiveSQLException(e.getMessage, "AnalysisException", 2001, e) + } + case e: HiveAccessControlException => + if (!isClosedOrCanceled) { + onStatementError(statementId, e.getMessage, SparkUtils.exceptionString(e)) + throw new HiveSQLException(e.getMessage, "HiveAccessControlException", 3000, e) + } case e: Throwable => - val currentState = getStatus.getState - error(s"Error executing query, currentState $currentState, ", e) - if (currentState == OperationState.CANCELED - || currentState == OperationState.CLOSED) { - return - } else { - setState(OperationState.ERROR) - KyuubiServerMonitor.getListener(session.getUserName).onStatementError( - statementId, e.getMessage, SparkUtils.exceptionString(e)) - throw new HiveSQLException(e.toString) + if (!isClosedOrCanceled) { + onStatementError(statementId, e.getMessage, SparkUtils.exceptionString(e)) + throw new HiveSQLException(e.toString, "", 10000, e) } } finally { if (statementId != null) { session.sparkSession().sparkContext.cancelJobGroup(statementId) } } - setState(OperationState.FINISHED) - KyuubiServerMonitor.getListener(session.getUserName).onStatementFinish(statementId) } - private def cleanup(state: OperationState) { - if (getStatus.getState != OperationState.CLOSED) { + private[this] def onStatementError(id: String, message: String, trace: String): Unit = { + error( + s""" + |Error executing query as ${session.getUserName}, + |$statement + |Current operation state ${this.state}, + |$trace + """.stripMargin) + setState(OperationState.ERROR) + 
KyuubiServerMonitor.getListener(session.getUserName).onStatementError(id, message, trace) + } + + private[this] def cleanup(state: OperationState) { + if (this.state != OperationState.CLOSED) { setState(state) } val backgroundHandle = getBackgroundHandle diff --git a/src/main/scala/yaooqinn/kyuubi/session/KyuubiSession.scala b/src/main/scala/yaooqinn/kyuubi/session/KyuubiSession.scala index ba8569142..4c0367fda 100644 --- a/src/main/scala/yaooqinn/kyuubi/session/KyuubiSession.scala +++ b/src/main/scala/yaooqinn/kyuubi/session/KyuubiSession.scala @@ -30,12 +30,14 @@ import scala.util.matching.Regex import org.apache.commons.io.FileUtils import org.apache.hadoop.fs.FileSystem +import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAccessControlException import org.apache.hadoop.security.UserGroupInformation import org.apache.hive.service.cli._ import org.apache.hive.service.cli.thrift.TProtocolVersion import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.KyuubiConf._ import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException import org.apache.spark.ui.KyuubiServerTab import yaooqinn.kyuubi.Logging @@ -75,7 +77,7 @@ private[kyuubi] class KyuubiSession( private[this] var sessionLogDir: File = _ private[this] var lastAccessTime = 0L private[this] var lastIdleTime = 0L - private[this] var initialDatabase: String = "use default" + private[this] var initialDatabase: Option[String] = None private[this] val sessionUGI: UserGroupInformation = { val currentUser = UserGroupInformation.getCurrentUser @@ -234,7 +236,7 @@ private[kyuubi] class KyuubiSession( } else { conf.set(SPARK_HADOOP_PREFIX + k, value) } - case "use:database" => initialDatabase = "use " + value + case "use:database" => initialDatabase = Some("use " + value) case _ => } } @@ -257,7 +259,7 @@ private[kyuubi] class KyuubiSession( } else { _sparkSession.conf.set(SPARK_HADOOP_PREFIX + k, value) } - case "use:database" => 
initialDatabase = "use " + value + case "use:database" => initialDatabase = Some("use " + value) case _ => } } @@ -271,11 +273,18 @@ private[kyuubi] class KyuubiSession( getOrCreateSparkSession(sessionConf) assert(_sparkSession != null) - sessionUGI.doAs(new PrivilegedExceptionAction[Unit] { - override def run(): Unit = { - _sparkSession.sql(initialDatabase) + try { + initialDatabase.foreach(executeStatement) + } catch { + case ute: UndeclaredThrowableException => ute.getCause match { + case e: HiveAccessControlException => + throw new HiveSQLException(e.getMessage, "08S01", e) + case e: NoSuchDatabaseException => + throw new HiveSQLException(e.getMessage, "08S01", e) + case e: HiveSQLException => throw e + case _ => throw new HiveSQLException(ute.getMessage, "08S01", ute) } - }) + } lastAccessTime = System.currentTimeMillis lastIdleTime = lastAccessTime }