From f61806283f68c97cddb86e1f404c3cc8a552c740 Mon Sep 17 00:00:00 2001 From: Fei Wang Date: Tue, 6 Sep 2022 17:29:52 +0800 Subject: [PATCH] [KYUUBI #3064] Fix scala NPE issue when adding non-local jar URI to class loader ### _Why are the changes needed?_ Close #3064 Refer to the comments by cxzl25. Spark uses the code `URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory(hadoopConf))` to support class paths from HDFS. However, because the Scala compiler only supports adding file-scheme URLs to the class path, non-file-scheme URLs cause an NPE. ```java Error: Error operating ExecuteScala: java.lang.NullPointerException at scala.tools.nsc.classpath.FileUtils$AbstractFileOps$.isJarOrZip$extension(FileUtils.scala:32) at scala.tools.nsc.classpath.ClassPathFactory$.newClassPath(ClassPathFactory.scala:90) at scala.tools.nsc.Global.$anonfun$extendCompilerClassPath$1(Global.scala:832) ``` scala.tools.nsc.Global#extendCompilerClassPath ```scala AbstractFile.getURL(u) ``` scala.reflect.io.AbstractFile#getURL ```scala def getURL(url: URL): AbstractFile = if (url.getProtocol == "file") { val f = new java.io.File(url.toURI) if (f.isDirectory) getDirectory(f) else getFile(f) } else null ``` spark-shell supports HDFS jars passed via `--jars`: in that case, spark-submit downloads the remote jar to the local file system and passes it to spark-shell through the `spark.repl.local.jars` configuration. In this PR, I first localize the remote jar URL and then add it to the REPL class path. ### _How was this patch tested?_ - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request Closes #3292 from turboFei/scala_npe. 
Closes #3064 095c40d8 [Fei Wang] filter 37637f76 [Fei Wang] add non-exist ec87ea85 [Fei Wang] add ut 5a823d3b [Fei Wang] save Authored-by: Fei Wang Signed-off-by: Fei Wang --- .../engine/spark/operation/ExecuteScala.scala | 24 +++++++++- .../kyuubi/operation/SparkQueryTests.scala | 2 +- .../apache/kyuubi/WithSecuredDFSService.scala | 1 + .../apache/kyuubi/WithSimpleDFSService.scala | 1 + .../KyuubiOperationPerUserSuite.scala | 47 ++++++++++++++++++- .../apache/kyuubi/server/MiniDFSService.scala | 25 +++++++++- 6 files changed, 93 insertions(+), 7 deletions(-) diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteScala.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteScala.scala index a67f7295e..0ef31a897 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteScala.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteScala.scala @@ -17,8 +17,13 @@ package org.apache.kyuubi.engine.spark.operation +import java.io.File + +import scala.reflect.internal.util.ScalaClassLoader.URLClassLoader import scala.tools.nsc.interpreter.Results.{Error, Incomplete, Success} +import org.apache.hadoop.fs.Path +import org.apache.spark.SparkFiles import org.apache.spark.sql.Row import org.apache.spark.sql.types.StructType @@ -64,8 +69,23 @@ class ExecuteScala( if (legacyOutput.nonEmpty) { warn(s"Clearing legacy output from last interpreting:\n $legacyOutput") } - val jars = spark.sharedState.jarClassLoader.getURLs - repl.addUrlsToClassPath(jars: _*) + val replUrls = repl.classLoader.getParent.asInstanceOf[URLClassLoader].getURLs + spark.sharedState.jarClassLoader.getURLs.filterNot(replUrls.contains).foreach { jar => + try { + if ("file".equals(jar.toURI.getScheme)) { + repl.addUrlsToClassPath(jar) + } else { + spark.sparkContext.addFile(jar.toString) + val localJarFile 
= new File(SparkFiles.get(new Path(jar.toURI.getPath).getName)) + val localJarUrl = localJarFile.toURI.toURL + if (!replUrls.contains(localJarUrl)) { + repl.addUrlsToClassPath(localJarUrl) + } + } + } catch { + case e: Throwable => error(s"Error adding $jar to repl class path", e) + } + } repl.interpretWithRedirectOutError(statement) match { case Success => diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/SparkQueryTests.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/SparkQueryTests.scala index f0407cb52..726948485 100644 --- a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/SparkQueryTests.scala +++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/SparkQueryTests.scala @@ -34,7 +34,7 @@ trait SparkQueryTests extends HiveJDBCTestHelper { protected lazy val SPARK_ENGINE_MAJOR_MINOR_VERSION: (Int, Int) = sparkEngineMajorMinorVersion - protected lazy val httpMode = false; + protected lazy val httpMode = false test("execute statement - select null") { withJdbcStatement() { statement => diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/WithSecuredDFSService.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/WithSecuredDFSService.scala index cfea0d2bb..43bea7c6a 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/WithSecuredDFSService.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/WithSecuredDFSService.scala @@ -60,4 +60,5 @@ trait WithSecuredDFSService extends KerberizedTestHelper { } def getHadoopConf: Configuration = miniDFSService.getHadoopConf + def getHadoopConfDir: String = miniDFSService.getHadoopConfDir } diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/WithSimpleDFSService.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/WithSimpleDFSService.scala index e7d0fbfe1..0e29f4411 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/WithSimpleDFSService.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/WithSimpleDFSService.scala @@ -41,6 +41,7 
@@ trait WithSimpleDFSService extends KyuubiFunSuite { } def getHadoopConf: Configuration = miniDFSService.getHadoopConf + def getHadoopConfDir: String = miniDFSService.getHadoopConfDir def getDefaultFS: String = miniDFSService.getHadoopConf.get("fs.defaultFS") def getDFSPort: Int = miniDFSService.getDFSPort diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerUserSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerUserSuite.scala index 313335005..c2f1be1d4 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerUserSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerUserSuite.scala @@ -17,14 +17,20 @@ package org.apache.kyuubi.operation +import java.util.UUID + +import org.apache.hadoop.fs.{FileSystem, FileUtil, Path} import org.apache.hive.service.rpc.thrift.{TExecuteStatementReq, TStatusCode} import org.scalatest.time.SpanSugar._ -import org.apache.kyuubi.{Utils, WithKyuubiServer} +import org.apache.kyuubi.{Utils, WithKyuubiServer, WithSimpleDFSService} import org.apache.kyuubi.config.KyuubiConf +import org.apache.kyuubi.config.KyuubiConf.KYUUBI_ENGINE_ENV_PREFIX +import org.apache.kyuubi.jdbc.hive.KyuubiStatement import org.apache.kyuubi.session.{KyuubiSessionImpl, KyuubiSessionManager, SessionHandle} -class KyuubiOperationPerUserSuite extends WithKyuubiServer with SparkQueryTests { +class KyuubiOperationPerUserSuite + extends WithKyuubiServer with SparkQueryTests with WithSimpleDFSService { override protected def jdbcUrl: String = getJdbcUrl @@ -32,6 +38,11 @@ class KyuubiOperationPerUserSuite extends WithKyuubiServer with SparkQueryTests KyuubiConf().set(KyuubiConf.ENGINE_SHARE_LEVEL, "user") } + override def beforeAll(): Unit = { + super.beforeAll() + conf.set(s"$KYUUBI_ENGINE_ENV_PREFIX.HADOOP_CONF_DIR", getHadoopConfDir) + } + test("kyuubi defined function - system_user/session_user") { withJdbcStatement() { 
statement => val rs = statement.executeQuery("SELECT system_user(), session_user()") @@ -200,4 +211,36 @@ class KyuubiOperationPerUserSuite extends WithKyuubiServer with SparkQueryTests } } } + + test("scala NPE issue with hdfs jar") { + val jarDir = Utils.createTempDir().toFile + val udfCode = + """ + |package test.utils + | + |object Math { + |def add(x: Int, y: Int): Int = x + y + |} + | + |""".stripMargin + val jarFile = UserJarTestUtils.createJarFile( + udfCode, + "test", + s"test-function-${UUID.randomUUID}.jar", + jarDir.toString) + val hadoopConf = getHadoopConf + val dfs = FileSystem.get(hadoopConf) + val dfsJarDir = dfs.makeQualified(new Path(s"jars-${UUID.randomUUID()}")) + val localFs = FileSystem.getLocal(hadoopConf) + val localPath = new Path(jarFile.getAbsolutePath) + val dfsJarPath = new Path(dfsJarDir, "test-function.jar") + FileUtil.copy(localFs, localPath, dfs, dfsJarPath, false, false, hadoopConf) + withJdbcStatement() { statement => + val kyuubiStatement = statement.asInstanceOf[KyuubiStatement] + statement.executeQuery(s"add jar $dfsJarPath") + val rs = kyuubiStatement.executeScala("println(test.utils.Math.add(1,2))") + rs.next() + assert(rs.getString(1) === "3") + } + } } diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/MiniDFSService.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/MiniDFSService.scala index d7f01e728..caacbb6bf 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/MiniDFSService.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/MiniDFSService.scala @@ -17,19 +17,24 @@ package org.apache.kyuubi.server +import java.io.{File, FileWriter} +import java.net.InetAddress + +import scala.collection.JavaConverters._ + import org.apache.hadoop.conf.Configuration import org.apache.hadoop.hdfs.MiniDFSCluster import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys import org.apache.hadoop.security.UserGroupInformation -import org.apache.kyuubi.Logging +import 
org.apache.kyuubi.{Logging, Utils} import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.service.AbstractService class MiniDFSService(name: String, hdfsConf: Configuration) extends AbstractService(name) with Logging { - + private val hadoopConfDir: File = Utils.createTempDir().toFile private var hdfsCluster: MiniDFSCluster = _ def this(hdfsConf: Configuration = new Configuration()) = @@ -55,6 +60,7 @@ class MiniDFSService(name: String, hdfsConf: Configuration) s"NameNode address in configuration is " + s"${hdfsConf.get(HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY)}") super.start() + saveHadoopConf() } override def stop(): Unit = { @@ -62,6 +68,21 @@ class MiniDFSService(name: String, hdfsConf: Configuration) super.stop() } + private def saveHadoopConf(): Unit = { + val configToWrite = new Configuration(false) + val hostName = InetAddress.getLocalHost.getHostName + hdfsConf.iterator().asScala.foreach { kv => + val key = kv.getKey + val value = kv.getValue.replaceAll(hostName, "localhost") + configToWrite.set(key, value) + getConf.set(key, value) + } + val writer = new FileWriter(new File(hadoopConfDir, "hdfs-site.xml")) + configToWrite.writeXml(writer) + writer.close() + } + def getHadoopConf: Configuration = hdfsConf def getDFSPort: Int = hdfsCluster.getNameNodePort + def getHadoopConfDir: String = hadoopConfDir.getAbsolutePath }