[jOOQ/jOOQ#2961] Add UpdatableRecord.merge()

This commit is contained in:
Lukas Eder 2020-04-06 13:30:35 +02:00
parent 4bf092bb2b
commit b131440bef
9 changed files with 184 additions and 34 deletions

View File

@ -79,7 +79,7 @@ import java.util.Map;
* @param <R> The record type of the table being inserted into
* @author Lukas Eder
*/
public interface InsertQuery<R extends Record> extends StoreQuery<R>, Insert<R> {
public interface InsertQuery<R extends Record> extends StoreQuery<R>, Insert<R>, ConditionProvider {
/**
* Adds a new Record to the insert statement for multi-record inserts
@ -260,6 +260,7 @@ public interface InsertQuery<R extends Record> extends StoreQuery<R>, Insert<R>
*
* @param condition The condition
*/
@Override
@Support({ CUBRID, DERBY, H2, MARIADB, POSTGRES, SQLITE })
void addConditions(Condition condition);
@ -272,6 +273,7 @@ public interface InsertQuery<R extends Record> extends StoreQuery<R>, Insert<R>
*
* @param conditions The condition
*/
@Override
@Support({ CUBRID, DERBY, H2, MARIADB, POSTGRES, SQLITE })
void addConditions(Condition... conditions);
@ -284,6 +286,7 @@ public interface InsertQuery<R extends Record> extends StoreQuery<R>, Insert<R>
*
* @param conditions The condition
*/
@Override
@Support({ CUBRID, DERBY, H2, MARIADB, POSTGRES, SQLITE })
void addConditions(Collection<? extends Condition> conditions);
@ -296,6 +299,7 @@ public interface InsertQuery<R extends Record> extends StoreQuery<R>, Insert<R>
*
* @param condition The condition
*/
@Override
@Support({ CUBRID, DERBY, H2, MARIADB, POSTGRES, SQLITE })
void addConditions(Operator operator, Condition condition);
@ -308,6 +312,7 @@ public interface InsertQuery<R extends Record> extends StoreQuery<R>, Insert<R>
*
* @param conditions The condition
*/
@Override
@Support({ CUBRID, DERBY, H2, MARIADB, POSTGRES, SQLITE })
void addConditions(Operator operator, Condition... conditions);
@ -320,6 +325,7 @@ public interface InsertQuery<R extends Record> extends StoreQuery<R>, Insert<R>
*
* @param conditions The condition
*/
@Override
@Support({ CUBRID, DERBY, H2, MARIADB, POSTGRES, SQLITE })
void addConditions(Operator operator, Collection<? extends Condition> conditions);

View File

@ -128,6 +128,26 @@ public interface RecordListener extends EventListener {
*/
void updateEnd(RecordContext ctx);
/**
* Called before merging an <code>UpdatableRecord</code>.
* <p>
* Implementations are allowed to modify {@link RecordContext#record()}
* prior to merging.
*
* @see UpdatableRecord#merge()
*/
void mergeStart(RecordContext ctx);
/**
* Called after merging an <code>UpdatableRecord</code>.
* <p>
* Implementations are allowed to modify {@link RecordContext#record()}
* after merging.
*
* @see UpdatableRecord#merge()
*/
void mergeEnd(RecordContext ctx);
/**
* Called before deleting an <code>UpdatableRecord</code>.
* <p>

View File

@ -37,6 +37,25 @@
*/
package org.jooq;
// ...
// ...
// ...
import static org.jooq.SQLDialect.CUBRID;
// ...
import static org.jooq.SQLDialect.DERBY;
import static org.jooq.SQLDialect.H2;
import static org.jooq.SQLDialect.HSQLDB;
// ...
import static org.jooq.SQLDialect.MARIADB;
// ...
import static org.jooq.SQLDialect.MYSQL;
// ...
import static org.jooq.SQLDialect.POSTGRES;
import static org.jooq.SQLDialect.SQLITE;
// ...
// ...
// ...
import java.sql.Statement;
import java.util.Collection;
@ -350,6 +369,62 @@ public interface UpdatableRecord<R extends UpdatableRecord<R>> extends TableReco
@Support
int update(Collection<? extends Field<?>> fields) throws DataAccessException, DataChangedException;
/**
* Store this record back to the database using a <code>MERGE</code>
* statement.
* <p>
* Unlike {@link #store()}, the statement produced by this operation does
* not depend on whether the record has been previously fetched from the
* database or created afresh. It implements the semantics of an
* <code>INSERT .. ON DUPLICATE KEY UPDATE</code> statement, which will
* update the row regardless of which (unique) key value is already present.
* See {@link InsertOnDuplicateStep#onDuplicateKeyUpdate()}.
* <p>
* Optimistic locking only works if the underlying dialect supports
* {@link InsertOnConflictWhereStep#where(Condition)}. Otherwise, the
* <code>UPDATE</code> path of the statement will not be able to update the
* row conditionally.
* <p>
* If you want to enforce statement execution, regardless if the values in
* this record were changed, you can explicitly set the changed flags for
* all values with {@link #changed(boolean)} or for single values with
* {@link #changed(Field, boolean)}, prior to insertion.
* <p>
* This is the same as calling <code>record.merge(record.fields())</code>
*
* @return <code>1</code> if the record was merged to the database. <code>0
* </code> if merging was not necessary.
* @throws DataAccessException if something went wrong executing the query
* @see #store()
* @see InsertOnDuplicateStep#onDuplicateKeyUpdate()
*/
@Support({ CUBRID, DERBY, H2, HSQLDB, MARIADB, MYSQL, POSTGRES, SQLITE })
int merge() throws DataAccessException;
/**
* Store parts of this record to the database using a <code>MERGE</code>
* statement.
*
* @return <code>1</code> if the record was merged to the database. <code>0
* </code> if merging was not necessary.
* @throws DataAccessException if something went wrong executing the query
* @see #merge()
*/
@Support({ CUBRID, DERBY, H2, HSQLDB, MARIADB, MYSQL, POSTGRES, SQLITE })
int merge(Field<?>... fields) throws DataAccessException;
/**
* Store parts of this record to the database using a <code>MERGE</code>
* statement.
*
* @return <code>1</code> if the record was merged to the database. <code>0
* </code> if merging was not necessary.
* @throws DataAccessException if something went wrong executing the query
* @see #merge()
*/
@Support({ CUBRID, DERBY, H2, HSQLDB, MARIADB, MYSQL, POSTGRES, SQLITE })
int merge(Collection<? extends Field<?>> fields) throws DataAccessException;
/**
* Deletes this record from the database, based on the value of the primary
* key or main unique key.

View File

@ -68,6 +68,12 @@ public class DefaultRecordListener implements RecordListener {
@Override
public void updateEnd(RecordContext ctx) {}
@Override
public void mergeStart(RecordContext ctx) {}
@Override
public void mergeEnd(RecordContext ctx) {}
@Override
public void deleteStart(RecordContext ctx) {}

View File

@ -603,7 +603,6 @@ class DefaultRenderContext extends AbstractContext<RenderContext> implements Ren
return visit(part);
}
@SuppressWarnings("deprecation")
@Override
protected final void visit0(QueryPartInternal internal) {
int before = bindValues.size();

View File

@ -114,6 +114,7 @@ final class RecordDelegate<R extends Record> {
case STORE: listener.storeStart(ctx); break;
case INSERT: listener.insertStart(ctx); break;
case UPDATE: listener.updateStart(ctx); break;
case MERGE: listener.mergeStart(ctx); break;
case DELETE: listener.deleteStart(ctx); break;
default:
throw new IllegalStateException("Type not supported: " + type);
@ -158,6 +159,7 @@ final class RecordDelegate<R extends Record> {
case STORE: listener.storeEnd(ctx); break;
case INSERT: listener.insertEnd(ctx); break;
case UPDATE: listener.updateEnd(ctx); break;
case MERGE: listener.mergeEnd(ctx); break;
case DELETE: listener.deleteEnd(ctx); break;
default:
throw new IllegalStateException("Type not supported: " + type);
@ -181,6 +183,7 @@ final class RecordDelegate<R extends Record> {
STORE,
INSERT,
UPDATE,
MERGE,
DELETE
}
}

View File

@ -185,7 +185,7 @@ public class TableRecordImpl<R extends TableRecord<R>> extends AbstractRecord im
final int storeInsert0(Field<?>[] storeFields) {
DSLContext create = create();
InsertQuery<R> insert = create.insertQuery(getTable());
addChangedValues(storeFields, insert);
addChangedValues(storeFields, insert, false);
// Don't store records if no value was set by client code
if (!insert.isExecutable()) {
@ -196,8 +196,8 @@ public class TableRecordImpl<R extends TableRecord<R>> extends AbstractRecord im
}
// [#1596] Set timestamp and/or version columns to appropriate values
BigInteger version = addRecordVersion(insert);
Timestamp timestamp = addRecordTimestamp(insert);
BigInteger version = addRecordVersion(insert, false);
Timestamp timestamp = addRecordTimestamp(insert, false);
// [#814] Refresh identity and/or main unique key values
// [#1002] Consider also identity columns of non-updatable records
@ -292,34 +292,37 @@ public class TableRecordImpl<R extends TableRecord<R>> extends AbstractRecord im
}
/**
* Set all changed values of this record to a store query
* Set all changed values of this record to a store query.
*/
final void addChangedValues(Field<?>[] storeFields, StoreQuery<R> query) {
final void addChangedValues(Field<?>[] storeFields, StoreQuery<R> query, boolean forUpdate) {
Fields<Record> f = new Fields<>(storeFields);
for (Field<?> field : fields.fields.fields)
if (changed(field) && f.field(field) != null)
addValue(query, field);
addValue(query, field, forUpdate);
}
/**
* Extracted method to ensure generic type safety.
*/
final <T> void addValue(StoreQuery<?> store, Field<T> field, Object value) {
final <T> void addValue(StoreQuery<?> store, Field<T> field, Object value, boolean forUpdate) {
store.addValue(field, Tools.field(value, field));
if (forUpdate)
((InsertQuery<?>) store).addValueForUpdate(field, Tools.field(value, field));
}
/**
* Extracted method to ensure generic type safety.
*/
final <T> void addValue(StoreQuery<?> store, Field<T> field) {
addValue(store, field, get(field));
final <T> void addValue(StoreQuery<?> store, Field<T> field, boolean forUpdate) {
addValue(store, field, get(field), forUpdate);
}
/**
* Set an updated timestamp value to a store query
*/
final Timestamp addRecordTimestamp(StoreQuery<?> store) {
final Timestamp addRecordTimestamp(StoreQuery<?> store, boolean forUpdate) {
Timestamp result = null;
TableField<R, ?> timestamp = getTable().getRecordTimestamp();
@ -334,7 +337,7 @@ public class TableRecordImpl<R extends TableRecord<R>> extends AbstractRecord im
// [#9933] Truncate timestamp to column precision, if needed
addValue(store, timestamp, result = truncate(result, timestamp.getDataType()));
addValue(store, timestamp, result = truncate(result, timestamp.getDataType()), forUpdate);
}
return result;
@ -354,7 +357,7 @@ public class TableRecordImpl<R extends TableRecord<R>> extends AbstractRecord im
/**
* Set an updated version value to a store query
*/
final BigInteger addRecordVersion(StoreQuery<?> store) {
final BigInteger addRecordVersion(StoreQuery<?> store, boolean forUpdate) {
BigInteger result = null;
TableField<R, ?> version = getTable().getRecordVersion();
@ -367,7 +370,7 @@ public class TableRecordImpl<R extends TableRecord<R>> extends AbstractRecord im
else
result = new BigInteger(value.toString()).add(BigInteger.ONE);
addValue(store, version, result);
addValue(store, version, result, forUpdate);
}
return result;

View File

@ -3144,7 +3144,6 @@ final class Tools {
/**
* Add primary key conditions to a query
*/
@SuppressWarnings("deprecation")
static final void addConditions(org.jooq.ConditionProvider query, Record record, Field<?>... keys) {
for (Field<?> field : keys)
addCondition(query, record, field);
@ -3153,7 +3152,6 @@ final class Tools {
/**
* Add a field condition to a query
*/
@SuppressWarnings("deprecation")
static final <T> void addCondition(org.jooq.ConditionProvider provider, Record record, Field<T> field) {
// [#2764] If primary keys are allowed to be changed, the

View File

@ -43,6 +43,7 @@ import static org.jooq.SQLDialect.SQLITE;
import static org.jooq.conf.SettingsTools.updatablePrimaryKeys;
import static org.jooq.impl.RecordDelegate.delegate;
import static org.jooq.impl.RecordDelegate.RecordLifecycleType.DELETE;
import static org.jooq.impl.RecordDelegate.RecordLifecycleType.MERGE;
import static org.jooq.impl.RecordDelegate.RecordLifecycleType.REFRESH;
import static org.jooq.impl.RecordDelegate.RecordLifecycleType.STORE;
import static org.jooq.impl.RecordDelegate.RecordLifecycleType.UPDATE;
@ -59,16 +60,17 @@ import org.jooq.Configuration;
import org.jooq.DeleteQuery;
import org.jooq.Field;
import org.jooq.ForeignKey;
import org.jooq.InsertQuery;
import org.jooq.Record;
import org.jooq.Result;
import org.jooq.SQLDialect;
import org.jooq.SelectQuery;
import org.jooq.StoreQuery;
import org.jooq.Table;
import org.jooq.TableField;
import org.jooq.TableRecord;
import org.jooq.UniqueKey;
import org.jooq.UpdatableRecord;
import org.jooq.UpdateQuery;
import org.jooq.exception.DataChangedException;
import org.jooq.exception.NoDataFoundException;
import org.jooq.tools.JooqLogger;
@ -160,6 +162,21 @@ public class UpdatableRecordImpl<R extends UpdatableRecord<R>> extends TableReco
return update(storeFields.toArray(EMPTY_FIELD));
}
@Override
public final int merge() {
return merge(fields.fields.fields);
}
@Override
public int merge(Field<?>... storeFields) {
return storeMerge(storeFields, getPrimaryKey().getFieldsArray());
}
@Override
public final int merge(Collection<? extends Field<?>> storeFields) {
return merge(storeFields.toArray(EMPTY_FIELD));
}
private final int store0(Field<?>[] storeFields) {
TableField<R, ?>[] keys = getPrimaryKey().getFieldsArray();
boolean executeUpdate = false;
@ -188,12 +205,10 @@ public class UpdatableRecordImpl<R extends UpdatableRecord<R>> extends TableReco
int result = 0;
if (executeUpdate) {
if (executeUpdate)
result = storeUpdate(storeFields, keys);
}
else {
else
result = storeInsert(storeFields);
}
return result;
}
@ -212,32 +227,57 @@ public class UpdatableRecordImpl<R extends UpdatableRecord<R>> extends TableReco
});
return result[0];
}
private final int storeUpdate0(Field<?>[] storeFields, TableField<R, ?>[] keys) {
UpdateQuery<R> update = create().updateQuery(getTable());
addChangedValues(storeFields, update);
Tools.addConditions(update, this, keys);
return storeMergeOrUpdate0(storeFields, keys, create().updateQuery(getTable()), false);
}
private final int storeMerge(final Field<?>[] storeFields, final TableField<R, ?>[] keys) {
final int[] result = new int[1];
delegate(configuration(), (Record) this, MERGE)
.operate(new RecordOperation<Record, RuntimeException>() {
@Override
public Record operate(Record record) throws RuntimeException {
result[0] = storeMerge0(storeFields, keys);
return record;
}
});
// MySQL returns 0 when nothing was updated, 1 when something was inserted, and 2 if something was updated
return Math.min(result[0], 1);
}
private final int storeMerge0(Field<?>[] storeFields, TableField<R, ?>[] keys) {
InsertQuery<R> merge = create().insertQuery(getTable());
merge.onDuplicateKeyUpdate(true);
return storeMergeOrUpdate0(storeFields, keys, merge, true);
}
private final <Q extends StoreQuery<R> & org.jooq.ConditionProvider> int storeMergeOrUpdate0(Field<?>[] storeFields, TableField<R, ?>[] keys, Q query, boolean merge) {
addChangedValues(storeFields, query, merge);
Tools.addConditions(query, this, keys);
// Don't store records if no value was set by client code
if (!update.isExecutable()) {
if (!query.isExecutable()) {
if (log.isDebugEnabled())
log.debug("Query is not executable", update);
log.debug("Query is not executable", query);
return 0;
}
// [#1596] Set timestamp and/or version columns to appropriate values
// [#8924] Allow for overriding this using a setting
BigInteger version = addRecordVersion(update);
Timestamp timestamp = addRecordTimestamp(update);
BigInteger version = addRecordVersion(query, merge);
Timestamp timestamp = addRecordTimestamp(query, merge);
if (isExecuteWithOptimisticLocking())
// [#1596] Add additional conditions for version and/or timestamp columns
if (isTimestampOrVersionAvailable())
addConditionForVersionAndTimestamp(update);
addConditionForVersionAndTimestamp(query);
// [#1547] Try fetching the Record again first, and compare this
// Record's original values with the ones in the database
@ -247,8 +287,8 @@ public class UpdatableRecordImpl<R extends UpdatableRecord<R>> extends TableReco
// [#1596] Check if the record was really changed in the database
// [#1859] Specify the returning clause if needed
Collection<Field<?>> key = setReturningIfNeeded(update);
int result = update.execute();
Collection<Field<?>> key = setReturningIfNeeded(query);
int result = query.execute();
checkIfChanged(result, version, timestamp);
if (result > 0) {
@ -256,7 +296,7 @@ public class UpdatableRecordImpl<R extends UpdatableRecord<R>> extends TableReco
changed(storeField, false);
// [#1859] If an update was successful try fetching the generated
getReturningIfNeeded(update, key);
getReturningIfNeeded(query, key);
}
return result;