From ba99744b091de27fbd35e06b0fe603c07383da0f Mon Sep 17 00:00:00 2001 From: Cheng Pan Date: Fri, 4 Aug 2023 09:50:56 +0800 Subject: [PATCH] [KYUUBI #3374] Support password authentication for Trino engine ### _Why are the changes needed?_ Support SSL (keystore/truststore) connections and username/password authentication, which requires HTTPS, when the Trino engine connects to a Trino cluster. ### _How was this patch tested?_ - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [X] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request Closes #3374 from hddong/support-trino-password. Closes #3374 f39daaf78 [Cheng Pan] improve 6308c4cf7 [hongdongdong] Support SSL for trino engine Lead-authored-by: Cheng Pan Co-authored-by: hongdongdong Signed-off-by: Cheng Pan --- docs/deployment/settings.md | 7 +++ .../trino/session/TrinoSessionImpl.scala | 57 ++++++++++++++----- .../org/apache/kyuubi/config/KyuubiConf.scala | 49 ++++++++++++++++ .../engine/trino/TrinoProcessBuilder.scala | 17 +++++- 4 files changed, 115 insertions(+), 15 deletions(-) diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md index 91c68824c..a2750fc7c 100644 --- a/docs/deployment/settings.md +++ b/docs/deployment/settings.md @@ -176,6 +176,13 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co | kyuubi.engine.spark.python.env.archive.exec.path | bin/python | The Python exec path under the Python env archive. | string | 1.7.0 | | kyuubi.engine.spark.python.home.archive | <undefined> | Spark archive containing $SPARK_HOME/python directory, which is used to init session Python worker for Python language mode. | string | 1.7.0 | | kyuubi.engine.submit.timeout | PT30S | Period to tolerant Driver Pod ephemerally invisible after submitting. In some Resource Managers, e.g. K8s, the Driver Pod is not visible immediately after `spark-submit` is returned. 
| duration | 1.7.1 | +| kyuubi.engine.trino.connection.keystore.password | <undefined> | The keystore password used for connecting to trino cluster | string | 1.8.0 | +| kyuubi.engine.trino.connection.keystore.path | <undefined> | The keystore path used for connecting to trino cluster | string | 1.8.0 | +| kyuubi.engine.trino.connection.keystore.type | <undefined> | The keystore type used for connecting to trino cluster | string | 1.8.0 | +| kyuubi.engine.trino.connection.password | <undefined> | The password used for connecting to trino cluster | string | 1.8.0 | +| kyuubi.engine.trino.connection.truststore.password | <undefined> | The truststore password used for connecting to trino cluster | string | 1.8.0 | +| kyuubi.engine.trino.connection.truststore.path | <undefined> | The truststore path used for connecting to trino cluster | string | 1.8.0 | +| kyuubi.engine.trino.connection.truststore.type | <undefined> | The truststore type used for connecting to trino cluster | string | 1.8.0 | | kyuubi.engine.trino.event.loggers | JSON | A comma-separated list of engine history loggers, where engine/session/operation etc events go.
  • JSON: the events will be written to the location of kyuubi.engine.event.json.log.path
  • JDBC: to be done
  • CUSTOM: to be done.
| seq | 1.7.0 | | kyuubi.engine.trino.extra.classpath | <undefined> | The extra classpath for the Trino query engine, for configuring other libs which may need by the Trino engine | string | 1.6.0 | | kyuubi.engine.trino.java.options | <undefined> | The extra Java options for the Trino query engine | string | 1.6.0 | diff --git a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionImpl.scala b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionImpl.scala index 42b21fc29..362ee3ed0 100644 --- a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionImpl.scala +++ b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionImpl.scala @@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit import io.airlift.units.Duration import io.trino.client.ClientSession +import io.trino.client.OkHttpUtil import okhttp3.OkHttpClient import org.apache.hive.service.rpc.thrift.{TGetInfoType, TGetInfoValue, TProtocolVersion} @@ -46,14 +47,18 @@ class TrinoSessionImpl( sessionManager: SessionManager) extends AbstractSession(protocol, user, password, ipAddress, conf, sessionManager) { + val sessionConf: KyuubiConf = sessionManager.getConf + override val handle: SessionHandle = conf.get(KYUUBI_SESSION_HANDLE_KEY).map(SessionHandle.fromUUID).getOrElse(SessionHandle()) + private val username: String = sessionConf + .getOption(KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY).getOrElse(currentUser) + var trinoContext: TrinoContext = _ private var clientSession: ClientSession = _ private var catalogName: String = _ private var databaseName: String = _ - private val sessionEvent = TrinoSessionEvent(this) override def open(): Unit = { @@ -66,34 +71,27 @@ class TrinoSessionImpl( case (USE_CATALOG, catalog) => catalogName = catalog case (USE_DATABASE, database) => databaseName = database } - - val httpClient = new 
OkHttpClient.Builder().build() + if (catalogName == null) { + catalogName = sessionConf.get(KyuubiConf.ENGINE_TRINO_CONNECTION_CATALOG) + .getOrElse(throw KyuubiSQLException("Trino default catalog can not be null!")) + } clientSession = createClientSession() - trinoContext = TrinoContext(httpClient, clientSession) + trinoContext = TrinoContext(createHttpClient(), clientSession) super.open() EventBus.post(sessionEvent) } private def createClientSession(): ClientSession = { - val sessionConf = sessionManager.getConf val connectionUrl = sessionConf.get(KyuubiConf.ENGINE_TRINO_CONNECTION_URL).getOrElse( throw KyuubiSQLException("Trino server url can not be null!")) - if (catalogName == null) { - catalogName = sessionConf.get( - KyuubiConf.ENGINE_TRINO_CONNECTION_CATALOG).getOrElse( - throw KyuubiSQLException("Trino default catalog can not be null!")) - } - - val user = sessionConf - .getOption(KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY).getOrElse(currentUser) val clientRequestTimeout = sessionConf.get(TrinoConf.CLIENT_REQUEST_TIMEOUT) new ClientSession( URI.create(connectionUrl), - user, + username, Optional.empty(), "kyuubi", Optional.empty(), @@ -114,6 +112,37 @@ class TrinoSessionImpl( true) } + private def createHttpClient(): OkHttpClient = { + val keystorePath = sessionConf.get(KyuubiConf.ENGINE_TRINO_CONNECTION_KEYSTORE_PATH) + val keystorePassword = sessionConf.get(KyuubiConf.ENGINE_TRINO_CONNECTION_KEYSTORE_PASSWORD) + val keystoreType = sessionConf.get(KyuubiConf.ENGINE_TRINO_CONNECTION_KEYSTORE_TYPE) + val truststorePath = sessionConf.get(KyuubiConf.ENGINE_TRINO_CONNECTION_TRUSTSTORE_PATH) + val truststorePassword = sessionConf.get(KyuubiConf.ENGINE_TRINO_CONNECTION_TRUSTSTORE_PASSWORD) + val truststoreType = sessionConf.get(KyuubiConf.ENGINE_TRINO_CONNECTION_TRUSTSTORE_TYPE) + + val serverScheme = clientSession.getServer.getScheme + + val builder = new OkHttpClient.Builder() + + OkHttpUtil.setupSsl( + builder, + Optional.ofNullable(keystorePath.orNull), + 
Optional.ofNullable(keystorePassword.orNull), + Optional.ofNullable(keystoreType.orNull), + Optional.ofNullable(truststorePath.orNull), + Optional.ofNullable(truststorePassword.orNull), + Optional.ofNullable(truststoreType.orNull)) + + sessionConf.get(KyuubiConf.ENGINE_TRINO_CONNECTION_PASSWORD).foreach { password => + require( + serverScheme.equalsIgnoreCase("https"), + "Trino engine using username/password requires HTTPS to be enabled") + builder.addInterceptor(OkHttpUtil.basicAuth(username, password)) + } + + builder.build() + } + override protected def runOperation(operation: Operation): OperationHandle = { sessionEvent.totalOperations += 1 super.runOperation(operation) diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala index 175bf79c6..2de7abf89 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala @@ -1341,6 +1341,55 @@ object KyuubiConf { .stringConf .createOptional + val ENGINE_TRINO_CONNECTION_PASSWORD: OptionalConfigEntry[String] = + buildConf("kyuubi.engine.trino.connection.password") + .doc("The password used for connecting to trino cluster") + .version("1.8.0") + .stringConf + .createOptional + + val ENGINE_TRINO_CONNECTION_KEYSTORE_PATH: OptionalConfigEntry[String] = + buildConf("kyuubi.engine.trino.connection.keystore.path") + .doc("The keystore path used for connecting to trino cluster") + .version("1.8.0") + .stringConf + .createOptional + + val ENGINE_TRINO_CONNECTION_KEYSTORE_PASSWORD: OptionalConfigEntry[String] = + buildConf("kyuubi.engine.trino.connection.keystore.password") + .doc("The keystore password used for connecting to trino cluster") + .version("1.8.0") + .stringConf + .createOptional + + val ENGINE_TRINO_CONNECTION_KEYSTORE_TYPE: OptionalConfigEntry[String] = + buildConf("kyuubi.engine.trino.connection.keystore.type") + 
.doc("The keystore type used for connecting to trino cluster") + .version("1.8.0") + .stringConf + .createOptional + + val ENGINE_TRINO_CONNECTION_TRUSTSTORE_PATH: OptionalConfigEntry[String] = + buildConf("kyuubi.engine.trino.connection.truststore.path") + .doc("The truststore path used for connecting to trino cluster") + .version("1.8.0") + .stringConf + .createOptional + + val ENGINE_TRINO_CONNECTION_TRUSTSTORE_PASSWORD: OptionalConfigEntry[String] = + buildConf("kyuubi.engine.trino.connection.truststore.password") + .doc("The truststore password used for connecting to trino cluster") + .version("1.8.0") + .stringConf + .createOptional + + val ENGINE_TRINO_CONNECTION_TRUSTSTORE_TYPE: OptionalConfigEntry[String] = + buildConf("kyuubi.engine.trino.connection.truststore.type") + .doc("The truststore type used for connecting to trino cluster") + .version("1.8.0") + .stringConf + .createOptional + val ENGINE_TRINO_SHOW_PROGRESS: ConfigEntry[Boolean] = buildConf("kyuubi.session.engine.trino.showProgress") .doc("When true, show the progress bar and final info in the Trino engine log.") diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/trino/TrinoProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/trino/TrinoProcessBuilder.scala index 7b68e464a..041219dd0 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/trino/TrinoProcessBuilder.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/trino/TrinoProcessBuilder.scala @@ -27,6 +27,7 @@ import scala.collection.mutable.ArrayBuffer import com.google.common.annotations.VisibleForTesting import org.apache.kyuubi.{Logging, SCALA_COMPILE_VERSION, Utils} +import org.apache.kyuubi.Utils.REDACTION_REPLACEMENT_TEXT import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.config.KyuubiConf._ import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY @@ -108,5 +109,19 @@ class TrinoProcessBuilder( override def shortName: String = "trino" - 
override def toString: String = Utils.redactCommandLineArgs(conf, commands).mkString("\n") + override def toString: String = { + if (commands == null) { + super.toString() + } else { + Utils.redactCommandLineArgs(conf, commands).map { + case arg if arg.contains(ENGINE_TRINO_CONNECTION_PASSWORD.key) => + s"${ENGINE_TRINO_CONNECTION_PASSWORD.key}=$REDACTION_REPLACEMENT_TEXT" + case arg if arg.contains(ENGINE_TRINO_CONNECTION_KEYSTORE_PASSWORD.key) => + s"${ENGINE_TRINO_CONNECTION_KEYSTORE_PASSWORD.key}=$REDACTION_REPLACEMENT_TEXT" + case arg if arg.contains(ENGINE_TRINO_CONNECTION_TRUSTSTORE_PASSWORD.key) => + s"${ENGINE_TRINO_CONNECTION_TRUSTSTORE_PASSWORD.key}=$REDACTION_REPLACEMENT_TEXT" + case arg => arg + }.mkString("\n") + } + } }