[jOOQ/jOOQ#11700] Add a R2DBCPreparedStatement proxy

In order to re-use existing variable binding logic (and eventually, to
offer supporing org.jooq.Binding transparently), we'll implement an
R2DBCPreparedStatement proxy that wraps the R2DBC Statement in a JDBC
PreparedStatement and can thus use the existing lifecycle methods.
This commit is contained in:
Lukas Eder 2021-03-24 13:39:50 +01:00
parent 87d356de41
commit a49950b831
2 changed files with 340 additions and 30 deletions

View File

@ -205,7 +205,7 @@ final class ParsingConnectionFactory implements ConnectionFactory {
@Override
public Statement createStatement(String sql) {
return new ParsingR2DBCStatement(this, sql);
return new ParsingR2DBCStatement(delegate, sql);
}
}

View File

@ -37,27 +37,47 @@
*/
package org.jooq.impl;
import static org.jooq.impl.DefaultDataType.getDataType;
import static org.jooq.conf.ParamType.NAMED;
import static org.jooq.impl.Tools.recordFactory;
import static org.jooq.tools.StringUtils.defaultIfNull;
import java.io.InputStream;
import java.io.Reader;
import java.math.BigDecimal;
import java.net.URL;
import java.sql.Array;
import java.sql.Blob;
import java.sql.Clob;
import java.sql.Date;
import java.sql.NClob;
import java.sql.Ref;
import java.sql.ResultSetMetaData;
import java.sql.RowId;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.sql.SQLType;
import java.sql.SQLXML;
import java.sql.Time;
import java.sql.Timestamp;
import java.sql.Types;
import java.sql.Wrapper;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.ArrayDeque;
import java.util.Calendar;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.Supplier;
import org.jooq.Configuration;
import org.jooq.Cursor;
import org.jooq.DataType;
import org.jooq.Field;
import org.jooq.Param;
import org.jooq.Record;
import org.jooq.conf.ParamType;
import org.jooq.conf.SettingsTools;
import org.jooq.impl.DefaultRenderContext.Rendered;
import org.jooq.tools.jdbc.DefaultPreparedStatement;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
@ -80,18 +100,18 @@ final class R2DBC {
final ResultSubscriber<R> upstream;
Subscription subscription;
public RowSubscriber(ResultSubscriber<R> s) {
RowSubscriber(ResultSubscriber<R> s) {
this.upstream = s;
}
@Override
public void onSubscribe(Subscription s) {
public final void onSubscribe(Subscription s) {
this.subscription = s;
subscription.request(upstream.upstream.upstream.requested.getAndSet(0L));
}
@Override
public void onNext(R record) {
public final void onNext(R record) {
upstream.upstream.upstream.subscriber.onNext(record);
long requested = upstream.upstream.upstream.requested.getAndSet(0L);
@ -100,12 +120,12 @@ final class R2DBC {
}
@Override
public void onError(Throwable t) {
public final void onError(Throwable t) {
upstream.onError(t);
}
@Override
public void onComplete() {
public final void onComplete() {
upstream.upstream.upstream.subscriber.onComplete();
}
}
@ -121,13 +141,13 @@ final class R2DBC {
}
@Override
public void onSubscribe(Subscription s) {
public final void onSubscribe(Subscription s) {
s.request(1);
}
@SuppressWarnings("unchecked")
@Override
public void onNext(io.r2dbc.spi.Result r) {
public final void onNext(io.r2dbc.spi.Result r) {
r.map((row, m) -> {
try {
// TODO: Cache this getFields() call
@ -161,12 +181,12 @@ final class R2DBC {
}
@Override
public void onError(Throwable t) {
public final void onError(Throwable t) {
upstream.onError(t);
}
@Override
public void onComplete() {}
public final void onComplete() {}
}
static final class ConnectionSubscriber<R extends Record> implements Subscriber<Connection> {
@ -180,26 +200,27 @@ final class R2DBC {
}
@Override
public void onSubscribe(Subscription s) {
public final void onSubscribe(Subscription s) {
s.request(1);
}
@Override
public void onNext(Connection c) {
public final void onNext(Connection c) {
try {
DefaultRenderContext render = new DefaultRenderContext(query.configuration().derive(
SettingsTools.clone(query.configuration().settings()).withParamType(ParamType.NAMED).withRenderNamedParamPrefix("$")
SettingsTools.clone(query.configuration().settings()).withParamType(NAMED).withRenderNamedParamPrefix("$")
));
Rendered r = new Rendered(render.visit(query).render(), render.bindValues(), render.skipUpdateCounts());
Rendered r = new Rendered(render.paramType(NAMED).visit(query).render(), render.bindValues(), render.skipUpdateCounts());
Statement stmt = c.createStatement(r.sql);
int i = 0;
for (Param<?> p : r.bindValues)
if (p.getValue() == null)
stmt.bindNull(i++, p.getType());
else
stmt.bind(i++, p.getValue());
new DefaultBindContext(query.configuration(), new R2DBCPreparedStatement(query.configuration(), stmt)).visit(r.bindValues);
// ;
// int i = 0;
// for (Param<?> p : r.bindValues)
// if (p.getValue() == null)
// stmt.bindNull(i++, p.getType());
// else
// stmt.bind(i++, p.getValue());
stmt.execute().subscribe(new ResultSubscriber<>(query, this));
}
@ -211,12 +232,12 @@ final class R2DBC {
}
@Override
public void onError(Throwable t) {
public final void onError(Throwable t) {
upstream.subscriber.onError(t);
}
@Override
public void onComplete() {}
public final void onComplete() {}
}
static final class RecordSubscription<R extends Record> implements Subscription {
@ -233,7 +254,7 @@ final class R2DBC {
}
@Override
public void request(long n) {
public final void request(long n) {
requested.getAndAdd(n);
if (connection == null) {
@ -243,7 +264,7 @@ final class R2DBC {
}
@Override
public void cancel() {
public final void cancel() {
subscriber.onComplete();
}
}
@ -260,7 +281,7 @@ final class R2DBC {
}
@Override
public void request(long n) {
public final void request(long n) {
int i = (int) Math.min(n, Integer.MAX_VALUE);
try {
@ -298,7 +319,7 @@ final class R2DBC {
}
@Override
public void cancel() {
public final void cancel() {
close();
}
}
@ -320,6 +341,295 @@ final class R2DBC {
}
}
static final class R2DBCPreparedStatement extends DefaultPreparedStatement {
final Configuration c;
final Statement s;
R2DBCPreparedStatement(Configuration c, Statement s) {
// TODO: Refactor super class to throw a custom exception if trying to dereference this null pointer.
super(null);
this.c = c;
this.s = s;
}
private final <T> void bindNull(int parameterIndex, Class<T> type) {
s.bindNull(parameterIndex - 1, type);
}
private final <T> void bindNullable(int parameterIndex, T x, Class<T> type) {
bindNullable(parameterIndex, x, type, t -> t);
}
private final <T, U> void bindNullable(int parameterIndex, T x, Class<U> type, Function<? super T, ? extends U> conversion) {
if (x == null)
s.bindNull(parameterIndex - 1, type);
else
bindNonNull(parameterIndex - 1, conversion.apply(x));
}
private final void bindNonNull(int parameterIndex, Object x) {
s.bind(parameterIndex - 1, x);
}
private final Class<?> type(int sqlType) {
return DefaultDataType.getDataType(c.family(), sqlType).getType();
}
@Override
public final void setNull(int parameterIndex, int sqlType) throws SQLException {
bindNull(parameterIndex, type(sqlType));
}
@Override
public final void setNull(int parameterIndex, int sqlType, String typeName) throws SQLException {
bindNull(parameterIndex, type(sqlType));
}
@Override
public final void setBoolean(int parameterIndex, boolean x) throws SQLException {
bindNonNull(parameterIndex, x);
}
@Override
public final void setByte(int parameterIndex, byte x) throws SQLException {
bindNonNull(parameterIndex, x);
}
@Override
public final void setShort(int parameterIndex, short x) throws SQLException {
bindNonNull(parameterIndex, x);
}
@Override
public final void setInt(int parameterIndex, int x) throws SQLException {
bindNonNull(parameterIndex, x);
}
@Override
public final void setLong(int parameterIndex, long x) throws SQLException {
bindNonNull(parameterIndex, x);
}
@Override
public final void setFloat(int parameterIndex, float x) throws SQLException {
bindNonNull(parameterIndex, x);
}
@Override
public final void setDouble(int parameterIndex, double x) throws SQLException {
bindNonNull(parameterIndex, x);
}
@Override
public final void setBigDecimal(int parameterIndex, BigDecimal x) throws SQLException {
bindNullable(parameterIndex, x, BigDecimal.class);
}
@Override
public final void setString(int parameterIndex, String x) throws SQLException {
bindNullable(parameterIndex, x, String.class);
}
@Override
public final void setNString(int parameterIndex, String value) throws SQLException {
bindNullable(parameterIndex, value, String.class);
}
@Override
public final void setBytes(int parameterIndex, byte[] x) throws SQLException {
bindNullable(parameterIndex, x, byte[].class);
}
@Override
public final void setDate(int parameterIndex, Date x) throws SQLException {
bindNullable(parameterIndex, x, LocalDate.class, Date::toLocalDate);
}
@Override
public final void setTime(int parameterIndex, Time x) throws SQLException {
bindNullable(parameterIndex, x, LocalTime.class, Time::toLocalTime);
}
@Override
public final void setTimestamp(int parameterIndex, Timestamp x) throws SQLException {
bindNullable(parameterIndex, x, LocalDateTime.class, Timestamp::toLocalDateTime);
}
@SuppressWarnings("unchecked")
@Override
public final void setObject(int parameterIndex, Object x, int targetSqlType) throws SQLException {
bindNullable(parameterIndex, x, (Class<Object>) type(targetSqlType));
}
@Override
public final void setObject(int parameterIndex, Object x) throws SQLException {
bindNullable(parameterIndex, x, Object.class);
}
@Override
public final void setObject(int parameterIndex, Object x, int targetSqlType, int scaleOrLength) throws SQLException {
setObject(parameterIndex, x, targetSqlType);
}
@Override
public final void setObject(int parameterIndex, Object x, SQLType targetSqlType, int scaleOrLength) throws SQLException {
setObject(parameterIndex, x, defaultIfNull(targetSqlType.getVendorTypeNumber(), Types.OTHER));
}
@Override
public final void setObject(int parameterIndex, Object x, SQLType targetSqlType) throws SQLException {
setObject(parameterIndex, x, defaultIfNull(targetSqlType.getVendorTypeNumber(), Types.OTHER));
}
@Override
public final void setAsciiStream(int parameterIndex, InputStream x, int length) throws SQLException {
throw new SQLFeatureNotSupportedException("The JDBC to R2DBC bridge doesn't support this data type");
}
@Override
public final void setUnicodeStream(int parameterIndex, InputStream x, int length) throws SQLException {
throw new SQLFeatureNotSupportedException("The JDBC to R2DBC bridge doesn't support this data type");
}
@Override
public final void setBinaryStream(int parameterIndex, InputStream x, int length) throws SQLException {
throw new SQLFeatureNotSupportedException("The JDBC to R2DBC bridge doesn't support this data type");
}
@Override
public final void setCharacterStream(int parameterIndex, Reader reader, int length) throws SQLException {
throw new SQLFeatureNotSupportedException("The JDBC to R2DBC bridge doesn't support this data type");
}
@Override
public final void setRef(int parameterIndex, Ref x) throws SQLException {
throw new SQLFeatureNotSupportedException("The JDBC to R2DBC bridge doesn't support this data type");
}
@Override
public final void setBlob(int parameterIndex, Blob x) throws SQLException {
throw new SQLFeatureNotSupportedException("The JDBC to R2DBC bridge doesn't support this data type");
}
@Override
public final void setClob(int parameterIndex, Clob x) throws SQLException {
throw new SQLFeatureNotSupportedException("The JDBC to R2DBC bridge doesn't support this data type");
}
@Override
public final void setArray(int parameterIndex, Array x) throws SQLException {
throw new SQLFeatureNotSupportedException("The JDBC to R2DBC bridge doesn't support this data type");
}
@Override
public final void setDate(int parameterIndex, Date x, Calendar cal) throws SQLException {
throw new SQLFeatureNotSupportedException("The JDBC to R2DBC bridge doesn't support this data type");
}
@Override
public final void setTime(int parameterIndex, Time x, Calendar cal) throws SQLException {
throw new SQLFeatureNotSupportedException("The JDBC to R2DBC bridge doesn't support this data type");
}
@Override
public final void setTimestamp(int parameterIndex, Timestamp x, Calendar cal) throws SQLException {
throw new SQLFeatureNotSupportedException("The JDBC to R2DBC bridge doesn't support this data type");
}
@Override
public final void setURL(int parameterIndex, URL x) throws SQLException {
throw new SQLFeatureNotSupportedException("The JDBC to R2DBC bridge doesn't support this data type");
}
@Override
public final void setRowId(int parameterIndex, RowId x) throws SQLException {
throw new SQLFeatureNotSupportedException("The JDBC to R2DBC bridge doesn't support this data type");
}
@Override
public final void setNCharacterStream(int parameterIndex, Reader value, long length) throws SQLException {
throw new SQLFeatureNotSupportedException("The JDBC to R2DBC bridge doesn't support this data type");
}
@Override
public final void setNClob(int parameterIndex, NClob value) throws SQLException {
throw new SQLFeatureNotSupportedException("The JDBC to R2DBC bridge doesn't support this data type");
}
@Override
public final void setClob(int parameterIndex, Reader reader, long length) throws SQLException {
throw new SQLFeatureNotSupportedException("The JDBC to R2DBC bridge doesn't support this data type");
}
@Override
public final void setBlob(int parameterIndex, InputStream inputStream, long length) throws SQLException {
throw new SQLFeatureNotSupportedException("The JDBC to R2DBC bridge doesn't support this data type");
}
@Override
public final void setNClob(int parameterIndex, Reader reader, long length) throws SQLException {
throw new SQLFeatureNotSupportedException("The JDBC to R2DBC bridge doesn't support this data type");
}
@Override
public final void setSQLXML(int parameterIndex, SQLXML xmlObject) throws SQLException {
throw new SQLFeatureNotSupportedException("The JDBC to R2DBC bridge doesn't support this data type");
}
@Override
public final void setAsciiStream(int parameterIndex, InputStream x, long length) throws SQLException {
throw new SQLFeatureNotSupportedException("The JDBC to R2DBC bridge doesn't support this data type");
}
@Override
public final void setBinaryStream(int parameterIndex, InputStream x, long length) throws SQLException {
throw new SQLFeatureNotSupportedException("The JDBC to R2DBC bridge doesn't support this data type");
}
@Override
public final void setCharacterStream(int parameterIndex, Reader reader, long length) throws SQLException {
throw new SQLFeatureNotSupportedException("The JDBC to R2DBC bridge doesn't support this data type");
}
@Override
public final void setAsciiStream(int parameterIndex, InputStream x) throws SQLException {
throw new SQLFeatureNotSupportedException("The JDBC to R2DBC bridge doesn't support this data type");
}
@Override
public final void setBinaryStream(int parameterIndex, InputStream x) throws SQLException {
throw new SQLFeatureNotSupportedException("The JDBC to R2DBC bridge doesn't support this data type");
}
@Override
public final void setCharacterStream(int parameterIndex, Reader reader) throws SQLException {
throw new SQLFeatureNotSupportedException("The JDBC to R2DBC bridge doesn't support this data type");
}
@Override
public final void setNCharacterStream(int parameterIndex, Reader value) throws SQLException {
throw new SQLFeatureNotSupportedException("The JDBC to R2DBC bridge doesn't support this data type");
}
@Override
public final void setClob(int parameterIndex, Reader reader) throws SQLException {
throw new SQLFeatureNotSupportedException("The JDBC to R2DBC bridge doesn't support this data type");
}
@Override
public final void setBlob(int parameterIndex, InputStream inputStream) throws SQLException {
throw new SQLFeatureNotSupportedException("The JDBC to R2DBC bridge doesn't support this data type");
}
@Override
public final void setNClob(int parameterIndex, Reader reader) throws SQLException {
throw new SQLFeatureNotSupportedException("The JDBC to R2DBC bridge doesn't support this data type");
}
}
static final class R2DBCResultSetMetaData extends R2DBCWrapper implements ResultSetMetaData {
final Configuration c;