From 8b797725b52c232faef626b11e2a314293235d8a Mon Sep 17 00:00:00 2001 From: Fu Chen Date: Wed, 4 Jan 2023 18:05:58 +0800 Subject: [PATCH] [KYUUBI #4075] [ARROW] Rename configuration from kyuubi.operation.result.codec to kyuubi.operation.result.format ### _Why are the changes needed?_ After some offline discussions, I propose to change the configuration key from`kyuubi.operation.result.codec` to `kyuubi.operation.result.format`. ### _How was this patch tested?_ Pass CI. Closes #4075 from cfmcgrady/arrow-conf-rename. Closes #4075 5ad45507 [Fu Chen] fix 214b43fd [Fu Chen] rename Authored-by: Fu Chen Signed-off-by: Cheng Pan --- docs/deployment/settings.md | 2 +- .../spark/operation/ExecuteStatement.scala | 2 +- .../spark/operation/SparkOperation.scala | 6 ++-- .../SparkArrowbasedOperationSuite.scala | 18 +++++----- .../org/apache/kyuubi/config/KyuubiConf.scala | 12 +++---- .../kyuubi/operation/SparkDataTypeTests.scala | 14 ++++---- .../kyuubi/operation/SparkQueryTests.scala | 36 +++++++++---------- .../kyuubi/jdbc/hive/KyuubiStatement.java | 18 +++++----- 8 files changed, 54 insertions(+), 54 deletions(-) diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md index c12047443..1a3fee2fc 100644 --- a/docs/deployment/settings.md +++ b/docs/deployment/settings.md @@ -446,7 +446,7 @@ kyuubi.operation.plan.only.mode|none|Configures the statement performed mode, Th kyuubi.operation.plan.only.output.style|plain|Configures the planOnly output style, The value can be 'plain' and 'json', default value is 'plain', this configuration supports only the output styles of the Spark engine|string|1.7.0 kyuubi.operation.progress.enabled|false|Whether to enable the operation progress. 
When true, the operation progress will be returned in `GetOperationStatus`.|boolean|1.6.0 kyuubi.operation.query.timeout|<undefined>|Timeout for query executions at server-side, take affect with client-side timeout(`java.sql.Statement.setQueryTimeout`) together, a running query will be cancelled automatically if timeout. It's off by default, which means only client-side take fully control whether the query should timeout or not. If set, client-side timeout capped at this point. To cancel the queries right away without waiting task to finish, consider enabling kyuubi.operation.interrupt.on.cancel together.|duration|1.2.0 -kyuubi.operation.result.codec|simple|Specify the result codec, available configs are:
  • SIMPLE: the result will convert to TRow at the engine driver side.
  • ARROW: the result will be encoded as Arrow at the executor side before collecting by the driver, and deserialized at the client side. note that it only takes effect for kyuubi-hive-jdbc clients now.
|string|1.7.0 +kyuubi.operation.result.format|thrift|Specify the result format, available configs are:
  • THRIFT: the result will convert to TRow at the engine driver side.
  • ARROW: the result will be encoded as Arrow at the executor side before collecting by the driver, and deserialized at the client side. note that it only takes effect for kyuubi-hive-jdbc clients now.
|string|1.7.0 kyuubi.operation.result.max.rows|0|Max rows of Spark query results. Rows that exceeds the limit would be ignored. By setting this value to 0 to disable the max rows limit.|int|1.6.0 kyuubi.operation.scheduler.pool|<undefined>|The scheduler pool of job. Note that, this config should be used after change Spark config spark.scheduler.mode=FAIR.|string|1.1.1 kyuubi.operation.spark.listener.enabled|true|When set to true, Spark engine registers a SQLOperationListener before executing the statement, logs a few summary statistics when each stage completes.|boolean|1.6.0 diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala index 27b9dde4d..2cdc2b500 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala @@ -176,5 +176,5 @@ class ExecuteStatement( } override def getResultSetMetadataHints(): Seq[String] = - Seq(s"__kyuubi_operation_result_codec__=$resultCodec") + Seq(s"__kyuubi_operation_result_format__=$resultFormat") } diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala index 67ab29172..842ff944f 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala @@ -260,14 +260,14 @@ abstract class SparkOperation(session: Session) override def shouldRunAsync: Boolean = false protected def arrowEnabled(): Boolean = { - 
resultCodec().equalsIgnoreCase("arrow") && + resultFormat().equalsIgnoreCase("arrow") && // TODO: (fchen) make all operation support arrow getClass.getCanonicalName == classOf[ExecuteStatement].getCanonicalName } - protected def resultCodec(): String = { + protected def resultFormat(): String = { // TODO: respect the config of the operation ExecuteStatement, if it was set. - spark.conf.get("kyuubi.operation.result.codec", "simple") + spark.conf.get("kyuubi.operation.result.format", "thrift") } protected def setSessionUserSign(): Unit = { diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkArrowbasedOperationSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkArrowbasedOperationSuite.scala index 9a4b3955e..e46456914 100644 --- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkArrowbasedOperationSuite.scala +++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkArrowbasedOperationSuite.scala @@ -30,26 +30,26 @@ class SparkArrowbasedOperationSuite extends WithSparkSQLEngine with SparkDataTyp override def withKyuubiConf: Map[String, String] = Map.empty override def jdbcVars: Map[String, String] = { - Map(KyuubiConf.OPERATION_RESULT_CODEC.key -> resultCodec) + Map(KyuubiConf.OPERATION_RESULT_FORMAT.key -> resultFormat) } - override def resultCodec: String = "arrow" + override def resultFormat: String = "arrow" - test("detect resultSet codec") { + test("detect resultSet format") { withJdbcStatement() { statement => - checkResultSetCodec(statement, "arrow") - statement.executeQuery(s"set ${KyuubiConf.OPERATION_RESULT_CODEC.key}=simple") - checkResultSetCodec(statement, "simple") + checkResultSetFormat(statement, "arrow") + statement.executeQuery(s"set ${KyuubiConf.OPERATION_RESULT_FORMAT.key}=thrift") + checkResultSetFormat(statement, "thrift") } } - def 
checkResultSetCodec(statement: Statement, expectCodec: String): Unit = { + def checkResultSetFormat(statement: Statement, expectFormat: String): Unit = { val query = s""" - |SELECT '$${hivevar:${KyuubiConf.OPERATION_RESULT_CODEC.key}}' AS col + |SELECT '$${hivevar:${KyuubiConf.OPERATION_RESULT_FORMAT.key}}' AS col |""".stripMargin val resultSet = statement.executeQuery(query) assert(resultSet.next()) - assert(resultSet.getString("col") === expectCodec) + assert(resultSet.getString("col") === expectFormat) } } diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala index 0703412cc..349292d87 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala @@ -1534,18 +1534,18 @@ object KyuubiConf { .booleanConf .createWithDefault(false) - val OPERATION_RESULT_CODEC: ConfigEntry[String] = - buildConf("kyuubi.operation.result.codec") - .doc("Specify the result codec, available configs are:
    " + - "
  • SIMPLE: the result will convert to TRow at the engine driver side.
  • " + + val OPERATION_RESULT_FORMAT: ConfigEntry[String] = + buildConf("kyuubi.operation.result.format") + .doc("Specify the result format, available configs are:
      " + + "
    • THRIFT: the result will convert to TRow at the engine driver side.
    • " + "
    • ARROW: the result will be encoded as Arrow at the executor side before collecting" + " by the driver, and deserialized at the client side. note that it only takes effect for" + " kyuubi-hive-jdbc clients now.
    ") .version("1.7.0") .stringConf - .checkValues(Set("arrow", "simple")) + .checkValues(Set("arrow", "thrift")) .transform(_.toLowerCase(Locale.ROOT)) - .createWithDefault("simple") + .createWithDefault("thrift") val OPERATION_RESULT_MAX_ROWS: ConfigEntry[Int] = buildConf("kyuubi.operation.result.max.rows") diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/SparkDataTypeTests.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/SparkDataTypeTests.scala index 2ffe13063..688167703 100644 --- a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/SparkDataTypeTests.scala +++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/SparkDataTypeTests.scala @@ -24,10 +24,10 @@ import org.apache.kyuubi.engine.SemanticVersion trait SparkDataTypeTests extends HiveJDBCTestHelper { protected lazy val SPARK_ENGINE_VERSION = sparkEngineMajorMinorVersion - def resultCodec: String = "simple" + def resultFormat: String = "thrift" test("execute statement - select null") { - assume(resultCodec == "simple" || (resultCodec == "arrow" && SPARK_ENGINE_VERSION >= "3.2")) + assume(resultFormat == "thrift" || (resultFormat == "arrow" && SPARK_ENGINE_VERSION >= "3.2")) withJdbcStatement() { statement => val resultSet = statement.executeQuery("SELECT NULL AS col") assert(resultSet.next()) @@ -186,7 +186,7 @@ trait SparkDataTypeTests extends HiveJDBCTestHelper { } test("execute statement - select daytime interval") { - assume(resultCodec == "simple" || (resultCodec == "arrow" && SPARK_ENGINE_VERSION >= "3.3")) + assume(resultFormat == "thrift" || (resultFormat == "arrow" && SPARK_ENGINE_VERSION >= "3.3")) withJdbcStatement() { statement => Map( "interval 1 day 1 hour -60 minutes 30 seconds" -> @@ -231,7 +231,7 @@ trait SparkDataTypeTests extends HiveJDBCTestHelper { } test("execute statement - select year/month interval") { - assume(resultCodec == "simple" || (resultCodec == "arrow" && SPARK_ENGINE_VERSION >= "3.3")) + assume(resultFormat == "thrift" 
|| (resultFormat == "arrow" && SPARK_ENGINE_VERSION >= "3.3")) withJdbcStatement() { statement => Map( "INTERVAL 2022 YEAR" -> Tuple2("2022-0", "2022 years"), @@ -260,7 +260,7 @@ trait SparkDataTypeTests extends HiveJDBCTestHelper { } test("execute statement - select array") { - assume(resultCodec == "simple" || (resultCodec == "arrow" && SPARK_ENGINE_VERSION >= "3.2")) + assume(resultFormat == "thrift" || (resultFormat == "arrow" && SPARK_ENGINE_VERSION >= "3.2")) withJdbcStatement() { statement => val resultSet = statement.executeQuery( "SELECT array() AS col1, array(1) AS col2, array(null) AS col3") @@ -278,7 +278,7 @@ trait SparkDataTypeTests extends HiveJDBCTestHelper { } test("execute statement - select map") { - assume(resultCodec == "simple" || (resultCodec == "arrow" && SPARK_ENGINE_VERSION >= "3.2")) + assume(resultFormat == "thrift" || (resultFormat == "arrow" && SPARK_ENGINE_VERSION >= "3.2")) withJdbcStatement() { statement => val resultSet = statement.executeQuery( "SELECT map() AS col1, map(1, 2, 3, 4) AS col2, map(1, null) AS col3") @@ -296,7 +296,7 @@ trait SparkDataTypeTests extends HiveJDBCTestHelper { } test("execute statement - select struct") { - assume(resultCodec == "simple" || (resultCodec == "arrow" && SPARK_ENGINE_VERSION >= "3.2")) + assume(resultFormat == "thrift" || (resultFormat == "arrow" && SPARK_ENGINE_VERSION >= "3.2")) withJdbcStatement() { statement => val resultSet = statement.executeQuery( "SELECT struct('1', '2') AS col1," + diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/SparkQueryTests.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/SparkQueryTests.scala index c64778efa..03b5fc6eb 100644 --- a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/SparkQueryTests.scala +++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/SparkQueryTests.scala @@ -410,12 +410,12 @@ trait SparkQueryTests extends SparkDataTypeTests with HiveJDBCTestHelper { } } - test("operation metadata hint 
- __kyuubi_operation_result_codec__") { + test("operation metadata hint - __kyuubi_operation_result_format__") { assume(!httpMode) withSessionHandle { (client, handle) => - def checkStatusAndResultSetCodecHint( + def checkStatusAndResultSetFormatHint( sql: String, - expectedCodec: String): Unit = { + expectedFormat: String): Unit = { val stmtReq = new TExecuteStatementReq() stmtReq.setSessionHandle(handle) stmtReq.setStatement(sql) @@ -425,24 +425,24 @@ trait SparkQueryTests extends SparkDataTypeTests with HiveJDBCTestHelper { val metaReq = new TGetResultSetMetadataReq(opHandle) val resp = client.GetResultSetMetadata(metaReq) assert(resp.getStatus.getStatusCode == TStatusCode.SUCCESS_STATUS) - val expectedResultSetCodecHint = s"__kyuubi_operation_result_codec__=$expectedCodec" - assert(resp.getStatus.getInfoMessages.contains(expectedResultSetCodecHint)) + val expectedResultSetFormatHint = s"__kyuubi_operation_result_format__=$expectedFormat" + assert(resp.getStatus.getInfoMessages.contains(expectedResultSetFormatHint)) } - checkStatusAndResultSetCodecHint( + checkStatusAndResultSetFormatHint( sql = "SELECT 1", - expectedCodec = "simple") - checkStatusAndResultSetCodecHint( - sql = "set kyuubi.operation.result.codec=arrow", - expectedCodec = "arrow") - checkStatusAndResultSetCodecHint( + expectedFormat = "thrift") + checkStatusAndResultSetFormatHint( + sql = "set kyuubi.operation.result.format=arrow", + expectedFormat = "arrow") + checkStatusAndResultSetFormatHint( sql = "SELECT 1", - expectedCodec = "arrow") - checkStatusAndResultSetCodecHint( - sql = "set kyuubi.operation.result.codec=simple", - expectedCodec = "simple") - checkStatusAndResultSetCodecHint( - sql = "set kyuubi.operation.result.codec", - expectedCodec = "simple") + expectedFormat = "arrow") + checkStatusAndResultSetFormatHint( + sql = "set kyuubi.operation.result.format=thrift", + expectedFormat = "thrift") + checkStatusAndResultSetFormatHint( + sql = "set kyuubi.operation.result.format", + 
expectedFormat = "thrift") } } } diff --git a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiStatement.java b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiStatement.java index ea3f3f56f..ee3dc71a9 100644 --- a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiStatement.java +++ b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiStatement.java @@ -36,7 +36,7 @@ import org.slf4j.LoggerFactory; public class KyuubiStatement implements SQLStatement, KyuubiLoggable { public static final Logger LOG = LoggerFactory.getLogger(KyuubiStatement.class.getName()); public static final int DEFAULT_FETCH_SIZE = 1000; - public static final String DEFAULT_RESULT_CODEC = "simple"; + public static final String DEFAULT_RESULT_FORMAT = "thrift"; private final KyuubiConnection connection; private TCLIService.Iface client; private TOperationHandle stmtHandle = null; @@ -208,10 +208,10 @@ public class KyuubiStatement implements SQLStatement, KyuubiLoggable { List columnAttributes = new ArrayList<>(); parseMetadata(metadata, columnNames, columnTypes, columnAttributes); - String resultCodec = - properties.getOrDefault("__kyuubi_operation_result_codec__", DEFAULT_RESULT_CODEC); - LOG.info("kyuubi.operation.result.codec: " + resultCodec); - switch (resultCodec) { + String resultFormat = + properties.getOrDefault("__kyuubi_operation_result_format__", DEFAULT_RESULT_FORMAT); + LOG.info("kyuubi.operation.result.format: " + resultFormat); + switch (resultFormat) { case "arrow": resultSet = new KyuubiArrowQueryResultSet.Builder(this) @@ -265,10 +265,10 @@ public class KyuubiStatement implements SQLStatement, KyuubiLoggable { List columnAttributes = new ArrayList<>(); parseMetadata(metadata, columnNames, columnTypes, columnAttributes); - String resultCodec = - properties.getOrDefault("__kyuubi_operation_result_codec__", DEFAULT_RESULT_CODEC); - LOG.info("kyuubi.operation.result.codec: " + resultCodec); - switch (resultCodec) { + 
String resultFormat = + properties.getOrDefault("__kyuubi_operation_result_format__", DEFAULT_RESULT_FORMAT); + LOG.info("kyuubi.operation.result.format: " + resultFormat); + switch (resultFormat) { case "arrow": resultSet = new KyuubiArrowQueryResultSet.Builder(this)