[KYUUBI #6145] Use HiveMetaStore Docker for testing
# 🔍 Description ## Issue References 🔗 This pull request uses HiveMetaStore Docker for testing to remove HiveMetaStore test scope deps. ## Describe Your Solution 🔧 ## Types of changes 🔖 - [ ] Bugfix (non-breaking change which fixes an issue) - [ ] New feature (non-breaking change which adds functionality) - [ ] Breaking change (fix or feature that would cause existing functionality to change) ## Test Plan 🧪 #### Behavior Without This Pull Request ⚰️ #### Behavior With This Pull Request 🎉 #### Related Unit Tests --- # Checklist 📝 - [x] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html) **Be nice. Be informative.** Closes #6145 from zhouyifan279/secured-hms-container. Closes #6145 f77429481 [zhouyifan279] Use kyuubi-relocated-hive-metastore-client 2ed1f0f2d [Cheng Pan] image 635fc2e51 [zhouyifan279] Use HiveMetaStore Docker for testing Lead-authored-by: zhouyifan279 <zhouyifan279@gmail.com> Co-authored-by: Cheng Pan <chengpan@apache.org> Signed-off-by: Cheng Pan <chengpan@apache.org>
This commit is contained in:
parent
869400ddb8
commit
dd69c4fb33
@ -24,7 +24,7 @@
|
||||
# --build-arg HIVE_VERSION="2.3.9" \
|
||||
# --build-arg HADOOP_VERSION="2.10.2" \
|
||||
# --file build/Dockerfile.HMS \
|
||||
# --tag apache/kyuubi-hive-metastore:<tag> \
|
||||
# --tag nekyuubi/kyuubi-hive-metastore:<tag> \
|
||||
# .
|
||||
# Options:
|
||||
# -f, --file this docker file
|
||||
|
||||
@ -146,6 +146,10 @@ trait KerberizedTestHelper extends KyuubiFunSuite {
|
||||
val authType = "hadoop.security.authentication"
|
||||
try {
|
||||
conf.set(authType, "KERBEROS")
|
||||
// auth_to_local is required for non-default realm
|
||||
conf.set(
|
||||
"hadoop.security.auth_to_local",
|
||||
"DEFAULT RULE:[1:$1] RULE:[2:$1]")
|
||||
System.setProperty("java.security.krb5.conf", krb5ConfPath)
|
||||
UserGroupInformation.setConfiguration(conf)
|
||||
assert(UserGroupInformation.isSecurityEnabled)
|
||||
|
||||
@ -109,115 +109,6 @@
|
||||
<artifactId>kubernetes-httpclient-okhttp</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.hive</groupId>
|
||||
<artifactId>hive-metastore</artifactId>
|
||||
<version>${hive.version}</version>
|
||||
<scope>test</scope>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>*</groupId>
|
||||
<artifactId>*</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.hive</groupId>
|
||||
<artifactId>hive-standalone-metastore</artifactId>
|
||||
<version>${hive.version}</version>
|
||||
<scope>test</scope>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>*</groupId>
|
||||
<artifactId>*</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.hive</groupId>
|
||||
<artifactId>hive-serde</artifactId>
|
||||
<version>${hive.version}</version>
|
||||
<scope>test</scope>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>*</groupId>
|
||||
<artifactId>*</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.hive.shims</groupId>
|
||||
<artifactId>hive-shims-common</artifactId>
|
||||
<version>${hive.version}</version>
|
||||
<scope>test</scope>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>*</groupId>
|
||||
<artifactId>*</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.hive.shims</groupId>
|
||||
<artifactId>hive-shims-0.23</artifactId>
|
||||
<version>${hive.version}</version>
|
||||
<scope>test</scope>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>*</groupId>
|
||||
<artifactId>*</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.hive</groupId>
|
||||
<artifactId>hive-common</artifactId>
|
||||
<version>${hive.version}</version>
|
||||
<scope>test</scope>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>*</groupId>
|
||||
<artifactId>*</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.hive</groupId>
|
||||
<artifactId>hive-storage-api</artifactId>
|
||||
<version>${hive.storage-api.version}</version>
|
||||
<scope>test</scope>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>*</groupId>
|
||||
<artifactId>*</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.thrift</groupId>
|
||||
<artifactId>libfb303</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.thrift</groupId>
|
||||
<artifactId>libthrift</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.hive</groupId>
|
||||
<artifactId>hive-service-rpc</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>commons-lang</groupId>
|
||||
<artifactId>commons-lang</artifactId>
|
||||
@ -355,78 +246,6 @@
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.hive</groupId>
|
||||
<artifactId>hive-standalone-metastore</artifactId>
|
||||
<version>${hive.version}</version>
|
||||
<type>test-jar</type>
|
||||
<scope>test</scope>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>*</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.apache.hive</groupId>
|
||||
<artifactId>*</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.apache.thrift</groupId>
|
||||
<artifactId>*</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>co.cask.tephra</groupId>
|
||||
<artifactId>*</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>javolution</groupId>
|
||||
<artifactId>javolution</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>com.google.guava</groupId>
|
||||
<artifactId>guava</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>com.google.protobuf</groupId>
|
||||
<artifactId>protobuf-java</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.apache.hbase</groupId>
|
||||
<artifactId>hbase-client</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>com.jobbox</groupId>
|
||||
<artifactId>bonecp</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>com.zaxxer</groupId>
|
||||
<artifactId>HikariCP</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>commons-cli</groupId>
|
||||
<artifactId>commons-cli</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>commons-lang</groupId>
|
||||
<artifactId>commons-lang</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>commons-pool</groupId>
|
||||
<artifactId>commons-pool</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>commons-dbcp</groupId>
|
||||
<artifactId>commons-dbcp</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.derby</groupId>
|
||||
<artifactId>derby</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.kafka</groupId>
|
||||
<artifactId>kafka-clients</artifactId>
|
||||
@ -444,20 +263,6 @@
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.hive</groupId>
|
||||
<artifactId>hive-exec</artifactId>
|
||||
<version>${hive.version}</version>
|
||||
<classifier>core</classifier>
|
||||
<scope>test</scope>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>*</groupId>
|
||||
<artifactId>*</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-minikdc</artifactId>
|
||||
|
||||
@ -27,6 +27,7 @@ import org.apache.kyuubi.Logging
|
||||
import org.apache.kyuubi.config.KyuubiConf
|
||||
import org.apache.kyuubi.shaded.hive.metastore.{IMetaStoreClient, RetryingMetaStoreClient}
|
||||
import org.apache.kyuubi.shaded.hive.metastore.conf.MetastoreConf
|
||||
import org.apache.kyuubi.shaded.hive.metastore.conf.MetastoreConf.ConfVars
|
||||
import org.apache.kyuubi.shaded.hive.metastore.security.DelegationTokenIdentifier
|
||||
|
||||
class HiveDelegationTokenProvider extends HadoopDelegationTokenProvider with Logging {
|
||||
@ -39,17 +40,18 @@ class HiveDelegationTokenProvider extends HadoopDelegationTokenProvider with Log
|
||||
|
||||
override def initialize(hadoopConf: Configuration, kyuubiConf: KyuubiConf): Unit = {
|
||||
val conf = MetastoreConf.newMetastoreConf(hadoopConf)
|
||||
val metastoreUris = conf.getTrimmed("hive.metastore.uris", "")
|
||||
val metastoreUris = MetastoreConf.getVar(hadoopConf, ConfVars.THRIFT_URIS)
|
||||
// SQL engine requires token alias to be `hive.metastore.uris`
|
||||
tokenAlias = new Text(metastoreUris)
|
||||
|
||||
if (SecurityUtil.getAuthenticationMethod(hadoopConf) != AuthenticationMethod.SIMPLE
|
||||
&& metastoreUris.nonEmpty
|
||||
&& conf.getBoolean("hive.metastore.sasl.enabled", false)) {
|
||||
&& MetastoreConf.getBoolVar(conf, ConfVars.USE_THRIFT_SASL)) {
|
||||
|
||||
val principalKey = "hive.metastore.kerberos.principal"
|
||||
principal = conf.getTrimmed(principalKey, "")
|
||||
require(principal.nonEmpty, s"Hive principal $principalKey undefined")
|
||||
principal = MetastoreConf.getVar(hadoopConf, ConfVars.KERBEROS_PRINCIPAL)
|
||||
require(
|
||||
principal.nonEmpty,
|
||||
s"Hive principal ${ConfVars.KERBEROS_PRINCIPAL.getVarname} undefined")
|
||||
|
||||
client = Some(RetryingMetaStoreClient.getProxy(conf))
|
||||
info(s"Created HiveMetaStoreClient with metastore uris $metastoreUris")
|
||||
|
||||
@ -0,0 +1,145 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.kyuubi
|
||||
|
||||
import java.io.File
|
||||
import java.nio.charset.StandardCharsets
|
||||
import java.nio.file.{Files, Paths}
|
||||
|
||||
import com.dimafeng.testcontainers.{ContainerDef, GenericContainer}
|
||||
import com.dimafeng.testcontainers.scalatest.TestContainerForAll
|
||||
import com.github.dockerjava.api.model.{ExposedPort, Ports}
|
||||
import org.apache.hadoop.conf.Configuration
|
||||
import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy
|
||||
|
||||
import org.apache.kyuubi.shaded.hive.metastore.conf.MetastoreConf
|
||||
import org.apache.kyuubi.shaded.hive.metastore.conf.MetastoreConf.ConfVars._
|
||||
|
||||
trait WithSecuredHMSContainer extends KerberizedTestHelper with TestContainerForAll {
|
||||
|
||||
final val HADOOP_SECURITY_AUTHENTICATION = "kerberos"
|
||||
final val HIVE_METASTORE_KERBEROS_REALM = "TEST.ORG"
|
||||
final val HIVE_METASTORE_KERBEROS_PRINCIPAL = "hive/localhost"
|
||||
final val HIVE_METASTORE_KERBEROS_KEYTAB = "/hive.service.keytab"
|
||||
final val DOCKER_IMAGE_NAME = "nekyuubi/kyuubi-hive-metastore:latest"
|
||||
|
||||
private val tempDir = Utils.createTempDir(prefix = "kyuubi-server-hms")
|
||||
private val exposedKdcPort = 88
|
||||
private val exposedHmsPort = 9083
|
||||
private val testPrincipalOverride =
|
||||
HIVE_METASTORE_KERBEROS_PRINCIPAL + "@" + HIVE_METASTORE_KERBEROS_REALM
|
||||
private val krb5ConfPathOverride = new File(tempDir.toFile, "krb5.conf").getAbsolutePath
|
||||
|
||||
val hiveConf: Configuration = MetastoreConf.newMetastoreConf()
|
||||
|
||||
override val testKeytab: String = new File(tempDir.toFile, "hive.service.keytab").getAbsolutePath
|
||||
override val containerDef: HMSContainer.Def = HMSContainer.Def(
|
||||
DOCKER_IMAGE_NAME,
|
||||
exposedKdcPort,
|
||||
exposedHmsPort,
|
||||
env = Map(
|
||||
"HADOOP_SECURITY_AUTHENTICATION" -> HADOOP_SECURITY_AUTHENTICATION,
|
||||
"HIVE_METASTORE_KERBEROS_PRINCIPAL" -> HIVE_METASTORE_KERBEROS_PRINCIPAL,
|
||||
"HIVE_METASTORE_KERBEROS_KEYTAB" -> HIVE_METASTORE_KERBEROS_KEYTAB))
|
||||
|
||||
override def beforeAll(): Unit = {
|
||||
super.beforeAll()
|
||||
testPrincipal = testPrincipalOverride
|
||||
krb5ConfPath = krb5ConfPathOverride
|
||||
}
|
||||
|
||||
override def afterContainersStart(containers: Containers): Unit = {
|
||||
containers.copyFileFromContainer(HIVE_METASTORE_KERBEROS_KEYTAB, testKeytab)
|
||||
|
||||
MetastoreConf.setVar(
|
||||
hiveConf,
|
||||
THRIFT_URIS,
|
||||
"thrift://localhost:" + containers.mappedPort(exposedHmsPort))
|
||||
MetastoreConf.setBoolVar(hiveConf, USE_THRIFT_SASL, true)
|
||||
MetastoreConf.setVar(
|
||||
hiveConf,
|
||||
KERBEROS_PRINCIPAL,
|
||||
HIVE_METASTORE_KERBEROS_PRINCIPAL + "@" + HIVE_METASTORE_KERBEROS_REALM)
|
||||
hiveConf.set("hadoop.security.authentication", "kerberos")
|
||||
|
||||
val krb5ConfContent =
|
||||
s"""[libdefaults]
|
||||
| default_realm = $HIVE_METASTORE_KERBEROS_REALM
|
||||
| dns_lookup_realm = false
|
||||
| dns_lookup_kdc = false
|
||||
| forwardable = true
|
||||
|
|
||||
|[realms]
|
||||
| TEST.ORG = {
|
||||
| kdc = localhost:${containers.getKdcUdpPort}
|
||||
| }
|
||||
|
|
||||
|""".stripMargin
|
||||
|
||||
val writer = Files.newBufferedWriter(Paths.get(krb5ConfPathOverride), StandardCharsets.UTF_8)
|
||||
writer.write(krb5ConfContent)
|
||||
writer.close()
|
||||
}
|
||||
}
|
||||
|
||||
class HMSContainer(exposedKdcPort: Int, underlying: GenericContainer)
|
||||
extends GenericContainer(underlying) {
|
||||
|
||||
def getKdcUdpPort: Int = {
|
||||
underlying.container.getContainerInfo.getNetworkSettings.getPorts.getBindings
|
||||
.get(ExposedPort.udp(exposedKdcPort)).head.getHostPortSpec.toInt
|
||||
}
|
||||
}
|
||||
|
||||
object HMSContainer {
|
||||
case class Def(
|
||||
dockerImage: String,
|
||||
exposedKdcPort: Int,
|
||||
exposedHmsPort: Int,
|
||||
env: Map[String, String] = Map())
|
||||
extends ContainerDef {
|
||||
|
||||
override type Container = HMSContainer
|
||||
|
||||
override def createContainer(): Container = {
|
||||
val container = new HMSContainer(
|
||||
exposedKdcPort,
|
||||
GenericContainer(
|
||||
dockerImage,
|
||||
env = env,
|
||||
waitStrategy = new HostPortWaitStrategy().forPorts(exposedHmsPort)))
|
||||
|
||||
container.container.withExposedPorts(exposedKdcPort, exposedHmsPort)
|
||||
container.container.withCreateContainerCmdModifier(cmd => {
|
||||
val udpExposedPort = ExposedPort.udp(exposedKdcPort)
|
||||
val exposedPorts = new java.util.LinkedList[ExposedPort]()
|
||||
for (p <- cmd.getExposedPorts) {
|
||||
exposedPorts.add(p)
|
||||
}
|
||||
exposedPorts.add(udpExposedPort)
|
||||
cmd.withExposedPorts(exposedPorts)
|
||||
|
||||
// Add previous port bindings and UDP port binding
|
||||
val ports = cmd.getHostConfig.getPortBindings
|
||||
ports.bind(udpExposedPort, Ports.Binding.empty)
|
||||
cmd.getHostConfig.withPortBindings(ports)
|
||||
})
|
||||
container
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -17,86 +17,18 @@
|
||||
|
||||
package org.apache.kyuubi.credentials
|
||||
|
||||
import java.io.{File, FileOutputStream}
|
||||
import java.net.URLClassLoader
|
||||
import java.util.concurrent.atomic.AtomicBoolean
|
||||
import java.util.concurrent.locks.ReentrantLock
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.concurrent.ExecutionContext.Implicits.global
|
||||
import scala.concurrent.Future
|
||||
|
||||
import org.apache.commons.io.FileUtils
|
||||
import org.apache.hadoop.conf.Configuration
|
||||
import org.apache.hadoop.hive.conf.HiveConf
|
||||
import org.apache.hadoop.hive.conf.HiveConf.ConfVars._
|
||||
import org.apache.hadoop.hive.metastore.{HiveMetaException, HiveMetaStore}
|
||||
import org.apache.hadoop.hive.metastore.security.{HadoopThriftAuthBridge, HadoopThriftAuthBridge23}
|
||||
import org.apache.hadoop.io.Text
|
||||
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
|
||||
import org.apache.hadoop.security.authorize.ProxyUsers
|
||||
import org.apache.thrift.TProcessor
|
||||
import org.apache.thrift.protocol.TProtocol
|
||||
import org.scalatest.Assertions._
|
||||
import org.scalatest.concurrent.Eventually._
|
||||
import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
|
||||
|
||||
import org.apache.kyuubi.{KerberizedTestHelper, Logging, Utils}
|
||||
import org.apache.kyuubi.WithSecuredHMSContainer
|
||||
import org.apache.kyuubi.config.KyuubiConf
|
||||
import org.apache.kyuubi.credentials.LocalMetaServer.defaultHiveConf
|
||||
import org.apache.kyuubi.shaded.hive.metastore.conf.MetastoreConf
|
||||
import org.apache.kyuubi.shaded.hive.metastore.conf.MetastoreConf.ConfVars
|
||||
import org.apache.kyuubi.shaded.hive.metastore.security.DelegationTokenIdentifier
|
||||
|
||||
class HiveDelegationTokenProviderSuite extends KerberizedTestHelper {
|
||||
|
||||
private val hadoopConfDir: File = Utils.createTempDir().toFile
|
||||
private var hiveConf: HiveConf = _
|
||||
|
||||
override def beforeAll(): Unit = {
|
||||
super.beforeAll()
|
||||
|
||||
tryWithSecurityEnabled {
|
||||
UserGroupInformation.loginUserFromKeytab(testPrincipal, testKeytab)
|
||||
|
||||
// HiveMetaStore creates a new hadoop `Configuration` object for each request to verify
|
||||
// whether user can impersonate others.
|
||||
// So we create a URLClassLoader with core-site.xml and set it as the thread's context
|
||||
// classloader when creating `Configuration` object.
|
||||
val conf = new Configuration(false)
|
||||
conf.set("hadoop.security.authentication", "kerberos")
|
||||
val realUser = UserGroupInformation.getCurrentUser.getShortUserName
|
||||
conf.set(s"hadoop.proxyuser.$realUser.groups", "*")
|
||||
conf.set(s"hadoop.proxyuser.$realUser.hosts", "*")
|
||||
|
||||
val xml = new File(hadoopConfDir, "core-site.xml")
|
||||
val os = new FileOutputStream(xml)
|
||||
try {
|
||||
conf.writeXml(os)
|
||||
} finally {
|
||||
os.close()
|
||||
}
|
||||
|
||||
val classloader =
|
||||
new URLClassLoader(
|
||||
Array(hadoopConfDir.toURI.toURL),
|
||||
classOf[Configuration].getClassLoader)
|
||||
|
||||
hiveConf = LocalMetaServer.defaultHiveConf()
|
||||
hiveConf.addResource(conf)
|
||||
hiveConf.setVar(METASTORE_USE_THRIFT_SASL, "true")
|
||||
hiveConf.setVar(METASTORE_KERBEROS_PRINCIPAL, testPrincipal)
|
||||
hiveConf.setVar(METASTORE_KERBEROS_KEYTAB_FILE, testKeytab)
|
||||
hiveConf.setVar(METASTORE_CONNECTION_POOLING_TYPE, "NONE")
|
||||
hiveConf.setVar(METASTORE_AUTO_CREATE_ALL, "true")
|
||||
hiveConf.setVar(METASTORE_SCHEMA_VERIFICATION, "false")
|
||||
ProxyUsers.refreshSuperUserGroupsConfiguration(hiveConf)
|
||||
val metaServer = new LocalMetaServer(hiveConf, classloader)
|
||||
metaServer.start()
|
||||
}
|
||||
}
|
||||
|
||||
override def afterAll(): Unit = {
|
||||
FileUtils.deleteDirectory(hadoopConfDir)
|
||||
}
|
||||
class HiveDelegationTokenProviderSuite extends WithSecuredHMSContainer {
|
||||
|
||||
test("obtain hive delegation token") {
|
||||
tryWithSecurityEnabled {
|
||||
@ -115,7 +47,7 @@ class HiveDelegationTokenProviderSuite extends KerberizedTestHelper {
|
||||
credentials.getTokenMap.asScala
|
||||
.filter(_._2.getKind == DelegationTokenIdentifier.HIVE_DELEGATION_KIND)
|
||||
.head
|
||||
assert(aliasAndToken._1 == new Text(hiveConf.getTrimmed("hive.metastore.uris")))
|
||||
assert(aliasAndToken._1 == new Text(MetastoreConf.getVar(hiveConf, ConfVars.THRIFT_URIS)))
|
||||
assert(aliasAndToken._2 != null)
|
||||
|
||||
val token = aliasAndToken._2
|
||||
@ -127,93 +59,3 @@ class HiveDelegationTokenProviderSuite extends KerberizedTestHelper {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class LocalMetaServer(
|
||||
hiveConf: HiveConf = defaultHiveConf(),
|
||||
serverContextClassLoader: ClassLoader)
|
||||
extends Logging {
|
||||
import LocalMetaServer._
|
||||
|
||||
def start(): Unit = {
|
||||
val startLock = new ReentrantLock
|
||||
val startCondition = startLock.newCondition
|
||||
val startedServing = new AtomicBoolean(false)
|
||||
val startFailed = new AtomicBoolean(false)
|
||||
|
||||
Future {
|
||||
try {
|
||||
HiveMetaStore.startMetaStore(
|
||||
port,
|
||||
new HadoopThriftAuthBridgeWithServerContextClassLoader(
|
||||
serverContextClassLoader),
|
||||
hiveConf,
|
||||
startLock,
|
||||
startCondition,
|
||||
startedServing)
|
||||
} catch {
|
||||
case t: Throwable =>
|
||||
error("Failed to start LocalMetaServer", t)
|
||||
startFailed.set(true)
|
||||
}
|
||||
}
|
||||
|
||||
eventually(timeout(30.seconds), interval(100.milliseconds)) {
|
||||
assert(startedServing.get() || startFailed.get())
|
||||
}
|
||||
|
||||
if (startFailed.get()) {
|
||||
throw new HiveMetaException("Failed to start LocalMetaServer")
|
||||
}
|
||||
}
|
||||
|
||||
def getHiveConf: HiveConf = hiveConf
|
||||
}
|
||||
|
||||
object LocalMetaServer {
|
||||
|
||||
private val port = 20101
|
||||
|
||||
def defaultHiveConf(): HiveConf = {
|
||||
val hiveConf = new HiveConf()
|
||||
hiveConf.setVar(METASTOREURIS, "thrift://localhost:" + port)
|
||||
hiveConf.setVar(METASTORE_SCHEMA_VERIFICATION, "false")
|
||||
hiveConf.set("datanucleus.schema.autoCreateTables", "true")
|
||||
hiveConf
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
class HadoopThriftAuthBridgeWithServerContextClassLoader(classloader: ClassLoader)
|
||||
extends HadoopThriftAuthBridge23 {
|
||||
|
||||
override def createServer(
|
||||
keytabFile: String,
|
||||
principalConf: String,
|
||||
clientConf: String): HadoopThriftAuthBridge.Server = {
|
||||
new Server(keytabFile, principalConf, clientConf)
|
||||
}
|
||||
|
||||
class Server(keytabFile: String, principalConf: String, clientConf: String)
|
||||
extends HadoopThriftAuthBridge.Server(keytabFile, principalConf, clientConf) {
|
||||
|
||||
override def wrapProcessor(processor: TProcessor): TProcessor = {
|
||||
new SetThreadContextClassLoaderProcess(super.wrapProcessor(processor))
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
class SetThreadContextClassLoaderProcess(wrapped: TProcessor) extends TProcessor {
|
||||
|
||||
override def process(in: TProtocol, out: TProtocol): Boolean = {
|
||||
val origin = Thread.currentThread().getContextClassLoader
|
||||
try {
|
||||
Thread.currentThread().setContextClassLoader(classloader)
|
||||
wrapped.process(in, out)
|
||||
} finally {
|
||||
Thread.currentThread().setContextClassLoader(origin)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user