[#5015] Ensure all transactions are run in ForkJoinPool.ManagedBlocker

This commit is contained in:
lukaseder 2016-02-08 15:41:04 +01:00
parent 2b2dfabdff
commit b4d16b337f
2 changed files with 41 additions and 32 deletions

View File

@ -357,37 +357,52 @@ public class DefaultDSLContext extends AbstractScope implements DSLContext, Seri
@Override
public <T> T transactionResult(TransactionalCallable<T> transactional) {
T result = null;
DefaultTransactionContext ctx = new DefaultTransactionContext(configuration().derive());
TransactionProvider provider = ctx.configuration().transactionProvider();
// If used in a Java 8 Stream, a transaction should always be executed
// in a ManagedBlocker context, just in case Stream.parallel() is called
// The same is true for all asynchronous transactions, which must always
// run in a ManagedBlocker context.
return blocking(() -> {
T result = null;
DefaultTransactionContext ctx = new DefaultTransactionContext(configuration().derive());
TransactionProvider provider = ctx.configuration().transactionProvider();
try {
provider.begin(ctx);
result = transactional.run(ctx.configuration());
provider.commit(ctx);
}
catch (Exception cause) {
try {
provider.rollback(ctx.cause(cause));
provider.begin(ctx);
result = transactional.run(ctx.configuration());
provider.commit(ctx);
}
catch (Exception cause) {
try {
provider.rollback(ctx.cause(cause));
}
// [#3718] Use reflection to support also JDBC 4.0
catch (Exception suppress) {
cause.addSuppressed(suppress);
}
if (cause instanceof RuntimeException) {
throw (RuntimeException) cause;
}
else {
throw new DataAccessException("Rollback caused", cause);
}
}
// [#3718] Use reflection to support also JDBC 4.0
catch (Exception suppress) {
return result;
cause.addSuppressed(suppress);
}
}).get();
if (cause instanceof RuntimeException) {
throw (RuntimeException) cause;
}
else {
throw new DataAccessException("Rollback caused", cause);
}
}
return result;
}
@Override
@ -411,13 +426,7 @@ public class DefaultDSLContext extends AbstractScope implements DSLContext, Seri
@Override
public CompletionStage<Void> transactionAsync(Executor executor, TransactionalRunnable transactional) {
return ExecutorProviderCompletionStage.of(CompletableFuture.supplyAsync(
blocking(
() -> {
transaction(transactional);
return null;
}),
executor
),
() -> { transaction(transactional); return null; }, executor),
() -> executor
);
}
@ -430,7 +439,7 @@ public class DefaultDSLContext extends AbstractScope implements DSLContext, Seri
@Override
public <T> CompletionStage<T> transactionResultAsync(Executor executor, TransactionalCallable<T> transactional) {
return ExecutorProviderCompletionStage.of(CompletableFuture.supplyAsync(
blocking(() -> transactionResult(transactional)), executor),
() -> transactionResult(transactional), executor),
() -> executor
);
}

View File

@ -2973,7 +2973,7 @@ final class Utils {
try {
ForkJoinPool.managedBlock(new ManagedBlocker() {
@Override
public boolean block() throws InterruptedException {
public boolean block() {
asyncResult = supplier.get();
return true;
}