[#4143] Add onRow(LoaderRowListener) to the Loader API

This commit is contained in:
lukaseder 2015-03-17 12:05:25 +01:00
parent 05514b7f2d
commit eba347143c
7 changed files with 321 additions and 80 deletions

View File

@ -77,4 +77,9 @@ public interface Loader<R extends TableRecord<R>> {
*/
int stored();
/**
* The results that are also returned from {@link Loader}.
*/
LoaderContext result();
}

View File

@ -48,7 +48,7 @@ package org.jooq;
*
* @author Lukas Eder
*/
public interface LoaderCSVOptionsStep<R extends TableRecord<R>> extends LoaderLoadStep<R> {
public interface LoaderCSVOptionsStep<R extends TableRecord<R>> extends LoaderListenerStep<R> {
/**
* Specify that a certain number of rows should be ignored from the CSV

View File

@ -0,0 +1,77 @@
/**
* Copyright (c) 2009-2015, Data Geekery GmbH (http://www.datageekery.com)
* All rights reserved.
*
* This work is dual-licensed
* - under the Apache Software License 2.0 (the "ASL")
* - under the jOOQ License and Maintenance Agreement (the "jOOQ License")
* =============================================================================
* You may choose which license applies to you:
*
* - If you're using this work with Open Source databases, you may choose
* either ASL or jOOQ License.
* - If you're using this work with at least one commercial database, you must
* choose jOOQ License
*
* For more information, please visit http://www.jooq.org/licenses
*
* Apache Software License 2.0:
* -----------------------------------------------------------------------------
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* jOOQ License and Maintenance Agreement:
* -----------------------------------------------------------------------------
* Data Geekery grants the Customer the non-exclusive, timely limited and
* non-transferable license to install and use the Software under the terms of
* the jOOQ License and Maintenance Agreement.
*
* This library is distributed with a LIMITED WARRANTY. See the jOOQ License
* and Maintenance Agreement for more details: http://www.jooq.org/licensing
*/
package org.jooq;
import java.util.List;
/**
* A context object that provides information about a loader's current state to
* {@link LoaderRowListener}.
*
* @author Lukas Eder
*/
public interface LoaderContext {
/**
* A list of errors that might have happened during the load.
*/
List<LoaderError> errors();
/**
* The number of processed rows.
*/
int processed();
/**
* The number of executed statements, bulk statements, or batch statements.
*/
int executed();
/**
* The number of ignored rows.
*/
int ignored();
/**
* The number of inserted or updated rows.
*/
int stored();
}

View File

@ -49,7 +49,7 @@ package org.jooq;
* @author Lukas Eder
* @author Johannes Bühler
*/
public interface LoaderJSONOptionsStep<R extends TableRecord<R>> extends LoaderLoadStep<R> {
public interface LoaderJSONOptionsStep<R extends TableRecord<R>> extends LoaderListenerStep<R> {
/**
* Specify that a certain number of rows should be ignored from the JSON

View File

@ -0,0 +1,58 @@
/**
* Copyright (c) 2009-2015, Data Geekery GmbH (http://www.datageekery.com)
* All rights reserved.
*
* This work is dual-licensed
* - under the Apache Software License 2.0 (the "ASL")
* - under the jOOQ License and Maintenance Agreement (the "jOOQ License")
* =============================================================================
* You may choose which license applies to you:
*
* - If you're using this work with Open Source databases, you may choose
* either ASL or jOOQ License.
* - If you're using this work with at least one commercial database, you must
* choose jOOQ License
*
* For more information, please visit http://www.jooq.org/licenses
*
* Apache Software License 2.0:
* -----------------------------------------------------------------------------
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* jOOQ License and Maintenance Agreement:
* -----------------------------------------------------------------------------
* Data Geekery grants the Customer the non-exclusive, timely limited and
* non-transferable license to install and use the Software under the terms of
* the jOOQ License and Maintenance Agreement.
*
* This library is distributed with a LIMITED WARRANTY. See the jOOQ License
* and Maintenance Agreement for more details: http://www.jooq.org/licensing
*/
package org.jooq;
/**
* The <code>Loader</code> API is used for configuring data loads.
* <p>
* The step in constructing the {@link Loader} object where you can add
* listeners to the loader.
*
* @author Lukas Eder
*/
public interface LoaderListenerStep<R extends TableRecord<R>> extends LoaderLoadStep<R> {
/**
* Specify a listener that is invoked whenever a row has been processed.
*/
@Support
LoaderLoadStep<R> onRow(LoaderRowListener listener);
}

View File

@ -0,0 +1,54 @@
/**
* Copyright (c) 2009-2015, Data Geekery GmbH (http://www.datageekery.com)
* All rights reserved.
*
* This work is dual-licensed
* - under the Apache Software License 2.0 (the "ASL")
* - under the jOOQ License and Maintenance Agreement (the "jOOQ License")
* =============================================================================
* You may choose which license applies to you:
*
* - If you're using this work with Open Source databases, you may choose
* either ASL or jOOQ License.
* - If you're using this work with at least one commercial database, you must
* choose jOOQ License
*
* For more information, please visit http://www.jooq.org/licenses
*
* Apache Software License 2.0:
* -----------------------------------------------------------------------------
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* jOOQ License and Maintenance Agreement:
* -----------------------------------------------------------------------------
* Data Geekery grants the Customer the non-exclusive, timely limited and
* non-transferable license to install and use the Software under the terms of
* the jOOQ License and Maintenance Agreement.
*
* This library is distributed with a LIMITED WARRANTY. See the jOOQ License
* and Maintenance Agreement for more details: http://www.jooq.org/licensing
*/
package org.jooq;
/**
* A listener for {@link Loader} progress.
*
* @author Lukas Eder
*/
public interface LoaderRowListener {
/**
* A row has been processed by the {@link Loader}
*/
void row(LoaderContext ctx);
}

View File

@ -69,10 +69,12 @@ import org.jooq.InsertQuery;
import org.jooq.Loader;
import org.jooq.LoaderCSVOptionsStep;
import org.jooq.LoaderCSVStep;
import org.jooq.LoaderContext;
import org.jooq.LoaderError;
import org.jooq.LoaderJSONOptionsStep;
import org.jooq.LoaderJSONStep;
import org.jooq.LoaderOptionsStep;
import org.jooq.LoaderRowListener;
import org.jooq.LoaderXMLStep;
import org.jooq.SelectQuery;
import org.jooq.Table;
@ -152,6 +154,7 @@ class LoaderImpl<R extends TableRecord<R>> implements
// Result data
// -----------
private LoaderRowListener listener;
private int ignored;
private int processed;
private int stored;
@ -489,6 +492,16 @@ class LoaderImpl<R extends TableRecord<R>> implements
// [...] to be specified
// -------------------------------------------------------------------------
// Listening
// -------------------------------------------------------------------------
@Override
public final LoaderImpl<R> onRow(LoaderRowListener l) {
listener = l;
return this;
}
// -------------------------------------------------------------------------
// Execution
// -------------------------------------------------------------------------
@ -563,107 +576,114 @@ class LoaderImpl<R extends TableRecord<R>> implements
String[] row = null;
BatchBindStep bind = null;
InsertQuery<R> insert = null;
LoaderContext ctx = new DefaultLoaderContext();
execution: {
rows: while (reader.hasNext() && ((row = reader.next()) != null)) {
try {
// [#1627] Handle NULL values
for (int i = 0; i < row.length; i++)
if (StringUtils.equals(nullString, row[i]))
row[i] = null;
// [#1627] Handle NULL values
for (int i = 0; i < row.length; i++)
if (StringUtils.equals(nullString, row[i]))
row[i] = null;
// TODO: In batch mode, we can probably optimise this by not creating
// new statements every time, just to convert bind values to their
// appropriate target types. But beware of SQL dialects that tend to
// need very explicit casting of bind values (e.g. Firebird)
processed++;
// TODO: In batch mode, we can probably optimise this by not creating
// new statements every time, just to convert bind values to their
// appropriate target types. But beware of SQL dialects that tend to
// need very explicit casting of bind values (e.g. Firebird)
processed++;
// TODO: This can be implemented faster using a MERGE statement
// in some dialects
if (onDuplicate == ON_DUPLICATE_KEY_IGNORE) {
SelectQuery<R> select = create.selectQuery(table);
// TODO: This can be implemented faster using a MERGE statement
// in some dialects
if (onDuplicate == ON_DUPLICATE_KEY_IGNORE) {
SelectQuery<R> select = create.selectQuery(table);
for (int i = 0; i < row.length; i++)
if (i < fields.length && primaryKey[i])
select.addConditions(getCondition(fields[i], row[i]));
try {
if (create.fetchExists(select)) {
ignored++;
continue rows;
}
}
catch (DataAccessException e) {
errors.add(new LoaderErrorImpl(e, row, processed - 1, select));
}
}
buffered++;
if (insert == null)
insert = create.insertQuery(table);
for (int i = 0; i < row.length; i++)
if (i < fields.length && primaryKey[i])
select.addConditions(getCondition(fields[i], row[i]));
if (i < fields.length && fields[i] != null)
addValue0(insert, fields[i], row[i]);
// TODO: This is only supported by some dialects. Let other
// dialects execute a SELECT and then either an INSERT or UPDATE
if (onDuplicate == ON_DUPLICATE_KEY_UPDATE) {
insert.onDuplicateKeyUpdate(true);
for (int i = 0; i < row.length; i++)
if (i < fields.length && fields[i] != null && !primaryKey[i])
addValueForUpdate0(insert, fields[i], row[i]);
}
// Don't do anything. Let the execution fail
else if (onDuplicate == ON_DUPLICATE_KEY_ERROR) {}
try {
if (create.fetchExists(select)) {
ignored++;
continue rows;
if (bulk != BULK_NONE) {
if (bulk == BULK_ALL || processed % bulkAfter != 0) {
insert.newRecord();
continue rows;
}
}
}
catch (DataAccessException e) {
errors.add(new LoaderErrorImpl(e, row, processed - 1, select));
}
}
buffered++;
if (batch != BATCH_NONE) {
if (bind == null)
bind = create.batch(insert);
if (insert == null)
insert = create.insertQuery(table);
bind.bind(insert.getBindValues().toArray());
insert = null;
for (int i = 0; i < row.length; i++)
if (i < fields.length && fields[i] != null)
addValue0(insert, fields[i], row[i]);
// TODO: This is only supported by some dialects. Let other
// dialects execute a SELECT and then either an INSERT or UPDATE
if (onDuplicate == ON_DUPLICATE_KEY_UPDATE) {
insert.onDuplicateKeyUpdate(true);
for (int i = 0; i < row.length; i++)
if (i < fields.length && fields[i] != null && !primaryKey[i])
addValueForUpdate0(insert, fields[i], row[i]);
}
// Don't do anything. Let the execution fail
else if (onDuplicate == ON_DUPLICATE_KEY_ERROR) {}
try {
if (bulk != BULK_NONE) {
if (bulk == BULK_ALL || processed % bulkAfter != 0) {
insert.newRecord();
continue rows;
if (batch == BATCH_ALL || processed % (bulkAfter * batchAfter) != 0)
continue rows;
}
}
if (batch != BATCH_NONE) {
if (bind == null)
bind = create.batch(insert);
if (bind != null)
bind.execute();
else if (insert != null)
insert.execute();
bind.bind(insert.getBindValues().toArray());
stored += buffered;
executed++;
buffered = 0;
bind = null;
insert = null;
if (batch == BATCH_ALL || processed % (bulkAfter * batchAfter) != 0)
continue rows;
if (commit == COMMIT_AFTER)
if ((processed % batchAfter == 0) && ((processed / batchAfter) % commitAfter == 0))
commit();
}
catch (DataAccessException e) {
errors.add(new LoaderErrorImpl(e, row, processed - 1, insert));
ignored += buffered;
buffered = 0;
if (onError == ON_ERROR_ABORT)
break execution;
}
if (bind != null)
bind.execute();
else if (insert != null)
insert.execute();
stored += buffered;
executed++;
buffered = 0;
bind = null;
insert = null;
if (commit == COMMIT_AFTER)
if ((processed % batchAfter == 0) && ((processed / batchAfter) % commitAfter == 0))
commit();
}
catch (DataAccessException e) {
errors.add(new LoaderErrorImpl(e, row, processed - 1, insert));
ignored += buffered;
buffered = 0;
if (onError == ON_ERROR_ABORT)
break execution;
finally {
if (listener != null)
listener.row(ctx);
}
// rows:
}
@ -786,4 +806,31 @@ class LoaderImpl<R extends TableRecord<R>> implements
public final int stored() {
return stored;
}
private class DefaultLoaderContext implements LoaderContext {
@Override
public final List<LoaderError> errors() {
return errors;
}
@Override
public final int processed() {
return processed;
}
@Override
public final int executed() {
return executed;
}
@Override
public final int ignored() {
return ignored;
}
@Override
public final int stored() {
return stored;
}
}
}