From 24134dd65e1ddd93fcd39676be264aad3c343450 Mon Sep 17 00:00:00 2001 From: lukaseder Date: Thu, 5 Feb 2015 13:00:33 +0100 Subject: [PATCH] [#3975] Add support for bulk insertion in the Loader API --- jOOQ/src/main/java/org/jooq/Loader.java | 2 +- .../main/java/org/jooq/LoaderOptionsStep.java | 83 +++++++-- .../main/java/org/jooq/impl/LoaderImpl.java | 163 ++++++++++-------- 3 files changed, 164 insertions(+), 84 deletions(-) diff --git a/jOOQ/src/main/java/org/jooq/Loader.java b/jOOQ/src/main/java/org/jooq/Loader.java index c514cd13d8..11e1ce58c9 100644 --- a/jOOQ/src/main/java/org/jooq/Loader.java +++ b/jOOQ/src/main/java/org/jooq/Loader.java @@ -63,7 +63,7 @@ public interface Loader> { int processed(); /** - * The number of executed statements or batch statements. + * The number of executed statements, bulk statements, or batch statements. */ int executed(); diff --git a/jOOQ/src/main/java/org/jooq/LoaderOptionsStep.java b/jOOQ/src/main/java/org/jooq/LoaderOptionsStep.java index b8e83c017c..a5cdbde0e0 100644 --- a/jOOQ/src/main/java/org/jooq/LoaderOptionsStep.java +++ b/jOOQ/src/main/java/org/jooq/LoaderOptionsStep.java @@ -54,12 +54,26 @@ import java.sql.Connection; /** * The Loader API is used for configuring data loads. *

- * Add options to for the loading behaviour + * Add options to for the loading behaviour. For performance reasons, you can + * fine-tune three different types of measures: + *

* * @author Lukas Eder */ public interface LoaderOptionsStep> extends LoaderSourceStep { + // ------------------------------------------------------------------------- + // Duplicate handling + // ------------------------------------------------------------------------- + /** * Instruct the Loader to update duplicate records if the main * unique key's value is already in the database. This is only supported if @@ -106,6 +120,10 @@ public interface LoaderOptionsStep> extends LoaderSourc @Support LoaderOptionsStep onDuplicateKeyError(); + // ------------------------------------------------------------------------- + // Error handling + // ------------------------------------------------------------------------- + /** * Instruct the Loader to ignore any errors that might occur * when inserting a record. The Loader will then skip the @@ -132,8 +150,12 @@ public interface LoaderOptionsStep> extends LoaderSourc @Support LoaderOptionsStep onErrorAbort(); + // ------------------------------------------------------------------------- + // Commit strategy + // ------------------------------------------------------------------------- + /** - * Commit each loaded record or each batch. + * Commit each batch. *

* This is the same as calling {@link #commitAfter(int)} with 1 * as parameter. @@ -156,8 +178,7 @@ public interface LoaderOptionsStep> extends LoaderSourc LoaderOptionsStep commitEach(); /** - * Commit after a certain number of inserted records or after a certain - * number of batches. + * Commit after a certain number of batches. *

* With this clause, errors will never result in a rollback, even when you * specify {@link #onDuplicateKeyError()} or {@link #onErrorAbort()} @@ -179,9 +200,9 @@ public interface LoaderOptionsStep> extends LoaderSourc LoaderOptionsStep commitAfter(int number); /** - * Commit only after inserting all records or batches. If this is used - * together with {@link #onDuplicateKeyError()} or {@link #onErrorAbort()}, - * an abort will result in a rollback of previously loaded records. + * Commit only after inserting all batches. If this is used together with + * {@link #onDuplicateKeyError()} or {@link #onErrorAbort()}, an abort will + * result in a rollback of previously loaded records. *

* The COMMIT OPTIONS might be useful for fine-tuning performance behaviour * in some RDBMS, where large commits lead to a high level of concurrency in @@ -212,8 +233,12 @@ public interface LoaderOptionsStep> extends LoaderSourc @Support LoaderOptionsStep commitNone(); + // ------------------------------------------------------------------------- + // Batch strategy + // ------------------------------------------------------------------------- + /** - * Batch all statements in one JDBC batch statement. + * Batch all bulk statements in one JDBC batch statement. *

* If {@link #commitEach()} or {@link #commitAfter(int)} are set, this will * force the COMMIT option to {@link #commitAll()}. @@ -222,7 +247,7 @@ public interface LoaderOptionsStep> extends LoaderSourc LoaderOptionsStep batchAll(); /** - * Do not batch statements. + * Do not batch bulk statements together. *

* If you don't specify a BATCH OPTION, this will be the default. */ @@ -230,15 +255,45 @@ public interface LoaderOptionsStep> extends LoaderSourc LoaderOptionsStep batchNone(); /** - * Batch a given number of statements together. - *

- * If {@link #commitEach()} is set, each batch statement will be committed. - * If {@link #commitAfter(int)} is set, the given number of batch statements - * are committed. + * Batch a given number of bulk statements together. * * @param number The number of records that are batched together. */ @Support LoaderOptionsStep batchAfter(int number); + // ------------------------------------------------------------------------- + // Bulk strategy + // ------------------------------------------------------------------------- + + /** + * Bulk-insert all rows in a single multi-row bulk statement. + *

+ * If {@link #commitEach()} or {@link #commitAfter(int)} are set, this will + * force the COMMIT option to {@link #commitAll()}. + */ + @Support + LoaderOptionsStep bulkAll(); + + /** + * Do not bulk-insert rows in multi-row bulk statements. + *

+ * If you don't specify a BULK OPTION, this will be the default. + */ + @Support + LoaderOptionsStep bulkNone(); + + /** + * Bulk-insert a given number of statements in a single multi-row bulk + * statement. + *

+ * If {@link #commitEach()} is set, each bulk statement will be committed. + * If {@link #commitAfter(int)} is set, the given number of bulk statements + * are committed. + * + * @param number The number of records that are put together in one bulk + * statement. + */ + @Support + LoaderOptionsStep bulkAfter(int number); } diff --git a/jOOQ/src/main/java/org/jooq/impl/LoaderImpl.java b/jOOQ/src/main/java/org/jooq/impl/LoaderImpl.java index 8915e6c12b..62cfd0ee35 100644 --- a/jOOQ/src/main/java/org/jooq/impl/LoaderImpl.java +++ b/jOOQ/src/main/java/org/jooq/impl/LoaderImpl.java @@ -113,6 +113,10 @@ class LoaderImpl> implements private static final int BATCH_AFTER = 1; private static final int BATCH_ALL = 2; + private static final int BULK_NONE = 0; + private static final int BULK_AFTER = 1; + private static final int BULK_ALL = 2; + private static final int CONTENT_CSV = 0; private static final int CONTENT_XML = 1; private static final int CONTENT_JSON = 2; @@ -128,6 +132,8 @@ class LoaderImpl> implements private int commitAfter = 1; private int batch = BATCH_NONE; private int batchAfter = 1; + private int bulk = BULK_NONE; + private int bulkAfter = 1; private int content = CONTENT_CSV; private BufferedReader data; @@ -146,7 +152,7 @@ class LoaderImpl> implements private int processed; private int stored; private int executed; - private int batched; + private int buffered; private final List errors; LoaderImpl(Configuration configuration, Table table) { @@ -242,6 +248,25 @@ class LoaderImpl> implements return this; } + @Override + public final LoaderImpl bulkAll() { + bulk = BULK_ALL; + return this; + } + + @Override + public final LoaderImpl bulkNone() { + bulk = BULK_NONE; + return this; + } + + @Override + public final LoaderImpl bulkAfter(int number) { + bulk = BULK_AFTER; + bulkAfter = number; + return this; + } + @Override public final LoaderImpl loadCSV(File file) throws FileNotFoundException { content = CONTENT_CSV; @@ -413,6 +438,9 @@ class LoaderImpl> implements private void checkFlags() { if (batch != BATCH_NONE && onDuplicate == ON_DUPLICATE_KEY_IGNORE) throw new LoaderConfigurationException("Cannot apply batch loading with onDuplicateKeyIgnore flag. Turn off either flag."); + + if (bulk != BULK_NONE && onDuplicate != ON_DUPLICATE_KEY_ERROR) + throw new LoaderConfigurationException("Cannot apply bulk loading with onDuplicateKey flags. Turn off either flag."); } private void executeJSON() throws IOException { @@ -462,47 +490,24 @@ class LoaderImpl> implements rows: while (reader.hasNext() && ((row = reader.next()) != null)) { // [#1627] Handle NULL values - for (int i = 0; i < row.length; i++) { - if (StringUtils.equals(nullString, row[i])) { + for (int i = 0; i < row.length; i++) + if (StringUtils.equals(nullString, row[i])) row[i] = null; - } - } // TODO: In batch mode, we can probably optimise this by not creating // new statements every time, just to convert bind values to their // appropriate target types. But beware of SQL dialects that tend to // need very explicit casting of bind values (e.g. Firebird) processed++; - insert = create.insertQuery(table); - - for (int i = 0; i < row.length; i++) { - if (i < fields.length && fields[i] != null) { - addValue0(insert, fields[i], row[i]); - } - } - - // TODO: This is only supported by some dialects. Let other - // dialects execute a SELECT and then either an INSERT or UPDATE - if (onDuplicate == ON_DUPLICATE_KEY_UPDATE) { - insert.onDuplicateKeyUpdate(true); - - for (int i = 0; i < row.length; i++) { - if (i < fields.length && fields[i] != null && !primaryKey[i]) { - addValueForUpdate0(insert, fields[i], row[i]); - } - } - } // TODO: This can be implemented faster using a MERGE statement // in some dialects - else if (onDuplicate == ON_DUPLICATE_KEY_IGNORE) { + if (onDuplicate == ON_DUPLICATE_KEY_IGNORE) { SelectQuery select = create.selectQuery(table); - for (int i = 0; i < row.length; i++) { - if (i < fields.length && primaryKey[i]) { + for (int i = 0; i < row.length; i++) + if (i < fields.length && primaryKey[i]) select.addConditions(getCondition(fields[i], row[i])); - } - } try { if (select.execute() > 0) { @@ -515,77 +520,99 @@ class LoaderImpl> implements } } + buffered++; + + if (insert == null) + insert = create.insertQuery(table); + + for (int i = 0; i < row.length; i++) + if (i < fields.length && fields[i] != null) + addValue0(insert, fields[i], row[i]); + + // TODO: This is only supported by some dialects. Let other + // dialects execute a SELECT and then either an INSERT or UPDATE + if (onDuplicate == ON_DUPLICATE_KEY_UPDATE) { + insert.onDuplicateKeyUpdate(true); + + for (int i = 0; i < row.length; i++) + if (i < fields.length && fields[i] != null && !primaryKey[i]) + addValueForUpdate0(insert, fields[i], row[i]); + } + // Don't do anything. Let the execution fail else if (onDuplicate == ON_DUPLICATE_KEY_ERROR) {} try { - if (batch == BATCH_NONE) { - batched = 1; - insert.execute(); - stored++; - executed++; + if (bulk != BULK_NONE) { + if (bulk == BULK_ALL || processed % bulkAfter != 0) { + insert.newRecord(); + continue rows; + } } - else { + + if (batch != BATCH_NONE) { if (bind == null) bind = create.batch(insert); - batched++; bind.bind(insert.getBindValues().toArray()); + insert = null; + + if (batch == BATCH_ALL || processed % (bulkAfter * batchAfter) != 0) + continue rows; } - if (batch == BATCH_AFTER) { - if (processed % batchAfter == 0) { - if (bind != null) { - bind.execute(); - stored += batched; - batched = 0; - executed++; - bind = null; - } - } - } + if (bind != null) + bind.execute(); + else if (insert != null) + insert.execute(); + stored += buffered; + executed++; - if (commit == COMMIT_AFTER) { - if ((processed % batchAfter == 0) && ((processed / batchAfter) % commitAfter == 0)) { + buffered = 0; + bind = null; + insert = null; + + if (commit == COMMIT_AFTER) + if ((processed % batchAfter == 0) && ((processed / batchAfter) % commitAfter == 0)) commit(); - } - } } catch (DataAccessException e) { errors.add(new LoaderErrorImpl(e, row, processed - 1, insert)); - ignored += batched; - batched = 0; + ignored += buffered; + buffered = 0; - if (onError == ON_ERROR_ABORT) { + if (onError == ON_ERROR_ABORT) break execution; - } } - // rowloop + // rows: } // Execute remaining batch - if (bind != null) { + if (buffered != 0) { try { - bind.execute(); - stored += batched; - batched = 0; + if (bind != null) + bind.execute(); + if (insert != null) + insert.execute(); + + stored += buffered; executed++; - bind = null; + + buffered = 0; } catch (DataAccessException e) { errors.add(new LoaderErrorImpl(e, row, processed - 1, insert)); - ignored += batched; - batched = 0; + ignored += buffered; + buffered = 0; } - if (onError == ON_ERROR_ABORT) { + if (onError == ON_ERROR_ABORT) break execution; - } } - // executionBlock + // execution: } // Rollback on errors in COMMIT_ALL mode @@ -602,9 +629,7 @@ class LoaderImpl> implements // Commit remaining elements in COMMIT_AFTER mode else if (commit == COMMIT_AFTER) { - if ((processed % batchAfter != 0) || ((processed / batchAfter) % commitAfter != 0)) { - commit(); - } + commit(); } } catch (DataAccessException e) {