[KYUUBI #1714] Add executeScala api for KyuubiStatement
<!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html 2. If the PR is related to an issue in https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'. 3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'. --> ### _Why are the changes needed?_ <!-- Please clarify why the changes are needed. For instance, 1. If you add a feature, you can talk about the use case of it. 2. If you fix a bug, you can clarify why it is a bug. --> Add executeScala api for KyuubiStatement, so that user can invoke this method to execute scala code directly. ### _How was this patch tested?_ - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [ ] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before make a pull request Closes #1714 from turboFei/execute_scala. Closes #1714 a62616df [Fei Wang] Add executeScala api for KyuubiStatement Authored-by: Fei Wang <fwang12@ebay.com> Signed-off-by: Fei Wang <fwang12@ebay.com>
This commit is contained in:
parent
84839e0279
commit
36b95d333f
@ -22,6 +22,7 @@ import java.util.Properties
|
||||
import org.apache.kyuubi.IcebergSuiteMixin
|
||||
import org.apache.kyuubi.engine.spark.WithSparkSQLEngine
|
||||
import org.apache.kyuubi.engine.spark.shim.SparkCatalogShim
|
||||
import org.apache.kyuubi.jdbc.hive.KyuubiStatement
|
||||
import org.apache.kyuubi.tags.IcebergTest
|
||||
|
||||
@IcebergTest
|
||||
@ -86,4 +87,19 @@ class KyuubiHiveDriverSuite extends WithSparkSQLEngine with IcebergSuiteMixin {
|
||||
connection.close()
|
||||
}
|
||||
}
|
||||
|
||||
test("add executeScala api for KyuubiStatement") {
|
||||
val driver = new KyuubiHiveDriver()
|
||||
val connection = driver.connect(getJdbcUrl, new Properties())
|
||||
val statement = connection.createStatement().asInstanceOf[KyuubiStatement]
|
||||
try {
|
||||
val code = """spark.sql("set kyuubi.operation.language").show(false)"""
|
||||
val resultSet = statement.executeScala(code)
|
||||
assert(resultSet.next())
|
||||
assert(resultSet.getString(1).contains("kyuubi.operation.language"))
|
||||
} finally {
|
||||
statement.close()
|
||||
connection.close()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -24,6 +24,7 @@ import java.sql.SQLFeatureNotSupportedException;
|
||||
import java.sql.SQLTimeoutException;
|
||||
import java.sql.SQLWarning;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
@ -245,7 +246,12 @@ public class KyuubiStatement implements java.sql.Statement, KyuubiLoggable {
|
||||
|
||||
@Override
|
||||
public boolean execute(String sql) throws SQLException {
|
||||
runAsyncOnServer(sql);
|
||||
return executeWithConfOverlay(sql, null);
|
||||
}
|
||||
|
||||
private boolean executeWithConfOverlay(String sql, Map<String, String> confOverlay)
|
||||
throws SQLException {
|
||||
runAsyncOnServer(sql, confOverlay);
|
||||
TGetOperationStatusResp status = waitForOperationToComplete();
|
||||
|
||||
// The query should be completed by now
|
||||
@ -297,6 +303,10 @@ public class KyuubiStatement implements java.sql.Statement, KyuubiLoggable {
|
||||
}
|
||||
|
||||
private void runAsyncOnServer(String sql) throws SQLException {
|
||||
runAsyncOnServer(sql, null);
|
||||
}
|
||||
|
||||
private void runAsyncOnServer(String sql, Map<String, String> confOneTime) throws SQLException {
|
||||
checkConnection("execute");
|
||||
|
||||
closeClientOperation();
|
||||
@ -309,7 +319,13 @@ public class KyuubiStatement implements java.sql.Statement, KyuubiLoggable {
|
||||
* execution run asynchronously
|
||||
*/
|
||||
execReq.setRunAsync(true);
|
||||
execReq.setConfOverlay(sessConf);
|
||||
if (confOneTime != null) {
|
||||
Map<String, String> confOverlay = new HashMap<String, String>(sessConf);
|
||||
confOverlay.putAll(confOneTime);
|
||||
execReq.setConfOverlay(confOverlay);
|
||||
} else {
|
||||
execReq.setConfOverlay(sessConf);
|
||||
}
|
||||
execReq.setQueryTimeout(queryTimeout);
|
||||
try {
|
||||
TExecuteStatementResp execResp = client.ExecuteStatement(execReq);
|
||||
@ -483,6 +499,14 @@ public class KyuubiStatement implements java.sql.Statement, KyuubiLoggable {
|
||||
return resultSet;
|
||||
}
|
||||
|
||||
public ResultSet executeScala(String code) throws SQLException {
|
||||
if (!executeWithConfOverlay(
|
||||
code, Collections.singletonMap("kyuubi.operation.language", "SCALA"))) {
|
||||
throw new SQLException("The query did not generate a result set!");
|
||||
}
|
||||
return resultSet;
|
||||
}
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
*
|
||||
|
||||
Loading…
Reference in New Issue
Block a user