Support capture error from log
This commit is contained in:
parent
396e06a3bb
commit
392754d27b
@ -219,4 +219,20 @@ object KyuubiConf {
|
||||
.doc("Time to back off during login to the frontend service.")
|
||||
.timeConf
|
||||
.createWithDefault(Duration.ofMillis(100).toMillis)
|
||||
|
||||
/////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
// SQL Engine Configuration //
|
||||
/////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
val ENGINE_SPARK_MAIN_RESOURCE: OptionalConfigEntry[String] =
|
||||
buildConf("engine.spark.main.resource")
|
||||
.doc("The connection string for the zookeeper ensemble")
|
||||
.version("1.0.0")
|
||||
.stringConf
|
||||
.createOptional
|
||||
|
||||
val ENGINE_INIT_TIMEOUT: ConfigEntry[Long] = buildConf("engine.initialize.timeout")
|
||||
.doc("Timeout for starting the background engine, e.g. SparkSQLEngine.")
|
||||
.timeConf
|
||||
.createWithDefault(Duration.ofSeconds(60).toMillis)
|
||||
}
|
||||
|
||||
@ -66,7 +66,7 @@ abstract class AbstractOperation(opType: OperationType, session: Session)
|
||||
case RUNNING => startTime = System.currentTimeMillis()
|
||||
case ERROR | FINISHED | CANCELED =>
|
||||
completedTime = System.currentTimeMillis()
|
||||
timeCost = s" ,time taken: ${(completedTime - startTime) / 1000.0} seconds"
|
||||
timeCost = s", time taken: ${(completedTime - startTime) / 1000.0} seconds"
|
||||
case _ =>
|
||||
}
|
||||
info(s"Processing ${session.user}'s query[$statementId]: ${state.name} -> ${newState.name}," +
|
||||
|
||||
@ -1,33 +0,0 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.kyuubi.engine
|
||||
|
||||
import org.apache.kyuubi.config.{ConfigBuilder, KyuubiConf, OptionalConfigEntry}
|
||||
|
||||
object EngineConf {
|
||||
|
||||
private def buildConf(key: String): ConfigBuilder = KyuubiConf.buildConf(key)
|
||||
|
||||
val ENGINE_SPARK_MAIN_RESOURCE: OptionalConfigEntry[String] =
|
||||
buildConf("engine.spark.main.resource")
|
||||
.doc("The connection string for the zookeeper ensemble")
|
||||
.version("1.0.0")
|
||||
.stringConf
|
||||
.createOptional
|
||||
|
||||
}
|
||||
@ -0,0 +1,121 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.kyuubi.engine
|
||||
|
||||
import java.io.IOException
|
||||
import java.nio.charset.StandardCharsets
|
||||
import java.nio.file.{Files, Path, Paths}
|
||||
import java.util.UUID
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
|
||||
import org.apache.kyuubi.KyuubiSQLException
|
||||
import org.apache.kyuubi.util.NamedThreadFactory
|
||||
|
||||
trait ProcBuilder {
|
||||
import ProcBuilder._
|
||||
|
||||
protected def executable: String
|
||||
|
||||
protected def mainResource: Option[String]
|
||||
|
||||
protected def mainClass: String
|
||||
|
||||
protected def proxyUser: String
|
||||
|
||||
protected def commands: Array[String]
|
||||
|
||||
protected def env: Map[String, String]
|
||||
|
||||
protected def workingDir: Path
|
||||
|
||||
final lazy val processBuilder: ProcessBuilder = {
|
||||
val pb = new ProcessBuilder(commands: _*)
|
||||
|
||||
val envs = pb.environment()
|
||||
envs.putAll(env.asJava)
|
||||
pb.directory(workingDir.toFile)
|
||||
pb
|
||||
}
|
||||
|
||||
private var error: Throwable = UNCAUGHT_ERROR
|
||||
|
||||
final def start: Process = {
|
||||
val procLog = Paths.get(workingDir.toAbsolutePath.toString, UUID.randomUUID().toString)
|
||||
processBuilder.redirectError(procLog.toFile)
|
||||
processBuilder.redirectOutput(procLog.toFile)
|
||||
|
||||
val proc = processBuilder.start()
|
||||
val reader = Files.newBufferedReader(procLog, StandardCharsets.UTF_8)
|
||||
|
||||
val redirect = new Runnable {
|
||||
override def run(): Unit = try {
|
||||
var line: String = reader.readLine
|
||||
while (true) {
|
||||
if (containsIgnoreCase(line, "Exception") && !line.contains("at ")) {
|
||||
val sb = new StringBuilder(line)
|
||||
|
||||
line = reader.readLine()
|
||||
while (line != null && line.startsWith("\tat ")) {
|
||||
sb.append("\n" + line)
|
||||
line = reader.readLine()
|
||||
}
|
||||
|
||||
error = KyuubiSQLException(sb.toString())
|
||||
}
|
||||
line = reader.readLine()
|
||||
}
|
||||
} catch {
|
||||
case _: IOException =>
|
||||
}
|
||||
}
|
||||
|
||||
PROC_BUILD_LOGGER.newThread(redirect).start()
|
||||
proc
|
||||
}
|
||||
|
||||
def getError: Throwable = {
|
||||
if (error == UNCAUGHT_ERROR) {
|
||||
Thread.sleep(3000)
|
||||
}
|
||||
error
|
||||
}
|
||||
}
|
||||
|
||||
object ProcBuilder {
|
||||
private val PROC_BUILD_LOGGER = NamedThreadFactory("process-logger", daemon = true)
|
||||
|
||||
private val UNCAUGHT_ERROR = KyuubiSQLException("Uncaught error")
|
||||
|
||||
def containsIgnoreCase(str: String, searchStr: String): Boolean = {
|
||||
if (str == null || searchStr == null) {
|
||||
false
|
||||
} else {
|
||||
val max = str.length - searchStr.length
|
||||
var i = 0
|
||||
while (i <= max) {
|
||||
if (str.regionMatches(true, i, searchStr, 0, searchStr.length)) {
|
||||
return true
|
||||
}
|
||||
i += 1
|
||||
}
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@ -1,58 +0,0 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.kyuubi.engine
|
||||
|
||||
import java.nio.file.{Path, Paths}
|
||||
import java.util.UUID
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
|
||||
trait ProcessBuilderLike {
|
||||
|
||||
protected def executable: String
|
||||
|
||||
protected def mainResource: Option[String]
|
||||
|
||||
protected def mainClass: String
|
||||
|
||||
protected def proxyUser: String
|
||||
|
||||
protected def commands: Array[String]
|
||||
|
||||
protected def env: Map[String, String]
|
||||
|
||||
protected def workingDir: Path
|
||||
|
||||
final lazy val processBuilder: ProcessBuilder = {
|
||||
val pb = new ProcessBuilder(commands: _*)
|
||||
|
||||
val envs = pb.environment()
|
||||
envs.putAll(env.asJava)
|
||||
|
||||
pb.directory(workingDir.toFile)
|
||||
val procLogFile =
|
||||
Paths.get(workingDir.toAbsolutePath.toString, UUID.randomUUID().toString).toFile
|
||||
pb.redirectError(procLogFile)
|
||||
pb.redirectOutput(procLogFile)
|
||||
pb
|
||||
}
|
||||
|
||||
final def start: Process = {
|
||||
processBuilder.start()
|
||||
}
|
||||
}
|
||||
@ -23,14 +23,14 @@ import scala.collection.mutable.ArrayBuffer
|
||||
|
||||
import org.apache.kyuubi._
|
||||
import org.apache.kyuubi.config.KyuubiConf
|
||||
import org.apache.kyuubi.engine.EngineConf.ENGINE_SPARK_MAIN_RESOURCE
|
||||
import org.apache.kyuubi.engine.ProcessBuilderLike
|
||||
import org.apache.kyuubi.config.KyuubiConf.ENGINE_SPARK_MAIN_RESOURCE
|
||||
import org.apache.kyuubi.engine.ProcBuilder
|
||||
|
||||
class SparkProcessBuilder(
|
||||
override val proxyUser: String,
|
||||
conf: Map[String, String],
|
||||
override val env: Map[String, String] = sys.env)
|
||||
extends ProcessBuilderLike {
|
||||
extends ProcBuilder {
|
||||
|
||||
import SparkProcessBuilder._
|
||||
|
||||
@ -75,9 +75,9 @@ class SparkProcessBuilder(
|
||||
|
||||
override protected def workingDir: Path = {
|
||||
env.get("KYUUBI_WORK_DIR_ROOT").map { root =>
|
||||
Utils.createTempDir(root, proxyUser)
|
||||
Utils.createDirectory(root, proxyUser)
|
||||
}.getOrElse {
|
||||
Utils.createTempDir(namePrefix = proxyUser)
|
||||
Utils.createDirectory(System.getProperty("java.io.tmpdir"), proxyUser)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -30,6 +30,7 @@ import org.apache.thrift.transport.{TSocket, TTransport}
|
||||
|
||||
import org.apache.kyuubi.KyuubiSQLException
|
||||
import org.apache.kyuubi.config.KyuubiConf
|
||||
import org.apache.kyuubi.config.KyuubiConf._
|
||||
import org.apache.kyuubi.engine.spark.SparkProcessBuilder
|
||||
import org.apache.kyuubi.ha.client.ServiceDiscovery
|
||||
import org.apache.kyuubi.service.authentication.PlainSASLHelper
|
||||
@ -56,6 +57,7 @@ class KyuubiSessionImpl(
|
||||
|
||||
configureSession()
|
||||
|
||||
private val timeout = sessionConf.get(ENGINE_INIT_TIMEOUT) / 1000
|
||||
private val zkNamespace = s"$zkNamespacePrefix-$user"
|
||||
private val zkPath = ZKPaths.makePath(null, zkNamespace)
|
||||
private lazy val zkClient = ServiceDiscovery.newZookeeperClient(sessionConf)
|
||||
@ -89,10 +91,16 @@ class KyuubiSessionImpl(
|
||||
val builder = new SparkProcessBuilder(user, sessionConf.toSparkPrefixedConf)
|
||||
val process = builder.start
|
||||
var sh = getServerHost
|
||||
var count = 0
|
||||
while (sh.isEmpty) {
|
||||
if (process.waitFor(1, TimeUnit.SECONDS)) {
|
||||
throw KyuubiSQLException("Some error happened")
|
||||
throw builder.getError
|
||||
}
|
||||
if (count >= timeout) {
|
||||
process.destroyForcibly()
|
||||
throw KyuubiSQLException("Timed out to launched Spark")
|
||||
}
|
||||
count += 1
|
||||
sh = getServerHost
|
||||
}
|
||||
val Some((host, port)) = getServerHost
|
||||
|
||||
@ -19,7 +19,7 @@ package org.apache.kyuubi.engine.spark
|
||||
|
||||
import java.nio.file.{Files, Paths}
|
||||
|
||||
import org.apache.kyuubi.KyuubiFunSuite
|
||||
import org.apache.kyuubi.{KyuubiFunSuite, KyuubiSQLException}
|
||||
import org.apache.kyuubi.config.KyuubiConf
|
||||
import org.apache.kyuubi.config.KyuubiConf._
|
||||
|
||||
@ -42,6 +42,13 @@ class SparkProcessBuilderSuite extends KyuubiFunSuite {
|
||||
val process = builder.start
|
||||
assert(process.isAlive)
|
||||
process.destroyForcibly()
|
||||
|
||||
val processBuilder = new SparkProcessBuilder("kentyao", conf ++ Map("spark.ui.port" -> "abc"))
|
||||
processBuilder.start
|
||||
val error = processBuilder.getError
|
||||
assert(error.getMessage.contains(
|
||||
"java.lang.IllegalArgumentException: spark.ui.port should be int, but was abc\n\tat"))
|
||||
assert(error.isInstanceOf[KyuubiSQLException])
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user