From 1a6eef70fab2714e7f271a1a637d699df6969ce0 Mon Sep 17 00:00:00 2001 From: polarian Date: Mon, 19 Aug 2024 15:33:46 +0900 Subject: [PATCH] Prevent connection leaks on subscription cancellation in R2DBC --- jOOQ/src/main/java/org/jooq/impl/R2DBC.java | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/jOOQ/src/main/java/org/jooq/impl/R2DBC.java b/jOOQ/src/main/java/org/jooq/impl/R2DBC.java index 687216a92f..3284e45870 100644 --- a/jOOQ/src/main/java/org/jooq/impl/R2DBC.java +++ b/jOOQ/src/main/java/org/jooq/impl/R2DBC.java @@ -192,7 +192,7 @@ final class R2DBC { } @Override - public final void cancel() { + public void cancel() { complete(() -> {}); } @@ -376,14 +376,18 @@ final class R2DBC { final AbstractNonBlockingSubscription downstream; final AtomicReference connection; + final AtomicReference subscription; ConnectionSubscriber(AbstractNonBlockingSubscription downstream) { this.downstream = downstream; this.connection = new AtomicReference<>(); + this.subscription = new AtomicReference<>(); } @Override public final void onSubscribe(Subscription s) { + // Stores the Subscription that handles the connection establishment. + subscription.set(s); s.request(1); } @@ -402,6 +406,13 @@ final class R2DBC { @Override public final void onComplete() {} + + public final void cancel() { + subscription.updateAndGet(s -> { + if (s != null) { s.cancel(); } + return null; + }); + } } static final class QueryExecutionSubscriber extends ConnectionSubscriber { @@ -634,6 +645,12 @@ final class R2DBC { s.request(1); } + @Override + public final void cancel() { + // Safely cancels the connection acquisition if not yet established. + complete(() -> delegate().cancel()); + } + @Override final void cancel0(boolean closeAfterTransaction, Runnable onComplete) {