[jOOQ/jOOQ#12104] R2DBC nested subscription loops forever when nesting or sequencing INSERT .. RETURNING run on r2dbc-pool
This commit is contained in:
parent
9c1f8ff693
commit
193f570572
@ -220,14 +220,14 @@ final class R2DBC {
|
||||
@Override
|
||||
public final void onSubscribe(Subscription s) {
|
||||
subscription.set(s);
|
||||
resultSubscriber.downstream.request1(s);
|
||||
resultSubscriber.downstream.request2(s);
|
||||
}
|
||||
|
||||
@Override
|
||||
public final void onNext(T value) {
|
||||
if (!resultSubscriber.downstream.completed.get()) {
|
||||
resultSubscriber.downstream.subscriber.onNext(value);
|
||||
resultSubscriber.downstream.request1(subscription.get());
|
||||
resultSubscriber.downstream.request2(subscription.get());
|
||||
}
|
||||
}
|
||||
|
||||
@ -530,20 +530,35 @@ final class R2DBC {
|
||||
final void request0() {
|
||||
|
||||
// Lazy execution of the query
|
||||
if (!subscribed.getAndSet(true))
|
||||
connection.subscribe(delegate());
|
||||
if (!subscribed.getAndSet(true)) {
|
||||
ConnectionSubscriber<T> delegate = delegate();
|
||||
|
||||
connection.subscribe(subscriber(
|
||||
delegate::onSubscribe,
|
||||
c -> {
|
||||
delegate.onNext(c);
|
||||
request1();
|
||||
},
|
||||
delegate::onError,
|
||||
delegate::onComplete
|
||||
));
|
||||
}
|
||||
else
|
||||
request1();
|
||||
}
|
||||
|
||||
private void request1() {
|
||||
// Forwarders all forward to the same downstream and are not
|
||||
// expected to be contained in the map at the same time.
|
||||
for (Forwarding<T> f : forwarders.values()) {
|
||||
Subscription s = f.subscription.get();
|
||||
|
||||
if (s != null)
|
||||
request1(s);
|
||||
request2(s);
|
||||
}
|
||||
}
|
||||
|
||||
final void request1(Subscription s) {
|
||||
final void request2(Subscription s) {
|
||||
if (moreRequested())
|
||||
s.request(1);
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user