diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala index 952d8e786..b54be21ac 100644 --- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala +++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala @@ -27,6 +27,7 @@ class FlinkSQLOperationManager extends OperationManager("FlinkSQLOperationManage override def newExecuteStatementOperation( session: Session, statement: String, + confOverlay: Map[String, String], runAsync: Boolean, queryTimeout: Long): Operation = { val op = new ExecuteStatement(session, statement, runAsync, queryTimeout) diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala index 58b539c7e..ade5b4e01 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala @@ -48,10 +48,12 @@ class SparkSQLOperationManager private (name: String) extends OperationManager(n override def newExecuteStatementOperation( session: Session, statement: String, + confOverlay: Map[String, String], runAsync: Boolean, queryTimeout: Long): Operation = { val spark = session.asInstanceOf[SparkSessionImpl].spark - val lang = spark.conf.get(OPERATION_LANGUAGE.key, operationLanguageDefault) + val lang = confOverlay.get(OPERATION_LANGUAGE.key) + .getOrElse(spark.conf.get(OPERATION_LANGUAGE.key, operationLanguageDefault)) val operation = OperationLanguages.withName(lang.toUpperCase(Locale.ROOT)) match { case OperationLanguages.SQL => diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationManager.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationManager.scala index a817c59aa..b950aa7b9 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationManager.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationManager.scala @@ -46,6 +46,7 @@ abstract class OperationManager(name: String) extends AbstractService(name) { def newExecuteStatementOperation( session: Session, statement: String, + confOverlay: Map[String, String], runAsync: Boolean, queryTimeout: Long): Operation def newGetTypeInfoOperation(session: Session): Operation diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/AbstractBackendService.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/AbstractBackendService.scala index b6dba2751..00eba7b93 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/AbstractBackendService.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/AbstractBackendService.scala @@ -56,9 +56,14 @@ abstract class AbstractBackendService(name: String) override def executeStatement( sessionHandle: SessionHandle, statement: String, + confOverlay: Map[String, String], runAsync: Boolean, queryTimeout: Long): OperationHandle = { - sessionManager.getSession(sessionHandle).executeStatement(statement, runAsync, queryTimeout) + sessionManager.getSession(sessionHandle).executeStatement( + statement, + confOverlay, + runAsync, + queryTimeout) } override def getTypeInfo(sessionHandle: SessionHandle): OperationHandle = { diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/BackendService.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/BackendService.scala index ee382b6b0..84f2746d8 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/BackendService.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/BackendService.scala @@ -48,6 +48,7 @@ trait BackendService { def executeStatement( sessionHandle: SessionHandle, statement: String, + confOverlay: Map[String, String], runAsync: Boolean, queryTimeout: Long): OperationHandle diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/ThriftBinaryFrontendService.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/ThriftBinaryFrontendService.scala index e21d82062..97be13609 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/ThriftBinaryFrontendService.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/ThriftBinaryFrontendService.scala @@ -261,9 +261,14 @@ abstract class ThriftBinaryFrontendService(name: String) val sessionHandle = SessionHandle(req.getSessionHandle) val statement = req.getStatement val runAsync = req.isRunAsync - // val confOverlay = req.getConfOverlay + val confOverlay = Option(req.getConfOverlay).getOrElse(Map.empty.asJava) val queryTimeout = req.getQueryTimeout - val operationHandle = be.executeStatement(sessionHandle, statement, runAsync, queryTimeout) + val operationHandle = be.executeStatement( + sessionHandle, + statement, + confOverlay.asScala.toMap, + runAsync, + queryTimeout) resp.setOperationHandle(operationHandle.toTOperationHandle) resp.setStatus(OK_STATUS) } catch { diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala index 43b7b49ec..7a2e709b0 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala @@ -115,10 +115,11 @@ abstract class AbstractSession( override def executeStatement( statement: String, + confOverlay: Map[String, String], runAsync: Boolean, queryTimeout: Long): OperationHandle = withAcquireRelease() { val operation = sessionManager.operationManager - .newExecuteStatementOperation(this, statement, runAsync, queryTimeout) + .newExecuteStatementOperation(this, statement, confOverlay, runAsync, queryTimeout) runOperation(operation) } diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/Session.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/Session.scala index 46afe669d..153d570c2 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/Session.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/Session.scala @@ -47,7 +47,11 @@ trait Session { def getInfo(infoType: TGetInfoType): TGetInfoValue - def executeStatement(statement: String, runAsync: Boolean, queryTimeout: Long): OperationHandle + def executeStatement( + statement: String, + confOverlay: Map[String, String], + runAsync: Boolean, + queryTimeout: Long): OperationHandle def getTableTypes: OperationHandle def getTypeInfo: OperationHandle diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/NoopOperationManager.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/NoopOperationManager.scala index 010440336..5b199b9bc 100644 --- a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/NoopOperationManager.scala +++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/NoopOperationManager.scala @@ -31,6 +31,7 @@ class NoopOperationManager extends OperationManager("noop") { override def newExecuteStatementOperation( session: Session, statement: String, + confOverlay: Map[String, String], runAsync: Boolean, queryTimeout: Long): Operation = { val operation = diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala index a57e294f6..70c450be8 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala @@ -93,11 +93,13 @@ class KyuubiSyncThriftClient private (protocol: TProtocol) def executeStatement( statement: String, + confOverlay: Map[String, String], shouldRunAsync: Boolean, queryTimeout: Long): TOperationHandle = { val req = new TExecuteStatementReq() req.setSessionHandle(_remoteSessionHandle) req.setStatement(statement) + req.setConfOverlay(confOverlay.asJava) req.setRunAsync(shouldRunAsync) req.setQueryTimeout(queryTimeout) val resp = withLockAcquired(ExecuteStatement(req)) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala index 492330252..3a9b265c6 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala @@ -35,6 +35,7 @@ import org.apache.kyuubi.session.{KyuubiSessionImpl, KyuubiSessionManager, Sessi class ExecuteStatement( session: Session, override val statement: String, + confOverlay: Map[String, String], override val shouldRunAsync: Boolean, queryTimeout: Long) extends KyuubiOperation(OperationType.EXECUTE_STATEMENT, session) { @@ -68,7 +69,7 @@ class ExecuteStatement( // We need to avoid executing query in sync mode, because there is no heartbeat mechanism // in thrift protocol, in sync mode, we cannot distinguish between long-run query and // engine crash without response before socket read timeout. - _remoteOpHandle = client.executeStatement(statement, true, queryTimeout) + _remoteOpHandle = client.executeStatement(statement, confOverlay, true, queryTimeout) setHasResultSet(_remoteOpHandle.isHasResultSet) } catch onError() } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala index ba3267d62..9035405be 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala @@ -54,10 +54,11 @@ class KyuubiOperationManager private (name: String) extends OperationManager(nam override def newExecuteStatementOperation( session: Session, statement: String, + confOverlay: Map[String, String], runAsync: Boolean, queryTimeout: Long): Operation = { val operation = - new ExecuteStatement(session, statement, runAsync, getQueryTimeout(queryTimeout)) + new ExecuteStatement(session, statement, confOverlay, runAsync, getQueryTimeout(queryTimeout)) addOperation(operation) } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/BackendServiceTimeMetric.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/BackendServiceTimeMetric.scala index 0931bc260..dd26c3d39 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/BackendServiceTimeMetric.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/BackendServiceTimeMetric.scala @@ -55,10 +55,11 @@ trait BackendServiceTimeMetric extends BackendService { abstract override def executeStatement( sessionHandle: SessionHandle, statement: String, + confOverlay: Map[String, String], runAsync: Boolean, queryTimeout: Long): OperationHandle = { MetricsSystem.timerTracing(MetricsConstants.BS_EXECUTE_STATEMENT) { - super.executeStatement(sessionHandle, statement, runAsync, queryTimeout) + super.executeStatement(sessionHandle, statement, confOverlay, runAsync, queryTimeout) } } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/SessionsResource.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/SessionsResource.scala index 5d131bda4..158cf2bfc 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/SessionsResource.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/SessionsResource.scala @@ -159,6 +159,7 @@ private[v1] class SessionsResource extends ApiRequestContext { fe.be.executeStatement( parseSessionHandle(sessionHandleStr), request.statement, + Map.empty, request.runAsync, request.queryTimeout) } catch { diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/mysql/MySQLCommandHandler.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/mysql/MySQLCommandHandler.scala index 48780b5eb..fabbcedd8 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/mysql/MySQLCommandHandler.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/mysql/MySQLCommandHandler.scala @@ -170,7 +170,12 @@ class MySQLCommandHandler(be: BackendService, execPool: ThreadPoolExecutor) private def beExecuteStatement(ctx: ChannelHandlerContext, sql: String): MySQLQueryResult = { try { val ssHandle = ctx.channel.attr(SESSION_HANDLE).get - val opHandle = be.executeStatement(ssHandle, sql, runAsync = false, queryTimeout = 0) + val opHandle = be.executeStatement( + ssHandle, + sql, + confOverlay = Map.empty, + runAsync = false, + queryTimeout = 0) val opStatus = be.getOperationStatus(opHandle) if (opStatus.state != FINISHED) { throw opStatus.exception diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala index a4fc45481..c04b318fb 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala @@ -20,7 +20,9 @@ package org.apache.kyuubi.operation import java.sql.SQLException import java.util.Properties -import org.apache.hive.service.rpc.thrift.{TExecuteStatementReq, TGetOperationStatusReq, TOperationState, TStatusCode} +import scala.collection.JavaConverters._ + +import org.apache.hive.service.rpc.thrift.{TExecuteStatementReq, TFetchResultsReq, TGetOperationStatusReq, TOperationState, TStatusCode} import org.scalatest.time.SpanSugar.convertIntToGrainOfTime import org.apache.kyuubi.{Utils, WithKyuubiServer} @@ -158,4 +160,25 @@ class KyuubiOperationPerConnectionSuite extends WithKyuubiServer with HiveJDBCTe assert(resultSet.getString(1).nonEmpty) } } + + test("support to specify OPERATION_LANGUAGE with confOverlay") { + withSessionHandle { (client, handle) => + val executeStmtReq = new TExecuteStatementReq() + executeStmtReq.setStatement("""spark.sql("SET kyuubi.operation.language").show(false)""") + executeStmtReq.setSessionHandle(handle) + executeStmtReq.setRunAsync(false) + executeStmtReq.setConfOverlay(Map(KyuubiConf.OPERATION_LANGUAGE.key -> "SCALA").asJava) + val executeStmtResp = client.ExecuteStatement(executeStmtReq) + assert(executeStmtResp.getStatus.getStatusCode == TStatusCode.SUCCESS_STATUS) + + val tFetchResultsReq = new TFetchResultsReq() + tFetchResultsReq.setOperationHandle(executeStmtResp.getOperationHandle) + tFetchResultsReq.setFetchType(0) + tFetchResultsReq.setMaxRows(10) + val tFetchResultsResp = client.FetchResults(tFetchResultsReq) + val resultSet = tFetchResultsResp.getResults.getColumns.asScala + assert(resultSet.size == 1) + assert(resultSet.head.getStringVal.getValues.get(0).contains("kyuubi.operation.language")) + } + } } diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/OperationsResourceSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/OperationsResourceSuite.scala index e7eabbb6b..87240cb6f 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/OperationsResourceSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/OperationsResourceSuite.scala @@ -56,7 +56,7 @@ class OperationsResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper Map("testConfig" -> "testValue")) val sessionManager = fe.be.sessionManager val session = sessionManager.getSession(sessionHandle) - val op = new ExecuteStatement(session, "show tables", true, 3000) + val op = new ExecuteStatement(session, "show tables", Map.empty, true, 3000) op.setState(OperationState.RUNNING) sessionManager.operationManager.addOperation(op) val opHandleStr = s"${op.getHandle.identifier.publicId}|" + @@ -125,7 +125,7 @@ class OperationsResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper val op = typ match { case OperationType.EXECUTE_STATEMENT => - fe.be.executeStatement(sessionHandle, statement, runAsync = true, 3000) + fe.be.executeStatement(sessionHandle, statement, Map.empty, runAsync = true, 3000) case OperationType.GET_CATALOGS => fe.be.getCatalogs(sessionHandle) }