fix #118 fix npe with incremental result collection
This commit is contained in:
parent
8a30b63c37
commit
d4b9d61ea4
@ -145,7 +145,7 @@ object SparkEnv extends Logging {
|
||||
private[spark] val driverSystemName = "sparkDriver"
|
||||
private[spark] val executorSystemName = "sparkExecutor"
|
||||
|
||||
private[this] def user = UserGroupInformation.getCurrentUser.getShortUserName
|
||||
private def user = UserGroupInformation.getCurrentUser.getShortUserName
|
||||
|
||||
def set(e: SparkEnv) {
|
||||
if (e == null) {
|
||||
@ -162,7 +162,7 @@ object SparkEnv extends Logging {
|
||||
*/
|
||||
def get: SparkEnv = {
|
||||
debug(s"Kyuubi: Get SparkEnv for $user")
|
||||
env.get(user)
|
||||
env.getOrDefault(user, env.values().iterator().next())
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@ -36,7 +36,7 @@ class KyuubiSQLException(reason: String, sqlState: String, vendorCode: Int, caus
|
||||
|
||||
def this(reason: String) = this(reason, sqlState = null)
|
||||
|
||||
def this(cause: Throwable) = this(reason = null, cause)
|
||||
def this(cause: Throwable) = this(cause.toString, cause)
|
||||
|
||||
/**
|
||||
* Converts current object to a [[TStatus]] object
|
||||
@ -64,26 +64,38 @@ object KyuubiSQLException {
|
||||
tStatus
|
||||
}
|
||||
|
||||
def toString(
|
||||
cause: Throwable,
|
||||
parent: Array[StackTraceElement] = Array.empty): List[String] = {
|
||||
|
||||
def toString(cause: Throwable): List[String] = {
|
||||
toString(cause, null)
|
||||
}
|
||||
|
||||
def toString(cause: Throwable, parent: Array[StackTraceElement]): List[String] = {
|
||||
val trace = cause.getStackTrace
|
||||
enroll(cause, trace.diff(parent)) ++
|
||||
var m = trace.length - 1
|
||||
if (parent != null) {
|
||||
var n = parent.length - 1
|
||||
while (m >= 0 && n >=0 && trace(m).equals(parent(n))) {
|
||||
m = m - 1
|
||||
n = n - 1
|
||||
}
|
||||
}
|
||||
|
||||
enroll(cause, trace, m) ++
|
||||
Option(cause.getCause).map(toString(_, trace)).getOrElse(Nil)
|
||||
}
|
||||
|
||||
private[this] def enroll(
|
||||
ex: Throwable,
|
||||
trace: Array[StackTraceElement]): List[String] = {
|
||||
private[this] def enroll(ex: Throwable,
|
||||
trace: Array[StackTraceElement], max: Int): List[String] = {
|
||||
val builder = new StringBuilder
|
||||
builder.append('*').append(ex.getClass.getName).append(':')
|
||||
Option(ex.getMessage).map(_.stripSuffix(";")).foreach(msg => builder.append(msg).append(":"))
|
||||
builder.append(trace.length).append(':').append(trace.length - 1)
|
||||
List(builder.toString) ++ trace.map { t =>
|
||||
builder.append(ex.getMessage).append(':')
|
||||
builder.append(trace.length).append(':').append(max)
|
||||
List(builder.toString) ++ (0 to max).map { i =>
|
||||
builder.setLength(0)
|
||||
builder.append(t.getClassName).append(":").append(t.getMethodName).append(":")
|
||||
Option(t.getFileName).foreach(builder.append)
|
||||
builder.append(":").append(t.getLineNumber)
|
||||
builder.append(trace(i).getClassName).append(":")
|
||||
builder.append(trace(i).getMethodName).append(":")
|
||||
builder.append(Option(trace(i).getFileName).getOrElse("")).append(':')
|
||||
builder.append(trace(i).getLineNumber)
|
||||
builder.toString
|
||||
}.toList
|
||||
}
|
||||
|
||||
@ -221,14 +221,14 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging
|
||||
result.schema
|
||||
}
|
||||
|
||||
def getNextRowSet(order: FetchOrientation, maxRowsL: Long): RowSet = {
|
||||
def getNextRowSet(order: FetchOrientation, rowSetSize: Long): RowSet = {
|
||||
validateDefaultFetchOrientation(order)
|
||||
assertState(FINISHED)
|
||||
setHasResultSet(true)
|
||||
val taken = if (order == FetchOrientation.FETCH_FIRST) {
|
||||
result.toLocalIterator().asScala.take(maxRowsL.toInt)
|
||||
result.toLocalIterator().asScala.take(rowSetSize.toInt)
|
||||
} else {
|
||||
iter.take(maxRowsL.toInt)
|
||||
iter.take(rowSetSize.toInt)
|
||||
}
|
||||
RowSetBuilder.create(getResultSetSchema, taken.toSeq, session.getProtocolVersion)
|
||||
}
|
||||
|
||||
@ -94,7 +94,7 @@ private[kyuubi] class OperationManager private(name: String)
|
||||
def getOperation(operationHandle: OperationHandle): KyuubiOperation = {
|
||||
val operation = getOperationInternal(operationHandle)
|
||||
if (operation == null) {
|
||||
throw new KyuubiSQLException("Invalid OperationHandle: " + operationHandle)
|
||||
throw new KyuubiSQLException("Invalid OperationHandle " + operationHandle)
|
||||
}
|
||||
operation
|
||||
}
|
||||
|
||||
@ -57,14 +57,14 @@ private[kyuubi] class KyuubiSession(
|
||||
sessionManager: SessionManager,
|
||||
operationManager: OperationManager) extends Logging {
|
||||
|
||||
private[this] val sessionHandle: SessionHandle = new SessionHandle(protocol)
|
||||
private[this] val opHandleSet = new MHSet[OperationHandle]
|
||||
private[this] var _isOperationLogEnabled = false
|
||||
private[this] var sessionLogDir: File = _
|
||||
@volatile private[this] var lastAccessTime: Long = System.currentTimeMillis()
|
||||
private[this] var lastIdleTime = 0L
|
||||
private val sessionHandle: SessionHandle = new SessionHandle(protocol)
|
||||
private val opHandleSet = new MHSet[OperationHandle]
|
||||
private var _isOperationLogEnabled = false
|
||||
private var sessionLogDir: File = _
|
||||
@volatile private var lastAccessTime: Long = System.currentTimeMillis()
|
||||
private var lastIdleTime = 0L
|
||||
|
||||
private[this] val sessionUGI: UserGroupInformation = {
|
||||
private val sessionUGI: UserGroupInformation = {
|
||||
val currentUser = UserGroupInformation.getCurrentUser
|
||||
if (withImpersonation) {
|
||||
if (UserGroupInformation.isSecurityEnabled) {
|
||||
@ -82,16 +82,16 @@ private[kyuubi] class KyuubiSession(
|
||||
}
|
||||
}
|
||||
|
||||
private[this] val sparkSessionWithUGI =
|
||||
private val sparkSessionWithUGI =
|
||||
new SparkSessionWithUGI(sessionUGI, conf, sessionManager.getCacheMgr)
|
||||
|
||||
private[this] def acquire(userAccess: Boolean): Unit = {
|
||||
private def acquire(userAccess: Boolean): Unit = {
|
||||
if (userAccess) {
|
||||
lastAccessTime = System.currentTimeMillis
|
||||
}
|
||||
}
|
||||
|
||||
private[this] def release(userAccess: Boolean): Unit = {
|
||||
private def release(userAccess: Boolean): Unit = {
|
||||
if (userAccess) {
|
||||
lastAccessTime = System.currentTimeMillis
|
||||
}
|
||||
@ -103,7 +103,7 @@ private[kyuubi] class KyuubiSession(
|
||||
}
|
||||
|
||||
@throws[KyuubiSQLException]
|
||||
private[this] def executeStatementInternal(statement: String) = {
|
||||
private def executeStatementInternal(statement: String): OperationHandle = {
|
||||
acquire(true)
|
||||
val operation =
|
||||
operationManager.newExecuteStatementOperation(this, statement)
|
||||
@ -121,7 +121,7 @@ private[kyuubi] class KyuubiSession(
|
||||
}
|
||||
}
|
||||
|
||||
private[this] def cleanupSessionLogDir(): Unit = {
|
||||
private def cleanupSessionLogDir(): Unit = {
|
||||
if (_isOperationLogEnabled) {
|
||||
try {
|
||||
FileUtils.forceDelete(sessionLogDir)
|
||||
@ -152,7 +152,7 @@ private[kyuubi] class KyuubiSession(
|
||||
case GetInfoType.DBMS_VERSION =>
|
||||
new GetInfoValue(this.sparkSessionWithUGI.sparkSession.version)
|
||||
case _ =>
|
||||
throw new KyuubiSQLException("Unrecognized GetInfoType value: " + getInfoType.toString)
|
||||
throw new KyuubiSQLException("Unrecognized GetInfoType value " + getInfoType.toString)
|
||||
}
|
||||
} finally {
|
||||
release(true)
|
||||
@ -199,7 +199,7 @@ private[kyuubi] class KyuubiSession(
|
||||
FileSystem.closeAllForUGI(sessionUGI)
|
||||
} catch {
|
||||
case ioe: IOException =>
|
||||
throw new KyuubiSQLException("Could not clean up file-system handles for UGI: "
|
||||
throw new KyuubiSQLException("Could not clean up file-system handles for UGI "
|
||||
+ sessionUGI, ioe)
|
||||
}
|
||||
}
|
||||
|
||||
@ -247,7 +247,7 @@ private[kyuubi] class SessionManager private(
|
||||
def getSession(sessionHandle: SessionHandle): KyuubiSession = {
|
||||
val session = handleToSession.get(sessionHandle)
|
||||
if (session == null) {
|
||||
throw new KyuubiSQLException("Invalid SessionHandle: " + sessionHandle)
|
||||
throw new KyuubiSQLException("Invalid SessionHandle " + sessionHandle)
|
||||
}
|
||||
session
|
||||
}
|
||||
|
||||
@ -41,13 +41,13 @@ class SparkSessionWithUGI(
|
||||
user: UserGroupInformation,
|
||||
conf: SparkConf,
|
||||
cache: SparkSessionCacheManager) extends Logging {
|
||||
private[this] var _sparkSession: SparkSession = _
|
||||
private[this] val userName: String = user.getShortUserName
|
||||
private[this] val promisedSparkContext = Promise[SparkContext]()
|
||||
private[this] var initialDatabase: Option[String] = None
|
||||
private[this] var sparkException: Option[Throwable] = None
|
||||
private var _sparkSession: SparkSession = _
|
||||
private val userName: String = user.getShortUserName
|
||||
private val promisedSparkContext = Promise[SparkContext]()
|
||||
private var initialDatabase: Option[String] = None
|
||||
private var sparkException: Option[Throwable] = None
|
||||
|
||||
private[this] def newContext(): Thread = {
|
||||
private def newContext(): Thread = {
|
||||
new Thread(s"Start-SparkContext-$userName") {
|
||||
override def run(): Unit = {
|
||||
try {
|
||||
@ -64,7 +64,7 @@ class SparkSessionWithUGI(
|
||||
/**
|
||||
* Invoke SparkContext.stop() if not succeed initializing it
|
||||
*/
|
||||
private[this] def stopContext(): Unit = {
|
||||
private def stopContext(): Unit = {
|
||||
promisedSparkContext.future.map { sc =>
|
||||
warn(s"Error occurred during initializing SparkContext for $userName, stopping")
|
||||
try {
|
||||
@ -82,7 +82,7 @@ class SparkSessionWithUGI(
|
||||
*
|
||||
* @param sessionConf configurations for user connection string
|
||||
*/
|
||||
private[this] def configureSparkConf(sessionConf: Map[String, String]): Unit = {
|
||||
private def configureSparkConf(sessionConf: Map[String, String]): Unit = {
|
||||
for ((key, value) <- sessionConf) {
|
||||
key match {
|
||||
case HIVE_VAR_PREFIX(DEPRECATED_QUEUE) => conf.set(QUEUE, value)
|
||||
@ -107,7 +107,7 @@ class SparkSessionWithUGI(
|
||||
*
|
||||
* @param sessionConf configurations for user connection string
|
||||
*/
|
||||
private[this] def configureSparkSession(sessionConf: Map[String, String]): Unit = {
|
||||
private def configureSparkSession(sessionConf: Map[String, String]): Unit = {
|
||||
for ((key, value) <- sessionConf) {
|
||||
key match {
|
||||
case HIVE_VAR_PREFIX(k) =>
|
||||
@ -122,7 +122,7 @@ class SparkSessionWithUGI(
|
||||
}
|
||||
}
|
||||
|
||||
private[this] def getOrCreate(sessionConf: Map[String, String]): Unit = synchronized {
|
||||
private def getOrCreate(sessionConf: Map[String, String]): Unit = synchronized {
|
||||
val totalRounds = math.max(conf.get(BACKEND_SESSION_WAIT_OTHER_TIMES).toInt, 15)
|
||||
var checkRound = totalRounds
|
||||
val interval = conf.getTimeAsMs(BACKEND_SESSION_WAIT_OTHER_INTERVAL)
|
||||
@ -148,7 +148,7 @@ class SparkSessionWithUGI(
|
||||
}
|
||||
}
|
||||
|
||||
private[this] def create(sessionConf: Map[String, String]): Unit = {
|
||||
private def create(sessionConf: Map[String, String]): Unit = {
|
||||
info(s"--------- Create new SparkSession for $userName ----------")
|
||||
// kyuubi|user name|canonical host name| port
|
||||
val appName = Seq(
|
||||
@ -218,7 +218,7 @@ class SparkSessionWithUGI(
|
||||
}
|
||||
|
||||
object SparkSessionWithUGI {
|
||||
private[this] val userSparkContextBeingConstruct = new MHSet[String]()
|
||||
private val userSparkContextBeingConstruct = new MHSet[String]()
|
||||
|
||||
def setPartiallyConstructed(user: String): Unit = {
|
||||
userSparkContextBeingConstruct.add(user)
|
||||
|
||||
@ -102,7 +102,7 @@ class OperationManagerSuite extends SparkFunSuite with Matchers with MockitoSuga
|
||||
assert(op === op2)
|
||||
val operationHandle = mock[OperationHandle]
|
||||
val e = intercept[KyuubiSQLException](operationMgr.getOperation(operationHandle))
|
||||
e.getMessage should startWith("Invalid OperationHandle:")
|
||||
e.getMessage should startWith("Invalid OperationHandle")
|
||||
|
||||
val e2 = intercept[KyuubiSQLException](operationMgr.closeOperation(operationHandle))
|
||||
e2.getMessage should be("Operation does not exist!")
|
||||
|
||||
@ -91,7 +91,7 @@ class KyuubiSessionSuite extends SparkFunSuite {
|
||||
assert(
|
||||
session.getInfo(GetInfoType.DBMS_VERSION).toTGetInfoValue.getStringValue === spark.version)
|
||||
val e = intercept[KyuubiSQLException](session.getInfo(new GetInfoType {}))
|
||||
assert(e.getMessage.startsWith("Unrecognized GetInfoType value:"))
|
||||
assert(e.getMessage.startsWith("Unrecognized GetInfoType value"))
|
||||
}
|
||||
|
||||
test("get last access time") {
|
||||
|
||||
Loading…
Reference in New Issue
Block a user