[KYUUBI #6726] Support trino stage progress

# 🔍 Description
## Issue References 🔗

This pull request fixes https://github.com/apache/kyuubi/issues/6726

## Describe Your Solution 🔧

Add trino statement progress

## Types of changes 🔖

- [ ] Bugfix (non-breaking change which fixes an issue)
- [x] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)

## Test Plan 🧪

#### Behavior Without This Pull Request ⚰️

#### Behavior With This Pull Request 🎉

#### Related Unit Tests

---

# Checklist 📝

- [x] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)

**Be nice. Be informative.**

Closes #6759 from taylor12805/trino-progress.

Closes #6726

6646c9511 [taylor.fan] [KYUUBI #6726] update test case result
d84904e82 [taylor.fan] [KYUUBI #6726] reformat code
2b1c776e1 [taylor.fan] [KYUUBI #6726] reformat code
f635b38de [taylor.fan] [KYUUBI #6726] add test case
7c29ba6f3 [taylor.fan] [KYUUBI #6726] Support trino stage progress

Authored-by: taylor.fan <taylor.fan@vipshop.com>
Signed-off-by: Kent Yao <yao@apache.org>
This commit is contained in:
taylor.fan 2024-11-22 17:46:43 +08:00 committed by Kent Yao
parent cbbb0622bd
commit 160bf58042
No known key found for this signature in database
GPG Key ID: F7051850A0AF904D
5 changed files with 281 additions and 1 deletions

View File

@ -0,0 +1,148 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.trino
import java.util
import scala.collection.JavaConverters._
import scala.collection.immutable.SortedMap
import io.trino.client.{StageStats, StatementClient}
import org.apache.kyuubi.engine.trino.TrinoProgressMonitor.{COLUMN_1_WIDTH, HEADERS}
import org.apache.kyuubi.engine.trino.operation.progress.{TrinoStage, TrinoStageProgress}
import org.apache.kyuubi.shaded.hive.service.rpc.thrift.TJobExecutionStatus
class TrinoProgressMonitor(trino: StatementClient) {
private lazy val progressMap: Map[TrinoStage, TrinoStageProgress] = {
if (trino != null) {
val trinoStats = trino.getStats
val stageQueue = scala.collection.mutable.Queue[StageStats]()
val stages = scala.collection.mutable.ListBuffer[(TrinoStage, TrinoStageProgress)]()
val rootStage = trinoStats.getRootStage
if (rootStage != null) {
stageQueue.enqueue(rootStage)
}
while (stageQueue.nonEmpty) {
val stage = stageQueue.dequeue()
val stageId = stage.getStageId
val stageProgress = TrinoStageProgress(
stage.getState,
stage.getTotalSplits,
stage.getCompletedSplits,
stage.getRunningSplits,
stage.getFailedTasks)
stages.append((TrinoStage(stageId), stageProgress))
val subStages = asScalaBuffer(stage.getSubStages)
stageQueue.enqueue(subStages: _*)
}
SortedMap(stages: _*)
} else {
SortedMap()
}
}
def headers: util.List[String] = HEADERS
def rows: util.List[util.List[String]] = {
val progressRows = progressMap.map {
case (stage, progress) =>
val complete = progress.completedSplits
val total = progress.totalSplits
val running = progress.runningSplits
val failed = progress.failedTasks
val stageName = "Stage-" + stage.stageId
val nameWithProgress = getNameWithProgress(stageName, complete, total)
val pending = total - complete - running
util.Arrays.asList(
nameWithProgress,
progress.state,
String.valueOf(total),
String.valueOf(complete),
String.valueOf(running),
String.valueOf(pending),
String.valueOf(failed),
"")
}.toList.asJavaCollection
new util.ArrayList[util.List[String]](progressRows)
}
def footerSummary: String = {
"STAGES: %02d/%02d".format(getCompletedStages, progressMap.keySet.size)
}
def progressedPercentage: Double = {
if (trino != null && trino.getStats != null) {
val progressPercentage = trino.getStats.getProgressPercentage
progressPercentage.orElse(0.0d)
} else {
0.0d
}
}
def executionStatus: TJobExecutionStatus =
if (getCompletedStages == progressMap.keySet.size) {
TJobExecutionStatus.COMPLETE
} else {
TJobExecutionStatus.IN_PROGRESS
}
private def getNameWithProgress(s: String, complete: Int, total: Int): String = {
if (s == null) return ""
val percent =
if (total == 0) 1.0f
else complete.toFloat / total.toFloat
// lets use the remaining space in column 1 as progress bar
val spaceRemaining = COLUMN_1_WIDTH - s.length - 1
var trimmedVName = s
// if the vertex name is longer than column 1 width, trim it down
if (s.length > COLUMN_1_WIDTH) {
trimmedVName = s.substring(0, COLUMN_1_WIDTH - 2)
trimmedVName += ".."
} else trimmedVName += " "
val toFill = (spaceRemaining * percent).toInt
s"$trimmedVName${"." * toFill}"
}
private def getCompletedStages: Int = {
var completed = 0
progressMap.values.foreach { progress =>
val complete = progress.completedSplits
val total = progress.totalSplits
if (total > 0 && complete == total) completed += 1
}
completed
}
}
object TrinoProgressMonitor {
private val HEADERS: util.List[String] = util.Arrays.asList(
"STAGES",
"STATUS",
"TOTAL",
"COMPLETED",
"RUNNING",
"PENDING",
"FAILED",
"")
private val COLUMN_1_WIDTH = 16
}

View File

@ -41,6 +41,8 @@ class ExecuteStatement(
private val operationLog: OperationLog = OperationLog.createOperationLog(session, getHandle)
override def getOperationLog: Option[OperationLog] = Option(operationLog)
override protected def supportProgress: Boolean = true
override protected def beforeRun(): Unit = {
OperationLog.setCurrentOperationLog(operationLog)
setState(OperationState.PENDING)

View File

@ -24,16 +24,19 @@ import io.trino.client.StatementClient
import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.Utils
import org.apache.kyuubi.config.KyuubiConf.SESSION_PROGRESS_ENABLE
import org.apache.kyuubi.engine.trino.TrinoContext
import org.apache.kyuubi.engine.trino.TrinoProgressMonitor
import org.apache.kyuubi.engine.trino.schema.{SchemaHelper, TrinoTRowSetGenerator}
import org.apache.kyuubi.engine.trino.session.TrinoSessionImpl
import org.apache.kyuubi.operation.AbstractOperation
import org.apache.kyuubi.operation.FetchIterator
import org.apache.kyuubi.operation.FetchOrientation.{FETCH_FIRST, FETCH_NEXT, FETCH_PRIOR, FetchOrientation}
import org.apache.kyuubi.operation.OperationState
import org.apache.kyuubi.operation.OperationStatus
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.session.Session
import org.apache.kyuubi.shaded.hive.service.rpc.thrift.{TFetchResultsResp, TGetResultSetMetadataResp}
import org.apache.kyuubi.shaded.hive.service.rpc.thrift.{TFetchResultsResp, TGetResultSetMetadataResp, TProgressUpdateResp}
abstract class TrinoOperation(session: Session) extends AbstractOperation(session) {
@ -45,6 +48,24 @@ abstract class TrinoOperation(session: Session) extends AbstractOperation(sessio
protected var iter: FetchIterator[List[Any]] = _
protected def supportProgress: Boolean = false
private val progressEnable: Boolean = session.sessionManager.getConf.get(SESSION_PROGRESS_ENABLE)
override def getStatus: OperationStatus = {
if (progressEnable && supportProgress) {
val progressMonitor = new TrinoProgressMonitor(trino)
setOperationJobProgress(new TProgressUpdateResp(
progressMonitor.headers,
progressMonitor.rows,
progressMonitor.progressedPercentage,
progressMonitor.executionStatus,
progressMonitor.footerSummary,
startTime))
}
super.getStatus
}
override def getResultSetMetadata: TGetResultSetMetadataResp = {
val tTableSchema = SchemaHelper.toTTableSchema(schema)
val resp = new TGetResultSetMetadataResp

View File

@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.trino.operation.progress
case class TrinoStage(stageId: String) extends Comparable[TrinoStage] {
override def compareTo(o: TrinoStage): Int = {
stageId.compareTo(o.stageId)
}
}
case class TrinoStageProgress(
state: String,
totalSplits: Int,
completedSplits: Int,
runningSplits: Int,
failedTasks: Int)

View File

@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.trino.operation
import scala.collection.JavaConverters._
import org.scalatest.concurrent.PatienceConfiguration.Timeout
import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
import org.apache.kyuubi.config.KyuubiConf.{ENGINE_TRINO_CONNECTION_CATALOG, SESSION_PROGRESS_ENABLE}
import org.apache.kyuubi.shaded.hive.service.rpc.thrift.{TExecuteStatementReq, TGetOperationStatusReq, TJobExecutionStatus}
class TrinoOperationProgressSuite extends TrinoOperationSuite {
override def withKyuubiConf: Map[String, String] = Map(
ENGINE_TRINO_CONNECTION_CATALOG.key -> "memory",
SESSION_PROGRESS_ENABLE.key -> "true")
test("get operation progress") {
val sql = "select * from (select item from (SELECT sequence(0, 100, 1) as t) as a " +
"CROSS JOIN UNNEST(t) AS temTable (item)) WHERE random() < 0.1"
withSessionHandle { (client, handle) =>
val req = new TExecuteStatementReq()
req.setStatement(sql)
req.setRunAsync(true)
req.setSessionHandle(handle)
val resp = client.ExecuteStatement(req)
eventually(Timeout(25.seconds)) {
val statusReq = new TGetOperationStatusReq(resp.getOperationHandle)
val statusResp = client.GetOperationStatus(statusReq)
val headers = statusResp.getProgressUpdateResponse.getHeaderNames
val progress = statusResp.getProgressUpdateResponse.getProgressedPercentage
val rows = statusResp.getProgressUpdateResponse.getRows
val footerSummary = statusResp.getProgressUpdateResponse.getFooterSummary
val status = statusResp.getProgressUpdateResponse.getStatus
assertResult(Seq(
"STAGES",
"STATUS",
"TOTAL",
"COMPLETED",
"RUNNING",
"PENDING",
"FAILED",
""))(headers.asScala)
assert(rows.size() == 1)
progress match {
case 100.0 =>
assertResult(Seq(
s"Stage-0 ........",
"FINISHED",
"3",
"3",
"0",
"0",
"0",
""))(
rows.get(0).asScala)
assert("STAGES: 01/01" === footerSummary)
assert(TJobExecutionStatus.COMPLETE === status)
}
}
}
}
}