From 0be3cbff6e35c8e86635bfe6d856d0dfa148247d Mon Sep 17 00:00:00 2001 From: Cheng Pan Date: Fri, 17 Feb 2023 11:39:32 +0800 Subject: [PATCH] [KYUUBI #4282] Fix ConcurrentModificationException when log4j2 async enabled ### _Why are the changes needed?_ This PR proposes to fix #4282, since log4j2 supports async mode, we need to make sure the `Log4j2DivertAppender#append` is thread-safe. This PR also changes `OperationLog.getCurrentOperationLog` from `OperationLog` to `Option[OperationLog]` ### _How was this patch tested?_ - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before make a pull request Closes #4300 from pan3793/log. Closes #4282 010e34b0 [Cheng Pan] fix 068405b2 [Cheng Pan] fix compile c79dedd5 [Cheng Pan] Use write lock instead 3daf8a4d [Cheng Pan] nit 94176a04 [Cheng Pan] [KYUUBI #4282] Fix ConcurrentModificationException when log4j2 async enabled Authored-by: Cheng Pan Signed-off-by: Cheng Pan --- .../operation/log/Log4j12DivertAppender.scala | 5 +-- .../operation/log/Log4j2DivertAppender.scala | 38 ++++++++++--------- .../kyuubi/operation/log/OperationLog.scala | 2 +- .../operation/log/OperationLogSuite.scala | 4 +- .../server/api/v1/BatchesResource.scala | 19 ++++++---- 5 files changed, 38 insertions(+), 30 deletions(-) diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/Log4j12DivertAppender.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/Log4j12DivertAppender.scala index 1191e94ae..df2ef93d8 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/Log4j12DivertAppender.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/Log4j12DivertAppender.scala @@ -39,7 +39,7 @@ class Log4j12DivertAppender extends WriterAppender { setLayout(lo) addFilter { _: LoggingEvent => - if (OperationLog.getCurrentOperationLog == null) Filter.DENY else Filter.NEUTRAL + if (OperationLog.getCurrentOperationLog.isDefined) Filter.NEUTRAL else Filter.DENY } /** @@ -51,8 +51,7 @@ class Log4j12DivertAppender extends WriterAppender { // That should've gone into our writer. Notify the LogContext. val logOutput = writer.toString writer.reset() - val log = OperationLog.getCurrentOperationLog - if (log != null) log.write(logOutput) + OperationLog.getCurrentOperationLog.foreach(_.write(logOutput)) } } diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/Log4j2DivertAppender.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/Log4j2DivertAppender.scala index 68753cf98..dc4b24a8c 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/Log4j2DivertAppender.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/Log4j2DivertAppender.scala @@ -18,6 +18,7 @@ package org.apache.kyuubi.operation.log import java.io.CharArrayWriter +import java.util.concurrent.locks.ReadWriteLock import scala.collection.JavaConverters._ @@ -27,6 +28,8 @@ import org.apache.logging.log4j.core.appender.{AbstractWriterAppender, ConsoleAp import org.apache.logging.log4j.core.filter.AbstractFilter import org.apache.logging.log4j.core.layout.PatternLayout +import org.apache.kyuubi.reflection.DynFields + class Log4j2DivertAppender( name: String, layout: StringLayout, @@ -52,22 +55,19 @@ class Log4j2DivertAppender( addFilter(new AbstractFilter() { override def filter(event: LogEvent): Filter.Result = { - if (OperationLog.getCurrentOperationLog == null) { - Filter.Result.DENY - } else { + if (OperationLog.getCurrentOperationLog.isDefined) { Filter.Result.NEUTRAL + } else { + Filter.Result.DENY } } }) - def initLayout(): StringLayout = { - LogManager.getRootLogger.asInstanceOf[org.apache.logging.log4j.core.Logger] - .getAppenders.values().asScala - .find(ap => ap.isInstanceOf[ConsoleAppender] && ap.getLayout.isInstanceOf[StringLayout]) - .map(_.getLayout.asInstanceOf[StringLayout]) - .getOrElse(PatternLayout.newBuilder().withPattern( - "%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n").build()) - } + private val writeLock = DynFields.builder() + .hiddenImpl(classOf[AbstractWriterAppender[_]], "readWriteLock") + .build[ReadWriteLock](this) + .get() + .writeLock /** * Overrides AbstractWriterAppender.append(), which does the real logging. No need @@ -75,11 +75,15 @@ class Log4j2DivertAppender( */ override def append(event: LogEvent): Unit = { super.append(event) - // That should've gone into our writer. Notify the LogContext. - val logOutput = writer.toString - writer.reset() - val log = OperationLog.getCurrentOperationLog - if (log != null) log.write(logOutput) + writeLock.lock() + try { + // That should've gone into our writer. Notify the LogContext. + val logOutput = writer.toString + writer.reset() + OperationLog.getCurrentOperationLog.foreach(_.write(logOutput)) + } finally { + writeLock.unlock() + } } } @@ -95,7 +99,7 @@ object Log4j2DivertAppender { def initialize(): Unit = { val ap = new Log4j2DivertAppender() - org.apache.logging.log4j.LogManager.getRootLogger() + org.apache.logging.log4j.LogManager.getRootLogger .asInstanceOf[org.apache.logging.log4j.core.Logger].addAppender(ap) ap.start() } diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/OperationLog.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/OperationLog.scala index 84c4ed55c..e6312d0fb 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/OperationLog.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/OperationLog.scala @@ -44,7 +44,7 @@ object OperationLog extends Logging { OPERATION_LOG.set(operationLog) } - def getCurrentOperationLog: OperationLog = OPERATION_LOG.get() + def getCurrentOperationLog: Option[OperationLog] = Option(OPERATION_LOG.get) def removeCurrentOperationLog(): Unit = OPERATION_LOG.remove() diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/log/OperationLogSuite.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/log/OperationLogSuite.scala index 758eeeeaf..fe3cbc7fc 100644 --- a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/log/OperationLogSuite.scala +++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/log/OperationLogSuite.scala @@ -61,10 +61,10 @@ class OperationLogSuite extends KyuubiFunSuite { assert(!Files.exists(logFile)) OperationLog.setCurrentOperationLog(operationLog) - assert(OperationLog.getCurrentOperationLog === operationLog) + assert(OperationLog.getCurrentOperationLog === Some(operationLog)) OperationLog.removeCurrentOperationLog() - assert(OperationLog.getCurrentOperationLog === null) + assert(OperationLog.getCurrentOperationLog.isEmpty) operationLog.write(msg1 + "\n") assert(Files.exists(logFile)) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala index 969362f7d..d308c26d5 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala @@ -18,7 +18,8 @@ package org.apache.kyuubi.server.api.v1 import java.io.InputStream -import java.util.Locale +import java.util +import java.util.{Collections, Locale} import java.util.concurrent.ConcurrentHashMap import javax.ws.rs._ import javax.ws.rs.core.MediaType @@ -318,12 +319,16 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging { Option(sessionManager.getBatchSessionImpl(sessionHandle)).map { batchSession => try { val submissionOp = batchSession.batchJobSubmissionOp - val rowSet = submissionOp.getOperationLogRowSet( - FetchOrientation.FETCH_NEXT, - from, - size) - val logRowSet = rowSet.getColumns.get(0).getStringVal.getValues.asScala - new OperationLog(logRowSet.asJava, logRowSet.size) + val rowSet = submissionOp.getOperationLogRowSet(FetchOrientation.FETCH_NEXT, from, size) + val columns = rowSet.getColumns + val logRowSet: util.List[String] = + if (columns == null || columns.size == 0) { + Collections.emptyList() + } else { + assert(columns.size == 1) + columns.get(0).getStringVal.getValues + } + new OperationLog(logRowSet, logRowSet.size) } catch { case NonFatal(e) => val errorMsg = s"Error getting operation log for batchId: $batchId"