[KYUUBI #1579] Implement basic ability of executing statement in Flink engine

<!--
Thanks for sending a pull request!

Here are some tips for you:
  1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html
  2. If the PR is related to an issue in https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
  3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'.
-->

### _Why are the changes needed?_
<!--
Please clarify why the changes are needed. For instance,
  1. If you add a feature, you can talk about the use case of it.
  2. If you fix a bug, you can clarify why it is a bug.
-->

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [ ] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before making a pull request

Closes #1603 from yanghua/KYUUBI-1579.

Closes #1579

48db76b3 [Cheng Pan] cleanup
36707516 [Cheng Pan] Address comments
25ca5ae2 [yanghua] reduce code
6f18a4a0 [yanghua] [KYUUBI #1579] Implement basic ability of executing statement

Lead-authored-by: yanghua <yanghua1127@gmail.com>
Co-authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: Cheng Pan <chengpan@apache.org>
This commit is contained in:
yanghua 2021-12-22 22:55:43 +08:00 committed by Cheng Pan
parent c972139b51
commit 3673399329
No known key found for this signature in database
GPG Key ID: 8001952629BCC75D
9 changed files with 284 additions and 6 deletions

View File

@ -139,6 +139,12 @@
<artifactId>jul-to-slf4j</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,155 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.flink.operation
import java.util
import java.util.concurrent.{RejectedExecutionException, ScheduledExecutorService, TimeUnit}
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import com.google.common.annotations.VisibleForTesting
import org.apache.flink.table.client.gateway.{Executor, ResultDescriptor, TypedResult}
import org.apache.flink.table.operations.QueryOperation
import org.apache.flink.types.Row
import org.apache.kyuubi.{KyuubiSQLException, Logging}
import org.apache.kyuubi.engine.flink.result.{ColumnInfo, ResultKind, ResultSet}
import org.apache.kyuubi.operation.{OperationState, OperationType}
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.session.Session
import org.apache.kyuubi.util.ThreadUtils
/**
 * A Flink engine operation that executes a single SQL query statement.
 *
 * The statement is parsed and executed through the Flink SQL client gateway
 * [[Executor]]; result rows are polled via `snapshotResult` until the stream
 * reaches EOS, then exposed to clients as a Kyuubi [[ResultSet]].
 *
 * @param session        owning Kyuubi session
 * @param statement      the SQL text to execute
 * @param shouldRunAsync when true, execution is submitted to the session
 *                       manager's background pool instead of blocking the caller
 * @param queryTimeout   timeout in seconds; <= 0 disables the timeout monitor
 */
class ExecuteStatement(
    session: Session,
    override val statement: String,
    override val shouldRunAsync: Boolean,
    queryTimeout: Long)
  extends FlinkOperation(OperationType.EXECUTE_STATEMENT, session) with Logging {

  private val operationLog: OperationLog =
    OperationLog.createOperationLog(session, getHandle)

  // Set by executeStatement(); describes the running Flink query's result.
  private var resultDescriptor: ResultDescriptor = _

  // Column metadata of the result, derived from the result schema.
  private var columnInfos: util.List[ColumnInfo] = _

  // Single-thread scheduler that cancels this operation once queryTimeout
  // elapses; None when no timeout was requested.
  private var statementTimeoutCleaner: Option[ScheduledExecutorService] = None

  override def getOperationLog: Option[OperationLog] = Option(operationLog)

  @VisibleForTesting
  override def setExecutor(executor: Executor): Unit = {
    this.executor = executor
  }

  def setSessionId(sessionId: String): Unit = {
    this.sessionId = sessionId
  }

  override protected def beforeRun(): Unit = {
    OperationLog.setCurrentOperationLog(operationLog)
    setState(OperationState.PENDING)
    setHasResultSet(true)
  }

  override protected def afterRun(): Unit = {
    OperationLog.removeCurrentOperationLog()
  }

  override protected def runInternal(): Unit = {
    addTimeoutMonitor()
    if (shouldRunAsync) {
      val asyncOperation = new Runnable {
        override def run(): Unit = {
          // Attach the operation log to the background thread, then execute.
          OperationLog.setCurrentOperationLog(operationLog)
          // FIX: the statement must run inside the background task. The
          // original code invoked executeStatement() on the calling thread
          // before submitting, so the async path was effectively synchronous
          // and the submitted Runnable did nothing.
          executeStatement()
        }
      }
      try {
        val flinkSQLSessionManager = session.sessionManager
        val backgroundHandle = flinkSQLSessionManager.submitBackgroundOperation(asyncOperation)
        setBackgroundHandle(backgroundHandle)
      } catch {
        case rejected: RejectedExecutionException =>
          setState(OperationState.ERROR)
          val ke =
            KyuubiSQLException("Error submitting query in background, query rejected", rejected)
          setOperationException(ke)
          throw ke
      }
    } else {
      executeStatement()
    }
  }

  /**
   * Parses and runs the statement, polling snapshots of the result until the
   * stream signals EOS, then materializes the rows into `resultSet`.
   */
  private def executeStatement(): Unit = {
    try {
      setState(OperationState.RUNNING)
      columnInfos = new util.ArrayList[ColumnInfo]
      val operation = executor.parseStatement(sessionId, statement)
      resultDescriptor = executor.executeQuery(sessionId, operation.asInstanceOf[QueryOperation])
      resultDescriptor.getResultSchema.getColumns.asScala.foreach { column =>
        columnInfos.add(ColumnInfo.create(column.getName, column.getDataType.getLogicalType))
      }
      val resultID = resultDescriptor.getResultId
      val rows = new ArrayBuffer[Row]()
      var loop = true
      while (loop) {
        Thread.sleep(50) // slow the processing down
        val result = executor.snapshotResult(sessionId, resultID, 2)
        result.getType match {
          case TypedResult.ResultType.PAYLOAD =>
            // A PAYLOAD snapshot replaces any previously fetched pages.
            rows.clear()
            (1 to result.getPayload).foreach { page =>
              rows ++= executor.retrieveResultPage(resultID, page).asScala
            }
          case TypedResult.ResultType.EOS => loop = false
          case TypedResult.ResultType.EMPTY => // no data yet; keep polling
        }
      }
      resultSet = ResultSet.builder
        .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
        .columns(columnInfos)
        .data(rows.toArray[Row])
        .build
      setState(OperationState.FINISHED)
    } catch {
      // onError(cancel = true) yields the error-handling PartialFunction
      // defined on the base operation class.
      onError(cancel = true)
    } finally {
      statementTimeoutCleaner.foreach(_.shutdown())
    }
  }

  /**
   * Schedules a one-shot cleanup that moves the operation to TIMEOUT state
   * after `queryTimeout` seconds. No-op when queryTimeout <= 0.
   */
  private def addTimeoutMonitor(): Unit = {
    if (queryTimeout > 0) {
      val timeoutExecutor =
        ThreadUtils.newDaemonSingleThreadScheduledExecutor("query-timeout-thread")
      val action: Runnable = () => cleanup(OperationState.TIMEOUT)
      timeoutExecutor.schedule(action, queryTimeout, TimeUnit.SECONDS)
      statementTimeoutCleaner = Some(timeoutExecutor)
    }
  }
}

View File

@ -21,6 +21,7 @@ import java.io.IOException
import scala.collection.JavaConverters.collectionAsScalaIterableConverter
import org.apache.flink.table.client.gateway.Executor
import org.apache.flink.table.client.gateway.context.SessionContext
import org.apache.hive.service.rpc.thrift.{TRowSet, TTableSchema}
@ -40,8 +41,20 @@ abstract class FlinkOperation(
session: Session)
extends AbstractOperation(opType, session) {
protected val sessionContext: SessionContext =
protected val sessionContext: SessionContext = {
session.asInstanceOf[FlinkSessionImpl].getSessionContext
}
protected var executor: Executor = _
protected def setExecutor(executor: Executor): Unit = {
this.executor = session.asInstanceOf[FlinkSessionImpl].getExecutor
}
protected var sessionId: String = {
session.asInstanceOf[FlinkSessionImpl].getSessionId
}
protected var resultSet: ResultSet = _
override protected def beforeRun(): Unit = {

View File

@ -28,7 +28,10 @@ class FlinkSQLOperationManager extends OperationManager("FlinkSQLOperationManage
session: Session,
statement: String,
runAsync: Boolean,
queryTimeout: Long): Operation = null
queryTimeout: Long): Operation = {
val op = new ExecuteStatement(session, statement, runAsync, queryTimeout)
addOperation(op)
}
override def newGetTypeInfoOperation(session: Session): Operation = null

View File

@ -113,6 +113,12 @@ object RowSet {
tStringValue.setValue(row.getField(ordinal).asInstanceOf[String])
}
TColumnValue.stringVal(tStringValue)
} else if (logicalType.isInstanceOf[CharType]) {
val tStringValue = new TStringValue
if (row.getField(ordinal) != null) {
tStringValue.setValue(row.getField(ordinal).asInstanceOf[String])
}
TColumnValue.stringVal(tStringValue)
} else {
val tStrValue = new TStringValue
if (row.getField(ordinal) != null) {

View File

@ -17,7 +17,9 @@
package org.apache.kyuubi.engine.flink.session
import org.apache.flink.table.client.gateway.Executor
import org.apache.flink.table.client.gateway.context.DefaultContext
import org.apache.flink.table.client.gateway.local.LocalExecutor
import org.apache.hive.service.rpc.thrift.TProtocolVersion
import org.apache.kyuubi.engine.flink.operation.FlinkSQLOperationManager
@ -29,13 +31,24 @@ class FlinkSQLSessionManager(engineContext: DefaultContext)
override protected def isServer: Boolean = false
val operationManager = new FlinkSQLOperationManager()
val executor: Executor = new LocalExecutor(engineContext)
override def start(): Unit = {
super.start()
executor.start()
}
override def openSession(
protocol: TProtocolVersion,
user: String,
password: String,
ipAddress: String,
conf: Map[String, String]): SessionHandle = null
conf: Map[String, String]): SessionHandle = {
executor.openSession("")
null
}
override def closeSession(sessionHandle: SessionHandle): Unit = {}
override def closeSession(sessionHandle: SessionHandle): Unit = {
executor.closeSession(sessionHandle.toString)
}
}

View File

@ -17,6 +17,7 @@
package org.apache.kyuubi.engine.flink.session
import org.apache.flink.table.client.gateway.Executor
import org.apache.flink.table.client.gateway.context.SessionContext
import org.apache.hive.service.rpc.thrift.TProtocolVersion
@ -36,4 +37,8 @@ class FlinkSessionImpl(
def getSessionContext: SessionContext = sessionContext
def getExecutor: Executor = sessionManager.asInstanceOf[FlinkSQLSessionManager].executor
def getSessionId: String = handle.toString
}

View File

@ -17,11 +17,17 @@
package org.apache.kyuubi.engine.flink.operation
import java.net.URL
import java.util
import java.util.Collections
import org.apache.flink.client.cli.DefaultCLI
import org.apache.flink.configuration.Configuration
import org.apache.flink.client.program.ClusterClient
import org.apache.flink.configuration.{ConfigConstants, Configuration, MemorySize, TaskManagerOptions, WebOptions}
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration
import org.apache.flink.table.client.gateway.context.{DefaultContext, SessionContext}
import org.apache.flink.table.client.gateway.local.LocalExecutor
import org.apache.flink.test.util.MiniClusterWithClientResource
import org.apache.hive.service.rpc.thrift.TProtocolVersion
import org.apache.kyuubi.{KyuubiFunSuite, Utils}
@ -34,6 +40,27 @@ class FlinkOperationSuite extends KyuubiFunSuite {
val user: String = Utils.currentUser
val password = "anonymous"
val NUM_TMS = 2
val NUM_SLOTS_PER_TM = 2
private def getConfig = {
val config = new Configuration
config.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.parse("4m"))
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS)
config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, NUM_SLOTS_PER_TM)
config.setBoolean(WebOptions.SUBMIT_ENABLE, false)
config
}
val MINI_CLUSTER_RESOURCE =
new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
.setConfiguration(getConfig)
.setNumberTaskManagers(NUM_TMS)
.setNumberSlotsPerTaskManager(NUM_SLOTS_PER_TM).build)
var clusterClient: ClusterClient[_] = _
var engineContext = new DefaultContext(
Collections.emptyList(),
new Configuration,
@ -41,7 +68,24 @@ class FlinkOperationSuite extends KyuubiFunSuite {
var sessionContext: SessionContext = _
var flinkSession: FlinkSessionImpl = _
private def createLocalExecutor: LocalExecutor =
createLocalExecutor(Collections.emptyList[URL], new Configuration)
private def createLocalExecutor(
dependencies: util.List[URL],
configuration: Configuration): LocalExecutor = {
configuration.addAll(clusterClient.getFlinkConfiguration)
val defaultContext: DefaultContext = new DefaultContext(
dependencies,
configuration,
Collections.singletonList(new DefaultCLI))
new LocalExecutor(defaultContext)
}
override def beforeAll(): Unit = {
MINI_CLUSTER_RESOURCE.before()
clusterClient = MINI_CLUSTER_RESOURCE.getClusterClient
sessionContext = SessionContext.create(engineContext, "test-session-id");
val flinkSQLSessionManager = new FlinkSQLSessionManager(engineContext)
flinkSQLSessionManager.initialize(KyuubiConf())
@ -66,7 +110,20 @@ class FlinkOperationSuite extends KyuubiFunSuite {
val resultSet = getCatalogOperation.getNextRowSet(FetchOrientation.FETCH_FIRST, 10)
assert(1 == resultSet.getRowsSize)
assert(resultSet.getRows.get(0).getColVals().get(0).getStringVal.getValue === "default_catalog")
assert(resultSet.getRows.get(0).getColVals.get(0).getStringVal.getValue === "default_catalog")
}
test("execute statement - select column name with dots") {
val executeStatementOp = new ExecuteStatement(flinkSession, "select 'tmp.hello'", false, -1)
val executor = createLocalExecutor
executor.openSession("test-session")
executeStatementOp.setExecutor(executor)
executeStatementOp.setSessionId("test-session")
executeStatementOp.run()
val resultSet = executeStatementOp.getNextRowSet(FetchOrientation.FETCH_FIRST, 10)
assert(1 == resultSet.getRowsSize)
assert(resultSet.getRows.get(0).getColVals.get(0).getStringVal.getValue === "tmp.hello")
}
}

20
pom.xml
View File

@ -1136,6 +1136,26 @@
<artifactId>flink-sql-client_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</dependencyManagement>