From 6ee6d4eb93fe022e08adfd9942c109df15007f0d Mon Sep 17 00:00:00 2001 From: Lukas Eder Date: Thu, 1 Apr 2021 13:24:00 +0200 Subject: [PATCH] [jOOQ/jOOQ#10245] [jOOQ/jOOQ#11700] Pass TCK --- jOOQ/src/main/java/org/jooq/impl/DSL.java | 42 +++++ .../java/org/jooq/impl/DefaultDSLContext.java | 8 + jOOQ/src/main/java/org/jooq/impl/R2DBC.java | 155 ++++++++++++------ jOOQ/src/main/java/org/jooq/impl/Tools.java | 10 +- .../java/org/jooq/tools/jdbc/JDBCUtils.java | 48 ++++++ pom.xml | 36 +++- 6 files changed, 244 insertions(+), 55 deletions(-) diff --git a/jOOQ/src/main/java/org/jooq/impl/DSL.java b/jOOQ/src/main/java/org/jooq/impl/DSL.java index bfe1078dd0..c776a7d775 100644 --- a/jOOQ/src/main/java/org/jooq/impl/DSL.java +++ b/jOOQ/src/main/java/org/jooq/impl/DSL.java @@ -422,6 +422,8 @@ import org.jooq.types.UShort; import org.jetbrains.annotations.NotNull; +import io.r2dbc.spi.ConnectionFactory; + /** * A DSL "entry point" providing implementations to the org.jooq * interfaces. @@ -754,6 +756,46 @@ public class DSL { return new DefaultDSLContext(connectionProvider, dialect, settings); } + /** + * Create an executor with a custom connection factory and a dialect + * configured. + * + * @param connectionFactory The connection factory providing jOOQ with + * R2DBC connections + */ + @NotNull + public static DSLContext using(ConnectionFactory connectionFactory) { + return new DefaultDSLContext(connectionFactory, JDBCUtils.dialect(connectionFactory)); + } + + /** + * Create an executor with a custom connection factory and a dialect + * configured. + * + * @param connectionFactory The connection factory providing jOOQ with + * R2DBC connections + * @param dialect The dialect to use with objects created from this executor + */ + @NotNull + public static DSLContext using(ConnectionFactory connectionFactory, SQLDialect dialect) { + return new DefaultDSLContext(connectionFactory, dialect); + } + + /** + * Create an executor with a custom connection factory, a dialect and settings + * configured. + * + * @param connectionFactory The connection factory providing jOOQ with + * R2DBC connections + * @param dialect The dialect to use with objects created from this executor + * @param settings The runtime settings to apply to objects created from + * this executor + */ + @NotNull + public static DSLContext using(ConnectionFactory connectionFactory, SQLDialect dialect, Settings settings) { + return new DefaultDSLContext(connectionFactory, dialect, settings); + } + /** * Create an executor from a custom configuration. * diff --git a/jOOQ/src/main/java/org/jooq/impl/DefaultDSLContext.java b/jOOQ/src/main/java/org/jooq/impl/DefaultDSLContext.java index 7ca10a1642..53983f7b4a 100644 --- a/jOOQ/src/main/java/org/jooq/impl/DefaultDSLContext.java +++ b/jOOQ/src/main/java/org/jooq/impl/DefaultDSLContext.java @@ -338,6 +338,14 @@ public class DefaultDSLContext extends AbstractScope implements DSLContext, Seri this(new DefaultConfiguration(connectionProvider, dialect, settings)); } + public DefaultDSLContext(ConnectionFactory connectionFactory, SQLDialect dialect) { + this(connectionFactory, dialect, null); + } + + public DefaultDSLContext(ConnectionFactory connectionFactory, SQLDialect dialect, Settings settings) { + this(new DefaultConfiguration().set(connectionFactory).set(dialect).set(settings)); + } + public DefaultDSLContext(Configuration configuration) { super(configuration, configuration == null ? null : configuration.data()); } diff --git a/jOOQ/src/main/java/org/jooq/impl/R2DBC.java b/jOOQ/src/main/java/org/jooq/impl/R2DBC.java index 7816d8485a..ef7f887305 100644 --- a/jOOQ/src/main/java/org/jooq/impl/R2DBC.java +++ b/jOOQ/src/main/java/org/jooq/impl/R2DBC.java @@ -65,6 +65,7 @@ import java.util.ArrayDeque; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.LongAccumulator; import java.util.function.BiFunction; import java.util.function.Function; import java.util.function.Supplier; @@ -82,14 +83,15 @@ import org.jooq.SQLDialect; import org.jooq.conf.Settings; import org.jooq.conf.SettingsTools; import org.jooq.impl.DefaultRenderContext.Rendered; -import org.jooq.impl.R2DBC.RowCountSubscriber; +import org.jooq.impl.Tools.ThreadGuard; +import org.jooq.impl.Tools.ThreadGuard.Guard; import org.jooq.tools.Convert; import org.jooq.tools.JooqLogger; import org.jooq.tools.jdbc.DefaultPreparedStatement; import org.jooq.tools.jdbc.DefaultResultSet; -import org.jooq.tools.jdbc.JDBCUtils; import org.jooq.tools.jdbc.MockArray; +import org.jetbrains.annotations.Nullable; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; @@ -110,12 +112,70 @@ final class R2DBC { private static final JooqLogger log = JooqLogger.getLogger(R2DBC.class); static volatile boolean is_0_9 = true; + // ------------------------------------------------------------------------- + // Utilities to pass the TCK + // ------------------------------------------------------------------------- + + static abstract class AbstractSubscription implements Subscription { + + final AtomicBoolean completed; + final AtomicLong requested; + final Subscriber subscriber; + + AbstractSubscription(Subscriber subscriber) { + this.completed = new AtomicBoolean(); + this.requested = new AtomicLong(); + this.subscriber = org.jooq.Publisher.subscriber( + subscriber::onSubscribe, + subscriber::onNext, + subscriber::onError, + () -> { + completed.set(true); + subscriber.onComplete(); + } + ); + } + + @Override + public final void request(long n) { + if (n <= 0) { + subscriber.onError(new IllegalArgumentException("Rule 3.9 non-positive request signals are illegal")); + } + else if (!completed.get()) { + requested.accumulateAndGet(n, (x, y) -> { + long r = x + y; + + // See Long::addExact + if (((x ^ r) & (y ^ r)) < 0) + return Long.MAX_VALUE; + else + return r; + }); + + ThreadGuard.run(Guard.SUBSCRIPTION_SYNC_RECURSION, this::request0, () -> {}); + } + } + + @Override + public final void cancel() { + if (!completed.getAndSet(true)) + cancel0(); + } + + abstract void request0(); + abstract void cancel0(); + } + + // ------------------------------------------------------------------------- + // R2DBC implementations + // ------------------------------------------------------------------------- + static final class Forwarding implements Subscriber { - final AbstractSubscription downstream; + final AbstractConnectionSubscription downstream; final AtomicLong expected; - Forwarding(AbstractSubscription downstream, long expected) { + Forwarding(AbstractConnectionSubscription downstream, long expected) { this.downstream = downstream; this.expected = new AtomicLong(expected); } @@ -145,9 +205,9 @@ final class R2DBC { static abstract class AbstractResultSubscriber implements Subscriber { - final AbstractSubscription downstream; + final AbstractConnectionSubscription downstream; - AbstractResultSubscriber(AbstractSubscription downstream) { + AbstractResultSubscriber(AbstractConnectionSubscription downstream) { this.downstream = downstream; } @@ -164,7 +224,7 @@ final class R2DBC { final Forwarding forwarding; - RowCountSubscriber(AbstractSubscription downstream, long expected) { + RowCountSubscriber(AbstractConnectionSubscription downstream, long expected) { super(downstream); this.forwarding = new Forwarding<>(downstream, expected); @@ -185,7 +245,7 @@ final class R2DBC { final Q query; - ResultSubscriber(Q query, AbstractSubscription downstream) { + ResultSubscriber(Q query, AbstractConnectionSubscription downstream) { super(downstream); this.query = query; @@ -242,10 +302,10 @@ final class R2DBC { static abstract class ConnectionSubscriber implements Subscriber { - final AbstractSubscription downstream; + final AbstractConnectionSubscription downstream; final AtomicReference connection; - ConnectionSubscriber(AbstractSubscription downstream) { + ConnectionSubscriber(AbstractConnectionSubscription downstream) { this.downstream = downstream; this.connection = new AtomicReference<>(); } @@ -275,12 +335,12 @@ final class R2DBC { static final class QueryExecutionSubscriber extends ConnectionSubscriber { final Q query; - final BiFunction, Subscriber> resultSubscriber; + final BiFunction, Subscriber> resultSubscriber; QueryExecutionSubscriber( Q query, QuerySubscription downstream, - BiFunction, Subscriber> resultSubscriber + BiFunction, Subscriber> resultSubscriber ) { super(downstream); @@ -392,33 +452,29 @@ final class R2DBC { } } - static abstract class AbstractSubscription implements Subscription { + static abstract class AbstractConnectionSubscription extends AbstractSubscription { - final Subscriber subscriber; final AtomicBoolean subscribed; - final AtomicLong requested; final Publisher connection; - AbstractSubscription( + AbstractConnectionSubscription( Configuration configuration, Subscriber subscriber ) { - this.subscriber = subscriber; + super(subscriber); + this.subscribed = new AtomicBoolean(); - this.requested = new AtomicLong(); this.connection = configuration.connectionFactory().create(); } @Override - public final void request(long n) { - requested.getAndAdd(n); - + final void request0() { if (!subscribed.getAndSet(true)) connection.subscribe(delegate()); } @Override - public final void cancel() { + final void cancel0() { delegate().connection.updateAndGet(c -> { // close() calls on already closed resources have no effect, so @@ -428,20 +484,21 @@ final class R2DBC { return null; }); + subscriber.onComplete(); } abstract ConnectionSubscriber delegate(); } - static final class QuerySubscription extends AbstractSubscription { + static final class QuerySubscription extends AbstractConnectionSubscription { final QueryExecutionSubscriber queryExecutionSubscriber; QuerySubscription( Q query, Subscriber subscriber, - BiFunction, Subscriber> resultSubscriber + BiFunction, Subscriber> resultSubscriber ) { super(query.configuration(), subscriber); @@ -454,7 +511,7 @@ final class R2DBC { } } - static final class BatchSubscription extends AbstractSubscription { + static final class BatchSubscription extends AbstractConnectionSubscription { final ConnectionSubscriber batchSubscriber; @@ -1017,36 +1074,32 @@ final class R2DBC { // XXX: Legacy implementation // ------------------------------------------------------------------------- - static final class BlockingRecordSubscription implements Subscription { + static final class BlockingRecordSubscription extends AbstractSubscription { private final ResultQueryTrait query; - private final Subscriber subscriber; - private final ArrayDeque buffer; private volatile Cursor c; BlockingRecordSubscription(ResultQueryTrait query, Subscriber subscriber) { + super(subscriber); + this.query = query; - this.subscriber = subscriber; - this.buffer = new ArrayDeque<>(); } @Override - public final synchronized void request(long n) { - int i = (int) Math.min(n, Integer.MAX_VALUE); - + final synchronized void request0() { try { if (c == null) c = query.fetchLazyNonAutoClosing(); - if (buffer.size() < i) - buffer.addAll(c.fetchNext(i - buffer.size())); + while (requested.getAndUpdate(l -> Math.max(0, l - 1)) > 0) { + R r = c.fetchNext(); - boolean complete = buffer.size() < i; - while (!buffer.isEmpty()) - subscriber.onNext(buffer.pollFirst()); + if (r == null) { + subscriber.onComplete(); + safeClose(c); + break; + } - if (complete) { - subscriber.onComplete(); - safeClose(c); + subscriber.onNext(r); } } catch (Throwable t) { @@ -1056,28 +1109,24 @@ final class R2DBC { } @Override - public final void cancel() { + final void cancel0() { safeClose(c); } } - static final class BlockingRowCountSubscription implements Subscription { - final AbstractRowCountQuery query; - final Subscriber subscriber; - final AtomicBoolean executed; + static final class BlockingRowCountSubscription extends AbstractSubscription { + final AbstractRowCountQuery query; BlockingRowCountSubscription(AbstractRowCountQuery query, Subscriber subscriber) { + super(subscriber); + this.query = query; - this.subscriber = subscriber; - this.executed = new AtomicBoolean(); } @Override - public void request(long n) { + final void request0() { try { - if (!executed.getAndSet(true)) - subscriber.onNext(query.execute()); - + subscriber.onNext(query.execute()); subscriber.onComplete(); } catch (Throwable t) { @@ -1086,6 +1135,6 @@ final class R2DBC { } @Override - public void cancel() {} + final void cancel0() {} } } diff --git a/jOOQ/src/main/java/org/jooq/impl/Tools.java b/jOOQ/src/main/java/org/jooq/impl/Tools.java index 761aa7cfcb..a7d22a6300 100644 --- a/jOOQ/src/main/java/org/jooq/impl/Tools.java +++ b/jOOQ/src/main/java/org/jooq/impl/Tools.java @@ -3273,7 +3273,8 @@ final class Tools { * The type of guard. */ static enum Guard { - RECORD_TOSTRING; + RECORD_TOSTRING, + SUBSCRIPTION_SYNC_RECURSION; ThreadLocal tl = new ThreadLocal<>(); } @@ -3294,6 +3295,13 @@ final class Tools { V guarded(); } + /** + * Run an operation using a guard. + */ + static final void run(Guard guard, Runnable unguardedOperation, Runnable guardedOperation) { + run(guard, () -> { unguardedOperation.run(); return null; }, () -> { guardedOperation.run(); return null; }); + } + /** * Run an operation using a guard. */ diff --git a/jOOQ/src/main/java/org/jooq/tools/jdbc/JDBCUtils.java b/jOOQ/src/main/java/org/jooq/tools/jdbc/JDBCUtils.java index 5595dbb537..1439bb07f4 100644 --- a/jOOQ/src/main/java/org/jooq/tools/jdbc/JDBCUtils.java +++ b/jOOQ/src/main/java/org/jooq/tools/jdbc/JDBCUtils.java @@ -114,6 +114,9 @@ import org.jooq.tools.JooqLogger; import org.jetbrains.annotations.NotNull; +import io.r2dbc.spi.ConnectionFactory; +import io.r2dbc.spi.ConnectionFactoryMetadata; + /** * JDBC-related utility methods. * @@ -169,6 +172,51 @@ public class JDBCUtils { result = dialect(url, majorVersion, minorVersion); } catch (SQLException ignore) {} + } + + if (result == SQLDialect.DEFAULT) { + // If the dialect cannot be guessed from the URL, take some other + // measures, e.g. by querying DatabaseMetaData.getDatabaseProductName() + } + + return result; + } + + /** + * "Guess" the {@link SQLDialect} from a {@link ConnectionFactory} instance. + *

+ * This method tries to guess the SQLDialect of a connection + * from the its {@link ConnectionFactoryMetadata} as obtained by + * {@link ConnectionFactory#getMetadata()}. If the dialect cannot be guessed + * from the URL, further actions may be implemented in the future. + * + * @return The appropriate {@link SQLDialect} or {@link SQLDialect#DEFAULT} + * if no dialect could be derived from the connection. Never + * null. + * @see #dialect(String) + */ + @NotNull + public static final SQLDialect dialect(ConnectionFactory connection) { + SQLDialect result = SQLDialect.DEFAULT; + + if (connection != null) { + ConnectionFactoryMetadata m = connection.getMetadata(); + String product = m.getName().toLowerCase(); + + if (product.contains("h2")) + result = H2; + else if (product.contains("mariadb")) + result = MARIADB; + else if (product.contains("mysql")) + result = MYSQL; + else if (product.contains("postgres")) + result = POSTGRES; + + + + + + } if (result == SQLDialect.DEFAULT) { diff --git a/pom.xml b/pom.xml index 7e05bca552..c927b33833 100644 --- a/pom.xml +++ b/pom.xml @@ -37,6 +37,9 @@ 9.2.1.jre8 21.1.0.0 + + 0.9.0.M1 + 2.3.1 1.2.0 @@ -129,10 +132,41 @@ annotations 19.0.0 + io.r2dbc r2dbc-spi - 0.9.0.M1 + ${io.r2dbc.version} + + + io.projectreactor + reactor-core + 3.4.4 + + + io.r2dbc + r2dbc-h2 + 0.8.4.RELEASE + + + org.mariadb + r2dbc-mariadb + 1.0.1 + + + io.r2dbc + r2dbc-mssql + ${io.r2dbc.version} + + + com.oracle.database.r2dbc + oracle-r2dbc + 0.1.0 + + + org.postgresql + r2dbc-postgresql + ${io.r2dbc.version}