diff --git a/jOOQ/src/main/java/org/jooq/impl/R2DBC.java b/jOOQ/src/main/java/org/jooq/impl/R2DBC.java index 8bf3338879..bb7106b610 100644 --- a/jOOQ/src/main/java/org/jooq/impl/R2DBC.java +++ b/jOOQ/src/main/java/org/jooq/impl/R2DBC.java @@ -196,7 +196,7 @@ final class R2DBC { } @Override - public final void cancel() { + public void cancel() { complete(() -> {}); } @@ -392,14 +392,18 @@ final class R2DBC { final AbstractNonBlockingSubscription downstream; final AtomicReference connection; + final AtomicReference subscription; ConnectionSubscriber(AbstractNonBlockingSubscription downstream) { this.downstream = downstream; this.connection = new AtomicReference<>(); + this.subscription = new AtomicReference<>(); } @Override public final void onSubscribe(Subscription s) { + // Stores the Subscription that handles the connection establishment. + subscription.set(s); s.request(1); } @@ -418,6 +422,13 @@ final class R2DBC { @Override public final void onComplete() {} + + public final void cancel() { + subscription.updateAndGet(s -> { + if (s != null) { s.cancel(); } + return null; + }); + } } static final class QueryExecutionSubscriber extends ConnectionSubscriber { @@ -652,6 +663,12 @@ final class R2DBC { s.request(1); } + @Override + public final void cancel() { + // Safely cancels the connection acquisition if not yet established. + complete(() -> delegate().cancel()); + } + @Override final void cancel0(boolean closeAfterTransaction, Runnable onComplete) {