[jOOQ/jOOQ#17088] Fix R2DBC support for TIMESTAMP WITH TIME ZONE types

This commit is contained in:
Lukas Eder 2024-08-16 14:56:38 +02:00
parent 8aefba19c1
commit 79e60a7369
4 changed files with 108 additions and 24 deletions

View File

@ -62,7 +62,7 @@ abstract class AbstractRowCountQuery extends AbstractQuery<Record> implements Ro
ConnectionFactory cf = configuration().connectionFactory();
if (!(cf instanceof NoConnectionFactory))
subscriber.onSubscribe(new QuerySubscription<>(this, subscriber, (t, u) -> new RowCountSubscriber(u)));
subscriber.onSubscribe(new QuerySubscription<>(this, subscriber, (t, u, s) -> new RowCountSubscriber(u, s)));
else
subscriber.onSubscribe(new BlockingRowCountSubscription(this, subscriber));
}

View File

@ -872,10 +872,30 @@ public class DefaultBinding<T, U> implements Binding<T, U> {
}
if (dataType.getType() == OffsetDateTime.class ||
dataType.getType() == OffsetTime.class ||
dataType.getType() == Instant.class
) {
switch (ctx.family()) {
case TRINO:
return true;
}
}
if (dataType.getType() == OffsetTime.class) {
switch (ctx.family()) {
case TRINO:
return true;
}
@ -3610,6 +3630,29 @@ public class DefaultBinding<T, U> implements Binding<T, U> {
}
}
@Override
final void sqlBind0(BindingSQLContext<U> ctx, OffsetDateTime value) throws SQLException {
switch (ctx.family()) {
// [#17088] The R2DBC H2 driver binds strings as CLOB, which cannot be converted to TIMESTAMPTZ
case H2:
if (isR2dbc(ctx)) {
Cast.renderCast(ctx.render(),
c -> super.sqlBind0(ctx, value),
c -> c.sql(VARCHAR.getCastTypeName(c.configuration()))
);
}
else
super.sqlBind0(ctx, value);
break;
default:
super.sqlBind0(ctx, value);
break;
}
}
@Override
final void set0(BindingSetStatementContext<U> ctx, OffsetDateTime value) throws SQLException {
SQLDialect family = ctx.family();
@ -3622,6 +3665,7 @@ public class DefaultBinding<T, U> implements Binding<T, U> {
else
ctx.statement().setString(ctx.index(), format(value, family));
}
@ -3651,6 +3695,10 @@ public class DefaultBinding<T, U> implements Binding<T, U> {
else
return OffsetDateTimeParser.offsetDateTime(ctx.resultSet().getString(ctx.index()));
}
@ -3895,7 +3943,7 @@ public class DefaultBinding<T, U> implements Binding<T, U> {
(BiFunction<Instant, ConverterContext, OffsetDateTime> & Serializable) (i, x) -> OffsetDateTime.ofInstant(i, ZoneOffset.UTC)
);
private final DefaultOffsetDateTimeBinding<U> delegate;
private final DefaultOffsetDateTimeBinding<U> delegate;
DefaultInstantBinding(DataType<Instant> dataType, Converter<Instant, U> converter) {
super(dataType, converter);
@ -3913,6 +3961,11 @@ public class DefaultBinding<T, U> implements Binding<T, U> {
delegate.sqlInline0(ctx, CONVERTER.to(value, ctx.converterContext()));
}
@Override
final void sqlBind0(BindingSQLContext<U> ctx, Instant value) throws SQLException {
delegate.sqlBind0(ctx, CONVERTER.to(value, ctx.converterContext()));
}
@Override
final void set0(BindingSetStatementContext<U> ctx, Instant value) throws SQLException {
delegate.set0(ctx, CONVERTER.to(value, ctx.converterContext()));

View File

@ -37,6 +37,7 @@
*/
package org.jooq.impl;
import static java.lang.Boolean.TRUE;
import static org.jooq.ContextConverter.scoped;
import static org.jooq.SQLDialect.MARIADB;
import static org.jooq.SQLDialect.MYSQL;
@ -51,6 +52,7 @@ import static org.jooq.impl.Tools.fields;
import static org.jooq.impl.Tools.recordFactory;
import static org.jooq.impl.Tools.translate;
import static org.jooq.impl.Tools.visitAll;
import static org.jooq.impl.Tools.BooleanDataKey.DATA_RENDER_FOR_R2DBC;
import static org.jooq.tools.StringUtils.defaultIfNull;
import static org.jooq.tools.jdbc.JDBCUtils.safeClose;
@ -91,6 +93,7 @@ import org.jooq.Cursor;
import org.jooq.DSLContext;
import org.jooq.DataType;
import org.jooq.Field;
import org.jooq.Function3;
import org.jooq.JSON;
import org.jooq.JSONB;
import org.jooq.Param;
@ -98,6 +101,7 @@ import org.jooq.Param;
import org.jooq.Query;
import org.jooq.Record;
import org.jooq.SQLDialect;
import org.jooq.Scope;
import org.jooq.TransactionalPublishable;
import org.jooq.XML;
import org.jooq.conf.Settings;
@ -268,11 +272,16 @@ final class R2DBC {
final AbstractNonBlockingSubscription<? super T> downstream;
final AtomicBoolean completed;
final AtomicBoolean completionRequested;
final R2DBCPreparedStatement statement;
AbstractResultSubscriber(AbstractNonBlockingSubscription<? super T> downstream) {
AbstractResultSubscriber(
AbstractNonBlockingSubscription<? super T> downstream,
R2DBCPreparedStatement statement
) {
this.downstream = downstream;
this.completed = new AtomicBoolean();
this.completionRequested = new AtomicBoolean();
this.statement = statement;
}
@Override
@ -302,8 +311,11 @@ final class R2DBC {
}
static final class RowCountSubscriber extends AbstractResultSubscriber<Integer> {
RowCountSubscriber(AbstractNonBlockingSubscription<? super Integer> downstream) {
super(downstream);
RowCountSubscriber(
AbstractNonBlockingSubscription<? super Integer> downstream,
R2DBCPreparedStatement statement
) {
super(downstream, statement);
}
@SuppressWarnings({ "unchecked", "rawtypes" })
@ -333,8 +345,12 @@ final class R2DBC {
final Q query;
ResultSubscriber(Q query, AbstractNonBlockingSubscription<? super R> downstream) {
super(downstream);
ResultSubscriber(
Q query,
AbstractNonBlockingSubscription<? super R> downstream,
R2DBCPreparedStatement statement
) {
super(downstream, statement);
this.query = query;
}
@ -355,7 +371,7 @@ final class R2DBC {
// TODO: What data to pass here?
DefaultBindingGetResultSetContext<?> ctx = new DefaultBindingGetResultSetContext(
new SimpleExecuteContext(query.configuration(), query.configuration().data()),
new R2DBCResultSet(query.configuration(), row, meta),
new R2DBCResultSet(query.configuration(), statement, row, meta),
0
);
@ -406,15 +422,15 @@ final class R2DBC {
static final class QueryExecutionSubscriber<T, Q extends Query> extends ConnectionSubscriber<T> {
final Q query;
final Configuration configuration;
final BiFunction<Q, AbstractNonBlockingSubscription<T>, Subscriber<Result>> resultSubscriber;
volatile String sql;
final Q query;
final Configuration configuration;
final Function3<Q, AbstractNonBlockingSubscription<T>, R2DBCPreparedStatement, Subscriber<Result>> resultSubscriber;
volatile String sql;
QueryExecutionSubscriber(
Q query,
QuerySubscription<T, Q> downstream,
BiFunction<Q, AbstractNonBlockingSubscription<T>, Subscriber<Result>> resultSubscriber
Function3<Q, AbstractNonBlockingSubscription<T>, R2DBCPreparedStatement, Subscriber<Result>> resultSubscriber
) {
super(downstream);
@ -429,7 +445,8 @@ final class R2DBC {
if (query.isExecutable()) {
Rendered rendered = rendered(configuration, query);
Statement stmt = c.createStatement(sql = rendered.sql);
new DefaultBindContext(configuration, null, new R2DBCPreparedStatement(configuration, stmt)).visit(rendered.bindValues);
R2DBCPreparedStatement s = new R2DBCPreparedStatement(configuration, stmt);
new DefaultBindContext(configuration, null, s).visit(rendered.bindValues);
// TODO: Reuse org.jooq.impl.Tools.setFetchSize(ExecuteContext ctx, int fetchSize)
AbstractResultQuery<?> q1 = abstractResultQuery(query);
@ -453,13 +470,13 @@ final class R2DBC {
&& !q2.nativeSupportReturningOrDataChangeDeltaTable(configuration.dsl()))
stmt.returnGeneratedValues(Tools.map(q2.returningResolvedAsterisks, Field::getName, String[]::new));
stmt.execute().subscribe(resultSubscriber.apply(query, downstream));
stmt.execute().subscribe(resultSubscriber.apply(query, downstream, s));
}
else {
if (log.isDebugEnabled())
log.debug("Query is not executable", query);
Subscriber<Result> s = resultSubscriber.apply(query, downstream);
Subscriber<Result> s = resultSubscriber.apply(query, downstream, null);
s.onSubscribe(new NoOpSubscription(s));
}
}
@ -505,7 +522,7 @@ final class R2DBC {
for (int i = 0; i < batch.queries.length; i++)
b = b.add(DSL.using(batch.configuration).renderInlined(batch.queries[i]));
b.execute().subscribe(new RowCountSubscriber(downstream));
b.execute().subscribe(new RowCountSubscriber(downstream, null));
}
// [#13343] Cancel the downstream in case of a rendering bug in jOOQ
@ -535,6 +552,7 @@ final class R2DBC {
batch.checkBindValues();
Rendered rendered = rendered(batch.configuration, batch.query);
Statement stmt = c.createStatement(rendered.sql);
R2DBCPreparedStatement s = new R2DBCPreparedStatement(batch.query.configuration(), stmt);
Param<?>[] params = rendered.bindValues.toArray(EMPTY_PARAM);
boolean first = true;
@ -546,19 +564,19 @@ final class R2DBC {
if (first)
first = false;
else
stmt = stmt.add();
s = new R2DBCPreparedStatement(batch.query.configuration(), stmt = stmt.add());
// [#1371] [#2139] Don't bind variables directly onto statement, bind them through the collected params
// list to preserve type information
// [#3547] The original query may have no Params specified - e.g. when it was constructed with
// plain SQL. In that case, infer the bind value type directly from the bind value
visitAll(new DefaultBindContext(batch.configuration, null, new R2DBCPreparedStatement(batch.query.configuration(), stmt)),
visitAll(new DefaultBindContext(batch.configuration, null, s),
(params.length > 0)
? fields(bindValues, params)
: fields(bindValues));
}
stmt.execute().subscribe(new RowCountSubscriber(downstream));
stmt.execute().subscribe(new RowCountSubscriber(downstream, s));
}
// [#13343] Cancel the downstream in case of a rendering bug in jOOQ
@ -680,7 +698,7 @@ final class R2DBC {
QuerySubscription(
Q query,
Subscriber<? super T> subscriber,
BiFunction<Q, AbstractNonBlockingSubscription<T>, Subscriber<Result>> resultSubscriber
Function3<Q, AbstractNonBlockingSubscription<T>, R2DBCPreparedStatement, Subscriber<Result>> resultSubscriber
) {
super(query.configuration(), subscriber);
@ -804,6 +822,9 @@ final class R2DBC {
setParamType(configuration.dialect(), s)
), null);
// [#17088] Some rendering decisions may be made based on whether we're using R2DBC
render.data(DATA_RENDER_FOR_R2DBC, true);
return new Rendered(
render.paramType(render.settings().getParamType()).visit(query).render(),
render.bindValues(),
@ -1181,8 +1202,8 @@ final class R2DBC {
final RowMetadata m;
boolean wasNull;
R2DBCResultSet(Configuration c, Row r, RowMetadata m) {
super(null, null, () -> new SQLFeatureNotSupportedException("Unsupported operation of the JDBC to R2DBC bridge."));
R2DBCResultSet(Configuration c, R2DBCPreparedStatement s, Row r, RowMetadata m) {
super(null, s, () -> new SQLFeatureNotSupportedException("Unsupported operation of the JDBC to R2DBC bridge."));
this.c = c;
this.r = new DefaultRow(c, r);
@ -1653,6 +1674,10 @@ final class R2DBC {
return statement instanceof R2DBCPreparedStatement;
}
static final boolean isR2dbc(Scope ctx) {
return TRUE.equals(ctx.data(DATA_RENDER_FOR_R2DBC));
}
/**
* [#13343] Prevent debug rendering errors from influencing control flow.
*/

View File

@ -688,6 +688,12 @@ final class Tools {
* not the query itself.
*/
DATA_RENDER_IMPLICIT_JOIN,
/**
* [#17088] Tell the {@link RenderContext} that we're rendering for
* R2DBC.
*/
DATA_RENDER_FOR_R2DBC
;
private final boolean resetInSubqueryScope;