[KYUUBI #1415] Immediately wrap the thrift client with a thread-safe proxy

### _Why are the changes needed?_

Sync before async engine log request has been sent
```
- open session with KyuubiConnection *** FAILED ***
  java.sql.SQLException: org.apache.kyuubi.shade.org.apache.thrift.transport.TTransportException: Read a negative frame size (-2147418110)!
  at org.apache.kyuubi.jdbc.hive.KyuubiStatement.runAsyncOnServer(KyuubiStatement.java:271)
  at org.apache.kyuubi.jdbc.hive.KyuubiStatement.execute(KyuubiStatement.java:193)
  at org.apache.kyuubi.operation.KyuubiOperationPerConnectionSuite.$anonfun$new$20(KyuubiOperationPerConnectionSuite.scala:145)
  at org.apache.kyuubi.operation.HiveJDBCTestHelper.withSessionConf(HiveJDBCTestHelper.scala:60)
  at org.apache.kyuubi.operation.HiveJDBCTestHelper.withSessionConf$(HiveJDBCTestHelper.scala:53)
  at org.apache.kyuubi.operation.KyuubiOperationPerConnectionSuite.withSessionConf(KyuubiOperationPerConnectionSuite.scala:33)
  at org.apache.kyuubi.operation.KyuubiOperationPerConnectionSuite.$anonfun$new$19(KyuubiOperationPerConnectionSuite.scala:140)
  at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
  at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
  at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
  ...
  Cause: org.apache.kyuubi.shade.org.apache.thrift.transport.TTransportException: Read a negative frame size (-2147418110)!
  at org.apache.kyuubi.shade.org.apache.thrift.transport.TSaslTransport.readFrame(TSaslTransport.java:456)
  at org.apache.kyuubi.shade.org.apache.thrift.transport.TSaslTransport.read(TSaslTransport.java:435)
  at org.apache.kyuubi.shade.org.apache.thrift.transport.TSaslClientTransport.read(TSaslClientTransport.java:37)
  at org.apache.kyuubi.shade.org.apache.thrift.transport.TTransport.readAll(TTransport.java:86)
  at org.apache.kyuubi.shade.org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:429)
  at org.apache.kyuubi.shade.org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:318)
  at org.apache.kyuubi.shade.org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:219)
  at org.apache.kyuubi.shade.org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:77)
  at org.apache.kyuubi.shade.org.apache.hive.service.rpc.thrift.TCLIService$Client.recv_ExecuteStatement(TCLIService.java:237)
  at org.apache.kyuubi.shade.org.apache.hive.service.rpc.thrift.TCLIService$Client.ExecuteStatement(TCLIService.java:224)
  ...
```

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #1415 from pan3793/sync.

Closes #1415

5b7ca1bd [Cheng Pan] Immediately wrap the thrift client with a thread-safe proxy

Authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: fwang12 <fwang12@ebay.com>
This commit is contained in:
Cheng Pan 2021-11-18 18:25:01 +08:00 committed by fwang12
parent 93e9745a39
commit 1a3228f132

View File

@ -163,8 +163,9 @@ public class KyuubiConnection implements java.sql.Connection {
if (isEmbeddedMode) {
EmbeddedThriftBinaryCLIService embeddedClient = new EmbeddedThriftBinaryCLIService();
embeddedClient.init(null);
client = embeddedClient;
TCLIService.Iface _client = client = embeddedClient;
// Wrap the client with a thread-safe proxy to serialize the RPC calls
client = newSynchronizedClient(_client);
// open client session
openSession();
showLaunchEngineLog();
@ -184,7 +185,9 @@ public class KyuubiConnection implements java.sql.Connection {
// open the client transport
openTransport();
// set up the client
client = new TCLIService.Client(new TBinaryProtocol(transport));
TCLIService.Iface _client = new TCLIService.Client(new TBinaryProtocol(transport));
// Wrap the client with a thread-safe proxy to serialize the RPC calls
client = newSynchronizedClient(_client);
// open client session
openSession();
if (!isBeeLineMode) {
@ -220,9 +223,6 @@ public class KyuubiConnection implements java.sql.Connection {
}
}
}
// Wrap the client with a thread-safe proxy to serialize the RPC calls
client = newSynchronizedClient(client);
}
/**