Print spark engine diagnosis at client side

This commit is contained in:
Kent Yao 2020-09-23 17:41:51 +08:00
parent 53205a43cc
commit f8b8c6c172
5 changed files with 51 additions and 23 deletions

View File

@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.spark
import java.time.Instant
import org.apache.spark.sql.SparkSession
object KyuubiSparkUtil {

  /**
   * Builds a human-readable, multi-line diagnostics summary for the given
   * [[SparkSession]]: application name/ID, web UI address, master, deploy
   * mode, Spark version, start time, and the user running the application.
   *
   * @param spark the active session whose underlying SparkContext is inspected
   * @return a newline-separated report, each entry indented by one space
   */
  def diagnostics(spark: SparkSession): String = {
    // Resolve the context once instead of dereferencing it per field.
    val sc = spark.sparkContext
    val entries = Seq(
      s"Spark application name: ${sc.appName}",
      s"application ID: ${sc.applicationId}",
      // uiWebUrl is None when the UI is disabled; render as empty string.
      s"application web UI: ${sc.uiWebUrl.getOrElse("")}",
      s"master: ${sc.master}",
      s"deploy mode: ${sc.deployMode}",
      s"version: ${sc.version}",
      s"Start time: ${Instant.ofEpochMilli(sc.startTime)}",
      s"User: ${sc.sparkUser}")
    // Leading "\n " and trailing "\n" reproduce the original stripMargin layout.
    entries.mkString("\n ", "\n ", "\n")
  }
}

View File

@ -17,7 +17,7 @@
package org.apache.kyuubi.engine.spark
import java.time.LocalDateTime
import java.time.LocalTime
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
@ -51,11 +51,7 @@ object SparkSQLEngine extends Logging {
sparkConf.setIfMissing("spark.master", "local")
sparkConf.setIfMissing("spark.ui.port", "0")
val appName = Seq(
"kyuubi",
user,
classOf[SparkSQLEngine].getSimpleName,
LocalDateTime.now).mkString("_")
val appName = s"kyuubi_${user}_spark_${LocalTime.now}"
sparkConf.setAppName(appName)
@ -105,7 +101,6 @@ object SparkSQLEngine extends Logging {
serviceDiscovery.start()
sys.addShutdownHook(serviceDiscovery.stop())
}
}
def main(args: Array[String]): Unit = {
@ -117,6 +112,7 @@ object SparkSQLEngine extends Logging {
spark = createSpark()
engine = startEngine(spark)
exposeEngine(engine)
info(KyuubiSparkUtil.diagnostics(spark))
} catch {
case t: Throwable =>
error("Error start SparkSQLEngine", t)

View File

@ -21,6 +21,7 @@ import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.StructType
import org.apache.kyuubi.Logging
import org.apache.kyuubi.engine.spark.KyuubiSparkUtil
import org.apache.kyuubi.operation.OperationType
import org.apache.kyuubi.session.Session
@ -42,6 +43,7 @@ class ExecuteStatement(
override protected def runInternal(): Unit = {
try {
info(KyuubiSparkUtil.diagnostics(spark))
spark.sparkContext.setJobGroup(statementId, statement)
result = spark.sql(statement)
iter = result.collect().toList.iterator

View File

@ -17,8 +17,6 @@
package org.apache.kyuubi.engine.spark.session
import scala.util.control.NonFatal
import org.apache.hive.service.rpc.thrift.TProtocolVersion
import org.apache.spark.sql.SparkSession
@ -64,12 +62,8 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession)
s" $getOpenSessionCount")
handle
} catch {
case NonFatal(e) =>
try {
sessionImpl.close()
} catch {
case t: Throwable => warn(s"Error closing session $handle for $user", t)
}
case e: Exception =>
sessionImpl.close()
throw KyuubiSQLException(s"Error opening session $handle for $user: ${e.getMessage}", e)
}
}

View File

@ -18,7 +18,6 @@
package org.apache.kyuubi.session
import scala.collection.JavaConverters._
import scala.util.control.NonFatal
import org.apache.hive.service.rpc.thrift.{TGetInfoType, TGetInfoValue, TProtocolVersion, TRowSet, TTableSchema}
@ -75,12 +74,12 @@ abstract class AbstractSession(
}
override def close(): Unit = withAcquireRelease() {
opHandleSet.forEach { operationHandle =>
opHandleSet.forEach { opHandle =>
try {
sessionManager.operationManager.closeOperation(operationHandle)
sessionManager.operationManager.closeOperation(opHandle)
} catch {
case NonFatal(e) =>
warn(s"Error closing operation $operationHandle during closing $handle", e)
case e: Exception =>
warn(s"Error closing operation $opHandle during closing $handle for", e)
}
}
}
@ -106,7 +105,7 @@ abstract class AbstractSession(
case TGetInfoType.CLI_MAX_COLUMN_NAME_LEN |
TGetInfoType.CLI_MAX_SCHEMA_NAME_LEN |
TGetInfoType.CLI_MAX_TABLE_NAME_LEN => TGetInfoValue.lenValue(128)
case _ => throw KyuubiSQLException(s"Unrecognized GetInfoType value: ${infoType}")
case _ => throw KyuubiSQLException(s"Unrecognized GetInfoType value: $infoType")
}
}
@ -219,8 +218,7 @@ abstract class AbstractSession(
try {
op.close()
} catch {
case e: Exception =>
warn(s"Error closing timed-out operation ${op.getHandle}", e)
case e: Exception => warn(s"Error closing timed-out operation ${op.getHandle}", e)
}
}
}