Prevent connection leaks on subscription cancellation in R2DBC

This commit is contained in:
polarian 2024-08-19 15:33:46 +09:00
parent 8b60baf07d
commit 1a6eef70fa

View File

@ -192,7 +192,7 @@ final class R2DBC {
}
@Override
public final void cancel() {
public void cancel() {
complete(() -> {});
}
@ -376,14 +376,18 @@ final class R2DBC {
final AbstractNonBlockingSubscription<T> downstream;
final AtomicReference<Connection> connection;
final AtomicReference<Subscription> subscription;
ConnectionSubscriber(AbstractNonBlockingSubscription<T> downstream) {
this.downstream = downstream;
this.connection = new AtomicReference<>();
this.subscription = new AtomicReference<>();
}
@Override
public final void onSubscribe(Subscription s) {
// Stores the Subscription that handles the connection establishment.
subscription.set(s);
s.request(1);
}
@ -402,6 +406,13 @@ final class R2DBC {
@Override
public final void onComplete() {}
public final void cancel() {
subscription.updateAndGet(s -> {
if (s != null) { s.cancel(); }
return null;
});
}
}
static final class QueryExecutionSubscriber<T, Q extends Query> extends ConnectionSubscriber<T> {
@ -634,6 +645,12 @@ final class R2DBC {
s.request(1);
}
@Override
public final void cancel() {
// Safely cancels the connection acquisition if not yet established.
complete(() -> delegate().cancel());
}
@Override
final void cancel0(boolean closeAfterTransaction, Runnable onComplete) {