diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiTHttpFrontendService.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiTHttpFrontendService.scala index 0fe24475a..b99f50310 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiTHttpFrontendService.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiTHttpFrontendService.scala @@ -18,6 +18,7 @@ package org.apache.kyuubi.server import java.net.ServerSocket +import java.util.Base64 import java.util.concurrent.{SynchronousQueue, ThreadPoolExecutor, TimeUnit} import javax.security.sasl.AuthenticationException import javax.servlet.{ServletContextEvent, ServletContextListener} @@ -35,15 +36,19 @@ import org.eclipse.jetty.util.security.Constraint import org.eclipse.jetty.util.ssl.SslContextFactory import org.eclipse.jetty.util.thread.ExecutorThreadPool -import org.apache.kyuubi.KyuubiException +import org.apache.kyuubi.{KyuubiException, KyuubiSQLException} +import org.apache.kyuubi.cli.Handle import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.config.KyuubiConf._ +import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_SESSION_ENGINE_LAUNCH_HANDLE_GUID, KYUUBI_SESSION_ENGINE_LAUNCH_HANDLE_SECRET, KYUUBI_SESSION_ENGINE_LAUNCH_SUPPORT_RESULT} import org.apache.kyuubi.metrics.MetricsConstants.{THRIFT_HTTP_CONN_FAIL, THRIFT_HTTP_CONN_OPEN, THRIFT_HTTP_CONN_TOTAL} import org.apache.kyuubi.metrics.MetricsSystem import org.apache.kyuubi.server.http.ThriftHttpServlet import org.apache.kyuubi.server.http.authentication.AuthenticationFilter import org.apache.kyuubi.service.{Serverable, Service, ServiceUtils, TFrontendService} -import org.apache.kyuubi.shaded.hive.service.rpc.thrift.{TCLIService, TOpenSessionReq} +import org.apache.kyuubi.service.TFrontendService.{CURRENT_SERVER_CONTEXT, OK_STATUS} +import org.apache.kyuubi.session.KyuubiSessionImpl +import org.apache.kyuubi.shaded.hive.service.rpc.thrift.{TCLIService, TOpenSessionReq, TOpenSessionResp} import org.apache.kyuubi.shaded.thrift.protocol.TBinaryProtocol import org.apache.kyuubi.util.NamedThreadFactory @@ -67,6 +72,8 @@ final class KyuubiTHttpFrontendService( override protected def hadoopConf: Configuration = KyuubiServer.getHadoopConf() + private lazy val defaultFetchSize = conf.get(KYUUBI_SERVER_THRIFT_RESULTSET_DEFAULT_FETCH_SIZE) + /** * Configure Jetty to serve http requests. Example of a client connection URL: * http://localhost:10000/servlets/thrifths2/ A gateway may cause actual target @@ -221,6 +228,45 @@ final class KyuubiTHttpFrontendService( super.initialize(conf) } + /** Same as KyuubiTBinaryFrontendService, to return launch engine op handle. */ + override def OpenSession(req: TOpenSessionReq): TOpenSessionResp = { + debug(req.toString) + info("Client protocol version: " + req.getClient_protocol) + val resp = new TOpenSessionResp + try { + val sessionHandle = getSessionHandle(req, resp) + + val respConfiguration = new java.util.HashMap[String, String]() + val launchEngineOp = be.sessionManager.getSession(sessionHandle) + .asInstanceOf[KyuubiSessionImpl].launchEngineOp + + val opHandleIdentifier = Handle.toTHandleIdentifier(launchEngineOp.getHandle.identifier) + respConfiguration.put( + KYUUBI_SESSION_ENGINE_LAUNCH_HANDLE_GUID, + Base64.getEncoder.encodeToString(opHandleIdentifier.getGuid)) + respConfiguration.put( + KYUUBI_SESSION_ENGINE_LAUNCH_HANDLE_SECRET, + Base64.getEncoder.encodeToString(opHandleIdentifier.getSecret)) + + respConfiguration.put(KYUUBI_SESSION_ENGINE_LAUNCH_SUPPORT_RESULT, true.toString) + + // HIVE-23005(4.0.0), Hive JDBC driver supposes that server always returns this conf + respConfiguration.put( + "hive.server2.thrift.resultset.default.fetch.size", + defaultFetchSize.toString) + + resp.setSessionHandle(sessionHandle.toTSessionHandle) + resp.setConfiguration(respConfiguration) + resp.setStatus(OK_STATUS) + Option(CURRENT_SERVER_CONTEXT.get()).foreach(_.setSessionHandle(sessionHandle)) + } catch { + case e: Exception => + error("Error opening session: ", e) + resp.setStatus(KyuubiSQLException.toTStatus(e, verbose = true)) + } + resp + } + override def run(): Unit = try { if (isServer()) {