[CELEBORN-1898] SparkOutOfMemoryError compatible with Spark 4.0 and 4.1

### What changes were proposed in this pull request?

SPARK-49946 (4.0.0) removes single String constructor of class `SparkOutOfMemoryError` and introduces `_LEGACY_ERROR_TEMP_3301` error condition, SPARK-51386 (4.1.0) renames the error condition to `POINTER_ARRAY_OUT_OF_MEMORY`.

### Why are the changes needed?

To be compatible with Spark 4.0 and 4.1

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

GHA checks Spark 2.4 to Spark 3.5, I manually tested with Spark 4.0.0 RC2

Closes #3141 from pan3793/CELEBORN-1898.

Authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
This commit is contained in:
Cheng Pan 2025-03-10 15:19:50 +08:00 committed by mingji
parent 18b268d085
commit df809159d1
3 changed files with 70 additions and 2 deletions

View File

@ -18,7 +18,6 @@
package org.apache.spark.shuffle.celeborn;
import org.apache.spark.memory.MemoryConsumer;
import org.apache.spark.memory.SparkOutOfMemoryError;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.array.LongArray;
import org.apache.spark.util.collection.unsafe.sort.RadixSort;
@ -87,7 +86,7 @@ public class ShuffleInMemorySorter {
public void expandPointerArray(LongArray newArray) {
if (array != null) {
if (newArray.size() < array.size()) {
throw new SparkOutOfMemoryError("Not enough memory to grow pointer array");
SparkCommonUtils.throwSparkOutOfMemoryError();
}
Platform.copyMemory(
array.getBaseObject(),

View File

@ -17,8 +17,15 @@
package org.apache.spark.shuffle.celeborn;
import java.util.Collections;
import java.util.Map;
import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
import org.apache.spark.memory.SparkOutOfMemoryError;
import org.apache.celeborn.reflect.DynConstructors;
import org.apache.celeborn.reflect.DynMethods;
public class SparkCommonUtils {
public static void validateAttemptConfig(SparkConf conf) throws IllegalArgumentException {
@ -48,4 +55,37 @@ public class SparkCommonUtils {
public static int getEncodedAttemptNumber(TaskContext context) {
return (context.stageAttemptNumber() << 16) | context.attemptNumber();
}
public static void throwSparkOutOfMemoryError() {
try { // for Spark 3.5 and earlier
throw DynConstructors.builder()
.impl(SparkOutOfMemoryError.class, String.class)
.<SparkOutOfMemoryError>build()
.newInstance("Not enough memory to grow pointer array");
} catch (RuntimeException e) {
// SPARK-44838 (4.0.0)
DynMethods.StaticMethod isValidErrorClassMethod =
DynMethods.builder("isValidErrorClass")
.impl("org.apache.spark.SparkThrowableHelper", String.class)
.buildStatic();
// SPARK-49946 (4.0.0) removes single String constructor and introduces
// _LEGACY_ERROR_TEMP_3301 error condition, SPARK-51386 (4.1.0) renames
// the error condition to POINTER_ARRAY_OUT_OF_MEMORY.
if (isValidErrorClassMethod.invoke("POINTER_ARRAY_OUT_OF_MEMORY")) { // for Spark 4.1 onwards
throw DynConstructors.builder()
.impl(SparkOutOfMemoryError.class, String.class, Map.class)
.<SparkOutOfMemoryError>build()
.newInstance("POINTER_ARRAY_OUT_OF_MEMORY", Collections.EMPTY_MAP);
} else if (isValidErrorClassMethod.invoke("_LEGACY_ERROR_TEMP_3301")) { // for Spark 4.0
throw DynConstructors.builder()
.impl(SparkOutOfMemoryError.class, String.class, Map.class)
.<SparkOutOfMemoryError>build()
.newInstance("_LEGACY_ERROR_TEMP_3301", Collections.EMPTY_MAP);
} else {
throw new OutOfMemoryError(
"Unable to construct a SparkOutOfMemoryError, please report this bug to the "
+ "corresponding communities or vendors, and provide the full stack trace.");
}
}
}
}

View File

@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.shuffle.celeborn;
import org.apache.spark.memory.SparkOutOfMemoryError;
import org.junit.Test;
public class SparkCommonUtilsSuiteJ {
@Test(expected = SparkOutOfMemoryError.class)
public void testThrowSparkOutOfMemoryError() {
SparkCommonUtils.throwSparkOutOfMemoryError();
}
}