diff --git a/jOOQ/src/main/java/org/jooq/impl/DefaultDSLContext.java b/jOOQ/src/main/java/org/jooq/impl/DefaultDSLContext.java index c650e91f3f..2c73095ee6 100644 --- a/jOOQ/src/main/java/org/jooq/impl/DefaultDSLContext.java +++ b/jOOQ/src/main/java/org/jooq/impl/DefaultDSLContext.java @@ -210,7 +210,6 @@ import org.jooq.Record6; import org.jooq.Record7; import org.jooq.Record8; import org.jooq.Record9; -import org.jooq.Records; import org.jooq.RenderContext; import org.jooq.Result; import org.jooq.ResultQuery; @@ -276,9 +275,7 @@ import org.jooq.exception.DetachedException; import org.jooq.exception.InvalidResultException; import org.jooq.exception.SQLDialectNotSupportedException; import org.jooq.impl.BatchCRUD.Action; -import org.jooq.impl.R2DBC.BlockingRecordSubscription; -import org.jooq.impl.R2DBC.QuerySubscription; -import org.jooq.impl.R2DBC.ResultSubscriber; +import org.jooq.impl.R2DBC.BlockingTransactionSubscription; import org.jooq.impl.R2DBC.TransactionSubscription; import org.jooq.tools.csv.CSVReader; import org.jooq.tools.jdbc.BatchedConnection; @@ -659,7 +656,7 @@ public class DefaultDSLContext extends AbstractScope implements DSLContext, Seri if (!(cf instanceof NoConnectionFactory)) subscriber.onSubscribe(new TransactionSubscription<>(this, subscriber, transactional)); else - throw new UnsupportedOperationException("Blocking implementation of reactive transactions"); + subscriber.onSubscribe(new BlockingTransactionSubscription<>(this, subscriber, transactional)); }; } diff --git a/jOOQ/src/main/java/org/jooq/impl/R2DBC.java b/jOOQ/src/main/java/org/jooq/impl/R2DBC.java index 052d9e47a9..38d39181e7 100644 --- a/jOOQ/src/main/java/org/jooq/impl/R2DBC.java +++ b/jOOQ/src/main/java/org/jooq/impl/R2DBC.java @@ -1473,6 +1473,33 @@ final class R2DBC { } } + static final class BlockingTransactionSubscription extends AbstractSubscription { + final DSLContext ctx; + final TransactionalPublishable transactional; + + BlockingTransactionSubscription( + DSLContext ctx, + Subscriber subscriber, + TransactionalPublishable transactional + ) { + super(subscriber); + + this.ctx = ctx; + this.transactional = transactional; + } + + @Override + final void request0() { + try { + subscriber.onNext(ctx.transactionResult(c -> block(transactional.run(c)))); + subscriber.onComplete(); + } + catch (Throwable t) { + subscriber.onError(t); + } + } + } + static final boolean isR2dbc(java.sql.Statement statement) { return statement instanceof R2DBCPreparedStatement; }