[jOOQ/jOOQ#17920] Wrap also internal subscribers

This commit is contained in:
Lukas Eder 2025-01-30 19:00:51 +01:00
parent 195dc10149
commit 203a32cdb4
3 changed files with 61 additions and 10 deletions

View File

@ -37,12 +37,13 @@
*/
package org.jooq.impl;
import static org.jooq.impl.R2DBC.rowCountSubscriber;
import org.jooq.Configuration;
import org.jooq.Record;
import org.jooq.RowCountQuery;
import org.jooq.impl.R2DBC.BlockingRowCountSubscription;
import org.jooq.impl.R2DBC.QuerySubscription;
import org.jooq.impl.R2DBC.RowCountSubscriber;
import org.reactivestreams.Subscriber;
@ -62,7 +63,7 @@ abstract class AbstractRowCountQuery extends AbstractQuery<Record> implements Ro
ConnectionFactory cf = configuration().connectionFactory();
if (!(cf instanceof NoConnectionFactory))
subscriber.onSubscribe(new QuerySubscription<>(this, subscriber, (t, u, s) -> new RowCountSubscriber(u, s)));
subscriber.onSubscribe(new QuerySubscription<>(this, subscriber, (t, u, s) -> rowCountSubscriber(u, s)));
else
subscriber.onSubscribe(new BlockingRowCountSubscription(this, subscriber));
}

View File

@ -252,7 +252,7 @@ final class R2DBC {
return r;
}
static final class Forwarding<T> implements DownstreamSubscriber<T> {
private static final class Forwarding<T> implements DownstreamSubscriber<T> {
final int forwarderIndex;
final AbstractResultSubscriber<T> resultSubscriber;
@ -350,7 +350,18 @@ final class R2DBC {
}
}
static final class RowCountSubscriber extends AbstractResultSubscriber<Integer> {
static final Subscriber<Result> rowCountSubscriber(
AbstractNonBlockingSubscription<? super Integer> downstream,
R2DBCPreparedStatement statement
) {
return subscriber(
new RowCountSubscriber(downstream, statement),
downstream.configuration.subscriberProvider(),
downstream.subscriber
);
}
private static final class RowCountSubscriber extends AbstractResultSubscriber<Integer> {
RowCountSubscriber(
AbstractNonBlockingSubscription<? super Integer> downstream,
R2DBCPreparedStatement statement
@ -361,7 +372,7 @@ final class R2DBC {
@SuppressWarnings({ "unchecked", "rawtypes" })
@Override
public void onNext(Result r) {
Forwarding s = downstream.forwardingSubscriber((AbstractResultSubscriber) this);
Subscriber s = downstream.forwardingSubscriber((AbstractResultSubscriber) this);
// [#13565] r2dbc-spi's Result::getRowsUpdated now returns Long, not
// Integer. To stay backwards compatible with 0.x drivers,
@ -383,7 +394,19 @@ final class R2DBC {
}
}
static final class ResultSubscriber<R extends Record, Q extends ResultQueryTrait<R>> extends AbstractResultSubscriber<R> {
static final <R extends Record, Q extends ResultQueryTrait<R>> Subscriber<Result> resultSubscriber(
Q query,
AbstractNonBlockingSubscription<? super R> downstream,
R2DBCPreparedStatement statement
) {
return subscriber(
new ResultSubscriber<R, Q>(query, downstream, statement),
downstream.configuration.subscriberProvider(),
downstream.subscriber
);
}
private static final class ResultSubscriber<R extends Record, Q extends ResultQueryTrait<R>> extends AbstractResultSubscriber<R> {
final Q query;
@ -758,11 +781,16 @@ final class R2DBC {
abstract ConnectionSubscriber<T> delegate();
final Forwarding<T> forwardingSubscriber(AbstractResultSubscriber<T> resultSubscriber) {
final Subscriber<T> forwardingSubscriber(AbstractResultSubscriber<T> resultSubscriber) {
int i = nextForwarderIndex.getAndIncrement();
Forwarding<T> f = new Forwarding<>(i, resultSubscriber);
forwarders.put(i, f);
return f;
return subscriber(
f,
resultSubscriber.downstream.configuration.subscriberProvider(),
resultSubscriber.downstream.subscriber
);
}
}
@ -1789,6 +1817,29 @@ final class R2DBC {
}
}
/**
* [#17920] create a potentially context aware {@link Subscriber} using our
* {@link SubscriberProvider} SPI.
*/
static final <T, C> Subscriber<T> subscriber(
Subscriber<T> delegate,
SubscriberProvider<C> provider,
Subscriber<?> previous
) {
return subscriber(
delegate::onSubscribe,
delegate::onNext,
delegate::onError,
delegate::onComplete,
provider,
previous
);
}
/**
* [#17920] create a potentially context aware {@link Subscriber} using our
* {@link SubscriberProvider} SPI.
*/
static final <T, C> Subscriber<T> subscriber(
Consumer<Subscription> subscription,
Consumer<T> onNext,

View File

@ -114,7 +114,6 @@ import org.jooq.Table;
import org.jooq.exception.DataAccessException;
import org.jooq.impl.R2DBC.BlockingRecordSubscription;
import org.jooq.impl.R2DBC.QuerySubscription;
import org.jooq.impl.R2DBC.ResultSubscriber;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@ -367,7 +366,7 @@ extends
ConnectionFactory cf = configuration().connectionFactory();
if (!(cf instanceof NoConnectionFactory))
subscriber.onSubscribe(new QuerySubscription<>(this, subscriber, ResultSubscriber::new));
subscriber.onSubscribe(new QuerySubscription<>(this, subscriber, R2DBC::resultSubscriber));
else
subscriber.onSubscribe(new BlockingRecordSubscription<>(this, subscriber));
}