[KYUUBI #6626] Fix operation never expired issue with periodical listOperations api calls

# 🔍 Description
## Issue References 🔗

This pull request fixes operation never expired issue.
I found that, some Kyuubi operations never expired.

After investigation, the root cause:
1. our kyuubi console lookup the operations periodically
2. it retrieves the kyuubi operation events
3. the KyuubiOperationEvent::apply will get the operation status and update the operation the last access time, https://github.com/apache/kyuubi/pull/2452
4. then the operation never expired.
## Describe Your Solution 🔧

Just get the operation event without updating the operation last access time.

## Types of changes 🔖

- [x] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)

## Test Plan 🧪

#### Behavior Without This Pull Request ⚰️

#### Behavior With This Pull Request 🎉

#### Related Unit Tests

---

# Checklist 📝

- [x] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)

**Be nice. Be informative.**

Closes #6626 from turboFei/operation_not_idle.

Closes #6626

14e6898da [Wang, Fei] Get operation event

Authored-by: Wang, Fei <fwang12@ebay.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
This commit is contained in:
Wang, Fei 2024-08-18 22:02:25 -07:00
parent 0165d5e3a2
commit 6e5162e8a7
3 changed files with 25 additions and 35 deletions

View File

@ -18,8 +18,6 @@
package org.apache.kyuubi.events
import org.apache.kyuubi.Utils
import org.apache.kyuubi.operation.{KyuubiOperation, OperationHandle}
import org.apache.kyuubi.session.KyuubiSession
/**
* A [[KyuubiOperationEvent]] used to tracker the lifecycle of an operation at server side.
@ -45,7 +43,7 @@ import org.apache.kyuubi.session.KyuubiSession
* @param kyuubiInstance the parent session connection url
* @param metrics the operation metrics
*/
case class KyuubiOperationEvent private (
case class KyuubiOperationEvent(
statementId: String,
remoteId: String,
statement: String,
@ -67,30 +65,3 @@ case class KyuubiOperationEvent private (
override def partitions: Seq[(String, String)] =
("day", Utils.getDateFromTimestamp(createTime)) :: Nil
}
object KyuubiOperationEvent {
/**
* Shorthand for instantiating a operation event with a [[KyuubiOperation]] instance
*/
def apply(operation: KyuubiOperation): KyuubiOperationEvent = {
val session = operation.getSession.asInstanceOf[KyuubiSession]
val status = operation.getStatus
new KyuubiOperationEvent(
operation.statementId,
Option(operation.remoteOpHandle()).map(OperationHandle(_).identifier.toString).orNull,
operation.statement,
operation.shouldRunAsync,
status.state.name(),
status.lastModified,
status.create,
status.start,
status.completed,
status.exception,
session.handle.identifier.toString,
session.user,
session.sessionType.toString,
session.connectionUrl,
operation.metrics)
}
}

View File

@ -206,7 +206,7 @@ abstract class KyuubiOperation(session: Session) extends AbstractOperation(sessi
protected def eventEnabled: Boolean = false
if (eventEnabled) EventBus.post(KyuubiOperationEvent(this))
if (eventEnabled) EventBus.post(getOperationEvent)
override def setState(newState: OperationState): Unit = {
MetricsSystem.tracing { ms =>
@ -217,6 +217,26 @@ abstract class KyuubiOperation(session: Session) extends AbstractOperation(sessi
ms.markMeter(MetricRegistry.name(OPERATION_STATE, newState.toString.toLowerCase))
}
super.setState(newState)
if (eventEnabled) EventBus.post(KyuubiOperationEvent(this))
if (eventEnabled) EventBus.post(getOperationEvent)
}
def getOperationEvent: KyuubiOperationEvent = {
val kyuubiSession = session.asInstanceOf[KyuubiSession]
KyuubiOperationEvent(
statementId,
Option(remoteOpHandle()).map(OperationHandle(_).identifier.toString).orNull,
statement,
shouldRunAsync,
state.name(),
lastAccessTime,
createTime,
startTime,
completedTime,
Option(operationException),
kyuubiSession.handle.identifier.toString,
kyuubiSession.user,
kyuubiSession.sessionType.toString,
kyuubiSession.connectionUrl,
metrics)
}
}

View File

@ -22,7 +22,6 @@ import scala.collection.JavaConverters._
import org.apache.kyuubi.{Logging, Utils}
import org.apache.kyuubi.client.api.v1.dto
import org.apache.kyuubi.client.api.v1.dto.{OperationData, OperationProgress, ServerData, SessionData}
import org.apache.kyuubi.events.KyuubiOperationEvent
import org.apache.kyuubi.ha.client.ServiceNodeInfo
import org.apache.kyuubi.operation.KyuubiOperation
import org.apache.kyuubi.session.KyuubiSession
@ -86,7 +85,7 @@ object ApiUtils extends Logging {
}
def operationEvent(operation: KyuubiOperation): dto.KyuubiOperationEvent = {
val opEvent = KyuubiOperationEvent(operation)
val opEvent = operation.getOperationEvent
dto.KyuubiOperationEvent.builder()
.statementId(opEvent.statementId)
.remoteId(opEvent.remoteId)
@ -108,7 +107,7 @@ object ApiUtils extends Logging {
}
def operationData(operation: KyuubiOperation): OperationData = {
val opEvent = KyuubiOperationEvent(operation)
val opEvent = operation.getOperationEvent
new OperationData(
opEvent.statementId,
opEvent.remoteId,