[jOOQ/jOOQ#14489] Syntax errors lead to unclosed R2DBC connection in SQL
Server
This commit is contained in:
parent
6c68000a3d
commit
19a13fe9d3
@ -188,7 +188,7 @@ final class R2DBC {
|
||||
|
||||
@Override
|
||||
public final void cancel() {
|
||||
complete(true);
|
||||
complete(() -> {});
|
||||
}
|
||||
|
||||
final boolean moreRequested() {
|
||||
@ -197,16 +197,16 @@ final class R2DBC {
|
||||
return !completed.get() && requested.getAndUpdate(l -> l == Long.MAX_VALUE ? l : Math.max(0, l - 1)) > 0;
|
||||
}
|
||||
|
||||
final void complete(boolean cancelled) {
|
||||
final void complete(Runnable onComplete) {
|
||||
|
||||
// required_spec306_afterSubscriptionIsCancelledRequestMustBeNops
|
||||
// required_spec307_afterSubscriptionIsCancelledAdditionalCancelationsMustBeNops
|
||||
if (!completed.getAndSet(true))
|
||||
cancel0(cancelled);
|
||||
cancel0(onComplete);
|
||||
}
|
||||
|
||||
abstract void request0();
|
||||
void cancel0(boolean cancelled) {}
|
||||
void cancel0(Runnable onComplete) {}
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
@ -241,21 +241,20 @@ final class R2DBC {
|
||||
|
||||
@Override
|
||||
public final void onError(Throwable t) {
|
||||
resultSubscriber.downstream.subscriber.onError(translate(resultSubscriber.downstream.sql(), t));
|
||||
complete(true);
|
||||
complete(true, () -> resultSubscriber.downstream.subscriber.onError(translate(resultSubscriber.downstream.sql(), t)));
|
||||
}
|
||||
|
||||
@Override
|
||||
public final void onComplete() {
|
||||
complete(false);
|
||||
complete(false, () -> resultSubscriber.downstream.subscriber.onComplete());
|
||||
}
|
||||
|
||||
private final void complete(boolean cancelled) {
|
||||
private final void complete(boolean cancelled, Runnable onComplete) {
|
||||
resultSubscriber.downstream.forwarders.remove(forwarderIndex);
|
||||
|
||||
// [#13343] [#13669] Prevent premature completion
|
||||
if (resultSubscriber.downstream.forwarders.isEmpty() && resultSubscriber.completionRequested.get())
|
||||
resultSubscriber.complete(cancelled);
|
||||
if (resultSubscriber.downstream.forwarders.isEmpty() && (cancelled || resultSubscriber.completionRequested.get()))
|
||||
resultSubscriber.complete(cancelled, onComplete);
|
||||
}
|
||||
}
|
||||
|
||||
@ -278,23 +277,22 @@ final class R2DBC {
|
||||
|
||||
@Override
|
||||
public final void onError(Throwable t) {
|
||||
downstream.subscriber.onError(translate(downstream.sql(), t));
|
||||
complete(true);
|
||||
complete(true, () -> downstream.subscriber.onError(translate(downstream.sql(), t)));
|
||||
}
|
||||
|
||||
@Override
|
||||
public final void onComplete() {
|
||||
complete(false);
|
||||
complete(false, () -> downstream.subscriber.onComplete());
|
||||
}
|
||||
|
||||
final void complete(boolean cancelled) {
|
||||
final void complete(boolean cancelled, Runnable onComplete) {
|
||||
completionRequested.set(true);
|
||||
|
||||
// [#13343] [#13669] Delay completion of the downstream in case this
|
||||
// completion happens before each forwarder's
|
||||
// completion.
|
||||
if (downstream.forwarders.isEmpty() && !completed.getAndSet(true))
|
||||
downstream.complete(cancelled);
|
||||
if ((cancelled || downstream.forwarders.isEmpty()) && !completed.getAndSet(true))
|
||||
downstream.complete(onComplete);
|
||||
}
|
||||
}
|
||||
|
||||
@ -620,17 +618,12 @@ final class R2DBC {
|
||||
}
|
||||
|
||||
@Override
|
||||
final void cancel0(boolean cancelled) {
|
||||
|
||||
// [#12977] Correctly sequence the delegation to run after close completion
|
||||
cancel0(cancelled ? () -> {} : () -> subscriber.onComplete());
|
||||
}
|
||||
|
||||
final void cancel0(Runnable onComplete) {
|
||||
|
||||
// [#12108] Must pass along cancellation to forwarding subscriptions
|
||||
forAllForwardingSubscriptions(Subscription::cancel);
|
||||
|
||||
// [#12977] Correctly sequence the delegation to run after close completion
|
||||
delegate().connection.updateAndGet(c -> {
|
||||
|
||||
// close() calls on already closed resources have no effect, so
|
||||
@ -743,7 +736,7 @@ final class R2DBC {
|
||||
s2 -> s2.request(1),
|
||||
v -> {},
|
||||
t -> cancel0(() -> subscriber.onError(t)),
|
||||
() -> cancel0(false)
|
||||
() -> cancel0(() -> subscriber.onComplete())
|
||||
))
|
||||
))
|
||||
));
|
||||
@ -1488,8 +1481,9 @@ final class R2DBC {
|
||||
}
|
||||
|
||||
@Override
|
||||
final void cancel0(boolean cancelled) {
|
||||
final void cancel0(Runnable onComplete) {
|
||||
safeClose(c);
|
||||
onComplete.run();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
4
pom.xml
4
pom.xml
@ -156,12 +156,12 @@
|
||||
<dependency>
|
||||
<groupId>io.projectreactor</groupId>
|
||||
<artifactId>reactor-core</artifactId>
|
||||
<version>3.4.19</version>
|
||||
<version>3.5.2</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.projectreactor</groupId>
|
||||
<artifactId>reactor-test</artifactId>
|
||||
<version>3.4.19</version>
|
||||
<version>3.5.2</version>
|
||||
</dependency>
|
||||
|
||||
<!-- Data binding -->
|
||||
|
||||
Loading…
Reference in New Issue
Block a user