From 3617657bedf2ab3db7bf180492b37327a4233490 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Thu, 13 Aug 2020 10:41:46 +0800 Subject: [PATCH] Add support for operation_log --- .gitignore | 2 + .../main/scala/org/apache/kyuubi/Utils.scala | 21 +-- .../kyuubi/operation/OperationManager.scala | 22 +-- .../src/test/resources/log4j.properties | 2 +- .../apache/kyuubi/KerberizedTestHelper.scala | 2 +- kyuubi-ha/src/test/resources/log4j.properties | 2 +- .../kyuubi/operation/LogDivertAppender.scala | 1 - .../kyuubi/engine/spark/SparkSQLEngine.scala | 2 +- .../spark/operation/SparkOperation.scala | 9 ++ .../operation/SparkSQLOperationManager.scala | 23 ++- .../operation/log/LogDivertAppender.scala | 65 ++++++++ .../spark/operation/log/OperationLog.scala | 140 ++++++++++++++++++ .../session/SparkSQLSessionManager.scala | 4 +- .../spark/session/SparkSessionImpl.scala | 5 +- .../src/test/resources/log4j.properties | 36 +++++ .../operation/log/OperationLogSuite.scala | 103 +++++++++++++ pom.xml | 2 +- 17 files changed, 401 insertions(+), 40 deletions(-) create mode 100644 kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/log/LogDivertAppender.scala create mode 100644 kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/log/OperationLog.scala create mode 100644 kyuubi-spark-sql-engine/src/test/resources/log4j.properties create mode 100644 kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/log/OperationLogSuite.scala diff --git a/.gitignore b/.gitignore index 9be664088..5ea8dd72a 100644 --- a/.gitignore +++ b/.gitignore @@ -44,3 +44,5 @@ metrics/report.json metrics/.report.json.crc /kyuubi-ha/embedded_zookeeper/ embedded_zookeeper/ +/kyuubi-spark-sql-engine/spark-warehouse/ +/kyuubi-spark-sql-engine/kyuubi_operation_logs/ diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala index 41eb55518..1d23b70fb 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala @@ -20,10 +20,10 @@ package org.apache.kyuubi import java.io.{File, InputStreamReader, IOException} import java.net.{URI, URISyntaxException} import java.nio.charset.StandardCharsets +import java.nio.file.{Files, Path, Paths} import java.util.{Properties, UUID} import scala.collection.JavaConverters._ -import scala.util.{Success, Try} import org.apache.hadoop.security.UserGroupInformation @@ -98,16 +98,19 @@ private[kyuubi] object Utils extends Logging { * Create a directory inside the given parent directory. The directory is guaranteed to be * newly created, and is not marked for automatic deletion. */ - def createDirectory(root: String, namePrefix: String = "kyuubi"): File = { + def createDirectory(root: String, namePrefix: String = "kyuubi"): Path = { + var error: Exception = null (0 until MAX_DIR_CREATION_ATTEMPTS).foreach { _ => - val dir = new File(root, namePrefix + "-" + UUID.randomUUID.toString) - Try { dir.mkdirs() } match { - case Success(_) => return dir - case _ => + val candidate = Paths.get(root, s"$namePrefix-${UUID.randomUUID()}") + try { + val path = Files.createDirectories(candidate) + return path + } catch { + case e: IOException => error = e } } throw new IOException("Failed to create a temp directory (under " + root + ") after " + - MAX_DIR_CREATION_ATTEMPTS + " attempts!") + MAX_DIR_CREATION_ATTEMPTS + " attempts!", error) } /** @@ -116,9 +119,9 @@ private[kyuubi] object Utils extends Logging { */ def createTempDir( root: String = System.getProperty("java.io.tmpdir"), - namePrefix: String = "kyuubi"): File = { + namePrefix: String = "kyuubi"): Path = { val dir = createDirectory(root, namePrefix) - dir.deleteOnExit() + dir.toFile.deleteOnExit() dir } diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationManager.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationManager.scala index fc317882c..d7968e706 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationManager.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationManager.scala @@ -17,12 +17,9 @@ package org.apache.kyuubi.operation -import java.nio.ByteBuffer - -import org.apache.hive.service.rpc.thrift.{TColumn, TRow, TRowSet, TStringColumn, TTableSchema} +import org.apache.hive.service.rpc.thrift._ import org.apache.kyuubi.KyuubiSQLException -import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation import org.apache.kyuubi.service.AbstractService import org.apache.kyuubi.session.Session @@ -37,14 +34,6 @@ abstract class OperationManager(name: String) extends AbstractService(name) { private final val handleToOperation = new java.util.HashMap[OperationHandle, Operation]() - override def initialize(conf: KyuubiConf): Unit = { - super.initialize(conf) - } - - override def start(): Unit = super.start() - - override def stop(): Unit = super.stop() - def newExecuteStatementOperation( session: Session, statement: String, @@ -121,14 +110,7 @@ abstract class OperationManager(name: String) extends AbstractService(name) { def getOperationLogRowSet( opHandle: OperationHandle, order: FetchOrientation, - maxRows: Int): TRowSet = { - // TODO: Support fetch log result - val values = new java.util.ArrayList[String] - val tColumn = TColumn.stringVal(new TStringColumn(values, ByteBuffer.allocate(0))) - val tRow = new TRowSet(0, new java.util.ArrayList[TRow](1)) - tRow.addToColumns(tColumn) - tRow - } + maxRows: Int): TRowSet final def removeExpiredOperations(handles: Seq[OperationHandle]): Seq[Operation] = synchronized { handles.map(handleToOperation.get).filter { operation => diff --git a/kyuubi-common/src/test/resources/log4j.properties b/kyuubi-common/src/test/resources/log4j.properties index c0f79896c..b94ca7ad9 100644 --- a/kyuubi-common/src/test/resources/log4j.properties +++ b/kyuubi-common/src/test/resources/log4j.properties @@ -30,7 +30,7 @@ log4j.appender.FA=org.apache.log4j.FileAppender log4j.appender.FA.append=false log4j.appender.FA.file=target/unit-tests.log log4j.appender.FA.layout=org.apache.log4j.PatternLayout -log4j.appender.FA.layout.ConversionPattern=%d{HH:mm:ss.SSS} %t %p %c{2}: %m%n +log4j.appender.FA.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss.SSS} %t %p %c{2}: %m%n # Set the logger level of File Appender to WARN log4j.appender.FA.Threshold = DEBUG \ No newline at end of file diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/KerberizedTestHelper.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/KerberizedTestHelper.scala index b34ea1d74..0a35ba801 100644 --- a/kyuubi-common/src/test/scala/org/apache/kyuubi/KerberizedTestHelper.scala +++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/KerberizedTestHelper.scala @@ -26,7 +26,7 @@ import org.apache.hadoop.security.UserGroupInformation trait KerberizedTestHelper { var kdc: MiniKdc = _ val baseDir: File = Utils.createTempDir( - this.getClass.getProtectionDomain.getCodeSource.getLocation.getPath, "kyuubi-kdc") + this.getClass.getProtectionDomain.getCodeSource.getLocation.getPath, "kyuubi-kdc").toFile try { val kdcConf = MiniKdc.createConf() diff --git a/kyuubi-ha/src/test/resources/log4j.properties b/kyuubi-ha/src/test/resources/log4j.properties index c0f79896c..b94ca7ad9 100644 --- a/kyuubi-ha/src/test/resources/log4j.properties +++ b/kyuubi-ha/src/test/resources/log4j.properties @@ -30,7 +30,7 @@ log4j.appender.FA=org.apache.log4j.FileAppender log4j.appender.FA.append=false log4j.appender.FA.file=target/unit-tests.log log4j.appender.FA.layout=org.apache.log4j.PatternLayout -log4j.appender.FA.layout.ConversionPattern=%d{HH:mm:ss.SSS} %t %p %c{2}: %m%n +log4j.appender.FA.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss.SSS} %t %p %c{2}: %m%n # Set the logger level of File Appender to WARN log4j.appender.FA.Threshold = DEBUG \ No newline at end of file diff --git a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/LogDivertAppender.scala b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/LogDivertAppender.scala index ad6399ed7..44841625d 100644 --- a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/LogDivertAppender.scala +++ b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/LogDivertAppender.scala @@ -41,7 +41,6 @@ class LogDivertAppender extends WriterAppender with Logging { override def decide(loggingEvent: LoggingEvent): Int = { if (OperationLog.getCurrentOperationLog == null) Filter.DENY else Filter.NEUTRAL } - }) /** diff --git a/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala b/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala index dd38b4d3f..fcf4a7e3a 100644 --- a/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala +++ b/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala @@ -121,7 +121,7 @@ private object SparkSQLEngine extends Logging { engine = startEngine(spark) } catch { case t: Throwable => - error("Error start SparkSQLRunner", t) + error("Error start SparkSQLEngine", t) if (engine != null) { engine.stop() } else if (spark != null) { diff --git a/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala b/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala index 004879e2a..e1a469874 100644 --- a/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala +++ b/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala @@ -23,6 +23,7 @@ import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.types.StructType import org.apache.kyuubi.KyuubiSQLException +import org.apache.kyuubi.engine.spark.operation.log.OperationLog import org.apache.kyuubi.operation.{AbstractOperation, OperationState} import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation import org.apache.kyuubi.operation.OperationState.OperationState @@ -35,6 +36,11 @@ abstract class SparkOperation(spark: SparkSession, opType: OperationType, sessio protected var iter: Iterator[Row] = _ + private final val operationLog: OperationLog = + OperationLog.createOperationLog(session.handle, getHandle) + + def getOperationLog: OperationLog = operationLog + protected def statement: String = opType.toString protected def resultSchema: StructType @@ -109,6 +115,7 @@ abstract class SparkOperation(spark: SparkSession, opType: OperationType, sessio override protected def beforeRun(): Unit = { setHasResultSet(true) setState(OperationState.RUNNING) + OperationLog.setCurrentOperationLog(operationLog) } override protected def afterRun(): Unit = { @@ -117,6 +124,7 @@ abstract class SparkOperation(spark: SparkSession, opType: OperationType, sessio setState(OperationState.FINISHED) } } + OperationLog.removeCurrentOperationLog() } override def cancel(): Unit = { @@ -125,6 +133,7 @@ abstract class SparkOperation(spark: SparkSession, opType: OperationType, sessio override def close(): Unit = { cleanup(OperationState.CLOSED) + if (operationLog != null) operationLog.close() } override def getResultSetSchema: TTableSchema = SchemaHelper.toTTableSchema(resultSchema) diff --git a/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala b/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala index 28b2276a8..fb89d6e2c 100644 --- a/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala +++ b/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala @@ -19,10 +19,14 @@ package org.apache.kyuubi.engine.spark.operation import java.util.concurrent.ConcurrentHashMap +import org.apache.hive.service.rpc.thrift.TRowSet import org.apache.spark.sql.SparkSession import org.apache.kyuubi.KyuubiSQLException -import org.apache.kyuubi.operation.{Operation, OperationManager} +import org.apache.kyuubi.config.KyuubiConf +import org.apache.kyuubi.engine.spark.operation.log.{LogDivertAppender, OperationLog} +import org.apache.kyuubi.operation.{Operation, OperationHandle, OperationManager} +import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation import org.apache.kyuubi.session.{Session, SessionHandle} class SparkSQLOperationManager private (name: String) extends OperationManager(name) { @@ -112,4 +116,21 @@ class SparkSQLOperationManager private (name: String) extends OperationManager(n val op = new GetFunctions(spark, session, catalogName, schemaName, functionName) addOperation(op) } + + override def initialize(conf: KyuubiConf): Unit = { + LogDivertAppender.initialize() + super.initialize(conf) + } + + override def getOperationLogRowSet( + opHandle: OperationHandle, + order: FetchOrientation, + maxRows: Int): TRowSet = { + + val log = getOperation(opHandle).asInstanceOf[SparkOperation].getOperationLog + if (log == null) { + throw KyuubiSQLException(s"Couldn't find log associated with $opHandle") + } + log.read(maxRows) + } } diff --git a/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/log/LogDivertAppender.scala b/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/log/LogDivertAppender.scala new file mode 100644 index 000000000..2918aaef0 --- /dev/null +++ b/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/log/LogDivertAppender.scala @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.engine.spark.operation.log + +import java.io.CharArrayWriter + +import scala.collection.JavaConverters._ + +import org.apache.log4j._ +import org.apache.log4j.spi.{Filter, LoggingEvent} + +class LogDivertAppender extends WriterAppender { + + private final val writer = new CharArrayWriter + + private final val lo = Logger.getRootLogger + .getAllAppenders.asScala + .find(_.isInstanceOf[ConsoleAppender]) + .map(_.asInstanceOf[Appender].getLayout) + .getOrElse(new PatternLayout("%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n")) + + setName("KyuubiSparkSQLEngineLogDivertAppender") + setWriter(writer) + setLayout(lo) + + addFilter(new Filter { + override def decide(loggingEvent: LoggingEvent): Int = { + if (OperationLog.getCurrentOperationLog == null) Filter.DENY else Filter.NEUTRAL + } + }) + + /** + * Overrides WriterAppender.subAppend(), which does the real logging. No need + * to worry about concurrency since log4j calls this synchronously. + */ + override protected def subAppend(event: LoggingEvent): Unit = { + super.subAppend(event) + // That should've gone into our writer. Notify the LogContext. + val logOutput = writer.toString + writer.reset() + val log = OperationLog.getCurrentOperationLog + if (log != null) log.write(logOutput) + } +} + +object LogDivertAppender { + def initialize(): Unit = { + org.apache.log4j.Logger.getRootLogger.addAppender(new LogDivertAppender()) + } +} diff --git a/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/log/OperationLog.scala b/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/log/OperationLog.scala new file mode 100644 index 000000000..3f37cdf09 --- /dev/null +++ b/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/log/OperationLog.scala @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.engine.spark.operation.log + +import java.io.IOException +import java.nio.ByteBuffer +import java.nio.charset.StandardCharsets +import java.nio.file.{Files, Path, Paths} + +import scala.util.control.NonFatal + +import org.apache.commons.io.FileUtils +import org.apache.hive.service.rpc.thrift.{TColumn, TRow, TRowSet, TStringColumn} + +import org.apache.kyuubi.{KyuubiSQLException, Logging} +import org.apache.kyuubi.operation.OperationHandle +import org.apache.kyuubi.session.SessionHandle + +/** + * TODO: This part is spark-independent, mv to kyuubi-common? + * we can do this after we decide to support other engines + */ +object OperationLog extends Logging { + + final val LOG_ROOT: String = "kyuubi_operation_logs" + private final val OPERATION_LOG: InheritableThreadLocal[OperationLog] = { + new InheritableThreadLocal[OperationLog] { + override def initialValue(): OperationLog = null + } + } + + def setCurrentOperationLog(operationLog: OperationLog): Unit = { + OPERATION_LOG.set(operationLog) + } + + def getCurrentOperationLog: OperationLog = OPERATION_LOG.get() + + def removeCurrentOperationLog(): Unit = OPERATION_LOG.remove() + + /** + * The operation log root directory, here we choose $PWD/[[LOG_ROOT]]/$sessionId/ as the root per + * session, this directory will delete when JVM exit. + */ + def createOperationLogRootDirectory(sessionHandle: SessionHandle): Unit = { + val path = Paths.get(LOG_ROOT, sessionHandle.identifier.toString) + try { + Files.createDirectories(path) + path.toFile.deleteOnExit() + } catch { + case NonFatal(e) => + error(s"Failed to create operation log root directory: $path", e) + } + } + + /** + * Create the OperationLog for each operation, the log file will be located at + * $PWD/[[LOG_ROOT]]/$sessionId/$operationId + * @return + */ + def createOperationLog(sessionHandle: SessionHandle, opHandle: OperationHandle): OperationLog = { + try { + val logPath = Paths.get(LOG_ROOT, sessionHandle.identifier.toString) + val logFile = Paths.get(logPath.toAbsolutePath.toString, opHandle.identifier.toString) + Files.createFile(logFile) + info(s"Created operation log file $logFile") + new OperationLog(logFile) + } catch { + case e: IOException => + error(s"Failed to create operation log for $opHandle in $sessionHandle", e) + null + } + } +} + +class OperationLog(path: Path) extends Logging { + + private lazy val writer = Files.newBufferedWriter(path, StandardCharsets.UTF_8) + private lazy val reader = Files.newBufferedReader(path, StandardCharsets.UTF_8) + + /** + * write log to the operation log file + */ + def write(msg: String): Unit = synchronized { + writer.write(msg) + writer.flush() + } + + /** + * Read to log file line by line + * + * @param maxRows maximum result number can reach + */ + def read(maxRows: Int): TRowSet = synchronized { + val logs = new java.util.ArrayList[String] + var i = 0 + try { + var line: String = reader.readLine() + while ((i < maxRows || maxRows <= 0) && line != null) { + logs.add(line) + line = reader.readLine() + i += 1 + } + } catch { + case e: IOException => + val absPath = path.toAbsolutePath + val opHandle = absPath.getFileName + throw new KyuubiSQLException(s"Operation[$opHandle] log file $absPath is not found", e) + } + val tColumn = TColumn.stringVal(new TStringColumn(logs, ByteBuffer.allocate(0))) + val tRow = new TRowSet(0, new java.util.ArrayList[TRow](0)) + tRow.addToColumns(tColumn) + tRow + } + + def close(): Unit = synchronized { + try { + reader.close() + writer.close() + FileUtils.forceDelete(path.toFile) + } catch { + case e: IOException => + error(s"Failed to remove corresponding log file of operation: ${path.toAbsolutePath}", e) + } + } +} diff --git a/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala b/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala index c6bb9fab4..8300ddc7a 100644 --- a/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala +++ b/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala @@ -26,7 +26,6 @@ import org.apache.kyuubi.KyuubiSQLException import org.apache.kyuubi.engine.spark.operation.SparkSQLOperationManager import org.apache.kyuubi.session.{SessionHandle, SessionManager} - /** * A [[SessionManager]] constructed with [[SparkSession]] which give it the ability to talk with * Spark and let Spark do all the rest heavy work :) @@ -54,9 +53,9 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession) val sparkSession = spark.newSession() conf.foreach { case (key, value) => spark.conf.set(key, value)} operationManager.setSparkSession(handle, sparkSession) + sessionImpl.open() info(s"$user's session with $handle is opened, current opening sessions" + s" $getOpenSessionCount") - setSession(handle, sessionImpl) handle } catch { @@ -65,7 +64,6 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession) sessionImpl.close() } catch { case t: Throwable => warn(s"Error closing session $handle for $user", t) - } throw KyuubiSQLException(s"Error opening session $handle for $user", e) } diff --git a/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala b/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala index 82185d40b..b0177aded 100644 --- a/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala +++ b/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala @@ -19,6 +19,7 @@ package org.apache.kyuubi.engine.spark.session import org.apache.hive.service.rpc.thrift.TProtocolVersion +import org.apache.kyuubi.engine.spark.operation.log.OperationLog import org.apache.kyuubi.session.{AbstractSession, SessionManager} class SparkSessionImpl( @@ -30,5 +31,7 @@ class SparkSessionImpl( sessionManager: SessionManager) extends AbstractSession(protocol, user, password, ipAddress, conf, sessionManager) { - override def open(): Unit = {} + override def open(): Unit = { + OperationLog.createOperationLogRootDirectory(handle) + } } diff --git a/kyuubi-spark-sql-engine/src/test/resources/log4j.properties b/kyuubi-spark-sql-engine/src/test/resources/log4j.properties new file mode 100644 index 000000000..b94ca7ad9 --- /dev/null +++ b/kyuubi-spark-sql-engine/src/test/resources/log4j.properties @@ -0,0 +1,36 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Set everything to be logged to the file target/unit-tests.log +log4j.rootLogger=DEBUG, CA, FA + +#Console Appender +log4j.appender.CA=org.apache.log4j.ConsoleAppender +log4j.appender.CA.layout=org.apache.log4j.PatternLayout +log4j.appender.CA.layout.ConversionPattern=%d{HH:mm:ss.SSS} %p %c: %m%n +log4j.appender.CA.Threshold = WARN + + +#File Appender +log4j.appender.FA=org.apache.log4j.FileAppender +log4j.appender.FA.append=false +log4j.appender.FA.file=target/unit-tests.log +log4j.appender.FA.layout=org.apache.log4j.PatternLayout +log4j.appender.FA.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss.SSS} %t %p %c{2}: %m%n + +# Set the logger level of File Appender to WARN +log4j.appender.FA.Threshold = DEBUG \ No newline at end of file diff --git a/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/log/OperationLogSuite.scala b/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/log/OperationLogSuite.scala new file mode 100644 index 000000000..51cc001c9 --- /dev/null +++ b/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/log/OperationLogSuite.scala @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.engine.spark.operation.log + +import java.nio.file.{Files, Paths} + +import scala.collection.JavaConverters._ + +import org.apache.hive.service.rpc.thrift.TProtocolVersion + +import org.apache.kyuubi.KyuubiFunSuite +import org.apache.kyuubi.operation.{OperationHandle, OperationType} +import org.apache.kyuubi.session.SessionHandle + +class OperationLogSuite extends KyuubiFunSuite { + + val msg1 = "This is just a dummy log message 1" + val msg2 = "This is just a dummy log message 2" + + test("create, delete, read and write to operation log") { + val sHandle = SessionHandle(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10) + val oHandle = OperationHandle( + OperationType.EXECUTE_STATEMENT, TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10) + + OperationLog.createOperationLogRootDirectory(sHandle) + assert(Files.exists(Paths.get(OperationLog.LOG_ROOT, sHandle.identifier.toString))) + assert(Files.isDirectory(Paths.get(OperationLog.LOG_ROOT, sHandle.identifier.toString))) + + val operationLog = OperationLog.createOperationLog(sHandle, oHandle) + val logFile = + Paths.get(OperationLog.LOG_ROOT, sHandle.identifier.toString, oHandle.identifier.toString) + assert(Files.exists(logFile)) + + OperationLog.setCurrentOperationLog(operationLog) + assert(OperationLog.getCurrentOperationLog === operationLog) + + OperationLog.removeCurrentOperationLog() + assert(OperationLog.getCurrentOperationLog === null) + + operationLog.write(msg1 + "\n") + val tRowSet1 = operationLog.read(1) + assert(tRowSet1.getColumns.get(0).getStringVal.getValues.get(0) === msg1) + val tRowSet2 = operationLog.read(1) + assert(tRowSet2.getColumns.get(0).getStringVal.getValues.isEmpty) + + operationLog.write(msg1 + "\n") + operationLog.write(msg2 + "\n") + val tRowSet3 = operationLog.read(-1).getColumns.get(0).getStringVal.getValues + assert(tRowSet3.get(0) === msg1) + assert(tRowSet3.get(1) === msg2) + + operationLog.close() + assert(!Files.exists(logFile)) + } + + test("log divert appender") { + val sHandle = SessionHandle(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10) + val oHandle = OperationHandle( + OperationType.EXECUTE_STATEMENT, TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10) + + OperationLog.createOperationLogRootDirectory(sHandle) + val operationLog = OperationLog.createOperationLog(sHandle, oHandle) + OperationLog.setCurrentOperationLog(operationLog) + + LogDivertAppender.initialize() + + info(msg1) + info(msg2) + warn(msg2) + error(msg2) + + val list = + operationLog.read(-1).getColumns.get(0).getStringVal.getValues.asScala.distinct + val logMsg1 = list(0) + assert(logMsg1.contains("INFO") && + logMsg1.contains(classOf[OperationLogSuite].getCanonicalName) && logMsg1.endsWith(msg1)) + val logMsg2 = list(2) + assert(logMsg2.contains("WARN") && logMsg2.endsWith(msg2)) + val logMsg3 = list(3) + assert(logMsg3.contains("ERROR") && logMsg3.endsWith(msg2)) + + OperationLog.removeCurrentOperationLog() + info(msg1) + val list2 = operationLog.read(-1).getColumns.get(0).getStringVal.getValues + assert(list2.isEmpty) + operationLog.close() + } +} \ No newline at end of file diff --git a/pom.xml b/pom.xml index 3a5d40ce4..9efd63c1f 100644 --- a/pom.xml +++ b/pom.xml @@ -504,7 +504,7 @@ . TestSuite.txt - src/test/resources/log4j.properties + file:src/test/resources/log4j.properties ${project.build.directory}/tmp