Merge pull request #17095 from polarian/bugfix/cancelled-connection-leak
[jOOQ/jOOQ#17094] Prevent connection leaks on subscription cancellation in R2DBC
This commit is contained in:
commit
b32ac525fd
@ -196,7 +196,7 @@ final class R2DBC {
|
||||
}
|
||||
|
||||
@Override
|
||||
public final void cancel() {
|
||||
public void cancel() {
|
||||
complete(() -> {});
|
||||
}
|
||||
|
||||
@ -392,14 +392,18 @@ final class R2DBC {
|
||||
|
||||
final AbstractNonBlockingSubscription<T> downstream;
|
||||
final AtomicReference<Connection> connection;
|
||||
final AtomicReference<Subscription> subscription;
|
||||
|
||||
ConnectionSubscriber(AbstractNonBlockingSubscription<T> downstream) {
|
||||
this.downstream = downstream;
|
||||
this.connection = new AtomicReference<>();
|
||||
this.subscription = new AtomicReference<>();
|
||||
}
|
||||
|
||||
@Override
|
||||
public final void onSubscribe(Subscription s) {
|
||||
// Stores the Subscription that handles the connection establishment.
|
||||
subscription.set(s);
|
||||
s.request(1);
|
||||
}
|
||||
|
||||
@ -418,6 +422,13 @@ final class R2DBC {
|
||||
|
||||
@Override
|
||||
public final void onComplete() {}
|
||||
|
||||
public final void cancel() {
|
||||
subscription.updateAndGet(s -> {
|
||||
if (s != null) { s.cancel(); }
|
||||
return null;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
static final class QueryExecutionSubscriber<T, Q extends Query> extends ConnectionSubscriber<T> {
|
||||
@ -652,6 +663,12 @@ final class R2DBC {
|
||||
s.request(1);
|
||||
}
|
||||
|
||||
@Override
|
||||
public final void cancel() {
|
||||
// Safely cancels the connection acquisition if not yet established.
|
||||
complete(() -> delegate().cancel());
|
||||
}
|
||||
|
||||
@Override
|
||||
final void cancel0(boolean closeAfterTransaction, Runnable onComplete) {
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user