[jOOQ/jOOQ#11700] Support BatchSingle

This commit is contained in:
Lukas Eder 2021-03-31 14:25:30 +02:00
parent 4f17f0ea29
commit dcbcdf3409
5 changed files with 307 additions and 63 deletions

View File

@ -46,8 +46,8 @@ import org.jooq.ExecuteListener;
import org.jooq.Query;
import org.jooq.conf.SettingsTools;
import org.jooq.exception.ControlFlowSignal;
import org.jooq.impl.R2DBC.BatchMultipleSubscription;
import org.jooq.impl.R2DBC.RowCountSubscriber;
import org.jooq.impl.R2DBC.BatchMultipleSubscriber;
import org.jooq.impl.R2DBC.BatchSubscription;
import org.reactivestreams.Subscriber;
@ -81,7 +81,7 @@ final class BatchMultiple extends AbstractBatch {
ConnectionFactory cf = configuration.connectionFactory();
if (!(cf instanceof NoConnectionFactory))
subscriber.onSubscribe(new BatchMultipleSubscription(this, subscriber, (t, u) -> new RowCountSubscriber(u, queries.length)));
subscriber.onSubscribe(new BatchSubscription<>(this, subscriber, s -> new BatchMultipleSubscriber(this, s)));
// TODO: [#11700] Implement this
else

View File

@ -59,8 +59,14 @@ import org.jooq.Param;
import org.jooq.Query;
import org.jooq.conf.SettingsTools;
import org.jooq.exception.ControlFlowSignal;
import org.jooq.impl.R2DBC.BatchSingleSubscriber;
import org.jooq.impl.R2DBC.BatchSubscription;
import org.jooq.tools.JooqLogger;
import org.reactivestreams.Subscriber;
import io.r2dbc.spi.ConnectionFactory;
/**
* @author Lukas Eder
*/
@ -69,13 +75,13 @@ final class BatchSingle extends AbstractBatch implements BatchBindStep {
/**
* Generated UID
*/
private static final long serialVersionUID = 3793967258181493207L;
private static final JooqLogger log = JooqLogger.getLogger(BatchSingle.class);
private static final long serialVersionUID = 3793967258181493207L;
private static final JooqLogger log = JooqLogger.getLogger(BatchSingle.class);
private final Query query;
private final Map<String, List<Integer>> nameToIndexMapping;
private final List<Object[]> allBindValues;
private final int expectedBindValues;
final Query query;
final Map<String, List<Integer>> nameToIndexMapping;
final List<Object[]> allBindValues;
final int expectedBindValues;
public BatchSingle(Configuration configuration, Query query) {
super(configuration);
@ -141,6 +147,18 @@ final class BatchSingle extends AbstractBatch implements BatchBindStep {
return allBindValues.size();
}
@Override
public final void subscribe(Subscriber<? super Integer> subscriber) {
ConnectionFactory cf = configuration.connectionFactory();
if (!(cf instanceof NoConnectionFactory))
subscriber.onSubscribe(new BatchSubscription<>(this, subscriber, s -> new BatchSingleSubscriber(this, s)));
// TODO: [#11700] Implement this
else
throw new UnsupportedOperationException();
}
@Override
public final int[] execute() {
@ -162,7 +180,7 @@ final class BatchSingle extends AbstractBatch implements BatchBindStep {
return executePrepared();
}
private final void checkBindValues() {
final void checkBindValues() {
// [#4071] Help users debug cases where bind value counts don't match the expected number
// [#5362] Don't do this for plain SQL queries
@ -177,15 +195,7 @@ final class BatchSingle extends AbstractBatch implements BatchBindStep {
ExecuteListener listener = ExecuteListeners.get(ctx);
Connection connection = ctx.connection();
// [#1371] fetch bind variables to restore them again, later
// [#3940] Don't include inlined bind variables
// [#4062] Make sure we collect also repeated named parameters
ParamCollector collector = new ParamCollector(configuration, false);
collector.visit(query);
Param<?>[] params = new Param[collector.resultList.size()];
Iterator<Entry<String, Param<?>>> it = collector.resultList.iterator();
for (int i = 0; it.hasNext(); i++)
params[i] = it.next().getValue();
Param<?>[] params = extractParams();
try {
// [#8968] Keep start() event inside of lifecycle management
@ -252,6 +262,20 @@ final class BatchSingle extends AbstractBatch implements BatchBindStep {
}
}
final Param<?>[] extractParams() {
// [#1371] fetch bind variables to restore them again, later
// [#3940] Don't include inlined bind variables
// [#4062] Make sure we collect also repeated named parameters
ParamCollector collector = new ParamCollector(configuration, false);
collector.visit(query);
Param<?>[] params = new Param[collector.resultList.size()];
Iterator<Entry<String, Param<?>>> it = collector.resultList.iterator();
for (int i = 0; it.hasNext(); i++)
params[i] = it.next().getValue();
return params;
}
private final int[] executeStatic() {
List<Query> queries = new ArrayList<>(allBindValues.size());

View File

@ -61,6 +61,14 @@ import org.jooq.InsertResultStep;
import org.jooq.InsertSetMoreStep;
import org.jooq.InsertSetStep;
import org.jooq.InsertValuesStep1;
import org.jooq.InsertValuesStep2;
import org.jooq.InsertValuesStep3;
import org.jooq.InsertValuesStep4;
import org.jooq.InsertValuesStep5;
import org.jooq.InsertValuesStep6;
import org.jooq.InsertValuesStep7;
import org.jooq.InsertValuesStep8;
import org.jooq.InsertValuesStep9;
import org.jooq.InsertValuesStep10;
import org.jooq.InsertValuesStep11;
import org.jooq.InsertValuesStep12;
@ -71,23 +79,23 @@ import org.jooq.InsertValuesStep16;
import org.jooq.InsertValuesStep17;
import org.jooq.InsertValuesStep18;
import org.jooq.InsertValuesStep19;
import org.jooq.InsertValuesStep2;
import org.jooq.InsertValuesStep20;
import org.jooq.InsertValuesStep21;
import org.jooq.InsertValuesStep22;
import org.jooq.InsertValuesStep3;
import org.jooq.InsertValuesStep4;
import org.jooq.InsertValuesStep5;
import org.jooq.InsertValuesStep6;
import org.jooq.InsertValuesStep7;
import org.jooq.InsertValuesStep8;
import org.jooq.InsertValuesStep9;
import org.jooq.InsertValuesStepN;
import org.jooq.Name;
import org.jooq.Operator;
import org.jooq.QueryPart;
import org.jooq.Record;
import org.jooq.Record1;
import org.jooq.Record2;
import org.jooq.Record3;
import org.jooq.Record4;
import org.jooq.Record5;
import org.jooq.Record6;
import org.jooq.Record7;
import org.jooq.Record8;
import org.jooq.Record9;
import org.jooq.Record10;
import org.jooq.Record11;
import org.jooq.Record12;
@ -98,19 +106,20 @@ import org.jooq.Record16;
import org.jooq.Record17;
import org.jooq.Record18;
import org.jooq.Record19;
import org.jooq.Record2;
import org.jooq.Record20;
import org.jooq.Record21;
import org.jooq.Record22;
import org.jooq.Record3;
import org.jooq.Record4;
import org.jooq.Record5;
import org.jooq.Record6;
import org.jooq.Record7;
import org.jooq.Record8;
import org.jooq.Record9;
import org.jooq.Result;
import org.jooq.Row;
import org.jooq.Row1;
import org.jooq.Row2;
import org.jooq.Row3;
import org.jooq.Row4;
import org.jooq.Row5;
import org.jooq.Row6;
import org.jooq.Row7;
import org.jooq.Row8;
import org.jooq.Row9;
import org.jooq.Row10;
import org.jooq.Row11;
import org.jooq.Row12;
@ -121,17 +130,9 @@ import org.jooq.Row16;
import org.jooq.Row17;
import org.jooq.Row18;
import org.jooq.Row19;
import org.jooq.Row2;
import org.jooq.Row20;
import org.jooq.Row21;
import org.jooq.Row22;
import org.jooq.Row3;
import org.jooq.Row4;
import org.jooq.Row5;
import org.jooq.Row6;
import org.jooq.Row7;
import org.jooq.Row8;
import org.jooq.Row9;
import org.jooq.SQL;
import org.jooq.Select;
import org.jooq.SelectField;

View File

@ -0,0 +1,164 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Other licenses:
* -----------------------------------------------------------------------------
* Commercial licenses for this work are available. These replace the above
* ASL 2.0 and offer limited warranties, support, maintenance, and commercial
* database integrations.
*
* For more information, please visit: http://www.jooq.org/licenses
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*/
package org.jooq.impl;
import static org.jooq.impl.DefaultExecuteContext.localConnection;
import static org.jooq.impl.DefaultExecuteContext.localTargetConnection;
import java.sql.NClob;
import java.sql.SQLException;
import java.sql.Types;
import org.jooq.Binding;
import org.jooq.BindingGetResultSetContext;
import org.jooq.BindingGetSQLInputContext;
import org.jooq.BindingGetStatementContext;
import org.jooq.BindingRegisterContext;
import org.jooq.BindingSQLContext;
import org.jooq.BindingSetSQLOutputContext;
import org.jooq.BindingSetStatementContext;
import org.jooq.Converter;
import org.jooq.Converters;
import org.jooq.ResourceManagingScope;
import org.jooq.tools.jdbc.JDBCUtils;
// ...
/**
* A binding that takes binary values but binds them as {@link NClob} to at the
* JDBC level.
* <p>
* This is useful for workarounds for bugs in Oracle, like ORA-01461: can bind a
* LONG value only for insert into a LONG column (see [#4091])
*
* @author Lukas Eder
*/
public class NClobBinding implements Binding<String, String> {
/**
* Generated UID
*/
private static final long serialVersionUID = 358789452467943117L;
@Override
public final Converter<String, String> converter() {
return Converters.identity(String.class);
}
@Override
public final void sql(BindingSQLContext<String> ctx) throws SQLException {
ctx.render().visit(DSL.val(ctx.value(), SQLDataType.NCLOB));
}
@Override
public final void register(BindingRegisterContext<String> ctx) throws SQLException {
ctx.statement().registerOutParameter(ctx.index(), Types.NCLOB);
}
@Override
public final void set(BindingSetStatementContext<String> ctx) throws SQLException {
ctx.statement().setClob(ctx.index(), newNClob(ctx, ctx.value()));
}
@Override
public final void set(BindingSetSQLOutputContext<String> ctx) throws SQLException {
ctx.output().writeClob(newNClob(ctx, ctx.value()));
}
@Override
public final void get(BindingGetResultSetContext<String> ctx) throws SQLException {
NClob clob = ctx.resultSet().getNClob(ctx.index());
try {
ctx.value(clob == null ? null : clob.getSubString(1, (int) clob.length()));
}
finally {
JDBCUtils.safeFree(clob);
}
}
@Override
public final void get(BindingGetStatementContext<String> ctx) throws SQLException {
NClob clob = ctx.statement().getNClob(ctx.index());
try {
ctx.value(clob == null ? null : clob.getSubString(1, (int) clob.length()));
}
finally {
JDBCUtils.safeFree(clob);
}
}
@Override
public final void get(BindingGetSQLInputContext<String> ctx) throws SQLException {
NClob clob = ctx.input().readNClob();
try {
ctx.value(clob == null ? null : clob.getSubString(1, (int) clob.length()));
}
finally {
JDBCUtils.safeFree(clob);
}
}
static final NClob newNClob(ResourceManagingScope scope, String string) throws SQLException {
NClob clob;
switch (scope.dialect()) {
default: {
clob = localConnection().createNClob();
break;
}
}
scope.autoFree(clob);
clob.setString(1, string);
return clob;
}
}

View File

@ -40,7 +40,10 @@ package org.jooq.impl;
import static org.jooq.Publisher.subscriber;
// ...
import static org.jooq.conf.ParamType.NAMED;
import static org.jooq.impl.Tools.EMPTY_PARAM;
import static org.jooq.impl.Tools.fields;
import static org.jooq.impl.Tools.recordFactory;
import static org.jooq.impl.Tools.visitAll;
import static org.jooq.tools.StringUtils.defaultIfNull;
import java.math.BigDecimal;
@ -70,6 +73,7 @@ import org.jooq.Configuration;
import org.jooq.Cursor;
import org.jooq.DataType;
import org.jooq.Field;
import org.jooq.Param;
// ...
import org.jooq.Query;
import org.jooq.Record;
@ -77,6 +81,7 @@ import org.jooq.SQLDialect;
import org.jooq.conf.Settings;
import org.jooq.conf.SettingsTools;
import org.jooq.impl.DefaultRenderContext.Rendered;
import org.jooq.impl.R2DBC.RowCountSubscriber;
import org.jooq.tools.Convert;
import org.jooq.tools.JooqLogger;
import org.jooq.tools.jdbc.DefaultPreparedStatement;
@ -284,17 +289,13 @@ final class R2DBC {
@Override
final void onNext0(Connection c) {
try {
DefaultRenderContext render = new DefaultRenderContext(query.configuration().derive(
setParamType(query.configuration().dialect(), query.configuration().settings())
));
Rendered rendered = new Rendered(render.paramType(NAMED).visit(query).render(), render.bindValues(), render.skipUpdateCounts());
Rendered rendered = rendered(query);
Statement stmt = c.createStatement(rendered.sql);
new DefaultBindContext(query.configuration(), new R2DBCPreparedStatement(query.configuration(), stmt)).visit(rendered.bindValues);
// TODO: Reuse org.jooq.impl.Tools.setFetchSize(ExecuteContext ctx, int fetchSize)
if (query instanceof AbstractResultQuery) {
int f = SettingsTools.getFetchSize(((AbstractResultQuery<?>) query).fetchSize(), render.settings());
int f = SettingsTools.getFetchSize(((AbstractResultQuery<?>) query).fetchSize(), query.configuration().settings());
if (f != 0) {
if (log.isDebugEnabled())
log.debug("Setting fetch size", f);
@ -315,18 +316,15 @@ final class R2DBC {
static final class BatchMultipleSubscriber extends ConnectionSubscriber<Integer> {
final BatchMultiple batch;
final BiFunction<BatchMultiple, AbstractSubscription<Integer>, Subscriber<Result>> resultSubscriber;
final BatchMultiple batch;
BatchMultipleSubscriber(
BatchMultiple batch,
BatchMultipleSubscription downstream,
BiFunction<BatchMultiple, AbstractSubscription<Integer>, Subscriber<Result>> resultSubscriber
BatchSubscription<BatchMultiple> downstream
) {
super(downstream);
this.batch = batch;
this.resultSubscriber = resultSubscriber;
}
@Override
@ -337,7 +335,52 @@ final class R2DBC {
for (int i = 0; i < batch.queries.length; i++)
b = b.add(DSL.using(batch.configuration).renderInlined(batch.queries[i]));
b.execute().subscribe(resultSubscriber.apply(batch, downstream));
b.execute().subscribe(new RowCountSubscriber(downstream, batch.size()));
}
// TODO: More specific error handling
catch (Throwable t) {
onError(t);
}
}
}
static final class BatchSingleSubscriber extends ConnectionSubscriber<Integer> {
final BatchSingle batch;
BatchSingleSubscriber(
BatchSingle batch,
BatchSubscription<BatchSingle> downstream
) {
super(downstream);
this.batch = batch;
}
@Override
final void onNext0(Connection c) {
try {
batch.checkBindValues();
Rendered rendered = rendered(batch.query);
Statement stmt = c.createStatement(rendered.sql);
Param<?>[] params = rendered.bindValues.toArray(EMPTY_PARAM);
for (Object[] bindValues : batch.allBindValues) {
// [#1371] [#2139] Don't bind variables directly onto statement, bind them through the collected params
// list to preserve type information
// [#3547] The original query may have no Params specified - e.g. when it was constructed with
// plain SQL. In that case, infer the bind value type directly from the bind value
visitAll(new DefaultBindContext(batch.configuration, new R2DBCPreparedStatement(batch.query.configuration(), stmt)),
(params.length > 0)
? fields(bindValues, params)
: fields(bindValues));
stmt = stmt.add();
}
stmt.execute().subscribe(new RowCountSubscriber(downstream, batch.size()));
}
// TODO: More specific error handling
@ -409,26 +452,38 @@ final class R2DBC {
}
}
static final class BatchMultipleSubscription extends AbstractSubscription<Integer> {
static final class BatchSubscription<B extends AbstractBatch> extends AbstractSubscription<Integer> {
final BatchMultipleSubscriber batchMultipleSubscriber;
final ConnectionSubscriber<Integer> batchSubscriber;
BatchMultipleSubscription(
BatchMultiple batch,
BatchSubscription(
B batch,
Subscriber<? super Integer> subscriber,
BiFunction<BatchMultiple, AbstractSubscription<Integer>, Subscriber<Result>> resultSubscriber
Function<BatchSubscription<B>, ConnectionSubscriber<Integer>> batchSubscriber
) {
super(batch.configuration, subscriber);
this.batchMultipleSubscriber = new BatchMultipleSubscriber(batch, this, resultSubscriber);
this.batchSubscriber = batchSubscriber.apply(this);
}
@Override
final BatchMultipleSubscriber delegate() {
return batchMultipleSubscriber;
final ConnectionSubscriber<Integer> delegate() {
return batchSubscriber;
}
}
// -------------------------------------------------------------------------
// Internal R2DBC specific utilities
// -------------------------------------------------------------------------
private static final Rendered rendered(Query query) {
DefaultRenderContext render = new DefaultRenderContext(query.configuration().derive(
setParamType(query.configuration().dialect(), query.configuration().settings())
));
return new Rendered(render.paramType(NAMED).visit(query).render(), render.bindValues(), render.skipUpdateCounts());
}
// -------------------------------------------------------------------------
// JDBC to R2DBC bridges for better interop, where it doesn't matter
// -------------------------------------------------------------------------