[jOOQ/jOOQ#18893] NullPointerException in various locations when another

thread cancels a blocking, JDBC backed Subscription
This commit is contained in:
Lukas Eder 2025-08-20 09:49:13 +02:00
parent 81e391a7ce
commit b6634515ed
2 changed files with 25 additions and 16 deletions

View File

@ -95,20 +95,23 @@ import org.jooq.tools.jdbc.JDBCUtils;
*/
final class CursorImpl<R extends Record> extends AbstractCursor<R> {
private static final JooqLogger log = JooqLogger.getLogger(CursorImpl.class);
private static final JooqLogger log = JooqLogger.getLogger(CursorImpl.class);
final ExecuteContext ctx;
final ExecuteListener listener;
private final boolean keepResultSet;
private final boolean keepStatement;
private final boolean autoclosing;
private final int maxRows;
private final Supplier<? extends R> factory;
private boolean isClosed;
final ExecuteContext ctx;
final ExecuteListener listener;
private final boolean keepResultSet;
private final boolean keepStatement;
private final boolean autoclosing;
private final int maxRows;
private final Supplier<? extends R> factory;
private transient CursorResultSet rs;
private transient Iterator<R> iterator;
private transient int rows;
private volatile transient CursorResultSet rs;
private volatile transient Iterator<R> iterator;
// [#18893] A cursor may be accessed concurrently by blocking subscriptions.
// The subscriptions must ensure mutex access
private volatile boolean isClosed;
private volatile int rows;
@SuppressWarnings("unchecked")
CursorImpl(ExecuteContext ctx, ExecuteListener listener, Field<?>[] fields, boolean keepStatement, boolean keepResultSet) {
@ -1319,7 +1322,7 @@ final class CursorImpl<R extends Record> extends AbstractCursor<R> {
/**
* The (potentially) pre-fetched next record
*/
private R next;
private volatile R next;
/**
* Whether the underlying {@link ResultSet} has a next record. This
@ -1330,7 +1333,7 @@ final class CursorImpl<R extends Record> extends AbstractCursor<R> {
* <li>false: there aren't any next records</li>
* </ul>
*/
private Boolean hasNext;
private volatile Boolean hasNext;
/**
* [#11099] Cache this instance for the entire cursor.
@ -1424,7 +1427,7 @@ final class CursorImpl<R extends Record> extends AbstractCursor<R> {
private final ExecuteContext ctx;
private final ExecuteListener listener;
private final AbstractRow<?> initialiserFields;
private int offset;
private volatile int offset;

View File

@ -82,6 +82,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
@ -1714,6 +1715,11 @@ final class R2DBC {
// XXX: Legacy implementation
// -------------------------------------------------------------------------
/**
* [#18893] It might be possible to work with a more fine grained
* {@link Lock} instead of synchronizing all subscription access to allow
* for interleaving concurrent processing of individually requested rows.
*/
static final class BlockingRecordSubscription<R extends Record> extends AbstractSubscription<R> {
private final ResultQueryTrait<R> query;
private volatile Cursor<R> c;
@ -1749,7 +1755,7 @@ final class R2DBC {
}
@Override
final void cancel0(boolean closeAfterTransaction, Runnable onComplete) {
final synchronized void cancel0(boolean closeAfterTransaction, Runnable onComplete) {
safeClose(c);
onComplete.run();
}