[KYUUBI #1766] [TEST] Remove SessionManagerSuite to reduce duplicated tests

<!--
Thanks for sending a pull request!

Here are some tips for you:
  1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html
  2. If the PR is related to an issue in https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
  3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'.
-->

### _Why are the changes needed?_
<!--
Please clarify why the changes are needed. For instance,
  1. If you add a feature, you can talk about the use case of it.
  2. If you fix a bug, you can clarify why it is a bug.
-->

SessionManagerSuite re-runs all tests in the super class, it's unnecessary.

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #1766 from yaooqinn/test2.

Closes #1766

28250732 [Kent Yao] [TEST] Remove SessionManagerSuite to reduce duplicated tests

Authored-by: Kent Yao <yao@apache.org>
Signed-off-by: ulysses-you <ulyssesyou@apache.org>
This commit is contained in:
Kent Yao 2022-01-15 21:50:02 +08:00 committed by ulysses-you
parent 38814691b2
commit 783fc16d3a
No known key found for this signature in database
GPG Key ID: 4C500BC62D576766
4 changed files with 165 additions and 198 deletions

View File

@ -19,17 +19,12 @@ package org.apache.kyuubi.operation
import java.sql.ResultSet
import scala.collection.JavaConverters._
import org.apache.hive.service.rpc.thrift._
import org.apache.hive.service.rpc.thrift.TCLIService.Iface
import org.apache.hive.service.rpc.thrift.TOperationState._
import org.apache.thrift.protocol.TBinaryProtocol
import org.apache.thrift.transport.TSocket
import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
import org.apache.kyuubi.Utils
import org.apache.kyuubi.service.authentication.PlainSASLHelper
trait HiveJDBCTestHelper extends JDBCTestHelper {
@ -81,43 +76,13 @@ trait HiveJDBCTestHelper extends JDBCTestHelper {
jdbcUrl + sessionConfStr + jdbcConfStr + jdbcVarsStr
}
def withThriftClient(f: TCLIService.Iface => Unit): Unit = {
val hostAndPort = jdbcUrl.stripPrefix("jdbc:hive2://").split("/;").head.split(":")
val host = hostAndPort.head
val port = hostAndPort(1).toInt
val socket = new TSocket(host, port)
val transport = PlainSASLHelper.getPlainTransport(Utils.currentUser, password, socket)
val protocol = new TBinaryProtocol(transport)
val client = new TCLIService.Client(protocol)
transport.open()
try {
f(client)
} finally {
socket.close()
}
def withThriftClient[T](f: TCLIService.Iface => T): T = {
TClientTestUtils.withThriftClient(jdbcUrl.stripPrefix("jdbc:hive2://").split("/;").head)(f)
}
def withSessionHandle(f: (TCLIService.Iface, TSessionHandle) => Unit): Unit = {
withThriftClient { client =>
val req = new TOpenSessionReq()
req.setUsername(user)
req.setPassword(password)
req.setConfiguration(_sessionConfigs.asJava)
val resp = client.OpenSession(req)
val handle = resp.getSessionHandle
try {
f(client, handle)
} finally {
val tCloseSessionReq = new TCloseSessionReq(handle)
try {
client.CloseSession(tCloseSessionReq)
} catch {
case e: Exception => error(s"Failed to close $handle", e)
}
}
}
def withSessionHandle[T](f: (TCLIService.Iface, TSessionHandle) => T): T = {
val hostAndPort = jdbcUrl.stripPrefix("jdbc:hive2://").split("/;").head
TClientTestUtils.withSessionHandle(hostAndPort, sessionConfigs)(f)
}
def checkGetSchemas(rs: ResultSet, dbNames: Seq[String], catalogName: String = ""): Unit = {

View File

@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.operation
import scala.collection.JavaConverters._
import org.apache.hive.service.rpc.thrift.{TCLIService, TCloseSessionReq, TOpenSessionReq, TSessionHandle}
import org.apache.hive.service.rpc.thrift.TCLIService.Iface
import org.apache.thrift.protocol.TBinaryProtocol
import org.apache.thrift.transport.TSocket
import org.apache.kyuubi.{Logging, Utils}
import org.apache.kyuubi.service.FrontendService
import org.apache.kyuubi.service.authentication.PlainSASLHelper
object TClientTestUtils extends Logging {
def withThriftClient[T](url: String)(f: Iface => T): T = {
val hostport = url.split(':')
val socket = new TSocket(hostport.head, hostport.last.toInt)
val transport = PlainSASLHelper.getPlainTransport(Utils.currentUser, "anonymous", socket)
val protocol = new TBinaryProtocol(transport)
val client = new TCLIService.Client(protocol)
transport.open()
try {
f(client)
} finally {
socket.close()
}
}
/**
* s shall be [[TFrontendService]]
*/
def withThriftClient[T](s: FrontendService)(f: Iface => T): T = {
withThriftClient(s.connectionUrl)(f)
}
def withSessionHandle[T](url: String, configs: Map[String, String])(
f: (TCLIService.Iface, TSessionHandle) => T): T = {
withThriftClient(url) { client =>
val req = new TOpenSessionReq()
req.setUsername(Utils.currentUser)
req.setPassword("anonymous")
req.setConfiguration(configs.asJava)
val resp = client.OpenSession(req)
val handle = resp.getSessionHandle
try {
f(client, handle)
} finally {
val tCloseSessionReq = new TCloseSessionReq(handle)
try {
client.CloseSession(tCloseSessionReq)
} catch {
case e: Exception => error(s"Failed to close $handle", e)
}
}
}
}
}

View File

@ -17,31 +17,35 @@
package org.apache.kyuubi.service
import java.util
import java.time.Duration
import scala.collection.JavaConverters._
import org.apache.hive.service.rpc.thrift._
import org.apache.thrift.protocol.TBinaryProtocol
import org.apache.thrift.transport.TSocket
import org.scalatest.time._
import org.apache.kyuubi.{KyuubiFunSuite, KyuubiSQLException, Utils}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.{FRONTEND_BIND_HOST, FRONTEND_CONNECTION_URL_USE_HOSTNAME, FRONTEND_THRIFT_BINARY_BIND_HOST, FRONTEND_THRIFT_BINARY_BIND_PORT}
import org.apache.kyuubi.operation.{OperationHandle, OperationType}
import org.apache.kyuubi.operation.{OperationHandle, OperationType, TClientTestUtils}
import org.apache.kyuubi.service.TFrontendService.{FeServiceServerContext, SERVER_VERSION}
import org.apache.kyuubi.service.authentication.PlainSASLHelper
import org.apache.kyuubi.session.SessionHandle
import org.apache.kyuubi.session.{AbstractSession, SessionHandle}
class ThriftFrontendServiceSuite extends KyuubiFunSuite {
class TFrontendServiceSuite extends KyuubiFunSuite {
protected val server = new NoopTBinaryFrontendServer()
protected val conf = KyuubiConf()
.set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
.set("kyuubi.test.server.should.fail", "false")
.set(KyuubiConf.SESSION_CHECK_INTERVAL, Duration.ofSeconds(5).toMillis)
.set(KyuubiConf.SESSION_IDLE_TIMEOUT, Duration.ofSeconds(5).toMillis)
.set(KyuubiConf.OPERATION_IDLE_TIMEOUT, Duration.ofSeconds(20).toMillis)
.set(KyuubiConf.SESSION_CONF_RESTRICT_LIST, Seq("spark.*"))
.set(KyuubiConf.SESSION_CONF_IGNORE_LIST, Seq("session.engine.*"))
val user: String = System.getProperty("user.name")
val sessionConf: util.Map[String, String] = new util.HashMap()
private def withSessionHandle(f: (TCLIService.Iface, TSessionHandle) => Unit): Unit = {
TClientTestUtils.withSessionHandle(server.frontendServices.head.connectionUrl, Map.empty)(f)
}
override def beforeAll(): Unit = {
server.initialize(conf)
@ -54,45 +58,6 @@ class ThriftFrontendServiceSuite extends KyuubiFunSuite {
super.afterAll()
}
protected def withThriftClient(f: TCLIService.Iface => Unit): Unit = {
val hostAndPort = server.frontendServices.head.connectionUrl.split(":")
val host = hostAndPort.head
val port = hostAndPort(1).toInt
val socket = new TSocket(host, port)
val transport = PlainSASLHelper.getPlainTransport(Utils.currentUser, "anonymous", socket)
val protocol = new TBinaryProtocol(transport)
val client = new TCLIService.Client(protocol)
transport.open()
try {
f(client)
} finally {
socket.close()
}
}
protected def withSessionHandle(f: (TCLIService.Iface, TSessionHandle) => Unit): Unit = {
withThriftClient { client =>
val req = new TOpenSessionReq()
req.setUsername(user)
req.setPassword("anonymous")
req.setConfiguration(sessionConf)
val resp = client.OpenSession(req)
val handle = resp.getSessionHandle
try {
f(client, handle)
} finally {
val tCloseSessionReq = new TCloseSessionReq(handle)
try {
client.CloseSession(tCloseSessionReq)
} catch {
case e: Exception => error(s"Failed to close $handle", e)
}
}
}
}
private def checkOperationResult(
client: TCLIService.Iface,
handle: TOperationHandle): Unit = {
@ -151,24 +116,26 @@ class ThriftFrontendServiceSuite extends KyuubiFunSuite {
}
test("open session") {
withThriftClient { client =>
val req = new TOpenSessionReq()
req.setUsername(user)
req.setPassword("anonymous")
val resp = client.OpenSession(req)
val handle = resp.getSessionHandle
assert(handle != null)
assert(resp.getStatus.getStatusCode == TStatusCode.SUCCESS_STATUS)
TClientTestUtils.withThriftClient(server.frontendServices.head) {
client =>
val req = new TOpenSessionReq()
req.setUsername(Utils.currentUser)
req.setPassword("anonymous")
val resp = client.OpenSession(req)
val handle = resp.getSessionHandle
assert(handle != null)
assert(resp.getStatus.getStatusCode == TStatusCode.SUCCESS_STATUS)
req.setConfiguration(Map("kyuubi.test.should.fail" -> "true").asJava)
val resp1 = client.OpenSession(req)
assert(resp1.getSessionHandle === null)
assert(resp1.getStatus.getStatusCode === TStatusCode.ERROR_STATUS)
val cause = KyuubiSQLException.toCause(resp1.getStatus.getInfoMessages.asScala)
assert(cause.isInstanceOf[KyuubiSQLException])
assert(cause.getMessage === "Asked to fail")
req.setConfiguration(Map("kyuubi.test.should.fail" -> "true").asJava)
val resp1 = client.OpenSession(req)
assert(resp1.getSessionHandle === null)
assert(resp1.getStatus.getStatusCode === TStatusCode.ERROR_STATUS)
val cause = KyuubiSQLException.toCause(resp1.getStatus.getInfoMessages.asScala)
assert(cause.isInstanceOf[KyuubiSQLException])
assert(cause.getMessage === "Asked to fail")
assert(resp1.getStatus.getErrorMessage === "Asked to fail")
assert(resp1.getStatus.getErrorMessage === "Asked to fail")
}
}
@ -513,4 +480,58 @@ class ThriftFrontendServiceSuite extends KyuubiFunSuite {
"Delegation token is not supported")
}
}
test("close expired operations") {
withSessionHandle { (client, handle) =>
val req = new TCancelOperationReq()
val req1 = new TGetSchemasReq(handle)
val resp1 = client.GetSchemas(req1)
val sessionManager = server.backendService.sessionManager
val session = sessionManager
.getSession(SessionHandle(handle))
.asInstanceOf[AbstractSession]
var lastAccessTime = session.lastAccessTime
assert(sessionManager.getOpenSessionCount == 1)
assert(session.lastIdleTime > 0)
resp1.getOperationHandle
req.setOperationHandle(resp1.getOperationHandle)
val resp2 = client.CancelOperation(req)
assert(resp2.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS)
assert(sessionManager.getOpenSessionCount == 1)
assert(session.lastIdleTime == 0)
assert(lastAccessTime < session.lastAccessTime)
lastAccessTime = session.lastAccessTime
eventually(timeout(Span(60, Seconds)), interval(Span(1, Seconds))) {
assert(session.lastIdleTime > lastAccessTime)
}
info("operation is terminated")
assert(lastAccessTime == session.lastAccessTime)
assert(sessionManager.getOpenSessionCount == 1)
eventually(timeout(Span(60, Seconds)), interval(Span(1, Seconds))) {
assert(session.lastAccessTime > lastAccessTime)
}
assert(sessionManager.getOpenSessionCount == 0)
}
}
test("test validate and normalize config") {
val sessionManager = server.backendService.sessionManager
// test restrict
intercept[KyuubiSQLException] {
sessionManager.validateAndNormalizeConf(Map("spark.driver.memory" -> "2G"))
}
// test ignore
val conf = sessionManager.validateAndNormalizeConf(
Map(
"session.engine.spark.main.resource" -> "org.apahce.kyuubi.test",
"session.check.interval" -> "10000"))
assert(conf.size == 1)
assert(conf("session.check.interval") == "10000")
}
}

View File

@ -1,94 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.session
import java.time.Duration
import org.apache.hive.service.rpc.thrift._
import org.scalatest.concurrent.Eventually
import org.scalatest.time.{Seconds, Span}
import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.service.ThriftFrontendServiceSuite
class SessionManagerSuite extends ThriftFrontendServiceSuite with Eventually {
override val conf = KyuubiConf()
.set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
.set("kyuubi.test.server.should.fail", "false")
.set(KyuubiConf.SESSION_CHECK_INTERVAL, Duration.ofSeconds(5).toMillis)
.set(KyuubiConf.SESSION_IDLE_TIMEOUT, Duration.ofSeconds(5).toMillis)
.set(KyuubiConf.OPERATION_IDLE_TIMEOUT, Duration.ofSeconds(20).toMillis)
.set(KyuubiConf.SESSION_CONF_RESTRICT_LIST, Seq("spark.*"))
.set(KyuubiConf.SESSION_CONF_IGNORE_LIST, Seq("session.engine.*"))
test("close expired operations") {
withSessionHandle { (client, handle) =>
val req = new TCancelOperationReq()
val req1 = new TGetSchemasReq(handle)
val resp1 = client.GetSchemas(req1)
val sessionManager = server.backendService.sessionManager
val session = sessionManager
.getSession(SessionHandle(handle))
.asInstanceOf[AbstractSession]
var lastAccessTime = session.lastAccessTime
assert(sessionManager.getOpenSessionCount == 1)
assert(session.lastIdleTime > 0)
resp1.getOperationHandle
req.setOperationHandle(resp1.getOperationHandle)
val resp2 = client.CancelOperation(req)
assert(resp2.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS)
assert(sessionManager.getOpenSessionCount == 1)
assert(session.lastIdleTime == 0)
assert(lastAccessTime < session.lastAccessTime)
lastAccessTime = session.lastAccessTime
eventually(timeout(Span(60, Seconds)), interval(Span(1, Seconds))) {
assert(session.lastIdleTime > lastAccessTime)
}
info("operation is terminated")
assert(lastAccessTime == session.lastAccessTime)
assert(sessionManager.getOpenSessionCount == 1)
eventually(timeout(Span(60, Seconds)), interval(Span(1, Seconds))) {
assert(session.lastAccessTime > lastAccessTime)
}
assert(sessionManager.getOpenSessionCount == 0)
}
}
test("test validate and normalize config") {
val sessionManager = server.backendService.sessionManager
// test restrict
intercept[KyuubiSQLException] {
sessionManager.validateAndNormalizeConf(Map("spark.driver.memory" -> "2G"))
}
// test ignore
val conf = sessionManager.validateAndNormalizeConf(
Map(
"session.engine.spark.main.resource" -> "org.apahce.kyuubi.test",
"session.check.interval" -> "10000"))
assert(conf.size == 1)
assert(conf("session.check.interval") == "10000")
}
}