From 4819e6874321bf6b6feabe2eb983bad1e7e4e828 Mon Sep 17 00:00:00 2001 From: Lukas Eder Date: Thu, 25 Mar 2021 21:53:54 +0100 Subject: [PATCH] [jOOQ/jOOQ#11700] Implement RowCountQuery.subscribe() --- .../org/jooq/impl/AbstractResultQuery.java | 10 +- .../org/jooq/impl/AbstractRowCountQuery.java | 48 +++--- .../java/org/jooq/impl/ParsingConnection.java | 11 +- jOOQ/src/main/java/org/jooq/impl/R2DBC.java | 157 +++++++++++++----- 4 files changed, 148 insertions(+), 78 deletions(-) diff --git a/jOOQ/src/main/java/org/jooq/impl/AbstractResultQuery.java b/jOOQ/src/main/java/org/jooq/impl/AbstractResultQuery.java index 5347b86a5c..c925d4574b 100644 --- a/jOOQ/src/main/java/org/jooq/impl/AbstractResultQuery.java +++ b/jOOQ/src/main/java/org/jooq/impl/AbstractResultQuery.java @@ -89,8 +89,10 @@ import org.jooq.Results; import org.jooq.SQLDialect; import org.jooq.Table; import org.jooq.conf.SettingsTools; -import org.jooq.impl.R2DBC.BlockingSubscription; -import org.jooq.impl.R2DBC.RecordSubscription; +import org.jooq.impl.R2DBC.BlockingRecordSubscription; +import org.jooq.impl.R2DBC.ConnectionSubscriber; +import org.jooq.impl.R2DBC.QuerySubscription; +import org.jooq.impl.R2DBC.ResultSubscriber; import org.jooq.tools.JooqLogger; import org.jooq.tools.jdbc.MockResultSet; @@ -338,9 +340,9 @@ abstract class AbstractResultQuery extends AbstractQuery im ConnectionFactory cf = configuration().connectionFactory(); if (!(cf instanceof NoConnectionFactory)) - subscriber.onSubscribe(new RecordSubscription(this, subscriber)); + subscriber.onSubscribe(new QuerySubscription<>(this, subscriber, (AbstractResultQuery t, ConnectionSubscriber> u) -> new ResultSubscriber<>(t, u))); else - subscriber.onSubscribe(new BlockingSubscription<>(this, subscriber)); + subscriber.onSubscribe(new BlockingRecordSubscription<>(this, subscriber)); } @Override diff --git a/jOOQ/src/main/java/org/jooq/impl/AbstractRowCountQuery.java b/jOOQ/src/main/java/org/jooq/impl/AbstractRowCountQuery.java index 7f5e183b8f..c09d3cff77 100644 --- a/jOOQ/src/main/java/org/jooq/impl/AbstractRowCountQuery.java +++ b/jOOQ/src/main/java/org/jooq/impl/AbstractRowCountQuery.java @@ -40,6 +40,12 @@ package org.jooq.impl; import org.jooq.Configuration; import org.jooq.Record; import org.jooq.RowCountQuery; +import org.jooq.impl.R2DBC.BlockingRowCountSubscription; +import org.jooq.impl.R2DBC.ConnectionSubscriber; +import org.jooq.impl.R2DBC.QuerySubscription; +import org.jooq.impl.R2DBC.RowCountSubscriber; + +import io.r2dbc.spi.ConnectionFactory; /** * @author Lukas Eder @@ -55,36 +61,22 @@ abstract class AbstractRowCountQuery extends AbstractQuery implements Ro super(configuration); } + + + + + + + + + @Override public final void subscribe(org.reactivestreams.Subscriber subscriber) { - subscriber.onSubscribe(new org.reactivestreams.Subscription() { - Integer rows; + ConnectionFactory cf = configuration().connectionFactory(); - @Override - public void request(long n) { - try { - if (rows == null) - subscriber.onNext(rows = execute()); - } - catch (Throwable t) { - subscriber.onError(t); - } - - subscriber.onComplete(); - } - - @Override - public void cancel() { - } - }); + if (!(cf instanceof NoConnectionFactory)) + subscriber.onSubscribe(new QuerySubscription<>(this, subscriber, (AbstractRowCountQuery t, ConnectionSubscriber u) -> new RowCountSubscriber<>(t, u))); + else + subscriber.onSubscribe(new BlockingRowCountSubscription(this, subscriber)); } - - - - - - - - - } diff --git a/jOOQ/src/main/java/org/jooq/impl/ParsingConnection.java b/jOOQ/src/main/java/org/jooq/impl/ParsingConnection.java index 49c230e228..d09e3e3386 100644 --- a/jOOQ/src/main/java/org/jooq/impl/ParsingConnection.java +++ b/jOOQ/src/main/java/org/jooq/impl/ParsingConnection.java @@ -54,6 +54,7 @@ import java.util.Map; import org.jooq.Configuration; import org.jooq.DSLContext; import org.jooq.Param; +import org.jooq.exception.DataAccessException; import org.jooq.impl.DefaultRenderContext.Rendered; import org.jooq.impl.Tools.Cache; import org.jooq.tools.JooqLogger; @@ -126,13 +127,21 @@ final class ParsingConnection extends DefaultConnection { return new CacheValue(configuration, sql, bindValues); }, CacheType.CACHE_PARSING_CONNECTION, - () -> Cache.key(sql, asList(dataTypes(bindValues))) + () -> Cache.key(sql, asList(dataTypes(nonNull(bindValues)))) ).rendered(bindValues); log.debug("Translating to", result.sql); return result; } + private static Param[] nonNull(Param[] bindValues) { + for (int i = 0; i < bindValues.length; i++) + if (bindValues[i] == null) + throw new DataAccessException("Bind value at position " + i + " not set"); + + return bindValues; + } + @Override public final Statement createStatement() throws SQLException { return new ParsingStatement(this, getDelegate().createStatement()); diff --git a/jOOQ/src/main/java/org/jooq/impl/R2DBC.java b/jOOQ/src/main/java/org/jooq/impl/R2DBC.java index 0960a9cc53..656dcf8ad0 100644 --- a/jOOQ/src/main/java/org/jooq/impl/R2DBC.java +++ b/jOOQ/src/main/java/org/jooq/impl/R2DBC.java @@ -39,7 +39,6 @@ package org.jooq.impl; import static org.jooq.conf.ParamType.NAMED; import static org.jooq.impl.Tools.recordFactory; -import static org.jooq.tools.Convert.convert; import static org.jooq.tools.StringUtils.defaultIfNull; import java.math.BigDecimal; @@ -57,6 +56,7 @@ import java.time.LocalDateTime; import java.time.LocalTime; import java.util.ArrayDeque; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BiFunction; import java.util.function.Function; import java.util.function.Supplier; @@ -71,7 +71,6 @@ import org.jooq.SQLDialect; import org.jooq.conf.Settings; import org.jooq.conf.SettingsTools; import org.jooq.impl.DefaultRenderContext.Rendered; -import org.jooq.tools.Convert; import org.jooq.tools.JooqLogger; import org.jooq.tools.jdbc.DefaultPreparedStatement; import org.jooq.tools.jdbc.DefaultResultSet; @@ -83,6 +82,7 @@ import org.reactivestreams.Subscription; import io.r2dbc.spi.ColumnMetadata; import io.r2dbc.spi.Connection; +import io.r2dbc.spi.Result; import io.r2dbc.spi.Row; import io.r2dbc.spi.RowMetadata; import io.r2dbc.spi.Statement; @@ -95,28 +95,28 @@ final class R2DBC { private static final JooqLogger log = JooqLogger.getLogger(R2DBC.class); static volatile boolean is_0_9 = true; - static final class RowSubscriber implements Subscriber { + static final class SendResultUpstream implements Subscriber { - final ResultSubscriber upstream; - Subscription subscription; + final ConnectionSubscriber upstream; + Subscription subscription; - RowSubscriber(ResultSubscriber s) { + SendResultUpstream(ConnectionSubscriber s) { this.upstream = s; } @Override public final void onSubscribe(Subscription s) { this.subscription = s; - subscription.request(upstream.upstream.upstream.requested.getAndSet(0L)); + subscription.request(upstream.upstream.requested.getAndSet(0L)); } @Override - public final void onNext(R record) { - upstream.upstream.upstream.subscriber.onNext(record); + public final void onNext(T value) { + upstream.upstream.subscriber.onNext(value); - long requested = upstream.upstream.upstream.requested.getAndSet(0L); - if (requested > 0) - subscription.request(requested); + long more = upstream.upstream.requested.getAndSet(0L); + if (more > 0) + subscription.request(more); } @Override @@ -126,18 +126,47 @@ final class R2DBC { @Override public final void onComplete() { - upstream.upstream.upstream.subscriber.onComplete(); + upstream.upstream.subscriber.onComplete(); } } - static final class ResultSubscriber implements Subscriber { + static final class RowCountSubscriber> implements Subscriber { - final AbstractResultQuery query; - final ConnectionSubscriber upstream; + final Q query; + final ConnectionSubscriber upstream; - ResultSubscriber(AbstractResultQuery query, ConnectionSubscriber s) { + RowCountSubscriber(Q query, ConnectionSubscriber upstream) { this.query = query; - this.upstream = s; + this.upstream = upstream; + } + + @Override + public void onSubscribe(Subscription s) { + s.request(1); + } + + @Override + public void onNext(Result r) { + r.getRowsUpdated().subscribe(new SendResultUpstream<>(upstream)); + } + + @Override + public final void onError(Throwable t) { + upstream.onError(t); + } + + @Override + public final void onComplete() {} + } + + static final class ResultSubscriber> implements Subscriber { + + final Q query; + final ConnectionSubscriber upstream; + + ResultSubscriber(Q query, ConnectionSubscriber upstream) { + this.query = query; + this.upstream = upstream; } @Override @@ -147,7 +176,7 @@ final class R2DBC { @SuppressWarnings("unchecked") @Override - public final void onNext(io.r2dbc.spi.Result r) { + public final void onNext(Result r) { r.map((row, m) -> { try { // TODO: Cache this getFields() call @@ -185,7 +214,7 @@ final class R2DBC { onError(t); return null; } - }).subscribe(new RowSubscriber(this)); + }).subscribe(new SendResultUpstream<>(upstream)); } @Override @@ -197,14 +226,20 @@ final class R2DBC { public final void onComplete() {} } - static final class ConnectionSubscriber implements Subscriber { + static final class ConnectionSubscriber> implements Subscriber { - final AbstractResultQuery query; - final RecordSubscription upstream; + final Q query; + final QuerySubscription upstream; + final BiFunction, Subscriber> resultSubscriber; - ConnectionSubscriber(AbstractResultQuery query, RecordSubscription s) { + ConnectionSubscriber( + Q query, + QuerySubscription upstream, + BiFunction, Subscriber> resultSubscriber + ) { this.query = query; - this.upstream = s; + this.upstream = upstream; + this.resultSubscriber = resultSubscriber; } @Override @@ -224,15 +259,17 @@ final class R2DBC { new DefaultBindContext(query.configuration(), new R2DBCPreparedStatement(query.configuration(), stmt)).visit(rendered.bindValues); // TODO: Reuse org.jooq.impl.Tools.setFetchSize(ExecuteContext ctx, int fetchSize) - int f = SettingsTools.getFetchSize(((AbstractResultQuery) query).fetchSize(), render.settings()); - if (f != 0) { - if (log.isDebugEnabled()) - log.debug("Setting fetch size", f); + if (query instanceof AbstractResultQuery) { + int f = SettingsTools.getFetchSize(((AbstractResultQuery) query).fetchSize(), render.settings()); + if (f != 0) { + if (log.isDebugEnabled()) + log.debug("Setting fetch size", f); - stmt.fetchSize(f); + stmt.fetchSize(f); + } } - stmt.execute().subscribe(new ResultSubscriber<>(query, this)); + stmt.execute().subscribe(resultSubscriber.apply(query, this)); } // TODO: More specific error handling @@ -250,27 +287,29 @@ final class R2DBC { public final void onComplete() {} } - static final class RecordSubscription implements Subscription { + static final class QuerySubscription> implements Subscription { + final Q query; + final Subscriber subscriber; + final BiFunction, Subscriber> resultSubscriber; + final AtomicLong requested; + final Publisher connection; - final AbstractResultQuery query; - final AtomicLong requested; - final Subscriber subscriber; - Publisher connection; - - RecordSubscription(AbstractResultQuery query, Subscriber subscriber) { + QuerySubscription( + Q query, + Subscriber subscriber, + BiFunction, Subscriber> resultSubscriber + ) { this.query = query; this.subscriber = subscriber; + this.resultSubscriber = resultSubscriber; this.requested = new AtomicLong(); + this.connection = query.configuration().connectionFactory().create(); } @Override public final void request(long n) { requested.getAndAdd(n); - - if (connection == null) { - connection = query.configuration().connectionFactory().create(); - connection.subscribe(new ConnectionSubscriber<>(query, this)); - } + connection.subscribe(new ConnectionSubscriber<>(query, this, resultSubscriber)); } @Override @@ -785,13 +824,13 @@ final class R2DBC { // XXX: Legacy implementation // ------------------------------------------------------------------------- - static final class BlockingSubscription implements Subscription { + static final class BlockingRecordSubscription implements Subscription { private final AbstractResultQuery query; private final Subscriber subscriber; private Cursor c; private ArrayDeque buffer; - BlockingSubscription(AbstractResultQuery query, Subscriber subscriber) { + BlockingRecordSubscription(AbstractResultQuery query, Subscriber subscriber) { this.query = query; this.subscriber = subscriber; } @@ -839,4 +878,32 @@ final class R2DBC { close(); } } + + static final class BlockingRowCountSubscription implements Subscription { + final AbstractRowCountQuery query; + final Subscriber subscriber; + Integer rows; + + BlockingRowCountSubscription(AbstractRowCountQuery query, Subscriber subscriber) { + this.query = query; + this.subscriber = subscriber; + } + + @Override + public void request(long n) { + try { + if (rows == null) + subscriber.onNext(rows = query.execute()); + } + catch (Throwable t) { + subscriber.onError(t); + } + + subscriber.onComplete(); + } + + @Override + public void cancel() { + } + } }