parent
a16388d44c
commit
94b501acf5
@ -52,6 +52,7 @@ import static org.jooq.conf.SettingsTools.executePreparedStatements;
|
||||
import static org.jooq.conf.SettingsTools.getParamType;
|
||||
import static org.jooq.impl.DSL.using;
|
||||
import static org.jooq.impl.Utils.consumeExceptions;
|
||||
import static org.jooq.impl.Utils.blocking;
|
||||
import static org.jooq.impl.Utils.DataKey.DATA_COUNT_BIND_VALUES;
|
||||
import static org.jooq.impl.Utils.DataKey.DATA_FORCE_STATIC_STATEMENT;
|
||||
|
||||
@ -394,7 +395,7 @@ abstract class AbstractQuery extends AbstractQueryPart implements Query, Attacha
|
||||
|
||||
@Override
|
||||
public final CompletionStage<Integer> executeAsync(Executor executor) {
|
||||
return CompletableFuture.supplyAsync(this::execute, executor);
|
||||
return CompletableFuture.supplyAsync(blocking(this::execute), executor);
|
||||
}
|
||||
|
||||
|
||||
|
||||
@ -49,6 +49,7 @@ import static org.jooq.SQLDialect.CUBRID;
|
||||
import static org.jooq.SQLDialect.POSTGRES;
|
||||
// ...
|
||||
import static org.jooq.impl.Utils.consumeResultSets;
|
||||
import static org.jooq.impl.Utils.blocking;
|
||||
import static org.jooq.impl.Utils.DataKey.DATA_LOCK_ROWS_FOR_UPDATE;
|
||||
|
||||
import java.lang.reflect.Array;
|
||||
@ -331,7 +332,7 @@ abstract class AbstractResultQuery<R extends Record> extends AbstractQuery imple
|
||||
|
||||
@Override
|
||||
public final CompletionStage<Result<R>> fetchAsync(Executor executor) {
|
||||
return CompletableFuture.supplyAsync(this::fetch, executor);
|
||||
return CompletableFuture.supplyAsync(blocking(this::fetch), executor);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -50,6 +50,7 @@ import static org.jooq.impl.DSL.sql;
|
||||
import static org.jooq.impl.DSL.table;
|
||||
import static org.jooq.impl.DSL.trueCondition;
|
||||
import static org.jooq.impl.Utils.list;
|
||||
import static org.jooq.impl.Utils.blocking;
|
||||
import static org.jooq.tools.Convert.convert;
|
||||
|
||||
import java.io.IOException;
|
||||
@ -407,7 +408,7 @@ public class DefaultDSLContext extends AbstractScope implements DSLContext, Seri
|
||||
|
||||
@Override
|
||||
public CompletionStage<Void> transactionAsync(Executor executor, TransactionalRunnable transactional) {
|
||||
return CompletableFuture.runAsync(() -> transaction(transactional), executor);
|
||||
return CompletableFuture.supplyAsync(blocking(() -> { transaction(transactional); return null; }), executor);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -417,7 +418,7 @@ public class DefaultDSLContext extends AbstractScope implements DSLContext, Seri
|
||||
|
||||
@Override
|
||||
public <T> CompletionStage<T> transactionResultAsync(Executor executor, TransactionalCallable<T> transactional) {
|
||||
return CompletableFuture.supplyAsync(() -> transactionResult(transactional), executor);
|
||||
return CompletableFuture.supplyAsync(blocking(() -> transactionResult(transactional)), executor);
|
||||
}
|
||||
|
||||
|
||||
@ -1050,22 +1051,22 @@ public class DefaultDSLContext extends AbstractScope implements DSLContext, Seri
|
||||
|
||||
@Override
|
||||
public CompletionStage<Result<Record>> fetchAsync(Executor executor, ResultSet rs) {
|
||||
return CompletableFuture.supplyAsync(() -> fetch(rs), executor);
|
||||
return CompletableFuture.supplyAsync(blocking(() -> fetch(rs)), executor);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletionStage<Result<Record>> fetchAsync(Executor executor, ResultSet rs, Field<?>... fields) {
|
||||
return CompletableFuture.supplyAsync(() -> fetch(rs, fields), executor);
|
||||
return CompletableFuture.supplyAsync(blocking(() -> fetch(rs, fields)), executor);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletionStage<Result<Record>> fetchAsync(Executor executor, ResultSet rs, DataType<?>... types) {
|
||||
return CompletableFuture.supplyAsync(() -> fetch(rs, types), executor);
|
||||
return CompletableFuture.supplyAsync(blocking(() -> fetch(rs, types)), executor);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletionStage<Result<Record>> fetchAsync(Executor executor, ResultSet rs, Class<?>... types) {
|
||||
return CompletableFuture.supplyAsync(() -> fetch(rs, types), executor);
|
||||
return CompletableFuture.supplyAsync(blocking(() -> fetch(rs, types)), executor);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -94,6 +94,9 @@ import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ForkJoinPool;
|
||||
import java.util.concurrent.ForkJoinPool.ManagedBlocker;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
@ -2957,4 +2960,41 @@ final class Utils {
|
||||
ctx.keyword(typeName);
|
||||
}
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// XXX: ForkJoinPool ManagedBlock implementation
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
static <T, S extends Supplier<T>> S blocking(S supplier) {
|
||||
return (S) new Supplier<T>() {
|
||||
volatile T asyncResult;
|
||||
|
||||
@Override
|
||||
public T get() {
|
||||
try {
|
||||
ForkJoinPool.managedBlock(new ManagedBlocker() {
|
||||
@Override
|
||||
public boolean block() throws InterruptedException {
|
||||
asyncResult = supplier.get();
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isReleasable() {
|
||||
return asyncResult != null;
|
||||
}
|
||||
});
|
||||
}
|
||||
catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
return asyncResult;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user