Add support for operation_log

This commit is contained in:
Kent Yao 2020-08-13 10:41:46 +08:00
parent 7682225145
commit 3617657bed
17 changed files with 401 additions and 40 deletions

2
.gitignore vendored
View File

@ -44,3 +44,5 @@ metrics/report.json
metrics/.report.json.crc
/kyuubi-ha/embedded_zookeeper/
embedded_zookeeper/
/kyuubi-spark-sql-engine/spark-warehouse/
/kyuubi-spark-sql-engine/kyuubi_operation_logs/

View File

@ -20,10 +20,10 @@ package org.apache.kyuubi
import java.io.{File, InputStreamReader, IOException}
import java.net.{URI, URISyntaxException}
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path, Paths}
import java.util.{Properties, UUID}
import scala.collection.JavaConverters._
import scala.util.{Success, Try}
import org.apache.hadoop.security.UserGroupInformation
@ -98,16 +98,19 @@ private[kyuubi] object Utils extends Logging {
* Create a directory inside the given parent directory. The directory is guaranteed to be
* newly created, and is not marked for automatic deletion.
*/
def createDirectory(root: String, namePrefix: String = "kyuubi"): File = {
def createDirectory(root: String, namePrefix: String = "kyuubi"): Path = {
var error: Exception = null
(0 until MAX_DIR_CREATION_ATTEMPTS).foreach { _ =>
val dir = new File(root, namePrefix + "-" + UUID.randomUUID.toString)
Try { dir.mkdirs() } match {
case Success(_) => return dir
case _ =>
val candidate = Paths.get(root, s"$namePrefix-${UUID.randomUUID()}")
try {
val path = Files.createDirectories(candidate)
return path
} catch {
case e: IOException => error = e
}
}
throw new IOException("Failed to create a temp directory (under " + root + ") after " +
MAX_DIR_CREATION_ATTEMPTS + " attempts!")
MAX_DIR_CREATION_ATTEMPTS + " attempts!", error)
}
/**
@ -116,9 +119,9 @@ private[kyuubi] object Utils extends Logging {
*/
def createTempDir(
root: String = System.getProperty("java.io.tmpdir"),
namePrefix: String = "kyuubi"): File = {
namePrefix: String = "kyuubi"): Path = {
val dir = createDirectory(root, namePrefix)
dir.deleteOnExit()
dir.toFile.deleteOnExit()
dir
}

View File

@ -17,12 +17,9 @@
package org.apache.kyuubi.operation
import java.nio.ByteBuffer
import org.apache.hive.service.rpc.thrift.{TColumn, TRow, TRowSet, TStringColumn, TTableSchema}
import org.apache.hive.service.rpc.thrift._
import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
import org.apache.kyuubi.service.AbstractService
import org.apache.kyuubi.session.Session
@ -37,14 +34,6 @@ abstract class OperationManager(name: String) extends AbstractService(name) {
private final val handleToOperation = new java.util.HashMap[OperationHandle, Operation]()
override def initialize(conf: KyuubiConf): Unit = {
super.initialize(conf)
}
override def start(): Unit = super.start()
override def stop(): Unit = super.stop()
def newExecuteStatementOperation(
session: Session,
statement: String,
@ -121,14 +110,7 @@ abstract class OperationManager(name: String) extends AbstractService(name) {
def getOperationLogRowSet(
opHandle: OperationHandle,
order: FetchOrientation,
maxRows: Int): TRowSet = {
// TODO: Support fetch log result
val values = new java.util.ArrayList[String]
val tColumn = TColumn.stringVal(new TStringColumn(values, ByteBuffer.allocate(0)))
val tRow = new TRowSet(0, new java.util.ArrayList[TRow](1))
tRow.addToColumns(tColumn)
tRow
}
maxRows: Int): TRowSet
final def removeExpiredOperations(handles: Seq[OperationHandle]): Seq[Operation] = synchronized {
handles.map(handleToOperation.get).filter { operation =>

View File

@ -30,7 +30,7 @@ log4j.appender.FA=org.apache.log4j.FileAppender
log4j.appender.FA.append=false
log4j.appender.FA.file=target/unit-tests.log
log4j.appender.FA.layout=org.apache.log4j.PatternLayout
log4j.appender.FA.layout.ConversionPattern=%d{HH:mm:ss.SSS} %t %p %c{2}: %m%n
log4j.appender.FA.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss.SSS} %t %p %c{2}: %m%n
# Set the logger level of File Appender to DEBUG
log4j.appender.FA.Threshold = DEBUG

View File

@ -26,7 +26,7 @@ import org.apache.hadoop.security.UserGroupInformation
trait KerberizedTestHelper {
var kdc: MiniKdc = _
val baseDir: File = Utils.createTempDir(
this.getClass.getProtectionDomain.getCodeSource.getLocation.getPath, "kyuubi-kdc")
this.getClass.getProtectionDomain.getCodeSource.getLocation.getPath, "kyuubi-kdc").toFile
try {
val kdcConf = MiniKdc.createConf()

View File

@ -30,7 +30,7 @@ log4j.appender.FA=org.apache.log4j.FileAppender
log4j.appender.FA.append=false
log4j.appender.FA.file=target/unit-tests.log
log4j.appender.FA.layout=org.apache.log4j.PatternLayout
log4j.appender.FA.layout.ConversionPattern=%d{HH:mm:ss.SSS} %t %p %c{2}: %m%n
log4j.appender.FA.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss.SSS} %t %p %c{2}: %m%n
# Set the logger level of File Appender to DEBUG
log4j.appender.FA.Threshold = DEBUG

View File

@ -41,7 +41,6 @@ class LogDivertAppender extends WriterAppender with Logging {
override def decide(loggingEvent: LoggingEvent): Int = {
if (OperationLog.getCurrentOperationLog == null) Filter.DENY else Filter.NEUTRAL
}
})
/**

View File

@ -121,7 +121,7 @@ private object SparkSQLEngine extends Logging {
engine = startEngine(spark)
} catch {
case t: Throwable =>
error("Error start SparkSQLRunner", t)
error("Error start SparkSQLEngine", t)
if (engine != null) {
engine.stop()
} else if (spark != null) {

View File

@ -23,6 +23,7 @@ import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.StructType
import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.engine.spark.operation.log.OperationLog
import org.apache.kyuubi.operation.{AbstractOperation, OperationState}
import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
import org.apache.kyuubi.operation.OperationState.OperationState
@ -35,6 +36,11 @@ abstract class SparkOperation(spark: SparkSession, opType: OperationType, sessio
protected var iter: Iterator[Row] = _
private final val operationLog: OperationLog =
OperationLog.createOperationLog(session.handle, getHandle)
def getOperationLog: OperationLog = operationLog
protected def statement: String = opType.toString
protected def resultSchema: StructType
@ -109,6 +115,7 @@ abstract class SparkOperation(spark: SparkSession, opType: OperationType, sessio
override protected def beforeRun(): Unit = {
setHasResultSet(true)
setState(OperationState.RUNNING)
OperationLog.setCurrentOperationLog(operationLog)
}
override protected def afterRun(): Unit = {
@ -117,6 +124,7 @@ abstract class SparkOperation(spark: SparkSession, opType: OperationType, sessio
setState(OperationState.FINISHED)
}
}
OperationLog.removeCurrentOperationLog()
}
override def cancel(): Unit = {
@ -125,6 +133,7 @@ abstract class SparkOperation(spark: SparkSession, opType: OperationType, sessio
override def close(): Unit = {
cleanup(OperationState.CLOSED)
if (operationLog != null) operationLog.close()
}
override def getResultSetSchema: TTableSchema = SchemaHelper.toTTableSchema(resultSchema)

View File

@ -19,10 +19,14 @@ package org.apache.kyuubi.engine.spark.operation
import java.util.concurrent.ConcurrentHashMap
import org.apache.hive.service.rpc.thrift.TRowSet
import org.apache.spark.sql.SparkSession
import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.operation.{Operation, OperationManager}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.engine.spark.operation.log.{LogDivertAppender, OperationLog}
import org.apache.kyuubi.operation.{Operation, OperationHandle, OperationManager}
import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
import org.apache.kyuubi.session.{Session, SessionHandle}
class SparkSQLOperationManager private (name: String) extends OperationManager(name) {
@ -112,4 +116,21 @@ class SparkSQLOperationManager private (name: String) extends OperationManager(n
val op = new GetFunctions(spark, session, catalogName, schemaName, functionName)
addOperation(op)
}
override def initialize(conf: KyuubiConf): Unit = {
LogDivertAppender.initialize()
super.initialize(conf)
}
override def getOperationLogRowSet(
opHandle: OperationHandle,
order: FetchOrientation,
maxRows: Int): TRowSet = {
val log = getOperation(opHandle).asInstanceOf[SparkOperation].getOperationLog
if (log == null) {
throw KyuubiSQLException(s"Couldn't find log associated with $opHandle")
}
log.read(maxRows)
}
}

View File

@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.spark.operation.log
import java.io.CharArrayWriter
import scala.collection.JavaConverters._
import org.apache.log4j._
import org.apache.log4j.spi.{Filter, LoggingEvent}
class LogDivertAppender extends WriterAppender {

  // Buffer that captures each formatted log event before it is handed off to
  // the current thread's OperationLog.
  private final val buffer = new CharArrayWriter

  // Reuse the layout of the root logger's console appender when one is
  // configured; otherwise fall back to a sensible default pattern.
  private final val layout = Logger.getRootLogger
    .getAllAppenders.asScala
    .collectFirst { case console: ConsoleAppender => console.getLayout }
    .getOrElse(new PatternLayout("%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n"))

  setName("KyuubiSparkSQLEngineLogDivertAppender")
  setWriter(buffer)
  setLayout(layout)
  addFilter(new Filter {
    // Only divert events on threads that currently have an operation log bound.
    override def decide(loggingEvent: LoggingEvent): Int = {
      if (OperationLog.getCurrentOperationLog == null) Filter.DENY else Filter.NEUTRAL
    }
  })

  /**
   * Overrides WriterAppender.subAppend(), which does the real logging. No need
   * to worry about concurrency since log4j calls this synchronously.
   */
  override protected def subAppend(event: LoggingEvent): Unit = {
    super.subAppend(event)
    // The formatted event went into our buffer; forward it to the operation log.
    val diverted = buffer.toString
    buffer.reset()
    val log = OperationLog.getCurrentOperationLog
    if (log != null) log.write(diverted)
  }
}
object LogDivertAppender {

  /**
   * Attach a fresh [[LogDivertAppender]] to the root logger so that log events
   * produced while an operation log is bound to the thread are diverted to it.
   */
  def initialize(): Unit = {
    val rootLogger = org.apache.log4j.Logger.getRootLogger
    rootLogger.addAppender(new LogDivertAppender())
  }
}

View File

@ -0,0 +1,140 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.spark.operation.log
import java.io.IOException
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path, Paths}
import scala.util.control.NonFatal
import org.apache.commons.io.FileUtils
import org.apache.hive.service.rpc.thrift.{TColumn, TRow, TRowSet, TStringColumn}
import org.apache.kyuubi.{KyuubiSQLException, Logging}
import org.apache.kyuubi.operation.OperationHandle
import org.apache.kyuubi.session.SessionHandle
/**
* TODO: This part is spark-independent, mv to kyuubi-common?
* we can do this after we decide to support other engines
*/
object OperationLog extends Logging {

  /** Root directory (relative to $PWD) under which all operation logs live. */
  final val LOG_ROOT: String = "kyuubi_operation_logs"

  // Inheritable so that worker threads spawned by an operation keep diverting
  // their logging to the same OperationLog as the parent thread.
  private final val OPERATION_LOG: InheritableThreadLocal[OperationLog] = {
    new InheritableThreadLocal[OperationLog] {
      override def initialValue(): OperationLog = null
    }
  }

  /** Bind `operationLog` as the current thread's active operation log. */
  def setCurrentOperationLog(operationLog: OperationLog): Unit = OPERATION_LOG.set(operationLog)

  /** The operation log bound to the current thread, or null when none is set. */
  def getCurrentOperationLog: OperationLog = OPERATION_LOG.get()

  /** Unbind the current thread's operation log. */
  def removeCurrentOperationLog(): Unit = OPERATION_LOG.remove()

  /**
   * Create the per-session operation log root directory at
   * $PWD/[[LOG_ROOT]]/$sessionId/ and mark it for deletion at JVM exit.
   * (NOTE(review): File.deleteOnExit only removes the directory if it is
   * empty at shutdown — confirm the per-operation files are closed first.)
   * Failures are logged rather than propagated.
   */
  def createOperationLogRootDirectory(sessionHandle: SessionHandle): Unit = {
    val sessionLogDir = Paths.get(LOG_ROOT, sessionHandle.identifier.toString)
    try {
      Files.createDirectories(sessionLogDir)
      sessionLogDir.toFile.deleteOnExit()
    } catch {
      case NonFatal(e) =>
        error(s"Failed to create operation log root directory: $sessionLogDir", e)
    }
  }

  /**
   * Create the log file for one operation at
   * $PWD/[[LOG_ROOT]]/$sessionId/$operationId and wrap it in an [[OperationLog]].
   *
   * @return the new OperationLog, or null when the file could not be created
   */
  def createOperationLog(sessionHandle: SessionHandle, opHandle: OperationHandle): OperationLog = {
    try {
      val sessionLogDir = Paths.get(LOG_ROOT, sessionHandle.identifier.toString)
      val logFile = Paths.get(sessionLogDir.toAbsolutePath.toString, opHandle.identifier.toString)
      Files.createFile(logFile)
      info(s"Created operation log file $logFile")
      new OperationLog(logFile)
    } catch {
      case e: IOException =>
        error(s"Failed to create operation log for $opHandle in $sessionHandle", e)
        null
    }
  }
}
class OperationLog(path: Path) extends Logging {

  // Lazily opened so an operation that never writes/reads does not touch the file.
  private lazy val writer = Files.newBufferedWriter(path, StandardCharsets.UTF_8)
  private lazy val reader = Files.newBufferedReader(path, StandardCharsets.UTF_8)

  /**
   * Append `msg` to the operation log file, flushing immediately so clients
   * polling the log observe it without delay.
   */
  def write(msg: String): Unit = synchronized {
    writer.write(msg)
    writer.flush()
  }

  /**
   * Read the log file line by line, continuing from where the previous read
   * left off (the reader keeps its cursor between calls).
   *
   * @param maxRows maximum number of lines to return; a non-positive value
   *                means read until end of file
   * @throws KyuubiSQLException if the log file cannot be read
   */
  def read(maxRows: Int): TRowSet = synchronized {
    val logs = new java.util.ArrayList[String]
    var i = 0
    try {
      var line: String = reader.readLine()
      while ((i < maxRows || maxRows <= 0) && line != null) {
        logs.add(line)
        line = reader.readLine()
        i += 1
      }
    } catch {
      case e: IOException =>
        val absPath = path.toAbsolutePath
        val opHandle = absPath.getFileName
        throw new KyuubiSQLException(s"Operation[$opHandle] log file $absPath is not found", e)
    }
    val tColumn = TColumn.stringVal(new TStringColumn(logs, ByteBuffer.allocate(0)))
    val tRow = new TRowSet(0, new java.util.ArrayList[TRow](0))
    tRow.addToColumns(tColumn)
    tRow
  }

  /**
   * Close the reader and writer and delete the underlying log file.
   *
   * Each step is attempted independently: previously all three ran in one
   * sequential try-block, so an IOException from reader.close() leaked the
   * writer and left the log file behind. Failures are logged, not propagated,
   * matching the original best-effort contract.
   */
  def close(): Unit = synchronized {
    try {
      reader.close()
    } catch {
      case e: IOException =>
        error(s"Failed to close reader of operation log: ${path.toAbsolutePath}", e)
    }
    try {
      writer.close()
    } catch {
      case e: IOException =>
        error(s"Failed to close writer of operation log: ${path.toAbsolutePath}", e)
    }
    try {
      FileUtils.forceDelete(path.toFile)
    } catch {
      case e: IOException =>
        error(s"Failed to remove corresponding log file of operation: ${path.toAbsolutePath}", e)
    }
  }
}

View File

@ -26,7 +26,6 @@ import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.engine.spark.operation.SparkSQLOperationManager
import org.apache.kyuubi.session.{SessionHandle, SessionManager}
/**
* A [[SessionManager]] constructed with [[SparkSession]] which give it the ability to talk with
* Spark and let Spark do all the rest heavy work :)
@ -54,9 +53,9 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession)
val sparkSession = spark.newSession()
conf.foreach { case (key, value) => spark.conf.set(key, value)}
operationManager.setSparkSession(handle, sparkSession)
sessionImpl.open()
info(s"$user's session with $handle is opened, current opening sessions" +
s" $getOpenSessionCount")
setSession(handle, sessionImpl)
handle
} catch {
@ -65,7 +64,6 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession)
sessionImpl.close()
} catch {
case t: Throwable => warn(s"Error closing session $handle for $user", t)
}
throw KyuubiSQLException(s"Error opening session $handle for $user", e)
}

View File

@ -19,6 +19,7 @@ package org.apache.kyuubi.engine.spark.session
import org.apache.hive.service.rpc.thrift.TProtocolVersion
import org.apache.kyuubi.engine.spark.operation.log.OperationLog
import org.apache.kyuubi.session.{AbstractSession, SessionManager}
class SparkSessionImpl(
@ -30,5 +31,7 @@ class SparkSessionImpl(
sessionManager: SessionManager)
extends AbstractSession(protocol, user, password, ipAddress, conf, sessionManager) {
override def open(): Unit = {}
override def open(): Unit = {
OperationLog.createOperationLogRootDirectory(handle)
}
}

View File

@ -0,0 +1,36 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Set everything to be logged to the file target/unit-tests.log
log4j.rootLogger=DEBUG, CA, FA
#Console Appender
log4j.appender.CA=org.apache.log4j.ConsoleAppender
log4j.appender.CA.layout=org.apache.log4j.PatternLayout
log4j.appender.CA.layout.ConversionPattern=%d{HH:mm:ss.SSS} %p %c: %m%n
log4j.appender.CA.Threshold = WARN
#File Appender
log4j.appender.FA=org.apache.log4j.FileAppender
log4j.appender.FA.append=false
log4j.appender.FA.file=target/unit-tests.log
log4j.appender.FA.layout=org.apache.log4j.PatternLayout
log4j.appender.FA.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss.SSS} %t %p %c{2}: %m%n
# Set the logger level of File Appender to DEBUG
log4j.appender.FA.Threshold = DEBUG

View File

@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.spark.operation.log
import java.nio.file.{Files, Paths}
import scala.collection.JavaConverters._
import org.apache.hive.service.rpc.thrift.TProtocolVersion
import org.apache.kyuubi.KyuubiFunSuite
import org.apache.kyuubi.operation.{OperationHandle, OperationType}
import org.apache.kyuubi.session.SessionHandle
class OperationLogSuite extends KyuubiFunSuite {

  // Two distinct marker messages used to check ordering and content of reads.
  val msg1 = "This is just a dummy log message 1"
  val msg2 = "This is just a dummy log message 2"

  test("create, delete, read and write to operation log") {
    // Fabricated handles — only their identifiers are used to build paths.
    val sHandle = SessionHandle(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10)
    val oHandle = OperationHandle(
      OperationType.EXECUTE_STATEMENT, TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10)
    // The per-session root directory must be created as a directory.
    OperationLog.createOperationLogRootDirectory(sHandle)
    assert(Files.exists(Paths.get(OperationLog.LOG_ROOT, sHandle.identifier.toString)))
    assert(Files.isDirectory(Paths.get(OperationLog.LOG_ROOT, sHandle.identifier.toString)))
    // Creating the OperationLog creates the per-operation log file.
    val operationLog = OperationLog.createOperationLog(sHandle, oHandle)
    val logFile =
      Paths.get(OperationLog.LOG_ROOT, sHandle.identifier.toString, oHandle.identifier.toString)
    assert(Files.exists(logFile))
    // Thread-local set/get/remove round-trip.
    OperationLog.setCurrentOperationLog(operationLog)
    assert(OperationLog.getCurrentOperationLog === operationLog)
    OperationLog.removeCurrentOperationLog()
    assert(OperationLog.getCurrentOperationLog === null)
    // Reading advances the cursor: a second read(1) returns nothing new.
    operationLog.write(msg1 + "\n")
    val tRowSet1 = operationLog.read(1)
    assert(tRowSet1.getColumns.get(0).getStringVal.getValues.get(0) === msg1)
    val tRowSet2 = operationLog.read(1)
    assert(tRowSet2.getColumns.get(0).getStringVal.getValues.isEmpty)
    // A non-positive maxRows reads all remaining lines, in write order.
    operationLog.write(msg1 + "\n")
    operationLog.write(msg2 + "\n")
    val tRowSet3 = operationLog.read(-1).getColumns.get(0).getStringVal.getValues
    assert(tRowSet3.get(0) === msg1)
    assert(tRowSet3.get(1) === msg2)
    // close() deletes the underlying log file.
    operationLog.close()
    assert(!Files.exists(logFile))
  }

  test("log divert appender") {
    val sHandle = SessionHandle(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10)
    val oHandle = OperationHandle(
      OperationType.EXECUTE_STATEMENT, TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10)
    OperationLog.createOperationLogRootDirectory(sHandle)
    val operationLog = OperationLog.createOperationLog(sHandle, oHandle)
    // With an operation log bound to this thread, initializing the divert
    // appender routes this suite's logging calls into the operation log.
    OperationLog.setCurrentOperationLog(operationLog)
    LogDivertAppender.initialize()
    info(msg1)
    info(msg2)
    warn(msg2)
    error(msg2)
    // NOTE(review): the indices below assume the diverted lines arrive in call
    // order and `distinct` keeps first occurrences — verify against the
    // appender layout if this becomes flaky.
    val list =
      operationLog.read(-1).getColumns.get(0).getStringVal.getValues.asScala.distinct
    val logMsg1 = list(0)
    assert(logMsg1.contains("INFO") &&
      logMsg1.contains(classOf[OperationLogSuite].getCanonicalName) && logMsg1.endsWith(msg1))
    val logMsg2 = list(2)
    assert(logMsg2.contains("WARN") && logMsg2.endsWith(msg2))
    val logMsg3 = list(3)
    assert(logMsg3.contains("ERROR") && logMsg3.endsWith(msg2))
    // After unbinding, further logging must no longer reach the operation log.
    OperationLog.removeCurrentOperationLog()
    info(msg1)
    val list2 = operationLog.read(-1).getColumns.get(0).getStringVal.getValues
    assert(list2.isEmpty)
    operationLog.close()
  }
}

View File

@ -504,7 +504,7 @@
<junitxml>.</junitxml>
<filereports>TestSuite.txt</filereports>
<systemProperties>
<log4j.configuration>src/test/resources/log4j.properties</log4j.configuration>
<log4j.configuration>file:src/test/resources/log4j.properties</log4j.configuration>
<java.io.tmpdir>${project.build.directory}/tmp</java.io.tmpdir>
</systemProperties>
</configuration>