[KYUUBI #1639] Make ApiRequestContext provide KyuubiRestFrontendService directly
<!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html 2. If the PR is related to an issue in https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'. 3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'. --> ### _Why are the changes needed?_ <!-- Please clarify why the changes are needed. For instance, 1. If you add a feature, you can talk about the use case of it. 2. If you fix a bug, you can clarify why it is a bug. --> Make full use of KyuubiRestFrontendService without hacking, e.g. for the swagger UI base. We can do further improvement based on this PR. ### _How was this patch tested?_ - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [ ] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before make a pull request Closes #1639 from yaooqinn/ApiRequestContext. Closes #1639 3c0c7061 [Kent Yao] Make ApiRequestContext provides KyuubiRestFrontendService directly 10506c2c [Kent Yao] Make ApiRequestContext provides KyuubiRestFrontendService directly 1482bd78 [Kent Yao] Make ApiRequestContext provides KyuubiRestFrontendService directly 01a0b023 [Kent Yao] Make ApiRequestContext provides KyuubiRestFrontendService directly d3119fa4 [Kent Yao] Make ApiRequestContext provides KyuubiRestFrontendService directly 0f45258c [Kent Yao] Make ApiRequestContext provides KyuubiRestFrontendService directly Authored-by: Kent Yao <yao@apache.org> Signed-off-by: Kent Yao <yao@apache.org>
This commit is contained in:
parent
cb8721fef1
commit
c4fd287c8b
@ -26,7 +26,7 @@ import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler
|
||||
import org.apache.kyuubi.{KyuubiException, Logging, Utils}
|
||||
import org.apache.kyuubi.config.KyuubiConf
|
||||
import org.apache.kyuubi.config.KyuubiConf.{FRONTEND_REST_BIND_HOST, FRONTEND_REST_BIND_PORT}
|
||||
import org.apache.kyuubi.server.api.ApiUtils
|
||||
import org.apache.kyuubi.server.api.v1.ApiRootResource
|
||||
import org.apache.kyuubi.service.{AbstractFrontendService, Serverable, Service}
|
||||
|
||||
/**
|
||||
@ -56,7 +56,7 @@ class KyuubiRestFrontendService(override val serverable: Serverable)
|
||||
errorHandler.setServer(jettyServer)
|
||||
jettyServer.addBean(errorHandler)
|
||||
|
||||
jettyServer.setHandler(ApiUtils.getServletHandler(serverable.backendService))
|
||||
jettyServer.setHandler(ApiRootResource.getServletHandler(this))
|
||||
|
||||
connector = new ServerConnector(
|
||||
jettyServer,
|
||||
|
||||
@ -22,11 +22,8 @@ import javax.servlet.http.HttpServletRequest
|
||||
import javax.ws.rs.core.Context
|
||||
|
||||
import org.eclipse.jetty.server.handler.ContextHandler
|
||||
import org.eclipse.jetty.servlet.{DefaultServlet, ServletContextHandler, ServletHolder}
|
||||
import org.glassfish.jersey.server.ResourceConfig
|
||||
import org.glassfish.jersey.servlet.ServletContainer
|
||||
|
||||
import org.apache.kyuubi.service.BackendService
|
||||
import org.apache.kyuubi.server.KyuubiRestFrontendService
|
||||
|
||||
private[api] trait ApiRequestContext {
|
||||
|
||||
@ -36,42 +33,18 @@ private[api] trait ApiRequestContext {
|
||||
@Context
|
||||
protected var httpRequest: HttpServletRequest = _
|
||||
|
||||
def backendService: BackendService = BackendServiceProvider.getBackendService(servletContext)
|
||||
|
||||
final protected def fe: KyuubiRestFrontendService = FrontendServiceContext.get(servletContext)
|
||||
}
|
||||
|
||||
private[api] object BackendServiceProvider {
|
||||
private[api] object FrontendServiceContext {
|
||||
|
||||
private val attribute = getClass.getCanonicalName
|
||||
|
||||
def setBackendService(contextHandler: ContextHandler, be: BackendService): Unit = {
|
||||
contextHandler.setAttribute(attribute, be)
|
||||
def set(contextHandler: ContextHandler, fe: KyuubiRestFrontendService): Unit = {
|
||||
contextHandler.setAttribute(attribute, fe)
|
||||
}
|
||||
|
||||
def getBackendService(context: ServletContext): BackendService = {
|
||||
context.getAttribute(attribute).asInstanceOf[BackendService]
|
||||
}
|
||||
}
|
||||
|
||||
private[server] object ApiUtils {
|
||||
|
||||
def getServletHandler(backendService: BackendService): ServletContextHandler = {
|
||||
val openapiConf: ResourceConfig = new OpenAPIConfig
|
||||
val servlet = new ServletHolder(new ServletContainer(openapiConf))
|
||||
val handler = new ServletContextHandler(ServletContextHandler.NO_SESSIONS)
|
||||
BackendServiceProvider.setBackendService(handler, backendService)
|
||||
handler.addServlet(servlet, "/*")
|
||||
|
||||
// install swagger-ui, these static files are copied from
|
||||
// https://github.com/swagger-api/swagger-ui/tree/master/dist
|
||||
val swaggerUI = new ServletHolder("swagger-ui", classOf[DefaultServlet])
|
||||
swaggerUI.setInitParameter(
|
||||
"resourceBase",
|
||||
getClass.getClassLoader()
|
||||
.getResource("META-INF/resources/webjars/swagger-ui/4.1.3/")
|
||||
.toExternalForm)
|
||||
swaggerUI.setInitParameter("pathInfoOnly", "true")
|
||||
handler.addServlet(swaggerUI, "/swagger-ui-redirected/*");
|
||||
handler
|
||||
def get(context: ServletContext): KyuubiRestFrontendService = {
|
||||
context.getAttribute(attribute).asInstanceOf[KyuubiRestFrontendService]
|
||||
}
|
||||
}
|
||||
|
||||
@ -22,9 +22,12 @@ import javax.ws.rs.{GET, Path, Produces}
|
||||
import javax.ws.rs.core.{MediaType, Response}
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting
|
||||
import org.eclipse.jetty.servlet.{DefaultServlet, ServletContextHandler, ServletHolder}
|
||||
import org.glassfish.jersey.server.ResourceConfig
|
||||
import org.glassfish.jersey.servlet.ServletContainer
|
||||
|
||||
import org.apache.kyuubi.server.{KyuubiRestFrontendService, KyuubiServer}
|
||||
import org.apache.kyuubi.server.api.ApiRequestContext
|
||||
import org.apache.kyuubi.server.KyuubiRestFrontendService
|
||||
import org.apache.kyuubi.server.api.{ApiRequestContext, FrontendServiceContext, OpenAPIConfig}
|
||||
|
||||
@Path("/api/v1")
|
||||
private[v1] class ApiRootResource extends ApiRequestContext {
|
||||
@ -52,15 +55,33 @@ private[v1] class ApiRootResource extends ApiRequestContext {
|
||||
@GET
|
||||
@Path("swagger-ui")
|
||||
@Produces(Array(MediaType.TEXT_HTML))
|
||||
def swaggerUi(): Response = {
|
||||
val restServiceOpt = KyuubiServer.kyuubiServer
|
||||
.frontendServices
|
||||
.collectFirst { case rest: KyuubiRestFrontendService => rest }
|
||||
assert(restServiceOpt.isDefined)
|
||||
val serverIP = restServiceOpt.map(_.connectionUrl).get
|
||||
val swaggerUi =
|
||||
s"http://$serverIP/swagger-ui-redirected/index.html?url=http://$serverIP/openapi.json"
|
||||
Response.temporaryRedirect(new URI(swaggerUi)).build()
|
||||
def swaggerUI(): Response = {
|
||||
val swaggerUI = s"http://${fe.connectionUrl}/swagger-ui-redirected/index.html?url=" +
|
||||
s"http://${fe.connectionUrl}/openapi.json"
|
||||
Response.temporaryRedirect(new URI(swaggerUI)).build()
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private[server] object ApiRootResource {
|
||||
|
||||
def getServletHandler(fe: KyuubiRestFrontendService): ServletContextHandler = {
|
||||
val openapiConf: ResourceConfig = new OpenAPIConfig
|
||||
val servlet = new ServletHolder(new ServletContainer(openapiConf))
|
||||
val handler = new ServletContextHandler(ServletContextHandler.NO_SESSIONS)
|
||||
FrontendServiceContext.set(handler, fe)
|
||||
handler.addServlet(servlet, "/*")
|
||||
|
||||
// install swagger-ui, these static files are copied from
|
||||
// https://github.com/swagger-api/swagger-ui/tree/master/dist
|
||||
val swaggerUI = new ServletHolder("swagger-ui", classOf[DefaultServlet])
|
||||
swaggerUI.setInitParameter(
|
||||
"resourceBase",
|
||||
getClass.getClassLoader()
|
||||
.getResource("META-INF/resources/webjars/swagger-ui/4.1.3/")
|
||||
.toExternalForm)
|
||||
swaggerUI.setInitParameter("pathInfoOnly", "true")
|
||||
handler.addServlet(swaggerUI, "/swagger-ui-redirected/*");
|
||||
handler
|
||||
}
|
||||
}
|
||||
|
||||
@ -50,7 +50,7 @@ private[v1] class OperationsResource extends ApiRequestContext {
|
||||
@PathParam("operationHandle") operationHandleStr: String): KyuubiOperationEvent = {
|
||||
try {
|
||||
val opHandle = parseOperationHandle(operationHandleStr)
|
||||
val operation = backendService.sessionManager.operationManager.getOperation(opHandle)
|
||||
val operation = fe.be.sessionManager.operationManager.getOperation(opHandle)
|
||||
KyuubiOperationEvent(operation.asInstanceOf[KyuubiOperation])
|
||||
} catch {
|
||||
case NonFatal(_) =>
|
||||
@ -72,8 +72,8 @@ private[v1] class OperationsResource extends ApiRequestContext {
|
||||
try {
|
||||
val operationHandle = parseOperationHandle(operationHandleStr)
|
||||
request.action.toLowerCase() match {
|
||||
case "cancel" => backendService.cancelOperation(operationHandle)
|
||||
case "close" => backendService.closeOperation(operationHandle)
|
||||
case "cancel" => fe.be.cancelOperation(operationHandle)
|
||||
case "close" => fe.be.closeOperation(operationHandle)
|
||||
case _ => throw KyuubiSQLException(s"Invalid action ${request.action}")
|
||||
}
|
||||
Response.ok().build()
|
||||
@ -97,7 +97,7 @@ private[v1] class OperationsResource extends ApiRequestContext {
|
||||
try {
|
||||
val operationHandle = parseOperationHandle(operationHandleStr)
|
||||
ResultSetMetaData(
|
||||
backendService.getResultSetMetadata(operationHandle).getColumns.asScala.map(c => {
|
||||
fe.be.getResultSetMetadata(operationHandle).getColumns.asScala.map(c => {
|
||||
val tPrimitiveTypeEntry = c.getTypeDesc.getTypes.get(0).getPrimitiveEntry
|
||||
var precision = 0
|
||||
var scale = 0
|
||||
@ -134,7 +134,7 @@ private[v1] class OperationsResource extends ApiRequestContext {
|
||||
@PathParam("operationHandle") operationHandleStr: String,
|
||||
@QueryParam("maxrows") maxRows: Int): OperationLog = {
|
||||
try {
|
||||
val rowSet = backendService.sessionManager.operationManager.getOperationLogRowSet(
|
||||
val rowSet = fe.be.sessionManager.operationManager.getOperationLogRowSet(
|
||||
parseOperationHandle(operationHandleStr),
|
||||
FetchOrientation.FETCH_NEXT,
|
||||
maxRows)
|
||||
|
||||
@ -48,7 +48,7 @@ private[v1] class SessionsResource extends ApiRequestContext {
|
||||
@GET
|
||||
def sessionInfoList(): SessionList = {
|
||||
SessionList(
|
||||
backendService.sessionManager.getSessionList().asScala.map {
|
||||
fe.be.sessionManager.getSessionList().asScala.map {
|
||||
case (handle, session) =>
|
||||
SessionOverview(session.user, session.ipAddress, session.createTime, handle)
|
||||
}.toSeq)
|
||||
@ -63,7 +63,8 @@ private[v1] class SessionsResource extends ApiRequestContext {
|
||||
@Path("{sessionHandle}")
|
||||
def sessionInfo(@PathParam("sessionHandle") sessionHandleStr: String): KyuubiSessionEvent = {
|
||||
try {
|
||||
KyuubiSessionEvent(backendService.sessionManager.getSession(
|
||||
// TODO: need to use KyuubiSessionEvent in session
|
||||
KyuubiSessionEvent(fe.be.sessionManager.getSession(
|
||||
parseSessionHandle(sessionHandleStr)).asInstanceOf[AbstractSession])
|
||||
} catch {
|
||||
case NonFatal(e) =>
|
||||
@ -85,7 +86,7 @@ private[v1] class SessionsResource extends ApiRequestContext {
|
||||
@PathParam("infoType") infoType: Int): InfoDetail = {
|
||||
try {
|
||||
val info = TGetInfoType.findByValue(infoType)
|
||||
val infoValue = backendService.getInfo(parseSessionHandle(sessionHandleStr), info)
|
||||
val infoValue = fe.be.getInfo(parseSessionHandle(sessionHandleStr), info)
|
||||
InfoDetail(info.toString, infoValue.getStringValue)
|
||||
} catch {
|
||||
case NonFatal(e) =>
|
||||
@ -102,7 +103,7 @@ private[v1] class SessionsResource extends ApiRequestContext {
|
||||
@GET
|
||||
@Path("count")
|
||||
def sessionCount(): SessionOpenCount = {
|
||||
SessionOpenCount(backendService.sessionManager.getOpenSessionCount)
|
||||
SessionOpenCount(fe.be.sessionManager.getOpenSessionCount)
|
||||
}
|
||||
|
||||
@ApiResponse(
|
||||
@ -114,8 +115,8 @@ private[v1] class SessionsResource extends ApiRequestContext {
|
||||
@Path("execPool/statistic")
|
||||
def execPoolStatistic(): ExecPoolStatistic = {
|
||||
ExecPoolStatistic(
|
||||
backendService.sessionManager.getExecPoolSize,
|
||||
backendService.sessionManager.getActiveCount)
|
||||
fe.be.sessionManager.getExecPoolSize,
|
||||
fe.be.sessionManager.getActiveCount)
|
||||
}
|
||||
|
||||
@ApiResponse(
|
||||
@ -126,7 +127,7 @@ private[v1] class SessionsResource extends ApiRequestContext {
|
||||
@POST
|
||||
@Consumes(Array(MediaType.APPLICATION_JSON))
|
||||
def openSession(request: SessionOpenRequest): SessionHandle = {
|
||||
backendService.openSession(
|
||||
fe.be.openSession(
|
||||
TProtocolVersion.findByValue(request.protocolVersion),
|
||||
request.user,
|
||||
request.password,
|
||||
@ -142,7 +143,7 @@ private[v1] class SessionsResource extends ApiRequestContext {
|
||||
@DELETE
|
||||
@Path("{sessionHandle}")
|
||||
def closeSession(@PathParam("sessionHandle") sessionHandleStr: String): Response = {
|
||||
backendService.closeSession(parseSessionHandle(sessionHandleStr))
|
||||
fe.be.closeSession(parseSessionHandle(sessionHandleStr))
|
||||
Response.ok().build()
|
||||
}
|
||||
|
||||
@ -157,7 +158,7 @@ private[v1] class SessionsResource extends ApiRequestContext {
|
||||
@PathParam("sessionHandle") sessionHandleStr: String,
|
||||
request: StatementRequest): OperationHandle = {
|
||||
try {
|
||||
backendService.executeStatement(
|
||||
fe.be.executeStatement(
|
||||
parseSessionHandle(sessionHandleStr),
|
||||
request.statement,
|
||||
request.runAsync,
|
||||
@ -177,7 +178,7 @@ private[v1] class SessionsResource extends ApiRequestContext {
|
||||
@Path("{sessionHandle}/operations/typeInfo")
|
||||
def getTypeInfo(@PathParam("sessionHandle") sessionHandleStr: String): OperationHandle = {
|
||||
try {
|
||||
backendService.getTypeInfo(parseSessionHandle(sessionHandleStr))
|
||||
fe.be.getTypeInfo(parseSessionHandle(sessionHandleStr))
|
||||
} catch {
|
||||
case NonFatal(_) =>
|
||||
throw new NotFoundException(s"Error getting type information")
|
||||
@ -193,7 +194,7 @@ private[v1] class SessionsResource extends ApiRequestContext {
|
||||
@Path("{sessionHandle}/operations/catalogs")
|
||||
def getCatalogs(@PathParam("sessionHandle") sessionHandleStr: String): OperationHandle = {
|
||||
try {
|
||||
backendService.getCatalogs(parseSessionHandle(sessionHandleStr))
|
||||
fe.be.getCatalogs(parseSessionHandle(sessionHandleStr))
|
||||
} catch {
|
||||
case NonFatal(_) =>
|
||||
throw new NotFoundException(s"Error getting catalogs")
|
||||
@ -212,7 +213,7 @@ private[v1] class SessionsResource extends ApiRequestContext {
|
||||
request: GetSchemasRequest): OperationHandle = {
|
||||
try {
|
||||
val sessionHandle = parseSessionHandle(sessionHandleStr)
|
||||
val operationHandle = backendService.getSchemas(
|
||||
val operationHandle = fe.be.getSchemas(
|
||||
sessionHandle,
|
||||
request.catalogName,
|
||||
request.schemaName)
|
||||
@ -234,7 +235,7 @@ private[v1] class SessionsResource extends ApiRequestContext {
|
||||
@PathParam("sessionHandle") sessionHandleStr: String,
|
||||
request: GetTablesRequest): OperationHandle = {
|
||||
try {
|
||||
backendService.getTables(
|
||||
fe.be.getTables(
|
||||
parseSessionHandle(sessionHandleStr),
|
||||
request.catalogName,
|
||||
request.schemaName,
|
||||
@ -255,7 +256,7 @@ private[v1] class SessionsResource extends ApiRequestContext {
|
||||
@Path("{sessionHandle}/operations/tableTypes")
|
||||
def getTableTypes(@PathParam("sessionHandle") sessionHandleStr: String): OperationHandle = {
|
||||
try {
|
||||
backendService.getTableTypes(parseSessionHandle(sessionHandleStr))
|
||||
fe.be.getTableTypes(parseSessionHandle(sessionHandleStr))
|
||||
} catch {
|
||||
case NonFatal(_) =>
|
||||
throw new NotFoundException(s"Error getting table types")
|
||||
@ -273,7 +274,7 @@ private[v1] class SessionsResource extends ApiRequestContext {
|
||||
@PathParam("sessionHandle") sessionHandleStr: String,
|
||||
request: GetColumnsRequest): OperationHandle = {
|
||||
try {
|
||||
backendService.getColumns(
|
||||
fe.be.getColumns(
|
||||
parseSessionHandle(sessionHandleStr),
|
||||
request.catalogName,
|
||||
request.schemaName,
|
||||
@ -296,7 +297,7 @@ private[v1] class SessionsResource extends ApiRequestContext {
|
||||
@PathParam("sessionHandle") sessionHandleStr: String,
|
||||
request: GetFunctionsRequest): OperationHandle = {
|
||||
try {
|
||||
backendService.getFunctions(
|
||||
fe.be.getFunctions(
|
||||
parseSessionHandle(sessionHandleStr),
|
||||
request.catalogName,
|
||||
request.schemaName,
|
||||
@ -320,7 +321,7 @@ private[v1] class SessionsResource extends ApiRequestContext {
|
||||
|
||||
try {
|
||||
val operationHandle = parseOperationHandle(operationHandleStr)
|
||||
backendService.sessionManager.getSession(parseSessionHandle(sessionHandleStr))
|
||||
fe.be.sessionManager.getSession(parseSessionHandle(sessionHandleStr))
|
||||
.closeOperation(operationHandle)
|
||||
operationHandle
|
||||
} catch {
|
||||
|
||||
Loading…
Reference in New Issue
Block a user