[KYUUBI #1356] Refine KyuubiStatementEvent

<!--
Thanks for sending a pull request!

Here are some tips for you:
  1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html
  2. If the PR is related to an issue in https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
  3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'.
-->

### _Why are the changes needed?_
<!--
Please clarify why the changes are needed. For instance,
  1. If you add a feature, you can talk about the use case of it.
  2. If you fix a bug, you can clarify why it is a bug.
-->
1. When a statement error happened, the event will be logged twice, in `setState` and `setException`, we fix it
2. We break down KyuubiStatementEvent into three parts - statement basis, status, and sessionId/user, see the improved comments
3. log the remote operation handle too

### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #1356 from yaooqinn/se.

Closes #1356

4b5d0de2 [Kent Yao] Refine KyuubiStatementEvent
e7228ddd [Kent Yao] Refine KyuubiStatementEvent

Authored-by: Kent Yao <yao@apache.org>
Signed-off-by: ulysses-you <ulyssesyou@apache.org>
This commit is contained in:
Kent Yao 2021-11-10 12:09:04 +08:00 committed by ulysses-you
parent 32c3f5cb2e
commit 9db2e5c054
No known key found for this signature in database
GPG Key ID: 4C500BC62D576766
7 changed files with 64 additions and 52 deletions

View File

@ -32,6 +32,7 @@ import org.apache.kyuubi.session.Session
abstract class AbstractOperation(opType: OperationType, session: Session)
extends Operation with Logging {
private final val createTime = System.currentTimeMillis()
private final val handle = OperationHandle(opType, session.protocol)
private final val operationTimeout: Long = {
session.sessionManager.getConf.get(OPERATION_IDLE_TIMEOUT)
@ -44,7 +45,7 @@ abstract class AbstractOperation(opType: OperationType, session: Session)
@volatile protected var state: OperationState = INITIALIZED
@volatile protected var startTime: Long = _
@volatile protected var completedTime: Long = _
@volatile protected var lastAccessTime: Long = System.currentTimeMillis()
@volatile protected var lastAccessTime: Long = createTime
@volatile protected var operationException: KyuubiSQLException = _
@volatile protected var hasResultSet: Boolean = false
@ -147,7 +148,8 @@ abstract class AbstractOperation(opType: OperationType, session: Session)
override def getHandle: OperationHandle = handle
override def getStatus: OperationStatus = {
OperationStatus(state, startTime, completedTime, hasResultSet, Option(operationException))
OperationStatus(state, createTime, startTime, lastAccessTime, completedTime, hasResultSet,
Option(operationException))
}
override def shouldRunAsync: Boolean

View File

@ -22,7 +22,9 @@ import org.apache.kyuubi.operation.OperationState.OperationState
case class OperationStatus(
state: OperationState,
create: Long,
start: Long,
lastModified: Long,
completed: Long,
hasResultSet: Boolean,
exception: Option[KyuubiSQLException] = None)

View File

@ -22,7 +22,7 @@ import org.apache.kyuubi.{KyuubiFunSuite, KyuubiSQLException}
class OperationStatusSuite extends KyuubiFunSuite {
test("operation status") {
val status = OperationStatus(OperationState.INITIALIZED, 0, 0, hasResultSet = false)
val status = OperationStatus(OperationState.INITIALIZED, 0, 0, 0, 0, hasResultSet = false)
assert(status.exception.isEmpty)
val status1 = status.copy(exception = Some(KyuubiSQLException("nothing")))
assert(status1.exception.get.getMessage === "nothing")

View File

@ -18,49 +18,69 @@
package org.apache.kyuubi.events
import org.apache.kyuubi.Utils
import org.apache.kyuubi.operation.ExecuteStatement
import org.apache.kyuubi.operation.OperationState.OperationState
import org.apache.kyuubi.operation.{ExecuteStatement, OperationHandle}
/**
* A [[KyuubiStatementEvent]] used to tracker the lifecycle of a statement at server side.
* <ul>
* <li>Statement Basis</li>
* <li>Statement Live Status</li>
* <li>Parent Session Id</li>
* </ul>
*
* @param user: who connect to kyuubi server
* @param statementId: the identifier of operationHandler
* @param statement: the sql that you execute
* @param remoteIp: the ip of user
* @param sessionId: the identifier of a session
* @param createTime: the create time of this statement
* @param state: store each state that the sql has
* @param stateTime: the time that the sql's state change
* @param statementId the unique identifier of a single statement
* @param remoteId the unique identifier of a single statement at engine side
* @param statement the sql that you execute
* @param shouldRunAsync the flag indicating whether the query runs synchronously or not
* @param state the current operation state
* @param eventTime the time when the event created & logged
* @param createTime the time for changing to the current operation state
* @param startTime the time the query start to time of this statement
* @param completeTime time time the query ends
* @param exception: caught exception if have
* @param sessionId the identifier of the parent session
* @param sessionUser the authenticated client user
*/
case class KyuubiStatementEvent(
user: String,
case class KyuubiStatementEvent private (
statementId: String,
remoteId: String,
statement: String,
remoteIp: String,
sessionId: String,
shouldRunAsync: Boolean,
state: String,
eventTime: Long,
createTime: Long,
var state: String,
var stateTime: Long,
var exception: String = "") extends KyuubiServerEvent {
startTime: Long,
completeTime: Long,
exception: Option[Throwable],
sessionId: String,
sessionUser: String) extends KyuubiServerEvent {
// statement events are partitioned by the date when the corresponding operations are
// created.
override def partitions: Seq[(String, String)] =
("day", Utils.getDateFromTimestamp(createTime)) :: Nil
}
object KyuubiStatementEvent {
def apply(statement: ExecuteStatement,
statementId: String,
state: OperationState,
stateTime: Long): KyuubiStatementEvent = {
/**
* Shorthand for instantiating a statement event with a [[ExecuteStatement]] instance
*/
def apply(statement: ExecuteStatement): KyuubiStatementEvent = {
val session = statement.getSession
val status = statement.getStatus
new KyuubiStatementEvent(
session.user,
statementId,
statement.getHandle.identifier.toString,
Option(statement.remoteOpHandle()).map(OperationHandle(_).identifier.toString).orNull,
statement.statement,
session.ipAddress,
statement.shouldRunAsync,
status.state.name(),
status.lastModified,
status.create,
status.start,
status.completed,
status.exception,
session.handle.identifier.toString,
stateTime,
state.toString,
stateTime)
session.user)
}
}

View File

@ -41,11 +41,8 @@ class ExecuteStatement(
override val statement: String,
override val shouldRunAsync: Boolean,
queryTimeout: Long)
extends KyuubiOperation(
OperationType.EXECUTE_STATEMENT, session, client) {
val statementEvent: KyuubiStatementEvent =
KyuubiStatementEvent(this, statementId, state, lastAccessTime)
extends KyuubiOperation(OperationType.EXECUTE_STATEMENT, session, client) {
EventLoggingService.onEvent(KyuubiStatementEvent(this))
private final val _operationLog: OperationLog = if (shouldRunAsync) {
OperationLog.createOperationLog(session, getHandle)
@ -59,8 +56,6 @@ class ExecuteStatement(
override def getOperationLog: Option[OperationLog] = Option(_operationLog)
EventLoggingService.onEvent(statementEvent)
override def beforeRun(): Unit = {
OperationLog.setCurrentOperationLog(_operationLog)
setHasResultSet(true)
@ -180,15 +175,7 @@ class ExecuteStatement(
override def setState(newState: OperationState): Unit = {
super.setState(newState)
statementEvent.state = newState.toString
statementEvent.stateTime = lastAccessTime
EventLoggingService.onEvent(statementEvent)
}
override def setOperationException(opEx: KyuubiSQLException): Unit = {
super.setOperationException(opEx)
statementEvent.exception = opEx.toString
EventLoggingService.onEvent(statementEvent)
EventLoggingService.onEvent(KyuubiStatementEvent(this))
}
override def close(): Unit = {

View File

@ -57,7 +57,6 @@ abstract class KyuubiOperation(
MetricsSystem.tracing {
_.incCount(MetricRegistry.name(STATEMENT_FAIL, errorType))
}
setState(OperationState.ERROR)
val ke = e match {
case kse: KyuubiSQLException => kse
case te: TTransportException if te.getType == TTransportException.END_OF_FILE &&
@ -69,6 +68,7 @@ abstract class KyuubiOperation(
KyuubiSQLException(s"Error $action $opType: ${Utils.stringifyException(e)}", e)
}
setOperationException(ke)
setState(OperationState.ERROR)
throw ke
}
}

View File

@ -48,7 +48,7 @@ class EventLoggingServiceSuite extends WithKyuubiServer with HiveJDBCTestHelper
override protected def jdbcUrl: String = getJdbcUrl
test("statementEvent: generate, dump and query") {
test("round-trip for logging and querying statement events for both kyuubi server and engine") {
val hostName = InetAddress.getLocalHost.getCanonicalHostName
val serverStatementEventPath =
Paths.get(serverLogRoot, "kyuubi_statement", s"day=$currentDate", s"server-$hostName.json")
@ -58,7 +58,6 @@ class EventLoggingServiceSuite extends WithKyuubiServer with HiveJDBCTestHelper
withJdbcStatement() { statement =>
statement.execute(sql)
// check server statement events
val serverTable = serverStatementEventPath.getParent
val resultSet = statement.executeQuery(s"SELECT * FROM `json`.`${serverTable}`" +
@ -66,9 +65,11 @@ class EventLoggingServiceSuite extends WithKyuubiServer with HiveJDBCTestHelper
val states = Array(INITIALIZED, PENDING, RUNNING, FINISHED, CLOSED)
var stateIndex = 0
while (resultSet.next()) {
assert(resultSet.getString("user") == Utils.currentUser)
assert(resultSet.getString("statement") == sql)
assert(resultSet.getString("state") == states(stateIndex).toString)
assert(resultSet.getString("statement") === sql)
assert(resultSet.getString("shouldRunAsync") === "true")
assert(resultSet.getString("state") === states(stateIndex).name())
assert(resultSet.getString("exception") === null)
assert(resultSet.getString("sessionUser") === Utils.currentUser)
stateIndex += 1
}