diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md index edf7e4d84..c8734acef 100644 --- a/docs/deployment/settings.md +++ b/docs/deployment/settings.md @@ -189,6 +189,13 @@ kyuubi\.engine
\.operation\.log\.dir
\.root|
engine-pool
|
The name of engine pool.
|
string
|
1.5.0
kyuubi\.engine\.pool
\.size|
-1
|
The size of engine pool. Note that, if the size is less than 1, the engine pool will not be enabled; otherwise, the size of the engine pool will be min(this, kyuubi.engine.pool.size.threshold).
|
int
|
1.4.0
kyuubi\.engine\.pool
\.size\.threshold|
9
|
This parameter is introduced as a server-side parameter, and controls the upper limit of the engine pool.
|
int
|
1.4.0
+kyuubi\.engine
\.security\.crypto
\.cipher|
AES/CBC/PKCS5PADDING
|
The cipher transformation to use for encrypting engine access token.
|
string
|
1.5.0
+kyuubi\.engine
\.security\.crypto
\.ivLength|
16
|
Initial vector length, in bytes.
|
int
|
1.5.0
+kyuubi\.engine
\.security\.crypto
\.keyAlgorithm|
AES
|
The algorithm for generated secret keys.
|
string
|
1.5.0
+kyuubi\.engine
\.security\.crypto
\.keyLength|
128
|
The length in bits of the encryption key to generate. Valid values are 128, 192 and 256
|
int
|
1.5.0
+kyuubi\.engine
\.security\.enabled|
false
|
Whether to enable the internal secure access between Kyuubi server and engine.
|
boolean
|
1.5.0
+kyuubi\.engine
\.security\.secret
\.provider|
org.apache.kyuubi.service.authentication.ZooKeeperEngineSecuritySecretProviderImpl
|
The class used to manage the engine security secret. This class must be a subclass of EngineSecuritySecretProvider.
|
string
|
1.5.0
+kyuubi\.engine
\.security\.token\.max
\.lifetime|
PT10M
|
The max lifetime of the token used for secure access between Kyuubi server and engine.
|
duration
|
1.5.0
kyuubi\.engine\.session
\.initialize\.sql|
|
SemiColon-separated list of SQL statements to be initialized in the newly created engine session before queries. This configuration can not be used in JDBC url due to the limitation of Beeline/JDBC driver.
|
seq
|
1.3.0
kyuubi\.engine\.share
\.level|
USER
|
Engines will be shared in different levels, available configs are:
|
string
|
1.2.0
kyuubi\.engine\.share
\.level\.sub\.domain|
<undefined>
|
(deprecated) - Using kyuubi.engine.share.level.subdomain instead
|
string
|
1.2.0
@@ -247,6 +254,7 @@ kyuubi\.ha\.zookeeper
\.connection\.max\.retry
\.wait|
EXPONENTIAL_BACKOFF
|
The retry policy for connecting to the zookeeper ensemble, all candidates are:
|
string
|
1.0.0
kyuubi\.ha\.zookeeper
\.connection\.timeout|
15000
|
The timeout(ms) of creating the connection to the zookeeper ensemble
|
int
|
1.0.0
kyuubi\.ha\.zookeeper
\.engine\.auth\.type|
NONE
|
The type of zookeeper authentication for engine, all candidates are
|
string
|
1.3.2
+kyuubi\.ha\.zookeeper
\.engine\.secure\.secret
\.node|
<undefined>
|
The zk node contains the secret that used for internal secure between Kyuubi server and Kyuubi engine, please make sure that it is only visible for Kyuubi.
|
string
|
1.5.0
kyuubi\.ha\.zookeeper
\.namespace|
kyuubi
|
The root directory for the service to deploy its instance uri
|
string
|
1.0.0
kyuubi\.ha\.zookeeper
\.node\.creation\.timeout|
PT2M
|
Timeout for creating zookeeper node
|
duration
|
1.2.0
kyuubi\.ha\.zookeeper
\.publish\.configs|
false
|
When set to true, publish Kerberos configs to Zookeeper.Note that the Hive driver needs to be greater than 1.3 or 2.0 or apply HIVE-11581 patch.
|
boolean
|
1.4.0
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala index 5af068525..ef986c5be 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala @@ -1070,6 +1070,59 @@ object KyuubiConf { .stringConf .createWithDefault("engine_operation_logs") + val ENGINE_SECURITY_ENABLED: ConfigEntry[Boolean] = + buildConf("engine.security.enabled") + .doc("Whether to enable the internal secure access between Kyuubi server and engine.") + .version("1.5.0") + .booleanConf + .createWithDefault(false) + + val ENGINE_SECURITY_TOKEN_MAX_LIFETIME: ConfigEntry[Long] = + buildConf("engine.security.token.max.lifetime") + .doc("The max lifetime of the token used for secure access between Kyuubi server and engine.") + .version("1.5.0") + .timeConf + .createWithDefault(Duration.ofMinutes(10).toMillis) + + val ENGINE_SECURITY_SECRET_PROVIDER: ConfigEntry[String] = + buildConf("engine.security.secret.provider") + .doc("The class used to manage the engine security secret. This class must be a " + + "subclass of EngineSecuritySecretProvider.") + .version("1.5.0") + .stringConf + .createWithDefault( + "org.apache.kyuubi.service.authentication.ZooKeeperEngineSecuritySecretProviderImpl") + + val ENGINE_SECURITY_CRYPTO_KEY_LENGTH: ConfigEntry[Int] = + buildConf("engine.security.crypto.keyLength") + .doc("The length in bits of the encryption key to generate. " + + "Valid values are 128, 192 and 256") + .version("1.5.0") + .intConf + .checkValues(Set(128, 192, 256)) + .createWithDefault(128) + + val ENGINE_SECURITY_CRYPTO_IV_LENGTH: ConfigEntry[Int] = + buildConf("engine.security.crypto.ivLength") + .doc("Initial vector length, in bytes.") + .version("1.5.0") + .intConf + .createWithDefault(16) + + val ENGINE_SECURITY_CRYPTO_KEY_ALGORITHM: ConfigEntry[String] = + buildConf("engine.security.crypto.keyAlgorithm") + .doc("The algorithm for generated secret keys.") + .version("1.5.0") + .stringConf + .createWithDefault("AES") + + val ENGINE_SECURITY_CRYPTO_CIPHER_TRANSFORMATION: ConfigEntry[String] = + buildConf("engine.security.crypto.cipher") + .doc("The cipher transformation to use for encrypting engine access token.") + .version("1.5.0") + .stringConf + .createWithDefault("AES/CBC/PKCS5PADDING") + val SESSION_NAME: OptionalConfigEntry[String] = buildConf("session.name") .doc("A human readable name of session and we use empty string by default. " + diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/TBinaryFrontendService.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/TBinaryFrontendService.scala index e24467ad2..10373840a 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/TBinaryFrontendService.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/TBinaryFrontendService.scala @@ -109,6 +109,4 @@ abstract class TBinaryFrontendService(name: String) server.foreach(_.stop()) server = None } - - protected def isServer(): Boolean = false } diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/TFrontendService.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/TFrontendService.scala index 618dafdb6..31bdf07c7 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/TFrontendService.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/TFrontendService.scala @@ -54,7 +54,7 @@ abstract class TFrontendService(name: String) serverHost.map(InetAddress.getByName).getOrElse(Utils.findLocalInetAddress) protected lazy val serverSocket = new ServerSocket(portNum, -1, serverAddr) protected lazy val authFactory: KyuubiAuthenticationFactory = - new KyuubiAuthenticationFactory(conf) + new KyuubiAuthenticationFactory(conf, isServer()) override def start(): Unit = synchronized { super.start() @@ -462,6 +462,8 @@ abstract class TFrontendService(name: String) resp } + protected def isServer(): Boolean = false + class FeTServerEventHandler extends TServerEventHandler { implicit def toFeServiceServerContext(context: ServerContext): FeServiceServerContext = { context.asInstanceOf[FeServiceServerContext] diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/authentication/AuthenticationProviderFactory.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/authentication/AuthenticationProviderFactory.scala index 2bd630626..1361f5281 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/authentication/AuthenticationProviderFactory.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/authentication/AuthenticationProviderFactory.scala @@ -28,6 +28,17 @@ import org.apache.kyuubi.service.authentication.AuthMethods.AuthMethod object AuthenticationProviderFactory { @throws[AuthenticationException] def getAuthenticationProvider( + method: AuthMethod, + conf: KyuubiConf, + isServer: Boolean = true): PasswdAuthenticationProvider = { + if (isServer) { + getAuthenticationProviderForServer(method, conf) + } else { + getAuthenticationProviderForEngine(conf) + } + } + + private def getAuthenticationProviderForServer( method: AuthMethod, conf: KyuubiConf): PasswdAuthenticationProvider = method match { case AuthMethods.NONE => new AnonymousAuthenticationProviderImpl @@ -57,4 +68,12 @@ object AuthenticationProviderFactory { } case _ => throw new AuthenticationException("Not a valid authentication method") } + + private def getAuthenticationProviderForEngine(conf: KyuubiConf): PasswdAuthenticationProvider = { + if (conf.get(KyuubiConf.ENGINE_SECURITY_ENABLED)) { + new EngineSecureAuthenticationProviderImpl + } else { + new AnonymousAuthenticationProviderImpl + } + } } diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/authentication/EngineSecureAuthenticationProviderImpl.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/authentication/EngineSecureAuthenticationProviderImpl.scala new file mode 100644 index 000000000..709081dd9 --- /dev/null +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/authentication/EngineSecureAuthenticationProviderImpl.scala @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.service.authentication + +class EngineSecureAuthenticationProviderImpl extends PasswdAuthenticationProvider { + override def authenticate(user: String, password: String): Unit = { + EngineSecurityAccessor.get().authToken(password) + } +} diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/authentication/EngineSecurityAccessor.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/authentication/EngineSecurityAccessor.scala new file mode 100644 index 000000000..98cf6a821 --- /dev/null +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/authentication/EngineSecurityAccessor.scala @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.service.authentication + +import javax.crypto.Cipher +import javax.crypto.spec.{IvParameterSpec, SecretKeySpec} + +import org.apache.kyuubi.{KyuubiSQLException, Logging} +import org.apache.kyuubi.config.KyuubiConf +import org.apache.kyuubi.config.KyuubiConf._ + +class EngineSecurityAccessor(conf: KyuubiConf, val isServer: Boolean) { + val cryptoKeyLengthBytes = conf.get(ENGINE_SECURITY_CRYPTO_KEY_LENGTH) / java.lang.Byte.SIZE + val cryptoIvLength = conf.get(ENGINE_SECURITY_CRYPTO_IV_LENGTH) + val cryptoKeyAlgorithm = conf.get(ENGINE_SECURITY_CRYPTO_KEY_ALGORITHM) + val cryptoCipher = conf.get(ENGINE_SECURITY_CRYPTO_CIPHER_TRANSFORMATION) + + private val tokenMaxLifeTime: Long = conf.get(ENGINE_SECURITY_TOKEN_MAX_LIFETIME) + private val provider: EngineSecuritySecretProvider = EngineSecuritySecretProvider.create(conf) + private val (encryptor, decryptor) = + initializeForAuth(cryptoCipher, normalizeSecret(provider.getSecret())) + + private def initializeForAuth(cipher: String, secret: String): (Cipher, Cipher) = { + val secretKeySpec = new SecretKeySpec(secret.getBytes, cryptoKeyAlgorithm) + val nonce = new Array[Byte](cryptoIvLength) + val iv = new IvParameterSpec(nonce) + + val _encryptor = Cipher.getInstance(cipher) + _encryptor.init(Cipher.ENCRYPT_MODE, secretKeySpec, iv) + + val _decryptor = Cipher.getInstance(cipher) + _decryptor.init(Cipher.DECRYPT_MODE, secretKeySpec, iv) + + (_encryptor, _decryptor) + } + + def issueToken(): String = { + encrypt(KyuubiInternalAccessIdentifier.newIdentifier(tokenMaxLifeTime).toJson) + } + + def authToken(tokenStr: String): Unit = { + val identifier = + try { + KyuubiInternalAccessIdentifier.fromJson(decrypt(tokenStr)) + } catch { + case _: Exception => + throw KyuubiSQLException("Invalid engine access token") + } + if (identifier.issueDate + identifier.maxDate < System.currentTimeMillis()) { + throw KyuubiSQLException("The engine access token is expired") + } + } + + private[authentication] def encrypt(value: String): String = { + byteArrayToHexString(encryptor.doFinal(value.getBytes)) + } + + private[authentication] def decrypt(value: String): String = { + new String(decryptor.doFinal(hexStringToByteArray(value))) + } + + private def normalizeSecret(secret: String): String = { + val normalizedSecret = new Array[Char](cryptoKeyLengthBytes) + val placeHolder = ' ' + for (i <- 0 until cryptoKeyLengthBytes) { + if (i < secret.length) { + normalizedSecret.update(i, secret.charAt(i)) + } else { + normalizedSecret.update(i, placeHolder) + } + } + new String(normalizedSecret) + } + + private def hexStringToByteArray(str: String): Array[Byte] = { + val len = str.length + assert(len % 2 == 0) + val data = new Array[Byte](len / 2) + var i = 0 + while (i < len) { + data.update( + i / 2, + ((Character.digit(str.charAt(i), 16) << 4) + + Character.digit(str.charAt(i + 1), 16)).asInstanceOf[Byte]) + i += 2 + } + data + } + + private def byteArrayToHexString(bytes: Array[Byte]): String = { + bytes.map { byte => + Integer.toHexString((byte >> 4) & 0xF) + Integer.toHexString(byte & 0xF) + }.reduce(_ + _) + } +} + +object EngineSecurityAccessor extends Logging { + @volatile private var _engineSecurityAccessor: EngineSecurityAccessor = _ + + def initialize(conf: KyuubiConf, isServer: Boolean): Unit = { + if (_engineSecurityAccessor == null) { + _engineSecurityAccessor = new EngineSecurityAccessor(conf, isServer) + } + } + + def get(): EngineSecurityAccessor = { + _engineSecurityAccessor + } +} diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/authentication/EngineSecuritySecretProvider.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/authentication/EngineSecuritySecretProvider.scala new file mode 100644 index 000000000..5bd9e4092 --- /dev/null +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/authentication/EngineSecuritySecretProvider.scala @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.service.authentication + +import org.apache.kyuubi.config.KyuubiConf +import org.apache.kyuubi.config.KyuubiConf.ENGINE_SECURITY_SECRET_PROVIDER + +trait EngineSecuritySecretProvider { + + /** + * Initialize with kyuubi conf. + */ + def initialize(conf: KyuubiConf): Unit + + /** + * Get the secret to encrypt and decrypt the secure access token. + */ + def getSecret(): String +} + +object EngineSecuritySecretProvider { + def create(conf: KyuubiConf): EngineSecuritySecretProvider = { + val providerClass = Class.forName(conf.get(ENGINE_SECURITY_SECRET_PROVIDER)) + val provider = providerClass.getConstructor().newInstance() + .asInstanceOf[EngineSecuritySecretProvider] + provider.initialize(conf) + provider + } +} diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/authentication/KyuubiAuthenticationFactory.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/authentication/KyuubiAuthenticationFactory.scala index c7ddb12b9..b53282517 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/authentication/KyuubiAuthenticationFactory.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/authentication/KyuubiAuthenticationFactory.scala @@ -34,7 +34,7 @@ import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.config.KyuubiConf._ import org.apache.kyuubi.service.authentication.AuthTypes._ -class KyuubiAuthenticationFactory(conf: KyuubiConf) extends Logging { +class KyuubiAuthenticationFactory(conf: KyuubiConf, isServer: Boolean = true) extends Logging { private val authTypes = conf.get(AUTHENTICATION_METHOD).map(AuthTypes.withName) private val noSasl = authTypes == Seq(NOSASL) @@ -56,6 +56,10 @@ class KyuubiAuthenticationFactory(conf: KyuubiConf) extends Logging { } } + if (conf.get(ENGINE_SECURITY_ENABLED)) { + EngineSecurityAccessor.initialize(conf, isServer) + } + private def getSaslProperties: java.util.Map[String, String] = { val props = new java.util.HashMap[String, String]() val qop = SaslQOP.withName(conf.get(SASL_QOP)) @@ -87,7 +91,8 @@ class KyuubiAuthenticationFactory(conf: KyuubiConf) extends Logging { transportFactory = PlainSASLHelper.getTransportFactory( plainAuthType.toString, conf, - Option(transportFactory)).asInstanceOf[TSaslServerTransport.Factory] + Option(transportFactory), + isServer).asInstanceOf[TSaslServerTransport.Factory] case _ => } diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/authentication/KyuubiInternalAccessIdentifier.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/authentication/KyuubiInternalAccessIdentifier.scala new file mode 100644 index 000000000..88744ada8 --- /dev/null +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/authentication/KyuubiInternalAccessIdentifier.scala @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.service.authentication + +import com.fasterxml.jackson.annotation.{JsonAutoDetect, JsonInclude, JsonPropertyOrder} +import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility +import com.fasterxml.jackson.annotation.JsonInclude.Include +import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper, SerializationFeature} +import com.fasterxml.jackson.module.scala.DefaultScalaModule + +@JsonInclude(Include.NON_ABSENT) +@JsonAutoDetect(getterVisibility = Visibility.ANY, setterVisibility = Visibility.ANY) +@JsonPropertyOrder(alphabetic = true) +class KyuubiInternalAccessIdentifier { + var issueDate: Long = Integer.MAX_VALUE + var maxDate: Long = 0 + + def toJson: String = { + KyuubiInternalAccessIdentifier.mapper.writeValueAsString(this) + } +} + +private[kyuubi] object KyuubiInternalAccessIdentifier { + private val mapper = new ObjectMapper() + .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) + .enable(SerializationFeature.INDENT_OUTPUT) + .registerModule(DefaultScalaModule) + + def fromJson(json: String): KyuubiInternalAccessIdentifier = { + mapper.readValue(json, classOf[KyuubiInternalAccessIdentifier]) + } + + def newIdentifier(maxDate: Long): KyuubiInternalAccessIdentifier = { + val identifier = new KyuubiInternalAccessIdentifier + identifier.issueDate = System.currentTimeMillis() + identifier.maxDate = maxDate + identifier + } +} diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/authentication/PlainSASLHelper.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/authentication/PlainSASLHelper.scala index 2cb44bb05..3959341ed 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/authentication/PlainSASLHelper.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/authentication/PlainSASLHelper.scala @@ -40,11 +40,14 @@ object PlainSASLHelper { new TSetIpAddressProcessor[Iface](service) } - private class PlainServerCallbackHandler private (authMethod: AuthMethod, conf: KyuubiConf) + private class PlainServerCallbackHandler private ( + authMethod: AuthMethod, + conf: KyuubiConf, + isServer: Boolean) extends CallbackHandler { - def this(authMethodStr: String, conf: KyuubiConf) = - this(AuthMethods.withName(authMethodStr), conf) + def this(authMethodStr: String, conf: KyuubiConf, isServer: Boolean) = + this(AuthMethods.withName(authMethodStr), conf, isServer) @throws[UnsupportedCallbackException] override def handle(callbacks: Array[Callback]): Unit = { @@ -61,7 +64,8 @@ object PlainSASLHelper { case _ => throw new UnsupportedCallbackException(callback) } } - val provider = AuthenticationProviderFactory.getAuthenticationProvider(authMethod, conf) + val provider = + AuthenticationProviderFactory.getAuthenticationProvider(authMethod, conf, isServer) provider.authenticate(username, password) if (ac != null) ac.setAuthorized(true) } @@ -74,10 +78,11 @@ object PlainSASLHelper { def getTransportFactory( authTypeStr: String, conf: KyuubiConf, - transportFactory: Option[TSaslServerTransport.Factory] = None): TTransportFactory = { + transportFactory: Option[TSaslServerTransport.Factory] = None, + isServer: Boolean = true): TTransportFactory = { val saslFactory = transportFactory.getOrElse(new TSaslServerTransport.Factory()) try { - val handler = new PlainServerCallbackHandler(authTypeStr, conf) + val handler = new PlainServerCallbackHandler(authTypeStr, conf, isServer) val props = new java.util.HashMap[String, String] saslFactory.addServerDefinition("PLAIN", authTypeStr, null, props, handler) } catch { diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/service/authentication/EngineSecurityAccessorSuite.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/service/authentication/EngineSecurityAccessorSuite.scala new file mode 100644 index 000000000..6f4bb672b --- /dev/null +++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/service/authentication/EngineSecurityAccessorSuite.scala @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.service.authentication + +import org.apache.kyuubi.{KyuubiFunSuite, KyuubiSQLException} +import org.apache.kyuubi.config.KyuubiConf + +class EngineSecurityAccessorSuite extends KyuubiFunSuite { + private val conf = KyuubiConf() + conf.set( + KyuubiConf.ENGINE_SECURITY_SECRET_PROVIDER, + classOf[UserDefinedEngineSecuritySecretProvider].getCanonicalName) + + test("test encrypt/decrypt, issue token/auth token") { + Seq("AES/CBC/PKCS5PADDING", "AES/CTR/PKCS5PADDING").foreach { cipher => + val newConf = conf.clone + newConf.set(KyuubiConf.ENGINE_SECURITY_CRYPTO_CIPHER_TRANSFORMATION, cipher) + + val secureAccessor = new EngineSecurityAccessor(newConf, true) + val value = "tokenToEncrypt" + val encryptedValue = secureAccessor.encrypt(value) + assert(secureAccessor.decrypt(encryptedValue) === value) + + val token = secureAccessor.issueToken() + secureAccessor.authToken(token) + intercept[KyuubiSQLException](secureAccessor.authToken("invalidToken")) + + val engineSecureAccessor = new EngineSecurityAccessor(newConf, false) + engineSecureAccessor.authToken(token) + } + } +} diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/service/authentication/UserDefinedEngineSecuritySecretProvider.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/service/authentication/UserDefinedEngineSecuritySecretProvider.scala new file mode 100644 index 000000000..de74ed98d --- /dev/null +++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/service/authentication/UserDefinedEngineSecuritySecretProvider.scala @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.service.authentication + +import org.apache.kyuubi.config.KyuubiConf + +class UserDefinedEngineSecuritySecretProvider extends EngineSecuritySecretProvider { + override def initialize(kyuubiConf: KyuubiConf): Unit = {} + + override def getSecret(): String = { + "ENGINE____SECRET" + } +} diff --git a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/HighAvailabilityConf.scala b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/HighAvailabilityConf.scala index 8062b3866..8ee3cc180 100644 --- a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/HighAvailabilityConf.scala +++ b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/HighAvailabilityConf.scala @@ -148,4 +148,12 @@ object HighAvailabilityConf { .version("1.4.0") .booleanConf .createWithDefault(false) + + val HA_ZK_ENGINE_SECURE_SECRET_NODE: OptionalConfigEntry[String] = + buildConf("ha.zookeeper.engine.secure.secret.node") + .doc("The zk node contains the secret that used for internal secure between Kyuubi server " + + "and Kyuubi engine, please make sure that it is only visible for Kyuubi.") + .version("1.5.0") + .stringConf + .createOptional } diff --git a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ZooKeeperClientProvider.scala b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ZooKeeperClientProvider.scala index 1c6d18538..fa7d366ed 100644 --- a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ZooKeeperClientProvider.scala +++ b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ZooKeeperClientProvider.scala @@ -97,7 +97,7 @@ object ZooKeeperClientProvider extends Logging { /** * Creates a zookeeper client before calling `f` and close it after calling `f`. */ - def withZkClient(conf: KyuubiConf)(f: CuratorFramework => Unit): Unit = { + def withZkClient[T](conf: KyuubiConf)(f: CuratorFramework => T): T = { val zkClient = buildZookeeperClient(conf) try { zkClient.start() diff --git a/kyuubi-ha/src/main/scala/org/apache/kyuubi/service/authentication/ZooKeeperEngineSecuritySecretProviderImpl.scala b/kyuubi-ha/src/main/scala/org/apache/kyuubi/service/authentication/ZooKeeperEngineSecuritySecretProviderImpl.scala new file mode 100644 index 000000000..037307951 --- /dev/null +++ b/kyuubi-ha/src/main/scala/org/apache/kyuubi/service/authentication/ZooKeeperEngineSecuritySecretProviderImpl.scala @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.service.authentication + +import java.nio.charset.StandardCharsets + +import org.apache.kyuubi.config.KyuubiConf +import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_ENGINE_SECURE_SECRET_NODE +import org.apache.kyuubi.ha.client.ZooKeeperClientProvider + +class ZooKeeperEngineSecuritySecretProviderImpl extends EngineSecuritySecretProvider { + import ZooKeeperClientProvider._ + + private var conf: KyuubiConf = _ + + override def initialize(conf: KyuubiConf): Unit = { + this.conf = conf + } + + override def getSecret(): String = { + conf.get(HA_ZK_ENGINE_SECURE_SECRET_NODE).map { zkNode => + withZkClient[String](conf) { zkClient => + new String(zkClient.getData.forPath(zkNode), StandardCharsets.UTF_8) + } + }.getOrElse( + throw new IllegalArgumentException(s"${HA_ZK_ENGINE_SECURE_SECRET_NODE.key} is not defined")) + } +} diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala index 3311f2893..e487ff31c 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala @@ -32,6 +32,7 @@ import org.apache.kyuubi.metrics.MetricsSystem import org.apache.kyuubi.operation.{Operation, OperationHandle} import org.apache.kyuubi.operation.log.OperationLog import org.apache.kyuubi.server.EventLoggingService +import org.apache.kyuubi.service.authentication.EngineSecurityAccessor class KyuubiSessionImpl( protocol: TProtocolVersion, @@ -82,7 +83,12 @@ class KyuubiSessionImpl( private[kyuubi] def openEngineSession(extraEngineLog: Option[OperationLog] = None): Unit = { withZkClient(sessionConf) { zkClient => val (host, port) = engine.getOrCreate(zkClient, extraEngineLog) - val passwd = Option(password).filter(_.nonEmpty).getOrElse("anonymous") + val passwd = + if (sessionManager.getConf.get(ENGINE_SECURITY_ENABLED)) { + EngineSecurityAccessor.get().issueToken() + } else { + Option(password).filter(_.nonEmpty).getOrElse("anonymous") + } _client = KyuubiSyncThriftClient.createClient(user, passwd, host, port, sessionConf) _engineSessionHandle = _client.openSession(protocol, user, passwd, normalizedConf) logSessionInfo(s"Connected to engine [$host:$port] with ${_engineSessionHandle}") diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationWithEngineSecurity.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationWithEngineSecurity.scala new file mode 100644 index 000000000..ae5d611e0 --- /dev/null +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationWithEngineSecurity.scala @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.operation + +import java.nio.charset.StandardCharsets + +import org.apache.curator.framework.recipes.nodes.PersistentNode +import org.apache.zookeeper.CreateMode + +import org.apache.kyuubi.WithKyuubiServer +import org.apache.kyuubi.config.KyuubiConf +import org.apache.kyuubi.ha.HighAvailabilityConf +import org.apache.kyuubi.ha.client.ZooKeeperClientProvider +import org.apache.kyuubi.service.authentication.{EngineSecurityAccessor, ZooKeeperEngineSecuritySecretProviderImpl} + +class KyuubiOperationWithEngineSecurity extends WithKyuubiServer with HiveJDBCTestHelper { + import ZooKeeperClientProvider._ + + override protected def jdbcUrl: String = getJdbcUrl + + private val engineSecretNode = "/SECRET" + + override protected val conf: KyuubiConf = { + KyuubiConf() + .set(KyuubiConf.ENGINE_SECURITY_ENABLED, false) + .set( + KyuubiConf.ENGINE_SECURITY_SECRET_PROVIDER, + classOf[ZooKeeperEngineSecuritySecretProviderImpl].getCanonicalName) + .set(HighAvailabilityConf.HA_ZK_ENGINE_SECURE_SECRET_NODE, engineSecretNode) + } + + override def beforeAll(): Unit = { + super.beforeAll() + withZkClient(conf) { zkClient => + zkClient.create().withMode(CreateMode.PERSISTENT).forPath(engineSecretNode) + val secretNode = new PersistentNode( + zkClient, + CreateMode.PERSISTENT, + false, + engineSecretNode, + "_ENGINE_SECRET_".getBytes(StandardCharsets.UTF_8)) + secretNode.start() + } + + conf.set(KyuubiConf.ENGINE_SECURITY_ENABLED, true) + EngineSecurityAccessor.initialize(conf, true) + } + + test("engine security") { + withJdbcStatement() { statement => + val rs = statement.executeQuery(s"set spark.${KyuubiConf.ENGINE_SECURITY_ENABLED.key}") + assert(rs.next()) + assert(rs.getString(2).contains("true")) + } + } +}