[#4420] [#6373] Bulk insertion fixes

[#4402] Bulk insert fails when records have different sets of changed columns
[#6373] InsertQuery.newRecord() doesn't generate additional record unless values are set
This commit is contained in:
lukaseder 2017-07-03 17:41:40 +02:00
parent f8191d8390
commit 361c240e01
5 changed files with 298 additions and 62 deletions

View File

@ -41,13 +41,25 @@ import static org.jooq.impl.DSL.table;
import static org.jooq.impl.Keywords.K_DEFAULT_VALUES;
import static org.jooq.impl.Keywords.K_VALUES;
import java.util.AbstractList;
import java.util.AbstractMap;
import java.util.AbstractSet;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.jooq.Clause;
import org.jooq.Context;
import org.jooq.Field;
import org.jooq.Record;
import org.jooq.Select;
import org.jooq.impl.AbstractStoreQuery.UnknownField;
/**
* @author Lukas Eder
@ -57,13 +69,15 @@ final class FieldMapsForInsert extends AbstractQueryPart {
/**
* Generated UID
*/
private static final long serialVersionUID = -6227074228534414225L;
private static final long serialVersionUID = -6227074228534414225L;
final List<FieldMapForInsert> insertMaps;
final List<Field<?>> empty;
final Map<Field<?>, List<Field<?>>> values;
int rows;
FieldMapsForInsert() {
insertMaps = new ArrayList<FieldMapForInsert>();
insertMaps.add(null);
values = new LinkedHashMap<Field<?>, List<Field<?>>>();
empty = new ArrayList<Field<?>>();
}
// -------------------------------------------------------------------------
@ -80,13 +94,13 @@ final class FieldMapsForInsert extends AbstractQueryPart {
}
// Single record inserts can use the standard syntax in any dialect
else if (insertMaps.size() == 1 || insertMaps.get(1) == null) {
else if (rows == 1) {
ctx.formatSeparator()
.start(INSERT_VALUES)
.visit(K_VALUES)
.sql(' ')
.visit(insertMaps.get(0))
.end(INSERT_VALUES);
.sql(' ');
toSQL92Values(ctx);
ctx.end(INSERT_VALUES);
}
// True SQL92 multi-record inserts aren't always supported
@ -145,31 +159,51 @@ final class FieldMapsForInsert extends AbstractQueryPart {
private final Select<Record> insertSelect(Context<?> context) {
Select<Record> select = null;
for (FieldMapForInsert map : insertMaps) {
if (map != null) {
Select<Record> iteration = DSL.using(context.configuration()).select(map.values());
for (int row = 0; row < rows; row++) {
List<Field<?>> fields = new ArrayList<Field<?>>();
if (select == null)
select = iteration;
else
select = select.unionAll(iteration);
}
for (List<Field<?>> list : values.values())
fields.add(list.get(row));
Select<Record> iteration = DSL.using(context.configuration()).select(fields);
if (select == null)
select = iteration;
else
select = select.unionAll(iteration);
}
return select;
}
private final void toSQL92Values(Context<?> context) {
context.visit(insertMaps.get(0));
final void toSQL92Values(Context<?> ctx) {
boolean indent = (values.size() > 1);
int i = 0;
for (FieldMapForInsert map : insertMaps) {
if (map != null && i > 0) {
context.sql(", ");
context.visit(map);
for (int row = 0; row < rows; row++) {
if (row > 0)
ctx.sql(", ");
ctx.sql('(');
if (indent)
ctx.formatIndentStart();
String separator = "";
for (List<Field<?>> list : values.values()) {
ctx.sql(separator);
if (indent)
ctx.formatNewLine();
ctx.visit(list.get(row));
separator = ", ";
}
i++;
if (indent)
ctx.formatIndentEnd()
.formatNewLine();
ctx.sql(')');
}
}
@ -182,23 +216,216 @@ final class FieldMapsForInsert extends AbstractQueryPart {
// The FieldMapsForInsert API
// -------------------------------------------------------------------------
final void addFields(Collection<? extends Field<?>> fields) {
if (rows == 0)
newRecord();
for (Field<?> field : fields) {
Field<?> e = DSL.val(null, field);
empty.add(e);
if (!values.containsKey(field)) {
values.put(field, rows > 0
? new ArrayList<Field<?>>(Collections.nCopies(rows, e))
: new ArrayList<Field<?>>()
);
}
}
}
final void set(Collection<? extends Field<?>> fields) {
Iterator<? extends Field<?>> it1 = fields.iterator();
Iterator<List<Field<?>>> it2 = values.values().iterator();
while (it1.hasNext() && it2.hasNext())
it2.next().set(rows - 1, it1.next());
if (it1.hasNext() || it2.hasNext())
throw new IllegalArgumentException("Added record size (" + fields.size() + ") must match fields size (" + values.size() + ")");
}
@SuppressWarnings("unchecked")
final <T> Field<T> set(Field<T> field, Field<T> value) {
addFields(Collections.singletonList(field));
return (Field<T>) values.get(field).set(rows - 1, value);
}
final void set(Map<? extends Field<?>, ?> map) {
addFields(map.keySet());
for (Entry<? extends Field<?>, ?> entry : map.entrySet())
values.get(entry.getKey())
.set(rows - 1, Tools.field(entry.getValue(), entry.getKey()));
}
final void newRecord() {
int i = 0;
for (List<Field<?>> list : values.values())
list.add(empty.get(i++));
rows++;
}
final Collection<Field<?>> fields() {
return values.keySet();
}
final List<Map<Field<?>, Field<?>>> maps() {
return new AbstractList<Map<Field<?>, Field<?>>>() {
@Override
public Map<Field<?>, Field<?>> get(int index) {
return map(index);
}
@Override
public int size() {
return rows;
}
};
}
final Map<Field<?>, Field<?>> map(int index) {
return new AbstractMap<Field<?>, Field<?>>() {
transient Set<Entry<Field<?>, Field<?>>> entrySet;
@Override
public Set<Entry<Field<?>, Field<?>>> entrySet() {
if (entrySet == null)
entrySet = new EntrySet();
return entrySet;
}
@Override
public boolean containsKey(Object key) {
return values.containsKey(key);
}
@Override
public boolean containsValue(Object value) {
for (List<Field<?>> list : values.values())
if (list.get(index).equals(value))
return true;
return false;
}
@Override
public Field<?> get(Object key) {
List<Field<?>> list = values.get(key);
return list == null ? null : list.get(index);
}
@SuppressWarnings({ "unchecked", "rawtypes" })
@Override
public Field<?> put(Field<?> key, Field<?> value) {
return FieldMapsForInsert.this.set((Field) key, (Field) value);
}
@Override
public Field<?> remove(Object key) {
List<Field<?>> list = values.get(key);
values.remove(key);
return list == null ? null : list.get(index);
}
@Override
public Set<Field<?>> keySet() {
return values.keySet();
}
final class EntrySet extends AbstractSet<Entry<Field<?>, Field<?>>> {
@Override
public final int size() {
return values.size();
}
@Override
public final void clear() {
values.clear();
}
@Override
public final Iterator<Entry<Field<?>, Field<?>>> iterator() {
return new Iterator<Entry<Field<?>, Field<?>>>() {
Iterator<Entry<Field<?>, List<Field<?>>>> delegate = values.entrySet().iterator();
@Override
public boolean hasNext() {
return delegate.hasNext();
}
@Override
public Entry<Field<?>, Field<?>> next() {
Entry<Field<?>, List<Field<?>>> entry = delegate.next();
return new SimpleImmutableEntry<Field<?>, Field<?>>(entry.getKey(), entry.getValue().get(index));
}
@Override
public void remove() {
delegate.remove();
}
};
}
}
};
}
final Map<Field<?>, Field<?>> lastMap() {
return map(rows - 1);
}
final boolean isExecutable() {
return !insertMaps.isEmpty() && insertMaps.get(0) != null;
return rows > 0;
}
public final FieldMapForInsert getMap() {
if (insertMaps.get(index()) == null)
insertMaps.set(index(), new FieldMapForInsert());
final void toSQLReferenceKeys(Context<?> ctx) {
return insertMaps.get(index());
}
// [#1506] with DEFAULT VALUES, we might not have any columns to render
if (!isExecutable())
return;
public final void newRecord() {
if (insertMaps.get(index()) != null)
insertMaps.add(null);
}
// [#2995] Do not generate empty column lists.
if (values.size() == 0)
return;
private final int index() {
return insertMaps.size() - 1;
// [#4629] Do not generate column lists for unknown columns
unknownFields: {
for (Field<?> field : values.keySet())
if (!(field instanceof UnknownField))
break unknownFields;
return;
}
boolean indent = (values.size() > 1);
ctx.sql(" (");
if (indent)
ctx.formatIndentStart();
// [#989] Avoid qualifying fields in INSERT field declaration
boolean qualify = ctx.qualify();
ctx.qualify(false);
String separator = "";
for (Field<?> field : values.keySet()) {
ctx.sql(separator);
if (indent)
ctx.formatNewLine();
ctx.visit(field);
separator = ", ";
}
ctx.qualify(qualify);
if (indent)
ctx.formatIndentEnd()
.formatNewLine();
ctx.sql(')');
}
}

View File

@ -121,8 +121,8 @@ final class InsertQueryImpl<R extends Record> extends AbstractStoreQuery<R> impl
}
@Override
protected final FieldMapForInsert getValues() {
return insertMaps.getMap();
protected final Map<Field<?>, Field<?>> getValues() {
return insertMaps.lastMap();
}
@Override
@ -195,13 +195,13 @@ final class InsertQueryImpl<R extends Record> extends AbstractStoreQuery<R> impl
@Override
public final void setSelect(Field<?>[] f, Select<?> s) {
insertMaps.getMap().putFields(Arrays.asList(f));
insertMaps.addFields(Arrays.asList(f));
select = s;
}
@Override
public final void addValues(Map<? extends Field<?>, ?> map) {
insertMaps.getMap().set(map);
insertMaps.set(map);
}
@Override
@ -424,10 +424,7 @@ final class InsertQueryImpl<R extends Record> extends AbstractStoreQuery<R> impl
.visit(table)
.declareTables(declareTables);
// [#1506] with DEFAULT VALUES, we might not have any columns to render
if (insertMaps.isExecutable())
insertMaps.insertMaps.get(0).toSQLReferenceKeys(ctx);
insertMaps.toSQLReferenceKeys(ctx);
ctx.end(INSERT_INSERT_INTO);
if (select != null) {
@ -435,7 +432,7 @@ final class InsertQueryImpl<R extends Record> extends AbstractStoreQuery<R> impl
// [#2995] Prevent the generation of wrapping parentheses around the
// INSERT .. SELECT statement's SELECT because they would be
// interpreted as the (missing) INSERT column list's parens.
if (insertMaps.insertMaps.get(0).size() == 0)
if (insertMaps.fields().size() == 0)
ctx.data(DATA_INSERT_SELECT_WITHOUT_INSERT_COLUMN_LIST, true);
@ -530,9 +527,9 @@ final class InsertQueryImpl<R extends Record> extends AbstractStoreQuery<R> impl
// re-used.
Select<Record> rows = null;
Name[] aliases = fieldNames(insertMaps.getMap().keySet().toArray(EMPTY_FIELD));
Name[] aliases = fieldNames(insertMaps.fields().toArray(EMPTY_FIELD));
for (FieldMapForInsert map : insertMaps.insertMaps) {
for (Map<Field<?>, Field<?>> map : insertMaps.maps()) {
Select<Record> row =
select(aliasedFields(map.values().toArray(EMPTY_FIELD), aliases))
.whereNotExists(
@ -549,7 +546,7 @@ final class InsertQueryImpl<R extends Record> extends AbstractStoreQuery<R> impl
return create(configuration)
.insertInto(table)
.columns(insertMaps.getMap().keySet())
.columns(insertMaps.fields())
.select(selectFrom(table(rows).as("t")));
}
else {
@ -562,7 +559,7 @@ final class InsertQueryImpl<R extends Record> extends AbstractStoreQuery<R> impl
MergeOnConditionStep<R> on =
create(configuration).mergeInto(table)
.usingDual()
.on(matchByPrimaryKey(insertMaps.getMap()));
.on(matchByPrimaryKey(insertMaps.lastMap()));
// [#1295] Use UPDATE clause only when with ON DUPLICATE KEY UPDATE,
// not with ON DUPLICATE KEY IGNORE
@ -572,8 +569,8 @@ final class InsertQueryImpl<R extends Record> extends AbstractStoreQuery<R> impl
.set(updateMap);
}
return notMatched.whenNotMatchedThenInsert(insertMaps.getMap().keySet())
.values(insertMaps.getMap().values());
return notMatched.whenNotMatchedThenInsert(insertMaps.fields())
.values(insertMaps.lastMap().values());
}
else {
throw new IllegalStateException("The ON DUPLICATE KEY IGNORE/UPDATE clause cannot be emulated when inserting into non-updatable tables : " + table);
@ -585,7 +582,7 @@ final class InsertQueryImpl<R extends Record> extends AbstractStoreQuery<R> impl
* updated primary key values.
*/
@SuppressWarnings("unchecked")
private final Condition matchByPrimaryKey(FieldMapForInsert map) {
private final Condition matchByPrimaryKey(Map<Field<?>, Field<?>> map) {
Condition result = null;
for (Field<?> f : table.getPrimaryKey().getFields()) {

View File

@ -716,6 +716,7 @@ final class LoaderImpl<R extends Record> implements
Object[] row = null;
BatchBindStep bind = null;
InsertQuery<R> insert = null;
boolean newRecord = false;
execution: {
rows: while (iterator.hasNext() && ((row = iterator.next()) != null)) {
@ -770,6 +771,11 @@ final class LoaderImpl<R extends Record> implements
if (insert == null)
insert = create.insertQuery(table);
if (newRecord) {
newRecord = false;
insert.newRecord();
}
for (int i = 0; i < row.length; i++)
if (i < fields.length && fields[i] != null)
addValue0(insert, fields[i], row[i]);
@ -790,7 +796,7 @@ final class LoaderImpl<R extends Record> implements
try {
if (bulk != BULK_NONE) {
if (bulk == BULK_ALL || processed % bulkAfter != 0) {
insert.newRecord();
newRecord = true;
continue rows;
}
}

View File

@ -240,7 +240,7 @@ implements
private boolean matchedClause;
private FieldMapForUpdate matchedUpdate;
private boolean notMatchedClause;
private FieldMapForInsert notMatchedInsert;
private FieldMapsForInsert notMatchedInsert;
// Objects for the UPSERT syntax (including H2 MERGE, HANA UPSERT, etc.)
private boolean upsertStyle;
@ -721,8 +721,8 @@ implements
getUpsertValues().addAll(Tools.fields(values, getUpsertFields().toArray(EMPTY_FIELD)));
}
else {
Field<?>[] fields = notMatchedInsert.keySet().toArray(EMPTY_FIELD);
notMatchedInsert.putValues(Tools.fields(values, fields));
Field<?>[] fields = notMatchedInsert.fields().toArray(EMPTY_FIELD);
notMatchedInsert.set(Tools.fields(values, fields));
}
return this;
@ -937,7 +937,7 @@ implements
matchedUpdate.put(field, nullSafe(value));
}
else if (notMatchedClause) {
notMatchedInsert.put(field, nullSafe(value));
notMatchedInsert.set(field, nullSafe(value));
}
else {
throw new IllegalStateException("Cannot call where() on the current state of the MERGE statement");
@ -1124,8 +1124,8 @@ implements
@Override
public final MergeImpl whenNotMatchedThenInsert(Collection<? extends Field<?>> fields) {
notMatchedClause = true;
notMatchedInsert = new FieldMapForInsert();
notMatchedInsert.putFields(fields);
notMatchedInsert = new FieldMapsForInsert();
notMatchedInsert.addFields(fields);
matchedClause = false;
return this;
@ -1524,9 +1524,9 @@ implements
notMatchedInsert.toSQLReferenceKeys(ctx);
ctx.formatSeparator()
.start(MERGE_VALUES)
.visit(K_VALUES).sql(' ')
.visit(notMatchedInsert)
.end(MERGE_VALUES);
.visit(K_VALUES).sql(' ');
notMatchedInsert.toSQL92Values(ctx);
ctx.end(MERGE_VALUES);
}
ctx.start(MERGE_WHERE);

View File

@ -43,6 +43,7 @@ import javax.sql.DataSource;
import org.jooq.Configuration;
import org.jooq.ConnectionProvider;
import org.jooq.ConverterProvider;
import org.jooq.DSLContext;
import org.jooq.ExecuteListener;
import org.jooq.ExecuteListenerProvider;
import org.jooq.ExecutorProvider;
@ -85,6 +86,11 @@ public class MockConfiguration implements Configuration {
this.provider = provider;
}
@Override
public DSLContext dsl() {
return delegate.dsl();
}
@Override
public Map<Object, Object> data() {
return delegate.data();