Merge pull request #4 from yaooqinn/i2
fixed #3 initial db switch fails with privileges check close #3
This commit is contained in:
commit
abab544d49
@ -29,11 +29,13 @@ import scala.util.control.NonFatal
|
||||
|
||||
import org.apache.hadoop.hive.conf.HiveConf
|
||||
import org.apache.hadoop.hive.metastore.api.FieldSchema
|
||||
import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAccessControlException
|
||||
import org.apache.hadoop.hive.ql.session.OperationLog
|
||||
import org.apache.hive.service.cli._
|
||||
import org.apache.hive.service.cli.thrift.TProtocolVersion
|
||||
import org.apache.spark.SparkUtils
|
||||
import org.apache.spark.sql.{DataFrame, Row, SparkSQLUtils}
|
||||
import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SparkSQLUtils}
|
||||
import org.apache.spark.sql.catalyst.parser.ParseException
|
||||
import org.apache.spark.sql.types._
|
||||
|
||||
import yaooqinn.kyuubi.Logging
|
||||
@ -95,6 +97,14 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging
|
||||
this.state = newState
|
||||
}
|
||||
|
||||
private[this] def checkState(state: OperationState): Boolean = {
|
||||
this.state == state
|
||||
}
|
||||
|
||||
private[this] def isClosedOrCanceled: Boolean = {
|
||||
checkState(OperationState.CLOSED) || checkState(OperationState.CANCELED)
|
||||
}
|
||||
|
||||
@throws[HiveSQLException]
|
||||
private[this] def assertState(state: OperationState): Unit = {
|
||||
if (this.state ne state) {
|
||||
@ -319,17 +329,12 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging
|
||||
try {
|
||||
execute()
|
||||
} catch {
|
||||
case e: HiveSQLException =>
|
||||
setOperationException(e)
|
||||
error("Error running hive query: ", e)
|
||||
case e: HiveSQLException => setOperationException(e)
|
||||
}
|
||||
}
|
||||
})
|
||||
} catch {
|
||||
case e: Exception =>
|
||||
setOperationException(new HiveSQLException(e))
|
||||
error("Error running hive query as user : " +
|
||||
session.getUserName, e)
|
||||
case e: Exception => setOperationException(new HiveSQLException(e))
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -352,60 +357,74 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging
|
||||
}
|
||||
|
||||
private def execute(): Unit = {
|
||||
statementId = UUID.randomUUID().toString
|
||||
info(s"Running query '$statement' with $statementId")
|
||||
setState(OperationState.RUNNING)
|
||||
// Always use the latest class loader provided by executionHive's state.
|
||||
KyuubiServerMonitor.getListener(session.getUserName).onStatementStart(
|
||||
statementId,
|
||||
session.getSessionHandle.getSessionId.toString,
|
||||
statement,
|
||||
statementId,
|
||||
session.getUserName)
|
||||
session.sparkSession().sparkContext.setJobGroup(statementId, statement)
|
||||
try {
|
||||
statementId = UUID.randomUUID().toString
|
||||
info(s"Running query '$statement' with $statementId")
|
||||
setState(OperationState.RUNNING)
|
||||
KyuubiServerMonitor.getListener(session.getUserName).onStatementStart(
|
||||
statementId,
|
||||
session.getSessionHandle.getSessionId.toString,
|
||||
statement,
|
||||
statementId,
|
||||
session.getUserName)
|
||||
session.sparkSession().sparkContext.setJobGroup(statementId, statement)
|
||||
result = session.sparkSession().sql(statement)
|
||||
KyuubiServerMonitor.getListener(session.getUserName)
|
||||
.onStatementParsed(statementId, result.queryExecution.toString())
|
||||
debug(result.queryExecution.toString())
|
||||
iter = result.collect().iterator
|
||||
dataTypes = result.queryExecution.analyzed.output.map(_.dataType).toArray
|
||||
setState(OperationState.FINISHED)
|
||||
KyuubiServerMonitor.getListener(session.getUserName).onStatementFinish(statementId)
|
||||
} catch {
|
||||
case e: HiveSQLException =>
|
||||
if (getStatus.getState == OperationState.CANCELED
|
||||
|| getStatus.getState == OperationState.CLOSED) {
|
||||
return
|
||||
} else {
|
||||
setState(OperationState.ERROR)
|
||||
KyuubiServerMonitor.getListener(session.getUserName).onStatementError(
|
||||
statementId, e.getMessage, SparkUtils.exceptionString(e))
|
||||
if (!isClosedOrCanceled) {
|
||||
onStatementError(statementId, e.getMessage, SparkUtils.exceptionString(e))
|
||||
throw e
|
||||
}
|
||||
// Actually do need to catch Throwable as some failures don't inherit from Exception and
|
||||
// HiveServer will silently swallow them.
|
||||
case e: ParseException =>
|
||||
if (!isClosedOrCanceled) {
|
||||
onStatementError(
|
||||
statementId, e.withCommand(statement).getMessage, SparkUtils.exceptionString(e))
|
||||
throw new HiveSQLException(
|
||||
e.withCommand(statement).getMessage, "ParseException", 2000, e)
|
||||
}
|
||||
case e: AnalysisException =>
|
||||
if (!isClosedOrCanceled) {
|
||||
onStatementError(statementId, e.getMessage, SparkUtils.exceptionString(e))
|
||||
throw new HiveSQLException(e.getMessage, "AnalysisException", 2001, e)
|
||||
}
|
||||
case e: HiveAccessControlException =>
|
||||
if (!isClosedOrCanceled) {
|
||||
onStatementError(statementId, e.getMessage, SparkUtils.exceptionString(e))
|
||||
throw new HiveSQLException(e.getMessage, "HiveAccessControlException", 3000, e)
|
||||
}
|
||||
case e: Throwable =>
|
||||
val currentState = getStatus.getState
|
||||
error(s"Error executing query, currentState $currentState, ", e)
|
||||
if (currentState == OperationState.CANCELED
|
||||
|| currentState == OperationState.CLOSED) {
|
||||
return
|
||||
} else {
|
||||
setState(OperationState.ERROR)
|
||||
KyuubiServerMonitor.getListener(session.getUserName).onStatementError(
|
||||
statementId, e.getMessage, SparkUtils.exceptionString(e))
|
||||
throw new HiveSQLException(e.toString)
|
||||
if (!isClosedOrCanceled) {
|
||||
onStatementError(statementId, e.getMessage, SparkUtils.exceptionString(e))
|
||||
throw new HiveSQLException(e.toString, "<unknown>", 10000, e)
|
||||
}
|
||||
} finally {
|
||||
if (statementId != null) {
|
||||
session.sparkSession().sparkContext.cancelJobGroup(statementId)
|
||||
}
|
||||
}
|
||||
setState(OperationState.FINISHED)
|
||||
KyuubiServerMonitor.getListener(session.getUserName).onStatementFinish(statementId)
|
||||
}
|
||||
|
||||
private def cleanup(state: OperationState) {
|
||||
if (getStatus.getState != OperationState.CLOSED) {
|
||||
private[this] def onStatementError(id: String, message: String, trace: String): Unit = {
|
||||
error(
|
||||
s"""
|
||||
|Error executing query as ${session.getUserName},
|
||||
|$statement
|
||||
|Current operation state ${this.state},
|
||||
|$trace
|
||||
""".stripMargin)
|
||||
setState(OperationState.ERROR)
|
||||
KyuubiServerMonitor.getListener(session.getUserName).onStatementError(id, message, trace)
|
||||
}
|
||||
|
||||
private[this] def cleanup(state: OperationState) {
|
||||
if (this.state != OperationState.CLOSED) {
|
||||
setState(state)
|
||||
}
|
||||
val backgroundHandle = getBackgroundHandle
|
||||
|
||||
@ -30,12 +30,14 @@ import scala.util.matching.Regex
|
||||
|
||||
import org.apache.commons.io.FileUtils
|
||||
import org.apache.hadoop.fs.FileSystem
|
||||
import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAccessControlException
|
||||
import org.apache.hadoop.security.UserGroupInformation
|
||||
import org.apache.hive.service.cli._
|
||||
import org.apache.hive.service.cli.thrift.TProtocolVersion
|
||||
import org.apache.spark.{SparkConf, SparkContext}
|
||||
import org.apache.spark.KyuubiConf._
|
||||
import org.apache.spark.sql.SparkSession
|
||||
import org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException
|
||||
import org.apache.spark.ui.KyuubiServerTab
|
||||
|
||||
import yaooqinn.kyuubi.Logging
|
||||
@ -75,7 +77,7 @@ private[kyuubi] class KyuubiSession(
|
||||
private[this] var sessionLogDir: File = _
|
||||
private[this] var lastAccessTime = 0L
|
||||
private[this] var lastIdleTime = 0L
|
||||
private[this] var initialDatabase: String = "use default"
|
||||
private[this] var initialDatabase: Option[String] = None
|
||||
|
||||
private[this] val sessionUGI: UserGroupInformation = {
|
||||
val currentUser = UserGroupInformation.getCurrentUser
|
||||
@ -234,7 +236,7 @@ private[kyuubi] class KyuubiSession(
|
||||
} else {
|
||||
conf.set(SPARK_HADOOP_PREFIX + k, value)
|
||||
}
|
||||
case "use:database" => initialDatabase = "use " + value
|
||||
case "use:database" => initialDatabase = Some("use " + value)
|
||||
case _ =>
|
||||
}
|
||||
}
|
||||
@ -257,7 +259,7 @@ private[kyuubi] class KyuubiSession(
|
||||
} else {
|
||||
_sparkSession.conf.set(SPARK_HADOOP_PREFIX + k, value)
|
||||
}
|
||||
case "use:database" => initialDatabase = "use " + value
|
||||
case "use:database" => initialDatabase = Some("use " + value)
|
||||
case _ =>
|
||||
}
|
||||
}
|
||||
@ -271,11 +273,17 @@ private[kyuubi] class KyuubiSession(
|
||||
getOrCreateSparkSession(sessionConf)
|
||||
assert(_sparkSession != null)
|
||||
|
||||
sessionUGI.doAs(new PrivilegedExceptionAction[Unit] {
|
||||
override def run(): Unit = {
|
||||
_sparkSession.sql(initialDatabase)
|
||||
try {
|
||||
initialDatabase.foreach(executeStatement)
|
||||
} catch {
|
||||
case ute: UndeclaredThrowableException => ute.getCause match {
|
||||
case e: HiveAccessControlException =>
|
||||
throw new HiveSQLException(e.getMessage, "08S01", e.getCause)
|
||||
case e: NoSuchDatabaseException =>
|
||||
throw new HiveSQLException(e.getMessage, "08S01", e.getCause)
|
||||
case e: HiveSQLException => throw e
|
||||
}
|
||||
})
|
||||
}
|
||||
lastAccessTime = System.currentTimeMillis
|
||||
lastIdleTime = lastAccessTime
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user