[jOOQ/jOOQ#11700] Add DSL.using(io.r2dbc.spi.Connection)

It should be possible to wrap a io.r2dbc.spi.Connection in a single-connection R2DBC ConnectionFactory, which produces Connection proxies that never close the underlying Connection. This works the same way as using DSL.using(Connection) which is wrapped in a JDBC ConnectionProvider that never closes the underlying Connection.
This commit is contained in:
Lukas Eder 2021-04-08 12:54:25 +02:00
parent 30aa7cd241
commit aa7df66998
4 changed files with 270 additions and 33 deletions

View File

@ -757,10 +757,10 @@ public class DSL {
}
/**
* Create an executor with a custom connection factory and a dialect
* configured.
* Create an executor with a custom R2DBC connection factory and guess the
* dialect from it.
*
* @param connectionFactory The connection factory providing jOOQ with
* @param connectionFactory The R2DBC connection factory providing jOOQ with
* R2DBC connections
*/
@NotNull
@ -769,10 +769,10 @@ public class DSL {
}
/**
* Create an executor with a custom connection factory and a dialect
* Create an executor with a custom R2DBC connection factory and a dialect
* configured.
*
* @param connectionFactory The connection factory providing jOOQ with
* @param connectionFactory The R2DBC connection factory providing jOOQ with
* R2DBC connections
* @param dialect The dialect to use with objects created from this executor
*/
@ -782,10 +782,10 @@ public class DSL {
}
/**
* Create an executor with a custom connection factory, a dialect and settings
* configured.
* Create an executor with a custom R2DBC connection factory, a dialect and
* settings configured.
*
* @param connectionFactory The connection factory providing jOOQ with
* @param connectionFactory The R2DBC connection factory providing jOOQ with
* R2DBC connections
* @param dialect The dialect to use with objects created from this executor
* @param settings The runtime settings to apply to objects created from
@ -796,6 +796,42 @@ public class DSL {
return new DefaultDSLContext(connectionFactory, dialect, settings);
}
/**
* Create an executor with a custom R2DBC connection and guess the dialect.
*
* @param connection The R2DBC connection
*/
@NotNull
public static DSLContext using(io.r2dbc.spi.Connection connection) {
return new DefaultDSLContext(new DefaultConnectionFactory(connection), JDBCUtils.dialect(connection));
}
/**
* Create an executor with a custom R2DBC connection and a dialect
* configured.
*
* @param connection The R2DBC connection
* @param dialect The dialect to use with objects created from this executor
*/
@NotNull
public static DSLContext using(io.r2dbc.spi.Connection connection, SQLDialect dialect) {
return new DefaultDSLContext(new DefaultConnectionFactory(connection), dialect);
}
/**
* Create an executor with a custom R2DBC connection, a dialect and settings
* configured.
*
* @param connection The R2DBC connection
* @param dialect The dialect to use with objects created from this executor
* @param settings The runtime settings to apply to objects created from
* this executor
*/
@NotNull
public static DSLContext using(io.r2dbc.spi.Connection connection, SQLDialect dialect, Settings settings) {
return new DefaultDSLContext(new DefaultConnectionFactory(connection), dialect, settings);
}
/**
* Create an executor from a custom configuration.
*

View File

@ -0,0 +1,160 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Other licenses:
* -----------------------------------------------------------------------------
* Commercial licenses for this work are available. These replace the above
* ASL 2.0 and offer limited warranties, support, maintenance, and commercial
* database integrations.
*
* For more information, please visit: http://www.jooq.org/licenses
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*/
package org.jooq.impl;
import static org.jooq.impl.R2DBC.AbstractSubscription.onRequest;
import org.reactivestreams.Publisher;
import io.r2dbc.spi.Batch;
import io.r2dbc.spi.Connection;
import io.r2dbc.spi.ConnectionFactory;
import io.r2dbc.spi.ConnectionFactoryMetadata;
import io.r2dbc.spi.ConnectionMetadata;
import io.r2dbc.spi.IsolationLevel;
import io.r2dbc.spi.Statement;
import io.r2dbc.spi.TransactionDefinition;
import io.r2dbc.spi.ValidationDepth;
/**
* A {@link ConnectionFactory} wrapper for a single connection, which cannot be
* closed via this connection factory.
*/
final class DefaultConnectionFactory implements ConnectionFactory {
private final Connection connection;
DefaultConnectionFactory(Connection connection) {
this.connection = connection;
}
@Override
public final Publisher<? extends Connection> create() {
return s -> s.onSubscribe(onRequest(s, x -> {
x.onNext(new NonClosingConnection());
x.onComplete();
}));
}
@Override
public final ConnectionFactoryMetadata getMetadata() {
return () -> connection.getMetadata().getDatabaseProductName();
}
private final class NonClosingConnection implements Connection {
@Override
public Publisher<Void> beginTransaction() {
return connection.beginTransaction();
}
@Override
public Publisher<Void> beginTransaction(TransactionDefinition definition) {
return connection.beginTransaction(definition);
}
@Override
public Publisher<Void> close() {
return s -> s.onSubscribe(onRequest(s, x -> x.onComplete()));
}
@Override
public Publisher<Void> commitTransaction() {
return connection.commitTransaction();
}
@Override
public Batch createBatch() {
return connection.createBatch();
}
@Override
public Publisher<Void> createSavepoint(String name) {
return connection.createSavepoint(name);
}
@Override
public Statement createStatement(String sql) {
return connection.createStatement(sql);
}
@Override
public boolean isAutoCommit() {
return connection.isAutoCommit();
}
@Override
public ConnectionMetadata getMetadata() {
return connection.getMetadata();
}
@Override
public IsolationLevel getTransactionIsolationLevel() {
return connection.getTransactionIsolationLevel();
}
@Override
public Publisher<Void> releaseSavepoint(String name) {
return connection.releaseSavepoint(name);
}
@Override
public Publisher<Void> rollbackTransaction() {
return connection.rollbackTransaction();
}
@Override
public Publisher<Void> rollbackTransactionToSavepoint(String name) {
return connection.rollbackTransactionToSavepoint(name);
}
@Override
public Publisher<Void> setAutoCommit(boolean autoCommit) {
return connection.setAutoCommit(autoCommit);
}
@Override
public Publisher<Void> setTransactionIsolationLevel(IsolationLevel isolationLevel) {
return connection.setTransactionIsolationLevel(isolationLevel);
}
@Override
public Publisher<Boolean> validate(ValidationDepth depth) {
return connection.validate(depth);
}
}
}

View File

@ -72,6 +72,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
@ -128,6 +129,15 @@ final class R2DBC {
final Subscriber<? super T> subscriber;
final Guard guard;
static <T> Subscription onRequest(Subscriber<? super T> s, Consumer<? super Subscriber<? super T>> onRequest) {
return new AbstractSubscription<T>(s) {
@Override
void request0() {
onRequest.accept(subscriber);
}
};
}
AbstractSubscription(Subscriber<? super T> subscriber) {
this.completed = new AtomicBoolean();
this.requested = new AtomicLong();
@ -180,7 +190,7 @@ final class R2DBC {
}
abstract void request0();
abstract void cancel0(boolean cancelled);
void cancel0(boolean cancelled) {}
}
// -------------------------------------------------------------------------
@ -1207,8 +1217,5 @@ final class R2DBC {
subscriber.onError(t);
}
}
@Override
final void cancel0(boolean cancelled) {}
}
}

View File

@ -116,6 +116,7 @@ import org.jetbrains.annotations.NotNull;
import io.r2dbc.spi.ConnectionFactory;
import io.r2dbc.spi.ConnectionFactoryMetadata;
import io.r2dbc.spi.ConnectionMetadata;
/**
* JDBC-related utility methods.
@ -187,8 +188,8 @@ public class JDBCUtils {
* <p>
* This method tries to guess the <code>SQLDialect</code> of a connection
* from the its {@link ConnectionFactoryMetadata} as obtained by
* {@link ConnectionFactory#getMetadata()}. If the dialect cannot be guessed
* from the URL, further actions may be implemented in the future.
* {@link ConnectionFactory#getMetadata()}. If the dialect cannot be
* guessed, further actions may be implemented in the future.
*
* @return The appropriate {@link SQLDialect} or {@link SQLDialect#DEFAULT}
* if no dialect could be derived from the connection. Never
@ -199,25 +200,8 @@ public class JDBCUtils {
public static final SQLDialect dialect(ConnectionFactory connection) {
SQLDialect result = SQLDialect.DEFAULT;
if (connection != null) {
ConnectionFactoryMetadata m = connection.getMetadata();
String product = m.getName().toLowerCase();
if (product.contains("h2"))
result = H2;
else if (product.contains("mariadb"))
result = MARIADB;
else if (product.contains("mysql"))
result = MYSQL;
else if (product.contains("postgres"))
result = POSTGRES;
}
if (connection != null)
result = dialectFromProductName(connection.getMetadata().getName());
if (result == SQLDialect.DEFAULT) {
// If the dialect cannot be guessed from the URL, take some other
@ -227,6 +211,56 @@ public class JDBCUtils {
return result;
}
/**
* "Guess" the {@link SQLDialect} from an R2DBC
* {@link io.r2dbc.spi.Connection} instance.
* <p>
* This method tries to guess the <code>SQLDialect</code> of a connection
* from the its {@link ConnectionMetadata} as obtained by
* {@link io.r2dbc.spi.Connection#getMetadata()}. If the dialect cannot be,
* further actions may be implemented in the future.
*
* @return The appropriate {@link SQLDialect} or {@link SQLDialect#DEFAULT}
* if no dialect could be derived from the connection. Never
* <code>null</code>.
* @see #dialect(String)
*/
@NotNull
public static final SQLDialect dialect(io.r2dbc.spi.Connection connection) {
SQLDialect result = SQLDialect.DEFAULT;
if (connection != null)
result = dialectFromProductName(connection.getMetadata().getDatabaseProductName());
if (result == SQLDialect.DEFAULT) {
// If the dialect cannot be guessed from the URL, take some other
// measures, e.g. by querying DatabaseMetaData.getDatabaseProductName()
}
return result;
}
private static SQLDialect dialectFromProductName(String product) {
String p = product.toLowerCase().replace(" ", "");
if (p.contains("h2"))
return H2;
else if (p.contains("mariadb"))
return MARIADB;
else if (p.contains("mysql"))
return MYSQL;
else if (p.contains("postgres"))
return POSTGRES;
else
return DEFAULT;
}
@NotNull
private static final SQLDialect dialect(String url, int majorVersion, int minorVersion) {
SQLDialect family = dialect(url).family();