From 36b95d333fca6d8bdbf28d13d4951612e2eae8e4 Mon Sep 17 00:00:00 2001 From: Fei Wang Date: Sun, 16 Jan 2022 20:23:51 +0800 Subject: [PATCH] [KYUUBI #1714] Add executeScala api for KyuubiStatement ### _Why are the changes needed?_ Add executeScala api for KyuubiStatement, so that user can invoke this method to execute scala code directly. ### _How was this patch tested?_ - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [ ] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before make a pull request Closes #1714 from turboFei/execute_scala. Closes #1714 a62616df [Fei Wang] Add executeScala api for KyuubiStatement Authored-by: Fei Wang Signed-off-by: Fei Wang --- .../kyuubi/jdbc/KyuubiHiveDriverSuite.scala | 16 +++++++++++ .../kyuubi/jdbc/hive/KyuubiStatement.java | 28 +++++++++++++++++-- 2 files changed, 42 insertions(+), 2 deletions(-) diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/jdbc/KyuubiHiveDriverSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/jdbc/KyuubiHiveDriverSuite.scala index c49a122cb..26cd252b2 100644 --- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/jdbc/KyuubiHiveDriverSuite.scala +++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/jdbc/KyuubiHiveDriverSuite.scala @@ -22,6 +22,7 @@ import java.util.Properties import org.apache.kyuubi.IcebergSuiteMixin import org.apache.kyuubi.engine.spark.WithSparkSQLEngine import org.apache.kyuubi.engine.spark.shim.SparkCatalogShim +import org.apache.kyuubi.jdbc.hive.KyuubiStatement import org.apache.kyuubi.tags.IcebergTest @IcebergTest @@ -86,4 +87,19 @@ class KyuubiHiveDriverSuite extends WithSparkSQLEngine with IcebergSuiteMixin { connection.close() } } + + test("add executeScala api for KyuubiStatement") { + val driver = new KyuubiHiveDriver() + val connection = driver.connect(getJdbcUrl, new Properties()) + val statement = connection.createStatement().asInstanceOf[KyuubiStatement] + try { + val code = """spark.sql("set kyuubi.operation.language").show(false)""" + val resultSet = statement.executeScala(code) + assert(resultSet.next()) + assert(resultSet.getString(1).contains("kyuubi.operation.language")) + } finally { + statement.close() + connection.close() + } + } } diff --git a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiStatement.java b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiStatement.java index b2c5de262..1e750774e 100644 --- a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiStatement.java +++ b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiStatement.java @@ -24,6 +24,7 @@ import java.sql.SQLFeatureNotSupportedException; import java.sql.SQLTimeoutException; import java.sql.SQLWarning; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -245,7 +246,12 @@ public class KyuubiStatement implements java.sql.Statement, KyuubiLoggable { @Override public boolean execute(String sql) throws SQLException { - runAsyncOnServer(sql); + return executeWithConfOverlay(sql, null); + } + + private boolean executeWithConfOverlay(String sql, Map confOverlay) + throws SQLException { + runAsyncOnServer(sql, confOverlay); TGetOperationStatusResp status = waitForOperationToComplete(); // The query should be completed by now @@ -297,6 +303,10 @@ public class KyuubiStatement implements java.sql.Statement, KyuubiLoggable { } private void runAsyncOnServer(String sql) throws SQLException { + runAsyncOnServer(sql, null); + } + + private void runAsyncOnServer(String sql, Map confOneTime) throws SQLException { checkConnection("execute"); closeClientOperation(); @@ -309,7 +319,13 @@ public class KyuubiStatement implements java.sql.Statement, KyuubiLoggable { * execution run asynchronously */ execReq.setRunAsync(true); - execReq.setConfOverlay(sessConf); + if (confOneTime != null) { + Map confOverlay = new HashMap(sessConf); + confOverlay.putAll(confOneTime); + execReq.setConfOverlay(confOverlay); + } else { + execReq.setConfOverlay(sessConf); + } execReq.setQueryTimeout(queryTimeout); try { TExecuteStatementResp execResp = client.ExecuteStatement(execReq); @@ -483,6 +499,14 @@ public class KyuubiStatement implements java.sql.Statement, KyuubiLoggable { return resultSet; } + public ResultSet executeScala(String code) throws SQLException { + if (!executeWithConfOverlay( + code, Collections.singletonMap("kyuubi.operation.language", "SCALA"))) { + throw new SQLException("The query did not generate a result set!"); + } + return resultSet; + } + /* * (non-Javadoc) *