diff --git a/jOOQ/src/main/java/org/jooq/TransactionalPublishable.java b/jOOQ/src/main/java/org/jooq/TransactionalPublishable.java new file mode 100644 index 0000000000..25e480a8a3 --- /dev/null +++ b/jOOQ/src/main/java/org/jooq/TransactionalPublishable.java @@ -0,0 +1,61 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Other licenses: + * ----------------------------------------------------------------------------- + * Commercial licenses for this work are available. These replace the above + * ASL 2.0 and offer limited warranties, support, maintenance, and commercial + * database integrations. + * + * For more information, please visit: http://www.jooq.org/licenses + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + */ +package org.jooq; + +import org.jetbrains.annotations.NotNull; +import org.reactivestreams.Publisher; + +/** + * An FunctionalInterface that wraps transactional code for + * reactive usage. + * + * @author Lukas Eder + */ +@FunctionalInterface +public interface TransactionalPublishable { + + /** + * Run the transactional code. + * + * @param configuration The Configuration in whose context the + * transaction is run. + * @return The outcome of the transaction. + */ + @NotNull + Publisher run(Configuration configuration); +} diff --git a/jOOQ/src/main/java/org/jooq/impl/DSL.java b/jOOQ/src/main/java/org/jooq/impl/DSL.java index 33044cf154..9d03934d5e 100644 --- a/jOOQ/src/main/java/org/jooq/impl/DSL.java +++ b/jOOQ/src/main/java/org/jooq/impl/DSL.java @@ -547,7 +547,7 @@ public class DSL { public static CloseableDSLContext using(String url) { if (url.startsWith("r2dbc")) { io.r2dbc.spi.Connection connection = R2DBC.getConnection(url); - return new DefaultCloseableDSLContext(new DefaultConnectionFactory(connection, true), JDBCUtils.dialect(connection)); + return new DefaultCloseableDSLContext(new DefaultConnectionFactory(connection, true, false), JDBCUtils.dialect(connection)); } else { try { @@ -588,7 +588,7 @@ public class DSL { public static CloseableDSLContext using(String url, String username, String password) { if (url.startsWith("r2dbc")) { io.r2dbc.spi.Connection connection = R2DBC.getConnection(url, username, password); - return new DefaultCloseableDSLContext(new DefaultConnectionFactory(connection, true), JDBCUtils.dialect(connection)); + return new DefaultCloseableDSLContext(new DefaultConnectionFactory(connection, true, false), JDBCUtils.dialect(connection)); } else { try { @@ -628,7 +628,7 @@ public class DSL { public static CloseableDSLContext using(String url, Properties properties) { if (url.startsWith("r2dbc")) { io.r2dbc.spi.Connection connection = R2DBC.getConnection(url, properties); - return new DefaultCloseableDSLContext(new DefaultConnectionFactory(connection, true), JDBCUtils.dialect(connection)); + return new DefaultCloseableDSLContext(new DefaultConnectionFactory(connection, true, false), JDBCUtils.dialect(connection)); } else { try { diff --git a/jOOQ/src/main/java/org/jooq/impl/DefaultConnectionFactory.java b/jOOQ/src/main/java/org/jooq/impl/DefaultConnectionFactory.java index 0811e480e3..472bb36028 100644 --- a/jOOQ/src/main/java/org/jooq/impl/DefaultConnectionFactory.java +++ b/jOOQ/src/main/java/org/jooq/impl/DefaultConnectionFactory.java @@ -40,6 +40,10 @@ package org.jooq.impl; import static org.jooq.impl.R2DBC.AbstractSubscription.onRequest; import java.time.Duration; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.IntFunction; +import java.util.function.IntSupplier; +import java.util.function.Supplier; import org.jooq.exception.DetachedException; @@ -61,16 +65,19 @@ import io.r2dbc.spi.ValidationDepth; */ final class DefaultConnectionFactory implements ConnectionFactory { - Connection connection; - final boolean finalize; + Connection connection; + final boolean finalize; + final boolean nested; + final AtomicInteger savepoints = new AtomicInteger(); DefaultConnectionFactory(Connection connection) { - this(connection, false); + this(connection, false, true); } - DefaultConnectionFactory(Connection connection, boolean finalize) { + DefaultConnectionFactory(Connection connection, boolean finalize, boolean nested) { this.connection = connection; this.finalize = finalize; + this.nested = nested; } final Connection connectionOrThrow() { @@ -93,20 +100,39 @@ final class DefaultConnectionFactory implements ConnectionFactory { return () -> connectionOrThrow().getMetadata().getDatabaseProductName(); } - private final class NonClosingConnection implements Connection { + final class NonClosingConnection implements Connection { // --------------------------------------------------------------------- // 0.9.0.M1 API // --------------------------------------------------------------------- + private T nest(IntSupplier level, Supplier toplevelAction, IntFunction nestedAction) { + if (nested) + return nestedAction.apply(level.getAsInt()); + else + return toplevelAction.get(); + } + + private String savepoint(int i) { + return "S" + i; + } + @Override public Publisher beginTransaction() { - return connectionOrThrow().beginTransaction(); + return nest( + savepoints::getAndIncrement, + () -> connectionOrThrow().beginTransaction(), + i -> connectionOrThrow().createSavepoint(savepoint(i)) + ); } @Override public Publisher beginTransaction(TransactionDefinition definition) { - return connectionOrThrow().beginTransaction(definition); + return nest( + savepoints::getAndIncrement, + () -> connectionOrThrow().beginTransaction(definition), + i -> connectionOrThrow().createSavepoint(savepoint(i)) + ); } @Override @@ -116,7 +142,11 @@ final class DefaultConnectionFactory implements ConnectionFactory { @Override public Publisher commitTransaction() { - return connectionOrThrow().commitTransaction(); + return nest( + savepoints::decrementAndGet, + () -> connectionOrThrow().commitTransaction(), + i -> connectionOrThrow().releaseSavepoint(savepoint(i)) + ); } @Override @@ -156,7 +186,10 @@ final class DefaultConnectionFactory implements ConnectionFactory { @Override public Publisher rollbackTransaction() { - return connectionOrThrow().rollbackTransaction(); + return nest( + savepoints::decrementAndGet, + () -> connectionOrThrow().rollbackTransaction(), + i -> connectionOrThrow().rollbackTransactionToSavepoint(savepoint(i))); } @Override diff --git a/jOOQ/src/main/java/org/jooq/impl/R2DBC.java b/jOOQ/src/main/java/org/jooq/impl/R2DBC.java index 4a97608402..052d9e47a9 100644 --- a/jOOQ/src/main/java/org/jooq/impl/R2DBC.java +++ b/jOOQ/src/main/java/org/jooq/impl/R2DBC.java @@ -96,6 +96,7 @@ import org.jooq.conf.Settings; import org.jooq.conf.SettingsTools; import org.jooq.exception.DataAccessException; import org.jooq.exception.DataTypeException; +import org.jooq.impl.DefaultConnectionFactory.NonClosingConnection; import org.jooq.impl.DefaultRenderContext.Rendered; import org.jooq.impl.ThreadGuard.Guard; import org.jooq.tools.JooqLogger; @@ -695,11 +696,10 @@ final class R2DBC { v -> {}, subscriber::onError, - // [#13502] TODO: Savepoint support - () -> transactional.run(configuration.derive(new DefaultConnectionFactory(c))).subscribe(subscriber( - - // [#13502] TODO: Continue requesting items - // [#13502] TODO: Cancel subscription when appropriate + // [#13502] Implement Savepoint logic for nested transactions + () -> transactional.run(c instanceof NonClosingConnection + ? configuration + : configuration.derive(new DefaultConnectionFactory(c))).subscribe(subscriber( s1 -> s1.request(Long.MAX_VALUE), subscriber::onNext, e -> c.rollbackTransaction().subscribe(subscriber(