[jOOQ/jOOQ#15394] Exception thrown inside blocking

TransactionPublishable gets wrapped by DataAccessException unlike when
thrown from non-blocking TransactionPublishable
This commit is contained in:
Lukas Eder 2023-11-02 16:16:20 +01:00
parent 60df91b3e7
commit 03435fe3be
2 changed files with 15 additions and 9 deletions

View File

@ -83,7 +83,7 @@ public class DefaultCloseableDSLContext extends DefaultDSLContext implements Clo
if (cf instanceof DefaultConnectionFactory dcf) {
if (dcf.finalize) {
R2DBC.block(dcf.connection.close());
R2DBC.blockWrappingExceptions(dcf.connection.close());
dcf.connection = null;
}
}

View File

@ -37,10 +37,10 @@
*/
package org.jooq.impl;
import static org.jooq.ContextConverter.scoped;
import static org.jooq.SQLDialect.MARIADB;
import static org.jooq.SQLDialect.MYSQL;
// ...
import static org.jooq.ContextConverter.scoped;
import static org.jooq.conf.ParamType.NAMED;
import static org.jooq.impl.Internal.subscriber;
import static org.jooq.impl.Tools.EMPTY_PARAM;
@ -85,7 +85,6 @@ import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.jooq.BindingGetResultSetContext;
import org.jooq.Configuration;
import org.jooq.Converter;
import org.jooq.Cursor;
@ -99,7 +98,6 @@ import org.jooq.Param;
import org.jooq.Query;
import org.jooq.Record;
import org.jooq.SQLDialect;
import org.jooq.ContextConverter;
import org.jooq.TransactionalPublishable;
import org.jooq.XML;
import org.jooq.conf.Settings;
@ -125,7 +123,6 @@ import io.r2dbc.spi.Connection;
import io.r2dbc.spi.ConnectionFactories;
import io.r2dbc.spi.ConnectionFactoryOptions;
import io.r2dbc.spi.ConnectionFactoryOptions.Builder;
import io.r2dbc.spi.Result.RowSegment;
import io.r2dbc.spi.Option;
import io.r2dbc.spi.R2dbcException;
import io.r2dbc.spi.Result;
@ -825,7 +822,7 @@ final class R2DBC {
}
@SuppressWarnings("unchecked")
static final <T> T block(Publisher<? extends T> publisher) {
static final <T> T block(Publisher<? extends T> publisher) throws Throwable {
Object complete = new Object();
LinkedBlockingQueue<Object> queue = new LinkedBlockingQueue<>();
publisher.subscribe(subscriber(s -> s.request(1), queue::add, queue::add, () -> queue.add(complete)));
@ -834,7 +831,7 @@ final class R2DBC {
Object result = queue.take();
if (result instanceof Throwable t)
throw new DataAccessException("Exception when blocking on publisher", t);
throw t;
else if (result == complete)
return null;
else
@ -845,6 +842,15 @@ final class R2DBC {
}
}
static final <T> T blockWrappingExceptions(Publisher<? extends T> publisher) {
try {
return block(publisher);
}
catch (Throwable t) {
throw new DataAccessException("Exception when blocking on publisher", t);
}
}
static final Connection getConnection(String url) {
return getConnection(url, new Properties());
}
@ -858,7 +864,7 @@ final class R2DBC {
static final Connection getConnection(String url, Properties properties) {
if (properties.isEmpty())
return block(ConnectionFactories.get(url).create());
return blockWrappingExceptions(ConnectionFactories.get(url).create());
Builder builder = ConnectionFactoryOptions.parse(url).mutate();
properties.forEach((k, v) -> {
@ -878,7 +884,7 @@ final class R2DBC {
setOption(builder, Option.valueOf("" + k), v);
});
return block(ConnectionFactories.get(builder.build()).create());
return blockWrappingExceptions(ConnectionFactories.get(builder.build()).create());
}
private static <T> Builder setOption(Builder builder, Option<T> option, Object v) {