[jOOQ/jOOQ#13502] Add nested transaction support

This commit is contained in:
Lukas Eder 2022-04-28 14:11:43 +02:00
parent 403a57bf98
commit 63a27fe1ba
4 changed files with 111 additions and 17 deletions

View File

@ -0,0 +1,61 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Other licenses:
* -----------------------------------------------------------------------------
* Commercial licenses for this work are available. These replace the above
* ASL 2.0 and offer limited warranties, support, maintenance, and commercial
* database integrations.
*
* For more information, please visit: http://www.jooq.org/licenses
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*/
package org.jooq;
import org.jetbrains.annotations.NotNull;
import org.reactivestreams.Publisher;
/**
* An <code>FunctionalInterface</code> that wraps transactional code for
* reactive usage.
*
* @author Lukas Eder
*/
@FunctionalInterface
public interface TransactionalPublishable<T> {
/**
* Run the transactional code.
*
* @param configuration The <code>Configuration</code> in whose context the
* transaction is run.
* @return The outcome of the transaction.
*/
@NotNull
Publisher<T> run(Configuration configuration);
}

View File

@ -547,7 +547,7 @@ public class DSL {
public static CloseableDSLContext using(String url) {
if (url.startsWith("r2dbc")) {
io.r2dbc.spi.Connection connection = R2DBC.getConnection(url);
return new DefaultCloseableDSLContext(new DefaultConnectionFactory(connection, true), JDBCUtils.dialect(connection));
return new DefaultCloseableDSLContext(new DefaultConnectionFactory(connection, true, false), JDBCUtils.dialect(connection));
}
else {
try {
@ -588,7 +588,7 @@ public class DSL {
public static CloseableDSLContext using(String url, String username, String password) {
if (url.startsWith("r2dbc")) {
io.r2dbc.spi.Connection connection = R2DBC.getConnection(url, username, password);
return new DefaultCloseableDSLContext(new DefaultConnectionFactory(connection, true), JDBCUtils.dialect(connection));
return new DefaultCloseableDSLContext(new DefaultConnectionFactory(connection, true, false), JDBCUtils.dialect(connection));
}
else {
try {
@ -628,7 +628,7 @@ public class DSL {
public static CloseableDSLContext using(String url, Properties properties) {
if (url.startsWith("r2dbc")) {
io.r2dbc.spi.Connection connection = R2DBC.getConnection(url, properties);
return new DefaultCloseableDSLContext(new DefaultConnectionFactory(connection, true), JDBCUtils.dialect(connection));
return new DefaultCloseableDSLContext(new DefaultConnectionFactory(connection, true, false), JDBCUtils.dialect(connection));
}
else {
try {

View File

@ -40,6 +40,10 @@ package org.jooq.impl;
import static org.jooq.impl.R2DBC.AbstractSubscription.onRequest;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntFunction;
import java.util.function.IntSupplier;
import java.util.function.Supplier;
import org.jooq.exception.DetachedException;
@ -61,16 +65,19 @@ import io.r2dbc.spi.ValidationDepth;
*/
final class DefaultConnectionFactory implements ConnectionFactory {
Connection connection;
final boolean finalize;
Connection connection;
final boolean finalize;
final boolean nested;
final AtomicInteger savepoints = new AtomicInteger();
DefaultConnectionFactory(Connection connection) {
this(connection, false);
this(connection, false, true);
}
DefaultConnectionFactory(Connection connection, boolean finalize) {
DefaultConnectionFactory(Connection connection, boolean finalize, boolean nested) {
this.connection = connection;
this.finalize = finalize;
this.nested = nested;
}
final Connection connectionOrThrow() {
@ -93,20 +100,39 @@ final class DefaultConnectionFactory implements ConnectionFactory {
return () -> connectionOrThrow().getMetadata().getDatabaseProductName();
}
private final class NonClosingConnection implements Connection {
final class NonClosingConnection implements Connection {
// ---------------------------------------------------------------------
// 0.9.0.M1 API
// ---------------------------------------------------------------------
private <T> T nest(IntSupplier level, Supplier<T> toplevelAction, IntFunction<T> nestedAction) {
if (nested)
return nestedAction.apply(level.getAsInt());
else
return toplevelAction.get();
}
private String savepoint(int i) {
return "S" + i;
}
@Override
public Publisher<Void> beginTransaction() {
return connectionOrThrow().beginTransaction();
return nest(
savepoints::getAndIncrement,
() -> connectionOrThrow().beginTransaction(),
i -> connectionOrThrow().createSavepoint(savepoint(i))
);
}
@Override
public Publisher<Void> beginTransaction(TransactionDefinition definition) {
return connectionOrThrow().beginTransaction(definition);
return nest(
savepoints::getAndIncrement,
() -> connectionOrThrow().beginTransaction(definition),
i -> connectionOrThrow().createSavepoint(savepoint(i))
);
}
@Override
@ -116,7 +142,11 @@ final class DefaultConnectionFactory implements ConnectionFactory {
@Override
public Publisher<Void> commitTransaction() {
return connectionOrThrow().commitTransaction();
return nest(
savepoints::decrementAndGet,
() -> connectionOrThrow().commitTransaction(),
i -> connectionOrThrow().releaseSavepoint(savepoint(i))
);
}
@Override
@ -156,7 +186,10 @@ final class DefaultConnectionFactory implements ConnectionFactory {
@Override
public Publisher<Void> rollbackTransaction() {
return connectionOrThrow().rollbackTransaction();
return nest(
savepoints::decrementAndGet,
() -> connectionOrThrow().rollbackTransaction(),
i -> connectionOrThrow().rollbackTransactionToSavepoint(savepoint(i)));
}
@Override

View File

@ -96,6 +96,7 @@ import org.jooq.conf.Settings;
import org.jooq.conf.SettingsTools;
import org.jooq.exception.DataAccessException;
import org.jooq.exception.DataTypeException;
import org.jooq.impl.DefaultConnectionFactory.NonClosingConnection;
import org.jooq.impl.DefaultRenderContext.Rendered;
import org.jooq.impl.ThreadGuard.Guard;
import org.jooq.tools.JooqLogger;
@ -695,11 +696,10 @@ final class R2DBC {
v -> {},
subscriber::onError,
// [#13502] TODO: Savepoint support
() -> transactional.run(configuration.derive(new DefaultConnectionFactory(c))).subscribe(subscriber(
// [#13502] TODO: Continue requesting items
// [#13502] TODO: Cancel subscription when appropriate
// [#13502] Implement Savepoint logic for nested transactions
() -> transactional.run(c instanceof NonClosingConnection
? configuration
: configuration.derive(new DefaultConnectionFactory(c))).subscribe(subscriber(
s1 -> s1.request(Long.MAX_VALUE),
subscriber::onNext,
e -> c.rollbackTransaction().subscribe(subscriber(