[KYUUBI #1490] Introduce the basic framework for running scala

<!--
Thanks for sending a pull request!

Here are some tips for you:
  1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html
  2. If the PR is related to an issue in https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
  3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'.
-->

### _Why are the changes needed?_
<!--
Please clarify why the changes are needed. For instance,
  1. If you add a feature, you can talk about the use case of it.
  2. If you fix a bug, you can clarify why it is a bug.
-->

Introduce the basic framework for running scala, see #1490 for the detail
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [x] Add screenshots for manual tests if appropriate
```
Beeline version 1.5.0-SNAPSHOT by Apache Kyuubi (Incubating)
0: jdbc:hive2://10.242.189.214:10009/> spark.version;
2021-12-03 13:47:07.556 INFO operation.ExecuteStatement: Processing kent's query[08b8b6da-d434-4296-b613-2027e3518441]: INITIALIZED_STATE -> PENDING_STATE, statement: spark.version
2021-12-03 13:47:07.560 INFO operation.ExecuteStatement: Processing kent's query[08b8b6da-d434-4296-b613-2027e3518441]: PENDING_STATE -> RUNNING_STATE, statement: spark.version
2021-12-03 13:47:07.558 INFO operation.ExecuteStatement: Processing kent's query[321dc15d-68d0-4f91-9216-1e08f09842df]: INITIALIZED_STATE -> PENDING_STATE, statement: spark.version
2021-12-03 13:47:07.559 INFO operation.ExecuteStatement: Processing kent's query[321dc15d-68d0-4f91-9216-1e08f09842df]: PENDING_STATE -> RUNNING_STATE, statement: spark.version
2021-12-03 13:47:07.560 INFO operation.ExecuteStatement:
           Spark application name: kyuubi_USER_SPARK_SQL_kent_default_61cff9fb-7035-4435-b509-80c1730876ed
                 application ID: local-1638510289918
                 application web UI: http://10.242.189.214:65027
                 master: local[*]
                 deploy mode: client
                 version: 3.1.2
           Start time: 2021-12-03T13:44:49.313
           User: kent
2021-12-03 13:47:07.562 INFO scheduler.DAGScheduler: Asked to cancel job group 321dc15d-68d0-4f91-9216-1e08f09842df
2021-12-03 13:47:07.565 INFO operation.ExecuteStatement: Processing kent's query[321dc15d-68d0-4f91-9216-1e08f09842df]: RUNNING_STATE -> ERROR_STATE, statement: spark.version, time taken: 0.006 seconds
2021-12-03 13:47:07.565 ERROR operation.ExecuteStatement: Error operating EXECUTE_STATEMENT: org.apache.spark.sql.catalyst.parser.ParseException:
mismatched input 'spark' expecting {'(', 'ADD', 'ALTER', 'ANALYZE', 'CACHE', 'CLEAR', 'COMMENT', 'COMMIT', 'CREATE', 'DELETE', 'DESC', 'DESCRIBE', 'DFS', 'DROP', 'EXPLAIN', 'EXPORT', 'FROM', 'GRANT', 'IMPORT', 'INSERT', 'LIST', 'LOAD', 'LOCK', 'MAP', 'MERGE', 'MSCK', 'REDUCE', 'REFRESH', 'REPLACE', 'RESET', 'REVOKE', 'ROLLBACK', 'SELECT', 'SET', 'SHOW', 'START', 'TABLE', 'TRUNCATE', 'UNCACHE', 'UNLOCK', 'UPDATE', 'USE', 'VALUES', 'WITH'}(line 1, pos 0)

== SQL ==
spark.version
^^^

	at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:255)
	at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:124)
	at org.apache.spark.sql.execution.SparkSqlParser.parse(SparkSqlParser.scala:49)
	at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parsePlan(ParseDriver.scala:75)
	at org.apache.spark.sql.SparkSession.$anonfun$sql$2(SparkSession.scala:616)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
	at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:616)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:613)
	at org.apache.kyuubi.engine.spark.operation.ExecuteStatement.$anonfun$executeStatement$1(ExecuteStatement.scala:98)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.kyuubi.engine.spark.operation.ExecuteStatement.withLocalProperties(ExecuteStatement.scala:157)
	at org.apache.kyuubi.engine.spark.operation.ExecuteStatement.org$apache$kyuubi$engine$spark$operation$ExecuteStatement$$executeStatement(ExecuteStatement.scala:92)
	at org.apache.kyuubi.engine.spark.operation.ExecuteStatement$$anon$1.run(ExecuteStatement.scala:125)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

org.apache.spark.sql.catalyst.parser.ParseException:
mismatched input 'spark' expecting {'(', 'ADD', 'ALTER', 'ANALYZE', 'CACHE', 'CLEAR', 'COMMENT', 'COMMIT', 'CREATE', 'DELETE', 'DESC', 'DESCRIBE', 'DFS', 'DROP', 'EXPLAIN', 'EXPORT', 'FROM', 'GRANT', 'IMPORT', 'INSERT', 'LIST', 'LOAD', 'LOCK', 'MAP', 'MERGE', 'MSCK', 'REDUCE', 'REFRESH', 'REPLACE', 'RESET', 'REVOKE', 'ROLLBACK', 'SELECT', 'SET', 'SHOW', 'START', 'TABLE', 'TRUNCATE', 'UNCACHE', 'UNLOCK', 'UPDATE', 'USE', 'VALUES', 'WITH'}(line 1, pos 0)

== SQL ==
spark.version
^^^

	at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:255)
	at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:124)
	at org.apache.spark.sql.execution.SparkSqlParser.parse(SparkSqlParser.scala:49)
	at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parsePlan(ParseDriver.scala:75)
	at org.apache.spark.sql.SparkSession.$anonfun$sql$2(SparkSession.scala:616)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
	at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:616)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:613)
	at org.apache.kyuubi.engine.spark.operation.ExecuteStatement.$anonfun$executeStatement$1(ExecuteStatement.scala:98)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.kyuubi.engine.spark.operation.ExecuteStatement.withLocalProperties(ExecuteStatement.scala:157)
	at org.apache.kyuubi.engine.spark.operation.ExecuteStatement.org$apache$kyuubi$engine$spark$operation$ExecuteStatement$$executeStatement(ExecuteStatement.scala:92)
	at org.apache.kyuubi.engine.spark.operation.ExecuteStatement$$anon$1.run(ExecuteStatement.scala:125)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
2021-12-03 13:47:07.569 INFO operation.ExecuteStatement: Query[08b8b6da-d434-4296-b613-2027e3518441] in ERROR_STATE
2021-12-03 13:47:07.569 INFO operation.ExecuteStatement: Processing kent's query[08b8b6da-d434-4296-b613-2027e3518441]: RUNNING_STATE -> ERROR_STATE, statement: spark.version, time taken: 0.009 seconds
Error: Error operating EXECUTE_STATEMENT: org.apache.spark.sql.catalyst.parser.ParseException:
mismatched input 'spark' expecting {'(', 'ADD', 'ALTER', 'ANALYZE', 'CACHE', 'CLEAR', 'COMMENT', 'COMMIT', 'CREATE', 'DELETE', 'DESC', 'DESCRIBE', 'DFS', 'DROP', 'EXPLAIN', 'EXPORT', 'FROM', 'GRANT', 'IMPORT', 'INSERT', 'LIST', 'LOAD', 'LOCK', 'MAP', 'MERGE', 'MSCK', 'REDUCE', 'REFRESH', 'REPLACE', 'RESET', 'REVOKE', 'ROLLBACK', 'SELECT', 'SET', 'SHOW', 'START', 'TABLE', 'TRUNCATE', 'UNCACHE', 'UNLOCK', 'UPDATE', 'USE', 'VALUES', 'WITH'}(line 1, pos 0)

== SQL ==
spark.version
^^^

	at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:255)
	at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:124)
	at org.apache.spark.sql.execution.SparkSqlParser.parse(SparkSqlParser.scala:49)
	at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parsePlan(ParseDriver.scala:75)
	at org.apache.spark.sql.SparkSession.$anonfun$sql$2(SparkSession.scala:616)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
	at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:616)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:613)
	at org.apache.kyuubi.engine.spark.operation.ExecuteStatement.$anonfun$executeStatement$1(ExecuteStatement.scala:98)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.kyuubi.engine.spark.operation.ExecuteStatement.withLocalProperties(ExecuteStatement.scala:157)
	at org.apache.kyuubi.engine.spark.operation.ExecuteStatement.org$apache$kyuubi$engine$spark$operation$ExecuteStatement$$executeStatement(ExecuteStatement.scala:92)
	at org.apache.kyuubi.engine.spark.operation.ExecuteStatement$$anon$1.run(ExecuteStatement.scala:125)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748) (state=,code=0)
0: jdbc:hive2://10.242.189.214:10009/> set kyuubi.operation.language=scala;
2021-12-03 13:47:11.982 INFO operation.ExecuteStatement: Processing kent's query[e4e4dc55-4e05-460e-9d9d-86b2cf5c054b]: INITIALIZED_STATE -> PENDING_STATE, statement: set kyuubi.operation.language=scala
2021-12-03 13:47:11.985 INFO operation.ExecuteStatement: Processing kent's query[e4e4dc55-4e05-460e-9d9d-86b2cf5c054b]: PENDING_STATE -> RUNNING_STATE, statement: set kyuubi.operation.language=scala
2021-12-03 13:47:11.983 INFO operation.ExecuteStatement: Processing kent's query[2b6ec68c-2ae9-4d93-a35b-9e2319df4d4f]: INITIALIZED_STATE -> PENDING_STATE, statement: set kyuubi.operation.language=scala
2021-12-03 13:47:11.984 INFO operation.ExecuteStatement: Processing kent's query[2b6ec68c-2ae9-4d93-a35b-9e2319df4d4f]: PENDING_STATE -> RUNNING_STATE, statement: set kyuubi.operation.language=scala
2021-12-03 13:47:11.985 INFO operation.ExecuteStatement:
           Spark application name: kyuubi_USER_SPARK_SQL_kent_default_61cff9fb-7035-4435-b509-80c1730876ed
                 application ID: local-1638510289918
                 application web UI: http://10.242.189.214:65027
                 master: local[*]
                 deploy mode: client
                 version: 3.1.2
           Start time: 2021-12-03T13:44:49.313
           User: kent
2021-12-03 13:47:11.995 INFO operation.ExecuteStatement: Processing kent's query[2b6ec68c-2ae9-4d93-a35b-9e2319df4d4f]: RUNNING_STATE -> RUNNING_STATE, statement: set kyuubi.operation.language=scala
2021-12-03 13:47:11.995 INFO operation.ExecuteStatement: Execute in full collect mode
2021-12-03 13:47:12.006 INFO operation.ExecuteStatement: Processing kent's query[2b6ec68c-2ae9-4d93-a35b-9e2319df4d4f]: RUNNING_STATE -> FINISHED_STATE, statement: set kyuubi.operation.language=scala, time taken: 0.022 seconds
2021-12-03 13:47:12.007 INFO operation.ExecuteStatement: Query[e4e4dc55-4e05-460e-9d9d-86b2cf5c054b] in FINISHED_STATE
2021-12-03 13:47:12.007 INFO operation.ExecuteStatement: Processing kent's query[e4e4dc55-4e05-460e-9d9d-86b2cf5c054b]: RUNNING_STATE -> FINISHED_STATE, statement: set kyuubi.operation.language=scala, time taken: 0.022 seconds
+----------------------------+--------+
|            key             | value  |
+----------------------------+--------+
| kyuubi.operation.language  | scala  |
+----------------------------+--------+
1 row selected (0.052 seconds)
0: jdbc:hive2://10.242.189.214:10009/> spark.version;
2021-12-03 13:47:13.685 INFO operation.ExecuteStatement: Processing kent's query[178ff72d-b870-44d4-99d8-08a0fa0e8efa]: INITIALIZED_STATE -> PENDING_STATE, statement: spark.version
2021-12-03 13:47:15.541 INFO operation.ExecuteStatement: Processing kent's query[178ff72d-b870-44d4-99d8-08a0fa0e8efa]: PENDING_STATE -> RUNNING_STATE, statement: spark.version
2021-12-03 13:47:15.543 INFO operation.ExecuteStatement: Query[178ff72d-b870-44d4-99d8-08a0fa0e8efa] in FINISHED_STATE
2021-12-03 13:47:15.544 INFO operation.ExecuteStatement: Processing kent's query[178ff72d-b870-44d4-99d8-08a0fa0e8efa]: RUNNING_STATE -> FINISHED_STATE, statement: spark.version, time taken: 0.003 seconds
+-----------------------+
|        output         |
+-----------------------+
| res0: String = 3.1.2  |
+-----------------------+
1 row selected (1.871 seconds)
0: jdbc:hive2://10.242.189.214:10009/> spark.sql("select current_date()")
. . . . . . . . . . . . . . . . . . .> ;
2021-12-03 13:47:36.512 INFO operation.ExecuteStatement: Processing kent's query[602c2d88-7e8a-4175-9b53-e34adfff007a]: INITIALIZED_STATE -> PENDING_STATE, statement: spark.sql("select current_date()")
2021-12-03 13:47:36.689 INFO operation.ExecuteStatement: Processing kent's query[602c2d88-7e8a-4175-9b53-e34adfff007a]: PENDING_STATE -> RUNNING_STATE, statement: spark.sql("select current_date()")
2021-12-03 13:47:36.692 INFO operation.ExecuteStatement: Query[602c2d88-7e8a-4175-9b53-e34adfff007a] in FINISHED_STATE
2021-12-03 13:47:36.692 INFO operation.ExecuteStatement: Processing kent's query[602c2d88-7e8a-4175-9b53-e34adfff007a]: RUNNING_STATE -> FINISHED_STATE, statement: spark.sql("select current_date()"), time taken: 0.003 seconds
+----------------------------------------------------+
|                       output                       |
+----------------------------------------------------+
| res1: org.apache.spark.sql.DataFrame = [current_date(): date] |
+----------------------------------------------------+
1 row selected (0.187 seconds)
0: jdbc:hive2://10.242.189.214:10009/> results += spark.range(1, 5, 2, 3);
Error: Error operating EXECUTE_STATEMENT: org.apache.kyuubi.KyuubiSQLException: Interpret error:
results += spark.range(1, 5, 2, 3)
 <console>:26: error: type mismatch;
 found   : org.apache.spark.sql.Dataset[Long]
 required: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]
       results += spark.range(1, 5, 2, 3)
                             ^
	at org.apache.kyuubi.KyuubiSQLException$.apply(KyuubiSQLException.scala:69)
	at org.apache.kyuubi.engine.spark.operation.ExecuteScala.runInternal(ExecuteScala.scala:70)
	at org.apache.kyuubi.operation.AbstractOperation.run(AbstractOperation.scala:130)
	at org.apache.kyuubi.session.AbstractSession.runOperation(AbstractSession.scala:93)
	at org.apache.kyuubi.engine.spark.session.SparkSessionImpl.runOperation(SparkSessionImpl.scala:62)
	at org.apache.kyuubi.session.AbstractSession.$anonfun$executeStatement$1(AbstractSession.scala:121)
	at org.apache.kyuubi.session.AbstractSession.withAcquireRelease(AbstractSession.scala:75)
	at org.apache.kyuubi.session.AbstractSession.executeStatement(AbstractSession.scala:118)
	at org.apache.kyuubi.service.AbstractBackendService.executeStatement(AbstractBackendService.scala:61)
	at org.apache.kyuubi.service.ThriftBinaryFrontendService.ExecuteStatement(ThriftBinaryFrontendService.scala:265)
	at org.apache.hive.service.rpc.thrift.TCLIService$Processor$ExecuteStatement.getResult(TCLIService.java:1557)
	at org.apache.hive.service.rpc.thrift.TCLIService$Processor$ExecuteStatement.getResult(TCLIService.java:1542)
	at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:38)
	at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39)
	at org.apache.kyuubi.service.authentication.TSetIpAddressProcessor.process(TSetIpAddressProcessor.scala:36)
	at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:310)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748) (state=,code=0)
0: jdbc:hive2://10.242.189.214:10009/> results += spark.range(1, 5, 2, 3).toDF;
2021-12-03 13:48:30.327 INFO operation.ExecuteStatement: Processing kent's query[daccd163-978a-4d66-af85-af466086d1db]: INITIALIZED_STATE -> PENDING_STATE, statement: results += spark.range(1, 5, 2, 3).toDF
2021-12-03 13:48:31.700 INFO operation.ExecuteStatement: Processing kent's query[daccd163-978a-4d66-af85-af466086d1db]: PENDING_STATE -> RUNNING_STATE, statement: results += spark.range(1, 5, 2, 3).toDF
2021-12-03 13:48:31.702 INFO operation.ExecuteStatement: Query[daccd163-978a-4d66-af85-af466086d1db] in FINISHED_STATE
2021-12-03 13:48:31.702 INFO operation.ExecuteStatement: Processing kent's query[daccd163-978a-4d66-af85-af466086d1db]: RUNNING_STATE -> FINISHED_STATE, statement: results += spark.range(1, 5, 2, 3).toDF, time taken: 0.003 seconds
+-----+
| id  |
+-----+
| 1   |
| 3   |
+-----+
2 rows selected (1.387 seconds)
```

** Session level isolated **

![image](https://user-images.githubusercontent.com/8326978/144553902-fc390c30-06de-453b-af3d-cf8def577aa9.png)

- [x] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #1491 from yaooqinn/scala.

Closes #1490

af4d0a13 [Kent Yao] Merge branch 'master' into scala
2ebdf6a3 [Kent Yao] provided scala dep
d58b2bfe [Kent Yao] [KYUUBI #1490] Introduce the basic framework for running scala
d256bfde [Kent Yao] [KYUUBI #1490] Introduce the basic framework for running scala
52090434 [Kent Yao] init

Authored-by: Kent Yao <yao@apache.org>
Signed-off-by: Kent Yao <yao@apache.org>
This commit is contained in:
Kent Yao 2021-12-06 11:17:59 +08:00
parent a876678a91
commit 97c3835738
No known key found for this signature in database
GPG Key ID: F7051850A0AF904D
22 changed files with 452 additions and 116 deletions

View File

@ -282,6 +282,7 @@ Key | Default | Meaning | Type | Since
--- | --- | --- | --- | ---
kyuubi\.operation\.idle<br>\.timeout|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>PT3H</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>Operation will be closed when it's not accessed for this duration of time</div>|<div style='width: 30pt'>duration</div>|<div style='width: 20pt'>1.0.0</div>
kyuubi\.operation<br>\.interrupt\.on\.cancel|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>true</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>When true, all running tasks will be interrupted if one cancels a query. When false, all running tasks will remain until finished.</div>|<div style='width: 30pt'>boolean</div>|<div style='width: 20pt'>1.2.0</div>
kyuubi\.operation<br>\.language|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>SQL</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>Choose a programing language for the following inputs <ul><li>SQL: (Default) Run all following statements as SQL queries.</li> <li>SCALA: Run all following input a scala codes</li></ul></div>|<div style='width: 30pt'>string</div>|<div style='width: 20pt'>1.5.0</div>
kyuubi\.operation\.log<br>\.dir\.root|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>server_operation_logs</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>Root directory for query operation log at server-side.</div>|<div style='width: 30pt'>string</div>|<div style='width: 20pt'>1.4.0</div>
kyuubi\.operation\.plan<br>\.only\.mode|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>NONE</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>Whether to perform the statement in a PARSE, ANALYZE, OPTIMIZE only way without executing the query. When it is NONE, the statement will be fully executed</div>|<div style='width: 30pt'>string</div>|<div style='width: 20pt'>1.4.0</div>
kyuubi\.operation<br>\.query\.timeout|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>&lt;undefined&gt;</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>Timeout for query executions at server-side, take affect with client-side timeout(`java.sql.Statement.setQueryTimeout`) together, a running query will be cancelled automatically if timeout. It's off by default, which means only client-side take fully control whether the query should timeout or not. If set, client-side timeout capped at this point. To cancel the queries right away without waiting task to finish, consider enabling kyuubi.operation.interrupt.on.cancel together.</div>|<div style='width: 30pt'>duration</div>|<div style='width: 20pt'>1.2.0</div>

View File

@ -50,6 +50,24 @@
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-repl_${scala.binary.version}</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-compiler</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-reflect</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client-api</artifactId>

View File

@ -24,6 +24,7 @@ import scala.util.control.NonFatal
import org.apache.spark.{ui, SparkConf}
import org.apache.spark.kyuubi.SparkSQLEngineListener
import org.apache.spark.repl.Main
import org.apache.spark.sql.SparkSession
import org.apache.kyuubi.{KyuubiException, Logging}
@ -81,6 +82,9 @@ object SparkSQLEngine extends Logging {
sparkConf.setIfMissing("spark.sql.legacy.castComplexTypesToString.enabled", "true")
sparkConf.setIfMissing("spark.master", "local")
sparkConf.setIfMissing("spark.ui.port", "0")
// register the repl's output dir with the file server.
// see also `spark.repl.classdir`
sparkConf.set("spark.repl.class.outputDir", Main.outputDir.getAbsolutePath)
sparkConf.setIfMissing(
"spark.hadoop.mapreduce.input.fileinputformat.list-status.num-threads",
"20")

View File

@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.spark.operation
import scala.tools.nsc.interpreter.Results.{Error, Incomplete, Success}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType
import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.engine.spark.ArrayFetchIterator
import org.apache.kyuubi.engine.spark.repl.KyuubiSparkILoop
import org.apache.kyuubi.operation.OperationType
import org.apache.kyuubi.session.Session
/**
* Support executing Scala Script with or without common Spark APIs, only support running in sync
* mode, as an operation may [[Incomplete]] and wait for others to make [[Success]].
*
* [[KyuubiSparkILoop.results]] is exposed as a [[org.apache.spark.sql.DataFrame]] to users in repl
* to transfer result they wanted to client side.
*
* @param session parent session
* @param repl Scala Interpreter
* @param statement a scala code snippet
*/
class ExecuteScala(
session: Session,
repl: KyuubiSparkILoop,
override val statement: String)
extends SparkOperation(OperationType.EXECUTE_STATEMENT, session) {
override protected def resultSchema: StructType = {
if (result == null || result.schema.isEmpty) {
new StructType().add("output", "string")
} else {
result.schema
}
}
override protected def runInternal(): Unit = {
try {
spark.sparkContext.setJobGroup(statementId, statement)
Thread.currentThread().setContextClassLoader(spark.sharedState.jarClassLoader)
repl.interpret(statement) match {
case Success =>
iter =
if (repl.results.nonEmpty) {
result = repl.results.remove(0)
new ArrayFetchIterator[Row](result.collect())
} else {
new ArrayFetchIterator[Row](Array(Row(repl.getOutput)))
}
case Error =>
throw KyuubiSQLException(s"Interpret error:\n$statement\n ${repl.getOutput}")
case Incomplete =>
throw KyuubiSQLException(s"Incomplete code:\n$statement")
}
} catch {
onError(cancel = true)
} finally {
spark.sparkContext.clearJobGroup()
}
}
}

View File

@ -23,7 +23,6 @@ import scala.collection.JavaConverters._
import org.apache.spark.kyuubi.SQLOperationListener
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import org.apache.kyuubi.{KyuubiSQLException, Logging}
@ -37,13 +36,12 @@ import org.apache.kyuubi.session.Session
import org.apache.kyuubi.util.ThreadUtils
class ExecuteStatement(
spark: SparkSession,
session: Session,
override protected val statement: String,
override val shouldRunAsync: Boolean,
queryTimeout: Long,
incrementalCollect: Boolean)
extends SparkOperation(spark, OperationType.EXECUTE_STATEMENT, session) with Logging {
extends SparkOperation(OperationType.EXECUTE_STATEMENT, session) with Logging {
import org.apache.kyuubi.KyuubiSparkUtils._

View File

@ -17,7 +17,6 @@
package org.apache.kyuubi.engine.spark.operation
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType
import org.apache.kyuubi.engine.spark.IterableFetchIterator
@ -26,8 +25,8 @@ import org.apache.kyuubi.operation.OperationType
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant.TABLE_CAT
import org.apache.kyuubi.session.Session
class GetCatalogs(spark: SparkSession, session: Session)
extends SparkOperation(spark, OperationType.GET_CATALOGS, session) {
class GetCatalogs(session: Session)
extends SparkOperation(OperationType.GET_CATALOGS, session) {
override protected def resultSchema: StructType = {
new StructType()

View File

@ -17,7 +17,6 @@
package org.apache.kyuubi.engine.spark.operation
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import org.apache.kyuubi.engine.spark.IterableFetchIterator
@ -27,13 +26,12 @@ import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
import org.apache.kyuubi.session.Session
class GetColumns(
spark: SparkSession,
session: Session,
catalogName: String,
schemaName: String,
tableName: String,
columnName: String)
extends SparkOperation(spark, OperationType.GET_COLUMNS, session) {
extends SparkOperation(OperationType.GET_COLUMNS, session) {
override def statement: String = {
super.statement +

View File

@ -19,7 +19,7 @@ package org.apache.kyuubi.engine.spark.operation
import java.sql.DatabaseMetaData
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType
import org.apache.kyuubi.engine.spark.IterableFetchIterator
@ -28,12 +28,11 @@ import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
import org.apache.kyuubi.session.Session
class GetFunctions(
spark: SparkSession,
session: Session,
catalogName: String,
schemaName: String,
functionName: String)
extends SparkOperation(spark, OperationType.GET_FUNCTIONS, session) {
extends SparkOperation(OperationType.GET_FUNCTIONS, session) {
override def statement: String = {
super.statement +

View File

@ -17,7 +17,6 @@
package org.apache.kyuubi.engine.spark.operation
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType
import org.apache.kyuubi.engine.spark.IterableFetchIterator
@ -26,8 +25,8 @@ import org.apache.kyuubi.operation.OperationType
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
import org.apache.kyuubi.session.Session
class GetSchemas(spark: SparkSession, session: Session, catalogName: String, schema: String)
extends SparkOperation(spark, OperationType.GET_SCHEMAS, session) {
class GetSchemas(session: Session, catalogName: String, schema: String)
extends SparkOperation(OperationType.GET_SCHEMAS, session) {
override def statement: String = {
super.statement + s" [catalog : $catalogName, schemaPattern : $schema]"

View File

@ -17,7 +17,7 @@
package org.apache.kyuubi.engine.spark.operation
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType
import org.apache.kyuubi.engine.spark.IterableFetchIterator
@ -26,8 +26,8 @@ import org.apache.kyuubi.operation.OperationType
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
import org.apache.kyuubi.session.Session
class GetTableTypes(spark: SparkSession, session: Session)
extends SparkOperation(spark, OperationType.GET_TABLE_TYPES, session) {
class GetTableTypes(session: Session)
extends SparkOperation(OperationType.GET_TABLE_TYPES, session) {
override protected def resultSchema: StructType = {
new StructType()
.add(TABLE_TYPE, "string", nullable = true, "Table type name.")

View File

@ -17,7 +17,6 @@
package org.apache.kyuubi.engine.spark.operation
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType
import org.apache.kyuubi.engine.spark.IterableFetchIterator
@ -27,13 +26,12 @@ import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
import org.apache.kyuubi.session.Session
class GetTables(
spark: SparkSession,
session: Session,
catalog: String,
schema: String,
tableName: String,
tableTypes: Set[String])
extends SparkOperation(spark, OperationType.GET_TABLES, session) {
extends SparkOperation(OperationType.GET_TABLES, session) {
override def statement: String = {
super.statement +

View File

@ -19,7 +19,7 @@ package org.apache.kyuubi.engine.spark.operation
import java.sql.Types._
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType
import org.apache.kyuubi.engine.spark.IterableFetchIterator
@ -27,8 +27,8 @@ import org.apache.kyuubi.operation.OperationType
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
import org.apache.kyuubi.session.Session
class GetTypeInfo(spark: SparkSession, session: Session)
extends SparkOperation(spark, OperationType.GET_TYPE_INFO, session) {
class GetTypeInfo(session: Session)
extends SparkOperation(OperationType.GET_TYPE_INFO, session) {
override protected def resultSchema: StructType = {
new StructType()
.add(TYPE_NAME, "string", nullable = false, "Type name")

View File

@ -17,7 +17,7 @@
package org.apache.kyuubi.engine.spark.operation
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.types.StructType
@ -31,11 +31,10 @@ import org.apache.kyuubi.session.Session
* Perform the statement parsing, analyzing or optimizing only without executing it
*/
class PlanOnlyStatement(
spark: SparkSession,
session: Session,
override protected val statement: String,
mode: OperationMode)
extends SparkOperation(spark, OperationType.EXECUTE_STATEMENT, session) {
extends SparkOperation(OperationType.EXECUTE_STATEMENT, session) {
private val operationLog: OperationLog = OperationLog.createOperationLog(session, getHandle)
override def getOperationLog: Option[OperationLog] = Option(operationLog)

View File

@ -28,6 +28,7 @@ import org.apache.spark.sql.types.StructType
import org.apache.kyuubi.{KyuubiSQLException, Utils}
import org.apache.kyuubi.engine.spark.FetchIterator
import org.apache.kyuubi.engine.spark.operation.SparkOperation.TIMEZONE_KEY
import org.apache.kyuubi.engine.spark.session.SparkSessionImpl
import org.apache.kyuubi.operation.{AbstractOperation, OperationState}
import org.apache.kyuubi.operation.FetchOrientation._
import org.apache.kyuubi.operation.OperationState.OperationState
@ -36,9 +37,11 @@ import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.schema.{RowSet, SchemaHelper}
import org.apache.kyuubi.session.Session
abstract class SparkOperation(spark: SparkSession, opType: OperationType, session: Session)
abstract class SparkOperation(opType: OperationType, session: Session)
extends AbstractOperation(opType, session) {
protected val spark: SparkSession = session.asInstanceOf[SparkSessionImpl].spark
private val timeZone: ZoneId = {
spark.conf.getOption(TIMEZONE_KEY).map { timeZoneId =>
ZoneId.of(timeZoneId.replaceFirst("(\\+|\\-)(\\d):", "$10$2:"), ZoneId.SHORT_IDS)

View File

@ -22,11 +22,10 @@ import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._
import org.apache.spark.sql.SparkSession
import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.config.KyuubiConf.{OPERATION_INCREMENTAL_COLLECT, OPERATION_PLAN_ONLY, OperationModes}
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.config.KyuubiConf.OperationModes._
import org.apache.kyuubi.engine.spark.repl.KyuubiSparkILoop
import org.apache.kyuubi.engine.spark.session.SparkSessionImpl
import org.apache.kyuubi.engine.spark.shim.SparkCatalogShim
import org.apache.kyuubi.operation.{Operation, OperationManager}
import org.apache.kyuubi.session.{Session, SessionHandle}
@ -35,58 +34,50 @@ class SparkSQLOperationManager private (name: String) extends OperationManager(n
def this() = this(classOf[SparkSQLOperationManager].getSimpleName)
private val sessionToSpark = new ConcurrentHashMap[SessionHandle, SparkSession]()
private def getSparkSession(sessionHandle: SessionHandle): SparkSession = {
val sparkSession = sessionToSpark.get(sessionHandle)
if (sparkSession == null) {
throw KyuubiSQLException(s"$sessionHandle has not been initialized or already been closed")
}
sparkSession
}
def setSparkSession(sessionHandle: SessionHandle, spark: SparkSession): Unit = {
sessionToSpark.put(sessionHandle, spark)
}
def removeSparkSession(sessionHandle: SessionHandle): SparkSession = {
sessionToSpark.remove(sessionHandle)
}
def getOpenSparkSessionCount: Int = sessionToSpark.size()
private lazy val operationModeDefault = getConf.get(OPERATION_PLAN_ONLY)
private lazy val operationIncrementalCollectDefault = getConf.get(OPERATION_INCREMENTAL_COLLECT)
private lazy val operationLanguageDefault = getConf.get(OPERATION_LANGUAGE)
private val sessionToRepl = new ConcurrentHashMap[SessionHandle, KyuubiSparkILoop]().asScala
def closeILoop(session: SessionHandle): Unit = {
val maybeRepl = sessionToRepl.remove(session)
maybeRepl.foreach(_.close())
}
override def newExecuteStatementOperation(
session: Session,
statement: String,
runAsync: Boolean,
queryTimeout: Long): Operation = {
val spark = getSparkSession(session.handle)
val operationModeStr =
spark.conf.get(OPERATION_PLAN_ONLY.key, operationModeDefault).toUpperCase(Locale.ROOT)
val incrementalCollect = spark.conf.getOption(OPERATION_INCREMENTAL_COLLECT.key)
.map(_.toBoolean).getOrElse(operationIncrementalCollectDefault)
val operation = OperationModes.withName(operationModeStr) match {
case NONE =>
new ExecuteStatement(spark, session, statement, runAsync, queryTimeout, incrementalCollect)
case mode =>
new PlanOnlyStatement(spark, session, statement, mode)
}
val spark = session.asInstanceOf[SparkSessionImpl].spark
val lang = spark.conf.get(OPERATION_LANGUAGE.key, operationLanguageDefault)
val operation =
OperationLanguages.withName(lang.toUpperCase(Locale.ROOT)) match {
case OperationLanguages.SQL =>
val mode = spark.conf.get(OPERATION_PLAN_ONLY.key, operationModeDefault)
OperationModes.withName(mode.toUpperCase(Locale.ROOT)) match {
case NONE =>
val incrementalCollect = spark.conf.getOption(OPERATION_INCREMENTAL_COLLECT.key)
.map(_.toBoolean).getOrElse(operationIncrementalCollectDefault)
new ExecuteStatement(session, statement, runAsync, queryTimeout, incrementalCollect)
case mode =>
new PlanOnlyStatement(session, statement, mode)
}
case OperationLanguages.SCALA =>
val repl = sessionToRepl.getOrElseUpdate(session.handle, KyuubiSparkILoop(spark))
new ExecuteScala(session, repl, statement)
}
addOperation(operation)
}
override def newGetTypeInfoOperation(session: Session): Operation = {
val spark = getSparkSession(session.handle)
val op = new GetTypeInfo(spark, session)
val op = new GetTypeInfo(session)
addOperation(op)
}
override def newGetCatalogsOperation(session: Session): Operation = {
val spark = getSparkSession(session.handle)
val op = new GetCatalogs(spark, session)
val op = new GetCatalogs(session)
addOperation(op)
}
@ -94,8 +85,7 @@ class SparkSQLOperationManager private (name: String) extends OperationManager(n
session: Session,
catalog: String,
schema: String): Operation = {
val spark = getSparkSession(session.handle)
val op = new GetSchemas(spark, session, catalog, schema)
val op = new GetSchemas(session, catalog, schema)
addOperation(op)
}
@ -105,20 +95,18 @@ class SparkSQLOperationManager private (name: String) extends OperationManager(n
schemaName: String,
tableName: String,
tableTypes: java.util.List[String]): Operation = {
val spark = getSparkSession(session.handle)
val tTypes =
if (tableTypes == null || tableTypes.isEmpty) {
SparkCatalogShim.sparkTableTypes
} else {
tableTypes.asScala.toSet
}
val op = new GetTables(spark, session, catalogName, schemaName, tableName, tTypes)
val op = new GetTables(session, catalogName, schemaName, tableName, tTypes)
addOperation(op)
}
override def newGetTableTypesOperation(session: Session): Operation = {
val spark = getSparkSession(session.handle)
val op = new GetTableTypes(spark, session)
val op = new GetTableTypes(session)
addOperation(op)
}
@ -128,8 +116,7 @@ class SparkSQLOperationManager private (name: String) extends OperationManager(n
schemaName: String,
tableName: String,
columnName: String): Operation = {
val spark = getSparkSession(session.handle)
val op = new GetColumns(spark, session, catalogName, schemaName, tableName, columnName)
val op = new GetColumns(session, catalogName, schemaName, tableName, columnName)
addOperation(op)
}
@ -138,8 +125,7 @@ class SparkSQLOperationManager private (name: String) extends OperationManager(n
catalogName: String,
schemaName: String,
functionName: String): Operation = {
val spark = getSparkSession(session.handle)
val op = new GetFunctions(spark, session, catalogName, schemaName, functionName)
val op = new GetFunctions(session, catalogName, schemaName, functionName)
addOperation(op)
}
}

View File

@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.spark.repl
import java.io.{ByteArrayOutputStream, File}
import scala.collection.mutable.ArrayBuffer
import scala.tools.nsc.Settings
import scala.tools.nsc.interpreter.JPrintWriter
import org.apache.spark.SparkContext
import org.apache.spark.repl.{Main, SparkILoop}
import org.apache.spark.sql.{Dataset, Row, SparkSession}
import org.apache.spark.util.MutableURLClassLoader
private[spark] case class KyuubiSparkILoop private (
spark: SparkSession,
output: ByteArrayOutputStream)
extends SparkILoop(None, new JPrintWriter(output)) {
// TODO: this is a little hacky
val results = new ArrayBuffer[Dataset[Row]]()
private def initialize(): Unit = {
settings = new Settings
val interpArguments = List(
"-Yrepl-class-based",
"-Yrepl-outdir",
s"${Main.outputDir.getAbsolutePath}")
settings.processArguments(interpArguments, processAll = true)
settings.usejavacp.value = true
val currentClassLoader = Thread.currentThread().getContextClassLoader
settings.embeddedDefaults(currentClassLoader)
this.createInterpreter()
this.initializeSynchronous()
try {
this.compilerClasspath
this.ensureClassLoader()
var classLoader = Thread.currentThread().getContextClassLoader
while (classLoader != null) {
classLoader match {
case loader: MutableURLClassLoader =>
val allJars = loader.getURLs.filter { u =>
val file = new File(u.getPath)
u.getProtocol == "file" && file.isFile &&
file.getName.contains("scala-lang_scala-reflect")
}
this.addUrlsToClassPath(allJars: _*)
classLoader = null
case _ =>
classLoader = classLoader.getParent
}
}
} finally {
Thread.currentThread().setContextClassLoader(currentClassLoader)
}
this.beQuietDuring {
// SparkSession/SparkContext and their implicits
this.bind("spark", classOf[SparkSession].getCanonicalName, spark, List("""@transient"""))
this.bind(
"sc",
classOf[SparkContext].getCanonicalName,
spark.sparkContext,
List("""@transient"""))
this.interpret("import org.apache.spark.SparkContext._")
this.interpret("import spark.implicits._")
this.interpret("import spark.sql")
this.interpret("import org.apache.spark.sql.functions._")
// for feeding results to client, e.g. beeline
this.bind(
"results",
"scala.collection.mutable.ArrayBuffer[" +
"org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]]",
results)
}
}
def getOutput: String = {
val res = output.toString.trim
output.reset()
res
}
}
private[spark] object KyuubiSparkILoop {
def apply(spark: SparkSession): KyuubiSparkILoop = {
val os = new ByteArrayOutputStream()
val iLoop = new KyuubiSparkILoop(spark, os)
iLoop.initialize()
iLoop
}
}

View File

@ -18,7 +18,7 @@
package org.apache.kyuubi.engine.spark.session
import org.apache.hive.service.rpc.thrift.TProtocolVersion
import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.SparkSession
import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.config.KyuubiConf
@ -26,7 +26,6 @@ import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.engine.ShareLevel
import org.apache.kyuubi.engine.spark.SparkSQLEngine
import org.apache.kyuubi.engine.spark.operation.SparkSQLOperationManager
import org.apache.kyuubi.engine.spark.udf.KDFRegistry
import org.apache.kyuubi.session._
/**
@ -58,60 +57,52 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession)
ipAddress: String,
conf: Map[String, String]): SessionHandle = {
info(s"Opening session for $user@$ipAddress")
val sessionImpl = new SparkSessionImpl(protocol, user, password, ipAddress, conf, this)
val handle = sessionImpl.handle
try {
val sparkSession =
val sparkSession =
try {
if (singleSparkSession) {
spark
} else {
val ss = spark.newSession()
this.conf.get(ENGINE_SESSION_INITIALIZE_SQL).foreach { sqlStr =>
ss.sparkContext.setJobGroup(
handle.identifier.toString,
sqlStr,
interruptOnCancel = true)
ss.sparkContext.setJobDescription(sqlStr)
ss.sql(sqlStr).isEmpty
}
ss
}
sessionImpl.normalizedConf.foreach {
case ("use:database", database) => sparkSession.catalog.setCurrentDatabase(database)
case (key, value) => setModifiableConfig(sparkSession, key, value)
} catch {
case e: Exception => throw KyuubiSQLException(e)
}
sessionImpl.open()
KDFRegistry.registerAll(sparkSession)
operationManager.setSparkSession(handle, sparkSession)
setSession(handle, sessionImpl)
val session = new SparkSessionImpl(
protocol,
user,
password,
ipAddress,
conf,
this,
sparkSession)
try {
val handle = session.handle
session.open()
setSession(handle, session)
info(s"$user's session with $handle is opened, current opening sessions" +
s" $getOpenSessionCount")
handle
} catch {
case e: Exception =>
sessionImpl.close()
session.close()
throw KyuubiSQLException(e)
}
}
override def closeSession(sessionHandle: SessionHandle): Unit = {
super.closeSession(sessionHandle)
operationManager.removeSparkSession(sessionHandle)
if (conf.get(ENGINE_SHARE_LEVEL) == ShareLevel.CONNECTION.toString) {
info("Session stopped due to shared level is Connection.")
stopSession()
}
}
private def setModifiableConfig(spark: SparkSession, key: String, value: String): Unit = {
try {
spark.conf.set(key, value)
} catch {
case e: AnalysisException =>
warn(e.getMessage())
}
}
private def stopSession(): Unit = {
SparkSQLEngine.currentEngine.foreach(_.stop())
}

View File

@ -18,8 +18,11 @@
package org.apache.kyuubi.engine.spark.session
import org.apache.hive.service.rpc.thrift.TProtocolVersion
import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.kyuubi.engine.spark.events.{EventLoggingService, SessionEvent}
import org.apache.kyuubi.engine.spark.operation.SparkSQLOperationManager
import org.apache.kyuubi.engine.spark.udf.KDFRegistry
import org.apache.kyuubi.operation.{Operation, OperationHandle}
import org.apache.kyuubi.session.{AbstractSession, SessionHandle, SessionManager}
@ -29,13 +32,27 @@ class SparkSessionImpl(
password: String,
ipAddress: String,
conf: Map[String, String],
sessionManager: SessionManager)
sessionManager: SessionManager,
val spark: SparkSession)
extends AbstractSession(protocol, user, password, ipAddress, conf, sessionManager) {
override val handle: SessionHandle = SessionHandle(protocol)
private def setModifiableConfig(key: String, value: String): Unit = {
try {
spark.conf.set(key, value)
} catch {
case e: AnalysisException => warn(e.getMessage())
}
}
private val sessionEvent = SessionEvent(this)
override def open(): Unit = {
normalizedConf.foreach {
case ("use:database", database) => spark.catalog.setCurrentDatabase(database)
case (key, value) => setModifiableConfig(key, value)
}
KDFRegistry.registerAll(spark)
EventLoggingService.onEvent(sessionEvent)
super.open()
}
@ -49,6 +66,7 @@ class SparkSessionImpl(
sessionEvent.endTime = System.currentTimeMillis()
EventLoggingService.onEvent(sessionEvent)
super.close()
sessionManager.operationManager.asInstanceOf[SparkSQLOperationManager].closeILoop(handle)
}
}

View File

@ -512,15 +512,12 @@ class SparkOperationSuite extends WithSparkSQLEngine with HiveMetadataTests with
withThriftClient { client =>
val operationManager =
engine.backendService.sessionManager.operationManager.asInstanceOf[SparkSQLOperationManager]
assert(operationManager.getOpenSparkSessionCount === 0)
val req = new TOpenSessionReq()
req.setUsername("kentyao")
req.setPassword("anonymous")
val tOpenSessionResp = client.OpenSession(req)
assert(operationManager.getOpenSparkSessionCount === 1)
val tExecuteStatementReq = new TExecuteStatementReq()
tExecuteStatementReq.setSessionHandle(tOpenSessionResp.getSessionHandle)
tExecuteStatementReq.setRunAsync(true)
@ -546,8 +543,6 @@ class SparkOperationSuite extends WithSparkSQLEngine with HiveMetadataTests with
tCloseSessionReq.setSessionHandle(tOpenSessionResp.getSessionHandle)
val tCloseSessionResp = client.CloseSession(tCloseSessionReq)
assert(tCloseSessionResp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS)
assert(operationManager.getOpenSparkSessionCount === 0)
}
}

View File

@ -982,10 +982,10 @@ object KyuubiConf {
.doc("A comma separated list of engine history loggers, where engine/session/operation etc" +
" events go. We use spark logger by default.<ul>" +
" <li>SPARK: the events will be written to the spark listener bus.</li>" +
s" <li>JSON: the events will be written to the location of" +
" <li>JSON: the events will be written to the location of" +
s" ${ENGINE_EVENT_JSON_LOG_PATH.key}</li>" +
s" <li>JDBC: to be done</li>" +
s" <li>CUSTOM: to be done.</li></ul>")
" <li>JDBC: to be done</li>" +
" <li>CUSTOM: to be done.</li></ul>")
.version("1.3.0")
.stringConf
.transform(_.toUpperCase(Locale.ROOT))
@ -1048,4 +1048,20 @@ object KyuubiConf {
.transform(_.toUpperCase(Locale.ROOT))
.checkValues(OperationModes.values.map(_.toString))
.createWithDefault(OperationModes.NONE.toString)
object OperationLanguages extends Enumeration {
type OperationLanguage = Value
val SQL, SCALA = Value
}
val OPERATION_LANGUAGE: ConfigEntry[String] =
buildConf("operation.language")
.doc("Choose a programing language for the following inputs" +
" <ul><li>SQL: (Default) Run all following statements as SQL queries.</li>" +
" <li>SCALA: Run all following input a scala codes</li></ul>")
.version("1.5.0")
.stringConf
.transform(_.toUpperCase(Locale.ROOT))
.checkValues(OperationLanguages.values.map(_.toString))
.createWithDefault(OperationLanguages.SQL.toString)
}

View File

@ -412,4 +412,110 @@ trait SparkQueryTests extends HiveJDBCTestHelper {
}
}
}
test("execute simple scala code") {
withJdbcStatement() { statement =>
statement.execute("SET kyuubi.operation.language=scala")
val rs = statement.executeQuery("spark.version")
rs.next()
// scala repl will return resX = YYYYY, and here we only check YYYYY
val sparkVer = rs.getString(1).split("=")(1).trim
assert("\\d\\.\\d\\.\\d".r.pattern.matcher(sparkVer).matches())
assert(rs.getMetaData.getColumnName(1) === "output")
}
}
test("execute simple scala code with result returned") {
withJdbcStatement() { statement =>
statement.execute("SET kyuubi.operation.language=scala")
val code =
"""
|val df = spark
| .range(0, 10, 2, 1)
| .toDF
|""".stripMargin
val rs1 = statement.executeQuery(code)
rs1.next()
assert(rs1.getString(1) startsWith "df: org.apache.spark.sql.DataFrame")
// continue
val rs2 = statement.executeQuery("df.count()")
rs2.next()
assert(rs2.getString(1).endsWith("5"))
// continue
val rs3 = statement.executeQuery("results += df")
for (i <- Range(0, 10, 2)) {
assert(rs3.next)
assert(rs3.getInt(1) === i)
}
// switch to sql
val set =
"""
|spark.conf.set("kyuubi.operation.language", "SQL")
|""".stripMargin
val t = statement.executeQuery(set)
t.next()
val rs4 = statement.executeQuery("select 12345")
assert(rs4.next())
assert(rs4.getInt(1) === 12345)
// switch to scala again
statement.execute("SET kyuubi.operation.language=scala")
val code2 =
"""
|/* this
| * is
| * a
| * multi-line comments
| */
|val df = spark
| .range(0, 10, 2, 1)
| .map(x => (x, x + 1, x * 2)) // this is a single-line comment
| .toDF
|""".stripMargin
val rs5 = statement.executeQuery(code2)
rs5.next()
assert(rs5.getString(1) startsWith "df: org.apache.spark.sql.DataFrame")
// re-assign
val rs6 = statement.executeQuery("results += df")
for (i <- Range(0, 10, 2)) {
assert(rs6.next)
assert(rs6.getInt(2) === i + 1)
}
}
}
test("incomplete scala code block will fail") {
withJdbcStatement() { statement =>
statement.execute("SET kyuubi.operation.language=scala")
// incomplete code block
val incompleteCode =
"""
|val df = spark
| .range(0, 10, 2, 1)
| .map {
| x => (x, x + 1, x * 2)
|""".stripMargin
val e = intercept[SQLException](statement.executeQuery(incompleteCode))
assert(e.getMessage contains "Incomplete code:")
}
}
test("scala code compile error will fail") {
withJdbcStatement() { statement =>
statement.execute("SET kyuubi.operation.language=scala")
// incomplete code block
val incompleteCode =
"""
|val df = spark
| .range(0, 10, 2, 1)
| .map { x => (x, x + 1, y * 2) } // y is missing
|""".stripMargin
val e = intercept[SQLException](statement.executeQuery(incompleteCode))
assert(e.getMessage contains "not found: value y")
}
}
}

18
pom.xml
View File

@ -248,6 +248,12 @@
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-compiler</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-reflect</artifactId>
@ -336,6 +342,18 @@
<version>${commons-lang3.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-repl_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>