[KYUUBI #3374] Support password authentication for Trino engine

### _Why are the changes needed?_

Support SSL for trino engine.

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [X] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #3374 from hddong/support-trino-password.

Closes #3374

f39daaf78 [Cheng Pan] improve
6308c4cf7 [hongdongdong] Support SSL for trino engine

Lead-authored-by: Cheng Pan <chengpan@apache.org>
Co-authored-by: hongdongdong <hongdd@apache.org>
Signed-off-by: Cheng Pan <chengpan@apache.org>
This commit is contained in:
Cheng Pan 2023-08-04 09:50:56 +08:00
parent 7bc0dbf932
commit ba99744b09
No known key found for this signature in database
GPG Key ID: 8001952629BCC75D
4 changed files with 115 additions and 15 deletions

View File

@ -176,6 +176,13 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co
| kyuubi.engine.spark.python.env.archive.exec.path | bin/python | The Python exec path under the Python env archive. | string | 1.7.0 |
| kyuubi.engine.spark.python.home.archive | &lt;undefined&gt; | Spark archive containing $SPARK_HOME/python directory, which is used to init session Python worker for Python language mode. | string | 1.7.0 |
| kyuubi.engine.submit.timeout | PT30S | Period to tolerant Driver Pod ephemerally invisible after submitting. In some Resource Managers, e.g. K8s, the Driver Pod is not visible immediately after `spark-submit` is returned. | duration | 1.7.1 |
| kyuubi.engine.trino.connection.keystore.password | &lt;undefined&gt; | The keystore password used for connecting to trino cluster | string | 1.8.0 |
| kyuubi.engine.trino.connection.keystore.path | &lt;undefined&gt; | The keystore path used for connecting to trino cluster | string | 1.8.0 |
| kyuubi.engine.trino.connection.keystore.type | &lt;undefined&gt; | The keystore type used for connecting to trino cluster | string | 1.8.0 |
| kyuubi.engine.trino.connection.password | &lt;undefined&gt; | The password used for connecting to trino cluster | string | 1.8.0 |
| kyuubi.engine.trino.connection.truststore.password | &lt;undefined&gt; | The truststore password used for connecting to trino cluster | string | 1.8.0 |
| kyuubi.engine.trino.connection.truststore.path | &lt;undefined&gt; | The truststore path used for connecting to trino cluster | string | 1.8.0 |
| kyuubi.engine.trino.connection.truststore.type | &lt;undefined&gt; | The truststore type used for connecting to trino cluster | string | 1.8.0 |
| kyuubi.engine.trino.event.loggers | JSON | A comma-separated list of engine history loggers, where engine/session/operation etc events go.<ul> <li>JSON: the events will be written to the location of kyuubi.engine.event.json.log.path</li> <li>JDBC: to be done</li> <li>CUSTOM: to be done.</li></ul> | seq | 1.7.0 |
| kyuubi.engine.trino.extra.classpath | &lt;undefined&gt; | The extra classpath for the Trino query engine, for configuring other libs which may need by the Trino engine | string | 1.6.0 |
| kyuubi.engine.trino.java.options | &lt;undefined&gt; | The extra Java options for the Trino query engine | string | 1.6.0 |

View File

@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit
import io.airlift.units.Duration
import io.trino.client.ClientSession
import io.trino.client.OkHttpUtil
import okhttp3.OkHttpClient
import org.apache.hive.service.rpc.thrift.{TGetInfoType, TGetInfoValue, TProtocolVersion}
@ -46,14 +47,18 @@ class TrinoSessionImpl(
sessionManager: SessionManager)
extends AbstractSession(protocol, user, password, ipAddress, conf, sessionManager) {
val sessionConf: KyuubiConf = sessionManager.getConf
override val handle: SessionHandle =
conf.get(KYUUBI_SESSION_HANDLE_KEY).map(SessionHandle.fromUUID).getOrElse(SessionHandle())
private val username: String = sessionConf
.getOption(KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY).getOrElse(currentUser)
var trinoContext: TrinoContext = _
private var clientSession: ClientSession = _
private var catalogName: String = _
private var databaseName: String = _
private val sessionEvent = TrinoSessionEvent(this)
override def open(): Unit = {
@ -66,34 +71,27 @@ class TrinoSessionImpl(
case (USE_CATALOG, catalog) => catalogName = catalog
case (USE_DATABASE, database) => databaseName = database
}
val httpClient = new OkHttpClient.Builder().build()
if (catalogName == null) {
catalogName = sessionConf.get(KyuubiConf.ENGINE_TRINO_CONNECTION_CATALOG)
.getOrElse(throw KyuubiSQLException("Trino default catalog can not be null!"))
}
clientSession = createClientSession()
trinoContext = TrinoContext(httpClient, clientSession)
trinoContext = TrinoContext(createHttpClient(), clientSession)
super.open()
EventBus.post(sessionEvent)
}
private def createClientSession(): ClientSession = {
val sessionConf = sessionManager.getConf
val connectionUrl = sessionConf.get(KyuubiConf.ENGINE_TRINO_CONNECTION_URL).getOrElse(
throw KyuubiSQLException("Trino server url can not be null!"))
if (catalogName == null) {
catalogName = sessionConf.get(
KyuubiConf.ENGINE_TRINO_CONNECTION_CATALOG).getOrElse(
throw KyuubiSQLException("Trino default catalog can not be null!"))
}
val user = sessionConf
.getOption(KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY).getOrElse(currentUser)
val clientRequestTimeout = sessionConf.get(TrinoConf.CLIENT_REQUEST_TIMEOUT)
new ClientSession(
URI.create(connectionUrl),
user,
username,
Optional.empty(),
"kyuubi",
Optional.empty(),
@ -114,6 +112,37 @@ class TrinoSessionImpl(
true)
}
private def createHttpClient(): OkHttpClient = {
val keystorePath = sessionConf.get(KyuubiConf.ENGINE_TRINO_CONNECTION_KEYSTORE_PATH)
val keystorePassword = sessionConf.get(KyuubiConf.ENGINE_TRINO_CONNECTION_KEYSTORE_PASSWORD)
val keystoreType = sessionConf.get(KyuubiConf.ENGINE_TRINO_CONNECTION_KEYSTORE_TYPE)
val truststorePath = sessionConf.get(KyuubiConf.ENGINE_TRINO_CONNECTION_TRUSTSTORE_PATH)
val truststorePassword = sessionConf.get(KyuubiConf.ENGINE_TRINO_CONNECTION_TRUSTSTORE_PASSWORD)
val truststoreType = sessionConf.get(KyuubiConf.ENGINE_TRINO_CONNECTION_TRUSTSTORE_TYPE)
val serverScheme = clientSession.getServer.getScheme
val builder = new OkHttpClient.Builder()
OkHttpUtil.setupSsl(
builder,
Optional.ofNullable(keystorePath.orNull),
Optional.ofNullable(keystorePassword.orNull),
Optional.ofNullable(keystoreType.orNull),
Optional.ofNullable(truststorePath.orNull),
Optional.ofNullable(truststorePassword.orNull),
Optional.ofNullable(truststoreType.orNull))
sessionConf.get(KyuubiConf.ENGINE_TRINO_CONNECTION_PASSWORD).foreach { password =>
require(
serverScheme.equalsIgnoreCase("https"),
"Trino engine using username/password requires HTTPS to be enabled")
builder.addInterceptor(OkHttpUtil.basicAuth(username, password))
}
builder.build()
}
override protected def runOperation(operation: Operation): OperationHandle = {
sessionEvent.totalOperations += 1
super.runOperation(operation)

View File

@ -1341,6 +1341,55 @@ object KyuubiConf {
.stringConf
.createOptional
val ENGINE_TRINO_CONNECTION_PASSWORD: OptionalConfigEntry[String] =
buildConf("kyuubi.engine.trino.connection.password")
.doc("The password used for connecting to trino cluster")
.version("1.8.0")
.stringConf
.createOptional
val ENGINE_TRINO_CONNECTION_KEYSTORE_PATH: OptionalConfigEntry[String] =
buildConf("kyuubi.engine.trino.connection.keystore.path")
.doc("The keystore path used for connecting to trino cluster")
.version("1.8.0")
.stringConf
.createOptional
val ENGINE_TRINO_CONNECTION_KEYSTORE_PASSWORD: OptionalConfigEntry[String] =
buildConf("kyuubi.engine.trino.connection.keystore.password")
.doc("The keystore password used for connecting to trino cluster")
.version("1.8.0")
.stringConf
.createOptional
val ENGINE_TRINO_CONNECTION_KEYSTORE_TYPE: OptionalConfigEntry[String] =
buildConf("kyuubi.engine.trino.connection.keystore.type")
.doc("The keystore type used for connecting to trino cluster")
.version("1.8.0")
.stringConf
.createOptional
val ENGINE_TRINO_CONNECTION_TRUSTSTORE_PATH: OptionalConfigEntry[String] =
buildConf("kyuubi.engine.trino.connection.truststore.path")
.doc("The truststore path used for connecting to trino cluster")
.version("1.8.0")
.stringConf
.createOptional
val ENGINE_TRINO_CONNECTION_TRUSTSTORE_PASSWORD: OptionalConfigEntry[String] =
buildConf("kyuubi.engine.trino.connection.truststore.password")
.doc("The truststore password used for connecting to trino cluster")
.version("1.8.0")
.stringConf
.createOptional
val ENGINE_TRINO_CONNECTION_TRUSTSTORE_TYPE: OptionalConfigEntry[String] =
buildConf("kyuubi.engine.trino.connection.truststore.type")
.doc("The truststore type used for connecting to trino cluster")
.version("1.8.0")
.stringConf
.createOptional
val ENGINE_TRINO_SHOW_PROGRESS: ConfigEntry[Boolean] =
buildConf("kyuubi.session.engine.trino.showProgress")
.doc("When true, show the progress bar and final info in the Trino engine log.")

View File

@ -27,6 +27,7 @@ import scala.collection.mutable.ArrayBuffer
import com.google.common.annotations.VisibleForTesting
import org.apache.kyuubi.{Logging, SCALA_COMPILE_VERSION, Utils}
import org.apache.kyuubi.Utils.REDACTION_REPLACEMENT_TEXT
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY
@ -108,5 +109,19 @@ class TrinoProcessBuilder(
override def shortName: String = "trino"
override def toString: String = Utils.redactCommandLineArgs(conf, commands).mkString("\n")
override def toString: String = {
if (commands == null) {
super.toString()
} else {
Utils.redactCommandLineArgs(conf, commands).map {
case arg if arg.contains(ENGINE_TRINO_CONNECTION_PASSWORD.key) =>
s"${ENGINE_TRINO_CONNECTION_PASSWORD.key}=$REDACTION_REPLACEMENT_TEXT"
case arg if arg.contains(ENGINE_TRINO_CONNECTION_KEYSTORE_PASSWORD.key) =>
s"${ENGINE_TRINO_CONNECTION_KEYSTORE_PASSWORD.key}=$REDACTION_REPLACEMENT_TEXT"
case arg if arg.contains(ENGINE_TRINO_CONNECTION_TRUSTSTORE_PASSWORD.key) =>
s"${ENGINE_TRINO_CONNECTION_TRUSTSTORE_PASSWORD.key}=$REDACTION_REPLACEMENT_TEXT"
case arg => arg
}.mkString("\n")
}
}
}