[KYUUBI-150][FOLLOWUP]using the classLoader in IsolatedClassLoader (#166)
* reset class loader * do not add jar twice in one create function cmd * the classloader should be closeable * the classloader should be closeable revert * Revert "the classloader should be closeable revert" This reverts commit 35602a0b689da0224a4fe3dcd778afabc3f7a803. * add ut * add ut * add ut * fix ut fix #150
This commit is contained in:
parent
13c68b5550
commit
ba9b424110
3
.gitignore
vendored
3
.gitignore
vendored
@ -22,6 +22,9 @@
|
||||
.settings
|
||||
build/apache-maven*
|
||||
build/scala*
|
||||
build/test
|
||||
kyuubi-server/build
|
||||
kyuubi-server/*example*
|
||||
target/
|
||||
dist/
|
||||
kyuubi-*-bin-*
|
||||
|
||||
@ -63,13 +63,24 @@ private[hive] class IsolatedClientLoader(
|
||||
assert(Try(rootClassLoader.loadClass("org.apache.hadoop.hive.conf.HiveConf")).isFailure)
|
||||
|
||||
/**
|
||||
* The classloader that is used to load an isolated version of Hive.
|
||||
* (Kent Yao) Unlike Spark's internals, which use an isolated classloader to support different
|
||||
* Hive versions, Kyuubi believes that Hive 1.2.1 is capable of supporting 1.2 or higher version
|
||||
* Hive metastore servers, and that older Hive client versions are not worth supporting.
|
||||
*
|
||||
* Another reason we disable the isolation here is that Spark doesn't expose authorization
|
||||
* functions in [[HiveClient]], so these methods cannot be invoked across different classloaders.
|
||||
*
|
||||
* Besides, [[HiveClient]] in normal Spark applications is globally one instance, so this
|
||||
* classloader could/should be non-closeable. But in Kyuubi, this is a session level object
|
||||
* associated with one KyuubiSession/SparkSession, thus, this classloader should be closeable to
|
||||
* support class unloading.
|
||||
*
|
||||
* This classloader is a special URLClassLoader that exposes the addURL method.
|
||||
* So, when we add jar, we can add this new jar directly through the addURL method
|
||||
* instead of stacking a new URLClassLoader on top of it.
|
||||
*/
|
||||
private[hive] val classLoader: MutableURLClassLoader = {
|
||||
new NonClosableMutableURLClassLoader(baseClassLoader)
|
||||
new MutableURLClassLoader(Array.empty, baseClassLoader)
|
||||
}
|
||||
|
||||
private[hive] def addJar(path: URL): Unit = {
|
||||
@ -88,7 +99,7 @@ private[hive] class IsolatedClientLoader(
|
||||
sparkConf,
|
||||
hadoopConf,
|
||||
config,
|
||||
baseClassLoader,
|
||||
classLoader,
|
||||
this).asInstanceOf[HiveClientImpl]
|
||||
} else {
|
||||
ctor.newInstance(
|
||||
@ -96,7 +107,7 @@ private[hive] class IsolatedClientLoader(
|
||||
sparkConf,
|
||||
hadoopConf,
|
||||
config,
|
||||
baseClassLoader,
|
||||
classLoader,
|
||||
this).asInstanceOf[HiveClientImpl]
|
||||
}
|
||||
|
||||
|
||||
@ -35,9 +35,10 @@ import org.apache.spark.KyuubiConf._
|
||||
import org.apache.spark.KyuubiSparkUtil
|
||||
import org.apache.spark.scheduler.cluster.KyuubiSparkExecutorUtils
|
||||
import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SparkSQLUtils}
|
||||
import org.apache.spark.sql.catalyst.catalog.FunctionResource
|
||||
import org.apache.spark.sql.catalyst.catalog.{FileResource, FunctionResource, JarResource}
|
||||
import org.apache.spark.sql.catalyst.parser.ParseException
|
||||
import org.apache.spark.sql.execution.command.AddJarCommand
|
||||
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
|
||||
import org.apache.spark.sql.execution.command.{AddFileCommand, AddJarCommand, CreateFunctionCommand}
|
||||
import org.apache.spark.sql.types._
|
||||
|
||||
import yaooqinn.kyuubi.{KyuubiSQLException, Logging}
|
||||
@ -308,7 +309,7 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging
|
||||
}
|
||||
}
|
||||
|
||||
private def localizeAndAndResource(path: String): Unit = try {
|
||||
private def localizeAndAndResource(path: String): Option[String] = try {
|
||||
if (isResourceDownloadable(path)) {
|
||||
val src = new Path(path)
|
||||
val destFileName = src.getName
|
||||
@ -317,12 +318,37 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging
|
||||
val fs = src.getFileSystem(sparkSession.sparkContext.hadoopConfiguration)
|
||||
fs.copyToLocalFile(src, new Path(destFile))
|
||||
FileUtil.chmod(destFile, "ugo+rx", true)
|
||||
AddJarCommand(destFile).run(session.sparkSession)
|
||||
Some(destFile)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
} catch {
|
||||
case e: Exception => throw new KyuubiSQLException(s"Failed to read external resource: $path", e)
|
||||
}
|
||||
|
||||
/**
 * Rewrites a parsed plan so that the resources it references are registered with the session
 * before the plan is executed.
 *
 *  - `CreateFunctionCommand`: every jar/file resource is added to the session here, using the
 *    localized copy when `localizeAndAndResource` produced one and the original uri otherwise.
 *    The command's `resources` field is then cleared in place (via reflection) so the same
 *    resource is not added a second time when the command itself runs.
 *  - `AddJarCommand` / `AddFileCommand`: replaced by a command pointing at the localized copy
 *    when there is one, otherwise returned unchanged.
 *  - Any other plan is returned as is.
 *
 * @param plan the parsed logical plan of the statement
 * @return the plan to execute; for `CreateFunctionCommand` this is the same (mutated) instance
 * @throws KyuubiSQLException if a function resource is neither a jar nor a file, or if
 *                            `localizeAndAndResource` fails to fetch a resource
 */
private[operation] def transform(plan: LogicalPlan): LogicalPlan = plan match {
  case c: CreateFunctionCommand =>
    val resources =
      ReflectUtils.getFieldValue(c, "resources").asInstanceOf[Seq[FunctionResource]]
    resources.foreach {
      // `localizeAndAndResource` yields None for resources it does not download. Those still
      // have to be registered here, because the resource list is emptied below and the
      // command would otherwise run without them.
      case FunctionResource(JarResource, uri) =>
        AddJarCommand(localizeAndAndResource(uri).getOrElse(uri)).run(sparkSession)
      case FunctionResource(FileResource, uri) =>
        AddFileCommand(localizeAndAndResource(uri).getOrElse(uri)).run(sparkSession)
      case o =>
        throw new KyuubiSQLException(s"Resource Type '${o.resourceType}' is not supported.")
    }
    if (resources.isEmpty) {
      c
    } else {
      // Everything was registered above; drop the list so it is not added twice.
      ReflectUtils.setFieldValue(c, "resources", Seq.empty[FunctionResource])
      c
    }
  case a: AddJarCommand => localizeAndAndResource(a.path).map(AddJarCommand).getOrElse(a)
  case a: AddFileCommand => localizeAndAndResource(a.path).map(AddFileCommand).getOrElse(a)
  case _ => plan
}
|
||||
|
||||
private def execute(): Unit = {
|
||||
try {
|
||||
statementId = UUID.randomUUID().toString
|
||||
@ -344,19 +370,7 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging
|
||||
KyuubiSparkUtil.setActiveSparkContext(sparkSession.sparkContext)
|
||||
|
||||
val parsedPlan = SparkSQLUtils.parsePlan(sparkSession, statement)
|
||||
parsedPlan match {
|
||||
case c if c.nodeName == "CreateFunctionCommand" =>
|
||||
val resources =
|
||||
ReflectUtils.getFieldValue(c, "resources").asInstanceOf[Seq[FunctionResource]]
|
||||
resources.foreach { case FunctionResource(_, uri) =>
|
||||
localizeAndAndResource(uri)
|
||||
}
|
||||
case a if a.nodeName == "AddJarCommand" =>
|
||||
val path = ReflectUtils.getFieldValue(a, "path").asInstanceOf[String]
|
||||
localizeAndAndResource(path)
|
||||
case _ =>
|
||||
}
|
||||
result = SparkSQLUtils.toDataFrame(sparkSession, parsedPlan)
|
||||
result = SparkSQLUtils.toDataFrame(sparkSession, transform(parsedPlan))
|
||||
KyuubiServerMonitor.getListener(session.getUserName).foreach {
|
||||
_.onStatementParsed(statementId, result.queryExecution.toString())
|
||||
}
|
||||
|
||||
BIN
kyuubi-server/src/test/resources/example-1.0.0-SNAPSHOT.jar
Normal file
BIN
kyuubi-server/src/test/resources/example-1.0.0-SNAPSHOT.jar
Normal file
Binary file not shown.
@ -26,7 +26,8 @@ import org.apache.spark.{KyuubiSparkUtil, SparkConf}
|
||||
trait SecuredFunSuite {
|
||||
|
||||
var kdc: MiniKdc = null
|
||||
val baseDir = KyuubiSparkUtil.createTempDir("kyuubi-kdc")
|
||||
val baseDir = KyuubiSparkUtil.createTempDir(
|
||||
this.getClass.getProtectionDomain.getCodeSource.getLocation.getPath, "kyuubi-kdc")
|
||||
try {
|
||||
val kdcConf = MiniKdc.createConf()
|
||||
kdcConf.setProperty(MiniKdc.INSTANCE, "KyuubiKrbServer")
|
||||
|
||||
@ -23,14 +23,21 @@ import org.apache.hadoop.security.UserGroupInformation
|
||||
import org.apache.hive.service.cli.thrift.TProtocolVersion
|
||||
import org.apache.spark.{KyuubiSparkUtil, SparkConf, SparkContext, SparkFunSuite}
|
||||
import org.apache.spark.sql.SparkSession
|
||||
import org.apache.spark.sql.catalyst.catalog.FunctionResource
|
||||
import org.apache.spark.sql.execution.SparkSqlParser
|
||||
import org.apache.spark.sql.execution.command.CreateFunctionCommand
|
||||
import org.apache.spark.sql.internal.SQLConf
|
||||
import org.mockito.Mockito.when
|
||||
import org.scalatest.mock.MockitoSugar
|
||||
|
||||
import yaooqinn.kyuubi.KyuubiSQLException
|
||||
import yaooqinn.kyuubi.cli.FetchOrientation.FETCH_NEXT
|
||||
import yaooqinn.kyuubi.schema.ColumnBasedSet
|
||||
import yaooqinn.kyuubi.session.{KyuubiSession, SessionManager}
|
||||
import yaooqinn.kyuubi.spark.SparkSessionWithUGI
|
||||
import yaooqinn.kyuubi.utils.ReflectUtils
|
||||
|
||||
class KyuubiOperationSuite extends SparkFunSuite {
|
||||
class KyuubiOperationSuite extends SparkFunSuite with MockitoSugar {
|
||||
|
||||
val conf = new SparkConf(loadDefaults = true).setAppName("operation test")
|
||||
KyuubiSparkUtil.setupCommonConfig(conf)
|
||||
@ -151,4 +158,36 @@ class KyuubiOperationSuite extends SparkFunSuite {
|
||||
assert(!KyuubiOperation.isResourceDownloadable("file://a/b/c.jar"))
|
||||
assert(!KyuubiOperation.isResourceDownloadable("dfs://a/b/c.jar"))
|
||||
}
|
||||
|
||||
test("transform plan") {
  val operation = sessionMgr.getOperationMgr.newExecuteStatementOperation(session, statement)
  val sqlParser = new SparkSqlParser(new SQLConf)

  // Parses `sql`, expects transform to reject the plan, and returns the error message.
  def transformFailure(sql: String): String = {
    val parsed = sqlParser.parsePlan(sql)
    intercept[KyuubiSQLException](operation.transform(parsed)).getMessage
  }

  // A function without resources passes through untouched.
  val noResourcePlan = sqlParser.parsePlan("create temporary function a as 'a.b.c'")
  assert(operation.transform(noResourcePlan) === noResourcePlan)

  // Remote file resource that cannot be fetched.
  val fileFailure = transformFailure(
    "create temporary function a as 'a.b.c' using file 'hdfs://a/b/c.jar'")
  assert(fileFailure.startsWith("Failed to read external resource"))

  // Remote jar resource that cannot be fetched.
  val jarPlan = sqlParser.parsePlan(
    "create temporary function a as 'a.b.c' using jar 'hdfs://a/b/c.jar'")
  val jarFailure = intercept[KyuubiSQLException](operation.transform(jarPlan)).getMessage
  assert(jarFailure.startsWith("Failed to read external resource"))

  // A mocked, non-empty resource list: nothing is fetched, but the list must be replaced.
  val mockedResources = mock[Seq[FunctionResource]]
  when(mockedResources.isEmpty).thenReturn(false)
  val mockedCommand =
    jarPlan.asInstanceOf[CreateFunctionCommand].copy(resources = mockedResources)
  val transformed = operation.transform(mockedCommand)
  assert(transformed === mockedCommand)
  assert(transformed.asInstanceOf[CreateFunctionCommand].resources !== mockedResources)

  // Archive resources are rejected outright.
  val archiveFailure = transformFailure(
    "create temporary function a as 'a.b.c' using archive 'hdfs://a/b/c.jar'")
  assert(archiveFailure.startsWith("Resource Type"))
}
|
||||
}
|
||||
|
||||
@ -0,0 +1,71 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package yaooqinn.kyuubi.operation
|
||||
|
||||
import java.io.File
|
||||
|
||||
import org.apache.hadoop.fs.Path
|
||||
import org.apache.hadoop.hdfs.{HdfsConfiguration, MiniDFSCluster}
|
||||
import org.apache.spark.sql.catalyst.catalog.FunctionResource
|
||||
import org.apache.spark.sql.execution.SparkSqlParser
|
||||
import org.apache.spark.sql.internal.SQLConf
|
||||
|
||||
import yaooqinn.kyuubi.utils.ReflectUtils
|
||||
|
||||
/**
 * Re-runs the [[KyuubiOperationSuite]] tests with a [[MiniDFSCluster]] available and adds a
 * check that `transform` accepts function resources stored on that cluster.
 *
 * The cluster is started while the suite is constructed; the example jar is uploaded to the
 * cluster's home directory in `beforeAll` and removed again in `afterAll`.
 */
class KyuubiOperationWithHDFSSuite extends KyuubiOperationSuite {
  val hdfsConf = new HdfsConfiguration
  var cluster: MiniDFSCluster = new MiniDFSCluster.Builder(hdfsConf).build()
  cluster.waitClusterUp()
  val fs = cluster.getFileSystem
  val homeDirectory: Path = fs.getHomeDirectory
  private val fileName = "example-1.0.0-SNAPSHOT.jar"
  private val remoteUDFFile = new Path(homeDirectory, fileName)

  /** Uploads the example jar found next to the compiled test classes, then runs the parent setup. */
  override def beforeAll(): Unit = {
    val file = new File(this.getClass.getProtectionDomain.getCodeSource.getLocation + fileName)
    val localUDFFile = new Path(file.getPath)
    fs.copyFromLocalFile(localUDFFile, remoteUDFFile)
    super.beforeAll()
  }

  /**
   * Removes the uploaded jar and shuts the cluster down. Each step runs in a `finally` so that a
   * failure in an earlier step cannot leave the file system open, the cluster running, or the
   * parent teardown skipped.
   */
  override def afterAll(): Unit = {
    try {
      fs.delete(remoteUDFFile, true)
    } finally {
      try {
        fs.close()
      } finally {
        try {
          cluster.shutdown()
        } finally {
          super.afterAll()
        }
      }
    }
  }

  test("transform logical plan") {
    val op = sessionMgr.getOperationMgr.newExecuteStatementOperation(session, statement)
    val parser = new SparkSqlParser(new SQLConf)

    // File resource: transform returns the same command with its resource list emptied.
    val plan0 = parser.parsePlan(
      s"create temporary function a as 'a.b.c' using file '$remoteUDFFile'")
    val plan1 = op.transform(plan0)
    assert(plan0 === plan1)
    assert(
      ReflectUtils.getFieldValue(plan1, "resources").asInstanceOf[Seq[FunctionResource]].isEmpty)

    // Jar resource: same expectation.
    val plan2 = parser.parsePlan(
      s"create temporary function a as 'a.b.c' using jar '$remoteUDFFile'")
    val plan3 = op.transform(plan2)
    assert(plan3 === plan2)
    assert(
      ReflectUtils.getFieldValue(plan3, "resources").asInstanceOf[Seq[FunctionResource]].isEmpty)
  }

}
|
||||
@ -101,7 +101,8 @@ class KyuubiServerSuite extends SparkFunSuite with BeforeAndAfterEach {
|
||||
test("disable fs caches for secured cluster") {
|
||||
|
||||
var kdc: MiniKdc = null
|
||||
val baseDir = KyuubiSparkUtil.createTempDir(namePrefix = "kyuubi-kdc")
|
||||
val baseDir = KyuubiSparkUtil.createTempDir(
|
||||
this.getClass.getProtectionDomain.getCodeSource.getLocation.getPath, "kyuubi-kdc")
|
||||
try {
|
||||
val kdcConf = MiniKdc.createConf()
|
||||
kdcConf.setProperty(MiniKdc.INSTANCE, "KyuubiKrbServer")
|
||||
|
||||
@ -17,10 +17,10 @@
|
||||
|
||||
package yaooqinn.kyuubi.utils
|
||||
|
||||
import org.apache.hadoop.conf.Configuration
|
||||
import org.apache.hadoop.security.UserGroupInformation
|
||||
import org.apache.hadoop.yarn.api.records.{ApplicationId, ContainerLaunchContext, Resource, YarnApplicationState}
|
||||
import org.apache.hadoop.yarn.client.api.YarnClient
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration
|
||||
import org.apache.hadoop.yarn.server.MiniYARNCluster
|
||||
import org.apache.hadoop.yarn.util.Records
|
||||
import org.apache.spark.SparkFunSuite
|
||||
@ -28,26 +28,27 @@ import org.scalatest.BeforeAndAfterEach
|
||||
|
||||
class KyuubiHadoopUtilSuite extends SparkFunSuite with BeforeAndAfterEach {
|
||||
|
||||
private var cluster: MiniYARNCluster = _
|
||||
private val cluster: MiniYARNCluster =
|
||||
new MiniYARNCluster(this.getClass.getSimpleName, 1, 1, 1, 1)
|
||||
private val yarnClient = YarnClient.createYarnClient()
|
||||
|
||||
override def beforeAll(): Unit = {
|
||||
cluster = new MiniYARNCluster(this.getClass.getSimpleName, 1, 1, 1, 1)
|
||||
val hadoopConf = new Configuration()
|
||||
cluster.init(hadoopConf)
|
||||
val yarnConf = new YarnConfiguration()
|
||||
yarnConf.set(YarnConfiguration.IS_MINI_YARN_CLUSTER, "true")
|
||||
cluster.init(yarnConf)
|
||||
cluster.start()
|
||||
yarnClient.init(hadoopConf)
|
||||
yarnClient.init(yarnConf)
|
||||
yarnClient.start()
|
||||
super.beforeAll()
|
||||
}
|
||||
|
||||
override def afterAll(): Unit = {
|
||||
yarnClient.stop()
|
||||
cluster.stop()
|
||||
super.afterAll()
|
||||
}
|
||||
|
||||
override def beforeEach(): Unit = {
|
||||
|
||||
super.beforeEach()
|
||||
}
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user