diff --git a/jOOQ/src/main/java/org/jooq/impl/AbstractRowCountQuery.java b/jOOQ/src/main/java/org/jooq/impl/AbstractRowCountQuery.java index 023e92e73b..ebc34f505c 100644 --- a/jOOQ/src/main/java/org/jooq/impl/AbstractRowCountQuery.java +++ b/jOOQ/src/main/java/org/jooq/impl/AbstractRowCountQuery.java @@ -37,12 +37,13 @@ */ package org.jooq.impl; +import static org.jooq.impl.R2DBC.rowCountSubscriber; + import org.jooq.Configuration; import org.jooq.Record; import org.jooq.RowCountQuery; import org.jooq.impl.R2DBC.BlockingRowCountSubscription; import org.jooq.impl.R2DBC.QuerySubscription; -import org.jooq.impl.R2DBC.RowCountSubscriber; import org.reactivestreams.Subscriber; @@ -62,7 +63,7 @@ abstract class AbstractRowCountQuery extends AbstractQuery implements Ro ConnectionFactory cf = configuration().connectionFactory(); if (!(cf instanceof NoConnectionFactory)) - subscriber.onSubscribe(new QuerySubscription<>(this, subscriber, (t, u, s) -> new RowCountSubscriber(u, s))); + subscriber.onSubscribe(new QuerySubscription<>(this, subscriber, (t, u, s) -> rowCountSubscriber(u, s))); else subscriber.onSubscribe(new BlockingRowCountSubscription(this, subscriber)); } diff --git a/jOOQ/src/main/java/org/jooq/impl/R2DBC.java b/jOOQ/src/main/java/org/jooq/impl/R2DBC.java index f7917272e3..a250824bd1 100644 --- a/jOOQ/src/main/java/org/jooq/impl/R2DBC.java +++ b/jOOQ/src/main/java/org/jooq/impl/R2DBC.java @@ -252,7 +252,7 @@ final class R2DBC { return r; } - static final class Forwarding implements DownstreamSubscriber { + private static final class Forwarding implements DownstreamSubscriber { final int forwarderIndex; final AbstractResultSubscriber resultSubscriber; @@ -350,7 +350,18 @@ final class R2DBC { } } - static final class RowCountSubscriber extends AbstractResultSubscriber { + static final Subscriber rowCountSubscriber( + AbstractNonBlockingSubscription downstream, + R2DBCPreparedStatement statement + ) { + return subscriber( + new RowCountSubscriber(downstream, statement), + downstream.configuration.subscriberProvider(), + downstream.subscriber + ); + } + + private static final class RowCountSubscriber extends AbstractResultSubscriber { RowCountSubscriber( AbstractNonBlockingSubscription downstream, R2DBCPreparedStatement statement @@ -361,7 +372,7 @@ final class R2DBC { @SuppressWarnings({ "unchecked", "rawtypes" }) @Override public void onNext(Result r) { - Forwarding s = downstream.forwardingSubscriber((AbstractResultSubscriber) this); + Subscriber s = downstream.forwardingSubscriber((AbstractResultSubscriber) this); // [#13565] r2dbc-spi's Result::getRowsUpdated now returns Long, not // Integer. To stay backwards compatible with 0.x drivers, @@ -383,7 +394,19 @@ final class R2DBC { } } - static final class ResultSubscriber> extends AbstractResultSubscriber { + static final > Subscriber resultSubscriber( + Q query, + AbstractNonBlockingSubscription downstream, + R2DBCPreparedStatement statement + ) { + return subscriber( + new ResultSubscriber(query, downstream, statement), + downstream.configuration.subscriberProvider(), + downstream.subscriber + ); + } + + private static final class ResultSubscriber> extends AbstractResultSubscriber { final Q query; @@ -758,11 +781,16 @@ final class R2DBC { abstract ConnectionSubscriber delegate(); - final Forwarding forwardingSubscriber(AbstractResultSubscriber resultSubscriber) { + final Subscriber forwardingSubscriber(AbstractResultSubscriber resultSubscriber) { int i = nextForwarderIndex.getAndIncrement(); Forwarding f = new Forwarding<>(i, resultSubscriber); forwarders.put(i, f); - return f; + + return subscriber( + f, + resultSubscriber.downstream.configuration.subscriberProvider(), + resultSubscriber.downstream.subscriber + ); } } @@ -1789,6 +1817,29 @@ final class R2DBC { } } + /** + * [#17920] create a potentially context aware {@link Subscriber} using our + * {@link SubscriberProvider} SPI. + */ + static final Subscriber subscriber( + Subscriber delegate, + SubscriberProvider provider, + Subscriber previous + ) { + return subscriber( + delegate::onSubscribe, + delegate::onNext, + delegate::onError, + delegate::onComplete, + provider, + previous + ); + } + + /** + * [#17920] create a potentially context aware {@link Subscriber} using our + * {@link SubscriberProvider} SPI. + */ static final Subscriber subscriber( Consumer subscription, Consumer onNext, diff --git a/jOOQ/src/main/java/org/jooq/impl/ResultQueryTrait.java b/jOOQ/src/main/java/org/jooq/impl/ResultQueryTrait.java index 982b7e3dda..a369e53f27 100644 --- a/jOOQ/src/main/java/org/jooq/impl/ResultQueryTrait.java +++ b/jOOQ/src/main/java/org/jooq/impl/ResultQueryTrait.java @@ -114,7 +114,6 @@ import org.jooq.Table; import org.jooq.exception.DataAccessException; import org.jooq.impl.R2DBC.BlockingRecordSubscription; import org.jooq.impl.R2DBC.QuerySubscription; -import org.jooq.impl.R2DBC.ResultSubscriber; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -367,7 +366,7 @@ extends ConnectionFactory cf = configuration().connectionFactory(); if (!(cf instanceof NoConnectionFactory)) - subscriber.onSubscribe(new QuerySubscription<>(this, subscriber, ResultSubscriber::new)); + subscriber.onSubscribe(new QuerySubscription<>(this, subscriber, R2DBC::resultSubscriber)); else subscriber.onSubscribe(new BlockingRecordSubscription<>(this, subscriber)); }