[KYUUBI #4282] Fix ConcurrentModificationException when log4j2 async enabled

### _Why are the changes needed?_

This PR proposes to fix #4282, since log4j2 supports async mode, we need to make sure the `Log4j2DivertAppender#append` is thread-safe.

This PR also changes `OperationLog.getCurrentOperationLog` from `OperationLog` to `Option[OperationLog]`

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #4300 from pan3793/log.

Closes #4282

010e34b0 [Cheng Pan] fix
068405b2 [Cheng Pan] fix compile
c79dedd5 [Cheng Pan] Use write lock instead
3daf8a4d [Cheng Pan] nit
94176a04 [Cheng Pan] [KYUUBI #4282] Fix ConcurrentModificationException when log4j2 async enabled

Authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: Cheng Pan <chengpan@apache.org>
This commit is contained in:
Cheng Pan 2023-02-17 11:39:32 +08:00
parent b77c8847f5
commit 0be3cbff6e
No known key found for this signature in database
GPG Key ID: 8001952629BCC75D
5 changed files with 38 additions and 30 deletions

View File

@ -39,7 +39,7 @@ class Log4j12DivertAppender extends WriterAppender {
setLayout(lo)
addFilter { _: LoggingEvent =>
if (OperationLog.getCurrentOperationLog == null) Filter.DENY else Filter.NEUTRAL
if (OperationLog.getCurrentOperationLog.isDefined) Filter.NEUTRAL else Filter.DENY
}
/**
@ -51,8 +51,7 @@ class Log4j12DivertAppender extends WriterAppender {
// That should've gone into our writer. Notify the LogContext.
val logOutput = writer.toString
writer.reset()
val log = OperationLog.getCurrentOperationLog
if (log != null) log.write(logOutput)
OperationLog.getCurrentOperationLog.foreach(_.write(logOutput))
}
}

View File

@ -18,6 +18,7 @@
package org.apache.kyuubi.operation.log
import java.io.CharArrayWriter
import java.util.concurrent.locks.ReadWriteLock
import scala.collection.JavaConverters._
@ -27,6 +28,8 @@ import org.apache.logging.log4j.core.appender.{AbstractWriterAppender, ConsoleAp
import org.apache.logging.log4j.core.filter.AbstractFilter
import org.apache.logging.log4j.core.layout.PatternLayout
import org.apache.kyuubi.reflection.DynFields
class Log4j2DivertAppender(
name: String,
layout: StringLayout,
@ -52,22 +55,19 @@ class Log4j2DivertAppender(
addFilter(new AbstractFilter() {
override def filter(event: LogEvent): Filter.Result = {
if (OperationLog.getCurrentOperationLog == null) {
Filter.Result.DENY
} else {
if (OperationLog.getCurrentOperationLog.isDefined) {
Filter.Result.NEUTRAL
} else {
Filter.Result.DENY
}
}
})
def initLayout(): StringLayout = {
LogManager.getRootLogger.asInstanceOf[org.apache.logging.log4j.core.Logger]
.getAppenders.values().asScala
.find(ap => ap.isInstanceOf[ConsoleAppender] && ap.getLayout.isInstanceOf[StringLayout])
.map(_.getLayout.asInstanceOf[StringLayout])
.getOrElse(PatternLayout.newBuilder().withPattern(
"%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n").build())
}
private val writeLock = DynFields.builder()
.hiddenImpl(classOf[AbstractWriterAppender[_]], "readWriteLock")
.build[ReadWriteLock](this)
.get()
.writeLock
/**
* Overrides AbstractWriterAppender.append(), which does the real logging. No need
@ -75,11 +75,15 @@ class Log4j2DivertAppender(
*/
override def append(event: LogEvent): Unit = {
super.append(event)
// That should've gone into our writer. Notify the LogContext.
val logOutput = writer.toString
writer.reset()
val log = OperationLog.getCurrentOperationLog
if (log != null) log.write(logOutput)
writeLock.lock()
try {
// That should've gone into our writer. Notify the LogContext.
val logOutput = writer.toString
writer.reset()
OperationLog.getCurrentOperationLog.foreach(_.write(logOutput))
} finally {
writeLock.unlock()
}
}
}
@ -95,7 +99,7 @@ object Log4j2DivertAppender {
def initialize(): Unit = {
val ap = new Log4j2DivertAppender()
org.apache.logging.log4j.LogManager.getRootLogger()
org.apache.logging.log4j.LogManager.getRootLogger
.asInstanceOf[org.apache.logging.log4j.core.Logger].addAppender(ap)
ap.start()
}

View File

@ -44,7 +44,7 @@ object OperationLog extends Logging {
OPERATION_LOG.set(operationLog)
}
def getCurrentOperationLog: OperationLog = OPERATION_LOG.get()
def getCurrentOperationLog: Option[OperationLog] = Option(OPERATION_LOG.get)
def removeCurrentOperationLog(): Unit = OPERATION_LOG.remove()

View File

@ -61,10 +61,10 @@ class OperationLogSuite extends KyuubiFunSuite {
assert(!Files.exists(logFile))
OperationLog.setCurrentOperationLog(operationLog)
assert(OperationLog.getCurrentOperationLog === operationLog)
assert(OperationLog.getCurrentOperationLog === Some(operationLog))
OperationLog.removeCurrentOperationLog()
assert(OperationLog.getCurrentOperationLog === null)
assert(OperationLog.getCurrentOperationLog.isEmpty)
operationLog.write(msg1 + "\n")
assert(Files.exists(logFile))

View File

@ -18,7 +18,8 @@
package org.apache.kyuubi.server.api.v1
import java.io.InputStream
import java.util.Locale
import java.util
import java.util.{Collections, Locale}
import java.util.concurrent.ConcurrentHashMap
import javax.ws.rs._
import javax.ws.rs.core.MediaType
@ -318,12 +319,16 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
Option(sessionManager.getBatchSessionImpl(sessionHandle)).map { batchSession =>
try {
val submissionOp = batchSession.batchJobSubmissionOp
val rowSet = submissionOp.getOperationLogRowSet(
FetchOrientation.FETCH_NEXT,
from,
size)
val logRowSet = rowSet.getColumns.get(0).getStringVal.getValues.asScala
new OperationLog(logRowSet.asJava, logRowSet.size)
val rowSet = submissionOp.getOperationLogRowSet(FetchOrientation.FETCH_NEXT, from, size)
val columns = rowSet.getColumns
val logRowSet: util.List[String] =
if (columns == null || columns.size == 0) {
Collections.emptyList()
} else {
assert(columns.size == 1)
columns.get(0).getStringVal.getValues
}
new OperationLog(logRowSet, logRowSet.size)
} catch {
case NonFatal(e) =>
val errorMsg = s"Error getting operation log for batchId: $batchId"