This commit is contained in:
Lukas Eder 2021-04-01 13:24:00 +02:00
parent 7b6148b1b7
commit 6ee6d4eb93
6 changed files with 244 additions and 55 deletions

View File

@ -422,6 +422,8 @@ import org.jooq.types.UShort;
import org.jetbrains.annotations.NotNull;
import io.r2dbc.spi.ConnectionFactory;
/**
* A DSL "entry point" providing implementations to the <code>org.jooq</code>
* interfaces.
@ -754,6 +756,46 @@ public class DSL {
return new DefaultDSLContext(connectionProvider, dialect, settings);
}
/**
* Create an executor with a custom connection factory and a dialect
* configured.
*
* @param connectionFactory The connection factory providing jOOQ with
* R2DBC connections
*/
@NotNull
public static DSLContext using(ConnectionFactory connectionFactory) {
return new DefaultDSLContext(connectionFactory, JDBCUtils.dialect(connectionFactory));
}
/**
* Create an executor with a custom connection factory and a dialect
* configured.
*
* @param connectionFactory The connection factory providing jOOQ with
* R2DBC connections
* @param dialect The dialect to use with objects created from this executor
*/
@NotNull
public static DSLContext using(ConnectionFactory connectionFactory, SQLDialect dialect) {
return new DefaultDSLContext(connectionFactory, dialect);
}
/**
* Create an executor with a custom connection factory, a dialect and settings
* configured.
*
* @param connectionFactory The connection factory providing jOOQ with
* R2DBC connections
* @param dialect The dialect to use with objects created from this executor
* @param settings The runtime settings to apply to objects created from
* this executor
*/
@NotNull
public static DSLContext using(ConnectionFactory connectionFactory, SQLDialect dialect, Settings settings) {
return new DefaultDSLContext(connectionFactory, dialect, settings);
}
/**
* Create an executor from a custom configuration.
*

View File

@ -338,6 +338,14 @@ public class DefaultDSLContext extends AbstractScope implements DSLContext, Seri
this(new DefaultConfiguration(connectionProvider, dialect, settings));
}
public DefaultDSLContext(ConnectionFactory connectionFactory, SQLDialect dialect) {
this(connectionFactory, dialect, null);
}
public DefaultDSLContext(ConnectionFactory connectionFactory, SQLDialect dialect, Settings settings) {
this(new DefaultConfiguration().set(connectionFactory).set(dialect).set(settings));
}
public DefaultDSLContext(Configuration configuration) {
super(configuration, configuration == null ? null : configuration.data());
}

View File

@ -65,6 +65,7 @@ import java.util.ArrayDeque;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAccumulator;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
@ -82,14 +83,15 @@ import org.jooq.SQLDialect;
import org.jooq.conf.Settings;
import org.jooq.conf.SettingsTools;
import org.jooq.impl.DefaultRenderContext.Rendered;
import org.jooq.impl.R2DBC.RowCountSubscriber;
import org.jooq.impl.Tools.ThreadGuard;
import org.jooq.impl.Tools.ThreadGuard.Guard;
import org.jooq.tools.Convert;
import org.jooq.tools.JooqLogger;
import org.jooq.tools.jdbc.DefaultPreparedStatement;
import org.jooq.tools.jdbc.DefaultResultSet;
import org.jooq.tools.jdbc.JDBCUtils;
import org.jooq.tools.jdbc.MockArray;
import org.jetbrains.annotations.Nullable;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
@ -110,12 +112,70 @@ final class R2DBC {
private static final JooqLogger log = JooqLogger.getLogger(R2DBC.class);
static volatile boolean is_0_9 = true;
// -------------------------------------------------------------------------
// Utilities to pass the TCK
// -------------------------------------------------------------------------
static abstract class AbstractSubscription<T> implements Subscription {
final AtomicBoolean completed;
final AtomicLong requested;
final Subscriber<? super T> subscriber;
AbstractSubscription(Subscriber<? super T> subscriber) {
this.completed = new AtomicBoolean();
this.requested = new AtomicLong();
this.subscriber = org.jooq.Publisher.subscriber(
subscriber::onSubscribe,
subscriber::onNext,
subscriber::onError,
() -> {
completed.set(true);
subscriber.onComplete();
}
);
}
@Override
public final void request(long n) {
if (n <= 0) {
subscriber.onError(new IllegalArgumentException("Rule 3.9 non-positive request signals are illegal"));
}
else if (!completed.get()) {
requested.accumulateAndGet(n, (x, y) -> {
long r = x + y;
// See Long::addExact
if (((x ^ r) & (y ^ r)) < 0)
return Long.MAX_VALUE;
else
return r;
});
ThreadGuard.run(Guard.SUBSCRIPTION_SYNC_RECURSION, this::request0, () -> {});
}
}
@Override
public final void cancel() {
if (!completed.getAndSet(true))
cancel0();
}
abstract void request0();
abstract void cancel0();
}
// -------------------------------------------------------------------------
// R2DBC implementations
// -------------------------------------------------------------------------
static final class Forwarding<T> implements Subscriber<T> {
final AbstractSubscription<T> downstream;
final AbstractConnectionSubscription<T> downstream;
final AtomicLong expected;
Forwarding(AbstractSubscription<T> downstream, long expected) {
Forwarding(AbstractConnectionSubscription<T> downstream, long expected) {
this.downstream = downstream;
this.expected = new AtomicLong(expected);
}
@ -145,9 +205,9 @@ final class R2DBC {
static abstract class AbstractResultSubscriber<T> implements Subscriber<Result> {
final AbstractSubscription<? super T> downstream;
final AbstractConnectionSubscription<? super T> downstream;
AbstractResultSubscriber(AbstractSubscription<? super T> downstream) {
AbstractResultSubscriber(AbstractConnectionSubscription<? super T> downstream) {
this.downstream = downstream;
}
@ -164,7 +224,7 @@ final class R2DBC {
final Forwarding<? super Integer> forwarding;
RowCountSubscriber(AbstractSubscription<? super Integer> downstream, long expected) {
RowCountSubscriber(AbstractConnectionSubscription<? super Integer> downstream, long expected) {
super(downstream);
this.forwarding = new Forwarding<>(downstream, expected);
@ -185,7 +245,7 @@ final class R2DBC {
final Q query;
ResultSubscriber(Q query, AbstractSubscription<? super R> downstream) {
ResultSubscriber(Q query, AbstractConnectionSubscription<? super R> downstream) {
super(downstream);
this.query = query;
@ -242,10 +302,10 @@ final class R2DBC {
static abstract class ConnectionSubscriber<T> implements Subscriber<Connection> {
final AbstractSubscription<T> downstream;
final AbstractConnectionSubscription<T> downstream;
final AtomicReference<Connection> connection;
ConnectionSubscriber(AbstractSubscription<T> downstream) {
ConnectionSubscriber(AbstractConnectionSubscription<T> downstream) {
this.downstream = downstream;
this.connection = new AtomicReference<>();
}
@ -275,12 +335,12 @@ final class R2DBC {
static final class QueryExecutionSubscriber<T, Q extends Query> extends ConnectionSubscriber<T> {
final Q query;
final BiFunction<Q, AbstractSubscription<T>, Subscriber<Result>> resultSubscriber;
final BiFunction<Q, AbstractConnectionSubscription<T>, Subscriber<Result>> resultSubscriber;
QueryExecutionSubscriber(
Q query,
QuerySubscription<T, Q> downstream,
BiFunction<Q, AbstractSubscription<T>, Subscriber<Result>> resultSubscriber
BiFunction<Q, AbstractConnectionSubscription<T>, Subscriber<Result>> resultSubscriber
) {
super(downstream);
@ -392,33 +452,29 @@ final class R2DBC {
}
}
static abstract class AbstractSubscription<T> implements Subscription {
static abstract class AbstractConnectionSubscription<T> extends AbstractSubscription<T> {
final Subscriber<? super T> subscriber;
final AtomicBoolean subscribed;
final AtomicLong requested;
final Publisher<? extends Connection> connection;
AbstractSubscription(
AbstractConnectionSubscription(
Configuration configuration,
Subscriber<? super T> subscriber
) {
this.subscriber = subscriber;
super(subscriber);
this.subscribed = new AtomicBoolean();
this.requested = new AtomicLong();
this.connection = configuration.connectionFactory().create();
}
@Override
public final void request(long n) {
requested.getAndAdd(n);
final void request0() {
if (!subscribed.getAndSet(true))
connection.subscribe(delegate());
}
@Override
public final void cancel() {
final void cancel0() {
delegate().connection.updateAndGet(c -> {
// close() calls on already closed resources have no effect, so
@ -428,20 +484,21 @@ final class R2DBC {
return null;
});
subscriber.onComplete();
}
abstract ConnectionSubscriber<T> delegate();
}
static final class QuerySubscription<T, Q extends Query> extends AbstractSubscription<T> {
static final class QuerySubscription<T, Q extends Query> extends AbstractConnectionSubscription<T> {
final QueryExecutionSubscriber<T, Q> queryExecutionSubscriber;
QuerySubscription(
Q query,
Subscriber<? super T> subscriber,
BiFunction<Q, AbstractSubscription<T>, Subscriber<Result>> resultSubscriber
BiFunction<Q, AbstractConnectionSubscription<T>, Subscriber<Result>> resultSubscriber
) {
super(query.configuration(), subscriber);
@ -454,7 +511,7 @@ final class R2DBC {
}
}
static final class BatchSubscription<B extends AbstractBatch> extends AbstractSubscription<Integer> {
static final class BatchSubscription<B extends AbstractBatch> extends AbstractConnectionSubscription<Integer> {
final ConnectionSubscriber<Integer> batchSubscriber;
@ -1017,36 +1074,32 @@ final class R2DBC {
// XXX: Legacy implementation
// -------------------------------------------------------------------------
static final class BlockingRecordSubscription<R extends Record> implements Subscription {
static final class BlockingRecordSubscription<R extends Record> extends AbstractSubscription<R> {
private final ResultQueryTrait<R> query;
private final Subscriber<? super R> subscriber;
private final ArrayDeque<R> buffer;
private volatile Cursor<R> c;
BlockingRecordSubscription(ResultQueryTrait<R> query, Subscriber<? super R> subscriber) {
super(subscriber);
this.query = query;
this.subscriber = subscriber;
this.buffer = new ArrayDeque<>();
}
@Override
public final synchronized void request(long n) {
int i = (int) Math.min(n, Integer.MAX_VALUE);
final synchronized void request0() {
try {
if (c == null)
c = query.fetchLazyNonAutoClosing();
if (buffer.size() < i)
buffer.addAll(c.fetchNext(i - buffer.size()));
while (requested.getAndUpdate(l -> Math.max(0, l - 1)) > 0) {
R r = c.fetchNext();
boolean complete = buffer.size() < i;
while (!buffer.isEmpty())
subscriber.onNext(buffer.pollFirst());
if (r == null) {
subscriber.onComplete();
safeClose(c);
break;
}
if (complete) {
subscriber.onComplete();
safeClose(c);
subscriber.onNext(r);
}
}
catch (Throwable t) {
@ -1056,28 +1109,24 @@ final class R2DBC {
}
@Override
public final void cancel() {
final void cancel0() {
safeClose(c);
}
}
static final class BlockingRowCountSubscription implements Subscription {
final AbstractRowCountQuery query;
final Subscriber<? super Integer> subscriber;
final AtomicBoolean executed;
static final class BlockingRowCountSubscription extends AbstractSubscription<Integer> {
final AbstractRowCountQuery query;
BlockingRowCountSubscription(AbstractRowCountQuery query, Subscriber<? super Integer> subscriber) {
super(subscriber);
this.query = query;
this.subscriber = subscriber;
this.executed = new AtomicBoolean();
}
@Override
public void request(long n) {
final void request0() {
try {
if (!executed.getAndSet(true))
subscriber.onNext(query.execute());
subscriber.onNext(query.execute());
subscriber.onComplete();
}
catch (Throwable t) {
@ -1086,6 +1135,6 @@ final class R2DBC {
}
@Override
public void cancel() {}
final void cancel0() {}
}
}

View File

@ -3273,7 +3273,8 @@ final class Tools {
* The type of guard.
*/
static enum Guard {
RECORD_TOSTRING;
RECORD_TOSTRING,
SUBSCRIPTION_SYNC_RECURSION;
ThreadLocal<Object> tl = new ThreadLocal<>();
}
@ -3294,6 +3295,13 @@ final class Tools {
V guarded();
}
/**
* Run an operation using a guard.
*/
static final void run(Guard guard, Runnable unguardedOperation, Runnable guardedOperation) {
run(guard, () -> { unguardedOperation.run(); return null; }, () -> { guardedOperation.run(); return null; });
}
/**
* Run an operation using a guard.
*/

View File

@ -114,6 +114,9 @@ import org.jooq.tools.JooqLogger;
import org.jetbrains.annotations.NotNull;
import io.r2dbc.spi.ConnectionFactory;
import io.r2dbc.spi.ConnectionFactoryMetadata;
/**
* JDBC-related utility methods.
*
@ -169,6 +172,51 @@ public class JDBCUtils {
result = dialect(url, majorVersion, minorVersion);
}
catch (SQLException ignore) {}
}
if (result == SQLDialect.DEFAULT) {
// If the dialect cannot be guessed from the URL, take some other
// measures, e.g. by querying DatabaseMetaData.getDatabaseProductName()
}
return result;
}
/**
* "Guess" the {@link SQLDialect} from a {@link ConnectionFactory} instance.
* <p>
* This method tries to guess the <code>SQLDialect</code> of a connection
* from the its {@link ConnectionFactoryMetadata} as obtained by
* {@link ConnectionFactory#getMetadata()}. If the dialect cannot be guessed
* from the URL, further actions may be implemented in the future.
*
* @return The appropriate {@link SQLDialect} or {@link SQLDialect#DEFAULT}
* if no dialect could be derived from the connection. Never
* <code>null</code>.
* @see #dialect(String)
*/
@NotNull
public static final SQLDialect dialect(ConnectionFactory connection) {
SQLDialect result = SQLDialect.DEFAULT;
if (connection != null) {
ConnectionFactoryMetadata m = connection.getMetadata();
String product = m.getName().toLowerCase();
if (product.contains("h2"))
result = H2;
else if (product.contains("mariadb"))
result = MARIADB;
else if (product.contains("mysql"))
result = MYSQL;
else if (product.contains("postgres"))
result = POSTGRES;
}
if (result == SQLDialect.DEFAULT) {

36
pom.xml
View File

@ -37,6 +37,9 @@
<sqlserver.version>9.2.1.jre8</sqlserver.version>
<oracle.version>21.1.0.0</oracle.version>
<!-- R2DBC SPI version and some matching driver versions -->
<io.r2dbc.version>0.9.0.M1</io.r2dbc.version>
<!-- From JDK 11 onwards, we need to depend on the JAXB API explicitly -->
<jaxb.version>2.3.1</jaxb.version>
<javax.activation.version>1.2.0</javax.activation.version>
@ -129,10 +132,41 @@
<artifactId>annotations</artifactId>
<version>19.0.0</version>
</dependency>
<dependency>
<groupId>io.r2dbc</groupId>
<artifactId>r2dbc-spi</artifactId>
<version>0.9.0.M1</version>
<version>${io.r2dbc.version}</version>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.4.4</version>
</dependency>
<dependency>
<groupId>io.r2dbc</groupId>
<artifactId>r2dbc-h2</artifactId>
<version>0.8.4.RELEASE</version>
</dependency>
<dependency>
<groupId>org.mariadb</groupId>
<artifactId>r2dbc-mariadb</artifactId>
<version>1.0.1</version>
</dependency>
<dependency>
<groupId>io.r2dbc</groupId>
<artifactId>r2dbc-mssql</artifactId>
<version>${io.r2dbc.version}</version>
</dependency>
<dependency>
<groupId>com.oracle.database.r2dbc</groupId>
<artifactId>oracle-r2dbc</artifactId>
<version>0.1.0</version>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>r2dbc-postgresql</artifactId>
<version>${io.r2dbc.version}</version>
</dependency>
<dependency>