diff --git a/externals/kyuubi-flink-sql-engine/pom.xml b/externals/kyuubi-flink-sql-engine/pom.xml
index a4c7c0020..fd6f8b199 100644
--- a/externals/kyuubi-flink-sql-engine/pom.xml
+++ b/externals/kyuubi-flink-sql-engine/pom.xml
@@ -139,6 +139,12 @@
jul-to-slf4j
test
+
+
+ org.apache.flink
+ flink-test-utils_${scala.binary.version}
+ test
+
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
new file mode 100644
index 000000000..818d16843
--- /dev/null
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink.operation
+
+import java.util
+import java.util.concurrent.{RejectedExecutionException, ScheduledExecutorService, TimeUnit}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+import com.google.common.annotations.VisibleForTesting
+import org.apache.flink.table.client.gateway.{Executor, ResultDescriptor, TypedResult}
+import org.apache.flink.table.operations.QueryOperation
+import org.apache.flink.types.Row
+
+import org.apache.kyuubi.{KyuubiSQLException, Logging}
+import org.apache.kyuubi.engine.flink.result.{ColumnInfo, ResultKind, ResultSet}
+import org.apache.kyuubi.operation.{OperationState, OperationType}
+import org.apache.kyuubi.operation.log.OperationLog
+import org.apache.kyuubi.session.Session
+import org.apache.kyuubi.util.ThreadUtils
+
+class ExecuteStatement(
+ session: Session,
+ override val statement: String,
+ override val shouldRunAsync: Boolean,
+ queryTimeout: Long)
+ extends FlinkOperation(OperationType.EXECUTE_STATEMENT, session) with Logging {
+
+ private val operationLog: OperationLog =
+ OperationLog.createOperationLog(session, getHandle)
+
+ private var resultDescriptor: ResultDescriptor = _
+
+ private var columnInfos: util.List[ColumnInfo] = _
+
+ private var statementTimeoutCleaner: Option[ScheduledExecutorService] = None
+
+ override def getOperationLog: Option[OperationLog] = Option(operationLog)
+
+ @VisibleForTesting
+ override def setExecutor(executor: Executor): Unit = {
+ this.executor = executor
+ }
+
+ def setSessionId(sessionId: String): Unit = {
+ this.sessionId = sessionId
+ }
+
+ override protected def beforeRun(): Unit = {
+ OperationLog.setCurrentOperationLog(operationLog)
+ setState(OperationState.PENDING)
+ setHasResultSet(true)
+ }
+
+ override protected def afterRun(): Unit = {
+ OperationLog.removeCurrentOperationLog()
+ }
+
+ override protected def runInternal(): Unit = {
+ addTimeoutMonitor()
+ if (shouldRunAsync) {
+ val asyncOperation = new Runnable {
+ override def run(): Unit = {
+ OperationLog.setCurrentOperationLog(operationLog)
+ }
+ }
+
+ try {
+ executeStatement()
+ val flinkSQLSessionManager = session.sessionManager
+ val backgroundHandle = flinkSQLSessionManager.submitBackgroundOperation(asyncOperation)
+ setBackgroundHandle(backgroundHandle)
+ } catch {
+ case rejected: RejectedExecutionException =>
+ setState(OperationState.ERROR)
+ val ke =
+ KyuubiSQLException("Error submitting query in background, query rejected", rejected)
+ setOperationException(ke)
+ throw ke
+ }
+ } else {
+ executeStatement()
+ }
+ }
+
+ private def executeStatement(): Unit = {
+ try {
+ setState(OperationState.RUNNING)
+
+ columnInfos = new util.ArrayList[ColumnInfo]
+
+ val operation = executor.parseStatement(sessionId, statement)
+ resultDescriptor = executor.executeQuery(sessionId, operation.asInstanceOf[QueryOperation])
+ resultDescriptor.getResultSchema.getColumns.asScala.foreach { column =>
+ columnInfos.add(ColumnInfo.create(column.getName, column.getDataType.getLogicalType))
+ }
+
+ val resultID = resultDescriptor.getResultId
+
+ val rows = new ArrayBuffer[Row]()
+ var loop = true
+ while (loop) {
+ Thread.sleep(50) // slow the processing down
+
+ val result = executor.snapshotResult(sessionId, resultID, 2)
+ result.getType match {
+ case TypedResult.ResultType.PAYLOAD =>
+ rows.clear()
+ (1 to result.getPayload).foreach { page =>
+ rows ++= executor.retrieveResultPage(resultID, page).asScala
+ }
+ case TypedResult.ResultType.EOS => loop = false
+ case TypedResult.ResultType.EMPTY =>
+ }
+ }
+
+ resultSet = ResultSet.builder
+ .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
+ .columns(columnInfos)
+ .data(rows.toArray[Row])
+ .build
+ setState(OperationState.FINISHED)
+ } catch {
+ onError(cancel = true)
+ } finally {
+ statementTimeoutCleaner.foreach(_.shutdown())
+ }
+ }
+
+ private def addTimeoutMonitor(): Unit = {
+ if (queryTimeout > 0) {
+ val timeoutExecutor =
+ ThreadUtils.newDaemonSingleThreadScheduledExecutor("query-timeout-thread")
+ val action: Runnable = () => cleanup(OperationState.TIMEOUT)
+ timeoutExecutor.schedule(action, queryTimeout, TimeUnit.SECONDS)
+ statementTimeoutCleaner = Some(timeoutExecutor)
+ }
+ }
+}
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala
index 2c61ab4c5..f6780a239 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala
@@ -21,6 +21,7 @@ import java.io.IOException
import scala.collection.JavaConverters.collectionAsScalaIterableConverter
+import org.apache.flink.table.client.gateway.Executor
import org.apache.flink.table.client.gateway.context.SessionContext
import org.apache.hive.service.rpc.thrift.{TRowSet, TTableSchema}
@@ -40,8 +41,20 @@ abstract class FlinkOperation(
session: Session)
extends AbstractOperation(opType, session) {
- protected val sessionContext: SessionContext =
+ protected val sessionContext: SessionContext = {
session.asInstanceOf[FlinkSessionImpl].getSessionContext
+ }
+
+ protected var executor: Executor = _
+
+ protected def setExecutor(executor: Executor): Unit = {
+ this.executor = session.asInstanceOf[FlinkSessionImpl].getExecutor
+ }
+
+ protected var sessionId: String = {
+ session.asInstanceOf[FlinkSessionImpl].getSessionId
+ }
+
protected var resultSet: ResultSet = _
override protected def beforeRun(): Unit = {
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala
index df839492b..390817ad2 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala
@@ -28,7 +28,10 @@ class FlinkSQLOperationManager extends OperationManager("FlinkSQLOperationManage
session: Session,
statement: String,
runAsync: Boolean,
- queryTimeout: Long): Operation = null
+ queryTimeout: Long): Operation = {
+ val op = new ExecuteStatement(session, statement, runAsync, queryTimeout)
+ addOperation(op)
+ }
override def newGetTypeInfoOperation(session: Session): Operation = null
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/schema/RowSet.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/schema/RowSet.scala
index 069ade71e..3439805c0 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/schema/RowSet.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/schema/RowSet.scala
@@ -113,6 +113,12 @@ object RowSet {
tStringValue.setValue(row.getField(ordinal).asInstanceOf[String])
}
TColumnValue.stringVal(tStringValue)
+ } else if (logicalType.isInstanceOf[CharType]) {
+ val tStringValue = new TStringValue
+ if (row.getField(ordinal) != null) {
+ tStringValue.setValue(row.getField(ordinal).asInstanceOf[String])
+ }
+ TColumnValue.stringVal(tStringValue)
} else {
val tStrValue = new TStringValue
if (row.getField(ordinal) != null) {
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala
index 75a1b03bd..1ffc31138 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala
@@ -17,7 +17,9 @@
package org.apache.kyuubi.engine.flink.session
+import org.apache.flink.table.client.gateway.Executor
import org.apache.flink.table.client.gateway.context.DefaultContext
+import org.apache.flink.table.client.gateway.local.LocalExecutor
import org.apache.hive.service.rpc.thrift.TProtocolVersion
import org.apache.kyuubi.engine.flink.operation.FlinkSQLOperationManager
@@ -29,13 +31,24 @@ class FlinkSQLSessionManager(engineContext: DefaultContext)
override protected def isServer: Boolean = false
val operationManager = new FlinkSQLOperationManager()
+ val executor: Executor = new LocalExecutor(engineContext)
+
+ override def start(): Unit = {
+ super.start()
+ executor.start()
+ }
override def openSession(
protocol: TProtocolVersion,
user: String,
password: String,
ipAddress: String,
- conf: Map[String, String]): SessionHandle = null
+ conf: Map[String, String]): SessionHandle = {
+ executor.openSession("")
+ null
+ }
- override def closeSession(sessionHandle: SessionHandle): Unit = {}
+ override def closeSession(sessionHandle: SessionHandle): Unit = {
+ executor.closeSession(sessionHandle.toString)
+ }
}
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala
index ccb3ec927..fe97cda51 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala
@@ -17,6 +17,7 @@
package org.apache.kyuubi.engine.flink.session
+import org.apache.flink.table.client.gateway.Executor
import org.apache.flink.table.client.gateway.context.SessionContext
import org.apache.hive.service.rpc.thrift.TProtocolVersion
@@ -36,4 +37,8 @@ class FlinkSessionImpl(
def getSessionContext: SessionContext = sessionContext
+ def getExecutor: Executor = sessionManager.asInstanceOf[FlinkSQLSessionManager].executor
+
+ def getSessionId: String = handle.toString
+
}
diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
index ce538ec13..f58104319 100644
--- a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
+++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
@@ -17,11 +17,17 @@
package org.apache.kyuubi.engine.flink.operation
+import java.net.URL
+import java.util
import java.util.Collections
import org.apache.flink.client.cli.DefaultCLI
-import org.apache.flink.configuration.Configuration
+import org.apache.flink.client.program.ClusterClient
+import org.apache.flink.configuration.{ConfigConstants, Configuration, MemorySize, TaskManagerOptions, WebOptions}
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration
import org.apache.flink.table.client.gateway.context.{DefaultContext, SessionContext}
+import org.apache.flink.table.client.gateway.local.LocalExecutor
+import org.apache.flink.test.util.MiniClusterWithClientResource
import org.apache.hive.service.rpc.thrift.TProtocolVersion
import org.apache.kyuubi.{KyuubiFunSuite, Utils}
@@ -34,6 +40,27 @@ class FlinkOperationSuite extends KyuubiFunSuite {
val user: String = Utils.currentUser
val password = "anonymous"
+ val NUM_TMS = 2
+ val NUM_SLOTS_PER_TM = 2
+
+ private def getConfig = {
+ val config = new Configuration
+ config.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.parse("4m"))
+ config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS)
+ config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, NUM_SLOTS_PER_TM)
+ config.setBoolean(WebOptions.SUBMIT_ENABLE, false)
+ config
+ }
+
+ val MINI_CLUSTER_RESOURCE =
+ new MiniClusterWithClientResource(
+ new MiniClusterResourceConfiguration.Builder()
+ .setConfiguration(getConfig)
+ .setNumberTaskManagers(NUM_TMS)
+ .setNumberSlotsPerTaskManager(NUM_SLOTS_PER_TM).build)
+
+ var clusterClient: ClusterClient[_] = _
+
var engineContext = new DefaultContext(
Collections.emptyList(),
new Configuration,
@@ -41,7 +68,24 @@ class FlinkOperationSuite extends KyuubiFunSuite {
var sessionContext: SessionContext = _
var flinkSession: FlinkSessionImpl = _
+ private def createLocalExecutor: LocalExecutor =
+ createLocalExecutor(Collections.emptyList[URL], new Configuration)
+
+ private def createLocalExecutor(
+ dependencies: util.List[URL],
+ configuration: Configuration): LocalExecutor = {
+ configuration.addAll(clusterClient.getFlinkConfiguration)
+ val defaultContext: DefaultContext = new DefaultContext(
+ dependencies,
+ configuration,
+ Collections.singletonList(new DefaultCLI))
+ new LocalExecutor(defaultContext)
+ }
+
override def beforeAll(): Unit = {
+ MINI_CLUSTER_RESOURCE.before()
+ clusterClient = MINI_CLUSTER_RESOURCE.getClusterClient
+
sessionContext = SessionContext.create(engineContext, "test-session-id");
val flinkSQLSessionManager = new FlinkSQLSessionManager(engineContext)
flinkSQLSessionManager.initialize(KyuubiConf())
@@ -66,7 +110,20 @@ class FlinkOperationSuite extends KyuubiFunSuite {
val resultSet = getCatalogOperation.getNextRowSet(FetchOrientation.FETCH_FIRST, 10)
assert(1 == resultSet.getRowsSize)
- assert(resultSet.getRows.get(0).getColVals().get(0).getStringVal.getValue === "default_catalog")
+ assert(resultSet.getRows.get(0).getColVals.get(0).getStringVal.getValue === "default_catalog")
+ }
+
+ test("execute statement - select column name with dots") {
+ val executeStatementOp = new ExecuteStatement(flinkSession, "select 'tmp.hello'", false, -1)
+ val executor = createLocalExecutor
+ executor.openSession("test-session")
+ executeStatementOp.setExecutor(executor)
+ executeStatementOp.setSessionId("test-session")
+ executeStatementOp.run()
+
+ val resultSet = executeStatementOp.getNextRowSet(FetchOrientation.FETCH_FIRST, 10)
+ assert(1 == resultSet.getRowsSize)
+ assert(resultSet.getRows.get(0).getColVals.get(0).getStringVal.getValue === "tmp.hello")
}
}
diff --git a/pom.xml b/pom.xml
index 4540f2355..ad8fd0913 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1136,6 +1136,26 @@
flink-sql-client_${scala.binary.version}
${flink.version}
+
+
+ org.apache.flink
+ flink-test-utils_${scala.binary.version}
+ ${flink.version}
+
+
+ org.apache.logging.log4j
+ log4j-api
+
+
+ org.apache.logging.log4j
+ log4j-core
+
+
+ org.apache.logging.log4j
+ log4j-slf4j-impl
+
+
+