From 4b210ff417c042b3086b3191afd6bdf8b7a60e67 Mon Sep 17 00:00:00 2001 From: lukaseder Date: Mon, 1 Feb 2016 11:53:10 +0100 Subject: [PATCH] [#5021] Add DBMS_AQ.dequeueAsync() to create an Oracle AQ CompletionStage --- .../org/jooq/example/OracleAQExamples.java | 69 +++++++++++++++++++ 1 file changed, 69 insertions(+) diff --git a/jOOQ-examples/jOOQ-oracle-example/src/test/java/org/jooq/example/OracleAQExamples.java b/jOOQ-examples/jOOQ-oracle-example/src/test/java/org/jooq/example/OracleAQExamples.java index bbfb345e93..6093d996c6 100644 --- a/jOOQ-examples/jOOQ-oracle-example/src/test/java/org/jooq/example/OracleAQExamples.java +++ b/jOOQ-examples/jOOQ-oracle-example/src/test/java/org/jooq/example/OracleAQExamples.java @@ -46,11 +46,14 @@ import static org.jooq.example.db.oracle.sp.Queues.NEW_AUTHOR_AQ; // ... // ... import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.math.BigDecimal; +import java.sql.SQLException; import java.util.List; +import java.util.concurrent.CompletionException; import org.jooq.example.db.oracle.sp.udt.records.AuthorTRecord; import org.jooq.example.db.oracle.sp.udt.records.BookTRecord; @@ -157,6 +160,72 @@ public class OracleAQExamples extends Utils { }); } + @Test + public void testAQAsync() throws Exception { + dsl.transaction(c -> { + + // Enqueue all authors + authors.stream().forEach(a -> { + DBMS_AQ.enqueue(dsl.configuration(), NEW_AUTHOR_AQ, a); + }); + + DEQUEUE_OPTIONS_T options = new DEQUEUE_OPTIONS_T().wait(NO_WAIT); + + // Dequeue some of them again + assertNull( + DBMS_AQ.dequeueAsync(dsl.configuration(), NEW_AUTHOR_AQ) + .thenCompose(a -> { + assertEquals(authors.get(0), a); + return DBMS_AQ.dequeueAsync(dsl.configuration(), NEW_AUTHOR_AQ, options); + }) + .thenCompose(a -> { + assertEquals(authors.get(1), a); + return DBMS_AQ.dequeueAsync(dsl.configuration(), NEW_AUTHOR_AQ, options); + }) + .thenCompose(a -> { + assertEquals(authors.get(2), a); + return DBMS_AQ.dequeueAsync(dsl.configuration(), NEW_AUTHOR_AQ, options); + }) + .thenCompose(a -> { + assertEquals(authors.get(3), a); + return DBMS_AQ.dequeueAsync(dsl.configuration(), NEW_AUTHOR_AQ, options); + }) + .thenCompose(a -> { + assertEquals(authors.get(4), a); + return DBMS_AQ.dequeueAsync(dsl.configuration(), NEW_AUTHOR_AQ, options); + }) + .thenCompose(a -> { + assertEquals(authors.get(5), a); + return DBMS_AQ.dequeueAsync(dsl.configuration(), NEW_AUTHOR_AQ, options); + }) + .thenCompose(a -> { + assertEquals(authors.get(6), a); + return DBMS_AQ.dequeueAsync(dsl.configuration(), NEW_AUTHOR_AQ, options); + }) + .thenCompose(a -> { + assertEquals(authors.get(7), a); + return DBMS_AQ.dequeueAsync(dsl.configuration(), NEW_AUTHOR_AQ, options); + }) + .thenCompose(a -> { + assertEquals(authors.get(8), a); + return DBMS_AQ.dequeueAsync(dsl.configuration(), NEW_AUTHOR_AQ, options); + }) + .thenCompose(a -> { + assertEquals(authors.get(9), a); + return DBMS_AQ.dequeueAsync(dsl.configuration(), NEW_AUTHOR_AQ, options); + }) + .toCompletableFuture() + .exceptionally(t -> { + assertTrue(t instanceof CompletionException); + assertTrue(t.getCause() instanceof DataAccessException); + assertTrue(t.getCause().getCause() instanceof SQLException); + assertTrue(t.getCause().getCause().getMessage().contains("ORA-25228")); + return null; + }) + .join()); + }); + } + @Test public void testAQWait() throws Exception { dsl.transaction(c -> {