From a4cb010feb18e15e7d4ed86b77d92cf6be78c1c8 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Tue, 30 Oct 2018 17:47:55 +0800 Subject: [PATCH] add more uts --- .../kyuubi/auth/TSetIpAddressProcessor.scala | 2 +- .../kyuubi/cli/HandleIdentifier.scala | 4 +- .../kyuubi/server/FrontendService.scala | 55 ++++++------- .../kyuubi/session/SessionManager.scala | 2 +- .../kyuubi/utils/KyuubiHadoopUtil.scala | 10 ++- .../kyuubi/cli/HandleIdentifierSuite.scala | 3 +- .../kyuubi/server/FrontendServiceSuite.scala | 78 ++++++++++++++++++- 7 files changed, 120 insertions(+), 34 deletions(-) diff --git a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/auth/TSetIpAddressProcessor.scala b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/auth/TSetIpAddressProcessor.scala index 074e4b235..1f454f8b0 100644 --- a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/auth/TSetIpAddressProcessor.scala +++ b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/auth/TSetIpAddressProcessor.scala @@ -50,7 +50,7 @@ class TSetIpAddressProcessor[I <: Iface](iface: Iface) } } - private def setUserName(in: TProtocol) = { + private def setUserName(in: TProtocol): Unit = { val transport = in.getTransport transport match { case transport1: TSaslServerTransport => diff --git a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/cli/HandleIdentifier.scala b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/cli/HandleIdentifier.scala index f496d848e..c89a2c60e 100644 --- a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/cli/HandleIdentifier.scala +++ b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/cli/HandleIdentifier.scala @@ -31,7 +31,7 @@ class HandleIdentifier(val publicId: UUID, val secretId: UUID) { Option(secret).map(id => new UUID(id.getLong(), id.getLong())).getOrElse(UUID.randomUUID())) def this(tHandleId: THandleIdentifier) = - this(tHandleId.bufferForGuid(), tHandleId.bufferForSecret()) + this(ByteBuffer.wrap(tHandleId.getGuid), ByteBuffer.wrap(tHandleId.getSecret)) def getPublicId: UUID = this.publicId def getSecretId: UUID = this.secretId @@ -80,5 +80,5 @@ class HandleIdentifier(val publicId: UUID, val secretId: UUID) { true } - override def toString: String = publicId.toString + override def toString: String = Option(publicId).map(_.toString).getOrElse("") } diff --git a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/server/FrontendService.scala b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/server/FrontendService.scala index ecc9f5a16..bed6ff601 100644 --- a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/server/FrontendService.scala +++ b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/server/FrontendService.scala @@ -49,24 +49,24 @@ import yaooqinn.kyuubi.utils.NamedThreadFactory private[kyuubi] class FrontendService private(name: String, beService: BackendService) extends AbstractService(name) with TCLIService.Iface with Runnable with Logging { - private[this] var hadoopConf: Configuration = _ - private[this] var authFactory: KyuubiAuthFactory = _ + private var hadoopConf: Configuration = _ + private var authFactory: KyuubiAuthFactory = _ - private[this] val OK_STATUS = new TStatus(TStatusCode.SUCCESS_STATUS) + private val OK_STATUS = new TStatus(TStatusCode.SUCCESS_STATUS) - private[this] var serverEventHandler: TServerEventHandler = _ - private[this] var currentServerContext: ThreadLocal[ServerContext] = _ + private var serverEventHandler: TServerEventHandler = _ + private var currentServerContext: ThreadLocal[ServerContext] = _ - private[this] var server: Option[TServer] = None - private[this] var portNum = 0 - private[this] var serverIPAddress: InetAddress = _ - private[this] var serverSocket: ServerSocket = _ + private var server: Option[TServer] = None + private var portNum = 0 + private var serverIPAddress: InetAddress = _ + private var serverSocket: ServerSocket = _ - private[this] val threadPoolName = name + "-Handler-Pool" + private val threadPoolName = name + "-Handler-Pool" - private[this] var isStarted = false + private var isStarted = false - private[this] var realUser: String = _ + private var realUser: String = _ def this(beService: BackendService) = { this(classOf[FrontendService].getSimpleName, beService) @@ -126,8 +126,6 @@ private[kyuubi] class FrontendService private(name: String, beService: BackendSe case e: Exception => throw new ServiceException(e.getMessage + ": " + portNum, e) } portNum = serverSocket.getLocalPort -// conf.set(FRONTEND_BIND_PORT, portNum.toString) -// conf.set(FRONTEND_BIND_HOST, serverIPAddress.getCanonicalHostName) super.init(conf) } @@ -152,11 +150,11 @@ private[kyuubi] class FrontendService private(name: String, beService: BackendSe def getServerIPAddress: InetAddress = serverIPAddress - private[this] def isKerberosAuthMode = { + private def isKerberosAuthMode: Boolean = { conf.get(KyuubiConf.AUTHENTICATION_METHOD).equalsIgnoreCase(AuthType.KERBEROS.name) } - private[this] def getUserName(req: TOpenSessionReq) = { + private def getUserName(req: TOpenSessionReq): String = { // Kerberos if (isKerberosAuthMode) { realUser = authFactory.getRemoteUser.orNull @@ -169,10 +167,15 @@ private[kyuubi] class FrontendService private(name: String, beService: BackendSe realUser = req.getUsername } realUser = getShortName(realUser) - getProxyUser(req.getConfiguration.asScala.toMap, getIpAddress) + + if (req.getConfiguration == null) { + realUser + } else { + getProxyUser(req.getConfiguration.asScala.toMap, getIpAddress) + } } - private[this] def getShortName(userName: String): String = { + private def getShortName(userName: String): String = { val indexOfDomainMatch = ServiceUtils.indexOfDomainMatch(userName) if (indexOfDomainMatch <= 0) { userName @@ -182,7 +185,7 @@ private[kyuubi] class FrontendService private(name: String, beService: BackendSe } @throws[KyuubiSQLException] - private[this] def getProxyUser(sessionConf: Map[String, String], ipAddress: String): String = { + private def getProxyUser(sessionConf: Map[String, String], ipAddress: String): String = { Option(sessionConf).flatMap(_.get(KyuubiAuthFactory.HS2_PROXY_USER)) match { case None => realUser case Some(_) if !conf.get(FRONTEND_ALLOW_USER_SUBSTITUTION).toBoolean => @@ -194,7 +197,7 @@ private[kyuubi] class FrontendService private(name: String, beService: BackendSe } } - private[this] def getIpAddress: String = { + private def getIpAddress: String = { if (isKerberosAuthMode) { this.authFactory.getIpAddress.orNull } else { @@ -202,7 +205,7 @@ private[kyuubi] class FrontendService private(name: String, beService: BackendSe } } - private[this] def getMinVersion(versions: TProtocolVersion*): TProtocolVersion = { + private def getMinVersion(versions: TProtocolVersion*): TProtocolVersion = { val values = TProtocolVersion.values var current = values(values.length - 1).getValue for (version <- versions) { @@ -219,17 +222,17 @@ private[kyuubi] class FrontendService private(name: String, beService: BackendSe } @throws[KyuubiSQLException] - private[this] def getSessionHandle(req: TOpenSessionReq, res: TOpenSessionResp) = { + private def getSessionHandle(req: TOpenSessionReq, res: TOpenSessionResp): SessionHandle = { val userName = getUserName(req) val ipAddress = getIpAddress val protocol = getMinVersion(BackendService.SERVER_VERSION, req.getClient_protocol) - val sessionHandle = - if (conf.get(FRONTEND_ENABLE_DOAS).toBoolean && (userName != null)) { + val confMap = Option(req.getConfiguration).map(_.asScala.toMap).getOrElse(Map.empty) + val sessionHandle = if (conf.get(FRONTEND_ENABLE_DOAS).toBoolean && (userName != null)) { beService.openSessionWithImpersonation( - protocol, userName, req.getPassword, ipAddress, req.getConfiguration.asScala.toMap, null) + protocol, userName, req.getPassword, ipAddress, confMap, null) } else { beService.openSession( - protocol, userName, req.getPassword, ipAddress, req.getConfiguration.asScala.toMap) + protocol, userName, req.getPassword, ipAddress, confMap) } res.setServerProtocolVersion(protocol) sessionHandle diff --git a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/session/SessionManager.scala b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/session/SessionManager.scala index 811d78ac1..ac899c93f 100644 --- a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/session/SessionManager.scala +++ b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/session/SessionManager.scala @@ -233,7 +233,7 @@ private[kyuubi] class SessionManager private( val sessionHandle = kyuubiSession.getSessionHandle handleToSession.put(sessionHandle, kyuubiSession) - KyuubiServerMonitor.getListener(username).foreach { + KyuubiServerMonitor.getListener(kyuubiSession.getUserName).foreach { _.onSessionCreated( kyuubiSession.getIpAddress, sessionHandle.getSessionId.toString, diff --git a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/utils/KyuubiHadoopUtil.scala b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/utils/KyuubiHadoopUtil.scala index 88a560486..cd363a1e2 100644 --- a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/utils/KyuubiHadoopUtil.scala +++ b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/utils/KyuubiHadoopUtil.scala @@ -27,7 +27,9 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationState._ import org.apache.hadoop.yarn.client.api.YarnClient import org.apache.hadoop.yarn.conf.YarnConfiguration -private[kyuubi] object KyuubiHadoopUtil { +import yaooqinn.kyuubi.Logging + +private[kyuubi] object KyuubiHadoopUtil extends Logging { // YarnClient is thread safe. Create once, share it across threads. private lazy val yarnClient = { @@ -38,7 +40,11 @@ private[kyuubi] object KyuubiHadoopUtil { } def killYarnApp(report: ApplicationReport): Unit = { - yarnClient.killApplication(report.getApplicationId) + try { + yarnClient.killApplication(report.getApplicationId) + } catch { + case e: Exception => error("Failed to kill Application: " + report.getApplicationId, e) + } } def getApplications: Seq[ApplicationReport] = { diff --git a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/cli/HandleIdentifierSuite.scala b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/cli/HandleIdentifierSuite.scala index 673d775ef..0cd733b7d 100644 --- a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/cli/HandleIdentifierSuite.scala +++ b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/cli/HandleIdentifierSuite.scala @@ -17,6 +17,7 @@ package yaooqinn.kyuubi.cli +import java.nio.ByteBuffer import java.util.UUID import org.apache.hive.service.cli.thrift.THandleIdentifier @@ -46,7 +47,7 @@ class HandleIdentifierSuite extends SparkFunSuite { val handleId3 = new HandleIdentifier(handleId1.toTHandleIdentifier) assert(handleId1 === handleId3) - val tHandleId = new THandleIdentifier() + val tHandleId = new THandleIdentifier(ByteBuffer.allocate(16), ByteBuffer.allocate(16)) val handleId4 = new HandleIdentifier(tHandleId) assert(handleId1 !== handleId4) diff --git a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/server/FrontendServiceSuite.scala b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/server/FrontendServiceSuite.scala index b29ddc8c6..b29b2ca55 100644 --- a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/server/FrontendServiceSuite.scala +++ b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/server/FrontendServiceSuite.scala @@ -19,15 +19,18 @@ package yaooqinn.kyuubi.server import java.net.InetAddress +import scala.collection.JavaConverters._ + import org.apache.hive.service.cli.thrift._ import org.apache.spark.{KyuubiSparkUtil, SparkConf, SparkFunSuite} import org.apache.spark.KyuubiConf._ import org.scalatest.Matchers +import yaooqinn.kyuubi.SecuredFunSuite import yaooqinn.kyuubi.service.{ServiceException, State} import yaooqinn.kyuubi.session.SessionHandle -class FrontendServiceSuite extends SparkFunSuite with Matchers { +class FrontendServiceSuite extends SparkFunSuite with Matchers with SecuredFunSuite { private val beService = new BackendService() private val sessionHandle = new SessionHandle(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V8) @@ -42,6 +45,16 @@ class FrontendServiceSuite extends SparkFunSuite with Matchers { conf.remove(KyuubiSparkUtil.CATALOG_IMPL) conf.setMaster("local").set(FRONTEND_BIND_PORT.key, "0") + override def beforeAll(): Unit = { + beService.init(conf) + beService.start() + super.beforeAll() + } + override def afterAll(): Unit = { + beService.stop() + super.afterAll() + } + test(" test new fe service") { val feService = new FrontendService(beService) feService.getConf should be(null) @@ -177,4 +190,67 @@ class FrontendServiceSuite extends SparkFunSuite with Matchers { handler.processContext(context, null, null) handler.deleteContext(context, null, null) } + + test("open session, execute sql and get results") { + val feService = new FrontendService(beService) + try { + feService.init(conf) + feService.start() + val req = new TOpenSessionReq(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1) + val resp = feService.OpenSession(req) + resp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS) + val handle = resp.getSessionHandle + val req2 = new TExecuteStatementReq(handle, "show databases") + val resp2 = feService.ExecuteStatement(req2) + resp2.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS) + val req3 = new TGetOperationStatusReq(resp2.getOperationHandle) + val resp3 = feService.GetOperationStatus(req3) + resp3.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS) + Thread.sleep(10000) + val req4 = new TFetchResultsReq(resp2.getOperationHandle, TFetchOrientation.FETCH_NEXT, 50) + val resp4 = feService.FetchResults(req4) + resp4.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS) + resp4.getResults.getRows.get(0).getColVals.get(0).getStringVal.getValue should be("default") + assert(!resp4.getResults.isSetColumns) + val req5 = new TGetResultSetMetadataReq(resp2.getOperationHandle) + val resp5 = feService.GetResultSetMetadata(req5) + resp5.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS) + resp5.getSchema.getColumns.get(0).getColumnName should be("databaseName") + val req7 = new TCancelOperationReq(resp2.getOperationHandle) + val resp7 = feService.CancelOperation(req7) + resp7.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS) + val req6 = new TCloseOperationReq(resp2.getOperationHandle) + val resp6 = feService.CloseOperation(req6) + resp6.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS) + val req9 = new TCancelOperationReq(resp2.getOperationHandle) + val resp9 = feService.CancelOperation(req9) + resp9.getStatus.getStatusCode should be(TStatusCode.ERROR_STATUS) + val req8 = new TCloseSessionReq(handle) + val resp8 = feService.CloseSession(req8) + resp8.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS) + + // after session closed + val resp10 = feService.CloseSession(req8) + resp10.getStatus.getStatusCode should be(TStatusCode.ERROR_STATUS) + resp10.getStatus.getErrorMessage should include("does not exist!") + val resp11 = feService.ExecuteStatement(req2) + resp11.getStatus.getStatusCode should be(TStatusCode.ERROR_STATUS) + feService.GetOperationStatus(req3).getStatus.getStatusCode should be(TStatusCode.ERROR_STATUS) + feService.FetchResults(req4).getStatus.getStatusCode should be(TStatusCode.ERROR_STATUS) + feService.GetResultSetMetadata(req5) + .getStatus.getStatusCode should be(TStatusCode.ERROR_STATUS) + feService.CancelOperation(req7).getStatus.getStatusCode should be(TStatusCode.ERROR_STATUS) + feService.CloseOperation(req6).getStatus.getStatusCode should be(TStatusCode.ERROR_STATUS) + + val tOpenSessionReq = new TOpenSessionReq(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1) + tOpenSessionReq.setUsername("yaooqinn") + tOpenSessionReq.setPassword("passwd") + tOpenSessionReq.setConfiguration( + Map("hive.server2.proxy.user" -> "kent").asJava) + val tOpenSessionResp = feService.OpenSession(tOpenSessionReq) + tOpenSessionResp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS) + } finally { + feService.stop() + } + } }