[jOOQ/jOOQ#13502] Added JDBC implementations of reactive transactions

This commit is contained in:
Lukas Eder 2022-04-28 15:36:07 +02:00
parent 63a27fe1ba
commit 9b571dabd5
2 changed files with 29 additions and 5 deletions

View File

@ -210,7 +210,6 @@ import org.jooq.Record6;
import org.jooq.Record7;
import org.jooq.Record8;
import org.jooq.Record9;
import org.jooq.Records;
import org.jooq.RenderContext;
import org.jooq.Result;
import org.jooq.ResultQuery;
@ -276,9 +275,7 @@ import org.jooq.exception.DetachedException;
import org.jooq.exception.InvalidResultException;
import org.jooq.exception.SQLDialectNotSupportedException;
import org.jooq.impl.BatchCRUD.Action;
import org.jooq.impl.R2DBC.BlockingRecordSubscription;
import org.jooq.impl.R2DBC.QuerySubscription;
import org.jooq.impl.R2DBC.ResultSubscriber;
import org.jooq.impl.R2DBC.BlockingTransactionSubscription;
import org.jooq.impl.R2DBC.TransactionSubscription;
import org.jooq.tools.csv.CSVReader;
import org.jooq.tools.jdbc.BatchedConnection;
@ -659,7 +656,7 @@ public class DefaultDSLContext extends AbstractScope implements DSLContext, Seri
if (!(cf instanceof NoConnectionFactory))
subscriber.onSubscribe(new TransactionSubscription<>(this, subscriber, transactional));
else
throw new UnsupportedOperationException("Blocking implementation of reactive transactions");
subscriber.onSubscribe(new BlockingTransactionSubscription<>(this, subscriber, transactional));
};
}

View File

@ -1473,6 +1473,33 @@ final class R2DBC {
}
}
static final class BlockingTransactionSubscription<T> extends AbstractSubscription<T> {
final DSLContext ctx;
final TransactionalPublishable<T> transactional;
BlockingTransactionSubscription(
DSLContext ctx,
Subscriber<? super T> subscriber,
TransactionalPublishable<T> transactional
) {
super(subscriber);
this.ctx = ctx;
this.transactional = transactional;
}
@Override
final void request0() {
try {
subscriber.onNext(ctx.transactionResult(c -> block(transactional.run(c))));
subscriber.onComplete();
}
catch (Throwable t) {
subscriber.onError(t);
}
}
}
static final boolean isR2dbc(java.sql.Statement statement) {
return statement instanceof R2DBCPreparedStatement;
}