diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiOperationEvent.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiOperationEvent.scala index 2a103213e..f67b088a5 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiOperationEvent.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/events/KyuubiOperationEvent.scala @@ -18,8 +18,6 @@ package org.apache.kyuubi.events import org.apache.kyuubi.Utils -import org.apache.kyuubi.operation.{KyuubiOperation, OperationHandle} -import org.apache.kyuubi.session.KyuubiSession /** * A [[KyuubiOperationEvent]] used to tracker the lifecycle of an operation at server side. @@ -45,7 +43,7 @@ import org.apache.kyuubi.session.KyuubiSession * @param kyuubiInstance the parent session connection url * @param metrics the operation metrics */ -case class KyuubiOperationEvent private ( +case class KyuubiOperationEvent( statementId: String, remoteId: String, statement: String, @@ -67,30 +65,3 @@ case class KyuubiOperationEvent private ( override def partitions: Seq[(String, String)] = ("day", Utils.getDateFromTimestamp(createTime)) :: Nil } - -object KyuubiOperationEvent { - - /** - * Shorthand for instantiating a operation event with a [[KyuubiOperation]] instance - */ - def apply(operation: KyuubiOperation): KyuubiOperationEvent = { - val session = operation.getSession.asInstanceOf[KyuubiSession] - val status = operation.getStatus - new KyuubiOperationEvent( - operation.statementId, - Option(operation.remoteOpHandle()).map(OperationHandle(_).identifier.toString).orNull, - operation.statement, - operation.shouldRunAsync, - status.state.name(), - status.lastModified, - status.create, - status.start, - status.completed, - status.exception, - session.handle.identifier.toString, - session.user, - session.sessionType.toString, - session.connectionUrl, - operation.metrics) - } -} diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala index d0289abb9..a543bddb6 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala @@ -206,7 +206,7 @@ abstract class KyuubiOperation(session: Session) extends AbstractOperation(sessi protected def eventEnabled: Boolean = false - if (eventEnabled) EventBus.post(KyuubiOperationEvent(this)) + if (eventEnabled) EventBus.post(getOperationEvent) override def setState(newState: OperationState): Unit = { MetricsSystem.tracing { ms => @@ -217,6 +217,26 @@ abstract class KyuubiOperation(session: Session) extends AbstractOperation(sessi ms.markMeter(MetricRegistry.name(OPERATION_STATE, newState.toString.toLowerCase)) } super.setState(newState) - if (eventEnabled) EventBus.post(KyuubiOperationEvent(this)) + if (eventEnabled) EventBus.post(getOperationEvent) + } + + def getOperationEvent: KyuubiOperationEvent = { + val kyuubiSession = session.asInstanceOf[KyuubiSession] + KyuubiOperationEvent( + statementId, + Option(remoteOpHandle()).map(OperationHandle(_).identifier.toString).orNull, + statement, + shouldRunAsync, + state.name(), + lastAccessTime, + createTime, + startTime, + completedTime, + Option(operationException), + kyuubiSession.handle.identifier.toString, + kyuubiSession.user, + kyuubiSession.sessionType.toString, + kyuubiSession.connectionUrl, + metrics) } } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/ApiUtils.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/ApiUtils.scala index 494421608..50900627c 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/ApiUtils.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/ApiUtils.scala @@ -22,7 +22,6 @@ import scala.collection.JavaConverters._ import org.apache.kyuubi.{Logging, Utils} import org.apache.kyuubi.client.api.v1.dto import org.apache.kyuubi.client.api.v1.dto.{OperationData, OperationProgress, ServerData, SessionData} -import org.apache.kyuubi.events.KyuubiOperationEvent import org.apache.kyuubi.ha.client.ServiceNodeInfo import org.apache.kyuubi.operation.KyuubiOperation import org.apache.kyuubi.session.KyuubiSession @@ -80,7 +79,7 @@ object ApiUtils extends Logging { } def operationEvent(operation: KyuubiOperation): dto.KyuubiOperationEvent = { - val opEvent = KyuubiOperationEvent(operation) + val opEvent = operation.getOperationEvent dto.KyuubiOperationEvent.builder() .statementId(opEvent.statementId) .remoteId(opEvent.remoteId) @@ -102,7 +101,7 @@ object ApiUtils extends Logging { } def operationData(operation: KyuubiOperation): OperationData = { - val opEvent = KyuubiOperationEvent(operation) + val opEvent = operation.getOperationEvent new OperationData( opEvent.statementId, opEvent.remoteId,