[jOOQ/jOOQ#11700] Support the third party MySQL driver

This commit is contained in:
Lukas Eder 2021-04-09 15:58:21 +02:00
parent a219550f88
commit d32765ee28
2 changed files with 131 additions and 27 deletions

View File

@ -532,7 +532,7 @@ public class DSL {
*/
@NotNull
public static CloseableDSLContext using(String url) {
if (url.startsWith("r2dbc:")) {
if (url.startsWith("r2dbc")) {
io.r2dbc.spi.Connection connection = R2DBC.getConnection(url);
return new DefaultCloseableDSLContext(new DefaultConnectionFactory(connection, true), JDBCUtils.dialect(connection));
}
@ -573,7 +573,7 @@ public class DSL {
*/
@NotNull
public static CloseableDSLContext using(String url, String username, String password) {
if (url.startsWith("r2dbc:")) {
if (url.startsWith("r2dbc")) {
io.r2dbc.spi.Connection connection = R2DBC.getConnection(url, username, password);
return new DefaultCloseableDSLContext(new DefaultConnectionFactory(connection, true), JDBCUtils.dialect(connection));
}
@ -589,7 +589,7 @@ public class DSL {
}
/**
* Create an executor from a JDBC connection URL.
* Create an executor from a JDBC or R2DBC connection URL.
* <p>
* Clients must ensure connections are closed properly by calling
* {@link CloseableDSLContext#close()} on the resulting {@link DSLContext}.
@ -603,7 +603,8 @@ public class DSL {
* }
* </pre></code>
* <p>
* This API does not yet support R2DBC connection URLs.
* Both acquisition and release of JDBC and R2DBC connection URLs are
* blocking.
*
* @param url The connection URL.
* @param properties The connection properties.
@ -612,12 +613,18 @@ public class DSL {
*/
@NotNull
public static CloseableDSLContext using(String url, Properties properties) {
try {
Connection connection = DriverManager.getConnection(url, properties);
return new DefaultCloseableDSLContext(new DefaultConnectionProvider(connection, true), JDBCUtils.dialect(connection));
if (url.startsWith("r2dbc")) {
io.r2dbc.spi.Connection connection = R2DBC.getConnection(url, properties);
return new DefaultCloseableDSLContext(new DefaultConnectionFactory(connection, true), JDBCUtils.dialect(connection));
}
catch (SQLException e) {
throw Tools.translate("Error when initialising Connection", e);
else {
try {
Connection connection = DriverManager.getConnection(url, properties);
return new DefaultCloseableDSLContext(new DefaultConnectionProvider(connection, true), JDBCUtils.dialect(connection));
}
catch (SQLException e) {
throw Tools.translate("Error when initialising Connection", e);
}
}
}

View File

@ -65,10 +65,10 @@ import java.sql.Types;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@ -80,6 +80,7 @@ import java.util.function.Supplier;
import org.jooq.BindingGetResultSetContext;
import org.jooq.Configuration;
import org.jooq.Converter;
import org.jooq.Cursor;
import org.jooq.DataType;
import org.jooq.Field;
@ -91,6 +92,7 @@ import org.jooq.SQLDialect;
import org.jooq.conf.Settings;
import org.jooq.conf.SettingsTools;
import org.jooq.exception.DataAccessException;
import org.jooq.exception.DataTypeException;
import org.jooq.impl.DefaultRenderContext.Rendered;
import org.jooq.impl.Tools.ThreadGuard;
import org.jooq.impl.Tools.ThreadGuard.Guard;
@ -108,8 +110,9 @@ import io.r2dbc.spi.Batch;
import io.r2dbc.spi.ColumnMetadata;
import io.r2dbc.spi.Connection;
import io.r2dbc.spi.ConnectionFactories;
import io.r2dbc.spi.ConnectionFactory;
import io.r2dbc.spi.ConnectionFactoryOptions;
import io.r2dbc.spi.ConnectionFactoryOptions.Builder;
import io.r2dbc.spi.Option;
import io.r2dbc.spi.Result;
import io.r2dbc.spi.Row;
import io.r2dbc.spi.RowMetadata;
@ -298,10 +301,10 @@ final class R2DBC {
@SuppressWarnings("unchecked")
@Override
public final void onNext(Result r) {
r.map((row, m) -> {
r.map((row, meta) -> {
try {
// TODO: Cache this getFields() call
Field<?>[] fields = query.getFields(new R2DBCResultSetMetaData(query.configuration(), m));
Field<?>[] fields = query.getFields(new R2DBCResultSetMetaData(query.configuration(), meta));
// TODO: This call is duplicated from CursorImpl and related classes.
// Refactor this call to make sure code is re-used, especially when
@ -314,7 +317,7 @@ final class R2DBC {
DefaultBindingGetResultSetContext<?> ctx = new DefaultBindingGetResultSetContext<>(
query.configuration(),
query.configuration().data(),
new R2DBCResultSet(query.configuration(), row),
new R2DBCResultSet(query.configuration(), row, meta),
0
);
@ -655,18 +658,43 @@ final class R2DBC {
}
static final Connection getConnection(String url) {
return block(ConnectionFactories.get(url).create());
return getConnection(url, new Properties());
}
static final Connection getConnection(String url, String username, String password) {
return block(ConnectionFactories.get(
ConnectionFactoryOptions
.parse(url)
.mutate()
.option(ConnectionFactoryOptions.USER, username)
.option(ConnectionFactoryOptions.PASSWORD, password)
.build()
).create());
Properties properties = new Properties();
properties.setProperty("user", username);
properties.setProperty("password", password);
return getConnection(url, new Properties());
}
static final Connection getConnection(String url, Properties properties) {
if (properties.isEmpty())
return block(ConnectionFactories.get(url).create());
Builder builder = ConnectionFactoryOptions.parse(url).mutate();
properties.forEach((k, v) -> {
if ("user".equals(k))
setOption(builder, ConnectionFactoryOptions.USER, v);
else if ("password".equals(k))
setOption(builder, ConnectionFactoryOptions.PASSWORD, v);
else if ("host".equals(k))
setOption(builder, ConnectionFactoryOptions.HOST, v);
else if ("port".equals(k))
setOption(builder, ConnectionFactoryOptions.PORT, Integer.parseInt("" + v));
else if ("database".equals(k))
setOption(builder, ConnectionFactoryOptions.DATABASE, v);
else if ("ssl".equals(k))
setOption(builder, ConnectionFactoryOptions.SSL, v);
else
setOption(builder, Option.valueOf("" + k), v);
});
return block(ConnectionFactories.get(builder.build()).create());
}
private static <T> Builder setOption(Builder builder, Option<T> option, Object v) {
return builder.option(option, option.cast(v));
}
// -------------------------------------------------------------------------
@ -770,7 +798,17 @@ final class R2DBC {
@Override
public final void setBoolean(int parameterIndex, boolean x) throws SQLException {
bindNonNull(parameterIndex, x);
switch (c.family()) {
// Workaround for https://github.com/mirromutth/r2dbc-mysql/issues/178
case MYSQL:
bindNonNull(parameterIndex, x ? 1 : 0);
break;
default:
bindNonNull(parameterIndex, x);
break;
}
}
@Override
@ -887,13 +925,15 @@ final class R2DBC {
final Configuration c;
final Row r;
final RowMetadata m;
boolean wasNull;
R2DBCResultSet(Configuration c, Row r) {
R2DBCResultSet(Configuration c, Row r, RowMetadata m) {
super(null, null, () -> new SQLFeatureNotSupportedException("Unsupported operation of the JDBC to R2DBC bridge."));
this.c = c;
this.r = r;
this.r = new DefaultRow(c, r);
this.m = m;
}
private final <T> T wasNull(T nullable) {
@ -1011,6 +1051,58 @@ final class R2DBC {
public final Array getArray(int columnIndex) throws SQLException {
return new MockArray<>(c.dialect(), (Object[]) nullable(columnIndex, Object.class), Object[].class);
}
private static final class DefaultRow implements Row {
final Configuration c;
final Row r;
DefaultRow(Configuration c, Row r) {
this.c = c;
this.r = r;
}
// These methods are proxied for some drivers that can't convert
// between data types. See:
// - https://github.com/mirromutth/r2dbc-mysql/issues/177
// - https://github.com/r2dbc/r2dbc-h2/issues/190
@Override
public final <T> T get(int index, Class<T> uType) {
switch (c.family()) {
case H2:
case MYSQL:
return get0(r.get(index), uType);
default:
return r.get(index, uType);
}
}
@Override
public final <T> T get(String name, Class<T> uType) {
switch (c.family()) {
case H2:
case MYSQL:
return get0(r.get(name), uType);
default:
return r.get(name, uType);
}
}
@SuppressWarnings("unchecked")
private final <T> T get0(Object o, Class<T> uType) {
if (o == null)
return null;
Converter<Object, T> converter = c.converterProvider().provide((Class<Object>) o.getClass(), uType);
if (converter == null)
throw new DataTypeException("Cannot convert from " + o.getClass() + " to " + uType + ". Please report an issue here: https://github.com/jOOQ/jOOQ/issues/new. As a workaround, you can implement a ConverterProvider.");
else
return converter.from(o);
}
}
}
static final class R2DBCResultSetMetaData implements ResultSetMetaData {
@ -1175,8 +1267,13 @@ final class R2DBC {
settings = SettingsTools.clone(settings);
switch (dialect.family()) {
case MARIADB:
case MYSQL:
return settings
.withParamType(NAMED)
.withRenderNamedParamPrefix("?p")
.withParseNamedParamPrefix("?p");
case MARIADB:
return settings.withParamType(NAMED);