diff --git a/externals/kyuubi-spark-monitor/pom.xml b/externals/kyuubi-spark-monitor/pom.xml index 92e02839d..8c60a529e 100644 --- a/externals/kyuubi-spark-monitor/pom.xml +++ b/externals/kyuubi-spark-monitor/pom.xml @@ -31,6 +31,26 @@ jar Kyuubi Project Spark Monitor + + + org.apache.kyuubi + kyuubi-common + ${project.version} + + + + org.apache.spark + spark-sql_${scala.binary.version} + provided + + + + org.apache.spark + spark-core_${scala.binary.version} + provided + + + target/scala-${scala.binary.version}/classes target/scala-${scala.binary.version}/test-classes diff --git a/externals/kyuubi-spark-monitor/src/main/scala/org/apache/kyuubi/engine/spark/monitor/KyuubiStatementMonitor.scala b/externals/kyuubi-spark-monitor/src/main/scala/org/apache/kyuubi/engine/spark/monitor/KyuubiStatementMonitor.scala new file mode 100644 index 000000000..f33fbe93b --- /dev/null +++ b/externals/kyuubi-spark-monitor/src/main/scala/org/apache/kyuubi/engine/spark/monitor/KyuubiStatementMonitor.scala @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Buffers [[KyuubiStatementInfo]] entries in a bounded, in-memory queue.
 *
 * Notice:
 *  1. When items are removed from the queue we should make sure the statements they describe
 *     have finished; unfinished ones should be put back. (Not implemented yet, see TODOs.)
 *  2. Two kinds of threshold are meant to trigger removal: elapsed time and the queue's
 *     current size. Only the size threshold exists today.
 */
// TODO: Thread Safe need to consider
object KyuubiStatementMonitor extends Logging {

  /** Maximum number of entries the queue can hold. */
  // TODO: Capacity should make configurable
  private val QueueCapacity = 10

  /** Queue size at (or above) which the queue is emptied before a new entry is added. */
  // TODO: Lack size type threshold and time type threshold
  private val DumpThreshold = 7

  /** Bounded buffer of statement summaries. The name is looked up reflectively by tests. */
  private val kyuubiStatementQueue =
    new ArrayBlockingQueue[KyuubiStatementInfo](QueueCapacity)

  /**
   * Adds a statement summary to the queue.
   *
   * Before inserting, the current queue size is checked: when it has reached the dump
   * threshold, the queued entries are removed first.
   *
   * The insert uses `offer`, which returns `false` when the queue is full. `add` would throw
   * `IllegalStateException` in that situation, which would both make the logged flag
   * meaningless and let a monitoring problem fail the caller.
   *
   * @param kyuubiStatementInfo the statement summary to enqueue
   */
  def putStatementInfoIntoQueue(kyuubiStatementInfo: KyuubiStatementInfo): Unit = {
    if (kyuubiStatementQueue.size() >= DumpThreshold) {
      removeAndDumpStatementInfoFromQueue()
    }
    val isSuccess = kyuubiStatementQueue.offer(kyuubiStatementInfo)
    info(s"Add kyuubiStatementInfo into queue is [$isSuccess], " +
      s"statementId is [${kyuubiStatementInfo.statementId}]")
  }

  /**
   * Removes the queued statement summaries. Dumping them to a file is planned but not
   * implemented: at the moment the entries are simply discarded.
   */
  // TODO: Need ensure those items have finished. If not, we should put them into this queue again.
  private def removeAndDumpStatementInfoFromQueue(): Unit = {
    // TODO: Just for test
    kyuubiStatementQueue.clear()
  }
}
/**
 * Summary information about a single statement.
 * The statementId can be used to look up the jobs' and stages' metrics of the statement.
 *
 * Besides the constructor parameters, two members are filled in after construction:
 * `queryExecution` (physical plan, logical plan and so on) and `exception`. Both stay
 * `null` until they are assigned.
 *
 * @param statementId identifier of the statement
 * @param statement the statement text
 * @param appId identifier of the application the statement belongs to
 * @param sessionId identifier of the session the statement belongs to
 * @param stateToTime every state the statement has been in, mapped to the time it occurred
 */
case class KyuubiStatementInfo(
    statementId: String,
    statement: String,
    appId: String,
    sessionId: HandleIdentifier,
    stateToTime: Map[OperationState, Long]) {

  /** Query execution of the statement; unset (`null`) until assigned. */
  var queryExecution: QueryExecution = _

  /** Exception recorded for the statement; unset (`null`) until assigned. */
  var exception: KyuubiSQLException = _
}
+# + +# Set everything to be logged to the file target/unit-tests.log +log4j.rootLogger=DEBUG, CA, FA + +#Console Appender +log4j.appender.CA=org.apache.log4j.ConsoleAppender +log4j.appender.CA.layout=org.apache.log4j.PatternLayout +log4j.appender.CA.layout.ConversionPattern=%d{HH:mm:ss.SSS} %p %c: %m%n +log4j.appender.CA.Threshold = FATAL + +#File Appender +log4j.appender.FA=org.apache.log4j.FileAppender +log4j.appender.FA.append=false +log4j.appender.FA.file=target/unit-tests.log +log4j.appender.FA.layout=org.apache.log4j.PatternLayout +log4j.appender.FA.layout.ConversionPattern=%d{HH:mm:ss.SSS} %t %p %c{2}: %m%n + +# Set the logger level of File Appender to WARN +log4j.appender.FA.Threshold = DEBUG + +# SPARK-34128:Suppress undesirable TTransportException warnings involved in THRIFT-4805 +log4j.appender.console.filter.1=org.apache.log4j.varia.StringMatchFilter +log4j.appender.console.filter.1.StringToMatch=Thrift error occurred during processing of message +log4j.appender.console.filter.1.AcceptOnMatch=false diff --git a/externals/kyuubi-spark-sql-engine/pom.xml b/externals/kyuubi-spark-sql-engine/pom.xml index 3bf12ee42..f9b0e777f 100644 --- a/externals/kyuubi-spark-sql-engine/pom.xml +++ b/externals/kyuubi-spark-sql-engine/pom.xml @@ -49,7 +49,7 @@ kyuubi-spark-monitor ${project.version} - + org.apache.spark spark-core_${scala.binary.version} diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala index 27b897ac5..76bf7c9a8 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala @@ -19,6 +19,8 @@ package org.apache.kyuubi.engine.spark.operation import 
java.util.concurrent.{RejectedExecutionException, ScheduledExecutorService, TimeUnit} +import scala.collection.mutable.Map + import org.apache.spark.kyuubi.SQLOperationListener import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.sql.types._ @@ -26,6 +28,8 @@ import org.apache.spark.sql.types._ import org.apache.kyuubi.{KyuubiSQLException, Logging} import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.engine.spark.{ArrayFetchIterator, KyuubiSparkUtil} +import org.apache.kyuubi.engine.spark.monitor.KyuubiStatementMonitor +import org.apache.kyuubi.engine.spark.monitor.entity.KyuubiStatementInfo import org.apache.kyuubi.operation.{OperationState, OperationType} import org.apache.kyuubi.operation.OperationState.OperationState import org.apache.kyuubi.operation.log.OperationLog @@ -58,6 +62,11 @@ class ExecuteStatement( private val operationListener: SQLOperationListener = new SQLOperationListener(this, spark) + private val kyuubiStatementInfo = KyuubiStatementInfo( + statementId, statement, spark.sparkContext.applicationId, + session.getTypeInfo.identifier, Map(state -> lastAccessTime)) + KyuubiStatementMonitor.putStatementInfoIntoQueue(kyuubiStatementInfo) + override protected def resultSchema: StructType = { if (result == null || result.schema.isEmpty) { new StructType().add("Result", "string") @@ -84,6 +93,7 @@ class ExecuteStatement( // TODO: Make it configurable spark.sparkContext.addSparkListener(operationListener) result = spark.sql(statement) + kyuubiStatementInfo.queryExecution = result.queryExecution debug(result.queryExecution) iter = new ArrayFetchIterator(result.collect()) setState(OperationState.FINISHED) @@ -156,6 +166,16 @@ class ExecuteStatement( spark.sparkContext.removeSparkListener(operationListener) super.cleanup(targetState) } + + override def setState(newState: OperationState): Unit = { + super.setState(newState) + kyuubiStatementInfo.stateToTime.put(newState, lastAccessTime) + } + + override def 
setOperationException(opEx: KyuubiSQLException): Unit = { + super.setOperationException(opEx) + kyuubiStatementInfo.exception = opEx + } } object ExecuteStatement { diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/KyuubiStatementMonitorSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/KyuubiStatementMonitorSuite.scala new file mode 100644 index 000000000..f54ac522b --- /dev/null +++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/KyuubiStatementMonitorSuite.scala @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Checks that executed statements are recorded in [[KyuubiStatementMonitor]]'s queue and that
 * the queue is emptied once the size threshold is reached.
 */
class KyuubiStatementMonitorSuite extends WithSparkSQLEngine with HiveJDBCTests
  with PrivateMethodTester {

  /** Upper bound, in milliseconds, on how long a single operation may take to finish. */
  private val operationTimeoutMs: Long = 90 * 1000L

  /** Pause, in milliseconds, between two status polls. */
  private val pollIntervalMs: Long = 100L

  override protected def jdbcUrl: String = getJdbcUrl
  override def withKyuubiConf: Map[String, String] = Map.empty

  test("add kyuubiStatementInfo into queue and remove them by size type threshold") {
    val sql = "select timestamp'2021-06-01'"
    val total: Int = 7
    // Clear kyuubiStatementQueue first
    val getQueue = PrivateMethod[
      ArrayBlockingQueue[KyuubiStatementInfo]](Symbol("kyuubiStatementQueue"))()
    val kyuubiStatementQueue = KyuubiStatementMonitor.invokePrivate(getQueue)
    kyuubiStatementQueue.clear()
    withSessionHandle { (client, handle) =>
      // Submits the statement once and blocks until the operation leaves its active states.
      def executeAndWait(): Unit = {
        val req = new TExecuteStatementReq()
        req.setSessionHandle(handle)
        req.setStatement(sql)
        val tExecuteStatementResp = client.ExecuteStatement(req)
        val operationHandle = tExecuteStatementResp.getOperationHandle
        waitForOperationToComplete(client, operationHandle)
      }

      (1 to total).foreach(_ => executeAndWait())

      // Guard against the per-entry checks below passing vacuously on an empty queue.
      assert(kyuubiStatementQueue.size() === total)

      val iterator = kyuubiStatementQueue.iterator()
      while (iterator.hasNext) {
        val kyuubiStatementInfo = iterator.next()
        assert(kyuubiStatementInfo.statement !== null)
        assert(kyuubiStatementInfo.statementId !== null)
        assert(kyuubiStatementInfo.sessionId !== null)
        assert(kyuubiStatementInfo.queryExecution !== null)
        assert(kyuubiStatementInfo.stateToTime.size === 4)
      }

      // Test for clear kyuubiStatementQueue
      // Clearing the queue at the threshold is what avoids the memory leak:
      // one more statement must leave only that statement in the queue.
      executeAndWait()

      assert(kyuubiStatementQueue.size() === 1)
    }
  }

  /**
   * Polls the operation status until it is no longer INITIALIZED, PENDING or RUNNING.
   *
   * The loop sleeps between polls instead of spinning on the RPC, and fails the test if the
   * operation is still active after `operationTimeoutMs`, so a stuck operation cannot hang
   * the suite forever.
   *
   * @param client Thrift client used to query the status
   * @param op handle of the operation to wait for
   */
  private def waitForOperationToComplete(client: Iface, op: TOperationHandle): Unit = {
    val req = new TGetOperationStatusReq(op)
    val deadline = System.currentTimeMillis() + operationTimeoutMs
    var state = client.GetOperationStatus(req).getOperationState
    while (state == INITIALIZED_STATE || state == PENDING_STATE || state == RUNNING_STATE) {
      assert(System.currentTimeMillis() < deadline,
        s"Operation did not complete within $operationTimeoutMs ms, last state: $state")
      Thread.sleep(pollIntervalMs)
      state = client.GetOperationStatus(req).getOperationState
    }
  }
}