[KYUUBI #4806][FLINK] Support time-out incremental result fetch for Flink engine

### _Why are the changes needed?_
The Flink engine used to fetch all query results eagerly (up to `kyuubi.session.engine.flink.max.rows`) before building the result set, so a fetch on a long-running or streaming query blocked until the row limit or end-of-stream was reached. This change switches the engine to incremental, lazy result fetching and introduces `kyuubi.session.engine.flink.fetch.timeout`: when the timeout is reached, the rows fetched so far are returned; if no rows have been fetched, a TimeoutException is thrown.
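For illustration, a minimal client-side sketch of enabling the timeout per session; the host/port, database, and `some_streaming_table` are hypothetical, and passing session configs through the `#` section of the Kyuubi JDBC URL is assumed:

```scala
import java.sql.DriverManager

// Hypothetical endpoint; the config accepts ISO-8601 durations (e.g. PT30S)
// as well as plain millisecond values.
val url = "jdbc:hive2://localhost:10009/default;#kyuubi.session.engine.flink.fetch.timeout=PT30S"
val conn = DriverManager.getConnection(url)
val stmt = conn.createStatement()
// Each fetch now returns the rows that arrived in time; a fetch that yields
// no rows at all fails with a TimeoutException surfaced as a SQLException.
val rs = stmt.executeQuery("SELECT * FROM some_streaming_table")
while (rs.next()) {
  println(rs.getString(1))
}
```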

### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly, including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before making a pull request

Closes #5134 from link3280/KYUUBI-4806.

Closes #4806

a1b74783c [Paul Lin] Optimize code style
546cfdf5b [Paul Lin] Update externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala
b6eb7af4f [Paul Lin] Update externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSet.scala
1563fa98b [Paul Lin] Remove explicit StartRowOffset for Flink
4e61a348c [Paul Lin] Add comments
c93294650 [Paul Lin] Improve code style
6bd0c8e69 [Paul Lin] Use dedicated thread pool
15412db3a [Paul Lin] Improve logging
d6a2a9cff [Paul Lin] [KYUUBI #4806][FLINK] Implement incremental result fetching

Authored-by: Paul Lin <paullin3280@gmail.com>
Signed-off-by: Paul Lin <paullin3280@gmail.com>
Paul Lin 2023-08-24 11:58:08 +08:00
parent d4d79b4716
commit e9ca8272b0
13 changed files with 292 additions and 90 deletions

@@ -411,6 +411,7 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co
| kyuubi.session.engine.alive.probe.interval | PT10S | The interval for engine alive probe. | duration | 1.6.0 |
| kyuubi.session.engine.alive.timeout | PT2M | The timeout for engine alive. If there is no alive probe success in the last timeout window, the engine will be marked as no-alive. | duration | 1.6.0 |
| kyuubi.session.engine.check.interval | PT1M | The check interval for engine timeout | duration | 1.0.0 |
| kyuubi.session.engine.flink.fetch.timeout | &lt;undefined&gt; | Result fetch timeout for the Flink engine. If the timeout is reached, the result fetch would be stopped and the rows fetched so far would be returned. If no rows have been fetched, a TimeoutException would be thrown. | duration | 1.8.0 |
| kyuubi.session.engine.flink.main.resource | &lt;undefined&gt; | The package used to create Flink SQL engine remote job. If it is undefined, Kyuubi will use the default | string | 1.4.0 |
| kyuubi.session.engine.flink.max.rows | 1000000 | Max rows of Flink query results. For batch queries, rows exceeding the limit would be ignored. For streaming queries, the query would be canceled if the limit is reached. | int | 1.5.0 |
| kyuubi.session.engine.hive.main.resource | &lt;undefined&gt; | The package used to create Hive engine remote job. If it is undefined, Kyuubi will use the default | string | 1.6.0 |
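Alternatively, a sketch of the server-wide setting in `$KYUUBI_HOME/conf/kyuubi-defaults.conf` (`PT30S` is an example value; any duration form accepted by the config type works):

```properties
# cap every Flink result fetch at 30 seconds
kyuubi.session.engine.flink.fetch.timeout=PT30S
```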

@@ -17,6 +17,8 @@
package org.apache.kyuubi.engine.flink.operation
import scala.concurrent.duration.Duration
import org.apache.flink.api.common.JobID
import org.apache.flink.table.gateway.api.operation.OperationHandle
@@ -32,7 +34,8 @@ class ExecuteStatement(
override val statement: String,
override val shouldRunAsync: Boolean,
queryTimeout: Long,
resultMaxRows: Int)
resultMaxRows: Int,
resultFetchTimeout: Duration)
extends FlinkOperation(session) with Logging {
private val operationLog: OperationLog =
@@ -48,10 +51,6 @@ class ExecuteStatement(
setHasResultSet(true)
}
override protected def afterRun(): Unit = {
OperationLog.removeCurrentOperationLog()
}
override protected def runInternal(): Unit = {
addTimeoutMonitor(queryTimeout)
executeStatement()
@@ -64,7 +63,7 @@ class ExecuteStatement(
new OperationHandle(getHandle.identifier),
statement)
jobId = FlinkEngineUtils.getResultJobId(resultFetcher)
resultSet = ResultSetUtil.fromResultFetcher(resultFetcher, resultMaxRows)
resultSet = ResultSetUtil.fromResultFetcher(resultFetcher, resultMaxRows, resultFetchTimeout)
setState(OperationState.FINISHED)
} catch {
onError(cancel = true)

@@ -19,12 +19,15 @@ package org.apache.kyuubi.engine.flink.operation
import java.io.IOException
import java.time.ZoneId
import java.util.concurrent.TimeoutException
import scala.collection.JavaConverters.collectionAsScalaIterableConverter
import scala.collection.mutable.ListBuffer
import org.apache.flink.configuration.Configuration
import org.apache.flink.table.gateway.service.context.SessionContext
import org.apache.flink.table.gateway.service.operation.OperationExecutor
import org.apache.flink.types.Row
import org.apache.hive.service.rpc.thrift.{TFetchResultsResp, TGetResultSetMetadataResp, TTableSchema}
import org.apache.kyuubi.{KyuubiSQLException, Utils}
@@ -72,6 +75,10 @@ abstract class FlinkOperation(session: Session) extends AbstractOperation(sessio
override def close(): Unit = {
cleanup(OperationState.CLOSED)
// the result set may be null if the operation ends exceptionally
if (resultSet != null) {
resultSet.close
}
try {
getOperationLog.foreach(_.close())
} catch {
@@ -98,25 +105,42 @@ abstract class FlinkOperation(session: Session) extends AbstractOperation(sessio
assertState(OperationState.FINISHED)
setHasResultSet(true)
order match {
case FETCH_NEXT => resultSet.getData.fetchNext()
case FETCH_PRIOR => resultSet.getData.fetchPrior(rowSetSize);
case FETCH_FIRST => resultSet.getData.fetchAbsolute(0);
case FETCH_NEXT => // ignored because new data are fetched lazily
}
val batch = new ListBuffer[Row]
try {
// there could be null values at the end of the batch
// because Flink could return an EOS
var rows = 0
while (resultSet.getData.hasNext && rows < rowSetSize) {
Option(resultSet.getData.next()).foreach { r => batch += r; rows += 1 }
}
} catch {
case e: TimeoutException =>
// ignore and return the current batch if there's some data
// otherwise, rethrow the timeout exception
if (batch.nonEmpty) {
debug(s"Timeout fetching more data for $opType operation. " +
s"Returning the current fetched data.")
} else {
throw e
}
}
val token = resultSet.getData.take(rowSetSize)
val timeZone = Option(flinkSession.getSessionConfig.get("table.local-time-zone"))
val zoneId = timeZone match {
case Some(tz) => ZoneId.of(tz)
case None => ZoneId.systemDefault()
}
val resultRowSet = RowSet.resultSetToTRowSet(
token.toList,
batch.toList,
resultSet,
zoneId,
getProtocolVersion)
resultRowSet.setStartRowOffset(resultSet.getData.getPosition)
val resp = new TFetchResultsResp(OK_STATUS)
resp.setResults(resultRowSet)
resp.setHasMoreRows(false)
resp.setHasMoreRows(resultSet.getData.hasNext)
resp
}
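The drain-with-partial-results pattern above, as a self-contained sketch; `drain` is a hypothetical helper, and a plain Scala `Iterator` stands in for the Flink-backed fetch iterator:

```scala
import java.util.concurrent.TimeoutException
import scala.collection.mutable.ListBuffer

// Sketch of the fetch loop above: next() may throw TimeoutException (as
// QueryResultFetchIterator's does); return the partial batch if any rows
// were collected, otherwise propagate the timeout to the caller.
def drain[T](it: Iterator[T], max: Int): List[T] = {
  val batch = new ListBuffer[T]
  try {
    while (it.hasNext && batch.size < max) {
      Option(it.next()).foreach(batch += _) // nulls can mark end-of-stream
    }
  } catch {
    case e: TimeoutException =>
      if (batch.isEmpty) throw e // no rows arrived before the deadline
  }
  batch.toList
}
```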

@@ -20,6 +20,8 @@ package org.apache.kyuubi.engine.flink.operation
import java.util
import scala.collection.JavaConverters._
import scala.concurrent.duration.{Duration, DurationLong}
import scala.language.postfixOps
import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.config.KyuubiConf._
@@ -66,14 +68,31 @@ class FlinkSQLOperationManager extends OperationManager("FlinkSQLOperationManage
flinkSession.normalizedConf.getOrElse(
ENGINE_FLINK_MAX_ROWS.key,
resultMaxRowsDefault.toString).toInt
val resultFetchTimeout =
flinkSession.normalizedConf.get(ENGINE_FLINK_FETCH_TIMEOUT.key).map(_.toLong milliseconds)
.getOrElse(Duration.Inf)
val op = mode match {
case NoneMode =>
// FLINK-24427 seals the Calcite classes required in async mode; considering there is
// not much benefit in async mode, we just ignore `runAsync` and always run statements
// in sync mode as a workaround
new ExecuteStatement(session, statement, false, queryTimeout, resultMaxRows)
new ExecuteStatement(
session,
statement,
false,
queryTimeout,
resultMaxRows,
resultFetchTimeout)
case mode =>
new PlanOnlyStatement(session, statement, mode)
new PlanOnlyStatement(
session,
statement,
mode,
queryTimeout,
resultMaxRows,
resultFetchTimeout)
}
addOperation(op)
}
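The timeout resolution above as a standalone sketch; `resolveFetchTimeout` is a hypothetical helper, and the conf value is assumed to be a millisecond count (absence means wait indefinitely):

```scala
import scala.concurrent.duration.{Duration, DurationLong}

// Hypothetical helper mirroring the lookup above.
def resolveFetchTimeout(normalizedConf: Map[String, String]): Duration =
  normalizedConf.get("kyuubi.session.engine.flink.fetch.timeout")
    .map(_.toLong.milliseconds)
    .getOrElse(Duration.Inf)

assert(resolveFetchTimeout(Map.empty) == Duration.Inf)
assert(resolveFetchTimeout(
  Map("kyuubi.session.engine.flink.fetch.timeout" -> "60000")) == 60000L.milliseconds)
```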

@@ -17,6 +17,8 @@
package org.apache.kyuubi.engine.flink.operation
import scala.concurrent.duration.Duration
import com.google.common.base.Preconditions
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.gateway.api.operation.OperationHandle
@@ -34,7 +36,10 @@ import org.apache.kyuubi.session.Session
class PlanOnlyStatement(
session: Session,
override val statement: String,
mode: PlanOnlyMode) extends FlinkOperation(session) {
mode: PlanOnlyMode,
queryTimeout: Long,
resultMaxRows: Int,
resultFetchTimeout: Duration) extends FlinkOperation(session) {
private val operationLog: OperationLog = OperationLog.createOperationLog(session, getHandle)
private val lineSeparator: String = System.lineSeparator()
@@ -46,6 +51,7 @@ class PlanOnlyStatement(
}
override protected def runInternal(): Unit = {
addTimeoutMonitor(queryTimeout)
try {
val operations = executor.getTableEnvironment.getParser.parse(statement)
Preconditions.checkArgument(
@@ -59,7 +65,8 @@
val resultFetcher = executor.executeStatement(
new OperationHandle(getHandle.identifier),
statement)
resultSet = ResultSetUtil.fromResultFetcher(resultFetcher);
resultSet =
ResultSetUtil.fromResultFetcher(resultFetcher, resultMaxRows, resultFetchTimeout)
case _ => explainOperation(statement)
}
} catch {

@@ -0,0 +1,176 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.flink.result
import java.util
import java.util.concurrent.Executors
import scala.collection.convert.ImplicitConversions._
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutor, Future}
import scala.concurrent.duration.Duration
import com.google.common.util.concurrent.ThreadFactoryBuilder
import org.apache.flink.table.api.DataTypes
import org.apache.flink.table.catalog.ResolvedSchema
import org.apache.flink.table.data.RowData
import org.apache.flink.table.data.conversion.DataStructureConverters
import org.apache.flink.table.gateway.service.result.ResultFetcher
import org.apache.flink.table.types.DataType
import org.apache.flink.types.Row
import org.apache.kyuubi.Logging
import org.apache.kyuubi.engine.flink.shim.FlinkResultSet
import org.apache.kyuubi.operation.FetchIterator
class QueryResultFetchIterator(
resultFetcher: ResultFetcher,
maxRows: Int = 1000000,
resultFetchTimeout: Duration = Duration.Inf) extends FetchIterator[Row] with Logging {
val schema: ResolvedSchema = resultFetcher.getResultSchema
val dataTypes: util.List[DataType] = schema.getColumnDataTypes
var token: Long = 0
var pos: Long = 0
var fetchStart: Long = 0
var bufferedRows: Array[Row] = new Array[Row](0)
var hasNext: Boolean = true
val FETCH_INTERVAL_MS: Long = 1000
private val executor = Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat("flink-query-iterator-%d").setDaemon(true).build)
implicit private val executionContext: ExecutionContextExecutor =
ExecutionContext.fromExecutor(executor)
/**
* Begin a fetch block, forward from the current position.
*
* Throws TimeoutException if no data is fetched within the timeout.
*/
override def fetchNext(): Unit = {
if (!hasNext) {
return
}
val future = Future {
var fetched = false
// if no timeout is set, this would block until some rows are fetched
debug(s"Fetching from result store with timeout $resultFetchTimeout ms")
while (!fetched && !Thread.interrupted()) {
val rs = resultFetcher.fetchResults(token, maxRows - bufferedRows.length)
val flinkRs = new FlinkResultSet(rs)
// TODO: replace string-based match when Flink 1.16 support is dropped
flinkRs.getResultType.name() match {
case "EOS" =>
debug("EOS received, no more data to fetch.")
fetched = true
hasNext = false
case "NOT_READY" =>
// if flink jobs are not ready, continue to retry
debug("Result not ready, retrying...")
case "PAYLOAD" =>
val fetchedData = flinkRs.getData
// if no data fetched, continue to retry
if (!fetchedData.isEmpty) {
debug(s"Fetched ${fetchedData.length} rows from result store.")
fetched = true
bufferedRows ++= fetchedData.map(rd => convertToRow(rd, dataTypes.toList))
fetchStart = pos
} else {
debug("No data fetched, retrying...")
}
case _ =>
throw new RuntimeException(s"Unexpected result type: ${flinkRs.getResultType}")
}
if (hasNext) {
val nextToken = flinkRs.getNextToken
if (nextToken == null) {
hasNext = false
} else {
token = nextToken
}
}
// back off before polling again; skip the pause once rows or EOS arrived
if (!fetched) {
Thread.sleep(FETCH_INTERVAL_MS)
}
}
}
Await.result(future, resultFetchTimeout)
}
/**
* Begin a fetch block, moving the iterator to the given position.
* Resets the fetch start offset.
*
* @param pos index to move a position of iterator.
*/
override def fetchAbsolute(pos: Long): Unit = {
val effectivePos = Math.max(pos, 0)
if (effectivePos < bufferedRows.length) {
this.fetchStart = effectivePos
return
}
throw new IllegalArgumentException(s"Cannot skip to an unreachable position $effectivePos.")
}
override def getFetchStart: Long = fetchStart
override def getPosition: Long = pos
/**
* @return the next row, or null if no more rows can be fetched.
*/
override def next(): Row = {
if (pos < bufferedRows.length) {
debug(s"Fetching from buffered rows at pos $pos.")
val row = bufferedRows(pos.toInt)
pos += 1
if (pos >= maxRows) {
hasNext = false
}
row
} else {
// block until some rows are fetched or TimeoutException is thrown
fetchNext()
if (hasNext) {
val row = bufferedRows(pos.toInt)
pos += 1
if (pos >= maxRows) {
hasNext = false
}
row
} else {
null
}
}
}
def close(): Unit = {
resultFetcher.close()
executor.shutdown()
}
private[this] def convertToRow(r: RowData, dataTypes: List[DataType]): Row = {
val converter = DataStructureConverters.getConverter(DataTypes.ROW(dataTypes: _*))
converter.toExternal(r).asInstanceOf[Row]
}
}
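A hedged usage sketch for the iterator above; `resultFetcher` is assumed to be obtained from Flink's SQL gateway (e.g. via `OperationExecutor.executeStatement`):

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.duration._

val it = new QueryResultFetchIterator(resultFetcher, maxRows = 1000, resultFetchTimeout = 30.seconds)
try {
  while (it.hasNext) {
    val row = it.next() // may block up to 30s; returns null once the stream ends
    if (row != null) println(row)
  }
} catch {
  case _: TimeoutException => // no rows arrived within the timeout window
} finally {
  it.close() // closes the ResultFetcher and shuts down the fetch thread
}
```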

@@ -50,6 +50,13 @@ case class ResultSet(
def getColumns: util.List[Column] = columns
def getData: FetchIterator[Row] = data
def close: Unit = {
data match {
case queryIte: QueryResultFetchIterator => queryIte.close()
case _ =>
}
}
}
/**

@@ -17,25 +17,17 @@
package org.apache.kyuubi.engine.flink.result
import scala.collection.convert.ImplicitConversions._
import scala.collection.mutable.ListBuffer
import scala.concurrent.duration.Duration
import org.apache.flink.table.api.DataTypes
import org.apache.flink.table.api.ResultKind
import org.apache.flink.table.catalog.Column
import org.apache.flink.table.data.RowData
import org.apache.flink.table.data.conversion.DataStructureConverters
import org.apache.flink.table.gateway.service.result.ResultFetcher
import org.apache.flink.table.types.DataType
import org.apache.flink.types.Row
import org.apache.kyuubi.engine.flink.shim.FlinkResultSet
/** Utility object for building ResultSet. */
object ResultSetUtil {
private val FETCH_ROWS_PER_SECOND = 1000
/**
* Build a ResultSet with a column name and a list of String values.
*
@@ -66,63 +58,19 @@
.data(Array[Row](Row.of("OK")))
.build
def fromResultFetcher(resultFetcher: ResultFetcher, maxRows: Int): ResultSet = {
def fromResultFetcher(
resultFetcher: ResultFetcher,
maxRows: Int,
resultFetchTimeout: Duration): ResultSet = {
if (maxRows <= 0) {
throw new IllegalArgumentException("maxRows should be positive")
}
val schema = resultFetcher.getResultSchema
val resultRowData = ListBuffer.newBuilder[RowData]
var fetched: FlinkResultSet = null
var token: Long = 0
var rowNum: Int = 0
do {
fetched = new FlinkResultSet(resultFetcher.fetchResults(token, FETCH_ROWS_PER_SECOND))
val data = fetched.getData
val slice = data.slice(0, maxRows - rowNum)
resultRowData ++= slice
rowNum += slice.size
token = fetched.getNextToken
try Thread.sleep(1000L)
catch {
case _: InterruptedException => fetched.getNextToken == null
}
} while (
fetched.getNextToken != null &&
rowNum < maxRows &&
fetched.getResultType != org.apache.flink.table.gateway.api.results.ResultSet.ResultType.EOS
)
val dataTypes = resultFetcher.getResultSchema.getColumnDataTypes
val ite = new QueryResultFetchIterator(resultFetcher, maxRows, resultFetchTimeout)
ResultSet.builder
.resultKind(ResultKind.SUCCESS_WITH_CONTENT)
.columns(schema.getColumns)
.data(resultRowData.result().map(rd => convertToRow(rd, dataTypes.toList)).toArray)
.data(ite)
.build
}
def fromResultFetcher(resultFetcher: ResultFetcher): ResultSet = {
val schema = resultFetcher.getResultSchema
val resultRowData = ListBuffer.newBuilder[RowData]
var fetched: FlinkResultSet = null
var token: Long = 0
do {
fetched = new FlinkResultSet(resultFetcher.fetchResults(token, FETCH_ROWS_PER_SECOND))
resultRowData ++= fetched.getData
token = fetched.getNextToken
try Thread.sleep(1000L)
catch {
case _: InterruptedException =>
}
} while (
fetched.getNextToken != null &&
fetched.getResultType != org.apache.flink.table.gateway.api.results.ResultSet.ResultType.EOS
)
val dataTypes = resultFetcher.getResultSchema.getColumnDataTypes
ResultSet.builder
.resultKind(ResultKind.SUCCESS_WITH_CONTENT)
.columns(schema.getColumns)
.data(resultRowData.result().map(rd => convertToRow(rd, dataTypes.toList)).toArray)
.build
}
private[this] def convertToRow(r: RowData, dataTypes: List[DataType]): Row = {
val converter = DataStructureConverters.getConverter(DataTypes.ROW(dataTypes: _*))
converter.toExternal(r).asInstanceOf[Row]
}
}

@@ -77,9 +77,10 @@ class FlinkSQLSessionManager(engineContext: DefaultContext)
}
override def closeSession(sessionHandle: SessionHandle): Unit = {
val fSession = super.getSessionOption(sessionHandle)
fSession.foreach(s =>
sessionManager.closeSession(s.asInstanceOf[FlinkSessionImpl].fSession.getSessionHandle))
super.closeSession(sessionHandle)
sessionManager.closeSession(
new org.apache.flink.table.gateway.api.session.SessionHandle(sessionHandle.identifier))
}
override def stop(): Unit = synchronized {

@@ -34,7 +34,7 @@ import org.apache.kyuubi.engine.flink.FlinkEngineUtils.FLINK_RUNTIME_VERSION
import org.apache.kyuubi.engine.flink.WithFlinkTestResources
import org.apache.kyuubi.engine.flink.result.Constants
import org.apache.kyuubi.engine.flink.util.TestUserClassLoaderJar
import org.apache.kyuubi.jdbc.hive.KyuubiStatement
import org.apache.kyuubi.jdbc.hive.{KyuubiSQLException, KyuubiStatement}
import org.apache.kyuubi.jdbc.hive.common.TimestampTZ
import org.apache.kyuubi.operation.HiveJDBCTestHelper
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
@@ -676,12 +676,10 @@ abstract class FlinkOperationSuite extends HiveJDBCTestHelper with WithFlinkTest
assert(stopResult1.next())
assert(stopResult1.getString(1) === "OK")
val selectResult = statement.executeQuery("select * from tbl_a")
val jobId2 = statement.asInstanceOf[KyuubiStatement].getQueryId
assert(jobId2 !== null)
while (!selectResult.next()) {
Thread.sleep(1000L)
}
val insertResult2 = statement.executeQuery("insert into tbl_b select * from tbl_a")
assert(insertResult2.next())
val jobId2 = insertResult2.getString(1)
val stopResult2 = statement.executeQuery(s"stop job '$jobId2' with savepoint")
assert(stopResult2.getMetaData.getColumnName(1).equals("savepoint path"))
assert(stopResult2.next())
@@ -1252,4 +1250,17 @@ abstract class FlinkOperationSuite extends HiveJDBCTestHelper with WithFlinkTest
}
}
}
test("test result fetch timeout") {
val exception = intercept[KyuubiSQLException](
withSessionConf()(Map(ENGINE_FLINK_FETCH_TIMEOUT.key -> "60000"))() {
withJdbcStatement("tbl_a") { stmt =>
stmt.executeQuery("create table tbl_a (a int) " +
"with ('connector' = 'datagen', 'rows-per-second'='0')")
val resultSet = stmt.executeQuery("select * from tbl_a")
while (resultSet.next()) {}
}
})
assert(exception.getMessage === "Futures timed out after [60000 milliseconds]")
}
}

@@ -31,7 +31,7 @@ class FlinkOperationSuite extends WithKyuubiServerAndFlinkMiniCluster
override val conf: KyuubiConf = KyuubiConf()
.set(s"$KYUUBI_ENGINE_ENV_PREFIX.$KYUUBI_HOME", kyuubiHome)
.set(ENGINE_TYPE, "FLINK_SQL")
.set("flink.parallelism.default", "6")
.set("flink.parallelism.default", "2")
override protected def jdbcUrl: String = getJdbcUrl
@@ -72,7 +72,7 @@ class FlinkOperationSuite extends WithKyuubiServerAndFlinkMiniCluster
var success = false
while (resultSet.next() && !success) {
if (resultSet.getString(1) == "parallelism.default" &&
resultSet.getString(2) == "6") {
resultSet.getString(2) == "2") {
success = true
}
}

@@ -40,7 +40,7 @@ class FlinkOperationSuiteOnYarn extends WithKyuubiServerAndYarnMiniCluster
.set(s"$KYUUBI_ENGINE_ENV_PREFIX.$KYUUBI_HOME", kyuubiHome)
.set(ENGINE_TYPE, "FLINK_SQL")
.set("flink.execution.target", "yarn-application")
.set("flink.parallelism.default", "6")
.set("flink.parallelism.default", "2")
super.beforeAll()
}
@@ -81,7 +81,7 @@ class FlinkOperationSuiteOnYarn extends WithKyuubiServerAndYarnMiniCluster
var success = false
while (resultSet.next() && !success) {
if (resultSet.getString(1) == "parallelism.default" &&
resultSet.getString(2) == "6") {
resultSet.getString(2) == "2") {
success = true
}
}

@@ -1319,6 +1319,15 @@ object KyuubiConf {
.intConf
.createWithDefault(1000000)
val ENGINE_FLINK_FETCH_TIMEOUT: OptionalConfigEntry[Long] =
buildConf("kyuubi.session.engine.flink.fetch.timeout")
.doc("Result fetch timeout for Flink engine. If the timeout is reached, the result " +
"fetch would be stopped and the current fetched would be returned. If no data are " +
"fetched, a TimeoutException would be thrown.")
.version("1.8.0")
.timeConf
.createOptional
val ENGINE_TRINO_MAIN_RESOURCE: OptionalConfigEntry[String] =
buildConf("kyuubi.session.engine.trino.main.resource")
.doc("The package used to create Trino engine remote job. If it is undefined," +