[#3975] Add support for bulk insertion in the Loader API

This commit is contained in:
lukaseder 2015-02-05 13:00:33 +01:00
parent dcff603068
commit 24134dd65e
3 changed files with 164 additions and 84 deletions

View File

@ -63,7 +63,7 @@ public interface Loader<R extends TableRecord<R>> {
int processed();
/**
* The number of executed statements or batch statements.
* The number of executed statements, bulk statements, or batch statements.
*/
int executed();

View File

@ -54,12 +54,26 @@ import java.sql.Connection;
/**
* The <code>Loader</code> API is used for configuring data loads.
* <p>
* Add options to for the loading behaviour
* Add options for the loading behaviour. For performance reasons, you can
* fine-tune three different types of measures:
* <ul>
* <li><strong>The bulk statement size</strong>. This specifies how many rows
* will be inserted in a single bulk statement / multi-row <code>INSERT</code>
* statement.</li>
* <li><strong>The batch statement size</strong>. This specifies how many bulk
* statements will be sent to the server as a single JDBC batch statement.</li>
* <li><strong>The commit size</strong>. This specifies how many batch
* statements will be committed in a single transaction.</li>
* </ul>
*
* @author Lukas Eder
*/
public interface LoaderOptionsStep<R extends TableRecord<R>> extends LoaderSourceStep<R> {
// -------------------------------------------------------------------------
// Duplicate handling
// -------------------------------------------------------------------------
/**
* Instruct the <code>Loader</code> to update duplicate records if the main
* unique key's value is already in the database. This is only supported if
@ -106,6 +120,10 @@ public interface LoaderOptionsStep<R extends TableRecord<R>> extends LoaderSourc
@Support
LoaderOptionsStep<R> onDuplicateKeyError();
// -------------------------------------------------------------------------
// Error handling
// -------------------------------------------------------------------------
/**
* Instruct the <code>Loader</code> to ignore any errors that might occur
* when inserting a record. The <code>Loader</code> will then skip the
@ -132,8 +150,12 @@ public interface LoaderOptionsStep<R extends TableRecord<R>> extends LoaderSourc
@Support
LoaderOptionsStep<R> onErrorAbort();
// -------------------------------------------------------------------------
// Commit strategy
// -------------------------------------------------------------------------
/**
* Commit each loaded record or each batch.
* Commit each batch.
* <p>
* This is the same as calling {@link #commitAfter(int)} with <code>1</code>
* as parameter.
@ -156,8 +178,7 @@ public interface LoaderOptionsStep<R extends TableRecord<R>> extends LoaderSourc
LoaderOptionsStep<R> commitEach();
/**
* Commit after a certain number of inserted records or after a certain
* number of batches.
* Commit after a certain number of batches.
* <p>
* With this clause, errors will never result in a rollback, even when you
* specify {@link #onDuplicateKeyError()} or {@link #onErrorAbort()}
@ -179,9 +200,9 @@ public interface LoaderOptionsStep<R extends TableRecord<R>> extends LoaderSourc
LoaderOptionsStep<R> commitAfter(int number);
/**
* Commit only after inserting all records or batches. If this is used
* together with {@link #onDuplicateKeyError()} or {@link #onErrorAbort()},
* an abort will result in a rollback of previously loaded records.
* Commit only after inserting all batches. If this is used together with
* {@link #onDuplicateKeyError()} or {@link #onErrorAbort()}, an abort will
* result in a rollback of previously loaded records.
* <p>
* The COMMIT OPTIONS might be useful for fine-tuning performance behaviour
* in some RDBMS, where large commits lead to a high level of concurrency in
@ -212,8 +233,12 @@ public interface LoaderOptionsStep<R extends TableRecord<R>> extends LoaderSourc
@Support
LoaderOptionsStep<R> commitNone();
// -------------------------------------------------------------------------
// Batch strategy
// -------------------------------------------------------------------------
/**
* Batch all statements in one JDBC batch statement.
* Batch all bulk statements in one JDBC batch statement.
* <p>
* If {@link #commitEach()} or {@link #commitAfter(int)} are set, this will
* force the <code>COMMIT</code> option to {@link #commitAll()}.
@ -222,7 +247,7 @@ public interface LoaderOptionsStep<R extends TableRecord<R>> extends LoaderSourc
LoaderOptionsStep<R> batchAll();
/**
* Do not batch statements.
* Do not batch bulk statements together.
* <p>
* If you don't specify a BATCH OPTION, this will be the default.
*/
@ -230,15 +255,45 @@ public interface LoaderOptionsStep<R extends TableRecord<R>> extends LoaderSourc
LoaderOptionsStep<R> batchNone();
/**
* Batch a given number of statements together.
* <p>
* If {@link #commitEach()} is set, each batch statement will be committed.
* If {@link #commitAfter(int)} is set, the given number of batch statements
* are committed.
* Batch a given number of bulk statements together.
*
* @param number The number of bulk statements that are batched together.
*/
@Support
LoaderOptionsStep<R> batchAfter(int number);
// -------------------------------------------------------------------------
// Bulk strategy
// -------------------------------------------------------------------------
/**
* Bulk-insert all rows in a single multi-row bulk statement.
* <p>
* If {@link #commitEach()} or {@link #commitAfter(int)} are set, this will
* force the <code>COMMIT</code> option to {@link #commitAll()}.
* <p>
* Note: the <code>LoaderImpl</code> configuration check throws a
* <code>LoaderConfigurationException</code> when bulk loading is combined
* with an <code>ON DUPLICATE KEY</code> mode other than <code>ERROR</code>
* (see {@link #onDuplicateKeyError()}).
*/
@Support
LoaderOptionsStep<R> bulkAll();
/**
* Do not bulk-insert rows in multi-row bulk statements.
* <p>
* Each row is then inserted by its own single-row <code>INSERT</code>
* statement.
* <p>
* If you don't specify a BULK OPTION, this will be the default.
*/
@Support
LoaderOptionsStep<R> bulkNone();
/**
* Bulk-insert a given number of rows in a single multi-row bulk
* statement.
* <p>
* If {@link #commitEach()} is set, each bulk statement will be committed.
* If {@link #commitAfter(int)} is set, the given number of bulk statements
* are committed.
*
* @param number The number of records that are put together in one bulk
* statement. Must be greater than zero.
*/
@Support
LoaderOptionsStep<R> bulkAfter(int number);
}

View File

@ -113,6 +113,10 @@ class LoaderImpl<R extends TableRecord<R>> implements
private static final int BATCH_AFTER = 1;
private static final int BATCH_ALL = 2;
private static final int BULK_NONE = 0;
private static final int BULK_AFTER = 1;
private static final int BULK_ALL = 2;
private static final int CONTENT_CSV = 0;
private static final int CONTENT_XML = 1;
private static final int CONTENT_JSON = 2;
@ -128,6 +132,8 @@ class LoaderImpl<R extends TableRecord<R>> implements
private int commitAfter = 1;
private int batch = BATCH_NONE;
private int batchAfter = 1;
private int bulk = BULK_NONE;
private int bulkAfter = 1;
private int content = CONTENT_CSV;
private BufferedReader data;
@ -146,7 +152,7 @@ class LoaderImpl<R extends TableRecord<R>> implements
private int processed;
private int stored;
private int executed;
private int batched;
private int buffered;
private final List<LoaderError> errors;
LoaderImpl(Configuration configuration, Table<R> table) {
@ -242,6 +248,25 @@ class LoaderImpl<R extends TableRecord<R>> implements
return this;
}
/**
 * Select the <code>BULK_ALL</code> strategy for this loader.
 *
 * @return This loader, for further fluent configuration.
 */
@Override
public final LoaderImpl<R> bulkAll() {
    this.bulk = BULK_ALL;
    return this;
}
/**
 * Select the <code>BULK_NONE</code> strategy for this loader.
 * <p>
 * The bulk size is reset to its default of <code>1</code> as well. The row
 * loop computes the batch boundary as <code>bulkAfter * batchAfter</code>
 * regardless of the bulk mode, so a size left over from an earlier
 * {@link #bulkAfter(int)} call would otherwise silently inflate the batch
 * size after bulk loading has been turned off again.
 *
 * @return This loader, for further fluent configuration.
 */
@Override
public final LoaderImpl<R> bulkNone() {
    bulk = BULK_NONE;

    // Drop any stale size from a previous bulkAfter(int) call
    bulkAfter = 1;
    return this;
}
/**
 * Select the <code>BULK_AFTER</code> strategy with the given bulk size.
 * <p>
 * The size is validated eagerly: the row loop evaluates
 * <code>processed % bulkAfter</code>, so a size of zero would only fail
 * later, in the middle of the load, with an
 * <code>ArithmeticException</code> that the loader's error handling does
 * not catch.
 *
 * @param number The number of rows that are put together in one bulk
 *            statement. Must be greater than zero.
 * @return This loader, for further fluent configuration.
 * @throws IllegalArgumentException If <code>number</code> is not positive.
 */
@Override
public final LoaderImpl<R> bulkAfter(int number) {
    if (number <= 0)
        throw new IllegalArgumentException("Bulk size must be greater than zero: " + number);

    bulk = BULK_AFTER;
    bulkAfter = number;
    return this;
}
@Override
public final LoaderImpl<R> loadCSV(File file) throws FileNotFoundException {
content = CONTENT_CSV;
@ -413,6 +438,9 @@ class LoaderImpl<R extends TableRecord<R>> implements
/**
 * Validate the configured combination of loader options.
 * <p>
 * Two combinations are rejected:
 * <ul>
 * <li>Any batch mode together with <code>ON_DUPLICATE_KEY_IGNORE</code>.</li>
 * <li>Any bulk mode together with a duplicate handling other than
 * <code>ON_DUPLICATE_KEY_ERROR</code>.</li>
 * </ul>
 *
 * @throws LoaderConfigurationException If one of the above combinations is
 *             configured.
 */
private void checkFlags() {
    boolean batching = batch != BATCH_NONE;
    boolean bulking = bulk != BULK_NONE;

    if (batching && onDuplicate == ON_DUPLICATE_KEY_IGNORE) {
        throw new LoaderConfigurationException("Cannot apply batch loading with onDuplicateKeyIgnore flag. Turn off either flag.");
    }

    if (bulking && onDuplicate != ON_DUPLICATE_KEY_ERROR) {
        throw new LoaderConfigurationException("Cannot apply bulk loading with onDuplicateKey flags. Turn off either flag.");
    }
}
private void executeJSON() throws IOException {
@ -462,47 +490,24 @@ class LoaderImpl<R extends TableRecord<R>> implements
rows: while (reader.hasNext() && ((row = reader.next()) != null)) {
// [#1627] Handle NULL values
for (int i = 0; i < row.length; i++) {
if (StringUtils.equals(nullString, row[i])) {
for (int i = 0; i < row.length; i++)
if (StringUtils.equals(nullString, row[i]))
row[i] = null;
}
}
// TODO: In batch mode, we can probably optimise this by not creating
// new statements every time, just to convert bind values to their
// appropriate target types. But beware of SQL dialects that tend to
// need very explicit casting of bind values (e.g. Firebird)
processed++;
insert = create.insertQuery(table);
for (int i = 0; i < row.length; i++) {
if (i < fields.length && fields[i] != null) {
addValue0(insert, fields[i], row[i]);
}
}
// TODO: This is only supported by some dialects. Let other
// dialects execute a SELECT and then either an INSERT or UPDATE
if (onDuplicate == ON_DUPLICATE_KEY_UPDATE) {
insert.onDuplicateKeyUpdate(true);
for (int i = 0; i < row.length; i++) {
if (i < fields.length && fields[i] != null && !primaryKey[i]) {
addValueForUpdate0(insert, fields[i], row[i]);
}
}
}
// TODO: This can be implemented faster using a MERGE statement
// in some dialects
else if (onDuplicate == ON_DUPLICATE_KEY_IGNORE) {
if (onDuplicate == ON_DUPLICATE_KEY_IGNORE) {
SelectQuery<R> select = create.selectQuery(table);
for (int i = 0; i < row.length; i++) {
if (i < fields.length && primaryKey[i]) {
for (int i = 0; i < row.length; i++)
if (i < fields.length && primaryKey[i])
select.addConditions(getCondition(fields[i], row[i]));
}
}
try {
if (select.execute() > 0) {
@ -515,77 +520,99 @@ class LoaderImpl<R extends TableRecord<R>> implements
}
}
buffered++;
if (insert == null)
insert = create.insertQuery(table);
for (int i = 0; i < row.length; i++)
if (i < fields.length && fields[i] != null)
addValue0(insert, fields[i], row[i]);
// TODO: This is only supported by some dialects. Let other
// dialects execute a SELECT and then either an INSERT or UPDATE
if (onDuplicate == ON_DUPLICATE_KEY_UPDATE) {
insert.onDuplicateKeyUpdate(true);
for (int i = 0; i < row.length; i++)
if (i < fields.length && fields[i] != null && !primaryKey[i])
addValueForUpdate0(insert, fields[i], row[i]);
}
// Don't do anything. Let the execution fail
else if (onDuplicate == ON_DUPLICATE_KEY_ERROR) {}
try {
if (batch == BATCH_NONE) {
batched = 1;
insert.execute();
stored++;
executed++;
if (bulk != BULK_NONE) {
if (bulk == BULK_ALL || processed % bulkAfter != 0) {
insert.newRecord();
continue rows;
}
}
else {
if (batch != BATCH_NONE) {
if (bind == null)
bind = create.batch(insert);
batched++;
bind.bind(insert.getBindValues().toArray());
insert = null;
if (batch == BATCH_ALL || processed % (bulkAfter * batchAfter) != 0)
continue rows;
}
if (batch == BATCH_AFTER) {
if (processed % batchAfter == 0) {
if (bind != null) {
bind.execute();
stored += batched;
batched = 0;
executed++;
bind = null;
}
}
}
if (bind != null)
bind.execute();
else if (insert != null)
insert.execute();
stored += buffered;
executed++;
if (commit == COMMIT_AFTER) {
if ((processed % batchAfter == 0) && ((processed / batchAfter) % commitAfter == 0)) {
buffered = 0;
bind = null;
insert = null;
if (commit == COMMIT_AFTER)
if ((processed % batchAfter == 0) && ((processed / batchAfter) % commitAfter == 0))
commit();
}
}
}
catch (DataAccessException e) {
errors.add(new LoaderErrorImpl(e, row, processed - 1, insert));
ignored += batched;
batched = 0;
ignored += buffered;
buffered = 0;
if (onError == ON_ERROR_ABORT) {
if (onError == ON_ERROR_ABORT)
break execution;
}
}
// rowloop
// rows:
}
// Execute remaining batch
if (bind != null) {
if (buffered != 0) {
try {
bind.execute();
stored += batched;
batched = 0;
if (bind != null)
bind.execute();
if (insert != null)
insert.execute();
stored += buffered;
executed++;
bind = null;
buffered = 0;
}
catch (DataAccessException e) {
errors.add(new LoaderErrorImpl(e, row, processed - 1, insert));
ignored += batched;
batched = 0;
ignored += buffered;
buffered = 0;
}
if (onError == ON_ERROR_ABORT) {
if (onError == ON_ERROR_ABORT)
break execution;
}
}
// executionBlock
// execution:
}
// Rollback on errors in COMMIT_ALL mode
@ -602,9 +629,7 @@ class LoaderImpl<R extends TableRecord<R>> implements
// Commit remaining elements in COMMIT_AFTER mode
else if (commit == COMMIT_AFTER) {
if ((processed % batchAfter != 0) || ((processed / batchAfter) % commitAfter != 0)) {
commit();
}
commit();
}
}
catch (DataAccessException e) {