diff --git a/jOOQ/src/main/java/org/jooq/Cursor.java b/jOOQ/src/main/java/org/jooq/Cursor.java index 2ed4c07064..a0783c3182 100644 --- a/jOOQ/src/main/java/org/jooq/Cursor.java +++ b/jOOQ/src/main/java/org/jooq/Cursor.java @@ -399,6 +399,22 @@ public interface Cursor extends Iterable , AutoCloseable { /** * Turn this Cursor into a {@link Stream}. + *

+ * The resulting stream is auto-closing upon: + *

+ *

+ * Clients who extract {@link Stream#iterator()} or + * {@link Stream#spliterator()} cannot rely on this auto-closing behaviour + * in the event of: + *

* * @throws DataAccessException if something went wrong executing the query */ diff --git a/jOOQ/src/main/java/org/jooq/DSLContext.java b/jOOQ/src/main/java/org/jooq/DSLContext.java index 593659c738..7c29ce1a6d 100644 --- a/jOOQ/src/main/java/org/jooq/DSLContext.java +++ b/jOOQ/src/main/java/org/jooq/DSLContext.java @@ -2423,6 +2423,22 @@ public interface DSLContext extends Scope , AutoCloseable { *

* Use {@link #fetch(ResultSet)}, to load the entire ResultSet * into a jOOQ Result at once. + *

+ * The resulting stream is auto-closing upon: + *

+ *

+ * Clients who extract {@link Stream#iterator()} or + * {@link Stream#spliterator()} cannot rely on this auto-closing behaviour + * in the event of: + *

* * @param rs The JDBC ResultSet to fetch data from * @return The resulting stream @@ -2438,7 +2454,23 @@ public interface DSLContext extends Scope , AutoCloseable { * into a jOOQ Result at once. *

* The additional fields argument is used by jOOQ to coerce - * field names and data types to the desired output + * field names and data types to the desired output. + *

+ * The resulting stream is auto-closing upon: + *

+ *

+ * Clients who extract {@link Stream#iterator()} or + * {@link Stream#spliterator()} cannot rely on this auto-closing behaviour + * in the event of: + *

* * @param rs The JDBC ResultSet to fetch data from * @param fields The fields to use in the desired output @@ -2455,7 +2487,23 @@ public interface DSLContext extends Scope , AutoCloseable { * into a jOOQ Result at once. *

* The additional types argument is used by jOOQ to coerce data - * types to the desired output + * types to the desired output. + *

+ * The resulting stream is auto-closing upon: + *

+ *

+ * Clients who extract {@link Stream#iterator()} or + * {@link Stream#spliterator()} cannot rely on this auto-closing behaviour + * in the event of: + *

* * @param rs The JDBC ResultSet to fetch data from * @param types The data types to use in the desired output @@ -2472,7 +2520,23 @@ public interface DSLContext extends Scope , AutoCloseable { * into a jOOQ Result at once. *

* The additional types argument is used by jOOQ to coerce data - * types to the desired output + * types to the desired output. + *

+ * The resulting stream is auto-closing upon: + *

+ *

+ * Clients who extract {@link Stream#iterator()} or + * {@link Stream#spliterator()} cannot rely on this auto-closing behaviour + * in the event of: + *

* * @param rs The JDBC ResultSet to fetch data from * @param types The data types to use in the desired output diff --git a/jOOQ/src/main/java/org/jooq/ResultQuery.java b/jOOQ/src/main/java/org/jooq/ResultQuery.java index 7c3941ed33..fd2bef5c5b 100644 --- a/jOOQ/src/main/java/org/jooq/ResultQuery.java +++ b/jOOQ/src/main/java/org/jooq/ResultQuery.java @@ -53,8 +53,6 @@ import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.stream.Stream; -import javax.sql.DataSource; - import org.jooq.conf.Settings; import org.jooq.exception.DataAccessException; import org.jooq.exception.DataTypeException; @@ -149,6 +147,22 @@ public interface ResultQuery extends Query, Iterable { /** * Stream this query. *

+ * The resulting stream is auto-closing upon: + *

    + *
  • Complete consumption
  • + *
  • Any exception that is thrown by a stream sink
  • + *
  • Any exception that is thrown by the underlying {@link ResultSet}, or + * jOOQ
  • + *
+ *

+ * Clients who extract {@link Stream#iterator()} or + * {@link Stream#spliterator()} cannot rely on this auto-closing behaviour + * in the event of: + *

    + *
  • Incomplete consumption
  • + *
  • Exceptions (which result in incomplete consumption)
  • + *
+ *

* This is just a synonym for {@link #stream()}. * * @return The result. @@ -160,20 +174,21 @@ public interface ResultQuery extends Query, Iterable { /** * Stream this query. *

- * This is essentially the same as {@link #fetchLazy()} but instead of - * returning a {@link Cursor}, a Java 8 {@link Stream} is returned. Clients - * should ensure the {@link Stream} is properly closed, e.g. in a - * try-with-resources statement: + * The resulting stream is auto-closing upon: + *

    + *
  • Complete consumption
  • + *
  • Any exception that is thrown by a stream sink
  • + *
  • Any exception that is thrown by the underlying {@link ResultSet}, or + * jOOQ
  • + *
*

- *

-     * try (Stream<R> stream = query.stream()) {
-     *     // Do things with stream
-     * }
-     * 
- *

- * If users prefer more fluent style streaming of queries, {@link ResultSet} - * can be registered and closed via {@link ExecuteListener}, or via "smart" - * third-party {@link DataSource}s. + * Clients who extract {@link Stream#iterator()} or + * {@link Stream#spliterator()} cannot rely on this auto-closing behaviour + * in the event of: + *

    + *
  • Incomplete consumption
  • + *
  • Exceptions (which result in incomplete consumption)
  • + *
* * @return The result. * @throws DataAccessException if something went wrong executing the query diff --git a/jOOQ/src/main/java/org/jooq/impl/AutoClosingBaseStream.java b/jOOQ/src/main/java/org/jooq/impl/AutoClosingBaseStream.java new file mode 100644 index 0000000000..9cff575d40 --- /dev/null +++ b/jOOQ/src/main/java/org/jooq/impl/AutoClosingBaseStream.java @@ -0,0 +1,151 @@ +/** + * Copyright (c) 2009-2016, Data Geekery GmbH (http://www.datageekery.com) + * All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Other licenses: + * ----------------------------------------------------------------------------- + * Commercial licenses for this work are available. These replace the above + * ASL 2.0 and offer limited warranties, support, maintenance, and commercial + * database integrations. + * + * For more information, please visit: http://www.jooq.org/licenses + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + */ +package org.jooq.impl; + + + +import java.util.Optional; +import java.util.function.Consumer; +import java.util.function.Supplier; +import java.util.stream.BaseStream; +import java.util.stream.DoubleStream; +import java.util.stream.IntStream; +import java.util.stream.LongStream; +import java.util.stream.Stream; + +/** + * A {@link Stream} wrapper that auto-closes itself: + *
    + *
  • Upon failure
  • + *
  • Upon a terminal operation
  • + *
+ * + * @author Lukas Eder + */ +abstract class AutoClosingBaseStream, U extends AutoClosingBaseStream> implements BaseStream { + + final S delegate; + final Consumer> onComplete; + + AutoClosingBaseStream(S delegate, Consumer> onComplete) { + this.delegate = delegate; + this.onComplete = onComplete; + } + + abstract S delegateOf(S newDelegate); + + final Stream delegate(Stream newDelegate) { + return new AutoClosingStream<>(newDelegate, onComplete); + } + + final IntStream delegate(IntStream newDelegate) { + return new AutoClosingIntStream(newDelegate, onComplete); + } + + final LongStream delegate(LongStream newDelegate) { + return new AutoClosingLongStream(newDelegate, onComplete); + } + + final DoubleStream delegate(DoubleStream newDelegate) { + return new AutoClosingDoubleStream(newDelegate, onComplete); + } + + final void terminalOp(Runnable runnable) { + terminalOp(() -> { + runnable.run(); + return null; + }); + } + + final R1 terminalOp(Supplier supplier) { + R1 result = null; + + try { + result = supplier.get(); + close(); + } + catch (Throwable e) { + close(); + Utils.sneakyThrow(e); + } + + return result; + } + + // ------------------------------------------------------------------------- + // These methods are not affected by the auto-closing semantics + // ------------------------------------------------------------------------- + + @Override + public final boolean isParallel() { + return delegate.isParallel(); + } + + @Override + public final void close() { + delegate.close(); + } + + // ------------------------------------------------------------------------- + // These methods forward to delegate stream, and wrap afresh + // ------------------------------------------------------------------------- + + @Override + public final S sequential() { + return delegateOf(delegate.sequential()); + } + + @Override + public final S parallel() { + return delegateOf(delegate.parallel()); + } + + @Override + public final S unordered() { + return delegateOf(delegate.unordered()); + } + + @Override + public final S onClose(Runnable closeHandler) { + return delegateOf(delegate.onClose(closeHandler)); + } +} + diff --git a/jOOQ/src/main/java/org/jooq/impl/AutoClosingDoubleStream.java b/jOOQ/src/main/java/org/jooq/impl/AutoClosingDoubleStream.java new file mode 100644 index 0000000000..7fe61e0c62 --- /dev/null +++ b/jOOQ/src/main/java/org/jooq/impl/AutoClosingDoubleStream.java @@ -0,0 +1,253 @@ +/** + * Copyright (c) 2009-2016, Data Geekery GmbH (http://www.datageekery.com) + * All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Other licenses: + * ----------------------------------------------------------------------------- + * Commercial licenses for this work are available. These replace the above + * ASL 2.0 and offer limited warranties, support, maintenance, and commercial + * database integrations. + * + * For more information, please visit: http://www.jooq.org/licenses + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + */ +package org.jooq.impl; + + + +import java.util.DoubleSummaryStatistics; +import java.util.Optional; +import java.util.OptionalDouble; +import java.util.PrimitiveIterator; +import java.util.Spliterator; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import java.util.function.DoubleBinaryOperator; +import java.util.function.DoubleConsumer; +import java.util.function.DoubleFunction; +import java.util.function.DoublePredicate; +import java.util.function.DoubleToIntFunction; +import java.util.function.DoubleToLongFunction; +import java.util.function.DoubleUnaryOperator; +import java.util.function.ObjDoubleConsumer; +import java.util.function.Supplier; +import java.util.stream.DoubleStream; +import java.util.stream.IntStream; +import java.util.stream.LongStream; +import java.util.stream.Stream; + +/** + * A {@link DoubleStream} wrapper that auto-closes itself: + *
    + *
  • Upon failure
  • + *
  • Upon a terminal operation
  • + *
+ * + * @author Lukas Eder + */ +class AutoClosingDoubleStream extends AutoClosingBaseStream implements DoubleStream { + + AutoClosingDoubleStream(DoubleStream delegate, Consumer> onComplete) { + super(delegate, onComplete); + } + + @Override + final DoubleStream delegateOf(DoubleStream newDelegate) { + return delegate(newDelegate); + } + + // ------------------------------------------------------------------------- + // These methods break the auto-closing semantics + // ------------------------------------------------------------------------- + + @Override + public final PrimitiveIterator.OfDouble iterator() { + return delegate.iterator(); + } + + @Override + public final Spliterator.OfDouble spliterator() { + return delegate.spliterator(); + } + + // ------------------------------------------------------------------------- + // These methods forward to delegate stream, and wrap afresh + // ------------------------------------------------------------------------- + + @Override + public final DoubleStream filter(DoublePredicate predicate) { + return delegate(delegate.filter(predicate)); + } + + @Override + public final DoubleStream map(DoubleUnaryOperator mapper) { + return delegate(delegate.map(mapper)); + } + + @Override + public final Stream mapToObj(DoubleFunction mapper) { + return delegate(delegate.mapToObj(mapper)); + } + + @Override + public final IntStream mapToInt(DoubleToIntFunction mapper) { + return delegate(delegate.mapToInt(mapper)); + } + + @Override + public final LongStream mapToLong(DoubleToLongFunction mapper) { + return delegate(delegate.mapToLong(mapper)); + } + + @Override + public final DoubleStream flatMap(DoubleFunction mapper) { + return delegate(delegate.flatMap(mapper)); + } + + @Override + public final DoubleStream distinct() { + return delegate(delegate.distinct()); + } + + @Override + public final DoubleStream sorted() { + return delegate(delegate.sorted()); + } + + @Override + public final DoubleStream peek(DoubleConsumer action) { + return delegate.peek(action); + } + + @Override + public final DoubleStream limit(long maxSize) { + return delegate(delegate.limit(maxSize)); + } + + @Override + public final DoubleStream skip(long n) { + return delegate(delegate.skip(n)); + } + + @Override + public final Stream boxed() { + return delegate(delegate.boxed()); + } + + // ------------------------------------------------------------------------- + // These methods implement terminal op semantics, and implement auto-closing + // ------------------------------------------------------------------------- + + @Override + public final void forEach(DoubleConsumer action) { + terminalOp(() -> delegate.forEach(action)); + } + + @Override + public final void forEachOrdered(DoubleConsumer action) { + terminalOp(() -> delegate.forEachOrdered(action)); + } + + @Override + public final double[] toArray() { + return terminalOp(() -> delegate.toArray()); + } + + @Override + public final double reduce(double identity, DoubleBinaryOperator op) { + return terminalOp(() -> delegate.reduce(identity, op)); + } + + @Override + public final OptionalDouble reduce(DoubleBinaryOperator op) { + return terminalOp(() -> delegate.reduce(op)); + } + + @Override + public final R collect(Supplier supplier, ObjDoubleConsumer accumulator, BiConsumer combiner) { + return terminalOp(() -> delegate.collect(supplier, accumulator, combiner)); + } + + @Override + public final double sum() { + return terminalOp(() -> delegate.sum()); + } + + @Override + public final OptionalDouble min() { + return terminalOp(() -> delegate.min()); + } + + @Override + public final OptionalDouble max() { + return terminalOp(() -> delegate.max()); + } + + @Override + public final long count() { + return terminalOp(() -> delegate.count()); + } + + @Override + public final OptionalDouble average() { + return terminalOp(() -> delegate.average()); + } + + @Override + public final DoubleSummaryStatistics summaryStatistics() { + return terminalOp(() -> delegate.summaryStatistics()); + } + + @Override + public final boolean anyMatch(DoublePredicate predicate) { + return terminalOp(() -> delegate.anyMatch(predicate)); + } + + @Override + public final boolean allMatch(DoublePredicate predicate) { + return terminalOp(() -> delegate.allMatch(predicate)); + } + + @Override + public final boolean noneMatch(DoublePredicate predicate) { + return terminalOp(() -> delegate.noneMatch(predicate)); + } + + @Override + public final OptionalDouble findFirst() { + return terminalOp(() -> delegate.findFirst()); + } + + @Override + public final OptionalDouble findAny() { + return terminalOp(() -> delegate.findAny()); + } +} + diff --git a/jOOQ/src/main/java/org/jooq/impl/AutoClosingIntStream.java b/jOOQ/src/main/java/org/jooq/impl/AutoClosingIntStream.java new file mode 100644 index 0000000000..756ad658ea --- /dev/null +++ b/jOOQ/src/main/java/org/jooq/impl/AutoClosingIntStream.java @@ -0,0 +1,264 @@ +/** + * Copyright (c) 2009-2016, Data Geekery GmbH (http://www.datageekery.com) + * All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Other licenses: + * ----------------------------------------------------------------------------- + * Commercial licenses for this work are available. These replace the above + * ASL 2.0 and offer limited warranties, support, maintenance, and commercial + * database integrations. + * + * For more information, please visit: http://www.jooq.org/licenses + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + */ +package org.jooq.impl; + + + +import java.util.IntSummaryStatistics; +import java.util.Optional; +import java.util.OptionalDouble; +import java.util.OptionalInt; +import java.util.PrimitiveIterator; +import java.util.Spliterator; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import java.util.function.IntBinaryOperator; +import java.util.function.IntConsumer; +import java.util.function.IntFunction; +import java.util.function.IntPredicate; +import java.util.function.IntToDoubleFunction; +import java.util.function.IntToLongFunction; +import java.util.function.IntUnaryOperator; +import java.util.function.ObjIntConsumer; +import java.util.function.Supplier; +import java.util.stream.DoubleStream; +import java.util.stream.IntStream; +import java.util.stream.LongStream; +import java.util.stream.Stream; + +/** + * An {@link IntStream} wrapper that auto-closes itself: + *
    + *
  • Upon failure
  • + *
  • Upon a terminal operation
  • + *
+ * + * @author Lukas Eder + */ +class AutoClosingIntStream extends AutoClosingBaseStream implements IntStream { + + AutoClosingIntStream(IntStream delegate, Consumer> onComplete) { + super(delegate, onComplete); + } + + @Override + final IntStream delegateOf(IntStream newDelegate) { + return delegate(newDelegate); + } + + // ------------------------------------------------------------------------- + // These methods break the auto-closing semantics + // ------------------------------------------------------------------------- + + @Override + public final PrimitiveIterator.OfInt iterator() { + return delegate.iterator(); + } + + @Override + public final Spliterator.OfInt spliterator() { + return delegate.spliterator(); + } + + // ------------------------------------------------------------------------- + // These methods forward to delegate stream, and wrap afresh + // ------------------------------------------------------------------------- + + @Override + public final IntStream filter(IntPredicate predicate) { + return delegate(delegate.filter(predicate)); + } + + @Override + public final IntStream map(IntUnaryOperator mapper) { + return delegate(delegate.map(mapper)); + } + + @Override + public final Stream mapToObj(IntFunction mapper) { + return delegate(delegate.mapToObj(mapper)); + } + + @Override + public final LongStream mapToLong(IntToLongFunction mapper) { + return delegate(delegate.mapToLong(mapper)); + } + + @Override + public final DoubleStream mapToDouble(IntToDoubleFunction mapper) { + return delegate(delegate.mapToDouble(mapper)); + } + + @Override + public final IntStream flatMap(IntFunction mapper) { + return delegate(delegate.flatMap(mapper)); + } + + @Override + public final IntStream distinct() { + return delegate(delegate.distinct()); + } + + @Override + public final IntStream sorted() { + return delegate(delegate.sorted()); + } + + @Override + public final IntStream peek(IntConsumer action) { + return delegate.peek(action); + } + + @Override + public final IntStream limit(long maxSize) { + return delegate(delegate.limit(maxSize)); + } + + @Override + public final IntStream skip(long n) { + return delegate(delegate.skip(n)); + } + + @Override + public final LongStream asLongStream() { + return delegate(delegate.asLongStream()); + } + + @Override + public final DoubleStream asDoubleStream() { + return delegate(delegate.asDoubleStream()); + } + + @Override + public final Stream boxed() { + return delegate(delegate.boxed()); + } + + // ------------------------------------------------------------------------- + // These methods implement terminal op semantics, and implement auto-closing + // ------------------------------------------------------------------------- + + @Override + public final void forEach(IntConsumer action) { + terminalOp(() -> delegate.forEach(action)); + } + + @Override + public final void forEachOrdered(IntConsumer action) { + terminalOp(() -> delegate.forEachOrdered(action)); + } + + @Override + public final int[] toArray() { + return terminalOp(() -> delegate.toArray()); + } + + @Override + public final int reduce(int identity, IntBinaryOperator op) { + return terminalOp(() -> delegate.reduce(identity, op)); + } + + @Override + public final OptionalInt reduce(IntBinaryOperator op) { + return terminalOp(() -> delegate.reduce(op)); + } + + @Override + public final R collect(Supplier supplier, ObjIntConsumer accumulator, BiConsumer combiner) { + return terminalOp(() -> delegate.collect(supplier, accumulator, combiner)); + } + + @Override + public final int sum() { + return terminalOp(() -> delegate.sum()); + } + + @Override + public final OptionalInt min() { + return terminalOp(() -> delegate.min()); + } + + @Override + public final OptionalInt max() { + return terminalOp(() -> delegate.max()); + } + + @Override + public final long count() { + return terminalOp(() -> delegate.count()); + } + + @Override + public final OptionalDouble average() { + return terminalOp(() -> delegate.average()); + } + + @Override + public final IntSummaryStatistics summaryStatistics() { + return terminalOp(() -> delegate.summaryStatistics()); + } + + @Override + public final boolean anyMatch(IntPredicate predicate) { + return terminalOp(() -> delegate.anyMatch(predicate)); + } + + @Override + public final boolean allMatch(IntPredicate predicate) { + return terminalOp(() -> delegate.allMatch(predicate)); + } + + @Override + public final boolean noneMatch(IntPredicate predicate) { + return terminalOp(() -> delegate.noneMatch(predicate)); + } + + @Override + public final OptionalInt findFirst() { + return terminalOp(() -> delegate.findFirst()); + } + + @Override + public final OptionalInt findAny() { + return terminalOp(() -> delegate.findAny()); + } +} + diff --git a/jOOQ/src/main/java/org/jooq/impl/AutoClosingLongStream.java b/jOOQ/src/main/java/org/jooq/impl/AutoClosingLongStream.java new file mode 100644 index 0000000000..fd7693ba1d --- /dev/null +++ b/jOOQ/src/main/java/org/jooq/impl/AutoClosingLongStream.java @@ -0,0 +1,259 @@ +/** + * Copyright (c) 2009-2016, Data Geekery GmbH (http://www.datageekery.com) + * All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Other licenses: + * ----------------------------------------------------------------------------- + * Commercial licenses for this work are available. These replace the above + * ASL 2.0 and offer limited warranties, support, maintenance, and commercial + * database integrations. + * + * For more information, please visit: http://www.jooq.org/licenses + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + */ +package org.jooq.impl; + + + +import java.util.LongSummaryStatistics; +import java.util.Optional; +import java.util.OptionalDouble; +import java.util.OptionalLong; +import java.util.PrimitiveIterator; +import java.util.Spliterator; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import java.util.function.LongBinaryOperator; +import java.util.function.LongConsumer; +import java.util.function.LongFunction; +import java.util.function.LongPredicate; +import java.util.function.LongToDoubleFunction; +import java.util.function.LongToIntFunction; +import java.util.function.LongUnaryOperator; +import java.util.function.ObjLongConsumer; +import java.util.function.Supplier; +import java.util.stream.DoubleStream; +import java.util.stream.IntStream; +import java.util.stream.LongStream; +import java.util.stream.Stream; + +/** + * A {@link LongStream} wrapper that auto-closes itself: + *
    + *
  • Upon failure
  • + *
  • Upon a terminal operation
  • + *
+ * + * @author Lukas Eder + */ +class AutoClosingLongStream extends AutoClosingBaseStream implements LongStream { + + AutoClosingLongStream(LongStream delegate, Consumer> onComplete) { + super(delegate, onComplete); + } + + @Override + final LongStream delegateOf(LongStream newDelegate) { + return delegate(newDelegate); + } + + // ------------------------------------------------------------------------- + // These methods break the auto-closing semantics + // ------------------------------------------------------------------------- + + @Override + public final PrimitiveIterator.OfLong iterator() { + return delegate.iterator(); + } + + @Override + public final Spliterator.OfLong spliterator() { + return delegate.spliterator(); + } + + // ------------------------------------------------------------------------- + // These methods forward to delegate stream, and wrap afresh + // ------------------------------------------------------------------------- + + @Override + public final LongStream filter(LongPredicate predicate) { + return delegate(delegate.filter(predicate)); + } + + @Override + public final LongStream map(LongUnaryOperator mapper) { + return delegate(delegate.map(mapper)); + } + + @Override + public final Stream mapToObj(LongFunction mapper) { + return delegate(delegate.mapToObj(mapper)); + } + + @Override + public final IntStream mapToInt(LongToIntFunction mapper) { + return delegate(delegate.mapToInt(mapper)); + } + + @Override + public final DoubleStream mapToDouble(LongToDoubleFunction mapper) { + return delegate(delegate.mapToDouble(mapper)); + } + + @Override + public final LongStream flatMap(LongFunction mapper) { + return delegate(delegate.flatMap(mapper)); + } + + @Override + public final LongStream distinct() { + return delegate(delegate.distinct()); + } + + @Override + public final LongStream sorted() { + return delegate(delegate.sorted()); + } + + @Override + public final LongStream peek(LongConsumer action) { + return delegate.peek(action); + } + + @Override + public final LongStream limit(long maxSize) { + return delegate(delegate.limit(maxSize)); + } + + @Override + public final LongStream skip(long n) { + return delegate(delegate.skip(n)); + } + + @Override + public final DoubleStream asDoubleStream() { + return delegate(delegate.asDoubleStream()); + } + + @Override + public final Stream boxed() { + return delegate(delegate.boxed()); + } + + // ------------------------------------------------------------------------- + // These methods implement terminal op semantics, and implement auto-closing + // ------------------------------------------------------------------------- + + @Override + public final void forEach(LongConsumer action) { + terminalOp(() -> delegate.forEach(action)); + } + + @Override + public final void forEachOrdered(LongConsumer action) { + terminalOp(() -> delegate.forEachOrdered(action)); + } + + @Override + public final long[] toArray() { + return terminalOp(() -> delegate.toArray()); + } + + @Override + public final long reduce(long identity, LongBinaryOperator op) { + return terminalOp(() -> delegate.reduce(identity, op)); + } + + @Override + public final OptionalLong reduce(LongBinaryOperator op) { + return terminalOp(() -> delegate.reduce(op)); + } + + @Override + public final R collect(Supplier supplier, ObjLongConsumer accumulator, BiConsumer combiner) { + return terminalOp(() -> delegate.collect(supplier, accumulator, combiner)); + } + + @Override + public final long sum() { + return terminalOp(() -> delegate.sum()); + } + + @Override + public final OptionalLong min() { + return terminalOp(() -> delegate.min()); + } + + @Override + public final OptionalLong max() { + return terminalOp(() -> delegate.max()); + } + + @Override + public final long count() { + return terminalOp(() -> delegate.count()); + } + + @Override + public final OptionalDouble average() { + return terminalOp(() -> delegate.average()); + } + + @Override + public final LongSummaryStatistics summaryStatistics() { + return terminalOp(() -> delegate.summaryStatistics()); + } + + @Override + public final boolean anyMatch(LongPredicate predicate) { + return terminalOp(() -> delegate.anyMatch(predicate)); + } + + @Override + public final boolean allMatch(LongPredicate predicate) { + return terminalOp(() -> delegate.allMatch(predicate)); + } + + @Override + public final boolean noneMatch(LongPredicate predicate) { + return terminalOp(() -> delegate.noneMatch(predicate)); + } + + @Override + public final OptionalLong findFirst() { + return terminalOp(() -> delegate.findFirst()); + } + + @Override + public final OptionalLong findAny() { + return terminalOp(() -> delegate.findAny()); + } +} + diff --git a/jOOQ/src/main/java/org/jooq/impl/AutoClosingStream.java b/jOOQ/src/main/java/org/jooq/impl/AutoClosingStream.java new file mode 100644 index 0000000000..9f20f5d58d --- /dev/null +++ b/jOOQ/src/main/java/org/jooq/impl/AutoClosingStream.java @@ -0,0 +1,268 @@ +/** + * Copyright (c) 2009-2016, Data Geekery GmbH (http://www.datageekery.com) + * All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Other licenses: + * ----------------------------------------------------------------------------- + * Commercial licenses for this work are available. These replace the above + * ASL 2.0 and offer limited warranties, support, maintenance, and commercial + * database integrations. + * + * For more information, please visit: http://www.jooq.org/licenses + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + */ +package org.jooq.impl; + + + +import java.util.Comparator; +import java.util.Iterator; +import java.util.Optional; +import java.util.Spliterator; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; +import java.util.function.BinaryOperator; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.IntFunction; +import java.util.function.Predicate; +import java.util.function.Supplier; +import java.util.function.ToDoubleFunction; +import java.util.function.ToIntFunction; +import java.util.function.ToLongFunction; +import java.util.stream.Collector; +import java.util.stream.DoubleStream; +import java.util.stream.IntStream; +import java.util.stream.LongStream; +import java.util.stream.Stream; + +/** + * A {@link Stream} wrapper that auto-closes itself: + *
    + *
  • Upon failure
  • + *
  • Upon a terminal operation
  • + *
+ * + * @author Lukas Eder + */ +class AutoClosingStream extends AutoClosingBaseStream, AutoClosingStream> implements Stream { + + AutoClosingStream(Stream delegate, Consumer> onComplete) { + super(delegate, onComplete); + } + + @Override + final Stream delegateOf(Stream newDelegate) { + return delegate(newDelegate); + } + + // ------------------------------------------------------------------------- + // These methods break the auto-closing semantics + // ------------------------------------------------------------------------- + + @Override + public final Iterator iterator() { + return delegate.iterator(); + } + + @Override + public final Spliterator spliterator() { + return delegate.spliterator(); + } + + // ------------------------------------------------------------------------- + // These methods forward to delegate stream, and wrap afresh + // ------------------------------------------------------------------------- + + @Override + public final Stream filter(Predicate predicate) { + return delegate(delegate.filter(predicate)); + } + + @Override + public final Stream map(Function mapper) { + return delegate(delegate.map(mapper)); + } + + @Override + public final IntStream mapToInt(ToIntFunction mapper) { + return delegate(delegate.mapToInt(mapper)); + } + + @Override + public final LongStream mapToLong(ToLongFunction mapper) { + throw new UnsupportedOperationException(); + } + + @Override + public final DoubleStream mapToDouble(ToDoubleFunction mapper) { + throw new UnsupportedOperationException(); + } + + @Override + public final Stream flatMap(Function> mapper) { + return delegate(delegate.flatMap(mapper)); + } + + @Override + public final IntStream flatMapToInt(Function mapper) { + return delegate(delegate.flatMapToInt(mapper)); + } + + @Override + public final LongStream flatMapToLong(Function mapper) { + throw new UnsupportedOperationException(); + } + + @Override + public final DoubleStream flatMapToDouble(Function mapper) { + throw new UnsupportedOperationException(); + } + + @Override + public final Stream distinct() { + return delegate(delegate.distinct()); + } + + @Override + public final Stream sorted() { + return delegate(delegate.sorted()); + } + + @Override + public final Stream sorted(Comparator comparator) { + return delegate(delegate.sorted(comparator)); + } + + @Override + public final Stream peek(Consumer action) { + return delegate(delegate.peek(action)); + } + + @Override + public final Stream limit(long maxSize) { + return delegate(delegate.limit(maxSize)); + } + + @Override + public final Stream skip(long n) { + return delegate(delegate.skip(n)); + } + + // ------------------------------------------------------------------------- + // These methods implement terminal op semantics, and implement auto-closing + // ------------------------------------------------------------------------- + + @Override + public final void forEach(Consumer action) { + terminalOp(() -> delegate.forEach(action)); + } + + @Override + public final void forEachOrdered(Consumer action) { + terminalOp(() -> delegate.forEachOrdered(action)); + } + + @Override + public final Object[] toArray() { + return terminalOp(() -> delegate.toArray()); + } + + @Override + public final A[] toArray(IntFunction generator) { + return terminalOp(() -> delegate.toArray(generator)); + } + + @Override + public final T reduce(T identity, BinaryOperator accumulator) { + return terminalOp(() -> delegate.reduce(identity, accumulator)); + } + + @Override + public final Optional reduce(BinaryOperator accumulator) { + return terminalOp(() -> delegate.reduce(accumulator)); + } + + @Override + public final U reduce(U identity, BiFunction accumulator, BinaryOperator combiner) { + return terminalOp(() -> delegate.reduce(identity, accumulator, combiner)); + } + + @Override + public final R collect(Supplier supplier, BiConsumer accumulator, BiConsumer combiner) { + return terminalOp(() -> delegate.collect(supplier, accumulator, combiner)); + } + + @Override + public final R collect(Collector collector) { + return terminalOp(() -> delegate.collect(collector)); + } + + @Override + public final Optional min(Comparator comparator) { + return terminalOp(() -> delegate.min(comparator)); + } + + @Override + public final Optional max(Comparator comparator) { + return terminalOp(() -> delegate.max(comparator)); + } + + @Override + public final long count() { + return terminalOp(() -> delegate.count()); + } + + @Override + public final boolean anyMatch(Predicate predicate) { + return terminalOp(() -> delegate.anyMatch(predicate)); + } + + @Override + public final boolean allMatch(Predicate predicate) { + return terminalOp(() -> delegate.allMatch(predicate)); + } + + @Override + public final boolean noneMatch(Predicate predicate) { + return terminalOp(() -> delegate.noneMatch(predicate)); + } + + @Override + public final Optional findFirst() { + return terminalOp(() -> delegate.findFirst()); + } + + @Override + public final Optional findAny() { + return terminalOp(() -> delegate.findAny()); + } +} + diff --git a/jOOQ/src/main/java/org/jooq/impl/CursorImpl.java b/jOOQ/src/main/java/org/jooq/impl/CursorImpl.java index 0ce6d71821..1f356d3fca 100644 --- a/jOOQ/src/main/java/org/jooq/impl/CursorImpl.java +++ b/jOOQ/src/main/java/org/jooq/impl/CursorImpl.java @@ -261,14 +261,20 @@ class CursorImpl implements Cursor { @Override public final Stream stream() throws DataAccessException { - return StreamSupport.stream( - Spliterators.spliterator( - iterator(), - 0, - Spliterator.ORDERED | Spliterator.NONNULL - ), - false - ).onClose(() -> close()); + return new AutoClosingStream( + StreamSupport.stream( + Spliterators.spliterator( + iterator(), + 0, + Spliterator.ORDERED | Spliterator.NONNULL + ), + false + ).onClose(() -> close()), + o -> { + close(); + o.ifPresent(e -> Utils.sneakyThrow(e)); + } + ); } diff --git a/jOOQ/src/main/java/org/jooq/impl/Utils.java b/jOOQ/src/main/java/org/jooq/impl/Utils.java index 0e3db10c3e..6e291a8dde 100644 --- a/jOOQ/src/main/java/org/jooq/impl/Utils.java +++ b/jOOQ/src/main/java/org/jooq/impl/Utils.java @@ -2935,4 +2935,19 @@ final class Utils { ctx.keyword(typeName); } } + + /** + * Sneaky throw any type of Throwable. + */ + static void sneakyThrow(Throwable throwable) { + Utils.sneakyThrow0(throwable); + } + + /** + * Sneaky throw any type of Throwable. + */ + @SuppressWarnings("unchecked") + static void sneakyThrow0(Throwable throwable) throws E { + throw (E) throwable; + } }