naming for kyuubi server

This commit is contained in:
Kent Yao 2018-01-10 17:59:08 +08:00
parent 2024477419
commit 6910b57f79
18 changed files with 278 additions and 164 deletions

View File

@ -25,11 +25,10 @@ import scala.xml.Node
import org.apache.commons.lang3.StringEscapeUtils
import org.apache.spark.ui.UIUtils._
import yaooqinn.kyuubi.monitor.{ExecutionInfo, ExecutionState, SessionInfo}
import yaooqinn.kyuubi.ui.{ExecutionInfo, ExecutionState, SessionInfo}
/** Page for Spark Web UI that shows statistics of the thrift server */
class ThriftServerPage(parent: ThriftServerTab) extends WebUIPage("") {
/** Page for Spark Web UI that shows statistics of the kyuubi server */
class KyuubiServerPage(parent: KyuubiServerTab) extends WebUIPage("") {
private val listener = parent.listener
private val startTime = Calendar.getInstance().getTime()
@ -48,10 +47,10 @@ class ThriftServerPage(parent: ThriftServerTab) extends WebUIPage("") {
generateSessionStatsTable() ++
generateSQLStatsTable()
}
UIUtils.headerSparkPage("JDBC/ODBC Server", content, parent, Some(5000))
UIUtils.headerSparkPage("Kyuubi Server", content, parent, Some(5000))
}
/** Generate basic stats of the thrift server program */
/** Generate basic stats of the kyuubi server program */
private def generateBasicStats(): Seq[Node] = {
val timeSinceStart = System.currentTimeMillis() - startTime.getTime
<ul class ="unstyled">
@ -64,7 +63,7 @@ class ThriftServerPage(parent: ThriftServerTab) extends WebUIPage("") {
</ul>
}
/** Generate stats of batch statements of the thrift server program */
/** Generate stats of batch statements of the kyuubi server program */
private def generateSQLStatsTable(): Seq[Node] = {
val numStatement = listener.getExecutionList.size
val table = if (numStatement > 0) {
@ -135,7 +134,7 @@ class ThriftServerPage(parent: ThriftServerTab) extends WebUIPage("") {
<td>{errorSummary}{details}</td>
}
/** Generate stats of batch sessions of the thrift server program */
/** Generate stats of batch sessions of the kyuubi server program */
private def generateSessionStatsTable(): Seq[Node] = {
val sessionList = listener.getSessionList
val numBatches = sessionList.size

View File

@ -25,10 +25,10 @@ import scala.xml.Node
import org.apache.commons.lang3.StringEscapeUtils
import org.apache.spark.ui.UIUtils._
import yaooqinn.kyuubi.monitor.{ExecutionInfo, ExecutionState}
import yaooqinn.kyuubi.ui.{ExecutionInfo, ExecutionState}
/** Page for Spark Web UI that shows statistics of jobs running in the thrift server */
class ThriftServerSessionPage(parent: ThriftServerTab) extends WebUIPage("session") {
class KyuubiServerSessionPage(parent: KyuubiServerTab) extends WebUIPage("session") {
private val listener = parent.listener
private val startTime = Calendar.getInstance().getTime
@ -54,10 +54,10 @@ class ThriftServerSessionPage(parent: ThriftServerTab) extends WebUIPage("sessio
</h4> ++
generateSQLStatsTable(sessionStat.sessionId)
}
UIUtils.headerSparkPage("JDBC/ODBC Session", content, parent, Some(5000))
UIUtils.headerSparkPage("Kyuubi Session", content, parent, Some(5000))
}
/** Generate basic stats of the thrift server program */
/** Generate basic stats of the kyuubi server program */
private def generateBasicStats(): Seq[Node] = {
val timeSinceStart = System.currentTimeMillis() - startTime.getTime
<ul class ="unstyled">
@ -70,7 +70,7 @@ class ThriftServerSessionPage(parent: ThriftServerTab) extends WebUIPage("sessio
</ul>
}
/** Generate stats of batch statements of the thrift server program */
/** Generate stats of batch statements of the kyuubi server program */
private def generateSQLStatsTable(sessionID: String): Seq[Node] = {
val executionList = listener.getExecutionList
.filter(_.sessionId == sessionID)

View File

@ -18,26 +18,26 @@
package org.apache.spark.ui
import org.apache.spark.{SparkContext, SparkException}
import org.apache.spark.ui.ThriftServerTab._
import org.apache.spark.ui.KyuubiServerTab._
import yaooqinn.kyuubi.monitor.ThriftServerMonitor
import yaooqinn.kyuubi.ui.KyuubiServerMonitor
/**
* Spark Web UI tab that shows statistics of jobs running in the thrift server.
* This assumes the given SparkContext has enabled its SparkUI.
*/
class ThriftServerTab(userName: String, sparkContext: SparkContext)
class KyuubiServerTab(userName: String, sparkContext: SparkContext)
extends SparkUITab(getSparkUI(sparkContext), "sqlserver") {
override val name = "JDBC/ODBC Server"
override val name = "Kyuubi Server"
val parent = getSparkUI(sparkContext)
// ThriftServerTab renders by different listener's content, identified by user.
val listener = ThriftServerMonitor.getListener(userName)
// KyuubiServerTab renders by different listener's content, identified by user.
val listener = KyuubiServerMonitor.getListener(userName)
attachPage(new ThriftServerPage(this))
attachPage(new ThriftServerSessionPage(this))
attachPage(new KyuubiServerPage(this))
attachPage(new KyuubiServerSessionPage(this))
parent.attachTab(this)
def detach() {
@ -45,7 +45,7 @@ class ThriftServerTab(userName: String, sparkContext: SparkContext)
}
}
object ThriftServerTab {
object KyuubiServerTab {
def getSparkUI(sparkContext: SparkContext): SparkUI = {
sparkContext.ui.getOrElse {
throw new SparkException("Parent SparkUI to attach this tab to not found!")

View File

@ -36,7 +36,7 @@ import org.apache.zookeeper._
import org.apache.zookeeper.data.ACL
import yaooqinn.kyuubi.Logging
import yaooqinn.kyuubi.server.{KyuubiClientCLIService, KyuubiServer}
import yaooqinn.kyuubi.server.{FrontendService, KyuubiServer}
object HighAvailabilityUtils extends Logging {
@ -58,7 +58,7 @@ object HighAvailabilityUtils extends Logging {
val conf = kyuubiServer.getConf
val zooKeeperEnsemble = getQuorumServers(conf)
val rootNamespace = conf.get(KyuubiConf.KYUUBI_ZOOKEEPER_NAMESPACE.key, "kyuubiserver")
val instanceURI = getServerInstanceURI(kyuubiServer.clientCLIService)
val instanceURI = getServerInstanceURI(kyuubiServer.feService)
setUpZooKeeperAuth(conf)
@ -145,7 +145,7 @@ object HighAvailabilityUtils extends Logging {
quorum.stripSuffix(",")
}
private class DeRegisterWatcher(kyuubiThriftServer: KyuubiServer) extends Watcher {
private class DeRegisterWatcher(kyuubiServer: KyuubiServer) extends Watcher {
override def process(event: WatchedEvent): Unit = {
if (event.getType == Watcher.Event.EventType.NodeDeleted) {
if (znode != null) {
@ -159,11 +159,11 @@ object HighAvailabilityUtils extends Logging {
} finally {
setDeregisteredWithZooKeeper(true)
// If there are no more active client sessions, stop the server
if (kyuubiThriftServer.cliService.getSessionManager.getOpenSessionCount == 0) {
if (kyuubiServer.beService.getSessionManager.getOpenSessionCount == 0) {
warn("This Kyuubi instance has been removed from the list of " +
"server instances available for dynamic service discovery. The last client " +
"session has ended - will shutdown now.")
kyuubiThriftServer.stop()
kyuubiServer.stop()
}
}
}
@ -194,11 +194,11 @@ object HighAvailabilityUtils extends Logging {
}
@throws[Exception]
private def getServerInstanceURI(clientCLIService: KyuubiClientCLIService): String = {
if ((clientCLIService == null) || (clientCLIService.getServerIPAddress == null)) {
private def getServerInstanceURI(service: FrontendService): String = {
if ((service == null) || (service.getServerIPAddress == null)) {
throw new Exception("Unable to get the server address; it hasn't been initialized yet.")
}
clientCLIService.getServerIPAddress.getHostName + ":" + clientCLIService.getPortNumber
service.getServerIPAddress.getHostName + ":" + service.getPortNumber
}
/**

View File

@ -37,7 +37,7 @@ import org.apache.spark.sql.{DataFrame, Row, SparkSQLUtils}
import org.apache.spark.sql.types._
import yaooqinn.kyuubi.Logging
import yaooqinn.kyuubi.monitor.ThriftServerMonitor
import yaooqinn.kyuubi.ui.KyuubiServerMonitor
import yaooqinn.kyuubi.session.KyuubiSession
class KyuubiSQLOperation(parentSession: KyuubiSession, statement: String) extends Logging {
@ -356,7 +356,7 @@ class KyuubiSQLOperation(parentSession: KyuubiSession, statement: String) extend
info(s"Running query '$statement' with $statementId")
setState(OperationState.RUNNING)
// Always use the latest class loader provided by executionHive's state.
ThriftServerMonitor.getListener(parentSession.getUserName).onStatementStart(
KyuubiServerMonitor.getListener(parentSession.getUserName).onStatementStart(
statementId,
parentSession.getSessionHandle.getSessionId.toString,
statement,
@ -365,7 +365,7 @@ class KyuubiSQLOperation(parentSession: KyuubiSession, statement: String) extend
parentSession.sparkSession().sparkContext.setJobGroup(statementId, statement)
try {
result = parentSession.sparkSession().sql(statement)
ThriftServerMonitor.getListener(parentSession.getUserName)
KyuubiServerMonitor.getListener(parentSession.getUserName)
.onStatementParsed(statementId, result.queryExecution.toString())
debug(result.queryExecution.toString())
iter = result.collect().iterator
@ -377,7 +377,7 @@ class KyuubiSQLOperation(parentSession: KyuubiSession, statement: String) extend
return
} else {
setState(OperationState.ERROR)
ThriftServerMonitor.getListener(parentSession.getUserName).onStatementError(
KyuubiServerMonitor.getListener(parentSession.getUserName).onStatementError(
statementId, e.getMessage, SparkUtils.exceptionString(e))
throw e
}
@ -391,7 +391,7 @@ class KyuubiSQLOperation(parentSession: KyuubiSession, statement: String) extend
return
} else {
setState(OperationState.ERROR)
ThriftServerMonitor.getListener(parentSession.getUserName).onStatementError(
KyuubiServerMonitor.getListener(parentSession.getUserName).onStatementError(
statementId, e.getMessage, SparkUtils.exceptionString(e))
throw new HiveSQLException(e.toString)
}
@ -401,7 +401,7 @@ class KyuubiSQLOperation(parentSession: KyuubiSession, statement: String) extend
}
}
setState(OperationState.FINISHED)
ThriftServerMonitor.getListener(parentSession.getUserName).onStatementFinish(statementId)
KyuubiServerMonitor.getListener(parentSession.getUserName).onStatementFinish(statementId)
}
private def cleanup(state: OperationState) {

View File

@ -19,31 +19,28 @@ package yaooqinn.kyuubi.server
import java.util.{List => JList, Map => JMap}
import org.apache.hive.service.auth.HiveAuthFactory
import org.apache.hive.service.cli._
import org.apache.hive.service.cli.thrift.TProtocolVersion
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.SparkConf
import yaooqinn.kyuubi.Logging
import yaooqinn.kyuubi.operation.KyuubiSQLOperation
import yaooqinn.kyuubi.service.CompositeService
import yaooqinn.kyuubi.session.KyuubiSessionManager
private[server] class KyuubiServerCLIService private(
name: String,
private val thriftServer: KyuubiServer)
private[server] class BackendService private(name: String, server: KyuubiServer)
extends CompositeService(name) with ICLIService with Logging {
private[this] var sessionManager: KyuubiSessionManager = _
def getSessionManager: KyuubiSessionManager = sessionManager
def this(thriftServer: KyuubiServer) = {
this(classOf[KyuubiServerCLIService].getSimpleName, thriftServer)
this(classOf[BackendService].getSimpleName, thriftServer)
}
override def init(conf: SparkConf): Unit = synchronized {
this.conf = conf
sessionManager = new KyuubiSessionManager(thriftServer)
sessionManager = new KyuubiSessionManager(server)
addService(sessionManager)
super.init(conf)
}
@ -169,6 +166,6 @@ private[server] class KyuubiServerCLIService private(
}
}
object KyuubiServerCLIService {
object BackendService {
final val SERVER_VERSION = TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V8
}

View File

@ -40,11 +40,11 @@ import yaooqinn.kyuubi.Logging
import yaooqinn.kyuubi.service.{AbstractService, ServiceException, ServiceUtils}
/**
* KyuubiClientCLIService keeps compatible with all kinds of Hive JDBC/ Thrift Client Connections
* FrontendService keeps compatible with all kinds of Hive JDBC/Thrift Client Connections
*
* It use Hive configurations to configure itself.
*/
class KyuubiClientCLIService private(name: String, cliService: KyuubiServerCLIService)
private[kyuubi] class FrontendService private(name: String, beService: BackendService)
extends AbstractService(name) with TCLIService.Iface with Runnable with Logging {
private[this] var hiveConf: HiveConf = _
@ -69,7 +69,13 @@ class KyuubiClientCLIService private(name: String, cliService: KyuubiServerCLISe
private[this] var realUser: String = _
class ThriftClientCLIServerContext extends ServerContext {
def this(cliService: BackendService) = {
this(classOf[FrontendService].getSimpleName, cliService)
currentServerContext = new ThreadLocal[ServerContext]()
serverEventHandler = new FeTServerEventHandler
}
class FeServiceServerContext extends ServerContext {
private var sessionHandle: SessionHandle = _
def setSessionHandle(sessionHandle: SessionHandle): Unit = {
@ -79,23 +85,17 @@ class KyuubiClientCLIService private(name: String, cliService: KyuubiServerCLISe
def getSessionHandle: SessionHandle = sessionHandle
}
def this(cliService: KyuubiServerCLIService) = {
this(classOf[KyuubiClientCLIService].getSimpleName, cliService)
currentServerContext = new ThreadLocal[ServerContext]()
serverEventHandler = new ClientTServerEventHandler
}
class ClientTServerEventHandler extends TServerEventHandler {
class FeTServerEventHandler extends TServerEventHandler {
override def deleteContext(
serverContext: ServerContext, tProtocol: TProtocol, tProtocol1: TProtocol): Unit = {
Option(serverContext.asInstanceOf[ThriftClientCLIServerContext]
.getSessionHandle).foreach { sessionHandle =>
warn(s"Session [$sessionHandle] disconnected without closing properly, " +
Option(serverContext.asInstanceOf[FeServiceServerContext].getSessionHandle)
.foreach { sessionHandle =>
warn(s"Session [$sessionHandle] disconnected without closing properly, " +
s"close it now")
Try {cliService.closeSession(sessionHandle)} match {
case Failure(exception) =>
Try {beService.closeSession(sessionHandle)} match {
case Failure(exception) =>
warn("Failed closing session " + exception, exception)
case _ =>
case _ =>
}
}
}
@ -108,7 +108,7 @@ class KyuubiClientCLIService private(name: String, cliService: KyuubiServerCLISe
override def preServe(): Unit = ()
override def createContext(tProtocol: TProtocol, tProtocol1: TProtocol): ServerContext = {
new ThriftClientCLIServerContext()
new FeServiceServerContext()
}
}
@ -122,14 +122,11 @@ class KyuubiClientCLIService private(name: String, cliService: KyuubiServerCLISe
try {
if (serverHost != null && !serverHost.isEmpty) {
serverIPAddress = InetAddress.getByName(serverHost)
}
else {
} else {
serverIPAddress = InetAddress.getLocalHost
}
}
catch {
case e: UnknownHostException =>
throw new ServiceException(e)
} catch {
case e: UnknownHostException => throw new ServiceException(e)
}
minWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_MIN_WORKER_THREADS)
maxWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_MAX_WORKER_THREADS)
@ -150,10 +147,8 @@ class KyuubiClientCLIService private(name: String, cliService: KyuubiServerCLISe
override def stop(): Unit = {
if (isStarted) {
server.foreach { s =>
s.stop()
info(this.name + " has stopped")
}
server.foreach(_.stop())
info(this.name + " has stopped")
isStarted = false
}
super.stop()
@ -168,7 +163,7 @@ class KyuubiClientCLIService private(name: String, cliService: KyuubiServerCLISe
.equalsIgnoreCase(HiveAuthFactory.AuthTypes.KERBEROS.toString)
}
private def getUserName(req: TOpenSessionReq) = {
private[this] def getUserName(req: TOpenSessionReq) = {
// Kerberos
if (isKerberosAuthMode) {
realUser = hiveAuthFactory.getRemoteUser
@ -185,16 +180,12 @@ class KyuubiClientCLIService private(name: String, cliService: KyuubiServerCLISe
}
private[this] def getShortName(userName: String): String = {
var ret: String = null
if (userName != null) {
val indexOfDomainMatch = ServiceUtils.indexOfDomainMatch(userName)
ret = if (indexOfDomainMatch <= 0) {
userName
} else {
userName.substring(0, indexOfDomainMatch)
}
val indexOfDomainMatch = ServiceUtils.indexOfDomainMatch(userName)
if (indexOfDomainMatch <= 0) {
userName
} else {
userName.substring(0, indexOfDomainMatch)
}
ret
}
@throws[HiveSQLException]
@ -248,13 +239,13 @@ class KyuubiClientCLIService private(name: String, cliService: KyuubiServerCLISe
private[this] def getSessionHandle(req: TOpenSessionReq, res: TOpenSessionResp) = {
val userName = getUserName(req)
val ipAddress = getIpAddress
val protocol = getMinVersion(KyuubiServerCLIService.SERVER_VERSION, req.getClient_protocol)
val protocol = getMinVersion(BackendService.SERVER_VERSION, req.getClient_protocol)
val sessionHandle =
if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS) && (userName != null)) {
cliService.openSessionWithImpersonation(
beService.openSessionWithImpersonation(
protocol, userName, req.getPassword, ipAddress, req.getConfiguration, null)
} else {
cliService.openSession(
beService.openSession(
protocol, userName, req.getPassword, ipAddress, req.getConfiguration)
}
res.setServerProtocolVersion(protocol)
@ -270,7 +261,7 @@ class KyuubiClientCLIService private(name: String, cliService: KyuubiServerCLISe
resp.setConfiguration(new JHashMap[String, String])
resp.setStatus(OK_STATUS)
val context = currentServerContext.get
.asInstanceOf[KyuubiClientCLIService#ThriftClientCLIServerContext]
.asInstanceOf[FrontendService#FeServiceServerContext]
if (context != null) {
context.setSessionHandle(sessionHandle)
}
@ -280,17 +271,16 @@ class KyuubiClientCLIService private(name: String, cliService: KyuubiServerCLISe
resp.setStatus(HiveSQLException.toTStatus(e))
}
resp
}
override def CloseSession(req: TCloseSessionReq): TCloseSessionResp = {
val resp = new TCloseSessionResp
try {
val sessionHandle = new SessionHandle(req.getSessionHandle)
cliService.closeSession(sessionHandle)
beService.closeSession(sessionHandle)
resp.setStatus(OK_STATUS)
val context = currentServerContext.get
.asInstanceOf[ThriftClientCLIServerContext]
.asInstanceOf[FeServiceServerContext]
if (context != null) {
context.setSessionHandle(null)
}
@ -305,7 +295,7 @@ class KyuubiClientCLIService private(name: String, cliService: KyuubiServerCLISe
override def GetInfo(req: TGetInfoReq): TGetInfoResp = {
val resp = new TGetInfoResp
try {
val getInfoValue = cliService.getInfo(
val getInfoValue = beService.getInfo(
new SessionHandle(req.getSessionHandle), GetInfoType.getGetInfoType(req.getInfoType))
resp.setInfoValue(getInfoValue.toTGetInfoValue)
resp.setStatus(OK_STATUS)
@ -325,9 +315,9 @@ class KyuubiClientCLIService private(name: String, cliService: KyuubiServerCLISe
val confOverlay = req.getConfOverlay
val runAsync = req.isRunAsync
val operationHandle = if (runAsync) {
cliService.executeStatementAsync(sessionHandle, statement, confOverlay)
beService.executeStatementAsync(sessionHandle, statement, confOverlay)
} else {
cliService.executeStatement(sessionHandle, statement, confOverlay)
beService.executeStatement(sessionHandle, statement, confOverlay)
}
resp.setOperationHandle(operationHandle.toTOperationHandle)
resp.setStatus(OK_STATUS)
@ -342,7 +332,7 @@ class KyuubiClientCLIService private(name: String, cliService: KyuubiServerCLISe
override def GetTypeInfo(req: TGetTypeInfoReq): TGetTypeInfoResp = {
val resp = new TGetTypeInfoResp
try {
val operationHandle = cliService.getTypeInfo(new SessionHandle(req.getSessionHandle))
val operationHandle = beService.getTypeInfo(new SessionHandle(req.getSessionHandle))
resp.setOperationHandle(operationHandle.toTOperationHandle)
resp.setStatus(OK_STATUS)
} catch {
@ -356,7 +346,7 @@ class KyuubiClientCLIService private(name: String, cliService: KyuubiServerCLISe
override def GetCatalogs(req: TGetCatalogsReq): TGetCatalogsResp = {
val resp = new TGetCatalogsResp
try {
val opHandle = cliService.getCatalogs(new SessionHandle(req.getSessionHandle))
val opHandle = beService.getCatalogs(new SessionHandle(req.getSessionHandle))
resp.setOperationHandle(opHandle.toTOperationHandle)
resp.setStatus(OK_STATUS)
} catch {
@ -370,7 +360,7 @@ class KyuubiClientCLIService private(name: String, cliService: KyuubiServerCLISe
override def GetSchemas(req: TGetSchemasReq): TGetSchemasResp = {
val resp = new TGetSchemasResp
try {
val opHandle = cliService.getSchemas(
val opHandle = beService.getSchemas(
new SessionHandle(req.getSessionHandle), req.getCatalogName, req.getSchemaName)
resp.setOperationHandle(opHandle.toTOperationHandle)
resp.setStatus(OK_STATUS)
@ -385,7 +375,7 @@ class KyuubiClientCLIService private(name: String, cliService: KyuubiServerCLISe
override def GetTables(req: TGetTablesReq): TGetTablesResp = {
val resp = new TGetTablesResp
try {
val opHandle = cliService.getTables(new SessionHandle(req.getSessionHandle),
val opHandle = beService.getTables(new SessionHandle(req.getSessionHandle),
req.getCatalogName, req.getSchemaName, req.getTableName, req.getTableTypes)
resp.setOperationHandle(opHandle.toTOperationHandle)
resp.setStatus(OK_STATUS)
@ -400,7 +390,7 @@ class KyuubiClientCLIService private(name: String, cliService: KyuubiServerCLISe
override def GetTableTypes(req: TGetTableTypesReq): TGetTableTypesResp = {
val resp = new TGetTableTypesResp
try {
val opHandle = cliService.getTableTypes(new SessionHandle(req.getSessionHandle))
val opHandle = beService.getTableTypes(new SessionHandle(req.getSessionHandle))
resp.setOperationHandle(opHandle.toTOperationHandle)
resp.setStatus(OK_STATUS)
} catch {
@ -414,7 +404,7 @@ class KyuubiClientCLIService private(name: String, cliService: KyuubiServerCLISe
override def GetColumns(req: TGetColumnsReq): TGetColumnsResp = {
val resp = new TGetColumnsResp
try {
val opHandle = cliService.getColumns(
val opHandle = beService.getColumns(
new SessionHandle(req.getSessionHandle),
req.getCatalogName, req.getSchemaName, req.getTableName, req.getColumnName)
resp.setOperationHandle(opHandle.toTOperationHandle)
@ -430,7 +420,7 @@ class KyuubiClientCLIService private(name: String, cliService: KyuubiServerCLISe
override def GetFunctions(req: TGetFunctionsReq): TGetFunctionsResp = {
val resp = new TGetFunctionsResp
try {
val opHandle = cliService.getFunctions(
val opHandle = beService.getFunctions(
new SessionHandle(req.getSessionHandle),
req.getCatalogName, req.getSchemaName, req.getFunctionName)
resp.setOperationHandle(opHandle.toTOperationHandle)
@ -446,7 +436,7 @@ class KyuubiClientCLIService private(name: String, cliService: KyuubiServerCLISe
override def GetOperationStatus(req: TGetOperationStatusReq): TGetOperationStatusResp = {
val resp = new TGetOperationStatusResp
try {
val operationStatus = cliService.getOperationStatus(
val operationStatus = beService.getOperationStatus(
new OperationHandle(req.getOperationHandle))
resp.setOperationState(operationStatus.getState.toTOperationState)
val opException = operationStatus.getOperationException
@ -467,7 +457,7 @@ class KyuubiClientCLIService private(name: String, cliService: KyuubiServerCLISe
override def CancelOperation(req: TCancelOperationReq): TCancelOperationResp = {
val resp = new TCancelOperationResp
try {
cliService.cancelOperation(new OperationHandle(req.getOperationHandle))
beService.cancelOperation(new OperationHandle(req.getOperationHandle))
resp.setStatus(OK_STATUS)
} catch {
case e: Exception =>
@ -480,7 +470,7 @@ class KyuubiClientCLIService private(name: String, cliService: KyuubiServerCLISe
override def CloseOperation(req: TCloseOperationReq): TCloseOperationResp = {
val resp = new TCloseOperationResp
try {
cliService.closeOperation(new OperationHandle(req.getOperationHandle))
beService.closeOperation(new OperationHandle(req.getOperationHandle))
resp.setStatus(OK_STATUS)
} catch {
case e: Exception =>
@ -493,7 +483,7 @@ class KyuubiClientCLIService private(name: String, cliService: KyuubiServerCLISe
override def GetResultSetMetadata(req: TGetResultSetMetadataReq): TGetResultSetMetadataResp = {
val resp = new TGetResultSetMetadataResp
try {
val schema = cliService.getResultSetMetadata(new OperationHandle(req.getOperationHandle))
val schema = beService.getResultSetMetadata(new OperationHandle(req.getOperationHandle))
resp.setSchema(schema.toTTableSchema)
resp.setStatus(OK_STATUS)
} catch {
@ -507,7 +497,7 @@ class KyuubiClientCLIService private(name: String, cliService: KyuubiServerCLISe
override def FetchResults(req: TFetchResultsReq): TFetchResultsResp = {
val resp = new TFetchResultsResp
try {
val rowSet = cliService.fetchResults(
val rowSet = beService.fetchResults(
new OperationHandle(req.getOperationHandle),
FetchOrientation.getFetchOrientation(req.getOrientation),
req.getMaxRows,
@ -588,14 +578,13 @@ class KyuubiClientCLIService private(name: String, cliService: KyuubiServerCLISe
// TCP Server
server = Some(new TThreadPoolServer(sargs))
server.foreach(_.setServerEventHandler(serverEventHandler))
val msg = "Starting " + classOf[KyuubiClientCLIService].getSimpleName + " on port " +
val msg = "Starting " + classOf[FrontendService].getSimpleName + " on port " +
portNum + " with " + minWorkerThreads + "..." + maxWorkerThreads + " worker threads"
info(msg)
server.foreach(_.serve())
} catch {
case t: Throwable =>
error("Error starting KyuubiServer: could not start "
+ classOf[KyuubiClientCLIService].getSimpleName, t)
error("Error starting " + classOf[FrontendService].getSimpleName + " for KyuubiServer", t)
System.exit(-1)
}
}

View File

@ -26,14 +26,15 @@ import org.apache.spark.{SparkConf, SparkUtils}
import yaooqinn.kyuubi.Logging
import yaooqinn.kyuubi.ha.HighAvailabilityUtils
import yaooqinn.kyuubi.service.CompositeService
import yaooqinn.kyuubi.utils.VersionUtils
private[kyuubi] class KyuubiServer private(name: String)
extends CompositeService(name) with Logging {
private[this] var _cliService: KyuubiServerCLIService = _
def cliService: KyuubiServerCLIService = _cliService
private[this] var _clientCLIService: KyuubiClientCLIService = _
def clientCLIService: KyuubiClientCLIService = _clientCLIService
private[this] var _beService: BackendService = _
def beService: BackendService = _beService
private[this] var _feService: FrontendService = _
def feService: FrontendService = _feService
private[this] val started = new AtomicBoolean(false)
@ -43,10 +44,10 @@ private[kyuubi] class KyuubiServer private(name: String)
override def init(conf: SparkConf): Unit = synchronized {
this.conf = conf
_cliService = new KyuubiServerCLIService(this)
_clientCLIService = new KyuubiClientCLIService(_cliService)
addService(_cliService)
addService(_clientCLIService)
_beService = new BackendService(this)
_feService = new FrontendService(_beService)
addService(_beService)
addService(_feService)
super.init(conf)
SparkUtils.addShutdownHook {
() => this.stop()
@ -104,8 +105,10 @@ object KyuubiServer extends Logging {
conf.set("spark.sql.catalogImplementation", "hive")
// When use User ClassPath First, will cause ClassNotFound exception,
// see https://github.com/apache/spark/pull/20145,
conf.set("spark.sql.hive.metastore.jars",
sys.env("SPARK_HOME") + File.separator + "jars" + File.separator + "*")
// see https://github.com/apache/spark/pull/20145
if (!VersionUtils.isSpark23OrLater()) {
conf.set("spark.sql.hive.metastore.jars",
sys.env("SPARK_HOME") + File.separator + "jars" + File.separator + "*")
}
}
}

View File

@ -31,10 +31,10 @@ import org.apache.hive.service.cli._
import org.apache.hive.service.cli.thrift.TProtocolVersion
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import org.apache.spark.ui.ThriftServerTab
import org.apache.spark.ui.KyuubiServerTab
import yaooqinn.kyuubi.Logging
import yaooqinn.kyuubi.monitor.{ThriftServerListener, ThriftServerMonitor}
import yaooqinn.kyuubi.ui.{KyuubiServerListener, KyuubiServerMonitor}
import yaooqinn.kyuubi.operation.KyuubiOperationManager
import yaooqinn.kyuubi.utils.ReflectUtils
@ -99,7 +99,7 @@ class KyuubiSession(
private[this] def createSparkSession(): Unit = {
val userName = sessionUGI.getShortUserName
info(s"------ Create new SparkSession for $userName -------")
conf.setAppName(s"SparkThriftServer[$userName]")
conf.setAppName(s"Kyuubi Session 4 [$userName]")
try {
sessionUGI.doAs(new PrivilegedExceptionAction[Unit] {
override def run() = {
@ -112,10 +112,10 @@ class KyuubiSession(
sessionManager.setSparkSession(userName, _sparkSession)
// set sc fully constructed immediately
sessionManager.setSCFullyConstructed(userName)
ThriftServerMonitor.setListener(userName, new ThriftServerListener(conf))
_sparkSession.sparkContext.addSparkListener(ThriftServerMonitor.getListener(userName))
val uiTab = new ThriftServerTab(userName, _sparkSession.sparkContext)
ThriftServerMonitor.addUITab(_sparkSession.sparkContext.sparkUser, uiTab)
KyuubiServerMonitor.setListener(userName, new KyuubiServerListener(conf))
_sparkSession.sparkContext.addSparkListener(KyuubiServerMonitor.getListener(userName))
val uiTab = new KyuubiServerTab(userName, _sparkSession.sparkContext)
KyuubiServerMonitor.addUITab(_sparkSession.sparkContext.sparkUser, uiTab)
} catch {
case e: Exception =>
val hiveSQLException =

View File

@ -33,7 +33,7 @@ import org.apache.spark.{KyuubiConf, SparkConf, SparkException}
import org.apache.spark.sql.SparkSession
import yaooqinn.kyuubi.Logging
import yaooqinn.kyuubi.monitor.ThriftServerMonitor
import yaooqinn.kyuubi.ui.KyuubiServerMonitor
import yaooqinn.kyuubi.operation.KyuubiOperationManager
import yaooqinn.kyuubi.server.KyuubiServer
import yaooqinn.kyuubi.service.CompositeService
@ -186,7 +186,7 @@ private[kyuubi] class KyuubiSessionManager private(
case (user, (session, times)) =>
if (times.get() <= 0 || session.sparkContext.isStopped) {
removeSparkSession(user)
ThriftServerMonitor.detachUITab(user)
KyuubiServerMonitor.detachUITab(user)
session.stop()
}
case _ =>
@ -240,7 +240,7 @@ private[kyuubi] class KyuubiSessionManager private(
handleToSession.put(sessionHandle, kyuubiSession)
handleToSessionUser.put(sessionHandle, username)
ThriftServerMonitor.getListener(username).onSessionCreated(
KyuubiServerMonitor.getListener(username).onSessionCreated(
kyuubiSession.getIpAddress,
sessionHandle.getSessionId.toString,
kyuubiSession.getUserName)
@ -250,7 +250,7 @@ private[kyuubi] class KyuubiSessionManager private(
def closeSession(sessionHandle: SessionHandle) {
val sessionUser = handleToSessionUser.remove(sessionHandle)
ThriftServerMonitor.getListener(sessionUser)
KyuubiServerMonitor.getListener(sessionUser)
.onSessionClosed(sessionHandle.getSessionId.toString)
val sessionAndTimes = userToSparkSession.get(sessionUser)
if (sessionAndTimes != null) {

View File

@ -15,26 +15,26 @@
* limitations under the License.
*/
package yaooqinn.kyuubi.monitor
package yaooqinn.kyuubi.ui
import scala.collection.mutable.ArrayBuffer
class ExecutionInfo(
val statement: String,
val sessionId: String,
val startTimestamp: Long,
val userName: String) {
var finishTimestamp: Long = 0L
var executePlan: String = ""
var detail: String = ""
var state: ExecutionState.Value = ExecutionState.STARTED
val jobId: ArrayBuffer[String] = ArrayBuffer[String]()
var groupId: String = ""
def totalTime: Long = {
if (finishTimestamp == 0L) {
System.currentTimeMillis - startTimestamp
} else {
finishTimestamp - startTimestamp
}
val statement: String,
val sessionId: String,
val startTimestamp: Long,
val userName: User) {
var finishTimestamp: Long = 0L
var executePlan: String = ""
var detail: String = ""
var state: ExecutionState.Value = ExecutionState.STARTED
val jobId: ArrayBuffer[String] = ArrayBuffer[String]()
var groupId: String = ""
def totalTime: Long = {
if (finishTimestamp == 0L) {
System.currentTimeMillis - startTimestamp
} else {
finishTimestamp - startTimestamp
}
}
}

View File

@ -15,7 +15,7 @@
* limitations under the License.
*/
package yaooqinn.kyuubi.monitor
package yaooqinn.kyuubi.ui
object ExecutionState extends Enumeration {
val STARTED, COMPILED, FAILED, FINISHED = Value

View File

@ -15,7 +15,7 @@
* limitations under the License.
*/
package yaooqinn.kyuubi.monitor
package yaooqinn.kyuubi.ui
import scala.collection.mutable
import scala.collection.JavaConverters._
@ -24,14 +24,15 @@ import org.apache.spark.{SparkConf, SparkUtils}
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
import org.apache.spark.sql.internal.SQLConf
class ThriftServerListener(conf: SparkConf) extends SparkListener {
class KyuubiServerListener(conf: SparkConf) extends SparkListener {
protected var onlineSessionNum: Int = 0
protected val sessionList = new mutable.LinkedHashMap[String, SessionInfo]
protected val executionList = new mutable.LinkedHashMap[String, ExecutionInfo]
protected val retainedStatements = conf.getInt(SQLConf.THRIFTSERVER_UI_STATEMENT_LIMIT.key, 200)
protected val retainedSessions = conf.getInt(SQLConf.THRIFTSERVER_UI_SESSION_LIMIT.key, 200)
protected var totalRunning = 0
private[this] var onlineSessionNum: Int = 0
private[this] val sessionList = new mutable.LinkedHashMap[String, SessionInfo]
private[this] val executionList = new mutable.LinkedHashMap[String, ExecutionInfo]
private[this] val retainedStatements =
conf.getInt(SQLConf.THRIFTSERVER_UI_STATEMENT_LIMIT.key, 200)
private[this] val retainedSessions = conf.getInt(SQLConf.THRIFTSERVER_UI_SESSION_LIMIT.key, 200)
private[this] var totalRunning = 0
def getOnlineSessionNum: Int = synchronized { onlineSessionNum }

View File

@ -15,32 +15,33 @@
* limitations under the License.
*/
package yaooqinn.kyuubi.monitor
package yaooqinn.kyuubi.ui
import scala.collection.mutable.HashMap
import org.apache.spark.SparkException
import org.apache.spark.ui.ThriftServerTab
import org.apache.spark.ui.KyuubiServerTab
object ThriftServerMonitor {
private[this] val uiTabs = new HashMap[String, ThriftServerTab]()
object KyuubiServerMonitor {
private[this] val listeners = new HashMap[String, ThriftServerListener]()
private[this] val uiTabs = new HashMap[User, KyuubiServerTab]()
def setListener(user: String, sparkListener: ThriftServerListener): Unit = {
private[this] val listeners = new HashMap[User, KyuubiServerListener]()
def setListener(user: User, sparkListener: KyuubiServerListener): Unit = {
listeners.put(user, sparkListener)
}
def getListener(user: String): ThriftServerListener = {
def getListener(user: User): KyuubiServerListener = {
listeners.getOrElse(user, throw new SparkException(s"Listener does not init for user[$user]"))
}
def addUITab(user: String, ui: ThriftServerTab): Unit = {
def addUITab(user: User, ui: KyuubiServerTab): Unit = {
uiTabs.put(user, ui)
}
def detachUITab(user: String): Unit = {
def detachUITab(user: User): Unit = {
listeners.remove(user)
uiTabs.get(user).foreach(_.detach())
}

View File

@ -15,7 +15,7 @@
* limitations under the License.
*/
package yaooqinn.kyuubi.monitor
package yaooqinn.kyuubi.ui
class SessionInfo(

View File

@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package yaooqinn.kyuubi

/**
 * Package-wide type aliases for the kyuubi UI layer.
 */
package object ui {
// A user is identified by its name string only; the alias documents intent
// at call sites (e.g. HashMap[User, KyuubiServerTab]) without a wrapper cost.
type User = String
}

View File

@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package yaooqinn.kyuubi.utils
import org.apache.spark.SPARK_VERSION
/**
 * Helpers for checking which Spark release line the current runtime belongs to.
 *
 * Each `spark_x_y_z` value is a regex covering every maintenance release of
 * one supported line (e.g. 2.2.0, 2.2.1, ...).
 */
object VersionUtils {
  val spark_1_6_x = """1\.6\.\d+""".r
  val spark_2_0_x = """2\.0\.\d+""".r
  val spark_2_1_x = """2\.1\.\d+""".r
  val spark_2_2_x = """2\.2\.\d+""".r
  val spark_2_3_x = """2\.3\.\d+""".r

  /**
   * Returns `Some(version)` when the given version string fully matches one of
   * the supported release-line patterns, `None` otherwise.
   */
  def getSupportedSpark(version: String): Option[String] = {
    val knownLines = Seq(spark_1_6_x, spark_2_0_x, spark_2_1_x, spark_2_2_x, spark_2_3_x)
    // Full match (not find) — mirrors the semantics of a regex extractor pattern.
    if (knownLines.exists(_.pattern.matcher(version).matches())) Some(version) else None
  }

  /** True when the Spark version on the classpath is one kyuubi supports. */
  def isSupportedSpark(): Boolean = getSupportedSpark(SPARK_VERSION).isDefined

  /** True when the runtime Spark is on the 2.3 line (the newest supported). */
  def isSpark23OrLater(): Boolean =
    spark_2_3_x.pattern.matcher(SPARK_VERSION).matches()

  /** True when the runtime Spark is on the 2.2 line or newer. */
  def isSpark22OrLater(): Boolean =
    Seq(spark_2_2_x, spark_2_3_x).exists(_.pattern.matcher(SPARK_VERSION).matches())

  /** True when the runtime Spark is on the 2.1 line or newer. */
  def isSpark21OrLater(): Boolean =
    Seq(spark_2_1_x, spark_2_2_x, spark_2_3_x)
      .exists(_.pattern.matcher(SPARK_VERSION).matches())

  /** True when the runtime Spark is on the 2.0 line or newer. */
  def isSpark20OrLater(): Boolean =
    Seq(spark_2_0_x, spark_2_1_x, spark_2_2_x, spark_2_3_x)
      .exists(_.pattern.matcher(SPARK_VERSION).matches())
}

View File

@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package yaooqinn.kyuubi.utils
import org.apache.spark.SparkFunSuite
/**
 * Tests for [[VersionUtils.getSupportedSpark]]: every maintenance release of
 * the supported 1.6.x - 2.3.x lines is accepted; older releases are rejected.
 */
class VersionUtilsSuite extends SparkFunSuite {
  test("Get Supported Spark Version") {
    val v160 = VersionUtils.getSupportedSpark("1.6.0")
    assert(v160.nonEmpty, "1.6.0 is supported")
    val v161 = VersionUtils.getSupportedSpark("1.6.1")
    assert(v161.nonEmpty, "1.6.1 is supported")
    // Fix: previously probed "1.6.1" a second time while the assertion message
    // claimed to cover 1.6.2, leaving 1.6.2 actually untested.
    val v162 = VersionUtils.getSupportedSpark("1.6.2")
    assert(v162.nonEmpty, "1.6.2 is supported")
    val v200 = VersionUtils.getSupportedSpark("2.0.0")
    assert(v200.nonEmpty, "2.0.0 is supported")
    val v210 = VersionUtils.getSupportedSpark("2.1.0")
    assert(v210.nonEmpty, "2.1.0 is supported")
    val v220 = VersionUtils.getSupportedSpark("2.2.0")
    assert(v220.nonEmpty, "2.2.0 is supported")
    val v230 = VersionUtils.getSupportedSpark("2.3.0")
    assert(v230.nonEmpty, "2.3.0 is supported")
    // Below the supported window: must be rejected.
    val v150 = VersionUtils.getSupportedSpark("1.5.0")
    assert(v150.isEmpty, "1.5.0 is not supported")
  }
}