[KYUUBI #1813] Enable the internal secure authentication between Kyuubi server and engine

<!--
Thanks for sending a pull request!

Here are some tips for you:
  1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html
  2. If the PR is related to an issue in https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
  3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'.
-->

### _Why are the changes needed?_
<!--
Please clarify why the changes are needed. For instance,
  1. If you add a feature, you can talk about the use case of it.
  2. If you fix a bug, you can clarify why it is a bug.
-->

Enable the internal secure access between Kyuubi server and engine.

This pr use secret and cipher to encrypt a dynamic token identifier for authentication.

### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #1813 from turboFei/engine_secure.

Closes #1813

aa7ad0de [Fei Wang] run a simple query for engien security
34354d97 [Fei Wang] requre zk node must be defined for zk secret provider
f84ce7c9 [Fei Wang] Require zk secret node non empty
b545281d [Fei Wang] refactor
d8d6dfd2 [Fei Wang] address comments
0e9273e1 [Fei Wang] refactor provider conf
d3263c08 [Fei Wang] place holder
e1266e32 [Fei Wang] reformat
9e252fea [Fei Wang] enhance prompt
dbc9100c [Fei Wang] fix conf
ff6b6dea [Fei Wang] refactor
4ea020ed [Fei Wang] refactor
20915902 [Fei Wang] refactor
a7b4e41e [Fei Wang] add more ut:
1e1ceb27 [Fei Wang] use orignal cipher
8d8387b2 [Fei Wang] save
ad777cb4 [Fei Wang] refactor
b8997313 [Fei Wang] save
b997095d [Fei Wang] refactor
e05b47c5 [Fei Wang] move
a322b784 [Fei Wang] refactor
b14f7cbf [Fei Wang]  Enable the internal secure authentication

Authored-by: Fei Wang <fwang12@ebay.com>
Signed-off-by: ulysses-you <ulyssesyou@apache.org>
This commit is contained in:
Fei Wang 2022-01-27 14:33:53 +08:00 committed by ulysses-you
parent 53d59a02bf
commit 33d53db3ff
No known key found for this signature in database
GPG Key ID: 4C500BC62D576766
18 changed files with 552 additions and 13 deletions

View File

@ -189,6 +189,13 @@ kyuubi\.engine<br>\.operation\.log\.dir<br>\.root|<div style='width: 65pt;word-w
kyuubi\.engine\.pool<br>\.name|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>engine-pool</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>The name of engine pool.</div>|<div style='width: 30pt'>string</div>|<div style='width: 20pt'>1.5.0</div>
kyuubi\.engine\.pool<br>\.size|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>-1</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>The size of engine pool. Note that, if the size is less than 1, the engine pool will not be enabled; otherwise, the size of the engine pool will be min(this, kyuubi.engine.pool.size.threshold).</div>|<div style='width: 30pt'>int</div>|<div style='width: 20pt'>1.4.0</div>
kyuubi\.engine\.pool<br>\.size\.threshold|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>9</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>This parameter is introduced as a server-side parameter, and controls the upper limit of the engine pool.</div>|<div style='width: 30pt'>int</div>|<div style='width: 20pt'>1.4.0</div>
kyuubi\.engine<br>\.security\.crypto<br>\.cipher|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>AES/CBC/PKCS5PADDING</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>The cipher transformation to use for encrypting engine access token.</div>|<div style='width: 30pt'>string</div>|<div style='width: 20pt'>1.5.0</div>
kyuubi\.engine<br>\.security\.crypto<br>\.ivLength|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>16</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>Initial vector length, in bytes.</div>|<div style='width: 30pt'>int</div>|<div style='width: 20pt'>1.5.0</div>
kyuubi\.engine<br>\.security\.crypto<br>\.keyAlgorithm|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>AES</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>The algorithm for generated secret keys.</div>|<div style='width: 30pt'>string</div>|<div style='width: 20pt'>1.5.0</div>
kyuubi\.engine<br>\.security\.crypto<br>\.keyLength|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>128</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>The length in bits of the encryption key to generate. Valid values are 128, 192 and 256</div>|<div style='width: 30pt'>int</div>|<div style='width: 20pt'>1.5.0</div>
kyuubi\.engine<br>\.security\.enabled|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>false</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>Whether to enable the internal secure access between Kyuubi server and engine.</div>|<div style='width: 30pt'>boolean</div>|<div style='width: 20pt'>1.5.0</div>
kyuubi\.engine<br>\.security\.secret<br>\.provider|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>org.apache.kyuubi.service.authentication.ZooKeeperEngineSecuritySecretProviderImpl</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>The class used to manage the engine security secret. This class must be a subclass of EngineSecuritySecretProvider.</div>|<div style='width: 30pt'>string</div>|<div style='width: 20pt'>1.5.0</div>
kyuubi\.engine<br>\.security\.token\.max<br>\.lifetime|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>PT10M</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>The max lifetime of the token used for secure access between Kyuubi server and engine.</div>|<div style='width: 30pt'>duration</div>|<div style='width: 20pt'>1.5.0</div>
kyuubi\.engine\.session<br>\.initialize\.sql|<div style='width: 65pt;word-wrap: break-word;white-space: normal'></div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>SemiColon-separated list of SQL statements to be initialized in the newly created engine session before queries. This configuration can not be used in JDBC url due to the limitation of Beeline/JDBC driver.</div>|<div style='width: 30pt'>seq</div>|<div style='width: 20pt'>1.3.0</div>
kyuubi\.engine\.share<br>\.level|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>USER</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>Engines will be shared in different levels, available configs are: <ul> <li>CONNECTION: engine will not be shared but only used by the current client connection</li> <li>USER: engine will be shared by all sessions created by a unique username, see also kyuubi.engine.share.level.subdomain</li> <li>GROUP: engine will be shared by all sessions created by all users belong to the same primary group name. The engine will be launched by the group name as the effective username, so here the group name is kind of special user who is able to visit the compute resources/data of a team. It follows the [Hadoop GroupsMapping](https://reurl.cc/xE61Y5) to map user to a primary group. If the primary group is not found, it fallback to the USER level. <li>SERVER: the App will be shared by Kyuubi servers</li></ul></div>|<div style='width: 30pt'>string</div>|<div style='width: 20pt'>1.2.0</div>
kyuubi\.engine\.share<br>\.level\.sub\.domain|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>&lt;undefined&gt;</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>(deprecated) - Using kyuubi.engine.share.level.subdomain instead</div>|<div style='width: 30pt'>string</div>|<div style='width: 20pt'>1.2.0</div>
@ -247,6 +254,7 @@ kyuubi\.ha\.zookeeper<br>\.connection\.max\.retry<br>\.wait|<div style='width: 6
kyuubi\.ha\.zookeeper<br>\.connection\.retry<br>\.policy|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>EXPONENTIAL_BACKOFF</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>The retry policy for connecting to the zookeeper ensemble, all candidates are: <ul><li>ONE_TIME</li><li> N_TIME</li><li> EXPONENTIAL_BACKOFF</li><li> BOUNDED_EXPONENTIAL_BACKOFF</li><li> UNTIL_ELAPSED</li></ul></div>|<div style='width: 30pt'>string</div>|<div style='width: 20pt'>1.0.0</div>
kyuubi\.ha\.zookeeper<br>\.connection\.timeout|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>15000</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>The timeout(ms) of creating the connection to the zookeeper ensemble</div>|<div style='width: 30pt'>int</div>|<div style='width: 20pt'>1.0.0</div>
kyuubi\.ha\.zookeeper<br>\.engine\.auth\.type|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>NONE</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>The type of zookeeper authentication for engine, all candidates are <ul><li>NONE</li><li> KERBEROS</li><li> DIGEST</li></ul></div>|<div style='width: 30pt'>string</div>|<div style='width: 20pt'>1.3.2</div>
kyuubi\.ha\.zookeeper<br>\.engine\.secure\.secret<br>\.node|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>&lt;undefined&gt;</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>The zk node contains the secret that used for internal secure between Kyuubi server and Kyuubi engine, please make sure that it is only visible for Kyuubi.</div>|<div style='width: 30pt'>string</div>|<div style='width: 20pt'>1.5.0</div>
kyuubi\.ha\.zookeeper<br>\.namespace|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>kyuubi</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>The root directory for the service to deploy its instance uri</div>|<div style='width: 30pt'>string</div>|<div style='width: 20pt'>1.0.0</div>
kyuubi\.ha\.zookeeper<br>\.node\.creation\.timeout|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>PT2M</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>Timeout for creating zookeeper node</div>|<div style='width: 30pt'>duration</div>|<div style='width: 20pt'>1.2.0</div>
kyuubi\.ha\.zookeeper<br>\.publish\.configs|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>false</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>When set to true, publish Kerberos configs to Zookeeper.Note that the Hive driver needs to be greater than 1.3 or 2.0 or apply HIVE-11581 patch.</div>|<div style='width: 30pt'>boolean</div>|<div style='width: 20pt'>1.4.0</div>

View File

@ -1070,6 +1070,59 @@ object KyuubiConf {
.stringConf
.createWithDefault("engine_operation_logs")
val ENGINE_SECURITY_ENABLED: ConfigEntry[Boolean] =
buildConf("engine.security.enabled")
.doc("Whether to enable the internal secure access between Kyuubi server and engine.")
.version("1.5.0")
.booleanConf
.createWithDefault(false)
val ENGINE_SECURITY_TOKEN_MAX_LIFETIME: ConfigEntry[Long] =
buildConf("engine.security.token.max.lifetime")
.doc("The max lifetime of the token used for secure access between Kyuubi server and engine.")
.version("1.5.0")
.timeConf
.createWithDefault(Duration.ofMinutes(10).toMillis)
val ENGINE_SECURITY_SECRET_PROVIDER: ConfigEntry[String] =
buildConf("engine.security.secret.provider")
.doc("The class used to manage the engine security secret. This class must be a " +
"subclass of EngineSecuritySecretProvider.")
.version("1.5.0")
.stringConf
.createWithDefault(
"org.apache.kyuubi.service.authentication.ZooKeeperEngineSecuritySecretProviderImpl")
val ENGINE_SECURITY_CRYPTO_KEY_LENGTH: ConfigEntry[Int] =
buildConf("engine.security.crypto.keyLength")
.doc("The length in bits of the encryption key to generate. " +
"Valid values are 128, 192 and 256")
.version("1.5.0")
.intConf
.checkValues(Set(128, 192, 256))
.createWithDefault(128)
val ENGINE_SECURITY_CRYPTO_IV_LENGTH: ConfigEntry[Int] =
buildConf("engine.security.crypto.ivLength")
.doc("Initial vector length, in bytes.")
.version("1.5.0")
.intConf
.createWithDefault(16)
val ENGINE_SECURITY_CRYPTO_KEY_ALGORITHM: ConfigEntry[String] =
buildConf("engine.security.crypto.keyAlgorithm")
.doc("The algorithm for generated secret keys.")
.version("1.5.0")
.stringConf
.createWithDefault("AES")
val ENGINE_SECURITY_CRYPTO_CIPHER_TRANSFORMATION: ConfigEntry[String] =
buildConf("engine.security.crypto.cipher")
.doc("The cipher transformation to use for encrypting engine access token.")
.version("1.5.0")
.stringConf
.createWithDefault("AES/CBC/PKCS5PADDING")
val SESSION_NAME: OptionalConfigEntry[String] =
buildConf("session.name")
.doc("A human readable name of session and we use empty string by default. " +

View File

@ -109,6 +109,4 @@ abstract class TBinaryFrontendService(name: String)
server.foreach(_.stop())
server = None
}
protected def isServer(): Boolean = false
}

View File

@ -54,7 +54,7 @@ abstract class TFrontendService(name: String)
serverHost.map(InetAddress.getByName).getOrElse(Utils.findLocalInetAddress)
protected lazy val serverSocket = new ServerSocket(portNum, -1, serverAddr)
protected lazy val authFactory: KyuubiAuthenticationFactory =
new KyuubiAuthenticationFactory(conf)
new KyuubiAuthenticationFactory(conf, isServer())
override def start(): Unit = synchronized {
super.start()
@ -462,6 +462,8 @@ abstract class TFrontendService(name: String)
resp
}
protected def isServer(): Boolean = false
class FeTServerEventHandler extends TServerEventHandler {
implicit def toFeServiceServerContext(context: ServerContext): FeServiceServerContext = {
context.asInstanceOf[FeServiceServerContext]

View File

@ -28,6 +28,17 @@ import org.apache.kyuubi.service.authentication.AuthMethods.AuthMethod
object AuthenticationProviderFactory {
@throws[AuthenticationException]
def getAuthenticationProvider(
method: AuthMethod,
conf: KyuubiConf,
isServer: Boolean = true): PasswdAuthenticationProvider = {
if (isServer) {
getAuthenticationProviderForServer(method, conf)
} else {
getAuthenticationProviderForEngine(conf)
}
}
private def getAuthenticationProviderForServer(
method: AuthMethod,
conf: KyuubiConf): PasswdAuthenticationProvider = method match {
case AuthMethods.NONE => new AnonymousAuthenticationProviderImpl
@ -57,4 +68,12 @@ object AuthenticationProviderFactory {
}
case _ => throw new AuthenticationException("Not a valid authentication method")
}
private def getAuthenticationProviderForEngine(conf: KyuubiConf): PasswdAuthenticationProvider = {
if (conf.get(KyuubiConf.ENGINE_SECURITY_ENABLED)) {
new EngineSecureAuthenticationProviderImpl
} else {
new AnonymousAuthenticationProviderImpl
}
}
}

View File

@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.service.authentication
class EngineSecureAuthenticationProviderImpl extends PasswdAuthenticationProvider {
override def authenticate(user: String, password: String): Unit = {
EngineSecurityAccessor.get().authToken(password)
}
}

View File

@ -0,0 +1,124 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.service.authentication
import javax.crypto.Cipher
import javax.crypto.spec.{IvParameterSpec, SecretKeySpec}
import org.apache.kyuubi.{KyuubiSQLException, Logging}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
class EngineSecurityAccessor(conf: KyuubiConf, val isServer: Boolean) {
val cryptoKeyLengthBytes = conf.get(ENGINE_SECURITY_CRYPTO_KEY_LENGTH) / java.lang.Byte.SIZE
val cryptoIvLength = conf.get(ENGINE_SECURITY_CRYPTO_IV_LENGTH)
val cryptoKeyAlgorithm = conf.get(ENGINE_SECURITY_CRYPTO_KEY_ALGORITHM)
val cryptoCipher = conf.get(ENGINE_SECURITY_CRYPTO_CIPHER_TRANSFORMATION)
private val tokenMaxLifeTime: Long = conf.get(ENGINE_SECURITY_TOKEN_MAX_LIFETIME)
private val provider: EngineSecuritySecretProvider = EngineSecuritySecretProvider.create(conf)
private val (encryptor, decryptor) =
initializeForAuth(cryptoCipher, normalizeSecret(provider.getSecret()))
private def initializeForAuth(cipher: String, secret: String): (Cipher, Cipher) = {
val secretKeySpec = new SecretKeySpec(secret.getBytes, cryptoKeyAlgorithm)
val nonce = new Array[Byte](cryptoIvLength)
val iv = new IvParameterSpec(nonce)
val _encryptor = Cipher.getInstance(cipher)
_encryptor.init(Cipher.ENCRYPT_MODE, secretKeySpec, iv)
val _decryptor = Cipher.getInstance(cipher)
_decryptor.init(Cipher.DECRYPT_MODE, secretKeySpec, iv)
(_encryptor, _decryptor)
}
def issueToken(): String = {
encrypt(KyuubiInternalAccessIdentifier.newIdentifier(tokenMaxLifeTime).toJson)
}
def authToken(tokenStr: String): Unit = {
val identifier =
try {
KyuubiInternalAccessIdentifier.fromJson(decrypt(tokenStr))
} catch {
case _: Exception =>
throw KyuubiSQLException("Invalid engine access token")
}
if (identifier.issueDate + identifier.maxDate < System.currentTimeMillis()) {
throw KyuubiSQLException("The engine access token is expired")
}
}
private[authentication] def encrypt(value: String): String = {
byteArrayToHexString(encryptor.doFinal(value.getBytes))
}
private[authentication] def decrypt(value: String): String = {
new String(decryptor.doFinal(hexStringToByteArray(value)))
}
private def normalizeSecret(secret: String): String = {
val normalizedSecret = new Array[Char](cryptoKeyLengthBytes)
val placeHolder = ' '
for (i <- 0 until cryptoKeyLengthBytes) {
if (i < secret.length) {
normalizedSecret.update(i, secret.charAt(i))
} else {
normalizedSecret.update(i, placeHolder)
}
}
new String(normalizedSecret)
}
private def hexStringToByteArray(str: String): Array[Byte] = {
val len = str.length
assert(len % 2 == 0)
val data = new Array[Byte](len / 2)
var i = 0
while (i < len) {
data.update(
i / 2,
((Character.digit(str.charAt(i), 16) << 4) +
Character.digit(str.charAt(i + 1), 16)).asInstanceOf[Byte])
i += 2
}
data
}
private def byteArrayToHexString(bytes: Array[Byte]): String = {
bytes.map { byte =>
Integer.toHexString((byte >> 4) & 0xF) + Integer.toHexString(byte & 0xF)
}.reduce(_ + _)
}
}
object EngineSecurityAccessor extends Logging {
@volatile private var _engineSecurityAccessor: EngineSecurityAccessor = _
def initialize(conf: KyuubiConf, isServer: Boolean): Unit = {
if (_engineSecurityAccessor == null) {
_engineSecurityAccessor = new EngineSecurityAccessor(conf, isServer)
}
}
def get(): EngineSecurityAccessor = {
_engineSecurityAccessor
}
}

View File

@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.service.authentication
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.ENGINE_SECURITY_SECRET_PROVIDER
trait EngineSecuritySecretProvider {
/**
* Initialize with kyuubi conf.
*/
def initialize(conf: KyuubiConf): Unit
/**
* Get the secret to encrypt and decrypt the secure access token.
*/
def getSecret(): String
}
object EngineSecuritySecretProvider {
def create(conf: KyuubiConf): EngineSecuritySecretProvider = {
val providerClass = Class.forName(conf.get(ENGINE_SECURITY_SECRET_PROVIDER))
val provider = providerClass.getConstructor().newInstance()
.asInstanceOf[EngineSecuritySecretProvider]
provider.initialize(conf)
provider
}
}

View File

@ -34,7 +34,7 @@ import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.service.authentication.AuthTypes._
class KyuubiAuthenticationFactory(conf: KyuubiConf) extends Logging {
class KyuubiAuthenticationFactory(conf: KyuubiConf, isServer: Boolean = true) extends Logging {
private val authTypes = conf.get(AUTHENTICATION_METHOD).map(AuthTypes.withName)
private val noSasl = authTypes == Seq(NOSASL)
@ -56,6 +56,10 @@ class KyuubiAuthenticationFactory(conf: KyuubiConf) extends Logging {
}
}
if (conf.get(ENGINE_SECURITY_ENABLED)) {
EngineSecurityAccessor.initialize(conf, isServer)
}
private def getSaslProperties: java.util.Map[String, String] = {
val props = new java.util.HashMap[String, String]()
val qop = SaslQOP.withName(conf.get(SASL_QOP))
@ -87,7 +91,8 @@ class KyuubiAuthenticationFactory(conf: KyuubiConf) extends Logging {
transportFactory = PlainSASLHelper.getTransportFactory(
plainAuthType.toString,
conf,
Option(transportFactory)).asInstanceOf[TSaslServerTransport.Factory]
Option(transportFactory),
isServer).asInstanceOf[TSaslServerTransport.Factory]
case _ =>
}

View File

@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.service.authentication
import com.fasterxml.jackson.annotation.{JsonAutoDetect, JsonInclude, JsonPropertyOrder}
import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility
import com.fasterxml.jackson.annotation.JsonInclude.Include
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper, SerializationFeature}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
@JsonInclude(Include.NON_ABSENT)
@JsonAutoDetect(getterVisibility = Visibility.ANY, setterVisibility = Visibility.ANY)
@JsonPropertyOrder(alphabetic = true)
class KyuubiInternalAccessIdentifier {
var issueDate: Long = Integer.MAX_VALUE
var maxDate: Long = 0
def toJson: String = {
KyuubiInternalAccessIdentifier.mapper.writeValueAsString(this)
}
}
private[kyuubi] object KyuubiInternalAccessIdentifier {
private val mapper = new ObjectMapper()
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
.enable(SerializationFeature.INDENT_OUTPUT)
.registerModule(DefaultScalaModule)
def fromJson(json: String): KyuubiInternalAccessIdentifier = {
mapper.readValue(json, classOf[KyuubiInternalAccessIdentifier])
}
def newIdentifier(maxDate: Long): KyuubiInternalAccessIdentifier = {
val identifier = new KyuubiInternalAccessIdentifier
identifier.issueDate = System.currentTimeMillis()
identifier.maxDate = maxDate
identifier
}
}

View File

@ -40,11 +40,14 @@ object PlainSASLHelper {
new TSetIpAddressProcessor[Iface](service)
}
private class PlainServerCallbackHandler private (authMethod: AuthMethod, conf: KyuubiConf)
private class PlainServerCallbackHandler private (
authMethod: AuthMethod,
conf: KyuubiConf,
isServer: Boolean)
extends CallbackHandler {
def this(authMethodStr: String, conf: KyuubiConf) =
this(AuthMethods.withName(authMethodStr), conf)
def this(authMethodStr: String, conf: KyuubiConf, isServer: Boolean) =
this(AuthMethods.withName(authMethodStr), conf, isServer)
@throws[UnsupportedCallbackException]
override def handle(callbacks: Array[Callback]): Unit = {
@ -61,7 +64,8 @@ object PlainSASLHelper {
case _ => throw new UnsupportedCallbackException(callback)
}
}
val provider = AuthenticationProviderFactory.getAuthenticationProvider(authMethod, conf)
val provider =
AuthenticationProviderFactory.getAuthenticationProvider(authMethod, conf, isServer)
provider.authenticate(username, password)
if (ac != null) ac.setAuthorized(true)
}
@ -74,10 +78,11 @@ object PlainSASLHelper {
def getTransportFactory(
authTypeStr: String,
conf: KyuubiConf,
transportFactory: Option[TSaslServerTransport.Factory] = None): TTransportFactory = {
transportFactory: Option[TSaslServerTransport.Factory] = None,
isServer: Boolean = true): TTransportFactory = {
val saslFactory = transportFactory.getOrElse(new TSaslServerTransport.Factory())
try {
val handler = new PlainServerCallbackHandler(authTypeStr, conf)
val handler = new PlainServerCallbackHandler(authTypeStr, conf, isServer)
val props = new java.util.HashMap[String, String]
saslFactory.addServerDefinition("PLAIN", authTypeStr, null, props, handler)
} catch {

View File

@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.service.authentication
import org.apache.kyuubi.{KyuubiFunSuite, KyuubiSQLException}
import org.apache.kyuubi.config.KyuubiConf
class EngineSecurityAccessorSuite extends KyuubiFunSuite {
private val conf = KyuubiConf()
conf.set(
KyuubiConf.ENGINE_SECURITY_SECRET_PROVIDER,
classOf[UserDefinedEngineSecuritySecretProvider].getCanonicalName)
test("test encrypt/decrypt, issue token/auth token") {
Seq("AES/CBC/PKCS5PADDING", "AES/CTR/PKCS5PADDING").foreach { cipher =>
val newConf = conf.clone
newConf.set(KyuubiConf.ENGINE_SECURITY_CRYPTO_CIPHER_TRANSFORMATION, cipher)
val secureAccessor = new EngineSecurityAccessor(newConf, true)
val value = "tokenToEncrypt"
val encryptedValue = secureAccessor.encrypt(value)
assert(secureAccessor.decrypt(encryptedValue) === value)
val token = secureAccessor.issueToken()
secureAccessor.authToken(token)
intercept[KyuubiSQLException](secureAccessor.authToken("invalidToken"))
val engineSecureAccessor = new EngineSecurityAccessor(newConf, false)
engineSecureAccessor.authToken(token)
}
}
}

View File

@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.service.authentication
import org.apache.kyuubi.config.KyuubiConf
class UserDefinedEngineSecuritySecretProvider extends EngineSecuritySecretProvider {
override def initialize(kyuubiConf: KyuubiConf): Unit = {}
override def getSecret(): String = {
"ENGINE____SECRET"
}
}

View File

@ -148,4 +148,12 @@ object HighAvailabilityConf {
.version("1.4.0")
.booleanConf
.createWithDefault(false)
val HA_ZK_ENGINE_SECURE_SECRET_NODE: OptionalConfigEntry[String] =
buildConf("ha.zookeeper.engine.secure.secret.node")
.doc("The zk node contains the secret that used for internal secure between Kyuubi server " +
"and Kyuubi engine, please make sure that it is only visible for Kyuubi.")
.version("1.5.0")
.stringConf
.createOptional
}

View File

@ -97,7 +97,7 @@ object ZooKeeperClientProvider extends Logging {
/**
* Creates a zookeeper client before calling `f` and close it after calling `f`.
*/
def withZkClient(conf: KyuubiConf)(f: CuratorFramework => Unit): Unit = {
def withZkClient[T](conf: KyuubiConf)(f: CuratorFramework => T): T = {
val zkClient = buildZookeeperClient(conf)
try {
zkClient.start()

View File

@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.service.authentication
import java.nio.charset.StandardCharsets
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_ENGINE_SECURE_SECRET_NODE
import org.apache.kyuubi.ha.client.ZooKeeperClientProvider
class ZooKeeperEngineSecuritySecretProviderImpl extends EngineSecuritySecretProvider {
import ZooKeeperClientProvider._
private var conf: KyuubiConf = _
override def initialize(conf: KyuubiConf): Unit = {
this.conf = conf
}
override def getSecret(): String = {
conf.get(HA_ZK_ENGINE_SECURE_SECRET_NODE).map { zkNode =>
withZkClient[String](conf) { zkClient =>
new String(zkClient.getData.forPath(zkNode), StandardCharsets.UTF_8)
}
}.getOrElse(
throw new IllegalArgumentException(s"${HA_ZK_ENGINE_SECURE_SECRET_NODE.key} is not defined"))
}
}

View File

@ -32,6 +32,7 @@ import org.apache.kyuubi.metrics.MetricsSystem
import org.apache.kyuubi.operation.{Operation, OperationHandle}
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.server.EventLoggingService
import org.apache.kyuubi.service.authentication.EngineSecurityAccessor
class KyuubiSessionImpl(
protocol: TProtocolVersion,
@ -82,7 +83,12 @@ class KyuubiSessionImpl(
private[kyuubi] def openEngineSession(extraEngineLog: Option[OperationLog] = None): Unit = {
withZkClient(sessionConf) { zkClient =>
val (host, port) = engine.getOrCreate(zkClient, extraEngineLog)
val passwd = Option(password).filter(_.nonEmpty).getOrElse("anonymous")
val passwd =
if (sessionManager.getConf.get(ENGINE_SECURITY_ENABLED)) {
EngineSecurityAccessor.get().issueToken()
} else {
Option(password).filter(_.nonEmpty).getOrElse("anonymous")
}
_client = KyuubiSyncThriftClient.createClient(user, passwd, host, port, sessionConf)
_engineSessionHandle = _client.openSession(protocol, user, passwd, normalizedConf)
logSessionInfo(s"Connected to engine [$host:$port] with ${_engineSessionHandle}")

View File

@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.operation
import java.nio.charset.StandardCharsets
import org.apache.curator.framework.recipes.nodes.PersistentNode
import org.apache.zookeeper.CreateMode
import org.apache.kyuubi.WithKyuubiServer
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.ha.HighAvailabilityConf
import org.apache.kyuubi.ha.client.ZooKeeperClientProvider
import org.apache.kyuubi.service.authentication.{EngineSecurityAccessor, ZooKeeperEngineSecuritySecretProviderImpl}
class KyuubiOperationWithEngineSecurity extends WithKyuubiServer with HiveJDBCTestHelper {
import ZooKeeperClientProvider._
override protected def jdbcUrl: String = getJdbcUrl
private val engineSecretNode = "/SECRET"
override protected val conf: KyuubiConf = {
KyuubiConf()
.set(KyuubiConf.ENGINE_SECURITY_ENABLED, false)
.set(
KyuubiConf.ENGINE_SECURITY_SECRET_PROVIDER,
classOf[ZooKeeperEngineSecuritySecretProviderImpl].getCanonicalName)
.set(HighAvailabilityConf.HA_ZK_ENGINE_SECURE_SECRET_NODE, engineSecretNode)
}
override def beforeAll(): Unit = {
super.beforeAll()
withZkClient(conf) { zkClient =>
zkClient.create().withMode(CreateMode.PERSISTENT).forPath(engineSecretNode)
val secretNode = new PersistentNode(
zkClient,
CreateMode.PERSISTENT,
false,
engineSecretNode,
"_ENGINE_SECRET_".getBytes(StandardCharsets.UTF_8))
secretNode.start()
}
conf.set(KyuubiConf.ENGINE_SECURITY_ENABLED, true)
EngineSecurityAccessor.initialize(conf, true)
}
test("engine security") {
withJdbcStatement() { statement =>
val rs = statement.executeQuery(s"set spark.${KyuubiConf.ENGINE_SECURITY_ENABLED.key}")
assert(rs.next())
assert(rs.getString(2).contains("true"))
}
}
}