[KYUUBI #4326] [ARROW] Fix Spark session timezone format in arrow-based result format

### _Why are the changes needed?_

1. this PR introduces a new configuration called `kyuubi.operation.result.arrow.timestampAsString`, when true, arrow-based rowsets will convert timestamp-type columns to strings for transmission.

2. `kyuubi.operation.result.arrow.timestampAsString` default setting to false for better transmission performance

3. the PR fixes timezone issue in arrow based result format described in #3958

### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [ ] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #4326 from cfmcgrady/arrow-string-ts.

Closes #4326

38c7fc9b [Fu Chen] fix style
d864db00 [Fu Chen] address comment
b714b3ee [Fu Chen] revert externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/RowSet.scala
6c4eb507 [Fu Chen] minor
289b6007 [Fu Chen] timstampAsString = false by default
78b7caba [Fu Chen] fix
f5601356 [Fu Chen] debug info
b8e4b288 [Fu Chen] fix ut
87c6f9ef [Fu Chen] update docs
86f6cb73 [Fu Chen] arrow based rowset timestamp as string

Authored-by: Fu Chen <cfmcgrady@gmail.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
This commit is contained in:
Fu Chen 2023-02-19 01:10:04 +08:00 committed by Cheng Pan
parent 8fc7adee97
commit 6bd0016fe2
No known key found for this signature in database
GPG Key ID: 8001952629BCC75D
11 changed files with 167 additions and 70 deletions

View File

@ -429,22 +429,23 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co
### Operation
| Key | Default | Meaning | Type | Since |
|-----------------------------------------|---------------------------------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|-------|
| kyuubi.operation.idle.timeout | PT3H | Operation will be closed when it's not accessed for this duration of time | duration | 1.0.0 |
| kyuubi.operation.interrupt.on.cancel | true | When true, all running tasks will be interrupted if one cancels a query. When false, all running tasks will remain until finished. | boolean | 1.2.0 |
| kyuubi.operation.language | SQL | Choose a programing language for the following inputs<ul><li>SQL: (Default) Run all following statements as SQL queries.</li><li>SCALA: Run all following input as scala codes</li><li>PYTHON: (Experimental) Run all following input as Python codes with Spark engine</li></ul> | string | 1.5.0 |
| kyuubi.operation.log.dir.root | server_operation_logs | Root directory for query operation log at server-side. | string | 1.4.0 |
| kyuubi.operation.plan.only.excludes | ResetCommand,SetCommand,SetNamespaceCommand,UseStatement,SetCatalogAndNamespace | Comma-separated list of query plan names, in the form of simple class names, i.e, for `SET abc=xyz`, the value will be `SetCommand`. For those auxiliary plans, such as `switch databases`, `set properties`, or `create temporary view` etc., which are used for setup evaluating environments for analyzing actual queries, we can use this config to exclude them and let them take effect. See also kyuubi.operation.plan.only.mode. | seq | 1.5.0 |
| kyuubi.operation.plan.only.mode | none | Configures the statement performed mode, The value can be 'parse', 'analyze', 'optimize', 'optimize_with_stats', 'physical', 'execution', or 'none', when it is 'none', indicate to the statement will be fully executed, otherwise only way without executing the query. different engines currently support different modes, the Spark engine supports all modes, and the Flink engine supports 'parse', 'physical', and 'execution', other engines do not support planOnly currently. | string | 1.4.0 |
| kyuubi.operation.plan.only.output.style | plain | Configures the planOnly output style. The value can be 'plain' or 'json', and the default value is 'plain'. This configuration supports only the output styles of the Spark engine | string | 1.7.0 |
| kyuubi.operation.progress.enabled | false | Whether to enable the operation progress. When true, the operation progress will be returned in `GetOperationStatus`. | boolean | 1.6.0 |
| kyuubi.operation.query.timeout | &lt;undefined&gt; | Timeout for query executions at server-side, take effect with client-side timeout(`java.sql.Statement.setQueryTimeout`) together, a running query will be cancelled automatically if timeout. It's off by default, which means only client-side take full control of whether the query should timeout or not. If set, client-side timeout is capped at this point. To cancel the queries right away without waiting for task to finish, consider enabling kyuubi.operation.interrupt.on.cancel together. | duration | 1.2.0 |
| kyuubi.operation.result.format | thrift | Specify the result format, available configs are: <ul> <li>THRIFT: the result will convert to TRow at the engine driver side. </li> <li>ARROW: the result will be encoded as Arrow at the executor side before collecting by the driver, and deserialized at the client side. note that it only takes effect for kyuubi-hive-jdbc clients now.</li></ul> | string | 1.7.0 |
| kyuubi.operation.result.max.rows | 0 | Max rows of Spark query results. Rows exceeding the limit would be ignored. By setting this value to 0 to disable the max rows limit. | int | 1.6.0 |
| kyuubi.operation.scheduler.pool | &lt;undefined&gt; | The scheduler pool of job. Note that, this config should be used after changing Spark config spark.scheduler.mode=FAIR. | string | 1.1.1 |
| kyuubi.operation.spark.listener.enabled | true | When set to true, Spark engine registers an SQLOperationListener before executing the statement, logging a few summary statistics when each stage completes. | boolean | 1.6.0 |
| kyuubi.operation.status.polling.timeout | PT5S | Timeout(ms) for long polling asynchronous running sql query's status | duration | 1.0.0 |
| Key | Default | Meaning | Type | Since |
|-------------------------------------------------|---------------------------------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|-------|
| kyuubi.operation.idle.timeout | PT3H | Operation will be closed when it's not accessed for this duration of time | duration | 1.0.0 |
| kyuubi.operation.interrupt.on.cancel | true | When true, all running tasks will be interrupted if one cancels a query. When false, all running tasks will remain until finished. | boolean | 1.2.0 |
| kyuubi.operation.language | SQL | Choose a programing language for the following inputs<ul><li>SQL: (Default) Run all following statements as SQL queries.</li><li>SCALA: Run all following input as scala codes</li><li>PYTHON: (Experimental) Run all following input as Python codes with Spark engine</li></ul> | string | 1.5.0 |
| kyuubi.operation.log.dir.root | server_operation_logs | Root directory for query operation log at server-side. | string | 1.4.0 |
| kyuubi.operation.plan.only.excludes | ResetCommand,SetCommand,SetNamespaceCommand,UseStatement,SetCatalogAndNamespace | Comma-separated list of query plan names, in the form of simple class names, i.e, for `SET abc=xyz`, the value will be `SetCommand`. For those auxiliary plans, such as `switch databases`, `set properties`, or `create temporary view` etc., which are used for setup evaluating environments for analyzing actual queries, we can use this config to exclude them and let them take effect. See also kyuubi.operation.plan.only.mode. | seq | 1.5.0 |
| kyuubi.operation.plan.only.mode | none | Configures the statement performed mode, The value can be 'parse', 'analyze', 'optimize', 'optimize_with_stats', 'physical', 'execution', or 'none', when it is 'none', indicate to the statement will be fully executed, otherwise only way without executing the query. different engines currently support different modes, the Spark engine supports all modes, and the Flink engine supports 'parse', 'physical', and 'execution', other engines do not support planOnly currently. | string | 1.4.0 |
| kyuubi.operation.plan.only.output.style | plain | Configures the planOnly output style. The value can be 'plain' or 'json', and the default value is 'plain'. This configuration supports only the output styles of the Spark engine | string | 1.7.0 |
| kyuubi.operation.progress.enabled | false | Whether to enable the operation progress. When true, the operation progress will be returned in `GetOperationStatus`. | boolean | 1.6.0 |
| kyuubi.operation.query.timeout | &lt;undefined&gt; | Timeout for query executions at server-side, take effect with client-side timeout(`java.sql.Statement.setQueryTimeout`) together, a running query will be cancelled automatically if timeout. It's off by default, which means only client-side take full control of whether the query should timeout or not. If set, client-side timeout is capped at this point. To cancel the queries right away without waiting for task to finish, consider enabling kyuubi.operation.interrupt.on.cancel together. | duration | 1.2.0 |
| kyuubi.operation.result.arrow.timestampAsString | false | When true, arrow-based rowsets will convert columns of type timestamp to strings for transmission. | boolean | 1.7.0 |
| kyuubi.operation.result.format | thrift | Specify the result format, available configs are: <ul> <li>THRIFT: the result will convert to TRow at the engine driver side. </li> <li>ARROW: the result will be encoded as Arrow at the executor side before collecting by the driver, and deserialized at the client side. note that it only takes effect for kyuubi-hive-jdbc clients now.</li></ul> | string | 1.7.0 |
| kyuubi.operation.result.max.rows | 0 | Max rows of Spark query results. Rows exceeding the limit would be ignored. By setting this value to 0 to disable the max rows limit. | int | 1.6.0 |
| kyuubi.operation.scheduler.pool | &lt;undefined&gt; | The scheduler pool of job. Note that, this config should be used after changing Spark config spark.scheduler.mode=FAIR. | string | 1.1.1 |
| kyuubi.operation.spark.listener.enabled | true | When set to true, Spark engine registers an SQLOperationListener before executing the statement, logging a few summary statistics when each stage completes. | boolean | 1.6.0 |
| kyuubi.operation.status.polling.timeout | PT5S | Timeout(ms) for long polling asynchronous running sql query's status | duration | 1.0.0 |
### Server

View File

@ -162,17 +162,12 @@ class ExecuteStatement(
}
}
// TODO:(fchen) make this configurable
val kyuubiBeelineConvertToString = true
def convertComplexType(df: DataFrame): DataFrame = {
if (kyuubiBeelineConvertToString) {
SparkDatasetHelper.convertTopLevelComplexTypeToHiveString(df)
} else {
df
}
SparkDatasetHelper.convertTopLevelComplexTypeToHiveString(df, timestampAsString)
}
override def getResultSetMetadataHints(): Seq[String] =
Seq(s"__kyuubi_operation_result_format__=$resultFormat")
Seq(
s"__kyuubi_operation_result_format__=$resultFormat",
s"__kyuubi_operation_result_arrow_timestampAsString__=$timestampAsString")
}

View File

@ -24,7 +24,7 @@ import org.apache.hive.service.rpc.thrift.{TGetResultSetMetadataResp, TProgressU
import org.apache.spark.kyuubi.{SparkProgressMonitor, SQLOperationListener}
import org.apache.spark.kyuubi.SparkUtilsHelper.redact
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.types.StructType
import org.apache.kyuubi.{KyuubiSQLException, Utils}
@ -136,7 +136,7 @@ abstract class SparkOperation(session: Session)
spark.sparkContext.setLocalProperty
protected def withLocalProperties[T](f: => T): T = {
SQLConf.withExistingConf(spark.sessionState.conf) {
SQLExecution.withSQLConfPropagated(spark) {
val originalSession = SparkSession.getActiveSession
try {
SparkSession.setActiveSession(spark)
@ -279,6 +279,10 @@ abstract class SparkOperation(session: Session)
spark.conf.get("kyuubi.operation.result.format", "thrift")
}
protected def timestampAsString: Boolean = {
spark.conf.get("kyuubi.operation.result.arrow.timestampAsString", "false").toBoolean
}
protected def setSessionUserSign(): Unit = {
(
session.conf.get(KYUUBI_SESSION_SIGN_PUBLICKEY),

View File

@ -17,8 +17,6 @@
package org.apache.spark.sql.kyuubi
import java.time.ZoneId
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row}
import org.apache.spark.sql.functions._
@ -31,12 +29,13 @@ object SparkDatasetHelper {
ds.toArrowBatchRdd
}
def convertTopLevelComplexTypeToHiveString(df: DataFrame): DataFrame = {
val timeZone = ZoneId.of(df.sparkSession.sessionState.conf.sessionLocalTimeZone)
def convertTopLevelComplexTypeToHiveString(
df: DataFrame,
timestampAsString: Boolean): DataFrame = {
val quotedCol = (name: String) => col(quoteIfNeeded(name))
// an udf to call `RowSet.toHiveString` on complex types(struct/array/map).
// an udf to call `RowSet.toHiveString` on complex types(struct/array/map) and timestamp type.
val toHiveStringUDF = udf[String, Row, String]((row, schemaDDL) => {
val dt = DataType.fromDDL(schemaDDL)
dt match {
@ -46,6 +45,8 @@ object SparkDatasetHelper {
RowSet.toHiveString((row.toSeq.head, at), nested = true)
case StructType(Array(StructField(_, mt: MapType, _, _))) =>
RowSet.toHiveString((row.toSeq.head, mt), nested = true)
case StructType(Array(StructField(_, tt: TimestampType, _, _))) =>
RowSet.toHiveString((row.toSeq.head, tt), nested = true)
case _ =>
throw new UnsupportedOperationException
}
@ -56,6 +57,8 @@ object SparkDatasetHelper {
toHiveStringUDF(quotedCol(name), lit(sf.toDDL)).as(name)
case sf @ StructField(name, _: MapType | _: ArrayType, _, _) =>
toHiveStringUDF(struct(quotedCol(name)), lit(sf.toDDL)).as(name)
case sf @ StructField(name, _: TimestampType, _, _) if timestampAsString =>
toHiveStringUDF(struct(quotedCol(name)), lit(sf.toDDL)).as(name)
case StructField(name, _, _, _) => quotedCol(name)
}
df.select(cols: _*)

View File

@ -35,6 +35,13 @@ class SparkArrowbasedOperationSuite extends WithSparkSQLEngine with SparkDataTyp
override def resultFormat: String = "arrow"
override def beforeEach(): Unit = {
super.beforeEach()
withJdbcStatement() { statement =>
checkResultSetFormat(statement, "arrow")
}
}
test("detect resultSet format") {
withJdbcStatement() { statement =>
checkResultSetFormat(statement, "arrow")
@ -43,7 +50,42 @@ class SparkArrowbasedOperationSuite extends WithSparkSQLEngine with SparkDataTyp
}
}
def checkResultSetFormat(statement: Statement, expectFormat: String): Unit = {
test("Spark session timezone format") {
withJdbcStatement() { statement =>
def check(expect: String): Unit = {
val query =
"""
|SELECT
| from_utc_timestamp(
| from_unixtime(
| 1670404535000 / 1000, 'yyyy-MM-dd HH:mm:ss'
| ),
| 'GMT+08:00'
| )
|""".stripMargin
val resultSet = statement.executeQuery(query)
assert(resultSet.next())
assert(resultSet.getString(1) == expect)
}
def setTimeZone(timeZone: String): Unit = {
val rs = statement.executeQuery(s"set spark.sql.session.timeZone=$timeZone")
assert(rs.next())
}
Seq("true", "false").foreach { timestampAsString =>
statement.executeQuery(
s"set ${KyuubiConf.ARROW_BASED_ROWSET_TIMESTAMP_AS_STRING.key}=$timestampAsString")
checkArrowBasedRowSetTimestampAsString(statement, timestampAsString)
setTimeZone("UTC")
check("2022-12-07 17:15:35.0")
setTimeZone("GMT+8")
check("2022-12-08 01:15:35.0")
}
}
}
private def checkResultSetFormat(statement: Statement, expectFormat: String): Unit = {
val query =
s"""
|SELECT '$${hivevar:${KyuubiConf.OPERATION_RESULT_FORMAT.key}}' AS col
@ -52,4 +94,16 @@ class SparkArrowbasedOperationSuite extends WithSparkSQLEngine with SparkDataTyp
assert(resultSet.next())
assert(resultSet.getString("col") === expectFormat)
}
private def checkArrowBasedRowSetTimestampAsString(
statement: Statement,
expect: String): Unit = {
val query =
s"""
|SELECT '$${hivevar:${KyuubiConf.ARROW_BASED_ROWSET_TIMESTAMP_AS_STRING.key}}' AS col
|""".stripMargin
val resultSet = statement.executeQuery(query)
assert(resultSet.next())
assert(resultSet.getString("col") === expect)
}
}

View File

@ -1676,6 +1676,14 @@ object KyuubiConf {
.transform(_.toLowerCase(Locale.ROOT))
.createWithDefault("thrift")
val ARROW_BASED_ROWSET_TIMESTAMP_AS_STRING: ConfigEntry[Boolean] =
buildConf("kyuubi.operation.result.arrow.timestampAsString")
.doc("When true, arrow-based rowsets will convert columns of type timestamp to strings for" +
" transmission.")
.version("1.7.0")
.booleanConf
.createWithDefault(false)
val SERVER_OPERATION_LOG_DIR_ROOT: ConfigEntry[String] =
buildConf("kyuubi.operation.log.dir.root")
.doc("Root directory for query operation log at server-side.")

View File

@ -20,7 +20,7 @@ package org.apache.kyuubi.jdbc.hive;
public class JdbcColumnAttributes {
public int precision = 0;
public int scale = 0;
public String timeZone = "";
public String timeZone = null;
public JdbcColumnAttributes() {}

View File

@ -50,6 +50,7 @@ public abstract class KyuubiArrowBasedResultSet implements SQLResultSet {
protected Schema arrowSchema;
protected VectorSchemaRoot root;
protected ArrowColumnarBatchRow row;
protected boolean timestampAsString = true;
protected BufferAllocator allocator;
@ -312,11 +313,18 @@ public abstract class KyuubiArrowBasedResultSet implements SQLResultSet {
if (wasNull) {
return null;
} else {
return row.get(columnIndex - 1, columnType);
JdbcColumnAttributes attributes = columnAttributes.get(columnIndex - 1);
return row.get(
columnIndex - 1,
columnType,
attributes == null ? null : attributes.timeZone,
timestampAsString);
}
} catch (Exception e) {
e.printStackTrace();
throw new KyuubiSQLException("Unrecognized column type:", e);
throw new KyuubiSQLException(
String.format(
"Error getting row of type %s at column index %d", columnType, columnIndex - 1),
e);
}
}

View File

@ -58,9 +58,6 @@ public class KyuubiArrowQueryResultSet extends KyuubiArrowBasedResultSet {
private boolean isScrollable = false;
private boolean fetchFirst = false;
// TODO:(fchen) make this configurable
protected boolean convertComplexTypeToString = true;
private final TProtocolVersion protocol;
public static class Builder {
@ -87,6 +84,8 @@ public class KyuubiArrowQueryResultSet extends KyuubiArrowBasedResultSet {
private boolean isScrollable = false;
private ReentrantLock transportLock = null;
private boolean timestampAsString = true;
public Builder(Statement statement) throws SQLException {
this.statement = statement;
this.connection = statement.getConnection();
@ -153,6 +152,11 @@ public class KyuubiArrowQueryResultSet extends KyuubiArrowBasedResultSet {
return this;
}
public Builder setTimestampAsString(boolean timestampAsString) {
this.timestampAsString = timestampAsString;
return this;
}
public Builder setTransportLock(ReentrantLock transportLock) {
this.transportLock = transportLock;
return this;
@ -189,10 +193,10 @@ public class KyuubiArrowQueryResultSet extends KyuubiArrowBasedResultSet {
this.maxRows = builder.maxRows;
}
this.isScrollable = builder.isScrollable;
this.timestampAsString = builder.timestampAsString;
this.protocol = builder.getProtocolVersion();
arrowSchema =
ArrowUtils.toArrowSchema(
columnNames, convertComplexTypeToStringType(columnTypes), columnAttributes);
ArrowUtils.toArrowSchema(columnNames, convertToStringType(columnTypes), columnAttributes);
if (allocator == null) {
initArrowSchemaAndAllocator();
}
@ -271,8 +275,7 @@ public class KyuubiArrowQueryResultSet extends KyuubiArrowBasedResultSet {
columnAttributes.add(getColumnAttributes(primitiveTypeEntry));
}
arrowSchema =
ArrowUtils.toArrowSchema(
columnNames, convertComplexTypeToStringType(columnTypes), columnAttributes);
ArrowUtils.toArrowSchema(columnNames, convertToStringType(columnTypes), columnAttributes);
} catch (SQLException eS) {
throw eS; // rethrow the SQLException as is
} catch (Exception ex) {
@ -480,22 +483,25 @@ public class KyuubiArrowQueryResultSet extends KyuubiArrowBasedResultSet {
return isClosed;
}
private List<TTypeId> convertComplexTypeToStringType(List<TTypeId> colTypes) {
if (convertComplexTypeToString) {
return colTypes.stream()
.map(
type -> {
if (type == TTypeId.ARRAY_TYPE
|| type == TTypeId.MAP_TYPE
|| type == TTypeId.STRUCT_TYPE) {
return TTypeId.STRING_TYPE;
} else {
return type;
}
})
.collect(Collectors.toList());
} else {
return colTypes;
}
/**
* 1. the complex types (map/array/struct) are always converted to string type to transport 2. if
* the user set `timestampAsString = true`, then the timestamp type will be converted to string
* type too.
*/
private List<TTypeId> convertToStringType(List<TTypeId> colTypes) {
return colTypes.stream()
.map(
type -> {
if ((type == TTypeId.ARRAY_TYPE
|| type == TTypeId.MAP_TYPE
|| type == TTypeId.STRUCT_TYPE) // complex type (map/array/struct)
// timestamp type
|| (type == TTypeId.TIMESTAMP_TYPE && timestampAsString)) {
return TTypeId.STRING_TYPE;
} else {
return type;
}
})
.collect(Collectors.toList());
}
}

View File

@ -37,6 +37,7 @@ public class KyuubiStatement implements SQLStatement, KyuubiLoggable {
public static final Logger LOG = LoggerFactory.getLogger(KyuubiStatement.class.getName());
public static final int DEFAULT_FETCH_SIZE = 1000;
public static final String DEFAULT_RESULT_FORMAT = "thrift";
public static final String DEFAULT_ARROW_TIMESTAMP_AS_STRING = "false";
private final KyuubiConnection connection;
private TCLIService.Iface client;
private TOperationHandle stmtHandle = null;
@ -45,7 +46,8 @@ public class KyuubiStatement implements SQLStatement, KyuubiLoggable {
private int fetchSize = DEFAULT_FETCH_SIZE;
private boolean isScrollableResultset = false;
private boolean isOperationComplete = false;
private Map<String, String> properties = new HashMap<>();
private Map<String, String> properties = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
/**
* We need to keep a reference to the result set to support the following: <code>
* statement.execute(String sql);
@ -213,6 +215,11 @@ public class KyuubiStatement implements SQLStatement, KyuubiLoggable {
LOG.info("kyuubi.operation.result.format: " + resultFormat);
switch (resultFormat) {
case "arrow":
boolean timestampAsString =
Boolean.parseBoolean(
properties.getOrDefault(
"__kyuubi_operation_result_arrow_timestampAsString__",
DEFAULT_ARROW_TIMESTAMP_AS_STRING));
resultSet =
new KyuubiArrowQueryResultSet.Builder(this)
.setClient(client)
@ -222,6 +229,7 @@ public class KyuubiStatement implements SQLStatement, KyuubiLoggable {
.setFetchSize(fetchSize)
.setScrollable(isScrollableResultset)
.setSchema(columnNames, columnTypes, columnAttributes)
.setTimestampAsString(timestampAsString)
.build();
break;
default:
@ -270,6 +278,11 @@ public class KyuubiStatement implements SQLStatement, KyuubiLoggable {
LOG.info("kyuubi.operation.result.format: " + resultFormat);
switch (resultFormat) {
case "arrow":
boolean timestampAsString =
Boolean.parseBoolean(
properties.getOrDefault(
"__kyuubi_operation_result_arrow_timestampAsString__",
DEFAULT_ARROW_TIMESTAMP_AS_STRING));
resultSet =
new KyuubiArrowQueryResultSet.Builder(this)
.setClient(client)
@ -279,6 +292,7 @@ public class KyuubiStatement implements SQLStatement, KyuubiLoggable {
.setFetchSize(fetchSize)
.setScrollable(isScrollableResultset)
.setSchema(columnNames, columnTypes, columnAttributes)
.setTimestampAsString(timestampAsString)
.build();
break;
default:

View File

@ -19,6 +19,8 @@ package org.apache.kyuubi.jdbc.hive.arrow;
import java.math.BigDecimal;
import java.sql.Timestamp;
import java.time.LocalDateTime;
import org.apache.arrow.vector.util.DateUtility;
import org.apache.hive.service.rpc.thrift.TTypeId;
import org.apache.kyuubi.jdbc.hive.common.DateUtils;
import org.apache.kyuubi.jdbc.hive.common.HiveIntervalDayTime;
@ -104,7 +106,7 @@ public class ArrowColumnarBatchRow {
throw new UnsupportedOperationException();
}
public Object get(int ordinal, TTypeId dataType) {
public Object get(int ordinal, TTypeId dataType, String timeZone, boolean timestampAsString) {
long seconds;
long milliseconds;
long microseconds;
@ -131,17 +133,19 @@ public class ArrowColumnarBatchRow {
case STRING_TYPE:
return getString(ordinal);
case TIMESTAMP_TYPE:
microseconds = getLong(ordinal);
nanos = (int) (microseconds % 1000000) * 1000;
Timestamp timestamp = new Timestamp(microseconds / 1000);
timestamp.setNanos(nanos);
return timestamp;
if (timestampAsString) {
return Timestamp.valueOf(getString(ordinal));
} else {
LocalDateTime localDateTime =
DateUtility.getLocalDateTimeFromEpochMicro(getLong(ordinal), timeZone);
return Timestamp.valueOf(localDateTime);
}
case DATE_TYPE:
return DateUtils.internalToDate(getInt(ordinal));
case INTERVAL_DAY_TIME_TYPE:
microseconds = getLong(ordinal);
seconds = microseconds / 1000000;
nanos = (int) (microseconds % 1000000) * 1000;
seconds = microseconds / 1_000_000;
nanos = (int) (microseconds % 1_000_000) * 1_000;
return new HiveIntervalDayTime(seconds, nanos);
case INTERVAL_YEAR_MONTH_TYPE:
return new HiveIntervalYearMonth(getInt(ordinal));