[jOOQ/jOOQ#17920] Add a new SubscriberProvider SPI that allows for overriding the way jOOQ internals creates reactive streams Subscribers, to allow for making them context aware

This commit is contained in:
Lukas Eder 2025-01-23 12:22:44 +01:00
parent 0a6ec105c6
commit a455e01fa2
12 changed files with 463 additions and 91 deletions

View File

@ -561,6 +561,12 @@ public interface Configuration extends Serializable {
@NotNull
FormattingProvider formattingProvider();
/**
* Get the configured <code>SubscriberProvider</code> from this configuration.
*/
@NotNull
SubscriberProvider<?> subscriberProvider();
@ -1273,6 +1279,20 @@ public interface Configuration extends Serializable {
@NotNull
Configuration set(FormattingProvider newFormattingProvider);
/**
* Change this configuration to hold new subscriber provider.
* <p>
* This method is not thread-safe and should not be used in globally
* available <code>Configuration</code> objects.
*
* @param newSubscriberProvider The new subscriber provider to be contained in
* the changed configuration.
* @return The changed configuration.
*/
@Pro
@NotNull
Configuration set(SubscriberProvider<?> newSubscriberProvider);
@ -1954,6 +1974,18 @@ public interface Configuration extends Serializable {
@NotNull
Configuration derive(FormattingProvider newFormattingProvider);
/**
* Create a derived configuration from this one, with a new subscriber
* provider.
*
* @param newSubscriberProvider The new subscriber provider to be contained in
* the derived configuration.
* @return The derived configuration.
*/
@Pro
@NotNull
Configuration derive(SubscriberProvider<?> newSubscriberProvider);

View File

@ -0,0 +1,93 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Other licenses:
* -----------------------------------------------------------------------------
* Commercial licenses for this work are available. These replace the above
* Apache-2.0 license and offer limited warranties, support, maintenance, and
* commercial database integrations.
*
* For more information, please visit: https://www.jooq.org/legal/licensing
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*/
package org.jooq;
import java.util.function.Consumer;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
/**
* An SPI to allow for overriding the default implementation of
* {@link Subscriber} creation in jOOQ.
* <p>
* This SPI can be used to override jOOQ's internal creation of
* {@link Subscriber} instances, typically used to create reactor style
* context-aware {@link Subscriber} instances. Whatever the
* {@link #context(Subscriber)} is, it will be propagated to additional
* {@link #subscriber(Consumer, Consumer, Consumer, Runnable, Object)} calls.
*
* @author Lukas Eder
*/
public interface SubscriberProvider<C> {
/**
* Create an empty context.
*/
@Nullable
C context();
/**
* Extract a context from an existing {@link Subscriber}.
*/
@Nullable
C context(Subscriber<?> subscriber);
/**
* Create a new subscriber from its component implementations and a previous
* context.
*
* @param onSubscribe The implementation of
* {@link Subscriber#onSubscribe(Subscription)}
* @param onNext The implementation of {@link Subscriber#onNext(Object)}
* @param onError The implementation of
* {@link Subscriber#onError(Throwable)}
* @param onComplete The implementation of {@link Subscriber#onComplete()}
* @param context The {@link #context()}
*/
@NotNull
<T> Subscriber<T> subscriber(
Consumer<? super Subscription> onSubscribe,
Consumer<? super T> onNext,
Consumer<? super Throwable> onError,
Runnable onComplete,
C context
);
}

View File

@ -577,7 +577,7 @@ public class DSL {
public static CloseableDSLContext using(String url) {
if (url.startsWith("r2dbc")) {
io.r2dbc.spi.Connection connection = R2DBC.getConnection(url);
return new DefaultCloseableDSLContext(new DefaultConnectionFactory(connection, true, false), JDBCUtils.dialect(connection));
return new DefaultCloseableDSLContext(new DefaultConnectionFactory(null, connection, true, false), JDBCUtils.dialect(connection));
}
else {
try {
@ -619,7 +619,7 @@ public class DSL {
public static CloseableDSLContext using(String url, String username, String password) {
if (url.startsWith("r2dbc")) {
io.r2dbc.spi.Connection connection = R2DBC.getConnection(url, username, password);
return new DefaultCloseableDSLContext(new DefaultConnectionFactory(connection, true, false), JDBCUtils.dialect(connection));
return new DefaultCloseableDSLContext(new DefaultConnectionFactory(null, connection, true, false), JDBCUtils.dialect(connection));
}
else {
try {
@ -678,7 +678,7 @@ public class DSL {
public static CloseableDSLContext using(String url, Properties properties) {
if (url.startsWith("r2dbc")) {
io.r2dbc.spi.Connection connection = R2DBC.getConnection(url, properties);
return new DefaultCloseableDSLContext(new DefaultConnectionFactory(connection, true, false), JDBCUtils.dialect(connection));
return new DefaultCloseableDSLContext(new DefaultConnectionFactory(null, connection, true, false), JDBCUtils.dialect(connection));
}
else {
try {
@ -893,7 +893,7 @@ public class DSL {
*/
@NotNull
public static DSLContext using(io.r2dbc.spi.Connection connection) {
return new DefaultDSLContext(new DefaultConnectionFactory(connection), JDBCUtils.dialect(connection));
return new DefaultDSLContext(new DefaultConnectionFactory(null, connection), JDBCUtils.dialect(connection));
}
/**
@ -905,7 +905,7 @@ public class DSL {
*/
@NotNull
public static DSLContext using(io.r2dbc.spi.Connection connection, SQLDialect dialect) {
return new DefaultDSLContext(new DefaultConnectionFactory(connection), dialect);
return new DefaultDSLContext(new DefaultConnectionFactory(null, connection), dialect);
}
/**
@ -919,7 +919,7 @@ public class DSL {
*/
@NotNull
public static DSLContext using(io.r2dbc.spi.Connection connection, SQLDialect dialect, Settings settings) {
return new DefaultDSLContext(new DefaultConnectionFactory(connection), dialect, settings);
return new DefaultDSLContext(new DefaultConnectionFactory(null, connection), dialect, settings);
}
/**

View File

@ -84,6 +84,7 @@ import org.jooq.RecordType;
import org.jooq.RecordUnmapper;
import org.jooq.RecordUnmapperProvider;
import org.jooq.SQLDialect;
import org.jooq.SubscriberProvider;
import org.jooq.TransactionListener;
import org.jooq.TransactionListenerProvider;
import org.jooq.TransactionProvider;
@ -141,6 +142,7 @@ public class DefaultConfiguration extends AbstractConfiguration {
private transient CharsetProvider charsetProvider;
private transient ConverterProvider converterProvider;
private transient FormattingProvider formattingProvider;
private transient SubscriberProvider<?> subscriberProvider;
@ -210,6 +212,7 @@ public class DefaultConfiguration extends AbstractConfiguration {
null,
null,
null,
null,
@ -256,6 +259,7 @@ public class DefaultConfiguration extends AbstractConfiguration {
configuration.charsetProvider,
configuration.converterProvider,
configuration.formattingProvider,
configuration.subscriberProvider,
@ -301,6 +305,7 @@ public class DefaultConfiguration extends AbstractConfiguration {
CharsetProvider charsetProvider,
ConverterProvider converterProvider,
FormattingProvider formattingProvider,
SubscriberProvider<?> subscriberProvider,
@ -335,6 +340,7 @@ public class DefaultConfiguration extends AbstractConfiguration {
set(charsetProvider);
set(converterProvider);
set(formattingProvider);
set(subscriberProvider);
@ -404,6 +410,7 @@ public class DefaultConfiguration extends AbstractConfiguration {
charsetProvider,
converterProvider,
formattingProvider,
subscriberProvider,
@ -443,6 +450,7 @@ public class DefaultConfiguration extends AbstractConfiguration {
charsetProvider,
converterProvider,
formattingProvider,
subscriberProvider,
@ -482,6 +490,7 @@ public class DefaultConfiguration extends AbstractConfiguration {
charsetProvider,
converterProvider,
formattingProvider,
subscriberProvider,
@ -521,6 +530,7 @@ public class DefaultConfiguration extends AbstractConfiguration {
charsetProvider,
converterProvider,
formattingProvider,
subscriberProvider,
@ -565,6 +575,7 @@ public class DefaultConfiguration extends AbstractConfiguration {
charsetProvider,
converterProvider,
formattingProvider,
subscriberProvider,
@ -604,6 +615,7 @@ public class DefaultConfiguration extends AbstractConfiguration {
charsetProvider,
converterProvider,
formattingProvider,
subscriberProvider,
@ -643,6 +655,7 @@ public class DefaultConfiguration extends AbstractConfiguration {
charsetProvider,
converterProvider,
formattingProvider,
subscriberProvider,
@ -682,6 +695,7 @@ public class DefaultConfiguration extends AbstractConfiguration {
charsetProvider,
converterProvider,
formattingProvider,
subscriberProvider,
@ -721,6 +735,7 @@ public class DefaultConfiguration extends AbstractConfiguration {
charsetProvider,
converterProvider,
formattingProvider,
subscriberProvider,
@ -765,6 +780,7 @@ public class DefaultConfiguration extends AbstractConfiguration {
charsetProvider,
converterProvider,
formattingProvider,
subscriberProvider,
@ -809,6 +825,7 @@ public class DefaultConfiguration extends AbstractConfiguration {
charsetProvider,
converterProvider,
formattingProvider,
subscriberProvider,
@ -848,6 +865,7 @@ public class DefaultConfiguration extends AbstractConfiguration {
charsetProvider,
converterProvider,
formattingProvider,
subscriberProvider,
@ -887,6 +905,7 @@ public class DefaultConfiguration extends AbstractConfiguration {
charsetProvider,
converterProvider,
formattingProvider,
subscriberProvider,
@ -926,6 +945,7 @@ public class DefaultConfiguration extends AbstractConfiguration {
charsetProvider,
converterProvider,
formattingProvider,
subscriberProvider,
@ -965,6 +985,7 @@ public class DefaultConfiguration extends AbstractConfiguration {
charsetProvider,
converterProvider,
formattingProvider,
subscriberProvider,
@ -1004,6 +1025,7 @@ public class DefaultConfiguration extends AbstractConfiguration {
charsetProvider,
converterProvider,
formattingProvider,
subscriberProvider,
@ -1043,6 +1065,7 @@ public class DefaultConfiguration extends AbstractConfiguration {
charsetProvider,
converterProvider,
formattingProvider,
subscriberProvider,
@ -1087,6 +1110,7 @@ public class DefaultConfiguration extends AbstractConfiguration {
charsetProvider,
converterProvider,
formattingProvider,
subscriberProvider,
@ -1126,6 +1150,7 @@ public class DefaultConfiguration extends AbstractConfiguration {
newCharsetProvider,
converterProvider,
formattingProvider,
subscriberProvider,
@ -1165,6 +1190,7 @@ public class DefaultConfiguration extends AbstractConfiguration {
charsetProvider,
newConverterProvider,
formattingProvider,
subscriberProvider,
@ -1204,6 +1230,7 @@ public class DefaultConfiguration extends AbstractConfiguration {
charsetProvider,
converterProvider,
newFormattingProvider,
subscriberProvider,
@ -1217,6 +1244,50 @@ public class DefaultConfiguration extends AbstractConfiguration {
);
}
@Override
public final Configuration derive(SubscriberProvider<?> newSubscriberProvider) {
return new DefaultConfiguration(
connectionProvider,
interpreterConnectionProvider,
systemConnectionProvider,
connectionFactory,
metaProvider,
commitProvider,
executorProvider,
cacheProvider,
transactionProvider,
annotatedPojoMemberProvider,
constructorPropertiesProvider,
recordMapperProvider,
recordUnmapperProvider,
recordListenerProviders,
executeListenerProviders,
migrationListenerProviders,
visitListenerProviders,
transactionListenerProviders,
diagnosticsListenerProviders,
unwrapperProvider,
charsetProvider,
converterProvider,
formattingProvider,
newSubscriberProvider,
clock,
dialect,
settings,
data
);
}
@ -1399,6 +1470,7 @@ public class DefaultConfiguration extends AbstractConfiguration {
charsetProvider,
converterProvider,
formattingProvider,
subscriberProvider,
@ -1438,6 +1510,7 @@ public class DefaultConfiguration extends AbstractConfiguration {
charsetProvider,
converterProvider,
formattingProvider,
subscriberProvider,
@ -1477,6 +1550,7 @@ public class DefaultConfiguration extends AbstractConfiguration {
charsetProvider,
converterProvider,
formattingProvider,
subscriberProvider,
@ -1529,6 +1603,10 @@ public class DefaultConfiguration extends AbstractConfiguration {
@Override
public final Configuration set(ConnectionFactory newConnectionFactory) {
this.connectionFactory = newConnectionFactory;
if (connectionFactory instanceof DefaultConnectionFactory dcf)
dcf.configuration = this;
return this;
}
@ -1696,6 +1774,12 @@ public class DefaultConfiguration extends AbstractConfiguration {
return this;
}
@Override
public final Configuration set(SubscriberProvider<?> newSubscriberProvider) {
this.subscriberProvider = newSubscriberProvider;
return this;
}
@ -2213,6 +2297,13 @@ public class DefaultConfiguration extends AbstractConfiguration {
: new DefaultFormattingProvider();
}
@Override
public final SubscriberProvider<?> subscriberProvider() {
return subscriberProvider != null
? subscriberProvider
: new DefaultSubscriberProvider<>();
}
@ -2333,6 +2424,7 @@ public class DefaultConfiguration extends AbstractConfiguration {
oos.writeObject(serializableOrNull(charsetProvider));
oos.writeObject(serializableOrNull(converterProvider));
oos.writeObject(serializableOrNull(formattingProvider));
oos.writeObject(serializableOrNull(subscriberProvider));
// [#7062] Exclude reflection cache from serialisation
for (Entry<Object, Object> entry : data.entrySet()) {
@ -2391,6 +2483,7 @@ public class DefaultConfiguration extends AbstractConfiguration {
charsetProvider = (CharsetProvider) ois.readObject();
converterProvider = (ConverterProvider) ois.readObject();
formattingProvider = (FormattingProvider) ois.readObject();
subscriberProvider = (SubscriberProvider<?>) ois.readObject();
data = new ConcurrentHashMap<>();
Object key;

View File

@ -38,6 +38,7 @@
package org.jooq.impl;
import static org.jooq.impl.R2DBC.AbstractSubscription.onRequest;
import static org.jooq.impl.Tools.CONFIG;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
@ -45,6 +46,7 @@ import java.util.function.IntFunction;
import java.util.function.IntSupplier;
import java.util.function.Supplier;
import org.jooq.Configuration;
import org.jooq.exception.DetachedException;
import org.reactivestreams.Publisher;
@ -65,16 +67,18 @@ import io.r2dbc.spi.ValidationDepth;
*/
final class DefaultConnectionFactory implements ConnectionFactory {
Configuration configuration;
Connection connection;
final boolean finalize;
final boolean nested;
final AtomicInteger savepoints = new AtomicInteger();
DefaultConnectionFactory(Connection connection) {
this(connection, false, true);
DefaultConnectionFactory(Configuration configuration, Connection connection) {
this(configuration, connection, false, true);
}
DefaultConnectionFactory(Connection connection, boolean finalize, boolean nested) {
DefaultConnectionFactory(Configuration configuration, Connection connection, boolean finalize, boolean nested) {
this.configuration = configuration != null ? configuration : CONFIG.get();
this.connection = connection;
this.finalize = finalize;
this.nested = nested;
@ -89,7 +93,7 @@ final class DefaultConnectionFactory implements ConnectionFactory {
@Override
public final Publisher<? extends Connection> create() {
return s -> s.onSubscribe(onRequest(s, x -> {
return s -> s.onSubscribe(onRequest(configuration, s, x -> {
x.onNext(new NonClosingConnection());
x.onComplete();
}));
@ -137,7 +141,7 @@ final class DefaultConnectionFactory implements ConnectionFactory {
@Override
public Publisher<Void> close() {
return s -> s.onSubscribe(onRequest(s, x -> x.onComplete()));
return s -> s.onSubscribe(onRequest(configuration, s, x -> x.onComplete()));
}
@Override

View File

@ -0,0 +1,95 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Other licenses:
* -----------------------------------------------------------------------------
* Commercial licenses for this work are available. These replace the above
* Apache-2.0 license and offer limited warranties, support, maintenance, and
* commercial database integrations.
*
* For more information, please visit: https://www.jooq.org/legal/licensing
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*/
package org.jooq.impl;
import java.util.function.Consumer;
import org.jooq.SubscriberProvider;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
/**
* A default, context unaware implementation of the {@link SubscriberProvider}
* SPI.
*
* @author Lukas Eder
*/
public class DefaultSubscriberProvider<C> implements SubscriberProvider<C> {
@Override
public final C context() {
return null;
}
@Override
public final C context(Subscriber<?> subscriber) {
return null;
}
@Override
public final <T> Subscriber<T> subscriber(
Consumer<? super Subscription> onSubscribe,
Consumer<? super T> onNext,
Consumer<? super Throwable> onError,
Runnable onComplete,
C context
) {
return new Subscriber<T>() {
@Override
public void onSubscribe(Subscription s) {
onSubscribe.accept(s);
}
@Override
public void onNext(T t) {
onNext.accept(t);
}
@Override
public void onError(Throwable t) {
onError.accept(t);
}
@Override
public void onComplete() {
onComplete.run();
}
};
}
}

View File

@ -883,40 +883,6 @@ public final class Internal {
return new IDiv<>(lhs, (Field<T>) nullSafe(rhs, lhs.getDataType()));
}
/**
* Create a {@link Subscriber} from a set of lambdas.
* <p>
* This is used for internal purposes and thus subject for change.
*/
public static final <T> Subscriber<T> subscriber(
Consumer<? super Subscription> subscription,
Consumer<? super T> onNext,
Consumer<? super Throwable> onError,
Runnable onComplete
) {
return new Subscriber<T>() {
@Override
public void onSubscribe(Subscription s) {
subscription.accept(s);
}
@Override
public void onNext(T t) {
onNext.accept(t);
}
@Override
public void onError(Throwable t) {
onError.accept(t);
}
@Override
public void onComplete() {
onComplete.run();
}
};
}
/**
* JDK agnostic abstraction over {@link Class#arrayType()} and
* {@link Array#newInstance(Class, int)}.

View File

@ -43,7 +43,7 @@ import static org.jooq.SQLDialect.MARIADB;
import static org.jooq.SQLDialect.MYSQL;
// ...
import static org.jooq.conf.ParamType.NAMED;
import static org.jooq.impl.Internal.subscriber;
import static org.jooq.impl.Tools.CONFIG;
import static org.jooq.impl.Tools.EMPTY_PARAM;
import static org.jooq.impl.Tools.abstractDMLQuery;
import static org.jooq.impl.Tools.abstractResultQuery;
@ -82,7 +82,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
@ -102,6 +101,7 @@ import org.jooq.Query;
import org.jooq.Record;
import org.jooq.SQLDialect;
import org.jooq.Scope;
import org.jooq.SubscriberProvider;
import org.jooq.TransactionalPublishable;
import org.jooq.XML;
import org.jooq.conf.Settings;
@ -149,13 +149,18 @@ final class R2DBC {
static abstract class AbstractSubscription<T> implements org.reactivestreams.Subscription {
final Configuration configuration;
final AtomicBoolean completed;
final AtomicLong requested;
final Subscriber<? super T> subscriber;
final Guard guard;
static <T> Subscription onRequest(Subscriber<? super T> s, Consumer<? super Subscriber<? super T>> onRequest) {
return new AbstractSubscription<T>(s) {
static <T> Subscription onRequest(
Configuration configuration,
Subscriber<? super T> s,
Consumer<? super Subscriber<? super T>> onRequest
) {
return new AbstractSubscription<T>(configuration, s) {
@Override
void request0() {
onRequest.accept(subscriber);
@ -163,7 +168,8 @@ final class R2DBC {
};
}
AbstractSubscription(Subscriber<? super T> subscriber) {
AbstractSubscription(Configuration configuration, Subscriber<? super T> subscriber) {
this.configuration = configuration;
this.completed = new AtomicBoolean();
this.requested = new AtomicLong();
this.guard = new Guard();
@ -178,7 +184,9 @@ final class R2DBC {
// required_spec317_mustSupportACumulativePendingElementCountUpToLongMaxValue
completed.set(true);
subscriber.onComplete();
}
},
configuration.subscriberProvider(),
subscriber
);
}
@ -226,7 +234,25 @@ final class R2DBC {
// R2DBC implementations
// -------------------------------------------------------------------------
static final class Forwarding<T> implements Subscriber<T> {
/**
* [#14048] A {@link Subscriber} that allows for accessing a downstream
* subscriber in order to access its context via
* {@link SubscriberProvider#context(Subscriber)}.
*/
interface DownstreamSubscriber<T> extends Subscriber<T> {
Subscriber<?> downstream();
}
static final Subscriber<?> downstream(Subscriber<?> subscriber) {
Subscriber<?> r = subscriber;
while (r instanceof DownstreamSubscriber<?> d)
r = d.downstream();
return r;
}
static final class Forwarding<T> implements DownstreamSubscriber<T> {
final int forwarderIndex;
final AbstractResultSubscriber<T> resultSubscriber;
@ -238,6 +264,11 @@ final class R2DBC {
this.subscription = new AtomicReference<>();
}
@Override
public final Subscriber<?> downstream() {
return resultSubscriber.downstream.subscriber;
}
@Override
public final void onSubscribe(Subscription s) {
subscription.set(s);
@ -271,7 +302,7 @@ final class R2DBC {
}
}
static abstract class AbstractResultSubscriber<T> implements Subscriber<Result> {
static abstract class AbstractResultSubscriber<T> implements DownstreamSubscriber<Result> {
final AbstractNonBlockingSubscription<? super T> downstream;
final AtomicBoolean completed;
@ -288,6 +319,11 @@ final class R2DBC {
this.statement = statement;
}
@Override
public final Subscriber<?> downstream() {
return downstream.subscriber;
}
@Override
public final void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
@ -340,7 +376,9 @@ final class R2DBC {
s.onNext(t);
},
s::onError,
s::onComplete
s::onComplete,
downstream.configuration.subscriberProvider(),
this
));
}
}
@ -392,7 +430,7 @@ final class R2DBC {
}
}
static abstract class ConnectionSubscriber<T> implements Subscriber<Connection> {
static abstract class ConnectionSubscriber<T> implements DownstreamSubscriber<Connection> {
final AbstractNonBlockingSubscription<T> downstream;
final AtomicReference<Connection> connection;
@ -404,6 +442,11 @@ final class R2DBC {
this.subscription = new AtomicReference<>();
}
@Override
public final Subscriber<?> downstream() {
return downstream.subscriber;
}
@Override
public final void onSubscribe(Subscription s) {
@ -607,7 +650,6 @@ final class R2DBC {
static abstract class AbstractNonBlockingSubscription<T> extends AbstractSubscription<T> {
final Configuration configuration;
final AtomicBoolean subscribed;
final Publisher<? extends Connection> connection;
final AtomicInteger nextForwarderIndex;
@ -617,9 +659,8 @@ final class R2DBC {
Configuration configuration,
Subscriber<? super T> subscriber
) {
super(subscriber);
super(configuration, subscriber);
this.configuration = configuration;
this.subscribed = new AtomicBoolean();
this.connection = configuration.connectionFactory().create();
this.nextForwarderIndex = new AtomicInteger();
@ -642,7 +683,9 @@ final class R2DBC {
request1();
},
delegate::onError,
delegate::onComplete
delegate::onComplete,
configuration.subscriberProvider(),
delegate
));
}
else
@ -700,7 +743,14 @@ final class R2DBC {
return c;
}
else {
c.close().subscribe(subscriber(s -> s.request(Long.MAX_VALUE), t -> {}, t -> {}, onComplete));
c.close().subscribe(subscriber(
s -> s.request(Long.MAX_VALUE),
t -> {},
t -> {},
onComplete,
configuration.subscriberProvider(),
subscriber
));
return null;
}
});
@ -794,7 +844,7 @@ final class R2DBC {
try {
transactional.run(c instanceof NonClosingConnection
? configuration
: configuration.derive(new DefaultConnectionFactory(c))).subscribe(subscriber(
: configuration.derive(new DefaultConnectionFactory(configuration, c))).subscribe(subscriber(
s1 -> s1.request(Long.MAX_VALUE),
subscriber::onNext,
e -> rollback(subscriber, c, e),
@ -802,8 +852,12 @@ final class R2DBC {
s2 -> s2.request(1),
v -> {},
t -> cancel0(true, () -> subscriber.onError(t)),
() -> cancel0(true, () -> subscriber.onComplete())
))
() -> cancel0(true, () -> subscriber.onComplete()),
configuration.subscriberProvider(),
subscriber
)),
configuration.subscriberProvider(),
subscriber
));
}
@ -812,7 +866,9 @@ final class R2DBC {
catch (Exception e) {
rollback(subscriber, c, e);
}
}
},
configuration.subscriberProvider(),
subscriber
));
}
@ -821,7 +877,9 @@ final class R2DBC {
s2 -> s2.request(1),
v -> {},
t -> cancel0(true, () -> s.onError(t)),
() -> cancel0(true, () -> s.onError(e))
() -> cancel0(true, () -> s.onError(e)),
configuration.subscriberProvider(),
s
));
}
};
@ -869,9 +927,20 @@ final class R2DBC {
@SuppressWarnings("unchecked")
static final <T> T block(Publisher<? extends T> publisher) throws Throwable {
return block(publisher, CONFIG.get(), null);
}
static final <T> T block(Publisher<? extends T> publisher, Configuration configuration, Subscriber<?> subscriber) throws Throwable {
Object complete = new Object();
LinkedBlockingQueue<Object> queue = new LinkedBlockingQueue<>();
publisher.subscribe(subscriber(s -> s.request(1), queue::add, queue::add, () -> queue.add(complete)));
publisher.subscribe(subscriber(
s -> s.request(1),
queue::add,
queue::add,
() -> queue.add(complete),
configuration.subscriberProvider(),
subscriber
));
try {
Object result = queue.take();
@ -1612,7 +1681,7 @@ final class R2DBC {
private volatile Cursor<R> c;
BlockingRecordSubscription(ResultQueryTrait<R> query, Subscriber<? super R> subscriber) {
super(subscriber);
super(query.configuration(), subscriber);
this.query = query;
}
@ -1652,7 +1721,7 @@ final class R2DBC {
final AbstractRowCountQuery query;
BlockingRowCountSubscription(AbstractRowCountQuery query, Subscriber<? super Integer> subscriber) {
super(subscriber);
super(query.configuration(), subscriber);
this.query = query;
}
@ -1682,7 +1751,7 @@ final class R2DBC {
Subscriber<? super T> subscriber,
TransactionalPublishable<T> transactional
) {
super(subscriber);
super(ctx.configuration(), subscriber);
this.ctx = ctx;
this.transactional = transactional;
@ -1691,7 +1760,7 @@ final class R2DBC {
@Override
final void request0() {
try {
subscriber.onNext(ctx.transactionResult(c -> block(transactional.run(c))));
subscriber.onNext(ctx.transactionResult(c -> block(transactional.run(c), configuration, subscriber)));
subscriber.onComplete();
}
catch (Throwable t) {
@ -1719,4 +1788,23 @@ final class R2DBC {
return "Error while rendering SQL: " + t.getMessage();
}
}
static final <T, C> Subscriber<T> subscriber(
Consumer<Subscription> subscription,
Consumer<T> onNext,
Consumer<Throwable> onError,
Runnable onComplete,
SubscriberProvider<C> provider,
Subscriber<?> previous
) {
return provider.subscriber(
subscription,
onNext,
onError,
onComplete,
previous != null
? provider.context(downstream(previous))
: provider.context()
);
}
}

View File

@ -39,9 +39,7 @@ package org.jooq.impl;
import static java.util.Collections.emptyList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.jooq.Binding;
import org.jooq.Catalog;
@ -60,9 +58,6 @@ import org.jooq.UDTField;
import org.jooq.UDTRecord;
import org.jooq.impl.QOM.UNotYetImplemented;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.ApiStatus.Internal;
/**
* A common base type for UDT's
* <p>
@ -70,7 +65,7 @@ import org.jetbrains.annotations.ApiStatus.Internal;
*
* @author Lukas Eder
*/
@Internal
@org.jooq.Internal
public /* non-final */ class UDTImpl<R extends UDTRecord<R>>
extends
AbstractNamed

View File

@ -44,44 +44,33 @@ import static org.jooq.impl.SchemaImpl.DEFAULT_SCHEMA;
import static org.jooq.impl.Tools.BooleanDataKey.DATA_STORE_ASSIGNMENT;
import static org.jooq.tools.StringUtils.defaultIfNull;
import java.util.stream.Stream;
import org.jooq.Binding;
import org.jooq.Catalog;
import org.jooq.Clause;
import org.jooq.Comment;
import org.jooq.Context;
import org.jooq.DataType;
import org.jooq.Field;
import org.jooq.Name;
import org.jooq.Package;
import org.jooq.QueryPart;
import org.jooq.Record;
import org.jooq.RecordQualifier;
// ...
import org.jooq.Row;
import org.jooq.Schema;
import org.jooq.Table;
import org.jooq.TableField;
// ...
import org.jooq.UDT;
import org.jooq.UDTPathField;
import org.jooq.UDTRecord;
import org.jooq.impl.QOM.UEmpty;
import org.jooq.impl.QOM.UNotYetImplemented;
import org.jooq.impl.Tools.BooleanDataKey;
import org.jooq.tools.StringUtils;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.ApiStatus.Internal;
/**
* A common base type for UDT path fields.
*
* @author Lukas Eder
*/
@Internal
@org.jooq.Internal
public /* non-final */ class UDTPathFieldImpl<R extends Record, U extends UDTRecord<U>, T>
extends
AbstractField<T>

View File

@ -50,14 +50,12 @@ import org.jooq.UDTPathField;
import org.jooq.UDTPathTableField;
import org.jooq.UDTRecord;
import org.jetbrains.annotations.ApiStatus.Internal;
/**
* A common base type for table fields that are also {@link UDTPathField}.
*
* @author Lukas Eder
*/
@Internal
@org.jooq.Internal
public /* non-final */ class UDTPathTableFieldImpl<R extends Record, U extends UDTRecord<U>, T>
extends
UDTPathFieldImpl<R, U, T>

View File

@ -69,6 +69,7 @@ import org.jooq.RecordMapperProvider;
import org.jooq.RecordUnmapper;
import org.jooq.RecordUnmapperProvider;
import org.jooq.SQLDialect;
import org.jooq.SubscriberProvider;
import org.jooq.TransactionListenerProvider;
import org.jooq.TransactionProvider;
// ...
@ -80,6 +81,8 @@ import org.jooq.impl.AbstractConfiguration;
import org.jooq.impl.AnnotatedPojoMemberProvider;
import org.jooq.impl.DefaultDSLContext;
import org.jetbrains.annotations.NotNull;
import io.r2dbc.spi.ConnectionFactory;
/**
@ -266,6 +269,11 @@ public class MockConfiguration extends AbstractConfiguration {
return delegate.formattingProvider();
}
@Override
public SubscriberProvider<?> subscriberProvider() {
return delegate.subscriberProvider();
}
@Override
public org.jooq.SchemaMapping schemaMapping() {
return delegate.schemaMapping();
@ -482,6 +490,12 @@ public class MockConfiguration extends AbstractConfiguration {
return this;
}
@Override
public Configuration set(SubscriberProvider<?> newSubscriberProvider) {
delegate.set(newSubscriberProvider);
return this;
}
@Override
public Configuration set(SQLDialect newDialect) {
delegate.set(newDialect);
@ -667,6 +681,11 @@ public class MockConfiguration extends AbstractConfiguration {
return new MockConfiguration(delegate.derive(newFormattingProvider), provider);
}
@Override
public Configuration derive(SubscriberProvider<?> newSubscriberProvider) {
return new MockConfiguration(delegate.derive(newSubscriberProvider), provider);
}
@Override
public Configuration derive(SQLDialect newDialect) {
return new MockConfiguration(delegate.derive(newDialect), provider);