From f8b8c6c172e7836540e01483feb0ef8f1fc6a13d Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Wed, 23 Sep 2020 17:41:51 +0800 Subject: [PATCH] Print spark engine dignosis at client side --- .../kyuubi/engine/spark/KyuubiSparkUtil.scala | 38 +++++++++++++++++++ .../kyuubi/engine/spark/SparkSQLEngine.scala | 10 ++--- .../spark/operation/ExecuteStatement.scala | 2 + .../session/SparkSQLSessionManager.scala | 10 +---- .../kyuubi/session/AbstractSession.scala | 14 +++---- 5 files changed, 51 insertions(+), 23 deletions(-) create mode 100644 externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/KyuubiSparkUtil.scala diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/KyuubiSparkUtil.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/KyuubiSparkUtil.scala new file mode 100644 index 000000000..e76e2f7d4 --- /dev/null +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/KyuubiSparkUtil.scala @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.engine.spark + +import java.time.Instant + +import org.apache.spark.sql.SparkSession + +object KyuubiSparkUtil { + + def diagnostics(spark: SparkSession): String = { + s""" + | Spark application name: ${spark.sparkContext.appName} + | application ID: ${spark.sparkContext.applicationId} + | application web UI: ${spark.sparkContext.uiWebUrl.getOrElse("")} + | master: ${spark.sparkContext.master} + | deploy mode: ${spark.sparkContext.deployMode} + | version: ${spark.sparkContext.version} + | Start time: ${Instant.ofEpochMilli(spark.sparkContext.startTime)} + | User: ${spark.sparkContext.sparkUser} + |""".stripMargin + } +} diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala index fdd231081..8d02ec801 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala @@ -17,7 +17,7 @@ package org.apache.kyuubi.engine.spark -import java.time.LocalDateTime +import java.time.LocalTime import org.apache.spark.SparkConf import org.apache.spark.sql.SparkSession @@ -51,11 +51,7 @@ object SparkSQLEngine extends Logging { sparkConf.setIfMissing("spark.master", "local") sparkConf.setIfMissing("spark.ui.port", "0") - val appName = Seq( - "kyuubi", - user, - classOf[SparkSQLEngine].getSimpleName, - LocalDateTime.now).mkString("_") + val appName = s"kyuubi_${user}_spark_${LocalTime.now}" sparkConf.setAppName(appName) @@ -105,7 +101,6 @@ object SparkSQLEngine extends Logging { serviceDiscovery.start() sys.addShutdownHook(serviceDiscovery.stop()) } - } def main(args: Array[String]): Unit = { @@ -117,6 +112,7 @@ object SparkSQLEngine extends Logging { spark = createSpark() engine = startEngine(spark) exposeEngine(engine) + info(KyuubiSparkUtil.diagnostics(spark)) } catch { case t: Throwable => error("Error start SparkSQLEngine", t) diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala index 090cb1958..02944d5b7 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala @@ -21,6 +21,7 @@ import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.sql.types.StructType import org.apache.kyuubi.Logging +import org.apache.kyuubi.engine.spark.KyuubiSparkUtil import org.apache.kyuubi.operation.OperationType import org.apache.kyuubi.session.Session @@ -42,6 +43,7 @@ class ExecuteStatement( override protected def runInternal(): Unit = { try { + info(KyuubiSparkUtil.diagnostics(spark)) spark.sparkContext.setJobGroup(statementId, statement) result = spark.sql(statement) iter = result.collect().toList.iterator diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala index 1ccb8e71a..42f0c5253 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala @@ -17,8 +17,6 @@ package org.apache.kyuubi.engine.spark.session -import scala.util.control.NonFatal - import org.apache.hive.service.rpc.thrift.TProtocolVersion import org.apache.spark.sql.SparkSession @@ -64,12 +62,8 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession) s" $getOpenSessionCount") handle } catch { - case NonFatal(e) => - try { - sessionImpl.close() - } catch { - case t: Throwable => warn(s"Error closing session $handle for $user", t) - } + case e: Exception => + sessionImpl.close() throw KyuubiSQLException(s"Error opening session $handle for $user: ${e.getMessage}", e) } } diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala index 8a15be100..66480e1b6 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala @@ -18,7 +18,6 @@ package org.apache.kyuubi.session import scala.collection.JavaConverters._ -import scala.util.control.NonFatal import org.apache.hive.service.rpc.thrift.{TGetInfoType, TGetInfoValue, TProtocolVersion, TRowSet, TTableSchema} @@ -75,12 +74,12 @@ abstract class AbstractSession( } override def close(): Unit = withAcquireRelease() { - opHandleSet.forEach { operationHandle => + opHandleSet.forEach { opHandle => try { - sessionManager.operationManager.closeOperation(operationHandle) + sessionManager.operationManager.closeOperation(opHandle) } catch { - case NonFatal(e) => - warn(s"Error closing operation $operationHandle during closing $handle", e) + case e: Exception => + warn(s"Error closing operation $opHandle during closing $handle for", e) } } } @@ -106,7 +105,7 @@ abstract class AbstractSession( case TGetInfoType.CLI_MAX_COLUMN_NAME_LEN | TGetInfoType.CLI_MAX_SCHEMA_NAME_LEN | TGetInfoType.CLI_MAX_TABLE_NAME_LEN => TGetInfoValue.lenValue(128) - case _ => throw KyuubiSQLException(s"Unrecognized GetInfoType value: ${infoType}") + case _ => throw KyuubiSQLException(s"Unrecognized GetInfoType value: $infoType") } } @@ -219,8 +218,7 @@ abstract class AbstractSession( try { op.close() } catch { - case e: Exception => - warn(s"Error closing timed-out operation ${op.getHandle}", e) + case e: Exception => warn(s"Error closing timed-out operation ${op.getHandle}", e) } } }