[jOOQ/jOOQ#11462] LoaderOptionsStep.commitAfter() is not implemented correctly

This commit is contained in:
Lukas Eder 2021-06-08 14:57:09 +02:00
parent 68858c390f
commit df155878e5
2 changed files with 25 additions and 24 deletions

View File

@ -184,7 +184,8 @@ final class LoaderImpl<R extends Record> implements
private int processed;
private int stored;
private int executed;
private int buffered;
private int unexecuted;
private int uncommitted;
private final List<LoaderError> errors;
LoaderImpl(Configuration configuration, Table<R> table) {
@ -821,7 +822,8 @@ final class LoaderImpl<R extends Record> implements
// appropriate target types. But beware of SQL dialects that tend to
// need very explicit casting of bind values (e.g. Firebird)
processed++;
buffered++;
unexecuted++;
uncommitted++;
if (insert == null)
insert = ctx.insertQuery(table);
@ -887,27 +889,27 @@ final class LoaderImpl<R extends Record> implements
// in INSERT .. ON DUPLICATE KEY UPDATE statements, but
// 1 = INSERT, 2 = UPDATE, instead
if (onDuplicate == ON_DUPLICATE_KEY_UPDATE && NO_SUPPORT_ROWCOUNT_ON_DUPLICATE.contains(ctx.dialect()))
totalRowCounts = buffered;
totalRowCounts = unexecuted;
else
for (int rowCount : rowcounts)
totalRowCounts += rowCount;
stored += totalRowCounts;
ignored += buffered - totalRowCounts;
ignored += unexecuted - totalRowCounts;
executed++;
buffered = 0;
unexecuted = 0;
bind = null;
insert = null;
if (commit == COMMIT_AFTER)
if ((processed % batchAfter == 0) && ((processed / batchAfter) % commitAfter == 0))
if ((processed % (bulkAfter * batchAfter) == 0) && ((processed / (bulkAfter * batchAfter)) % commitAfter == 0))
commit();
}
catch (DataAccessException e) {
errors.add(new LoaderErrorImpl(e, row, processed - 1, insert));
ignored += buffered;
buffered = 0;
ignored += unexecuted;
unexecuted = 0;
if (onError == ON_ERROR_ABORT)
break execution;
@ -922,28 +924,32 @@ final class LoaderImpl<R extends Record> implements
}
// Execute remaining batch
if (buffered != 0) {
if (unexecuted != 0) {
try {
if (bind != null)
bind.execute();
if (insert != null)
insert.execute();
stored += buffered;
stored += unexecuted;
executed++;
buffered = 0;
unexecuted = 0;
}
catch (DataAccessException e) {
errors.add(new LoaderErrorImpl(e, row, processed - 1, insert));
ignored += buffered;
buffered = 0;
ignored += unexecuted;
unexecuted = 0;
}
if (onError == ON_ERROR_ABORT)
break execution;
}
// Commit remaining elements in COMMIT_AFTER mode
if (commit == COMMIT_AFTER && uncommitted != 0)
commit();
if (onError == ON_ERROR_ABORT)
break execution;
// execution:
}
@ -954,14 +960,8 @@ final class LoaderImpl<R extends Record> implements
stored = 0;
rollback();
}
else {
else
commit();
}
}
// Commit remaining elements in COMMIT_AFTER mode
else if (commit == COMMIT_AFTER) {
commit();
}
}
catch (DataAccessException e) {
@ -971,6 +971,7 @@ final class LoaderImpl<R extends Record> implements
private final void commit() {
configuration.dsl().connection(Connection::commit);
uncommitted = 0;
}
private final void rollback() {

View File

@ -45,7 +45,7 @@
<javax.activation.version>1.2.0</javax.activation.version>
<!-- Scala versions -->
<scala.version>2.13.5</scala.version>
<scala.version>2.13.6</scala.version>
<!-- Kotlin versions -->
<kotlin.version>1.3.71</kotlin.version>