diff --git a/.gitignore b/.gitignore index ecd2d19cd..0e8c42d82 100644 --- a/.gitignore +++ b/.gitignore @@ -22,6 +22,9 @@ .settings build/apache-maven* build/scala* +build/test +kyuubi-server/build +kyuubi-server/*example* target/ dist/ kyuubi-*-bin-* diff --git a/kyuubi-server/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/kyuubi-server/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala index 616a29797..f37cd6031 100644 --- a/kyuubi-server/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala +++ b/kyuubi-server/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala @@ -63,13 +63,24 @@ private[hive] class IsolatedClientLoader( assert(Try(rootClassLoader.loadClass("org.apache.hadoop.hive.conf.HiveConf")).isFailure) /** - * The classloader that is used to load an isolated version of Hive. + * (Kent Yao) Unlike Spark, which uses an isolated classloader to support different Hive + * versions, Kyuubi assumes that the Hive 1.2.1 client can work with Hive metastore servers of + * version 1.2 or higher, and that older Hive client versions are not worth supporting. + * + * Another reason for disabling the isolation is that Spark does not expose authorization + * functions in [[HiveClient]], so those methods cannot be invoked across different classloaders. + * + * Besides, [[HiveClient]] in normal Spark applications is a single global instance, so this + * classloader could/should be non-closeable. But in Kyuubi, it is a session-level object + * associated with one KyuubiSession/SparkSession; thus, this classloader should be closeable to + * support class unloading. + * + * This classloader is a special URLClassLoader that exposes the addURL method. * So, when we add jar, we can add this new jar directly through the addURL method * instead of stacking a new URLClassLoader on top of it. 
*/ private[hive] val classLoader: MutableURLClassLoader = { - new NonClosableMutableURLClassLoader(baseClassLoader) + new MutableURLClassLoader(Array.empty, baseClassLoader) } private[hive] def addJar(path: URL): Unit = { @@ -88,7 +99,7 @@ private[hive] class IsolatedClientLoader( sparkConf, hadoopConf, config, - baseClassLoader, + classLoader, this).asInstanceOf[HiveClientImpl] } else { ctor.newInstance( @@ -96,7 +107,7 @@ private[hive] class IsolatedClientLoader( sparkConf, hadoopConf, config, - baseClassLoader, + classLoader, this).asInstanceOf[HiveClientImpl] } diff --git a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala index 2cd46fd7a..98ad355c5 100644 --- a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala +++ b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala @@ -35,9 +35,10 @@ import org.apache.spark.KyuubiConf._ import org.apache.spark.KyuubiSparkUtil import org.apache.spark.scheduler.cluster.KyuubiSparkExecutorUtils import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SparkSQLUtils} -import org.apache.spark.sql.catalyst.catalog.FunctionResource +import org.apache.spark.sql.catalyst.catalog.{FileResource, FunctionResource, JarResource} import org.apache.spark.sql.catalyst.parser.ParseException -import org.apache.spark.sql.execution.command.AddJarCommand +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.command.{AddFileCommand, AddJarCommand, CreateFunctionCommand} import org.apache.spark.sql.types._ import yaooqinn.kyuubi.{KyuubiSQLException, Logging} @@ -308,7 +309,7 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging } } - private def localizeAndAndResource(path: String): Unit = try { + private def localizeAndAndResource(path: String): Option[String] = try { if (isResourceDownloadable(path)) { val 
src = new Path(path) val destFileName = src.getName @@ -317,12 +318,37 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging val fs = src.getFileSystem(sparkSession.sparkContext.hadoopConfiguration) fs.copyToLocalFile(src, new Path(destFile)) FileUtil.chmod(destFile, "ugo+rx", true) - AddJarCommand(destFile).run(session.sparkSession) + Some(destFile) + } else { + None } } catch { case e: Exception => throw new KyuubiSQLException(s"Failed to read external resource: $path", e) } + private[operation] def transform(plan: LogicalPlan): LogicalPlan = plan match { + case c: CreateFunctionCommand => + val resources = + ReflectUtils.getFieldValue(c, "resources").asInstanceOf[Seq[FunctionResource]] + resources.foreach { + case FunctionResource(JarResource, uri) => + localizeAndAndResource(uri).map(path => AddJarCommand(path).run(sparkSession)) + case FunctionResource(FileResource, uri) => + localizeAndAndResource(uri).map(path => AddFileCommand(path).run(sparkSession)) + case o => + throw new KyuubiSQLException(s"Resource Type '${o.resourceType}' is not supported.") + } + if (resources.isEmpty) { + c + } else { + ReflectUtils.setFieldValue(c, "resources", Seq.empty[FunctionResource]) + c + } + case a: AddJarCommand => localizeAndAndResource(a.path).map(AddJarCommand).getOrElse(a) + case a: AddFileCommand => localizeAndAndResource(a.path).map(AddFileCommand).getOrElse(a) + case _ => plan + } + private def execute(): Unit = { try { statementId = UUID.randomUUID().toString @@ -344,19 +370,7 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging KyuubiSparkUtil.setActiveSparkContext(sparkSession.sparkContext) val parsedPlan = SparkSQLUtils.parsePlan(sparkSession, statement) - parsedPlan match { - case c if c.nodeName == "CreateFunctionCommand" => - val resources = - ReflectUtils.getFieldValue(c, "resources").asInstanceOf[Seq[FunctionResource]] - resources.foreach { case FunctionResource(_, uri) => - 
localizeAndAndResource(uri) - } - case a if a.nodeName == "AddJarCommand" => - val path = ReflectUtils.getFieldValue(a, "path").asInstanceOf[String] - localizeAndAndResource(path) - case _ => - } - result = SparkSQLUtils.toDataFrame(sparkSession, parsedPlan) + result = SparkSQLUtils.toDataFrame(sparkSession, transform(parsedPlan)) KyuubiServerMonitor.getListener(session.getUserName).foreach { _.onStatementParsed(statementId, result.queryExecution.toString()) } diff --git a/kyuubi-server/src/test/resources/example-1.0.0-SNAPSHOT.jar b/kyuubi-server/src/test/resources/example-1.0.0-SNAPSHOT.jar new file mode 100644 index 000000000..5abe50994 Binary files /dev/null and b/kyuubi-server/src/test/resources/example-1.0.0-SNAPSHOT.jar differ diff --git a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/SecuredFunSuite.scala b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/SecuredFunSuite.scala index 90423cd44..2dad6cfec 100644 --- a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/SecuredFunSuite.scala +++ b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/SecuredFunSuite.scala @@ -26,7 +26,8 @@ import org.apache.spark.{KyuubiSparkUtil, SparkConf} trait SecuredFunSuite { var kdc: MiniKdc = null - val baseDir = KyuubiSparkUtil.createTempDir("kyuubi-kdc") + val baseDir = KyuubiSparkUtil.createTempDir( + this.getClass.getProtectionDomain.getCodeSource.getLocation.getPath, "kyuubi-kdc") try { val kdcConf = MiniKdc.createConf() kdcConf.setProperty(MiniKdc.INSTANCE, "KyuubiKrbServer") diff --git a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/operation/KyuubiOperationSuite.scala b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/operation/KyuubiOperationSuite.scala index d165cc707..0fbb2e373 100644 --- a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/operation/KyuubiOperationSuite.scala +++ b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/operation/KyuubiOperationSuite.scala @@ -23,14 +23,21 @@ import org.apache.hadoop.security.UserGroupInformation import 
org.apache.hive.service.cli.thrift.TProtocolVersion import org.apache.spark.{KyuubiSparkUtil, SparkConf, SparkContext, SparkFunSuite} import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.catalog.FunctionResource +import org.apache.spark.sql.execution.SparkSqlParser +import org.apache.spark.sql.execution.command.CreateFunctionCommand +import org.apache.spark.sql.internal.SQLConf +import org.mockito.Mockito.when +import org.scalatest.mock.MockitoSugar +import yaooqinn.kyuubi.KyuubiSQLException import yaooqinn.kyuubi.cli.FetchOrientation.FETCH_NEXT import yaooqinn.kyuubi.schema.ColumnBasedSet import yaooqinn.kyuubi.session.{KyuubiSession, SessionManager} import yaooqinn.kyuubi.spark.SparkSessionWithUGI import yaooqinn.kyuubi.utils.ReflectUtils -class KyuubiOperationSuite extends SparkFunSuite { +class KyuubiOperationSuite extends SparkFunSuite with MockitoSugar { val conf = new SparkConf(loadDefaults = true).setAppName("operation test") KyuubiSparkUtil.setupCommonConfig(conf) @@ -151,4 +158,36 @@ class KyuubiOperationSuite extends SparkFunSuite { assert(!KyuubiOperation.isResourceDownloadable("file://a/b/c.jar")) assert(!KyuubiOperation.isResourceDownloadable("dfs://a/b/c.jar")) } + + test("transform plan") { + val op = sessionMgr.getOperationMgr.newExecuteStatementOperation(session, statement) + + val parser = new SparkSqlParser(new SQLConf) + val plan0 = parser.parsePlan("create temporary function a as 'a.b.c'") + assert(op.transform(plan0) === plan0) + + val plan1 = parser.parsePlan( + "create temporary function a as 'a.b.c' using file 'hdfs://a/b/c.jar'") + val e1 = intercept[KyuubiSQLException](op.transform(plan1)) + assert(e1.getMessage.startsWith("Failed to read external resource")) + + val plan2 = parser.parsePlan( + "create temporary function a as 'a.b.c' using jar 'hdfs://a/b/c.jar'") + val e2 = intercept[KyuubiSQLException](op.transform(plan2)) + assert(e2.getMessage.startsWith("Failed to read external resource")) + + val resources 
= mock[Seq[FunctionResource]] + when(resources.isEmpty).thenReturn(false) + + val command = plan2.asInstanceOf[CreateFunctionCommand].copy(resources = resources) + val plan4 = op.transform(command) + assert(plan4 === command) + assert(plan4.asInstanceOf[CreateFunctionCommand].resources !== resources) + + val plan5 = parser.parsePlan( + "create temporary function a as 'a.b.c' using archive 'hdfs://a/b/c.jar'") + + val e3 = intercept[KyuubiSQLException](op.transform(plan5)) + assert(e3.getMessage.startsWith("Resource Type")) + } } diff --git a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/operation/KyuubiOperationWithHDFSSuite.scala b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/operation/KyuubiOperationWithHDFSSuite.scala new file mode 100644 index 000000000..9a36f598c --- /dev/null +++ b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/operation/KyuubiOperationWithHDFSSuite.scala @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package yaooqinn.kyuubi.operation + +import java.io.File + +import org.apache.hadoop.fs.Path +import org.apache.hadoop.hdfs.{HdfsConfiguration, MiniDFSCluster} +import org.apache.spark.sql.catalyst.catalog.FunctionResource +import org.apache.spark.sql.execution.SparkSqlParser +import org.apache.spark.sql.internal.SQLConf + +import yaooqinn.kyuubi.utils.ReflectUtils + +class KyuubiOperationWithHDFSSuite extends KyuubiOperationSuite { + val hdfsConf = new HdfsConfiguration + var cluster: MiniDFSCluster = new MiniDFSCluster.Builder(hdfsConf).build() + cluster.waitClusterUp() + val fs = cluster.getFileSystem + val homeDirectory: Path = fs.getHomeDirectory + private val fileName = "example-1.0.0-SNAPSHOT.jar" + private val remoteUDFFile = new Path(homeDirectory, fileName) + + override def beforeAll(): Unit = { + val file = new File(this.getClass.getProtectionDomain.getCodeSource.getLocation + fileName) + val localUDFFile = new Path(file.getPath) + fs.copyFromLocalFile(localUDFFile, remoteUDFFile) + super.beforeAll() + } + + override def afterAll(): Unit = { + fs.delete(remoteUDFFile, true) + fs.close() + cluster.shutdown() + super.afterAll() + } + + test("transform logical plan") { + val op = sessionMgr.getOperationMgr.newExecuteStatementOperation(session, statement) + val parser = new SparkSqlParser(new SQLConf) + val plan0 = parser.parsePlan( + s"create temporary function a as 'a.b.c' using file '$remoteUDFFile'") + val plan1 = op.transform(plan0) + assert(plan0 === plan1) + assert( + ReflectUtils.getFieldValue(plan1, "resources").asInstanceOf[Seq[FunctionResource]].isEmpty) + + val plan2 = parser.parsePlan( + s"create temporary function a as 'a.b.c' using jar '$remoteUDFFile'") + val plan3 = op.transform(plan2) + assert(plan3 === plan2) + assert( + ReflectUtils.getFieldValue(plan3, "resources").asInstanceOf[Seq[FunctionResource]].isEmpty) + } + +} diff --git a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/server/KyuubiServerSuite.scala 
b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/server/KyuubiServerSuite.scala index 85009250d..20e65ba10 100644 --- a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/server/KyuubiServerSuite.scala +++ b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/server/KyuubiServerSuite.scala @@ -101,7 +101,8 @@ class KyuubiServerSuite extends SparkFunSuite with BeforeAndAfterEach { test("disable fs caches for secured cluster") { var kdc: MiniKdc = null - val baseDir = KyuubiSparkUtil.createTempDir(namePrefix = "kyuubi-kdc") + val baseDir = KyuubiSparkUtil.createTempDir( + this.getClass.getProtectionDomain.getCodeSource.getLocation.getPath, "kyuubi-kdc") try { val kdcConf = MiniKdc.createConf() kdcConf.setProperty(MiniKdc.INSTANCE, "KyuubiKrbServer") diff --git a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/utils/KyuubiHadoopUtilSuite.scala b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/utils/KyuubiHadoopUtilSuite.scala index 39467381d..ada11ec03 100644 --- a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/utils/KyuubiHadoopUtilSuite.scala +++ b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/utils/KyuubiHadoopUtilSuite.scala @@ -17,10 +17,10 @@ package yaooqinn.kyuubi.utils -import org.apache.hadoop.conf.Configuration import org.apache.hadoop.security.UserGroupInformation import org.apache.hadoop.yarn.api.records.{ApplicationId, ContainerLaunchContext, Resource, YarnApplicationState} import org.apache.hadoop.yarn.client.api.YarnClient +import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.server.MiniYARNCluster import org.apache.hadoop.yarn.util.Records import org.apache.spark.SparkFunSuite @@ -28,26 +28,27 @@ import org.scalatest.BeforeAndAfterEach class KyuubiHadoopUtilSuite extends SparkFunSuite with BeforeAndAfterEach { - private var cluster: MiniYARNCluster = _ + private val cluster: MiniYARNCluster = + new MiniYARNCluster(this.getClass.getSimpleName, 1, 1, 1, 1) private val yarnClient = YarnClient.createYarnClient() override def beforeAll(): Unit = { - 
cluster = new MiniYARNCluster(this.getClass.getSimpleName, 1, 1, 1, 1) - val hadoopConf = new Configuration() - cluster.init(hadoopConf) + val yarnConf = new YarnConfiguration() + yarnConf.set(YarnConfiguration.IS_MINI_YARN_CLUSTER, "true") + cluster.init(yarnConf) cluster.start() - yarnClient.init(hadoopConf) + yarnClient.init(yarnConf) yarnClient.start() super.beforeAll() } override def afterAll(): Unit = { + yarnClient.stop() cluster.stop() super.afterAll() } override def beforeEach(): Unit = { - super.beforeEach() }