From 367339932964102bf3111d2f37f4f89975b28ea6 Mon Sep 17 00:00:00 2001 From: yanghua Date: Wed, 22 Dec 2021 22:55:43 +0800 Subject: [PATCH] [KYUUBI #1579] Implement basic ability of executing statement in Flink engine ### _Why are the changes needed?_ ### _How was this patch tested?_ - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [ ] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before make a pull request Closes #1603 from yanghua/KYUUBI-1579. Closes #1579 48db76b3 [Cheng Pan] cleanup 36707516 [Cheng Pan] Address comments 25ca5ae2 [yanghua] reduce code 6f18a4a0 [yanghua] [KYUUBI #1579] Implement basic ability of executing statement Lead-authored-by: yanghua Co-authored-by: Cheng Pan Signed-off-by: Cheng Pan --- externals/kyuubi-flink-sql-engine/pom.xml | 6 + .../flink/operation/ExecuteStatement.scala | 155 ++++++++++++++++++ .../flink/operation/FlinkOperation.scala | 15 +- .../operation/FlinkSQLOperationManager.scala | 5 +- .../kyuubi/engine/flink/schema/RowSet.scala | 6 + .../session/FlinkSQLSessionManager.scala | 17 +- .../flink/session/FlinkSessionImpl.scala | 5 + .../flink/operation/FlinkOperationSuite.scala | 61 ++++++- pom.xml | 20 +++ 9 files changed, 284 insertions(+), 6 deletions(-) create mode 100644 externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala diff --git a/externals/kyuubi-flink-sql-engine/pom.xml b/externals/kyuubi-flink-sql-engine/pom.xml index a4c7c0020..fd6f8b199 100644 --- a/externals/kyuubi-flink-sql-engine/pom.xml +++ b/externals/kyuubi-flink-sql-engine/pom.xml @@ -139,6 +139,12 @@ jul-to-slf4j test + + + org.apache.flink + flink-test-utils_${scala.binary.version} + test + diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala 
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala new file mode 100644 index 000000000..818d16843 --- /dev/null +++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.kyuubi.engine.flink.operation
+
+import java.util
+import java.util.concurrent.{RejectedExecutionException, ScheduledExecutorService, TimeUnit}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+import com.google.common.annotations.VisibleForTesting
+import org.apache.flink.table.client.gateway.{Executor, ResultDescriptor, TypedResult}
+import org.apache.flink.table.operations.QueryOperation
+import org.apache.flink.types.Row
+
+import org.apache.kyuubi.{KyuubiSQLException, Logging}
+import org.apache.kyuubi.engine.flink.result.{ColumnInfo, ResultKind, ResultSet}
+import org.apache.kyuubi.operation.{OperationState, OperationType}
+import org.apache.kyuubi.operation.log.OperationLog
+import org.apache.kyuubi.session.Session
+import org.apache.kyuubi.util.ThreadUtils
+
+/**
+ * Operation that executes one SQL statement through the Flink SQL client [[Executor]] and
+ * buffers the rows of the last result snapshot it reads into `resultSet`.
+ *
+ * @param session the session that owns this operation
+ * @param statement the SQL text handed to `executor.parseStatement`
+ * @param shouldRunAsync if true, the statement is executed by a task submitted through
+ *                       `session.sessionManager.submitBackgroundOperation`; otherwise it is
+ *                       executed on the thread that calls `runInternal`
+ * @param queryTimeout timeout in seconds; a non-positive value disables the timeout monitor
+ */
+class ExecuteStatement(
+    session: Session,
+    override val statement: String,
+    override val shouldRunAsync: Boolean,
+    queryTimeout: Long)
+  extends FlinkOperation(OperationType.EXECUTE_STATEMENT, session) with Logging {
+
+  private val operationLog: OperationLog =
+    OperationLog.createOperationLog(session, getHandle)
+
+  private var resultDescriptor: ResultDescriptor = _
+
+  private var columnInfos: util.List[ColumnInfo] = _
+
+  // Scheduler created by addTimeoutMonitor(); None when no timeout was requested.
+  private var statementTimeoutCleaner: Option[ScheduledExecutorService] = None
+
+  override def getOperationLog: Option[OperationLog] = Option(operationLog)
+
+  /** Replaces the executor used by this operation with the given one. */
+  @VisibleForTesting
+  override def setExecutor(executor: Executor): Unit = {
+    this.executor = executor
+  }
+
+  /** Replaces the session id that is passed to the executor calls of this operation. */
+  def setSessionId(sessionId: String): Unit = {
+    this.sessionId = sessionId
+  }
+
+  override protected def beforeRun(): Unit = {
+    OperationLog.setCurrentOperationLog(operationLog)
+    setState(OperationState.PENDING)
+    setHasResultSet(true)
+  }
+
+  override protected def afterRun(): Unit = {
+    OperationLog.removeCurrentOperationLog()
+  }
+
+  /**
+   * Starts the timeout monitor and then executes the statement, either inline or, when
+   * `shouldRunAsync` is set, inside a task submitted to the session manager.
+   *
+   * @throws KyuubiSQLException if the background task is rejected by the session manager
+   */
+  override protected def runInternal(): Unit = {
+    addTimeoutMonitor()
+    if (shouldRunAsync) {
+      val asyncOperation = new Runnable {
+        override def run(): Unit = {
+          OperationLog.setCurrentOperationLog(operationLog)
+          // The statement has to run inside the submitted task; calling it before the
+          // submission would block the caller and leave the background task with nothing to do.
+          executeStatement()
+        }
+      }
+
+      try {
+        val flinkSQLSessionManager = session.sessionManager
+        val backgroundHandle = flinkSQLSessionManager.submitBackgroundOperation(asyncOperation)
+        setBackgroundHandle(backgroundHandle)
+      } catch {
+        case rejected: RejectedExecutionException =>
+          // executeStatement() never runs in this case, so its finally block cannot release
+          // the timeout scheduler; shut it down here instead.
+          statementTimeoutCleaner.foreach(_.shutdown())
+          setState(OperationState.ERROR)
+          val ke =
+            KyuubiSQLException("Error submitting query in background, query rejected", rejected)
+          setOperationException(ke)
+          throw ke
+      }
+    } else {
+      executeStatement()
+    }
+  }
+
+  /**
+   * Parses and executes the statement, polls result snapshots until the executor reports
+   * end-of-stream, and stores the rows of the last snapshot read in `resultSet`.
+   * Failures are routed to `onError(cancel = true)`; the timeout scheduler, if any, is always
+   * shut down on exit.
+   */
+  private def executeStatement(): Unit = {
+    // Pause between two snapshot polls, in milliseconds.
+    val pollIntervalMs = 50L
+    // Page size passed to snapshotResult.
+    val snapshotPageSize = 2
+    try {
+      setState(OperationState.RUNNING)
+
+      columnInfos = new util.ArrayList[ColumnInfo]
+
+      val operation = executor.parseStatement(sessionId, statement)
+      // NOTE(review): the cast assumes the statement parses to a QueryOperation; any other
+      // statement kind fails here with a ClassCastException that goes to onError below.
+      resultDescriptor = executor.executeQuery(sessionId, operation.asInstanceOf[QueryOperation])
+      resultDescriptor.getResultSchema.getColumns.asScala.foreach { column =>
+        columnInfos.add(ColumnInfo.create(column.getName, column.getDataType.getLogicalType))
+      }
+
+      val resultID = resultDescriptor.getResultId
+
+      val rows = new ArrayBuffer[Row]()
+      var loop = true
+      while (loop) {
+        Thread.sleep(pollIntervalMs) // slow the processing down
+
+        val result = executor.snapshotResult(sessionId, resultID, snapshotPageSize)
+        result.getType match {
+          case TypedResult.ResultType.PAYLOAD =>
+            // The payload is the page count of the new snapshot: drop the rows of the
+            // previous snapshot and re-read every page.
+            rows.clear()
+            (1 to result.getPayload).foreach { page =>
+              rows ++= executor.retrieveResultPage(resultID, page).asScala
+            }
+          case TypedResult.ResultType.EOS => loop = false
+          case TypedResult.ResultType.EMPTY =>
+        }
+      }
+
+      resultSet = ResultSet.builder
+        .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
+        .columns(columnInfos)
+        .data(rows.toArray[Row])
+        .build
+      setState(OperationState.FINISHED)
+    } catch {
+      onError(cancel = true)
+    } finally {
+      statementTimeoutCleaner.foreach(_.shutdown())
+    }
+  }
+
+  /**
+   * When `queryTimeout` is positive, schedules `cleanup(OperationState.TIMEOUT)` to run after
+   * `queryTimeout` seconds on a dedicated daemon scheduler and remembers that scheduler in
+   * `statementTimeoutCleaner` so it can be shut down later.
+   */
+  private def addTimeoutMonitor(): Unit = {
+    if (queryTimeout > 0) {
+      val timeoutExecutor =
+        ThreadUtils.newDaemonSingleThreadScheduledExecutor("query-timeout-thread")
+      val action: Runnable = () => cleanup(OperationState.TIMEOUT)
+      timeoutExecutor.schedule(action, queryTimeout, TimeUnit.SECONDS)
+      statementTimeoutCleaner = Some(timeoutExecutor)
+    }
+  }
+}
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala
index 2c61ab4c5..f6780a239 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala
@@ -21,6 +21,7 @@ import java.io.IOException
 
 import scala.collection.JavaConverters.collectionAsScalaIterableConverter
 
+import org.apache.flink.table.client.gateway.Executor
 import org.apache.flink.table.client.gateway.context.SessionContext
 import org.apache.hive.service.rpc.thrift.{TRowSet, TTableSchema}
 
@@ -40,8 +41,20 @@ abstract class FlinkOperation(
     session: Session)
   extends AbstractOperation(opType, session) {
 
-  protected val sessionContext: SessionContext =
+  protected val sessionContext: SessionContext = {
     session.asInstanceOf[FlinkSessionImpl].getSessionContext
+  }
+
+  protected var executor: Executor = _
+
+  protected def setExecutor(executor: Executor): Unit = {
+    this.executor = session.asInstanceOf[FlinkSessionImpl].getExecutor
+  }
+
+  protected var sessionId: String = {
+    session.asInstanceOf[FlinkSessionImpl].getSessionId
+  }
+
   protected var resultSet: ResultSet = _
 
   override protected def beforeRun(): Unit = {
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala
index df839492b..390817ad2
100644 --- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala +++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala @@ -28,7 +28,10 @@ class FlinkSQLOperationManager extends OperationManager("FlinkSQLOperationManage session: Session, statement: String, runAsync: Boolean, - queryTimeout: Long): Operation = null + queryTimeout: Long): Operation = { + val op = new ExecuteStatement(session, statement, runAsync, queryTimeout) + addOperation(op) + } override def newGetTypeInfoOperation(session: Session): Operation = null diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/schema/RowSet.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/schema/RowSet.scala index 069ade71e..3439805c0 100644 --- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/schema/RowSet.scala +++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/schema/RowSet.scala @@ -113,6 +113,12 @@ object RowSet { tStringValue.setValue(row.getField(ordinal).asInstanceOf[String]) } TColumnValue.stringVal(tStringValue) + } else if (logicalType.isInstanceOf[CharType]) { + val tStringValue = new TStringValue + if (row.getField(ordinal) != null) { + tStringValue.setValue(row.getField(ordinal).asInstanceOf[String]) + } + TColumnValue.stringVal(tStringValue) } else { val tStrValue = new TStringValue if (row.getField(ordinal) != null) { diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala index 75a1b03bd..1ffc31138 100644 --- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala +++ 
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala @@ -17,7 +17,9 @@ package org.apache.kyuubi.engine.flink.session +import org.apache.flink.table.client.gateway.Executor import org.apache.flink.table.client.gateway.context.DefaultContext +import org.apache.flink.table.client.gateway.local.LocalExecutor import org.apache.hive.service.rpc.thrift.TProtocolVersion import org.apache.kyuubi.engine.flink.operation.FlinkSQLOperationManager @@ -29,13 +31,24 @@ class FlinkSQLSessionManager(engineContext: DefaultContext) override protected def isServer: Boolean = false val operationManager = new FlinkSQLOperationManager() + val executor: Executor = new LocalExecutor(engineContext) + + override def start(): Unit = { + super.start() + executor.start() + } override def openSession( protocol: TProtocolVersion, user: String, password: String, ipAddress: String, - conf: Map[String, String]): SessionHandle = null + conf: Map[String, String]): SessionHandle = { + executor.openSession("") + null + } - override def closeSession(sessionHandle: SessionHandle): Unit = {} + override def closeSession(sessionHandle: SessionHandle): Unit = { + executor.closeSession(sessionHandle.toString) + } } diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala index ccb3ec927..fe97cda51 100644 --- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala +++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala @@ -17,6 +17,7 @@ package org.apache.kyuubi.engine.flink.session +import org.apache.flink.table.client.gateway.Executor import org.apache.flink.table.client.gateway.context.SessionContext import 
org.apache.hive.service.rpc.thrift.TProtocolVersion @@ -36,4 +37,8 @@ class FlinkSessionImpl( def getSessionContext: SessionContext = sessionContext + def getExecutor: Executor = sessionManager.asInstanceOf[FlinkSQLSessionManager].executor + + def getSessionId: String = handle.toString + } diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala index ce538ec13..f58104319 100644 --- a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala +++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala @@ -17,11 +17,17 @@ package org.apache.kyuubi.engine.flink.operation +import java.net.URL +import java.util import java.util.Collections import org.apache.flink.client.cli.DefaultCLI -import org.apache.flink.configuration.Configuration +import org.apache.flink.client.program.ClusterClient +import org.apache.flink.configuration.{ConfigConstants, Configuration, MemorySize, TaskManagerOptions, WebOptions} +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration import org.apache.flink.table.client.gateway.context.{DefaultContext, SessionContext} +import org.apache.flink.table.client.gateway.local.LocalExecutor +import org.apache.flink.test.util.MiniClusterWithClientResource import org.apache.hive.service.rpc.thrift.TProtocolVersion import org.apache.kyuubi.{KyuubiFunSuite, Utils} @@ -34,6 +40,27 @@ class FlinkOperationSuite extends KyuubiFunSuite { val user: String = Utils.currentUser val password = "anonymous" + val NUM_TMS = 2 + val NUM_SLOTS_PER_TM = 2 + + private def getConfig = { + val config = new Configuration + config.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.parse("4m")) + 
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS) + config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, NUM_SLOTS_PER_TM) + config.setBoolean(WebOptions.SUBMIT_ENABLE, false) + config + } + + val MINI_CLUSTER_RESOURCE = + new MiniClusterWithClientResource( + new MiniClusterResourceConfiguration.Builder() + .setConfiguration(getConfig) + .setNumberTaskManagers(NUM_TMS) + .setNumberSlotsPerTaskManager(NUM_SLOTS_PER_TM).build) + + var clusterClient: ClusterClient[_] = _ + var engineContext = new DefaultContext( Collections.emptyList(), new Configuration, @@ -41,7 +68,24 @@ class FlinkOperationSuite extends KyuubiFunSuite { var sessionContext: SessionContext = _ var flinkSession: FlinkSessionImpl = _ + private def createLocalExecutor: LocalExecutor = + createLocalExecutor(Collections.emptyList[URL], new Configuration) + + private def createLocalExecutor( + dependencies: util.List[URL], + configuration: Configuration): LocalExecutor = { + configuration.addAll(clusterClient.getFlinkConfiguration) + val defaultContext: DefaultContext = new DefaultContext( + dependencies, + configuration, + Collections.singletonList(new DefaultCLI)) + new LocalExecutor(defaultContext) + } + override def beforeAll(): Unit = { + MINI_CLUSTER_RESOURCE.before() + clusterClient = MINI_CLUSTER_RESOURCE.getClusterClient + sessionContext = SessionContext.create(engineContext, "test-session-id"); val flinkSQLSessionManager = new FlinkSQLSessionManager(engineContext) flinkSQLSessionManager.initialize(KyuubiConf()) @@ -66,7 +110,20 @@ class FlinkOperationSuite extends KyuubiFunSuite { val resultSet = getCatalogOperation.getNextRowSet(FetchOrientation.FETCH_FIRST, 10) assert(1 == resultSet.getRowsSize) - assert(resultSet.getRows.get(0).getColVals().get(0).getStringVal.getValue === "default_catalog") + assert(resultSet.getRows.get(0).getColVals.get(0).getStringVal.getValue === "default_catalog") + } + + test("execute statement - select column name with dots") { + val 
executeStatementOp = new ExecuteStatement(flinkSession, "select 'tmp.hello'", false, -1) + val executor = createLocalExecutor + executor.openSession("test-session") + executeStatementOp.setExecutor(executor) + executeStatementOp.setSessionId("test-session") + executeStatementOp.run() + + val resultSet = executeStatementOp.getNextRowSet(FetchOrientation.FETCH_FIRST, 10) + assert(1 == resultSet.getRowsSize) + assert(resultSet.getRows.get(0).getColVals.get(0).getStringVal.getValue === "tmp.hello") } } diff --git a/pom.xml b/pom.xml index 4540f2355..ad8fd0913 100644 --- a/pom.xml +++ b/pom.xml @@ -1136,6 +1136,26 @@ flink-sql-client_${scala.binary.version} ${flink.version} + + + org.apache.flink + flink-test-utils_${scala.binary.version} + ${flink.version} + + + org.apache.logging.log4j + log4j-api + + + org.apache.logging.log4j + log4j-core + + + org.apache.logging.log4j + log4j-slf4j-impl + + +