From dcbcdf34098d306e0b41db491758b1510532f5ed Mon Sep 17 00:00:00 2001 From: Lukas Eder Date: Wed, 31 Mar 2021 14:25:30 +0200 Subject: [PATCH] [jOOQ/jOOQ#11700] Support BatchSingle --- .../java/org/jooq/impl/BatchMultiple.java | 6 +- .../main/java/org/jooq/impl/BatchSingle.java | 56 ++++-- .../main/java/org/jooq/impl/InsertImpl.java | 49 +++--- .../main/java/org/jooq/impl/NClobBinding.java | 164 ++++++++++++++++++ jOOQ/src/main/java/org/jooq/impl/R2DBC.java | 95 +++++++--- 5 files changed, 307 insertions(+), 63 deletions(-) create mode 100644 jOOQ/src/main/java/org/jooq/impl/NClobBinding.java diff --git a/jOOQ/src/main/java/org/jooq/impl/BatchMultiple.java b/jOOQ/src/main/java/org/jooq/impl/BatchMultiple.java index b44a99de96..d4b1c76ffb 100644 --- a/jOOQ/src/main/java/org/jooq/impl/BatchMultiple.java +++ b/jOOQ/src/main/java/org/jooq/impl/BatchMultiple.java @@ -46,8 +46,8 @@ import org.jooq.ExecuteListener; import org.jooq.Query; import org.jooq.conf.SettingsTools; import org.jooq.exception.ControlFlowSignal; -import org.jooq.impl.R2DBC.BatchMultipleSubscription; -import org.jooq.impl.R2DBC.RowCountSubscriber; +import org.jooq.impl.R2DBC.BatchMultipleSubscriber; +import org.jooq.impl.R2DBC.BatchSubscription; import org.reactivestreams.Subscriber; @@ -81,7 +81,7 @@ final class BatchMultiple extends AbstractBatch { ConnectionFactory cf = configuration.connectionFactory(); if (!(cf instanceof NoConnectionFactory)) - subscriber.onSubscribe(new BatchMultipleSubscription(this, subscriber, (t, u) -> new RowCountSubscriber(u, queries.length))); + subscriber.onSubscribe(new BatchSubscription<>(this, subscriber, s -> new BatchMultipleSubscriber(this, s))); // TODO: [#11700] Implement this else diff --git a/jOOQ/src/main/java/org/jooq/impl/BatchSingle.java b/jOOQ/src/main/java/org/jooq/impl/BatchSingle.java index 1e45439ce2..e221bb2f28 100644 --- a/jOOQ/src/main/java/org/jooq/impl/BatchSingle.java +++ b/jOOQ/src/main/java/org/jooq/impl/BatchSingle.java @@ -59,8 +59,14 @@ import org.jooq.Param; import org.jooq.Query; import org.jooq.conf.SettingsTools; import org.jooq.exception.ControlFlowSignal; +import org.jooq.impl.R2DBC.BatchSingleSubscriber; +import org.jooq.impl.R2DBC.BatchSubscription; import org.jooq.tools.JooqLogger; +import org.reactivestreams.Subscriber; + +import io.r2dbc.spi.ConnectionFactory; + /** * @author Lukas Eder */ @@ -69,13 +75,13 @@ final class BatchSingle extends AbstractBatch implements BatchBindStep { /** * Generated UID */ - private static final long serialVersionUID = 3793967258181493207L; - private static final JooqLogger log = JooqLogger.getLogger(BatchSingle.class); + private static final long serialVersionUID = 3793967258181493207L; + private static final JooqLogger log = JooqLogger.getLogger(BatchSingle.class); - private final Query query; - private final Map> nameToIndexMapping; - private final List allBindValues; - private final int expectedBindValues; + final Query query; + final Map> nameToIndexMapping; + final List allBindValues; + final int expectedBindValues; public BatchSingle(Configuration configuration, Query query) { super(configuration); @@ -141,6 +147,18 @@ final class BatchSingle extends AbstractBatch implements BatchBindStep { return allBindValues.size(); } + @Override + public final void subscribe(Subscriber subscriber) { + ConnectionFactory cf = configuration.connectionFactory(); + + if (!(cf instanceof NoConnectionFactory)) + subscriber.onSubscribe(new BatchSubscription<>(this, subscriber, s -> new BatchSingleSubscriber(this, s))); + + // TODO: [#11700] Implement this + else + throw new UnsupportedOperationException(); + } + @Override public final int[] execute() { @@ -162,7 +180,7 @@ final class BatchSingle extends AbstractBatch implements BatchBindStep { return executePrepared(); } - private final void checkBindValues() { + final void checkBindValues() { // [#4071] Help users debug cases where bind value counts don't match the expected number // [#5362] Don't do this for plain SQL queries @@ -177,15 +195,7 @@ final class BatchSingle extends AbstractBatch implements BatchBindStep { ExecuteListener listener = ExecuteListeners.get(ctx); Connection connection = ctx.connection(); - // [#1371] fetch bind variables to restore them again, later - // [#3940] Don't include inlined bind variables - // [#4062] Make sure we collect also repeated named parameters - ParamCollector collector = new ParamCollector(configuration, false); - collector.visit(query); - Param[] params = new Param[collector.resultList.size()]; - Iterator>> it = collector.resultList.iterator(); - for (int i = 0; it.hasNext(); i++) - params[i] = it.next().getValue(); + Param[] params = extractParams(); try { // [#8968] Keep start() event inside of lifecycle management @@ -252,6 +262,20 @@ final class BatchSingle extends AbstractBatch implements BatchBindStep { } } + final Param[] extractParams() { + // [#1371] fetch bind variables to restore them again, later + // [#3940] Don't include inlined bind variables + // [#4062] Make sure we collect also repeated named parameters + ParamCollector collector = new ParamCollector(configuration, false); + collector.visit(query); + Param[] params = new Param[collector.resultList.size()]; + Iterator>> it = collector.resultList.iterator(); + for (int i = 0; it.hasNext(); i++) + params[i] = it.next().getValue(); + + return params; + } + private final int[] executeStatic() { List queries = new ArrayList<>(allBindValues.size()); diff --git a/jOOQ/src/main/java/org/jooq/impl/InsertImpl.java b/jOOQ/src/main/java/org/jooq/impl/InsertImpl.java index 490e6f64b1..8aaf66951e 100644 --- a/jOOQ/src/main/java/org/jooq/impl/InsertImpl.java +++ b/jOOQ/src/main/java/org/jooq/impl/InsertImpl.java @@ -61,6 +61,14 @@ import org.jooq.InsertResultStep; import org.jooq.InsertSetMoreStep; import org.jooq.InsertSetStep; import org.jooq.InsertValuesStep1; +import org.jooq.InsertValuesStep2; +import org.jooq.InsertValuesStep3; +import org.jooq.InsertValuesStep4; +import org.jooq.InsertValuesStep5; +import org.jooq.InsertValuesStep6; +import org.jooq.InsertValuesStep7; +import org.jooq.InsertValuesStep8; +import org.jooq.InsertValuesStep9; import org.jooq.InsertValuesStep10; import org.jooq.InsertValuesStep11; import org.jooq.InsertValuesStep12; @@ -71,23 +79,23 @@ import org.jooq.InsertValuesStep16; import org.jooq.InsertValuesStep17; import org.jooq.InsertValuesStep18; import org.jooq.InsertValuesStep19; -import org.jooq.InsertValuesStep2; import org.jooq.InsertValuesStep20; import org.jooq.InsertValuesStep21; import org.jooq.InsertValuesStep22; -import org.jooq.InsertValuesStep3; -import org.jooq.InsertValuesStep4; -import org.jooq.InsertValuesStep5; -import org.jooq.InsertValuesStep6; -import org.jooq.InsertValuesStep7; -import org.jooq.InsertValuesStep8; -import org.jooq.InsertValuesStep9; import org.jooq.InsertValuesStepN; import org.jooq.Name; import org.jooq.Operator; import org.jooq.QueryPart; import org.jooq.Record; import org.jooq.Record1; +import org.jooq.Record2; +import org.jooq.Record3; +import org.jooq.Record4; +import org.jooq.Record5; +import org.jooq.Record6; +import org.jooq.Record7; +import org.jooq.Record8; +import org.jooq.Record9; import org.jooq.Record10; import org.jooq.Record11; import org.jooq.Record12; @@ -98,19 +106,20 @@ import org.jooq.Record16; import org.jooq.Record17; import org.jooq.Record18; import org.jooq.Record19; -import org.jooq.Record2; import org.jooq.Record20; import org.jooq.Record21; import org.jooq.Record22; -import org.jooq.Record3; -import org.jooq.Record4; -import org.jooq.Record5; -import org.jooq.Record6; -import org.jooq.Record7; -import org.jooq.Record8; -import org.jooq.Record9; +import org.jooq.Result; import org.jooq.Row; import org.jooq.Row1; +import org.jooq.Row2; +import org.jooq.Row3; +import org.jooq.Row4; +import org.jooq.Row5; +import org.jooq.Row6; +import org.jooq.Row7; +import org.jooq.Row8; +import org.jooq.Row9; import org.jooq.Row10; import org.jooq.Row11; import org.jooq.Row12; @@ -121,17 +130,9 @@ import org.jooq.Row16; import org.jooq.Row17; import org.jooq.Row18; import org.jooq.Row19; -import org.jooq.Row2; import org.jooq.Row20; import org.jooq.Row21; import org.jooq.Row22; -import org.jooq.Row3; -import org.jooq.Row4; -import org.jooq.Row5; -import org.jooq.Row6; -import org.jooq.Row7; -import org.jooq.Row8; -import org.jooq.Row9; import org.jooq.SQL; import org.jooq.Select; import org.jooq.SelectField; diff --git a/jOOQ/src/main/java/org/jooq/impl/NClobBinding.java b/jOOQ/src/main/java/org/jooq/impl/NClobBinding.java new file mode 100644 index 0000000000..d0cc05c06d --- /dev/null +++ b/jOOQ/src/main/java/org/jooq/impl/NClobBinding.java @@ -0,0 +1,164 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Other licenses: + * ----------------------------------------------------------------------------- + * Commercial licenses for this work are available. These replace the above + * ASL 2.0 and offer limited warranties, support, maintenance, and commercial + * database integrations. + * + * For more information, please visit: http://www.jooq.org/licenses + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + */ +package org.jooq.impl; + +import static org.jooq.impl.DefaultExecuteContext.localConnection; +import static org.jooq.impl.DefaultExecuteContext.localTargetConnection; + +import java.sql.NClob; +import java.sql.SQLException; +import java.sql.Types; + +import org.jooq.Binding; +import org.jooq.BindingGetResultSetContext; +import org.jooq.BindingGetSQLInputContext; +import org.jooq.BindingGetStatementContext; +import org.jooq.BindingRegisterContext; +import org.jooq.BindingSQLContext; +import org.jooq.BindingSetSQLOutputContext; +import org.jooq.BindingSetStatementContext; +import org.jooq.Converter; +import org.jooq.Converters; +import org.jooq.ResourceManagingScope; +import org.jooq.tools.jdbc.JDBCUtils; + +// ... + +/** + * A binding that takes binary values but binds them as {@link NClob} to at the + * JDBC level. + *

+ * This is useful for workarounds for bugs in Oracle, like ORA-01461: can bind a + * LONG value only for insert into a LONG column (see [#4091]) + * + * @author Lukas Eder + */ +public class NClobBinding implements Binding { + + /** + * Generated UID + */ + private static final long serialVersionUID = 358789452467943117L; + + @Override + public final Converter converter() { + return Converters.identity(String.class); + } + + @Override + public final void sql(BindingSQLContext ctx) throws SQLException { + ctx.render().visit(DSL.val(ctx.value(), SQLDataType.NCLOB)); + } + + @Override + public final void register(BindingRegisterContext ctx) throws SQLException { + ctx.statement().registerOutParameter(ctx.index(), Types.NCLOB); + } + + @Override + public final void set(BindingSetStatementContext ctx) throws SQLException { + ctx.statement().setClob(ctx.index(), newNClob(ctx, ctx.value())); + } + + @Override + public final void set(BindingSetSQLOutputContext ctx) throws SQLException { + ctx.output().writeClob(newNClob(ctx, ctx.value())); + } + + @Override + public final void get(BindingGetResultSetContext ctx) throws SQLException { + NClob clob = ctx.resultSet().getNClob(ctx.index()); + + try { + ctx.value(clob == null ? null : clob.getSubString(1, (int) clob.length())); + } + finally { + JDBCUtils.safeFree(clob); + } + } + + @Override + public final void get(BindingGetStatementContext ctx) throws SQLException { + NClob clob = ctx.statement().getNClob(ctx.index()); + + try { + ctx.value(clob == null ? null : clob.getSubString(1, (int) clob.length())); + } + finally { + JDBCUtils.safeFree(clob); + } + } + + @Override + public final void get(BindingGetSQLInputContext ctx) throws SQLException { + NClob clob = ctx.input().readNClob(); + + try { + ctx.value(clob == null ? null : clob.getSubString(1, (int) clob.length())); + } + finally { + JDBCUtils.safeFree(clob); + } + } + + static final NClob newNClob(ResourceManagingScope scope, String string) throws SQLException { + NClob clob; + + switch (scope.dialect()) { + + + + + + + + + + + + default: { + clob = localConnection().createNClob(); + break; + } + } + + scope.autoFree(clob); + clob.setString(1, string); + return clob; + } +} diff --git a/jOOQ/src/main/java/org/jooq/impl/R2DBC.java b/jOOQ/src/main/java/org/jooq/impl/R2DBC.java index 6081041c9f..d6e7c792eb 100644 --- a/jOOQ/src/main/java/org/jooq/impl/R2DBC.java +++ b/jOOQ/src/main/java/org/jooq/impl/R2DBC.java @@ -40,7 +40,10 @@ package org.jooq.impl; import static org.jooq.Publisher.subscriber; // ... import static org.jooq.conf.ParamType.NAMED; +import static org.jooq.impl.Tools.EMPTY_PARAM; +import static org.jooq.impl.Tools.fields; import static org.jooq.impl.Tools.recordFactory; +import static org.jooq.impl.Tools.visitAll; import static org.jooq.tools.StringUtils.defaultIfNull; import java.math.BigDecimal; @@ -70,6 +73,7 @@ import org.jooq.Configuration; import org.jooq.Cursor; import org.jooq.DataType; import org.jooq.Field; +import org.jooq.Param; // ... import org.jooq.Query; import org.jooq.Record; @@ -77,6 +81,7 @@ import org.jooq.SQLDialect; import org.jooq.conf.Settings; import org.jooq.conf.SettingsTools; import org.jooq.impl.DefaultRenderContext.Rendered; +import org.jooq.impl.R2DBC.RowCountSubscriber; import org.jooq.tools.Convert; import org.jooq.tools.JooqLogger; import org.jooq.tools.jdbc.DefaultPreparedStatement; @@ -284,17 +289,13 @@ final class R2DBC { @Override final void onNext0(Connection c) { try { - DefaultRenderContext render = new DefaultRenderContext(query.configuration().derive( - setParamType(query.configuration().dialect(), query.configuration().settings()) - )); - - Rendered rendered = new Rendered(render.paramType(NAMED).visit(query).render(), render.bindValues(), render.skipUpdateCounts()); + Rendered rendered = rendered(query); Statement stmt = c.createStatement(rendered.sql); new DefaultBindContext(query.configuration(), new R2DBCPreparedStatement(query.configuration(), stmt)).visit(rendered.bindValues); // TODO: Reuse org.jooq.impl.Tools.setFetchSize(ExecuteContext ctx, int fetchSize) if (query instanceof AbstractResultQuery) { - int f = SettingsTools.getFetchSize(((AbstractResultQuery) query).fetchSize(), render.settings()); + int f = SettingsTools.getFetchSize(((AbstractResultQuery) query).fetchSize(), query.configuration().settings()); if (f != 0) { if (log.isDebugEnabled()) log.debug("Setting fetch size", f); @@ -315,18 +316,15 @@ final class R2DBC { static final class BatchMultipleSubscriber extends ConnectionSubscriber { - final BatchMultiple batch; - final BiFunction, Subscriber> resultSubscriber; + final BatchMultiple batch; BatchMultipleSubscriber( BatchMultiple batch, - BatchMultipleSubscription downstream, - BiFunction, Subscriber> resultSubscriber + BatchSubscription downstream ) { super(downstream); this.batch = batch; - this.resultSubscriber = resultSubscriber; } @Override @@ -337,7 +335,52 @@ final class R2DBC { for (int i = 0; i < batch.queries.length; i++) b = b.add(DSL.using(batch.configuration).renderInlined(batch.queries[i])); - b.execute().subscribe(resultSubscriber.apply(batch, downstream)); + b.execute().subscribe(new RowCountSubscriber(downstream, batch.size())); + } + + // TODO: More specific error handling + catch (Throwable t) { + onError(t); + } + } + } + + static final class BatchSingleSubscriber extends ConnectionSubscriber { + + final BatchSingle batch; + + BatchSingleSubscriber( + BatchSingle batch, + BatchSubscription downstream + ) { + super(downstream); + + this.batch = batch; + } + + @Override + final void onNext0(Connection c) { + try { + batch.checkBindValues(); + Rendered rendered = rendered(batch.query); + Statement stmt = c.createStatement(rendered.sql); + Param[] params = rendered.bindValues.toArray(EMPTY_PARAM); + + for (Object[] bindValues : batch.allBindValues) { + + // [#1371] [#2139] Don't bind variables directly onto statement, bind them through the collected params + // list to preserve type information + // [#3547] The original query may have no Params specified - e.g. when it was constructed with + // plain SQL. In that case, infer the bind value type directly from the bind value + visitAll(new DefaultBindContext(batch.configuration, new R2DBCPreparedStatement(batch.query.configuration(), stmt)), + (params.length > 0) + ? fields(bindValues, params) + : fields(bindValues)); + + stmt = stmt.add(); + } + + stmt.execute().subscribe(new RowCountSubscriber(downstream, batch.size())); } // TODO: More specific error handling @@ -409,26 +452,38 @@ final class R2DBC { } } - static final class BatchMultipleSubscription extends AbstractSubscription { + static final class BatchSubscription extends AbstractSubscription { - final BatchMultipleSubscriber batchMultipleSubscriber; + final ConnectionSubscriber batchSubscriber; - BatchMultipleSubscription( - BatchMultiple batch, + BatchSubscription( + B batch, Subscriber subscriber, - BiFunction, Subscriber> resultSubscriber + Function, ConnectionSubscriber> batchSubscriber ) { super(batch.configuration, subscriber); - this.batchMultipleSubscriber = new BatchMultipleSubscriber(batch, this, resultSubscriber); + this.batchSubscriber = batchSubscriber.apply(this); } @Override - final BatchMultipleSubscriber delegate() { - return batchMultipleSubscriber; + final ConnectionSubscriber delegate() { + return batchSubscriber; } } + // ------------------------------------------------------------------------- + // Internal R2DBC specific utilities + // ------------------------------------------------------------------------- + + private static final Rendered rendered(Query query) { + DefaultRenderContext render = new DefaultRenderContext(query.configuration().derive( + setParamType(query.configuration().dialect(), query.configuration().settings()) + )); + + return new Rendered(render.paramType(NAMED).visit(query).render(), render.bindValues(), render.skipUpdateCounts()); + } + // ------------------------------------------------------------------------- // JDBC to R2DBC bridges for better interop, where it doesn't matter // -------------------------------------------------------------------------