add more uts

This commit is contained in:
Kent Yao 2018-10-30 17:47:55 +08:00
parent b593760f62
commit a4cb010feb
7 changed files with 120 additions and 34 deletions

View File

@ -50,7 +50,7 @@ class TSetIpAddressProcessor[I <: Iface](iface: Iface)
}
}
private def setUserName(in: TProtocol) = {
private def setUserName(in: TProtocol): Unit = {
val transport = in.getTransport
transport match {
case transport1: TSaslServerTransport =>

View File

@ -31,7 +31,7 @@ class HandleIdentifier(val publicId: UUID, val secretId: UUID) {
Option(secret).map(id => new UUID(id.getLong(), id.getLong())).getOrElse(UUID.randomUUID()))
def this(tHandleId: THandleIdentifier) =
this(tHandleId.bufferForGuid(), tHandleId.bufferForSecret())
this(ByteBuffer.wrap(tHandleId.getGuid), ByteBuffer.wrap(tHandleId.getSecret))
def getPublicId: UUID = this.publicId
def getSecretId: UUID = this.secretId
@ -80,5 +80,5 @@ class HandleIdentifier(val publicId: UUID, val secretId: UUID) {
true
}
override def toString: String = publicId.toString
override def toString: String = Option(publicId).map(_.toString).getOrElse("")
}

View File

@ -49,24 +49,24 @@ import yaooqinn.kyuubi.utils.NamedThreadFactory
private[kyuubi] class FrontendService private(name: String, beService: BackendService)
extends AbstractService(name) with TCLIService.Iface with Runnable with Logging {
private[this] var hadoopConf: Configuration = _
private[this] var authFactory: KyuubiAuthFactory = _
private var hadoopConf: Configuration = _
private var authFactory: KyuubiAuthFactory = _
private[this] val OK_STATUS = new TStatus(TStatusCode.SUCCESS_STATUS)
private val OK_STATUS = new TStatus(TStatusCode.SUCCESS_STATUS)
private[this] var serverEventHandler: TServerEventHandler = _
private[this] var currentServerContext: ThreadLocal[ServerContext] = _
private var serverEventHandler: TServerEventHandler = _
private var currentServerContext: ThreadLocal[ServerContext] = _
private[this] var server: Option[TServer] = None
private[this] var portNum = 0
private[this] var serverIPAddress: InetAddress = _
private[this] var serverSocket: ServerSocket = _
private var server: Option[TServer] = None
private var portNum = 0
private var serverIPAddress: InetAddress = _
private var serverSocket: ServerSocket = _
private[this] val threadPoolName = name + "-Handler-Pool"
private val threadPoolName = name + "-Handler-Pool"
private[this] var isStarted = false
private var isStarted = false
private[this] var realUser: String = _
private var realUser: String = _
def this(beService: BackendService) = {
this(classOf[FrontendService].getSimpleName, beService)
@ -126,8 +126,6 @@ private[kyuubi] class FrontendService private(name: String, beService: BackendSe
case e: Exception => throw new ServiceException(e.getMessage + ": " + portNum, e)
}
portNum = serverSocket.getLocalPort
// conf.set(FRONTEND_BIND_PORT, portNum.toString)
// conf.set(FRONTEND_BIND_HOST, serverIPAddress.getCanonicalHostName)
super.init(conf)
}
@ -152,11 +150,11 @@ private[kyuubi] class FrontendService private(name: String, beService: BackendSe
def getServerIPAddress: InetAddress = serverIPAddress
private[this] def isKerberosAuthMode = {
private def isKerberosAuthMode: Boolean = {
conf.get(KyuubiConf.AUTHENTICATION_METHOD).equalsIgnoreCase(AuthType.KERBEROS.name)
}
private[this] def getUserName(req: TOpenSessionReq) = {
private def getUserName(req: TOpenSessionReq): String = {
// Kerberos
if (isKerberosAuthMode) {
realUser = authFactory.getRemoteUser.orNull
@ -169,10 +167,15 @@ private[kyuubi] class FrontendService private(name: String, beService: BackendSe
realUser = req.getUsername
}
realUser = getShortName(realUser)
getProxyUser(req.getConfiguration.asScala.toMap, getIpAddress)
if (req.getConfiguration == null) {
realUser
} else {
getProxyUser(req.getConfiguration.asScala.toMap, getIpAddress)
}
}
private[this] def getShortName(userName: String): String = {
private def getShortName(userName: String): String = {
val indexOfDomainMatch = ServiceUtils.indexOfDomainMatch(userName)
if (indexOfDomainMatch <= 0) {
userName
@ -182,7 +185,7 @@ private[kyuubi] class FrontendService private(name: String, beService: BackendSe
}
@throws[KyuubiSQLException]
private[this] def getProxyUser(sessionConf: Map[String, String], ipAddress: String): String = {
private def getProxyUser(sessionConf: Map[String, String], ipAddress: String): String = {
Option(sessionConf).flatMap(_.get(KyuubiAuthFactory.HS2_PROXY_USER)) match {
case None => realUser
case Some(_) if !conf.get(FRONTEND_ALLOW_USER_SUBSTITUTION).toBoolean =>
@ -194,7 +197,7 @@ private[kyuubi] class FrontendService private(name: String, beService: BackendSe
}
}
private[this] def getIpAddress: String = {
private def getIpAddress: String = {
if (isKerberosAuthMode) {
this.authFactory.getIpAddress.orNull
} else {
@ -202,7 +205,7 @@ private[kyuubi] class FrontendService private(name: String, beService: BackendSe
}
}
private[this] def getMinVersion(versions: TProtocolVersion*): TProtocolVersion = {
private def getMinVersion(versions: TProtocolVersion*): TProtocolVersion = {
val values = TProtocolVersion.values
var current = values(values.length - 1).getValue
for (version <- versions) {
@ -219,17 +222,17 @@ private[kyuubi] class FrontendService private(name: String, beService: BackendSe
}
@throws[KyuubiSQLException]
private[this] def getSessionHandle(req: TOpenSessionReq, res: TOpenSessionResp) = {
private def getSessionHandle(req: TOpenSessionReq, res: TOpenSessionResp): SessionHandle = {
val userName = getUserName(req)
val ipAddress = getIpAddress
val protocol = getMinVersion(BackendService.SERVER_VERSION, req.getClient_protocol)
val sessionHandle =
if (conf.get(FRONTEND_ENABLE_DOAS).toBoolean && (userName != null)) {
val confMap = Option(req.getConfiguration).map(_.asScala.toMap).getOrElse(Map.empty)
val sessionHandle = if (conf.get(FRONTEND_ENABLE_DOAS).toBoolean && (userName != null)) {
beService.openSessionWithImpersonation(
protocol, userName, req.getPassword, ipAddress, req.getConfiguration.asScala.toMap, null)
protocol, userName, req.getPassword, ipAddress, confMap, null)
} else {
beService.openSession(
protocol, userName, req.getPassword, ipAddress, req.getConfiguration.asScala.toMap)
protocol, userName, req.getPassword, ipAddress, confMap)
}
res.setServerProtocolVersion(protocol)
sessionHandle

View File

@ -233,7 +233,7 @@ private[kyuubi] class SessionManager private(
val sessionHandle = kyuubiSession.getSessionHandle
handleToSession.put(sessionHandle, kyuubiSession)
KyuubiServerMonitor.getListener(username).foreach {
KyuubiServerMonitor.getListener(kyuubiSession.getUserName).foreach {
_.onSessionCreated(
kyuubiSession.getIpAddress,
sessionHandle.getSessionId.toString,

View File

@ -27,7 +27,9 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationState._
import org.apache.hadoop.yarn.client.api.YarnClient
import org.apache.hadoop.yarn.conf.YarnConfiguration
private[kyuubi] object KyuubiHadoopUtil {
import yaooqinn.kyuubi.Logging
private[kyuubi] object KyuubiHadoopUtil extends Logging {
// YarnClient is thread safe. Create once, share it across threads.
private lazy val yarnClient = {
@ -38,7 +40,11 @@ private[kyuubi] object KyuubiHadoopUtil {
}
def killYarnApp(report: ApplicationReport): Unit = {
yarnClient.killApplication(report.getApplicationId)
try {
yarnClient.killApplication(report.getApplicationId)
} catch {
case e: Exception => error("Failed to kill Application: " + report.getApplicationId, e)
}
}
def getApplications: Seq[ApplicationReport] = {

View File

@ -17,6 +17,7 @@
package yaooqinn.kyuubi.cli
import java.nio.ByteBuffer
import java.util.UUID
import org.apache.hive.service.cli.thrift.THandleIdentifier
@ -46,7 +47,7 @@ class HandleIdentifierSuite extends SparkFunSuite {
val handleId3 = new HandleIdentifier(handleId1.toTHandleIdentifier)
assert(handleId1 === handleId3)
val tHandleId = new THandleIdentifier()
val tHandleId = new THandleIdentifier(ByteBuffer.allocate(16), ByteBuffer.allocate(16))
val handleId4 = new HandleIdentifier(tHandleId)
assert(handleId1 !== handleId4)

View File

@ -19,15 +19,18 @@ package yaooqinn.kyuubi.server
import java.net.InetAddress
import scala.collection.JavaConverters._
import org.apache.hive.service.cli.thrift._
import org.apache.spark.{KyuubiSparkUtil, SparkConf, SparkFunSuite}
import org.apache.spark.KyuubiConf._
import org.scalatest.Matchers
import yaooqinn.kyuubi.SecuredFunSuite
import yaooqinn.kyuubi.service.{ServiceException, State}
import yaooqinn.kyuubi.session.SessionHandle
class FrontendServiceSuite extends SparkFunSuite with Matchers {
class FrontendServiceSuite extends SparkFunSuite with Matchers with SecuredFunSuite {
private val beService = new BackendService()
private val sessionHandle = new SessionHandle(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V8)
@ -42,6 +45,16 @@ class FrontendServiceSuite extends SparkFunSuite with Matchers {
conf.remove(KyuubiSparkUtil.CATALOG_IMPL)
conf.setMaster("local").set(FRONTEND_BIND_PORT.key, "0")
override def beforeAll(): Unit = {
beService.init(conf)
beService.start()
super.beforeAll()
}
override def afterAll(): Unit = {
beService.stop()
super.afterAll()
}
test(" test new fe service") {
val feService = new FrontendService(beService)
feService.getConf should be(null)
@ -177,4 +190,67 @@ class FrontendServiceSuite extends SparkFunSuite with Matchers {
handler.processContext(context, null, null)
handler.deleteContext(context, null, null)
}
test("open session, execute sql and get results") {
val feService = new FrontendService(beService)
try {
feService.init(conf)
feService.start()
val req = new TOpenSessionReq(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1)
val resp = feService.OpenSession(req)
resp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS)
val handle = resp.getSessionHandle
val req2 = new TExecuteStatementReq(handle, "show databases")
val resp2 = feService.ExecuteStatement(req2)
resp2.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS)
val req3 = new TGetOperationStatusReq(resp2.getOperationHandle)
val resp3 = feService.GetOperationStatus(req3)
resp3.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS)
Thread.sleep(10000)
val req4 = new TFetchResultsReq(resp2.getOperationHandle, TFetchOrientation.FETCH_NEXT, 50)
val resp4 = feService.FetchResults(req4)
resp4.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS)
resp4.getResults.getRows.get(0).getColVals.get(0).getStringVal.getValue should be("default")
assert(!resp4.getResults.isSetColumns)
val req5 = new TGetResultSetMetadataReq(resp2.getOperationHandle)
val resp5 = feService.GetResultSetMetadata(req5)
resp5.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS)
resp5.getSchema.getColumns.get(0).getColumnName should be("databaseName")
val req7 = new TCancelOperationReq(resp2.getOperationHandle)
val resp7 = feService.CancelOperation(req7)
resp7.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS)
val req6 = new TCloseOperationReq(resp2.getOperationHandle)
val resp6 = feService.CloseOperation(req6)
resp6.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS)
val req9 = new TCancelOperationReq(resp2.getOperationHandle)
val resp9 = feService.CancelOperation(req9)
resp9.getStatus.getStatusCode should be(TStatusCode.ERROR_STATUS)
val req8 = new TCloseSessionReq(handle)
val resp8 = feService.CloseSession(req8)
resp8.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS)
// after session closed
val resp10 = feService.CloseSession(req8)
resp10.getStatus.getStatusCode should be(TStatusCode.ERROR_STATUS)
resp10.getStatus.getErrorMessage should include("does not exist!")
val resp11 = feService.ExecuteStatement(req2)
resp11.getStatus.getStatusCode should be(TStatusCode.ERROR_STATUS)
feService.GetOperationStatus(req3).getStatus.getStatusCode should be(TStatusCode.ERROR_STATUS)
feService.FetchResults(req4).getStatus.getStatusCode should be(TStatusCode.ERROR_STATUS)
feService.GetResultSetMetadata(req5)
.getStatus.getStatusCode should be(TStatusCode.ERROR_STATUS)
feService.CancelOperation(req7).getStatus.getStatusCode should be(TStatusCode.ERROR_STATUS)
feService.CloseOperation(req6).getStatus.getStatusCode should be(TStatusCode.ERROR_STATUS)
val tOpenSessionReq = new TOpenSessionReq(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1)
tOpenSessionReq.setUsername("yaooqinn")
tOpenSessionReq.setPassword("passwd")
tOpenSessionReq.setConfiguration(
Map("hive.server2.proxy.user" -> "kent").asJava)
val tOpenSessionResp = feService.OpenSession(tOpenSessionReq)
tOpenSessionResp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS)
} finally {
feService.stop()
}
}
}