[#4371] Add support for the JDK 9 Flow API (JSR-166 enhancements) and reactive streams

This commit is contained in:
Lukas Eder 2019-05-13 11:37:08 +02:00
parent 4dc63312a8
commit af00823df5
5 changed files with 182 additions and 1 deletions

View File

@ -75,6 +75,14 @@
</build>
<dependencies>
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams</artifactId>
<version>1.0.2</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>

View File

@ -52,6 +52,7 @@ import java.util.Spliterator;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Flow;
import java.util.function.Consumer;
import java.util.stream.Collector;
import java.util.stream.Stream;
@ -97,7 +98,13 @@ import org.jooq.impl.DefaultRecordMapper;
*
* @author Lukas Eder
*/
public interface ResultQuery<R extends Record> extends Query, Iterable<R> {
public interface ResultQuery<R extends Record>
extends
Query
, Iterable<R>
, org.reactivestreams.Publisher<R>
, Flow.Publisher<R>
{
/**
* Return the result generated by a previous call to execute().

View File

@ -57,6 +57,7 @@ import java.lang.reflect.Array;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collection;
import java.util.EnumSet;
@ -70,6 +71,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Flow;
import java.util.concurrent.Future;
import java.util.stream.Collector;
import java.util.stream.Stream;
@ -371,6 +373,66 @@ abstract class AbstractResultQuery<R extends Record> extends AbstractQuery imple
@Override
public final void subscribe(Flow.Subscriber<? super R> subscriber) {
subscribe(new FlowToReactiveStreamsSubscriberBridge<R>(subscriber));
}
@Override
public final void subscribe(org.reactivestreams.Subscriber<? super R> subscriber) {
subscriber.onSubscribe(new org.reactivestreams.Subscription() {
Cursor<R> c;
ArrayDeque<R> buffer;
@Override
public void request(long n) {
int i = (int) Math.min(n, Integer.MAX_VALUE);
try {
if (c == null)
c = fetchLazy();
if (buffer == null)
buffer = new ArrayDeque<R>();
if (buffer.size() < i)
buffer.addAll(c.fetchNext(i - buffer.size()));
boolean complete = buffer.size() < i;
while (!buffer.isEmpty()) {
subscriber.onNext(buffer.pollFirst());
}
if (complete)
doComplete();
}
catch (Throwable t) {
subscriber.onError(t);
doComplete();
}
}
private void doComplete() {
close();
subscriber.onComplete();
}
private void close() {
if (c != null)
c.close();
}
@Override
public void cancel() {
close();
}
});
}
@Override
public final CompletionStage<Result<R>> fetchAsync() {
return fetchAsync(Tools.configuration(this).executorProvider().provide());

View File

@ -0,0 +1,88 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Other licenses:
* -----------------------------------------------------------------------------
* Commercial licenses for this work are available. These replace the above
* ASL 2.0 and offer limited warranties, support, maintenance, and commercial
* database integrations.
*
* For more information, please visit: http://www.jooq.org/licenses
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*/
package org.jooq.impl;
import java.util.concurrent.Flow;
/**
* A bridge mapping a JDK 9 {@link Flow.Subscriber} to a reactive streams
* {@link org.reactivestreams.Subscriber}
*
* @author Lukas Eder
*/
final class FlowToReactiveStreamsSubscriberBridge<T> implements org.reactivestreams.Subscriber<T> {
final Flow.Subscriber<? super T> delegate;
FlowToReactiveStreamsSubscriberBridge(Flow.Subscriber<? super T> delegate) {
this.delegate = delegate;
}
@Override
public void onSubscribe(org.reactivestreams.Subscription s) {
delegate.onSubscribe(new Flow.Subscription() {
@Override
public void request(long n) {
s.request(n);
}
@Override
public void cancel() {
s.cancel();
}
});
}
@Override
public void onNext(T t) {
delegate.onNext(t);
}
@Override
public void onError(Throwable t) {
delegate.onError(t);
}
@Override
public void onComplete() {
delegate.onComplete();
}
}

View File

@ -54,6 +54,7 @@ import java.util.Set;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Flow;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collector;
@ -2701,6 +2702,21 @@ final class SelectImpl<R extends Record, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10
}
@Override
public final void subscribe(Flow.Subscriber<? super R> subscriber) {
getDelegate().subscribe(subscriber);
}
@Override
public final void subscribe(org.reactivestreams.Subscriber<? super R> subscriber) {
getDelegate().subscribe(subscriber);
}
@Override
public final Stream<R> fetchStream() {
return getDelegate().fetchStream();