From a49950b8317d5bc1ee890dc3e62966f1029b1028 Mon Sep 17 00:00:00 2001 From: Lukas Eder Date: Wed, 24 Mar 2021 13:39:50 +0100 Subject: [PATCH] [jOOQ/jOOQ#11700] Add a R2DBCPreparedStatement proxy In order to re-use existing variable binding logic (and eventually, to offer supporing org.jooq.Binding transparently), we'll implement an R2DBCPreparedStatement proxy that wraps the R2DBC Statement in a JDBC PreparedStatement and can thus use the existing lifecycle methods. --- .../jooq/impl/ParsingConnectionFactory.java | 2 +- jOOQ/src/main/java/org/jooq/impl/R2DBC.java | 368 ++++++++++++++++-- 2 files changed, 340 insertions(+), 30 deletions(-) diff --git a/jOOQ/src/main/java/org/jooq/impl/ParsingConnectionFactory.java b/jOOQ/src/main/java/org/jooq/impl/ParsingConnectionFactory.java index 72fa9f12a4..aaf061570f 100644 --- a/jOOQ/src/main/java/org/jooq/impl/ParsingConnectionFactory.java +++ b/jOOQ/src/main/java/org/jooq/impl/ParsingConnectionFactory.java @@ -205,7 +205,7 @@ final class ParsingConnectionFactory implements ConnectionFactory { @Override public Statement createStatement(String sql) { - return new ParsingR2DBCStatement(this, sql); + return new ParsingR2DBCStatement(delegate, sql); } } diff --git a/jOOQ/src/main/java/org/jooq/impl/R2DBC.java b/jOOQ/src/main/java/org/jooq/impl/R2DBC.java index 6d13d0ac30..10f9b3bc9c 100644 --- a/jOOQ/src/main/java/org/jooq/impl/R2DBC.java +++ b/jOOQ/src/main/java/org/jooq/impl/R2DBC.java @@ -37,27 +37,47 @@ */ package org.jooq.impl; -import static org.jooq.impl.DefaultDataType.getDataType; +import static org.jooq.conf.ParamType.NAMED; import static org.jooq.impl.Tools.recordFactory; import static org.jooq.tools.StringUtils.defaultIfNull; +import java.io.InputStream; +import java.io.Reader; +import java.math.BigDecimal; +import java.net.URL; +import java.sql.Array; +import java.sql.Blob; +import java.sql.Clob; +import java.sql.Date; +import java.sql.NClob; +import java.sql.Ref; import java.sql.ResultSetMetaData; +import java.sql.RowId; import java.sql.SQLException; import java.sql.SQLFeatureNotSupportedException; +import java.sql.SQLType; +import java.sql.SQLXML; +import java.sql.Time; +import java.sql.Timestamp; +import java.sql.Types; import java.sql.Wrapper; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; import java.util.ArrayDeque; +import java.util.Calendar; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; import java.util.function.Supplier; import org.jooq.Configuration; import org.jooq.Cursor; import org.jooq.DataType; import org.jooq.Field; -import org.jooq.Param; import org.jooq.Record; -import org.jooq.conf.ParamType; import org.jooq.conf.SettingsTools; import org.jooq.impl.DefaultRenderContext.Rendered; +import org.jooq.tools.jdbc.DefaultPreparedStatement; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; @@ -80,18 +100,18 @@ final class R2DBC { final ResultSubscriber upstream; Subscription subscription; - public RowSubscriber(ResultSubscriber s) { + RowSubscriber(ResultSubscriber s) { this.upstream = s; } @Override - public void onSubscribe(Subscription s) { + public final void onSubscribe(Subscription s) { this.subscription = s; subscription.request(upstream.upstream.upstream.requested.getAndSet(0L)); } @Override - public void onNext(R record) { + public final void onNext(R record) { upstream.upstream.upstream.subscriber.onNext(record); long requested = upstream.upstream.upstream.requested.getAndSet(0L); @@ -100,12 +120,12 @@ final class R2DBC { } @Override - public void onError(Throwable t) { + public final void onError(Throwable t) { upstream.onError(t); } @Override - public void onComplete() { + public final void onComplete() { upstream.upstream.upstream.subscriber.onComplete(); } } @@ -121,13 +141,13 @@ final class R2DBC { } @Override - public void onSubscribe(Subscription s) { + public final void onSubscribe(Subscription s) { s.request(1); } @SuppressWarnings("unchecked") @Override - public void onNext(io.r2dbc.spi.Result r) { + public final void onNext(io.r2dbc.spi.Result r) { r.map((row, m) -> { try { // TODO: Cache this getFields() call @@ -161,12 +181,12 @@ final class R2DBC { } @Override - public void onError(Throwable t) { + public final void onError(Throwable t) { upstream.onError(t); } @Override - public void onComplete() {} + public final void onComplete() {} } static final class ConnectionSubscriber implements Subscriber { @@ -180,26 +200,27 @@ final class R2DBC { } @Override - public void onSubscribe(Subscription s) { + public final void onSubscribe(Subscription s) { s.request(1); } @Override - public void onNext(Connection c) { + public final void onNext(Connection c) { try { DefaultRenderContext render = new DefaultRenderContext(query.configuration().derive( - SettingsTools.clone(query.configuration().settings()).withParamType(ParamType.NAMED).withRenderNamedParamPrefix("$") + SettingsTools.clone(query.configuration().settings()).withParamType(NAMED).withRenderNamedParamPrefix("$") )); - Rendered r = new Rendered(render.visit(query).render(), render.bindValues(), render.skipUpdateCounts()); + Rendered r = new Rendered(render.paramType(NAMED).visit(query).render(), render.bindValues(), render.skipUpdateCounts()); Statement stmt = c.createStatement(r.sql); - - int i = 0; - for (Param p : r.bindValues) - if (p.getValue() == null) - stmt.bindNull(i++, p.getType()); - else - stmt.bind(i++, p.getValue()); + new DefaultBindContext(query.configuration(), new R2DBCPreparedStatement(query.configuration(), stmt)).visit(r.bindValues); +// ; +// int i = 0; +// for (Param p : r.bindValues) +// if (p.getValue() == null) +// stmt.bindNull(i++, p.getType()); +// else +// stmt.bind(i++, p.getValue()); stmt.execute().subscribe(new ResultSubscriber<>(query, this)); } @@ -211,12 +232,12 @@ final class R2DBC { } @Override - public void onError(Throwable t) { + public final void onError(Throwable t) { upstream.subscriber.onError(t); } @Override - public void onComplete() {} + public final void onComplete() {} } static final class RecordSubscription implements Subscription { @@ -233,7 +254,7 @@ final class R2DBC { } @Override - public void request(long n) { + public final void request(long n) { requested.getAndAdd(n); if (connection == null) { @@ -243,7 +264,7 @@ final class R2DBC { } @Override - public void cancel() { + public final void cancel() { subscriber.onComplete(); } } @@ -260,7 +281,7 @@ final class R2DBC { } @Override - public void request(long n) { + public final void request(long n) { int i = (int) Math.min(n, Integer.MAX_VALUE); try { @@ -298,7 +319,7 @@ final class R2DBC { } @Override - public void cancel() { + public final void cancel() { close(); } } @@ -320,6 +341,295 @@ final class R2DBC { } } + static final class R2DBCPreparedStatement extends DefaultPreparedStatement { + + final Configuration c; + final Statement s; + + R2DBCPreparedStatement(Configuration c, Statement s) { + + // TODO: Refactor super class to throw a custom exception if trying to dereference this null pointer. + super(null); + + this.c = c; + this.s = s; + } + + private final void bindNull(int parameterIndex, Class type) { + s.bindNull(parameterIndex - 1, type); + } + + private final void bindNullable(int parameterIndex, T x, Class type) { + bindNullable(parameterIndex, x, type, t -> t); + } + + private final void bindNullable(int parameterIndex, T x, Class type, Function conversion) { + if (x == null) + s.bindNull(parameterIndex - 1, type); + else + bindNonNull(parameterIndex - 1, conversion.apply(x)); + } + + private final void bindNonNull(int parameterIndex, Object x) { + s.bind(parameterIndex - 1, x); + } + + private final Class type(int sqlType) { + return DefaultDataType.getDataType(c.family(), sqlType).getType(); + } + + @Override + public final void setNull(int parameterIndex, int sqlType) throws SQLException { + bindNull(parameterIndex, type(sqlType)); + } + + @Override + public final void setNull(int parameterIndex, int sqlType, String typeName) throws SQLException { + bindNull(parameterIndex, type(sqlType)); + } + + @Override + public final void setBoolean(int parameterIndex, boolean x) throws SQLException { + bindNonNull(parameterIndex, x); + } + + @Override + public final void setByte(int parameterIndex, byte x) throws SQLException { + bindNonNull(parameterIndex, x); + } + + @Override + public final void setShort(int parameterIndex, short x) throws SQLException { + bindNonNull(parameterIndex, x); + } + + @Override + public final void setInt(int parameterIndex, int x) throws SQLException { + bindNonNull(parameterIndex, x); + } + + @Override + public final void setLong(int parameterIndex, long x) throws SQLException { + bindNonNull(parameterIndex, x); + } + + @Override + public final void setFloat(int parameterIndex, float x) throws SQLException { + bindNonNull(parameterIndex, x); + } + + @Override + public final void setDouble(int parameterIndex, double x) throws SQLException { + bindNonNull(parameterIndex, x); + } + + @Override + public final void setBigDecimal(int parameterIndex, BigDecimal x) throws SQLException { + bindNullable(parameterIndex, x, BigDecimal.class); + } + + @Override + public final void setString(int parameterIndex, String x) throws SQLException { + bindNullable(parameterIndex, x, String.class); + } + + @Override + public final void setNString(int parameterIndex, String value) throws SQLException { + bindNullable(parameterIndex, value, String.class); + } + + @Override + public final void setBytes(int parameterIndex, byte[] x) throws SQLException { + bindNullable(parameterIndex, x, byte[].class); + } + + @Override + public final void setDate(int parameterIndex, Date x) throws SQLException { + bindNullable(parameterIndex, x, LocalDate.class, Date::toLocalDate); + } + + @Override + public final void setTime(int parameterIndex, Time x) throws SQLException { + bindNullable(parameterIndex, x, LocalTime.class, Time::toLocalTime); + } + + @Override + public final void setTimestamp(int parameterIndex, Timestamp x) throws SQLException { + bindNullable(parameterIndex, x, LocalDateTime.class, Timestamp::toLocalDateTime); + } + + @SuppressWarnings("unchecked") + @Override + public final void setObject(int parameterIndex, Object x, int targetSqlType) throws SQLException { + bindNullable(parameterIndex, x, (Class) type(targetSqlType)); + } + + @Override + public final void setObject(int parameterIndex, Object x) throws SQLException { + bindNullable(parameterIndex, x, Object.class); + } + + @Override + public final void setObject(int parameterIndex, Object x, int targetSqlType, int scaleOrLength) throws SQLException { + setObject(parameterIndex, x, targetSqlType); + } + + @Override + public final void setObject(int parameterIndex, Object x, SQLType targetSqlType, int scaleOrLength) throws SQLException { + setObject(parameterIndex, x, defaultIfNull(targetSqlType.getVendorTypeNumber(), Types.OTHER)); + } + + @Override + public final void setObject(int parameterIndex, Object x, SQLType targetSqlType) throws SQLException { + setObject(parameterIndex, x, defaultIfNull(targetSqlType.getVendorTypeNumber(), Types.OTHER)); + } + + @Override + public final void setAsciiStream(int parameterIndex, InputStream x, int length) throws SQLException { + throw new SQLFeatureNotSupportedException("The JDBC to R2DBC bridge doesn't support this data type"); + } + + @Override + public final void setUnicodeStream(int parameterIndex, InputStream x, int length) throws SQLException { + throw new SQLFeatureNotSupportedException("The JDBC to R2DBC bridge doesn't support this data type"); + } + + @Override + public final void setBinaryStream(int parameterIndex, InputStream x, int length) throws SQLException { + throw new SQLFeatureNotSupportedException("The JDBC to R2DBC bridge doesn't support this data type"); + } + + @Override + public final void setCharacterStream(int parameterIndex, Reader reader, int length) throws SQLException { + throw new SQLFeatureNotSupportedException("The JDBC to R2DBC bridge doesn't support this data type"); + } + + @Override + public final void setRef(int parameterIndex, Ref x) throws SQLException { + throw new SQLFeatureNotSupportedException("The JDBC to R2DBC bridge doesn't support this data type"); + } + + @Override + public final void setBlob(int parameterIndex, Blob x) throws SQLException { + throw new SQLFeatureNotSupportedException("The JDBC to R2DBC bridge doesn't support this data type"); + } + + @Override + public final void setClob(int parameterIndex, Clob x) throws SQLException { + throw new SQLFeatureNotSupportedException("The JDBC to R2DBC bridge doesn't support this data type"); + } + + @Override + public final void setArray(int parameterIndex, Array x) throws SQLException { + throw new SQLFeatureNotSupportedException("The JDBC to R2DBC bridge doesn't support this data type"); + } + + @Override + public final void setDate(int parameterIndex, Date x, Calendar cal) throws SQLException { + throw new SQLFeatureNotSupportedException("The JDBC to R2DBC bridge doesn't support this data type"); + } + + @Override + public final void setTime(int parameterIndex, Time x, Calendar cal) throws SQLException { + throw new SQLFeatureNotSupportedException("The JDBC to R2DBC bridge doesn't support this data type"); + } + + @Override + public final void setTimestamp(int parameterIndex, Timestamp x, Calendar cal) throws SQLException { + throw new SQLFeatureNotSupportedException("The JDBC to R2DBC bridge doesn't support this data type"); + } + + @Override + public final void setURL(int parameterIndex, URL x) throws SQLException { + throw new SQLFeatureNotSupportedException("The JDBC to R2DBC bridge doesn't support this data type"); + } + + @Override + public final void setRowId(int parameterIndex, RowId x) throws SQLException { + throw new SQLFeatureNotSupportedException("The JDBC to R2DBC bridge doesn't support this data type"); + } + + @Override + public final void setNCharacterStream(int parameterIndex, Reader value, long length) throws SQLException { + throw new SQLFeatureNotSupportedException("The JDBC to R2DBC bridge doesn't support this data type"); + } + + @Override + public final void setNClob(int parameterIndex, NClob value) throws SQLException { + throw new SQLFeatureNotSupportedException("The JDBC to R2DBC bridge doesn't support this data type"); + } + + @Override + public final void setClob(int parameterIndex, Reader reader, long length) throws SQLException { + throw new SQLFeatureNotSupportedException("The JDBC to R2DBC bridge doesn't support this data type"); + } + + @Override + public final void setBlob(int parameterIndex, InputStream inputStream, long length) throws SQLException { + throw new SQLFeatureNotSupportedException("The JDBC to R2DBC bridge doesn't support this data type"); + } + + @Override + public final void setNClob(int parameterIndex, Reader reader, long length) throws SQLException { + throw new SQLFeatureNotSupportedException("The JDBC to R2DBC bridge doesn't support this data type"); + } + + @Override + public final void setSQLXML(int parameterIndex, SQLXML xmlObject) throws SQLException { + throw new SQLFeatureNotSupportedException("The JDBC to R2DBC bridge doesn't support this data type"); + } + + @Override + public final void setAsciiStream(int parameterIndex, InputStream x, long length) throws SQLException { + throw new SQLFeatureNotSupportedException("The JDBC to R2DBC bridge doesn't support this data type"); + } + + @Override + public final void setBinaryStream(int parameterIndex, InputStream x, long length) throws SQLException { + throw new SQLFeatureNotSupportedException("The JDBC to R2DBC bridge doesn't support this data type"); + } + + @Override + public final void setCharacterStream(int parameterIndex, Reader reader, long length) throws SQLException { + throw new SQLFeatureNotSupportedException("The JDBC to R2DBC bridge doesn't support this data type"); + } + + @Override + public final void setAsciiStream(int parameterIndex, InputStream x) throws SQLException { + throw new SQLFeatureNotSupportedException("The JDBC to R2DBC bridge doesn't support this data type"); + } + + @Override + public final void setBinaryStream(int parameterIndex, InputStream x) throws SQLException { + throw new SQLFeatureNotSupportedException("The JDBC to R2DBC bridge doesn't support this data type"); + } + + @Override + public final void setCharacterStream(int parameterIndex, Reader reader) throws SQLException { + throw new SQLFeatureNotSupportedException("The JDBC to R2DBC bridge doesn't support this data type"); + } + + @Override + public final void setNCharacterStream(int parameterIndex, Reader value) throws SQLException { + throw new SQLFeatureNotSupportedException("The JDBC to R2DBC bridge doesn't support this data type"); + } + + @Override + public final void setClob(int parameterIndex, Reader reader) throws SQLException { + throw new SQLFeatureNotSupportedException("The JDBC to R2DBC bridge doesn't support this data type"); + } + + @Override + public final void setBlob(int parameterIndex, InputStream inputStream) throws SQLException { + throw new SQLFeatureNotSupportedException("The JDBC to R2DBC bridge doesn't support this data type"); + } + + @Override + public final void setNClob(int parameterIndex, Reader reader) throws SQLException { + throw new SQLFeatureNotSupportedException("The JDBC to R2DBC bridge doesn't support this data type"); + } + } + static final class R2DBCResultSetMetaData extends R2DBCWrapper implements ResultSetMetaData { final Configuration c;