From b422ec0153df801bd3afc55c8f5faae6a89ae6c7 Mon Sep 17 00:00:00 2001 From: lukaseder Date: Sat, 6 Feb 2016 18:33:01 +0100 Subject: [PATCH] [#5012] [#5014] Returned CompletionStage should re-use ExecutorProvider --- jOOQ/src/main/java/org/jooq/DSLContext.java | 104 ++++--- jOOQ/src/main/java/org/jooq/Query.java | 20 +- jOOQ/src/main/java/org/jooq/ResultQuery.java | 10 +- .../java/org/jooq/impl/AbstractQuery.java | 2 +- .../org/jooq/impl/AbstractResultQuery.java | 4 +- .../java/org/jooq/impl/DefaultDSLContext.java | 36 ++- .../impl/ExecutorProviderCompletionStage.java | 268 ++++++++++++++++++ 7 files changed, 390 insertions(+), 54 deletions(-) create mode 100644 jOOQ/src/main/java/org/jooq/impl/ExecutorProviderCompletionStage.java diff --git a/jOOQ/src/main/java/org/jooq/DSLContext.java b/jOOQ/src/main/java/org/jooq/DSLContext.java index 478e5df0a5..c704f8ee0b 100644 --- a/jOOQ/src/main/java/org/jooq/DSLContext.java +++ b/jOOQ/src/main/java/org/jooq/DSLContext.java @@ -77,7 +77,6 @@ import java.util.Optional; import java.util.Properties; import java.util.concurrent.CompletionStage; import java.util.concurrent.Executor; -import java.util.concurrent.ForkJoinPool; import java.util.stream.Stream; import javax.annotation.Generated; @@ -220,12 +219,15 @@ public interface DSLContext extends Scope , AutoCloseable { /** - * Run a {@link TransactionalCallable} in the context of this + * Run a {@link TransactionalCallable} asynchronously. + *

+ * The TransactionCallable is run in the context of this * DSLContext's underlying {@link #configuration()}'s - * {@link Configuration#transactionProvider()}, and return the + * {@link Configuration#transactionProvider()}, and returns the * transactional's outcome in a new {@link CompletionStage} - * that is asynchronously completed by a task running in the - * {@link ForkJoinPool#commonPool()}. + * that is asynchronously completed by a task run by an {@link Executor} + * provided by the underlying {@link #configuration()}'s + * {@link Configuration#executorProvider()}. * * @param transactional The transactional code * @return The transactional outcome @@ -233,23 +235,29 @@ public interface DSLContext extends Scope , AutoCloseable { CompletionStage transactionResultAsync(TransactionalCallable transactional); /** - * Run a {@link TransactionalRunnable} in the context of this + * Run a {@link TransactionalRunnable} asynchronously. + *

+ * The TransactionRunnable is run in the context of this * DSLContext's underlying {@link #configuration()}'s - * {@link Configuration#transactionProvider()}, and return the + * {@link Configuration#transactionProvider()}, and returns the * transactional's outcome in a new {@link CompletionStage} - * that is asynchronously completed by a task running in the - * {@link ForkJoinPool#commonPool()}. + * that is asynchronously completed by a task run by an {@link Executor} + * provided by the underlying {@link #configuration()}'s + * {@link Configuration#executorProvider()}. * * @param transactional The transactional code */ CompletionStage transactionAsync(TransactionalRunnable transactional); /** - * Run a {@link TransactionalCallable} in the context of this + * Run a {@link TransactionalCallable} asynchronously. + *

+ * The TransactionCallable is run in the context of this * DSLContext's underlying {@link #configuration()}'s - * {@link Configuration#transactionProvider()}, and return the + * {@link Configuration#transactionProvider()}, and returns the * transactional's outcome in a new {@link CompletionStage} - * that is asynchronously completed by a task running in the given executor. + * that is asynchronously completed by a task run by a given + * {@link Executor}. * * @param transactional The transactional code * @return The transactional outcome @@ -257,11 +265,14 @@ public interface DSLContext extends Scope , AutoCloseable { CompletionStage transactionResultAsync(Executor executor, TransactionalCallable transactional); /** - * Run a {@link TransactionalRunnable} in the context of this + * Run a {@link TransactionalRunnable} asynchronously. + *

+ * The TransactionRunnable is run in the context of this * DSLContext's underlying {@link #configuration()}'s - * {@link Configuration#transactionProvider()}, and return the + * {@link Configuration#transactionProvider()}, and returns the * transactional's outcome in a new {@link CompletionStage} - * that is asynchronously completed by a task running in the given executor. + * that is asynchronously completed by a task run by a given + * {@link Executor}. * * @param transactional The transactional code */ @@ -818,8 +829,11 @@ public interface DSLContext extends Scope , AutoCloseable { /** - * Fetch results in a new {@link CompletionStage} that is asynchronously - * completed by a task running in the {@link ForkJoinPool#commonPool()}. + * Fetch results in a new {@link CompletionStage}. + *

+ * The result is asynchronously completed by a task running in an + * {@link Executor} provided by the {@link #configuration()}'s + * {@link Configuration#executorProvider()}. *

* Example (Postgres): *

@@ -845,8 +859,11 @@ public interface DSLContext extends Scope , AutoCloseable { CompletionStage> fetchAsync(SQL sql); /** - * Fetch results in a new {@link CompletionStage} that is asynchronously - * completed by a task running in the {@link ForkJoinPool#commonPool()}. + * Fetch results in a new {@link CompletionStage}. + *

+ * The result is asynchronously completed by a task running in an + * {@link Executor} provided by the {@link #configuration()}'s + * {@link Configuration#executorProvider()}. *

* Example (Postgres): *

@@ -872,8 +889,11 @@ public interface DSLContext extends Scope , AutoCloseable { CompletionStage> fetchAsync(String sql); /** - * Fetch results in a new {@link CompletionStage} that is asynchronously - * completed by a task running in the {@link ForkJoinPool#commonPool()}. + * Fetch results in a new {@link CompletionStage}. + *

+ * The result is asynchronously completed by a task running in an + * {@link Executor} provided by the {@link #configuration()}'s + * {@link Configuration#executorProvider()}. *

* There must be as many bind variables contained in the SQL, as passed in * the bindings parameter @@ -903,8 +923,11 @@ public interface DSLContext extends Scope , AutoCloseable { CompletionStage> fetchAsync(String sql, Object... bindings); /** - * Fetch results in a new {@link CompletionStage} that is asynchronously - * completed by a task running in the {@link ForkJoinPool#commonPool()}. + * Fetch results in a new {@link CompletionStage}. + *

+ * The result is asynchronously completed by a task running in an + * {@link Executor} provided by the {@link #configuration()}'s + * {@link Configuration#executorProvider()}. *

* Unlike {@link #fetchLazy(String, Object...)}, the SQL passed to this * method should not contain any bind variables. Instead, you can pass @@ -2685,8 +2708,11 @@ public interface DSLContext extends Scope , AutoCloseable { /** - * Fetch results in a new {@link CompletionStage} that is asynchronously - * completed by a task running in the {@link ForkJoinPool#commonPool()}. + * Fetch results in a new {@link CompletionStage}. + *

+ * The result is asynchronously completed by a task running in an + * {@link Executor} provided by the {@link #configuration()}'s + * {@link Configuration#executorProvider()}. * * @param rs The JDBC ResultSet to fetch data from * @return The completion stage. The completed result will never be @@ -2696,8 +2722,11 @@ public interface DSLContext extends Scope , AutoCloseable { CompletionStage> fetchAsync(ResultSet rs); /** - * Fetch results in a new {@link CompletionStage} that is asynchronously - * completed by a task running in the {@link ForkJoinPool#commonPool()}. + * Fetch results in a new {@link CompletionStage}. + *

+ * The result is asynchronously completed by a task running in an + * {@link Executor} provided by the {@link #configuration()}'s + * {@link Configuration#executorProvider()}. *

* The additional fields argument is used by jOOQ to coerce * field names and data types to the desired output @@ -2711,8 +2740,11 @@ public interface DSLContext extends Scope , AutoCloseable { CompletionStage> fetchAsync(ResultSet rs, Field... fields); /** - * Fetch results in a new {@link CompletionStage} that is asynchronously - * completed by a task running in the {@link ForkJoinPool#commonPool()}. + * Fetch results in a new {@link CompletionStage}. + *

+ * The result is asynchronously completed by a task running in an + * {@link Executor} provided by the {@link #configuration()}'s + * {@link Configuration#executorProvider()}. *

* The additional types argument is used by jOOQ to coerce data * types to the desired output @@ -2726,8 +2758,11 @@ public interface DSLContext extends Scope , AutoCloseable { CompletionStage> fetchAsync(ResultSet rs, DataType... types); /** - * Fetch results in a new {@link CompletionStage} that is asynchronously - * completed by a task running in the {@link ForkJoinPool#commonPool()}. + * Fetch results in a new {@link CompletionStage}. + *

+ * The result is asynchronously completed by a task running in an + * {@link Executor} provided by the {@link #configuration()}'s + * {@link Configuration#executorProvider()}. *

* The additional types argument is used by jOOQ to coerce data * types to the desired output @@ -8256,8 +8291,11 @@ public interface DSLContext extends Scope , AutoCloseable { /** - * Fetch results in a new {@link CompletionStage} that is asynchronously - * completed by a task running in the {@link ForkJoinPool#commonPool()}. + * Fetch results in a new {@link CompletionStage}. + *

+ * The result is asynchronously completed by a task running in an + * {@link Executor} provided by the {@link #configuration()}'s + * {@link Configuration#executorProvider()}. * * @param query The query to execute * @return The completion stage. The completed result will never be diff --git a/jOOQ/src/main/java/org/jooq/Query.java b/jOOQ/src/main/java/org/jooq/Query.java index c210b58392..8f707e2d67 100644 --- a/jOOQ/src/main/java/org/jooq/Query.java +++ b/jOOQ/src/main/java/org/jooq/Query.java @@ -47,7 +47,6 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CompletionStage; import java.util.concurrent.Executor; -import java.util.concurrent.ForkJoinPool; import org.jooq.conf.ParamType; import org.jooq.conf.Settings; @@ -83,18 +82,21 @@ public interface Query extends QueryPart, Attachable , AutoCloseable { /** - * Execute the query in a new {@link CompletionStage} that is asynchronously - * completed by a task running in the {@link ForkJoinPool#commonPool()}. + * Execute the query in a new {@link CompletionStage}. + *

+ * The result is asynchronously completed by a task running in an + * {@link Executor} provided by the underlying + * {@link Configuration#executorProvider()}. * * @return A result value, depending on the concrete implementation of * {@link Query}: *

*/ CompletionStage executeAsync(); diff --git a/jOOQ/src/main/java/org/jooq/ResultQuery.java b/jOOQ/src/main/java/org/jooq/ResultQuery.java index e6b7fff694..22ce366f86 100644 --- a/jOOQ/src/main/java/org/jooq/ResultQuery.java +++ b/jOOQ/src/main/java/org/jooq/ResultQuery.java @@ -53,7 +53,6 @@ import java.util.Set; import java.util.concurrent.CompletionStage; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; -import java.util.concurrent.ForkJoinPool; import java.util.stream.Stream; import javax.sql.DataSource; @@ -3080,9 +3079,13 @@ public interface ResultQuery extends Query, Iterable { List fetch(RecordMapper mapper) throws DataAccessException; + /** - * Fetch results in a new {@link CompletionStage} that is asynchronously - * completed by a task running in the {@link ForkJoinPool#commonPool()}. + * Fetch results in a new {@link CompletionStage}. + *

+ * The result is asynchronously completed by a task running in an + * {@link Executor} provided by the underlying + * {@link Configuration#executorProvider()}. * * @return The completion stage. The completed result will never be * null. @@ -3099,6 +3102,7 @@ public interface ResultQuery extends Query, Iterable { CompletionStage> fetchAsync(Executor executor); + /** * Fetch results asynchronously. *

diff --git a/jOOQ/src/main/java/org/jooq/impl/AbstractQuery.java b/jOOQ/src/main/java/org/jooq/impl/AbstractQuery.java index 67f9c353ef..adb56aaa7f 100644 --- a/jOOQ/src/main/java/org/jooq/impl/AbstractQuery.java +++ b/jOOQ/src/main/java/org/jooq/impl/AbstractQuery.java @@ -394,7 +394,7 @@ abstract class AbstractQuery extends AbstractQueryPart implements Query, Attacha @Override public final CompletionStage executeAsync(Executor executor) { - return CompletableFuture.supplyAsync(blocking(this::execute), executor); + return ExecutorProviderCompletionStage.of(CompletableFuture.supplyAsync(blocking(this::execute), executor), () -> executor); } diff --git a/jOOQ/src/main/java/org/jooq/impl/AbstractResultQuery.java b/jOOQ/src/main/java/org/jooq/impl/AbstractResultQuery.java index 42d601bf94..b5e7956d4e 100644 --- a/jOOQ/src/main/java/org/jooq/impl/AbstractResultQuery.java +++ b/jOOQ/src/main/java/org/jooq/impl/AbstractResultQuery.java @@ -48,8 +48,8 @@ import static java.util.concurrent.Executors.newSingleThreadExecutor; import static org.jooq.SQLDialect.CUBRID; import static org.jooq.SQLDialect.POSTGRES; // ... -import static org.jooq.impl.Utils.consumeResultSets; import static org.jooq.impl.Utils.blocking; +import static org.jooq.impl.Utils.consumeResultSets; import static org.jooq.impl.Utils.DataKey.DATA_LOCK_ROWS_FOR_UPDATE; import java.lang.reflect.Array; @@ -332,7 +332,7 @@ abstract class AbstractResultQuery extends AbstractQuery imple @Override public final CompletionStage> fetchAsync(Executor executor) { - return CompletableFuture.supplyAsync(blocking(this::fetch), executor); + return ExecutorProviderCompletionStage.of(CompletableFuture.supplyAsync(blocking(this::fetch), executor), () -> executor); } @Override diff --git a/jOOQ/src/main/java/org/jooq/impl/DefaultDSLContext.java b/jOOQ/src/main/java/org/jooq/impl/DefaultDSLContext.java index 39fdd5edad..5a0bb135cd 100644 --- a/jOOQ/src/main/java/org/jooq/impl/DefaultDSLContext.java +++ b/jOOQ/src/main/java/org/jooq/impl/DefaultDSLContext.java @@ -410,7 +410,16 @@ public class DefaultDSLContext extends AbstractScope implements DSLContext, Seri @Override public CompletionStage transactionAsync(Executor executor, TransactionalRunnable transactional) { - return CompletableFuture.supplyAsync(blocking(() -> { transaction(transactional); return null; }), executor); + return ExecutorProviderCompletionStage.of(CompletableFuture.supplyAsync( + blocking( + () -> { + transaction(transactional); + return null; + }), + executor + ), + () -> executor + ); } @Override @@ -420,7 +429,10 @@ public class DefaultDSLContext extends AbstractScope implements DSLContext, Seri @Override public CompletionStage transactionResultAsync(Executor executor, TransactionalCallable transactional) { - return CompletableFuture.supplyAsync(blocking(() -> transactionResult(transactional)), executor); + return ExecutorProviderCompletionStage.of(CompletableFuture.supplyAsync( + blocking(() -> transactionResult(transactional)), executor), + () -> executor + ); } @@ -1053,22 +1065,34 @@ public class DefaultDSLContext extends AbstractScope implements DSLContext, Seri @Override public CompletionStage> fetchAsync(Executor executor, ResultSet rs) { - return CompletableFuture.supplyAsync(blocking(() -> fetch(rs)), executor); + return ExecutorProviderCompletionStage.of( + CompletableFuture.supplyAsync(blocking(() -> fetch(rs)), executor), + () -> executor + ); } @Override public CompletionStage> fetchAsync(Executor executor, ResultSet rs, Field... fields) { - return CompletableFuture.supplyAsync(blocking(() -> fetch(rs, fields)), executor); + return ExecutorProviderCompletionStage.of( + CompletableFuture.supplyAsync(blocking(() -> fetch(rs, fields)), executor), + () -> executor + ); } @Override public CompletionStage> fetchAsync(Executor executor, ResultSet rs, DataType... types) { - return CompletableFuture.supplyAsync(blocking(() -> fetch(rs, types)), executor); + return ExecutorProviderCompletionStage.of( + CompletableFuture.supplyAsync(blocking(() -> fetch(rs, types)), executor), + () -> executor + ); } @Override public CompletionStage> fetchAsync(Executor executor, ResultSet rs, Class... types) { - return CompletableFuture.supplyAsync(blocking(() -> fetch(rs, types)), executor); + return ExecutorProviderCompletionStage.of( + CompletableFuture.supplyAsync(blocking(() -> fetch(rs, types)), executor), + () -> executor + ); } @Override diff --git a/jOOQ/src/main/java/org/jooq/impl/ExecutorProviderCompletionStage.java b/jOOQ/src/main/java/org/jooq/impl/ExecutorProviderCompletionStage.java new file mode 100644 index 0000000000..caa1f26e61 --- /dev/null +++ b/jOOQ/src/main/java/org/jooq/impl/ExecutorProviderCompletionStage.java @@ -0,0 +1,268 @@ +/** + * Copyright (c) 2009-2016, Data Geekery GmbH (http://www.datageekery.com) + * All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Other licenses: + * ----------------------------------------------------------------------------- + * Commercial licenses for this work are available. These replace the above + * ASL 2.0 and offer limited warranties, support, maintenance, and commercial + * database integrations. + * + * For more information, please visit: http://www.jooq.org/licenses + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + */ +package org.jooq.impl; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.Executor; +import java.util.concurrent.ForkJoinPool; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Function; + +import org.jooq.ExecutorProvider; + +/** + * A {@link CompletionStage} that defaults to an {@link ExecutorProvider} for + * all async methods that do not have an explicit {@link Executor}. + *

+ * In jOOQ, the {@link ExecutorProvider} SPI is used for all types of + * asynchronous tasks. It overrides standard {@link CompletableFuture} behaviour + * in case users prefer not to use the {@link ForkJoinPool#commonPool()}. + * + * @author Lukas Eder + */ +final class ExecutorProviderCompletionStage implements CompletionStage { + + private final CompletionStage delegate; + private final ExecutorProvider provider; + + static final ExecutorProviderCompletionStage of(CompletionStage delegate, ExecutorProvider provider) { + return new ExecutorProviderCompletionStage<>(delegate, provider); + } + + ExecutorProviderCompletionStage(CompletionStage delegate, ExecutorProvider provider) { + this.delegate = delegate; + this.provider = provider; + } + + @Override + public final CompletionStage thenApply(Function fn) { + return of(delegate.thenApply(fn), provider); + } + + @Override + public final CompletionStage thenApplyAsync(Function fn) { + return of(delegate.thenApplyAsync(fn, provider.provide()), provider); + } + + @Override + public final CompletionStage thenApplyAsync(Function fn, Executor executor) { + return of(delegate.thenApplyAsync(fn, executor), provider); + } + + @Override + public final CompletionStage thenAccept(Consumer action) { + return of(delegate.thenAccept(action), provider); + } + + @Override + public final CompletionStage thenAcceptAsync(Consumer action) { + return of(delegate.thenAcceptAsync(action, provider.provide()), provider); + } + + @Override + public final CompletionStage thenAcceptAsync(Consumer action, Executor executor) { + return of(delegate.thenAcceptAsync(action, executor), provider); + } + + @Override + public final CompletionStage thenRun(Runnable action) { + return of(delegate.thenRun(action), provider); + } + + @Override + public final CompletionStage thenRunAsync(Runnable action) { + return of(delegate.thenRunAsync(action, provider.provide()), provider); + } + + @Override + public final CompletionStage thenRunAsync(Runnable action, Executor executor) { + return of(delegate.thenRunAsync(action, executor), provider); + } + + @Override + public final CompletionStage thenCombine(CompletionStage other, BiFunction fn) { + return of(delegate.thenCombine(other, fn), provider); + } + + @Override + public final CompletionStage thenCombineAsync(CompletionStage other, BiFunction fn) { + return of(delegate.thenCombineAsync(other, fn, provider.provide()), provider); + } + + @Override + public final CompletionStage thenCombineAsync(CompletionStage other, BiFunction fn, Executor executor) { + return of(delegate.thenCombineAsync(other, fn, executor), provider); + } + + @Override + public final CompletionStage thenAcceptBoth(CompletionStage other, BiConsumer action) { + return of(delegate.thenAcceptBoth(other, action), provider); + } + + @Override + public final CompletionStage thenAcceptBothAsync(CompletionStage other, BiConsumer action) { + return of(delegate.thenAcceptBothAsync(other, action, provider.provide()), provider); + } + + @Override + public final CompletionStage thenAcceptBothAsync(CompletionStage other, BiConsumer action, Executor executor) { + return of(delegate.thenAcceptBothAsync(other, action, executor), provider); + } + + @Override + public final CompletionStage runAfterBoth(CompletionStage other, Runnable action) { + return of(delegate.runAfterBoth(other, action), provider); + } + + @Override + public final CompletionStage runAfterBothAsync(CompletionStage other, Runnable action) { + return of(delegate.runAfterBothAsync(other, action, provider.provide()), provider); + } + + @Override + public final CompletionStage runAfterBothAsync(CompletionStage other, Runnable action, Executor executor) { + return of(delegate.runAfterBothAsync(other, action, executor), provider); + } + + @Override + public final CompletionStage applyToEither(CompletionStage other, Function fn) { + return of(delegate.applyToEither(other, fn), provider); + } + + @Override + public final CompletionStage applyToEitherAsync(CompletionStage other, Function fn) { + return of(delegate.applyToEitherAsync(other, fn, provider.provide()), provider); + } + + @Override + public final CompletionStage applyToEitherAsync(CompletionStage other, Function fn, Executor executor) { + return of(delegate.applyToEitherAsync(other, fn, executor), provider); + } + + @Override + public final CompletionStage acceptEither(CompletionStage other, Consumer action) { + return of(delegate.acceptEither(other, action), provider); + } + + @Override + public final CompletionStage acceptEitherAsync(CompletionStage other, Consumer action) { + return of(delegate.acceptEitherAsync(other, action, provider.provide()), provider); + } + + @Override + public final CompletionStage acceptEitherAsync(CompletionStage other, Consumer action, Executor executor) { + return of(delegate.acceptEitherAsync(other, action, executor), provider); + } + + @Override + public final CompletionStage runAfterEither(CompletionStage other, Runnable action) { + return of(delegate.runAfterEither(other, action), provider); + } + + @Override + public final CompletionStage runAfterEitherAsync(CompletionStage other, Runnable action) { + return of(delegate.runAfterEitherAsync(other, action, provider.provide()), provider); + } + + @Override + public final CompletionStage runAfterEitherAsync(CompletionStage other, Runnable action, Executor executor) { + return of(delegate.runAfterEitherAsync(other, action, executor), provider); + } + + @Override + public final CompletionStage thenCompose(Function> fn) { + return of(delegate.thenCompose(fn), provider); + } + + @Override + public final CompletionStage thenComposeAsync(Function> fn) { + return of(delegate.thenComposeAsync(fn, provider.provide()), provider); + } + + @Override + public final CompletionStage thenComposeAsync(Function> fn, + Executor executor) { + return of(delegate.thenComposeAsync(fn, executor), provider); + } + + @Override + public final CompletionStage exceptionally(Function fn) { + return of(delegate.exceptionally(fn), provider); + } + + @Override + public final CompletionStage whenComplete(BiConsumer action) { + return of(delegate.whenComplete(action), provider); + } + + @Override + public final CompletionStage whenCompleteAsync(BiConsumer action) { + return of(delegate.whenCompleteAsync(action, provider.provide()), provider); + } + + @Override + public final CompletionStage whenCompleteAsync(BiConsumer action, Executor executor) { + return of(delegate.whenCompleteAsync(action, executor), provider); + } + + @Override + public final CompletionStage handle(BiFunction fn) { + return of(delegate.handle(fn), provider); + } + + @Override + public final CompletionStage handleAsync(BiFunction fn) { + return of(delegate.handleAsync(fn, provider.provide()), provider); + } + + @Override + public final CompletionStage handleAsync(BiFunction fn, Executor executor) { + return of(delegate.handleAsync(fn, executor), provider); + } + + @Override + public final CompletableFuture toCompletableFuture() { + return delegate.toCompletableFuture(); + } +}