From 99959b89b7ef488ebb98cf1bf39f7dbbf1a5187e Mon Sep 17 00:00:00 2001 From: wforget <643348094@qq.com> Date: Thu, 16 Jun 2022 13:56:20 +0800 Subject: [PATCH] [KYUUBI #2854] Add exception field in KyuubiSessionEvent ### _Why are the changes needed?_ close #2854 ### _How was this patch tested?_ - [X] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request Closes #2860 from wForget/KYUUBI-2854. Closes #2854 3225be7a [wforget] comment 834aedcf [Wang Zhen] update scaladoc d71c1f26 [wforget] fix checkstyle da0e92d2 [wforget] [KYUUBI-2854] Add exception field in KyuubiSessionEvent Lead-authored-by: wforget <643348094@qq.com> Co-authored-by: Wang Zhen Signed-off-by: Cheng Pan --- .../kyuubi/events/KyuubiSessionEvent.scala | 10 +++--- .../kyuubi/operation/BatchJobSubmission.scala | 2 +- .../kyuubi/operation/LaunchEngine.scala | 2 +- .../apache/kyuubi/session/KyuubiSession.scala | 9 ++++++ .../ServerJsonLoggingEventHandlerSuite.scala | 32 +++++++++++++++++++ 5 files changed, 49 insertions(+), 6 deletions(-) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiSessionEvent.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiSessionEvent.scala index 0f95dc37c..0cd44188c 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiSessionEvent.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiSessionEvent.scala @@ -30,13 +30,14 @@ import org.apache.kyuubi.session.KyuubiSession * @param clientIP client ip address * @param serverIP A unique Kyuubi server id, e.g. kyuubi server ip address and port, * it is useful if has multi-instance Kyuubi Server - * @param remoteSessionId remote engine session id - * @param startTime session create time * @param conf session config + * @param startTime session create time + * @param remoteSessionId remote engine session id + * @param engineId engine id. For engine on yarn, it is applicationId. * @param openedTime session opened time * @param endTime session end time * @param totalOperations how many queries and meta calls - * @param engineId engine id. For engine on yarn, it is applicationId. + * @param exception the session exception, such as the exception that occur when opening session */ case class KyuubiSessionEvent( sessionId: String, @@ -52,7 +53,8 @@ case class KyuubiSessionEvent( var engineId: String = "", var openedTime: Long = -1L, var endTime: Long = -1L, - var totalOperations: Int = 0) extends KyuubiEvent { + var totalOperations: Int = 0, + var exception: Option[Throwable] = None) extends KyuubiEvent { override def partitions: Seq[(String, String)] = ("day", Utils.getDateFromTimestamp(startTime)) :: Nil } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala index 5af11b7ab..a81117d2e 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala @@ -151,7 +151,7 @@ class BatchJobSubmission( OperationLog.removeCurrentOperationLog() } - override protected def runInternal(): Unit = { + override protected def runInternal(): Unit = session.handleSessionException { val asyncOperation: Runnable = () => { setStateIfNotCanceled(OperationState.RUNNING) try { diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/LaunchEngine.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/LaunchEngine.scala index 8e087df6e..46ec94225 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/LaunchEngine.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/LaunchEngine.scala @@ -44,7 +44,7 @@ class LaunchEngine(session: KyuubiSessionImpl, override val shouldRunAsync: Bool OperationLog.removeCurrentOperationLog() } - override protected def runInternal(): Unit = { + override protected def runInternal(): Unit = session.handleSessionException { val asyncOperation: Runnable = () => { setState(OperationState.RUNNING) try { diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSession.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSession.scala index b2cf1a0d6..64dc649e5 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSession.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSession.scala @@ -34,4 +34,13 @@ abstract class KyuubiSession( def getSessionEvent: Option[KyuubiSessionEvent] + private[kyuubi] def handleSessionException(f: => Unit): Unit = { + try { + f + } catch { + case t: Throwable => + getSessionEvent.foreach(_.exception = Some(t)) + throw t + } + } } diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/events/handler/ServerJsonLoggingEventHandlerSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/events/handler/ServerJsonLoggingEventHandlerSuite.scala index 9df55bcc2..62a2742c7 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/events/handler/ServerJsonLoggingEventHandlerSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/events/handler/ServerJsonLoggingEventHandlerSuite.scala @@ -21,11 +21,13 @@ import java.net.InetAddress import java.nio.file.Paths import java.util.UUID +import scala.collection.JavaConverters._ import scala.util.matching.Regex import com.fasterxml.jackson.databind.ObjectMapper import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.hive.service.rpc.thrift.{TOpenSessionReq, TStatusCode} import org.apache.kyuubi._ import org.apache.kyuubi.config.KyuubiConf @@ -228,4 +230,34 @@ class ServerJsonLoggingEventHandlerSuite extends WithKyuubiServer with HiveJDBCT } } } + + test("open session exception") { + val name = UUID.randomUUID().toString + withSessionConf(Map( + "spark.driver.memory" -> "abc", + KyuubiConf.ENGINE_SHARE_LEVEL.key -> "CONNECTION", + KyuubiConf.SESSION_ENGINE_LAUNCH_ASYNC.key -> "false", + KyuubiConf.SESSION_NAME.key -> name))()() { + withThriftClient { client => + val req = new TOpenSessionReq() + req.setUsername(Utils.currentUser) + req.setPassword("anonymous") + req.setConfiguration(sessionConfigs.asJava) + val resp = client.OpenSession(req) + assert(resp.getSessionHandle === null) + assert(resp.getStatus.getStatusCode === TStatusCode.ERROR_STATUS) + } + } + + val serverSessionEventPath = + Paths.get(serverLogRoot, "kyuubi_session", s"day=$currentDate") + withJdbcStatement() { statement => + val res = statement.executeQuery( + s"SELECT * FROM `json`.`$serverSessionEventPath` " + + s"where sessionName = '$name' and exception is not null limit 1") + assert(res.next()) + val exception = res.getObject("exception") + assert(exception.toString.contains("Invalid maximum heap size: -Xmxabc")) + } + } }