[KYUUBI #1710] Support to specify OPERATION_LANGUAGE with TExecuteStatementReq confOverlay

<!--
Thanks for sending a pull request!

Here are some tips for you:
  1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html
  2. If the PR is related to an issue in https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
  3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'.
-->

### _Why are the changes needed?_
<!--
Please clarify why the changes are needed. For instance,
  1. If you add a feature, you can talk about the use case of it.
  2. If you fix a bug, you can clarify why it is a bug.
-->
Now kyuubi supports SQL and SCALA language.

Now the way to switch OPERATION_LANAUAGE:

```
// for SCALA
spark.sql("set kyuubi.operation.language=SCALA")

// for SQL
set kyuubi.operation.language=SCALA
```

It is more user friendly that user can specify the OPERATION_LANGUAGE for each ExecuteStatement.

### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [ ] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #1710 from turboFei/conf_overlay.

Closes #1710

385264dc [Fei Wang] fix ut
58fbdbdf [Fei Wang] fix npe
2718fb48 [Fei Wang] Support to get OPERATION_LANGUAGE from execute statement confOverlay

Authored-by: Fei Wang <fwang12@ebay.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
This commit is contained in:
Fei Wang 2022-01-10 17:21:25 +08:00 committed by Cheng Pan
parent 58dd4ca71f
commit e0eeab07eb
No known key found for this signature in database
GPG Key ID: 8001952629BCC75D
17 changed files with 68 additions and 13 deletions

View File

@ -27,6 +27,7 @@ class FlinkSQLOperationManager extends OperationManager("FlinkSQLOperationManage
override def newExecuteStatementOperation(
session: Session,
statement: String,
confOverlay: Map[String, String],
runAsync: Boolean,
queryTimeout: Long): Operation = {
val op = new ExecuteStatement(session, statement, runAsync, queryTimeout)

View File

@ -48,10 +48,12 @@ class SparkSQLOperationManager private (name: String) extends OperationManager(n
override def newExecuteStatementOperation(
session: Session,
statement: String,
confOverlay: Map[String, String],
runAsync: Boolean,
queryTimeout: Long): Operation = {
val spark = session.asInstanceOf[SparkSessionImpl].spark
val lang = spark.conf.get(OPERATION_LANGUAGE.key, operationLanguageDefault)
val lang = confOverlay.get(OPERATION_LANGUAGE.key)
.getOrElse(spark.conf.get(OPERATION_LANGUAGE.key, operationLanguageDefault))
val operation =
OperationLanguages.withName(lang.toUpperCase(Locale.ROOT)) match {
case OperationLanguages.SQL =>

View File

@ -46,6 +46,7 @@ abstract class OperationManager(name: String) extends AbstractService(name) {
def newExecuteStatementOperation(
session: Session,
statement: String,
confOverlay: Map[String, String],
runAsync: Boolean,
queryTimeout: Long): Operation
def newGetTypeInfoOperation(session: Session): Operation

View File

@ -56,9 +56,14 @@ abstract class AbstractBackendService(name: String)
override def executeStatement(
sessionHandle: SessionHandle,
statement: String,
confOverlay: Map[String, String],
runAsync: Boolean,
queryTimeout: Long): OperationHandle = {
sessionManager.getSession(sessionHandle).executeStatement(statement, runAsync, queryTimeout)
sessionManager.getSession(sessionHandle).executeStatement(
statement,
confOverlay,
runAsync,
queryTimeout)
}
override def getTypeInfo(sessionHandle: SessionHandle): OperationHandle = {

View File

@ -48,6 +48,7 @@ trait BackendService {
def executeStatement(
sessionHandle: SessionHandle,
statement: String,
confOverlay: Map[String, String],
runAsync: Boolean,
queryTimeout: Long): OperationHandle

View File

@ -261,9 +261,14 @@ abstract class ThriftBinaryFrontendService(name: String)
val sessionHandle = SessionHandle(req.getSessionHandle)
val statement = req.getStatement
val runAsync = req.isRunAsync
// val confOverlay = req.getConfOverlay
val confOverlay = Option(req.getConfOverlay).getOrElse(Map.empty.asJava)
val queryTimeout = req.getQueryTimeout
val operationHandle = be.executeStatement(sessionHandle, statement, runAsync, queryTimeout)
val operationHandle = be.executeStatement(
sessionHandle,
statement,
confOverlay.asScala.toMap,
runAsync,
queryTimeout)
resp.setOperationHandle(operationHandle.toTOperationHandle)
resp.setStatus(OK_STATUS)
} catch {

View File

@ -115,10 +115,11 @@ abstract class AbstractSession(
override def executeStatement(
statement: String,
confOverlay: Map[String, String],
runAsync: Boolean,
queryTimeout: Long): OperationHandle = withAcquireRelease() {
val operation = sessionManager.operationManager
.newExecuteStatementOperation(this, statement, runAsync, queryTimeout)
.newExecuteStatementOperation(this, statement, confOverlay, runAsync, queryTimeout)
runOperation(operation)
}

View File

@ -47,7 +47,11 @@ trait Session {
def getInfo(infoType: TGetInfoType): TGetInfoValue
def executeStatement(statement: String, runAsync: Boolean, queryTimeout: Long): OperationHandle
def executeStatement(
statement: String,
confOverlay: Map[String, String],
runAsync: Boolean,
queryTimeout: Long): OperationHandle
def getTableTypes: OperationHandle
def getTypeInfo: OperationHandle

View File

@ -31,6 +31,7 @@ class NoopOperationManager extends OperationManager("noop") {
override def newExecuteStatementOperation(
session: Session,
statement: String,
confOverlay: Map[String, String],
runAsync: Boolean,
queryTimeout: Long): Operation = {
val operation =

View File

@ -93,11 +93,13 @@ class KyuubiSyncThriftClient private (protocol: TProtocol)
def executeStatement(
statement: String,
confOverlay: Map[String, String],
shouldRunAsync: Boolean,
queryTimeout: Long): TOperationHandle = {
val req = new TExecuteStatementReq()
req.setSessionHandle(_remoteSessionHandle)
req.setStatement(statement)
req.setConfOverlay(confOverlay.asJava)
req.setRunAsync(shouldRunAsync)
req.setQueryTimeout(queryTimeout)
val resp = withLockAcquired(ExecuteStatement(req))

View File

@ -35,6 +35,7 @@ import org.apache.kyuubi.session.{KyuubiSessionImpl, KyuubiSessionManager, Sessi
class ExecuteStatement(
session: Session,
override val statement: String,
confOverlay: Map[String, String],
override val shouldRunAsync: Boolean,
queryTimeout: Long)
extends KyuubiOperation(OperationType.EXECUTE_STATEMENT, session) {
@ -68,7 +69,7 @@ class ExecuteStatement(
// We need to avoid executing query in sync mode, because there is no heartbeat mechanism
// in thrift protocol, in sync mode, we cannot distinguish between long-run query and
// engine crash without response before socket read timeout.
_remoteOpHandle = client.executeStatement(statement, true, queryTimeout)
_remoteOpHandle = client.executeStatement(statement, confOverlay, true, queryTimeout)
setHasResultSet(_remoteOpHandle.isHasResultSet)
} catch onError()
}

View File

@ -54,10 +54,11 @@ class KyuubiOperationManager private (name: String) extends OperationManager(nam
override def newExecuteStatementOperation(
session: Session,
statement: String,
confOverlay: Map[String, String],
runAsync: Boolean,
queryTimeout: Long): Operation = {
val operation =
new ExecuteStatement(session, statement, runAsync, getQueryTimeout(queryTimeout))
new ExecuteStatement(session, statement, confOverlay, runAsync, getQueryTimeout(queryTimeout))
addOperation(operation)
}

View File

@ -55,10 +55,11 @@ trait BackendServiceTimeMetric extends BackendService {
abstract override def executeStatement(
sessionHandle: SessionHandle,
statement: String,
confOverlay: Map[String, String],
runAsync: Boolean,
queryTimeout: Long): OperationHandle = {
MetricsSystem.timerTracing(MetricsConstants.BS_EXECUTE_STATEMENT) {
super.executeStatement(sessionHandle, statement, runAsync, queryTimeout)
super.executeStatement(sessionHandle, statement, confOverlay, runAsync, queryTimeout)
}
}

View File

@ -159,6 +159,7 @@ private[v1] class SessionsResource extends ApiRequestContext {
fe.be.executeStatement(
parseSessionHandle(sessionHandleStr),
request.statement,
Map.empty,
request.runAsync,
request.queryTimeout)
} catch {

View File

@ -170,7 +170,12 @@ class MySQLCommandHandler(be: BackendService, execPool: ThreadPoolExecutor)
private def beExecuteStatement(ctx: ChannelHandlerContext, sql: String): MySQLQueryResult = {
try {
val ssHandle = ctx.channel.attr(SESSION_HANDLE).get
val opHandle = be.executeStatement(ssHandle, sql, runAsync = false, queryTimeout = 0)
val opHandle = be.executeStatement(
ssHandle,
sql,
confOverlay = Map.empty,
runAsync = false,
queryTimeout = 0)
val opStatus = be.getOperationStatus(opHandle)
if (opStatus.state != FINISHED) {
throw opStatus.exception

View File

@ -20,7 +20,9 @@ package org.apache.kyuubi.operation
import java.sql.SQLException
import java.util.Properties
import org.apache.hive.service.rpc.thrift.{TExecuteStatementReq, TGetOperationStatusReq, TOperationState, TStatusCode}
import scala.collection.JavaConverters._
import org.apache.hive.service.rpc.thrift.{TExecuteStatementReq, TFetchResultsReq, TGetOperationStatusReq, TOperationState, TStatusCode}
import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
import org.apache.kyuubi.{Utils, WithKyuubiServer}
@ -158,4 +160,25 @@ class KyuubiOperationPerConnectionSuite extends WithKyuubiServer with HiveJDBCTe
assert(resultSet.getString(1).nonEmpty)
}
}
test("support to specify OPERATION_LANGUAGE with confOverlay") {
withSessionHandle { (client, handle) =>
val executeStmtReq = new TExecuteStatementReq()
executeStmtReq.setStatement("""spark.sql("SET kyuubi.operation.language").show(false)""")
executeStmtReq.setSessionHandle(handle)
executeStmtReq.setRunAsync(false)
executeStmtReq.setConfOverlay(Map(KyuubiConf.OPERATION_LANGUAGE.key -> "SCALA").asJava)
val executeStmtResp = client.ExecuteStatement(executeStmtReq)
assert(executeStmtResp.getStatus.getStatusCode == TStatusCode.SUCCESS_STATUS)
val tFetchResultsReq = new TFetchResultsReq()
tFetchResultsReq.setOperationHandle(executeStmtResp.getOperationHandle)
tFetchResultsReq.setFetchType(0)
tFetchResultsReq.setMaxRows(10)
val tFetchResultsResp = client.FetchResults(tFetchResultsReq)
val resultSet = tFetchResultsResp.getResults.getColumns.asScala
assert(resultSet.size == 1)
assert(resultSet.head.getStringVal.getValues.get(0).contains("kyuubi.operation.language"))
}
}
}

View File

@ -56,7 +56,7 @@ class OperationsResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper
Map("testConfig" -> "testValue"))
val sessionManager = fe.be.sessionManager
val session = sessionManager.getSession(sessionHandle)
val op = new ExecuteStatement(session, "show tables", true, 3000)
val op = new ExecuteStatement(session, "show tables", Map.empty, true, 3000)
op.setState(OperationState.RUNNING)
sessionManager.operationManager.addOperation(op)
val opHandleStr = s"${op.getHandle.identifier.publicId}|" +
@ -125,7 +125,7 @@ class OperationsResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper
val op = typ match {
case OperationType.EXECUTE_STATEMENT =>
fe.be.executeStatement(sessionHandle, statement, runAsync = true, 3000)
fe.be.executeStatement(sessionHandle, statement, Map.empty, runAsync = true, 3000)
case OperationType.GET_CATALOGS => fe.be.getCatalogs(sessionHandle)
}