[#5021] Add DBMS_AQ.dequeueAsync() to create an Oracle AQ CompletionStage
This commit is contained in:
parent
bb1bbc3b79
commit
4b210ff417
@ -46,11 +46,14 @@ import static org.jooq.example.db.oracle.sp.Queues.NEW_AUTHOR_AQ;
|
||||
// ...
|
||||
// ...
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
import java.sql.SQLException;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CompletionException;
|
||||
|
||||
import org.jooq.example.db.oracle.sp.udt.records.AuthorTRecord;
|
||||
import org.jooq.example.db.oracle.sp.udt.records.BookTRecord;
|
||||
@ -157,6 +160,72 @@ public class OracleAQExamples extends Utils {
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAQAsync() throws Exception {
|
||||
dsl.transaction(c -> {
|
||||
|
||||
// Enqueue all authors
|
||||
authors.stream().forEach(a -> {
|
||||
DBMS_AQ.enqueue(dsl.configuration(), NEW_AUTHOR_AQ, a);
|
||||
});
|
||||
|
||||
DEQUEUE_OPTIONS_T options = new DEQUEUE_OPTIONS_T().wait(NO_WAIT);
|
||||
|
||||
// Dequeue some of them again
|
||||
assertNull(
|
||||
DBMS_AQ.dequeueAsync(dsl.configuration(), NEW_AUTHOR_AQ)
|
||||
.thenCompose(a -> {
|
||||
assertEquals(authors.get(0), a);
|
||||
return DBMS_AQ.dequeueAsync(dsl.configuration(), NEW_AUTHOR_AQ, options);
|
||||
})
|
||||
.thenCompose(a -> {
|
||||
assertEquals(authors.get(1), a);
|
||||
return DBMS_AQ.dequeueAsync(dsl.configuration(), NEW_AUTHOR_AQ, options);
|
||||
})
|
||||
.thenCompose(a -> {
|
||||
assertEquals(authors.get(2), a);
|
||||
return DBMS_AQ.dequeueAsync(dsl.configuration(), NEW_AUTHOR_AQ, options);
|
||||
})
|
||||
.thenCompose(a -> {
|
||||
assertEquals(authors.get(3), a);
|
||||
return DBMS_AQ.dequeueAsync(dsl.configuration(), NEW_AUTHOR_AQ, options);
|
||||
})
|
||||
.thenCompose(a -> {
|
||||
assertEquals(authors.get(4), a);
|
||||
return DBMS_AQ.dequeueAsync(dsl.configuration(), NEW_AUTHOR_AQ, options);
|
||||
})
|
||||
.thenCompose(a -> {
|
||||
assertEquals(authors.get(5), a);
|
||||
return DBMS_AQ.dequeueAsync(dsl.configuration(), NEW_AUTHOR_AQ, options);
|
||||
})
|
||||
.thenCompose(a -> {
|
||||
assertEquals(authors.get(6), a);
|
||||
return DBMS_AQ.dequeueAsync(dsl.configuration(), NEW_AUTHOR_AQ, options);
|
||||
})
|
||||
.thenCompose(a -> {
|
||||
assertEquals(authors.get(7), a);
|
||||
return DBMS_AQ.dequeueAsync(dsl.configuration(), NEW_AUTHOR_AQ, options);
|
||||
})
|
||||
.thenCompose(a -> {
|
||||
assertEquals(authors.get(8), a);
|
||||
return DBMS_AQ.dequeueAsync(dsl.configuration(), NEW_AUTHOR_AQ, options);
|
||||
})
|
||||
.thenCompose(a -> {
|
||||
assertEquals(authors.get(9), a);
|
||||
return DBMS_AQ.dequeueAsync(dsl.configuration(), NEW_AUTHOR_AQ, options);
|
||||
})
|
||||
.toCompletableFuture()
|
||||
.exceptionally(t -> {
|
||||
assertTrue(t instanceof CompletionException);
|
||||
assertTrue(t.getCause() instanceof DataAccessException);
|
||||
assertTrue(t.getCause().getCause() instanceof SQLException);
|
||||
assertTrue(t.getCause().getCause().getMessage().contains("ORA-25228"));
|
||||
return null;
|
||||
})
|
||||
.join());
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAQWait() throws Exception {
|
||||
dsl.transaction(c -> {
|
||||
|
||||
Loading…
Reference in New Issue
Block a user