From 97c3835738cf5c4b86eed72443fcfd9ea32f228e Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Mon, 6 Dec 2021 11:17:59 +0800 Subject: [PATCH] [KYUUBI #1490] Introduce the basic framework for running scala ### _Why are the changes needed?_ Introduce the basic framework for running scala, see #1490 for the detail ### _How was this patch tested?_ - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [x] Add screenshots for manual tests if appropriate ``` Beeline version 1.5.0-SNAPSHOT by Apache Kyuubi (Incubating) 0: jdbc:hive2://10.242.189.214:10009/> spark.version; 2021-12-03 13:47:07.556 INFO operation.ExecuteStatement: Processing kent's query[08b8b6da-d434-4296-b613-2027e3518441]: INITIALIZED_STATE -> PENDING_STATE, statement: spark.version 2021-12-03 13:47:07.560 INFO operation.ExecuteStatement: Processing kent's query[08b8b6da-d434-4296-b613-2027e3518441]: PENDING_STATE -> RUNNING_STATE, statement: spark.version 2021-12-03 13:47:07.558 INFO operation.ExecuteStatement: Processing kent's query[321dc15d-68d0-4f91-9216-1e08f09842df]: INITIALIZED_STATE -> PENDING_STATE, statement: spark.version 2021-12-03 13:47:07.559 INFO operation.ExecuteStatement: Processing kent's query[321dc15d-68d0-4f91-9216-1e08f09842df]: PENDING_STATE -> RUNNING_STATE, statement: spark.version 2021-12-03 13:47:07.560 INFO operation.ExecuteStatement: Spark application name: kyuubi_USER_SPARK_SQL_kent_default_61cff9fb-7035-4435-b509-80c1730876ed application ID: local-1638510289918 application web UI: http://10.242.189.214:65027 master: local[*] deploy mode: client version: 3.1.2 Start time: 2021-12-03T13:44:49.313 User: kent 2021-12-03 13:47:07.562 INFO scheduler.DAGScheduler: Asked to cancel job group 321dc15d-68d0-4f91-9216-1e08f09842df 2021-12-03 13:47:07.565 INFO operation.ExecuteStatement: Processing kent's query[321dc15d-68d0-4f91-9216-1e08f09842df]: RUNNING_STATE -> ERROR_STATE, statement: spark.version, time taken: 0.006 seconds 
2021-12-03 13:47:07.565 ERROR operation.ExecuteStatement: Error operating EXECUTE_STATEMENT: org.apache.spark.sql.catalyst.parser.ParseException: mismatched input 'spark' expecting {'(', 'ADD', 'ALTER', 'ANALYZE', 'CACHE', 'CLEAR', 'COMMENT', 'COMMIT', 'CREATE', 'DELETE', 'DESC', 'DESCRIBE', 'DFS', 'DROP', 'EXPLAIN', 'EXPORT', 'FROM', 'GRANT', 'IMPORT', 'INSERT', 'LIST', 'LOAD', 'LOCK', 'MAP', 'MERGE', 'MSCK', 'REDUCE', 'REFRESH', 'REPLACE', 'RESET', 'REVOKE', 'ROLLBACK', 'SELECT', 'SET', 'SHOW', 'START', 'TABLE', 'TRUNCATE', 'UNCACHE', 'UNLOCK', 'UPDATE', 'USE', 'VALUES', 'WITH'}(line 1, pos 0) == SQL == spark.version ^^^ at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:255) at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:124) at org.apache.spark.sql.execution.SparkSqlParser.parse(SparkSqlParser.scala:49) at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parsePlan(ParseDriver.scala:75) at org.apache.spark.sql.SparkSession.$anonfun$sql$2(SparkSession.scala:616) at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111) at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:616) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775) at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:613) at org.apache.kyuubi.engine.spark.operation.ExecuteStatement.$anonfun$executeStatement$1(ExecuteStatement.scala:98) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at org.apache.kyuubi.engine.spark.operation.ExecuteStatement.withLocalProperties(ExecuteStatement.scala:157) at org.apache.kyuubi.engine.spark.operation.ExecuteStatement.org$apache$kyuubi$engine$spark$operation$ExecuteStatement$$executeStatement(ExecuteStatement.scala:92) at org.apache.kyuubi.engine.spark.operation.ExecuteStatement$$anon$1.run(ExecuteStatement.scala:125) at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) org.apache.spark.sql.catalyst.parser.ParseException: mismatched input 'spark' expecting {'(', 'ADD', 'ALTER', 'ANALYZE', 'CACHE', 'CLEAR', 'COMMENT', 'COMMIT', 'CREATE', 'DELETE', 'DESC', 'DESCRIBE', 'DFS', 'DROP', 'EXPLAIN', 'EXPORT', 'FROM', 'GRANT', 'IMPORT', 'INSERT', 'LIST', 'LOAD', 'LOCK', 'MAP', 'MERGE', 'MSCK', 'REDUCE', 'REFRESH', 'REPLACE', 'RESET', 'REVOKE', 'ROLLBACK', 'SELECT', 'SET', 'SHOW', 'START', 'TABLE', 'TRUNCATE', 'UNCACHE', 'UNLOCK', 'UPDATE', 'USE', 'VALUES', 'WITH'}(line 1, pos 0) == SQL == spark.version ^^^ at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:255) at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:124) at org.apache.spark.sql.execution.SparkSqlParser.parse(SparkSqlParser.scala:49) at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parsePlan(ParseDriver.scala:75) at org.apache.spark.sql.SparkSession.$anonfun$sql$2(SparkSession.scala:616) at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111) at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:616) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775) at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:613) at org.apache.kyuubi.engine.spark.operation.ExecuteStatement.$anonfun$executeStatement$1(ExecuteStatement.scala:98) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at org.apache.kyuubi.engine.spark.operation.ExecuteStatement.withLocalProperties(ExecuteStatement.scala:157) at 
org.apache.kyuubi.engine.spark.operation.ExecuteStatement.org$apache$kyuubi$engine$spark$operation$ExecuteStatement$$executeStatement(ExecuteStatement.scala:92) at org.apache.kyuubi.engine.spark.operation.ExecuteStatement$$anon$1.run(ExecuteStatement.scala:125) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) 2021-12-03 13:47:07.569 INFO operation.ExecuteStatement: Query[08b8b6da-d434-4296-b613-2027e3518441] in ERROR_STATE 2021-12-03 13:47:07.569 INFO operation.ExecuteStatement: Processing kent's query[08b8b6da-d434-4296-b613-2027e3518441]: RUNNING_STATE -> ERROR_STATE, statement: spark.version, time taken: 0.009 seconds Error: Error operating EXECUTE_STATEMENT: org.apache.spark.sql.catalyst.parser.ParseException: mismatched input 'spark' expecting {'(', 'ADD', 'ALTER', 'ANALYZE', 'CACHE', 'CLEAR', 'COMMENT', 'COMMIT', 'CREATE', 'DELETE', 'DESC', 'DESCRIBE', 'DFS', 'DROP', 'EXPLAIN', 'EXPORT', 'FROM', 'GRANT', 'IMPORT', 'INSERT', 'LIST', 'LOAD', 'LOCK', 'MAP', 'MERGE', 'MSCK', 'REDUCE', 'REFRESH', 'REPLACE', 'RESET', 'REVOKE', 'ROLLBACK', 'SELECT', 'SET', 'SHOW', 'START', 'TABLE', 'TRUNCATE', 'UNCACHE', 'UNLOCK', 'UPDATE', 'USE', 'VALUES', 'WITH'}(line 1, pos 0) == SQL == spark.version ^^^ at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:255) at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:124) at org.apache.spark.sql.execution.SparkSqlParser.parse(SparkSqlParser.scala:49) at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parsePlan(ParseDriver.scala:75) at org.apache.spark.sql.SparkSession.$anonfun$sql$2(SparkSession.scala:616) at 
org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111) at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:616) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775) at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:613) at org.apache.kyuubi.engine.spark.operation.ExecuteStatement.$anonfun$executeStatement$1(ExecuteStatement.scala:98) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at org.apache.kyuubi.engine.spark.operation.ExecuteStatement.withLocalProperties(ExecuteStatement.scala:157) at org.apache.kyuubi.engine.spark.operation.ExecuteStatement.org$apache$kyuubi$engine$spark$operation$ExecuteStatement$$executeStatement(ExecuteStatement.scala:92) at org.apache.kyuubi.engine.spark.operation.ExecuteStatement$$anon$1.run(ExecuteStatement.scala:125) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) (state=,code=0) 0: jdbc:hive2://10.242.189.214:10009/> set kyuubi.operation.language=scala; 2021-12-03 13:47:11.982 INFO operation.ExecuteStatement: Processing kent's query[e4e4dc55-4e05-460e-9d9d-86b2cf5c054b]: INITIALIZED_STATE -> PENDING_STATE, statement: set kyuubi.operation.language=scala 2021-12-03 13:47:11.985 INFO operation.ExecuteStatement: Processing kent's query[e4e4dc55-4e05-460e-9d9d-86b2cf5c054b]: PENDING_STATE -> RUNNING_STATE, statement: set kyuubi.operation.language=scala 2021-12-03 13:47:11.983 INFO operation.ExecuteStatement: Processing kent's query[2b6ec68c-2ae9-4d93-a35b-9e2319df4d4f]: INITIALIZED_STATE -> PENDING_STATE, statement: set kyuubi.operation.language=scala 2021-12-03 13:47:11.984 INFO operation.ExecuteStatement: Processing kent's 
query[2b6ec68c-2ae9-4d93-a35b-9e2319df4d4f]: PENDING_STATE -> RUNNING_STATE, statement: set kyuubi.operation.language=scala 2021-12-03 13:47:11.985 INFO operation.ExecuteStatement: Spark application name: kyuubi_USER_SPARK_SQL_kent_default_61cff9fb-7035-4435-b509-80c1730876ed application ID: local-1638510289918 application web UI: http://10.242.189.214:65027 master: local[*] deploy mode: client version: 3.1.2 Start time: 2021-12-03T13:44:49.313 User: kent 2021-12-03 13:47:11.995 INFO operation.ExecuteStatement: Processing kent's query[2b6ec68c-2ae9-4d93-a35b-9e2319df4d4f]: RUNNING_STATE -> RUNNING_STATE, statement: set kyuubi.operation.language=scala 2021-12-03 13:47:11.995 INFO operation.ExecuteStatement: Execute in full collect mode 2021-12-03 13:47:12.006 INFO operation.ExecuteStatement: Processing kent's query[2b6ec68c-2ae9-4d93-a35b-9e2319df4d4f]: RUNNING_STATE -> FINISHED_STATE, statement: set kyuubi.operation.language=scala, time taken: 0.022 seconds 2021-12-03 13:47:12.007 INFO operation.ExecuteStatement: Query[e4e4dc55-4e05-460e-9d9d-86b2cf5c054b] in FINISHED_STATE 2021-12-03 13:47:12.007 INFO operation.ExecuteStatement: Processing kent's query[e4e4dc55-4e05-460e-9d9d-86b2cf5c054b]: RUNNING_STATE -> FINISHED_STATE, statement: set kyuubi.operation.language=scala, time taken: 0.022 seconds +----------------------------+--------+ | key | value | +----------------------------+--------+ | kyuubi.operation.language | scala | +----------------------------+--------+ 1 row selected (0.052 seconds) 0: jdbc:hive2://10.242.189.214:10009/> spark.version; 2021-12-03 13:47:13.685 INFO operation.ExecuteStatement: Processing kent's query[178ff72d-b870-44d4-99d8-08a0fa0e8efa]: INITIALIZED_STATE -> PENDING_STATE, statement: spark.version 2021-12-03 13:47:15.541 INFO operation.ExecuteStatement: Processing kent's query[178ff72d-b870-44d4-99d8-08a0fa0e8efa]: PENDING_STATE -> RUNNING_STATE, statement: spark.version 2021-12-03 13:47:15.543 INFO operation.ExecuteStatement: 
Query[178ff72d-b870-44d4-99d8-08a0fa0e8efa] in FINISHED_STATE 2021-12-03 13:47:15.544 INFO operation.ExecuteStatement: Processing kent's query[178ff72d-b870-44d4-99d8-08a0fa0e8efa]: RUNNING_STATE -> FINISHED_STATE, statement: spark.version, time taken: 0.003 seconds +-----------------------+ | output | +-----------------------+ | res0: String = 3.1.2 | +-----------------------+ 1 row selected (1.871 seconds) 0: jdbc:hive2://10.242.189.214:10009/> spark.sql("select current_date()") . . . . . . . . . . . . . . . . . . .> ; 2021-12-03 13:47:36.512 INFO operation.ExecuteStatement: Processing kent's query[602c2d88-7e8a-4175-9b53-e34adfff007a]: INITIALIZED_STATE -> PENDING_STATE, statement: spark.sql("select current_date()") 2021-12-03 13:47:36.689 INFO operation.ExecuteStatement: Processing kent's query[602c2d88-7e8a-4175-9b53-e34adfff007a]: PENDING_STATE -> RUNNING_STATE, statement: spark.sql("select current_date()") 2021-12-03 13:47:36.692 INFO operation.ExecuteStatement: Query[602c2d88-7e8a-4175-9b53-e34adfff007a] in FINISHED_STATE 2021-12-03 13:47:36.692 INFO operation.ExecuteStatement: Processing kent's query[602c2d88-7e8a-4175-9b53-e34adfff007a]: RUNNING_STATE -> FINISHED_STATE, statement: spark.sql("select current_date()"), time taken: 0.003 seconds +----------------------------------------------------+ | output | +----------------------------------------------------+ | res1: org.apache.spark.sql.DataFrame = [current_date(): date] | +----------------------------------------------------+ 1 row selected (0.187 seconds) 0: jdbc:hive2://10.242.189.214:10009/> results += spark.range(1, 5, 2, 3); Error: Error operating EXECUTE_STATEMENT: org.apache.kyuubi.KyuubiSQLException: Interpret error: results += spark.range(1, 5, 2, 3) :26: error: type mismatch; found : org.apache.spark.sql.Dataset[Long] required: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] results += spark.range(1, 5, 2, 3) ^ at org.apache.kyuubi.KyuubiSQLException$.apply(KyuubiSQLException.scala:69) 
at org.apache.kyuubi.engine.spark.operation.ExecuteScala.runInternal(ExecuteScala.scala:70) at org.apache.kyuubi.operation.AbstractOperation.run(AbstractOperation.scala:130) at org.apache.kyuubi.session.AbstractSession.runOperation(AbstractSession.scala:93) at org.apache.kyuubi.engine.spark.session.SparkSessionImpl.runOperation(SparkSessionImpl.scala:62) at org.apache.kyuubi.session.AbstractSession.$anonfun$executeStatement$1(AbstractSession.scala:121) at org.apache.kyuubi.session.AbstractSession.withAcquireRelease(AbstractSession.scala:75) at org.apache.kyuubi.session.AbstractSession.executeStatement(AbstractSession.scala:118) at org.apache.kyuubi.service.AbstractBackendService.executeStatement(AbstractBackendService.scala:61) at org.apache.kyuubi.service.ThriftBinaryFrontendService.ExecuteStatement(ThriftBinaryFrontendService.scala:265) at org.apache.hive.service.rpc.thrift.TCLIService$Processor$ExecuteStatement.getResult(TCLIService.java:1557) at org.apache.hive.service.rpc.thrift.TCLIService$Processor$ExecuteStatement.getResult(TCLIService.java:1542) at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:38) at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39) at org.apache.kyuubi.service.authentication.TSetIpAddressProcessor.process(TSetIpAddressProcessor.scala:36) at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:310) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) (state=,code=0) 0: jdbc:hive2://10.242.189.214:10009/> results += spark.range(1, 5, 2, 3).toDF; 2021-12-03 13:48:30.327 INFO operation.ExecuteStatement: Processing kent's query[daccd163-978a-4d66-af85-af466086d1db]: INITIALIZED_STATE -> PENDING_STATE, statement: results += spark.range(1, 5, 2, 3).toDF 2021-12-03 13:48:31.700 INFO operation.ExecuteStatement: Processing kent's 
query[daccd163-978a-4d66-af85-af466086d1db]: PENDING_STATE -> RUNNING_STATE, statement: results += spark.range(1, 5, 2, 3).toDF 2021-12-03 13:48:31.702 INFO operation.ExecuteStatement: Query[daccd163-978a-4d66-af85-af466086d1db] in FINISHED_STATE 2021-12-03 13:48:31.702 INFO operation.ExecuteStatement: Processing kent's query[daccd163-978a-4d66-af85-af466086d1db]: RUNNING_STATE -> FINISHED_STATE, statement: results += spark.range(1, 5, 2, 3).toDF, time taken: 0.003 seconds +-----+ | id | +-----+ | 1 | | 3 | +-----+ 2 rows selected (1.387 seconds) ``` ** Session level isolated ** ![image](https://user-images.githubusercontent.com/8326978/144553902-fc390c30-06de-453b-af3d-cf8def577aa9.png) - [x] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before make a pull request Closes #1491 from yaooqinn/scala. Closes #1490 af4d0a13 [Kent Yao] Merge branch 'master' into scala 2ebdf6a3 [Kent Yao] provided scala dep d58b2bfe [Kent Yao] [KYUUBI #1490] Introduce the basic framework for running scala d256bfde [Kent Yao] [KYUUBI #1490] Introduce the basic framework for running scala 52090434 [Kent Yao] init Authored-by: Kent Yao Signed-off-by: Kent Yao --- docs/deployment/settings.md | 1 + externals/kyuubi-spark-sql-engine/pom.xml | 18 +++ .../kyuubi/engine/spark/SparkSQLEngine.scala | 4 + .../engine/spark/operation/ExecuteScala.scala | 80 +++++++++++++ .../spark/operation/ExecuteStatement.scala | 4 +- .../engine/spark/operation/GetCatalogs.scala | 5 +- .../engine/spark/operation/GetColumns.scala | 4 +- .../engine/spark/operation/GetFunctions.scala | 5 +- .../engine/spark/operation/GetSchemas.scala | 5 +- .../spark/operation/GetTableTypes.scala | 6 +- .../engine/spark/operation/GetTables.scala | 4 +- .../engine/spark/operation/GetTypeInfo.scala | 6 +- .../spark/operation/PlanOnlyStatement.scala | 5 +- .../spark/operation/SparkOperation.scala | 5 +- .../operation/SparkSQLOperationManager.scala | 86 ++++++-------- 
.../engine/spark/repl/KyuubiSparkILoop.scala | 110 ++++++++++++++++++ .../session/SparkSQLSessionManager.scala | 49 ++++---- .../spark/session/SparkSessionImpl.scala | 20 +++- .../spark/operation/SparkOperationSuite.scala | 5 - .../org/apache/kyuubi/config/KyuubiConf.scala | 22 +++- .../kyuubi/operation/SparkQueryTests.scala | 106 +++++++++++++++++ pom.xml | 18 +++ 22 files changed, 452 insertions(+), 116 deletions(-) create mode 100644 externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteScala.scala create mode 100644 externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/repl/KyuubiSparkILoop.scala diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md index 86790b5ef..1a4b807b5 100644 --- a/docs/deployment/settings.md +++ b/docs/deployment/settings.md @@ -282,6 +282,7 @@ Key | Default | Meaning | Type | Since --- | --- | --- | --- | --- kyuubi\.operation\.idle
\.timeout|
PT3H
|
Operation will be closed when it's not accessed for this duration of time
|
duration
|
1.0.0
kyuubi\.operation
\.interrupt\.on\.cancel|
true
|
When true, all running tasks will be interrupted if one cancels a query. When false, all running tasks will remain until finished.
|
boolean
|
1.2.0
+kyuubi\.operation
\.language|
SQL
|
Choose a programming language for the following inputs
  • SQL: (Default) Run all following statements as SQL queries.
  • SCALA: Run all following inputs as Scala code.
|
string
|
1.5.0
kyuubi\.operation\.log
\.dir\.root|
server_operation_logs
|
Root directory for query operation log at server-side.
|
string
|
1.4.0
kyuubi\.operation\.plan
\.only\.mode|
NONE
|
Whether to perform the statement in a PARSE, ANALYZE, OPTIMIZE only way without executing the query. When it is NONE, the statement will be fully executed
|
string
|
1.4.0
kyuubi\.operation
\.query\.timeout|
<undefined>
|
Timeout for query executions at server-side; it takes effect together with the client-side timeout (`java.sql.Statement.setQueryTimeout`), and a running query will be cancelled automatically once it times out. It's off by default, which means only the client side fully controls whether the query should time out or not. If set, the client-side timeout is capped at this value. To cancel queries right away without waiting for tasks to finish, consider enabling kyuubi.operation.interrupt.on.cancel together.
|
duration
|
1.2.0
diff --git a/externals/kyuubi-spark-sql-engine/pom.xml b/externals/kyuubi-spark-sql-engine/pom.xml index f05b5c6db..360b01fe6 100644 --- a/externals/kyuubi-spark-sql-engine/pom.xml +++ b/externals/kyuubi-spark-sql-engine/pom.xml @@ -50,6 +50,24 @@ provided + + org.apache.spark + spark-repl_${scala.binary.version} + provided + + + + org.scala-lang + scala-compiler + provided + + + + org.scala-lang + scala-reflect + provided + + org.apache.hadoop hadoop-client-api diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala index 8a5b95177..57bc67ffd 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala @@ -24,6 +24,7 @@ import scala.util.control.NonFatal import org.apache.spark.{ui, SparkConf} import org.apache.spark.kyuubi.SparkSQLEngineListener +import org.apache.spark.repl.Main import org.apache.spark.sql.SparkSession import org.apache.kyuubi.{KyuubiException, Logging} @@ -81,6 +82,9 @@ object SparkSQLEngine extends Logging { sparkConf.setIfMissing("spark.sql.legacy.castComplexTypesToString.enabled", "true") sparkConf.setIfMissing("spark.master", "local") sparkConf.setIfMissing("spark.ui.port", "0") + // register the repl's output dir with the file server. 
+ // see also `spark.repl.classdir` + sparkConf.set("spark.repl.class.outputDir", Main.outputDir.getAbsolutePath) sparkConf.setIfMissing( "spark.hadoop.mapreduce.input.fileinputformat.list-status.num-threads", "20") diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteScala.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteScala.scala new file mode 100644 index 000000000..9e8ca37fc --- /dev/null +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteScala.scala @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kyuubi.engine.spark.operation + +import scala.tools.nsc.interpreter.Results.{Error, Incomplete, Success} + +import org.apache.spark.sql.Row +import org.apache.spark.sql.types.StructType + +import org.apache.kyuubi.KyuubiSQLException +import org.apache.kyuubi.engine.spark.ArrayFetchIterator +import org.apache.kyuubi.engine.spark.repl.KyuubiSparkILoop +import org.apache.kyuubi.operation.OperationType +import org.apache.kyuubi.session.Session + +/** + * Support executing Scala Script with or without common Spark APIs, only support running in sync + * mode, as an operation may [[Incomplete]] and wait for others to make [[Success]]. + * + * [[KyuubiSparkILoop.results]] is exposed as a [[org.apache.spark.sql.DataFrame]] to users in repl + * to transfer result they wanted to client side. + * + * @param session parent session + * @param repl Scala Interpreter + * @param statement a scala code snippet + */ +class ExecuteScala( + session: Session, + repl: KyuubiSparkILoop, + override val statement: String) + extends SparkOperation(OperationType.EXECUTE_STATEMENT, session) { + + override protected def resultSchema: StructType = { + if (result == null || result.schema.isEmpty) { + new StructType().add("output", "string") + } else { + result.schema + } + } + + override protected def runInternal(): Unit = { + try { + spark.sparkContext.setJobGroup(statementId, statement) + Thread.currentThread().setContextClassLoader(spark.sharedState.jarClassLoader) + repl.interpret(statement) match { + case Success => + iter = + if (repl.results.nonEmpty) { + result = repl.results.remove(0) + new ArrayFetchIterator[Row](result.collect()) + } else { + new ArrayFetchIterator[Row](Array(Row(repl.getOutput))) + } + case Error => + throw KyuubiSQLException(s"Interpret error:\n$statement\n ${repl.getOutput}") + case Incomplete => + throw KyuubiSQLException(s"Incomplete code:\n$statement") + } + } catch { + onError(cancel = true) + } finally { + 
spark.sparkContext.clearJobGroup() + } + } +} diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala index 7022217f8..f43983382 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala @@ -23,7 +23,6 @@ import scala.collection.JavaConverters._ import org.apache.spark.kyuubi.SQLOperationListener import org.apache.spark.sql.Row -import org.apache.spark.sql.SparkSession import org.apache.spark.sql.types._ import org.apache.kyuubi.{KyuubiSQLException, Logging} @@ -37,13 +36,12 @@ import org.apache.kyuubi.session.Session import org.apache.kyuubi.util.ThreadUtils class ExecuteStatement( - spark: SparkSession, session: Session, override protected val statement: String, override val shouldRunAsync: Boolean, queryTimeout: Long, incrementalCollect: Boolean) - extends SparkOperation(spark, OperationType.EXECUTE_STATEMENT, session) with Logging { + extends SparkOperation(OperationType.EXECUTE_STATEMENT, session) with Logging { import org.apache.kyuubi.KyuubiSparkUtils._ diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetCatalogs.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetCatalogs.scala index 86c6ee522..cbd3fdcc9 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetCatalogs.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetCatalogs.scala @@ -17,7 +17,6 @@ package org.apache.kyuubi.engine.spark.operation -import org.apache.spark.sql.SparkSession import org.apache.spark.sql.types.StructType 
import org.apache.kyuubi.engine.spark.IterableFetchIterator @@ -26,8 +25,8 @@ import org.apache.kyuubi.operation.OperationType import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant.TABLE_CAT import org.apache.kyuubi.session.Session -class GetCatalogs(spark: SparkSession, session: Session) - extends SparkOperation(spark, OperationType.GET_CATALOGS, session) { +class GetCatalogs(session: Session) + extends SparkOperation(OperationType.GET_CATALOGS, session) { override protected def resultSchema: StructType = { new StructType() diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetColumns.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetColumns.scala index d02c0d501..471304c0a 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetColumns.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetColumns.scala @@ -17,7 +17,6 @@ package org.apache.kyuubi.engine.spark.operation -import org.apache.spark.sql.SparkSession import org.apache.spark.sql.types._ import org.apache.kyuubi.engine.spark.IterableFetchIterator @@ -27,13 +26,12 @@ import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._ import org.apache.kyuubi.session.Session class GetColumns( - spark: SparkSession, session: Session, catalogName: String, schemaName: String, tableName: String, columnName: String) - extends SparkOperation(spark, OperationType.GET_COLUMNS, session) { + extends SparkOperation(OperationType.GET_COLUMNS, session) { override def statement: String = { super.statement + diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetFunctions.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetFunctions.scala index 2c424d255..f7ff1fb14 100644 --- 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetFunctions.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetFunctions.scala @@ -19,7 +19,7 @@ package org.apache.kyuubi.engine.spark.operation import java.sql.DatabaseMetaData -import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.Row import org.apache.spark.sql.types.StructType import org.apache.kyuubi.engine.spark.IterableFetchIterator @@ -28,12 +28,11 @@ import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._ import org.apache.kyuubi.session.Session class GetFunctions( - spark: SparkSession, session: Session, catalogName: String, schemaName: String, functionName: String) - extends SparkOperation(spark, OperationType.GET_FUNCTIONS, session) { + extends SparkOperation(OperationType.GET_FUNCTIONS, session) { override def statement: String = { super.statement + diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetSchemas.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetSchemas.scala index ce312f949..0ac7abbb3 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetSchemas.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetSchemas.scala @@ -17,7 +17,6 @@ package org.apache.kyuubi.engine.spark.operation -import org.apache.spark.sql.SparkSession import org.apache.spark.sql.types.StructType import org.apache.kyuubi.engine.spark.IterableFetchIterator @@ -26,8 +25,8 @@ import org.apache.kyuubi.operation.OperationType import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._ import org.apache.kyuubi.session.Session -class GetSchemas(spark: SparkSession, session: Session, catalogName: String, schema: String) - extends SparkOperation(spark, OperationType.GET_SCHEMAS, session) { 
+class GetSchemas(session: Session, catalogName: String, schema: String) + extends SparkOperation(OperationType.GET_SCHEMAS, session) { override def statement: String = { super.statement + s" [catalog : $catalogName, schemaPattern : $schema]" diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetTableTypes.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetTableTypes.scala index 50e8edf48..6ad3a6cf4 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetTableTypes.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetTableTypes.scala @@ -17,7 +17,7 @@ package org.apache.kyuubi.engine.spark.operation -import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.Row import org.apache.spark.sql.types.StructType import org.apache.kyuubi.engine.spark.IterableFetchIterator @@ -26,8 +26,8 @@ import org.apache.kyuubi.operation.OperationType import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._ import org.apache.kyuubi.session.Session -class GetTableTypes(spark: SparkSession, session: Session) - extends SparkOperation(spark, OperationType.GET_TABLE_TYPES, session) { +class GetTableTypes(session: Session) + extends SparkOperation(OperationType.GET_TABLE_TYPES, session) { override protected def resultSchema: StructType = { new StructType() .add(TABLE_TYPE, "string", nullable = true, "Table type name.") diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetTables.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetTables.scala index 733877192..fb57367a5 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetTables.scala +++ 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetTables.scala @@ -17,7 +17,6 @@ package org.apache.kyuubi.engine.spark.operation -import org.apache.spark.sql.SparkSession import org.apache.spark.sql.types.StructType import org.apache.kyuubi.engine.spark.IterableFetchIterator @@ -27,13 +26,12 @@ import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._ import org.apache.kyuubi.session.Session class GetTables( - spark: SparkSession, session: Session, catalog: String, schema: String, tableName: String, tableTypes: Set[String]) - extends SparkOperation(spark, OperationType.GET_TABLES, session) { + extends SparkOperation(OperationType.GET_TABLES, session) { override def statement: String = { super.statement + diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetTypeInfo.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetTypeInfo.scala index 210976a3b..5d0ad510e 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetTypeInfo.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetTypeInfo.scala @@ -19,7 +19,7 @@ package org.apache.kyuubi.engine.spark.operation import java.sql.Types._ -import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.Row import org.apache.spark.sql.types.StructType import org.apache.kyuubi.engine.spark.IterableFetchIterator @@ -27,8 +27,8 @@ import org.apache.kyuubi.operation.OperationType import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._ import org.apache.kyuubi.session.Session -class GetTypeInfo(spark: SparkSession, session: Session) - extends SparkOperation(spark, OperationType.GET_TYPE_INFO, session) { +class GetTypeInfo(session: Session) + extends SparkOperation(OperationType.GET_TYPE_INFO, session) { override protected def resultSchema: StructType = { 
new StructType() .add(TYPE_NAME, "string", nullable = false, "Type name") diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala index cf1fdce47..e98c8b28c 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala @@ -17,7 +17,7 @@ package org.apache.kyuubi.engine.spark.operation -import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.types.StructType @@ -31,11 +31,10 @@ import org.apache.kyuubi.session.Session * Perform the statement parsing, analyzing or optimizing only without executing it */ class PlanOnlyStatement( - spark: SparkSession, session: Session, override protected val statement: String, mode: OperationMode) - extends SparkOperation(spark, OperationType.EXECUTE_STATEMENT, session) { + extends SparkOperation(OperationType.EXECUTE_STATEMENT, session) { private val operationLog: OperationLog = OperationLog.createOperationLog(session, getHandle) override def getOperationLog: Option[OperationLog] = Option(operationLog) diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala index 028329f48..a6238824f 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala @@ -28,6 +28,7 @@ import 
org.apache.spark.sql.types.StructType import org.apache.kyuubi.{KyuubiSQLException, Utils} import org.apache.kyuubi.engine.spark.FetchIterator import org.apache.kyuubi.engine.spark.operation.SparkOperation.TIMEZONE_KEY +import org.apache.kyuubi.engine.spark.session.SparkSessionImpl import org.apache.kyuubi.operation.{AbstractOperation, OperationState} import org.apache.kyuubi.operation.FetchOrientation._ import org.apache.kyuubi.operation.OperationState.OperationState @@ -36,9 +37,11 @@ import org.apache.kyuubi.operation.log.OperationLog import org.apache.kyuubi.schema.{RowSet, SchemaHelper} import org.apache.kyuubi.session.Session -abstract class SparkOperation(spark: SparkSession, opType: OperationType, session: Session) +abstract class SparkOperation(opType: OperationType, session: Session) extends AbstractOperation(opType, session) { + protected val spark: SparkSession = session.asInstanceOf[SparkSessionImpl].spark + private val timeZone: ZoneId = { spark.conf.getOption(TIMEZONE_KEY).map { timeZoneId => ZoneId.of(timeZoneId.replaceFirst("(\\+|\\-)(\\d):", "$10$2:"), ZoneId.SHORT_IDS) diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala index ac64d60db..58b539c7e 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala @@ -22,11 +22,10 @@ import java.util.concurrent.ConcurrentHashMap import scala.collection.JavaConverters._ -import org.apache.spark.sql.SparkSession - -import org.apache.kyuubi.KyuubiSQLException -import org.apache.kyuubi.config.KyuubiConf.{OPERATION_INCREMENTAL_COLLECT, OPERATION_PLAN_ONLY, OperationModes} +import 
org.apache.kyuubi.config.KyuubiConf._ import org.apache.kyuubi.config.KyuubiConf.OperationModes._ +import org.apache.kyuubi.engine.spark.repl.KyuubiSparkILoop +import org.apache.kyuubi.engine.spark.session.SparkSessionImpl import org.apache.kyuubi.engine.spark.shim.SparkCatalogShim import org.apache.kyuubi.operation.{Operation, OperationManager} import org.apache.kyuubi.session.{Session, SessionHandle} @@ -35,58 +34,50 @@ class SparkSQLOperationManager private (name: String) extends OperationManager(n def this() = this(classOf[SparkSQLOperationManager].getSimpleName) - private val sessionToSpark = new ConcurrentHashMap[SessionHandle, SparkSession]() - - private def getSparkSession(sessionHandle: SessionHandle): SparkSession = { - val sparkSession = sessionToSpark.get(sessionHandle) - if (sparkSession == null) { - throw KyuubiSQLException(s"$sessionHandle has not been initialized or already been closed") - } - sparkSession - } - - def setSparkSession(sessionHandle: SessionHandle, spark: SparkSession): Unit = { - sessionToSpark.put(sessionHandle, spark) - } - - def removeSparkSession(sessionHandle: SessionHandle): SparkSession = { - sessionToSpark.remove(sessionHandle) - } - - def getOpenSparkSessionCount: Int = sessionToSpark.size() - private lazy val operationModeDefault = getConf.get(OPERATION_PLAN_ONLY) private lazy val operationIncrementalCollectDefault = getConf.get(OPERATION_INCREMENTAL_COLLECT) + private lazy val operationLanguageDefault = getConf.get(OPERATION_LANGUAGE) + + private val sessionToRepl = new ConcurrentHashMap[SessionHandle, KyuubiSparkILoop]().asScala + + def closeILoop(session: SessionHandle): Unit = { + val maybeRepl = sessionToRepl.remove(session) + maybeRepl.foreach(_.close()) + } override def newExecuteStatementOperation( session: Session, statement: String, runAsync: Boolean, queryTimeout: Long): Operation = { - val spark = getSparkSession(session.handle) - - val operationModeStr = - spark.conf.get(OPERATION_PLAN_ONLY.key, 
operationModeDefault).toUpperCase(Locale.ROOT) - val incrementalCollect = spark.conf.getOption(OPERATION_INCREMENTAL_COLLECT.key) - .map(_.toBoolean).getOrElse(operationIncrementalCollectDefault) - val operation = OperationModes.withName(operationModeStr) match { - case NONE => - new ExecuteStatement(spark, session, statement, runAsync, queryTimeout, incrementalCollect) - case mode => - new PlanOnlyStatement(spark, session, statement, mode) - } + val spark = session.asInstanceOf[SparkSessionImpl].spark + val lang = spark.conf.get(OPERATION_LANGUAGE.key, operationLanguageDefault) + val operation = + OperationLanguages.withName(lang.toUpperCase(Locale.ROOT)) match { + case OperationLanguages.SQL => + val mode = spark.conf.get(OPERATION_PLAN_ONLY.key, operationModeDefault) + OperationModes.withName(mode.toUpperCase(Locale.ROOT)) match { + case NONE => + val incrementalCollect = spark.conf.getOption(OPERATION_INCREMENTAL_COLLECT.key) + .map(_.toBoolean).getOrElse(operationIncrementalCollectDefault) + new ExecuteStatement(session, statement, runAsync, queryTimeout, incrementalCollect) + case mode => + new PlanOnlyStatement(session, statement, mode) + } + case OperationLanguages.SCALA => + val repl = sessionToRepl.getOrElseUpdate(session.handle, KyuubiSparkILoop(spark)) + new ExecuteScala(session, repl, statement) + } addOperation(operation) } override def newGetTypeInfoOperation(session: Session): Operation = { - val spark = getSparkSession(session.handle) - val op = new GetTypeInfo(spark, session) + val op = new GetTypeInfo(session) addOperation(op) } override def newGetCatalogsOperation(session: Session): Operation = { - val spark = getSparkSession(session.handle) - val op = new GetCatalogs(spark, session) + val op = new GetCatalogs(session) addOperation(op) } @@ -94,8 +85,7 @@ class SparkSQLOperationManager private (name: String) extends OperationManager(n session: Session, catalog: String, schema: String): Operation = { - val spark = getSparkSession(session.handle) 
- val op = new GetSchemas(spark, session, catalog, schema) + val op = new GetSchemas(session, catalog, schema) addOperation(op) } @@ -105,20 +95,18 @@ class SparkSQLOperationManager private (name: String) extends OperationManager(n schemaName: String, tableName: String, tableTypes: java.util.List[String]): Operation = { - val spark = getSparkSession(session.handle) val tTypes = if (tableTypes == null || tableTypes.isEmpty) { SparkCatalogShim.sparkTableTypes } else { tableTypes.asScala.toSet } - val op = new GetTables(spark, session, catalogName, schemaName, tableName, tTypes) + val op = new GetTables(session, catalogName, schemaName, tableName, tTypes) addOperation(op) } override def newGetTableTypesOperation(session: Session): Operation = { - val spark = getSparkSession(session.handle) - val op = new GetTableTypes(spark, session) + val op = new GetTableTypes(session) addOperation(op) } @@ -128,8 +116,7 @@ class SparkSQLOperationManager private (name: String) extends OperationManager(n schemaName: String, tableName: String, columnName: String): Operation = { - val spark = getSparkSession(session.handle) - val op = new GetColumns(spark, session, catalogName, schemaName, tableName, columnName) + val op = new GetColumns(session, catalogName, schemaName, tableName, columnName) addOperation(op) } @@ -138,8 +125,7 @@ class SparkSQLOperationManager private (name: String) extends OperationManager(n catalogName: String, schemaName: String, functionName: String): Operation = { - val spark = getSparkSession(session.handle) - val op = new GetFunctions(spark, session, catalogName, schemaName, functionName) + val op = new GetFunctions(session, catalogName, schemaName, functionName) addOperation(op) } } diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/repl/KyuubiSparkILoop.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/repl/KyuubiSparkILoop.scala new file mode 100644 index 000000000..e1da8f7ff 
--- /dev/null +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/repl/KyuubiSparkILoop.scala @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.engine.spark.repl + +import java.io.{ByteArrayOutputStream, File} + +import scala.collection.mutable.ArrayBuffer +import scala.tools.nsc.Settings +import scala.tools.nsc.interpreter.JPrintWriter + +import org.apache.spark.SparkContext +import org.apache.spark.repl.{Main, SparkILoop} +import org.apache.spark.sql.{Dataset, Row, SparkSession} +import org.apache.spark.util.MutableURLClassLoader + +private[spark] case class KyuubiSparkILoop private ( + spark: SparkSession, + output: ByteArrayOutputStream) + extends SparkILoop(None, new JPrintWriter(output)) { + + // TODO: this is a little hacky + val results = new ArrayBuffer[Dataset[Row]]() + + private def initialize(): Unit = { + settings = new Settings + val interpArguments = List( + "-Yrepl-class-based", + "-Yrepl-outdir", + s"${Main.outputDir.getAbsolutePath}") + settings.processArguments(interpArguments, processAll = true) + settings.usejavacp.value = true + val currentClassLoader = Thread.currentThread().getContextClassLoader + 
settings.embeddedDefaults(currentClassLoader) + this.createInterpreter() + this.initializeSynchronous() + try { + this.compilerClasspath + this.ensureClassLoader() + var classLoader = Thread.currentThread().getContextClassLoader + while (classLoader != null) { + classLoader match { + case loader: MutableURLClassLoader => + val allJars = loader.getURLs.filter { u => + val file = new File(u.getPath) + u.getProtocol == "file" && file.isFile && + file.getName.contains("scala-lang_scala-reflect") + } + this.addUrlsToClassPath(allJars: _*) + classLoader = null + case _ => + classLoader = classLoader.getParent + } + } + } finally { + Thread.currentThread().setContextClassLoader(currentClassLoader) + } + + this.beQuietDuring { + // SparkSession/SparkContext and their implicits + this.bind("spark", classOf[SparkSession].getCanonicalName, spark, List("""@transient""")) + this.bind( + "sc", + classOf[SparkContext].getCanonicalName, + spark.sparkContext, + List("""@transient""")) + + this.interpret("import org.apache.spark.SparkContext._") + this.interpret("import spark.implicits._") + this.interpret("import spark.sql") + this.interpret("import org.apache.spark.sql.functions._") + + // for feeding results to client, e.g. 
beeline + this.bind( + "results", + "scala.collection.mutable.ArrayBuffer[" + + "org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]]", + results) + } + } + + def getOutput: String = { + val res = output.toString.trim + output.reset() + res + } +} + +private[spark] object KyuubiSparkILoop { + def apply(spark: SparkSession): KyuubiSparkILoop = { + val os = new ByteArrayOutputStream() + val iLoop = new KyuubiSparkILoop(spark, os) + iLoop.initialize() + iLoop + } +} diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala index 046948cc3..4ab7f6097 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala @@ -18,7 +18,7 @@ package org.apache.kyuubi.engine.spark.session import org.apache.hive.service.rpc.thrift.TProtocolVersion -import org.apache.spark.sql.{AnalysisException, SparkSession} +import org.apache.spark.sql.SparkSession import org.apache.kyuubi.KyuubiSQLException import org.apache.kyuubi.config.KyuubiConf @@ -26,7 +26,6 @@ import org.apache.kyuubi.config.KyuubiConf._ import org.apache.kyuubi.engine.ShareLevel import org.apache.kyuubi.engine.spark.SparkSQLEngine import org.apache.kyuubi.engine.spark.operation.SparkSQLOperationManager -import org.apache.kyuubi.engine.spark.udf.KDFRegistry import org.apache.kyuubi.session._ /** @@ -58,60 +57,52 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession) ipAddress: String, conf: Map[String, String]): SessionHandle = { info(s"Opening session for $user@$ipAddress") - val sessionImpl = new SparkSessionImpl(protocol, user, password, ipAddress, conf, this) - val handle = sessionImpl.handle - try { - val sparkSession = 
+ val sparkSession = + try { if (singleSparkSession) { spark } else { val ss = spark.newSession() this.conf.get(ENGINE_SESSION_INITIALIZE_SQL).foreach { sqlStr => - ss.sparkContext.setJobGroup( - handle.identifier.toString, - sqlStr, - interruptOnCancel = true) + ss.sparkContext.setJobDescription(sqlStr) ss.sql(sqlStr).isEmpty } ss } - - sessionImpl.normalizedConf.foreach { - case ("use:database", database) => sparkSession.catalog.setCurrentDatabase(database) - case (key, value) => setModifiableConfig(sparkSession, key, value) + } catch { + case e: Exception => throw KyuubiSQLException(e) } - sessionImpl.open() - KDFRegistry.registerAll(sparkSession) - operationManager.setSparkSession(handle, sparkSession) - setSession(handle, sessionImpl) + + val session = new SparkSessionImpl( + protocol, + user, + password, + ipAddress, + conf, + this, + sparkSession) + try { + val handle = session.handle + session.open() + setSession(handle, session) info(s"$user's session with $handle is opened, current opening sessions" + s" $getOpenSessionCount") handle } catch { case e: Exception => - sessionImpl.close() + session.close() throw KyuubiSQLException(e) } } override def closeSession(sessionHandle: SessionHandle): Unit = { super.closeSession(sessionHandle) - operationManager.removeSparkSession(sessionHandle) if (conf.get(ENGINE_SHARE_LEVEL) == ShareLevel.CONNECTION.toString) { info("Session stopped due to shared level is Connection.") stopSession() } } - private def setModifiableConfig(spark: SparkSession, key: String, value: String): Unit = { - try { - spark.conf.set(key, value) - } catch { - case e: AnalysisException => - warn(e.getMessage()) - } - } - private def stopSession(): Unit = { SparkSQLEngine.currentEngine.foreach(_.stop()) } diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala index 
9f3086ac3..6cc7cd947 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala @@ -18,8 +18,11 @@ package org.apache.kyuubi.engine.spark.session import org.apache.hive.service.rpc.thrift.TProtocolVersion +import org.apache.spark.sql.{AnalysisException, SparkSession} import org.apache.kyuubi.engine.spark.events.{EventLoggingService, SessionEvent} +import org.apache.kyuubi.engine.spark.operation.SparkSQLOperationManager +import org.apache.kyuubi.engine.spark.udf.KDFRegistry import org.apache.kyuubi.operation.{Operation, OperationHandle} import org.apache.kyuubi.session.{AbstractSession, SessionHandle, SessionManager} @@ -29,13 +32,27 @@ class SparkSessionImpl( password: String, ipAddress: String, conf: Map[String, String], - sessionManager: SessionManager) + sessionManager: SessionManager, + val spark: SparkSession) extends AbstractSession(protocol, user, password, ipAddress, conf, sessionManager) { override val handle: SessionHandle = SessionHandle(protocol) + private def setModifiableConfig(key: String, value: String): Unit = { + try { + spark.conf.set(key, value) + } catch { + case e: AnalysisException => warn(e.getMessage()) + } + } + private val sessionEvent = SessionEvent(this) override def open(): Unit = { + normalizedConf.foreach { + case ("use:database", database) => spark.catalog.setCurrentDatabase(database) + case (key, value) => setModifiableConfig(key, value) + } + KDFRegistry.registerAll(spark) EventLoggingService.onEvent(sessionEvent) super.open() } @@ -49,6 +66,7 @@ class SparkSessionImpl( sessionEvent.endTime = System.currentTimeMillis() EventLoggingService.onEvent(sessionEvent) super.close() + sessionManager.operationManager.asInstanceOf[SparkSQLOperationManager].closeILoop(handle) } } diff --git 
a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkOperationSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkOperationSuite.scala index 5e5185c13..bf42c1b41 100644 --- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkOperationSuite.scala +++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkOperationSuite.scala @@ -512,15 +512,12 @@ class SparkOperationSuite extends WithSparkSQLEngine with HiveMetadataTests with withThriftClient { client => val operationManager = engine.backendService.sessionManager.operationManager.asInstanceOf[SparkSQLOperationManager] - assert(operationManager.getOpenSparkSessionCount === 0) val req = new TOpenSessionReq() req.setUsername("kentyao") req.setPassword("anonymous") val tOpenSessionResp = client.OpenSession(req) - assert(operationManager.getOpenSparkSessionCount === 1) - val tExecuteStatementReq = new TExecuteStatementReq() tExecuteStatementReq.setSessionHandle(tOpenSessionResp.getSessionHandle) tExecuteStatementReq.setRunAsync(true) @@ -546,8 +543,6 @@ class SparkOperationSuite extends WithSparkSQLEngine with HiveMetadataTests with tCloseSessionReq.setSessionHandle(tOpenSessionResp.getSessionHandle) val tCloseSessionResp = client.CloseSession(tCloseSessionReq) assert(tCloseSessionResp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS) - - assert(operationManager.getOpenSparkSessionCount === 0) } } diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala index 70245d69f..ef9fbd63c 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala @@ -982,10 +982,10 @@ object KyuubiConf { .doc("A comma separated list of engine history 
loggers, where engine/session/operation etc" + " events go. We use spark logger by default.
    " + "
  • SPARK: the events will be written to the spark listener bus.
  • " + - s"
  • JSON: the events will be written to the location of" + + "
  • JSON: the events will be written to the location of" + s" ${ENGINE_EVENT_JSON_LOG_PATH.key}
  • " + - s"
  • JDBC: to be done
  • " + - s"
  • CUSTOM: to be done.
") + "
  • JDBC: to be done
  • " + + "
  • CUSTOM: to be done.
  • ") .version("1.3.0") .stringConf .transform(_.toUpperCase(Locale.ROOT)) @@ -1048,4 +1048,20 @@ object KyuubiConf { .transform(_.toUpperCase(Locale.ROOT)) .checkValues(OperationModes.values.map(_.toString)) .createWithDefault(OperationModes.NONE.toString) + + object OperationLanguages extends Enumeration { + type OperationLanguage = Value + val SQL, SCALA = Value + } + + val OPERATION_LANGUAGE: ConfigEntry[String] = + buildConf("operation.language") + .doc("Choose a programming language for the following inputs" + "
    • SQL: (Default) Run all following statements as SQL queries.
    • " + + "
    • SCALA: Run all following input as Scala code
    ") + .version("1.5.0") + .stringConf + .transform(_.toUpperCase(Locale.ROOT)) + .checkValues(OperationLanguages.values.map(_.toString)) + .createWithDefault(OperationLanguages.SQL.toString) } diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/SparkQueryTests.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/SparkQueryTests.scala index 384d1af06..c68ed1a4b 100644 --- a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/SparkQueryTests.scala +++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/SparkQueryTests.scala @@ -412,4 +412,110 @@ trait SparkQueryTests extends HiveJDBCTestHelper { } } } + + test("execute simple scala code") { + withJdbcStatement() { statement => + statement.execute("SET kyuubi.operation.language=scala") + val rs = statement.executeQuery("spark.version") + rs.next() + // scala repl will return resX = YYYYY, and here we only check YYYYY + val sparkVer = rs.getString(1).split("=")(1).trim + assert("\\d\\.\\d\\.\\d".r.pattern.matcher(sparkVer).matches()) + assert(rs.getMetaData.getColumnName(1) === "output") + } + } + + test("execute simple scala code with result returned") { + withJdbcStatement() { statement => + statement.execute("SET kyuubi.operation.language=scala") + val code = + """ + |val df = spark + | .range(0, 10, 2, 1) + | .toDF + |""".stripMargin + val rs1 = statement.executeQuery(code) + rs1.next() + assert(rs1.getString(1) startsWith "df: org.apache.spark.sql.DataFrame") + + // continue + val rs2 = statement.executeQuery("df.count()") + rs2.next() + assert(rs2.getString(1).endsWith("5")) + + // continue + val rs3 = statement.executeQuery("results += df") + for (i <- Range(0, 10, 2)) { + assert(rs3.next) + assert(rs3.getInt(1) === i) + } + + // switch to sql + val set = + """ + |spark.conf.set("kyuubi.operation.language", "SQL") + |""".stripMargin + val t = statement.executeQuery(set) + t.next() + val rs4 = statement.executeQuery("select 12345") + assert(rs4.next()) + 
assert(rs4.getInt(1) === 12345) + + // switch to scala again + statement.execute("SET kyuubi.operation.language=scala") + val code2 = + """ + |/* this + | * is + | * a + | * multi-line comments + | */ + |val df = spark + | .range(0, 10, 2, 1) + | .map(x => (x, x + 1, x * 2)) // this is a single-line comment + | .toDF + |""".stripMargin + val rs5 = statement.executeQuery(code2) + rs5.next() + assert(rs5.getString(1) startsWith "df: org.apache.spark.sql.DataFrame") + + // re-assign + val rs6 = statement.executeQuery("results += df") + for (i <- Range(0, 10, 2)) { + assert(rs6.next) + assert(rs6.getInt(2) === i + 1) + } + } + } + + test("incomplete scala code block will fail") { + withJdbcStatement() { statement => + statement.execute("SET kyuubi.operation.language=scala") + // incomplete code block + val incompleteCode = + """ + |val df = spark + | .range(0, 10, 2, 1) + | .map { + | x => (x, x + 1, x * 2) + |""".stripMargin + val e = intercept[SQLException](statement.executeQuery(incompleteCode)) + assert(e.getMessage contains "Incomplete code:") + } + } + + test("scala code compile error will fail") { + withJdbcStatement() { statement => + statement.execute("SET kyuubi.operation.language=scala") + // incomplete code block + val incompleteCode = + """ + |val df = spark + | .range(0, 10, 2, 1) + | .map { x => (x, x + 1, y * 2) } // y is missing + |""".stripMargin + val e = intercept[SQLException](statement.executeQuery(incompleteCode)) + assert(e.getMessage contains "not found: value y") + } + } } diff --git a/pom.xml b/pom.xml index 0ced3d029..497e55f21 100644 --- a/pom.xml +++ b/pom.xml @@ -248,6 +248,12 @@ ${scala.version}
    + + org.scala-lang + scala-compiler + ${scala.version} + + org.scala-lang scala-reflect @@ -336,6 +342,18 @@ ${commons-lang3.version} + + org.apache.spark + spark-repl_${scala.binary.version} + ${spark.version} + + + * + * + + + + org.apache.spark spark-sql_${scala.binary.version}