[KYUUBI #3064] Fix scala NPE issue when adding non-local jar URI to class loader

### _Why are the changes needed?_

Close #3064

Refer the comments by cxzl25

Spark uses the code
 `URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory(hadoopConf))` supports the class path from HDFS.
But because the scala compiler only supports adding file schema urls to the class path, non-file schema urls will cause NPE.

```java
Error: Error operating ExecuteScala: java.lang.NullPointerException
	at scala.tools.nsc.classpath.FileUtils$AbstractFileOps$.isJarOrZip$extension(FileUtils.scala:32)
	at scala.tools.nsc.classpath.ClassPathFactory$.newClassPath(ClassPathFactory.scala:90)
	at scala.tools.nsc.Global.$anonfun$extendCompilerClassPath$1(Global.scala:832)
```

scala.tools.nsc.Global#extendCompilerClassPath
```scala
AbstractFile.getURL(u)
```

scala.reflect.io.AbstractFile#getURL
```scala
  def getURL(url: URL): AbstractFile =
    if (url.getProtocol == "file") {
      val f = new java.io.File(url.toURI)
      if (f.isDirectory) getDirectory(f)
      else getFile(f)
    } else null
```

spark-shell supports --jars hdfs jar. At this time, submit will download the remote jar to the local and pass it to spark-shell through the `spark.repl.local.jars` configuration.

In this pr, I localize the remote jar url at first, and then add it into repl class path.
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #3292 from turboFei/scala_npe.

Closes #3064

095c40d8 [Fei Wang] filter
37637f76 [Fei Wang] add non-exist
ec87ea85 [Fei Wang] add ut
5a823d3b [Fei Wang] save

Authored-by: Fei Wang <fwang12@ebay.com>
Signed-off-by: Fei Wang <fwang12@ebay.com>
This commit is contained in:
Fei Wang 2022-09-06 17:29:52 +08:00
parent 9e0af1423b
commit f61806283f
6 changed files with 93 additions and 7 deletions

View File

@ -17,8 +17,13 @@
package org.apache.kyuubi.engine.spark.operation
import java.io.File
import scala.reflect.internal.util.ScalaClassLoader.URLClassLoader
import scala.tools.nsc.interpreter.Results.{Error, Incomplete, Success}
import org.apache.hadoop.fs.Path
import org.apache.spark.SparkFiles
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType
@ -64,8 +69,23 @@ class ExecuteScala(
if (legacyOutput.nonEmpty) {
warn(s"Clearing legacy output from last interpreting:\n $legacyOutput")
}
val jars = spark.sharedState.jarClassLoader.getURLs
repl.addUrlsToClassPath(jars: _*)
val replUrls = repl.classLoader.getParent.asInstanceOf[URLClassLoader].getURLs
spark.sharedState.jarClassLoader.getURLs.filterNot(replUrls.contains).foreach { jar =>
try {
if ("file".equals(jar.toURI.getScheme)) {
repl.addUrlsToClassPath(jar)
} else {
spark.sparkContext.addFile(jar.toString)
val localJarFile = new File(SparkFiles.get(new Path(jar.toURI.getPath).getName))
val localJarUrl = localJarFile.toURI.toURL
if (!replUrls.contains(localJarUrl)) {
repl.addUrlsToClassPath(localJarUrl)
}
}
} catch {
case e: Throwable => error(s"Error adding $jar to repl class path", e)
}
}
repl.interpretWithRedirectOutError(statement) match {
case Success =>

View File

@ -34,7 +34,7 @@ trait SparkQueryTests extends HiveJDBCTestHelper {
protected lazy val SPARK_ENGINE_MAJOR_MINOR_VERSION: (Int, Int) = sparkEngineMajorMinorVersion
protected lazy val httpMode = false;
protected lazy val httpMode = false
test("execute statement - select null") {
withJdbcStatement() { statement =>

View File

@ -60,4 +60,5 @@ trait WithSecuredDFSService extends KerberizedTestHelper {
}
def getHadoopConf: Configuration = miniDFSService.getHadoopConf
def getHadoopConfDir: String = miniDFSService.getHadoopConfDir
}

View File

@ -41,6 +41,7 @@ trait WithSimpleDFSService extends KyuubiFunSuite {
}
def getHadoopConf: Configuration = miniDFSService.getHadoopConf
def getHadoopConfDir: String = miniDFSService.getHadoopConfDir
def getDefaultFS: String = miniDFSService.getHadoopConf.get("fs.defaultFS")
def getDFSPort: Int = miniDFSService.getDFSPort

View File

@ -17,14 +17,20 @@
package org.apache.kyuubi.operation
import java.util.UUID
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
import org.apache.hive.service.rpc.thrift.{TExecuteStatementReq, TStatusCode}
import org.scalatest.time.SpanSugar._
import org.apache.kyuubi.{Utils, WithKyuubiServer}
import org.apache.kyuubi.{Utils, WithKyuubiServer, WithSimpleDFSService}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.KYUUBI_ENGINE_ENV_PREFIX
import org.apache.kyuubi.jdbc.hive.KyuubiStatement
import org.apache.kyuubi.session.{KyuubiSessionImpl, KyuubiSessionManager, SessionHandle}
class KyuubiOperationPerUserSuite extends WithKyuubiServer with SparkQueryTests {
class KyuubiOperationPerUserSuite
extends WithKyuubiServer with SparkQueryTests with WithSimpleDFSService {
override protected def jdbcUrl: String = getJdbcUrl
@ -32,6 +38,11 @@ class KyuubiOperationPerUserSuite extends WithKyuubiServer with SparkQueryTests
KyuubiConf().set(KyuubiConf.ENGINE_SHARE_LEVEL, "user")
}
override def beforeAll(): Unit = {
super.beforeAll()
conf.set(s"$KYUUBI_ENGINE_ENV_PREFIX.HADOOP_CONF_DIR", getHadoopConfDir)
}
test("kyuubi defined function - system_user/session_user") {
withJdbcStatement() { statement =>
val rs = statement.executeQuery("SELECT system_user(), session_user()")
@ -200,4 +211,36 @@ class KyuubiOperationPerUserSuite extends WithKyuubiServer with SparkQueryTests
}
}
}
test("scala NPE issue with hdfs jar") {
val jarDir = Utils.createTempDir().toFile
val udfCode =
"""
|package test.utils
|
|object Math {
|def add(x: Int, y: Int): Int = x + y
|}
|
|""".stripMargin
val jarFile = UserJarTestUtils.createJarFile(
udfCode,
"test",
s"test-function-${UUID.randomUUID}.jar",
jarDir.toString)
val hadoopConf = getHadoopConf
val dfs = FileSystem.get(hadoopConf)
val dfsJarDir = dfs.makeQualified(new Path(s"jars-${UUID.randomUUID()}"))
val localFs = FileSystem.getLocal(hadoopConf)
val localPath = new Path(jarFile.getAbsolutePath)
val dfsJarPath = new Path(dfsJarDir, "test-function.jar")
FileUtil.copy(localFs, localPath, dfs, dfsJarPath, false, false, hadoopConf)
withJdbcStatement() { statement =>
val kyuubiStatement = statement.asInstanceOf[KyuubiStatement]
statement.executeQuery(s"add jar $dfsJarPath")
val rs = kyuubiStatement.executeScala("println(test.utils.Math.add(1,2))")
rs.next()
assert(rs.getString(1) === "3")
}
}
}

View File

@ -17,19 +17,24 @@
package org.apache.kyuubi.server
import java.io.{File, FileWriter}
import java.net.InetAddress
import scala.collection.JavaConverters._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hdfs.MiniDFSCluster
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys
import org.apache.hadoop.security.UserGroupInformation
import org.apache.kyuubi.Logging
import org.apache.kyuubi.{Logging, Utils}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.service.AbstractService
class MiniDFSService(name: String, hdfsConf: Configuration)
extends AbstractService(name)
with Logging {
private val hadoopConfDir: File = Utils.createTempDir().toFile
private var hdfsCluster: MiniDFSCluster = _
def this(hdfsConf: Configuration = new Configuration()) =
@ -55,6 +60,7 @@ class MiniDFSService(name: String, hdfsConf: Configuration)
s"NameNode address in configuration is " +
s"${hdfsConf.get(HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY)}")
super.start()
saveHadoopConf()
}
override def stop(): Unit = {
@ -62,6 +68,21 @@ class MiniDFSService(name: String, hdfsConf: Configuration)
super.stop()
}
private def saveHadoopConf(): Unit = {
val configToWrite = new Configuration(false)
val hostName = InetAddress.getLocalHost.getHostName
hdfsConf.iterator().asScala.foreach { kv =>
val key = kv.getKey
val value = kv.getValue.replaceAll(hostName, "localhost")
configToWrite.set(key, value)
getConf.set(key, value)
}
val writer = new FileWriter(new File(hadoopConfDir, "hdfs-site.xml"))
configToWrite.writeXml(writer)
writer.close()
}
def getHadoopConf: Configuration = hdfsConf
def getDFSPort: Int = hdfsCluster.getNameNodePort
def getHadoopConfDir: String = hadoopConfDir.getAbsolutePath
}