diff --git a/jOOQ/src/main/java/org/jooq/impl/DSL.java b/jOOQ/src/main/java/org/jooq/impl/DSL.java index c776a7d775..863b8d120b 100644 --- a/jOOQ/src/main/java/org/jooq/impl/DSL.java +++ b/jOOQ/src/main/java/org/jooq/impl/DSL.java @@ -757,10 +757,10 @@ public class DSL { } /** - * Create an executor with a custom connection factory and a dialect - * configured. + * Create an executor with a custom R2DBC connection factory and guess the + * dialect from it. * - * @param connectionFactory The connection factory providing jOOQ with + * @param connectionFactory The R2DBC connection factory providing jOOQ with * R2DBC connections */ @NotNull @@ -769,10 +769,10 @@ public class DSL { } /** - * Create an executor with a custom connection factory and a dialect + * Create an executor with a custom R2DBC connection factory and a dialect * configured. * - * @param connectionFactory The connection factory providing jOOQ with + * @param connectionFactory The R2DBC connection factory providing jOOQ with * R2DBC connections * @param dialect The dialect to use with objects created from this executor */ @@ -782,10 +782,10 @@ public class DSL { } /** - * Create an executor with a custom connection factory, a dialect and settings - * configured. + * Create an executor with a custom R2DBC connection factory, a dialect and + * settings configured. * - * @param connectionFactory The connection factory providing jOOQ with + * @param connectionFactory The R2DBC connection factory providing jOOQ with * R2DBC connections * @param dialect The dialect to use with objects created from this executor * @param settings The runtime settings to apply to objects created from @@ -796,6 +796,42 @@ public class DSL { return new DefaultDSLContext(connectionFactory, dialect, settings); } + /** + * Create an executor with a custom R2DBC connection and guess the dialect. + * + * @param connection The R2DBC connection + */ + @NotNull + public static DSLContext using(io.r2dbc.spi.Connection connection) { + return new DefaultDSLContext(new DefaultConnectionFactory(connection), JDBCUtils.dialect(connection)); + } + + /** + * Create an executor with a custom R2DBC connection and a dialect + * configured. + * + * @param connection The R2DBC connection + * @param dialect The dialect to use with objects created from this executor + */ + @NotNull + public static DSLContext using(io.r2dbc.spi.Connection connection, SQLDialect dialect) { + return new DefaultDSLContext(new DefaultConnectionFactory(connection), dialect); + } + + /** + * Create an executor with a custom R2DBC connection, a dialect and settings + * configured. + * + * @param connection The R2DBC connection + * @param dialect The dialect to use with objects created from this executor + * @param settings The runtime settings to apply to objects created from + * this executor + */ + @NotNull + public static DSLContext using(io.r2dbc.spi.Connection connection, SQLDialect dialect, Settings settings) { + return new DefaultDSLContext(new DefaultConnectionFactory(connection), dialect, settings); + } + /** * Create an executor from a custom configuration. * diff --git a/jOOQ/src/main/java/org/jooq/impl/DefaultConnectionFactory.java b/jOOQ/src/main/java/org/jooq/impl/DefaultConnectionFactory.java new file mode 100644 index 0000000000..387d72792b --- /dev/null +++ b/jOOQ/src/main/java/org/jooq/impl/DefaultConnectionFactory.java @@ -0,0 +1,160 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Other licenses: + * ----------------------------------------------------------------------------- + * Commercial licenses for this work are available. These replace the above + * ASL 2.0 and offer limited warranties, support, maintenance, and commercial + * database integrations. + * + * For more information, please visit: http://www.jooq.org/licenses + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + */ +package org.jooq.impl; + +import static org.jooq.impl.R2DBC.AbstractSubscription.onRequest; + +import org.reactivestreams.Publisher; + +import io.r2dbc.spi.Batch; +import io.r2dbc.spi.Connection; +import io.r2dbc.spi.ConnectionFactory; +import io.r2dbc.spi.ConnectionFactoryMetadata; +import io.r2dbc.spi.ConnectionMetadata; +import io.r2dbc.spi.IsolationLevel; +import io.r2dbc.spi.Statement; +import io.r2dbc.spi.TransactionDefinition; +import io.r2dbc.spi.ValidationDepth; + +/** + * A {@link ConnectionFactory} wrapper for a single connection, which cannot be + * closed via this connection factory. + */ +final class DefaultConnectionFactory implements ConnectionFactory { + + private final Connection connection; + + DefaultConnectionFactory(Connection connection) { + this.connection = connection; + } + + @Override + public final Publisher create() { + return s -> s.onSubscribe(onRequest(s, x -> { + x.onNext(new NonClosingConnection()); + x.onComplete(); + })); + } + + @Override + public final ConnectionFactoryMetadata getMetadata() { + return () -> connection.getMetadata().getDatabaseProductName(); + } + + private final class NonClosingConnection implements Connection { + @Override + public Publisher beginTransaction() { + return connection.beginTransaction(); + } + + @Override + public Publisher beginTransaction(TransactionDefinition definition) { + return connection.beginTransaction(definition); + } + + @Override + public Publisher close() { + return s -> s.onSubscribe(onRequest(s, x -> x.onComplete())); + } + + @Override + public Publisher commitTransaction() { + return connection.commitTransaction(); + } + + @Override + public Batch createBatch() { + return connection.createBatch(); + } + + @Override + public Publisher createSavepoint(String name) { + return connection.createSavepoint(name); + } + + @Override + public Statement createStatement(String sql) { + return connection.createStatement(sql); + } + + @Override + public boolean isAutoCommit() { + return connection.isAutoCommit(); + } + + @Override + public ConnectionMetadata getMetadata() { + return connection.getMetadata(); + } + + @Override + public IsolationLevel getTransactionIsolationLevel() { + return connection.getTransactionIsolationLevel(); + } + + @Override + public Publisher releaseSavepoint(String name) { + return connection.releaseSavepoint(name); + } + + @Override + public Publisher rollbackTransaction() { + return connection.rollbackTransaction(); + } + + @Override + public Publisher rollbackTransactionToSavepoint(String name) { + return connection.rollbackTransactionToSavepoint(name); + } + + @Override + public Publisher setAutoCommit(boolean autoCommit) { + return connection.setAutoCommit(autoCommit); + } + + @Override + public Publisher setTransactionIsolationLevel(IsolationLevel isolationLevel) { + return connection.setTransactionIsolationLevel(isolationLevel); + } + + @Override + public Publisher validate(ValidationDepth depth) { + return connection.validate(depth); + } + } +} diff --git a/jOOQ/src/main/java/org/jooq/impl/R2DBC.java b/jOOQ/src/main/java/org/jooq/impl/R2DBC.java index 7c94b8cac7..0602a86d3c 100644 --- a/jOOQ/src/main/java/org/jooq/impl/R2DBC.java +++ b/jOOQ/src/main/java/org/jooq/impl/R2DBC.java @@ -72,6 +72,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; +import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; @@ -128,6 +129,15 @@ final class R2DBC { final Subscriber subscriber; final Guard guard; + static Subscription onRequest(Subscriber s, Consumer> onRequest) { + return new AbstractSubscription(s) { + @Override + void request0() { + onRequest.accept(subscriber); + } + }; + } + AbstractSubscription(Subscriber subscriber) { this.completed = new AtomicBoolean(); this.requested = new AtomicLong(); @@ -180,7 +190,7 @@ final class R2DBC { } abstract void request0(); - abstract void cancel0(boolean cancelled); + void cancel0(boolean cancelled) {} } // ------------------------------------------------------------------------- @@ -1207,8 +1217,5 @@ final class R2DBC { subscriber.onError(t); } } - - @Override - final void cancel0(boolean cancelled) {} } } diff --git a/jOOQ/src/main/java/org/jooq/tools/jdbc/JDBCUtils.java b/jOOQ/src/main/java/org/jooq/tools/jdbc/JDBCUtils.java index 1439bb07f4..87414c3bf7 100644 --- a/jOOQ/src/main/java/org/jooq/tools/jdbc/JDBCUtils.java +++ b/jOOQ/src/main/java/org/jooq/tools/jdbc/JDBCUtils.java @@ -116,6 +116,7 @@ import org.jetbrains.annotations.NotNull; import io.r2dbc.spi.ConnectionFactory; import io.r2dbc.spi.ConnectionFactoryMetadata; +import io.r2dbc.spi.ConnectionMetadata; /** * JDBC-related utility methods. @@ -187,8 +188,8 @@ public class JDBCUtils { *

* This method tries to guess the SQLDialect of a connection * from the its {@link ConnectionFactoryMetadata} as obtained by - * {@link ConnectionFactory#getMetadata()}. If the dialect cannot be guessed - * from the URL, further actions may be implemented in the future. + * {@link ConnectionFactory#getMetadata()}. If the dialect cannot be + * guessed, further actions may be implemented in the future. * * @return The appropriate {@link SQLDialect} or {@link SQLDialect#DEFAULT} * if no dialect could be derived from the connection. Never @@ -199,25 +200,8 @@ public class JDBCUtils { public static final SQLDialect dialect(ConnectionFactory connection) { SQLDialect result = SQLDialect.DEFAULT; - if (connection != null) { - ConnectionFactoryMetadata m = connection.getMetadata(); - String product = m.getName().toLowerCase(); - - if (product.contains("h2")) - result = H2; - else if (product.contains("mariadb")) - result = MARIADB; - else if (product.contains("mysql")) - result = MYSQL; - else if (product.contains("postgres")) - result = POSTGRES; - - - - - - - } + if (connection != null) + result = dialectFromProductName(connection.getMetadata().getName()); if (result == SQLDialect.DEFAULT) { // If the dialect cannot be guessed from the URL, take some other @@ -227,6 +211,56 @@ public class JDBCUtils { return result; } + /** + * "Guess" the {@link SQLDialect} from an R2DBC + * {@link io.r2dbc.spi.Connection} instance. + *

+ * This method tries to guess the SQLDialect of a connection + * from the its {@link ConnectionMetadata} as obtained by + * {@link io.r2dbc.spi.Connection#getMetadata()}. If the dialect cannot be, + * further actions may be implemented in the future. + * + * @return The appropriate {@link SQLDialect} or {@link SQLDialect#DEFAULT} + * if no dialect could be derived from the connection. Never + * null. + * @see #dialect(String) + */ + @NotNull + public static final SQLDialect dialect(io.r2dbc.spi.Connection connection) { + SQLDialect result = SQLDialect.DEFAULT; + + if (connection != null) + result = dialectFromProductName(connection.getMetadata().getDatabaseProductName()); + + if (result == SQLDialect.DEFAULT) { + // If the dialect cannot be guessed from the URL, take some other + // measures, e.g. by querying DatabaseMetaData.getDatabaseProductName() + } + + return result; + } + + private static SQLDialect dialectFromProductName(String product) { + String p = product.toLowerCase().replace(" ", ""); + + if (p.contains("h2")) + return H2; + else if (p.contains("mariadb")) + return MARIADB; + else if (p.contains("mysql")) + return MYSQL; + else if (p.contains("postgres")) + return POSTGRES; + + + + + + + else + return DEFAULT; + } + @NotNull private static final SQLDialect dialect(String url, int majorVersion, int minorVersion) { SQLDialect family = dialect(url).family();