[#5377] Add alternative TransactionProvider that implements ThreadLocal semantics

This commit is contained in:
lukaseder 2016-07-01 18:37:36 +02:00
parent 22c0f9f411
commit 6e35df87a0
10 changed files with 532 additions and 17 deletions

View File

@ -84,11 +84,13 @@ import javax.annotation.Generated;
import org.jooq.conf.ParamType;
import org.jooq.conf.Settings;
import org.jooq.conf.StatementType;
import org.jooq.exception.ConfigurationException;
import org.jooq.exception.DataAccessException;
import org.jooq.exception.InvalidResultException;
import org.jooq.exception.MappingException;
import org.jooq.exception.TooManyRowsException;
import org.jooq.impl.DSL;
import org.jooq.impl.ThreadLocalTransactionProvider;
import org.jooq.tools.jdbc.MockCallable;
import org.jooq.tools.jdbc.MockDataProvider;
import org.jooq.tools.jdbc.MockRunnable;
@ -214,6 +216,18 @@ public interface DSLContext extends Scope , AutoCloseable {
*/
<T> T transactionResult(TransactionalCallable<T> transactional);
/**
* Run a {@link ThreadLocalTransactionalRunnable} in the context of this
* <code>DSLContext</code>'s underlying {@link #configuration()}'s
* {@link ThreadLocalTransactionProvider}.
*
* @param transactional The transactional code
* @throws ConfigurationException if the underlying
* {@link Configuration#transactionProvider()} is not a
* {@link ThreadLocalTransactionProvider}.
*/
<T> T transactionResult(ThreadLocalTransactionalCallable<T> transactional) throws ConfigurationException;
/**
* Run a {@link TransactionalRunnable} in the context of this
* <code>DSLContext</code>'s underlying {@link #configuration()}'s
@ -223,6 +237,18 @@ public interface DSLContext extends Scope , AutoCloseable {
*/
void transaction(TransactionalRunnable transactional);
/**
* Run a {@link ThreadLocalTransactionalRunnable} in the context of this
* <code>DSLContext</code>'s underlying {@link #configuration()}'s
* {@link ThreadLocalTransactionProvider}.
*
* @param transactional The transactional code
* @throws ConfigurationException if the underlying
* {@link Configuration#transactionProvider()} is not a
* {@link ThreadLocalTransactionProvider}.
*/
void transaction(ThreadLocalTransactionalRunnable transactional) throws ConfigurationException;
/**
@ -238,8 +264,9 @@ public interface DSLContext extends Scope , AutoCloseable {
*
* @param transactional The transactional code
* @return The transactional outcome
* @throws ConfigurationException If this is run with a {@link ThreadLocalTransactionProvider}.
*/
<T> CompletionStage<T> transactionResultAsync(TransactionalCallable<T> transactional);
<T> CompletionStage<T> transactionResultAsync(TransactionalCallable<T> transactional) throws ConfigurationException;
/**
* Run a {@link TransactionalRunnable} asynchronously.
@ -253,8 +280,9 @@ public interface DSLContext extends Scope , AutoCloseable {
* {@link Configuration#executorProvider()}.
*
* @param transactional The transactional code
* @throws ConfigurationException If this is run with a {@link ThreadLocalTransactionProvider}.
*/
CompletionStage<Void> transactionAsync(TransactionalRunnable transactional);
CompletionStage<Void> transactionAsync(TransactionalRunnable transactional) throws ConfigurationException;
/**
* Run a {@link TransactionalCallable} asynchronously.
@ -268,8 +296,9 @@ public interface DSLContext extends Scope , AutoCloseable {
*
* @param transactional The transactional code
* @return The transactional outcome
* @throws ConfigurationException If this is run with a {@link ThreadLocalTransactionProvider}.
*/
<T> CompletionStage<T> transactionResultAsync(Executor executor, TransactionalCallable<T> transactional);
<T> CompletionStage<T> transactionResultAsync(Executor executor, TransactionalCallable<T> transactional) throws ConfigurationException;
/**
* Run a {@link TransactionalRunnable} asynchronously.
@ -282,8 +311,9 @@ public interface DSLContext extends Scope , AutoCloseable {
* {@link Executor}.
*
* @param transactional The transactional code
* @throws ConfigurationException If this is run with a {@link ThreadLocalTransactionProvider}.
*/
CompletionStage<Void> transactionAsync(Executor executor, TransactionalRunnable transactional);
CompletionStage<Void> transactionAsync(Executor executor, TransactionalRunnable transactional) throws ConfigurationException;

View File

@ -0,0 +1,69 @@
/**
* Copyright (c) 2009-2016, Data Geekery GmbH (http://www.datageekery.com)
* All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Other licenses:
* -----------------------------------------------------------------------------
* Commercial licenses for this work are available. These replace the above
* ASL 2.0 and offer limited warranties, support, maintenance, and commercial
* database integrations.
*
* For more information, please visit: http://www.jooq.org/licenses
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*/
package org.jooq;
/**
* An <code>FunctionalInterface</code> that wraps transactional code.
*
* @author Lukas Eder
*/
@FunctionalInterface
public interface ThreadLocalTransactionalCallable<T> {
/**
* Run the transactional code.
* <p>
* If this method completes normally, and this is not a nested transaction,
* then the transaction will be committed. If this method completes with an
* exception, then the transaction is rolled back to the beginning of this
* <code>ThreadLocalTransactionalCallable</code>.
*
* @return The outcome of the transaction.
* @throws Exception Any exception that will cause a rollback of the code
* contained in this transaction. If this is a nested
* transaction, the rollback may be performed only to the state
* before executing this
* <code>ThreadLocalTransactionalCallable</code>.
*/
T run() throws Exception;
}

View File

@ -0,0 +1,68 @@
/**
* Copyright (c) 2009-2016, Data Geekery GmbH (http://www.datageekery.com)
* All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Other licenses:
* -----------------------------------------------------------------------------
* Commercial licenses for this work are available. These replace the above
* ASL 2.0 and offer limited warranties, support, maintenance, and commercial
* database integrations.
*
* For more information, please visit: http://www.jooq.org/licenses
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*/
package org.jooq;
/**
* An <code>FunctionalInterface</code> that wraps transactional code.
*
* @author Lukas Eder
*/
@FunctionalInterface
public interface ThreadLocalTransactionalRunnable {
/**
* Run the transactional code.
* <p>
* If this method completes normally, and this is not a nested transaction,
* then the transaction will be committed. If this method completes with an
* exception, then the transaction is rolled back to the beginning of this
* <code>ThreadLocalTransactionalRunnable</code>.
*
* @throws Exception Any exception that will cause a rollback of the code
* contained in this transaction. If this is a nested
* transaction, the rollback may be performed only to the state
* before executing this
* <code>ThreadLocalTransactionalRunnable</code>.
*/
void run() throws Exception;
}

View File

@ -0,0 +1,66 @@
/**
* Copyright (c) 2009-2016, Data Geekery GmbH (http://www.datageekery.com)
* All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Other licenses:
* -----------------------------------------------------------------------------
* Commercial licenses for this work are available. These replace the above
* ASL 2.0 and offer limited warranties, support, maintenance, and commercial
* database integrations.
*
* For more information, please visit: http://www.jooq.org/licenses
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*/
package org.jooq.exception;
import org.jooq.Configuration;
/**
* The {@link Configuration} was set up in a way that does not allow for a
* particular operation.
*
* @author Lukas Eder
*/
public class ConfigurationException extends DataAccessException {
/**
* Generated UID
*/
private static final long serialVersionUID = -6460945824599280420L;
/**
* Constructor for ConfigurationException.
*
* @param message the detail message
*/
public ConfigurationException(String message) {
super(message);
}
}

View File

@ -77,6 +77,7 @@ import org.jooq.VisitListener;
import org.jooq.VisitListenerProvider;
import org.jooq.conf.Settings;
import org.jooq.conf.SettingsTools;
import org.jooq.exception.ConfigurationException;
/**
* A default implementation for configurations within a {@link DSLContext}, if no
@ -713,9 +714,16 @@ public class DefaultConfiguration implements Configuration {
@Override
public final Configuration set(ConnectionProvider newConnectionProvider) {
this.connectionProvider = newConnectionProvider != null
? newConnectionProvider
: new NoConnectionProvider();
if (newConnectionProvider != null) {
if (transactionProvider instanceof ThreadLocalTransactionProvider &&
!(newConnectionProvider instanceof ThreadLocalConnectionProvider))
throw new ConfigurationException("Cannot override ConnectionProvider when Configuration contains a ThreadLocalTransactionProvider");
this.connectionProvider = newConnectionProvider;
}
else {
this.connectionProvider = new NoConnectionProvider();
}
return this;
}
@ -733,9 +741,15 @@ public class DefaultConfiguration implements Configuration {
@Override
public final Configuration set(TransactionProvider newTransactionProvider) {
this.transactionProvider = newTransactionProvider != null
? newTransactionProvider
: new NoTransactionProvider();
if (newTransactionProvider != null) {
this.transactionProvider = newTransactionProvider;
if (newTransactionProvider instanceof ThreadLocalTransactionProvider)
this.connectionProvider = ((ThreadLocalTransactionProvider) newTransactionProvider).connection;
}
else {
this.transactionProvider = new NoTransactionProvider();
}
return this;
}
@ -927,9 +941,13 @@ public class DefaultConfiguration implements Configuration {
@Override
public final ConnectionProvider connectionProvider() {
// [#3229] If we're currently in a transaction, return that transaction's
// [#3229] [#5377] If we're currently in a transaction, return that transaction's
// local DefaultConnectionProvider, not the one from this configuration
ConnectionProvider transactional = (ConnectionProvider) data(DATA_DEFAULT_TRANSACTION_PROVIDER_CONNECTION);
TransactionProvider tp = transactionProvider();
ConnectionProvider transactional = tp instanceof ThreadLocalTransactionProvider
? ((ThreadLocalTransactionProvider) tp).connection
: (ConnectionProvider) data(DATA_DEFAULT_TRANSACTION_PROVIDER_CONNECTION);
return transactional == null ? connectionProvider : transactional;
}

View File

@ -214,6 +214,8 @@ import org.jooq.Table;
import org.jooq.TableField;
import org.jooq.TableLike;
import org.jooq.TableRecord;
import org.jooq.ThreadLocalTransactionalCallable;
import org.jooq.ThreadLocalTransactionalRunnable;
import org.jooq.TransactionProvider;
import org.jooq.TransactionalCallable;
import org.jooq.TransactionalRunnable;
@ -248,6 +250,7 @@ import org.jooq.WithAsStep8;
import org.jooq.WithAsStep9;
import org.jooq.WithStep;
import org.jooq.conf.Settings;
import org.jooq.exception.ConfigurationException;
import org.jooq.exception.DataAccessException;
import org.jooq.exception.InvalidResultException;
import org.jooq.exception.SQLDialectNotSupportedException;
@ -372,8 +375,27 @@ public class DefaultDSLContext extends AbstractScope implements DSLContext, Seri
// XXX APIs for creating scope for transactions, mocking, batching, etc.
// -------------------------------------------------------------------------
@Override
public <T> T transactionResult(final ThreadLocalTransactionalCallable<T> transactional) {
TransactionProvider tp = configuration().transactionProvider();
if (!(tp instanceof ThreadLocalTransactionProvider))
throw new ConfigurationException("Cannot use ThreadLocalTransactionalCallable with TransactionProvider of type " + tp.getClass());
return transactionResult0(new TransactionalCallable<T>() {
@Override
public T run(Configuration c) throws Exception {
return transactional.run();
}
}, ((ThreadLocalTransactionProvider) tp).configuration(configuration()), true);
}
@Override
public <T> T transactionResult(TransactionalCallable<T> transactional) {
return transactionResult0(transactional, configuration(), false);
}
private static <T> T transactionResult0(TransactionalCallable<T> transactional, Configuration configuration, boolean threadLocal) {
// If used in a Java 8 Stream, a transaction should always be executed
// in a ManagedBlocker context, just in case Stream.parallel() is called
@ -387,7 +409,7 @@ public class DefaultDSLContext extends AbstractScope implements DSLContext, Seri
T result = null;
DefaultTransactionContext ctx = new DefaultTransactionContext(configuration().derive());
DefaultTransactionContext ctx = new DefaultTransactionContext(configuration.derive());
TransactionProvider provider = ctx.configuration().transactionProvider();
TransactionListeners listeners = new TransactionListeners(ctx.configuration());
@ -437,10 +459,21 @@ public class DefaultDSLContext extends AbstractScope implements DSLContext, Seri
return result;
}).get();
}, threadLocal).get();
}
@Override
public void transaction(final ThreadLocalTransactionalRunnable transactional) {
transactionResult(new ThreadLocalTransactionalCallable<Void>() {
@Override
public Void run() throws Exception {
transactional.run();
return null;
}
});
}
@Override
public void transaction(final TransactionalRunnable transactional) {
transactionResult(new TransactionalCallable<Void>() {
@ -461,6 +494,9 @@ public class DefaultDSLContext extends AbstractScope implements DSLContext, Seri
@Override
public CompletionStage<Void> transactionAsync(Executor executor, TransactionalRunnable transactional) {
if (configuration().transactionProvider() instanceof ThreadLocalTransactionProvider)
throw new ConfigurationException("Cannot use TransactionalCallable with ThreadLocalTransactionProvider");
return ExecutorProviderCompletionStage.of(CompletableFuture.supplyAsync(
() -> { transaction(transactional); return null; }, executor),
() -> executor
@ -474,6 +510,9 @@ public class DefaultDSLContext extends AbstractScope implements DSLContext, Seri
@Override
public <T> CompletionStage<T> transactionResultAsync(Executor executor, TransactionalCallable<T> transactional) {
if (configuration().transactionProvider() instanceof ThreadLocalTransactionProvider)
throw new ConfigurationException("Cannot use TransactionalCallable with ThreadLocalTransactionProvider");
return ExecutorProviderCompletionStage.of(CompletableFuture.supplyAsync(
() -> transactionResult(transactional), executor),
() -> executor

View File

@ -105,6 +105,10 @@ public class DefaultTransactionProvider implements TransactionProvider {
return nested;
}
final int nestingLevel(Configuration configuration) {
return savepoints(configuration).size();
}
@SuppressWarnings("unchecked")
private final Deque<Savepoint> savepoints(Configuration configuration) {
Deque<Savepoint> savepoints = (Deque<Savepoint>) configuration.data(DATA_DEFAULT_TRANSACTION_PROVIDER_SAVEPOINTS);
@ -155,7 +159,7 @@ public class DefaultTransactionProvider implements TransactionProvider {
savepoints.push(savepoint);
}
private Savepoint setSavepoint(Configuration configuration) {
private final Savepoint setSavepoint(Configuration configuration) {
if (!nested())
return IGNORED_SAVEPOINT;
@ -231,7 +235,7 @@ public class DefaultTransactionProvider implements TransactionProvider {
* Ensure an <code>autoCommit</code> value on the connection, if it was set
* to <code>true</code>, originally.
*/
private void brace(Configuration configuration, boolean start) {
private final void brace(Configuration configuration, boolean start) {
DefaultConnectionProvider connection = connection(configuration);
try {

View File

@ -0,0 +1,83 @@
/**
* Copyright (c) 2009-2016, Data Geekery GmbH (http://www.datageekery.com)
* All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Other licenses:
* -----------------------------------------------------------------------------
* Commercial licenses for this work are available. These replace the above
* ASL 2.0 and offer limited warranties, support, maintenance, and commercial
* database integrations.
*
* For more information, please visit: http://www.jooq.org/licenses
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*/
package org.jooq.impl;
import java.sql.Connection;
import org.jooq.ConnectionProvider;
/**
* A connection provider that interoperates with the
* {@link ThreadLocalTransactionProvider}.
* <p>
* This implementation is used implicitly and automatically by
* {@link DefaultConfiguration}. Users should pass their own
* {@link ConnectionProvider} to the {@link ThreadLocalTransactionProvider}.
*
* @author Lukas Eder
*/
final class ThreadLocalConnectionProvider implements ConnectionProvider {
final ConnectionProvider delegate;
final ThreadLocal<Connection> tl;
public ThreadLocalConnectionProvider(ConnectionProvider delegate) {
this.delegate = delegate;
this.tl = new ThreadLocal<Connection>();
}
@Override
public final Connection acquire() {
Connection result = tl.get();
return result != null ? result : delegate.acquire();
}
@Override
public final void release(Connection connection) {
Connection previous = tl.get();
if (previous == null)
delegate.release(connection);
else if (previous != connection)
throw new IllegalStateException(
"A different connection was released than the thread-bound one that was expected");
}
}

View File

@ -0,0 +1,129 @@
/**
* Copyright (c) 2009-2016, Data Geekery GmbH (http://www.datageekery.com)
* All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Other licenses:
* -----------------------------------------------------------------------------
* Commercial licenses for this work are available. These replace the above
* ASL 2.0 and offer limited warranties, support, maintenance, and commercial
* database integrations.
*
* For more information, please visit: http://www.jooq.org/licenses
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*/
package org.jooq.impl;
import static org.jooq.impl.Tools.DataKey.DATA_DEFAULT_TRANSACTION_PROVIDER_CONNECTION;
import java.sql.Connection;
import java.sql.Savepoint;
import java.util.ArrayDeque;
import java.util.Deque;
import org.jooq.Configuration;
import org.jooq.ConnectionProvider;
import org.jooq.TransactionContext;
import org.jooq.TransactionProvider;
/**
* A {@link TransactionProvider} that implements thread-bound transaction
* semantics.
* <p>
* Use this <code>TransactionProvider</code> if your transactions are
* thread-bound, meaning that a transaction and its underlying
* {@link Connection} will never leave the thread that started the transaction.
* <p>
* When this <code>TransactionProvider</code> is used, users must pass their
* custom {@link ConnectionProvider} implementations to this
* <code>TransactionProvider</code>, instead of passing it to the
* {@link Configuration}.
*
* @author Lukas Eder
*/
public class ThreadLocalTransactionProvider implements TransactionProvider {
final DefaultTransactionProvider transaction;
final ThreadLocalConnectionProvider connection;
final ThreadLocal<Deque<Configuration>> configuration;
public ThreadLocalTransactionProvider(ConnectionProvider provider) {
this(provider, true);
}
/**
* @param nested Whether nested transactions via {@link Savepoint}s are
* supported.
*/
public ThreadLocalTransactionProvider(ConnectionProvider provider, boolean nested) {
this.connection = new ThreadLocalConnectionProvider(provider);
this.transaction = new DefaultTransactionProvider(connection, nested);
this.configuration = new ThreadLocal<Deque<Configuration>>();
}
@Override
public void begin(TransactionContext ctx) {
transaction.begin(ctx);
configurations().push(ctx.configuration());
if (transaction.nestingLevel(ctx.configuration()) == 1)
connection.tl.set(((DefaultConnectionProvider) ctx.configuration().data(DATA_DEFAULT_TRANSACTION_PROVIDER_CONNECTION)).connection);
}
@Override
public void commit(TransactionContext ctx) {
if (transaction.nestingLevel(ctx.configuration()) == 1)
connection.tl.remove();
configurations().pop();
transaction.commit(ctx);
}
@Override
public void rollback(TransactionContext ctx) {
if (transaction.nestingLevel(ctx.configuration()) == 1)
connection.tl.remove();
configurations().pop();
transaction.rollback(ctx);
}
Configuration configuration(Configuration fallback) {
Deque<Configuration> configurations = configurations();
return configurations.isEmpty() ? fallback : configurations.peek();
}
private Deque<Configuration> configurations() {
Deque<Configuration> result = configuration.get();
if (result == null) {
result = new ArrayDeque<Configuration>();
configuration.set(result);
}
return result;
}
}

View File

@ -3242,7 +3242,16 @@ final class Tools {
static <T> Supplier<T> blocking(Supplier<T> supplier) {
return new Supplier<T>() {
return blocking(supplier, false);
}
static <T> Supplier<T> blocking(Supplier<T> supplier, boolean threadLocal) {
// [#5377] In ThreadLocal contexts (e.g. when using ThreadLocalTransactionprovider),
// no ManagedBlocker is needed as we're guaranteed by API contract to always
// remain on the same thread.
return threadLocal ? supplier : new Supplier<T>() {
volatile T asyncResult;
@Override