From a455e01fa21d165a14c2f842be034f06ab1e7c92 Mon Sep 17 00:00:00 2001 From: Lukas Eder Date: Thu, 23 Jan 2025 12:22:44 +0100 Subject: [PATCH] [jOOQ/jOOQ#17920] Add a new SubscriberProvider SPI that allows for overriding the way jOOQ internals creates reactive streams Subscribers, to allow for making them context aware --- .../src/main/java/org/jooq/Configuration.java | 32 ++++ .../java/org/jooq/SubscriberProvider.java | 93 ++++++++++++ jOOQ/src/main/java/org/jooq/impl/DSL.java | 12 +- .../org/jooq/impl/DefaultConfiguration.java | 93 ++++++++++++ .../jooq/impl/DefaultConnectionFactory.java | 14 +- .../jooq/impl/DefaultSubscriberProvider.java | 95 ++++++++++++ .../src/main/java/org/jooq/impl/Internal.java | 34 ----- jOOQ/src/main/java/org/jooq/impl/R2DBC.java | 138 ++++++++++++++---- jOOQ/src/main/java/org/jooq/impl/UDTImpl.java | 7 +- .../java/org/jooq/impl/UDTPathFieldImpl.java | 13 +- .../org/jooq/impl/UDTPathTableFieldImpl.java | 4 +- .../jooq/tools/jdbc/MockConfiguration.java | 19 +++ 12 files changed, 463 insertions(+), 91 deletions(-) create mode 100644 jOOQ/src/main/java/org/jooq/SubscriberProvider.java create mode 100644 jOOQ/src/main/java/org/jooq/impl/DefaultSubscriberProvider.java diff --git a/jOOQ/src/main/java/org/jooq/Configuration.java b/jOOQ/src/main/java/org/jooq/Configuration.java index 55c592df76..f15adec575 100644 --- a/jOOQ/src/main/java/org/jooq/Configuration.java +++ b/jOOQ/src/main/java/org/jooq/Configuration.java @@ -561,6 +561,12 @@ public interface Configuration extends Serializable { @NotNull FormattingProvider formattingProvider(); + /** + * Get the configured SubscriberProvider from this configuration. + */ + @NotNull + SubscriberProvider subscriberProvider(); + @@ -1273,6 +1279,20 @@ public interface Configuration extends Serializable { @NotNull Configuration set(FormattingProvider newFormattingProvider); + /** + * Change this configuration to hold new subscriber provider. + *

+ * This method is not thread-safe and should not be used in globally + * available Configuration objects. + * + * @param newSubscriberProvider The new subscriber provider to be contained in + * the changed configuration. + * @return The changed configuration. + */ + @Pro + @NotNull + Configuration set(SubscriberProvider newSubscriberProvider); + @@ -1954,6 +1974,18 @@ public interface Configuration extends Serializable { @NotNull Configuration derive(FormattingProvider newFormattingProvider); + /** + * Create a derived configuration from this one, with a new subscriber + * provider. + * + * @param newSubscriberProvider The new subscriber provider to be contained in + * the derived configuration. + * @return The derived configuration. + */ + @Pro + @NotNull + Configuration derive(SubscriberProvider newSubscriberProvider); + diff --git a/jOOQ/src/main/java/org/jooq/SubscriberProvider.java b/jOOQ/src/main/java/org/jooq/SubscriberProvider.java new file mode 100644 index 0000000000..944a26bbaf --- /dev/null +++ b/jOOQ/src/main/java/org/jooq/SubscriberProvider.java @@ -0,0 +1,93 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Other licenses: + * ----------------------------------------------------------------------------- + * Commercial licenses for this work are available. These replace the above + * Apache-2.0 license and offer limited warranties, support, maintenance, and + * commercial database integrations. + * + * For more information, please visit: https://www.jooq.org/legal/licensing + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + */ +package org.jooq; + +import java.util.function.Consumer; + +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +/** + * An SPI to allow for overriding the default implementation of + * {@link Subscriber} creation in jOOQ. + *

+ * This SPI can be used to override jOOQ's internal creation of + * {@link Subscriber} instances, typically used to create reactor style + * context-aware {@link Subscriber} instances. Whatever the + * {@link #context(Subscriber)} is, it will be propagated to additional + * {@link #subscriber(Consumer, Consumer, Consumer, Runnable, Object)} calls. + * + * @author Lukas Eder + */ +public interface SubscriberProvider { + + /** + * Create an empty context. + */ + @Nullable + C context(); + + /** + * Extract a context from an existing {@link Subscriber}. + */ + @Nullable + C context(Subscriber subscriber); + + /** + * Create a new subscriber from its component implementations and a previous + * context. + * + * @param onSubscribe The implementation of + * {@link Subscriber#onSubscribe(Subscription)} + * @param onNext The implementation of {@link Subscriber#onNext(Object)} + * @param onError The implementation of + * {@link Subscriber#onError(Throwable)} + * @param onComplete The implementation of {@link Subscriber#onComplete()} + * @param context The {@link #context()} + */ + @NotNull + Subscriber subscriber( + Consumer onSubscribe, + Consumer onNext, + Consumer onError, + Runnable onComplete, + C context + ); +} diff --git a/jOOQ/src/main/java/org/jooq/impl/DSL.java b/jOOQ/src/main/java/org/jooq/impl/DSL.java index 4419866685..0f19c4ac14 100644 --- a/jOOQ/src/main/java/org/jooq/impl/DSL.java +++ b/jOOQ/src/main/java/org/jooq/impl/DSL.java @@ -577,7 +577,7 @@ public class DSL { public static CloseableDSLContext using(String url) { if (url.startsWith("r2dbc")) { io.r2dbc.spi.Connection connection = R2DBC.getConnection(url); - return new DefaultCloseableDSLContext(new DefaultConnectionFactory(connection, true, false), JDBCUtils.dialect(connection)); + return new DefaultCloseableDSLContext(new DefaultConnectionFactory(null, connection, true, false), JDBCUtils.dialect(connection)); } else { try { @@ -619,7 +619,7 @@ public class DSL { public static CloseableDSLContext using(String url, String username, String password) { if (url.startsWith("r2dbc")) { io.r2dbc.spi.Connection connection = R2DBC.getConnection(url, username, password); - return new DefaultCloseableDSLContext(new DefaultConnectionFactory(connection, true, false), JDBCUtils.dialect(connection)); + return new DefaultCloseableDSLContext(new DefaultConnectionFactory(null, connection, true, false), JDBCUtils.dialect(connection)); } else { try { @@ -678,7 +678,7 @@ public class DSL { public static CloseableDSLContext using(String url, Properties properties) { if (url.startsWith("r2dbc")) { io.r2dbc.spi.Connection connection = R2DBC.getConnection(url, properties); - return new DefaultCloseableDSLContext(new DefaultConnectionFactory(connection, true, false), JDBCUtils.dialect(connection)); + return new DefaultCloseableDSLContext(new DefaultConnectionFactory(null, connection, true, false), JDBCUtils.dialect(connection)); } else { try { @@ -893,7 +893,7 @@ public class DSL { */ @NotNull public static DSLContext using(io.r2dbc.spi.Connection connection) { - return new DefaultDSLContext(new DefaultConnectionFactory(connection), JDBCUtils.dialect(connection)); + return new DefaultDSLContext(new DefaultConnectionFactory(null, connection), JDBCUtils.dialect(connection)); } /** @@ -905,7 +905,7 @@ public class DSL { */ @NotNull public static DSLContext using(io.r2dbc.spi.Connection connection, SQLDialect dialect) { - return new DefaultDSLContext(new DefaultConnectionFactory(connection), dialect); + return new DefaultDSLContext(new DefaultConnectionFactory(null, connection), dialect); } /** @@ -919,7 +919,7 @@ public class DSL { */ @NotNull public static DSLContext using(io.r2dbc.spi.Connection connection, SQLDialect dialect, Settings settings) { - return new DefaultDSLContext(new DefaultConnectionFactory(connection), dialect, settings); + return new DefaultDSLContext(new DefaultConnectionFactory(null, connection), dialect, settings); } /** diff --git a/jOOQ/src/main/java/org/jooq/impl/DefaultConfiguration.java b/jOOQ/src/main/java/org/jooq/impl/DefaultConfiguration.java index 6655d37b58..c575920eae 100644 --- a/jOOQ/src/main/java/org/jooq/impl/DefaultConfiguration.java +++ b/jOOQ/src/main/java/org/jooq/impl/DefaultConfiguration.java @@ -84,6 +84,7 @@ import org.jooq.RecordType; import org.jooq.RecordUnmapper; import org.jooq.RecordUnmapperProvider; import org.jooq.SQLDialect; +import org.jooq.SubscriberProvider; import org.jooq.TransactionListener; import org.jooq.TransactionListenerProvider; import org.jooq.TransactionProvider; @@ -141,6 +142,7 @@ public class DefaultConfiguration extends AbstractConfiguration { private transient CharsetProvider charsetProvider; private transient ConverterProvider converterProvider; private transient FormattingProvider formattingProvider; + private transient SubscriberProvider subscriberProvider; @@ -210,6 +212,7 @@ public class DefaultConfiguration extends AbstractConfiguration { null, null, null, + null, @@ -256,6 +259,7 @@ public class DefaultConfiguration extends AbstractConfiguration { configuration.charsetProvider, configuration.converterProvider, configuration.formattingProvider, + configuration.subscriberProvider, @@ -301,6 +305,7 @@ public class DefaultConfiguration extends AbstractConfiguration { CharsetProvider charsetProvider, ConverterProvider converterProvider, FormattingProvider formattingProvider, + SubscriberProvider subscriberProvider, @@ -335,6 +340,7 @@ public class DefaultConfiguration extends AbstractConfiguration { set(charsetProvider); set(converterProvider); set(formattingProvider); + set(subscriberProvider); @@ -404,6 +410,7 @@ public class DefaultConfiguration extends AbstractConfiguration { charsetProvider, converterProvider, formattingProvider, + subscriberProvider, @@ -443,6 +450,7 @@ public class DefaultConfiguration extends AbstractConfiguration { charsetProvider, converterProvider, formattingProvider, + subscriberProvider, @@ -482,6 +490,7 @@ public class DefaultConfiguration extends AbstractConfiguration { charsetProvider, converterProvider, formattingProvider, + subscriberProvider, @@ -521,6 +530,7 @@ public class DefaultConfiguration extends AbstractConfiguration { charsetProvider, converterProvider, formattingProvider, + subscriberProvider, @@ -565,6 +575,7 @@ public class DefaultConfiguration extends AbstractConfiguration { charsetProvider, converterProvider, formattingProvider, + subscriberProvider, @@ -604,6 +615,7 @@ public class DefaultConfiguration extends AbstractConfiguration { charsetProvider, converterProvider, formattingProvider, + subscriberProvider, @@ -643,6 +655,7 @@ public class DefaultConfiguration extends AbstractConfiguration { charsetProvider, converterProvider, formattingProvider, + subscriberProvider, @@ -682,6 +695,7 @@ public class DefaultConfiguration extends AbstractConfiguration { charsetProvider, converterProvider, formattingProvider, + subscriberProvider, @@ -721,6 +735,7 @@ public class DefaultConfiguration extends AbstractConfiguration { charsetProvider, converterProvider, formattingProvider, + subscriberProvider, @@ -765,6 +780,7 @@ public class DefaultConfiguration extends AbstractConfiguration { charsetProvider, converterProvider, formattingProvider, + subscriberProvider, @@ -809,6 +825,7 @@ public class DefaultConfiguration extends AbstractConfiguration { charsetProvider, converterProvider, formattingProvider, + subscriberProvider, @@ -848,6 +865,7 @@ public class DefaultConfiguration extends AbstractConfiguration { charsetProvider, converterProvider, formattingProvider, + subscriberProvider, @@ -887,6 +905,7 @@ public class DefaultConfiguration extends AbstractConfiguration { charsetProvider, converterProvider, formattingProvider, + subscriberProvider, @@ -926,6 +945,7 @@ public class DefaultConfiguration extends AbstractConfiguration { charsetProvider, converterProvider, formattingProvider, + subscriberProvider, @@ -965,6 +985,7 @@ public class DefaultConfiguration extends AbstractConfiguration { charsetProvider, converterProvider, formattingProvider, + subscriberProvider, @@ -1004,6 +1025,7 @@ public class DefaultConfiguration extends AbstractConfiguration { charsetProvider, converterProvider, formattingProvider, + subscriberProvider, @@ -1043,6 +1065,7 @@ public class DefaultConfiguration extends AbstractConfiguration { charsetProvider, converterProvider, formattingProvider, + subscriberProvider, @@ -1087,6 +1110,7 @@ public class DefaultConfiguration extends AbstractConfiguration { charsetProvider, converterProvider, formattingProvider, + subscriberProvider, @@ -1126,6 +1150,7 @@ public class DefaultConfiguration extends AbstractConfiguration { newCharsetProvider, converterProvider, formattingProvider, + subscriberProvider, @@ -1165,6 +1190,7 @@ public class DefaultConfiguration extends AbstractConfiguration { charsetProvider, newConverterProvider, formattingProvider, + subscriberProvider, @@ -1204,6 +1230,7 @@ public class DefaultConfiguration extends AbstractConfiguration { charsetProvider, converterProvider, newFormattingProvider, + subscriberProvider, @@ -1217,6 +1244,50 @@ public class DefaultConfiguration extends AbstractConfiguration { ); } + @Override + public final Configuration derive(SubscriberProvider newSubscriberProvider) { + return new DefaultConfiguration( + connectionProvider, + interpreterConnectionProvider, + systemConnectionProvider, + connectionFactory, + metaProvider, + commitProvider, + executorProvider, + cacheProvider, + transactionProvider, + annotatedPojoMemberProvider, + constructorPropertiesProvider, + recordMapperProvider, + recordUnmapperProvider, + recordListenerProviders, + executeListenerProviders, + migrationListenerProviders, + visitListenerProviders, + transactionListenerProviders, + diagnosticsListenerProviders, + unwrapperProvider, + charsetProvider, + converterProvider, + formattingProvider, + newSubscriberProvider, + + + + + + + clock, + dialect, + settings, + data + ); + } + + + + + @@ -1399,6 +1470,7 @@ public class DefaultConfiguration extends AbstractConfiguration { charsetProvider, converterProvider, formattingProvider, + subscriberProvider, @@ -1438,6 +1510,7 @@ public class DefaultConfiguration extends AbstractConfiguration { charsetProvider, converterProvider, formattingProvider, + subscriberProvider, @@ -1477,6 +1550,7 @@ public class DefaultConfiguration extends AbstractConfiguration { charsetProvider, converterProvider, formattingProvider, + subscriberProvider, @@ -1529,6 +1603,10 @@ public class DefaultConfiguration extends AbstractConfiguration { @Override public final Configuration set(ConnectionFactory newConnectionFactory) { this.connectionFactory = newConnectionFactory; + + if (connectionFactory instanceof DefaultConnectionFactory dcf) + dcf.configuration = this; + return this; } @@ -1696,6 +1774,12 @@ public class DefaultConfiguration extends AbstractConfiguration { return this; } + @Override + public final Configuration set(SubscriberProvider newSubscriberProvider) { + this.subscriberProvider = newSubscriberProvider; + return this; + } + @@ -2213,6 +2297,13 @@ public class DefaultConfiguration extends AbstractConfiguration { : new DefaultFormattingProvider(); } + @Override + public final SubscriberProvider subscriberProvider() { + return subscriberProvider != null + ? subscriberProvider + : new DefaultSubscriberProvider<>(); + } + @@ -2333,6 +2424,7 @@ public class DefaultConfiguration extends AbstractConfiguration { oos.writeObject(serializableOrNull(charsetProvider)); oos.writeObject(serializableOrNull(converterProvider)); oos.writeObject(serializableOrNull(formattingProvider)); + oos.writeObject(serializableOrNull(subscriberProvider)); // [#7062] Exclude reflection cache from serialisation for (Entry entry : data.entrySet()) { @@ -2391,6 +2483,7 @@ public class DefaultConfiguration extends AbstractConfiguration { charsetProvider = (CharsetProvider) ois.readObject(); converterProvider = (ConverterProvider) ois.readObject(); formattingProvider = (FormattingProvider) ois.readObject(); + subscriberProvider = (SubscriberProvider) ois.readObject(); data = new ConcurrentHashMap<>(); Object key; diff --git a/jOOQ/src/main/java/org/jooq/impl/DefaultConnectionFactory.java b/jOOQ/src/main/java/org/jooq/impl/DefaultConnectionFactory.java index f577abd9e9..dd2a81781f 100644 --- a/jOOQ/src/main/java/org/jooq/impl/DefaultConnectionFactory.java +++ b/jOOQ/src/main/java/org/jooq/impl/DefaultConnectionFactory.java @@ -38,6 +38,7 @@ package org.jooq.impl; import static org.jooq.impl.R2DBC.AbstractSubscription.onRequest; +import static org.jooq.impl.Tools.CONFIG; import java.time.Duration; import java.util.concurrent.atomic.AtomicInteger; @@ -45,6 +46,7 @@ import java.util.function.IntFunction; import java.util.function.IntSupplier; import java.util.function.Supplier; +import org.jooq.Configuration; import org.jooq.exception.DetachedException; import org.reactivestreams.Publisher; @@ -65,16 +67,18 @@ import io.r2dbc.spi.ValidationDepth; */ final class DefaultConnectionFactory implements ConnectionFactory { + Configuration configuration; Connection connection; final boolean finalize; final boolean nested; final AtomicInteger savepoints = new AtomicInteger(); - DefaultConnectionFactory(Connection connection) { - this(connection, false, true); + DefaultConnectionFactory(Configuration configuration, Connection connection) { + this(configuration, connection, false, true); } - DefaultConnectionFactory(Connection connection, boolean finalize, boolean nested) { + DefaultConnectionFactory(Configuration configuration, Connection connection, boolean finalize, boolean nested) { + this.configuration = configuration != null ? configuration : CONFIG.get(); this.connection = connection; this.finalize = finalize; this.nested = nested; @@ -89,7 +93,7 @@ final class DefaultConnectionFactory implements ConnectionFactory { @Override public final Publisher create() { - return s -> s.onSubscribe(onRequest(s, x -> { + return s -> s.onSubscribe(onRequest(configuration, s, x -> { x.onNext(new NonClosingConnection()); x.onComplete(); })); @@ -137,7 +141,7 @@ final class DefaultConnectionFactory implements ConnectionFactory { @Override public Publisher close() { - return s -> s.onSubscribe(onRequest(s, x -> x.onComplete())); + return s -> s.onSubscribe(onRequest(configuration, s, x -> x.onComplete())); } @Override diff --git a/jOOQ/src/main/java/org/jooq/impl/DefaultSubscriberProvider.java b/jOOQ/src/main/java/org/jooq/impl/DefaultSubscriberProvider.java new file mode 100644 index 0000000000..c14974c8d0 --- /dev/null +++ b/jOOQ/src/main/java/org/jooq/impl/DefaultSubscriberProvider.java @@ -0,0 +1,95 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Other licenses: + * ----------------------------------------------------------------------------- + * Commercial licenses for this work are available. These replace the above + * Apache-2.0 license and offer limited warranties, support, maintenance, and + * commercial database integrations. + * + * For more information, please visit: https://www.jooq.org/legal/licensing + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + */ +package org.jooq.impl; + +import java.util.function.Consumer; + +import org.jooq.SubscriberProvider; + +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +/** + * A default, context unaware implementation of the {@link SubscriberProvider} + * SPI. + * + * @author Lukas Eder + */ +public class DefaultSubscriberProvider implements SubscriberProvider { + + @Override + public final C context() { + return null; + } + + @Override + public final C context(Subscriber subscriber) { + return null; + } + + @Override + public final Subscriber subscriber( + Consumer onSubscribe, + Consumer onNext, + Consumer onError, + Runnable onComplete, + C context + ) { + return new Subscriber() { + @Override + public void onSubscribe(Subscription s) { + onSubscribe.accept(s); + } + + @Override + public void onNext(T t) { + onNext.accept(t); + } + + @Override + public void onError(Throwable t) { + onError.accept(t); + } + + @Override + public void onComplete() { + onComplete.run(); + } + }; + } +} diff --git a/jOOQ/src/main/java/org/jooq/impl/Internal.java b/jOOQ/src/main/java/org/jooq/impl/Internal.java index dcfe65e843..7d8df469e1 100644 --- a/jOOQ/src/main/java/org/jooq/impl/Internal.java +++ b/jOOQ/src/main/java/org/jooq/impl/Internal.java @@ -883,40 +883,6 @@ public final class Internal { return new IDiv<>(lhs, (Field) nullSafe(rhs, lhs.getDataType())); } - /** - * Create a {@link Subscriber} from a set of lambdas. - *

- * This is used for internal purposes and thus subject for change. - */ - public static final Subscriber subscriber( - Consumer subscription, - Consumer onNext, - Consumer onError, - Runnable onComplete - ) { - return new Subscriber() { - @Override - public void onSubscribe(Subscription s) { - subscription.accept(s); - } - - @Override - public void onNext(T t) { - onNext.accept(t); - } - - @Override - public void onError(Throwable t) { - onError.accept(t); - } - - @Override - public void onComplete() { - onComplete.run(); - } - }; - } - /** * JDK agnostic abstraction over {@link Class#arrayType()} and * {@link Array#newInstance(Class, int)}. diff --git a/jOOQ/src/main/java/org/jooq/impl/R2DBC.java b/jOOQ/src/main/java/org/jooq/impl/R2DBC.java index 5faa093ccf..f7917272e3 100644 --- a/jOOQ/src/main/java/org/jooq/impl/R2DBC.java +++ b/jOOQ/src/main/java/org/jooq/impl/R2DBC.java @@ -43,7 +43,7 @@ import static org.jooq.SQLDialect.MARIADB; import static org.jooq.SQLDialect.MYSQL; // ... import static org.jooq.conf.ParamType.NAMED; -import static org.jooq.impl.Internal.subscriber; +import static org.jooq.impl.Tools.CONFIG; import static org.jooq.impl.Tools.EMPTY_PARAM; import static org.jooq.impl.Tools.abstractDMLQuery; import static org.jooq.impl.Tools.abstractResultQuery; @@ -82,7 +82,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; @@ -102,6 +101,7 @@ import org.jooq.Query; import org.jooq.Record; import org.jooq.SQLDialect; import org.jooq.Scope; +import org.jooq.SubscriberProvider; import org.jooq.TransactionalPublishable; import org.jooq.XML; import org.jooq.conf.Settings; @@ -149,13 +149,18 @@ final class R2DBC { static abstract class AbstractSubscription implements org.reactivestreams.Subscription { + final Configuration configuration; final AtomicBoolean completed; final AtomicLong requested; final Subscriber subscriber; final Guard guard; - static Subscription onRequest(Subscriber s, Consumer> onRequest) { - return new AbstractSubscription(s) { + static Subscription onRequest( + Configuration configuration, + Subscriber s, + Consumer> onRequest + ) { + return new AbstractSubscription(configuration, s) { @Override void request0() { onRequest.accept(subscriber); @@ -163,7 +168,8 @@ final class R2DBC { }; } - AbstractSubscription(Subscriber subscriber) { + AbstractSubscription(Configuration configuration, Subscriber subscriber) { + this.configuration = configuration; this.completed = new AtomicBoolean(); this.requested = new AtomicLong(); this.guard = new Guard(); @@ -178,7 +184,9 @@ final class R2DBC { // required_spec317_mustSupportACumulativePendingElementCountUpToLongMaxValue completed.set(true); subscriber.onComplete(); - } + }, + configuration.subscriberProvider(), + subscriber ); } @@ -226,7 +234,25 @@ final class R2DBC { // R2DBC implementations // ------------------------------------------------------------------------- - static final class Forwarding implements Subscriber { + /** + * [#14048] A {@link Subscriber} that allows for accessing a downstream + * subscriber in order to access its context via + * {@link SubscriberProvider#context(Subscriber)}. + */ + interface DownstreamSubscriber extends Subscriber { + Subscriber downstream(); + } + + static final Subscriber downstream(Subscriber subscriber) { + Subscriber r = subscriber; + + while (r instanceof DownstreamSubscriber d) + r = d.downstream(); + + return r; + } + + static final class Forwarding implements DownstreamSubscriber { final int forwarderIndex; final AbstractResultSubscriber resultSubscriber; @@ -238,6 +264,11 @@ final class R2DBC { this.subscription = new AtomicReference<>(); } + @Override + public final Subscriber downstream() { + return resultSubscriber.downstream.subscriber; + } + @Override public final void onSubscribe(Subscription s) { subscription.set(s); @@ -271,7 +302,7 @@ final class R2DBC { } } - static abstract class AbstractResultSubscriber implements Subscriber { + static abstract class AbstractResultSubscriber implements DownstreamSubscriber { final AbstractNonBlockingSubscription downstream; final AtomicBoolean completed; @@ -288,6 +319,11 @@ final class R2DBC { this.statement = statement; } + @Override + public final Subscriber downstream() { + return downstream.subscriber; + } + @Override public final void onSubscribe(Subscription s) { s.request(Long.MAX_VALUE); @@ -340,7 +376,9 @@ final class R2DBC { s.onNext(t); }, s::onError, - s::onComplete + s::onComplete, + downstream.configuration.subscriberProvider(), + this )); } } @@ -392,7 +430,7 @@ final class R2DBC { } } - static abstract class ConnectionSubscriber implements Subscriber { + static abstract class ConnectionSubscriber implements DownstreamSubscriber { final AbstractNonBlockingSubscription downstream; final AtomicReference connection; @@ -404,6 +442,11 @@ final class R2DBC { this.subscription = new AtomicReference<>(); } + @Override + public final Subscriber downstream() { + return downstream.subscriber; + } + @Override public final void onSubscribe(Subscription s) { @@ -607,7 +650,6 @@ final class R2DBC { static abstract class AbstractNonBlockingSubscription extends AbstractSubscription { - final Configuration configuration; final AtomicBoolean subscribed; final Publisher connection; final AtomicInteger nextForwarderIndex; @@ -617,9 +659,8 @@ final class R2DBC { Configuration configuration, Subscriber subscriber ) { - super(subscriber); + super(configuration, subscriber); - this.configuration = configuration; this.subscribed = new AtomicBoolean(); this.connection = configuration.connectionFactory().create(); this.nextForwarderIndex = new AtomicInteger(); @@ -642,7 +683,9 @@ final class R2DBC { request1(); }, delegate::onError, - delegate::onComplete + delegate::onComplete, + configuration.subscriberProvider(), + delegate )); } else @@ -700,7 +743,14 @@ final class R2DBC { return c; } else { - c.close().subscribe(subscriber(s -> s.request(Long.MAX_VALUE), t -> {}, t -> {}, onComplete)); + c.close().subscribe(subscriber( + s -> s.request(Long.MAX_VALUE), + t -> {}, + t -> {}, + onComplete, + configuration.subscriberProvider(), + subscriber + )); return null; } }); @@ -794,7 +844,7 @@ final class R2DBC { try { transactional.run(c instanceof NonClosingConnection ? configuration - : configuration.derive(new DefaultConnectionFactory(c))).subscribe(subscriber( + : configuration.derive(new DefaultConnectionFactory(configuration, c))).subscribe(subscriber( s1 -> s1.request(Long.MAX_VALUE), subscriber::onNext, e -> rollback(subscriber, c, e), @@ -802,8 +852,12 @@ final class R2DBC { s2 -> s2.request(1), v -> {}, t -> cancel0(true, () -> subscriber.onError(t)), - () -> cancel0(true, () -> subscriber.onComplete()) - )) + () -> cancel0(true, () -> subscriber.onComplete()), + configuration.subscriberProvider(), + subscriber + )), + configuration.subscriberProvider(), + subscriber )); } @@ -812,7 +866,9 @@ final class R2DBC { catch (Exception e) { rollback(subscriber, c, e); } - } + }, + configuration.subscriberProvider(), + subscriber )); } @@ -821,7 +877,9 @@ final class R2DBC { s2 -> s2.request(1), v -> {}, t -> cancel0(true, () -> s.onError(t)), - () -> cancel0(true, () -> s.onError(e)) + () -> cancel0(true, () -> s.onError(e)), + configuration.subscriberProvider(), + s )); } }; @@ -869,9 +927,20 @@ final class R2DBC { @SuppressWarnings("unchecked") static final T block(Publisher publisher) throws Throwable { + return block(publisher, CONFIG.get(), null); + } + + static final T block(Publisher publisher, Configuration configuration, Subscriber subscriber) throws Throwable { Object complete = new Object(); LinkedBlockingQueue queue = new LinkedBlockingQueue<>(); - publisher.subscribe(subscriber(s -> s.request(1), queue::add, queue::add, () -> queue.add(complete))); + publisher.subscribe(subscriber( + s -> s.request(1), + queue::add, + queue::add, + () -> queue.add(complete), + configuration.subscriberProvider(), + subscriber + )); try { Object result = queue.take(); @@ -1612,7 +1681,7 @@ final class R2DBC { private volatile Cursor c; BlockingRecordSubscription(ResultQueryTrait query, Subscriber subscriber) { - super(subscriber); + super(query.configuration(), subscriber); this.query = query; } @@ -1652,7 +1721,7 @@ final class R2DBC { final AbstractRowCountQuery query; BlockingRowCountSubscription(AbstractRowCountQuery query, Subscriber subscriber) { - super(subscriber); + super(query.configuration(), subscriber); this.query = query; } @@ -1682,7 +1751,7 @@ final class R2DBC { Subscriber subscriber, TransactionalPublishable transactional ) { - super(subscriber); + super(ctx.configuration(), subscriber); this.ctx = ctx; this.transactional = transactional; @@ -1691,7 +1760,7 @@ final class R2DBC { @Override final void request0() { try { - subscriber.onNext(ctx.transactionResult(c -> block(transactional.run(c)))); + subscriber.onNext(ctx.transactionResult(c -> block(transactional.run(c), configuration, subscriber))); subscriber.onComplete(); } catch (Throwable t) { @@ -1719,4 +1788,23 @@ final class R2DBC { return "Error while rendering SQL: " + t.getMessage(); } } + + static final Subscriber subscriber( + Consumer subscription, + Consumer onNext, + Consumer onError, + Runnable onComplete, + SubscriberProvider provider, + Subscriber previous + ) { + return provider.subscriber( + subscription, + onNext, + onError, + onComplete, + previous != null + ? provider.context(downstream(previous)) + : provider.context() + ); + } } diff --git a/jOOQ/src/main/java/org/jooq/impl/UDTImpl.java b/jOOQ/src/main/java/org/jooq/impl/UDTImpl.java index c4ecd68bac..bf2907003d 100644 --- a/jOOQ/src/main/java/org/jooq/impl/UDTImpl.java +++ b/jOOQ/src/main/java/org/jooq/impl/UDTImpl.java @@ -39,9 +39,7 @@ package org.jooq.impl; import static java.util.Collections.emptyList; -import java.util.Collections; import java.util.List; -import java.util.Map; import org.jooq.Binding; import org.jooq.Catalog; @@ -60,9 +58,6 @@ import org.jooq.UDTField; import org.jooq.UDTRecord; import org.jooq.impl.QOM.UNotYetImplemented; -import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.ApiStatus.Internal; - /** * A common base type for UDT's *

@@ -70,7 +65,7 @@ import org.jetbrains.annotations.ApiStatus.Internal; * * @author Lukas Eder */ -@Internal +@org.jooq.Internal public /* non-final */ class UDTImpl> extends AbstractNamed diff --git a/jOOQ/src/main/java/org/jooq/impl/UDTPathFieldImpl.java b/jOOQ/src/main/java/org/jooq/impl/UDTPathFieldImpl.java index ba89f8d2c8..71ab2208f7 100644 --- a/jOOQ/src/main/java/org/jooq/impl/UDTPathFieldImpl.java +++ b/jOOQ/src/main/java/org/jooq/impl/UDTPathFieldImpl.java @@ -44,44 +44,33 @@ import static org.jooq.impl.SchemaImpl.DEFAULT_SCHEMA; import static org.jooq.impl.Tools.BooleanDataKey.DATA_STORE_ASSIGNMENT; import static org.jooq.tools.StringUtils.defaultIfNull; -import java.util.stream.Stream; - import org.jooq.Binding; import org.jooq.Catalog; import org.jooq.Clause; import org.jooq.Comment; import org.jooq.Context; import org.jooq.DataType; -import org.jooq.Field; import org.jooq.Name; import org.jooq.Package; -import org.jooq.QueryPart; import org.jooq.Record; import org.jooq.RecordQualifier; -// ... import org.jooq.Row; import org.jooq.Schema; import org.jooq.Table; import org.jooq.TableField; -// ... import org.jooq.UDT; import org.jooq.UDTPathField; import org.jooq.UDTRecord; import org.jooq.impl.QOM.UEmpty; import org.jooq.impl.QOM.UNotYetImplemented; -import org.jooq.impl.Tools.BooleanDataKey; import org.jooq.tools.StringUtils; -import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.Nullable; -import org.jetbrains.annotations.ApiStatus.Internal; - /** * A common base type for UDT path fields. * * @author Lukas Eder */ -@Internal +@org.jooq.Internal public /* non-final */ class UDTPathFieldImpl, T> extends AbstractField diff --git a/jOOQ/src/main/java/org/jooq/impl/UDTPathTableFieldImpl.java b/jOOQ/src/main/java/org/jooq/impl/UDTPathTableFieldImpl.java index 5dadaa5628..c1391d1552 100644 --- a/jOOQ/src/main/java/org/jooq/impl/UDTPathTableFieldImpl.java +++ b/jOOQ/src/main/java/org/jooq/impl/UDTPathTableFieldImpl.java @@ -50,14 +50,12 @@ import org.jooq.UDTPathField; import org.jooq.UDTPathTableField; import org.jooq.UDTRecord; -import org.jetbrains.annotations.ApiStatus.Internal; - /** * A common base type for table fields that are also {@link UDTPathField}. * * @author Lukas Eder */ -@Internal +@org.jooq.Internal public /* non-final */ class UDTPathTableFieldImpl, T> extends UDTPathFieldImpl diff --git a/jOOQ/src/main/java/org/jooq/tools/jdbc/MockConfiguration.java b/jOOQ/src/main/java/org/jooq/tools/jdbc/MockConfiguration.java index 70453bd0ac..4251bc7802 100644 --- a/jOOQ/src/main/java/org/jooq/tools/jdbc/MockConfiguration.java +++ b/jOOQ/src/main/java/org/jooq/tools/jdbc/MockConfiguration.java @@ -69,6 +69,7 @@ import org.jooq.RecordMapperProvider; import org.jooq.RecordUnmapper; import org.jooq.RecordUnmapperProvider; import org.jooq.SQLDialect; +import org.jooq.SubscriberProvider; import org.jooq.TransactionListenerProvider; import org.jooq.TransactionProvider; // ... @@ -80,6 +81,8 @@ import org.jooq.impl.AbstractConfiguration; import org.jooq.impl.AnnotatedPojoMemberProvider; import org.jooq.impl.DefaultDSLContext; +import org.jetbrains.annotations.NotNull; + import io.r2dbc.spi.ConnectionFactory; /** @@ -266,6 +269,11 @@ public class MockConfiguration extends AbstractConfiguration { return delegate.formattingProvider(); } + @Override + public SubscriberProvider subscriberProvider() { + return delegate.subscriberProvider(); + } + @Override public org.jooq.SchemaMapping schemaMapping() { return delegate.schemaMapping(); @@ -482,6 +490,12 @@ public class MockConfiguration extends AbstractConfiguration { return this; } + @Override + public Configuration set(SubscriberProvider newSubscriberProvider) { + delegate.set(newSubscriberProvider); + return this; + } + @Override public Configuration set(SQLDialect newDialect) { delegate.set(newDialect); @@ -667,6 +681,11 @@ public class MockConfiguration extends AbstractConfiguration { return new MockConfiguration(delegate.derive(newFormattingProvider), provider); } + @Override + public Configuration derive(SubscriberProvider newSubscriberProvider) { + return new MockConfiguration(delegate.derive(newSubscriberProvider), provider); + } + @Override public Configuration derive(SQLDialect newDialect) { return new MockConfiguration(delegate.derive(newDialect), provider);