From df809159d1f8d3070ced5e37e7696c2a348ac113 Mon Sep 17 00:00:00 2001 From: Cheng Pan Date: Mon, 10 Mar 2025 15:19:50 +0800 Subject: [PATCH] [CELEBORN-1898] SparkOutOfMemoryError compatible with Spark 4.0 and 4.1 ### What changes were proposed in this pull request? SPARK-49946 (4.0.0) removes the single-`String` constructor of class `SparkOutOfMemoryError` and introduces the `_LEGACY_ERROR_TEMP_3301` error condition; SPARK-51386 (4.1.0) then renames that error condition to `POINTER_ARRAY_OUT_OF_MEMORY`. This patch replaces the direct constructor call in `ShuffleInMemorySorter` with `SparkCommonUtils.throwSparkOutOfMemoryError()`, which builds the error reflectively so the same code works with Spark 3.5 and earlier, Spark 4.0, and Spark 4.1 onwards. ### Why are the changes needed? To be compatible with Spark 4.0 and 4.1. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? GHA checks cover Spark 2.4 to Spark 3.5; I manually tested with Spark 4.0.0 RC2. Closes #3141 from pan3793/CELEBORN-1898. Authored-by: Cheng Pan Signed-off-by: mingji --- .../celeborn/ShuffleInMemorySorter.java | 3 +- .../shuffle/celeborn/SparkCommonUtils.java | 40 +++++++++++++++++++ .../celeborn/SparkCommonUtilsSuiteJ.java | 29 ++++++++++++++ 3 files changed, 70 insertions(+), 2 deletions(-) create mode 100644 client-spark/common/src/test/java/org/apache/spark/shuffle/celeborn/SparkCommonUtilsSuiteJ.java diff --git a/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/ShuffleInMemorySorter.java b/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/ShuffleInMemorySorter.java index 57d569eba..c3af2ebd6 100644 --- a/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/ShuffleInMemorySorter.java +++ b/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/ShuffleInMemorySorter.java @@ -18,7 +18,6 @@ package org.apache.spark.shuffle.celeborn; import org.apache.spark.memory.MemoryConsumer; -import org.apache.spark.memory.SparkOutOfMemoryError; import org.apache.spark.unsafe.Platform; import org.apache.spark.unsafe.array.LongArray; import org.apache.spark.util.collection.unsafe.sort.RadixSort; @@ -87,7 +86,7 @@ public class ShuffleInMemorySorter { public void 
expandPointerArray(LongArray newArray) { if (array != null) { if (newArray.size() < array.size()) { - throw new SparkOutOfMemoryError("Not enough memory to grow pointer array"); + SparkCommonUtils.throwSparkOutOfMemoryError(); } Platform.copyMemory( array.getBaseObject(), diff --git a/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SparkCommonUtils.java b/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SparkCommonUtils.java index 9b959a4b8..a24e06d5a 100644 --- a/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SparkCommonUtils.java +++ b/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SparkCommonUtils.java @@ -17,8 +17,15 @@ package org.apache.spark.shuffle.celeborn; +import java.util.Collections; +import java.util.Map; + import org.apache.spark.SparkConf; import org.apache.spark.TaskContext; +import org.apache.spark.memory.SparkOutOfMemoryError; + +import org.apache.celeborn.reflect.DynConstructors; +import org.apache.celeborn.reflect.DynMethods; public class SparkCommonUtils { public static void validateAttemptConfig(SparkConf conf) throws IllegalArgumentException { @@ -48,4 +55,37 @@ public class SparkCommonUtils { public static int getEncodedAttemptNumber(TaskContext context) { return (context.stageAttemptNumber() << 16) | context.attemptNumber(); } + + public static void throwSparkOutOfMemoryError() { + try { // for Spark 3.5 and earlier + throw DynConstructors.builder() + .impl(SparkOutOfMemoryError.class, String.class) + .build() + .newInstance("Not enough memory to grow pointer array"); + } catch (RuntimeException e) { + // SPARK-44838 (4.0.0) + DynMethods.StaticMethod isValidErrorClassMethod = + DynMethods.builder("isValidErrorClass") + .impl("org.apache.spark.SparkThrowableHelper", String.class) + .buildStatic(); + // SPARK-49946 (4.0.0) removes single String constructor and introduces + // _LEGACY_ERROR_TEMP_3301 error condition, SPARK-51386 (4.1.0) renames + // the error 
condition to POINTER_ARRAY_OUT_OF_MEMORY. + if (isValidErrorClassMethod.invoke("POINTER_ARRAY_OUT_OF_MEMORY")) { // for Spark 4.1 onwards + throw DynConstructors.builder() + .impl(SparkOutOfMemoryError.class, String.class, Map.class) + .build() + .newInstance("POINTER_ARRAY_OUT_OF_MEMORY", Collections.EMPTY_MAP); + } else if (isValidErrorClassMethod.invoke("_LEGACY_ERROR_TEMP_3301")) { // for Spark 4.0 + throw DynConstructors.builder() + .impl(SparkOutOfMemoryError.class, String.class, Map.class) + .build() + .newInstance("_LEGACY_ERROR_TEMP_3301", Collections.EMPTY_MAP); + } else { + throw new OutOfMemoryError( + "Unable to construct a SparkOutOfMemoryError, please report this bug to the " + + "corresponding communities or vendors, and provide the full stack trace."); + } + } + } } diff --git a/client-spark/common/src/test/java/org/apache/spark/shuffle/celeborn/SparkCommonUtilsSuiteJ.java b/client-spark/common/src/test/java/org/apache/spark/shuffle/celeborn/SparkCommonUtilsSuiteJ.java new file mode 100644 index 000000000..02ed7fa17 --- /dev/null +++ b/client-spark/common/src/test/java/org/apache/spark/shuffle/celeborn/SparkCommonUtilsSuiteJ.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.shuffle.celeborn; + +import org.apache.spark.memory.SparkOutOfMemoryError; +import org.junit.Test; + +public class SparkCommonUtilsSuiteJ { + + @Test(expected = SparkOutOfMemoryError.class) + public void testThrowSparkOutOfMemoryError() { + SparkCommonUtils.throwSparkOutOfMemoryError(); + } +}