From af00823df5429253bcd94e791c47bb7a98ce4d87 Mon Sep 17 00:00:00 2001 From: Lukas Eder Date: Mon, 13 May 2019 11:37:08 +0200 Subject: [PATCH] [#4371] Add support for the JDK 9 Flow API (JSR-166 enhancements) and reactive streams --- jOOQ/pom.xml | 8 ++ jOOQ/src/main/java/org/jooq/ResultQuery.java | 9 +- .../org/jooq/impl/AbstractResultQuery.java | 62 +++++++++++++ ...FlowToReactiveStreamsSubscriberBridge.java | 88 +++++++++++++++++++ .../main/java/org/jooq/impl/SelectImpl.java | 16 ++++ 5 files changed, 182 insertions(+), 1 deletion(-) create mode 100644 jOOQ/src/main/java/org/jooq/impl/FlowToReactiveStreamsSubscriberBridge.java diff --git a/jOOQ/pom.xml b/jOOQ/pom.xml index 26c414b851..d30788f467 100644 --- a/jOOQ/pom.xml +++ b/jOOQ/pom.xml @@ -75,6 +75,14 @@ + + + org.reactivestreams + reactive-streams + 1.0.2 + + + org.slf4j slf4j-api diff --git a/jOOQ/src/main/java/org/jooq/ResultQuery.java b/jOOQ/src/main/java/org/jooq/ResultQuery.java index 2505e3993c..4770955e75 100644 --- a/jOOQ/src/main/java/org/jooq/ResultQuery.java +++ b/jOOQ/src/main/java/org/jooq/ResultQuery.java @@ -52,6 +52,7 @@ import java.util.Spliterator; import java.util.concurrent.CompletionStage; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Flow; import java.util.function.Consumer; import java.util.stream.Collector; import java.util.stream.Stream; @@ -97,7 +98,13 @@ import org.jooq.impl.DefaultRecordMapper; * * @author Lukas Eder */ -public interface ResultQuery extends Query, Iterable { +public interface ResultQuery +extends + Query + , Iterable +, org.reactivestreams.Publisher +, Flow.Publisher +{ /** * Return the result generated by a previous call to execute(). diff --git a/jOOQ/src/main/java/org/jooq/impl/AbstractResultQuery.java b/jOOQ/src/main/java/org/jooq/impl/AbstractResultQuery.java index f46062b93c..16ea4cc6ff 100644 --- a/jOOQ/src/main/java/org/jooq/impl/AbstractResultQuery.java +++ b/jOOQ/src/main/java/org/jooq/impl/AbstractResultQuery.java @@ -57,6 +57,7 @@ import java.lang.reflect.Array; import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; +import java.util.ArrayDeque; import java.util.Arrays; import java.util.Collection; import java.util.EnumSet; @@ -70,6 +71,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Flow; import java.util.concurrent.Future; import java.util.stream.Collector; import java.util.stream.Stream; @@ -371,6 +373,66 @@ abstract class AbstractResultQuery extends AbstractQuery imple + @Override + public final void subscribe(Flow.Subscriber subscriber) { + subscribe(new FlowToReactiveStreamsSubscriberBridge(subscriber)); + } + + + + + + @Override + public final void subscribe(org.reactivestreams.Subscriber subscriber) { + subscriber.onSubscribe(new org.reactivestreams.Subscription() { + Cursor c; + ArrayDeque buffer; + + @Override + public void request(long n) { + int i = (int) Math.min(n, Integer.MAX_VALUE); + + try { + if (c == null) + c = fetchLazy(); + + if (buffer == null) + buffer = new ArrayDeque(); + + if (buffer.size() < i) + buffer.addAll(c.fetchNext(i - buffer.size())); + + boolean complete = buffer.size() < i; + while (!buffer.isEmpty()) { + subscriber.onNext(buffer.pollFirst()); + } + + if (complete) + doComplete(); + } + catch (Throwable t) { + subscriber.onError(t); + doComplete(); + } + } + + private void doComplete() { + close(); + subscriber.onComplete(); + } + + private void close() { + if (c != null) + c.close(); + } + + @Override + public void cancel() { + close(); + } + }); + } + @Override public final CompletionStage> fetchAsync() { return fetchAsync(Tools.configuration(this).executorProvider().provide()); diff --git a/jOOQ/src/main/java/org/jooq/impl/FlowToReactiveStreamsSubscriberBridge.java b/jOOQ/src/main/java/org/jooq/impl/FlowToReactiveStreamsSubscriberBridge.java new file mode 100644 index 0000000000..444c29c892 --- /dev/null +++ b/jOOQ/src/main/java/org/jooq/impl/FlowToReactiveStreamsSubscriberBridge.java @@ -0,0 +1,88 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Other licenses: + * ----------------------------------------------------------------------------- + * Commercial licenses for this work are available. These replace the above + * ASL 2.0 and offer limited warranties, support, maintenance, and commercial + * database integrations. + * + * For more information, please visit: http://www.jooq.org/licenses + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + */ +package org.jooq.impl; + + + +import java.util.concurrent.Flow; + +/** + * A bridge mapping a JDK 9 {@link Flow.Subscriber} to a reactive streams + * {@link org.reactivestreams.Subscriber} + * + * @author Lukas Eder + */ +final class FlowToReactiveStreamsSubscriberBridge implements org.reactivestreams.Subscriber { + + final Flow.Subscriber delegate; + + FlowToReactiveStreamsSubscriberBridge(Flow.Subscriber delegate) { + this.delegate = delegate; + } + + @Override + public void onSubscribe(org.reactivestreams.Subscription s) { + delegate.onSubscribe(new Flow.Subscription() { + @Override + public void request(long n) { + s.request(n); + } + + @Override + public void cancel() { + s.cancel(); + } + }); + } + + @Override + public void onNext(T t) { + delegate.onNext(t); + } + + @Override + public void onError(Throwable t) { + delegate.onError(t); + } + + @Override + public void onComplete() { + delegate.onComplete(); + } +} + diff --git a/jOOQ/src/main/java/org/jooq/impl/SelectImpl.java b/jOOQ/src/main/java/org/jooq/impl/SelectImpl.java index 368c165af6..212264db1c 100644 --- a/jOOQ/src/main/java/org/jooq/impl/SelectImpl.java +++ b/jOOQ/src/main/java/org/jooq/impl/SelectImpl.java @@ -54,6 +54,7 @@ import java.util.Set; import java.util.concurrent.CompletionStage; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Flow; import java.util.function.BiFunction; import java.util.function.Function; import java.util.stream.Collector; @@ -2701,6 +2702,21 @@ final class SelectImpl subscriber) { + getDelegate().subscribe(subscriber); + } + + + + + + @Override + public final void subscribe(org.reactivestreams.Subscriber subscriber) { + getDelegate().subscribe(subscriber); + } + @Override public final Stream fetchStream() { return getDelegate().fetchStream();