[KYUUBI #6594] Port HIVE-26633: Make thrift client maxMessageSize configurable
# 🔍 Description Fix #6594. This PR ports HIVE-26633(https://github.com/apache/hive/pull/3674): Make thrift client maxMessageSize configurable to fix a regression after upgrading Thrift 0.16 in 1.9.0. ## Types of changes 🔖 - [x] Bugfix (non-breaking change which fixes an issue) - [ ] New feature (non-breaking change which adds functionality) - [ ] Breaking change (fix or feature that would cause existing functionality to change) ## Test Plan 🧪 --- # Checklist 📝 - [x] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html) **Be nice. Be informative.** Closes #6631 from pan3793/thrift-max-size. Closes #6594 e4841c88e [Cheng Pan] [KYUUBI #6594] Port HIVE-26633: Make thrift client maxMessageSize configurable Authored-by: Cheng Pan <chengpan@apache.org> Signed-off-by: Cheng Pan <chengpan@apache.org>
This commit is contained in:
parent
2d883e7cac
commit
11de72f117
@ -274,6 +274,7 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co
|
||||
| kyuubi.frontend.thrift.binary.ssl.disallowed.protocols | SSLv2,SSLv3 | SSL versions to disallow for Kyuubi thrift binary frontend. | set | 1.7.0 |
|
||||
| kyuubi.frontend.thrift.binary.ssl.enabled | false | Set this to true for using SSL encryption in thrift binary frontend server. | boolean | 1.7.0 |
|
||||
| kyuubi.frontend.thrift.binary.ssl.include.ciphersuites || A comma-separated list of include SSL cipher suite names for thrift binary frontend. | seq | 1.7.0 |
|
||||
| kyuubi.frontend.thrift.client.max.message.size | 1073741824 | Maximum message size in bytes a thrift client will receive. | int | 1.9.3 |
|
||||
| kyuubi.frontend.thrift.http.bind.host | <undefined> | Hostname or IP of the machine on which to run the thrift frontend service via http protocol. | string | 1.6.0 |
|
||||
| kyuubi.frontend.thrift.http.bind.port | 10010 | Port of the machine on which to run the thrift frontend service via http protocol. | int | 1.6.0 |
|
||||
| kyuubi.frontend.thrift.http.compression.enabled | true | Enable thrift http compression via Jetty compression support | boolean | 1.6.0 |
|
||||
|
||||
@ -665,6 +665,13 @@ object KyuubiConf {
|
||||
.version("1.4.0")
|
||||
.fallbackConf(FRONTEND_MAX_MESSAGE_SIZE)
|
||||
|
||||
val FRONTEND_THRIFT_CLIENT_MAX_MESSAGE_SIZE: ConfigEntry[Int] =
|
||||
buildConf("kyuubi.frontend.thrift.client.max.message.size")
|
||||
.doc("Maximum message size in bytes a thrift client will receive.")
|
||||
.version("1.9.3")
|
||||
.intConf
|
||||
.createWithDefault(1 * 1024 * 1024 * 1024) // follow HIVE-26633 to use 1g as default value
|
||||
|
||||
val FRONTEND_THRIFT_HTTP_REQUEST_HEADER_SIZE: ConfigEntry[Int] =
|
||||
buildConf("kyuubi.frontend.thrift.http.request.header.size")
|
||||
.doc("Request header size in bytes, when using HTTP transport mode. Jetty defaults used.")
|
||||
|
||||
@ -97,6 +97,7 @@ public class JdbcConnectionParams {
|
||||
|
||||
static final String CONNECT_TIMEOUT = "connectTimeout";
|
||||
static final String SOCKET_TIMEOUT = "socketTimeout";
|
||||
static final String THRIFT_CLIENT_MAX_MESSAGE_SIZE = "thrift.client.max.message.size";
|
||||
|
||||
// We support ways to specify application name modeled after some existing DBs, since
|
||||
// there's no standard approach.
|
||||
|
||||
@ -67,6 +67,7 @@ import org.apache.kyuubi.jdbc.hive.cli.RowSet;
|
||||
import org.apache.kyuubi.jdbc.hive.cli.RowSetFactory;
|
||||
import org.apache.kyuubi.jdbc.hive.logs.KyuubiLoggable;
|
||||
import org.apache.kyuubi.shaded.hive.service.rpc.thrift.*;
|
||||
import org.apache.kyuubi.shaded.thrift.TConfiguration;
|
||||
import org.apache.kyuubi.shaded.thrift.TException;
|
||||
import org.apache.kyuubi.shaded.thrift.protocol.TBinaryProtocol;
|
||||
import org.apache.kyuubi.shaded.thrift.transport.THttpClient;
|
||||
@ -419,7 +420,13 @@ public class KyuubiConnection implements SQLConnection, KyuubiLoggable {
|
||||
boolean useSsl = isSslConnection();
|
||||
// Create an http client from the configs
|
||||
httpClient = getHttpClient(useSsl);
|
||||
transport = new THttpClient(getServerHttpUrl(useSsl), httpClient);
|
||||
int maxMessageSize = getMaxMessageSize();
|
||||
TConfiguration.Builder tConfBuilder = TConfiguration.custom();
|
||||
if (maxMessageSize > 0) {
|
||||
tConfBuilder.setMaxMessageSize(maxMessageSize);
|
||||
}
|
||||
TConfiguration tConf = tConfBuilder.build();
|
||||
transport = new THttpClient(tConf, getServerHttpUrl(useSsl), httpClient);
|
||||
return transport;
|
||||
}
|
||||
|
||||
@ -629,7 +636,8 @@ public class KyuubiConnection implements SQLConnection, KyuubiLoggable {
|
||||
}
|
||||
|
||||
/** Create underlying SSL or non-SSL transport */
|
||||
private TTransport createUnderlyingTransport() throws TTransportException {
|
||||
private TTransport createUnderlyingTransport() throws TTransportException, SQLException {
|
||||
int maxMessageSize = getMaxMessageSize();
|
||||
TTransport transport = null;
|
||||
// Note: Thrift returns an SSL socket that is already bound to the specified host:port
|
||||
// Therefore an open called on this would be a no-op later
|
||||
@ -643,19 +651,46 @@ public class KyuubiConnection implements SQLConnection, KyuubiLoggable {
|
||||
Utils.getPassword(sessConfMap, JdbcConnectionParams.SSL_TRUST_STORE_PASSWORD);
|
||||
|
||||
if (sslTrustStore == null || sslTrustStore.isEmpty()) {
|
||||
transport = ThriftUtils.getSSLSocket(host, port, connectTimeout, socketTimeout);
|
||||
transport =
|
||||
ThriftUtils.getSSLSocket(host, port, connectTimeout, socketTimeout, maxMessageSize);
|
||||
} else {
|
||||
transport =
|
||||
ThriftUtils.getSSLSocket(
|
||||
host, port, connectTimeout, socketTimeout, sslTrustStore, sslTrustStorePassword);
|
||||
host,
|
||||
port,
|
||||
connectTimeout,
|
||||
socketTimeout,
|
||||
sslTrustStore,
|
||||
sslTrustStorePassword,
|
||||
maxMessageSize);
|
||||
}
|
||||
} else {
|
||||
// get non-SSL socket transport
|
||||
transport = ThriftUtils.getSocketTransport(host, port, connectTimeout, socketTimeout);
|
||||
transport =
|
||||
ThriftUtils.getSocketTransport(host, port, connectTimeout, socketTimeout, maxMessageSize);
|
||||
}
|
||||
return transport;
|
||||
}
|
||||
|
||||
private int getMaxMessageSize() throws SQLException {
|
||||
String maxMessageSize = sessConfMap.get(JdbcConnectionParams.THRIFT_CLIENT_MAX_MESSAGE_SIZE);
|
||||
if (maxMessageSize == null) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
try {
|
||||
return Integer.parseInt(maxMessageSize);
|
||||
} catch (Exception e) {
|
||||
String errFormat =
|
||||
"Invalid {} configuration of '{}'. Expected an integer specifying number of bytes. "
|
||||
+ "A configuration of <= 0 uses default max message size.";
|
||||
String errMsg =
|
||||
String.format(
|
||||
errFormat, JdbcConnectionParams.THRIFT_CLIENT_MAX_MESSAGE_SIZE, maxMessageSize);
|
||||
throw new SQLException(errMsg, "42000", e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Create transport per the connection options Supported transport options are: - SASL based
|
||||
* transports over + Kerberos + SSL + non-SSL - Raw (non-SASL) socket
|
||||
|
||||
@ -24,23 +24,70 @@ import org.apache.kyuubi.shaded.thrift.transport.TSSLTransportFactory;
|
||||
import org.apache.kyuubi.shaded.thrift.transport.TSocket;
|
||||
import org.apache.kyuubi.shaded.thrift.transport.TTransport;
|
||||
import org.apache.kyuubi.shaded.thrift.transport.TTransportException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* This class helps in some aspects of authentication. It creates the proper Thrift classes for the
|
||||
* given configuration as well as helps with authenticating requests.
|
||||
*/
|
||||
public class ThriftUtils {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(ThriftUtils.class);
|
||||
|
||||
/**
|
||||
* Configure the provided T transport's max message size.
|
||||
*
|
||||
* @param transport Transport to configure maxMessage for
|
||||
* @param maxMessageSize Maximum allowed message size in bytes, less than or equal to 0 means use
|
||||
* the Thrift library default.
|
||||
* @return The passed in T transport configured with desired max message size. The same object
|
||||
* passed in is returned.
|
||||
*/
|
||||
public static <T extends TTransport> T configureThriftMaxMessageSize(
|
||||
T transport, int maxMessageSize) {
|
||||
if (maxMessageSize > 0) {
|
||||
if (transport.getConfiguration() == null) {
|
||||
LOG.warn(
|
||||
"TTransport {} is returning a null Configuration, Thrift max message size is not getting configured",
|
||||
transport.getClass().getName());
|
||||
return transport;
|
||||
}
|
||||
transport.getConfiguration().setMaxMessageSize(maxMessageSize);
|
||||
}
|
||||
return transport;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a TSocket for the provided host and port with specified connectTimeout, loginTimeout and
|
||||
* maxMessageSize.
|
||||
*
|
||||
* @param host Host to connect to.
|
||||
* @param port Port to connect to.
|
||||
* @param connectTimeout Socket connect timeout (0 means no timeout).
|
||||
* @param socketTimeout Socket read/write timeout (0 means no timeout).
|
||||
* @param maxMessageSize Size in bytes for max allowable Thrift message size, less than or equal
|
||||
* to 0 results in using the Thrift library default.
|
||||
* @return TTransport TSocket for host/port
|
||||
*/
|
||||
public static TTransport getSocketTransport(
|
||||
String host, int port, int connectTimeout, int socketTimeout) throws TTransportException {
|
||||
return new TSocket(TConfiguration.DEFAULT, host, port, socketTimeout, connectTimeout);
|
||||
String host, int port, int connectTimeout, int socketTimeout, int maxMessageSize)
|
||||
throws TTransportException {
|
||||
TConfiguration.Builder tConfBuilder = TConfiguration.custom();
|
||||
if (maxMessageSize > 0) {
|
||||
tConfBuilder.setMaxMessageSize(maxMessageSize);
|
||||
}
|
||||
TConfiguration tConf = tConfBuilder.build();
|
||||
return new TSocket(tConf, host, port, socketTimeout, connectTimeout);
|
||||
}
|
||||
|
||||
public static TTransport getSSLSocket(
|
||||
String host, int port, int connectTimeout, int socketTimeout) throws TTransportException {
|
||||
String host, int port, int connectTimeout, int socketTimeout, int maxMessageSize)
|
||||
throws TTransportException {
|
||||
// The underlying SSLSocket object is bound to host:port with the given SO_TIMEOUT
|
||||
TSocket tSSLSocket = TSSLTransportFactory.getClientSocket(host, port, socketTimeout);
|
||||
tSSLSocket.setConnectTimeout(connectTimeout);
|
||||
return getSSLSocketWithHttps(tSSLSocket);
|
||||
return getSSLSocketWithHttps(tSSLSocket, maxMessageSize);
|
||||
}
|
||||
|
||||
public static TTransport getSSLSocket(
|
||||
@ -49,7 +96,8 @@ public class ThriftUtils {
|
||||
int connectTimeout,
|
||||
int socketTimeout,
|
||||
String trustStorePath,
|
||||
String trustStorePassWord)
|
||||
String trustStorePassWord,
|
||||
int maxMessageSize)
|
||||
throws TTransportException {
|
||||
TSSLTransportFactory.TSSLTransportParameters params =
|
||||
new TSSLTransportFactory.TSSLTransportParameters();
|
||||
@ -59,16 +107,18 @@ public class ThriftUtils {
|
||||
// SSLContext created with the given params
|
||||
TSocket tSSLSocket = TSSLTransportFactory.getClientSocket(host, port, socketTimeout, params);
|
||||
tSSLSocket.setConnectTimeout(connectTimeout);
|
||||
return getSSLSocketWithHttps(tSSLSocket);
|
||||
return getSSLSocketWithHttps(tSSLSocket, maxMessageSize);
|
||||
}
|
||||
|
||||
// Using endpoint identification algorithm as HTTPS enables us to do
|
||||
// CNAMEs/subjectAltName verification
|
||||
private static TSocket getSSLSocketWithHttps(TSocket tSSLSocket) throws TTransportException {
|
||||
private static TSocket getSSLSocketWithHttps(TSocket tSSLSocket, int maxMessageSize)
|
||||
throws TTransportException {
|
||||
SSLSocket sslSocket = (SSLSocket) tSSLSocket.getSocket();
|
||||
SSLParameters sslParams = sslSocket.getSSLParameters();
|
||||
sslParams.setEndpointIdentificationAlgorithm("HTTPS");
|
||||
sslSocket.setSSLParameters(sslParams);
|
||||
return new TSocket(sslSocket);
|
||||
TSocket tSocket = new TSocket(sslSocket);
|
||||
return configureThriftMaxMessageSize(tSocket, maxMessageSize);
|
||||
}
|
||||
}
|
||||
|
||||
@ -470,8 +470,10 @@ private[kyuubi] object KyuubiSyncThriftClient extends Logging {
|
||||
host: String,
|
||||
port: Int,
|
||||
socketTimeout: Int,
|
||||
connectionTimeout: Int): TProtocol = {
|
||||
val tSocket = new TSocket(TConfiguration.DEFAULT, host, port, socketTimeout, connectionTimeout)
|
||||
connectionTimeout: Int,
|
||||
maxMessageSize: Int): TProtocol = {
|
||||
val tConf = TConfiguration.custom().setMaxMessageSize(maxMessageSize).build()
|
||||
val tSocket = new TSocket(tConf, host, port, socketTimeout, connectionTimeout)
|
||||
val tTransport = PlainSASLHelper.getPlainTransport(user, passwd, tSocket)
|
||||
tTransport.open()
|
||||
new TBinaryProtocol(tTransport)
|
||||
@ -485,15 +487,23 @@ private[kyuubi] object KyuubiSyncThriftClient extends Logging {
|
||||
conf: KyuubiConf): KyuubiSyncThriftClient = {
|
||||
val passwd = Option(password).filter(_.nonEmpty).getOrElse("anonymous")
|
||||
val loginTimeout = conf.get(ENGINE_LOGIN_TIMEOUT).toInt
|
||||
val maxMessageSize = conf.get(KyuubiConf.FRONTEND_THRIFT_CLIENT_MAX_MESSAGE_SIZE)
|
||||
val aliveProbeEnabled = conf.get(KyuubiConf.ENGINE_ALIVE_PROBE_ENABLED)
|
||||
val aliveProbeInterval = conf.get(KyuubiConf.ENGINE_ALIVE_PROBE_INTERVAL).toInt
|
||||
val aliveTimeout = conf.get(KyuubiConf.ENGINE_ALIVE_TIMEOUT)
|
||||
|
||||
val tProtocol = createTProtocol(user, passwd, host, port, 0, loginTimeout)
|
||||
val tProtocol = createTProtocol(user, passwd, host, port, 0, loginTimeout, maxMessageSize)
|
||||
|
||||
val aliveProbeProtocol =
|
||||
if (aliveProbeEnabled) {
|
||||
Option(createTProtocol(user, passwd, host, port, aliveProbeInterval, loginTimeout))
|
||||
Some(createTProtocol(
|
||||
user,
|
||||
passwd,
|
||||
host,
|
||||
port,
|
||||
aliveProbeInterval,
|
||||
loginTimeout,
|
||||
maxMessageSize))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user