[CELEBORN-912][FOLLOWUP] Support columnar shuffle for Spark 3.5

### What changes were proposed in this pull request?

Introduce `spark-3.5-columnar-shuffle` module to support columnar shuffle for Spark 3.5.

Follow up #2710, #2609.

### Why are the changes needed?

Tests of `CelebornColumnarShuffleReaderSuite` are failed for the changes of #2609.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

`CelebornColumnarShuffleReaderSuite`

Closes #2726 from SteNicholas/CELEBORN-912.

Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: xiyu.zk <xiyu.zk@alibaba-inc.com>
This commit is contained in:
SteNicholas 2024-09-05 16:52:47 +08:00 committed by xiyu.zk
parent cd916040da
commit 60dbb3e9f8

View File

@ -17,7 +17,7 @@
package org.apache.spark.shuffle.celeborn
import org.apache.spark.{ShuffleDependency, SparkConf}
import org.apache.spark.{ShuffleDependency, SparkConf, TaskContext}
import org.apache.spark.serializer.{KryoSerializer, KryoSerializerInstance}
import org.apache.spark.sql.execution.UnsafeRowSerializer
import org.apache.spark.sql.execution.columnar.CelebornColumnarBatchSerializerInstance
@ -45,6 +45,9 @@ class CelebornColumnarShuffleReaderSuite {
var shuffleClient: MockedStatic[ShuffleClient] = null
try {
val taskContext = Mockito.mock(classOf[TaskContext])
Mockito.when(taskContext.stageAttemptNumber).thenReturn(0)
Mockito.when(taskContext.attemptNumber).thenReturn(0)
shuffleClient = Mockito.mockStatic(classOf[ShuffleClient])
val shuffleReader = SparkUtils.createColumnarShuffleReader(
handle,
@ -52,7 +55,7 @@ class CelebornColumnarShuffleReaderSuite {
10,
0,
10,
null,
taskContext,
new CelebornConf(),
null,
new ExecutorShuffleIdTracker())
@ -68,6 +71,9 @@ class CelebornColumnarShuffleReaderSuite {
def columnarShuffleReaderNewSerializerInstance(): Unit = {
var shuffleClient: MockedStatic[ShuffleClient] = null
try {
val taskContext = Mockito.mock(classOf[TaskContext])
Mockito.when(taskContext.stageAttemptNumber).thenReturn(0)
Mockito.when(taskContext.attemptNumber).thenReturn(0)
shuffleClient = Mockito.mockStatic(classOf[ShuffleClient])
val shuffleReader = SparkUtils.createColumnarShuffleReader(
new CelebornShuffleHandle[Int, String, String](
@ -83,7 +89,7 @@ class CelebornColumnarShuffleReaderSuite {
10,
0,
10,
null,
taskContext,
new CelebornConf(),
null,
new ExecutorShuffleIdTracker())