From e9ca8272b07e9cc48292c21cb6b035a9381b2c93 Mon Sep 17 00:00:00 2001 From: Paul Lin Date: Thu, 24 Aug 2023 11:58:08 +0800 Subject: [PATCH] [KYUUBI #4806][FLINK] Support time-out incremental result fetch for Flink engine ### _Why are the changes needed?_ As titled. ### _How was this patch tested?_ - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [x] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before make a pull request Closes #5134 from link3280/KYUUBI-4806. Closes #4806 a1b74783c [Paul Lin] Optimize code style 546cfdf5b [Paul Lin] Update externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala b6eb7af4f [Paul Lin] Update externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSet.scala 1563fa98b [Paul Lin] Remove explicit StartRowOffset for Flink 4e61a348c [Paul Lin] Add comments c93294650 [Paul Lin] Improve code style 6bd0c8e69 [Paul Lin] Use dedicated thread pool 15412db3a [Paul Lin] Improve logging d6a2a9cff [Paul Lin] [KYUUBI #4806][FLINK] Implement incremental result fetching Authored-by: Paul Lin Signed-off-by: Paul Lin --- docs/configuration/settings.md | 1 + .../flink/operation/ExecuteStatement.scala | 11 +- .../flink/operation/FlinkOperation.scala | 34 +++- .../operation/FlinkSQLOperationManager.scala | 23 ++- .../flink/operation/PlanOnlyStatement.scala | 11 +- .../result/QueryResultFetchIterator.scala | 176 ++++++++++++++++++ .../engine/flink/result/ResultSet.scala | 7 + .../engine/flink/result/ResultSetUtil.scala | 72 +------ .../session/FlinkSQLSessionManager.scala | 5 +- .../flink/operation/FlinkOperationSuite.scala | 25 ++- .../flink/operation/FlinkOperationSuite.scala | 4 +- .../operation/FlinkOperationSuiteOnYarn.scala | 4 +- .../org/apache/kyuubi/config/KyuubiConf.scala | 9 + 13 
files changed, 292 insertions(+), 90 deletions(-) create mode 100644 externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/QueryResultFetchIterator.scala diff --git a/docs/configuration/settings.md b/docs/configuration/settings.md index 928ff0ab8..ddb8546c2 100644 --- a/docs/configuration/settings.md +++ b/docs/configuration/settings.md @@ -411,6 +411,7 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co | kyuubi.session.engine.alive.probe.interval | PT10S | The interval for engine alive probe. | duration | 1.6.0 | | kyuubi.session.engine.alive.timeout | PT2M | The timeout for engine alive. If there is no alive probe success in the last timeout window, the engine will be marked as no-alive. | duration | 1.6.0 | | kyuubi.session.engine.check.interval | PT1M | The check interval for engine timeout | duration | 1.0.0 | +| kyuubi.session.engine.flink.fetch.timeout | &lt;undefined&gt; | Result fetch timeout for Flink engine. If the timeout is reached, the result fetch would be stopped and the currently fetched data would be returned. If no data are fetched, a TimeoutException would be thrown. | duration | 1.8.0 | | kyuubi.session.engine.flink.main.resource | &lt;undefined&gt; | The package used to create Flink SQL engine remote job. If it is undefined, Kyuubi will use the default | string | 1.4.0 | | kyuubi.session.engine.flink.max.rows | 1000000 | Max rows of Flink query results. For batch queries, rows exceeding the limit would be ignored. For streaming queries, the query would be canceled if the limit is reached. | int | 1.5.0 | | kyuubi.session.engine.hive.main.resource | &lt;undefined&gt; | The package used to create Hive engine remote job. 
If it is undefined, Kyuubi will use the default | string | 1.6.0 | diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala index 4042756b6..0e0c476e2 100644 --- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala +++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala @@ -17,6 +17,8 @@ package org.apache.kyuubi.engine.flink.operation +import scala.concurrent.duration.Duration + import org.apache.flink.api.common.JobID import org.apache.flink.table.gateway.api.operation.OperationHandle @@ -32,7 +34,8 @@ class ExecuteStatement( override val statement: String, override val shouldRunAsync: Boolean, queryTimeout: Long, - resultMaxRows: Int) + resultMaxRows: Int, + resultFetchTimeout: Duration) extends FlinkOperation(session) with Logging { private val operationLog: OperationLog = @@ -48,10 +51,6 @@ class ExecuteStatement( setHasResultSet(true) } - override protected def afterRun(): Unit = { - OperationLog.removeCurrentOperationLog() - } - override protected def runInternal(): Unit = { addTimeoutMonitor(queryTimeout) executeStatement() @@ -64,7 +63,7 @@ class ExecuteStatement( new OperationHandle(getHandle.identifier), statement) jobId = FlinkEngineUtils.getResultJobId(resultFetcher) - resultSet = ResultSetUtil.fromResultFetcher(resultFetcher, resultMaxRows) + resultSet = ResultSetUtil.fromResultFetcher(resultFetcher, resultMaxRows, resultFetchTimeout) setState(OperationState.FINISHED) } catch { onError(cancel = true) diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala index 
5a79d2c0e..1424b721c 100644 --- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala +++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala @@ -19,12 +19,15 @@ package org.apache.kyuubi.engine.flink.operation import java.io.IOException import java.time.ZoneId +import java.util.concurrent.TimeoutException import scala.collection.JavaConverters.collectionAsScalaIterableConverter +import scala.collection.mutable.ListBuffer import org.apache.flink.configuration.Configuration import org.apache.flink.table.gateway.service.context.SessionContext import org.apache.flink.table.gateway.service.operation.OperationExecutor +import org.apache.flink.types.Row import org.apache.hive.service.rpc.thrift.{TFetchResultsResp, TGetResultSetMetadataResp, TTableSchema} import org.apache.kyuubi.{KyuubiSQLException, Utils} @@ -72,6 +75,10 @@ abstract class FlinkOperation(session: Session) extends AbstractOperation(sessio override def close(): Unit = { cleanup(OperationState.CLOSED) + // the result set may be null if the operation ends exceptionally + if (resultSet != null) { + resultSet.close + } try { getOperationLog.foreach(_.close()) } catch { @@ -98,25 +105,42 @@ abstract class FlinkOperation(session: Session) extends AbstractOperation(sessio assertState(OperationState.FINISHED) setHasResultSet(true) order match { - case FETCH_NEXT => resultSet.getData.fetchNext() case FETCH_PRIOR => resultSet.getData.fetchPrior(rowSetSize); case FETCH_FIRST => resultSet.getData.fetchAbsolute(0); + case FETCH_NEXT => // ignored because new data are fetched lazily + } + val batch = new ListBuffer[Row] + try { + // there could be null values at the end of the batch + // because Flink could return an EOS + var rows = 0 + while (resultSet.getData.hasNext && rows < rowSetSize) { + Option(resultSet.getData.next()).foreach { r => batch += r; rows += 1 } + } + } catch { + case e: 
TimeoutException => + // ignore and return the current batch if there's some data + // otherwise, rethrow the timeout exception + if (batch.nonEmpty) { + debug(s"Timeout fetching more data for $opType operation. " + + s"Returning the current fetched data.") + } else { + throw e + } } - val token = resultSet.getData.take(rowSetSize) val timeZone = Option(flinkSession.getSessionConfig.get("table.local-time-zone")) val zoneId = timeZone match { case Some(tz) => ZoneId.of(tz) case None => ZoneId.systemDefault() } val resultRowSet = RowSet.resultSetToTRowSet( - token.toList, + batch.toList, resultSet, zoneId, getProtocolVersion) - resultRowSet.setStartRowOffset(resultSet.getData.getPosition) val resp = new TFetchResultsResp(OK_STATUS) resp.setResults(resultRowSet) - resp.setHasMoreRows(false) + resp.setHasMoreRows(resultSet.getData.hasNext) resp } diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala index 712c13596..d5c0629ee 100644 --- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala +++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala @@ -20,6 +20,8 @@ package org.apache.kyuubi.engine.flink.operation import java.util import scala.collection.JavaConverters._ +import scala.concurrent.duration.{Duration, DurationLong} +import scala.language.postfixOps import org.apache.kyuubi.KyuubiSQLException import org.apache.kyuubi.config.KyuubiConf._ @@ -66,14 +68,31 @@ class FlinkSQLOperationManager extends OperationManager("FlinkSQLOperationManage flinkSession.normalizedConf.getOrElse( ENGINE_FLINK_MAX_ROWS.key, resultMaxRowsDefault.toString).toInt + + val resultFetchTimeout = + 
flinkSession.normalizedConf.get(ENGINE_FLINK_FETCH_TIMEOUT.key).map(_.toLong milliseconds) + .getOrElse(Duration.Inf) + val op = mode match { case NoneMode => // FLINK-24427 seals calcite classes which required to access in async mode, considering // there is no much benefit in async mode, here we just ignore `runAsync` and always run // statement in sync mode as a workaround - new ExecuteStatement(session, statement, false, queryTimeout, resultMaxRows) + new ExecuteStatement( + session, + statement, + false, + queryTimeout, + resultMaxRows, + resultFetchTimeout) case mode => - new PlanOnlyStatement(session, statement, mode) + new PlanOnlyStatement( + session, + statement, + mode, + queryTimeout, + resultMaxRows, + resultFetchTimeout) } addOperation(op) } diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/PlanOnlyStatement.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/PlanOnlyStatement.scala index 4f5d8218f..1284bfd73 100644 --- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/PlanOnlyStatement.scala +++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/PlanOnlyStatement.scala @@ -17,6 +17,8 @@ package org.apache.kyuubi.engine.flink.operation +import scala.concurrent.duration.Duration + import com.google.common.base.Preconditions import org.apache.flink.table.api.TableEnvironment import org.apache.flink.table.gateway.api.operation.OperationHandle @@ -34,7 +36,10 @@ import org.apache.kyuubi.session.Session class PlanOnlyStatement( session: Session, override val statement: String, - mode: PlanOnlyMode) extends FlinkOperation(session) { + mode: PlanOnlyMode, + queryTimeout: Long, + resultMaxRows: Int, + resultFetchTimeout: Duration) extends FlinkOperation(session) { private val operationLog: OperationLog = OperationLog.createOperationLog(session, getHandle) private val 
lineSeparator: String = System.lineSeparator() @@ -46,6 +51,7 @@ class PlanOnlyStatement( } override protected def runInternal(): Unit = { + addTimeoutMonitor(queryTimeout) try { val operations = executor.getTableEnvironment.getParser.parse(statement) Preconditions.checkArgument( @@ -59,7 +65,8 @@ class PlanOnlyStatement( val resultFetcher = executor.executeStatement( new OperationHandle(getHandle.identifier), statement) - resultSet = ResultSetUtil.fromResultFetcher(resultFetcher); + resultSet = + ResultSetUtil.fromResultFetcher(resultFetcher, resultMaxRows, resultFetchTimeout); case _ => explainOperation(statement) } } catch { diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/QueryResultFetchIterator.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/QueryResultFetchIterator.scala new file mode 100644 index 000000000..60ae08d9d --- /dev/null +++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/QueryResultFetchIterator.scala @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kyuubi.engine.flink.result + +import java.util +import java.util.concurrent.Executors + +import scala.collection.convert.ImplicitConversions._ +import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutor, Future} +import scala.concurrent.duration.Duration + +import com.google.common.util.concurrent.ThreadFactoryBuilder +import org.apache.flink.table.api.DataTypes +import org.apache.flink.table.catalog.ResolvedSchema +import org.apache.flink.table.data.RowData +import org.apache.flink.table.data.conversion.DataStructureConverters +import org.apache.flink.table.gateway.service.result.ResultFetcher +import org.apache.flink.table.types.DataType +import org.apache.flink.types.Row + +import org.apache.kyuubi.Logging +import org.apache.kyuubi.engine.flink.shim.FlinkResultSet +import org.apache.kyuubi.operation.FetchIterator + +class QueryResultFetchIterator( + resultFetcher: ResultFetcher, + maxRows: Int = 1000000, + resultFetchTimeout: Duration = Duration.Inf) extends FetchIterator[Row] with Logging { + + val schema: ResolvedSchema = resultFetcher.getResultSchema + + val dataTypes: util.List[DataType] = schema.getColumnDataTypes + + var token: Long = 0 + + var pos: Long = 0 + + var fetchStart: Long = 0 + + var bufferedRows: Array[Row] = new Array[Row](0) + + var hasNext: Boolean = true + + val FETCH_INTERVAL_MS: Long = 1000 + + private val executor = Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder().setNameFormat("flink-query-iterator-%d").setDaemon(true).build) + + implicit private val executionContext: ExecutionContextExecutor = + ExecutionContext.fromExecutor(executor) + + /** + * Begin a fetch block, forward from the current position. + * + * Throws TimeoutException if no data is fetched within the timeout. 
+ */ + override def fetchNext(): Unit = { + if (!hasNext) { + return + } + val future = Future(() -> { + var fetched = false + // if no timeout is set, this would block until some rows are fetched + debug(s"Fetching from result store with timeout $resultFetchTimeout ms") + while (!fetched && !Thread.interrupted()) { + val rs = resultFetcher.fetchResults(token, maxRows - bufferedRows.length) + val flinkRs = new FlinkResultSet(rs) + // TODO: replace string-based match when Flink 1.16 support is dropped + flinkRs.getResultType.name() match { + case "EOS" => + debug("EOS received, no more data to fetch.") + fetched = true + hasNext = false + case "NOT_READY" => + // if flink jobs are not ready, continue to retry + debug("Result not ready, retrying...") + case "PAYLOAD" => + val fetchedData = flinkRs.getData + // if no data fetched, continue to retry + if (!fetchedData.isEmpty) { + debug(s"Fetched ${fetchedData.length} rows from result store.") + fetched = true + bufferedRows ++= fetchedData.map(rd => convertToRow(rd, dataTypes.toList)) + fetchStart = pos + } else { + debug("No data fetched, retrying...") + } + case _ => + throw new RuntimeException(s"Unexpected result type: ${flinkRs.getResultType}") + } + if (hasNext) { + val nextToken = flinkRs.getNextToken + if (nextToken == null) { + hasNext = false + } else { + token = nextToken + } + } + Thread.sleep(FETCH_INTERVAL_MS) + } + }) + Await.result(future, resultFetchTimeout) + } + + /** + * Begin a fetch block, moving the iterator to the given position. + * Resets the fetch start offset. + * + * @param pos index to move a position of iterator. 
+ */ + override def fetchAbsolute(pos: Long): Unit = { + val effectivePos = Math.max(pos, 0) + if (effectivePos < bufferedRows.length) { + this.fetchStart = effectivePos + return + } + throw new IllegalArgumentException(s"Cannot skip to an unreachable position $effectivePos.") + } + + override def getFetchStart: Long = fetchStart + + override def getPosition: Long = pos + + /** + * @return returns row if any and null if no more rows can be fetched. + */ + override def next(): Row = { + if (pos < bufferedRows.length) { + debug(s"Fetching from buffered rows at pos $pos.") + val row = bufferedRows(pos.toInt) + pos += 1 + if (pos >= maxRows) { + hasNext = false + } + row + } else { + // block until some rows are fetched or TimeoutException is thrown + fetchNext() + if (hasNext) { + val row = bufferedRows(pos.toInt) + pos += 1 + if (pos >= maxRows) { + hasNext = false + } + row + } else { + null + } + } + } + + def close(): Unit = { + resultFetcher.close() + executor.shutdown() + } + + private[this] def convertToRow(r: RowData, dataTypes: List[DataType]): Row = { + val converter = DataStructureConverters.getConverter(DataTypes.ROW(dataTypes: _*)) + converter.toExternal(r).asInstanceOf[Row] + } +} diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSet.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSet.scala index 1e94042d0..b8d407297 100644 --- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSet.scala +++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSet.scala @@ -50,6 +50,13 @@ case class ResultSet( def getColumns: util.List[Column] = columns def getData: FetchIterator[Row] = data + + def close: Unit = { + data match { + case queryIte: QueryResultFetchIterator => queryIte.close() + case _ => + } + } } /** diff --git 
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSetUtil.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSetUtil.scala index c1169528c..8b722f1e5 100644 --- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSetUtil.scala +++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSetUtil.scala @@ -17,25 +17,17 @@ package org.apache.kyuubi.engine.flink.result -import scala.collection.convert.ImplicitConversions._ -import scala.collection.mutable.ListBuffer +import scala.concurrent.duration.Duration import org.apache.flink.table.api.DataTypes import org.apache.flink.table.api.ResultKind import org.apache.flink.table.catalog.Column -import org.apache.flink.table.data.RowData -import org.apache.flink.table.data.conversion.DataStructureConverters import org.apache.flink.table.gateway.service.result.ResultFetcher -import org.apache.flink.table.types.DataType import org.apache.flink.types.Row -import org.apache.kyuubi.engine.flink.shim.FlinkResultSet - /** Utility object for building ResultSet. */ object ResultSetUtil { - private val FETCH_ROWS_PER_SECOND = 1000 - /** * Build a ResultSet with a column name and a list of String values. 
* @@ -66,63 +58,19 @@ object ResultSetUtil { .data(Array[Row](Row.of("OK"))) .build - def fromResultFetcher(resultFetcher: ResultFetcher, maxRows: Int): ResultSet = { + def fromResultFetcher( + resultFetcher: ResultFetcher, + maxRows: Int, + resultFetchTimeout: Duration): ResultSet = { + if (maxRows <= 0) { + throw new IllegalArgumentException("maxRows should be positive") + } val schema = resultFetcher.getResultSchema - val resultRowData = ListBuffer.newBuilder[RowData] - var fetched: FlinkResultSet = null - var token: Long = 0 - var rowNum: Int = 0 - do { - fetched = new FlinkResultSet(resultFetcher.fetchResults(token, FETCH_ROWS_PER_SECOND)) - val data = fetched.getData - val slice = data.slice(0, maxRows - rowNum) - resultRowData ++= slice - rowNum += slice.size - token = fetched.getNextToken - try Thread.sleep(1000L) - catch { - case _: InterruptedException => fetched.getNextToken == null - } - } while ( - fetched.getNextToken != null && - rowNum < maxRows && - fetched.getResultType != org.apache.flink.table.gateway.api.results.ResultSet.ResultType.EOS - ) - val dataTypes = resultFetcher.getResultSchema.getColumnDataTypes + val ite = new QueryResultFetchIterator(resultFetcher, maxRows, resultFetchTimeout) ResultSet.builder .resultKind(ResultKind.SUCCESS_WITH_CONTENT) .columns(schema.getColumns) - .data(resultRowData.result().map(rd => convertToRow(rd, dataTypes.toList)).toArray) + .data(ite) .build } - - def fromResultFetcher(resultFetcher: ResultFetcher): ResultSet = { - val schema = resultFetcher.getResultSchema - val resultRowData = ListBuffer.newBuilder[RowData] - var fetched: FlinkResultSet = null - var token: Long = 0 - do { - fetched = new FlinkResultSet(resultFetcher.fetchResults(token, FETCH_ROWS_PER_SECOND)) - resultRowData ++= fetched.getData - token = fetched.getNextToken - try Thread.sleep(1000L) - catch { - case _: InterruptedException => - } - } while ( - fetched.getNextToken != null && - fetched.getResultType != 
org.apache.flink.table.gateway.api.results.ResultSet.ResultType.EOS - ) - val dataTypes = resultFetcher.getResultSchema.getColumnDataTypes - ResultSet.builder - .resultKind(ResultKind.SUCCESS_WITH_CONTENT) - .columns(schema.getColumns) - .data(resultRowData.result().map(rd => convertToRow(rd, dataTypes.toList)).toArray) - .build - } - - private[this] def convertToRow(r: RowData, dataTypes: List[DataType]): Row = { - val converter = DataStructureConverters.getConverter(DataTypes.ROW(dataTypes: _*)) - converter.toExternal(r).asInstanceOf[Row] - } } diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala index c7aa7c3c5..b7cd46217 100644 --- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala +++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala @@ -77,9 +77,10 @@ class FlinkSQLSessionManager(engineContext: DefaultContext) } override def closeSession(sessionHandle: SessionHandle): Unit = { + val fSession = super.getSessionOption(sessionHandle) + fSession.foreach(s => + sessionManager.closeSession(s.asInstanceOf[FlinkSessionImpl].fSession.getSessionHandle)) super.closeSession(sessionHandle) - sessionManager.closeSession( - new org.apache.flink.table.gateway.api.session.SessionHandle(sessionHandle.identifier)) } override def stop(): Unit = synchronized { diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala index 35b59b661..8e7c35a95 100644 --- 
a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala +++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala @@ -34,7 +34,7 @@ import org.apache.kyuubi.engine.flink.FlinkEngineUtils.FLINK_RUNTIME_VERSION import org.apache.kyuubi.engine.flink.WithFlinkTestResources import org.apache.kyuubi.engine.flink.result.Constants import org.apache.kyuubi.engine.flink.util.TestUserClassLoaderJar -import org.apache.kyuubi.jdbc.hive.KyuubiStatement +import org.apache.kyuubi.jdbc.hive.{KyuubiSQLException, KyuubiStatement} import org.apache.kyuubi.jdbc.hive.common.TimestampTZ import org.apache.kyuubi.operation.HiveJDBCTestHelper import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._ @@ -676,12 +676,10 @@ abstract class FlinkOperationSuite extends HiveJDBCTestHelper with WithFlinkTest assert(stopResult1.next()) assert(stopResult1.getString(1) === "OK") - val selectResult = statement.executeQuery("select * from tbl_a") - val jobId2 = statement.asInstanceOf[KyuubiStatement].getQueryId - assert(jobId2 !== null) - while (!selectResult.next()) { - Thread.sleep(1000L) - } + val insertResult2 = statement.executeQuery("insert into tbl_b select * from tbl_a") + assert(insertResult2.next()) + val jobId2 = insertResult2.getString(1) + val stopResult2 = statement.executeQuery(s"stop job '$jobId2' with savepoint") assert(stopResult2.getMetaData.getColumnName(1).equals("savepoint path")) assert(stopResult2.next()) @@ -1252,4 +1250,17 @@ abstract class FlinkOperationSuite extends HiveJDBCTestHelper with WithFlinkTest } } } + + test("test result fetch timeout") { + val exception = intercept[KyuubiSQLException]( + withSessionConf()(Map(ENGINE_FLINK_FETCH_TIMEOUT.key -> "60000"))() { + withJdbcStatement("tbl_a") { stmt => + stmt.executeQuery("create table tbl_a (a int) " + + "with ('connector' = 'datagen', 'rows-per-second'='0')") + val resultSet = 
stmt.executeQuery("select * from tbl_a") + while (resultSet.next()) {} + } + }) + assert(exception.getMessage === "Futures timed out after [60000 milliseconds]") + } } diff --git a/integration-tests/kyuubi-flink-it/src/test/scala/org/apache/kyuubi/it/flink/operation/FlinkOperationSuite.scala b/integration-tests/kyuubi-flink-it/src/test/scala/org/apache/kyuubi/it/flink/operation/FlinkOperationSuite.scala index 893e0020a..55476bfd0 100644 --- a/integration-tests/kyuubi-flink-it/src/test/scala/org/apache/kyuubi/it/flink/operation/FlinkOperationSuite.scala +++ b/integration-tests/kyuubi-flink-it/src/test/scala/org/apache/kyuubi/it/flink/operation/FlinkOperationSuite.scala @@ -31,7 +31,7 @@ class FlinkOperationSuite extends WithKyuubiServerAndFlinkMiniCluster override val conf: KyuubiConf = KyuubiConf() .set(s"$KYUUBI_ENGINE_ENV_PREFIX.$KYUUBI_HOME", kyuubiHome) .set(ENGINE_TYPE, "FLINK_SQL") - .set("flink.parallelism.default", "6") + .set("flink.parallelism.default", "2") override protected def jdbcUrl: String = getJdbcUrl @@ -72,7 +72,7 @@ class FlinkOperationSuite extends WithKyuubiServerAndFlinkMiniCluster var success = false while (resultSet.next() && !success) { if (resultSet.getString(1) == "parallelism.default" && - resultSet.getString(2) == "6") { + resultSet.getString(2) == "2") { success = true } } diff --git a/integration-tests/kyuubi-flink-it/src/test/scala/org/apache/kyuubi/it/flink/operation/FlinkOperationSuiteOnYarn.scala b/integration-tests/kyuubi-flink-it/src/test/scala/org/apache/kyuubi/it/flink/operation/FlinkOperationSuiteOnYarn.scala index afa4dce8f..ee6b9bb98 100644 --- a/integration-tests/kyuubi-flink-it/src/test/scala/org/apache/kyuubi/it/flink/operation/FlinkOperationSuiteOnYarn.scala +++ b/integration-tests/kyuubi-flink-it/src/test/scala/org/apache/kyuubi/it/flink/operation/FlinkOperationSuiteOnYarn.scala @@ -40,7 +40,7 @@ class FlinkOperationSuiteOnYarn extends WithKyuubiServerAndYarnMiniCluster .set(s"$KYUUBI_ENGINE_ENV_PREFIX.$KYUUBI_HOME", 
kyuubiHome) .set(ENGINE_TYPE, "FLINK_SQL") .set("flink.execution.target", "yarn-application") - .set("flink.parallelism.default", "6") + .set("flink.parallelism.default", "2") super.beforeAll() } @@ -81,7 +81,7 @@ class FlinkOperationSuiteOnYarn extends WithKyuubiServerAndYarnMiniCluster var success = false while (resultSet.next() && !success) { if (resultSet.getString(1) == "parallelism.default" && - resultSet.getString(2) == "6") { + resultSet.getString(2) == "2") { success = true } } diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala index 515ffbc34..3f1c3b868 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala @@ -1319,6 +1319,15 @@ object KyuubiConf { .intConf .createWithDefault(1000000) + val ENGINE_FLINK_FETCH_TIMEOUT: OptionalConfigEntry[Long] = + buildConf("kyuubi.session.engine.flink.fetch.timeout") + .doc("Result fetch timeout for Flink engine. If the timeout is reached, the result " + + "fetch would be stopped and the currently fetched data would be returned. If no data are " + + "fetched, a TimeoutException would be thrown.") + .version("1.8.0") + .timeConf + .createOptional + val ENGINE_TRINO_MAIN_RESOURCE: OptionalConfigEntry[String] = buildConf("kyuubi.session.engine.trino.main.resource") .doc("The package used to create Trino engine remote job. If it is undefined," +