diff --git a/build/Dockerfile.HMS b/build/Dockerfile.HMS
index b6ec4a6b6..ef3ca422f 100644
--- a/build/Dockerfile.HMS
+++ b/build/Dockerfile.HMS
@@ -24,7 +24,7 @@
 #   --build-arg HIVE_VERSION="2.3.9" \
 #   --build-arg HADOOP_VERSION="2.10.2" \
 #   --file build/Dockerfile.HMS \
-#   --tag apache/kyuubi-hive-metastore: \
+#   --tag nekyuubi/kyuubi-hive-metastore: \
 #   .
 # Options:
 #   -f, --file  this docker file
diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/KerberizedTestHelper.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/KerberizedTestHelper.scala
index 4c5930a52..a2e5f4c0c 100644
--- a/kyuubi-common/src/test/scala/org/apache/kyuubi/KerberizedTestHelper.scala
+++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/KerberizedTestHelper.scala
@@ -146,6 +146,10 @@ trait KerberizedTestHelper extends KyuubiFunSuite {
     val authType = "hadoop.security.authentication"
     try {
       conf.set(authType, "KERBEROS")
+      // auth_to_local rules are required to map principals from a non-default realm
+      conf.set(
+        "hadoop.security.auth_to_local",
+        "DEFAULT RULE:[1:$1] RULE:[2:$1]")
       System.setProperty("java.security.krb5.conf", krb5ConfPath)
       UserGroupInformation.setConfiguration(conf)
       assert(UserGroupInformation.isSecurityEnabled)
diff --git a/kyuubi-server/pom.xml b/kyuubi-server/pom.xml
index cb9c39a11..9ab3b3f1d 100644
--- a/kyuubi-server/pom.xml
+++ b/kyuubi-server/pom.xml
@@ -109,115 +109,6 @@
             <artifactId>kubernetes-httpclient-okhttp</artifactId>
         </dependency>
 
-        <dependency>
-            <groupId>org.apache.hive</groupId>
-            <artifactId>hive-metastore</artifactId>
-            <version>${hive.version}</version>
-            <scope>test</scope>
-            <exclusions>
-                <exclusion>
-                    <groupId>*</groupId>
-                    <artifactId>*</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.hive</groupId>
-            <artifactId>hive-standalone-metastore</artifactId>
-            <version>${hive.version}</version>
-            <scope>test</scope>
-            <exclusions>
-                <exclusion>
-                    <groupId>*</groupId>
-                    <artifactId>*</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.hive</groupId>
-            <artifactId>hive-serde</artifactId>
-            <version>${hive.version}</version>
-            <scope>test</scope>
-            <exclusions>
-                <exclusion>
-                    <groupId>*</groupId>
-                    <artifactId>*</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.hive.shims</groupId>
-            <artifactId>hive-shims-common</artifactId>
-            <version>${hive.version}</version>
-            <scope>test</scope>
-            <exclusions>
-                <exclusion>
-                    <groupId>*</groupId>
-                    <artifactId>*</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.hive.shims</groupId>
-            <artifactId>hive-shims-0.23</artifactId>
-            <version>${hive.version}</version>
-            <scope>test</scope>
-            <exclusions>
-                <exclusion>
-                    <groupId>*</groupId>
-                    <artifactId>*</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.hive</groupId>
-            <artifactId>hive-common</artifactId>
-            <version>${hive.version}</version>
-            <scope>test</scope>
-            <exclusions>
-                <exclusion>
-                    <groupId>*</groupId>
-                    <artifactId>*</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.hive</groupId>
-            <artifactId>hive-storage-api</artifactId>
-            <version>${hive.storage-api.version}</version>
-            <scope>test</scope>
-            <exclusions>
-                <exclusion>
-                    <groupId>*</groupId>
-                    <artifactId>*</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.thrift</groupId>
-            <artifactId>libfb303</artifactId>
-            <scope>test</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.thrift</groupId>
-            <artifactId>libthrift</artifactId>
-            <scope>test</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.hive</groupId>
-            <artifactId>hive-service-rpc</artifactId>
-            <scope>test</scope>
-        </dependency>
-
         <dependency>
             <groupId>commons-lang</groupId>
             <artifactId>commons-lang</artifactId>
@@ -355,78 +246,6 @@
             <scope>test</scope>
         </dependency>
 
-        <dependency>
-            <groupId>org.apache.hive</groupId>
-            <artifactId>hive-standalone-metastore</artifactId>
-            <version>${hive.version}</version>
-            <type>test-jar</type>
-            <scope>test</scope>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.apache.hadoop</groupId>
-                    <artifactId>*</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.hive</groupId>
-                    <artifactId>*</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.thrift</groupId>
-                    <artifactId>*</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>co.cask.tephra</groupId>
-                    <artifactId>*</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>javolution</groupId>
-                    <artifactId>javolution</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>com.google.guava</groupId>
-                    <artifactId>guava</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>com.google.protobuf</groupId>
-                    <artifactId>protobuf-java</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.hbase</groupId>
-                    <artifactId>hbase-client</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>com.jolbox</groupId>
-                    <artifactId>bonecp</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>com.zaxxer</groupId>
-                    <artifactId>HikariCP</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>commons-cli</groupId>
-                    <artifactId>commons-cli</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>commons-lang</groupId>
-                    <artifactId>commons-lang</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>commons-pool</groupId>
-                    <artifactId>commons-pool</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>commons-dbcp</groupId>
-                    <artifactId>commons-dbcp</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.derby</groupId>
-            <artifactId>derby</artifactId>
-            <scope>test</scope>
-        </dependency>
-
         <dependency>
             <groupId>org.apache.kafka</groupId>
             <artifactId>kafka-clients</artifactId>
@@ -444,20 +263,6 @@
             <scope>test</scope>
         </dependency>
 
-        <dependency>
-            <groupId>org.apache.hive</groupId>
-            <artifactId>hive-exec</artifactId>
-            <version>${hive.version}</version>
-            <classifier>core</classifier>
-            <scope>test</scope>
-            <exclusions>
-                <exclusion>
-                    <groupId>*</groupId>
-                    <artifactId>*</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-
         <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-minikdc</artifactId>
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HiveDelegationTokenProvider.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HiveDelegationTokenProvider.scala
index f1966f13d..5d0d46180 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HiveDelegationTokenProvider.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HiveDelegationTokenProvider.scala
@@ -27,6 +27,7 @@ import org.apache.kyuubi.Logging
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.shaded.hive.metastore.{IMetaStoreClient, RetryingMetaStoreClient}
 import org.apache.kyuubi.shaded.hive.metastore.conf.MetastoreConf
+import org.apache.kyuubi.shaded.hive.metastore.conf.MetastoreConf.ConfVars
 import org.apache.kyuubi.shaded.hive.metastore.security.DelegationTokenIdentifier
 
 class HiveDelegationTokenProvider extends HadoopDelegationTokenProvider with Logging {
@@ -39,17 +40,18 @@ class HiveDelegationTokenProvider extends HadoopDelegationTokenProvider with Log
   override def initialize(hadoopConf: Configuration, kyuubiConf: KyuubiConf): Unit = {
     val conf = MetastoreConf.newMetastoreConf(hadoopConf)
-    val metastoreUris = conf.getTrimmed("hive.metastore.uris", "")
+    val metastoreUris = MetastoreConf.getVar(hadoopConf, ConfVars.THRIFT_URIS)
 
     // SQL engine requires token alias to be `hive.metastore.uris`
     tokenAlias = new Text(metastoreUris)
 
     if (SecurityUtil.getAuthenticationMethod(hadoopConf) != AuthenticationMethod.SIMPLE
       && metastoreUris.nonEmpty
-      && conf.getBoolean("hive.metastore.sasl.enabled", false)) {
+      && MetastoreConf.getBoolVar(conf, ConfVars.USE_THRIFT_SASL)) {
 
-      val principalKey = "hive.metastore.kerberos.principal"
-      principal = conf.getTrimmed(principalKey, "")
-      require(principal.nonEmpty, s"Hive principal $principalKey undefined")
+      principal = MetastoreConf.getVar(hadoopConf, ConfVars.KERBEROS_PRINCIPAL)
+      require(
+        principal.nonEmpty,
+        s"Hive principal ${ConfVars.KERBEROS_PRINCIPAL.getVarname} undefined")
 
       client = Some(RetryingMetaStoreClient.getProxy(conf))
       info(s"Created HiveMetaStoreClient with metastore uris $metastoreUris")
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/WithSecuredHMSContainer.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/WithSecuredHMSContainer.scala
new file mode 100644
index 000000000..0ed65f174
--- /dev/null
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/WithSecuredHMSContainer.scala
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi
+
+import java.io.File
+import java.nio.charset.StandardCharsets
+import java.nio.file.{Files, Paths}
+
+import com.dimafeng.testcontainers.{ContainerDef, GenericContainer}
+import com.dimafeng.testcontainers.scalatest.TestContainerForAll
+import com.github.dockerjava.api.model.{ExposedPort, Ports}
+import org.apache.hadoop.conf.Configuration
+import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy
+
+import org.apache.kyuubi.shaded.hive.metastore.conf.MetastoreConf
+import org.apache.kyuubi.shaded.hive.metastore.conf.MetastoreConf.ConfVars._
+
+trait WithSecuredHMSContainer extends KerberizedTestHelper with TestContainerForAll {
+
+  final val HADOOP_SECURITY_AUTHENTICATION = "kerberos"
+  final val HIVE_METASTORE_KERBEROS_REALM = "TEST.ORG"
+  final val HIVE_METASTORE_KERBEROS_PRINCIPAL = "hive/localhost"
+  final val HIVE_METASTORE_KERBEROS_KEYTAB = "/hive.service.keytab"
+  final val DOCKER_IMAGE_NAME = "nekyuubi/kyuubi-hive-metastore:latest"
+
+  private val tempDir = Utils.createTempDir(prefix = "kyuubi-server-hms")
+  private val exposedKdcPort = 88
+  private val exposedHmsPort = 9083
+  private val testPrincipalOverride =
+    HIVE_METASTORE_KERBEROS_PRINCIPAL + "@" + HIVE_METASTORE_KERBEROS_REALM
+  private val krb5ConfPathOverride = new File(tempDir.toFile, "krb5.conf").getAbsolutePath
+
+  val hiveConf: Configuration = MetastoreConf.newMetastoreConf()
+
+  override val testKeytab: String = new File(tempDir.toFile, "hive.service.keytab").getAbsolutePath
+  override val containerDef: HMSContainer.Def = HMSContainer.Def(
+    DOCKER_IMAGE_NAME,
+    exposedKdcPort,
+    exposedHmsPort,
+    env = Map(
+      "HADOOP_SECURITY_AUTHENTICATION" -> HADOOP_SECURITY_AUTHENTICATION,
+      "HIVE_METASTORE_KERBEROS_PRINCIPAL" -> HIVE_METASTORE_KERBEROS_PRINCIPAL,
+      "HIVE_METASTORE_KERBEROS_KEYTAB" -> HIVE_METASTORE_KERBEROS_KEYTAB))
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    testPrincipal = testPrincipalOverride
+    krb5ConfPath = krb5ConfPathOverride
+  }
+
+  override def afterContainersStart(containers: Containers): Unit = {
+    containers.copyFileFromContainer(HIVE_METASTORE_KERBEROS_KEYTAB, testKeytab)
+
+    MetastoreConf.setVar(
+      hiveConf,
+      THRIFT_URIS,
+      "thrift://localhost:" + containers.mappedPort(exposedHmsPort))
+    MetastoreConf.setBoolVar(hiveConf, USE_THRIFT_SASL, true)
+    MetastoreConf.setVar(
+      hiveConf,
+      KERBEROS_PRINCIPAL,
+      HIVE_METASTORE_KERBEROS_PRINCIPAL + "@" + HIVE_METASTORE_KERBEROS_REALM)
+    hiveConf.set("hadoop.security.authentication", "kerberos")
+
+    val krb5ConfContent =
+      s"""[libdefaults]
+         |  default_realm = $HIVE_METASTORE_KERBEROS_REALM
+         |  dns_lookup_realm = false
+         |  dns_lookup_kdc = false
+         |  forwardable = true
+         |
+         |[realms]
+         |  $HIVE_METASTORE_KERBEROS_REALM = {
+         |    kdc = localhost:${containers.getKdcUdpPort}
+         |  }
+         |
+         |""".stripMargin
+
+    val writer = Files.newBufferedWriter(Paths.get(krb5ConfPathOverride), StandardCharsets.UTF_8)
+    writer.write(krb5ConfContent)
+    writer.close()
+  }
+}
+
+class HMSContainer(exposedKdcPort: Int, underlying: GenericContainer)
+  extends GenericContainer(underlying) {
+
+  def getKdcUdpPort: Int = {
+    underlying.container.getContainerInfo.getNetworkSettings.getPorts.getBindings
+      .get(ExposedPort.udp(exposedKdcPort)).head.getHostPortSpec.toInt
+  }
+}
+
+object HMSContainer {
+  case class Def(
+      dockerImage: String,
+      exposedKdcPort: Int,
+      exposedHmsPort: Int,
+      env: Map[String, String] = Map())
+    extends ContainerDef {
+
+    override type Container = HMSContainer
+
+    override def createContainer(): Container = {
+      val container = new HMSContainer(
+        exposedKdcPort,
+        GenericContainer(
+          dockerImage,
+          env = env,
+          waitStrategy = new HostPortWaitStrategy().forPorts(exposedHmsPort)))
+
+      container.container.withExposedPorts(exposedKdcPort, exposedHmsPort)
+      container.container.withCreateContainerCmdModifier(cmd => {
+        val udpExposedPort = ExposedPort.udp(exposedKdcPort)
+        val exposedPorts = new java.util.LinkedList[ExposedPort]()
+        for (p <- cmd.getExposedPorts) {
+          exposedPorts.add(p)
+        }
+        exposedPorts.add(udpExposedPort)
+        cmd.withExposedPorts(exposedPorts)
+
+        // Add previous port bindings and UDP port binding
+        val ports = cmd.getHostConfig.getPortBindings
+        ports.bind(udpExposedPort, Ports.Binding.empty)
+        cmd.getHostConfig.withPortBindings(ports)
+      })
+      container
+    }
+  }
+}
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/credentials/HiveDelegationTokenProviderSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/credentials/HiveDelegationTokenProviderSuite.scala
index 76d3e5614..d2108e912 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/credentials/HiveDelegationTokenProviderSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/credentials/HiveDelegationTokenProviderSuite.scala
@@ -17,86 +17,18 @@
 
 package org.apache.kyuubi.credentials
 
-import java.io.{File, FileOutputStream}
-import java.net.URLClassLoader
-import java.util.concurrent.atomic.AtomicBoolean
-import java.util.concurrent.locks.ReentrantLock
-
 import scala.collection.JavaConverters._
-import scala.concurrent.ExecutionContext.Implicits.global
-import scala.concurrent.Future
 
-import org.apache.commons.io.FileUtils
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.hive.conf.HiveConf
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars._
-import org.apache.hadoop.hive.metastore.{HiveMetaException, HiveMetaStore}
-import org.apache.hadoop.hive.metastore.security.{HadoopThriftAuthBridge, HadoopThriftAuthBridge23}
 import org.apache.hadoop.io.Text
 import org.apache.hadoop.security.{Credentials, UserGroupInformation}
-import org.apache.hadoop.security.authorize.ProxyUsers
-import org.apache.thrift.TProcessor
-import org.apache.thrift.protocol.TProtocol
-import org.scalatest.Assertions._
-import org.scalatest.concurrent.Eventually._
-import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
 
-import org.apache.kyuubi.{KerberizedTestHelper, Logging, Utils}
+import org.apache.kyuubi.WithSecuredHMSContainer
 import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.credentials.LocalMetaServer.defaultHiveConf
+import org.apache.kyuubi.shaded.hive.metastore.conf.MetastoreConf
+import org.apache.kyuubi.shaded.hive.metastore.conf.MetastoreConf.ConfVars
 import org.apache.kyuubi.shaded.hive.metastore.security.DelegationTokenIdentifier
 
-class HiveDelegationTokenProviderSuite extends KerberizedTestHelper {
-
-  private val hadoopConfDir: File = Utils.createTempDir().toFile
-  private var hiveConf: HiveConf = _
-
-  override def beforeAll(): Unit = {
-    super.beforeAll()
-
-    tryWithSecurityEnabled {
-      UserGroupInformation.loginUserFromKeytab(testPrincipal, testKeytab)
-
-      // HiveMetaStore creates a new hadoop `Configuration` object for each request to verify
-      // whether user can impersonate others.
-      // So we create a URLClassLoader with core-site.xml and set it as the thread's context
-      // classloader when creating `Configuration` object.
-      val conf = new Configuration(false)
-      conf.set("hadoop.security.authentication", "kerberos")
-      val realUser = UserGroupInformation.getCurrentUser.getShortUserName
-      conf.set(s"hadoop.proxyuser.$realUser.groups", "*")
-      conf.set(s"hadoop.proxyuser.$realUser.hosts", "*")
-
-      val xml = new File(hadoopConfDir, "core-site.xml")
-      val os = new FileOutputStream(xml)
-      try {
-        conf.writeXml(os)
-      } finally {
-        os.close()
-      }
-
-      val classloader =
-        new URLClassLoader(
-          Array(hadoopConfDir.toURI.toURL),
-          classOf[Configuration].getClassLoader)
-
-      hiveConf = LocalMetaServer.defaultHiveConf()
-      hiveConf.addResource(conf)
-      hiveConf.setVar(METASTORE_USE_THRIFT_SASL, "true")
-      hiveConf.setVar(METASTORE_KERBEROS_PRINCIPAL, testPrincipal)
-      hiveConf.setVar(METASTORE_KERBEROS_KEYTAB_FILE, testKeytab)
-      hiveConf.setVar(METASTORE_CONNECTION_POOLING_TYPE, "NONE")
-      hiveConf.setVar(METASTORE_AUTO_CREATE_ALL, "true")
-      hiveConf.setVar(METASTORE_SCHEMA_VERIFICATION, "false")
-      ProxyUsers.refreshSuperUserGroupsConfiguration(hiveConf)
-      val metaServer = new LocalMetaServer(hiveConf, classloader)
-      metaServer.start()
-    }
-  }
-
-  override def afterAll(): Unit = {
-    FileUtils.deleteDirectory(hadoopConfDir)
-  }
+class HiveDelegationTokenProviderSuite extends WithSecuredHMSContainer {
 
   test("obtain hive delegation token") {
     tryWithSecurityEnabled {
@@ -115,7 +47,7 @@ class HiveDelegationTokenProviderSuite extends KerberizedTestHelper {
         credentials.getTokenMap.asScala
           .filter(_._2.getKind == DelegationTokenIdentifier.HIVE_DELEGATION_KIND)
           .head
-      assert(aliasAndToken._1 == new Text(hiveConf.getTrimmed("hive.metastore.uris")))
+      assert(aliasAndToken._1 == new Text(MetastoreConf.getVar(hiveConf, ConfVars.THRIFT_URIS)))
       assert(aliasAndToken._2 != null)
 
       val token = aliasAndToken._2
@@ -127,93 +59,3 @@ class HiveDelegationTokenProviderSuite extends KerberizedTestHelper {
     }
   }
 }
-
-class LocalMetaServer(
-    hiveConf: HiveConf = defaultHiveConf(),
-    serverContextClassLoader: ClassLoader)
-  extends Logging {
-  import LocalMetaServer._
-
-  def start(): Unit = {
-    val startLock = new ReentrantLock
-    val startCondition = startLock.newCondition
-    val startedServing = new AtomicBoolean(false)
-    val startFailed = new AtomicBoolean(false)
-
-    Future {
-      try {
-        HiveMetaStore.startMetaStore(
-          port,
-          new HadoopThriftAuthBridgeWithServerContextClassLoader(
-            serverContextClassLoader),
-          hiveConf,
-          startLock,
-          startCondition,
-          startedServing)
-      } catch {
-        case t: Throwable =>
-          error("Failed to start LocalMetaServer", t)
-          startFailed.set(true)
-      }
-    }
-
-    eventually(timeout(30.seconds), interval(100.milliseconds)) {
-      assert(startedServing.get() || startFailed.get())
-    }
-
-    if (startFailed.get()) {
-      throw new HiveMetaException("Failed to start LocalMetaServer")
-    }
-  }
-
-  def getHiveConf: HiveConf = hiveConf
-}
-
-object LocalMetaServer {
-
-  private val port = 20101
-
-  def defaultHiveConf(): HiveConf = {
-    val hiveConf = new HiveConf()
-    hiveConf.setVar(METASTOREURIS, "thrift://localhost:" + port)
-    hiveConf.setVar(METASTORE_SCHEMA_VERIFICATION, "false")
-    hiveConf.set("datanucleus.schema.autoCreateTables", "true")
-    hiveConf
-  }
-
-}
-
-class HadoopThriftAuthBridgeWithServerContextClassLoader(classloader: ClassLoader)
-  extends HadoopThriftAuthBridge23 {
-
-  override def createServer(
-      keytabFile: String,
-      principalConf: String,
-      clientConf: String): HadoopThriftAuthBridge.Server = {
-    new Server(keytabFile, principalConf, clientConf)
-  }
-
-  class Server(keytabFile: String, principalConf: String, clientConf: String)
-    extends HadoopThriftAuthBridge.Server(keytabFile, principalConf, clientConf) {
-
-    override def wrapProcessor(processor: TProcessor): TProcessor = {
-      new SetThreadContextClassLoaderProcess(super.wrapProcessor(processor))
-    }
-
-  }
-
-  class SetThreadContextClassLoaderProcess(wrapped: TProcessor) extends TProcessor {
-
-    override def process(in: TProtocol, out: TProtocol): Boolean = {
-      val origin = Thread.currentThread().getContextClassLoader
-      try {
-        Thread.currentThread().setContextClassLoader(classloader)
-        wrapped.process(in, out)
-      } finally {
-        Thread.currentThread().setContextClassLoader(origin)
-      }
-    }
-
-  }
-
-}
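
For context, a minimal sketch of how a suite consumes the new trait end to end. It is illustrative only and not part of the patch: `ExampleSecuredHMSSuite` is a hypothetical name, and the `obtainDelegationTokens(owner, creds)` signature and `KyuubiConf()` constructor are assumed from the surrounding Kyuubi APIs rather than shown in this diff.

// Illustrative sketch (not part of the patch): exercising WithSecuredHMSContainer.
package org.apache.kyuubi.credentials

import org.apache.hadoop.io.Text
import org.apache.hadoop.security.{Credentials, UserGroupInformation}

import org.apache.kyuubi.WithSecuredHMSContainer
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.shaded.hive.metastore.conf.MetastoreConf
import org.apache.kyuubi.shaded.hive.metastore.conf.MetastoreConf.ConfVars

class ExampleSecuredHMSSuite extends WithSecuredHMSContainer {

  test("fetch a delegation token from the containerized HMS") {
    tryWithSecurityEnabled {
      // The trait rewrote krb5.conf to point at the container's mapped KDC port and
      // copied the service keytab out of the container, so a plain keytab login works.
      UserGroupInformation.loginUserFromKeytab(testPrincipal, testKeytab)

      val provider = new HiveDelegationTokenProvider
      provider.initialize(hiveConf, KyuubiConf())

      // Collect tokens the way the updated suite does; the exact method signature on
      // HadoopDelegationTokenProvider is assumed here.
      val credentials = new Credentials()
      provider.obtainDelegationTokens(
        UserGroupInformation.getCurrentUser.getShortUserName,
        credentials)

      // The token alias is the metastore URI, i.e. thrift://localhost:<mapped port>.
      val alias = new Text(MetastoreConf.getVar(hiveConf, ConfVars.THRIFT_URIS))
      assert(credentials.getToken(alias) != null)
    }
  }
}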