From 94b501acf52adf4e20ea19d989a6dec1df7681b4 Mon Sep 17 00:00:00 2001 From: lukaseder Date: Fri, 5 Feb 2016 11:40:41 +0100 Subject: [PATCH] [#5012] [#5014] Use ForkJoinPool.ManagedBlock --- .../java/org/jooq/impl/AbstractQuery.java | 3 +- .../org/jooq/impl/AbstractResultQuery.java | 3 +- .../java/org/jooq/impl/DefaultDSLContext.java | 13 +++--- jOOQ/src/main/java/org/jooq/impl/Utils.java | 40 +++++++++++++++++++ 4 files changed, 51 insertions(+), 8 deletions(-) diff --git a/jOOQ/src/main/java/org/jooq/impl/AbstractQuery.java b/jOOQ/src/main/java/org/jooq/impl/AbstractQuery.java index a830970521..39f2f12c0d 100644 --- a/jOOQ/src/main/java/org/jooq/impl/AbstractQuery.java +++ b/jOOQ/src/main/java/org/jooq/impl/AbstractQuery.java @@ -52,6 +52,7 @@ import static org.jooq.conf.SettingsTools.executePreparedStatements; import static org.jooq.conf.SettingsTools.getParamType; import static org.jooq.impl.DSL.using; import static org.jooq.impl.Utils.consumeExceptions; +import static org.jooq.impl.Utils.blocking; import static org.jooq.impl.Utils.DataKey.DATA_COUNT_BIND_VALUES; import static org.jooq.impl.Utils.DataKey.DATA_FORCE_STATIC_STATEMENT; @@ -394,7 +395,7 @@ abstract class AbstractQuery extends AbstractQueryPart implements Query, Attacha @Override public final CompletionStage executeAsync(Executor executor) { - return CompletableFuture.supplyAsync(this::execute, executor); + return CompletableFuture.supplyAsync(blocking(this::execute), executor); } diff --git a/jOOQ/src/main/java/org/jooq/impl/AbstractResultQuery.java b/jOOQ/src/main/java/org/jooq/impl/AbstractResultQuery.java index 446629f559..42d601bf94 100644 --- a/jOOQ/src/main/java/org/jooq/impl/AbstractResultQuery.java +++ b/jOOQ/src/main/java/org/jooq/impl/AbstractResultQuery.java @@ -49,6 +49,7 @@ import static org.jooq.SQLDialect.CUBRID; import static org.jooq.SQLDialect.POSTGRES; // ... import static org.jooq.impl.Utils.consumeResultSets; +import static org.jooq.impl.Utils.blocking; import static org.jooq.impl.Utils.DataKey.DATA_LOCK_ROWS_FOR_UPDATE; import java.lang.reflect.Array; @@ -331,7 +332,7 @@ abstract class AbstractResultQuery extends AbstractQuery imple @Override public final CompletionStage> fetchAsync(Executor executor) { - return CompletableFuture.supplyAsync(this::fetch, executor); + return CompletableFuture.supplyAsync(blocking(this::fetch), executor); } @Override diff --git a/jOOQ/src/main/java/org/jooq/impl/DefaultDSLContext.java b/jOOQ/src/main/java/org/jooq/impl/DefaultDSLContext.java index c183c6a1c6..a9604df923 100644 --- a/jOOQ/src/main/java/org/jooq/impl/DefaultDSLContext.java +++ b/jOOQ/src/main/java/org/jooq/impl/DefaultDSLContext.java @@ -50,6 +50,7 @@ import static org.jooq.impl.DSL.sql; import static org.jooq.impl.DSL.table; import static org.jooq.impl.DSL.trueCondition; import static org.jooq.impl.Utils.list; +import static org.jooq.impl.Utils.blocking; import static org.jooq.tools.Convert.convert; import java.io.IOException; @@ -407,7 +408,7 @@ public class DefaultDSLContext extends AbstractScope implements DSLContext, Seri @Override public CompletionStage transactionAsync(Executor executor, TransactionalRunnable transactional) { - return CompletableFuture.runAsync(() -> transaction(transactional), executor); + return CompletableFuture.supplyAsync(blocking(() -> { transaction(transactional); return null; }), executor); } @Override @@ -417,7 +418,7 @@ public class DefaultDSLContext extends AbstractScope implements DSLContext, Seri @Override public CompletionStage transactionResultAsync(Executor executor, TransactionalCallable transactional) { - return CompletableFuture.supplyAsync(() -> transactionResult(transactional), executor); + return CompletableFuture.supplyAsync(blocking(() -> transactionResult(transactional)), executor); } @@ -1050,22 +1051,22 @@ public class DefaultDSLContext extends AbstractScope implements DSLContext, Seri @Override public CompletionStage> fetchAsync(Executor executor, ResultSet rs) { - return CompletableFuture.supplyAsync(() -> fetch(rs), executor); + return CompletableFuture.supplyAsync(blocking(() -> fetch(rs)), executor); } @Override public CompletionStage> fetchAsync(Executor executor, ResultSet rs, Field... fields) { - return CompletableFuture.supplyAsync(() -> fetch(rs, fields), executor); + return CompletableFuture.supplyAsync(blocking(() -> fetch(rs, fields)), executor); } @Override public CompletionStage> fetchAsync(Executor executor, ResultSet rs, DataType... types) { - return CompletableFuture.supplyAsync(() -> fetch(rs, types), executor); + return CompletableFuture.supplyAsync(blocking(() -> fetch(rs, types)), executor); } @Override public CompletionStage> fetchAsync(Executor executor, ResultSet rs, Class... types) { - return CompletableFuture.supplyAsync(() -> fetch(rs, types), executor); + return CompletableFuture.supplyAsync(blocking(() -> fetch(rs, types)), executor); } @Override diff --git a/jOOQ/src/main/java/org/jooq/impl/Utils.java b/jOOQ/src/main/java/org/jooq/impl/Utils.java index 9f8571e42e..b6c6baa9f1 100644 --- a/jOOQ/src/main/java/org/jooq/impl/Utils.java +++ b/jOOQ/src/main/java/org/jooq/impl/Utils.java @@ -94,6 +94,9 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.ForkJoinPool.ManagedBlocker; +import java.util.function.Supplier; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -2957,4 +2960,41 @@ final class Utils { ctx.keyword(typeName); } } + + // ------------------------------------------------------------------------- + // XXX: ForkJoinPool ManagedBlock implementation + // ------------------------------------------------------------------------- + + + + @SuppressWarnings("unchecked") + static > S blocking(S supplier) { + return (S) new Supplier() { + volatile T asyncResult; + + @Override + public T get() { + try { + ForkJoinPool.managedBlock(new ManagedBlocker() { + @Override + public boolean block() throws InterruptedException { + asyncResult = supplier.get(); + return true; + } + + @Override + public boolean isReleasable() { + return asyncResult != null; + } + }); + } + catch (InterruptedException e) { + throw new RuntimeException(e); + } + + return asyncResult; + } + }; + } + }