KinitAuxiliaryService added
This commit is contained in:
parent
ba9bff1487
commit
1a4299f8a1
@ -1,4 +1,4 @@
|
||||
# Kyuubi [](https://www.apache.org/licenses/LICENSE-2.0.html) [](https://github.com/yaooqinn/kyuubi) [](https://github.com/yaooqinn/kyuubi/releases) [](https://codecov.io/gh/yaooqinn/kyuubi) [](https://travis-ci.org/yaooqinn/kyuubi) [](http://hits.dwyl.io/yaooqinn/kyuubi)
|
||||
# Kyuubi [](https://www.apache.org/licenses/LICENSE-2.0.html) [](https://github.com/yaooqinn/kyuubi) [](https://github.com/yaooqinn/kyuubi/releases) [](https://codecov.io/gh/yaooqinn/kyuubi) [](https://travis-ci.org/yaooqinn/kyuubi) [](http://hits.dwyl.io/yaooqinn/kyuubi) [](https://depshield.github.io)
|
||||
|
||||
<img style="zoom: 0.3141592653589" src="docs/imgs/kyuubi.png" />
|
||||
|
||||
|
||||
@ -152,17 +152,30 @@ object KyuubiConf {
|
||||
.createWithDefault("embedded_zookeeper")
|
||||
|
||||
val SERVER_PRINCIPAL: OptionalConfigEntry[String] = buildConf("server.principal")
|
||||
.doc("")
|
||||
.doc("Name of the Kerberos principal.")
|
||||
.version("1.0.0")
|
||||
.stringConf
|
||||
.createOptional
|
||||
|
||||
val SERVER_KEYTAB: OptionalConfigEntry[String] = buildConf("server.keytab")
|
||||
.doc("")
|
||||
.doc("Location of Kyuubi server's keytab.")
|
||||
.version("1.0.0")
|
||||
.stringConf
|
||||
.createOptional
|
||||
|
||||
val KINIT_INTERVAL: ConfigEntry[Long] = buildConf("kinit.interval")
|
||||
.doc("How often will Kyuubi server run `kinit -kt [keytab] [princical]` to renew the" +
|
||||
" local Kerberos credentials cache")
|
||||
.version("1.0.0")
|
||||
.timeConf
|
||||
.createWithDefaultString("PT1H")
|
||||
|
||||
val KINIT_MAX_ATTEMPTS: ConfigEntry[Int] = buildConf("kinit.max.attempts")
|
||||
.doc("How many times will `kinit` process retry")
|
||||
.version("1.0.0")
|
||||
.intConf
|
||||
.createWithDefault(10)
|
||||
|
||||
val OPERATION_IDLE_TIMEOUT: ConfigEntry[Long] = buildConf("operation.idle.timeout")
|
||||
.doc("Operation will be closed when it's not accessed for this duration of time")
|
||||
.version("1.0.0")
|
||||
|
||||
@ -0,0 +1,86 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.kyuubi.service
|
||||
|
||||
import java.util.concurrent.{Callable, TimeUnit}
|
||||
|
||||
import org.apache.hadoop.security.UserGroupInformation
|
||||
|
||||
import org.apache.kyuubi.config.KyuubiConf
|
||||
import org.apache.kyuubi.util.ThreadUtils
|
||||
|
||||
class KinitAuxiliaryService() extends AbstractService("KinitAuxiliaryService") {
|
||||
|
||||
private val executor = ThreadUtils.newDaemonSingleThreadScheduledExecutor(getName)
|
||||
|
||||
private var kinitInterval: Long = _
|
||||
private var kinitMaxAttempts: Int = _
|
||||
@volatile private var kinitAttempts: Int = _
|
||||
|
||||
private var kinitTask: Runnable = _
|
||||
|
||||
override def initialize(conf: KyuubiConf): Unit = {
|
||||
if (UserGroupInformation.isSecurityEnabled) {
|
||||
val keytab = conf.get(KyuubiConf.SERVER_KEYTAB)
|
||||
val principal = conf.get(KyuubiConf.SERVER_PRINCIPAL)
|
||||
kinitInterval = conf.get(KyuubiConf.KINIT_INTERVAL)
|
||||
kinitMaxAttempts = conf.get(KyuubiConf.KINIT_MAX_ATTEMPTS)
|
||||
|
||||
require(keytab.nonEmpty && principal.nonEmpty, "principal or keytab is missing")
|
||||
UserGroupInformation.loginUserFromKeytab(principal.get, keytab.get)
|
||||
val commands = Seq("kinit", "-kt", keytab.get, principal.get)
|
||||
val kinitProc = new ProcessBuilder(commands: _*).inheritIO()
|
||||
kinitTask = new Runnable {
|
||||
override def run(): Unit = {
|
||||
val process = kinitProc.start()
|
||||
if (process.waitFor() == 0) {
|
||||
debug("Successfully kinited")
|
||||
executor.schedule(this, kinitInterval, TimeUnit.MILLISECONDS)
|
||||
} else {
|
||||
if (kinitAttempts >= kinitMaxAttempts) {
|
||||
error(s"Failed to kinit with $kinitAttempts attempts, will exit...")
|
||||
System.exit(-1)
|
||||
}
|
||||
kinitAttempts += 1
|
||||
executor.submit(this)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
super.initialize(conf)
|
||||
}
|
||||
|
||||
|
||||
override def start(): Unit = {
|
||||
super.start()
|
||||
if (UserGroupInformation.isSecurityEnabled) {
|
||||
executor.schedule(kinitTask, kinitInterval, TimeUnit.MILLISECONDS)
|
||||
}
|
||||
}
|
||||
|
||||
override def stop(): Unit = {
|
||||
super.stop()
|
||||
executor.shutdown()
|
||||
try {
|
||||
executor.awaitTermination(10, TimeUnit.SECONDS)
|
||||
} catch {
|
||||
case _: InterruptedException =>
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -23,39 +23,57 @@ import org.apache.hadoop.conf.Configuration
|
||||
import org.apache.hadoop.minikdc.MiniKdc
|
||||
import org.apache.hadoop.security.UserGroupInformation
|
||||
|
||||
trait KerberizedTestHelper {
|
||||
var kdc: MiniKdc = _
|
||||
trait KerberizedTestHelper extends KyuubiFunSuite {
|
||||
val baseDir: File = Utils.createTempDir(
|
||||
this.getClass.getProtectionDomain.getCodeSource.getLocation.getPath, "kyuubi-kdc").toFile
|
||||
|
||||
try {
|
||||
val kdcConf = MiniKdc.createConf()
|
||||
val hostName = "localhost"
|
||||
kdcConf.setProperty(MiniKdc.INSTANCE, "KyuubiKrbServer")
|
||||
kdcConf.setProperty(MiniKdc.ORG_NAME, "KYUUBI")
|
||||
kdcConf.setProperty(MiniKdc.ORG_DOMAIN, "COM")
|
||||
|
||||
if (kdc == null) {
|
||||
kdc = new MiniKdc(kdcConf, baseDir)
|
||||
kdc.start()
|
||||
kdcConf.setProperty(MiniKdc.KDC_BIND_ADDRESS, hostName)
|
||||
if (logger.isDebugEnabled) {
|
||||
kdcConf.setProperty(MiniKdc.DEBUG, "true")
|
||||
}
|
||||
val kdc = new MiniKdc(kdcConf, baseDir)
|
||||
try {
|
||||
kdc.start()
|
||||
} catch {
|
||||
case e: IOException =>
|
||||
throw new AssertionError("unable to create temporary directory: " + e.getMessage)
|
||||
}
|
||||
private val keytabFile = new File(baseDir, "kyuubi-test.keytab")
|
||||
|
||||
protected val testKeytab: String = keytabFile.getAbsolutePath
|
||||
|
||||
protected var testPrincipal = s"client/$hostName"
|
||||
kdc.createPrincipal(keytabFile, testPrincipal)
|
||||
|
||||
testPrincipal = testPrincipal + "@" + kdc.getRealm
|
||||
|
||||
info(s"KerberizedTest Principal: $testPrincipal")
|
||||
info(s"KerberizedTest Keytab: $testKeytab")
|
||||
|
||||
override def afterAll(): Unit = {
|
||||
kdc.stop()
|
||||
super.afterAll()
|
||||
}
|
||||
|
||||
def tryWithSecurityEnabled(block: => Unit): Unit = {
|
||||
val conf = new Configuration()
|
||||
assert(!UserGroupInformation.isSecurityEnabled)
|
||||
val currentUser = UserGroupInformation.getCurrentUser
|
||||
val authType = "hadoop.security.authentication"
|
||||
try {
|
||||
conf.set(authType, "KERBEROS")
|
||||
System.setProperty("java.security.krb5.realm", kdc.getRealm)
|
||||
System.setProperty("java.security.krb5.conf", kdc.getKrb5conf.getAbsolutePath)
|
||||
UserGroupInformation.setConfiguration(conf)
|
||||
assert(UserGroupInformation.isSecurityEnabled)
|
||||
block
|
||||
} finally {
|
||||
conf.unset(authType)
|
||||
System.clearProperty("java.security.krb5.realm")
|
||||
System.clearProperty("java.security.krb5.conf")
|
||||
UserGroupInformation.setLoginUser(currentUser)
|
||||
UserGroupInformation.setConfiguration(conf)
|
||||
assert(!UserGroupInformation.isSecurityEnabled)
|
||||
}
|
||||
|
||||
@ -0,0 +1,58 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.kyuubi.service
|
||||
|
||||
import org.apache.hadoop.security.UserGroupInformation
|
||||
|
||||
import org.apache.kyuubi.KerberizedTestHelper
|
||||
import org.apache.kyuubi.config.KyuubiConf
|
||||
|
||||
class KinitAuxiliaryServiceSuite extends KerberizedTestHelper {
|
||||
|
||||
test("non secured kinit service") {
|
||||
val service = new KinitAuxiliaryService()
|
||||
assert(service.getServiceState === ServiceState.LATENT)
|
||||
service.initialize(KyuubiConf())
|
||||
assert(service.getServiceState === ServiceState.INITIALIZED)
|
||||
service.start()
|
||||
assert(service.getServiceState === ServiceState.STARTED)
|
||||
service.stop()
|
||||
assert(service.getServiceState === ServiceState.STOPPED)
|
||||
}
|
||||
|
||||
test("secured kinit service") {
|
||||
tryWithSecurityEnabled {
|
||||
val service = new KinitAuxiliaryService()
|
||||
val conf = KyuubiConf()
|
||||
val e = intercept[IllegalArgumentException](service.initialize(conf))
|
||||
assert(e.getMessage === "requirement failed: principal or keytab is missing")
|
||||
conf.set(KyuubiConf.SERVER_PRINCIPAL, testPrincipal)
|
||||
.set(KyuubiConf.SERVER_KEYTAB, testKeytab)
|
||||
.set(KyuubiConf.KINIT_INTERVAL, 0L)
|
||||
service.initialize(conf)
|
||||
assert(UserGroupInformation.getCurrentUser.hasKerberosCredentials)
|
||||
assert(UserGroupInformation.getCurrentUser.isFromKeytab)
|
||||
service.start()
|
||||
assert(service.getServiceState === ServiceState.STARTED)
|
||||
service.stop()
|
||||
assert(service.getServiceState === ServiceState.STOPPED)
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@ -27,7 +27,7 @@ import org.apache.kyuubi.{KerberizedTestHelper, KYUUBI_VERSION, KyuubiFunSuite}
|
||||
import org.apache.kyuubi.config.KyuubiConf
|
||||
import org.apache.kyuubi.ha.HighAvailabilityConf._
|
||||
import org.apache.kyuubi.ha.server.EmbeddedZkServer
|
||||
import org.apache.kyuubi.service.{Serverable, ServiceState}
|
||||
import org.apache.kyuubi.service.{NoopServer, Serverable, ServiceState}
|
||||
|
||||
class ServiceDiscoverySuite extends KyuubiFunSuite with KerberizedTestHelper {
|
||||
val zkServer = new EmbeddedZkServer()
|
||||
@ -74,21 +74,15 @@ class ServiceDiscoverySuite extends KyuubiFunSuite with KerberizedTestHelper {
|
||||
}
|
||||
|
||||
test("publish instance to embedded zookeeper server") {
|
||||
var deleted = false
|
||||
val instance = "kentyao.apache.org:10009"
|
||||
|
||||
conf
|
||||
.unset(KyuubiConf.SERVER_KEYTAB)
|
||||
.unset(KyuubiConf.SERVER_PRINCIPAL)
|
||||
.set(HA_ZK_QUORUM, zkServer.getConnectString)
|
||||
|
||||
val server: Serverable = new Serverable("test") {
|
||||
override private[kyuubi] val backendService = null
|
||||
|
||||
override protected def stopServer(): Unit = { deleted = true }
|
||||
|
||||
override def connectionUrl: String = instance
|
||||
}
|
||||
val server: Serverable = new NoopServer()
|
||||
server.initialize(conf)
|
||||
server.start()
|
||||
|
||||
val namespace = "kyuubiserver"
|
||||
val znodeRoot = s"/$namespace"
|
||||
@ -102,15 +96,16 @@ class ServiceDiscoverySuite extends KyuubiFunSuite with KerberizedTestHelper {
|
||||
assert(framework.checkExists().forPath(znodeRoot) !== null)
|
||||
val children = framework.getChildren.forPath(znodeRoot).asScala
|
||||
assert(children.head ===
|
||||
s"serviceUri=$instance;version=$KYUUBI_VERSION;sequence=0000000000")
|
||||
s"serviceUri=${server.connectionUrl};version=$KYUUBI_VERSION;sequence=0000000000")
|
||||
|
||||
children.foreach { child =>
|
||||
framework.delete().forPath(s"""$znodeRoot/$child""")
|
||||
}
|
||||
Thread.sleep(5000)
|
||||
assert(deleted, "Post hook called")
|
||||
assert(serviceDiscovery.getServiceState === ServiceState.STOPPED)
|
||||
assert(server.getServiceState === ServiceState.STOPPED)
|
||||
} finally {
|
||||
server.stop()
|
||||
serviceDiscovery.stop()
|
||||
framework.close()
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user