[jOOQ/jOOQ#13669] More Connection::close calls fixed

- In batches
- When there are rendering problems in jOOQ
- This includes: [jOOQ/jOOQ#13343] R2DBC implementation may hang when
there's an exception in the rendering logic
This commit is contained in:
Lukas Eder 2022-06-29 17:34:19 +02:00
parent 1ef9b89ff7
commit bbe029c733

View File

@ -75,6 +75,7 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.jooq.BindingGetResultSetContext;
import org.jooq.Configuration;
@ -114,6 +115,7 @@ import io.r2dbc.spi.Connection;
import io.r2dbc.spi.ConnectionFactories;
import io.r2dbc.spi.ConnectionFactoryOptions;
import io.r2dbc.spi.ConnectionFactoryOptions.Builder;
import io.r2dbc.spi.Result.RowSegment;
import io.r2dbc.spi.Option;
import io.r2dbc.spi.Result;
import io.r2dbc.spi.Row;
@ -247,7 +249,10 @@ final class R2DBC {
private final void complete(boolean cancelled) {
resultSubscriber.downstream.forwarders.remove(forwarderIndex);
resultSubscriber.complete(cancelled);
// [#13343] [#13669] Prevent premature completion
if (cancelled || resultSubscriber.downstream.forwarders.isEmpty() && resultSubscriber.completionRequested.get())
resultSubscriber.complete(cancelled);
}
}
@ -255,10 +260,12 @@ final class R2DBC {
final AbstractNonBlockingSubscription<? super T> downstream;
final AtomicBoolean completed;
final AtomicBoolean completionRequested;
AbstractResultSubscriber(AbstractNonBlockingSubscription<? super T> downstream) {
this.downstream = downstream;
this.completed = new AtomicBoolean();
this.completionRequested = new AtomicBoolean();
}
@Override
@ -269,22 +276,26 @@ final class R2DBC {
@Override
public final void onError(Throwable t) {
downstream.subscriber.onError(translate(downstream.sql(), t));
complete(true);
}
@Override
public final void onComplete() {
completed.set(true);
complete(false);
}
final void complete(boolean cancelled) {
if (completed.get() && downstream.forwarders.isEmpty())
completionRequested.set(true);
// [#13343] [#13669] Delay completion of the downstream in case this
// completion happens before each forwarder's
// completion.
if (downstream.forwarders.isEmpty() && !completed.getAndSet(true))
downstream.complete(cancelled);
}
}
static final class RowCountSubscriber extends AbstractResultSubscriber<Integer> {
RowCountSubscriber(AbstractNonBlockingSubscription<? super Integer> downstream) {
super(downstream);
}
@ -339,8 +350,6 @@ final class R2DBC {
return record;
});
}
// TODO: More specific error handling
catch (Throwable t) {
onError(t);
return null;
@ -432,8 +441,9 @@ final class R2DBC {
stmt.execute().subscribe(resultSubscriber.apply(query, downstream));
}
// TODO: More specific error handling
// [#13343] Cancel the downstream in case of a rendering bug in jOOQ
catch (Throwable t) {
downstream.cancel();
onError(t);
}
}
@ -463,8 +473,9 @@ final class R2DBC {
b.execute().subscribe(new RowCountSubscriber(downstream));
}
// TODO: More specific error handling
// [#13343] Cancel the downstream in case of a rendering bug in jOOQ
catch (Throwable t) {
downstream.cancel();
onError(t);
}
}
@ -514,7 +525,10 @@ final class R2DBC {
stmt.execute().subscribe(new RowCountSubscriber(downstream));
}
// [#13343] Cancel the downstream in case of a rendering bug in jOOQ
catch (Throwable t) {
downstream.cancel();
onError(t);
}
}
@ -642,7 +656,7 @@ final class R2DBC {
@Override
final String sql() {
String result = queryExecutionSubscriber.sql;
return result != null ? result : "" + queryExecutionSubscriber.query;
return result != null ? result : sql0(() -> "" + queryExecutionSubscriber.query);
}
}
@ -669,7 +683,7 @@ final class R2DBC {
@Override
final String sql() {
return batch.toString();
return sql0(() -> batch.toString());
}
}
@ -1510,4 +1524,16 @@ final class R2DBC {
static final boolean isR2dbc(java.sql.Statement statement) {
return statement instanceof R2DBCPreparedStatement;
}
/**
* [#13343] Prevent debug rendering errors from influencing control flow.
*/
static final String sql0(Supplier<String> supplier) {
try {
return supplier.get();
}
catch (Throwable t) {
return "Error while rendering SQL: " + t.getMessage();
}
}
}