[KYUUBI #2854] Add exception field in KyuubiSessionEvent
### _Why are the changes needed?_ close #2854 ### _How was this patch tested?_ - [X] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request Closes #2860 from wForget/KYUUBI-2854. Closes #2854 3225be7a [wforget] comment 834aedcf [Wang Zhen] update scaladoc d71c1f26 [wforget] fix checkstyle da0e92d2 [wforget] [KYUUBI-2854] Add exception field in KyuubiSessionEvent Lead-authored-by: wforget <643348094@qq.com> Co-authored-by: Wang Zhen <wangzhen07@qiyi.com> Signed-off-by: Cheng Pan <chengpan@apache.org>
This commit is contained in:
parent
9de3365f50
commit
99959b89b7
@ -30,13 +30,14 @@ import org.apache.kyuubi.session.KyuubiSession
|
||||
* @param clientIP client ip address
|
||||
* @param serverIP A unique Kyuubi server id, e.g. kyuubi server ip address and port,
|
||||
* it is useful if has multi-instance Kyuubi Server
|
||||
* @param remoteSessionId remote engine session id
|
||||
* @param startTime session create time
|
||||
* @param conf session config
|
||||
* @param startTime session create time
|
||||
* @param remoteSessionId remote engine session id
|
||||
* @param engineId engine id. For engine on yarn, it is applicationId.
|
||||
* @param openedTime session opened time
|
||||
* @param endTime session end time
|
||||
* @param totalOperations how many queries and meta calls
|
||||
* @param engineId engine id. For engine on yarn, it is applicationId.
|
||||
* @param exception the session exception, such as the exception that occur when opening session
|
||||
*/
|
||||
case class KyuubiSessionEvent(
|
||||
sessionId: String,
|
||||
@ -52,7 +53,8 @@ case class KyuubiSessionEvent(
|
||||
var engineId: String = "",
|
||||
var openedTime: Long = -1L,
|
||||
var endTime: Long = -1L,
|
||||
var totalOperations: Int = 0) extends KyuubiEvent {
|
||||
var totalOperations: Int = 0,
|
||||
var exception: Option[Throwable] = None) extends KyuubiEvent {
|
||||
override def partitions: Seq[(String, String)] =
|
||||
("day", Utils.getDateFromTimestamp(startTime)) :: Nil
|
||||
}
|
||||
|
||||
@ -151,7 +151,7 @@ class BatchJobSubmission(
|
||||
OperationLog.removeCurrentOperationLog()
|
||||
}
|
||||
|
||||
override protected def runInternal(): Unit = {
|
||||
override protected def runInternal(): Unit = session.handleSessionException {
|
||||
val asyncOperation: Runnable = () => {
|
||||
setStateIfNotCanceled(OperationState.RUNNING)
|
||||
try {
|
||||
|
||||
@ -44,7 +44,7 @@ class LaunchEngine(session: KyuubiSessionImpl, override val shouldRunAsync: Bool
|
||||
OperationLog.removeCurrentOperationLog()
|
||||
}
|
||||
|
||||
override protected def runInternal(): Unit = {
|
||||
override protected def runInternal(): Unit = session.handleSessionException {
|
||||
val asyncOperation: Runnable = () => {
|
||||
setState(OperationState.RUNNING)
|
||||
try {
|
||||
|
||||
@ -34,4 +34,13 @@ abstract class KyuubiSession(
|
||||
|
||||
def getSessionEvent: Option[KyuubiSessionEvent]
|
||||
|
||||
private[kyuubi] def handleSessionException(f: => Unit): Unit = {
|
||||
try {
|
||||
f
|
||||
} catch {
|
||||
case t: Throwable =>
|
||||
getSessionEvent.foreach(_.exception = Some(t))
|
||||
throw t
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -21,11 +21,13 @@ import java.net.InetAddress
|
||||
import java.nio.file.Paths
|
||||
import java.util.UUID
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.util.matching.Regex
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper
|
||||
import org.apache.hadoop.conf.Configuration
|
||||
import org.apache.hadoop.fs.{FileSystem, Path}
|
||||
import org.apache.hive.service.rpc.thrift.{TOpenSessionReq, TStatusCode}
|
||||
|
||||
import org.apache.kyuubi._
|
||||
import org.apache.kyuubi.config.KyuubiConf
|
||||
@ -228,4 +230,34 @@ class ServerJsonLoggingEventHandlerSuite extends WithKyuubiServer with HiveJDBCT
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
test("open session exception") {
|
||||
val name = UUID.randomUUID().toString
|
||||
withSessionConf(Map(
|
||||
"spark.driver.memory" -> "abc",
|
||||
KyuubiConf.ENGINE_SHARE_LEVEL.key -> "CONNECTION",
|
||||
KyuubiConf.SESSION_ENGINE_LAUNCH_ASYNC.key -> "false",
|
||||
KyuubiConf.SESSION_NAME.key -> name))()() {
|
||||
withThriftClient { client =>
|
||||
val req = new TOpenSessionReq()
|
||||
req.setUsername(Utils.currentUser)
|
||||
req.setPassword("anonymous")
|
||||
req.setConfiguration(sessionConfigs.asJava)
|
||||
val resp = client.OpenSession(req)
|
||||
assert(resp.getSessionHandle === null)
|
||||
assert(resp.getStatus.getStatusCode === TStatusCode.ERROR_STATUS)
|
||||
}
|
||||
}
|
||||
|
||||
val serverSessionEventPath =
|
||||
Paths.get(serverLogRoot, "kyuubi_session", s"day=$currentDate")
|
||||
withJdbcStatement() { statement =>
|
||||
val res = statement.executeQuery(
|
||||
s"SELECT * FROM `json`.`$serverSessionEventPath` " +
|
||||
s"where sessionName = '$name' and exception is not null limit 1")
|
||||
assert(res.next())
|
||||
val exception = res.getObject("exception")
|
||||
assert(exception.toString.contains("Invalid maximum heap size: -Xmxabc"))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user