diff --git a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/AbstractOperation.scala b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/AbstractOperation.scala index 1759c4fea..661acb1ea 100644 --- a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/AbstractOperation.scala +++ b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/AbstractOperation.scala @@ -20,8 +20,6 @@ package yaooqinn.kyuubi.operation import java.io.{File, FileNotFoundException} import java.util.concurrent.Future -import org.apache.hadoop.hive.conf.HiveConf -import org.apache.hadoop.hive.ql.session.OperationLog import org.apache.hive.service.cli.thrift.TProtocolVersion import org.apache.spark.KyuubiConf._ import org.apache.spark.KyuubiSparkUtil._ @@ -164,7 +162,7 @@ abstract class AbstractOperation( } // create OperationLog object with above log file try { - this.operationLog = new OperationLog(this.opHandle.toString, logFile, new HiveConf()) + this.operationLog = new OperationLog(logFile) } catch { case e: FileNotFoundException => warn("Unable to instantiate OperationLog object for operation: " + this.opHandle, e) @@ -172,13 +170,13 @@ abstract class AbstractOperation( return } // register this operationLog - session.getSessionMgr.getOperationMgr.setOperationLog(session.getUserName, this.operationLog) + session.getSessionMgr.getOperationMgr.setOperationLog(this.operationLog) } } private def unregisterOperationLog(): Unit = { if (isOperationLogEnabled) { - session.getSessionMgr.getOperationMgr.unregisterOperationLog(session.getUserName) + session.getSessionMgr.getOperationMgr.unregisterOperationLog() } } diff --git a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/LogDivertAppender.scala b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/LogDivertAppender.scala index 4ef2e8658..487d7a695 100644 --- a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/LogDivertAppender.scala +++ b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/LogDivertAppender.scala @@ -21,49 +21,29 @@ import java.io.CharArrayWriter import scala.collection.JavaConverters._ -import org.apache.hadoop.hive.ql.session.OperationLog import org.apache.log4j._ import org.apache.log4j.spi.{Filter, LoggingEvent} import yaooqinn.kyuubi.Logging class LogDivertAppender extends WriterAppender with Logging { - - private var operationManager: OperationManager = _ - - private class NameFilter( - var operationManager: OperationManager) extends Filter { - override def decide(ev: LoggingEvent): Int = { - val log = operationManager.getOperationLog - if (log == null) return Filter.DENY - val currentLoggingMode = log.getOpLoggingLevel - // If logging is disabled, deny everything. - if (currentLoggingMode == OperationLog.LoggingLevel.NONE) return Filter.DENY - Filter.NEUTRAL - } - } - /** This is where the log message will go to */ private val writer = new CharArrayWriter - private def initLayout(): Unit = { - // There should be a ConsoleAppender. Copy its Layout. - var layout: Layout = null - Logger.getRootLogger.getAllAppenders.asScala.foreach { ap => - if (ap.isInstanceOf[ConsoleAppender]) layout = ap.asInstanceOf[Appender].getLayout - } - this.layout = - Option(layout).getOrElse(new PatternLayout("%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n")) - } + this.layout = Logger.getRootLogger + .getAllAppenders.asScala + .find(_.isInstanceOf[ConsoleAppender]) + .map(_.asInstanceOf[Appender].getLayout) + .getOrElse(new PatternLayout("%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n")) - def this(operationManager: OperationManager) { - this() - initLayout() - setWriter(writer) - setName("SparkLogDivertAppender") - this.operationManager = operationManager - addFilter(new NameFilter(operationManager)) - } + setWriter(writer) + setName("SparkLogDivertAppender") + addFilter(new Filter { + override def decide(loggingEvent: LoggingEvent): Int = { + if (OperationLog.getCurrentOperationLog == null) Filter.DENY else Filter.NEUTRAL + } + + }) /** * Overrides WriterAppender.subAppend(), which does the real logging. No need @@ -74,11 +54,6 @@ class LogDivertAppender extends WriterAppender with Logging { // That should've gone into our writer. Notify the LogContext. val logOutput = writer.toString writer.reset() - val log = operationManager.getOperationLog - if (log == null) { - debug(" ---+++=== Dropped log event from thread " + event.getThreadName) - return - } - log.writeOperationLog(logOutput) + Option(OperationLog.getCurrentOperationLog).foreach(_.write(logOutput)) } } diff --git a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/LogFile.scala b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/LogFile.scala new file mode 100644 index 000000000..21717b039 --- /dev/null +++ b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/LogFile.scala @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package yaooqinn.kyuubi.operation + +import java.io.{BufferedReader, File, FileInputStream, FileNotFoundException, FileOutputStream, InputStreamReader, IOException, PrintStream} +import java.util.ArrayList + +import scala.collection.JavaConverters._ + +import org.apache.commons.io.FileUtils +import org.apache.hadoop.io.IOUtils +import org.apache.spark.sql.Row + +import yaooqinn.kyuubi.{KyuubiSQLException, Logging} + +class LogFile private ( + file: File, + private var reader: Option[BufferedReader], + writer: PrintStream, + @volatile private var isRemoved: Boolean = false) extends Logging { + + def this(file: File) = { + this(file, + LogFile.createReader(file, isRemoved = false), + new PrintStream(new FileOutputStream(file))) + } + + private def resetReader(): Unit = { + reader.foreach(IOUtils.closeStream) + reader = None + } + + private def readResults(nLines: Long): Seq[Row] = { + reader = reader.orElse(LogFile.createReader(file, isRemoved)) + + val logs = new ArrayList[Row]() + reader.foreach { r => + var i = 1 + try { + var line: String = r.readLine() + while ((i < nLines || nLines <= 0) && line != null) { + logs.add(Row(line)) + line = r.readLine() + i += 1 + } + } catch { + case e: FileNotFoundException => + val operationHandle = file.getName + val path = file.getAbsolutePath + val msg = if (isRemoved) { + s"Operation[$operationHandle] has been closed and the log file $path has been removed" + } else { + s"Operation[$operationHandle] Log file $path is not found" + } + throw new KyuubiSQLException(msg, e) + } + } + logs.asScala + } + + /** + * Read to log file line by line + * + * @param isFetchFirst reset the BufferReader, if fetching from the beginning of the file if true + * @param maxRows maximum result number can reach + * @return a row seq of lines from the log file + */ + def read(isFetchFirst: Boolean, maxRows: Long): Seq[Row] = synchronized { + if (isFetchFirst) resetReader() + + readResults(maxRows) + } + + /** + * write log to the operation log file + */ + def write(msg: String): Unit = { + writer.print(msg) + } + + + def close(): Unit = synchronized { + try { + reader.foreach(_.close()) + writer.close() + if (!isRemoved) { + FileUtils.forceDelete(file) + isRemoved = true + } + } catch { + case e: IOException => + error(s"Failed to remove corresponding log file of operation: ${file.getName}", e) + } + } +} + +object LogFile { + + def createReader(file: File, isRemoved: Boolean): Option[BufferedReader] = try { + Option(new BufferedReader(new InputStreamReader(new FileInputStream(file)))) + } catch { + case e: FileNotFoundException => + val operationHandle = file.getName + val path = file.getAbsolutePath + val msg = if (isRemoved) { + s"Operation[$operationHandle] has been closed and the log file $path has been removed" + } else { + s"Operation[$operationHandle] Log file $path is not found" + } + throw new KyuubiSQLException(msg, e) + } +} diff --git a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/OperationLog.scala b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/OperationLog.scala new file mode 100644 index 000000000..663a711f2 --- /dev/null +++ b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/OperationLog.scala @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package yaooqinn.kyuubi.operation + +import java.io.File + +import org.apache.spark.sql.Row + +import yaooqinn.kyuubi.Logging + +class OperationLog(logFile: LogFile) extends Logging { + + def this(file: File) = this(new LogFile(file)) + + def write(msg: String): Unit = logFile.write(msg) + + def read(isFetchFirst: Boolean, maxRows: Long): Seq[Row] = { + logFile.read(isFetchFirst, maxRows) + } + + def close(): Unit = logFile.close() +} + +object OperationLog { + private final val OPERATION_LOG = new ThreadLocal[OperationLog] { + override def initialValue(): OperationLog = null + } + + def setCurrentOperationLog(operationLog: OperationLog): Unit = { + OPERATION_LOG.set(operationLog) + } + + def getCurrentOperationLog: OperationLog = OPERATION_LOG.get() + + def removeCurrentOperationLog(): Unit = OPERATION_LOG.remove() +} diff --git a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/OperationManager.scala b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/OperationManager.scala index ea168aabb..f95f0b9a2 100644 --- a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/OperationManager.scala +++ b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/OperationManager.scala @@ -20,13 +20,9 @@ package yaooqinn.kyuubi.operation import java.sql.SQLException import java.util.concurrent.ConcurrentHashMap -import scala.collection.JavaConverters._ - -import org.apache.hadoop.hive.ql.session.OperationLog import org.apache.log4j.Logger import org.apache.spark.{KyuubiSparkUtil, SparkConf} import org.apache.spark.KyuubiConf._ -import org.apache.spark.sql.Row import org.apache.spark.sql.types.StructType import yaooqinn.kyuubi.{KyuubiSQLException, Logging} @@ -45,7 +41,6 @@ private[kyuubi] class OperationManager private(name: String) private lazy val logSchema: StructType = new StructType().add("operation_log", "string") private val handleToOperation = new ConcurrentHashMap[OperationHandle, KyuubiOperation] - private val userToOperationLog = new ConcurrentHashMap[String, OperationLog]() override def init(conf: SparkConf): Unit = synchronized { if (conf.get(LOGGING_OPERATION_ENABLED.key).toBoolean) { @@ -58,33 +53,13 @@ private[kyuubi] class OperationManager private(name: String) private def initOperationLogCapture(): Unit = { // Register another Appender (with the same layout) that talks to us. - val ap = new LogDivertAppender(this) + val ap = new LogDivertAppender Logger.getRootLogger.addAppender(ap) } - private def getOperationLogByThread: OperationLog = OperationLog.getCurrentOperationLog + def setOperationLog(log: OperationLog): Unit = OperationLog.setCurrentOperationLog(log) - private def getOperationLogByName: OperationLog = { - if (!userToOperationLog.isEmpty) { - userToOperationLog.get(KyuubiSparkUtil.getCurrentUserName) - } else { - null - } - } - - def getOperationLog: OperationLog = { - Option(getOperationLogByThread).getOrElse(getOperationLogByName) - } - - def setOperationLog(user: String, log: OperationLog): Unit = { - OperationLog.setCurrentOperationLog(log) - userToOperationLog.put(Option(user).getOrElse(KyuubiSparkUtil.getCurrentUserName), log) - } - - def unregisterOperationLog(user: String): Unit = { - OperationLog.removeCurrentOperationLog() - userToOperationLog.remove(user) - } + def unregisterOperationLog(): Unit = OperationLog.removeCurrentOperationLog() def newExecuteStatementOperation( parentSession: KyuubiSession, @@ -231,7 +206,7 @@ private[kyuubi] class OperationManager private(name: String) } try { // convert logs to RowBasedSet - val logs = opLog.readOperationLog(isFetchFirst(orientation), maxRows).asScala.map(Row(_)) + val logs = opLog.read(isFetchFirst(orientation), maxRows) RowSetBuilder.create(logSchema, logs, getOperation(opHandle).getProtocolVersion) } catch { case e: SQLException => diff --git a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/statement/ExecuteStatementOperation.scala b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/statement/ExecuteStatementOperation.scala index 87588cb66..103d15ac3 100644 --- a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/statement/ExecuteStatementOperation.scala +++ b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/statement/ExecuteStatementOperation.scala @@ -106,8 +106,7 @@ abstract class ExecuteStatementOperation( getHandle.getHandleIdentifier) isOperationLogEnabled = false } else { - session.getSessionMgr.getOperationMgr - .setOperationLog(session.getUserName, operationLog) + session.getSessionMgr.getOperationMgr.setOperationLog(operationLog) } } } diff --git a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/operation/OperationManagerSuite.scala b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/operation/OperationManagerSuite.scala index 1f5c799eb..8ccbc2f97 100644 --- a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/operation/OperationManagerSuite.scala +++ b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/operation/OperationManagerSuite.scala @@ -19,7 +19,6 @@ package yaooqinn.kyuubi.operation import scala.collection.mutable.ArrayBuffer -import org.apache.hadoop.hive.ql.session.OperationLog import org.apache.spark.{KyuubiConf, KyuubiSparkUtil, SparkConf, SparkFunSuite} import org.apache.spark.sql.SparkSession import org.mockito.Mockito._ @@ -69,18 +68,13 @@ class OperationManagerSuite extends SparkFunSuite with Matchers with MockitoSuga val operationMgr = new OperationManager() operationMgr.init(conf) - operationMgr.getOperationLog should be(null) + OperationLog.getCurrentOperationLog should be(null) val log = mock[OperationLog] - operationMgr.setOperationLog(KyuubiSparkUtil.getCurrentUserName, log) - - operationMgr.getOperationLog should be(log) - - OperationLog.removeCurrentOperationLog() - operationMgr.getOperationLog should be(log) - - operationMgr.unregisterOperationLog(KyuubiSparkUtil.getCurrentUserName) - operationMgr.getOperationLog should be(null) + operationMgr.setOperationLog(log) + OperationLog.getCurrentOperationLog should be(log) + operationMgr.unregisterOperationLog() + OperationLog.getCurrentOperationLog should be(null) } test("handle operation") {