[KYUUBI #6290] Add custom exception serialization for SparkOperationEvent
# 🔍 Description
## Issue References 🔗
This pull request fixes #6290
## Describe Your Solution 🔧
Serializing the `SparkException` object may cause NPE, so I referred to SparkListenerEvent serialization methods to implement `ExceptionDeserializer` and `ExceptionSerializer`.
error detail:
```
(was java.lang.NullPointerException) (through reference chain: org.apache.kyuubi.engine.spark.events.SparkOperationEvent["exception"]->org.apache.spark.SparkException["internalError"])
com.fasterxml.jackson.databind.JsonMappingException: (was java.lang.NullPointerException) (through reference chain: org.apache.kyuubi.engine.spark.events.SparkOperationEvent["exception"]->org.apache.spark.SparkException["internalError"])
at com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:402)
at com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:361)
at com.fasterxml.jackson.databind.ser.std.StdSerializer.wrapAndThrow(StdSerializer.java:323)
at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:780)
at com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:178)
at com.fasterxml.jackson.databind.ser.std.ReferenceTypeSerializer.serialize(ReferenceTypeSerializer.java:386)
at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:732)
at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:772)
at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeWithType(BeanSerializerBase.java:655)
at com.fasterxml.jackson.databind.ser.impl.TypeWrappedSerializer.serialize(TypeWrappedSerializer.java:32)
at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider._serialize(DefaultSerializerProvider.java:479)
at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:318)
at com.fasterxml.jackson.databind.ObjectMapper.writeValue(ObjectMapper.java:3303)
at org.apache.spark.util.JsonProtocol$.writeSparkEventToJson(JsonProtocol.scala:110)
at org.apache.spark.util.JsonProtocol$.$anonfun$sparkEventToJsonString$1(JsonProtocol.scala:63)
at org.apache.spark.util.JsonProtocol$.$anonfun$sparkEventToJsonString$1$adapted(JsonProtocol.scala:62)
at org.apache.spark.util.JsonUtils.toJsonString(JsonUtils.scala:36)
at org.apache.spark.util.JsonUtils.toJsonString$(JsonUtils.scala:33)
at org.apache.spark.util.JsonProtocol$.toJsonString(JsonProtocol.scala:54)
at org.apache.spark.util.JsonProtocol$.sparkEventToJsonString(JsonProtocol.scala:62)
at org.apache.spark.kyuubi.KyuubiSparkEventSuite.$anonfun$new$1(KyuubiSparkEventSuite.scala:44)
at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
at org.scalatest.Transformer.apply(Transformer.scala:22)
at org.scalatest.Transformer.apply(Transformer.scala:20)
at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:226)
at org.apache.kyuubi.KyuubiFunSuite.withFixture(KyuubiFunSuite.scala:65)
at org.apache.kyuubi.KyuubiFunSuite.withFixture$(KyuubiFunSuite.scala:59)
at org.apache.spark.kyuubi.KyuubiSparkEventSuite.withFixture(KyuubiSparkEventSuite.scala:25)
at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:224)
at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:236)
at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:236)
at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:218)
at org.apache.spark.kyuubi.KyuubiSparkEventSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(KyuubiSparkEventSuite.scala:25)
at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:234)
at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:227)
at org.apache.spark.kyuubi.KyuubiSparkEventSuite.runTest(KyuubiSparkEventSuite.scala:25)
at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:269)
at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
at scala.collection.immutable.List.foreach(List.scala:431)
at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
at org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:269)
at org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:268)
at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1564)
at org.scalatest.Suite.run(Suite.scala:1114)
at org.scalatest.Suite.run$(Suite.scala:1096)
at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1564)
at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:273)
at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:273)
at org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:272)
at org.apache.spark.kyuubi.KyuubiSparkEventSuite.org$scalatest$BeforeAndAfterAll$$super$run(KyuubiSparkEventSuite.scala:25)
at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
at org.apache.spark.kyuubi.KyuubiSparkEventSuite.run(KyuubiSparkEventSuite.scala:25)
at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:47)
at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13(Runner.scala:1321)
at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13$adapted(Runner.scala:1315)
at scala.collection.immutable.List.foreach(List.scala:431)
at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1315)
at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24(Runner.scala:992)
at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24$adapted(Runner.scala:970)
at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1481)
at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:970)
at org.scalatest.tools.Runner$.run(Runner.scala:798)
at org.scalatest.tools.Runner.run(Runner.scala)
at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2or3(ScalaTestRunner.java:43)
at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:26)
Caused by: java.lang.NullPointerException
at org.apache.spark.SparkThrowableHelper$.isInternalError(SparkThrowableHelper.scala:64)
at org.apache.spark.SparkThrowableHelper.isInternalError(SparkThrowableHelper.scala)
at org.apache.spark.SparkThrowable.isInternalError(SparkThrowable.java:50)
at org.apache.spark.SparkException.isInternalError(SparkException.scala:27)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:688)
at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:772)
... 69 more
```
## Types of changes 🔖
- [X] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)
## Test Plan 🧪
#### Behavior Without This Pull Request ⚰️
#### Behavior With This Pull Request 🎉
#### Related Unit Tests
new unit test
---
# Checklist 📝
- [X] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)
**Be nice. Be informative.**
Closes #6289 from wForget/hotfix2.
Closes #6290
cf701f97e [wforget] fix test
41df6ce9b [wforget] Add Exception Serialization for SparkOperationEvent
Authored-by: wforget <643348094@qq.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
(cherry picked from commit cefc27f035)
Signed-off-by: Cheng Pan <chengpan@apache.org>
This commit is contained in:
parent
8b4a649d34
commit
58843eda2e
@ -18,13 +18,14 @@
|
||||
package org.apache.kyuubi.engine.spark.events
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonIgnore
|
||||
import com.fasterxml.jackson.databind.annotation.{JsonDeserialize, JsonSerialize}
|
||||
import org.apache.spark.scheduler.SparkListenerEvent
|
||||
import org.apache.spark.util.kvstore.KVIndex
|
||||
|
||||
import org.apache.kyuubi.Utils
|
||||
import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.KVIndexParam
|
||||
import org.apache.kyuubi.engine.spark.operation.SparkOperation
|
||||
import org.apache.kyuubi.events.KyuubiEvent
|
||||
import org.apache.kyuubi.events.{ExceptionDeserializer, ExceptionSerializer, KyuubiEvent}
|
||||
|
||||
/**
|
||||
* A [[SparkOperationEvent]] used to tracker the lifecycle of an operation at Spark SQL Engine side.
|
||||
@ -60,6 +61,8 @@ case class SparkOperationEvent(
|
||||
createTime: Long,
|
||||
startTime: Long,
|
||||
completeTime: Long,
|
||||
@JsonSerialize(contentUsing = classOf[ExceptionSerializer])
|
||||
@JsonDeserialize(contentUsing = classOf[ExceptionDeserializer])
|
||||
exception: Option[Throwable],
|
||||
sessionId: String,
|
||||
sessionUser: String,
|
||||
|
||||
@ -0,0 +1,56 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.spark.kyuubi
|
||||
|
||||
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
|
||||
import com.fasterxml.jackson.module.scala.DefaultScalaModule
|
||||
import org.apache.spark.SparkException
|
||||
|
||||
import org.apache.kyuubi.KyuubiFunSuite
|
||||
import org.apache.kyuubi.engine.spark.events.SparkOperationEvent
|
||||
|
||||
class KyuubiSparkEventSuite extends KyuubiFunSuite {
|
||||
|
||||
test("test exception serializer and deserializer of SparkOperationEvent") {
|
||||
val exception = new SparkException("message", new Exception("cause"))
|
||||
val event = new SparkOperationEvent(
|
||||
"statementId",
|
||||
"statement",
|
||||
shouldRunAsync = true,
|
||||
"state",
|
||||
0L,
|
||||
0L,
|
||||
0L,
|
||||
0L,
|
||||
Some(exception),
|
||||
"sessionId",
|
||||
"sessionUser",
|
||||
None,
|
||||
None,
|
||||
None)
|
||||
val mapper: ObjectMapper = new ObjectMapper().registerModule(DefaultScalaModule)
|
||||
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
|
||||
val json = mapper.writeValueAsString(event)
|
||||
assert(json.contains("\"exception\":{\"Message\":\"message\",\"Stack Trace\":" +
|
||||
"[{\"Declaring Class\":\"org.apache.spark.kyuubi.KyuubiSparkEventSuite\","))
|
||||
val deserializeEvent = mapper.readValue(json, classOf[SparkOperationEvent])
|
||||
assert(deserializeEvent.exception.isDefined)
|
||||
assert(deserializeEvent.exception.get.getMessage === "message")
|
||||
assert(deserializeEvent.exception.get.getStackTrace.length > 0)
|
||||
}
|
||||
|
||||
}
|
||||
@ -17,7 +17,10 @@
|
||||
|
||||
package org.apache.kyuubi.events
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper
|
||||
import scala.collection.JavaConverters._
|
||||
|
||||
import com.fasterxml.jackson.core.{JsonGenerator, JsonParser}
|
||||
import com.fasterxml.jackson.databind.{DeserializationContext, JsonDeserializer, JsonNode, JsonSerializer, ObjectMapper, SerializerProvider}
|
||||
import com.fasterxml.jackson.module.scala.DefaultScalaModule
|
||||
|
||||
object JsonProtocol {
|
||||
@ -30,3 +33,85 @@ object JsonProtocol {
|
||||
mapper.readValue(jsonValue, cls)
|
||||
}
|
||||
}
|
||||
|
||||
// Exception serializer and deserializer, copy from org.apache.spark.util.JsonProtocol
|
||||
class ExceptionSerializer extends JsonSerializer[Exception] {
|
||||
|
||||
override def serialize(
|
||||
value: Exception,
|
||||
gen: JsonGenerator,
|
||||
serializers: SerializerProvider): Unit = {
|
||||
exceptionToJson(value, gen)
|
||||
}
|
||||
|
||||
private def exceptionToJson(exception: Exception, g: JsonGenerator): Unit = {
|
||||
g.writeStartObject()
|
||||
g.writeStringField("Message", exception.getMessage)
|
||||
g.writeFieldName("Stack Trace")
|
||||
stackTraceToJson(exception.getStackTrace, g)
|
||||
g.writeEndObject()
|
||||
}
|
||||
|
||||
private def stackTraceToJson(stackTrace: Array[StackTraceElement], g: JsonGenerator): Unit = {
|
||||
g.writeStartArray()
|
||||
stackTrace.foreach { line =>
|
||||
g.writeStartObject()
|
||||
g.writeStringField("Declaring Class", line.getClassName)
|
||||
g.writeStringField("Method Name", line.getMethodName)
|
||||
g.writeStringField("File Name", line.getFileName)
|
||||
g.writeNumberField("Line Number", line.getLineNumber)
|
||||
g.writeEndObject()
|
||||
}
|
||||
g.writeEndArray()
|
||||
}
|
||||
}
|
||||
|
||||
class ExceptionDeserializer extends JsonDeserializer[Exception] {
|
||||
|
||||
override def deserialize(jsonParser: JsonParser, ctxt: DeserializationContext): Exception = {
|
||||
val jsonNode = jsonParser.readValueAsTree[JsonNode]()
|
||||
exceptionFromJson(jsonNode)
|
||||
}
|
||||
|
||||
private def exceptionFromJson(json: JsonNode): Exception = {
|
||||
val message = jsonOption(json.get("Message")).map(_.extractString).orNull
|
||||
val e = new Exception(message)
|
||||
e.setStackTrace(stackTraceFromJson(json.get("Stack Trace")))
|
||||
e
|
||||
}
|
||||
|
||||
private def stackTraceFromJson(json: JsonNode): Array[StackTraceElement] = {
|
||||
jsonOption(json).map(_.extractElements.map { line =>
|
||||
val declaringClass = line.get("Declaring Class").extractString
|
||||
val methodName = line.get("Method Name").extractString
|
||||
val fileName = jsonOption(line.get("File Name")).map(_.extractString).orNull
|
||||
val lineNumber = line.get("Line Number").extractInt
|
||||
new StackTraceElement(declaringClass, methodName, fileName, lineNumber)
|
||||
}.toArray).getOrElse(Array[StackTraceElement]())
|
||||
}
|
||||
|
||||
private def jsonOption(json: JsonNode): Option[JsonNode] = {
|
||||
if (json == null || json.isNull) {
|
||||
None
|
||||
} else {
|
||||
Some(json)
|
||||
}
|
||||
}
|
||||
|
||||
implicit private class JsonNodeImplicits(json: JsonNode) {
|
||||
def extractElements: Iterator[JsonNode] = {
|
||||
require(json.isContainerNode, s"Expected container, got ${json.getNodeType}")
|
||||
json.elements.asScala
|
||||
}
|
||||
|
||||
def extractInt: Int = {
|
||||
require(json.isNumber, s"Expected number, got ${json.getNodeType}")
|
||||
json.intValue
|
||||
}
|
||||
|
||||
def extractString: String = {
|
||||
require(json.isTextual || json.isNull, s"Expected string or NULL, got ${json.getNodeType}")
|
||||
json.textValue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user