diff --git a/jOOQ/src/main/java/org/jooq/Configuration.java b/jOOQ/src/main/java/org/jooq/Configuration.java
index 55c592df76..f15adec575 100644
--- a/jOOQ/src/main/java/org/jooq/Configuration.java
+++ b/jOOQ/src/main/java/org/jooq/Configuration.java
@@ -561,6 +561,12 @@ public interface Configuration extends Serializable {
@NotNull
FormattingProvider formattingProvider();
+ /**
+ * Get the configured SubscriberProvider from this configuration.
+ */
+ @NotNull
+ SubscriberProvider> subscriberProvider();
+
@@ -1273,6 +1279,20 @@ public interface Configuration extends Serializable {
@NotNull
Configuration set(FormattingProvider newFormattingProvider);
+ /**
+ * Change this configuration to hold new subscriber provider.
+ *
+ * This method is not thread-safe and should not be used in globally
+ * available Configuration objects.
+ *
+ * @param newSubscriberProvider The new subscriber provider to be contained in
+ * the changed configuration.
+ * @return The changed configuration.
+ */
+ @Pro
+ @NotNull
+ Configuration set(SubscriberProvider> newSubscriberProvider);
+
@@ -1954,6 +1974,18 @@ public interface Configuration extends Serializable {
@NotNull
Configuration derive(FormattingProvider newFormattingProvider);
+ /**
+ * Create a derived configuration from this one, with a new subscriber
+ * provider.
+ *
+ * @param newSubscriberProvider The new subscriber provider to be contained in
+ * the derived configuration.
+ * @return The derived configuration.
+ */
+ @Pro
+ @NotNull
+ Configuration derive(SubscriberProvider> newSubscriberProvider);
+
diff --git a/jOOQ/src/main/java/org/jooq/SubscriberProvider.java b/jOOQ/src/main/java/org/jooq/SubscriberProvider.java
new file mode 100644
index 0000000000..944a26bbaf
--- /dev/null
+++ b/jOOQ/src/main/java/org/jooq/SubscriberProvider.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * Other licenses:
+ * -----------------------------------------------------------------------------
+ * Commercial licenses for this work are available. These replace the above
+ * Apache-2.0 license and offer limited warranties, support, maintenance, and
+ * commercial database integrations.
+ *
+ * For more information, please visit: https://www.jooq.org/legal/licensing
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ */
+package org.jooq;
+
+import java.util.function.Consumer;
+
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+
+/**
+ * An SPI to allow for overriding the default implementation of
+ * {@link Subscriber} creation in jOOQ.
+ *
+ * This SPI can be used to override jOOQ's internal creation of
+ * {@link Subscriber} instances, typically used to create reactor style
+ * context-aware {@link Subscriber} instances. Whatever the
+ * {@link #context(Subscriber)} is, it will be propagated to additional
+ * {@link #subscriber(Consumer, Consumer, Consumer, Runnable, Object)} calls.
+ *
+ * @author Lukas Eder
+ */
+public interface SubscriberProvider {
+
+ /**
+ * Create an empty context.
+ */
+ @Nullable
+ C context();
+
+ /**
+ * Extract a context from an existing {@link Subscriber}.
+ */
+ @Nullable
+ C context(Subscriber> subscriber);
+
+ /**
+ * Create a new subscriber from its component implementations and a previous
+ * context.
+ *
+ * @param onSubscribe The implementation of
+ * {@link Subscriber#onSubscribe(Subscription)}
+ * @param onNext The implementation of {@link Subscriber#onNext(Object)}
+ * @param onError The implementation of
+ * {@link Subscriber#onError(Throwable)}
+ * @param onComplete The implementation of {@link Subscriber#onComplete()}
+ * @param context The {@link #context()}
+ */
+ @NotNull
+ Subscriber subscriber(
+ Consumer super Subscription> onSubscribe,
+ Consumer super T> onNext,
+ Consumer super Throwable> onError,
+ Runnable onComplete,
+ C context
+ );
+}
diff --git a/jOOQ/src/main/java/org/jooq/impl/DSL.java b/jOOQ/src/main/java/org/jooq/impl/DSL.java
index 4419866685..0f19c4ac14 100644
--- a/jOOQ/src/main/java/org/jooq/impl/DSL.java
+++ b/jOOQ/src/main/java/org/jooq/impl/DSL.java
@@ -577,7 +577,7 @@ public class DSL {
public static CloseableDSLContext using(String url) {
if (url.startsWith("r2dbc")) {
io.r2dbc.spi.Connection connection = R2DBC.getConnection(url);
- return new DefaultCloseableDSLContext(new DefaultConnectionFactory(connection, true, false), JDBCUtils.dialect(connection));
+ return new DefaultCloseableDSLContext(new DefaultConnectionFactory(null, connection, true, false), JDBCUtils.dialect(connection));
}
else {
try {
@@ -619,7 +619,7 @@ public class DSL {
public static CloseableDSLContext using(String url, String username, String password) {
if (url.startsWith("r2dbc")) {
io.r2dbc.spi.Connection connection = R2DBC.getConnection(url, username, password);
- return new DefaultCloseableDSLContext(new DefaultConnectionFactory(connection, true, false), JDBCUtils.dialect(connection));
+ return new DefaultCloseableDSLContext(new DefaultConnectionFactory(null, connection, true, false), JDBCUtils.dialect(connection));
}
else {
try {
@@ -678,7 +678,7 @@ public class DSL {
public static CloseableDSLContext using(String url, Properties properties) {
if (url.startsWith("r2dbc")) {
io.r2dbc.spi.Connection connection = R2DBC.getConnection(url, properties);
- return new DefaultCloseableDSLContext(new DefaultConnectionFactory(connection, true, false), JDBCUtils.dialect(connection));
+ return new DefaultCloseableDSLContext(new DefaultConnectionFactory(null, connection, true, false), JDBCUtils.dialect(connection));
}
else {
try {
@@ -893,7 +893,7 @@ public class DSL {
*/
@NotNull
public static DSLContext using(io.r2dbc.spi.Connection connection) {
- return new DefaultDSLContext(new DefaultConnectionFactory(connection), JDBCUtils.dialect(connection));
+ return new DefaultDSLContext(new DefaultConnectionFactory(null, connection), JDBCUtils.dialect(connection));
}
/**
@@ -905,7 +905,7 @@ public class DSL {
*/
@NotNull
public static DSLContext using(io.r2dbc.spi.Connection connection, SQLDialect dialect) {
- return new DefaultDSLContext(new DefaultConnectionFactory(connection), dialect);
+ return new DefaultDSLContext(new DefaultConnectionFactory(null, connection), dialect);
}
/**
@@ -919,7 +919,7 @@ public class DSL {
*/
@NotNull
public static DSLContext using(io.r2dbc.spi.Connection connection, SQLDialect dialect, Settings settings) {
- return new DefaultDSLContext(new DefaultConnectionFactory(connection), dialect, settings);
+ return new DefaultDSLContext(new DefaultConnectionFactory(null, connection), dialect, settings);
}
/**
diff --git a/jOOQ/src/main/java/org/jooq/impl/DefaultConfiguration.java b/jOOQ/src/main/java/org/jooq/impl/DefaultConfiguration.java
index 6655d37b58..c575920eae 100644
--- a/jOOQ/src/main/java/org/jooq/impl/DefaultConfiguration.java
+++ b/jOOQ/src/main/java/org/jooq/impl/DefaultConfiguration.java
@@ -84,6 +84,7 @@ import org.jooq.RecordType;
import org.jooq.RecordUnmapper;
import org.jooq.RecordUnmapperProvider;
import org.jooq.SQLDialect;
+import org.jooq.SubscriberProvider;
import org.jooq.TransactionListener;
import org.jooq.TransactionListenerProvider;
import org.jooq.TransactionProvider;
@@ -141,6 +142,7 @@ public class DefaultConfiguration extends AbstractConfiguration {
private transient CharsetProvider charsetProvider;
private transient ConverterProvider converterProvider;
private transient FormattingProvider formattingProvider;
+ private transient SubscriberProvider> subscriberProvider;
@@ -210,6 +212,7 @@ public class DefaultConfiguration extends AbstractConfiguration {
null,
null,
null,
+ null,
@@ -256,6 +259,7 @@ public class DefaultConfiguration extends AbstractConfiguration {
configuration.charsetProvider,
configuration.converterProvider,
configuration.formattingProvider,
+ configuration.subscriberProvider,
@@ -301,6 +305,7 @@ public class DefaultConfiguration extends AbstractConfiguration {
CharsetProvider charsetProvider,
ConverterProvider converterProvider,
FormattingProvider formattingProvider,
+ SubscriberProvider> subscriberProvider,
@@ -335,6 +340,7 @@ public class DefaultConfiguration extends AbstractConfiguration {
set(charsetProvider);
set(converterProvider);
set(formattingProvider);
+ set(subscriberProvider);
@@ -404,6 +410,7 @@ public class DefaultConfiguration extends AbstractConfiguration {
charsetProvider,
converterProvider,
formattingProvider,
+ subscriberProvider,
@@ -443,6 +450,7 @@ public class DefaultConfiguration extends AbstractConfiguration {
charsetProvider,
converterProvider,
formattingProvider,
+ subscriberProvider,
@@ -482,6 +490,7 @@ public class DefaultConfiguration extends AbstractConfiguration {
charsetProvider,
converterProvider,
formattingProvider,
+ subscriberProvider,
@@ -521,6 +530,7 @@ public class DefaultConfiguration extends AbstractConfiguration {
charsetProvider,
converterProvider,
formattingProvider,
+ subscriberProvider,
@@ -565,6 +575,7 @@ public class DefaultConfiguration extends AbstractConfiguration {
charsetProvider,
converterProvider,
formattingProvider,
+ subscriberProvider,
@@ -604,6 +615,7 @@ public class DefaultConfiguration extends AbstractConfiguration {
charsetProvider,
converterProvider,
formattingProvider,
+ subscriberProvider,
@@ -643,6 +655,7 @@ public class DefaultConfiguration extends AbstractConfiguration {
charsetProvider,
converterProvider,
formattingProvider,
+ subscriberProvider,
@@ -682,6 +695,7 @@ public class DefaultConfiguration extends AbstractConfiguration {
charsetProvider,
converterProvider,
formattingProvider,
+ subscriberProvider,
@@ -721,6 +735,7 @@ public class DefaultConfiguration extends AbstractConfiguration {
charsetProvider,
converterProvider,
formattingProvider,
+ subscriberProvider,
@@ -765,6 +780,7 @@ public class DefaultConfiguration extends AbstractConfiguration {
charsetProvider,
converterProvider,
formattingProvider,
+ subscriberProvider,
@@ -809,6 +825,7 @@ public class DefaultConfiguration extends AbstractConfiguration {
charsetProvider,
converterProvider,
formattingProvider,
+ subscriberProvider,
@@ -848,6 +865,7 @@ public class DefaultConfiguration extends AbstractConfiguration {
charsetProvider,
converterProvider,
formattingProvider,
+ subscriberProvider,
@@ -887,6 +905,7 @@ public class DefaultConfiguration extends AbstractConfiguration {
charsetProvider,
converterProvider,
formattingProvider,
+ subscriberProvider,
@@ -926,6 +945,7 @@ public class DefaultConfiguration extends AbstractConfiguration {
charsetProvider,
converterProvider,
formattingProvider,
+ subscriberProvider,
@@ -965,6 +985,7 @@ public class DefaultConfiguration extends AbstractConfiguration {
charsetProvider,
converterProvider,
formattingProvider,
+ subscriberProvider,
@@ -1004,6 +1025,7 @@ public class DefaultConfiguration extends AbstractConfiguration {
charsetProvider,
converterProvider,
formattingProvider,
+ subscriberProvider,
@@ -1043,6 +1065,7 @@ public class DefaultConfiguration extends AbstractConfiguration {
charsetProvider,
converterProvider,
formattingProvider,
+ subscriberProvider,
@@ -1087,6 +1110,7 @@ public class DefaultConfiguration extends AbstractConfiguration {
charsetProvider,
converterProvider,
formattingProvider,
+ subscriberProvider,
@@ -1126,6 +1150,7 @@ public class DefaultConfiguration extends AbstractConfiguration {
newCharsetProvider,
converterProvider,
formattingProvider,
+ subscriberProvider,
@@ -1165,6 +1190,7 @@ public class DefaultConfiguration extends AbstractConfiguration {
charsetProvider,
newConverterProvider,
formattingProvider,
+ subscriberProvider,
@@ -1204,6 +1230,7 @@ public class DefaultConfiguration extends AbstractConfiguration {
charsetProvider,
converterProvider,
newFormattingProvider,
+ subscriberProvider,
@@ -1217,6 +1244,50 @@ public class DefaultConfiguration extends AbstractConfiguration {
);
}
+ @Override
+ public final Configuration derive(SubscriberProvider> newSubscriberProvider) {
+ return new DefaultConfiguration(
+ connectionProvider,
+ interpreterConnectionProvider,
+ systemConnectionProvider,
+ connectionFactory,
+ metaProvider,
+ commitProvider,
+ executorProvider,
+ cacheProvider,
+ transactionProvider,
+ annotatedPojoMemberProvider,
+ constructorPropertiesProvider,
+ recordMapperProvider,
+ recordUnmapperProvider,
+ recordListenerProviders,
+ executeListenerProviders,
+ migrationListenerProviders,
+ visitListenerProviders,
+ transactionListenerProviders,
+ diagnosticsListenerProviders,
+ unwrapperProvider,
+ charsetProvider,
+ converterProvider,
+ formattingProvider,
+ newSubscriberProvider,
+
+
+
+
+
+
+ clock,
+ dialect,
+ settings,
+ data
+ );
+ }
+
+
+
+
+
@@ -1399,6 +1470,7 @@ public class DefaultConfiguration extends AbstractConfiguration {
charsetProvider,
converterProvider,
formattingProvider,
+ subscriberProvider,
@@ -1438,6 +1510,7 @@ public class DefaultConfiguration extends AbstractConfiguration {
charsetProvider,
converterProvider,
formattingProvider,
+ subscriberProvider,
@@ -1477,6 +1550,7 @@ public class DefaultConfiguration extends AbstractConfiguration {
charsetProvider,
converterProvider,
formattingProvider,
+ subscriberProvider,
@@ -1529,6 +1603,10 @@ public class DefaultConfiguration extends AbstractConfiguration {
@Override
public final Configuration set(ConnectionFactory newConnectionFactory) {
this.connectionFactory = newConnectionFactory;
+
+ if (connectionFactory instanceof DefaultConnectionFactory dcf)
+ dcf.configuration = this;
+
return this;
}
@@ -1696,6 +1774,12 @@ public class DefaultConfiguration extends AbstractConfiguration {
return this;
}
+ @Override
+ public final Configuration set(SubscriberProvider> newSubscriberProvider) {
+ this.subscriberProvider = newSubscriberProvider;
+ return this;
+ }
+
@@ -2213,6 +2297,13 @@ public class DefaultConfiguration extends AbstractConfiguration {
: new DefaultFormattingProvider();
}
+ @Override
+ public final SubscriberProvider> subscriberProvider() {
+ return subscriberProvider != null
+ ? subscriberProvider
+ : new DefaultSubscriberProvider<>();
+ }
+
@@ -2333,6 +2424,7 @@ public class DefaultConfiguration extends AbstractConfiguration {
oos.writeObject(serializableOrNull(charsetProvider));
oos.writeObject(serializableOrNull(converterProvider));
oos.writeObject(serializableOrNull(formattingProvider));
+ oos.writeObject(serializableOrNull(subscriberProvider));
// [#7062] Exclude reflection cache from serialisation
for (Entry