From 84ad8608787022444710629ba8230929fcbd4f9f Mon Sep 17 00:00:00 2001 From: Lukas Eder Date: Fri, 3 Jul 2020 17:33:05 +0200 Subject: [PATCH] [jOOQ/jOOQ#7253] Add support for Loader.batchAll() with onDuplicateKeyIgnore() --- jOOQ/src/main/java/org/jooq/Loader.java | 8 ++ .../main/java/org/jooq/impl/LoaderImpl.java | 130 +++++++++--------- 2 files changed, 73 insertions(+), 65 deletions(-) diff --git a/jOOQ/src/main/java/org/jooq/Loader.java b/jOOQ/src/main/java/org/jooq/Loader.java index 43c9105b2e..3f0567fda5 100644 --- a/jOOQ/src/main/java/org/jooq/Loader.java +++ b/jOOQ/src/main/java/org/jooq/Loader.java @@ -37,6 +37,7 @@ */ package org.jooq; +import java.sql.Statement; import java.util.List; import org.jetbrains.annotations.NotNull; @@ -69,6 +70,13 @@ public interface Loader { /** * The number of ignored rows. + *

+ * If using {@link LoaderOptionsStep#onDuplicateKeyIgnore()} along with + * {@link LoaderOptionsStep#batchAll()} or + * {@link LoaderOptionsStep#batchAfter(int)}, it may be possible that some + * dialects will not produce the correct ignored count, as the respective + * JDBC drivers cannot produce this count over + * {@link Statement#executeBatch()} and related methods. */ int ignored(); diff --git a/jOOQ/src/main/java/org/jooq/impl/LoaderImpl.java b/jOOQ/src/main/java/org/jooq/impl/LoaderImpl.java index 25729b1351..2fe56f6e01 100644 --- a/jOOQ/src/main/java/org/jooq/impl/LoaderImpl.java +++ b/jOOQ/src/main/java/org/jooq/impl/LoaderImpl.java @@ -37,6 +37,10 @@ */ package org.jooq.impl; +// ... +import static org.jooq.SQLDialect.MARIADB; +// ... +import static org.jooq.SQLDialect.MYSQL; import static org.jooq.impl.Tools.EMPTY_FIELD; import java.io.File; @@ -53,6 +57,7 @@ import java.util.BitSet; import java.util.Collection; import java.util.Iterator; import java.util.List; +import java.util.Set; import java.util.stream.Stream; import javax.xml.bind.DatatypeConverter; @@ -78,7 +83,7 @@ import org.jooq.LoaderRowsStep; import org.jooq.LoaderXMLStep; import org.jooq.Record; import org.jooq.Result; -import org.jooq.SelectQuery; +import org.jooq.SQLDialect; import org.jooq.Source; import org.jooq.Table; import org.jooq.exception.DataAccessException; @@ -106,57 +111,58 @@ final class LoaderImpl implements LoaderJSONOptionsStep, Loader { - private static final JooqLogger log = JooqLogger.getLogger(LoaderImpl.class); + private static final JooqLogger log = JooqLogger.getLogger(LoaderImpl.class); + private static final Set NO_SUPPORT_ROWCOUNT_ON_DUPLICATE = SQLDialect.supportedBy(MARIADB, MYSQL); // Configuration constants // ----------------------- - private static final int ON_DUPLICATE_KEY_ERROR = 0; - private static final int ON_DUPLICATE_KEY_IGNORE = 1; - private static final int ON_DUPLICATE_KEY_UPDATE = 2; + private static final int ON_DUPLICATE_KEY_ERROR = 0; + private static final int ON_DUPLICATE_KEY_IGNORE = 1; + private static final int ON_DUPLICATE_KEY_UPDATE = 2; - private static final int ON_ERROR_ABORT = 0; - private static final int ON_ERROR_IGNORE = 1; + private static final int ON_ERROR_ABORT = 0; + private static final int ON_ERROR_IGNORE = 1; - private static final int COMMIT_NONE = 0; - private static final int COMMIT_AFTER = 1; - private static final int COMMIT_ALL = 2; + private static final int COMMIT_NONE = 0; + private static final int COMMIT_AFTER = 1; + private static final int COMMIT_ALL = 2; - private static final int BATCH_NONE = 0; - private static final int BATCH_AFTER = 1; - private static final int BATCH_ALL = 2; + private static final int BATCH_NONE = 0; + private static final int BATCH_AFTER = 1; + private static final int BATCH_ALL = 2; - private static final int BULK_NONE = 0; - private static final int BULK_AFTER = 1; - private static final int BULK_ALL = 2; + private static final int BULK_NONE = 0; + private static final int BULK_AFTER = 1; + private static final int BULK_ALL = 2; - private static final int CONTENT_CSV = 0; - private static final int CONTENT_XML = 1; - private static final int CONTENT_JSON = 2; - private static final int CONTENT_ARRAYS = 3; + private static final int CONTENT_CSV = 0; + private static final int CONTENT_XML = 1; + private static final int CONTENT_JSON = 2; + private static final int CONTENT_ARRAYS = 3; // Configuration data // ------------------ private final DSLContext create; private final Configuration configuration; private final Table table; - private int onDuplicate = ON_DUPLICATE_KEY_ERROR; - private int onError = ON_ERROR_ABORT; - private int commit = COMMIT_NONE; - private int commitAfter = 1; - private int batch = BATCH_NONE; - private int batchAfter = 1; - private int bulk = BULK_NONE; - private int bulkAfter = 1; - private int content = CONTENT_CSV; + private int onDuplicate = ON_DUPLICATE_KEY_ERROR; + private int onError = ON_ERROR_ABORT; + private int commit = COMMIT_NONE; + private int commitAfter = 1; + private int batch = BATCH_NONE; + private int batchAfter = 1; + private int bulk = BULK_NONE; + private int bulkAfter = 1; + private int content = CONTENT_CSV; private Source input; private Iterator arrays; // CSV configuration data // ---------------------- - private int ignoreRows = 1; - private char quote = CSVParser.DEFAULT_QUOTE_CHARACTER; - private char separator = CSVParser.DEFAULT_SEPARATOR; - private String nullString = null; + private int ignoreRows = 1; + private char quote = CSVParser.DEFAULT_QUOTE_CHARACTER; + private char separator = CSVParser.DEFAULT_SEPARATOR; + private String nullString = null; private Field[] source; private Field[] fields; private LoaderFieldMapper fieldMapper; @@ -166,7 +172,7 @@ final class LoaderImpl implements // Result data // ----------- private LoaderRowListener listener; - private LoaderContext result = new DefaultLoaderContext(); + private LoaderContext result = new DefaultLoaderContext(); private int ignored; private int processed; private int stored; @@ -660,9 +666,6 @@ final class LoaderImpl implements } private void checkFlags() { - if (batch != BATCH_NONE && onDuplicate == ON_DUPLICATE_KEY_IGNORE) - throw new LoaderConfigurationException("Cannot apply batch loading with onDuplicateKeyIgnore flag. Turn off either flag."); - if (bulk != BULK_NONE && onDuplicate != ON_DUPLICATE_KEY_ERROR) throw new LoaderConfigurationException("Cannot apply bulk loading with onDuplicateKey flags. Turn off either flag."); } @@ -764,29 +767,6 @@ final class LoaderImpl implements // appropriate target types. But beware of SQL dialects that tend to // need very explicit casting of bind values (e.g. Firebird) processed++; - - // TODO: This can be implemented faster using a MERGE statement - // in some dialects - // [#5200] When the primary key is not supplied in the data, - // we'll assume it uses an identity, and there will never be duplicates - if (onDuplicate == ON_DUPLICATE_KEY_IGNORE && primaryKey.cardinality() > 0) { - SelectQuery select = create.selectQuery(table); - - for (int i = 0; i < row.length; i++) - if (i < fields.length && primaryKey.get(i)) - select.addConditions(getCondition(fields[i], row[i])); - - try { - if (create.fetchExists(select)) { - ignored++; - continue rows; - } - } - catch (DataAccessException e) { - errors.add(new LoaderErrorImpl(e, row, processed - 1, select)); - } - } - buffered++; if (insert == null) @@ -811,6 +791,13 @@ final class LoaderImpl implements addValueForUpdate0(insert, fields[i], row[i]); } + // [#5200] When the primary key is not supplied in the data, + // we'll assume it uses an identity, and there will never be duplicates + // [#7253] Use native onDuplicateKeyIgnore() support + else if (onDuplicate == ON_DUPLICATE_KEY_IGNORE && primaryKey.cardinality() > 0) { + insert.onDuplicateKeyIgnore(true); + } + // Don't do anything. Let the execution fail else if (onDuplicate == ON_DUPLICATE_KEY_ERROR) {} @@ -833,12 +820,25 @@ final class LoaderImpl implements continue rows; } - if (bind != null) - bind.execute(); - else if (insert != null) - insert.execute(); + int[] rowcounts = { 0 }; + int totalRowCounts = 0; - stored += buffered; + if (bind != null) + rowcounts = bind.execute(); + else if (insert != null) + rowcounts = new int[] { insert.execute() }; + + // [#10358] The MySQL dialect category doesn't return rowcounts + // in INSERT .. ON DUPLICATE KEY UPDATE statements, but + // 1 = INSERT, 2 = UPDATE, instead + if (onDuplicate == ON_DUPLICATE_KEY_UPDATE && NO_SUPPORT_ROWCOUNT_ON_DUPLICATE.contains(create.dialect())) + totalRowCounts = buffered; + else + for (int rowCount : rowcounts) + totalRowCounts += rowCount; + + stored += totalRowCounts; + ignored += buffered - totalRowCounts; executed++; buffered = 0;