From abe99da76b55eb16cf22fba0bc4173cdfae8979c Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Thu, 15 Mar 2018 16:40:25 +0800 Subject: [PATCH 1/6] fixes #3 initial db switch fails with privileges check --- .../kyuubi/session/KyuubiSession.scala | 22 +++++++++++++------ 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/src/main/scala/yaooqinn/kyuubi/session/KyuubiSession.scala b/src/main/scala/yaooqinn/kyuubi/session/KyuubiSession.scala index ba8569142..4c0367fda 100644 --- a/src/main/scala/yaooqinn/kyuubi/session/KyuubiSession.scala +++ b/src/main/scala/yaooqinn/kyuubi/session/KyuubiSession.scala @@ -30,12 +30,14 @@ import scala.util.matching.Regex import org.apache.commons.io.FileUtils import org.apache.hadoop.fs.FileSystem +import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAccessControlException import org.apache.hadoop.security.UserGroupInformation import org.apache.hive.service.cli._ import org.apache.hive.service.cli.thrift.TProtocolVersion import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.KyuubiConf._ import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException import org.apache.spark.ui.KyuubiServerTab import yaooqinn.kyuubi.Logging @@ -75,7 +77,7 @@ private[kyuubi] class KyuubiSession( private[this] var sessionLogDir: File = _ private[this] var lastAccessTime = 0L private[this] var lastIdleTime = 0L - private[this] var initialDatabase: String = "use default" + private[this] var initialDatabase: Option[String] = None private[this] val sessionUGI: UserGroupInformation = { val currentUser = UserGroupInformation.getCurrentUser @@ -234,7 +236,7 @@ private[kyuubi] class KyuubiSession( } else { conf.set(SPARK_HADOOP_PREFIX + k, value) } - case "use:database" => initialDatabase = "use " + value + case "use:database" => initialDatabase = Some("use " + value) case _ => } } @@ -257,7 +259,7 @@ private[kyuubi] class KyuubiSession( } else { 
_sparkSession.conf.set(SPARK_HADOOP_PREFIX + k, value) } - case "use:database" => initialDatabase = "use " + value + case "use:database" => initialDatabase = Some("use " + value) case _ => } } @@ -271,11 +273,17 @@ private[kyuubi] class KyuubiSession( getOrCreateSparkSession(sessionConf) assert(_sparkSession != null) - sessionUGI.doAs(new PrivilegedExceptionAction[Unit] { - override def run(): Unit = { - _sparkSession.sql(initialDatabase) + try { + initialDatabase.foreach(executeStatement) + } catch { + case ute: UndeclaredThrowableException => ute.getCause match { + case e: HiveAccessControlException => + throw new HiveSQLException(e.getMessage, "08S01", e.getCause) + case e: NoSuchDatabaseException => + throw new HiveSQLException(e.getMessage, "08S01", e.getCause) + case e: HiveSQLException => throw e } - }) + } lastAccessTime = System.currentTimeMillis lastIdleTime = lastAccessTime } From 8d3f1c7803d2ddd308f0c419b6e84cd1fb2243d4 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Thu, 15 Mar 2018 17:55:06 +0800 Subject: [PATCH 2/6] fixes #3 handle analysis exception --- .../kyuubi/operation/KyuubiOperation.scala | 73 +++++++++++-------- 1 file changed, 41 insertions(+), 32 deletions(-) diff --git a/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala b/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala index e9e570ca6..a98ce2c01 100644 --- a/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala +++ b/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala @@ -33,7 +33,7 @@ import org.apache.hadoop.hive.ql.session.OperationLog import org.apache.hive.service.cli._ import org.apache.hive.service.cli.thrift.TProtocolVersion import org.apache.spark.SparkUtils -import org.apache.spark.sql.{DataFrame, Row, SparkSQLUtils} +import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SparkSQLUtils} import org.apache.spark.sql.types._ import yaooqinn.kyuubi.Logging @@ -95,6 +95,14 @@ class KyuubiOperation(session: KyuubiSession, 
statement: String) extends Logging this.state = newState } + private[this] def checkState(state: OperationState): Boolean = { + getStatus.getState == state + } + + private[this] def isClosedOrCanceled: Boolean = { + checkState(OperationState.CLOSED) || checkState(OperationState.CANCELED) + } + @throws[HiveSQLException] private[this] def assertState(state: OperationState): Unit = { if (this.state ne state) { @@ -321,7 +329,6 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging } catch { case e: HiveSQLException => setOperationException(e) - error("Error running hive query: ", e) } } }) @@ -352,47 +359,44 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging } private def execute(): Unit = { - statementId = UUID.randomUUID().toString - info(s"Running query '$statement' with $statementId") - setState(OperationState.RUNNING) - // Always use the latest class loader provided by executionHive's state. - KyuubiServerMonitor.getListener(session.getUserName).onStatementStart( - statementId, - session.getSessionHandle.getSessionId.toString, - statement, - statementId, - session.getUserName) - session.sparkSession().sparkContext.setJobGroup(statementId, statement) try { + statementId = UUID.randomUUID().toString + info(s"Running query '$statement' with $statementId") + setState(OperationState.RUNNING) + KyuubiServerMonitor.getListener(session.getUserName).onStatementStart( + statementId, + session.getSessionHandle.getSessionId.toString, + statement, + statementId, + session.getUserName) + session.sparkSession().sparkContext.setJobGroup(statementId, statement) result = session.sparkSession().sql(statement) KyuubiServerMonitor.getListener(session.getUserName) .onStatementParsed(statementId, result.queryExecution.toString()) debug(result.queryExecution.toString()) iter = result.collect().iterator dataTypes = result.queryExecution.analyzed.output.map(_.dataType).toArray + setState(OperationState.FINISHED) + 
KyuubiServerMonitor.getListener(session.getUserName).onStatementFinish(statementId) } catch { case e: HiveSQLException => - if (getStatus.getState == OperationState.CANCELED - || getStatus.getState == OperationState.CLOSED) { - return - } else { + if (!isClosedOrCanceled) { setState(OperationState.ERROR) - KyuubiServerMonitor.getListener(session.getUserName).onStatementError( - statementId, e.getMessage, SparkUtils.exceptionString(e)) + onStatementError(statementId, e.getMessage, SparkUtils.exceptionString(e)) throw e } - // Actually do need to catch Throwable as some failures don't inherit from Exception and - // HiveServer will silently swallow them. - case e: Throwable => - val currentState = getStatus.getState - error(s"Error executing query, currentState $currentState, ", e) - if (currentState == OperationState.CANCELED - || currentState == OperationState.CLOSED) { - return - } else { + case e: AnalysisException => + error(s"Error executing query, currentState ${getStatus.getState}, ", e) + if (!isClosedOrCanceled) { setState(OperationState.ERROR) - KyuubiServerMonitor.getListener(session.getUserName).onStatementError( - statementId, e.getMessage, SparkUtils.exceptionString(e)) + onStatementError(statementId, e.getMessage, SparkUtils.exceptionString(e)) + throw new HiveSQLException(e.getMessage(), "AnalysisException", e) + } + + case e: Throwable => + error(s"Error executing query, currentState ${getStatus.getState}, ", e) + if (!isClosedOrCanceled) { + onStatementError(statementId, e.getMessage, SparkUtils.exceptionString(e)) throw new HiveSQLException(e.toString) } } finally { @@ -400,8 +404,13 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging session.sparkSession().sparkContext.cancelJobGroup(statementId) } } - setState(OperationState.FINISHED) - KyuubiServerMonitor.getListener(session.getUserName).onStatementFinish(statementId) + } + + private[this] def onStatementError( + id: String, errorMessage: String, errorTrace: 
String): Unit = { + setState(OperationState.ERROR) + KyuubiServerMonitor.getListener(session.getUserName) + .onStatementError(id, errorMessage, errorTrace) } private def cleanup(state: OperationState) { From b44faa86fa3a57c54947bfd65d665c305970f4d0 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Thu, 15 Mar 2018 20:10:38 +0800 Subject: [PATCH 3/6] fixes #3 handle parse exception --- .../yaooqinn/kyuubi/operation/KyuubiOperation.scala | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala b/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala index a98ce2c01..638d0e08e 100644 --- a/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala +++ b/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala @@ -34,6 +34,7 @@ import org.apache.hive.service.cli._ import org.apache.hive.service.cli.thrift.TProtocolVersion import org.apache.spark.SparkUtils import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SparkSQLUtils} +import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.types._ import yaooqinn.kyuubi.Logging @@ -381,20 +382,21 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging } catch { case e: HiveSQLException => if (!isClosedOrCanceled) { - setState(OperationState.ERROR) onStatementError(statementId, e.getMessage, SparkUtils.exceptionString(e)) throw e } - case e: AnalysisException => - error(s"Error executing query, currentState ${getStatus.getState}, ", e) + case e: ParseException => + if (!isClosedOrCanceled) { + onStatementError(statementId, e.getMessage, SparkUtils.exceptionString(e)) + throw new HiveSQLException(e.getMessage(), "ParseException", e) + } + case e: AnalysisException => if (!isClosedOrCanceled) { - setState(OperationState.ERROR) onStatementError(statementId, e.getMessage, SparkUtils.exceptionString(e)) throw new HiveSQLException(e.getMessage(), "AnalysisException", e) 
} case e: Throwable => - error(s"Error executing query, currentState ${getStatus.getState}, ", e) if (!isClosedOrCanceled) { onStatementError(statementId, e.getMessage, SparkUtils.exceptionString(e)) throw new HiveSQLException(e.toString) @@ -408,6 +410,7 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging private[this] def onStatementError( id: String, errorMessage: String, errorTrace: String): Unit = { + error(s"Error executing query, currentState ${getStatus.getState}, $errorMessage") setState(OperationState.ERROR) KyuubiServerMonitor.getListener(session.getUserName) .onStatementError(id, errorMessage, errorTrace) From 2bc0a2f4ed5b917799941cee9abe4950b1b8fb35 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Thu, 15 Mar 2018 20:25:13 +0800 Subject: [PATCH 4/6] fixes #3 handle parse exception --- .../scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala b/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala index 638d0e08e..cafd4d030 100644 --- a/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala +++ b/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala @@ -387,8 +387,9 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging } case e: ParseException => if (!isClosedOrCanceled) { - onStatementError(statementId, e.getMessage, SparkUtils.exceptionString(e)) - throw new HiveSQLException(e.getMessage(), "ParseException", e) + onStatementError( + statementId, e.withCommand(statement).getMessage, SparkUtils.exceptionString(e)) + throw new HiveSQLException(e.withCommand(statement).getMessage(), "ParseException", e) } case e: AnalysisException => if (!isClosedOrCanceled) { @@ -410,7 +411,7 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging private[this] def onStatementError( id: String, errorMessage: String, 
errorTrace: String): Unit = { - error(s"Error executing query, currentState ${getStatus.getState}, $errorMessage") + error(s"Error executing query, currentState ${getStatus.getState}, $errorTrace") setState(OperationState.ERROR) KyuubiServerMonitor.getListener(session.getUserName) .onStatementError(id, errorMessage, errorTrace) From 1d9dc0d642f5697c9d24682da6ee1cbb8a3881a1 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Thu, 15 Mar 2018 20:49:48 +0800 Subject: [PATCH 5/6] fixes #3 format err log --- .../yaooqinn/kyuubi/operation/KyuubiOperation.scala | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala b/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala index cafd4d030..07576c10c 100644 --- a/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala +++ b/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala @@ -97,7 +97,7 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging } private[this] def checkState(state: OperationState): Boolean = { - getStatus.getState == state + this.state == state } private[this] def isClosedOrCanceled: Boolean = { @@ -411,14 +411,20 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging private[this] def onStatementError( id: String, errorMessage: String, errorTrace: String): Unit = { - error(s"Error executing query, currentState ${getStatus.getState}, $errorTrace") + error( + s""" + |Error executing query, + |$statement + |Current operation state ${this.state}, + |$errorTrace + """.stripMargin) setState(OperationState.ERROR) KyuubiServerMonitor.getListener(session.getUserName) .onStatementError(id, errorMessage, errorTrace) } private def cleanup(state: OperationState) { - if (getStatus.getState != OperationState.CLOSED) { + if (this.state != OperationState.CLOSED) { setState(state) } val backgroundHandle = getBackgroundHandle From 
125f7f5b84dad3d9ee1fff19b943d25892e510b6 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Thu, 15 Mar 2018 21:48:38 +0800 Subject: [PATCH 6/6] fixes #3 handle hive control exception --- .../kyuubi/operation/KyuubiOperation.scala | 34 +++++++++---------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala b/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala index 07576c10c..87774dd83 100644 --- a/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala +++ b/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala @@ -29,6 +29,7 @@ import scala.util.control.NonFatal import org.apache.hadoop.hive.conf.HiveConf import org.apache.hadoop.hive.metastore.api.FieldSchema +import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAccessControlException import org.apache.hadoop.hive.ql.session.OperationLog import org.apache.hive.service.cli._ import org.apache.hive.service.cli.thrift.TProtocolVersion @@ -328,16 +329,12 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging try { execute() } catch { - case e: HiveSQLException => - setOperationException(e) + case e: HiveSQLException => setOperationException(e) } } }) } catch { - case e: Exception => - setOperationException(new HiveSQLException(e)) - error("Error running hive query as user : " + - session.getUserName, e) + case e: Exception => setOperationException(new HiveSQLException(e)) } } } @@ -389,18 +386,23 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging if (!isClosedOrCanceled) { onStatementError( statementId, e.withCommand(statement).getMessage, SparkUtils.exceptionString(e)) - throw new HiveSQLException(e.withCommand(statement).getMessage(), "ParseException", e) + throw new HiveSQLException( + e.withCommand(statement).getMessage, "ParseException", 2000, e) } case e: AnalysisException => if (!isClosedOrCanceled) { onStatementError(statementId, 
e.getMessage, SparkUtils.exceptionString(e)) - throw new HiveSQLException(e.getMessage(), "AnalysisException", e) + throw new HiveSQLException(e.getMessage, "AnalysisException", 2001, e) + } + case e: HiveAccessControlException => + if (!isClosedOrCanceled) { + onStatementError(statementId, e.getMessage, SparkUtils.exceptionString(e)) + throw new HiveSQLException(e.getMessage, "HiveAccessControlException", 3000, e) } - case e: Throwable => if (!isClosedOrCanceled) { onStatementError(statementId, e.getMessage, SparkUtils.exceptionString(e)) - throw new HiveSQLException(e.toString) + throw new HiveSQLException(e.toString, "", 10000, e) } } finally { if (statementId != null) { @@ -409,21 +411,19 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging } } - private[this] def onStatementError( - id: String, errorMessage: String, errorTrace: String): Unit = { + private[this] def onStatementError(id: String, message: String, trace: String): Unit = { error( s""" - |Error executing query, + |Error executing query as ${session.getUserName}, |$statement |Current operation state ${this.state}, - |$errorTrace + |$trace """.stripMargin) setState(OperationState.ERROR) - KyuubiServerMonitor.getListener(session.getUserName) - .onStatementError(id, errorMessage, errorTrace) + KyuubiServerMonitor.getListener(session.getUserName).onStatementError(id, message, trace) } - private def cleanup(state: OperationState) { + private[this] def cleanup(state: OperationState) { if (this.state != OperationState.CLOSED) { setState(state) }