diff --git a/jOOQ/src/main/java/org/jooq/impl/R2DBC.java b/jOOQ/src/main/java/org/jooq/impl/R2DBC.java index aa5907b013..0208714596 100644 --- a/jOOQ/src/main/java/org/jooq/impl/R2DBC.java +++ b/jOOQ/src/main/java/org/jooq/impl/R2DBC.java @@ -188,7 +188,7 @@ final class R2DBC { @Override public final void cancel() { - complete(true); + complete(() -> {}); } final boolean moreRequested() { @@ -197,16 +197,16 @@ final class R2DBC { return !completed.get() && requested.getAndUpdate(l -> l == Long.MAX_VALUE ? l : Math.max(0, l - 1)) > 0; } - final void complete(boolean cancelled) { + final void complete(Runnable onComplete) { // required_spec306_afterSubscriptionIsCancelledRequestMustBeNops // required_spec307_afterSubscriptionIsCancelledAdditionalCancelationsMustBeNops if (!completed.getAndSet(true)) - cancel0(cancelled); + cancel0(onComplete); } abstract void request0(); - void cancel0(boolean cancelled) {} + void cancel0(Runnable onComplete) {} } // ------------------------------------------------------------------------- @@ -241,21 +241,20 @@ final class R2DBC { @Override public final void onError(Throwable t) { - resultSubscriber.downstream.subscriber.onError(translate(resultSubscriber.downstream.sql(), t)); - complete(true); + complete(true, () -> resultSubscriber.downstream.subscriber.onError(translate(resultSubscriber.downstream.sql(), t))); } @Override public final void onComplete() { - complete(false); + complete(false, () -> resultSubscriber.downstream.subscriber.onComplete()); } - private final void complete(boolean cancelled) { + private final void complete(boolean cancelled, Runnable onComplete) { resultSubscriber.downstream.forwarders.remove(forwarderIndex); // [#13343] [#13669] Prevent premature completion - if (resultSubscriber.downstream.forwarders.isEmpty() && resultSubscriber.completionRequested.get()) - resultSubscriber.complete(cancelled); + if (resultSubscriber.downstream.forwarders.isEmpty() && (cancelled || resultSubscriber.completionRequested.get())) + resultSubscriber.complete(cancelled, onComplete); } } @@ -278,23 +277,22 @@ final class R2DBC { @Override public final void onError(Throwable t) { - downstream.subscriber.onError(translate(downstream.sql(), t)); - complete(true); + complete(true, () -> downstream.subscriber.onError(translate(downstream.sql(), t))); } @Override public final void onComplete() { - complete(false); + complete(false, () -> downstream.subscriber.onComplete()); } - final void complete(boolean cancelled) { + final void complete(boolean cancelled, Runnable onComplete) { completionRequested.set(true); // [#13343] [#13669] Delay completion of the downstream in case this // completion happens before each forwarder's // completion. - if (downstream.forwarders.isEmpty() && !completed.getAndSet(true)) - downstream.complete(cancelled); + if ((cancelled || downstream.forwarders.isEmpty()) && !completed.getAndSet(true)) + downstream.complete(onComplete); } } @@ -620,17 +618,12 @@ final class R2DBC { } @Override - final void cancel0(boolean cancelled) { - - // [#12977] Correctly sequence the delegation to run after close completion - cancel0(cancelled ? () -> {} : () -> subscriber.onComplete()); - } - final void cancel0(Runnable onComplete) { // [#12108] Must pass along cancellation to forwarding subscriptions forAllForwardingSubscriptions(Subscription::cancel); + // [#12977] Correctly sequence the delegation to run after close completion delegate().connection.updateAndGet(c -> { // close() calls on already closed resources have no effect, so @@ -743,7 +736,7 @@ final class R2DBC { s2 -> s2.request(1), v -> {}, t -> cancel0(() -> subscriber.onError(t)), - () -> cancel0(false) + () -> cancel0(() -> subscriber.onComplete()) )) )) )); @@ -1488,8 +1481,9 @@ final class R2DBC { } @Override - final void cancel0(boolean cancelled) { + final void cancel0(Runnable onComplete) { safeClose(c); + onComplete.run(); } } diff --git a/pom.xml b/pom.xml index 37c129bac8..b9a7033c9d 100644 --- a/pom.xml +++ b/pom.xml @@ -156,12 +156,12 @@ io.projectreactor reactor-core - 3.4.19 + 3.5.2 io.projectreactor reactor-test - 3.4.19 + 3.5.2