[jOOQ/jOOQ#7253] Add support for Loader.batchAll() with onDuplicateKeyIgnore()

This commit is contained in:
Lukas Eder 2020-07-03 17:33:05 +02:00
parent 82dc4b3f7f
commit 84ad860878
2 changed files with 73 additions and 65 deletions

View File

@ -37,6 +37,7 @@
*/
package org.jooq;
import java.sql.Statement;
import java.util.List;
import org.jetbrains.annotations.NotNull;
@ -69,6 +70,13 @@ public interface Loader<R extends Record> {
/**
* The number of ignored rows.
* <p>
* When {@link LoaderOptionsStep#onDuplicateKeyIgnore()} is combined with
* {@link LoaderOptionsStep#batchAll()} or
* {@link LoaderOptionsStep#batchAfter(int)}, this count may be incorrect
* for some dialects, because the respective JDBC drivers cannot report it
* through {@link Statement#executeBatch()} and related methods.
*
* @return the number of ignored rows
*/
int ignored();

View File

@ -37,6 +37,10 @@
*/
package org.jooq.impl;
// ...
import static org.jooq.SQLDialect.MARIADB;
// ...
import static org.jooq.SQLDialect.MYSQL;
import static org.jooq.impl.Tools.EMPTY_FIELD;
import java.io.File;
@ -53,6 +57,7 @@ import java.util.BitSet;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.stream.Stream;
import javax.xml.bind.DatatypeConverter;
@ -78,7 +83,7 @@ import org.jooq.LoaderRowsStep;
import org.jooq.LoaderXMLStep;
import org.jooq.Record;
import org.jooq.Result;
import org.jooq.SelectQuery;
import org.jooq.SQLDialect;
import org.jooq.Source;
import org.jooq.Table;
import org.jooq.exception.DataAccessException;
@ -106,57 +111,58 @@ final class LoaderImpl<R extends Record> implements
LoaderJSONOptionsStep<R>,
Loader<R> {
private static final JooqLogger log = JooqLogger.getLogger(LoaderImpl.class);
private static final JooqLogger log = JooqLogger.getLogger(LoaderImpl.class);
private static final Set<SQLDialect> NO_SUPPORT_ROWCOUNT_ON_DUPLICATE = SQLDialect.supportedBy(MARIADB, MYSQL);
// Configuration constants
// -----------------------
private static final int ON_DUPLICATE_KEY_ERROR = 0;
private static final int ON_DUPLICATE_KEY_IGNORE = 1;
private static final int ON_DUPLICATE_KEY_UPDATE = 2;
private static final int ON_DUPLICATE_KEY_ERROR = 0;
private static final int ON_DUPLICATE_KEY_IGNORE = 1;
private static final int ON_DUPLICATE_KEY_UPDATE = 2;
private static final int ON_ERROR_ABORT = 0;
private static final int ON_ERROR_IGNORE = 1;
private static final int ON_ERROR_ABORT = 0;
private static final int ON_ERROR_IGNORE = 1;
private static final int COMMIT_NONE = 0;
private static final int COMMIT_AFTER = 1;
private static final int COMMIT_ALL = 2;
private static final int COMMIT_NONE = 0;
private static final int COMMIT_AFTER = 1;
private static final int COMMIT_ALL = 2;
private static final int BATCH_NONE = 0;
private static final int BATCH_AFTER = 1;
private static final int BATCH_ALL = 2;
private static final int BATCH_NONE = 0;
private static final int BATCH_AFTER = 1;
private static final int BATCH_ALL = 2;
private static final int BULK_NONE = 0;
private static final int BULK_AFTER = 1;
private static final int BULK_ALL = 2;
private static final int BULK_NONE = 0;
private static final int BULK_AFTER = 1;
private static final int BULK_ALL = 2;
private static final int CONTENT_CSV = 0;
private static final int CONTENT_XML = 1;
private static final int CONTENT_JSON = 2;
private static final int CONTENT_ARRAYS = 3;
private static final int CONTENT_CSV = 0;
private static final int CONTENT_XML = 1;
private static final int CONTENT_JSON = 2;
private static final int CONTENT_ARRAYS = 3;
// Configuration data
// ------------------
private final DSLContext create;
private final Configuration configuration;
private final Table<R> table;
private int onDuplicate = ON_DUPLICATE_KEY_ERROR;
private int onError = ON_ERROR_ABORT;
private int commit = COMMIT_NONE;
private int commitAfter = 1;
private int batch = BATCH_NONE;
private int batchAfter = 1;
private int bulk = BULK_NONE;
private int bulkAfter = 1;
private int content = CONTENT_CSV;
private int onDuplicate = ON_DUPLICATE_KEY_ERROR;
private int onError = ON_ERROR_ABORT;
private int commit = COMMIT_NONE;
private int commitAfter = 1;
private int batch = BATCH_NONE;
private int batchAfter = 1;
private int bulk = BULK_NONE;
private int bulkAfter = 1;
private int content = CONTENT_CSV;
private Source input;
private Iterator<? extends Object[]> arrays;
// CSV configuration data
// ----------------------
private int ignoreRows = 1;
private char quote = CSVParser.DEFAULT_QUOTE_CHARACTER;
private char separator = CSVParser.DEFAULT_SEPARATOR;
private String nullString = null;
private int ignoreRows = 1;
private char quote = CSVParser.DEFAULT_QUOTE_CHARACTER;
private char separator = CSVParser.DEFAULT_SEPARATOR;
private String nullString = null;
private Field<?>[] source;
private Field<?>[] fields;
private LoaderFieldMapper fieldMapper;
@ -166,7 +172,7 @@ final class LoaderImpl<R extends Record> implements
// Result data
// -----------
private LoaderRowListener listener;
private LoaderContext result = new DefaultLoaderContext();
private LoaderContext result = new DefaultLoaderContext();
private int ignored;
private int processed;
private int stored;
@ -660,9 +666,6 @@ final class LoaderImpl<R extends Record> implements
}
/**
* Validates that the configured loader options can be combined.
*
* @throws LoaderConfigurationException if batch loading is enabled together
*             with the onDuplicateKeyIgnore flag, or if bulk loading is
*             enabled together with any onDuplicateKey setting other than
*             ON_DUPLICATE_KEY_ERROR
*/
private void checkFlags() {
// NOTE(review): this listing is a diff rendered without +/- markers. The
// commit title and the hunk line counts suggest this first check is the
// part being removed by the change - confirm against the actual file.
if (batch != BATCH_NONE && onDuplicate == ON_DUPLICATE_KEY_IGNORE)
throw new LoaderConfigurationException("Cannot apply batch loading with onDuplicateKeyIgnore flag. Turn off either flag.");
// Bulk loading is rejected unless duplicates are left to fail (ON_DUPLICATE_KEY_ERROR).
if (bulk != BULK_NONE && onDuplicate != ON_DUPLICATE_KEY_ERROR)
throw new LoaderConfigurationException("Cannot apply bulk loading with onDuplicateKey flags. Turn off either flag.");
}
@ -764,29 +767,6 @@ final class LoaderImpl<R extends Record> implements
// appropriate target types. But beware of SQL dialects that tend to
// need very explicit casting of bind values (e.g. Firebird)
processed++;
// TODO: This can be implemented faster using a MERGE statement
// in some dialects
// [#5200] When the primary key is not supplied in the data,
// we'll assume it uses an identity, and there will never be duplicates
if (onDuplicate == ON_DUPLICATE_KEY_IGNORE && primaryKey.cardinality() > 0) {
SelectQuery<R> select = create.selectQuery(table);
for (int i = 0; i < row.length; i++)
if (i < fields.length && primaryKey.get(i))
select.addConditions(getCondition(fields[i], row[i]));
try {
if (create.fetchExists(select)) {
ignored++;
continue rows;
}
}
catch (DataAccessException e) {
errors.add(new LoaderErrorImpl(e, row, processed - 1, select));
}
}
buffered++;
if (insert == null)
@ -811,6 +791,13 @@ final class LoaderImpl<R extends Record> implements
addValueForUpdate0(insert, fields[i], row[i]);
}
// [#5200] When the primary key is not supplied in the data,
// we'll assume it uses an identity, and there will never be duplicates
// [#7253] Use native onDuplicateKeyIgnore() support
else if (onDuplicate == ON_DUPLICATE_KEY_IGNORE && primaryKey.cardinality() > 0) {
insert.onDuplicateKeyIgnore(true);
}
// Don't do anything. Let the execution fail
else if (onDuplicate == ON_DUPLICATE_KEY_ERROR) {}
@ -833,12 +820,25 @@ final class LoaderImpl<R extends Record> implements
continue rows;
}
if (bind != null)
bind.execute();
else if (insert != null)
insert.execute();
int[] rowcounts = { 0 };
int totalRowCounts = 0;
stored += buffered;
if (bind != null)
rowcounts = bind.execute();
else if (insert != null)
rowcounts = new int[] { insert.execute() };
// [#10358] The MySQL dialect category doesn't return rowcounts
// in INSERT .. ON DUPLICATE KEY UPDATE statements, but
// 1 = INSERT, 2 = UPDATE, instead
if (onDuplicate == ON_DUPLICATE_KEY_UPDATE && NO_SUPPORT_ROWCOUNT_ON_DUPLICATE.contains(create.dialect()))
totalRowCounts = buffered;
else
for (int rowCount : rowcounts)
totalRowCounts += rowCount;
stored += totalRowCounts;
ignored += buffered - totalRowCounts;
executed++;
buffered = 0;