[jOOQ/jOOQ#14840] Apply Settings.batchSize also to BatchSingle
This commit is contained in:
parent
d092dc9405
commit
5d84509b47
@ -106,6 +106,11 @@ final class BatchMultiple extends AbstractBatch {
|
||||
if (ctx.statement() == null)
|
||||
ctx.statement(new SettingsEnabledPreparedStatement(ctx.connection()));
|
||||
|
||||
// [#9295] use query timeout from settings
|
||||
int t = SettingsTools.getQueryTimeout(0, ctx.settings());
|
||||
if (t != 0)
|
||||
ctx.statement().setQueryTimeout(t);
|
||||
|
||||
String[] batchSQL = ctx.batchSQL();
|
||||
for (int i = 0; i < ctx.batchQueries().length; i++) {
|
||||
ctx.sql(null);
|
||||
@ -122,11 +127,6 @@ final class BatchMultiple extends AbstractBatch {
|
||||
listener.prepareEnd(ctx);
|
||||
}
|
||||
|
||||
// [#9295] use query timeout from settings
|
||||
int t = SettingsTools.getQueryTimeout(0, ctx.settings());
|
||||
if (t != 0)
|
||||
ctx.statement().setQueryTimeout(t);
|
||||
|
||||
listener.executeStart(ctx);
|
||||
|
||||
int[] result = ctx.statement().executeBatch();
|
||||
|
||||
@ -39,26 +39,29 @@ package org.jooq.impl;
|
||||
|
||||
import static org.jooq.conf.ParamType.INLINED;
|
||||
import static org.jooq.conf.SettingsTools.executeStaticStatements;
|
||||
import static org.jooq.conf.SettingsTools.getBatchSize;
|
||||
import static org.jooq.impl.Tools.checkedFunction;
|
||||
import static org.jooq.impl.Tools.chunks;
|
||||
import static org.jooq.impl.Tools.fields;
|
||||
import static org.jooq.impl.Tools.map;
|
||||
import static org.jooq.impl.Tools.visitAll;
|
||||
|
||||
import java.sql.Connection;
|
||||
import java.sql.SQLException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
import org.jooq.Batch;
|
||||
import org.jooq.BatchBindStep;
|
||||
import org.jooq.Configuration;
|
||||
import org.jooq.ExecuteContext;
|
||||
import org.jooq.ExecuteContext.BatchMode;
|
||||
import org.jooq.ExecuteListener;
|
||||
import org.jooq.Param;
|
||||
import org.jooq.Query;
|
||||
import org.jooq.ExecuteContext.BatchMode;
|
||||
import org.jooq.conf.SettingsTools;
|
||||
import org.jooq.exception.ControlFlowSignal;
|
||||
import org.jooq.impl.R2DBC.BatchSingleSubscriber;
|
||||
@ -231,16 +234,27 @@ final class BatchSingle extends AbstractBatch implements BatchBindStep {
|
||||
return result;
|
||||
}
|
||||
else {
|
||||
for (Object[] bindValues : allBindValues) {
|
||||
setBindValues(ctx, listener, params, bindValues);
|
||||
ctx.statement().addBatch();
|
||||
}
|
||||
AtomicBoolean reset = new AtomicBoolean();
|
||||
return chunks(allBindValues, getBatchSize(ctx.settings()))
|
||||
.stream()
|
||||
.map(checkedFunction(chunk -> {
|
||||
if (reset.get())
|
||||
ctx.statement().clearBatch();
|
||||
|
||||
listener.executeStart(ctx);
|
||||
int[] result = ctx.statement().executeBatch();
|
||||
setBatchRows(ctx, result);
|
||||
listener.executeEnd(ctx);
|
||||
return result;
|
||||
for (Object[] bindValues : chunk) {
|
||||
setBindValues(ctx, listener, params, bindValues);
|
||||
ctx.statement().addBatch();
|
||||
}
|
||||
|
||||
listener.executeStart(ctx);
|
||||
int[] result = ctx.statement().executeBatch();
|
||||
setBatchRows(ctx, result);
|
||||
listener.executeEnd(ctx);
|
||||
reset.set(true);
|
||||
return result;
|
||||
}))
|
||||
.flatMapToInt(IntStream::of)
|
||||
.toArray();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -2321,6 +2321,50 @@ final class Tools {
|
||||
return DSL.and(map(fields, Field::isNotNull));
|
||||
}
|
||||
|
||||
static final <T> List<List<T>> chunks(List<T> list, int size) {
|
||||
int l;
|
||||
|
||||
if (size <= 0 || size == Integer.MAX_VALUE || (l = list.size()) <= size)
|
||||
return asList(list);
|
||||
|
||||
List<List<T>> result = new ArrayList<>();
|
||||
int prev = 0, next = size;
|
||||
while (prev < l) {
|
||||
result.add(list.subList(prev, Math.min(next, l)));
|
||||
prev = next;
|
||||
next = next += size;
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* "sneaky-throw" a checked exception or throwable.
|
||||
*/
|
||||
static final void throwChecked(Throwable t) {
|
||||
Tools.<RuntimeException>throwChecked0(t);
|
||||
}
|
||||
|
||||
/**
|
||||
* "sneaky-throw" a checked exception or throwable.
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
static final <E extends Throwable> void throwChecked0(Throwable throwable) throws E {
|
||||
throw (E) throwable;
|
||||
}
|
||||
|
||||
static final <T, R> Function<T, R> checkedFunction(ThrowingFunction<T, R, Throwable> function) {
|
||||
return t -> {
|
||||
try {
|
||||
return function.apply(t);
|
||||
}
|
||||
catch (Throwable e) {
|
||||
throwChecked(e);
|
||||
throw new IllegalStateException(e);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Like <code>Stream.of(array).map(mapper).toArray(constructor)</code> but
|
||||
* without the entire stream pipeline.
|
||||
|
||||
Loading…
Reference in New Issue
Block a user