[KYUUBI #6626] Fix operation never expired issue with periodical listOperations api calls
# 🔍 Description
## Issue References 🔗
This pull request fixes operation never expired issue.
I found that, some Kyuubi operations never expired.
After investigation, the root cause:
1. our kyuubi console lookup the operations periodically
2. it retrieves the kyuubi operation events
3. the KyuubiOperationEvent::apply will get the operation status and update the operation the last access time, https://github.com/apache/kyuubi/pull/2452
4. then the operation never expired.
## Describe Your Solution 🔧
Just get the operation event without updating the operation last access time.
## Types of changes 🔖
- [x] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)
## Test Plan 🧪
#### Behavior Without This Pull Request ⚰️
#### Behavior With This Pull Request 🎉
#### Related Unit Tests
---
# Checklist 📝
- [x] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)
**Be nice. Be informative.**
Closes #6626 from turboFei/operation_not_idle.
Closes #6626
14e6898da [Wang, Fei] Get operation event
Authored-by: Wang, Fei <fwang12@ebay.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
(cherry picked from commit 6e5162e8a7)
Signed-off-by: Wang, Fei <fwang12@ebay.com>
This commit is contained in:
parent
f509c22e53
commit
8ba7781150
@ -18,8 +18,6 @@
|
||||
package org.apache.kyuubi.events
|
||||
|
||||
import org.apache.kyuubi.Utils
|
||||
import org.apache.kyuubi.operation.{KyuubiOperation, OperationHandle}
|
||||
import org.apache.kyuubi.session.KyuubiSession
|
||||
|
||||
/**
|
||||
* A [[KyuubiOperationEvent]] used to tracker the lifecycle of an operation at server side.
|
||||
@ -45,7 +43,7 @@ import org.apache.kyuubi.session.KyuubiSession
|
||||
* @param kyuubiInstance the parent session connection url
|
||||
* @param metrics the operation metrics
|
||||
*/
|
||||
case class KyuubiOperationEvent private (
|
||||
case class KyuubiOperationEvent(
|
||||
statementId: String,
|
||||
remoteId: String,
|
||||
statement: String,
|
||||
@ -67,30 +65,3 @@ case class KyuubiOperationEvent private (
|
||||
override def partitions: Seq[(String, String)] =
|
||||
("day", Utils.getDateFromTimestamp(createTime)) :: Nil
|
||||
}
|
||||
|
||||
object KyuubiOperationEvent {
|
||||
|
||||
/**
|
||||
* Shorthand for instantiating a operation event with a [[KyuubiOperation]] instance
|
||||
*/
|
||||
def apply(operation: KyuubiOperation): KyuubiOperationEvent = {
|
||||
val session = operation.getSession.asInstanceOf[KyuubiSession]
|
||||
val status = operation.getStatus
|
||||
new KyuubiOperationEvent(
|
||||
operation.statementId,
|
||||
Option(operation.remoteOpHandle()).map(OperationHandle(_).identifier.toString).orNull,
|
||||
operation.statement,
|
||||
operation.shouldRunAsync,
|
||||
status.state.name(),
|
||||
status.lastModified,
|
||||
status.create,
|
||||
status.start,
|
||||
status.completed,
|
||||
status.exception,
|
||||
session.handle.identifier.toString,
|
||||
session.user,
|
||||
session.sessionType.toString,
|
||||
session.connectionUrl,
|
||||
operation.metrics)
|
||||
}
|
||||
}
|
||||
|
||||
@ -206,7 +206,7 @@ abstract class KyuubiOperation(session: Session) extends AbstractOperation(sessi
|
||||
|
||||
protected def eventEnabled: Boolean = false
|
||||
|
||||
if (eventEnabled) EventBus.post(KyuubiOperationEvent(this))
|
||||
if (eventEnabled) EventBus.post(getOperationEvent)
|
||||
|
||||
override def setState(newState: OperationState): Unit = {
|
||||
MetricsSystem.tracing { ms =>
|
||||
@ -217,6 +217,26 @@ abstract class KyuubiOperation(session: Session) extends AbstractOperation(sessi
|
||||
ms.markMeter(MetricRegistry.name(OPERATION_STATE, newState.toString.toLowerCase))
|
||||
}
|
||||
super.setState(newState)
|
||||
if (eventEnabled) EventBus.post(KyuubiOperationEvent(this))
|
||||
if (eventEnabled) EventBus.post(getOperationEvent)
|
||||
}
|
||||
|
||||
def getOperationEvent: KyuubiOperationEvent = {
|
||||
val kyuubiSession = session.asInstanceOf[KyuubiSession]
|
||||
KyuubiOperationEvent(
|
||||
statementId,
|
||||
Option(remoteOpHandle()).map(OperationHandle(_).identifier.toString).orNull,
|
||||
statement,
|
||||
shouldRunAsync,
|
||||
state.name(),
|
||||
lastAccessTime,
|
||||
createTime,
|
||||
startTime,
|
||||
completedTime,
|
||||
Option(operationException),
|
||||
kyuubiSession.handle.identifier.toString,
|
||||
kyuubiSession.user,
|
||||
kyuubiSession.sessionType.toString,
|
||||
kyuubiSession.connectionUrl,
|
||||
metrics)
|
||||
}
|
||||
}
|
||||
|
||||
@ -22,7 +22,6 @@ import scala.collection.JavaConverters._
|
||||
import org.apache.kyuubi.{Logging, Utils}
|
||||
import org.apache.kyuubi.client.api.v1.dto
|
||||
import org.apache.kyuubi.client.api.v1.dto.{OperationData, OperationProgress, ServerData, SessionData}
|
||||
import org.apache.kyuubi.events.KyuubiOperationEvent
|
||||
import org.apache.kyuubi.ha.client.ServiceNodeInfo
|
||||
import org.apache.kyuubi.operation.KyuubiOperation
|
||||
import org.apache.kyuubi.session.KyuubiSession
|
||||
@ -80,7 +79,7 @@ object ApiUtils extends Logging {
|
||||
}
|
||||
|
||||
def operationEvent(operation: KyuubiOperation): dto.KyuubiOperationEvent = {
|
||||
val opEvent = KyuubiOperationEvent(operation)
|
||||
val opEvent = operation.getOperationEvent
|
||||
dto.KyuubiOperationEvent.builder()
|
||||
.statementId(opEvent.statementId)
|
||||
.remoteId(opEvent.remoteId)
|
||||
@ -102,7 +101,7 @@ object ApiUtils extends Logging {
|
||||
}
|
||||
|
||||
def operationData(operation: KyuubiOperation): OperationData = {
|
||||
val opEvent = KyuubiOperationEvent(operation)
|
||||
val opEvent = operation.getOperationEvent
|
||||
new OperationData(
|
||||
opEvent.statementId,
|
||||
opEvent.remoteId,
|
||||
|
||||
Loading…
Reference in New Issue
Block a user