diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala index f6966e141..46cbfd65f 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala @@ -98,6 +98,7 @@ abstract class SparkOperation(spark: SparkSession, opType: OperationType, sessio } override protected def beforeRun(): Unit = { + Thread.currentThread().setContextClassLoader(spark.sharedState.jarClassLoader) setHasResultSet(true) setState(OperationState.RUNNING) OperationLog.setCurrentOperationLog(operationLog) diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala index 09e636412..e3b839547 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala @@ -18,7 +18,6 @@ package org.apache.kyuubi import java.io.{File, InputStreamReader, IOException} -import java.net.{URI, URISyntaxException} import java.nio.charset.StandardCharsets import java.nio.file.{Files, Path, Paths} import java.util.{Properties, UUID} @@ -44,7 +43,6 @@ private[kyuubi] object Utils extends Logging { env.get(KYUUBI_CONF_DIR) .orElse(env.get(KYUUBI_HOME).map(_ + File.separator + "/conf")) .map( d => new File(d + File.separator + KYUUBI_CONF_FILE_NAME)) - .filter(f => f.exists() && f.isFile) .orElse { Option(getClass.getClassLoader.getResource(KYUUBI_CONF_FILE_NAME)).map { url => new File(url.getFile) @@ -55,48 +53,25 @@ private[kyuubi] object Utils extends Logging { def getPropertiesFromFile(file: Option[File]): Map[String, String] = { file.map { f => info(s"Loading Kyuubi properties from ${f.getAbsolutePath}") - val reader = new InputStreamReader(f.toURI.toURL.openStream(), StandardCharsets.UTF_8) try { - val properties = new Properties() - properties.load(reader) - properties.stringPropertyNames().asScala.map { k => - (k, properties.getProperty(k).trim) - }.toMap + val reader = new InputStreamReader(f.toURI.toURL.openStream(), StandardCharsets.UTF_8) + try { + val properties = new Properties() + properties.load(reader) + properties.stringPropertyNames().asScala.map { k => + (k, properties.getProperty(k).trim) + }.toMap + } finally { + reader.close() + } } catch { case e: IOException => throw new KyuubiException( s"Failed when loading Kyuubi properties from ${f.getAbsolutePath}", e) - } finally { - reader.close() } }.getOrElse(Map.empty) } - /** - * Return a well-formed URI for the file described by a user input string. - * - * If the supplied path does not contain a scheme, or is a relative path, it will be - * converted into an absolute path with a file:// scheme. - */ - def resolveURI(path: String): URI = { - try { - val uri = new URI(path) - if (uri.getScheme != null) { - return uri - } - // make sure to handle if the path has a fragment (applies to yarn - // distributed cache) - if (uri.getFragment != null) { - val absoluteURI = new File(uri.getPath).getAbsoluteFile.toURI - return new URI(absoluteURI.getScheme, absoluteURI.getHost, absoluteURI.getPath, - uri.getFragment) - } - } catch { - case _: URISyntaxException => - } - new File(path).getAbsoluteFile.toURI - } - private val MAX_DIR_CREATION_ATTEMPTS: Int = 10 /** diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala index 0ab215c11..c5bb6e623 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala @@ -143,7 +143,7 @@ object KyuubiConf { .doc("The temporary directory for the embedded zookeeper server") .version("1.0.0") .stringConf - .createWithDefault(Utils.resolveURI("embedded_zookeeper").getRawPath) + .createWithDefault("embedded_zookeeper") val SERVER_PRINCIPAL: OptionalConfigEntry[String] = buildConf("server.principal") .doc("") diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/UtilsSuite.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/UtilsSuite.scala index e7bf2b1c0..2cd92f2e2 100644 --- a/kyuubi-common/src/test/scala/org/apache/kyuubi/UtilsSuite.scala +++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/UtilsSuite.scala @@ -17,7 +17,8 @@ package org.apache.kyuubi -import java.io.File +import java.io.{File, IOException} +import java.nio.file.Files import java.util.Properties class UtilsSuite extends KyuubiFunSuite { @@ -69,26 +70,19 @@ class UtilsSuite extends KyuubiFunSuite { val props = Utils.getPropertiesFromFile(Option(propsFile)) assert(props("kyuubi.yes") === "yes") assert(!props.contains("kyuubi.no")) - } - test("resolveURI") { - def assertResolves(before: String, after: String): Unit = { - // This should test only single paths - assert(before.split(",").length === 1) - def resolve(uri: String): String = Utils.resolveURI(uri).toString - assert(resolve(before) === after) - assert(resolve(after) === after) - // Repeated invocations of resolveURI should yield the same result - assert(resolve(resolve(after)) === after) - assert(resolve(resolve(resolve(after))) === after) + val e = intercept[KyuubiException] { + Utils.getPropertiesFromFile(Some(new File("invalid-file"))) } - assertResolves("hdfs:/root/spark.jar", "hdfs:/root/spark.jar") - assertResolves("hdfs:///root/spark.jar#app.jar", "hdfs:///root/spark.jar#app.jar") - assertResolves("file:/C:/path/to/file.txt", "file:/C:/path/to/file.txt") - assertResolves("file:///C:/path/to/file.txt", "file:///C:/path/to/file.txt") - assertResolves("file:/C:/file.txt#alias.txt", "file:/C:/file.txt#alias.txt") - assertResolves("file:foo", "file:foo") - assertResolves("file:foo:baby", "file:foo:baby") + assert(e.getMessage contains "Failed when loading Kyuubi properties from") } + test("create directory") { + val path = Utils.createDirectory(System.getProperty("java.io.tmpdir")) + assert(Files.exists(path)) + assert(path.getFileName.toString.startsWith("kyuubi-")) + path.toFile.deleteOnExit() + val e = intercept[IOException](Utils.createDirectory("/")) + assert(e.getMessage === "Failed to create a temp directory (under /) after 10 attempts!") + } }