diff --git a/docs/configuration/settings.md b/docs/configuration/settings.md index 2b9a71f10..68014a5cf 100644 --- a/docs/configuration/settings.md +++ b/docs/configuration/settings.md @@ -274,6 +274,7 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co | kyuubi.frontend.thrift.binary.ssl.disallowed.protocols | SSLv2,SSLv3 | SSL versions to disallow for Kyuubi thrift binary frontend. | set | 1.7.0 | | kyuubi.frontend.thrift.binary.ssl.enabled | false | Set this to true for using SSL encryption in thrift binary frontend server. | boolean | 1.7.0 | | kyuubi.frontend.thrift.binary.ssl.include.ciphersuites || A comma-separated list of include SSL cipher suite names for thrift binary frontend. | seq | 1.7.0 | +| kyuubi.frontend.thrift.client.max.message.size | 1073741824 | Maximum message size in bytes a thrift client will receive. | int | 1.9.3 | | kyuubi.frontend.thrift.http.bind.host | <undefined> | Hostname or IP of the machine on which to run the thrift frontend service via http protocol. | string | 1.6.0 | | kyuubi.frontend.thrift.http.bind.port | 10010 | Port of the machine on which to run the thrift frontend service via http protocol. | int | 1.6.0 | | kyuubi.frontend.thrift.http.compression.enabled | true | Enable thrift http compression via Jetty compression support | boolean | 1.6.0 | diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala index c265f92eb..a88b5f615 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala @@ -665,6 +665,13 @@ object KyuubiConf { .version("1.4.0") .fallbackConf(FRONTEND_MAX_MESSAGE_SIZE) + val FRONTEND_THRIFT_CLIENT_MAX_MESSAGE_SIZE: ConfigEntry[Int] = + buildConf("kyuubi.frontend.thrift.client.max.message.size") + .doc("Maximum message size in bytes a thrift client will receive.") + .version("1.9.3") + .intConf + .createWithDefault(1 * 1024 * 1024 * 1024) // follow HIVE-26633 to use 1g as default value + val FRONTEND_THRIFT_HTTP_REQUEST_HEADER_SIZE: ConfigEntry[Int] = buildConf("kyuubi.frontend.thrift.http.request.header.size") .doc("Request header size in bytes, when using HTTP transport mode. Jetty defaults used.") diff --git a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/JdbcConnectionParams.java b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/JdbcConnectionParams.java index cd9fd517e..b3884c694 100644 --- a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/JdbcConnectionParams.java +++ b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/JdbcConnectionParams.java @@ -97,6 +97,7 @@ public class JdbcConnectionParams { static final String CONNECT_TIMEOUT = "connectTimeout"; static final String SOCKET_TIMEOUT = "socketTimeout"; + static final String THRIFT_CLIENT_MAX_MESSAGE_SIZE = "thrift.client.max.message.size"; // We support ways to specify application name modeled after some existing DBs, since // there's no standard approach. diff --git a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiConnection.java b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiConnection.java index 4c39fb308..eaf71cfa1 100644 --- a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiConnection.java +++ b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiConnection.java @@ -67,6 +67,7 @@ import org.apache.kyuubi.jdbc.hive.cli.RowSet; import org.apache.kyuubi.jdbc.hive.cli.RowSetFactory; import org.apache.kyuubi.jdbc.hive.logs.KyuubiLoggable; import org.apache.kyuubi.shaded.hive.service.rpc.thrift.*; +import org.apache.kyuubi.shaded.thrift.TConfiguration; import org.apache.kyuubi.shaded.thrift.TException; import org.apache.kyuubi.shaded.thrift.protocol.TBinaryProtocol; import org.apache.kyuubi.shaded.thrift.transport.THttpClient; @@ -419,7 +420,13 @@ public class KyuubiConnection implements SQLConnection, KyuubiLoggable { boolean useSsl = isSslConnection(); // Create an http client from the configs httpClient = getHttpClient(useSsl); - transport = new THttpClient(getServerHttpUrl(useSsl), httpClient); + int maxMessageSize = getMaxMessageSize(); + TConfiguration.Builder tConfBuilder = TConfiguration.custom(); + if (maxMessageSize > 0) { + tConfBuilder.setMaxMessageSize(maxMessageSize); + } + TConfiguration tConf = tConfBuilder.build(); + transport = new THttpClient(tConf, getServerHttpUrl(useSsl), httpClient); return transport; } @@ -629,7 +636,8 @@ public class KyuubiConnection implements SQLConnection, KyuubiLoggable { } /** Create underlying SSL or non-SSL transport */ - private TTransport createUnderlyingTransport() throws TTransportException { + private TTransport createUnderlyingTransport() throws TTransportException, SQLException { + int maxMessageSize = getMaxMessageSize(); TTransport transport = null; // Note: Thrift returns an SSL socket that is already bound to the specified host:port // Therefore an open called on this would be a no-op later @@ -643,19 +651,46 @@ public class KyuubiConnection implements SQLConnection, KyuubiLoggable { Utils.getPassword(sessConfMap, JdbcConnectionParams.SSL_TRUST_STORE_PASSWORD); if (sslTrustStore == null || sslTrustStore.isEmpty()) { - transport = ThriftUtils.getSSLSocket(host, port, connectTimeout, socketTimeout); + transport = + ThriftUtils.getSSLSocket(host, port, connectTimeout, socketTimeout, maxMessageSize); } else { transport = ThriftUtils.getSSLSocket( - host, port, connectTimeout, socketTimeout, sslTrustStore, sslTrustStorePassword); + host, + port, + connectTimeout, + socketTimeout, + sslTrustStore, + sslTrustStorePassword, + maxMessageSize); } } else { // get non-SSL socket transport - transport = ThriftUtils.getSocketTransport(host, port, connectTimeout, socketTimeout); + transport = + ThriftUtils.getSocketTransport(host, port, connectTimeout, socketTimeout, maxMessageSize); } return transport; } + private int getMaxMessageSize() throws SQLException { + String maxMessageSize = sessConfMap.get(JdbcConnectionParams.THRIFT_CLIENT_MAX_MESSAGE_SIZE); + if (maxMessageSize == null) { + return -1; + } + + try { + return Integer.parseInt(maxMessageSize); + } catch (Exception e) { + String errFormat = + "Invalid {} configuration of '{}'. Expected an integer specifying number of bytes. " + + "A configuration of <= 0 uses default max message size."; + String errMsg = + String.format( + errFormat, JdbcConnectionParams.THRIFT_CLIENT_MAX_MESSAGE_SIZE, maxMessageSize); + throw new SQLException(errMsg, "42000", e); + } + } + /** * Create transport per the connection options Supported transport options are: - SASL based * transports over + Kerberos + SSL + non-SSL - Raw (non-SASL) socket diff --git a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/auth/ThriftUtils.java b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/auth/ThriftUtils.java index 7f0099b29..331b871e0 100644 --- a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/auth/ThriftUtils.java +++ b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/auth/ThriftUtils.java @@ -24,23 +24,70 @@ import org.apache.kyuubi.shaded.thrift.transport.TSSLTransportFactory; import org.apache.kyuubi.shaded.thrift.transport.TSocket; import org.apache.kyuubi.shaded.thrift.transport.TTransport; import org.apache.kyuubi.shaded.thrift.transport.TTransportException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * This class helps in some aspects of authentication. It creates the proper Thrift classes for the * given configuration as well as helps with authenticating requests. */ public class ThriftUtils { + + private static final Logger LOG = LoggerFactory.getLogger(ThriftUtils.class); + + /** + * Configure the provided T transport's max message size. + * + * @param transport Transport to configure maxMessage for + * @param maxMessageSize Maximum allowed message size in bytes, less than or equal to 0 means use + * the Thrift library default. + * @return The passed in T transport configured with desired max message size. The same object + * passed in is returned. + */ + public static T configureThriftMaxMessageSize( + T transport, int maxMessageSize) { + if (maxMessageSize > 0) { + if (transport.getConfiguration() == null) { + LOG.warn( + "TTransport {} is returning a null Configuration, Thrift max message size is not getting configured", + transport.getClass().getName()); + return transport; + } + transport.getConfiguration().setMaxMessageSize(maxMessageSize); + } + return transport; + } + + /** + * Create a TSocket for the provided host and port with specified connectTimeout, loginTimeout and + * maxMessageSize. + * + * @param host Host to connect to. + * @param port Port to connect to. + * @param connectTimeout Socket connect timeout (0 means no timeout). + * @param socketTimeout Socket read/write timeout (0 means no timeout). + * @param maxMessageSize Size in bytes for max allowable Thrift message size, less than or equal + * to 0 results in using the Thrift library default. + * @return TTransport TSocket for host/port + */ public static TTransport getSocketTransport( - String host, int port, int connectTimeout, int socketTimeout) throws TTransportException { - return new TSocket(TConfiguration.DEFAULT, host, port, socketTimeout, connectTimeout); + String host, int port, int connectTimeout, int socketTimeout, int maxMessageSize) + throws TTransportException { + TConfiguration.Builder tConfBuilder = TConfiguration.custom(); + if (maxMessageSize > 0) { + tConfBuilder.setMaxMessageSize(maxMessageSize); + } + TConfiguration tConf = tConfBuilder.build(); + return new TSocket(tConf, host, port, socketTimeout, connectTimeout); } public static TTransport getSSLSocket( - String host, int port, int connectTimeout, int socketTimeout) throws TTransportException { + String host, int port, int connectTimeout, int socketTimeout, int maxMessageSize) + throws TTransportException { // The underlying SSLSocket object is bound to host:port with the given SO_TIMEOUT TSocket tSSLSocket = TSSLTransportFactory.getClientSocket(host, port, socketTimeout); tSSLSocket.setConnectTimeout(connectTimeout); - return getSSLSocketWithHttps(tSSLSocket); + return getSSLSocketWithHttps(tSSLSocket, maxMessageSize); } public static TTransport getSSLSocket( @@ -49,7 +96,8 @@ public class ThriftUtils { int connectTimeout, int socketTimeout, String trustStorePath, - String trustStorePassWord) + String trustStorePassWord, + int maxMessageSize) throws TTransportException { TSSLTransportFactory.TSSLTransportParameters params = new TSSLTransportFactory.TSSLTransportParameters(); @@ -59,16 +107,18 @@ public class ThriftUtils { // SSLContext created with the given params TSocket tSSLSocket = TSSLTransportFactory.getClientSocket(host, port, socketTimeout, params); tSSLSocket.setConnectTimeout(connectTimeout); - return getSSLSocketWithHttps(tSSLSocket); + return getSSLSocketWithHttps(tSSLSocket, maxMessageSize); } // Using endpoint identification algorithm as HTTPS enables us to do // CNAMEs/subjectAltName verification - private static TSocket getSSLSocketWithHttps(TSocket tSSLSocket) throws TTransportException { + private static TSocket getSSLSocketWithHttps(TSocket tSSLSocket, int maxMessageSize) + throws TTransportException { SSLSocket sslSocket = (SSLSocket) tSSLSocket.getSocket(); SSLParameters sslParams = sslSocket.getSSLParameters(); sslParams.setEndpointIdentificationAlgorithm("HTTPS"); sslSocket.setSSLParameters(sslParams); - return new TSocket(sslSocket); + TSocket tSocket = new TSocket(sslSocket); + return configureThriftMaxMessageSize(tSocket, maxMessageSize); } } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala index d34458c64..c36e9ec06 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala @@ -470,8 +470,10 @@ private[kyuubi] object KyuubiSyncThriftClient extends Logging { host: String, port: Int, socketTimeout: Int, - connectionTimeout: Int): TProtocol = { - val tSocket = new TSocket(TConfiguration.DEFAULT, host, port, socketTimeout, connectionTimeout) + connectionTimeout: Int, + maxMessageSize: Int): TProtocol = { + val tConf = TConfiguration.custom().setMaxMessageSize(maxMessageSize).build() + val tSocket = new TSocket(tConf, host, port, socketTimeout, connectionTimeout) val tTransport = PlainSASLHelper.getPlainTransport(user, passwd, tSocket) tTransport.open() new TBinaryProtocol(tTransport) @@ -485,15 +487,23 @@ private[kyuubi] object KyuubiSyncThriftClient extends Logging { conf: KyuubiConf): KyuubiSyncThriftClient = { val passwd = Option(password).filter(_.nonEmpty).getOrElse("anonymous") val loginTimeout = conf.get(ENGINE_LOGIN_TIMEOUT).toInt + val maxMessageSize = conf.get(KyuubiConf.FRONTEND_THRIFT_CLIENT_MAX_MESSAGE_SIZE) val aliveProbeEnabled = conf.get(KyuubiConf.ENGINE_ALIVE_PROBE_ENABLED) val aliveProbeInterval = conf.get(KyuubiConf.ENGINE_ALIVE_PROBE_INTERVAL).toInt val aliveTimeout = conf.get(KyuubiConf.ENGINE_ALIVE_TIMEOUT) - val tProtocol = createTProtocol(user, passwd, host, port, 0, loginTimeout) + val tProtocol = createTProtocol(user, passwd, host, port, 0, loginTimeout, maxMessageSize) val aliveProbeProtocol = if (aliveProbeEnabled) { - Option(createTProtocol(user, passwd, host, port, aliveProbeInterval, loginTimeout)) + Some(createTProtocol( + user, + passwd, + host, + port, + aliveProbeInterval, + loginTimeout, + maxMessageSize)) } else { None }