[KYUUBI #4058] [IT][Test][K8S] Fix the missing of connectionConf in SparkQueryTests

### _Why are the changes needed?_

to close issue #4058 and help to close pr #3852.
When execute test `KyuubiOnKubernetesWithClientSparkTestsSuite` with `execute statement - select with variable substitution`, the initial method
`org.apache.kyuubi.operation.HiveJDBCTestHelper#withThriftClient` will overlook the connectionConf in `org.apache.kyuubi.operation.JDBCTestHelper#jdbcUrl`:
```
  def withThriftClient[T](user: Option[String] = None)(f: TCLIService.Iface => T): T = {
    TClientTestUtils.withThriftClient(
      jdbcUrl.stripPrefix(URL_PREFIX).split("/;").head,
      user)(f)`
  }
```
Here we just aborted the tail of `jdbcUrl.stripPrefix(URL_PREFIX).split("/;")`, in which the connectionConfs are set.
In that case the `org.apache.kyuubi.kubernetes.test.deployment.KyuubiOnKubernetesWithClientSparkTestsSuite#connectionConf` was invalid actually.

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #4059 from xuefeimiaoao/branch-fix-connectionConf.

Closes #4058

a6395631 [xuefeimiaoao] [KYUUBI #4058] [IT][Test][K8S] Fix the missing of connectionConf of SparkQueryTests

Authored-by: xuefeimiaoao <1255072085@qq.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
This commit is contained in:
xuefeimiaoao 2023-01-04 20:40:45 +08:00 committed by Cheng Pan
parent 090e5bf4f2
commit bca43af01a
No known key found for this signature in database
GPG Key ID: 8001952629BCC75D
3 changed files with 46 additions and 2 deletions

View File

@ -88,6 +88,18 @@ trait HiveJDBCTestHelper extends JDBCTestHelper {
user)(f)
}
def withThriftClientAndConnectionConf[T](f: (TCLIService.Iface, Map[String, String]) => T): T = {
withThriftClientAndConnectionConf()(f)
}
def withThriftClientAndConnectionConf[T](user: Option[String] = None)(f: (
TCLIService.Iface,
Map[String, String]) => T): T = {
TClientTestUtils.withThriftClientAndConnectionConf(
jdbcUrl.stripPrefix(URL_PREFIX),
user)(f)
}
def withSessionHandle[T](f: (TCLIService.Iface, TSessionHandle) => T): T = {
val hostAndPort = jdbcUrl.stripPrefix(URL_PREFIX).split("/;").head
TClientTestUtils.withSessionHandle(hostAndPort, sessionConfigs)(f)

View File

@ -52,7 +52,7 @@ trait SparkQueryTests extends SparkDataTypeTests with HiveJDBCTestHelper {
test("execute statement - select with variable substitution") {
assume(!httpMode)
withThriftClient { client =>
withThriftClientAndConnectionConf { (client, connectionConf) =>
val req = new TOpenSessionReq()
req.setUsername("chengpan")
req.setPassword("123")
@ -62,7 +62,7 @@ trait SparkQueryTests extends SparkDataTypeTests with HiveJDBCTestHelper {
"set:hivevar:b" -> "y",
"set:metaconf:c" -> "z",
"set:system:s" -> "s")
req.setConfiguration(conf.asJava)
req.setConfiguration((conf ++ connectionConf).asJava)
val tOpenSessionResp = client.OpenSession(req)
val status = tOpenSessionResp.getStatus
assert(status.getStatusCode === TStatusCode.SUCCESS_STATUS)

View File

@ -51,6 +51,38 @@ object TClientTestUtils extends Logging {
}
}
def withThriftClientAndConnectionConf[T](
url: String,
user: Option[String] = None)(f: (Iface, Map[String, String]) => T): T = {
val hostPortPrefix = url.split("/;").head
val hostport = hostPortPrefix.split(':')
val connectionConf = url.stripPrefix(hostPortPrefix) match {
case connectionStr: String if connectionStr.startsWith("/;#") =>
val kvPairs = connectionStr.stripPrefix("/;#")
if (kvPairs.contains("=")) {
kvPairs.split(";").map(kv => (kv.split("=")(0), kv.split("=")(1))).toMap
} else {
Map.empty[String, String]
}
case _ =>
Map.empty[String, String]
}
val socket = new TSocket(hostport.head, hostport.last.toInt)
val transport = PlainSASLHelper.getPlainTransport(
user.getOrElse(Utils.currentUser),
"anonymous",
socket)
val protocol = new TBinaryProtocol(transport)
val client = new TCLIService.Client(protocol)
transport.open()
try {
f(client, connectionConf)
} finally {
socket.close()
}
}
/**
* s shall be [[TFrontendService]]
*/