From 1a3228f132050ec82225af08252e5e955fe5cc08 Mon Sep 17 00:00:00 2001 From: Cheng Pan Date: Thu, 18 Nov 2021 18:25:01 +0800 Subject: [PATCH] [KYUUBI #1415] Immediately wrap the thrift client with a thread-safe proxy ### _Why are the changes needed?_ Sync before async engine log request has been sent ``` - open session with KyuubiConnection *** FAILED *** java.sql.SQLException: org.apache.kyuubi.shade.org.apache.thrift.transport.TTransportException: Read a negative frame size (-2147418110)! at org.apache.kyuubi.jdbc.hive.KyuubiStatement.runAsyncOnServer(KyuubiStatement.java:271) at org.apache.kyuubi.jdbc.hive.KyuubiStatement.execute(KyuubiStatement.java:193) at org.apache.kyuubi.operation.KyuubiOperationPerConnectionSuite.$anonfun$new$20(KyuubiOperationPerConnectionSuite.scala:145) at org.apache.kyuubi.operation.HiveJDBCTestHelper.withSessionConf(HiveJDBCTestHelper.scala:60) at org.apache.kyuubi.operation.HiveJDBCTestHelper.withSessionConf$(HiveJDBCTestHelper.scala:53) at org.apache.kyuubi.operation.KyuubiOperationPerConnectionSuite.withSessionConf(KyuubiOperationPerConnectionSuite.scala:33) at org.apache.kyuubi.operation.KyuubiOperationPerConnectionSuite.$anonfun$new$19(KyuubiOperationPerConnectionSuite.scala:140) at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85) at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83) at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104) ... Cause: org.apache.kyuubi.shade.org.apache.thrift.transport.TTransportException: Read a negative frame size (-2147418110)! at org.apache.kyuubi.shade.org.apache.thrift.transport.TSaslTransport.readFrame(TSaslTransport.java:456) at org.apache.kyuubi.shade.org.apache.thrift.transport.TSaslTransport.read(TSaslTransport.java:435) at org.apache.kyuubi.shade.org.apache.thrift.transport.TSaslClientTransport.read(TSaslClientTransport.java:37) at org.apache.kyuubi.shade.org.apache.thrift.transport.TTransport.readAll(TTransport.java:86) at org.apache.kyuubi.shade.org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:429) at org.apache.kyuubi.shade.org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:318) at org.apache.kyuubi.shade.org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:219) at org.apache.kyuubi.shade.org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:77) at org.apache.kyuubi.shade.org.apache.hive.service.rpc.thrift.TCLIService$Client.recv_ExecuteStatement(TCLIService.java:237) at org.apache.kyuubi.shade.org.apache.hive.service.rpc.thrift.TCLIService$Client.ExecuteStatement(TCLIService.java:224) ... ``` ### _How was this patch tested?_ - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [x] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before make a pull request Closes #1415 from pan3793/sync. Closes #1415 5b7ca1bd [Cheng Pan] Immediately wrap the thrift client with a thread-safe proxy Authored-by: Cheng Pan Signed-off-by: fwang12 --- .../apache/kyuubi/jdbc/hive/KyuubiConnection.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiConnection.java b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiConnection.java index 873e65155..f50750e97 100644 --- a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiConnection.java +++ b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiConnection.java @@ -163,8 +163,9 @@ public class KyuubiConnection implements java.sql.Connection { if (isEmbeddedMode) { EmbeddedThriftBinaryCLIService embeddedClient = new EmbeddedThriftBinaryCLIService(); embeddedClient.init(null); - client = embeddedClient; - + TCLIService.Iface _client = client = embeddedClient; + // Wrap the client with a thread-safe proxy to serialize the RPC calls + client = newSynchronizedClient(_client); // open client session openSession(); showLaunchEngineLog(); @@ -184,7 +185,9 @@ public class KyuubiConnection implements java.sql.Connection { // open the client transport openTransport(); // set up the client - client = new TCLIService.Client(new TBinaryProtocol(transport)); + TCLIService.Iface _client = new TCLIService.Client(new TBinaryProtocol(transport)); + // Wrap the client with a thread-safe proxy to serialize the RPC calls + client = newSynchronizedClient(_client); // open client session openSession(); if (!isBeeLineMode) { @@ -220,9 +223,6 @@ public class KyuubiConnection implements java.sql.Connection { } } } - - // Wrap the client with a thread-safe proxy to serialize the RPC calls - client = newSynchronizedClient(client); } /**