From 6e35df87a04423986af9450df065f6c1b63bdca9 Mon Sep 17 00:00:00 2001 From: lukaseder Date: Fri, 1 Jul 2016 18:37:36 +0200 Subject: [PATCH] [#5377] Add alternative TransactionProvider that implements ThreadLocal semantics --- jOOQ/src/main/java/org/jooq/DSLContext.java | 38 +++++- .../ThreadLocalTransactionalCallable.java | 69 ++++++++++ .../ThreadLocalTransactionalRunnable.java | 68 +++++++++ .../exception/ConfigurationException.java | 66 +++++++++ .../org/jooq/impl/DefaultConfiguration.java | 34 +++-- .../java/org/jooq/impl/DefaultDSLContext.java | 43 +++++- .../jooq/impl/DefaultTransactionProvider.java | 8 +- .../impl/ThreadLocalConnectionProvider.java | 83 +++++++++++ .../impl/ThreadLocalTransactionProvider.java | 129 ++++++++++++++++++ jOOQ/src/main/java/org/jooq/impl/Tools.java | 11 +- 10 files changed, 532 insertions(+), 17 deletions(-) create mode 100644 jOOQ/src/main/java/org/jooq/ThreadLocalTransactionalCallable.java create mode 100644 jOOQ/src/main/java/org/jooq/ThreadLocalTransactionalRunnable.java create mode 100644 jOOQ/src/main/java/org/jooq/exception/ConfigurationException.java create mode 100644 jOOQ/src/main/java/org/jooq/impl/ThreadLocalConnectionProvider.java create mode 100644 jOOQ/src/main/java/org/jooq/impl/ThreadLocalTransactionProvider.java diff --git a/jOOQ/src/main/java/org/jooq/DSLContext.java b/jOOQ/src/main/java/org/jooq/DSLContext.java index cdedb69c94..c8ec2bb883 100644 --- a/jOOQ/src/main/java/org/jooq/DSLContext.java +++ b/jOOQ/src/main/java/org/jooq/DSLContext.java @@ -84,11 +84,13 @@ import javax.annotation.Generated; import org.jooq.conf.ParamType; import org.jooq.conf.Settings; import org.jooq.conf.StatementType; +import org.jooq.exception.ConfigurationException; import org.jooq.exception.DataAccessException; import org.jooq.exception.InvalidResultException; import org.jooq.exception.MappingException; import org.jooq.exception.TooManyRowsException; import org.jooq.impl.DSL; +import org.jooq.impl.ThreadLocalTransactionProvider; import org.jooq.tools.jdbc.MockCallable; import org.jooq.tools.jdbc.MockDataProvider; import org.jooq.tools.jdbc.MockRunnable; @@ -214,6 +216,18 @@ public interface DSLContext extends Scope , AutoCloseable { */ T transactionResult(TransactionalCallable transactional); + /** + * Run a {@link ThreadLocalTransactionalRunnable} in the context of this + * DSLContext's underlying {@link #configuration()}'s + * {@link ThreadLocalTransactionProvider}. + * + * @param transactional The transactional code + * @throws ConfigurationException if the underlying + * {@link Configuration#transactionProvider()} is not a + * {@link ThreadLocalTransactionProvider}. + */ + T transactionResult(ThreadLocalTransactionalCallable transactional) throws ConfigurationException; + /** * Run a {@link TransactionalRunnable} in the context of this * DSLContext's underlying {@link #configuration()}'s @@ -223,6 +237,18 @@ public interface DSLContext extends Scope , AutoCloseable { */ void transaction(TransactionalRunnable transactional); + /** + * Run a {@link ThreadLocalTransactionalRunnable} in the context of this + * DSLContext's underlying {@link #configuration()}'s + * {@link ThreadLocalTransactionProvider}. + * + * @param transactional The transactional code + * @throws ConfigurationException if the underlying + * {@link Configuration#transactionProvider()} is not a + * {@link ThreadLocalTransactionProvider}. + */ + void transaction(ThreadLocalTransactionalRunnable transactional) throws ConfigurationException; + /** @@ -238,8 +264,9 @@ public interface DSLContext extends Scope , AutoCloseable { * * @param transactional The transactional code * @return The transactional outcome + * @throws ConfigurationException If this is run with a {@link ThreadLocalTransactionProvider}. */ - CompletionStage transactionResultAsync(TransactionalCallable transactional); + CompletionStage transactionResultAsync(TransactionalCallable transactional) throws ConfigurationException; /** * Run a {@link TransactionalRunnable} asynchronously. @@ -253,8 +280,9 @@ public interface DSLContext extends Scope , AutoCloseable { * {@link Configuration#executorProvider()}. * * @param transactional The transactional code + * @throws ConfigurationException If this is run with a {@link ThreadLocalTransactionProvider}. */ - CompletionStage transactionAsync(TransactionalRunnable transactional); + CompletionStage transactionAsync(TransactionalRunnable transactional) throws ConfigurationException; /** * Run a {@link TransactionalCallable} asynchronously. @@ -268,8 +296,9 @@ public interface DSLContext extends Scope , AutoCloseable { * * @param transactional The transactional code * @return The transactional outcome + * @throws ConfigurationException If this is run with a {@link ThreadLocalTransactionProvider}. */ - CompletionStage transactionResultAsync(Executor executor, TransactionalCallable transactional); + CompletionStage transactionResultAsync(Executor executor, TransactionalCallable transactional) throws ConfigurationException; /** * Run a {@link TransactionalRunnable} asynchronously. @@ -282,8 +311,9 @@ public interface DSLContext extends Scope , AutoCloseable { * {@link Executor}. * * @param transactional The transactional code + * @throws ConfigurationException If this is run with a {@link ThreadLocalTransactionProvider}. */ - CompletionStage transactionAsync(Executor executor, TransactionalRunnable transactional); + CompletionStage transactionAsync(Executor executor, TransactionalRunnable transactional) throws ConfigurationException; diff --git a/jOOQ/src/main/java/org/jooq/ThreadLocalTransactionalCallable.java b/jOOQ/src/main/java/org/jooq/ThreadLocalTransactionalCallable.java new file mode 100644 index 0000000000..0bb8dad60c --- /dev/null +++ b/jOOQ/src/main/java/org/jooq/ThreadLocalTransactionalCallable.java @@ -0,0 +1,69 @@ +/** + * Copyright (c) 2009-2016, Data Geekery GmbH (http://www.datageekery.com) + * All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Other licenses: + * ----------------------------------------------------------------------------- + * Commercial licenses for this work are available. These replace the above + * ASL 2.0 and offer limited warranties, support, maintenance, and commercial + * database integrations. + * + * For more information, please visit: http://www.jooq.org/licenses + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + */ +package org.jooq; + +/** + * An FunctionalInterface that wraps transactional code. + * + * @author Lukas Eder + */ + +@FunctionalInterface + +public interface ThreadLocalTransactionalCallable { + + /** + * Run the transactional code. + *

+ * If this method completes normally, and this is not a nested transaction, + * then the transaction will be committed. If this method completes with an + * exception, then the transaction is rolled back to the beginning of this + * ThreadLocalTransactionalCallable. + * + * @return The outcome of the transaction. + * @throws Exception Any exception that will cause a rollback of the code + * contained in this transaction. If this is a nested + * transaction, the rollback may be performed only to the state + * before executing this + * ThreadLocalTransactionalCallable. + */ + T run() throws Exception; +} diff --git a/jOOQ/src/main/java/org/jooq/ThreadLocalTransactionalRunnable.java b/jOOQ/src/main/java/org/jooq/ThreadLocalTransactionalRunnable.java new file mode 100644 index 0000000000..ea9c51b4d2 --- /dev/null +++ b/jOOQ/src/main/java/org/jooq/ThreadLocalTransactionalRunnable.java @@ -0,0 +1,68 @@ +/** + * Copyright (c) 2009-2016, Data Geekery GmbH (http://www.datageekery.com) + * All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Other licenses: + * ----------------------------------------------------------------------------- + * Commercial licenses for this work are available. These replace the above + * ASL 2.0 and offer limited warranties, support, maintenance, and commercial + * database integrations. + * + * For more information, please visit: http://www.jooq.org/licenses + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + */ +package org.jooq; + +/** + * An FunctionalInterface that wraps transactional code. + * + * @author Lukas Eder + */ + +@FunctionalInterface + +public interface ThreadLocalTransactionalRunnable { + + /** + * Run the transactional code. + *

+ * If this method completes normally, and this is not a nested transaction, + * then the transaction will be committed. If this method completes with an + * exception, then the transaction is rolled back to the beginning of this + * ThreadLocalTransactionalRunnable. + * + * @throws Exception Any exception that will cause a rollback of the code + * contained in this transaction. If this is a nested + * transaction, the rollback may be performed only to the state + * before executing this + * ThreadLocalTransactionalRunnable. + */ + void run() throws Exception; +} diff --git a/jOOQ/src/main/java/org/jooq/exception/ConfigurationException.java b/jOOQ/src/main/java/org/jooq/exception/ConfigurationException.java new file mode 100644 index 0000000000..e94c0af43f --- /dev/null +++ b/jOOQ/src/main/java/org/jooq/exception/ConfigurationException.java @@ -0,0 +1,66 @@ +/** + * Copyright (c) 2009-2016, Data Geekery GmbH (http://www.datageekery.com) + * All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Other licenses: + * ----------------------------------------------------------------------------- + * Commercial licenses for this work are available. These replace the above + * ASL 2.0 and offer limited warranties, support, maintenance, and commercial + * database integrations. + * + * For more information, please visit: http://www.jooq.org/licenses + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + */ +package org.jooq.exception; + +import org.jooq.Configuration; + +/** + * The {@link Configuration} was set up in a way that does not allow for a + * particular operation. + * + * @author Lukas Eder + */ +public class ConfigurationException extends DataAccessException { + + /** + * Generated UID + */ + private static final long serialVersionUID = -6460945824599280420L; + + /** + * Constructor for ConfigurationException. + * + * @param message the detail message + */ + public ConfigurationException(String message) { + super(message); + } +} diff --git a/jOOQ/src/main/java/org/jooq/impl/DefaultConfiguration.java b/jOOQ/src/main/java/org/jooq/impl/DefaultConfiguration.java index ac9358fb0d..b362955f61 100644 --- a/jOOQ/src/main/java/org/jooq/impl/DefaultConfiguration.java +++ b/jOOQ/src/main/java/org/jooq/impl/DefaultConfiguration.java @@ -77,6 +77,7 @@ import org.jooq.VisitListener; import org.jooq.VisitListenerProvider; import org.jooq.conf.Settings; import org.jooq.conf.SettingsTools; +import org.jooq.exception.ConfigurationException; /** * A default implementation for configurations within a {@link DSLContext}, if no @@ -713,9 +714,16 @@ public class DefaultConfiguration implements Configuration { @Override public final Configuration set(ConnectionProvider newConnectionProvider) { - this.connectionProvider = newConnectionProvider != null - ? newConnectionProvider - : new NoConnectionProvider(); + if (newConnectionProvider != null) { + if (transactionProvider instanceof ThreadLocalTransactionProvider && + !(newConnectionProvider instanceof ThreadLocalConnectionProvider)) + throw new ConfigurationException("Cannot override ConnectionProvider when Configuration contains a ThreadLocalTransactionProvider"); + + this.connectionProvider = newConnectionProvider; + } + else { + this.connectionProvider = new NoConnectionProvider(); + } return this; } @@ -733,9 +741,15 @@ public class DefaultConfiguration implements Configuration { @Override public final Configuration set(TransactionProvider newTransactionProvider) { - this.transactionProvider = newTransactionProvider != null - ? newTransactionProvider - : new NoTransactionProvider(); + if (newTransactionProvider != null) { + this.transactionProvider = newTransactionProvider; + + if (newTransactionProvider instanceof ThreadLocalTransactionProvider) + this.connectionProvider = ((ThreadLocalTransactionProvider) newTransactionProvider).connection; + } + else { + this.transactionProvider = new NoTransactionProvider(); + } return this; } @@ -927,9 +941,13 @@ public class DefaultConfiguration implements Configuration { @Override public final ConnectionProvider connectionProvider() { - // [#3229] If we're currently in a transaction, return that transaction's + // [#3229] [#5377] If we're currently in a transaction, return that transaction's // local DefaultConnectionProvider, not the one from this configuration - ConnectionProvider transactional = (ConnectionProvider) data(DATA_DEFAULT_TRANSACTION_PROVIDER_CONNECTION); + TransactionProvider tp = transactionProvider(); + ConnectionProvider transactional = tp instanceof ThreadLocalTransactionProvider + ? ((ThreadLocalTransactionProvider) tp).connection + : (ConnectionProvider) data(DATA_DEFAULT_TRANSACTION_PROVIDER_CONNECTION); + return transactional == null ? connectionProvider : transactional; } diff --git a/jOOQ/src/main/java/org/jooq/impl/DefaultDSLContext.java b/jOOQ/src/main/java/org/jooq/impl/DefaultDSLContext.java index d51c1fad5a..c7dd02b855 100644 --- a/jOOQ/src/main/java/org/jooq/impl/DefaultDSLContext.java +++ b/jOOQ/src/main/java/org/jooq/impl/DefaultDSLContext.java @@ -214,6 +214,8 @@ import org.jooq.Table; import org.jooq.TableField; import org.jooq.TableLike; import org.jooq.TableRecord; +import org.jooq.ThreadLocalTransactionalCallable; +import org.jooq.ThreadLocalTransactionalRunnable; import org.jooq.TransactionProvider; import org.jooq.TransactionalCallable; import org.jooq.TransactionalRunnable; @@ -248,6 +250,7 @@ import org.jooq.WithAsStep8; import org.jooq.WithAsStep9; import org.jooq.WithStep; import org.jooq.conf.Settings; +import org.jooq.exception.ConfigurationException; import org.jooq.exception.DataAccessException; import org.jooq.exception.InvalidResultException; import org.jooq.exception.SQLDialectNotSupportedException; @@ -372,8 +375,27 @@ public class DefaultDSLContext extends AbstractScope implements DSLContext, Seri // XXX APIs for creating scope for transactions, mocking, batching, etc. // ------------------------------------------------------------------------- + @Override + public T transactionResult(final ThreadLocalTransactionalCallable transactional) { + TransactionProvider tp = configuration().transactionProvider(); + + if (!(tp instanceof ThreadLocalTransactionProvider)) + throw new ConfigurationException("Cannot use ThreadLocalTransactionalCallable with TransactionProvider of type " + tp.getClass()); + + return transactionResult0(new TransactionalCallable() { + @Override + public T run(Configuration c) throws Exception { + return transactional.run(); + } + }, ((ThreadLocalTransactionProvider) tp).configuration(configuration()), true); + } + @Override public T transactionResult(TransactionalCallable transactional) { + return transactionResult0(transactional, configuration(), false); + } + + private static T transactionResult0(TransactionalCallable transactional, Configuration configuration, boolean threadLocal) { // If used in a Java 8 Stream, a transaction should always be executed // in a ManagedBlocker context, just in case Stream.parallel() is called @@ -387,7 +409,7 @@ public class DefaultDSLContext extends AbstractScope implements DSLContext, Seri T result = null; - DefaultTransactionContext ctx = new DefaultTransactionContext(configuration().derive()); + DefaultTransactionContext ctx = new DefaultTransactionContext(configuration.derive()); TransactionProvider provider = ctx.configuration().transactionProvider(); TransactionListeners listeners = new TransactionListeners(ctx.configuration()); @@ -437,10 +459,21 @@ public class DefaultDSLContext extends AbstractScope implements DSLContext, Seri return result; - }).get(); + }, threadLocal).get(); } + @Override + public void transaction(final ThreadLocalTransactionalRunnable transactional) { + transactionResult(new ThreadLocalTransactionalCallable() { + @Override + public Void run() throws Exception { + transactional.run(); + return null; + } + }); + } + @Override public void transaction(final TransactionalRunnable transactional) { transactionResult(new TransactionalCallable() { @@ -461,6 +494,9 @@ public class DefaultDSLContext extends AbstractScope implements DSLContext, Seri @Override public CompletionStage transactionAsync(Executor executor, TransactionalRunnable transactional) { + if (configuration().transactionProvider() instanceof ThreadLocalTransactionProvider) + throw new ConfigurationException("Cannot use TransactionalCallable with ThreadLocalTransactionProvider"); + return ExecutorProviderCompletionStage.of(CompletableFuture.supplyAsync( () -> { transaction(transactional); return null; }, executor), () -> executor @@ -474,6 +510,9 @@ public class DefaultDSLContext extends AbstractScope implements DSLContext, Seri @Override public CompletionStage transactionResultAsync(Executor executor, TransactionalCallable transactional) { + if (configuration().transactionProvider() instanceof ThreadLocalTransactionProvider) + throw new ConfigurationException("Cannot use TransactionalCallable with ThreadLocalTransactionProvider"); + return ExecutorProviderCompletionStage.of(CompletableFuture.supplyAsync( () -> transactionResult(transactional), executor), () -> executor diff --git a/jOOQ/src/main/java/org/jooq/impl/DefaultTransactionProvider.java b/jOOQ/src/main/java/org/jooq/impl/DefaultTransactionProvider.java index bcb2ad8b1d..305ba43935 100644 --- a/jOOQ/src/main/java/org/jooq/impl/DefaultTransactionProvider.java +++ b/jOOQ/src/main/java/org/jooq/impl/DefaultTransactionProvider.java @@ -105,6 +105,10 @@ public class DefaultTransactionProvider implements TransactionProvider { return nested; } + final int nestingLevel(Configuration configuration) { + return savepoints(configuration).size(); + } + @SuppressWarnings("unchecked") private final Deque savepoints(Configuration configuration) { Deque savepoints = (Deque) configuration.data(DATA_DEFAULT_TRANSACTION_PROVIDER_SAVEPOINTS); @@ -155,7 +159,7 @@ public class DefaultTransactionProvider implements TransactionProvider { savepoints.push(savepoint); } - private Savepoint setSavepoint(Configuration configuration) { + private final Savepoint setSavepoint(Configuration configuration) { if (!nested()) return IGNORED_SAVEPOINT; @@ -231,7 +235,7 @@ public class DefaultTransactionProvider implements TransactionProvider { * Ensure an autoCommit value on the connection, if it was set * to true, originally. */ - private void brace(Configuration configuration, boolean start) { + private final void brace(Configuration configuration, boolean start) { DefaultConnectionProvider connection = connection(configuration); try { diff --git a/jOOQ/src/main/java/org/jooq/impl/ThreadLocalConnectionProvider.java b/jOOQ/src/main/java/org/jooq/impl/ThreadLocalConnectionProvider.java new file mode 100644 index 0000000000..8b589f6fd7 --- /dev/null +++ b/jOOQ/src/main/java/org/jooq/impl/ThreadLocalConnectionProvider.java @@ -0,0 +1,83 @@ +/** + * Copyright (c) 2009-2016, Data Geekery GmbH (http://www.datageekery.com) + * All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Other licenses: + * ----------------------------------------------------------------------------- + * Commercial licenses for this work are available. These replace the above + * ASL 2.0 and offer limited warranties, support, maintenance, and commercial + * database integrations. + * + * For more information, please visit: http://www.jooq.org/licenses + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + */ +package org.jooq.impl; + +import java.sql.Connection; + +import org.jooq.ConnectionProvider; + +/** + * A connection provider that interoperates with the + * {@link ThreadLocalTransactionProvider}. + *

+ * This implementation is used implicitly and automatically by + * {@link DefaultConfiguration}. Users should pass their own + * {@link ConnectionProvider} to the {@link ThreadLocalTransactionProvider}. + * + * @author Lukas Eder + */ +final class ThreadLocalConnectionProvider implements ConnectionProvider { + + final ConnectionProvider delegate; + final ThreadLocal tl; + + public ThreadLocalConnectionProvider(ConnectionProvider delegate) { + this.delegate = delegate; + this.tl = new ThreadLocal(); + } + + @Override + public final Connection acquire() { + Connection result = tl.get(); + return result != null ? result : delegate.acquire(); + } + + @Override + public final void release(Connection connection) { + Connection previous = tl.get(); + + if (previous == null) + delegate.release(connection); + else if (previous != connection) + throw new IllegalStateException( + "A different connection was released than the thread-bound one that was expected"); + } +} diff --git a/jOOQ/src/main/java/org/jooq/impl/ThreadLocalTransactionProvider.java b/jOOQ/src/main/java/org/jooq/impl/ThreadLocalTransactionProvider.java new file mode 100644 index 0000000000..036807fa14 --- /dev/null +++ b/jOOQ/src/main/java/org/jooq/impl/ThreadLocalTransactionProvider.java @@ -0,0 +1,129 @@ +/** + * Copyright (c) 2009-2016, Data Geekery GmbH (http://www.datageekery.com) + * All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Other licenses: + * ----------------------------------------------------------------------------- + * Commercial licenses for this work are available. These replace the above + * ASL 2.0 and offer limited warranties, support, maintenance, and commercial + * database integrations. + * + * For more information, please visit: http://www.jooq.org/licenses + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + */ +package org.jooq.impl; + +import static org.jooq.impl.Tools.DataKey.DATA_DEFAULT_TRANSACTION_PROVIDER_CONNECTION; + +import java.sql.Connection; +import java.sql.Savepoint; +import java.util.ArrayDeque; +import java.util.Deque; + +import org.jooq.Configuration; +import org.jooq.ConnectionProvider; +import org.jooq.TransactionContext; +import org.jooq.TransactionProvider; + +/** + * A {@link TransactionProvider} that implements thread-bound transaction + * semantics. + *

+ * Use this TransactionProvider if your transactions are + * thread-bound, meaning that a transaction and its underlying + * {@link Connection} will never leave the thread that started the transaction. + *

+ * When this TransactionProvider is used, users must pass their + * custom {@link ConnectionProvider} implementations to this + * TransactionProvider, instead of passing it to the + * {@link Configuration}. + * + * @author Lukas Eder + */ +public class ThreadLocalTransactionProvider implements TransactionProvider { + + final DefaultTransactionProvider transaction; + final ThreadLocalConnectionProvider connection; + final ThreadLocal> configuration; + + public ThreadLocalTransactionProvider(ConnectionProvider provider) { + this(provider, true); + } + + /** + * @param nested Whether nested transactions via {@link Savepoint}s are + * supported. + */ + public ThreadLocalTransactionProvider(ConnectionProvider provider, boolean nested) { + this.connection = new ThreadLocalConnectionProvider(provider); + this.transaction = new DefaultTransactionProvider(connection, nested); + this.configuration = new ThreadLocal>(); + } + + @Override + public void begin(TransactionContext ctx) { + transaction.begin(ctx); + configurations().push(ctx.configuration()); + if (transaction.nestingLevel(ctx.configuration()) == 1) + connection.tl.set(((DefaultConnectionProvider) ctx.configuration().data(DATA_DEFAULT_TRANSACTION_PROVIDER_CONNECTION)).connection); + } + + @Override + public void commit(TransactionContext ctx) { + if (transaction.nestingLevel(ctx.configuration()) == 1) + connection.tl.remove(); + configurations().pop(); + transaction.commit(ctx); + } + + @Override + public void rollback(TransactionContext ctx) { + if (transaction.nestingLevel(ctx.configuration()) == 1) + connection.tl.remove(); + configurations().pop(); + transaction.rollback(ctx); + } + + Configuration configuration(Configuration fallback) { + Deque configurations = configurations(); + return configurations.isEmpty() ? fallback : configurations.peek(); + } + + private Deque configurations() { + Deque result = configuration.get(); + + if (result == null) { + result = new ArrayDeque(); + configuration.set(result); + } + + return result; + } +} diff --git a/jOOQ/src/main/java/org/jooq/impl/Tools.java b/jOOQ/src/main/java/org/jooq/impl/Tools.java index 3ae3723d72..caccdbcfdf 100644 --- a/jOOQ/src/main/java/org/jooq/impl/Tools.java +++ b/jOOQ/src/main/java/org/jooq/impl/Tools.java @@ -3242,7 +3242,16 @@ final class Tools { static Supplier blocking(Supplier supplier) { - return new Supplier() { + return blocking(supplier, false); + } + + static Supplier blocking(Supplier supplier, boolean threadLocal) { + + // [#5377] In ThreadLocal contexts (e.g. when using ThreadLocalTransactionprovider), + // no ManagedBlocker is needed as we're guaranteed by API contract to always + // remain on the same thread. + + return threadLocal ? supplier : new Supplier() { volatile T asyncResult; @Override