From 783fc16d3a93004b9524ca7fc29219e91af763f9 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Sat, 15 Jan 2022 21:50:02 +0800 Subject: [PATCH] [KYUUBI #1766] [TEST] Remove SessionManagerSuite to reduce duplicated tests ### _Why are the changes needed?_ SessionManagerSuite re-runs all tests in the super class, it's unnecessary. ### _How was this patch tested?_ - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [x] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before make a pull request Closes #1766 from yaooqinn/test2. Closes #1766 28250732 [Kent Yao] [TEST] Remove SessionManagerSuite to reduce duplicated tests Authored-by: Kent Yao Signed-off-by: ulysses-you --- .../kyuubi/operation/HiveJDBCTestHelper.scala | 45 +----- .../kyuubi/operation/TClientTestUtils.scala | 75 +++++++++ ...uite.scala => TFrontendServiceSuite.scala} | 149 ++++++++++-------- .../kyuubi/session/SessionManagerSuite.scala | 94 ----------- 4 files changed, 165 insertions(+), 198 deletions(-) create mode 100644 kyuubi-common/src/test/scala/org/apache/kyuubi/operation/TClientTestUtils.scala rename kyuubi-common/src/test/scala/org/apache/kyuubi/service/{ThriftFrontendServiceSuite.scala => TFrontendServiceSuite.scala} (84%) delete mode 100644 kyuubi-common/src/test/scala/org/apache/kyuubi/session/SessionManagerSuite.scala diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/HiveJDBCTestHelper.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/HiveJDBCTestHelper.scala index fc87f10e7..fda700e03 100644 --- a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/HiveJDBCTestHelper.scala +++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/HiveJDBCTestHelper.scala @@ -19,17 +19,12 @@ package org.apache.kyuubi.operation import java.sql.ResultSet -import scala.collection.JavaConverters._ - import org.apache.hive.service.rpc.thrift._ import org.apache.hive.service.rpc.thrift.TCLIService.Iface import org.apache.hive.service.rpc.thrift.TOperationState._ -import org.apache.thrift.protocol.TBinaryProtocol -import org.apache.thrift.transport.TSocket import org.scalatest.time.SpanSugar.convertIntToGrainOfTime import org.apache.kyuubi.Utils -import org.apache.kyuubi.service.authentication.PlainSASLHelper trait HiveJDBCTestHelper extends JDBCTestHelper { @@ -81,43 +76,13 @@ trait HiveJDBCTestHelper extends JDBCTestHelper { jdbcUrl + sessionConfStr + jdbcConfStr + jdbcVarsStr } - def withThriftClient(f: TCLIService.Iface => Unit): Unit = { - val hostAndPort = jdbcUrl.stripPrefix("jdbc:hive2://").split("/;").head.split(":") - val host = hostAndPort.head - val port = hostAndPort(1).toInt - val socket = new TSocket(host, port) - val transport = PlainSASLHelper.getPlainTransport(Utils.currentUser, password, socket) - - val protocol = new TBinaryProtocol(transport) - val client = new TCLIService.Client(protocol) - transport.open() - try { - f(client) - } finally { - socket.close() - } + def withThriftClient[T](f: TCLIService.Iface => T): T = { + TClientTestUtils.withThriftClient(jdbcUrl.stripPrefix("jdbc:hive2://").split("/;").head)(f) } - def withSessionHandle(f: (TCLIService.Iface, TSessionHandle) => Unit): Unit = { - withThriftClient { client => - val req = new TOpenSessionReq() - req.setUsername(user) - req.setPassword(password) - req.setConfiguration(_sessionConfigs.asJava) - val resp = client.OpenSession(req) - val handle = resp.getSessionHandle - - try { - f(client, handle) - } finally { - val tCloseSessionReq = new TCloseSessionReq(handle) - try { - client.CloseSession(tCloseSessionReq) - } catch { - case e: Exception => error(s"Failed to close $handle", e) - } - } - } + def withSessionHandle[T](f: (TCLIService.Iface, TSessionHandle) => T): T = { + val hostAndPort = jdbcUrl.stripPrefix("jdbc:hive2://").split("/;").head + TClientTestUtils.withSessionHandle(hostAndPort, sessionConfigs)(f) } def checkGetSchemas(rs: ResultSet, dbNames: Seq[String], catalogName: String = ""): Unit = { diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/TClientTestUtils.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/TClientTestUtils.scala new file mode 100644 index 000000000..439d6c670 --- /dev/null +++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/TClientTestUtils.scala @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.operation + +import scala.collection.JavaConverters._ + +import org.apache.hive.service.rpc.thrift.{TCLIService, TCloseSessionReq, TOpenSessionReq, TSessionHandle} +import org.apache.hive.service.rpc.thrift.TCLIService.Iface +import org.apache.thrift.protocol.TBinaryProtocol +import org.apache.thrift.transport.TSocket + +import org.apache.kyuubi.{Logging, Utils} +import org.apache.kyuubi.service.FrontendService +import org.apache.kyuubi.service.authentication.PlainSASLHelper + +object TClientTestUtils extends Logging { + + def withThriftClient[T](url: String)(f: Iface => T): T = { + val hostport = url.split(':') + val socket = new TSocket(hostport.head, hostport.last.toInt) + val transport = PlainSASLHelper.getPlainTransport(Utils.currentUser, "anonymous", socket) + val protocol = new TBinaryProtocol(transport) + val client = new TCLIService.Client(protocol) + transport.open() + try { + f(client) + } finally { + socket.close() + } + } + + /** + * s shall be [[TFrontendService]] + */ + def withThriftClient[T](s: FrontendService)(f: Iface => T): T = { + withThriftClient(s.connectionUrl)(f) + } + + def withSessionHandle[T](url: String, configs: Map[String, String])( + f: (TCLIService.Iface, TSessionHandle) => T): T = { + withThriftClient(url) { client => + val req = new TOpenSessionReq() + req.setUsername(Utils.currentUser) + req.setPassword("anonymous") + req.setConfiguration(configs.asJava) + val resp = client.OpenSession(req) + val handle = resp.getSessionHandle + try { + f(client, handle) + } finally { + val tCloseSessionReq = new TCloseSessionReq(handle) + try { + client.CloseSession(tCloseSessionReq) + } catch { + case e: Exception => error(s"Failed to close $handle", e) + } + } + } + } +} diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/service/ThriftFrontendServiceSuite.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/service/TFrontendServiceSuite.scala similarity index 84% rename from kyuubi-common/src/test/scala/org/apache/kyuubi/service/ThriftFrontendServiceSuite.scala rename to kyuubi-common/src/test/scala/org/apache/kyuubi/service/TFrontendServiceSuite.scala index 7f2ffeec9..4bf5dec8c 100644 --- a/kyuubi-common/src/test/scala/org/apache/kyuubi/service/ThriftFrontendServiceSuite.scala +++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/service/TFrontendServiceSuite.scala @@ -17,31 +17,35 @@ package org.apache.kyuubi.service -import java.util +import java.time.Duration import scala.collection.JavaConverters._ import org.apache.hive.service.rpc.thrift._ -import org.apache.thrift.protocol.TBinaryProtocol -import org.apache.thrift.transport.TSocket +import org.scalatest.time._ import org.apache.kyuubi.{KyuubiFunSuite, KyuubiSQLException, Utils} import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.config.KyuubiConf.{FRONTEND_BIND_HOST, FRONTEND_CONNECTION_URL_USE_HOSTNAME, FRONTEND_THRIFT_BINARY_BIND_HOST, FRONTEND_THRIFT_BINARY_BIND_PORT} -import org.apache.kyuubi.operation.{OperationHandle, OperationType} +import org.apache.kyuubi.operation.{OperationHandle, OperationType, TClientTestUtils} import org.apache.kyuubi.service.TFrontendService.{FeServiceServerContext, SERVER_VERSION} -import org.apache.kyuubi.service.authentication.PlainSASLHelper -import org.apache.kyuubi.session.SessionHandle +import org.apache.kyuubi.session.{AbstractSession, SessionHandle} -class ThriftFrontendServiceSuite extends KyuubiFunSuite { +class TFrontendServiceSuite extends KyuubiFunSuite { protected val server = new NoopTBinaryFrontendServer() protected val conf = KyuubiConf() .set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0) .set("kyuubi.test.server.should.fail", "false") + .set(KyuubiConf.SESSION_CHECK_INTERVAL, Duration.ofSeconds(5).toMillis) + .set(KyuubiConf.SESSION_IDLE_TIMEOUT, Duration.ofSeconds(5).toMillis) + .set(KyuubiConf.OPERATION_IDLE_TIMEOUT, Duration.ofSeconds(20).toMillis) + .set(KyuubiConf.SESSION_CONF_RESTRICT_LIST, Seq("spark.*")) + .set(KyuubiConf.SESSION_CONF_IGNORE_LIST, Seq("session.engine.*")) - val user: String = System.getProperty("user.name") - val sessionConf: util.Map[String, String] = new util.HashMap() + private def withSessionHandle(f: (TCLIService.Iface, TSessionHandle) => Unit): Unit = { + TClientTestUtils.withSessionHandle(server.frontendServices.head.connectionUrl, Map.empty)(f) + } override def beforeAll(): Unit = { server.initialize(conf) @@ -54,45 +58,6 @@ class ThriftFrontendServiceSuite extends KyuubiFunSuite { super.afterAll() } - protected def withThriftClient(f: TCLIService.Iface => Unit): Unit = { - val hostAndPort = server.frontendServices.head.connectionUrl.split(":") - val host = hostAndPort.head - val port = hostAndPort(1).toInt - val socket = new TSocket(host, port) - val transport = PlainSASLHelper.getPlainTransport(Utils.currentUser, "anonymous", socket) - - val protocol = new TBinaryProtocol(transport) - val client = new TCLIService.Client(protocol) - transport.open() - try { - f(client) - } finally { - socket.close() - } - } - - protected def withSessionHandle(f: (TCLIService.Iface, TSessionHandle) => Unit): Unit = { - withThriftClient { client => - val req = new TOpenSessionReq() - req.setUsername(user) - req.setPassword("anonymous") - req.setConfiguration(sessionConf) - val resp = client.OpenSession(req) - val handle = resp.getSessionHandle - - try { - f(client, handle) - } finally { - val tCloseSessionReq = new TCloseSessionReq(handle) - try { - client.CloseSession(tCloseSessionReq) - } catch { - case e: Exception => error(s"Failed to close $handle", e) - } - } - } - } - private def checkOperationResult( client: TCLIService.Iface, handle: TOperationHandle): Unit = { @@ -151,24 +116,26 @@ class ThriftFrontendServiceSuite extends KyuubiFunSuite { } test("open session") { - withThriftClient { client => - val req = new TOpenSessionReq() - req.setUsername(user) - req.setPassword("anonymous") - val resp = client.OpenSession(req) - val handle = resp.getSessionHandle - assert(handle != null) - assert(resp.getStatus.getStatusCode == TStatusCode.SUCCESS_STATUS) + TClientTestUtils.withThriftClient(server.frontendServices.head) { + client => + val req = new TOpenSessionReq() + req.setUsername(Utils.currentUser) + req.setPassword("anonymous") + val resp = client.OpenSession(req) + val handle = resp.getSessionHandle + assert(handle != null) + assert(resp.getStatus.getStatusCode == TStatusCode.SUCCESS_STATUS) - req.setConfiguration(Map("kyuubi.test.should.fail" -> "true").asJava) - val resp1 = client.OpenSession(req) - assert(resp1.getSessionHandle === null) - assert(resp1.getStatus.getStatusCode === TStatusCode.ERROR_STATUS) - val cause = KyuubiSQLException.toCause(resp1.getStatus.getInfoMessages.asScala) - assert(cause.isInstanceOf[KyuubiSQLException]) - assert(cause.getMessage === "Asked to fail") + req.setConfiguration(Map("kyuubi.test.should.fail" -> "true").asJava) + val resp1 = client.OpenSession(req) + assert(resp1.getSessionHandle === null) + assert(resp1.getStatus.getStatusCode === TStatusCode.ERROR_STATUS) + val cause = KyuubiSQLException.toCause(resp1.getStatus.getInfoMessages.asScala) + assert(cause.isInstanceOf[KyuubiSQLException]) + assert(cause.getMessage === "Asked to fail") + + assert(resp1.getStatus.getErrorMessage === "Asked to fail") - assert(resp1.getStatus.getErrorMessage === "Asked to fail") } } @@ -513,4 +480,58 @@ class ThriftFrontendServiceSuite extends KyuubiFunSuite { "Delegation token is not supported") } } + + test("close expired operations") { + withSessionHandle { (client, handle) => + val req = new TCancelOperationReq() + val req1 = new TGetSchemasReq(handle) + val resp1 = client.GetSchemas(req1) + + val sessionManager = server.backendService.sessionManager + val session = sessionManager + .getSession(SessionHandle(handle)) + .asInstanceOf[AbstractSession] + var lastAccessTime = session.lastAccessTime + assert(sessionManager.getOpenSessionCount == 1) + assert(session.lastIdleTime > 0) + + resp1.getOperationHandle + req.setOperationHandle(resp1.getOperationHandle) + val resp2 = client.CancelOperation(req) + assert(resp2.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS) + assert(sessionManager.getOpenSessionCount == 1) + assert(session.lastIdleTime == 0) + assert(lastAccessTime < session.lastAccessTime) + lastAccessTime = session.lastAccessTime + + eventually(timeout(Span(60, Seconds)), interval(Span(1, Seconds))) { + assert(session.lastIdleTime > lastAccessTime) + } + + info("operation is terminated") + assert(lastAccessTime == session.lastAccessTime) + assert(sessionManager.getOpenSessionCount == 1) + + eventually(timeout(Span(60, Seconds)), interval(Span(1, Seconds))) { + assert(session.lastAccessTime > lastAccessTime) + } + assert(sessionManager.getOpenSessionCount == 0) + } + } + + test("test validate and normalize config") { + val sessionManager = server.backendService.sessionManager + // test restrict + intercept[KyuubiSQLException] { + sessionManager.validateAndNormalizeConf(Map("spark.driver.memory" -> "2G")) + } + + // test ignore + val conf = sessionManager.validateAndNormalizeConf( + Map( + "session.engine.spark.main.resource" -> "org.apahce.kyuubi.test", + "session.check.interval" -> "10000")) + assert(conf.size == 1) + assert(conf("session.check.interval") == "10000") + } } diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/session/SessionManagerSuite.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/session/SessionManagerSuite.scala deleted file mode 100644 index 38806a4c5..000000000 --- a/kyuubi-common/src/test/scala/org/apache/kyuubi/session/SessionManagerSuite.scala +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kyuubi.session - -import java.time.Duration - -import org.apache.hive.service.rpc.thrift._ -import org.scalatest.concurrent.Eventually -import org.scalatest.time.{Seconds, Span} - -import org.apache.kyuubi.KyuubiSQLException -import org.apache.kyuubi.config.KyuubiConf -import org.apache.kyuubi.service.ThriftFrontendServiceSuite - -class SessionManagerSuite extends ThriftFrontendServiceSuite with Eventually { - - override val conf = KyuubiConf() - .set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0) - .set("kyuubi.test.server.should.fail", "false") - .set(KyuubiConf.SESSION_CHECK_INTERVAL, Duration.ofSeconds(5).toMillis) - .set(KyuubiConf.SESSION_IDLE_TIMEOUT, Duration.ofSeconds(5).toMillis) - .set(KyuubiConf.OPERATION_IDLE_TIMEOUT, Duration.ofSeconds(20).toMillis) - .set(KyuubiConf.SESSION_CONF_RESTRICT_LIST, Seq("spark.*")) - .set(KyuubiConf.SESSION_CONF_IGNORE_LIST, Seq("session.engine.*")) - - test("close expired operations") { - withSessionHandle { (client, handle) => - val req = new TCancelOperationReq() - val req1 = new TGetSchemasReq(handle) - val resp1 = client.GetSchemas(req1) - - val sessionManager = server.backendService.sessionManager - val session = sessionManager - .getSession(SessionHandle(handle)) - .asInstanceOf[AbstractSession] - var lastAccessTime = session.lastAccessTime - assert(sessionManager.getOpenSessionCount == 1) - assert(session.lastIdleTime > 0) - - resp1.getOperationHandle - req.setOperationHandle(resp1.getOperationHandle) - val resp2 = client.CancelOperation(req) - assert(resp2.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS) - assert(sessionManager.getOpenSessionCount == 1) - assert(session.lastIdleTime == 0) - assert(lastAccessTime < session.lastAccessTime) - lastAccessTime = session.lastAccessTime - - eventually(timeout(Span(60, Seconds)), interval(Span(1, Seconds))) { - assert(session.lastIdleTime > lastAccessTime) - } - - info("operation is terminated") - assert(lastAccessTime == session.lastAccessTime) - assert(sessionManager.getOpenSessionCount == 1) - - eventually(timeout(Span(60, Seconds)), interval(Span(1, Seconds))) { - assert(session.lastAccessTime > lastAccessTime) - } - assert(sessionManager.getOpenSessionCount == 0) - } - } - - test("test validate and normalize config") { - val sessionManager = server.backendService.sessionManager - // test restrict - intercept[KyuubiSQLException] { - sessionManager.validateAndNormalizeConf(Map("spark.driver.memory" -> "2G")) - } - - // test ignore - val conf = sessionManager.validateAndNormalizeConf( - Map( - "session.engine.spark.main.resource" -> "org.apahce.kyuubi.test", - "session.check.interval" -> "10000")) - assert(conf.size == 1) - assert(conf("session.check.interval") == "10000") - } -}