[KYUUBI #4075] [ARROW] Rename configuration from kyuubi.operation.result.codec to kyuubi.operation.result.format

### _Why are the changes needed?_

After some offline discussions, I propose to change the configuration key from `kyuubi.operation.result.codec` to `kyuubi.operation.result.format`.

### _How was this patch tested?_

Pass CI.

Closes #4075 from cfmcgrady/arrow-conf-rename.

Closes #4075

5ad45507 [Fu Chen] fix
214b43fd [Fu Chen] rename

Authored-by: Fu Chen <cfmcgrady@gmail.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
This commit is contained in:
Fu Chen 2023-01-04 18:05:58 +08:00 committed by Cheng Pan
parent 71e46bd316
commit 8b797725b5
No known key found for this signature in database
GPG Key ID: 8001952629BCC75D
8 changed files with 54 additions and 54 deletions

View File

@ -446,7 +446,7 @@ kyuubi.operation.plan.only.mode|none|Configures the statement performed mode, Th
kyuubi.operation.plan.only.output.style|plain|Configures the planOnly output style. The value can be 'plain' or 'json'; the default value is 'plain'. This configuration supports only the output styles of the Spark engine.|string|1.7.0
kyuubi.operation.progress.enabled|false|Whether to enable the operation progress. When true, the operation progress will be returned in `GetOperationStatus`.|boolean|1.6.0
kyuubi.operation.query.timeout|&lt;undefined&gt;|Timeout for query executions at server-side, takes effect together with the client-side timeout (`java.sql.Statement.setQueryTimeout`); a running query will be cancelled automatically if it times out. It's off by default, which means only the client side takes full control of whether the query should time out or not. If set, the client-side timeout is capped at this value. To cancel queries right away without waiting for the task to finish, consider enabling kyuubi.operation.interrupt.on.cancel together.|duration|1.2.0
kyuubi.operation.result.codec|simple|Specify the result codec, available configs are: <ul> <li>SIMPLE: the result will convert to TRow at the engine driver side. </li> <li>ARROW: the result will be encoded as Arrow at the executor side before collecting by the driver, and deserialized at the client side. note that it only takes effect for kyuubi-hive-jdbc clients now.</li></ul>|string|1.7.0
kyuubi.operation.result.format|thrift|Specify the result format, available configs are: <ul> <li>THRIFT: the result will convert to TRow at the engine driver side. </li> <li>ARROW: the result will be encoded as Arrow at the executor side before collecting by the driver, and deserialized at the client side. note that it only takes effect for kyuubi-hive-jdbc clients now.</li></ul>|string|1.7.0
kyuubi.operation.result.max.rows|0|Max rows of Spark query results. Rows that exceed the limit will be ignored. Set this value to 0 to disable the max rows limit.|int|1.6.0
kyuubi.operation.scheduler.pool|&lt;undefined&gt;|The scheduler pool of job. Note that, this config should be used after change Spark config spark.scheduler.mode=FAIR.|string|1.1.1
kyuubi.operation.spark.listener.enabled|true|When set to true, Spark engine registers a SQLOperationListener before executing the statement, logs a few summary statistics when each stage completes.|boolean|1.6.0

View File

@ -176,5 +176,5 @@ class ExecuteStatement(
}
override def getResultSetMetadataHints(): Seq[String] =
Seq(s"__kyuubi_operation_result_codec__=$resultCodec")
Seq(s"__kyuubi_operation_result_format__=$resultFormat")
}

View File

@ -260,14 +260,14 @@ abstract class SparkOperation(session: Session)
override def shouldRunAsync: Boolean = false
protected def arrowEnabled(): Boolean = {
resultCodec().equalsIgnoreCase("arrow") &&
resultFormat().equalsIgnoreCase("arrow") &&
// TODO: (fchen) make all operation support arrow
getClass.getCanonicalName == classOf[ExecuteStatement].getCanonicalName
}
protected def resultCodec(): String = {
protected def resultFormat(): String = {
// TODO: respect the config of the operation ExecuteStatement, if it was set.
spark.conf.get("kyuubi.operation.result.codec", "simple")
spark.conf.get("kyuubi.operation.result.format", "thrift")
}
protected def setSessionUserSign(): Unit = {

View File

@ -30,26 +30,26 @@ class SparkArrowbasedOperationSuite extends WithSparkSQLEngine with SparkDataTyp
override def withKyuubiConf: Map[String, String] = Map.empty
override def jdbcVars: Map[String, String] = {
Map(KyuubiConf.OPERATION_RESULT_CODEC.key -> resultCodec)
Map(KyuubiConf.OPERATION_RESULT_FORMAT.key -> resultFormat)
}
override def resultCodec: String = "arrow"
override def resultFormat: String = "arrow"
test("detect resultSet codec") {
test("detect resultSet format") {
withJdbcStatement() { statement =>
checkResultSetCodec(statement, "arrow")
statement.executeQuery(s"set ${KyuubiConf.OPERATION_RESULT_CODEC.key}=simple")
checkResultSetCodec(statement, "simple")
checkResultSetFormat(statement, "arrow")
statement.executeQuery(s"set ${KyuubiConf.OPERATION_RESULT_FORMAT.key}=thrift")
checkResultSetFormat(statement, "thrift")
}
}
def checkResultSetCodec(statement: Statement, expectCodec: String): Unit = {
def checkResultSetFormat(statement: Statement, expectFormat: String): Unit = {
val query =
s"""
|SELECT '$${hivevar:${KyuubiConf.OPERATION_RESULT_CODEC.key}}' AS col
|SELECT '$${hivevar:${KyuubiConf.OPERATION_RESULT_FORMAT.key}}' AS col
|""".stripMargin
val resultSet = statement.executeQuery(query)
assert(resultSet.next())
assert(resultSet.getString("col") === expectCodec)
assert(resultSet.getString("col") === expectFormat)
}
}

View File

@ -1534,18 +1534,18 @@ object KyuubiConf {
.booleanConf
.createWithDefault(false)
val OPERATION_RESULT_CODEC: ConfigEntry[String] =
buildConf("kyuubi.operation.result.codec")
.doc("Specify the result codec, available configs are: <ul>" +
" <li>SIMPLE: the result will convert to TRow at the engine driver side. </li>" +
val OPERATION_RESULT_FORMAT: ConfigEntry[String] =
buildConf("kyuubi.operation.result.format")
.doc("Specify the result format, available configs are: <ul>" +
" <li>THRIFT: the result will convert to TRow at the engine driver side. </li>" +
" <li>ARROW: the result will be encoded as Arrow at the executor side before collecting" +
" by the driver, and deserialized at the client side. note that it only takes effect for" +
" kyuubi-hive-jdbc clients now.</li></ul>")
.version("1.7.0")
.stringConf
.checkValues(Set("arrow", "simple"))
.checkValues(Set("arrow", "thrift"))
.transform(_.toLowerCase(Locale.ROOT))
.createWithDefault("simple")
.createWithDefault("thrift")
val OPERATION_RESULT_MAX_ROWS: ConfigEntry[Int] =
buildConf("kyuubi.operation.result.max.rows")

View File

@ -24,10 +24,10 @@ import org.apache.kyuubi.engine.SemanticVersion
trait SparkDataTypeTests extends HiveJDBCTestHelper {
protected lazy val SPARK_ENGINE_VERSION = sparkEngineMajorMinorVersion
def resultCodec: String = "simple"
def resultFormat: String = "thrift"
test("execute statement - select null") {
assume(resultCodec == "simple" || (resultCodec == "arrow" && SPARK_ENGINE_VERSION >= "3.2"))
assume(resultFormat == "thrift" || (resultFormat == "arrow" && SPARK_ENGINE_VERSION >= "3.2"))
withJdbcStatement() { statement =>
val resultSet = statement.executeQuery("SELECT NULL AS col")
assert(resultSet.next())
@ -186,7 +186,7 @@ trait SparkDataTypeTests extends HiveJDBCTestHelper {
}
test("execute statement - select daytime interval") {
assume(resultCodec == "simple" || (resultCodec == "arrow" && SPARK_ENGINE_VERSION >= "3.3"))
assume(resultFormat == "thrift" || (resultFormat == "arrow" && SPARK_ENGINE_VERSION >= "3.3"))
withJdbcStatement() { statement =>
Map(
"interval 1 day 1 hour -60 minutes 30 seconds" ->
@ -231,7 +231,7 @@ trait SparkDataTypeTests extends HiveJDBCTestHelper {
}
test("execute statement - select year/month interval") {
assume(resultCodec == "simple" || (resultCodec == "arrow" && SPARK_ENGINE_VERSION >= "3.3"))
assume(resultFormat == "thrift" || (resultFormat == "arrow" && SPARK_ENGINE_VERSION >= "3.3"))
withJdbcStatement() { statement =>
Map(
"INTERVAL 2022 YEAR" -> Tuple2("2022-0", "2022 years"),
@ -260,7 +260,7 @@ trait SparkDataTypeTests extends HiveJDBCTestHelper {
}
test("execute statement - select array") {
assume(resultCodec == "simple" || (resultCodec == "arrow" && SPARK_ENGINE_VERSION >= "3.2"))
assume(resultFormat == "thrift" || (resultFormat == "arrow" && SPARK_ENGINE_VERSION >= "3.2"))
withJdbcStatement() { statement =>
val resultSet = statement.executeQuery(
"SELECT array() AS col1, array(1) AS col2, array(null) AS col3")
@ -278,7 +278,7 @@ trait SparkDataTypeTests extends HiveJDBCTestHelper {
}
test("execute statement - select map") {
assume(resultCodec == "simple" || (resultCodec == "arrow" && SPARK_ENGINE_VERSION >= "3.2"))
assume(resultFormat == "thrift" || (resultFormat == "arrow" && SPARK_ENGINE_VERSION >= "3.2"))
withJdbcStatement() { statement =>
val resultSet = statement.executeQuery(
"SELECT map() AS col1, map(1, 2, 3, 4) AS col2, map(1, null) AS col3")
@ -296,7 +296,7 @@ trait SparkDataTypeTests extends HiveJDBCTestHelper {
}
test("execute statement - select struct") {
assume(resultCodec == "simple" || (resultCodec == "arrow" && SPARK_ENGINE_VERSION >= "3.2"))
assume(resultFormat == "thrift" || (resultFormat == "arrow" && SPARK_ENGINE_VERSION >= "3.2"))
withJdbcStatement() { statement =>
val resultSet = statement.executeQuery(
"SELECT struct('1', '2') AS col1," +

View File

@ -410,12 +410,12 @@ trait SparkQueryTests extends SparkDataTypeTests with HiveJDBCTestHelper {
}
}
test("operation metadata hint - __kyuubi_operation_result_codec__") {
test("operation metadata hint - __kyuubi_operation_result_format__") {
assume(!httpMode)
withSessionHandle { (client, handle) =>
def checkStatusAndResultSetCodecHint(
def checkStatusAndResultSetFormatHint(
sql: String,
expectedCodec: String): Unit = {
expectedFormat: String): Unit = {
val stmtReq = new TExecuteStatementReq()
stmtReq.setSessionHandle(handle)
stmtReq.setStatement(sql)
@ -425,24 +425,24 @@ trait SparkQueryTests extends SparkDataTypeTests with HiveJDBCTestHelper {
val metaReq = new TGetResultSetMetadataReq(opHandle)
val resp = client.GetResultSetMetadata(metaReq)
assert(resp.getStatus.getStatusCode == TStatusCode.SUCCESS_STATUS)
val expectedResultSetCodecHint = s"__kyuubi_operation_result_codec__=$expectedCodec"
assert(resp.getStatus.getInfoMessages.contains(expectedResultSetCodecHint))
val expectedResultSetFormatHint = s"__kyuubi_operation_result_format__=$expectedFormat"
assert(resp.getStatus.getInfoMessages.contains(expectedResultSetFormatHint))
}
checkStatusAndResultSetCodecHint(
checkStatusAndResultSetFormatHint(
sql = "SELECT 1",
expectedCodec = "simple")
checkStatusAndResultSetCodecHint(
sql = "set kyuubi.operation.result.codec=arrow",
expectedCodec = "arrow")
checkStatusAndResultSetCodecHint(
expectedFormat = "thrift")
checkStatusAndResultSetFormatHint(
sql = "set kyuubi.operation.result.format=arrow",
expectedFormat = "arrow")
checkStatusAndResultSetFormatHint(
sql = "SELECT 1",
expectedCodec = "arrow")
checkStatusAndResultSetCodecHint(
sql = "set kyuubi.operation.result.codec=simple",
expectedCodec = "simple")
checkStatusAndResultSetCodecHint(
sql = "set kyuubi.operation.result.codec",
expectedCodec = "simple")
expectedFormat = "arrow")
checkStatusAndResultSetFormatHint(
sql = "set kyuubi.operation.result.format=thrift",
expectedFormat = "thrift")
checkStatusAndResultSetFormatHint(
sql = "set kyuubi.operation.result.format",
expectedFormat = "thrift")
}
}
}

View File

@ -36,7 +36,7 @@ import org.slf4j.LoggerFactory;
public class KyuubiStatement implements SQLStatement, KyuubiLoggable {
public static final Logger LOG = LoggerFactory.getLogger(KyuubiStatement.class.getName());
public static final int DEFAULT_FETCH_SIZE = 1000;
public static final String DEFAULT_RESULT_CODEC = "simple";
public static final String DEFAULT_RESULT_FORMAT = "thrift";
private final KyuubiConnection connection;
private TCLIService.Iface client;
private TOperationHandle stmtHandle = null;
@ -208,10 +208,10 @@ public class KyuubiStatement implements SQLStatement, KyuubiLoggable {
List<JdbcColumnAttributes> columnAttributes = new ArrayList<>();
parseMetadata(metadata, columnNames, columnTypes, columnAttributes);
String resultCodec =
properties.getOrDefault("__kyuubi_operation_result_codec__", DEFAULT_RESULT_CODEC);
LOG.info("kyuubi.operation.result.codec: " + resultCodec);
switch (resultCodec) {
String resultFormat =
properties.getOrDefault("__kyuubi_operation_result_format__", DEFAULT_RESULT_FORMAT);
LOG.info("kyuubi.operation.result.format: " + resultFormat);
switch (resultFormat) {
case "arrow":
resultSet =
new KyuubiArrowQueryResultSet.Builder(this)
@ -265,10 +265,10 @@ public class KyuubiStatement implements SQLStatement, KyuubiLoggable {
List<JdbcColumnAttributes> columnAttributes = new ArrayList<>();
parseMetadata(metadata, columnNames, columnTypes, columnAttributes);
String resultCodec =
properties.getOrDefault("__kyuubi_operation_result_codec__", DEFAULT_RESULT_CODEC);
LOG.info("kyuubi.operation.result.codec: " + resultCodec);
switch (resultCodec) {
String resultFormat =
properties.getOrDefault("__kyuubi_operation_result_format__", DEFAULT_RESULT_FORMAT);
LOG.info("kyuubi.operation.result.format: " + resultFormat);
switch (resultFormat) {
case "arrow":
resultSet =
new KyuubiArrowQueryResultSet.Builder(this)