inline sasl qop
This commit is contained in:
parent
532529d893
commit
a6b60252c2
@ -1,64 +0,0 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hive.service.auth;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Possible values of SASL quality-of-protection value.
|
||||
*/
|
||||
public enum SaslQOP {
|
||||
// Authentication only.
|
||||
AUTH("auth"),
|
||||
// Authentication and integrity checking by using signatures.
|
||||
AUTH_INT("auth-int"),
|
||||
// Authentication, integrity and confidentiality checking by using signatures and encryption.
|
||||
AUTH_CONF("auth-conf");
|
||||
|
||||
public final String saslQop;
|
||||
|
||||
private static final Map<String, SaslQOP> STR_TO_ENUM = new HashMap<String, SaslQOP>();
|
||||
|
||||
static {
|
||||
for (SaslQOP saslQop : values()) {
|
||||
STR_TO_ENUM.put(saslQop.toString(), saslQop);
|
||||
}
|
||||
}
|
||||
|
||||
SaslQOP(String saslQop) {
|
||||
this.saslQop = saslQop;
|
||||
}
|
||||
|
||||
public String toString() {
|
||||
return saslQop;
|
||||
}
|
||||
|
||||
public static SaslQOP fromString(String str) {
|
||||
if (str != null) {
|
||||
str = str.toLowerCase();
|
||||
}
|
||||
SaslQOP saslQOP = STR_TO_ENUM.get(str);
|
||||
if (saslQOP == null) {
|
||||
throw new IllegalArgumentException(
|
||||
"Unknown auth type: " + str + " Allowed values are: " + STR_TO_ENUM.keySet());
|
||||
}
|
||||
return saslQOP;
|
||||
}
|
||||
}
|
||||
@ -28,6 +28,11 @@ import org.slf4j.Logger
|
||||
* Wrapper for [[Utils]] and [[SparkHadoopUtil]]
|
||||
*/
|
||||
object SparkUtils {
|
||||
val SPARK_PREFIX = "spark."
|
||||
val YARN_PREFIX = "yarn."
|
||||
|
||||
val KEYTAB = SPARK_PREFIX + YARN_PREFIX + "keytab"
|
||||
val PRINCIPAL = SPARK_PREFIX + YARN_PREFIX + "principal"
|
||||
|
||||
def addShutdownHook(f: () => Unit): Unit = {
|
||||
ShutdownHookManager.addShutdownHook(f)
|
||||
|
||||
@ -30,30 +30,33 @@ import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge
|
||||
import org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge.Server.ServerMode
|
||||
import org.apache.hadoop.security.UserGroupInformation
|
||||
import org.apache.hadoop.security.authorize.ProxyUsers
|
||||
import org.apache.hive.service.auth.{KyuubiKerberosSaslHelper, KyuubiPlainSaslHelper, SaslQOP}
|
||||
import org.apache.hive.service.auth.{KyuubiKerberosSaslHelper, KyuubiPlainSaslHelper}
|
||||
import org.apache.hive.service.cli.thrift.TCLIService
|
||||
import org.apache.spark.{SparkConf, SparkUtils}
|
||||
import org.apache.spark.KyuubiConf._
|
||||
import org.apache.thrift.TProcessorFactory
|
||||
import org.apache.thrift.transport.{TServerSocket, TTransportException, TTransportFactory}
|
||||
|
||||
import yaooqinn.kyuubi.{KyuubiServerException, KyuubiSQLException}
|
||||
import yaooqinn.kyuubi.{KyuubiServerException, KyuubiSQLException, Logging}
|
||||
|
||||
/**
|
||||
* Authentication
|
||||
*/
|
||||
class KyuubiAuthFactory(conf: SparkConf) {
|
||||
class KyuubiAuthFactory(conf: SparkConf) extends Logging {
|
||||
private[this] val KYUUBI_CLIENT_TOKEN = "kyuubiClientToken"
|
||||
|
||||
private[this] val saslServer: Option[HadoopThriftAuthBridge.Server] =
|
||||
conf.get(AUTHENTICATION_METHOD.key).toUpperCase match {
|
||||
case KERBEROS.name =>
|
||||
val principal: String = conf.get("spark.yarn.principal", "")
|
||||
val keytab: String = conf.get("spark.yarn.keytab", "")
|
||||
require(principal.nonEmpty && keytab.nonEmpty,
|
||||
"keytab and principal are not configured properly")
|
||||
val principal: String = conf.get(SparkUtils.KEYTAB, "")
|
||||
val keytab: String = conf.get(SparkUtils.PRINCIPAL, "")
|
||||
if (principal.nonEmpty && keytab.nonEmpty) {
|
||||
val msg = s"${SparkUtils.KEYTAB} and ${SparkUtils.PRINCIPAL} are not configured" +
|
||||
s" properly for ${KERBEROS.name} Authentication method"
|
||||
throw new KyuubiServerException(msg)
|
||||
}
|
||||
val server = ShimLoader.getHadoopThriftAuthBridge.createServer(keytab, principal)
|
||||
// start delegation token manager
|
||||
info("Starting Kyuubi client token manager")
|
||||
try {
|
||||
server.startDelegationTokenSecretManager(
|
||||
SparkUtils.newConfiguration(conf),
|
||||
|
||||
55
src/main/scala/yaooqinn/kyuubi/auth/SaslQOP.scala
Normal file
55
src/main/scala/yaooqinn/kyuubi/auth/SaslQOP.scala
Normal file
@ -0,0 +1,55 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package yaooqinn.kyuubi.auth
|
||||
|
||||
import java.util.Locale
|
||||
|
||||
|
||||
trait SaslQOP {
|
||||
override def toString: String = "auth, auth-int, auth-conf"
|
||||
}
|
||||
|
||||
/**
|
||||
* Possible values of SASL quality-of-protection value.
|
||||
*/
|
||||
object SaslQOP {
|
||||
|
||||
/** Authentication only. */
|
||||
case object AUTH extends SaslQOP {
|
||||
override val toString: String = "auth"
|
||||
}
|
||||
|
||||
/** Authentication and integrity checking by using signatures. */
|
||||
case object AUTH_INT extends SaslQOP {
|
||||
override val toString: String = "auth-int"
|
||||
}
|
||||
|
||||
/** Authentication, integrity and confidentiality checking by using signatures and encryption. */
|
||||
case object AUTH_CONF extends SaslQOP {
|
||||
override val toString: String = "auth-conf"
|
||||
}
|
||||
|
||||
def fromString(str: String): SaslQOP = str.toLowerCase(Locale.ROOT) match {
|
||||
case AUTH.toString => AUTH
|
||||
case AUTH_INT.toString => AUTH_INT
|
||||
case AUTH_CONF.toString => AUTH_CONF
|
||||
case _ =>
|
||||
throw new IllegalArgumentException(
|
||||
s"Unknown auth type: $str only ${SaslQOP.toString} allowed.")
|
||||
}
|
||||
|
||||
}
|
||||
@ -32,7 +32,7 @@ import org.apache.hadoop.security.{SecurityUtil, UserGroupInformation}
|
||||
import org.apache.hadoop.security.authentication.util.KerberosUtil
|
||||
import org.apache.hive.common.util.HiveVersionInfo
|
||||
import org.apache.spark.KyuubiConf._
|
||||
import org.apache.spark.SparkConf
|
||||
import org.apache.spark.{SparkConf, SparkUtils}
|
||||
import org.apache.zookeeper._
|
||||
import org.apache.zookeeper.data.ACL
|
||||
|
||||
@ -180,8 +180,8 @@ object HighAvailabilityUtils extends Logging {
|
||||
@throws[Exception]
|
||||
private def setUpZooKeeperAuth(conf: SparkConf): Unit = {
|
||||
if (UserGroupInformation.isSecurityEnabled) {
|
||||
var principal = conf.get("spark.yarn.principal")
|
||||
val keyTabFile = conf.get("spark.yarn.keytab")
|
||||
var principal = conf.get(SparkUtils.PRINCIPAL)
|
||||
val keyTabFile = conf.get(SparkUtils.KEYTAB)
|
||||
if (!new File(keyTabFile).exists()) {
|
||||
throw new IOException("key tab does not exists")
|
||||
}
|
||||
|
||||
@ -35,7 +35,7 @@ import org.apache.hadoop.fs.FileSystem
|
||||
import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAccessControlException
|
||||
import org.apache.hadoop.security.UserGroupInformation
|
||||
import org.apache.hive.service.cli.thrift.TProtocolVersion
|
||||
import org.apache.spark.{SparkConf, SparkContext}
|
||||
import org.apache.spark.{SparkConf, SparkContext, SparkUtils}
|
||||
import org.apache.spark.KyuubiConf._
|
||||
import org.apache.spark.sql.SparkSession
|
||||
import org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException
|
||||
@ -87,7 +87,7 @@ private[kyuubi] class KyuubiSession(
|
||||
val currentUser = UserGroupInformation.getCurrentUser
|
||||
if (withImpersonation) {
|
||||
if (UserGroupInformation.isSecurityEnabled) {
|
||||
if (conf.contains(PRINCIPAL) && conf.contains(KEYTAB)) {
|
||||
if (conf.contains(SparkUtils.PRINCIPAL) && conf.contains(SparkUtils.KEYTAB)) {
|
||||
// If principal and keytab are configured, do re-login in case of token expiry.
|
||||
// Do not check keytab file existing as spark-submit has it done
|
||||
currentUser.reloginFromKeytab()
|
||||
@ -260,8 +260,8 @@ private[kyuubi] class KyuubiSession(
|
||||
}
|
||||
|
||||
// proxy user does not have rights to get token as realuser
|
||||
conf.remove("spark.yarn.keytab")
|
||||
conf.remove("spark.yarn.principal")
|
||||
conf.remove(SparkUtils.KEYTAB)
|
||||
conf.remove(SparkUtils.PRINCIPAL)
|
||||
}
|
||||
|
||||
/**
|
||||
@ -522,10 +522,7 @@ object KyuubiSession {
|
||||
val SPARK_APP_ID: String = "spark.app.id"
|
||||
val DEPRECATED_QUEUE = "mapred.job.queue.name"
|
||||
val QUEUE = "spark.yarn.queue"
|
||||
val KEYTAB = "spark.yarn.keytab"
|
||||
val PRINCIPAL = "spark.yarn.principal"
|
||||
|
||||
val SPARK_PREFIX = "spark."
|
||||
val SPARK_HADOOP_PREFIX = "spark.hadoop."
|
||||
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user