diff --git a/client-spark/shuffle-manager-2/src/main/scala/org/apache/spark/shuffle/rss/RssShuffleReader.scala b/client-spark/shuffle-manager-2/src/main/scala/org/apache/spark/shuffle/rss/RssShuffleReader.scala index bc434b14d..d62f61a69 100644 --- a/client-spark/shuffle-manager-2/src/main/scala/org/apache/spark/shuffle/rss/RssShuffleReader.scala +++ b/client-spark/shuffle-manager-2/src/main/scala/org/apache/spark/shuffle/rss/RssShuffleReader.scala @@ -59,7 +59,7 @@ class RssShuffleReader[K, C]( readMetrics.incFetchWaitTime(time) } - val recordIter = (startPartition until endPartition).map(partitionId => { + val recordIter = (startPartition until endPartition).iterator.map(partitionId => { if (handle.numMaps > 0) { val start = System.currentTimeMillis() val inputStream = essShuffleClient.readPartition( @@ -77,7 +77,7 @@ class RssShuffleReader[K, C]( } else { RssInputStream.empty() } - }).toIterator.flatMap( + }).flatMap( serializerInstance.deserializeStream(_).asKeyValueIterator) val metricIter = CompletionIterator[(Any, Any), Iterator[(Any, Any)]]( diff --git a/client-spark/shuffle-manager-3/src/main/scala/org/apache/spark/shuffle/rss/RssShuffleReader.scala b/client-spark/shuffle-manager-3/src/main/scala/org/apache/spark/shuffle/rss/RssShuffleReader.scala index b8dd7b379..78b230c2d 100644 --- a/client-spark/shuffle-manager-3/src/main/scala/org/apache/spark/shuffle/rss/RssShuffleReader.scala +++ b/client-spark/shuffle-manager-3/src/main/scala/org/apache/spark/shuffle/rss/RssShuffleReader.scala @@ -60,7 +60,7 @@ class RssShuffleReader[K, C]( metrics.incFetchWaitTime(time) } - val recordIter = (startPartition until endPartition).map(partitionId => { + val recordIter = (startPartition until endPartition).iterator.map(partitionId => { if (handle.numMappers > 0) { val start = System.currentTimeMillis() val inputStream = rssShuffleClient.readPartition( @@ -79,7 +79,7 @@ class RssShuffleReader[K, C]( } else { RssInputStream.empty() } - }).toIterator.flatMap( + }).flatMap( serializerInstance.deserializeStream(_).asKeyValueIterator) val metricIter = CompletionIterator[(Any, Any), Iterator[(Any, Any)]](