From e032a6218fcf0c4fa69fbe5510bd6c6f1f51cf5f Mon Sep 17 00:00:00 2001 From: yanghua Date: Fri, 14 Jan 2022 12:46:15 +0800 Subject: [PATCH] =?UTF-8?q?[KYUUBI=20#1757]=20Extract=20workingDir=20from?= =?UTF-8?q?=20engine=20specific=20process=20builde=E2=80=A6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit …r into ProcBuilder ### _Why are the changes needed?_ ### _How was this patch tested?_ - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [ ] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before make a pull request Closes #1758 from yanghua/KYUUBI-1757. Closes #1757 14d3673c [yanghua] Fix CI issue d6696919 [yanghua] [KYUUBI #1757] Extract workingDir from engine specific process builder into ProcBuilder Authored-by: yanghua Signed-off-by: Kent Yao --- .../apache/kyuubi/engine/ProcBuilder.scala | 30 +++++++++++++++++-- .../engine/flink/FlinkProcessBuilder.scala | 28 +---------------- .../engine/spark/SparkProcessBuilder.scala | 28 +---------------- 3 files changed, 29 insertions(+), 57 deletions(-) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala index 55990e05c..87c5a4bf9 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala @@ -20,7 +20,7 @@ package org.apache.kyuubi.engine import java.io.{File, IOException} import java.lang.ProcessBuilder.Redirect import java.nio.charset.StandardCharsets -import java.nio.file.{Files, Path} +import java.nio.file.{Files, Path, Paths} import scala.collection.JavaConverters._ import scala.util.matching.Regex @@ -28,7 +28,7 @@ import scala.util.matching.Regex import com.google.common.collect.EvictingQueue import org.apache.commons.lang3.StringUtils.containsIgnoreCase -import org.apache.kyuubi.{KyuubiSQLException, Logging} +import org.apache.kyuubi.{KyuubiSQLException, Logging, Utils} import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.operation.log.OperationLog import org.apache.kyuubi.util.NamedThreadFactory @@ -57,7 +57,31 @@ trait ProcBuilder { protected val extraEngineLog: Option[OperationLog] - protected val workingDir: Path + protected val workingDir: Path = { + env.get("KYUUBI_WORK_DIR_ROOT").map { root => + val workingRoot = Paths.get(root).toAbsolutePath + if (!Files.exists(workingRoot)) { + debug(s"Creating KYUUBI_WORK_DIR_ROOT at $workingRoot") + Files.createDirectories(workingRoot) + } + if (Files.isDirectory(workingRoot)) { + workingRoot.toString + } else null + }.map { rootAbs => + val working = Paths.get(rootAbs, proxyUser) + if (!Files.exists(working)) { + debug(s"Creating $proxyUser's working directory at $working") + Files.createDirectories(working) + } + if (Files.isDirectory(working)) { + working + } else { + Utils.createTempDir(rootAbs, proxyUser) + } + }.getOrElse { + Utils.createTempDir(namePrefix = proxyUser) + } + } final lazy val processBuilder: ProcessBuilder = { val pb = new ProcessBuilder(commands: _*) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala index 8a02caf76..7fec4e66e 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala @@ -19,7 +19,7 @@ package org.apache.kyuubi.engine.flink import java.io.{File, FilenameFilter} import java.net.URI -import java.nio.file.{Files, Path, Paths} +import java.nio.file.{Files, Paths} import com.google.common.annotations.VisibleForTesting @@ -99,32 +99,6 @@ class FlinkProcessBuilder( override protected def commands: Array[String] = Array(executable) - override protected val workingDir: Path = { - env.get("KYUUBI_WORK_DIR_ROOT").map { root => - val workingRoot = Paths.get(root).toAbsolutePath - if (!Files.exists(workingRoot)) { - debug(s"Creating KYUUBI_WORK_DIR_ROOT at $workingRoot") - Files.createDirectories(workingRoot) - } - if (Files.isDirectory(workingRoot)) { - workingRoot.toString - } else null - }.map { rootAbs => - val working = Paths.get(rootAbs, proxyUser) - if (!Files.exists(working)) { - debug(s"Creating $proxyUser's working directory at $working") - Files.createDirectories(working) - } - if (Files.isDirectory(working)) { - working - } else { - Utils.createTempDir(rootAbs, proxyUser) - } - }.getOrElse { - Utils.createTempDir(namePrefix = proxyUser) - } - } - override def toString: String = commands.map { case arg if arg.startsWith("--") => s"\\\n\t$arg" case arg => arg diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala index e3c491b64..cefeb47b4 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala @@ -19,7 +19,7 @@ package org.apache.kyuubi.engine.spark import java.io.{File, FilenameFilter, IOException} import java.net.URI -import java.nio.file.{Files, Path, Paths} +import java.nio.file.{Files, Paths} import scala.collection.mutable.ArrayBuffer @@ -98,32 +98,6 @@ class SparkProcessBuilder( } } - override protected val workingDir: Path = { - env.get("KYUUBI_WORK_DIR_ROOT").map { root => - val workingRoot = Paths.get(root).toAbsolutePath - if (!Files.exists(workingRoot)) { - debug(s"Creating KYUUBI_WORK_DIR_ROOT at $workingRoot") - Files.createDirectories(workingRoot) - } - if (Files.isDirectory(workingRoot)) { - workingRoot.toString - } else null - }.map { rootAbs => - val working = Paths.get(rootAbs, proxyUser) - if (!Files.exists(working)) { - debug(s"Creating $proxyUser's working directory at $working") - Files.createDirectories(working) - } - if (Files.isDirectory(working)) { - working - } else { - Utils.createTempDir(rootAbs, proxyUser) - } - }.getOrElse { - Utils.createTempDir(namePrefix = proxyUser) - } - } - override protected def commands: Array[String] = { val buffer = new ArrayBuffer[String]() buffer += executable