[CELEBORN-812] Cleanup SendBufferPool if idle for long
### What changes were proposed in this pull request? Cleans up the pooled send buffers and push tasks if the SendBufferPool has been idle for more than `celeborn.client.push.sendbufferpool.expireTimeout`. ### Why are the changes needed? Before this PR the SendBufferPool will cache the send buffers and push tasks forever. If they are large and will not be reused in the future, it wastes memory and causes GC. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Passes GA and manual tests. Closes #1735 from waitinfuture/812-1. Authored-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com> Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
This commit is contained in:
parent
14c6e5719f
commit
6a5e3ed794
@ -20,17 +20,20 @@ package org.apache.spark.shuffle.celeborn;
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedList;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.celeborn.client.write.PushTask;
|
||||
import org.apache.celeborn.common.util.ThreadUtils;
|
||||
|
||||
public class SendBufferPool {
|
||||
private static volatile SendBufferPool _instance;
|
||||
|
||||
public static SendBufferPool get(int capacity) {
|
||||
public static SendBufferPool get(int capacity, long checkInterval, long timeout) {
|
||||
if (_instance == null) {
|
||||
synchronized (SendBufferPool.class) {
|
||||
if (_instance == null) {
|
||||
_instance = new SendBufferPool(capacity);
|
||||
_instance = new SendBufferPool(capacity, checkInterval, timeout);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -41,16 +44,35 @@ public class SendBufferPool {
|
||||
|
||||
// numPartitions -> buffers
|
||||
private final LinkedList<byte[][]> buffers;
|
||||
private long lastAquireTime;
|
||||
private final LinkedList<LinkedBlockingQueue<PushTask>> pushTaskQueues;
|
||||
|
||||
private SendBufferPool(int capacity) {
|
||||
private ScheduledExecutorService cleaner =
|
||||
ThreadUtils.newDaemonSingleThreadScheduledExecutor("celeborn-sendBufferPool-cleaner");
|
||||
|
||||
private SendBufferPool(int capacity, long checkInterval, long timeout) {
|
||||
assert capacity > 0;
|
||||
this.capacity = capacity;
|
||||
buffers = new LinkedList<>();
|
||||
pushTaskQueues = new LinkedList<>();
|
||||
|
||||
lastAquireTime = System.currentTimeMillis();
|
||||
cleaner.scheduleAtFixedRate(
|
||||
() -> {
|
||||
if (System.currentTimeMillis() - lastAquireTime > timeout) {
|
||||
synchronized (this) {
|
||||
buffers.clear();
|
||||
pushTaskQueues.clear();
|
||||
}
|
||||
}
|
||||
},
|
||||
checkInterval,
|
||||
checkInterval,
|
||||
TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
public synchronized byte[][] acquireBuffer(int numPartitions) {
|
||||
lastAquireTime = System.currentTimeMillis();
|
||||
Iterator<byte[][]> iterator = buffers.iterator();
|
||||
while (iterator.hasNext()) {
|
||||
byte[][] candidate = iterator.next();
|
||||
@ -66,6 +88,7 @@ public class SendBufferPool {
|
||||
}
|
||||
|
||||
public synchronized LinkedBlockingQueue<PushTask> acquirePushTaskQueue() {
|
||||
lastAquireTime = System.currentTimeMillis();
|
||||
if (!pushTaskQueues.isEmpty()) {
|
||||
return pushTaskQueues.removeFirst();
|
||||
}
|
||||
|
||||
@ -94,7 +94,7 @@ public class SortBasedPusherSuiteJ {
|
||||
/*pushSortMemoryThreshold=*/ Utils.byteStringAsBytes("1m"),
|
||||
/*sharedPushLock=*/ null,
|
||||
/*executorService=*/ null,
|
||||
SendBufferPool.get(4));
|
||||
SendBufferPool.get(4, 30, 60));
|
||||
|
||||
// default page size == 2 MiB
|
||||
assertEquals(unifiedMemoryManager.pageSizeBytes(), Utils.byteStringAsBytes("2m"));
|
||||
|
||||
@ -63,6 +63,9 @@ public class SparkShuffleManager implements ShuffleManager {
|
||||
private final ExecutorService[] asyncPushers;
|
||||
private AtomicInteger pusherIdx = new AtomicInteger(0);
|
||||
|
||||
private long sendBufferPoolCheckInterval;
|
||||
private long sendBufferPoolExpireTimeout;
|
||||
|
||||
public SparkShuffleManager(SparkConf conf, boolean isDriver) {
|
||||
this.conf = conf;
|
||||
this.isDriver = isDriver;
|
||||
@ -78,6 +81,8 @@ public class SparkShuffleManager implements ShuffleManager {
|
||||
} else {
|
||||
asyncPushers = null;
|
||||
}
|
||||
this.sendBufferPoolCheckInterval = celebornConf.clientPushSendBufferPoolExpireCheckInterval();
|
||||
this.sendBufferPoolExpireTimeout = celebornConf.clientPushSendBufferPoolExpireTimeout();
|
||||
}
|
||||
|
||||
private SortShuffleManager sortShuffleManager() {
|
||||
@ -187,10 +192,15 @@ public class SparkShuffleManager implements ShuffleManager {
|
||||
celebornConf,
|
||||
client,
|
||||
pushThread,
|
||||
SendBufferPool.get(cores));
|
||||
SendBufferPool.get(cores, sendBufferPoolCheckInterval, sendBufferPoolExpireTimeout));
|
||||
} else if (ShuffleMode.HASH.equals(celebornConf.shuffleWriterMode())) {
|
||||
return new HashBasedShuffleWriter<>(
|
||||
h, mapId, context, celebornConf, client, SendBufferPool.get(cores));
|
||||
h,
|
||||
mapId,
|
||||
context,
|
||||
celebornConf,
|
||||
client,
|
||||
SendBufferPool.get(cores, sendBufferPoolCheckInterval, sendBufferPoolExpireTimeout));
|
||||
} else {
|
||||
throw new UnsupportedOperationException(
|
||||
"Unrecognized shuffle write mode!" + celebornConf.shuffleWriterMode());
|
||||
|
||||
@ -35,6 +35,6 @@ public class HashBasedShuffleWriterSuiteJ extends CelebornShuffleWriterSuiteBase
|
||||
throws IOException {
|
||||
// this test case is independent of the `mapId` value
|
||||
return new HashBasedShuffleWriter<Integer, String, String>(
|
||||
handle, /*mapId=*/ 0, context, conf, client, SendBufferPool.get(1));
|
||||
handle, /*mapId=*/ 0, context, conf, client, SendBufferPool.get(1, 30, 60));
|
||||
}
|
||||
}
|
||||
|
||||
@ -34,6 +34,12 @@ public class SortBasedShuffleWriterSuiteJ extends CelebornShuffleWriterSuiteBase
|
||||
CelebornShuffleHandle handle, TaskContext context, CelebornConf conf, ShuffleClient client)
|
||||
throws IOException {
|
||||
return new SortBasedShuffleWriter<Integer, String, String>(
|
||||
handle.dependency(), numPartitions, context, conf, client, null, SendBufferPool.get(4));
|
||||
handle.dependency(),
|
||||
numPartitions,
|
||||
context,
|
||||
conf,
|
||||
client,
|
||||
null,
|
||||
SendBufferPool.get(4, 30, 60));
|
||||
}
|
||||
}
|
||||
|
||||
@ -64,6 +64,9 @@ public class SparkShuffleManager implements ShuffleManager {
|
||||
private final ExecutorService[] asyncPushers;
|
||||
private AtomicInteger pusherIdx = new AtomicInteger(0);
|
||||
|
||||
private long sendBufferPoolCheckInterval;
|
||||
private long sendBufferPoolExpireTimeout;
|
||||
|
||||
public SparkShuffleManager(SparkConf conf, boolean isDriver) {
|
||||
if (conf.getBoolean(LOCAL_SHUFFLE_READER_KEY, true)) {
|
||||
logger.warn(
|
||||
@ -85,6 +88,8 @@ public class SparkShuffleManager implements ShuffleManager {
|
||||
} else {
|
||||
asyncPushers = null;
|
||||
}
|
||||
this.sendBufferPoolCheckInterval = celebornConf.clientPushSendBufferPoolExpireCheckInterval();
|
||||
this.sendBufferPoolExpireTimeout = celebornConf.clientPushSendBufferPoolExpireTimeout();
|
||||
}
|
||||
|
||||
private SortShuffleManager sortShuffleManager() {
|
||||
@ -198,10 +203,15 @@ public class SparkShuffleManager implements ShuffleManager {
|
||||
shuffleClient,
|
||||
metrics,
|
||||
pushThread,
|
||||
SendBufferPool.get(cores));
|
||||
SendBufferPool.get(cores, sendBufferPoolCheckInterval, sendBufferPoolExpireTimeout));
|
||||
} else if (ShuffleMode.HASH.equals(celebornConf.shuffleWriterMode())) {
|
||||
return new HashBasedShuffleWriter<>(
|
||||
h, context, celebornConf, shuffleClient, metrics, SendBufferPool.get(cores));
|
||||
h,
|
||||
context,
|
||||
celebornConf,
|
||||
shuffleClient,
|
||||
metrics,
|
||||
SendBufferPool.get(cores, sendBufferPoolCheckInterval, sendBufferPoolExpireTimeout));
|
||||
} else {
|
||||
throw new UnsupportedOperationException(
|
||||
"Unrecognized shuffle write mode!" + celebornConf.shuffleWriterMode());
|
||||
|
||||
@ -37,6 +37,6 @@ public class HashBasedShuffleWriterSuiteJ extends CelebornShuffleWriterSuiteBase
|
||||
ShuffleWriteMetricsReporter metrics)
|
||||
throws IOException {
|
||||
return new HashBasedShuffleWriter<Integer, String, String>(
|
||||
handle, context, conf, client, metrics, SendBufferPool.get(1));
|
||||
handle, context, conf, client, metrics, SendBufferPool.get(1, 30, 60));
|
||||
}
|
||||
}
|
||||
|
||||
@ -36,6 +36,6 @@ public class SortBasedShuffleWriterSuiteJ extends CelebornShuffleWriterSuiteBase
|
||||
ShuffleWriteMetricsReporter metrics)
|
||||
throws IOException {
|
||||
return new SortBasedShuffleWriter<Integer, String, String>(
|
||||
handle, context, conf, client, metrics, null, SendBufferPool.get(4));
|
||||
handle, context, conf, client, metrics, null, SendBufferPool.get(4, 30, 60));
|
||||
}
|
||||
}
|
||||
|
||||
@ -783,6 +783,9 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
|
||||
def clientPushSplitPartitionThreads: Int = get(CLIENT_PUSH_SPLIT_PARTITION_THREADS)
|
||||
def clientPushTakeTaskWaitIntervalMs: Long = get(CLIENT_PUSH_TAKE_TASK_WAIT_INTERVAL)
|
||||
def clientPushTakeTaskMaxWaitAttempts: Int = get(CLIENT_PUSH_TAKE_TASK_MAX_WAIT_ATTEMPTS)
|
||||
def clientPushSendBufferPoolExpireTimeout: Long = get(CLIENT_PUSH_SENDBUFFERPOOL_EXPIRETIMEOUT)
|
||||
def clientPushSendBufferPoolExpireCheckInterval: Long =
|
||||
get(CLIENT_PUSH_SENDBUFFERPOOL_CHECKEXPIREINTERVAL)
|
||||
|
||||
// //////////////////////////////////////////////////////
|
||||
// Client Shuffle //
|
||||
@ -2883,6 +2886,24 @@ object CelebornConf extends Logging {
|
||||
.intConf
|
||||
.createWithDefault(1)
|
||||
|
||||
val CLIENT_PUSH_SENDBUFFERPOOL_EXPIRETIMEOUT: ConfigEntry[Long] =
|
||||
buildConf("celeborn.client.push.sendBufferPool.expireTimeout")
|
||||
.categories("client")
|
||||
.doc("Timeout before clean up SendBufferPool. If SendBufferPool is idle for more than this time, " +
|
||||
"the send buffers and push tasks will be cleaned up.")
|
||||
.version("0.3.1")
|
||||
.timeConf(TimeUnit.MILLISECONDS)
|
||||
.createWithDefaultString("60s")
|
||||
|
||||
val CLIENT_PUSH_SENDBUFFERPOOL_CHECKEXPIREINTERVAL: ConfigEntry[Long] =
|
||||
buildConf("celeborn.client.push.sendBufferPool.checkExpireInterval")
|
||||
.categories("client")
|
||||
.doc("Interval to check expire for send buffer pool. If the pool has been idle " +
|
||||
s"for more than `${CLIENT_PUSH_SENDBUFFERPOOL_EXPIRETIMEOUT.key}`, the pooled send buffers and push tasks will be cleaned up.")
|
||||
.version("0.3.1")
|
||||
.timeConf(TimeUnit.MILLISECONDS)
|
||||
.createWithDefaultString("30s")
|
||||
|
||||
val TEST_CLIENT_RETRY_REVIVE: ConfigEntry[Boolean] =
|
||||
buildConf("celeborn.test.client.retryRevive")
|
||||
.withAlternative("celeborn.test.retryRevive")
|
||||
|
||||
@ -51,6 +51,8 @@ license: |
|
||||
| celeborn.client.push.revive.batchSize | 2048 | Max number of partitions in one Revive request. | 0.3.0 |
|
||||
| celeborn.client.push.revive.interval | 100ms | Interval for client to trigger Revive to LifecycleManager. The number of partitions in one Revive request is `celeborn.client.push.revive.batchSize`. | 0.3.0 |
|
||||
| celeborn.client.push.revive.maxRetries | 5 | Max retry times for reviving when celeborn push data failed. | 0.3.0 |
|
||||
| celeborn.client.push.sendBufferPool.checkExpireInterval | 30s | Interval to check expire for send buffer pool. If the pool has been idle for more than `celeborn.client.push.sendBufferPool.expireTimeout`, the pooled send buffers and push tasks will be cleaned up. | 0.3.1 |
|
||||
| celeborn.client.push.sendBufferPool.expireTimeout | 60s | Timeout before clean up SendBufferPool. If SendBufferPool is idle for more than this time, the send buffers and push tasks will be cleaned up. | 0.3.1 |
|
||||
| celeborn.client.push.slowStart.initialSleepTime | 500ms | The initial sleep time if the current max in flight requests is 0 | 0.3.0 |
|
||||
| celeborn.client.push.slowStart.maxSleepTime | 2s | If celeborn.client.push.limit.strategy is set to SLOWSTART, push side will take a sleep strategy for each batch of requests, this controls the max sleep time if the max in flight requests limit is 1 for a long time | 0.3.0 |
|
||||
| celeborn.client.push.sort.randomizePartitionId.enabled | false | Whether to randomize partitionId in push sorter. If true, partitionId will be randomized when sort data to avoid skew when push to worker | 0.3.0 |
|
||||
|
||||
Loading…
Reference in New Issue
Block a user