[jOOQ/jOOQ#14541] Add support for top level nested records in R2DBC

queries
This commit is contained in:
Lukas Eder 2023-01-23 18:05:42 +01:00
parent 10c809f8b2
commit cb5516023c
2 changed files with 220 additions and 177 deletions

View File

@ -77,14 +77,13 @@ import java.util.function.Supplier;
import org.jooq.Attachable;
import org.jooq.BindingGetResultSetContext;
import org.jooq.Converter;
import org.jooq.ContextConverter;
import org.jooq.ExecuteContext;
import org.jooq.ExecuteListener;
import org.jooq.Field;
// ...
import org.jooq.Record;
import org.jooq.Result;
import org.jooq.ContextConverter;
import org.jooq.exception.ControlFlowSignal;
import org.jooq.tools.JooqLogger;
import org.jooq.tools.jdbc.JDBC41ResultSet;
@ -95,28 +94,21 @@ import org.jooq.tools.jdbc.JDBCUtils;
*/
final class CursorImpl<R extends Record> extends AbstractCursor<R> {
private static final JooqLogger log = JooqLogger.getLogger(CursorImpl.class);
private static final JooqLogger log = JooqLogger.getLogger(CursorImpl.class);
final ExecuteContext ctx;
final ExecuteListener listener;
private final boolean[] intern;
private final boolean keepResultSet;
private final boolean keepStatement;
private final boolean autoclosing;
private final int maxRows;
private final Supplier<? extends R> factory;
private boolean isClosed;
final ExecuteContext ctx;
final ExecuteListener listener;
private final boolean[] intern;
private final boolean keepResultSet;
private final boolean keepStatement;
private final boolean autoclosing;
private final int maxRows;
private final Supplier<? extends R> factory;
private boolean isClosed;
private transient CursorResultSet rs;
private transient DefaultBindingGetResultSetContext<?> rsContext;
private transient Iterator<R> iterator;
private transient int rows;
private transient CursorResultSet rs;
private transient Iterator<R> iterator;
private transient int rows;
@SuppressWarnings("unchecked")
CursorImpl(ExecuteContext ctx, ExecuteListener listener, Field<?>[] fields, int[] internIndexes, boolean keepStatement, boolean keepResultSet) {
@ -132,11 +124,6 @@ final class CursorImpl<R extends Record> extends AbstractCursor<R> {
this.keepStatement = keepStatement;
this.keepResultSet = keepResultSet;
this.rs = new CursorResultSet();
this.rsContext = new DefaultBindingGetResultSetContext<>(ctx, rs, 0);
this.maxRows = maxRows;
this.autoclosing = autoclosing;
@ -163,59 +150,6 @@ final class CursorImpl<R extends Record> extends AbstractCursor<R> {
// XXX: Cursor API
// -------------------------------------------------------------------------
@Override
public final Iterator<R> iterator() {
if (iterator == null) {
@ -1410,7 +1344,12 @@ final class CursorImpl<R extends Record> extends AbstractCursor<R> {
/**
* [#11099] Cache this instance for the entire cursor.
*/
private final CursorRecordInitialiser initialiser = new CursorRecordInitialiser(fields, 0);
private final CursorRecordInitialiser initialiser = new CursorRecordInitialiser(
ctx, listener,
new DefaultBindingGetResultSetContext<>(ctx, rs, 0),
fields, 0,
intern
);
@SuppressWarnings("unchecked")
private final RecordDelegate<AbstractRecord> recordDelegate = Tools.newRecord(true, (Supplier<AbstractRecord>) factory, ((DefaultExecuteContext) ctx).originalConfiguration());
@ -1481,27 +1420,79 @@ final class CursorImpl<R extends Record> extends AbstractCursor<R> {
public final void remove() {
throw new UnsupportedOperationException();
}
}
private class CursorRecordInitialiser implements ThrowingFunction<AbstractRecord, AbstractRecord, SQLException> {
/**
* A utility to initialise records and transfer data in a
* {@link RecordDelegate}.
* <p>
* While {@link CursorImpl} is strictly for blocking execution on JDBC, this
* initialiser can also be used by the {@link R2DBC} implementation.
*/
static class CursorRecordInitialiser implements ThrowingFunction<AbstractRecord, AbstractRecord, SQLException> {
private final AbstractRow<?> initialiserFields;
private int offset;
private final ExecuteContext ctx;
private final ExecuteListener listener;
private final AbstractRow<?> initialiserFields;
private int offset;
private final boolean[] intern;
CursorRecordInitialiser(AbstractRow<?> initialiserFields, int offset) {
this.initialiserFields = initialiserFields;
this.offset = offset;
}
CursorRecordInitialiser reset() {
offset = 0;
return this;
}
@Override
public AbstractRecord apply(AbstractRecord record) throws SQLException {
ctx.record(record);
listener.recordStart(ctx);
int size = initialiserFields.size();
private final DefaultBindingGetResultSetContext<?> rsContext;
CursorRecordInitialiser(
ExecuteContext ctx,
ExecuteListener listener,
DefaultBindingGetResultSetContext<?> rsContext,
AbstractRow<?> initialiserFields,
int offset,
boolean[] intern
) {
this.ctx = ctx;
this.listener = listener;
this.rsContext = rsContext;
this.initialiserFields = initialiserFields;
this.offset = offset;
this.intern = intern;
}
CursorRecordInitialiser reset() {
offset = 0;
return this;
}
@Override
public AbstractRecord apply(AbstractRecord record) throws SQLException {
ctx.record(record);
listener.recordStart(ctx);
int size = initialiserFields.size();
@ -1522,75 +1513,136 @@ final class CursorImpl<R extends Record> extends AbstractCursor<R> {
for (int i = 0; i < size; i++)
setValue(record, initialiserFields.field(i), i);
for (int i = 0; i < size; i++)
setValue(record, initialiserFields.field(i), i);
if (intern != null)
for (int i = 0; i < intern.length; i++)
if (intern[i])
record.intern0(i);
if (intern != null)
for (int i = 0; i < intern.length; i++)
if (intern[i])
record.intern0(i);
ctx.record(record);
listener.recordEnd(ctx);
ctx.record(record);
listener.recordEnd(ctx);
return record;
}
return record;
}
/**
* Utility method to prevent unnecessary unchecked conversions
*/
@SuppressWarnings("unchecked")
private final <T> void setValue(AbstractRecord record, Field<T> field, int index) throws SQLException {
try {
T value;
AbstractRow<?> nested = null;
Class<? extends AbstractRecord> recordType = null;
/**
* Utility method to prevent unnecessary unchecked conversions
*/
@SuppressWarnings("unchecked")
private final <T> void setValue(AbstractRecord record, Field<T> field, int index) throws SQLException {
try {
T value;
AbstractRow<?> nested = null;
Class<? extends AbstractRecord> recordType = null;
// [#7100] TODO: This should be transparent to the CursorImpl
// RowField may have a Row[N].mapping(...) applied
Field<?> f = uncoerce(field);
// [#7100] TODO: This should be transparent to the CursorImpl
// RowField may have a Row[N].mapping(...) applied
Field<?> f = uncoerce(field);
// [#13560] Queries may decide themselves to replace the
// flattening emulation by the MULTISET emulation
if (f instanceof AbstractRowAsField
&& NO_NATIVE_SUPPORT.contains(ctx.dialect())
&& !TRUE.equals(ctx.data(DATA_MULTISET_CONTENT))) {
nested = ((AbstractRowAsField<?>) f).emulatedFields(configuration);
recordType = (Class<? extends AbstractRecord>) ((AbstractRowAsField<?>) f).getRecordType();
}
else if (f.getDataType().isEmbeddable()) {
nested = Tools.row0(embeddedFields(f));
recordType = embeddedRecordType(f);
}
int nestedOffset = offset + index;
if (nested != null) {
CursorRecordInitialiser operation = new CursorRecordInitialiser(nested, nestedOffset);
value = (T) Tools.newRecord(true, (Class<AbstractRecord>) recordType, (AbstractRow<AbstractRecord>) nested, ((DefaultExecuteContext) ctx).originalConfiguration())
.operate(operation);
// [#7100] TODO: Is there a more elegant way to do this?
if (f != field)
value = ((ContextConverter<Object, T>) field.getConverter()).from(value, ctx.converterContext());
offset += operation.offset - nestedOffset + nested.size() - 1;
}
else {
rsContext.index(nestedOffset + 1);
rsContext.field((Field) field);
field.getBinding().get((BindingGetResultSetContext<T>) rsContext);
value = (T) rsContext.value();
}
record.values[index] = value;
record.originals[index] = value;
// [#13560] Queries may decide themselves to replace the
// flattening emulation by the MULTISET emulation
if (f instanceof AbstractRowAsField
&& NO_NATIVE_SUPPORT.contains(ctx.dialect())
&& !TRUE.equals(ctx.data(DATA_MULTISET_CONTENT))) {
nested = ((AbstractRowAsField<?>) f).emulatedFields(ctx.configuration());
recordType = (Class<? extends AbstractRecord>) ((AbstractRowAsField<?>) f).getRecordType();
}
else if (f.getDataType().isEmbeddable()) {
nested = Tools.row0(embeddedFields(f));
recordType = embeddedRecordType(f);
}
// [#5901] Improved error logging, mostly useful when there are some data type conversion errors
catch (Exception e) {
throw new SQLException("Error while reading field: " + field + ", at JDBC index: " + (offset + index + 1), e);
int nestedOffset = offset + index;
if (nested != null) {
CursorRecordInitialiser operation = new CursorRecordInitialiser(
ctx, listener,
rsContext,
nested, nestedOffset,
intern
);
value = (T) Tools.newRecord(true, (Class<AbstractRecord>) recordType, (AbstractRow<AbstractRecord>) nested, ((DefaultExecuteContext) ctx).originalConfiguration())
.operate(operation);
// [#7100] TODO: Is there a more elegant way to do this?
if (f != field)
value = ((ContextConverter<Object, T>) field.getConverter()).from(value, ctx.converterContext());
offset += operation.offset - nestedOffset + nested.size() - 1;
}
else {
rsContext.index(nestedOffset + 1);
rsContext.field((Field) field);
field.getBinding().get((BindingGetResultSetContext<T>) rsContext);
value = (T) rsContext.value();
}
record.values[index] = value;
record.originals[index] = value;
}
// [#5901] Improved error logging, mostly useful when there are some data type conversion errors
catch (Exception e) {
throw new SQLException("Error while reading field: " + field + ", at JDBC index: " + (offset + index + 1), e);
}
}
}
}

View File

@ -334,7 +334,7 @@ final class R2DBC {
this.query = query;
}
@SuppressWarnings("unchecked")
@SuppressWarnings({ "unchecked", "rawtypes" })
@Override
public final void onNext(Result r) {
r.map((row, meta) -> {
@ -345,28 +345,19 @@ final class R2DBC {
// TODO: This call is duplicated from CursorImpl and related classes.
// Refactor this call to make sure code is re-used, especially when
// ExecuteListener lifecycle management is implemented
RecordDelegate<? extends AbstractRecord> delegate = Tools.newRecord(true, recordFactory((Class<AbstractRecord>) query.getRecordType(), (AbstractRow<AbstractRecord>) Tools.row0(fields)), query.configuration());
RecordDelegate<AbstractRecord> delegate = Tools.newRecord(true, recordFactory((Class<AbstractRecord>) query.getRecordType(), (AbstractRow<AbstractRecord>) Tools.row0(fields)), query.configuration());
return (R) delegate.operate(record -> {
// TODO: What data to pass here?
DefaultBindingGetResultSetContext<?> ctx = new DefaultBindingGetResultSetContext(
new SimpleExecuteContext(query.configuration(), query.configuration().data()),
new R2DBCResultSet(query.configuration(), row, meta),
0
);
// TODO: What data to pass here?
DefaultBindingGetResultSetContext<?> ctx = new DefaultBindingGetResultSetContext(
new SimpleExecuteContext(query.configuration(), query.configuration().data()),
new R2DBCResultSet(query.configuration(), row, meta),
0
);
// TODO: Make sure all the embeddable records, and other types of nested records are supported
for (int i = 0; i < fields.length; i++) {
ctx.index(i + 1);
ctx.field((Field) fields[i]);
fields[i].getBinding().get((BindingGetResultSetContext) ctx);
record.values[i] = ctx.value();
record.originals[i] = ctx.value();
}
return record;
});
return (R) delegate.operate(new CursorImpl.CursorRecordInitialiser(
new DefaultExecuteContext(query.configuration(), query), new DefaultExecuteListener(),
ctx, Tools.row0(fields), 0, new boolean[0]
));
}
catch (Throwable t) {
onError(t);