From cba7dc6f7145ce4b6dbcb5f3cec3ea8caab1ae24 Mon Sep 17 00:00:00 2001 From: mcdull_zhang Date: Thu, 22 Sep 2022 17:42:25 +0800 Subject: [PATCH] nit (#654) --- .../scala/org/apache/spark/shuffle/rss/RssShuffleReader.scala | 4 ++-- .../scala/org/apache/spark/shuffle/rss/RssShuffleReader.scala | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/client-spark/shuffle-manager-2/src/main/scala/org/apache/spark/shuffle/rss/RssShuffleReader.scala b/client-spark/shuffle-manager-2/src/main/scala/org/apache/spark/shuffle/rss/RssShuffleReader.scala index bc434b14d..d62f61a69 100644 --- a/client-spark/shuffle-manager-2/src/main/scala/org/apache/spark/shuffle/rss/RssShuffleReader.scala +++ b/client-spark/shuffle-manager-2/src/main/scala/org/apache/spark/shuffle/rss/RssShuffleReader.scala @@ -59,7 +59,7 @@ class RssShuffleReader[K, C]( readMetrics.incFetchWaitTime(time) } - val recordIter = (startPartition until endPartition).map(partitionId => { + val recordIter = (startPartition until endPartition).iterator.map(partitionId => { if (handle.numMaps > 0) { val start = System.currentTimeMillis() val inputStream = essShuffleClient.readPartition( @@ -77,7 +77,7 @@ class RssShuffleReader[K, C]( } else { RssInputStream.empty() } - }).toIterator.flatMap( + }).flatMap( serializerInstance.deserializeStream(_).asKeyValueIterator) val metricIter = CompletionIterator[(Any, Any), Iterator[(Any, Any)]]( diff --git a/client-spark/shuffle-manager-3/src/main/scala/org/apache/spark/shuffle/rss/RssShuffleReader.scala b/client-spark/shuffle-manager-3/src/main/scala/org/apache/spark/shuffle/rss/RssShuffleReader.scala index b8dd7b379..78b230c2d 100644 --- a/client-spark/shuffle-manager-3/src/main/scala/org/apache/spark/shuffle/rss/RssShuffleReader.scala +++ b/client-spark/shuffle-manager-3/src/main/scala/org/apache/spark/shuffle/rss/RssShuffleReader.scala @@ -60,7 +60,7 @@ class RssShuffleReader[K, C]( metrics.incFetchWaitTime(time) } - val recordIter = (startPartition until endPartition).map(partitionId => { + val recordIter = (startPartition until endPartition).iterator.map(partitionId => { if (handle.numMappers > 0) { val start = System.currentTimeMillis() val inputStream = rssShuffleClient.readPartition( @@ -79,7 +79,7 @@ class RssShuffleReader[K, C]( } else { RssInputStream.empty() } - }).toIterator.flatMap( + }).flatMap( serializerInstance.deserializeStream(_).asKeyValueIterator) val metricIter = CompletionIterator[(Any, Any), Iterator[(Any, Any)]](