[#3425] Add support for Oracle AQ
This commit is contained in:
parent
52b0a082a7
commit
c9fe6e499c
@ -59,6 +59,7 @@ import org.jooq.impl.DSL;
|
||||
// ...
|
||||
// ...
|
||||
// ...
|
||||
// ...
|
||||
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
@ -113,37 +114,39 @@ public class QueriesWithTypes extends Utils {
|
||||
|
||||
@Test
|
||||
public void testAQSimple() throws Exception {
|
||||
dsl.transaction(c -> {
|
||||
|
||||
// Enqueue all authors
|
||||
authors.stream().forEach(a -> {
|
||||
DBMS_AQ.enqueue(dsl.configuration(), NEW_AUTHOR_AQ, a);
|
||||
});
|
||||
// Enqueue all authors
|
||||
authors.stream().forEach(a -> {
|
||||
DBMS_AQ.enqueue(dsl.configuration(), NEW_AUTHOR_AQ, a);
|
||||
});
|
||||
|
||||
// Dequeue them again
|
||||
authors.stream().forEach(a -> {
|
||||
assertEquals(a, DBMS_AQ.dequeue(dsl.configuration(), NEW_AUTHOR_AQ));
|
||||
// Dequeue them again
|
||||
authors.stream().forEach(a -> {
|
||||
assertEquals(a, DBMS_AQ.dequeue(dsl.configuration(), NEW_AUTHOR_AQ));
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAQOptions() throws Exception {
|
||||
dsl.transaction(c -> {
|
||||
|
||||
MESSAGE_PROPERTIES_T props = new MESSAGE_PROPERTIES_T();
|
||||
ENQUEUE_OPTIONS_T enq = new ENQUEUE_OPTIONS_T().visibility(IMMEDIATE);
|
||||
|
||||
// Enqueue two authors
|
||||
DBMS_AQ.enqueue(c, NEW_AUTHOR_AQ, authors.get(0), enq);
|
||||
DBMS_AQ.enqueue(c, NEW_AUTHOR_AQ, authors.get(1), enq);
|
||||
DBMS_AQ.enqueue(c, NEW_AUTHOR_AQ, authors.get(0), enq, props);
|
||||
DBMS_AQ.enqueue(c, NEW_AUTHOR_AQ, authors.get(1), enq, props);
|
||||
|
||||
// Dequeue them again
|
||||
DEQUEUE_OPTIONS_T deq = new DEQUEUE_OPTIONS_T().wait(NO_WAIT);
|
||||
|
||||
assertEquals(authors.get(0), DBMS_AQ.dequeue(c, NEW_AUTHOR_AQ, deq));
|
||||
assertEquals(authors.get(1), DBMS_AQ.dequeue(c, NEW_AUTHOR_AQ, deq));
|
||||
assertEquals(authors.get(0), DBMS_AQ.dequeue(c, NEW_AUTHOR_AQ, deq, props));
|
||||
assertEquals(authors.get(1), DBMS_AQ.dequeue(c, NEW_AUTHOR_AQ, deq, props));
|
||||
|
||||
// The queue is empty, this should fail
|
||||
assertThrows(DataAccessException.class, () -> {
|
||||
DBMS_AQ.dequeue(c, NEW_AUTHOR_AQ, deq);
|
||||
DBMS_AQ.dequeue(c, NEW_AUTHOR_AQ, deq, props);
|
||||
});
|
||||
});
|
||||
}
|
||||
@ -158,18 +161,19 @@ public class QueriesWithTypes extends Utils {
|
||||
// This nested transaction is rolled back to its savepoint
|
||||
assertThrows(RuntimeException.class, () -> {
|
||||
DSL.using(c1).transaction(c2 -> {
|
||||
DBMS_AQ.enqueue(c1, NEW_AUTHOR_AQ, authors.get(1));
|
||||
DBMS_AQ.enqueue(c2, NEW_AUTHOR_AQ, authors.get(1));
|
||||
throw new RuntimeException();
|
||||
});
|
||||
});
|
||||
|
||||
// Dequeue the first author
|
||||
MESSAGE_PROPERTIES_T props = new MESSAGE_PROPERTIES_T();
|
||||
DEQUEUE_OPTIONS_T deq = new DEQUEUE_OPTIONS_T().wait(NO_WAIT);
|
||||
assertEquals(authors.get(0), DBMS_AQ.dequeue(c1, NEW_AUTHOR_AQ, deq));
|
||||
assertEquals(authors.get(0), DBMS_AQ.dequeue(c1, NEW_AUTHOR_AQ, deq, props));
|
||||
|
||||
// The queue is empty (due to the rollback), this should fail
|
||||
assertThrows(DataAccessException.class, () -> {
|
||||
DBMS_AQ.dequeue(c1, NEW_AUTHOR_AQ, deq);
|
||||
DBMS_AQ.dequeue(c1, NEW_AUTHOR_AQ, deq, props);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user