From db6f8efc22b6f40990651f2d579a8a01c5818d93 Mon Sep 17 00:00:00 2001 From: Lukas Eder Date: Thu, 25 Mar 2021 09:06:28 +0100 Subject: [PATCH] [jOOQ/jOOQ#11700] Fix MariaDB and SQL Server bind variable usage - SQL Server's R2DBC driver seems to only support named params - Support fetchSize - Support BINARY_FLOAT and BINARY_DOUBLE types in Oracle - Support binding temporal types (R2DBC doesn't support java.sql types) --- .../org/jooq/impl/AbstractResultQuery.java | 38 +-- .../jooq/impl/ParsingConnectionFactory.java | 6 +- jOOQ/src/main/java/org/jooq/impl/R2DBC.java | 239 ++++++++++++------ 3 files changed, 184 insertions(+), 99 deletions(-) diff --git a/jOOQ/src/main/java/org/jooq/impl/AbstractResultQuery.java b/jOOQ/src/main/java/org/jooq/impl/AbstractResultQuery.java index 105fef7969..5347b86a5c 100644 --- a/jOOQ/src/main/java/org/jooq/impl/AbstractResultQuery.java +++ b/jOOQ/src/main/java/org/jooq/impl/AbstractResultQuery.java @@ -108,27 +108,27 @@ abstract class AbstractResultQuery extends AbstractQuery im /** * Generated UID */ - private static final long serialVersionUID = -5588344253566055707L; - private static final JooqLogger log = JooqLogger.getLogger(AbstractResultQuery.class); + private static final long serialVersionUID = -5588344253566055707L; + private static final JooqLogger log = JooqLogger.getLogger(AbstractResultQuery.class); - private static final Set REPORT_FETCH_SIZE_WITH_AUTOCOMMIT = SQLDialect.supportedBy(POSTGRES); + private static final Set REPORT_FETCH_SIZE_WITH_AUTOCOMMIT = SQLDialect.supportedBy(POSTGRES); - private int maxRows; - private int fetchSize; - private int resultSetConcurrency; - private int resultSetType; - private int resultSetHoldability; - private Table coerceTable; - private Collection> coerceFields; - private transient boolean lazy; - private transient boolean many; - private transient Cursor cursor; - private transient boolean autoclosing = true; - private Result result; - private ResultsImpl results; + private int maxRows; + private int fetchSize; + private int resultSetConcurrency; + private int resultSetType; + private int resultSetHoldability; + private Table coerceTable; + private Collection> coerceFields; + private transient boolean lazy; + private transient boolean many; + private transient Cursor cursor; + private transient boolean autoclosing = true; + private Result result; + private ResultsImpl results; // Some temp variables for String interning - private final Intern intern = new Intern(); + private final Intern intern = new Intern(); AbstractResultQuery(Configuration configuration) { super(configuration); @@ -181,6 +181,10 @@ abstract class AbstractResultQuery extends AbstractQuery im return this; } + final int fetchSize() { + return fetchSize; + } + @Override public final ResultQuery resultSetConcurrency(int concurrency) { this.resultSetConcurrency = concurrency; diff --git a/jOOQ/src/main/java/org/jooq/impl/ParsingConnectionFactory.java b/jOOQ/src/main/java/org/jooq/impl/ParsingConnectionFactory.java index 0993cf84de..0522b2f6cc 100644 --- a/jOOQ/src/main/java/org/jooq/impl/ParsingConnectionFactory.java +++ b/jOOQ/src/main/java/org/jooq/impl/ParsingConnectionFactory.java @@ -39,6 +39,7 @@ package org.jooq.impl; import static org.jooq.impl.DSL.val; import static org.jooq.impl.ParsingConnection.translate; +import static org.jooq.impl.R2DBC.setParamType; import static org.jooq.impl.Tools.EMPTY_PARAM; import java.util.ArrayList; @@ -75,10 +76,7 @@ final class ParsingConnectionFactory implements ConnectionFactory { ParsingConnectionFactory(Configuration configuration) { this.configuration = configuration.derive(); - this.configuration.set(SettingsTools.clone(configuration.settings()) - .withParseNamedParamPrefix("$") - .withRenderNamedParamPrefix("$") - .withParamType(ParamType.NAMED)); + this.configuration.set(setParamType(configuration.dialect(), configuration.settings())); } @Override diff --git a/jOOQ/src/main/java/org/jooq/impl/R2DBC.java b/jOOQ/src/main/java/org/jooq/impl/R2DBC.java index 70152f775e..6380b64ba4 100644 --- a/jOOQ/src/main/java/org/jooq/impl/R2DBC.java +++ b/jOOQ/src/main/java/org/jooq/impl/R2DBC.java @@ -37,11 +37,9 @@ */ package org.jooq.impl; -import static org.jooq.SQLDialect.MARIADB; -// ... import static org.jooq.conf.ParamType.NAMED; -import static org.jooq.conf.StatementType.STATIC_STATEMENT; import static org.jooq.impl.Tools.recordFactory; +import static org.jooq.tools.Convert.convert; import static org.jooq.tools.StringUtils.defaultIfNull; import java.math.BigDecimal; @@ -57,7 +55,6 @@ import java.time.LocalDate; import java.time.LocalDateTime; import java.time.LocalTime; import java.util.ArrayDeque; -import java.util.Set; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; import java.util.function.Supplier; @@ -67,12 +64,14 @@ import org.jooq.Configuration; import org.jooq.Cursor; import org.jooq.DataType; import org.jooq.Field; +// ... import org.jooq.Record; import org.jooq.SQLDialect; import org.jooq.conf.Settings; import org.jooq.conf.SettingsTools; -import org.jooq.conf.StatementType; import org.jooq.impl.DefaultRenderContext.Rendered; +import org.jooq.tools.Convert; +import org.jooq.tools.JooqLogger; import org.jooq.tools.jdbc.DefaultPreparedStatement; import org.jooq.tools.jdbc.DefaultResultSet; @@ -91,8 +90,8 @@ import io.r2dbc.spi.Statement; */ final class R2DBC { - static final Set NO_SUPPORT_BIND_VARIABLES = SQLDialect.supportedBy(MARIADB); - static volatile boolean is_0_9 = true; + private static final JooqLogger log = JooqLogger.getLogger(R2DBC.class); + static volatile boolean is_0_9 = true; static final class RowSubscriber implements Subscriber { @@ -214,19 +213,23 @@ final class R2DBC { @Override public final void onNext(Connection c) { try { - Settings settings = SettingsTools.clone(query.configuration().settings()) - .withParseNamedParamPrefix("$") - .withRenderNamedParamPrefix("$") - .withParamType(NAMED); + DefaultRenderContext render = new DefaultRenderContext(query.configuration().derive( + setParamType(query.configuration().dialect(), query.configuration().settings()) + )); - // MariaDB's R2DBC driver doesn't support bind variables yet: https://jira.mariadb.org/browse/CONJ-868 - if (NO_SUPPORT_BIND_VARIABLES.contains(query.configuration().dialect())) - settings.withStatementType(STATIC_STATEMENT); - - DefaultRenderContext render = new DefaultRenderContext(query.configuration().derive(settings)); Rendered rendered = new Rendered(render.paramType(NAMED).visit(query).render(), render.bindValues(), render.skipUpdateCounts()); Statement stmt = c.createStatement(rendered.sql); new DefaultBindContext(query.configuration(), new R2DBCPreparedStatement(query.configuration(), stmt)).visit(rendered.bindValues); + + // TODO: Reuse org.jooq.impl.Tools.setFetchSize(ExecuteContext ctx, int fetchSize) + int f = SettingsTools.getFetchSize(((AbstractResultQuery) query).fetchSize(), render.settings()); + if (f != 0) { + if (log.isDebugEnabled()) + log.debug("Setting fetch size", f); + + stmt.fetchSize(f); + } + stmt.execute().subscribe(new ResultSubscriber<>(query, this)); } @@ -274,61 +277,6 @@ final class R2DBC { } } - static final class BlockingSubscription implements Subscription { - private final AbstractResultQuery query; - private final Subscriber subscriber; - private Cursor c; - private ArrayDeque buffer; - - BlockingSubscription(AbstractResultQuery query, Subscriber subscriber) { - this.query = query; - this.subscriber = subscriber; - } - - @Override - public final void request(long n) { - int i = (int) Math.min(n, Integer.MAX_VALUE); - - try { - if (c == null) - c = query.fetchLazyNonAutoClosing(); - - if (buffer == null) - buffer = new ArrayDeque<>(); - - if (buffer.size() < i) - buffer.addAll(c.fetchNext(i - buffer.size())); - - boolean complete = buffer.size() < i; - while (!buffer.isEmpty()) { - subscriber.onNext(buffer.pollFirst()); - } - - if (complete) - doComplete(); - } - catch (Throwable t) { - subscriber.onError(t); - doComplete(); - } - } - - private void doComplete() { - close(); - subscriber.onComplete(); - } - - private void close() { - if (c != null) - c.close(); - } - - @Override - public final void cancel() { - close(); - } - } - // ------------------------------------------------------------------------- // JDBC to R2DBC bridges for better interop, where it doesn't matter // ------------------------------------------------------------------------- @@ -345,8 +293,34 @@ final class R2DBC { this.s = s; } + private final void bindNonNull(int parameterIndex, Object x) { + switch (c.family()) { + + + + + + + + default: + s.bind(parameterIndex - 1, x); + break; + } + } + private final void bindNull(int parameterIndex, Class type) { - s.bindNull(parameterIndex - 1, type); + switch (c.family()) { + + + + + + + + default: + s.bindNull(parameterIndex - 1, type); + break; + } } private final void bindNullable(int parameterIndex, T x, Class type) { @@ -355,17 +329,24 @@ final class R2DBC { private final void bindNullable(int parameterIndex, T x, Class type, Function conversion) { if (x == null) - s.bindNull(parameterIndex - 1, type); + bindNull(parameterIndex, type); else bindNonNull(parameterIndex, conversion.apply(x)); } - private final void bindNonNull(int parameterIndex, Object x) { - s.bind(parameterIndex - 1, x); - } - private final Class type(int sqlType) { - return DefaultDataType.getDataType(c.family(), sqlType).getType(); + + // [#11700] Intercept JDBC temporal types, which aren't supported by R2DBC + switch (sqlType) { + case Types.DATE: + return LocalDate.class; + case Types.TIME: + return LocalTime.class; + case Types.TIMESTAMP: + return LocalDateTime.class; + default: + return DefaultDataType.getDataType(c.family(), sqlType).getType(); + } } @Override @@ -473,6 +454,24 @@ final class R2DBC { public final void setObject(int parameterIndex, Object x, SQLType targetSqlType) throws SQLException { setObject(parameterIndex, x, defaultIfNull(targetSqlType.getVendorTypeNumber(), Types.OTHER)); } + + + + + + + + + + + + + + + + + + } static final class R2DBCResultSet extends DefaultResultSet { @@ -745,4 +744,88 @@ final class R2DBC { return false; } } + + static final Settings setParamType(SQLDialect dialect, Settings settings) { + settings = SettingsTools.clone(settings); + + switch (dialect.family()) { + case MARIADB: + case MYSQL: + + return settings.withParamType(NAMED); + + + + + + + + + + default: + return settings + .withParamType(NAMED) + .withRenderNamedParamPrefix("$") + .withParseNamedParamPrefix("$"); + } + } + + // ------------------------------------------------------------------------- + // XXX: Legacy implementation + // ------------------------------------------------------------------------- + + static final class BlockingSubscription implements Subscription { + private final AbstractResultQuery query; + private final Subscriber subscriber; + private Cursor c; + private ArrayDeque buffer; + + BlockingSubscription(AbstractResultQuery query, Subscriber subscriber) { + this.query = query; + this.subscriber = subscriber; + } + + @Override + public final void request(long n) { + int i = (int) Math.min(n, Integer.MAX_VALUE); + + try { + if (c == null) + c = query.fetchLazyNonAutoClosing(); + + if (buffer == null) + buffer = new ArrayDeque<>(); + + if (buffer.size() < i) + buffer.addAll(c.fetchNext(i - buffer.size())); + + boolean complete = buffer.size() < i; + while (!buffer.isEmpty()) { + subscriber.onNext(buffer.pollFirst()); + } + + if (complete) + doComplete(); + } + catch (Throwable t) { + subscriber.onError(t); + doComplete(); + } + } + + private void doComplete() { + close(); + subscriber.onComplete(); + } + + private void close() { + if (c != null) + c.close(); + } + + @Override + public final void cancel() { + close(); + } + } }