[KYUUBI-230] Re-implement OperationLog (#231)

This commit is contained in:
Kent Yao 2020-05-18 10:42:00 +08:00 committed by GitHub
parent d227e616b2
commit af9801beb8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 205 additions and 86 deletions

View File

@ -20,8 +20,6 @@ package yaooqinn.kyuubi.operation
import java.io.{File, FileNotFoundException}
import java.util.concurrent.Future
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.ql.session.OperationLog
import org.apache.hive.service.cli.thrift.TProtocolVersion
import org.apache.spark.KyuubiConf._
import org.apache.spark.KyuubiSparkUtil._
@ -164,7 +162,7 @@ abstract class AbstractOperation(
}
// create OperationLog object with above log file
try {
this.operationLog = new OperationLog(this.opHandle.toString, logFile, new HiveConf())
this.operationLog = new OperationLog(logFile)
} catch {
case e: FileNotFoundException =>
warn("Unable to instantiate OperationLog object for operation: " + this.opHandle, e)
@ -172,13 +170,13 @@ abstract class AbstractOperation(
return
}
// register this operationLog
session.getSessionMgr.getOperationMgr.setOperationLog(session.getUserName, this.operationLog)
session.getSessionMgr.getOperationMgr.setOperationLog(this.operationLog)
}
}
private def unregisterOperationLog(): Unit = {
if (isOperationLogEnabled) {
session.getSessionMgr.getOperationMgr.unregisterOperationLog(session.getUserName)
session.getSessionMgr.getOperationMgr.unregisterOperationLog()
}
}

View File

@ -21,49 +21,29 @@ import java.io.CharArrayWriter
import scala.collection.JavaConverters._
import org.apache.hadoop.hive.ql.session.OperationLog
import org.apache.log4j._
import org.apache.log4j.spi.{Filter, LoggingEvent}
import yaooqinn.kyuubi.Logging
class LogDivertAppender extends WriterAppender with Logging {
private var operationManager: OperationManager = _
private class NameFilter(
var operationManager: OperationManager) extends Filter {
override def decide(ev: LoggingEvent): Int = {
val log = operationManager.getOperationLog
if (log == null) return Filter.DENY
val currentLoggingMode = log.getOpLoggingLevel
// If logging is disabled, deny everything.
if (currentLoggingMode == OperationLog.LoggingLevel.NONE) return Filter.DENY
Filter.NEUTRAL
}
}
/** This is where the log message will go to */
private val writer = new CharArrayWriter
private def initLayout(): Unit = {
// There should be a ConsoleAppender. Copy its Layout.
var layout: Layout = null
Logger.getRootLogger.getAllAppenders.asScala.foreach { ap =>
if (ap.isInstanceOf[ConsoleAppender]) layout = ap.asInstanceOf[Appender].getLayout
}
this.layout =
Option(layout).getOrElse(new PatternLayout("%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n"))
}
this.layout = Logger.getRootLogger
.getAllAppenders.asScala
.find(_.isInstanceOf[ConsoleAppender])
.map(_.asInstanceOf[Appender].getLayout)
.getOrElse(new PatternLayout("%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n"))
def this(operationManager: OperationManager) {
this()
initLayout()
setWriter(writer)
setName("SparkLogDivertAppender")
this.operationManager = operationManager
addFilter(new NameFilter(operationManager))
}
setWriter(writer)
setName("SparkLogDivertAppender")
addFilter(new Filter {
override def decide(loggingEvent: LoggingEvent): Int = {
if (OperationLog.getCurrentOperationLog == null) Filter.DENY else Filter.NEUTRAL
}
})
/**
* Overrides WriterAppender.subAppend(), which does the real logging. No need
@ -74,11 +54,6 @@ class LogDivertAppender extends WriterAppender with Logging {
// That should've gone into our writer. Notify the LogContext.
val logOutput = writer.toString
writer.reset()
val log = operationManager.getOperationLog
if (log == null) {
debug(" ---+++=== Dropped log event from thread " + event.getThreadName)
return
}
log.writeOperationLog(logOutput)
Option(OperationLog.getCurrentOperationLog).foreach(_.write(logOutput))
}
}

View File

@ -0,0 +1,127 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package yaooqinn.kyuubi.operation
import java.io.{BufferedReader, File, FileInputStream, FileNotFoundException, FileOutputStream, InputStreamReader, IOException, PrintStream}
import java.util.ArrayList
import scala.collection.JavaConverters._
import org.apache.commons.io.FileUtils
import org.apache.hadoop.io.IOUtils
import org.apache.spark.sql.Row
import yaooqinn.kyuubi.{KyuubiSQLException, Logging}
/**
 * A file-backed operation log: appends messages via an internal [[PrintStream]] and serves
 * them back as [[Row]]s through a lazily (re)created [[BufferedReader]].
 *
 * @param file      the underlying log file on local disk
 * @param reader    current reader over `file`, recreated on demand after a reset
 * @param writer    append-only stream used by [[write]]
 * @param isRemoved whether `file` has already been deleted by [[close]]
 */
class LogFile private (
    file: File,
    private var reader: Option[BufferedReader],
    writer: PrintStream,
    @volatile private var isRemoved: Boolean = false) extends Logging {

  def this(file: File) = {
    this(file,
      LogFile.createReader(file, isRemoved = false),
      new PrintStream(new FileOutputStream(file)))
  }

  /** Close and drop the current reader so the next read starts from the head of the file. */
  private def resetReader(): Unit = {
    reader.foreach(IOUtils.closeStream)
    reader = None
  }

  /**
   * Read up to `nLines` lines from the current reader position; `nLines <= 0` means
   * read everything remaining.
   *
   * @throws KyuubiSQLException if the log file has disappeared mid-read
   */
  private def readResults(nLines: Long): Seq[Row] = {
    reader = reader.orElse(LogFile.createReader(file, isRemoved))
    val logs = new ArrayList[Row]()
    reader.foreach { r =>
      // Count of lines emitted so far. Must start at 0: the previous value of 1 made
      // `i < nLines` under-read by one line (a request for exactly 1 line returned none).
      var i = 0
      try {
        var line: String = r.readLine()
        while ((i < nLines || nLines <= 0) && line != null) {
          logs.add(Row(line))
          line = r.readLine()
          i += 1
        }
      } catch {
        case e: FileNotFoundException =>
          val operationHandle = file.getName
          val path = file.getAbsolutePath
          val msg = if (isRemoved) {
            s"Operation[$operationHandle] has been closed and the log file $path has been removed"
          } else {
            s"Operation[$operationHandle] Log file $path is not found"
          }
          throw new KyuubiSQLException(msg, e)
      }
    }
    logs.asScala
  }

  /**
   * Read the log file line by line
   *
   * @param isFetchFirst reset the BufferReader, if fetching from the beginning of the file if true
   * @param maxRows maximum result number can reach
   * @return a row seq of lines from the log file
   */
  def read(isFetchFirst: Boolean, maxRows: Long): Seq[Row] = synchronized {
    if (isFetchFirst) resetReader()
    readResults(maxRows)
  }

  /**
   * Write log to the operation log file.
   */
  def write(msg: String): Unit = {
    writer.print(msg)
  }

  /**
   * Close the reader and writer and delete the backing file (at most once).
   * Failures are logged rather than propagated, since close is best-effort cleanup.
   */
  def close(): Unit = synchronized {
    try {
      reader.foreach(_.close())
      // Drop the now-closed reader so a stale handle is never reused.
      reader = None
      writer.close()
      if (!isRemoved) {
        FileUtils.forceDelete(file)
        isRemoved = true
      }
    } catch {
      case e: IOException =>
        error(s"Failed to remove corresponding log file of operation: ${file.getName}", e)
    }
  }
}
object LogFile {

  /**
   * Open a buffered reader over `file`.
   *
   * @param file      the operation log file to open
   * @param isRemoved whether the file is known to have been deleted already; only
   *                  affects the wording of the failure message
   * @return a reader positioned at the head of the file
   * @throws KyuubiSQLException when the file cannot be found
   */
  def createReader(file: File, isRemoved: Boolean): Option[BufferedReader] = {
    try {
      val stream = new InputStreamReader(new FileInputStream(file))
      Option(new BufferedReader(stream))
    } catch {
      case e: FileNotFoundException =>
        val handle = file.getName
        val path = file.getAbsolutePath
        val reason = if (isRemoved) {
          s"Operation[$handle] has been closed and the log file $path has been removed"
        } else {
          s"Operation[$handle] Log file $path is not found"
        }
        throw new KyuubiSQLException(reason, e)
    }
  }
}

View File

@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package yaooqinn.kyuubi.operation
import java.io.File
import org.apache.spark.sql.Row
import yaooqinn.kyuubi.Logging
/**
 * Per-operation log facade, delegating all storage concerns to a [[LogFile]].
 */
class OperationLog(logFile: LogFile) extends Logging {

  /** Convenience constructor wrapping a plain [[File]] in a new [[LogFile]]. */
  def this(file: File) = this(new LogFile(file))

  /** Append `msg` to the backing log file. */
  def write(msg: String): Unit = logFile.write(msg)

  /**
   * Fetch log lines as rows.
   *
   * @param isFetchFirst when true, restart reading from the beginning of the file
   * @param maxRows      upper bound on the number of lines returned
   */
  def read(isFetchFirst: Boolean, maxRows: Long): Seq[Row] =
    logFile.read(isFetchFirst, maxRows)

  /** Close the backing log file and release its resources. */
  def close(): Unit = logFile.close()
}
/**
 * Thread-local registry associating the currently executing operation's log
 * with its worker thread, so appenders can route log events without explicit wiring.
 */
object OperationLog {

  // One slot per thread; absent a registration the slot yields null.
  private final val OPERATION_LOG = new ThreadLocal[OperationLog] {
    override def initialValue(): OperationLog = null
  }

  /** Bind `operationLog` to the calling thread. */
  def setCurrentOperationLog(operationLog: OperationLog): Unit =
    OPERATION_LOG.set(operationLog)

  /** The log bound to the calling thread, or null when none has been registered. */
  def getCurrentOperationLog: OperationLog = OPERATION_LOG.get()

  /** Remove the calling thread's binding. */
  def removeCurrentOperationLog(): Unit = OPERATION_LOG.remove()
}

View File

@ -20,13 +20,9 @@ package yaooqinn.kyuubi.operation
import java.sql.SQLException
import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._
import org.apache.hadoop.hive.ql.session.OperationLog
import org.apache.log4j.Logger
import org.apache.spark.{KyuubiSparkUtil, SparkConf}
import org.apache.spark.KyuubiConf._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType
import yaooqinn.kyuubi.{KyuubiSQLException, Logging}
@ -45,7 +41,6 @@ private[kyuubi] class OperationManager private(name: String)
private lazy val logSchema: StructType = new StructType().add("operation_log", "string")
private val handleToOperation = new ConcurrentHashMap[OperationHandle, KyuubiOperation]
private val userToOperationLog = new ConcurrentHashMap[String, OperationLog]()
override def init(conf: SparkConf): Unit = synchronized {
if (conf.get(LOGGING_OPERATION_ENABLED.key).toBoolean) {
@ -58,33 +53,13 @@ private[kyuubi] class OperationManager private(name: String)
private def initOperationLogCapture(): Unit = {
// Register another Appender (with the same layout) that talks to us.
val ap = new LogDivertAppender(this)
val ap = new LogDivertAppender
Logger.getRootLogger.addAppender(ap)
}
private def getOperationLogByThread: OperationLog = OperationLog.getCurrentOperationLog
def setOperationLog(log: OperationLog): Unit = OperationLog.setCurrentOperationLog(log)
private def getOperationLogByName: OperationLog = {
if (!userToOperationLog.isEmpty) {
userToOperationLog.get(KyuubiSparkUtil.getCurrentUserName)
} else {
null
}
}
def getOperationLog: OperationLog = {
Option(getOperationLogByThread).getOrElse(getOperationLogByName)
}
def setOperationLog(user: String, log: OperationLog): Unit = {
OperationLog.setCurrentOperationLog(log)
userToOperationLog.put(Option(user).getOrElse(KyuubiSparkUtil.getCurrentUserName), log)
}
def unregisterOperationLog(user: String): Unit = {
OperationLog.removeCurrentOperationLog()
userToOperationLog.remove(user)
}
def unregisterOperationLog(): Unit = OperationLog.removeCurrentOperationLog()
def newExecuteStatementOperation(
parentSession: KyuubiSession,
@ -231,7 +206,7 @@ private[kyuubi] class OperationManager private(name: String)
}
try {
// convert logs to RowBasedSet
val logs = opLog.readOperationLog(isFetchFirst(orientation), maxRows).asScala.map(Row(_))
val logs = opLog.read(isFetchFirst(orientation), maxRows)
RowSetBuilder.create(logSchema, logs, getOperation(opHandle).getProtocolVersion)
} catch {
case e: SQLException =>

View File

@ -106,8 +106,7 @@ abstract class ExecuteStatementOperation(
getHandle.getHandleIdentifier)
isOperationLogEnabled = false
} else {
session.getSessionMgr.getOperationMgr
.setOperationLog(session.getUserName, operationLog)
session.getSessionMgr.getOperationMgr.setOperationLog(operationLog)
}
}
}

View File

@ -19,7 +19,6 @@ package yaooqinn.kyuubi.operation
import scala.collection.mutable.ArrayBuffer
import org.apache.hadoop.hive.ql.session.OperationLog
import org.apache.spark.{KyuubiConf, KyuubiSparkUtil, SparkConf, SparkFunSuite}
import org.apache.spark.sql.SparkSession
import org.mockito.Mockito._
@ -69,18 +68,13 @@ class OperationManagerSuite extends SparkFunSuite with Matchers with MockitoSuga
val operationMgr = new OperationManager()
operationMgr.init(conf)
operationMgr.getOperationLog should be(null)
OperationLog.getCurrentOperationLog should be(null)
val log = mock[OperationLog]
operationMgr.setOperationLog(KyuubiSparkUtil.getCurrentUserName, log)
operationMgr.getOperationLog should be(log)
OperationLog.removeCurrentOperationLog()
operationMgr.getOperationLog should be(log)
operationMgr.unregisterOperationLog(KyuubiSparkUtil.getCurrentUserName)
operationMgr.getOperationLog should be(null)
operationMgr.setOperationLog(log)
OperationLog.getCurrentOperationLog should be(log)
operationMgr.unregisterOperationLog()
OperationLog.getCurrentOperationLog should be(null)
}
test("handle operation") {