From cefc27f035c9096c63dfbca080684381f47669e2 Mon Sep 17 00:00:00 2001 From: wforget <643348094@qq.com> Date: Thu, 11 Apr 2024 19:29:41 +0800 Subject: [PATCH] [KYUUBI #6290] Add custom exception serialization for SparkOperationEvent MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit # :mag: Description ## Issue References 🔗 This pull request fixes #6290 ## Describe Your Solution 🔧 Serializing the `SparkException` object may cause NPE, so I referred to SparkListenerEvent serialization methods to implement `ExceptionDeserializer` and `ExceptionSerializer`. error detail: ``` (was java.lang.NullPointerException) (through reference chain: org.apache.kyuubi.engine.spark.events.SparkOperationEvent["exception"]->org.apache.spark.SparkException["internalError"]) com.fasterxml.jackson.databind.JsonMappingException: (was java.lang.NullPointerException) (through reference chain: org.apache.kyuubi.engine.spark.events.SparkOperationEvent["exception"]->org.apache.spark.SparkException["internalError"]) at com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:402) at com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:361) at com.fasterxml.jackson.databind.ser.std.StdSerializer.wrapAndThrow(StdSerializer.java:323) at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:780) at com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:178) at com.fasterxml.jackson.databind.ser.std.ReferenceTypeSerializer.serialize(ReferenceTypeSerializer.java:386) at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:732) at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:772) at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeWithType(BeanSerializerBase.java:655) at 
com.fasterxml.jackson.databind.ser.impl.TypeWrappedSerializer.serialize(TypeWrappedSerializer.java:32) at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider._serialize(DefaultSerializerProvider.java:479) at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:318) at com.fasterxml.jackson.databind.ObjectMapper.writeValue(ObjectMapper.java:3303) at org.apache.spark.util.JsonProtocol$.writeSparkEventToJson(JsonProtocol.scala:110) at org.apache.spark.util.JsonProtocol$.$anonfun$sparkEventToJsonString$1(JsonProtocol.scala:63) at org.apache.spark.util.JsonProtocol$.$anonfun$sparkEventToJsonString$1$adapted(JsonProtocol.scala:62) at org.apache.spark.util.JsonUtils.toJsonString(JsonUtils.scala:36) at org.apache.spark.util.JsonUtils.toJsonString$(JsonUtils.scala:33) at org.apache.spark.util.JsonProtocol$.toJsonString(JsonProtocol.scala:54) at org.apache.spark.util.JsonProtocol$.sparkEventToJsonString(JsonProtocol.scala:62) at org.apache.spark.kyuubi.KyuubiSparkEventSuite.$anonfun$new$1(KyuubiSparkEventSuite.scala:44) at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85) at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83) at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104) at org.scalatest.Transformer.apply(Transformer.scala:22) at org.scalatest.Transformer.apply(Transformer.scala:20) at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:226) at org.apache.kyuubi.KyuubiFunSuite.withFixture(KyuubiFunSuite.scala:65) at org.apache.kyuubi.KyuubiFunSuite.withFixture$(KyuubiFunSuite.scala:59) at org.apache.spark.kyuubi.KyuubiSparkEventSuite.withFixture(KyuubiSparkEventSuite.scala:25) at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:224) at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:236) at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306) at 
org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:236) at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:218) at org.apache.spark.kyuubi.KyuubiSparkEventSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(KyuubiSparkEventSuite.scala:25) at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:234) at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:227) at org.apache.spark.kyuubi.KyuubiSparkEventSuite.runTest(KyuubiSparkEventSuite.scala:25) at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:269) at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413) at scala.collection.immutable.List.foreach(List.scala:431) at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401) at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396) at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475) at org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:269) at org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:268) at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1564) at org.scalatest.Suite.run(Suite.scala:1114) at org.scalatest.Suite.run$(Suite.scala:1096) at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1564) at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:273) at org.scalatest.SuperEngine.runImpl(Engine.scala:535) at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:273) at org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:272) at org.apache.spark.kyuubi.KyuubiSparkEventSuite.org$scalatest$BeforeAndAfterAll$$super$run(KyuubiSparkEventSuite.scala:25) at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213) at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210) at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208) at 
org.apache.spark.kyuubi.KyuubiSparkEventSuite.run(KyuubiSparkEventSuite.scala:25) at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:47) at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13(Runner.scala:1321) at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13$adapted(Runner.scala:1315) at scala.collection.immutable.List.foreach(List.scala:431) at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1315) at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24(Runner.scala:992) at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24$adapted(Runner.scala:970) at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1481) at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:970) at org.scalatest.tools.Runner$.run(Runner.scala:798) at org.scalatest.tools.Runner.run(Runner.scala) at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2or3(ScalaTestRunner.java:43) at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:26) Caused by: java.lang.NullPointerException at org.apache.spark.SparkThrowableHelper$.isInternalError(SparkThrowableHelper.scala:64) at org.apache.spark.SparkThrowableHelper.isInternalError(SparkThrowableHelper.scala) at org.apache.spark.SparkThrowable.isInternalError(SparkThrowable.java:50) at org.apache.spark.SparkException.isInternalError(SparkException.scala:27) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:688) at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:772) ... 
69 more ``` ## Types of changes :bookmark: - [X] Bugfix (non-breaking change which fixes an issue) - [ ] New feature (non-breaking change which adds functionality) - [ ] Breaking change (fix or feature that would cause existing functionality to change) ## Test Plan 🧪 #### Behavior Without This Pull Request :coffin: #### Behavior With This Pull Request :tada: #### Related Unit Tests new unit test --- # Checklist 📝 - [X] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html) **Be nice. Be informative.** Closes #6289 from wForget/hotfix2. Closes #6290 cf701f97e [wforget] fix test 41df6ce9b [wforget] Add Exception Serialization for SparkOperationEvent Authored-by: wforget <643348094@qq.com> Signed-off-by: Cheng Pan --- .../spark/events/SparkOperationEvent.scala | 5 +- .../spark/kyuubi/KyuubiSparkEventSuite.scala | 56 ++++++++++++ .../apache/kyuubi/events/JsonProtocol.scala | 87 ++++++++++++++++++- 3 files changed, 146 insertions(+), 2 deletions(-) create mode 100644 externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/KyuubiSparkEventSuite.scala diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SparkOperationEvent.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SparkOperationEvent.scala index 143ba61f8..caf49fb05 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SparkOperationEvent.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SparkOperationEvent.scala @@ -18,13 +18,14 @@ package org.apache.kyuubi.engine.spark.events import com.fasterxml.jackson.annotation.JsonIgnore +import com.fasterxml.jackson.databind.annotation.{JsonDeserialize, JsonSerialize} import org.apache.spark.scheduler.SparkListenerEvent import org.apache.spark.util.kvstore.KVIndex import org.apache.kyuubi.Utils 
import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.KVIndexParam import org.apache.kyuubi.engine.spark.operation.SparkOperation -import org.apache.kyuubi.events.KyuubiEvent +import org.apache.kyuubi.events.{ExceptionDeserializer, ExceptionSerializer, KyuubiEvent} /** * A [[SparkOperationEvent]] used to tracker the lifecycle of an operation at Spark SQL Engine side. @@ -60,6 +61,8 @@ case class SparkOperationEvent( createTime: Long, startTime: Long, completeTime: Long, + @JsonSerialize(contentUsing = classOf[ExceptionSerializer]) + @JsonDeserialize(contentUsing = classOf[ExceptionDeserializer]) exception: Option[Throwable], sessionId: String, sessionUser: String, diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/KyuubiSparkEventSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/KyuubiSparkEventSuite.scala new file mode 100644 index 000000000..2a0a05d9c --- /dev/null +++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/KyuubiSparkEventSuite.scala @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.kyuubi + +import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper} +import com.fasterxml.jackson.module.scala.DefaultScalaModule +import org.apache.spark.SparkException + +import org.apache.kyuubi.KyuubiFunSuite +import org.apache.kyuubi.engine.spark.events.SparkOperationEvent + +class KyuubiSparkEventSuite extends KyuubiFunSuite { + + test("test exception serializer and deserializer of SparkOperationEvent") { + val exception = new SparkException("message", new Exception("cause")) + val event = new SparkOperationEvent( + "statementId", + "statement", + shouldRunAsync = true, + "state", + 0L, + 0L, + 0L, + 0L, + Some(exception), + "sessionId", + "sessionUser", + None, + None, + None) + val mapper: ObjectMapper = new ObjectMapper().registerModule(DefaultScalaModule) + .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) + val json = mapper.writeValueAsString(event) + assert(json.contains("\"exception\":{\"Message\":\"message\",\"Stack Trace\":" + + "[{\"Declaring Class\":\"org.apache.spark.kyuubi.KyuubiSparkEventSuite\",")) + val deserializeEvent = mapper.readValue(json, classOf[SparkOperationEvent]) + assert(deserializeEvent.exception.isDefined) + assert(deserializeEvent.exception.get.getMessage === "message") + assert(deserializeEvent.exception.get.getStackTrace.length > 0) + } + +} diff --git a/kyuubi-events/src/main/scala/org/apache/kyuubi/events/JsonProtocol.scala b/kyuubi-events/src/main/scala/org/apache/kyuubi/events/JsonProtocol.scala index 32aef4f51..77e76b938 100644 --- a/kyuubi-events/src/main/scala/org/apache/kyuubi/events/JsonProtocol.scala +++ b/kyuubi-events/src/main/scala/org/apache/kyuubi/events/JsonProtocol.scala @@ -17,7 +17,10 @@ package org.apache.kyuubi.events -import com.fasterxml.jackson.databind.ObjectMapper +import scala.collection.JavaConverters._ + +import com.fasterxml.jackson.core.{JsonGenerator, JsonParser} +import com.fasterxml.jackson.databind.{DeserializationContext, 
JsonDeserializer, JsonNode, JsonSerializer, ObjectMapper, SerializerProvider} import com.fasterxml.jackson.module.scala.DefaultScalaModule object JsonProtocol { @@ -30,3 +33,85 @@ object JsonProtocol { mapper.readValue(jsonValue, cls) } } + +// Exception serializer and deserializer, copy from org.apache.spark.util.JsonProtocol +class ExceptionSerializer extends JsonSerializer[Exception] { + + override def serialize( + value: Exception, + gen: JsonGenerator, + serializers: SerializerProvider): Unit = { + exceptionToJson(value, gen) + } + + private def exceptionToJson(exception: Exception, g: JsonGenerator): Unit = { + g.writeStartObject() + g.writeStringField("Message", exception.getMessage) + g.writeFieldName("Stack Trace") + stackTraceToJson(exception.getStackTrace, g) + g.writeEndObject() + } + + private def stackTraceToJson(stackTrace: Array[StackTraceElement], g: JsonGenerator): Unit = { + g.writeStartArray() + stackTrace.foreach { line => + g.writeStartObject() + g.writeStringField("Declaring Class", line.getClassName) + g.writeStringField("Method Name", line.getMethodName) + g.writeStringField("File Name", line.getFileName) + g.writeNumberField("Line Number", line.getLineNumber) + g.writeEndObject() + } + g.writeEndArray() + } +} + +class ExceptionDeserializer extends JsonDeserializer[Exception] { + + override def deserialize(jsonParser: JsonParser, ctxt: DeserializationContext): Exception = { + val jsonNode = jsonParser.readValueAsTree[JsonNode]() + exceptionFromJson(jsonNode) + } + + private def exceptionFromJson(json: JsonNode): Exception = { + val message = jsonOption(json.get("Message")).map(_.extractString).orNull + val e = new Exception(message) + e.setStackTrace(stackTraceFromJson(json.get("Stack Trace"))) + e + } + + private def stackTraceFromJson(json: JsonNode): Array[StackTraceElement] = { + jsonOption(json).map(_.extractElements.map { line => + val declaringClass = line.get("Declaring Class").extractString + val methodName = line.get("Method 
Name").extractString + val fileName = jsonOption(line.get("File Name")).map(_.extractString).orNull + val lineNumber = line.get("Line Number").extractInt + new StackTraceElement(declaringClass, methodName, fileName, lineNumber) + }.toArray).getOrElse(Array[StackTraceElement]()) + } + + private def jsonOption(json: JsonNode): Option[JsonNode] = { + if (json == null || json.isNull) { + None + } else { + Some(json) + } + } + + implicit private class JsonNodeImplicits(json: JsonNode) { + def extractElements: Iterator[JsonNode] = { + require(json.isContainerNode, s"Expected container, got ${json.getNodeType}") + json.elements.asScala + } + + def extractInt: Int = { + require(json.isNumber, s"Expected number, got ${json.getNodeType}") + json.intValue + } + + def extractString: String = { + require(json.isTextual || json.isNull, s"Expected string or NULL, got ${json.getNodeType}") + json.textValue + } + } +}