diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala index c17fd9e1a..0ab215c11 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala @@ -219,4 +219,20 @@ object KyuubiConf { .doc("Time to back off during login to the frontend service.") .timeConf .createWithDefault(Duration.ofMillis(100).toMillis) + + ///////////////////////////////////////////////////////////////////////////////////////////////// + // SQL Engine Configuration // + ///////////////////////////////////////////////////////////////////////////////////////////////// + + val ENGINE_SPARK_MAIN_RESOURCE: OptionalConfigEntry[String] = + buildConf("engine.spark.main.resource") + .doc("The connection string for the zookeeper ensemble") + .version("1.0.0") + .stringConf + .createOptional + + val ENGINE_INIT_TIMEOUT: ConfigEntry[Long] = buildConf("engine.initialize.timeout") + .doc("Timeout for starting the background engine, e.g. SparkSQLEngine.") + .timeConf + .createWithDefault(Duration.ofSeconds(60).toMillis) } diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala index 812ad082b..d43df1226 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala @@ -66,7 +66,7 @@ abstract class AbstractOperation(opType: OperationType, session: Session) case RUNNING => startTime = System.currentTimeMillis() case ERROR | FINISHED | CANCELED => completedTime = System.currentTimeMillis() - timeCost = s" ,time taken: ${(completedTime - startTime) / 1000.0} seconds" + timeCost = s", time taken: ${(completedTime - startTime) / 1000.0} seconds" case _ => } info(s"Processing ${session.user}'s query[$statementId]: ${state.name} -> ${newState.name}," + diff --git a/kyuubi-main/src/main/scala/org/apache/kyuubi/engine/EngineConf.scala b/kyuubi-main/src/main/scala/org/apache/kyuubi/engine/EngineConf.scala deleted file mode 100644 index f08e7677f..000000000 --- a/kyuubi-main/src/main/scala/org/apache/kyuubi/engine/EngineConf.scala +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kyuubi.engine - -import org.apache.kyuubi.config.{ConfigBuilder, KyuubiConf, OptionalConfigEntry} - -object EngineConf { - - private def buildConf(key: String): ConfigBuilder = KyuubiConf.buildConf(key) - - val ENGINE_SPARK_MAIN_RESOURCE: OptionalConfigEntry[String] = - buildConf("engine.spark.main.resource") - .doc("The connection string for the zookeeper ensemble") - .version("1.0.0") - .stringConf - .createOptional - -} diff --git a/kyuubi-main/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala b/kyuubi-main/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala new file mode 100644 index 000000000..13604b502 --- /dev/null +++ b/kyuubi-main/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.engine + +import java.io.IOException +import java.nio.charset.StandardCharsets +import java.nio.file.{Files, Path, Paths} +import java.util.UUID + +import scala.collection.JavaConverters._ + +import org.apache.kyuubi.KyuubiSQLException +import org.apache.kyuubi.util.NamedThreadFactory + +trait ProcBuilder { + import ProcBuilder._ + + protected def executable: String + + protected def mainResource: Option[String] + + protected def mainClass: String + + protected def proxyUser: String + + protected def commands: Array[String] + + protected def env: Map[String, String] + + protected def workingDir: Path + + final lazy val processBuilder: ProcessBuilder = { + val pb = new ProcessBuilder(commands: _*) + + val envs = pb.environment() + envs.putAll(env.asJava) + pb.directory(workingDir.toFile) + pb + } + + private var error: Throwable = UNCAUGHT_ERROR + + final def start: Process = { + val procLog = Paths.get(workingDir.toAbsolutePath.toString, UUID.randomUUID().toString) + processBuilder.redirectError(procLog.toFile) + processBuilder.redirectOutput(procLog.toFile) + + val proc = processBuilder.start() + val reader = Files.newBufferedReader(procLog, StandardCharsets.UTF_8) + + val redirect = new Runnable { + override def run(): Unit = try { + var line: String = reader.readLine + while (true) { + if (containsIgnoreCase(line, "Exception") && !line.contains("at ")) { + val sb = new StringBuilder(line) + + line = reader.readLine() + while (line != null && line.startsWith("\tat ")) { + sb.append("\n" + line) + line = reader.readLine() + } + + error = KyuubiSQLException(sb.toString()) + } + line = reader.readLine() + } + } catch { + case _: IOException => + } + } + + PROC_BUILD_LOGGER.newThread(redirect).start() + proc + } + + def getError: Throwable = { + if (error == UNCAUGHT_ERROR) { + Thread.sleep(3000) + } + error + } +} + +object ProcBuilder { + private val PROC_BUILD_LOGGER = NamedThreadFactory("process-logger", daemon = true) + + private val UNCAUGHT_ERROR = KyuubiSQLException("Uncaught error") + + def containsIgnoreCase(str: String, searchStr: String): Boolean = { + if (str == null || searchStr == null) { + false + } else { + val max = str.length - searchStr.length + var i = 0 + while (i <= max) { + if (str.regionMatches(true, i, searchStr, 0, searchStr.length)) { + return true + } + i += 1 + } + false + } + } + +} diff --git a/kyuubi-main/src/main/scala/org/apache/kyuubi/engine/ProcessBuilderLike.scala b/kyuubi-main/src/main/scala/org/apache/kyuubi/engine/ProcessBuilderLike.scala deleted file mode 100644 index d197cabd2..000000000 --- a/kyuubi-main/src/main/scala/org/apache/kyuubi/engine/ProcessBuilderLike.scala +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kyuubi.engine - -import java.nio.file.{Path, Paths} -import java.util.UUID - -import scala.collection.JavaConverters._ - -trait ProcessBuilderLike { - - protected def executable: String - - protected def mainResource: Option[String] - - protected def mainClass: String - - protected def proxyUser: String - - protected def commands: Array[String] - - protected def env: Map[String, String] - - protected def workingDir: Path - - final lazy val processBuilder: ProcessBuilder = { - val pb = new ProcessBuilder(commands: _*) - - val envs = pb.environment() - envs.putAll(env.asJava) - - pb.directory(workingDir.toFile) - val procLogFile = - Paths.get(workingDir.toAbsolutePath.toString, UUID.randomUUID().toString).toFile - pb.redirectError(procLogFile) - pb.redirectOutput(procLogFile) - pb - } - - final def start: Process = { - processBuilder.start() - } -} diff --git a/kyuubi-main/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala b/kyuubi-main/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala index 61f082ff8..7c31ee916 100644 --- a/kyuubi-main/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala +++ b/kyuubi-main/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala @@ -23,14 +23,14 @@ import scala.collection.mutable.ArrayBuffer import org.apache.kyuubi._ import org.apache.kyuubi.config.KyuubiConf -import org.apache.kyuubi.engine.EngineConf.ENGINE_SPARK_MAIN_RESOURCE -import org.apache.kyuubi.engine.ProcessBuilderLike +import org.apache.kyuubi.config.KyuubiConf.ENGINE_SPARK_MAIN_RESOURCE +import org.apache.kyuubi.engine.ProcBuilder class SparkProcessBuilder( override val proxyUser: String, conf: Map[String, String], override val env: Map[String, String] = sys.env) - extends ProcessBuilderLike { + extends ProcBuilder { import SparkProcessBuilder._ @@ -75,9 +75,9 @@ class SparkProcessBuilder( override protected def workingDir: Path = { env.get("KYUUBI_WORK_DIR_ROOT").map { root => - Utils.createTempDir(root, proxyUser) + Utils.createDirectory(root, proxyUser) }.getOrElse { - Utils.createTempDir(namePrefix = proxyUser) + Utils.createDirectory(System.getProperty("java.io.tmpdir"), proxyUser) } } diff --git a/kyuubi-main/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala b/kyuubi-main/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala index d828393da..9cf04bd91 100644 --- a/kyuubi-main/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala +++ b/kyuubi-main/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala @@ -30,6 +30,7 @@ import org.apache.thrift.transport.{TSocket, TTransport} import org.apache.kyuubi.KyuubiSQLException import org.apache.kyuubi.config.KyuubiConf +import org.apache.kyuubi.config.KyuubiConf._ import org.apache.kyuubi.engine.spark.SparkProcessBuilder import org.apache.kyuubi.ha.client.ServiceDiscovery import org.apache.kyuubi.service.authentication.PlainSASLHelper @@ -56,6 +57,7 @@ class KyuubiSessionImpl( configureSession() + private val timeout = sessionConf.get(ENGINE_INIT_TIMEOUT) / 1000 private val zkNamespace = s"$zkNamespacePrefix-$user" private val zkPath = ZKPaths.makePath(null, zkNamespace) private lazy val zkClient = ServiceDiscovery.newZookeeperClient(sessionConf) @@ -89,10 +91,16 @@ class KyuubiSessionImpl( val builder = new SparkProcessBuilder(user, sessionConf.toSparkPrefixedConf) val process = builder.start var sh = getServerHost + var count = 0 while (sh.isEmpty) { if (process.waitFor(1, TimeUnit.SECONDS)) { - throw KyuubiSQLException("Some error happened") + throw builder.getError } + if (count >= timeout) { + process.destroyForcibly() + throw KyuubiSQLException("Timed out to launched Spark") + } + count += 1 sh = getServerHost } val Some((host, port)) = getServerHost diff --git a/kyuubi-main/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala b/kyuubi-main/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala index 920566ead..ffe0787fd 100644 --- a/kyuubi-main/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala +++ b/kyuubi-main/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala @@ -19,7 +19,7 @@ package org.apache.kyuubi.engine.spark import java.nio.file.{Files, Paths} -import org.apache.kyuubi.KyuubiFunSuite +import org.apache.kyuubi.{KyuubiFunSuite, KyuubiSQLException} import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.config.KyuubiConf._ @@ -42,6 +42,13 @@ class SparkProcessBuilderSuite extends KyuubiFunSuite { val process = builder.start assert(process.isAlive) process.destroyForcibly() + + val processBuilder = new SparkProcessBuilder("kentyao", conf ++ Map("spark.ui.port" -> "abc")) + processBuilder.start + val error = processBuilder.getError + assert(error.getMessage.contains( + "java.lang.IllegalArgumentException: spark.ui.port should be int, but was abc\n\tat")) + assert(error.isInstanceOf[KyuubiSQLException]) } }