[KYUUBI #3915] Client support detecting ResultSet codec

### _Why are the changes needed?_

to close #3915

This pr adds support for jdbc client detecting result set codec

1. in this PR, hints are added in the `TStatus.getInfoMessages()` to return, and the hints were added when the client retrieves the result set schema from the server
2. the hints mechanism is a general extension when we need to change the client behavior, e.g. adding support for  result set compression

### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #3916 from cfmcgrady/arrow-detect-codec.

Closes #3915

90495c30 [Fu Chen] style
bbeada0a [Fu Chen] address comment
825bc0da [Fu Chen] minor refactor
d0a01ff7 [Fu Chen] address comment
08d21a1c [Fu Chen] fix ut
690126ce [Fu Chen] add hint ut
fd32a317 [Fu Chen] style
a1c2bb6c [Fu Chen] simplify KyuubiConnection
f81336d3 [Fu Chen] refactor
500e766f [Fu Chen] unused import
221bc928 [Fu Chen] fix ut
cf564d0a [Fu Chen] refactor
4b895e45 [Fu Chen] fix compile
3efcc335 [Fu Chen] clean up
95ea29c3 [Fu Chen] Client support detecting ResultSet codec

Authored-by: Fu Chen <cfmcgrady@gmail.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
This commit is contained in:
Fu Chen 2022-12-13 17:05:47 +08:00 committed by Cheng Pan
parent b2831d7656
commit 827ae40bf5
No known key found for this signature in database
GPG Key ID: 8001952629BCC75D
27 changed files with 242 additions and 81 deletions

View File

@ -23,7 +23,7 @@ import scala.collection.JavaConverters.collectionAsScalaIterableConverter
import org.apache.flink.table.client.gateway.Executor
import org.apache.flink.table.client.gateway.context.SessionContext
import org.apache.hive.service.rpc.thrift.{TRowSet, TTableSchema}
import org.apache.hive.service.rpc.thrift.{TGetResultSetMetadataResp, TRowSet, TTableSchema}
import org.apache.kyuubi.{KyuubiSQLException, Utils}
import org.apache.kyuubi.engine.flink.result.ResultSet
@ -74,12 +74,15 @@ abstract class FlinkOperation(session: Session) extends AbstractOperation(sessio
}
}
override def getResultSetSchema: TTableSchema = {
override def getResultSetMetadata: TGetResultSetMetadataResp = {
val tTableSchema = new TTableSchema()
resultSet.getColumns.asScala.zipWithIndex.foreach { case (f, i) =>
tTableSchema.addToColumns(RowSet.toTColumnDesc(f, i))
}
tTableSchema
val resp = new TGetResultSetMetadataResp
resp.setSchema(tTableSchema)
resp.setStatus(OK_STATUS)
resp
}
override def getNextRowSet(order: FetchOrientation, rowSetSize: Int): TRowSet = {

View File

@ -21,7 +21,7 @@ import java.util.concurrent.Future
import org.apache.hive.service.cli.operation.{Operation, OperationManager}
import org.apache.hive.service.cli.session.{HiveSession, SessionManager => HiveSessionManager}
import org.apache.hive.service.rpc.thrift.{TRowSet, TTableSchema}
import org.apache.hive.service.rpc.thrift.{TGetResultSetMetadataResp, TRowSet}
import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.engine.hive.session.HiveSessionImpl
@ -84,8 +84,12 @@ abstract class HiveOperation(session: Session) extends AbstractOperation(session
Option(status.getOperationException).map(KyuubiSQLException(_)))
}
override def getResultSetSchema: TTableSchema = {
internalHiveOperation.getResultSetSchema.toTTableSchema
override def getResultSetMetadata: TGetResultSetMetadataResp = {
val schema = internalHiveOperation.getResultSetSchema.toTTableSchema
val resp = new TGetResultSetMetadataResp
resp.setSchema(schema)
resp.setStatus(OK_STATUS)
resp
}
override def getNextRowSet(order: FetchOrientation, rowSetSize: Int): TRowSet = {

View File

@ -16,7 +16,7 @@
*/
package org.apache.kyuubi.engine.jdbc.operation
import org.apache.hive.service.rpc.thrift.{TRowSet, TTableSchema}
import org.apache.hive.service.rpc.thrift.{TGetResultSetMetadataResp, TRowSet}
import org.apache.kyuubi.{KyuubiSQLException, Utils}
import org.apache.kyuubi.config.KyuubiConf
@ -100,9 +100,13 @@ abstract class JdbcOperation(session: Session) extends AbstractOperation(session
getProtocolVersion)
}
override def getResultSetSchema: TTableSchema = {
override def getResultSetMetadata: TGetResultSetMetadataResp = {
val schemaHelper = dialect.getSchemaHelper()
schemaHelper.toTTTableSchema(schema.columns)
val tTableSchema = schemaHelper.toTTTableSchema(schema.columns)
val resp = new TGetResultSetMetadataResp
resp.setSchema(tTableSchema)
resp.setStatus(OK_STATUS)
resp
}
override def shouldRunAsync: Boolean = false

View File

@ -225,4 +225,7 @@ class ExecuteStatement(
df
}
}
override def getResultSetMetadataHints(): Seq[String] =
Seq(s"__kyuubi_operation_result_codec__=$resultCodec")
}

View File

@ -20,7 +20,7 @@ package org.apache.kyuubi.engine.spark.operation
import java.io.IOException
import java.time.ZoneId
import org.apache.hive.service.rpc.thrift.{TRowSet, TTableSchema}
import org.apache.hive.service.rpc.thrift.{TGetResultSetMetadataResp, TRowSet}
import org.apache.spark.kyuubi.SparkUtilsHelper.redact
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.StructType
@ -37,7 +37,7 @@ import org.apache.kyuubi.operation.{AbstractOperation, FetchIterator, OperationS
import org.apache.kyuubi.operation.FetchOrientation._
import org.apache.kyuubi.operation.OperationState.OperationState
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.session.{AbstractSession, Session}
import org.apache.kyuubi.session.Session
abstract class SparkOperation(session: Session)
extends AbstractOperation(session) {
@ -160,8 +160,15 @@ abstract class SparkOperation(session: Session)
}
}
override def getResultSetSchema: TTableSchema =
SchemaHelper.toTTableSchema(resultSchema, timeZone.toString)
def getResultSetMetadataHints(): Seq[String] = Seq.empty
override def getResultSetMetadata: TGetResultSetMetadataResp = {
val resp = new TGetResultSetMetadataResp
val schema = SchemaHelper.toTTableSchema(resultSchema, timeZone.toString)
resp.setSchema(schema)
resp.setStatus(okStatusWithHints(getResultSetMetadataHints()))
resp
}
override def getNextRowSet(order: FetchOrientation, rowSetSize: Int): TRowSet =
withLocalProperties {
@ -200,14 +207,16 @@ abstract class SparkOperation(session: Session)
override def shouldRunAsync: Boolean = false
protected def arrowEnabled(): Boolean = {
// normalized config is required, to pass unit test
session.asInstanceOf[AbstractSession].normalizedConf
.getOrElse("kyuubi.operation.result.codec", "simple")
.equalsIgnoreCase("arrow") &&
resultCodec().equalsIgnoreCase("arrow") &&
// TODO: (fchen) make all operation support arrow
getClass.getCanonicalName == classOf[ExecuteStatement].getCanonicalName
}
protected def resultCodec(): String = {
// TODO: respect the config of the operation ExecuteStatement, if it was set.
spark.conf.get("kyuubi.operation.result.codec", "simple")
}
protected def setSessionUserSign(): Unit = {
(
session.conf.get(KYUUBI_SESSION_SIGN_PUBLICKEY),

View File

@ -17,6 +17,8 @@
package org.apache.kyuubi.engine.spark.operation
import java.sql.Statement
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.engine.spark.WithSparkSQLEngine
import org.apache.kyuubi.operation.SparkDataTypeTests
@ -33,15 +35,21 @@ class SparkArrowbasedOperationSuite extends WithSparkSQLEngine with SparkDataTyp
override def resultCodec: String = "arrow"
test("make sure kyuubi.operation.result.codec=arrow") {
test("detect resultSet codec") {
withJdbcStatement() { statement =>
val query =
s"""
|SELECT '$${hivevar:${KyuubiConf.OPERATION_RESULT_CODEC.key}}' AS col
|""".stripMargin
val resultSet = statement.executeQuery(query)
assert(resultSet.next())
assert(resultSet.getString("col") === "arrow")
checkResultSetCodec(statement, "arrow")
statement.executeQuery(s"set ${KyuubiConf.OPERATION_RESULT_CODEC.key}=simple")
checkResultSetCodec(statement, "simple")
}
}
def checkResultSetCodec(statement: Statement, expectCodec: String): Unit = {
val query =
s"""
|SELECT '$${hivevar:${KyuubiConf.OPERATION_RESULT_CODEC.key}}' AS col
|""".stripMargin
val resultSet = statement.executeQuery(query)
assert(resultSet.next())
assert(resultSet.getString("col") === expectCodec)
}
}

View File

@ -21,8 +21,7 @@ import java.io.IOException
import io.trino.client.Column
import io.trino.client.StatementClient
import org.apache.hive.service.rpc.thrift.TRowSet
import org.apache.hive.service.rpc.thrift.TTableSchema
import org.apache.hive.service.rpc.thrift.{TGetResultSetMetadataResp, TRowSet}
import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.Utils
@ -47,7 +46,13 @@ abstract class TrinoOperation(session: Session) extends AbstractOperation(sessio
protected var iter: FetchIterator[List[Any]] = _
override def getResultSetSchema: TTableSchema = SchemaHelper.toTTableSchema(schema)
override def getResultSetMetadata: TGetResultSetMetadataResp = {
val tTableSchema = SchemaHelper.toTTableSchema(schema)
val resp = new TGetResultSetMetadataResp
resp.setSchema(tTableSchema)
resp.setStatus(OK_STATUS)
resp
}
override def getNextRowSet(order: FetchOrientation, rowSetSize: Int): TRowSet = {
validateDefaultFetchOrientation(order)

View File

@ -19,8 +19,10 @@ package org.apache.kyuubi.operation
import java.util.concurrent.{Future, ScheduledExecutorService, TimeUnit}
import scala.collection.JavaConverters._
import org.apache.commons.lang3.StringUtils
import org.apache.hive.service.rpc.thrift.{TProgressUpdateResp, TProtocolVersion, TRowSet, TTableSchema}
import org.apache.hive.service.rpc.thrift.{TGetResultSetMetadataResp, TProgressUpdateResp, TProtocolVersion, TRowSet, TStatus, TStatusCode}
import org.apache.kyuubi.{KyuubiSQLException, Logging}
import org.apache.kyuubi.config.KyuubiConf.OPERATION_IDLE_TIMEOUT
@ -173,7 +175,7 @@ abstract class AbstractOperation(session: Session) extends Operation with Loggin
protected def getProtocolVersion: TProtocolVersion = session.protocol
override def getResultSetSchema: TTableSchema
override def getResultSetMetadata: TGetResultSetMetadataResp
override def getNextRowSet(order: FetchOrientation, rowSetSize: Int): TRowSet
@ -227,4 +229,12 @@ abstract class AbstractOperation(session: Session) extends Operation with Loggin
lastAccessTime + operationTimeout <= System.currentTimeMillis()
}
}
final val OK_STATUS = new TStatus(TStatusCode.SUCCESS_STATUS)
def okStatusWithHints(hints: Seq[String]): TStatus = {
val ok = new TStatus(TStatusCode.SUCCESS_STATUS)
ok.setInfoMessages(hints.asJava)
ok
}
}

View File

@ -19,7 +19,7 @@ package org.apache.kyuubi.operation
import java.util.concurrent.Future
import org.apache.hive.service.rpc.thrift.{TRowSet, TTableSchema}
import org.apache.hive.service.rpc.thrift.{TGetResultSetMetadataResp, TRowSet}
import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
import org.apache.kyuubi.operation.log.OperationLog
@ -31,7 +31,7 @@ trait Operation {
def cancel(): Unit
def close(): Unit
def getResultSetSchema: TTableSchema
def getResultSetMetadata: TGetResultSetMetadataResp
def getNextRowSet(order: FetchOrientation, rowSetSize: Int): TRowSet
def getSession: Session

View File

@ -126,8 +126,8 @@ abstract class OperationManager(name: String) extends AbstractService(name) {
operation.close()
}
final def getOperationResultSetSchema(opHandle: OperationHandle): TTableSchema = {
getOperation(opHandle).getResultSetSchema
final def getOperationResultSetSchema(opHandle: OperationHandle): TGetResultSetMetadataResp = {
getOperation(opHandle).getResultSetMetadata
}
final def getOperationNextRowSet(

View File

@ -21,7 +21,7 @@ import java.util.concurrent.{ExecutionException, TimeoutException, TimeUnit}
import scala.concurrent.CancellationException
import org.apache.hive.service.rpc.thrift.{TGetInfoType, TGetInfoValue, TProtocolVersion, TRowSet, TTableSchema}
import org.apache.hive.service.rpc.thrift.{TGetInfoType, TGetInfoValue, TGetResultSetMetadataResp, TProtocolVersion, TRowSet}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.operation.{OperationHandle, OperationStatus}
@ -188,7 +188,7 @@ abstract class AbstractBackendService(name: String)
.getOperation(operationHandle).getSession.closeOperation(operationHandle)
}
override def getResultSetMetadata(operationHandle: OperationHandle): TTableSchema = {
override def getResultSetMetadata(operationHandle: OperationHandle): TGetResultSetMetadataResp = {
sessionManager.operationManager
.getOperation(operationHandle).getSession.getResultSetMetadata(operationHandle)
}

View File

@ -94,7 +94,7 @@ trait BackendService {
def getOperationStatus(operationHandle: OperationHandle): OperationStatus
def cancelOperation(operationHandle: OperationHandle): Unit
def closeOperation(operationHandle: OperationHandle): Unit
def getResultSetMetadata(operationHandle: OperationHandle): TTableSchema
def getResultSetMetadata(operationHandle: OperationHandle): TGetResultSetMetadataResp
def fetchResults(
operationHandle: OperationHandle,
orientation: FetchOrientation,

View File

@ -507,17 +507,15 @@ abstract class TFrontendService(name: String)
override def GetResultSetMetadata(req: TGetResultSetMetadataReq): TGetResultSetMetadataResp = {
debug(req.toString)
val resp = new TGetResultSetMetadataResp
try {
val schema = be.getResultSetMetadata(OperationHandle(req.getOperationHandle))
resp.setSchema(schema)
resp.setStatus(OK_STATUS)
be.getResultSetMetadata(OperationHandle(req.getOperationHandle))
} catch {
case e: Exception =>
error("Error getting result set metadata: ", e)
val resp = new TGetResultSetMetadataResp
resp.setStatus(KyuubiSQLException.toTStatus(e))
resp
}
resp
}
override def FetchResults(req: TFetchResultsReq): TFetchResultsResp = {
@ -647,4 +645,10 @@ private[kyuubi] object TFrontendService {
def getSessionHandle: SessionHandle = sessionHandle
}
def okStatusWithHint(hint: Seq[String]): TStatus = {
val ok = new TStatus(TStatusCode.SUCCESS_STATUS)
ok.setInfoMessages(hint.asJava)
ok
}
}

View File

@ -225,7 +225,7 @@ abstract class AbstractSession(
}
override def getResultSetMetadata(
operationHandle: OperationHandle): TTableSchema = withAcquireRelease() {
operationHandle: OperationHandle): TGetResultSetMetadataResp = withAcquireRelease() {
sessionManager.operationManager.getOperationResultSetSchema(operationHandle)
}

View File

@ -17,7 +17,7 @@
package org.apache.kyuubi.session
import org.apache.hive.service.rpc.thrift.{TGetInfoType, TGetInfoValue, TProtocolVersion, TRowSet, TTableSchema}
import org.apache.hive.service.rpc.thrift.{TGetInfoType, TGetInfoValue, TGetResultSetMetadataResp, TProtocolVersion, TRowSet}
import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
import org.apache.kyuubi.operation.OperationHandle
@ -86,7 +86,7 @@ trait Session {
def cancelOperation(operationHandle: OperationHandle): Unit
def closeOperation(operationHandle: OperationHandle): Unit
def getResultSetMetadata(operationHandle: OperationHandle): TTableSchema
def getResultSetMetadata(operationHandle: OperationHandle): TGetResultSetMetadataResp
def fetchResults(
operationHandle: OperationHandle,
orientation: FetchOrientation,

View File

@ -21,7 +21,7 @@ import java.nio.ByteBuffer
import scala.collection.JavaConverters._
import org.apache.hive.service.rpc.thrift.{TColumn, TColumnDesc, TPrimitiveTypeEntry, TRowSet, TStringColumn, TTableSchema, TTypeDesc, TTypeEntry, TTypeId}
import org.apache.hive.service.rpc.thrift.{TColumn, TColumnDesc, TGetResultSetMetadataResp, TPrimitiveTypeEntry, TRowSet, TStringColumn, TTableSchema, TTypeDesc, TTypeEntry, TTypeId}
import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
@ -60,7 +60,7 @@ class NoopOperation(session: Session, shouldFail: Boolean = false)
setState(OperationState.CLOSED)
}
override def getResultSetSchema: TTableSchema = {
override def getResultSetMetadata: TGetResultSetMetadataResp = {
val tColumnDesc = new TColumnDesc()
tColumnDesc.setColumnName("noop")
val desc = new TTypeDesc
@ -70,7 +70,10 @@ class NoopOperation(session: Session, shouldFail: Boolean = false)
tColumnDesc.setPosition(0)
val schema = new TTableSchema()
schema.addToColumns(tColumnDesc)
schema
val resp = new TGetResultSetMetadataResp
resp.setSchema(schema)
resp.setStatus(OK_STATUS)
resp
}
override def getNextRowSet(order: FetchOrientation, rowSetSize: Int): TRowSet = {

View File

@ -24,7 +24,7 @@ import scala.collection.JavaConverters._
import org.apache.commons.io.FileUtils
import org.apache.commons.lang3.StringUtils
import org.apache.hive.service.rpc.thrift.{TExecuteStatementReq, TFetchResultsReq, TOpenSessionReq, TStatusCode}
import org.apache.hive.service.rpc.thrift.{TExecuteStatementReq, TFetchResultsReq, TGetResultSetMetadataReq, TOpenSessionReq, TStatusCode}
import org.apache.kyuubi.{KYUUBI_VERSION, Utils}
import org.apache.kyuubi.config.KyuubiConf
@ -409,4 +409,40 @@ trait SparkQueryTests extends SparkDataTypeTests with HiveJDBCTestHelper {
}
}
}
test("operation metadata hint - __kyuubi_operation_result_codec__") {
assume(!httpMode)
withSessionHandle { (client, handle) =>
def checkStatusAndResultSetCodecHint(
sql: String,
expectedCodec: String): Unit = {
val stmtReq = new TExecuteStatementReq()
stmtReq.setSessionHandle(handle)
stmtReq.setStatement(sql)
val tExecuteStatementResp = client.ExecuteStatement(stmtReq)
val opHandle = tExecuteStatementResp.getOperationHandle
waitForOperationToComplete(client, opHandle)
val metaReq = new TGetResultSetMetadataReq(opHandle)
val resp = client.GetResultSetMetadata(metaReq)
assert(resp.getStatus.getStatusCode == TStatusCode.SUCCESS_STATUS)
val expectedResultSetCodecHint = s"__kyuubi_operation_result_codec__=$expectedCodec"
assert(resp.getStatus.getInfoMessages.contains(expectedResultSetCodecHint))
}
checkStatusAndResultSetCodecHint(
sql = "SELECT 1",
expectedCodec = "simple")
checkStatusAndResultSetCodecHint(
sql = "set kyuubi.operation.result.codec=arrow",
expectedCodec = "arrow")
checkStatusAndResultSetCodecHint(
sql = "SELECT 1",
expectedCodec = "arrow")
checkStatusAndResultSetCodecHint(
sql = "set kyuubi.operation.result.codec=simple",
expectedCodec = "simple")
checkStatusAndResultSetCodecHint(
sql = "set kyuubi.operation.result.codec",
expectedCodec = "simple")
}
}
}

View File

@ -190,6 +190,9 @@ public class KyuubiArrowQueryResultSet extends KyuubiArrowBasedResultSet {
}
this.isScrollable = builder.isScrollable;
this.protocol = builder.getProtocolVersion();
arrowSchema =
ArrowUtils.toArrowSchema(
columnNames, convertComplexTypeToStringType(columnTypes), columnAttributes);
if (allocator == null) {
initArrowSchemaAndAllocator();
}
@ -201,7 +204,7 @@ public class KyuubiArrowQueryResultSet extends KyuubiArrowBasedResultSet {
* @param primitiveTypeEntry primitive type
* @return generated ColumnAttributes, or null
*/
private static JdbcColumnAttributes getColumnAttributes(TPrimitiveTypeEntry primitiveTypeEntry) {
public static JdbcColumnAttributes getColumnAttributes(TPrimitiveTypeEntry primitiveTypeEntry) {
JdbcColumnAttributes ret = null;
if (primitiveTypeEntry.isSetTypeQualifiers()) {
TTypeQualifiers tq = primitiveTypeEntry.getTypeQualifiers();

View File

@ -111,7 +111,6 @@ public class KyuubiConnection implements SQLConnection, KyuubiLoggable {
private String engineUrl = "";
private boolean isBeeLineMode;
private String resultCodec = "simple";
/** Get all direct HiveServer2 URLs from a ZooKeeper based HiveServer2 URL */
public static List<JdbcConnectionParams> getAllUrls(String zookeeperBasedHS2Url)
@ -153,19 +152,6 @@ public class KyuubiConnection implements SQLConnection, KyuubiLoggable {
}
port = connParams.getPort();
resultCodec =
connParams
.getSessionVars()
.getOrDefault(
"kyuubi.operation.result.codec",
connParams
.getHiveVars()
.getOrDefault(
"kyuubi.operation.result.codec",
connParams
.getHiveConfs()
.getOrDefault("kyuubi.operation.result.codec", "simple")));
setupTimeout();
if (sessConfMap.containsKey(FETCH_SIZE)) {
@ -1386,8 +1372,4 @@ public class KyuubiConnection implements SQLConnection, KyuubiLoggable {
public String getEngineUrl() {
return engineUrl;
}
String getResultCodec() {
return resultCodec;
}
}

View File

@ -36,6 +36,7 @@ import org.slf4j.LoggerFactory;
public class KyuubiStatement implements SQLStatement, KyuubiLoggable {
public static final Logger LOG = LoggerFactory.getLogger(KyuubiStatement.class.getName());
public static final int DEFAULT_FETCH_SIZE = 1000;
public static final String DEFAULT_RESULT_CODEC = "simple";
private final KyuubiConnection connection;
private TCLIService.Iface client;
private TOperationHandle stmtHandle = null;
@ -44,6 +45,7 @@ public class KyuubiStatement implements SQLStatement, KyuubiLoggable {
private int fetchSize = DEFAULT_FETCH_SIZE;
private boolean isScrollableResultset = false;
private boolean isOperationComplete = false;
private Map<String, String> properties = new HashMap<>();
/**
* We need to keep a reference to the result set to support the following: <code>
* statement.execute(String sql);
@ -200,7 +202,14 @@ public class KyuubiStatement implements SQLStatement, KyuubiLoggable {
return false;
}
String resultCodec = connection.getResultCodec().toLowerCase(Locale.ROOT);
TGetResultSetMetadataResp metadata = getResultSetMetadata();
List<String> columnNames = new ArrayList<>();
List<TTypeId> columnTypes = new ArrayList<>();
List<JdbcColumnAttributes> columnAttributes = new ArrayList<>();
parseMetadata(metadata, columnNames, columnTypes, columnAttributes);
String resultCodec =
properties.getOrDefault("__kyuubi_operation_result_codec__", DEFAULT_RESULT_CODEC);
LOG.info("kyuubi.operation.result.codec: " + resultCodec);
switch (resultCodec) {
case "arrow":
@ -212,6 +221,7 @@ public class KyuubiStatement implements SQLStatement, KyuubiLoggable {
.setMaxRows(maxRows)
.setFetchSize(fetchSize)
.setScrollable(isScrollableResultset)
.setSchema(columnNames, columnTypes, columnAttributes)
.build();
break;
default:
@ -223,6 +233,7 @@ public class KyuubiStatement implements SQLStatement, KyuubiLoggable {
.setMaxRows(maxRows)
.setFetchSize(fetchSize)
.setScrollable(isScrollableResultset)
.setSchema(columnNames, columnTypes, columnAttributes)
.build();
}
return true;
@ -248,7 +259,14 @@ public class KyuubiStatement implements SQLStatement, KyuubiLoggable {
if (!status.isHasResultSet()) {
return false;
}
String resultCodec = connection.getResultCodec().toLowerCase(Locale.ROOT);
TGetResultSetMetadataResp metadata = getResultSetMetadata();
List<String> columnNames = new ArrayList<>();
List<TTypeId> columnTypes = new ArrayList<>();
List<JdbcColumnAttributes> columnAttributes = new ArrayList<>();
parseMetadata(metadata, columnNames, columnTypes, columnAttributes);
String resultCodec =
properties.getOrDefault("__kyuubi_operation_result_codec__", DEFAULT_RESULT_CODEC);
LOG.info("kyuubi.operation.result.codec: " + resultCodec);
switch (resultCodec) {
case "arrow":
@ -260,6 +278,7 @@ public class KyuubiStatement implements SQLStatement, KyuubiLoggable {
.setMaxRows(maxRows)
.setFetchSize(fetchSize)
.setScrollable(isScrollableResultset)
.setSchema(columnNames, columnTypes, columnAttributes)
.build();
default:
resultSet =
@ -270,6 +289,7 @@ public class KyuubiStatement implements SQLStatement, KyuubiLoggable {
.setMaxRows(maxRows)
.setFetchSize(fetchSize)
.setScrollable(isScrollableResultset)
.setSchema(columnNames, columnTypes, columnAttributes)
.build();
}
return true;
@ -743,4 +763,57 @@ public class KyuubiStatement implements SQLStatement, KyuubiLoggable {
public void setInPlaceUpdateStream(InPlaceUpdateStream stream) {
this.inPlaceUpdateStream = stream;
}
private TGetResultSetMetadataResp getResultSetMetadata() throws SQLException {
try {
TGetResultSetMetadataReq metadataReq = new TGetResultSetMetadataReq(stmtHandle);
// TODO need session handle
TGetResultSetMetadataResp metadataResp;
metadataResp = client.GetResultSetMetadata(metadataReq);
Utils.verifySuccess(metadataResp.getStatus());
return metadataResp;
} catch (SQLException eS) {
throw eS; // rethrow the SQLException as is
} catch (Exception ex) {
throw new KyuubiSQLException("Could not create ResultSet: " + ex.getMessage(), ex);
}
}
private void parseMetadata(
TGetResultSetMetadataResp metadataResp,
List<String> columnNames,
List<TTypeId> columnTypes,
List<JdbcColumnAttributes> columnAttributes)
throws KyuubiSQLException {
TTableSchema schema = metadataResp.getSchema();
if (schema == null || !schema.isSetColumns()) {
throw new KyuubiSQLException("the result set schema is null");
}
// parse kyuubi hint
List<String> infoMessages = metadataResp.getStatus().getInfoMessages();
if (infoMessages != null) {
metadataResp.getStatus().getInfoMessages().stream()
.filter(hint -> Utils.isKyuubiOperationHint(hint))
.forEach(
line -> {
String[] keyValue = line.toLowerCase(Locale.ROOT).split("=");
assert keyValue.length == 2 : "Illegal Kyuubi operation hint found!";
String key = keyValue[0];
String value = keyValue[1];
properties.put(key, value);
});
}
// parse metadata
List<TColumnDesc> columns = schema.getColumns();
for (int pos = 0; pos < schema.getColumnsSize(); pos++) {
String columnName = columns.get(pos).getColumnName();
columnNames.add(columnName);
TPrimitiveTypeEntry primitiveTypeEntry =
columns.get(pos).getTypeDesc().getTypes().get(0).getPrimitiveEntry();
columnTypes.add(primitiveTypeEntry.getType());
columnAttributes.add(KyuubiArrowQueryResultSet.getColumnAttributes(primitiveTypeEntry));
}
}
}

View File

@ -56,6 +56,9 @@ public class Utils {
public static final String HIVE_SERVER2_RETRY_TRUE = "true";
public static final String HIVE_SERVER2_RETRY_FALSE = "false";
public static final Pattern KYUUBI_OPERATION_HINT_PATTERN =
Pattern.compile("^__kyuubi_operation_result_(.*)__=(.*)", Pattern.CASE_INSENSITIVE);
static String getMatchedUrlPrefix(String uri) throws JdbcUriParseException {
for (String urlPrefix : URL_PREFIX_LIST) {
if (uri.startsWith(urlPrefix)) {
@ -470,4 +473,8 @@ public class Utils {
return hostName;
}
}
public static boolean isKyuubiOperationHint(String hint) {
return KYUUBI_OPERATION_HINT_PATTERN.matcher(hint).matches();
}
}

View File

@ -384,11 +384,11 @@ class KyuubiSyncThriftClient private (
}
}
def getResultSetMetadata(operationHandle: TOperationHandle): TTableSchema = {
def getResultSetMetadata(operationHandle: TOperationHandle): TGetResultSetMetadataResp = {
val req = new TGetResultSetMetadataReq(operationHandle)
val resp = withLockAcquiredAsyncRequest(GetResultSetMetadata(req))
ThriftUtils.verifyTStatus(resp.getStatus)
resp.getSchema
resp
}
def fetchResults(

View File

@ -22,7 +22,7 @@ import java.util.{ArrayList => JArrayList}
import scala.collection.JavaConverters._
import org.apache.hive.service.rpc.thrift.{TColumn, TColumnDesc, TPrimitiveTypeEntry, TRow, TRowSet, TStringColumn, TTableSchema, TTypeDesc, TTypeEntry, TTypeId}
import org.apache.hive.service.rpc.thrift.{TColumn, TColumnDesc, TGetResultSetMetadataResp, TPrimitiveTypeEntry, TRow, TRowSet, TStringColumn, TTableSchema, TTypeDesc, TTypeEntry, TTypeId}
import org.apache.kyuubi.engine.ApplicationInfo
import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
@ -33,7 +33,7 @@ abstract class KyuubiApplicationOperation(session: Session) extends KyuubiOperat
private[kyuubi] def currentApplicationInfo: Option[ApplicationInfo]
override def getResultSetSchema: TTableSchema = {
override def getResultSetMetadata: TGetResultSetMetadataResp = {
val schema = new TTableSchema()
Seq("key", "value").zipWithIndex.foreach { case (colName, position) =>
val tColumnDesc = new TColumnDesc()
@ -44,7 +44,10 @@ abstract class KyuubiApplicationOperation(session: Session) extends KyuubiOperat
tColumnDesc.setPosition(position)
schema.addToColumns(tColumnDesc)
}
schema
val resp = new TGetResultSetMetadataResp
resp.setSchema(schema)
resp.setStatus(okStatusWithHints(Seq.empty))
resp
}
override def getNextRowSet(order: FetchOrientation, rowSetSize: Int): TRowSet = {

View File

@ -141,7 +141,7 @@ abstract class KyuubiOperation(session: Session) extends AbstractOperation(sessi
}
}
override def getResultSetSchema: TTableSchema = {
override def getResultSetMetadata: TGetResultSetMetadataResp = {
if (_remoteOpHandle == null) {
val tColumnDesc = new TColumnDesc()
tColumnDesc.setColumnName("Result")
@ -151,7 +151,10 @@ abstract class KyuubiOperation(session: Session) extends AbstractOperation(sessi
tColumnDesc.setPosition(0)
val schema = new TTableSchema()
schema.addToColumns(tColumnDesc)
schema
val resp = new TGetResultSetMetadataResp
resp.setSchema(schema)
resp.setStatus(OK_STATUS)
resp
} else {
client.getResultSetMetadata(_remoteOpHandle)
}

View File

@ -170,7 +170,8 @@ trait BackendServiceMetric extends BackendService {
}
}
abstract override def getResultSetMetadata(operationHandle: OperationHandle): TTableSchema = {
abstract override def getResultSetMetadata(operationHandle: OperationHandle)
: TGetResultSetMetadataResp = {
MetricsSystem.timerTracing(MetricsConstants.BS_GET_RESULT_SET_METADATA) {
super.getResultSetMetadata(operationHandle)
}

View File

@ -104,7 +104,7 @@ private[v1] class OperationsResource extends ApiRequestContext with Logging {
try {
val operationHandle = OperationHandle(operationHandleStr)
new ResultSetMetaData(
fe.be.getResultSetMetadata(operationHandle).getColumns.asScala.map(c => {
fe.be.getResultSetMetadata(operationHandle).getSchema.getColumns.asScala.map(c => {
val tPrimitiveTypeEntry = c.getTypeDesc.getTypes.get(0).getPrimitiveEntry
var precision = 0
var scale = 0

View File

@ -195,13 +195,13 @@ class MySQLCommandHandler(
throw opStatus.exception
.getOrElse(KyuubiSQLException(s"Error operator state ${opStatus.state}"))
}
val tableSchema = be.getResultSetMetadata(opHandle)
val resultSetMetadata = be.getResultSetMetadata(opHandle)
val rowSet = be.fetchResults(
opHandle,
FetchOrientation.FETCH_NEXT,
Int.MaxValue,
fetchLog = false)
MySQLQueryResult(tableSchema, rowSet)
MySQLQueryResult(resultSetMetadata.getSchema, rowSet)
} catch {
case rethrow: Exception =>
warn("Error executing statement: ", rethrow)