[jOOQ/jOOQ#11700] Implement RowCountQuery.subscribe()

This commit is contained in:
Lukas Eder 2021-03-25 21:53:54 +01:00
parent d7736fda19
commit 4819e68743
4 changed files with 148 additions and 78 deletions

View File

@ -89,8 +89,10 @@ import org.jooq.Results;
import org.jooq.SQLDialect;
import org.jooq.Table;
import org.jooq.conf.SettingsTools;
import org.jooq.impl.R2DBC.BlockingSubscription;
import org.jooq.impl.R2DBC.RecordSubscription;
import org.jooq.impl.R2DBC.BlockingRecordSubscription;
import org.jooq.impl.R2DBC.ConnectionSubscriber;
import org.jooq.impl.R2DBC.QuerySubscription;
import org.jooq.impl.R2DBC.ResultSubscriber;
import org.jooq.tools.JooqLogger;
import org.jooq.tools.jdbc.MockResultSet;
@ -338,9 +340,9 @@ abstract class AbstractResultQuery<R extends Record> extends AbstractQuery<R> im
ConnectionFactory cf = configuration().connectionFactory();
if (!(cf instanceof NoConnectionFactory))
subscriber.onSubscribe(new RecordSubscription<R>(this, subscriber));
subscriber.onSubscribe(new QuerySubscription<>(this, subscriber, (AbstractResultQuery<R> t, ConnectionSubscriber<R, R, AbstractResultQuery<R>> u) -> new ResultSubscriber<>(t, u)));
else
subscriber.onSubscribe(new BlockingSubscription<>(this, subscriber));
subscriber.onSubscribe(new BlockingRecordSubscription<>(this, subscriber));
}
@Override

View File

@ -40,6 +40,12 @@ package org.jooq.impl;
import org.jooq.Configuration;
import org.jooq.Record;
import org.jooq.RowCountQuery;
import org.jooq.impl.R2DBC.BlockingRowCountSubscription;
import org.jooq.impl.R2DBC.ConnectionSubscriber;
import org.jooq.impl.R2DBC.QuerySubscription;
import org.jooq.impl.R2DBC.RowCountSubscriber;
import io.r2dbc.spi.ConnectionFactory;
/**
* @author Lukas Eder
@ -55,36 +61,22 @@ abstract class AbstractRowCountQuery extends AbstractQuery<Record> implements Ro
super(configuration);
}
@Override
public final void subscribe(org.reactivestreams.Subscriber<? super Integer> subscriber) {
subscriber.onSubscribe(new org.reactivestreams.Subscription() {
Integer rows;
ConnectionFactory cf = configuration().connectionFactory();
@Override
public void request(long n) {
try {
if (rows == null)
subscriber.onNext(rows = execute());
}
catch (Throwable t) {
subscriber.onError(t);
}
subscriber.onComplete();
}
@Override
public void cancel() {
}
});
if (!(cf instanceof NoConnectionFactory))
subscriber.onSubscribe(new QuerySubscription<>(this, subscriber, (AbstractRowCountQuery t, ConnectionSubscriber<Integer, Record, AbstractRowCountQuery> u) -> new RowCountSubscriber<>(t, u)));
else
subscriber.onSubscribe(new BlockingRowCountSubscription(this, subscriber));
}
}

View File

@ -54,6 +54,7 @@ import java.util.Map;
import org.jooq.Configuration;
import org.jooq.DSLContext;
import org.jooq.Param;
import org.jooq.exception.DataAccessException;
import org.jooq.impl.DefaultRenderContext.Rendered;
import org.jooq.impl.Tools.Cache;
import org.jooq.tools.JooqLogger;
@ -126,13 +127,21 @@ final class ParsingConnection extends DefaultConnection {
return new CacheValue(configuration, sql, bindValues);
},
CacheType.CACHE_PARSING_CONNECTION,
() -> Cache.key(sql, asList(dataTypes(bindValues)))
() -> Cache.key(sql, asList(dataTypes(nonNull(bindValues))))
).rendered(bindValues);
log.debug("Translating to", result.sql);
return result;
}
private static Param<?>[] nonNull(Param<?>[] bindValues) {
for (int i = 0; i < bindValues.length; i++)
if (bindValues[i] == null)
throw new DataAccessException("Bind value at position " + i + " not set");
return bindValues;
}
@Override
public final Statement createStatement() throws SQLException {
return new ParsingStatement(this, getDelegate().createStatement());

View File

@ -39,7 +39,6 @@ package org.jooq.impl;
import static org.jooq.conf.ParamType.NAMED;
import static org.jooq.impl.Tools.recordFactory;
import static org.jooq.tools.Convert.convert;
import static org.jooq.tools.StringUtils.defaultIfNull;
import java.math.BigDecimal;
@ -57,6 +56,7 @@ import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.ArrayDeque;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
@ -71,7 +71,6 @@ import org.jooq.SQLDialect;
import org.jooq.conf.Settings;
import org.jooq.conf.SettingsTools;
import org.jooq.impl.DefaultRenderContext.Rendered;
import org.jooq.tools.Convert;
import org.jooq.tools.JooqLogger;
import org.jooq.tools.jdbc.DefaultPreparedStatement;
import org.jooq.tools.jdbc.DefaultResultSet;
@ -83,6 +82,7 @@ import org.reactivestreams.Subscription;
import io.r2dbc.spi.ColumnMetadata;
import io.r2dbc.spi.Connection;
import io.r2dbc.spi.Result;
import io.r2dbc.spi.Row;
import io.r2dbc.spi.RowMetadata;
import io.r2dbc.spi.Statement;
@ -95,28 +95,28 @@ final class R2DBC {
private static final JooqLogger log = JooqLogger.getLogger(R2DBC.class);
static volatile boolean is_0_9 = true;
static final class RowSubscriber<R extends Record> implements Subscriber<R> {
static final class SendResultUpstream<T> implements Subscriber<T> {
final ResultSubscriber<R> upstream;
Subscription subscription;
final ConnectionSubscriber<T, ?, ?> upstream;
Subscription subscription;
RowSubscriber(ResultSubscriber<R> s) {
SendResultUpstream(ConnectionSubscriber<T, ?, ?> s) {
this.upstream = s;
}
@Override
public final void onSubscribe(Subscription s) {
this.subscription = s;
subscription.request(upstream.upstream.upstream.requested.getAndSet(0L));
subscription.request(upstream.upstream.requested.getAndSet(0L));
}
@Override
public final void onNext(R record) {
upstream.upstream.upstream.subscriber.onNext(record);
public final void onNext(T value) {
upstream.upstream.subscriber.onNext(value);
long requested = upstream.upstream.upstream.requested.getAndSet(0L);
if (requested > 0)
subscription.request(requested);
long more = upstream.upstream.requested.getAndSet(0L);
if (more > 0)
subscription.request(more);
}
@Override
@ -126,18 +126,47 @@ final class R2DBC {
@Override
public final void onComplete() {
upstream.upstream.upstream.subscriber.onComplete();
upstream.upstream.subscriber.onComplete();
}
}
static final class ResultSubscriber<R extends Record> implements Subscriber<io.r2dbc.spi.Result> {
static final class RowCountSubscriber<R extends Record, Q extends AbstractQuery<R>> implements Subscriber<Result> {
final AbstractResultQuery<R> query;
final ConnectionSubscriber<R> upstream;
final Q query;
final ConnectionSubscriber<Integer, R, Q> upstream;
ResultSubscriber(AbstractResultQuery<R> query, ConnectionSubscriber<R> s) {
RowCountSubscriber(Q query, ConnectionSubscriber<Integer, R, Q> upstream) {
this.query = query;
this.upstream = s;
this.upstream = upstream;
}
@Override
public void onSubscribe(Subscription s) {
s.request(1);
}
@Override
public void onNext(Result r) {
r.getRowsUpdated().subscribe(new SendResultUpstream<>(upstream));
}
@Override
public final void onError(Throwable t) {
upstream.onError(t);
}
@Override
public final void onComplete() {}
}
static final class ResultSubscriber<R extends Record, Q extends AbstractResultQuery<R>> implements Subscriber<Result> {
final Q query;
final ConnectionSubscriber<R, R, Q> upstream;
ResultSubscriber(Q query, ConnectionSubscriber<R, R, Q> upstream) {
this.query = query;
this.upstream = upstream;
}
@Override
@ -147,7 +176,7 @@ final class R2DBC {
@SuppressWarnings("unchecked")
@Override
public final void onNext(io.r2dbc.spi.Result r) {
public final void onNext(Result r) {
r.map((row, m) -> {
try {
// TODO: Cache this getFields() call
@ -185,7 +214,7 @@ final class R2DBC {
onError(t);
return null;
}
}).subscribe(new RowSubscriber<R>(this));
}).subscribe(new SendResultUpstream<>(upstream));
}
@Override
@ -197,14 +226,20 @@ final class R2DBC {
public final void onComplete() {}
}
static final class ConnectionSubscriber<R extends Record> implements Subscriber<Connection> {
static final class ConnectionSubscriber<T, R extends Record, Q extends AbstractQuery<R>> implements Subscriber<Connection> {
final AbstractResultQuery<R> query;
final RecordSubscription<R> upstream;
final Q query;
final QuerySubscription<T, R, Q> upstream;
final BiFunction<Q, ConnectionSubscriber<T, R, Q>, Subscriber<Result>> resultSubscriber;
ConnectionSubscriber(AbstractResultQuery<R> query, RecordSubscription<R> s) {
ConnectionSubscriber(
Q query,
QuerySubscription<T, R, Q> upstream,
BiFunction<Q, ConnectionSubscriber<T, R, Q>, Subscriber<Result>> resultSubscriber
) {
this.query = query;
this.upstream = s;
this.upstream = upstream;
this.resultSubscriber = resultSubscriber;
}
@Override
@ -224,15 +259,17 @@ final class R2DBC {
new DefaultBindContext(query.configuration(), new R2DBCPreparedStatement(query.configuration(), stmt)).visit(rendered.bindValues);
// TODO: Reuse org.jooq.impl.Tools.setFetchSize(ExecuteContext ctx, int fetchSize)
int f = SettingsTools.getFetchSize(((AbstractResultQuery<?>) query).fetchSize(), render.settings());
if (f != 0) {
if (log.isDebugEnabled())
log.debug("Setting fetch size", f);
if (query instanceof AbstractResultQuery) {
int f = SettingsTools.getFetchSize(((AbstractResultQuery<?>) query).fetchSize(), render.settings());
if (f != 0) {
if (log.isDebugEnabled())
log.debug("Setting fetch size", f);
stmt.fetchSize(f);
stmt.fetchSize(f);
}
}
stmt.execute().subscribe(new ResultSubscriber<>(query, this));
stmt.execute().subscribe(resultSubscriber.apply(query, this));
}
// TODO: More specific error handling
@ -250,27 +287,29 @@ final class R2DBC {
public final void onComplete() {}
}
static final class RecordSubscription<R extends Record> implements Subscription {
static final class QuerySubscription<T, R extends Record, Q extends AbstractQuery<R>> implements Subscription {
final Q query;
final Subscriber<? super T> subscriber;
final BiFunction<Q, ConnectionSubscriber<T, R, Q>, Subscriber<Result>> resultSubscriber;
final AtomicLong requested;
final Publisher<? extends Connection> connection;
final AbstractResultQuery<R> query;
final AtomicLong requested;
final Subscriber<? super R> subscriber;
Publisher<? extends Connection> connection;
RecordSubscription(AbstractResultQuery<R> query, Subscriber<? super R> subscriber) {
QuerySubscription(
Q query,
Subscriber<? super T> subscriber,
BiFunction<Q, ConnectionSubscriber<T, R, Q>, Subscriber<Result>> resultSubscriber
) {
this.query = query;
this.subscriber = subscriber;
this.resultSubscriber = resultSubscriber;
this.requested = new AtomicLong();
this.connection = query.configuration().connectionFactory().create();
}
@Override
public final void request(long n) {
requested.getAndAdd(n);
if (connection == null) {
connection = query.configuration().connectionFactory().create();
connection.subscribe(new ConnectionSubscriber<>(query, this));
}
connection.subscribe(new ConnectionSubscriber<>(query, this, resultSubscriber));
}
@Override
@ -785,13 +824,13 @@ final class R2DBC {
// XXX: Legacy implementation
// -------------------------------------------------------------------------
static final class BlockingSubscription<R extends Record> implements Subscription {
static final class BlockingRecordSubscription<R extends Record> implements Subscription {
private final AbstractResultQuery<R> query;
private final Subscriber<? super R> subscriber;
private Cursor<R> c;
private ArrayDeque<R> buffer;
BlockingSubscription(AbstractResultQuery<R> query, Subscriber<? super R> subscriber) {
BlockingRecordSubscription(AbstractResultQuery<R> query, Subscriber<? super R> subscriber) {
this.query = query;
this.subscriber = subscriber;
}
@ -839,4 +878,32 @@ final class R2DBC {
close();
}
}
static final class BlockingRowCountSubscription implements Subscription {
final AbstractRowCountQuery query;
final Subscriber<? super Integer> subscriber;
Integer rows;
BlockingRowCountSubscription(AbstractRowCountQuery query, Subscriber<? super Integer> subscriber) {
this.query = query;
this.subscriber = subscriber;
}
@Override
public void request(long n) {
try {
if (rows == null)
subscriber.onNext(rows = query.execute());
}
catch (Throwable t) {
subscriber.onError(t);
}
subscriber.onComplete();
}
@Override
public void cancel() {
}
}
}