[jOOQ/jOOQ#11700] Fix MariaDB and SQL Server bind variable usage

- SQL Server's R2DBC driver seems to only support named params
- Support fetchSize
- Support BINARY_FLOAT and BINARY_DOUBLE types in Oracle
- Support binding temporal types (R2DBC doesn't support java.sql types)
This commit is contained in:
Lukas Eder 2021-03-25 09:06:28 +01:00
parent 5fa2669f19
commit db6f8efc22
3 changed files with 184 additions and 99 deletions

View File

@ -108,27 +108,27 @@ abstract class AbstractResultQuery<R extends Record> extends AbstractQuery<R> im
/**
* Generated UID
*/
private static final long serialVersionUID = -5588344253566055707L;
private static final JooqLogger log = JooqLogger.getLogger(AbstractResultQuery.class);
private static final long serialVersionUID = -5588344253566055707L;
private static final JooqLogger log = JooqLogger.getLogger(AbstractResultQuery.class);
private static final Set<SQLDialect> REPORT_FETCH_SIZE_WITH_AUTOCOMMIT = SQLDialect.supportedBy(POSTGRES);
private static final Set<SQLDialect> REPORT_FETCH_SIZE_WITH_AUTOCOMMIT = SQLDialect.supportedBy(POSTGRES);
private int maxRows;
private int fetchSize;
private int resultSetConcurrency;
private int resultSetType;
private int resultSetHoldability;
private Table<?> coerceTable;
private Collection<? extends Field<?>> coerceFields;
private transient boolean lazy;
private transient boolean many;
private transient Cursor<R> cursor;
private transient boolean autoclosing = true;
private Result<R> result;
private ResultsImpl results;
private int maxRows;
private int fetchSize;
private int resultSetConcurrency;
private int resultSetType;
private int resultSetHoldability;
private Table<?> coerceTable;
private Collection<? extends Field<?>> coerceFields;
private transient boolean lazy;
private transient boolean many;
private transient Cursor<R> cursor;
private transient boolean autoclosing = true;
private Result<R> result;
private ResultsImpl results;
// Some temp variables for String interning
private final Intern intern = new Intern();
private final Intern intern = new Intern();
AbstractResultQuery(Configuration configuration) {
super(configuration);
@ -181,6 +181,10 @@ abstract class AbstractResultQuery<R extends Record> extends AbstractQuery<R> im
return this;
}
final int fetchSize() {
return fetchSize;
}
@Override
public final ResultQuery<R> resultSetConcurrency(int concurrency) {
this.resultSetConcurrency = concurrency;

View File

@ -39,6 +39,7 @@ package org.jooq.impl;
import static org.jooq.impl.DSL.val;
import static org.jooq.impl.ParsingConnection.translate;
import static org.jooq.impl.R2DBC.setParamType;
import static org.jooq.impl.Tools.EMPTY_PARAM;
import java.util.ArrayList;
@ -75,10 +76,7 @@ final class ParsingConnectionFactory implements ConnectionFactory {
ParsingConnectionFactory(Configuration configuration) {
this.configuration = configuration.derive();
this.configuration.set(SettingsTools.clone(configuration.settings())
.withParseNamedParamPrefix("$")
.withRenderNamedParamPrefix("$")
.withParamType(ParamType.NAMED));
this.configuration.set(setParamType(configuration.dialect(), configuration.settings()));
}
@Override

View File

@ -37,11 +37,9 @@
*/
package org.jooq.impl;
import static org.jooq.SQLDialect.MARIADB;
// ...
import static org.jooq.conf.ParamType.NAMED;
import static org.jooq.conf.StatementType.STATIC_STATEMENT;
import static org.jooq.impl.Tools.recordFactory;
import static org.jooq.tools.Convert.convert;
import static org.jooq.tools.StringUtils.defaultIfNull;
import java.math.BigDecimal;
@ -57,7 +55,6 @@ import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.ArrayDeque;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.Supplier;
@ -67,12 +64,14 @@ import org.jooq.Configuration;
import org.jooq.Cursor;
import org.jooq.DataType;
import org.jooq.Field;
// ...
import org.jooq.Record;
import org.jooq.SQLDialect;
import org.jooq.conf.Settings;
import org.jooq.conf.SettingsTools;
import org.jooq.conf.StatementType;
import org.jooq.impl.DefaultRenderContext.Rendered;
import org.jooq.tools.Convert;
import org.jooq.tools.JooqLogger;
import org.jooq.tools.jdbc.DefaultPreparedStatement;
import org.jooq.tools.jdbc.DefaultResultSet;
@ -91,8 +90,8 @@ import io.r2dbc.spi.Statement;
*/
final class R2DBC {
static final Set<SQLDialect> NO_SUPPORT_BIND_VARIABLES = SQLDialect.supportedBy(MARIADB);
static volatile boolean is_0_9 = true;
private static final JooqLogger log = JooqLogger.getLogger(R2DBC.class);
static volatile boolean is_0_9 = true;
static final class RowSubscriber<R extends Record> implements Subscriber<R> {
@ -214,19 +213,23 @@ final class R2DBC {
@Override
public final void onNext(Connection c) {
try {
Settings settings = SettingsTools.clone(query.configuration().settings())
.withParseNamedParamPrefix("$")
.withRenderNamedParamPrefix("$")
.withParamType(NAMED);
DefaultRenderContext render = new DefaultRenderContext(query.configuration().derive(
setParamType(query.configuration().dialect(), query.configuration().settings())
));
// MariaDB's R2DBC driver doesn't support bind variables yet: https://jira.mariadb.org/browse/CONJ-868
if (NO_SUPPORT_BIND_VARIABLES.contains(query.configuration().dialect()))
settings.withStatementType(STATIC_STATEMENT);
DefaultRenderContext render = new DefaultRenderContext(query.configuration().derive(settings));
Rendered rendered = new Rendered(render.paramType(NAMED).visit(query).render(), render.bindValues(), render.skipUpdateCounts());
Statement stmt = c.createStatement(rendered.sql);
new DefaultBindContext(query.configuration(), new R2DBCPreparedStatement(query.configuration(), stmt)).visit(rendered.bindValues);
// TODO: Reuse org.jooq.impl.Tools.setFetchSize(ExecuteContext ctx, int fetchSize)
int f = SettingsTools.getFetchSize(((AbstractResultQuery<?>) query).fetchSize(), render.settings());
if (f != 0) {
if (log.isDebugEnabled())
log.debug("Setting fetch size", f);
stmt.fetchSize(f);
}
stmt.execute().subscribe(new ResultSubscriber<>(query, this));
}
@ -274,61 +277,6 @@ final class R2DBC {
}
}
static final class BlockingSubscription<R extends Record> implements Subscription {
private final AbstractResultQuery<R> query;
private final Subscriber<? super R> subscriber;
private Cursor<R> c;
private ArrayDeque<R> buffer;
BlockingSubscription(AbstractResultQuery<R> query, Subscriber<? super R> subscriber) {
this.query = query;
this.subscriber = subscriber;
}
@Override
public final void request(long n) {
int i = (int) Math.min(n, Integer.MAX_VALUE);
try {
if (c == null)
c = query.fetchLazyNonAutoClosing();
if (buffer == null)
buffer = new ArrayDeque<>();
if (buffer.size() < i)
buffer.addAll(c.fetchNext(i - buffer.size()));
boolean complete = buffer.size() < i;
while (!buffer.isEmpty()) {
subscriber.onNext(buffer.pollFirst());
}
if (complete)
doComplete();
}
catch (Throwable t) {
subscriber.onError(t);
doComplete();
}
}
private void doComplete() {
close();
subscriber.onComplete();
}
private void close() {
if (c != null)
c.close();
}
@Override
public final void cancel() {
close();
}
}
// -------------------------------------------------------------------------
// JDBC to R2DBC bridges for better interop, where it doesn't matter
// -------------------------------------------------------------------------
@ -345,8 +293,34 @@ final class R2DBC {
this.s = s;
}
private final void bindNonNull(int parameterIndex, Object x) {
switch (c.family()) {
default:
s.bind(parameterIndex - 1, x);
break;
}
}
private final <T> void bindNull(int parameterIndex, Class<T> type) {
s.bindNull(parameterIndex - 1, type);
switch (c.family()) {
default:
s.bindNull(parameterIndex - 1, type);
break;
}
}
private final <T> void bindNullable(int parameterIndex, T x, Class<T> type) {
@ -355,17 +329,24 @@ final class R2DBC {
private final <T, U> void bindNullable(int parameterIndex, T x, Class<U> type, Function<? super T, ? extends U> conversion) {
if (x == null)
s.bindNull(parameterIndex - 1, type);
bindNull(parameterIndex, type);
else
bindNonNull(parameterIndex, conversion.apply(x));
}
private final void bindNonNull(int parameterIndex, Object x) {
s.bind(parameterIndex - 1, x);
}
private final Class<?> type(int sqlType) {
return DefaultDataType.getDataType(c.family(), sqlType).getType();
// [#11700] Intercept JDBC temporal types, which aren't supported by R2DBC
switch (sqlType) {
case Types.DATE:
return LocalDate.class;
case Types.TIME:
return LocalTime.class;
case Types.TIMESTAMP:
return LocalDateTime.class;
default:
return DefaultDataType.getDataType(c.family(), sqlType).getType();
}
}
@Override
@ -473,6 +454,24 @@ final class R2DBC {
public final void setObject(int parameterIndex, Object x, SQLType targetSqlType) throws SQLException {
setObject(parameterIndex, x, defaultIfNull(targetSqlType.getVendorTypeNumber(), Types.OTHER));
}
}
static final class R2DBCResultSet extends DefaultResultSet {
@ -745,4 +744,88 @@ final class R2DBC {
return false;
}
}
static final Settings setParamType(SQLDialect dialect, Settings settings) {
settings = SettingsTools.clone(settings);
switch (dialect.family()) {
case MARIADB:
case MYSQL:
return settings.withParamType(NAMED);
default:
return settings
.withParamType(NAMED)
.withRenderNamedParamPrefix("$")
.withParseNamedParamPrefix("$");
}
}
// -------------------------------------------------------------------------
// XXX: Legacy implementation
// -------------------------------------------------------------------------
static final class BlockingSubscription<R extends Record> implements Subscription {
private final AbstractResultQuery<R> query;
private final Subscriber<? super R> subscriber;
private Cursor<R> c;
private ArrayDeque<R> buffer;
BlockingSubscription(AbstractResultQuery<R> query, Subscriber<? super R> subscriber) {
this.query = query;
this.subscriber = subscriber;
}
@Override
public final void request(long n) {
int i = (int) Math.min(n, Integer.MAX_VALUE);
try {
if (c == null)
c = query.fetchLazyNonAutoClosing();
if (buffer == null)
buffer = new ArrayDeque<>();
if (buffer.size() < i)
buffer.addAll(c.fetchNext(i - buffer.size()));
boolean complete = buffer.size() < i;
while (!buffer.isEmpty()) {
subscriber.onNext(buffer.pollFirst());
}
if (complete)
doComplete();
}
catch (Throwable t) {
subscriber.onError(t);
doComplete();
}
}
private void doComplete() {
close();
subscriber.onComplete();
}
private void close() {
if (c != null)
c.close();
}
@Override
public final void cancel() {
close();
}
}
}