[KYUUBI #1829] Support executing statement async for Flink SQL engine
<!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html 2. If the PR is related to an issue in https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'. 3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'. --> ### _Why are the changes needed?_ Support executing statement async for Flink SQL engine. ### _How was this patch tested?_ - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request Closes #1845 from SteNicholas/KYUUBI-1829. Closes #1829 956739c1 [SteNicholas] [KYUUBI #1829] Support executing statement async for Flink SQL engine 4ccf4b4a [SteNicholas] [KYUUBI #1829] Support executing statement async for Flink SQL engine 616ddd2f [SteNicholas] [KYUUBI #1829] Support executing statement async for Flink SQL engine Authored-by: SteNicholas <programgeek@163.com> Signed-off-by: yanghua <yanghua1127@gmail.com>
This commit is contained in:
parent
cebc03af1a
commit
34043ae41b
@ -24,6 +24,7 @@ import scala.collection.JavaConverters._
|
||||
import scala.collection.mutable.ArrayBuffer
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting
|
||||
import org.apache.calcite.rel.metadata.{DefaultRelMetadataProvider, JaninoRelMetadataProvider, RelMetadataQueryBase}
|
||||
import org.apache.flink.table.api.{DataTypes, ResultKind}
|
||||
import org.apache.flink.table.catalog.Column
|
||||
import org.apache.flink.table.client.gateway.{Executor, TypedResult}
|
||||
@ -75,11 +76,11 @@ class ExecuteStatement(
|
||||
val asyncOperation = new Runnable {
|
||||
override def run(): Unit = {
|
||||
OperationLog.setCurrentOperationLog(operationLog)
|
||||
executeStatement()
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
executeStatement()
|
||||
val flinkSQLSessionManager = session.sessionManager
|
||||
val backgroundHandle = flinkSQLSessionManager.submitBackgroundOperation(asyncOperation)
|
||||
setBackgroundHandle(backgroundHandle)
|
||||
@ -100,6 +101,9 @@ class ExecuteStatement(
|
||||
try {
|
||||
setState(OperationState.RUNNING)
|
||||
|
||||
// set the thread variable THREAD_PROVIDERS
|
||||
RelMetadataQueryBase.THREAD_PROVIDERS.set(
|
||||
JaninoRelMetadataProvider.of(DefaultRelMetadataProvider.INSTANCE))
|
||||
val operation = executor.parseStatement(sessionId, statement)
|
||||
operation match {
|
||||
case queryOperation: QueryOperation => runQueryOperation(queryOperation)
|
||||
|
||||
@ -114,9 +114,7 @@ abstract class FlinkOperation(
|
||||
protected def cleanup(targetState: OperationState): Unit = state.synchronized {
|
||||
if (!isTerminalState(state)) {
|
||||
setState(targetState)
|
||||
if (shouldRunAsync) {
|
||||
Option(getBackgroundHandle).foreach(_.cancel(true))
|
||||
}
|
||||
Option(getBackgroundHandle).foreach(_.cancel(true))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -22,6 +22,7 @@ import java.sql.DatabaseMetaData
|
||||
import org.apache.flink.table.api.EnvironmentSettings.DEFAULT_BUILTIN_CATALOG
|
||||
import org.apache.flink.table.api.EnvironmentSettings.DEFAULT_BUILTIN_DATABASE
|
||||
import org.apache.flink.table.types.logical.LogicalTypeRoot
|
||||
import org.apache.hive.service.rpc.thrift.{TExecuteStatementReq, TFetchResultsReq, TOpenSessionReq}
|
||||
import org.scalatest.concurrent.PatienceConfiguration.Timeout
|
||||
import org.scalatest.time.SpanSugar._
|
||||
|
||||
@ -35,7 +36,7 @@ class FlinkOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper {
|
||||
override def withKyuubiConf: Map[String, String] = Map()
|
||||
|
||||
override protected def jdbcUrl: String =
|
||||
s"jdbc:hive2://${engine.frontendServices.head.connectionUrl}/"
|
||||
s"jdbc:hive2://${engine.frontendServices.head.connectionUrl}/;"
|
||||
|
||||
ignore("release session if shared level is CONNECTION") {
|
||||
logger.info(s"jdbc url is $jdbcUrl")
|
||||
@ -734,4 +735,27 @@ class FlinkOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper {
|
||||
assert(resultSet.getString(1) === "a")
|
||||
}
|
||||
}
|
||||
|
||||
test("async execute statement - select column name with dots") {
|
||||
withThriftClient { client =>
|
||||
val tOpenSessionReq = new TOpenSessionReq()
|
||||
tOpenSessionReq.setUsername("kentyao")
|
||||
tOpenSessionReq.setPassword("anonymous")
|
||||
val tOpenSessionResp = client.OpenSession(tOpenSessionReq)
|
||||
val tExecuteStatementReq = new TExecuteStatementReq()
|
||||
tExecuteStatementReq.setSessionHandle(tOpenSessionResp.getSessionHandle)
|
||||
tExecuteStatementReq.setRunAsync(true)
|
||||
tExecuteStatementReq.setStatement("select 'tmp.hello'")
|
||||
val tExecuteStatementResp = client.ExecuteStatement(tExecuteStatementReq)
|
||||
val operationHandle = tExecuteStatementResp.getOperationHandle
|
||||
waitForOperationToComplete(client, operationHandle)
|
||||
val tFetchResultsReq = new TFetchResultsReq()
|
||||
tFetchResultsReq.setOperationHandle(operationHandle)
|
||||
tFetchResultsReq.setFetchType(2)
|
||||
tFetchResultsReq.setMaxRows(1000)
|
||||
val tFetchResultsResp = client.FetchResults(tFetchResultsReq)
|
||||
assert(tFetchResultsResp.getResults.getColumns.get(0)
|
||||
.getStringVal.getValues.get(0) === "tmp.hello")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -323,9 +323,6 @@ class SparkOperationSuite extends WithSparkSQLEngine with HiveMetadataTests with
|
||||
|
||||
test("basic open | execute | close") {
|
||||
withThriftClient { client =>
|
||||
val operationManager =
|
||||
engine.backendService.sessionManager.operationManager.asInstanceOf[SparkSQLOperationManager]
|
||||
|
||||
val req = new TOpenSessionReq()
|
||||
req.setUsername("kentyao")
|
||||
req.setPassword("anonymous")
|
||||
|
||||
Loading…
Reference in New Issue
Block a user