From 34043ae41bb27c3ca6769b39b38cf013a5666ef0 Mon Sep 17 00:00:00 2001 From: SteNicholas Date: Sat, 29 Jan 2022 10:23:17 +0800 Subject: [PATCH] [KYUUBI #1829] Support executing statement async for Flink SQL engine ### _Why are the changes needed?_ Support executing statement async for Flink SQL engine. ### _How was this patch tested?_ - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request Closes #1845 from SteNicholas/KYUUBI-1829. Closes #1829 956739c1 [SteNicholas] [KYUUBI #1829] Support executing statement async for Flink SQL engine 4ccf4b4a [SteNicholas] [KYUUBI #1829] Support executing statement async for Flink SQL engine 616ddd2f [SteNicholas] [KYUUBI #1829] Support executing statement async for Flink SQL engine Authored-by: SteNicholas Signed-off-by: yanghua --- .../flink/operation/ExecuteStatement.scala | 6 ++++- .../flink/operation/FlinkOperation.scala | 4 +-- .../flink/operation/FlinkOperationSuite.scala | 26 ++++++++++++++++++- .../spark/operation/SparkOperationSuite.scala | 3 --- 4 files changed, 31 insertions(+), 8 deletions(-) diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala index b38ceefa8..51e84156c 100644 --- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala +++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala @@ -24,6 +24,7 @@ import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer import com.google.common.annotations.VisibleForTesting +import org.apache.calcite.rel.metadata.{DefaultRelMetadataProvider, JaninoRelMetadataProvider, RelMetadataQueryBase} import org.apache.flink.table.api.{DataTypes, ResultKind} import org.apache.flink.table.catalog.Column import org.apache.flink.table.client.gateway.{Executor, TypedResult} @@ -75,11 +76,11 @@ class ExecuteStatement( val asyncOperation = new Runnable { override def run(): Unit = { OperationLog.setCurrentOperationLog(operationLog) + executeStatement() } } try { - executeStatement() val flinkSQLSessionManager = session.sessionManager val backgroundHandle = flinkSQLSessionManager.submitBackgroundOperation(asyncOperation) setBackgroundHandle(backgroundHandle) @@ -100,6 +101,9 @@ class ExecuteStatement( try { setState(OperationState.RUNNING) + // set the thread variable THREAD_PROVIDERS + RelMetadataQueryBase.THREAD_PROVIDERS.set( + JaninoRelMetadataProvider.of(DefaultRelMetadataProvider.INSTANCE)) val operation = executor.parseStatement(sessionId, statement) operation match { case queryOperation: QueryOperation => runQueryOperation(queryOperation) diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala index b1dd10f95..d178d81ee 100644 --- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala +++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala @@ -114,9 +114,7 @@ abstract class FlinkOperation( protected def cleanup(targetState: OperationState): Unit = state.synchronized { if (!isTerminalState(state)) { setState(targetState) - if (shouldRunAsync) { - Option(getBackgroundHandle).foreach(_.cancel(true)) - } + Option(getBackgroundHandle).foreach(_.cancel(true)) } } diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala index c94939441..197cbcbf8 100644 --- a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala +++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala @@ -22,6 +22,7 @@ import java.sql.DatabaseMetaData import org.apache.flink.table.api.EnvironmentSettings.DEFAULT_BUILTIN_CATALOG import org.apache.flink.table.api.EnvironmentSettings.DEFAULT_BUILTIN_DATABASE import org.apache.flink.table.types.logical.LogicalTypeRoot +import org.apache.hive.service.rpc.thrift.{TExecuteStatementReq, TFetchResultsReq, TOpenSessionReq} import org.scalatest.concurrent.PatienceConfiguration.Timeout import org.scalatest.time.SpanSugar._ @@ -35,7 +36,7 @@ class FlinkOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper { override def withKyuubiConf: Map[String, String] = Map() override protected def jdbcUrl: String = - s"jdbc:hive2://${engine.frontendServices.head.connectionUrl}/" + s"jdbc:hive2://${engine.frontendServices.head.connectionUrl}/;" ignore("release session if shared level is CONNECTION") { logger.info(s"jdbc url is $jdbcUrl") @@ -734,4 +735,27 @@ class FlinkOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper { assert(resultSet.getString(1) === "a") } } + + test("async execute statement - select column name with dots") { + withThriftClient { client => + val tOpenSessionReq = new TOpenSessionReq() + tOpenSessionReq.setUsername("kentyao") + tOpenSessionReq.setPassword("anonymous") + val tOpenSessionResp = client.OpenSession(tOpenSessionReq) + val tExecuteStatementReq = new TExecuteStatementReq() + tExecuteStatementReq.setSessionHandle(tOpenSessionResp.getSessionHandle) + tExecuteStatementReq.setRunAsync(true) + tExecuteStatementReq.setStatement("select 'tmp.hello'") + val tExecuteStatementResp = client.ExecuteStatement(tExecuteStatementReq) + val operationHandle = tExecuteStatementResp.getOperationHandle + waitForOperationToComplete(client, operationHandle) + val tFetchResultsReq = new TFetchResultsReq() + tFetchResultsReq.setOperationHandle(operationHandle) + tFetchResultsReq.setFetchType(2) + tFetchResultsReq.setMaxRows(1000) + val tFetchResultsResp = client.FetchResults(tFetchResultsReq) + assert(tFetchResultsResp.getResults.getColumns.get(0) + .getStringVal.getValues.get(0) === "tmp.hello") + } + } } diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkOperationSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkOperationSuite.scala index e28ae53e9..398849393 100644 --- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkOperationSuite.scala +++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkOperationSuite.scala @@ -323,9 +323,6 @@ class SparkOperationSuite extends WithSparkSQLEngine with HiveMetadataTests with test("basic open | execute | close") { withThriftClient { client => - val operationManager = - engine.backendService.sessionManager.operationManager.asInstanceOf[SparkSQLOperationManager] - val req = new TOpenSessionReq() req.setUsername("kentyao") req.setPassword("anonymous")