org.jooq
* interfaces.
@@ -754,6 +756,46 @@ public class DSL {
return new DefaultDSLContext(connectionProvider, dialect, settings);
}
+ /**
+ * Create an executor with a custom connection factory and a dialect
+ * configured.
+ *
+ * @param connectionFactory The connection factory providing jOOQ with
+ * R2DBC connections
+ */
+ @NotNull
+ public static DSLContext using(ConnectionFactory connectionFactory) {
+ return new DefaultDSLContext(connectionFactory, JDBCUtils.dialect(connectionFactory));
+ }
+
+ /**
+ * Create an executor with a custom connection factory and a dialect
+ * configured.
+ *
+ * @param connectionFactory The connection factory providing jOOQ with
+ * R2DBC connections
+ * @param dialect The dialect to use with objects created from this executor
+ */
+ @NotNull
+ public static DSLContext using(ConnectionFactory connectionFactory, SQLDialect dialect) {
+ return new DefaultDSLContext(connectionFactory, dialect);
+ }
+
+ /**
+ * Create an executor with a custom connection factory, a dialect and settings
+ * configured.
+ *
+ * @param connectionFactory The connection factory providing jOOQ with
+ * R2DBC connections
+ * @param dialect The dialect to use with objects created from this executor
+ * @param settings The runtime settings to apply to objects created from
+ * this executor
+ */
+ @NotNull
+ public static DSLContext using(ConnectionFactory connectionFactory, SQLDialect dialect, Settings settings) {
+ return new DefaultDSLContext(connectionFactory, dialect, settings);
+ }
+
/**
* Create an executor from a custom configuration.
*
diff --git a/jOOQ/src/main/java/org/jooq/impl/DefaultDSLContext.java b/jOOQ/src/main/java/org/jooq/impl/DefaultDSLContext.java
index 7ca10a1642..53983f7b4a 100644
--- a/jOOQ/src/main/java/org/jooq/impl/DefaultDSLContext.java
+++ b/jOOQ/src/main/java/org/jooq/impl/DefaultDSLContext.java
@@ -338,6 +338,14 @@ public class DefaultDSLContext extends AbstractScope implements DSLContext, Seri
this(new DefaultConfiguration(connectionProvider, dialect, settings));
}
+ public DefaultDSLContext(ConnectionFactory connectionFactory, SQLDialect dialect) {
+ this(connectionFactory, dialect, null);
+ }
+
+ public DefaultDSLContext(ConnectionFactory connectionFactory, SQLDialect dialect, Settings settings) {
+ this(new DefaultConfiguration().set(connectionFactory).set(dialect).set(settings));
+ }
+
public DefaultDSLContext(Configuration configuration) {
super(configuration, configuration == null ? null : configuration.data());
}
diff --git a/jOOQ/src/main/java/org/jooq/impl/R2DBC.java b/jOOQ/src/main/java/org/jooq/impl/R2DBC.java
index 7816d8485a..ef7f887305 100644
--- a/jOOQ/src/main/java/org/jooq/impl/R2DBC.java
+++ b/jOOQ/src/main/java/org/jooq/impl/R2DBC.java
@@ -65,6 +65,7 @@ import java.util.ArrayDeque;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.LongAccumulator;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
@@ -82,14 +83,15 @@ import org.jooq.SQLDialect;
import org.jooq.conf.Settings;
import org.jooq.conf.SettingsTools;
import org.jooq.impl.DefaultRenderContext.Rendered;
-import org.jooq.impl.R2DBC.RowCountSubscriber;
+import org.jooq.impl.Tools.ThreadGuard;
+import org.jooq.impl.Tools.ThreadGuard.Guard;
import org.jooq.tools.Convert;
import org.jooq.tools.JooqLogger;
import org.jooq.tools.jdbc.DefaultPreparedStatement;
import org.jooq.tools.jdbc.DefaultResultSet;
-import org.jooq.tools.jdbc.JDBCUtils;
import org.jooq.tools.jdbc.MockArray;
+import org.jetbrains.annotations.Nullable;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
@@ -110,12 +112,70 @@ final class R2DBC {
private static final JooqLogger log = JooqLogger.getLogger(R2DBC.class);
static volatile boolean is_0_9 = true;
+ // -------------------------------------------------------------------------
+ // Utilities to pass the TCK
+ // -------------------------------------------------------------------------
+
+ static abstract class AbstractSubscription, Subscriber> resultSubscriber; + final BiFunction , Subscriber> resultSubscriber; QueryExecutionSubscriber( Q query, QuerySubscription downstream, - BiFunction , Subscriber> resultSubscriber + BiFunction , Subscriber> resultSubscriber ) { super(downstream); @@ -392,33 +452,29 @@ final class R2DBC { } } - static abstract class AbstractSubscription implements Subscription { + static abstract class AbstractConnectionSubscription extends AbstractSubscription { - final Subscriber super T> subscriber; final AtomicBoolean subscribed; - final AtomicLong requested; final Publisher extends Connection> connection; - AbstractSubscription( + AbstractConnectionSubscription( Configuration configuration, Subscriber super T> subscriber ) { - this.subscriber = subscriber; + super(subscriber); + this.subscribed = new AtomicBoolean(); - this.requested = new AtomicLong(); this.connection = configuration.connectionFactory().create(); } @Override - public final void request(long n) { - requested.getAndAdd(n); - + final void request0() { if (!subscribed.getAndSet(true)) connection.subscribe(delegate()); } @Override - public final void cancel() { + final void cancel0() { delegate().connection.updateAndGet(c -> { // close() calls on already closed resources have no effect, so @@ -428,20 +484,21 @@ final class R2DBC { return null; }); + subscriber.onComplete(); } abstract ConnectionSubscriber delegate(); } - static final class QuerySubscription extends AbstractSubscription { + static final class QuerySubscription extends AbstractConnectionSubscription { final QueryExecutionSubscriber queryExecutionSubscriber; QuerySubscription( Q query, Subscriber super T> subscriber, - BiFunction , Subscriber> resultSubscriber + BiFunction , Subscriber> resultSubscriber ) { super(query.configuration(), subscriber); @@ -454,7 +511,7 @@ final class R2DBC { } } - static final class BatchSubscription extends AbstractSubscription { + static final class BatchSubscription extends AbstractConnectionSubscription { final ConnectionSubscriber batchSubscriber; @@ -1017,36 +1074,32 @@ final class R2DBC { // XXX: Legacy implementation // ------------------------------------------------------------------------- - static final class BlockingRecordSubscription implements Subscription { + static final class BlockingRecordSubscription extends AbstractSubscription { private final ResultQueryTrait query; - private final Subscriber super R> subscriber; - private final ArrayDeque buffer; private volatile Cursor c; BlockingRecordSubscription(ResultQueryTrait query, Subscriber super R> subscriber) { + super(subscriber); + this.query = query; - this.subscriber = subscriber; - this.buffer = new ArrayDeque<>(); } @Override - public final synchronized void request(long n) { - int i = (int) Math.min(n, Integer.MAX_VALUE); - + final synchronized void request0() { try { if (c == null) c = query.fetchLazyNonAutoClosing(); - if (buffer.size() < i) - buffer.addAll(c.fetchNext(i - buffer.size())); + while (requested.getAndUpdate(l -> Math.max(0, l - 1)) > 0) { + R r = c.fetchNext(); - boolean complete = buffer.size() < i; - while (!buffer.isEmpty()) - subscriber.onNext(buffer.pollFirst()); + if (r == null) { + subscriber.onComplete(); + safeClose(c); + break; + } - if (complete) { - subscriber.onComplete(); - safeClose(c); + subscriber.onNext(r); } } catch (Throwable t) { @@ -1056,28 +1109,24 @@ final class R2DBC { } @Override - public final void cancel() { + final void cancel0() { safeClose(c); } } - static final class BlockingRowCountSubscription implements Subscription { - final AbstractRowCountQuery query; - final Subscriber super Integer> subscriber; - final AtomicBoolean executed; + static final class BlockingRowCountSubscription extends AbstractSubscription { + final AbstractRowCountQuery query; BlockingRowCountSubscription(AbstractRowCountQuery query, Subscriber super Integer> subscriber) { + super(subscriber); + this.query = query; - this.subscriber = subscriber; - this.executed = new AtomicBoolean(); } @Override - public void request(long n) { + final void request0() { try { - if (!executed.getAndSet(true)) - subscriber.onNext(query.execute()); - + subscriber.onNext(query.execute()); subscriber.onComplete(); } catch (Throwable t) { @@ -1086,6 +1135,6 @@ final class R2DBC { } @Override - public void cancel() {} + final void cancel0() {} } } diff --git a/jOOQ/src/main/java/org/jooq/impl/Tools.java b/jOOQ/src/main/java/org/jooq/impl/Tools.java index 761aa7cfcb..a7d22a6300 100644 --- a/jOOQ/src/main/java/org/jooq/impl/Tools.java +++ b/jOOQ/src/main/java/org/jooq/impl/Tools.java @@ -3273,7 +3273,8 @@ final class Tools { * The type of guard. */ static enum Guard { - RECORD_TOSTRING; + RECORD_TOSTRING, + SUBSCRIPTION_SYNC_RECURSION; ThreadLocal