fixed #7 inline exception
This commit is contained in:
parent
4683bd790d
commit
59cc84eeea
7
docs/sql_state_code.md
Normal file
7
docs/sql_state_code.md
Normal file
@ -0,0 +1,7 @@
|
||||
|SQL State|Condition|Vendor Code|Type|
|
||||
|:---:|:---|:---:|:---:|
|
||||
|ServerError| internal error | 1000 | Kyuubi |
|
||||
|08S01|SQL client unable to establish SQL connection | 1001| |
|
||||
|ParseException|failed parsing sql statement|2000|Spark|
|
||||
|AnalysisException|failed analysing sql statement|2001||
|
||||
|HiveAccessControlException| failed privilege checking|3000|Hive|
|
||||
@ -1,249 +0,0 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hive.service.cli;
|
||||
|
||||
import java.sql.SQLException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hive.service.cli.thrift.TStatus;
|
||||
import org.apache.hive.service.cli.thrift.TStatusCode;
|
||||
|
||||
/**
|
||||
* HiveSQLException.
|
||||
*
|
||||
*/
|
||||
public class HiveSQLException extends SQLException {
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
private static final long serialVersionUID = -6095254671958748094L;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public HiveSQLException() {
|
||||
super();
|
||||
}
|
||||
|
||||
/**
|
||||
* @param reason
|
||||
*/
|
||||
public HiveSQLException(String reason) {
|
||||
super(reason);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param cause
|
||||
*/
|
||||
public HiveSQLException(Throwable cause) {
|
||||
super(cause);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param reason
|
||||
* @param sqlState
|
||||
*/
|
||||
public HiveSQLException(String reason, String sqlState) {
|
||||
super(reason, sqlState);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param reason
|
||||
* @param cause
|
||||
*/
|
||||
public HiveSQLException(String reason, Throwable cause) {
|
||||
super(reason, cause);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param reason
|
||||
* @param sqlState
|
||||
* @param vendorCode
|
||||
*/
|
||||
public HiveSQLException(String reason, String sqlState, int vendorCode) {
|
||||
super(reason, sqlState, vendorCode);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param reason
|
||||
* @param sqlState
|
||||
* @param cause
|
||||
*/
|
||||
public HiveSQLException(String reason, String sqlState, Throwable cause) {
|
||||
super(reason, sqlState, cause);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param reason
|
||||
* @param sqlState
|
||||
* @param vendorCode
|
||||
* @param cause
|
||||
*/
|
||||
public HiveSQLException(String reason, String sqlState, int vendorCode, Throwable cause) {
|
||||
super(reason, sqlState, vendorCode, cause);
|
||||
}
|
||||
|
||||
public HiveSQLException(TStatus status) {
|
||||
// TODO: set correct vendorCode field
|
||||
super(status.getErrorMessage(), status.getSqlState(), status.getErrorCode());
|
||||
if (status.getInfoMessages() != null) {
|
||||
initCause(toCause(status.getInfoMessages()));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Converts current object to a {@link TStatus} object
|
||||
* @return a {@link TStatus} object
|
||||
*/
|
||||
public TStatus toTStatus() {
|
||||
// TODO: convert sqlState, etc.
|
||||
TStatus tStatus = new TStatus(TStatusCode.ERROR_STATUS);
|
||||
tStatus.setSqlState(getSQLState());
|
||||
tStatus.setErrorCode(getErrorCode());
|
||||
tStatus.setErrorMessage(getMessage());
|
||||
tStatus.setInfoMessages(toString(this));
|
||||
return tStatus;
|
||||
}
|
||||
|
||||
/**
|
||||
* Converts the specified {@link Exception} object into a {@link TStatus} object
|
||||
* @param e a {@link Exception} object
|
||||
* @return a {@link TStatus} object
|
||||
*/
|
||||
public static TStatus toTStatus(Exception e) {
|
||||
if (e instanceof HiveSQLException) {
|
||||
return ((HiveSQLException)e).toTStatus();
|
||||
}
|
||||
TStatus tStatus = new TStatus(TStatusCode.ERROR_STATUS);
|
||||
tStatus.setErrorMessage(e.getMessage());
|
||||
tStatus.setInfoMessages(toString(e));
|
||||
return tStatus;
|
||||
}
|
||||
|
||||
/**
|
||||
* Converts a {@link Throwable} object into a flattened list of texts including its stack trace
|
||||
* and the stack traces of the nested causes.
|
||||
* @param ex a {@link Throwable} object
|
||||
* @return a flattened list of texts including the {@link Throwable} object's stack trace
|
||||
* and the stack traces of the nested causes.
|
||||
*/
|
||||
public static List<String> toString(Throwable ex) {
|
||||
return toString(ex, null);
|
||||
}
|
||||
|
||||
private static List<String> toString(Throwable cause, StackTraceElement[] parent) {
|
||||
StackTraceElement[] trace = cause.getStackTrace();
|
||||
int m = trace.length - 1;
|
||||
if (parent != null) {
|
||||
int n = parent.length - 1;
|
||||
while (m >= 0 && n >= 0 && trace[m].equals(parent[n])) {
|
||||
m--;
|
||||
n--;
|
||||
}
|
||||
}
|
||||
List<String> detail = enroll(cause, trace, m);
|
||||
cause = cause.getCause();
|
||||
if (cause != null) {
|
||||
detail.addAll(toString(cause, trace));
|
||||
}
|
||||
return detail;
|
||||
}
|
||||
|
||||
private static List<String> enroll(Throwable ex, StackTraceElement[] trace, int max) {
|
||||
List<String> details = new ArrayList<String>();
|
||||
StringBuilder builder = new StringBuilder();
|
||||
builder.append('*').append(ex.getClass().getName()).append(':');
|
||||
builder.append(ex.getMessage()).append(':');
|
||||
builder.append(trace.length).append(':').append(max);
|
||||
details.add(builder.toString());
|
||||
for (int i = 0; i <= max; i++) {
|
||||
builder.setLength(0);
|
||||
builder.append(trace[i].getClassName()).append(':');
|
||||
builder.append(trace[i].getMethodName()).append(':');
|
||||
String fileName = trace[i].getFileName();
|
||||
builder.append(fileName == null ? "" : fileName).append(':');
|
||||
builder.append(trace[i].getLineNumber());
|
||||
details.add(builder.toString());
|
||||
}
|
||||
return details;
|
||||
}
|
||||
|
||||
/**
|
||||
* Converts a flattened list of texts including the stack trace and the stack
|
||||
* traces of the nested causes into a {@link Throwable} object.
|
||||
* @param details a flattened list of texts including the stack trace and the stack
|
||||
* traces of the nested causes
|
||||
* @return a {@link Throwable} object
|
||||
*/
|
||||
public static Throwable toCause(List<String> details) {
|
||||
return toStackTrace(details, null, 0);
|
||||
}
|
||||
|
||||
private static Throwable toStackTrace(List<String> details, StackTraceElement[] parent, int index) {
|
||||
String detail = details.get(index++);
|
||||
if (!detail.startsWith("*")) {
|
||||
return null; // should not be happened. ignore remaining
|
||||
}
|
||||
int i1 = detail.indexOf(':');
|
||||
int i3 = detail.lastIndexOf(':');
|
||||
int i2 = detail.substring(0, i3).lastIndexOf(':');
|
||||
String exceptionClass = detail.substring(1, i1);
|
||||
String exceptionMessage = detail.substring(i1 + 1, i2);
|
||||
Throwable ex = newInstance(exceptionClass, exceptionMessage);
|
||||
|
||||
Integer length = Integer.valueOf(detail.substring(i2 + 1, i3));
|
||||
Integer unique = Integer.valueOf(detail.substring(i3 + 1));
|
||||
|
||||
int i = 0;
|
||||
StackTraceElement[] trace = new StackTraceElement[length];
|
||||
for (; i <= unique; i++) {
|
||||
detail = details.get(index++);
|
||||
int j1 = detail.indexOf(':');
|
||||
int j3 = detail.lastIndexOf(':');
|
||||
int j2 = detail.substring(0, j3).lastIndexOf(':');
|
||||
String className = detail.substring(0, j1);
|
||||
String methodName = detail.substring(j1 + 1, j2);
|
||||
String fileName = detail.substring(j2 + 1, j3);
|
||||
if (fileName.isEmpty()) {
|
||||
fileName = null;
|
||||
}
|
||||
int lineNumber = Integer.valueOf(detail.substring(j3 + 1));
|
||||
trace[i] = new StackTraceElement(className, methodName, fileName, lineNumber);
|
||||
}
|
||||
int common = trace.length - i;
|
||||
if (common > 0) {
|
||||
System.arraycopy(parent, parent.length - common, trace, trace.length - common, common);
|
||||
}
|
||||
if (details.size() > index) {
|
||||
ex.initCause(toStackTrace(details, trace, index));
|
||||
}
|
||||
ex.setStackTrace(trace);
|
||||
return ex;
|
||||
}
|
||||
|
||||
private static Throwable newInstance(String className, String message) {
|
||||
try {
|
||||
return (Throwable)Class.forName(className).getConstructor(String.class).newInstance(message);
|
||||
} catch (Exception e) {
|
||||
return new RuntimeException(className + ":" + message);
|
||||
}
|
||||
}
|
||||
}
|
||||
98
src/main/scala/yaooqinn/kyuubi/KyuubiSQLException.scala
Normal file
98
src/main/scala/yaooqinn/kyuubi/KyuubiSQLException.scala
Normal file
@ -0,0 +1,98 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package yaooqinn.kyuubi
|
||||
|
||||
import java.sql.SQLException
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
|
||||
import org.apache.hive.service.cli.thrift.{TStatus, TStatusCode}
|
||||
|
||||
class KyuubiSQLException(reason: String, sqlState: String, vendorCode: Int, cause: Throwable)
|
||||
extends SQLException(reason, sqlState, vendorCode, cause) {
|
||||
|
||||
|
||||
def this(reason: String, sqlState: String, cause: Throwable) = this(reason, sqlState, 0, cause)
|
||||
|
||||
def this(reason: String, sqlState: String, vendorCode: Int) =
|
||||
this(reason, sqlState, vendorCode, null)
|
||||
|
||||
def this(reason: String, cause: Throwable) = this(reason, null, 0, cause)
|
||||
|
||||
def this(reason: String, sqlState: String) = this(reason, sqlState, vendorCode = 0)
|
||||
|
||||
def this(reason: String) = this(reason, sqlState = null)
|
||||
|
||||
def this(cause: Throwable) = this(reason = null, cause)
|
||||
|
||||
/**
|
||||
* Converts current object to a {@link TStatus} object
|
||||
*
|
||||
* @return a { @link TStatus} object
|
||||
*/
|
||||
def toTStatus: TStatus = {
|
||||
val tStatus = new TStatus(TStatusCode.ERROR_STATUS)
|
||||
tStatus.setSqlState(getSQLState)
|
||||
tStatus.setErrorCode(getErrorCode)
|
||||
tStatus.setErrorMessage(getMessage)
|
||||
tStatus.setInfoMessages(KyuubiSQLException.toString(this).asJava)
|
||||
tStatus
|
||||
}
|
||||
}
|
||||
|
||||
object KyuubiSQLException {
|
||||
|
||||
def toTStatus(e: Exception): TStatus = e match {
|
||||
case k: KyuubiSQLException => k.toTStatus
|
||||
case _ =>
|
||||
val tStatus = new TStatus(TStatusCode.ERROR_STATUS)
|
||||
tStatus.setErrorMessage(e.getMessage)
|
||||
tStatus.setInfoMessages(toString(e).asJava)
|
||||
tStatus
|
||||
}
|
||||
|
||||
def toString(ex: Throwable): List[String] = toString(ex, null)
|
||||
|
||||
private[this] def toString(cause: Throwable, parent: Array[StackTraceElement]): List[String] = {
|
||||
val trace = cause.getStackTrace
|
||||
val m = if (parent != null) {
|
||||
trace.length - 1 - trace.take(parent.length).zipWithIndex.count { kv =>
|
||||
kv._1.equals(parent(kv._2))
|
||||
}
|
||||
} else {
|
||||
trace.length - 1
|
||||
}
|
||||
enroll(cause, trace, m) ++ (if (cause.getCause != null) toString(cause, trace) else Nil)
|
||||
}
|
||||
|
||||
private[this] def enroll(
|
||||
ex: Throwable,
|
||||
trace: Array[StackTraceElement],
|
||||
max: Int): List[String] = {
|
||||
val builder = new StringBuilder
|
||||
builder.append('*').append(ex.getClass.getName).append(':')
|
||||
builder.append(ex.getMessage).append(':')
|
||||
builder.append(trace.length).append(':').append(max)
|
||||
List(builder.toString) ++ trace.take(max).map { t =>
|
||||
builder.setLength(0)
|
||||
builder.append(t.getClassName).append(":").append(t.getMethodName).append(":")
|
||||
Option(t.getFileName).foreach(builder.append)
|
||||
builder.append(":").append(t.getLineNumber)
|
||||
builder.toString
|
||||
}.toList
|
||||
}
|
||||
}
|
||||
@ -31,14 +31,13 @@ import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge.Server.ServerMode
|
||||
import org.apache.hadoop.security.UserGroupInformation
|
||||
import org.apache.hadoop.security.authorize.ProxyUsers
|
||||
import org.apache.hive.service.auth.{KyuubiKerberosSaslHelper, KyuubiPlainSaslHelper, SaslQOP}
|
||||
import org.apache.hive.service.cli.HiveSQLException
|
||||
import org.apache.hive.service.cli.thrift.TCLIService
|
||||
import org.apache.spark.{SparkConf, SparkUtils}
|
||||
import org.apache.spark.KyuubiConf._
|
||||
import org.apache.thrift.TProcessorFactory
|
||||
import org.apache.thrift.transport.{TServerSocket, TTransportException, TTransportFactory}
|
||||
|
||||
import yaooqinn.kyuubi.KyuubiExecption
|
||||
import yaooqinn.kyuubi.{KyuubiExecption, KyuubiSQLException}
|
||||
|
||||
/**
|
||||
* Authentication
|
||||
@ -102,56 +101,56 @@ class KyuubiAuthFactory(conf: SparkConf) {
|
||||
def getIpAddress: Option[String] = saslServer.map(_.getRemoteAddress).map(_.getHostAddress)
|
||||
|
||||
// retrieve delegation token for the given user
|
||||
@throws[HiveSQLException]
|
||||
@throws[KyuubiSQLException]
|
||||
def getDelegationToken(owner: String, renewer: String): String = saslServer match {
|
||||
case Some(server) =>
|
||||
try {
|
||||
val tokenStr = server.getDelegationTokenWithService(owner, renewer, KYUUBI_CLIENT_TOKEN)
|
||||
if (tokenStr == null || tokenStr.isEmpty) {
|
||||
throw new HiveSQLException(
|
||||
throw new KyuubiSQLException(
|
||||
"Received empty retrieving delegation token for user " + owner, "08S01")
|
||||
}
|
||||
tokenStr
|
||||
} catch {
|
||||
case e: IOException =>
|
||||
throw new HiveSQLException(
|
||||
throw new KyuubiSQLException(
|
||||
"Error retrieving delegation token for user " + owner, "08S01", e)
|
||||
case e: InterruptedException =>
|
||||
throw new HiveSQLException("delegation token retrieval interrupted", "08S01", e)
|
||||
throw new KyuubiSQLException("delegation token retrieval interrupted", "08S01", e)
|
||||
}
|
||||
case None =>
|
||||
throw new HiveSQLException(
|
||||
throw new KyuubiSQLException(
|
||||
"Delegation token only supported over kerberos authentication", "08S01")
|
||||
}
|
||||
|
||||
// cancel given delegation token
|
||||
@throws[HiveSQLException]
|
||||
@throws[KyuubiSQLException]
|
||||
def cancelDelegationToken(delegationToken: String): Unit = saslServer match {
|
||||
case Some(server) =>
|
||||
try {
|
||||
server.cancelDelegationToken(delegationToken)
|
||||
} catch {
|
||||
case e: IOException =>
|
||||
throw new HiveSQLException(
|
||||
throw new KyuubiSQLException(
|
||||
"Error canceling delegation token " + delegationToken, "08S01", e)
|
||||
}
|
||||
case None =>
|
||||
throw new HiveSQLException(
|
||||
throw new KyuubiSQLException(
|
||||
"Delegation token only supported over kerberos authentication", "08S01")
|
||||
}
|
||||
|
||||
@throws[HiveSQLException]
|
||||
@throws[KyuubiSQLException]
|
||||
def renewDelegationToken(delegationToken: String): Unit = saslServer match {
|
||||
case Some(server) =>
|
||||
try {
|
||||
server.renewDelegationToken(delegationToken)
|
||||
} catch {
|
||||
case e: IOException =>
|
||||
throw new HiveSQLException(
|
||||
throw new KyuubiSQLException(
|
||||
"Error renewing delegation token " + delegationToken, "08S01", e)
|
||||
}
|
||||
case None =>
|
||||
throw new HiveSQLException(
|
||||
throw new KyuubiSQLException(
|
||||
"Delegation token only supported over kerberos authentication", "08S01")
|
||||
}
|
||||
}
|
||||
@ -168,7 +167,7 @@ object KyuubiAuthFactory {
|
||||
}
|
||||
)
|
||||
|
||||
@throws[HiveSQLException]
|
||||
@throws[KyuubiSQLException]
|
||||
def verifyProxyAccess(
|
||||
realUser: String,
|
||||
proxyUser: String,
|
||||
@ -195,7 +194,7 @@ object KyuubiAuthFactory {
|
||||
}
|
||||
} catch {
|
||||
case e: IOException =>
|
||||
throw new HiveSQLException(
|
||||
throw new KyuubiSQLException(
|
||||
"Failed to validate proxy privilege of " + realUser + " for " + proxyUser, "08S01", e)
|
||||
}
|
||||
}
|
||||
|
||||
@ -22,13 +22,11 @@ import java.security.PrivilegedExceptionAction
|
||||
import java.util.UUID
|
||||
import java.util.concurrent.{Future, RejectedExecutionException}
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.util.control.NonFatal
|
||||
|
||||
import org.apache.hadoop.hive.conf.HiveConf
|
||||
import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAccessControlException
|
||||
import org.apache.hadoop.hive.ql.session.OperationLog
|
||||
import org.apache.hive.service.cli._
|
||||
import org.apache.hive.service.cli.thrift.TProtocolVersion
|
||||
import org.apache.spark.KyuubiConf._
|
||||
import org.apache.spark.SparkUtils
|
||||
@ -36,7 +34,7 @@ import org.apache.spark.sql.{AnalysisException, DataFrame, Row}
|
||||
import org.apache.spark.sql.catalyst.parser.ParseException
|
||||
import org.apache.spark.sql.types._
|
||||
|
||||
import yaooqinn.kyuubi.Logging
|
||||
import yaooqinn.kyuubi.{KyuubiSQLException, Logging}
|
||||
import yaooqinn.kyuubi.cli.FetchOrientation
|
||||
import yaooqinn.kyuubi.schema.{RowSet, RowSetBuilder}
|
||||
import yaooqinn.kyuubi.session.KyuubiSession
|
||||
@ -53,7 +51,7 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging
|
||||
private[this] var lastAccessTime = System.currentTimeMillis()
|
||||
|
||||
private[this] var hasResultSet: Boolean = false
|
||||
private[this] var operationException: HiveSQLException = _
|
||||
private[this] var operationException: KyuubiSQLException = _
|
||||
private[this] var backgroundHandle: Future[_] = _
|
||||
private[this] var operationLog: OperationLog = _
|
||||
private[this] var isOperationLogEnabled: Boolean = false
|
||||
@ -82,11 +80,11 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging
|
||||
|
||||
def getOperationLog: OperationLog = operationLog
|
||||
|
||||
private[this] def setOperationException(opEx: HiveSQLException): Unit = {
|
||||
private[this] def setOperationException(opEx: KyuubiSQLException): Unit = {
|
||||
this.operationException = opEx
|
||||
}
|
||||
|
||||
@throws[HiveSQLException]
|
||||
@throws[KyuubiSQLException]
|
||||
private[this] def setState(newState: OperationState): Unit = {
|
||||
state.validateTransition(newState)
|
||||
this.state = newState
|
||||
@ -101,10 +99,10 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging
|
||||
checkState(CLOSED) || checkState(CANCELED)
|
||||
}
|
||||
|
||||
@throws[HiveSQLException]
|
||||
@throws[KyuubiSQLException]
|
||||
private[this] def assertState(state: OperationState): Unit = {
|
||||
if (this.state ne state) {
|
||||
throw new HiveSQLException("Expected state " + state + ", but found " + this.state)
|
||||
throw new KyuubiSQLException("Expected state " + state + ", but found " + this.state)
|
||||
}
|
||||
this.lastAccessTime = System.currentTimeMillis()
|
||||
}
|
||||
@ -178,7 +176,7 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging
|
||||
}
|
||||
}
|
||||
|
||||
@throws[HiveSQLException]
|
||||
@throws[KyuubiSQLException]
|
||||
def run(): Unit = {
|
||||
createOperationLog()
|
||||
try {
|
||||
@ -238,7 +236,7 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging
|
||||
/**
|
||||
* Verify if the given fetch orientation is part of the default orientation types.
|
||||
*/
|
||||
@throws[HiveSQLException]
|
||||
@throws[KyuubiSQLException]
|
||||
private[this] def validateDefaultFetchOrientation(orientation: FetchOrientation): Unit = {
|
||||
validateFetchOrientation(orientation, DEFAULT_FETCH_ORIENTATION_SET)
|
||||
}
|
||||
@ -246,12 +244,12 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging
|
||||
/**
|
||||
* Verify if the given fetch orientation is part of the supported orientation types.
|
||||
*/
|
||||
@throws[HiveSQLException]
|
||||
@throws[KyuubiSQLException]
|
||||
private[this] def validateFetchOrientation(
|
||||
orientation: FetchOrientation,
|
||||
supportedOrientations: Set[FetchOrientation]): Unit = {
|
||||
if (!supportedOrientations.contains(orientation)) {
|
||||
throw new HiveSQLException(
|
||||
throw new KyuubiSQLException(
|
||||
"The fetch type " + orientation.toString + " is not supported for this resultset", "HY106")
|
||||
}
|
||||
}
|
||||
@ -270,12 +268,12 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging
|
||||
try {
|
||||
execute()
|
||||
} catch {
|
||||
case e: HiveSQLException => setOperationException(e)
|
||||
case e: KyuubiSQLException => setOperationException(e)
|
||||
}
|
||||
}
|
||||
})
|
||||
} catch {
|
||||
case e: Exception => setOperationException(new HiveSQLException(e))
|
||||
case e: Exception => setOperationException(new KyuubiSQLException(e))
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -288,7 +286,7 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging
|
||||
} catch {
|
||||
case rejected: RejectedExecutionException =>
|
||||
setState(ERROR)
|
||||
throw new HiveSQLException("The background threadpool cannot accept" +
|
||||
throw new KyuubiSQLException("The background threadpool cannot accept" +
|
||||
" new task for execution, please retry the operation", rejected)
|
||||
case NonFatal(e) =>
|
||||
error(s"Error executing query in background", e)
|
||||
@ -321,7 +319,7 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging
|
||||
setState(FINISHED)
|
||||
KyuubiServerMonitor.getListener(session.getUserName).foreach(_.onStatementFinish(statementId))
|
||||
} catch {
|
||||
case e: HiveSQLException =>
|
||||
case e: KyuubiSQLException =>
|
||||
if (!isClosedOrCanceled) {
|
||||
onStatementError(statementId, e.getMessage, SparkUtils.exceptionString(e))
|
||||
throw e
|
||||
@ -330,23 +328,23 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging
|
||||
if (!isClosedOrCanceled) {
|
||||
onStatementError(
|
||||
statementId, e.withCommand(statement).getMessage, SparkUtils.exceptionString(e))
|
||||
throw new HiveSQLException(
|
||||
throw new KyuubiSQLException(
|
||||
e.withCommand(statement).getMessage, "ParseException", 2000, e)
|
||||
}
|
||||
case e: AnalysisException =>
|
||||
if (!isClosedOrCanceled) {
|
||||
onStatementError(statementId, e.getMessage, SparkUtils.exceptionString(e))
|
||||
throw new HiveSQLException(e.getMessage, "AnalysisException", 2001, e)
|
||||
throw new KyuubiSQLException(e.getMessage, "AnalysisException", 2001, e)
|
||||
}
|
||||
case e: HiveAccessControlException =>
|
||||
if (!isClosedOrCanceled) {
|
||||
onStatementError(statementId, e.getMessage, SparkUtils.exceptionString(e))
|
||||
throw new HiveSQLException(e.getMessage, "HiveAccessControlException", 3000, e)
|
||||
throw new KyuubiSQLException(e.getMessage, "HiveAccessControlException", 3000, e)
|
||||
}
|
||||
case e: Throwable =>
|
||||
if (!isClosedOrCanceled) {
|
||||
onStatementError(statementId, e.getMessage, SparkUtils.exceptionString(e))
|
||||
throw new HiveSQLException(e.toString, "<unknown>", 10000, e)
|
||||
throw new KyuubiSQLException(e.toString, "<unknown>", 10000, e)
|
||||
}
|
||||
} finally {
|
||||
if (statementId != null) {
|
||||
|
||||
@ -30,7 +30,7 @@ import org.apache.spark.KyuubiConf._
|
||||
import org.apache.spark.sql.Row
|
||||
import org.apache.spark.sql.types.StructType
|
||||
|
||||
import yaooqinn.kyuubi.Logging
|
||||
import yaooqinn.kyuubi.{KyuubiSQLException, Logging}
|
||||
import yaooqinn.kyuubi.cli.FetchOrientation
|
||||
import yaooqinn.kyuubi.schema.{RowSet, RowSetBuilder}
|
||||
import yaooqinn.kyuubi.service.AbstractService
|
||||
@ -94,7 +94,9 @@ private[kyuubi] class OperationManager private(name: String)
|
||||
|
||||
def getOperation(operationHandle: OperationHandle): KyuubiOperation = {
|
||||
val operation = getOperationInternal(operationHandle)
|
||||
if (operation == null) throw new HiveSQLException("Invalid OperationHandle: " + operationHandle)
|
||||
if (operation == null) {
|
||||
throw new KyuubiSQLException("Invalid OperationHandle: " + operationHandle)
|
||||
}
|
||||
operation
|
||||
}
|
||||
|
||||
@ -132,21 +134,21 @@ private[kyuubi] class OperationManager private(name: String)
|
||||
}
|
||||
}
|
||||
|
||||
@throws[HiveSQLException]
|
||||
@throws[KyuubiSQLException]
|
||||
def closeOperation(opHandle: OperationHandle): Unit = {
|
||||
val operation = removeOperation(opHandle)
|
||||
if (operation == null) throw new HiveSQLException("Operation does not exist!")
|
||||
if (operation == null) throw new KyuubiSQLException("Operation does not exist!")
|
||||
operation.close()
|
||||
}
|
||||
|
||||
@throws[HiveSQLException]
|
||||
@throws[KyuubiSQLException]
|
||||
def getOperationNextRowSet(
|
||||
opHandle: OperationHandle,
|
||||
orientation: FetchOrientation,
|
||||
maxRows: Long): RowSet =
|
||||
getOperation(opHandle).getNextRowSet(orientation, maxRows)
|
||||
|
||||
@throws[HiveSQLException]
|
||||
@throws[KyuubiSQLException]
|
||||
def getOperationLogRowSet(
|
||||
opHandle: OperationHandle,
|
||||
orientation: FetchOrientation,
|
||||
@ -154,7 +156,7 @@ private[kyuubi] class OperationManager private(name: String)
|
||||
// get the OperationLog object from the operation
|
||||
val opLog: OperationLog = getOperation(opHandle).getOperationLog
|
||||
if (opLog == null) {
|
||||
throw new HiveSQLException("Couldn't find log associated with operation handle: " + opHandle)
|
||||
throw new KyuubiSQLException("Couldn't find log associated with operation handle: " + opHandle)
|
||||
}
|
||||
try {
|
||||
// convert logs to RowBasedSet
|
||||
@ -162,7 +164,7 @@ private[kyuubi] class OperationManager private(name: String)
|
||||
RowSetBuilder.create(logSchema, logs, getOperation(opHandle).getProtocolVersion)
|
||||
} catch {
|
||||
case e: SQLException =>
|
||||
throw new HiveSQLException(e.getMessage, e.getCause)
|
||||
throw new KyuubiSQLException(e.getMessage, e.getCause)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -17,19 +17,20 @@
|
||||
|
||||
package yaooqinn.kyuubi.operation
|
||||
|
||||
import org.apache.hive.service.cli.HiveSQLException
|
||||
import org.apache.hive.service.cli.thrift.TOperationState
|
||||
|
||||
import yaooqinn.kyuubi.KyuubiSQLException
|
||||
|
||||
trait OperationState {
|
||||
def toTOperationState(): TOperationState
|
||||
def isTerminal(): Boolean = false
|
||||
|
||||
@throws[HiveSQLException]
|
||||
@throws[KyuubiSQLException]
|
||||
def validateTransition(newState: OperationState): Unit = ex(newState)
|
||||
|
||||
@throws[HiveSQLException]
|
||||
protected def ex(state: OperationState): Unit = throw new HiveSQLException(
|
||||
"Illegal Operation state transition " + this + " -> " + state, "KyuubiException", 1000)
|
||||
@throws[KyuubiSQLException]
|
||||
protected def ex(state: OperationState): Unit = throw new KyuubiSQLException(
|
||||
"Illegal Operation state transition " + this + " -> " + state, "ServerError", 1000)
|
||||
}
|
||||
|
||||
case object INITIALIZED extends OperationState {
|
||||
|
||||
@ -17,9 +17,9 @@
|
||||
|
||||
package yaooqinn.kyuubi.operation
|
||||
|
||||
import org.apache.hive.service.cli.HiveSQLException
|
||||
import yaooqinn.kyuubi.KyuubiSQLException
|
||||
|
||||
class OperationStatus(state: OperationState, operationException: HiveSQLException) {
|
||||
class OperationStatus(state: OperationState, operationException: KyuubiSQLException) {
|
||||
def getState: OperationState = state
|
||||
def getOperationException: HiveSQLException = operationException
|
||||
def getOperationException: KyuubiSQLException = operationException
|
||||
}
|
||||
|
||||
@ -22,7 +22,7 @@ import org.apache.hive.service.cli.thrift.TProtocolVersion
|
||||
import org.apache.spark.SparkConf
|
||||
import org.apache.spark.sql.types.StructType
|
||||
|
||||
import yaooqinn.kyuubi.Logging
|
||||
import yaooqinn.kyuubi.{KyuubiSQLException, Logging}
|
||||
import yaooqinn.kyuubi.auth.KyuubiAuthFactory
|
||||
import yaooqinn.kyuubi.cli.{FetchOrientation, FetchType, GetInfoType, GetInfoValue}
|
||||
import yaooqinn.kyuubi.operation.{OperationHandle, OperationStatus}
|
||||
@ -97,16 +97,16 @@ private[server] class BackendService private(name: String)
|
||||
}
|
||||
|
||||
def getTypeInfo(sessionHandle: SessionHandle): OperationHandle = {
|
||||
throw new HiveSQLException("Method Not Implemented!")
|
||||
throw new KyuubiSQLException("Method Not Implemented!")
|
||||
}
|
||||
|
||||
def getCatalogs(sessionHandle: SessionHandle): OperationHandle = {
|
||||
throw new HiveSQLException("Method Not Implemented!")
|
||||
throw new KyuubiSQLException("Method Not Implemented!")
|
||||
}
|
||||
|
||||
def getSchemas(
|
||||
sessionHandle: SessionHandle, catalogName: String, schemaName: String): OperationHandle = {
|
||||
throw new HiveSQLException("Method Not Implemented!")
|
||||
throw new KyuubiSQLException("Method Not Implemented!")
|
||||
}
|
||||
|
||||
def getTables(
|
||||
@ -115,25 +115,25 @@ private[server] class BackendService private(name: String)
|
||||
schemaName: String,
|
||||
tableName: String,
|
||||
tableTypes: Seq[String]): OperationHandle = {
|
||||
throw new HiveSQLException("Method Not Implemented!")
|
||||
throw new KyuubiSQLException("Method Not Implemented!")
|
||||
}
|
||||
|
||||
def getTableTypes(sessionHandle: SessionHandle): OperationHandle = {
|
||||
throw new HiveSQLException("Method Not Implemented!")
|
||||
throw new KyuubiSQLException("Method Not Implemented!")
|
||||
}
|
||||
|
||||
def getColumns(
|
||||
sessionHandle: SessionHandle,
|
||||
catalogName: String,
|
||||
schemaName: String, tableName: String, columnName: String): OperationHandle = {
|
||||
throw new HiveSQLException("Method Not Implemented!")
|
||||
throw new KyuubiSQLException("Method Not Implemented!")
|
||||
}
|
||||
|
||||
def getFunctions(
|
||||
sessionHandle: SessionHandle,
|
||||
catalogName: String,
|
||||
schemaName: String, functionName: String): OperationHandle = {
|
||||
throw new HiveSQLException("Method Not Implemented!")
|
||||
throw new KyuubiSQLException("Method Not Implemented!")
|
||||
}
|
||||
|
||||
def getOperationStatus(opHandle: OperationHandle): OperationStatus = {
|
||||
|
||||
@ -34,7 +34,7 @@ import org.apache.thrift.protocol.{TBinaryProtocol, TProtocol}
|
||||
import org.apache.thrift.server.{ServerContext, TServer, TServerEventHandler, TThreadPoolServer}
|
||||
import org.apache.thrift.transport.{TServerSocket, TTransport}
|
||||
|
||||
import yaooqinn.kyuubi.Logging
|
||||
import yaooqinn.kyuubi.{KyuubiSQLException, Logging}
|
||||
import yaooqinn.kyuubi.auth.{KERBEROS, KyuubiAuthFactory}
|
||||
import yaooqinn.kyuubi.cli.{FetchOrientation, FetchType, GetInfoType}
|
||||
import yaooqinn.kyuubi.operation.OperationHandle
|
||||
@ -191,13 +191,13 @@ private[kyuubi] class FrontendService private(name: String, beService: BackendSe
|
||||
}
|
||||
}
|
||||
|
||||
@throws[HiveSQLException]
|
||||
@throws[KyuubiSQLException]
|
||||
private[this] def getProxyUser(sessionConf: Map[String, String], ipAddress: String): String = {
|
||||
Option(sessionConf).flatMap(_.get(KyuubiAuthFactory.HS2_PROXY_USER)) match {
|
||||
case None => realUser
|
||||
case Some(_)
|
||||
if !hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ALLOW_USER_SUBSTITUTION) =>
|
||||
throw new HiveSQLException("Proxy user substitution is not allowed")
|
||||
throw new KyuubiSQLException("Proxy user substitution is not allowed")
|
||||
case Some(p) if !isKerberosAuthMode => p
|
||||
case Some(p) => // Verify proxy user privilege of the realUser for the proxyUser
|
||||
KyuubiAuthFactory.verifyProxyAccess(realUser, p, ipAddress, hiveConf)
|
||||
@ -229,7 +229,7 @@ private[kyuubi] class FrontendService private(name: String, beService: BackendSe
|
||||
throw new IllegalArgumentException("never")
|
||||
}
|
||||
|
||||
@throws[HiveSQLException]
|
||||
@throws[KyuubiSQLException]
|
||||
private[this] def getSessionHandle(req: TOpenSessionReq, res: TOpenSessionResp) = {
|
||||
val userName = getUserName(req)
|
||||
val ipAddress = getIpAddress
|
||||
@ -262,7 +262,7 @@ private[kyuubi] class FrontendService private(name: String, beService: BackendSe
|
||||
} catch {
|
||||
case e: Exception =>
|
||||
warn("Error opening session: ", e)
|
||||
resp.setStatus(HiveSQLException.toTStatus(e))
|
||||
resp.setStatus(KyuubiSQLException.toTStatus(e))
|
||||
}
|
||||
resp
|
||||
}
|
||||
@ -281,7 +281,7 @@ private[kyuubi] class FrontendService private(name: String, beService: BackendSe
|
||||
} catch {
|
||||
case e: Exception =>
|
||||
warn("Error closing session: ", e)
|
||||
resp.setStatus(HiveSQLException.toTStatus(e))
|
||||
resp.setStatus(KyuubiSQLException.toTStatus(e))
|
||||
}
|
||||
resp
|
||||
}
|
||||
@ -296,7 +296,7 @@ private[kyuubi] class FrontendService private(name: String, beService: BackendSe
|
||||
} catch {
|
||||
case e: Exception =>
|
||||
warn("Error getting info: ", e)
|
||||
resp.setStatus(HiveSQLException.toTStatus(e))
|
||||
resp.setStatus(KyuubiSQLException.toTStatus(e))
|
||||
}
|
||||
resp
|
||||
}
|
||||
@ -317,7 +317,7 @@ private[kyuubi] class FrontendService private(name: String, beService: BackendSe
|
||||
} catch {
|
||||
case e: Exception =>
|
||||
warn("Error executing statement: ", e)
|
||||
resp.setStatus(HiveSQLException.toTStatus(e))
|
||||
resp.setStatus(KyuubiSQLException.toTStatus(e))
|
||||
}
|
||||
resp
|
||||
}
|
||||
@ -331,7 +331,7 @@ private[kyuubi] class FrontendService private(name: String, beService: BackendSe
|
||||
} catch {
|
||||
case e: Exception =>
|
||||
warn("Error getting type info: ", e)
|
||||
resp.setStatus(HiveSQLException.toTStatus(e))
|
||||
resp.setStatus(KyuubiSQLException.toTStatus(e))
|
||||
}
|
||||
resp
|
||||
}
|
||||
@ -345,7 +345,7 @@ private[kyuubi] class FrontendService private(name: String, beService: BackendSe
|
||||
} catch {
|
||||
case e: Exception =>
|
||||
warn("Error getting catalogs: ", e)
|
||||
resp.setStatus(HiveSQLException.toTStatus(e))
|
||||
resp.setStatus(KyuubiSQLException.toTStatus(e))
|
||||
}
|
||||
resp
|
||||
}
|
||||
@ -360,7 +360,7 @@ private[kyuubi] class FrontendService private(name: String, beService: BackendSe
|
||||
} catch {
|
||||
case e: Exception =>
|
||||
warn("Error getting schemas: ", e)
|
||||
resp.setStatus(HiveSQLException.toTStatus(e))
|
||||
resp.setStatus(KyuubiSQLException.toTStatus(e))
|
||||
}
|
||||
resp
|
||||
}
|
||||
@ -375,7 +375,7 @@ private[kyuubi] class FrontendService private(name: String, beService: BackendSe
|
||||
} catch {
|
||||
case e: Exception =>
|
||||
warn("Error getting tables: ", e)
|
||||
resp.setStatus(HiveSQLException.toTStatus(e))
|
||||
resp.setStatus(KyuubiSQLException.toTStatus(e))
|
||||
}
|
||||
resp
|
||||
}
|
||||
@ -389,7 +389,7 @@ private[kyuubi] class FrontendService private(name: String, beService: BackendSe
|
||||
} catch {
|
||||
case e: Exception =>
|
||||
warn("Error getting table types: ", e)
|
||||
resp.setStatus(HiveSQLException.toTStatus(e))
|
||||
resp.setStatus(KyuubiSQLException.toTStatus(e))
|
||||
}
|
||||
resp
|
||||
}
|
||||
@ -405,7 +405,7 @@ private[kyuubi] class FrontendService private(name: String, beService: BackendSe
|
||||
} catch {
|
||||
case e: Exception =>
|
||||
warn("Error getting columns: ", e)
|
||||
resp.setStatus(HiveSQLException.toTStatus(e))
|
||||
resp.setStatus(KyuubiSQLException.toTStatus(e))
|
||||
}
|
||||
resp
|
||||
}
|
||||
@ -421,7 +421,7 @@ private[kyuubi] class FrontendService private(name: String, beService: BackendSe
|
||||
} catch {
|
||||
case e: Exception =>
|
||||
warn("Error getting functions: ", e)
|
||||
resp.setStatus(HiveSQLException.toTStatus(e))
|
||||
resp.setStatus(KyuubiSQLException.toTStatus(e))
|
||||
}
|
||||
resp
|
||||
}
|
||||
@ -442,7 +442,7 @@ private[kyuubi] class FrontendService private(name: String, beService: BackendSe
|
||||
} catch {
|
||||
case e: Exception =>
|
||||
warn("Error getting operation status: ", e)
|
||||
resp.setStatus(HiveSQLException.toTStatus(e))
|
||||
resp.setStatus(KyuubiSQLException.toTStatus(e))
|
||||
}
|
||||
resp
|
||||
}
|
||||
@ -455,7 +455,7 @@ private[kyuubi] class FrontendService private(name: String, beService: BackendSe
|
||||
} catch {
|
||||
case e: Exception =>
|
||||
warn("Error cancelling operation: ", e)
|
||||
resp.setStatus(HiveSQLException.toTStatus(e))
|
||||
resp.setStatus(KyuubiSQLException.toTStatus(e))
|
||||
}
|
||||
resp
|
||||
}
|
||||
@ -468,7 +468,7 @@ private[kyuubi] class FrontendService private(name: String, beService: BackendSe
|
||||
} catch {
|
||||
case e: Exception =>
|
||||
warn("Error closing operation: ", e)
|
||||
resp.setStatus(HiveSQLException.toTStatus(e))
|
||||
resp.setStatus(KyuubiSQLException.toTStatus(e))
|
||||
}
|
||||
resp
|
||||
}
|
||||
@ -482,7 +482,7 @@ private[kyuubi] class FrontendService private(name: String, beService: BackendSe
|
||||
} catch {
|
||||
case e: Exception =>
|
||||
warn("Error getting result set metadata: ", e)
|
||||
resp.setStatus(HiveSQLException.toTStatus(e))
|
||||
resp.setStatus(KyuubiSQLException.toTStatus(e))
|
||||
}
|
||||
resp
|
||||
}
|
||||
@ -501,7 +501,7 @@ private[kyuubi] class FrontendService private(name: String, beService: BackendSe
|
||||
} catch {
|
||||
case e: Exception =>
|
||||
warn("Error fetching results: ", e)
|
||||
resp.setStatus(HiveSQLException.toTStatus(e))
|
||||
resp.setStatus(KyuubiSQLException.toTStatus(e))
|
||||
}
|
||||
resp
|
||||
}
|
||||
@ -520,9 +520,9 @@ private[kyuubi] class FrontendService private(name: String, beService: BackendSe
|
||||
resp.setDelegationToken(token)
|
||||
resp.setStatus(OK_STATUS)
|
||||
} catch {
|
||||
case e: HiveSQLException =>
|
||||
case e: KyuubiSQLException =>
|
||||
error("Error obtaining delegation token", e)
|
||||
val tokenErrorStatus = HiveSQLException.toTStatus(e)
|
||||
val tokenErrorStatus = KyuubiSQLException.toTStatus(e)
|
||||
tokenErrorStatus.setSqlState("42000")
|
||||
resp.setStatus(tokenErrorStatus)
|
||||
}
|
||||
@ -548,9 +548,9 @@ private[kyuubi] class FrontendService private(name: String, beService: BackendSe
|
||||
req.getDelegationToken)
|
||||
resp.setStatus(OK_STATUS)
|
||||
} catch {
|
||||
case e: HiveSQLException =>
|
||||
case e: KyuubiSQLException =>
|
||||
error("Error canceling delegation token", e)
|
||||
resp.setStatus(HiveSQLException.toTStatus(e))
|
||||
resp.setStatus(KyuubiSQLException.toTStatus(e))
|
||||
}
|
||||
}
|
||||
resp
|
||||
@ -568,9 +568,9 @@ private[kyuubi] class FrontendService private(name: String, beService: BackendSe
|
||||
req.getDelegationToken)
|
||||
resp.setStatus(OK_STATUS)
|
||||
} catch {
|
||||
case e: HiveSQLException =>
|
||||
case e: KyuubiSQLException =>
|
||||
error("Error obtaining renewing token", e)
|
||||
resp.setStatus(HiveSQLException.toTStatus(e))
|
||||
resp.setStatus(KyuubiSQLException.toTStatus(e))
|
||||
}
|
||||
}
|
||||
resp
|
||||
|
||||
@ -43,7 +43,7 @@ import org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException
|
||||
import org.apache.spark.sql.types.StructType
|
||||
import org.apache.spark.ui.KyuubiServerTab
|
||||
|
||||
import yaooqinn.kyuubi.Logging
|
||||
import yaooqinn.kyuubi.{KyuubiSQLException, Logging}
|
||||
import yaooqinn.kyuubi.auth.KyuubiAuthFactory
|
||||
import yaooqinn.kyuubi.cli._
|
||||
import yaooqinn.kyuubi.operation.{KyuubiOperation, OperationHandle, OperationManager}
|
||||
@ -110,7 +110,7 @@ private[kyuubi] class KyuubiSession(
|
||||
wait(interval)
|
||||
checkRound -= 1
|
||||
if (checkRound <= 0) {
|
||||
throw new HiveSQLException(s"A partially constructed SparkContext for [$getUserName] " +
|
||||
throw new KyuubiSQLException(s"A partially constructed SparkContext for [$getUserName] " +
|
||||
s"has last more than ${checkRound * interval} seconds")
|
||||
}
|
||||
info(s"A partially constructed SparkContext for [$getUserName], $checkRound times countdown.")
|
||||
@ -177,15 +177,15 @@ private[kyuubi] class KyuubiSession(
|
||||
override def run(): Unit = HadoopUtils.killYarnAppByName(appName)
|
||||
})
|
||||
}
|
||||
throw new HiveSQLException(
|
||||
s"Get SparkSession for [$getUserName] failed: " + te, "08S01", te)
|
||||
throw new KyuubiSQLException(
|
||||
s"Get SparkSession for [$getUserName] failed: " + te, "08S01", 1001, te)
|
||||
case _ =>
|
||||
stopContext()
|
||||
throw new HiveSQLException(ute.toString, "08S01", ute.getCause)
|
||||
throw new KyuubiSQLException(ute.toString, "08S01", ute.getCause)
|
||||
}
|
||||
case e: Exception =>
|
||||
stopContext()
|
||||
throw new HiveSQLException(
|
||||
throw new KyuubiSQLException(
|
||||
s"Get SparkSession for [$getUserName] failed: " + e, "08S01", e)
|
||||
} finally {
|
||||
sessionManager.setSCFullyConstructed(getUserName)
|
||||
@ -216,7 +216,7 @@ private[kyuubi] class KyuubiSession(
|
||||
}
|
||||
}
|
||||
|
||||
@throws[HiveSQLException]
|
||||
@throws[KyuubiSQLException]
|
||||
private[this] def executeStatementInternal(statement: String) = {
|
||||
acquire(true)
|
||||
val operation =
|
||||
@ -227,7 +227,7 @@ private[kyuubi] class KyuubiSession(
|
||||
opHandleSet.add(opHandle)
|
||||
opHandle
|
||||
} catch {
|
||||
case e: HiveSQLException =>
|
||||
case e: KyuubiSQLException =>
|
||||
operationManager.closeOperation(opHandle)
|
||||
throw e
|
||||
} finally {
|
||||
@ -293,7 +293,7 @@ private[kyuubi] class KyuubiSession(
|
||||
|
||||
def ugi: UserGroupInformation = this.sessionUGI
|
||||
|
||||
@throws[HiveSQLException]
|
||||
@throws[KyuubiSQLException]
|
||||
def open(sessionConf: Map[String, String]): Unit = {
|
||||
try {
|
||||
getOrCreateSparkSession(sessionConf)
|
||||
@ -305,10 +305,10 @@ private[kyuubi] class KyuubiSession(
|
||||
} catch {
|
||||
case ute: UndeclaredThrowableException => ute.getCause match {
|
||||
case e: HiveAccessControlException =>
|
||||
throw new HiveSQLException(e.getMessage, "08S01", e.getCause)
|
||||
throw new KyuubiSQLException(e.getMessage, "08S01", e.getCause)
|
||||
case e: NoSuchDatabaseException =>
|
||||
throw new HiveSQLException(e.getMessage, "08S01", e.getCause)
|
||||
case e: HiveSQLException => throw e
|
||||
throw new KyuubiSQLException(e.getMessage, "08S01", e.getCause)
|
||||
case e: KyuubiSQLException => throw e
|
||||
}
|
||||
}
|
||||
lastAccessTime = System.currentTimeMillis
|
||||
@ -323,7 +323,7 @@ private[kyuubi] class KyuubiSession(
|
||||
case GetInfoType.DBMS_NAME => new GetInfoValue("Spark SQL")
|
||||
case GetInfoType.DBMS_VERSION => new GetInfoValue(this._sparkSession.version)
|
||||
case _ =>
|
||||
throw new HiveSQLException("Unrecognized GetInfoType value: " + getInfoType.toString)
|
||||
throw new KyuubiSQLException("Unrecognized GetInfoType value: " + getInfoType.toString)
|
||||
}
|
||||
} finally {
|
||||
release(true)
|
||||
@ -336,7 +336,7 @@ private[kyuubi] class KyuubiSession(
|
||||
* @param statement sql statement
|
||||
* @return
|
||||
*/
|
||||
@throws[HiveSQLException]
|
||||
@throws[KyuubiSQLException]
|
||||
def executeStatement(statement: String): OperationHandle = {
|
||||
executeStatementInternal(statement)
|
||||
}
|
||||
@ -347,7 +347,7 @@ private[kyuubi] class KyuubiSession(
|
||||
* @param statement sql statement
|
||||
* @return
|
||||
*/
|
||||
@throws[HiveSQLException]
|
||||
@throws[KyuubiSQLException]
|
||||
def executeStatementAsync(statement: String): OperationHandle = {
|
||||
executeStatementInternal(statement)
|
||||
}
|
||||
@ -355,7 +355,7 @@ private[kyuubi] class KyuubiSession(
|
||||
/**
|
||||
* close the session
|
||||
*/
|
||||
@throws[HiveSQLException]
|
||||
@throws[KyuubiSQLException]
|
||||
def close(): Unit = {
|
||||
acquire(true)
|
||||
try {
|
||||
@ -371,7 +371,7 @@ private[kyuubi] class KyuubiSession(
|
||||
FileSystem.closeAllForUGI(sessionUGI)
|
||||
} catch {
|
||||
case ioe: IOException =>
|
||||
throw new HiveSQLException("Could not clean up file-system handles for UGI: "
|
||||
throw new KyuubiSQLException("Could not clean up file-system handles for UGI: "
|
||||
+ sessionUGI, ioe)
|
||||
}
|
||||
}
|
||||
@ -405,7 +405,7 @@ private[kyuubi] class KyuubiSession(
|
||||
}
|
||||
}
|
||||
|
||||
@throws[HiveSQLException]
|
||||
@throws[KyuubiSQLException]
|
||||
def fetchResults(
|
||||
opHandle: OperationHandle,
|
||||
orientation: FetchOrientation,
|
||||
@ -424,7 +424,7 @@ private[kyuubi] class KyuubiSession(
|
||||
}
|
||||
}
|
||||
|
||||
@throws[HiveSQLException]
|
||||
@throws[KyuubiSQLException]
|
||||
def getDelegationToken(
|
||||
authFactory: KyuubiAuthFactory,
|
||||
owner: String,
|
||||
@ -432,12 +432,12 @@ private[kyuubi] class KyuubiSession(
|
||||
authFactory.getDelegationToken(owner, renewer)
|
||||
}
|
||||
|
||||
@throws[HiveSQLException]
|
||||
@throws[KyuubiSQLException]
|
||||
def cancelDelegationToken(authFactory: KyuubiAuthFactory, tokenStr: String): Unit = {
|
||||
authFactory.cancelDelegationToken(tokenStr)
|
||||
}
|
||||
|
||||
@throws[HiveSQLException]
|
||||
@throws[KyuubiSQLException]
|
||||
def renewDelegationToken(authFactory: KyuubiAuthFactory, tokenStr: String): Unit = {
|
||||
authFactory.renewDelegationToken(tokenStr)
|
||||
}
|
||||
|
||||
@ -26,13 +26,12 @@ import scala.collection.JavaConverters._
|
||||
import scala.collection.mutable.{HashSet => MHSet}
|
||||
|
||||
import org.apache.commons.io.FileUtils
|
||||
import org.apache.hive.service.cli.HiveSQLException
|
||||
import org.apache.hive.service.cli.thrift.TProtocolVersion
|
||||
import org.apache.spark.{SparkConf, SparkException}
|
||||
import org.apache.spark.KyuubiConf._
|
||||
import org.apache.spark.sql.SparkSession
|
||||
|
||||
import yaooqinn.kyuubi.Logging
|
||||
import yaooqinn.kyuubi.{KyuubiSQLException, Logging}
|
||||
import yaooqinn.kyuubi.operation.OperationManager
|
||||
import yaooqinn.kyuubi.server.KyuubiServer
|
||||
import yaooqinn.kyuubi.service.CompositeService
|
||||
@ -155,7 +154,7 @@ private[kyuubi] class SessionManager private(
|
||||
try {
|
||||
closeSession(handle)
|
||||
} catch {
|
||||
case e: HiveSQLException =>
|
||||
case e: KyuubiSQLException =>
|
||||
warn("Exception is thrown closing idle session " + handle, e)
|
||||
}
|
||||
} else {
|
||||
@ -260,20 +259,20 @@ private[kyuubi] class SessionManager private(
|
||||
if (sessionAndTimes != null) {
|
||||
sessionAndTimes._2.decrementAndGet()
|
||||
} else {
|
||||
throw new SparkException(s"SparkSession for [$sessionUser] does not exist")
|
||||
throw new KyuubiSQLException(s"SparkSession for [$sessionUser] does not exist")
|
||||
}
|
||||
val session = handleToSession.remove(sessionHandle)
|
||||
if (session == null) {
|
||||
throw new HiveSQLException(s"Session for [$sessionUser] does not exist!")
|
||||
throw new KyuubiSQLException(s"Session for [$sessionUser] does not exist!")
|
||||
}
|
||||
session.close()
|
||||
}
|
||||
|
||||
@throws[HiveSQLException]
|
||||
@throws[KyuubiSQLException]
|
||||
def getSession(sessionHandle: SessionHandle): KyuubiSession = {
|
||||
val session = handleToSession.get(sessionHandle)
|
||||
if (session == null) {
|
||||
throw new HiveSQLException("Invalid SessionHandle: " + sessionHandle)
|
||||
throw new KyuubiSQLException("Invalid SessionHandle: " + sessionHandle)
|
||||
}
|
||||
session
|
||||
}
|
||||
|
||||
@ -17,105 +17,106 @@
|
||||
|
||||
package yaooqinn.kyuubi.operation
|
||||
|
||||
import org.apache.hive.service.cli.HiveSQLException
|
||||
import org.apache.hive.service.cli.thrift.TOperationState
|
||||
import org.apache.spark.SparkFunSuite
|
||||
|
||||
import yaooqinn.kyuubi.KyuubiSQLException
|
||||
|
||||
class OperationStateSuite extends SparkFunSuite {
|
||||
|
||||
test("OperationState INITIALIZED") {
|
||||
val tOpState = TOperationState.INITIALIZED_STATE
|
||||
assert(INITIALIZED.toTOperationState() === tOpState)
|
||||
assert(!INITIALIZED.isTerminal())
|
||||
intercept[HiveSQLException](INITIALIZED.validateTransition(INITIALIZED))
|
||||
intercept[HiveSQLException](INITIALIZED.validateTransition(FINISHED))
|
||||
intercept[HiveSQLException](INITIALIZED.validateTransition(ERROR))
|
||||
intercept[HiveSQLException](INITIALIZED.validateTransition(UNKNOWN))
|
||||
intercept[KyuubiSQLException](INITIALIZED.validateTransition(INITIALIZED))
|
||||
intercept[KyuubiSQLException](INITIALIZED.validateTransition(FINISHED))
|
||||
intercept[KyuubiSQLException](INITIALIZED.validateTransition(ERROR))
|
||||
intercept[KyuubiSQLException](INITIALIZED.validateTransition(UNKNOWN))
|
||||
}
|
||||
|
||||
test("OperationState RUNNING") {
|
||||
val tOpState = TOperationState.RUNNING_STATE
|
||||
assert(RUNNING.toTOperationState() === tOpState)
|
||||
assert(!RUNNING.isTerminal())
|
||||
intercept[HiveSQLException](RUNNING.validateTransition(INITIALIZED))
|
||||
intercept[HiveSQLException](RUNNING.validateTransition(PENDING))
|
||||
intercept[HiveSQLException](RUNNING.validateTransition(RUNNING))
|
||||
intercept[HiveSQLException](RUNNING.validateTransition(UNKNOWN))
|
||||
intercept[KyuubiSQLException](RUNNING.validateTransition(INITIALIZED))
|
||||
intercept[KyuubiSQLException](RUNNING.validateTransition(PENDING))
|
||||
intercept[KyuubiSQLException](RUNNING.validateTransition(RUNNING))
|
||||
intercept[KyuubiSQLException](RUNNING.validateTransition(UNKNOWN))
|
||||
}
|
||||
|
||||
test("OperationState FINISHED") {
|
||||
val tOpState = TOperationState.FINISHED_STATE
|
||||
assert(FINISHED.toTOperationState() === tOpState)
|
||||
assert(FINISHED.isTerminal())
|
||||
intercept[HiveSQLException](FINISHED.validateTransition(INITIALIZED))
|
||||
intercept[HiveSQLException](FINISHED.validateTransition(PENDING))
|
||||
intercept[HiveSQLException](FINISHED.validateTransition(RUNNING))
|
||||
intercept[HiveSQLException](FINISHED.validateTransition(FINISHED))
|
||||
intercept[HiveSQLException](FINISHED.validateTransition(ERROR))
|
||||
intercept[HiveSQLException](FINISHED.validateTransition(CANCELED))
|
||||
intercept[HiveSQLException](FINISHED.validateTransition(UNKNOWN))
|
||||
intercept[KyuubiSQLException](FINISHED.validateTransition(INITIALIZED))
|
||||
intercept[KyuubiSQLException](FINISHED.validateTransition(PENDING))
|
||||
intercept[KyuubiSQLException](FINISHED.validateTransition(RUNNING))
|
||||
intercept[KyuubiSQLException](FINISHED.validateTransition(FINISHED))
|
||||
intercept[KyuubiSQLException](FINISHED.validateTransition(ERROR))
|
||||
intercept[KyuubiSQLException](FINISHED.validateTransition(CANCELED))
|
||||
intercept[KyuubiSQLException](FINISHED.validateTransition(UNKNOWN))
|
||||
}
|
||||
|
||||
test("OperationState CANCELED") {
|
||||
val tOpState = TOperationState.CANCELED_STATE
|
||||
assert(CANCELED.toTOperationState() === tOpState)
|
||||
assert(CANCELED.isTerminal())
|
||||
intercept[HiveSQLException](CANCELED.validateTransition(INITIALIZED))
|
||||
intercept[HiveSQLException](CANCELED.validateTransition(PENDING))
|
||||
intercept[HiveSQLException](CANCELED.validateTransition(RUNNING))
|
||||
intercept[HiveSQLException](CANCELED.validateTransition(FINISHED))
|
||||
intercept[HiveSQLException](CANCELED.validateTransition(ERROR))
|
||||
intercept[HiveSQLException](CANCELED.validateTransition(CANCELED))
|
||||
intercept[HiveSQLException](CANCELED.validateTransition(UNKNOWN))
|
||||
intercept[KyuubiSQLException](CANCELED.validateTransition(INITIALIZED))
|
||||
intercept[KyuubiSQLException](CANCELED.validateTransition(PENDING))
|
||||
intercept[KyuubiSQLException](CANCELED.validateTransition(RUNNING))
|
||||
intercept[KyuubiSQLException](CANCELED.validateTransition(FINISHED))
|
||||
intercept[KyuubiSQLException](CANCELED.validateTransition(ERROR))
|
||||
intercept[KyuubiSQLException](CANCELED.validateTransition(CANCELED))
|
||||
intercept[KyuubiSQLException](CANCELED.validateTransition(UNKNOWN))
|
||||
}
|
||||
|
||||
test("OperationState CLOSED") {
|
||||
val tOpState = TOperationState.CLOSED_STATE
|
||||
assert(CLOSED.toTOperationState() === tOpState)
|
||||
assert(CLOSED.isTerminal())
|
||||
intercept[HiveSQLException](CLOSED.validateTransition(INITIALIZED))
|
||||
intercept[HiveSQLException](CLOSED.validateTransition(PENDING))
|
||||
intercept[HiveSQLException](CLOSED.validateTransition(RUNNING))
|
||||
intercept[HiveSQLException](CLOSED.validateTransition(FINISHED))
|
||||
intercept[HiveSQLException](CLOSED.validateTransition(ERROR))
|
||||
intercept[HiveSQLException](CLOSED.validateTransition(CANCELED))
|
||||
intercept[HiveSQLException](CLOSED.validateTransition(CLOSED))
|
||||
intercept[HiveSQLException](CLOSED.validateTransition(UNKNOWN))
|
||||
intercept[KyuubiSQLException](CLOSED.validateTransition(INITIALIZED))
|
||||
intercept[KyuubiSQLException](CLOSED.validateTransition(PENDING))
|
||||
intercept[KyuubiSQLException](CLOSED.validateTransition(RUNNING))
|
||||
intercept[KyuubiSQLException](CLOSED.validateTransition(FINISHED))
|
||||
intercept[KyuubiSQLException](CLOSED.validateTransition(ERROR))
|
||||
intercept[KyuubiSQLException](CLOSED.validateTransition(CANCELED))
|
||||
intercept[KyuubiSQLException](CLOSED.validateTransition(CLOSED))
|
||||
intercept[KyuubiSQLException](CLOSED.validateTransition(UNKNOWN))
|
||||
}
|
||||
|
||||
test("OperationState ERROR") {
|
||||
val tOpState = TOperationState.ERROR_STATE
|
||||
assert(ERROR.toTOperationState() === tOpState)
|
||||
assert(ERROR.isTerminal())
|
||||
intercept[HiveSQLException](ERROR.validateTransition(INITIALIZED))
|
||||
intercept[HiveSQLException](ERROR.validateTransition(PENDING))
|
||||
intercept[HiveSQLException](ERROR.validateTransition(RUNNING))
|
||||
intercept[HiveSQLException](ERROR.validateTransition(FINISHED))
|
||||
intercept[HiveSQLException](ERROR.validateTransition(ERROR))
|
||||
intercept[HiveSQLException](ERROR.validateTransition(CANCELED))
|
||||
intercept[HiveSQLException](ERROR.validateTransition(UNKNOWN))
|
||||
intercept[KyuubiSQLException](ERROR.validateTransition(INITIALIZED))
|
||||
intercept[KyuubiSQLException](ERROR.validateTransition(PENDING))
|
||||
intercept[KyuubiSQLException](ERROR.validateTransition(RUNNING))
|
||||
intercept[KyuubiSQLException](ERROR.validateTransition(FINISHED))
|
||||
intercept[KyuubiSQLException](ERROR.validateTransition(ERROR))
|
||||
intercept[KyuubiSQLException](ERROR.validateTransition(CANCELED))
|
||||
intercept[KyuubiSQLException](ERROR.validateTransition(UNKNOWN))
|
||||
}
|
||||
|
||||
test("OperationState UNKNOWN") {
|
||||
val tOpState = TOperationState.UKNOWN_STATE
|
||||
assert(UNKNOWN.toTOperationState() === tOpState)
|
||||
assert(!UNKNOWN.isTerminal())
|
||||
intercept[HiveSQLException](UNKNOWN.validateTransition(INITIALIZED))
|
||||
intercept[HiveSQLException](UNKNOWN.validateTransition(PENDING))
|
||||
intercept[HiveSQLException](UNKNOWN.validateTransition(RUNNING))
|
||||
intercept[HiveSQLException](UNKNOWN.validateTransition(FINISHED))
|
||||
intercept[HiveSQLException](UNKNOWN.validateTransition(ERROR))
|
||||
intercept[HiveSQLException](UNKNOWN.validateTransition(CANCELED))
|
||||
intercept[HiveSQLException](UNKNOWN.validateTransition(CLOSED))
|
||||
intercept[HiveSQLException](UNKNOWN.validateTransition(UNKNOWN))
|
||||
intercept[KyuubiSQLException](UNKNOWN.validateTransition(INITIALIZED))
|
||||
intercept[KyuubiSQLException](UNKNOWN.validateTransition(PENDING))
|
||||
intercept[KyuubiSQLException](UNKNOWN.validateTransition(RUNNING))
|
||||
intercept[KyuubiSQLException](UNKNOWN.validateTransition(FINISHED))
|
||||
intercept[KyuubiSQLException](UNKNOWN.validateTransition(ERROR))
|
||||
intercept[KyuubiSQLException](UNKNOWN.validateTransition(CANCELED))
|
||||
intercept[KyuubiSQLException](UNKNOWN.validateTransition(CLOSED))
|
||||
intercept[KyuubiSQLException](UNKNOWN.validateTransition(UNKNOWN))
|
||||
}
|
||||
|
||||
test("OperationState PENDING") {
|
||||
val tOpState = TOperationState.PENDING_STATE
|
||||
assert(PENDING.toTOperationState() === tOpState)
|
||||
assert(!PENDING.isTerminal())
|
||||
intercept[HiveSQLException](PENDING.validateTransition(INITIALIZED))
|
||||
intercept[HiveSQLException](PENDING.validateTransition(PENDING))
|
||||
intercept[HiveSQLException](PENDING.validateTransition(UNKNOWN))
|
||||
intercept[KyuubiSQLException](PENDING.validateTransition(INITIALIZED))
|
||||
intercept[KyuubiSQLException](PENDING.validateTransition(PENDING))
|
||||
intercept[KyuubiSQLException](PENDING.validateTransition(UNKNOWN))
|
||||
}
|
||||
}
|
||||
|
||||
@ -17,13 +17,14 @@
|
||||
|
||||
package yaooqinn.kyuubi.operation
|
||||
|
||||
import org.apache.hive.service.cli.HiveSQLException
|
||||
import org.apache.spark.SparkFunSuite
|
||||
|
||||
import yaooqinn.kyuubi.KyuubiSQLException
|
||||
|
||||
class OperationStatusSuite extends SparkFunSuite {
|
||||
|
||||
test("operation status basic tests") {
|
||||
val status = new OperationStatus(INITIALIZED, new HiveSQLException("test"))
|
||||
val status = new OperationStatus(INITIALIZED, new KyuubiSQLException("test"))
|
||||
assert(status.getState === INITIALIZED)
|
||||
assert(!status.getState.isTerminal())
|
||||
assert(status.getOperationException.getMessage === "test")
|
||||
|
||||
Loading…
Reference in New Issue
Block a user