fix #259 engine scope

Squashed commit of the following:

commit bb598b101da43b145bd4c95976e8decd39a02210
Merge: 54e730c c4cf523
Author: zen <xinjingziranchan@gmail.com>
Date:   Thu Dec 24 10:53:42 2020 +0800

    Merge remote-tracking branch 'origin/pr2-engine-scope' into pr2-engine-scope

    # Conflicts:
    #	externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala

commit 54e730c5b26ffb147e5c1266bdd036460cc20d82
Merge: 62d1575 512fbc5
Author: zen <xinjingziranchan@gmail.com>
Date:   Thu Dec 24 10:51:21 2020 +0800

    Merge branch 'master' into pr2-engine-scope

    # Conflicts:
    #	externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala

commit c4cf523dc5bc409ea7ac742292d5e6e2d6421b8a
Author: zen <xinjingziranchan@gmail.com>
Date:   Mon Dec 21 23:10:12 2020 +0800

    merge master and fix the conflicts

commit c53b166f1eb3e8a698f498e1d123c815cef5d665
Merge: 62d1575 e8bc44c
Author: zen <xinjingziranchan@gmail.com>
Date:   Mon Dec 21 23:02:26 2020 +0800

    Merge branch 'master' into pr2-engine-scope

    # Conflicts:
    #	externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala

commit 62d15751ea6b213319501bcfac442d21d0cd5ca6
Author: zen <xinjingziranchan@gmail.com>
Date:   Thu Dec 10 15:40:48 2020 +0800

    fix zkNamespace

commit 7e3e432318ed931b1ad16c839c4a027e00bc6ef9
Author: zen <xinjingziranchan@gmail.com>
Date:   Thu Dec 10 14:34:34 2020 +0800

    Support for configuring the session scope of SparkSQLEngine.
This commit is contained in:
Zen 2020-12-24 19:02:40 +08:00 committed by Kent Yao
parent 512fbc50cf
commit 7a79b16cdf
No known key found for this signature in database
GPG Key ID: A4F0BE81C89B595B
6 changed files with 223 additions and 4 deletions

View File

@ -25,6 +25,7 @@ import org.apache.spark.sql.SparkSession
import org.apache.kyuubi.{Logging, Utils}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.engine.{EngineAppName, EngineScope}
import org.apache.kyuubi.engine.spark.SparkSQLEngine.countDownLatch
import org.apache.kyuubi.ha.HighAvailabilityConf._
import org.apache.kyuubi.ha.client.{RetryPolicies, ServiceDiscovery}
@ -38,6 +39,11 @@ private[spark] final class SparkSQLEngine(name: String, spark: SparkSession)
override private[kyuubi] val backendService = new SparkSQLBackendService(spark)
val appName: EngineAppName = EngineAppName.parseAppName(
spark.conf.get(EngineAppName.SPARK_APP_NAME_KEY),
SparkSQLEngine.kyuubiConf
)
override protected def stopServer(): Unit = {
countDownLatch.countDown()
spark.stop()
@ -59,7 +65,7 @@ object SparkSQLEngine extends Logging {
val appName = s"kyuubi_${user}_spark_${Instant.now}"
sparkConf.setAppName(appName)
sparkConf.setIfMissing(EngineAppName.SPARK_APP_NAME_KEY, appName)
kyuubiConf.setIfMissing(KyuubiConf.FRONTEND_BIND_PORT, 0)
kyuubiConf.setIfMissing(HA_ZK_CONN_RETRY_POLICY, RetryPolicies.N_TIME.toString)
@ -93,10 +99,22 @@ object SparkSQLEngine extends Logging {
val needExpose = kyuubiConf.get(HA_ZK_QUORUM).nonEmpty
if (needExpose) {
val zkNamespacePrefix = kyuubiConf.get(HA_ZK_NAMESPACE)
val serviceDiscovery = new ServiceDiscovery(engine, s"$zkNamespacePrefix-$user")
val zkNamespace = engine.appName.makeZkPath(zkNamespacePrefix)
val serviceDiscovery = new ServiceDiscovery(engine, zkNamespace.substring(1))
serviceDiscovery.initialize(kyuubiConf)
serviceDiscovery.start()
sys.addShutdownHook(serviceDiscovery.stop())
sys.addShutdownHook({
serviceDiscovery.stop()
if (EngineScope.SESSION.equals(engine.appName.getEngineScope)) {
val zkClient = ServiceDiscovery.startZookeeperClient(kyuubiConf)
try {
info(s"Deleting engine service's namespace: $zkNamespace")
zkClient.delete().forPath(zkNamespace)
} finally {
zkClient.close()
}
}
})
}
}

View File

@ -24,6 +24,7 @@ import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._
import org.apache.kyuubi.{Logging, Utils}
import org.apache.kyuubi.engine.EngineScope
import org.apache.kyuubi.service.authentication.{AuthTypes, SaslQOP}
case class KyuubiConf(loadSysDefault: Boolean = true) extends Logging {
@ -471,4 +472,16 @@ object KyuubiConf {
.version("1.0.0")
.timeConf
.createWithDefault(Duration.ofSeconds(5).toMillis)
val ENGINE_SCOPE: ConfigEntry[String] = buildConf("session.engine.scope")
.doc("The engine session scope.<ul>" +
" <li>S: One engine per kyuubi session in kyuubi cluster.</li>" +
" <li>U: One engine per user in kyuubi cluster.</li>" +
" <li>G: One engine per group in kyuubi cluster.</li>" +
" <li>K: One engine per kyuubi server in kyuubi cluster.</li></ul>")
.version("1.0.0")
.stringConf
.transform(_.toUpperCase(Locale.ROOT))
.checkValues(EngineScope.values.map(_.toString))
.createWithDefault(EngineScope.USER.toString)
}

View File

@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine
import org.apache.curator.utils.ZKPaths
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.ENGINE_SCOPE
import org.apache.kyuubi.engine.EngineScope.{EngineScope, SESSION}
class EngineAppName(user: String, sessionId: String, conf: KyuubiConf) {
import EngineAppName._
private val engineScope = EngineScope.withName(conf.get(ENGINE_SCOPE))
def getEngineScope: EngineScope = engineScope
/**
* kyuubi_[KGUS]_user_sessionid
*
* @return engine app name
*/
def generateAppName(): String = {
StringBuilder.newBuilder.append(APP_NAME_PREFIX)
.append(DELIMITER).append(engineScope)
.append(DELIMITER).append(user)
.append(DELIMITER).append(sessionId).mkString
}
/**
* if engine scope is [S] return
* /[zkNamespace]-engine/S/[user]/[sessionId]
* else return
* /[zkNamespace]-engine/[engineScope]/[user]
*
* @param zkNamespace zk root path
* @return engine zk path
*/
def makeZkPath(zkNamespace: String): String = {
val namespace = zkNamespace + "-" + ZK_NAMESPACE_SUFFIX
engineScope match {
case SESSION =>
ZKPaths.makePath(namespace, engineScope.toString, user, sessionId)
case _ =>
ZKPaths.makePath(namespace, engineScope.toString, user)
}
}
}
object EngineAppName {
private val APP_NAME_PREFIX = "kyuubi"
private val ZK_NAMESPACE_SUFFIX = "engine"
private val DELIMITER = "_"
val SPARK_APP_NAME_KEY = "spark.app.name"
def apply(user: String, sessionId: String, conf: KyuubiConf): EngineAppName =
new EngineAppName(user, sessionId, conf)
def parseAppName(appName: String, conf: KyuubiConf): EngineAppName = {
val params = appName.split(DELIMITER)
val clone = conf.clone
clone.set(ENGINE_SCOPE, params(1))
EngineAppName(params(2), params(3), conf)
}
}

View File

@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine
object EngineScope extends Enumeration {
type EngineScope = Value
val SESSION = Value("S")
val USER = Value("U")
val GROUP = Value("G")
val SERVER = Value("K")
}

View File

@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine
import org.apache.kyuubi.KyuubiFunSuite
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.engine.EngineScope.EngineScope
class EngineAppNameSuite extends KyuubiFunSuite {
private val kyuubiConf: KyuubiConf = KyuubiConf()
private val zkNamespace: String = "kyuubi"
private val user: String = "hive"
private val handle: String = "a9938028-667d-4006-993e-0bdb5a14ae91"
override def beforeAll(): Unit = {
super.beforeAll()
}
test("SparkSQLEngineAppName") {
// SESSION SCOPE
val sessionScopeAppName = "kyuubi_S_hive_a9938028-667d-4006-993e-0bdb5a14ae91"
val sessionScopeZkPath = "/kyuubi-engine/S/hive/a9938028-667d-4006-993e-0bdb5a14ae91"
checkAppNameAndZkPath(EngineScope.SESSION, sessionScopeAppName, sessionScopeZkPath)
// USER SCOPE
val userScopeAppName = "kyuubi_U_hive_a9938028-667d-4006-993e-0bdb5a14ae91"
val userScopeZkPath = "/kyuubi-engine/U/hive"
checkAppNameAndZkPath(EngineScope.USER, userScopeAppName, userScopeZkPath)
// GROUP SCOPE
val groupScopeAppName = "kyuubi_G_hive_a9938028-667d-4006-993e-0bdb5a14ae91"
val groupScopeZkPath = "/kyuubi-engine/G/hive"
checkAppNameAndZkPath(EngineScope.GROUP, groupScopeAppName, groupScopeZkPath)
// SERVER SCOPE
val serverScopeAppName = "kyuubi_K_hive_a9938028-667d-4006-993e-0bdb5a14ae91"
val serverScopeZkPath = "/kyuubi-engine/K/hive"
checkAppNameAndZkPath(EngineScope.SERVER, serverScopeAppName, serverScopeZkPath)
}
private def checkAppNameAndZkPath(scope: EngineScope,
expectAppName: String, expectZkPath: String): Unit = {
kyuubiConf.set(KyuubiConf.ENGINE_SCOPE, scope.toString)
val engine = EngineAppName(user, handle, kyuubiConf.getUserDefaults(user))
assert(engine.generateAppName() === expectAppName)
assert(engine.makeZkPath(zkNamespace) === expectZkPath)
val zkPath = EngineAppName.parseAppName(expectAppName, kyuubiConf).makeZkPath(zkNamespace)
assert(zkPath === expectZkPath)
}
}

View File

@ -32,6 +32,7 @@ import org.apache.thrift.transport.{TSocket, TTransport}
import org.apache.kyuubi._
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.engine.EngineAppName
import org.apache.kyuubi.engine.spark.SparkProcessBuilder
import org.apache.kyuubi.ha.client.ServiceDiscovery
import org.apache.kyuubi.service.authentication.PlainSASLHelper
@ -54,10 +55,12 @@ class KyuubiSessionImpl(
case ("use:database", _) =>
case (key, value) => sessionConf.set(key, value)
}
sessionConf.set(EngineAppName.SPARK_APP_NAME_KEY, engineAppName.generateAppName())
}
private val engineAppName = EngineAppName(user, handle.identifier.toString, sessionConf)
private val timeout: Long = sessionConf.get(ENGINE_INIT_TIMEOUT)
private val zkNamespace = s"$zkNamespacePrefix-$user"
private val zkNamespace = engineAppName.makeZkPath(zkNamespacePrefix)
private val zkPath = ZKPaths.makePath(null, zkNamespace)
private lazy val zkClient = ServiceDiscovery.startZookeeperClient(sessionConf)