diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md
index 86790b5ef..1a4b807b5 100644
--- a/docs/deployment/settings.md
+++ b/docs/deployment/settings.md
@@ -282,6 +282,7 @@ Key | Default | Meaning | Type | Since
--- | --- | --- | --- | ---
kyuubi\.operation\.idle
\.timeout|
PT3H
|Operation will be closed when it's not accessed for this duration of time
|duration
|1.0.0
kyuubi\.operation
\.interrupt\.on\.cancel|true
|When true, all running tasks will be interrupted if one cancels a query. When false, all running tasks will remain until finished.
|boolean
|1.2.0
+kyuubi\.operation
\.language|SQL
|Choose a programing language for the following inputs
- SQL: (Default) Run all following statements as SQL queries.
- SCALA: Run all following input a scala codes
|string
|1.5.0
kyuubi\.operation\.log
\.dir\.root|server_operation_logs
|Root directory for query operation log at server-side.
|string
|1.4.0
kyuubi\.operation\.plan
\.only\.mode|NONE
|Whether to perform the statement in a PARSE, ANALYZE, OPTIMIZE only way without executing the query. When it is NONE, the statement will be fully executed
|string
|1.4.0
kyuubi\.operation
\.query\.timeout|<undefined>
|Timeout for query executions at server-side, take affect with client-side timeout(`java.sql.Statement.setQueryTimeout`) together, a running query will be cancelled automatically if timeout. It's off by default, which means only client-side take fully control whether the query should timeout or not. If set, client-side timeout capped at this point. To cancel the queries right away without waiting task to finish, consider enabling kyuubi.operation.interrupt.on.cancel together.
|duration
|1.2.0
diff --git a/externals/kyuubi-spark-sql-engine/pom.xml b/externals/kyuubi-spark-sql-engine/pom.xml
index f05b5c6db..360b01fe6 100644
--- a/externals/kyuubi-spark-sql-engine/pom.xml
+++ b/externals/kyuubi-spark-sql-engine/pom.xml
@@ -50,6 +50,24 @@
provided
+
+ org.apache.spark
+ spark-repl_${scala.binary.version}
+ provided
+
+
+
+ org.scala-lang
+ scala-compiler
+ provided
+
+
+
+ org.scala-lang
+ scala-reflect
+ provided
+
+
org.apache.hadoop
hadoop-client-api
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
index 8a5b95177..57bc67ffd 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
@@ -24,6 +24,7 @@ import scala.util.control.NonFatal
import org.apache.spark.{ui, SparkConf}
import org.apache.spark.kyuubi.SparkSQLEngineListener
+import org.apache.spark.repl.Main
import org.apache.spark.sql.SparkSession
import org.apache.kyuubi.{KyuubiException, Logging}
@@ -81,6 +82,9 @@ object SparkSQLEngine extends Logging {
sparkConf.setIfMissing("spark.sql.legacy.castComplexTypesToString.enabled", "true")
sparkConf.setIfMissing("spark.master", "local")
sparkConf.setIfMissing("spark.ui.port", "0")
+ // register the repl's output dir with the file server.
+ // see also `spark.repl.classdir`
+ sparkConf.set("spark.repl.class.outputDir", Main.outputDir.getAbsolutePath)
sparkConf.setIfMissing(
"spark.hadoop.mapreduce.input.fileinputformat.list-status.num-threads",
"20")
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteScala.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteScala.scala
new file mode 100644
index 000000000..9e8ca37fc
--- /dev/null
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteScala.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.spark.operation
+
+import scala.tools.nsc.interpreter.Results.{Error, Incomplete, Success}
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.StructType
+
+import org.apache.kyuubi.KyuubiSQLException
+import org.apache.kyuubi.engine.spark.ArrayFetchIterator
+import org.apache.kyuubi.engine.spark.repl.KyuubiSparkILoop
+import org.apache.kyuubi.operation.OperationType
+import org.apache.kyuubi.session.Session
+
+/**
+ * Support executing Scala Script with or without common Spark APIs, only support running in sync
+ * mode, as an operation may [[Incomplete]] and wait for others to make [[Success]].
+ *
+ * [[KyuubiSparkILoop.results]] is exposed as a [[org.apache.spark.sql.DataFrame]] to users in repl
+ * to transfer result they wanted to client side.
+ *
+ * @param session parent session
+ * @param repl Scala Interpreter
+ * @param statement a scala code snippet
+ */
+class ExecuteScala(
+ session: Session,
+ repl: KyuubiSparkILoop,
+ override val statement: String)
+ extends SparkOperation(OperationType.EXECUTE_STATEMENT, session) {
+
+ override protected def resultSchema: StructType = {
+ if (result == null || result.schema.isEmpty) {
+ new StructType().add("output", "string")
+ } else {
+ result.schema
+ }
+ }
+
+ override protected def runInternal(): Unit = {
+ try {
+ spark.sparkContext.setJobGroup(statementId, statement)
+ Thread.currentThread().setContextClassLoader(spark.sharedState.jarClassLoader)
+ repl.interpret(statement) match {
+ case Success =>
+ iter =
+ if (repl.results.nonEmpty) {
+ result = repl.results.remove(0)
+ new ArrayFetchIterator[Row](result.collect())
+ } else {
+ new ArrayFetchIterator[Row](Array(Row(repl.getOutput)))
+ }
+ case Error =>
+ throw KyuubiSQLException(s"Interpret error:\n$statement\n ${repl.getOutput}")
+ case Incomplete =>
+ throw KyuubiSQLException(s"Incomplete code:\n$statement")
+ }
+ } catch {
+ onError(cancel = true)
+ } finally {
+ spark.sparkContext.clearJobGroup()
+ }
+ }
+}
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
index 7022217f8..f43983382 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
@@ -23,7 +23,6 @@ import scala.collection.JavaConverters._
import org.apache.spark.kyuubi.SQLOperationListener
import org.apache.spark.sql.Row
-import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import org.apache.kyuubi.{KyuubiSQLException, Logging}
@@ -37,13 +36,12 @@ import org.apache.kyuubi.session.Session
import org.apache.kyuubi.util.ThreadUtils
class ExecuteStatement(
- spark: SparkSession,
session: Session,
override protected val statement: String,
override val shouldRunAsync: Boolean,
queryTimeout: Long,
incrementalCollect: Boolean)
- extends SparkOperation(spark, OperationType.EXECUTE_STATEMENT, session) with Logging {
+ extends SparkOperation(OperationType.EXECUTE_STATEMENT, session) with Logging {
import org.apache.kyuubi.KyuubiSparkUtils._
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetCatalogs.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetCatalogs.scala
index 86c6ee522..cbd3fdcc9 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetCatalogs.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetCatalogs.scala
@@ -17,7 +17,6 @@
package org.apache.kyuubi.engine.spark.operation
-import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType
import org.apache.kyuubi.engine.spark.IterableFetchIterator
@@ -26,8 +25,8 @@ import org.apache.kyuubi.operation.OperationType
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant.TABLE_CAT
import org.apache.kyuubi.session.Session
-class GetCatalogs(spark: SparkSession, session: Session)
- extends SparkOperation(spark, OperationType.GET_CATALOGS, session) {
+class GetCatalogs(session: Session)
+ extends SparkOperation(OperationType.GET_CATALOGS, session) {
override protected def resultSchema: StructType = {
new StructType()
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetColumns.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetColumns.scala
index d02c0d501..471304c0a 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetColumns.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetColumns.scala
@@ -17,7 +17,6 @@
package org.apache.kyuubi.engine.spark.operation
-import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import org.apache.kyuubi.engine.spark.IterableFetchIterator
@@ -27,13 +26,12 @@ import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
import org.apache.kyuubi.session.Session
class GetColumns(
- spark: SparkSession,
session: Session,
catalogName: String,
schemaName: String,
tableName: String,
columnName: String)
- extends SparkOperation(spark, OperationType.GET_COLUMNS, session) {
+ extends SparkOperation(OperationType.GET_COLUMNS, session) {
override def statement: String = {
super.statement +
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetFunctions.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetFunctions.scala
index 2c424d255..f7ff1fb14 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetFunctions.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetFunctions.scala
@@ -19,7 +19,7 @@ package org.apache.kyuubi.engine.spark.operation
import java.sql.DatabaseMetaData
-import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType
import org.apache.kyuubi.engine.spark.IterableFetchIterator
@@ -28,12 +28,11 @@ import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
import org.apache.kyuubi.session.Session
class GetFunctions(
- spark: SparkSession,
session: Session,
catalogName: String,
schemaName: String,
functionName: String)
- extends SparkOperation(spark, OperationType.GET_FUNCTIONS, session) {
+ extends SparkOperation(OperationType.GET_FUNCTIONS, session) {
override def statement: String = {
super.statement +
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetSchemas.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetSchemas.scala
index ce312f949..0ac7abbb3 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetSchemas.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetSchemas.scala
@@ -17,7 +17,6 @@
package org.apache.kyuubi.engine.spark.operation
-import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType
import org.apache.kyuubi.engine.spark.IterableFetchIterator
@@ -26,8 +25,8 @@ import org.apache.kyuubi.operation.OperationType
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
import org.apache.kyuubi.session.Session
-class GetSchemas(spark: SparkSession, session: Session, catalogName: String, schema: String)
- extends SparkOperation(spark, OperationType.GET_SCHEMAS, session) {
+class GetSchemas(session: Session, catalogName: String, schema: String)
+ extends SparkOperation(OperationType.GET_SCHEMAS, session) {
override def statement: String = {
super.statement + s" [catalog : $catalogName, schemaPattern : $schema]"
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetTableTypes.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetTableTypes.scala
index 50e8edf48..6ad3a6cf4 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetTableTypes.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetTableTypes.scala
@@ -17,7 +17,7 @@
package org.apache.kyuubi.engine.spark.operation
-import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType
import org.apache.kyuubi.engine.spark.IterableFetchIterator
@@ -26,8 +26,8 @@ import org.apache.kyuubi.operation.OperationType
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
import org.apache.kyuubi.session.Session
-class GetTableTypes(spark: SparkSession, session: Session)
- extends SparkOperation(spark, OperationType.GET_TABLE_TYPES, session) {
+class GetTableTypes(session: Session)
+ extends SparkOperation(OperationType.GET_TABLE_TYPES, session) {
override protected def resultSchema: StructType = {
new StructType()
.add(TABLE_TYPE, "string", nullable = true, "Table type name.")
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetTables.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetTables.scala
index 733877192..fb57367a5 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetTables.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetTables.scala
@@ -17,7 +17,6 @@
package org.apache.kyuubi.engine.spark.operation
-import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType
import org.apache.kyuubi.engine.spark.IterableFetchIterator
@@ -27,13 +26,12 @@ import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
import org.apache.kyuubi.session.Session
class GetTables(
- spark: SparkSession,
session: Session,
catalog: String,
schema: String,
tableName: String,
tableTypes: Set[String])
- extends SparkOperation(spark, OperationType.GET_TABLES, session) {
+ extends SparkOperation(OperationType.GET_TABLES, session) {
override def statement: String = {
super.statement +
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetTypeInfo.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetTypeInfo.scala
index 210976a3b..5d0ad510e 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetTypeInfo.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetTypeInfo.scala
@@ -19,7 +19,7 @@ package org.apache.kyuubi.engine.spark.operation
import java.sql.Types._
-import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType
import org.apache.kyuubi.engine.spark.IterableFetchIterator
@@ -27,8 +27,8 @@ import org.apache.kyuubi.operation.OperationType
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
import org.apache.kyuubi.session.Session
-class GetTypeInfo(spark: SparkSession, session: Session)
- extends SparkOperation(spark, OperationType.GET_TYPE_INFO, session) {
+class GetTypeInfo(session: Session)
+ extends SparkOperation(OperationType.GET_TYPE_INFO, session) {
override protected def resultSchema: StructType = {
new StructType()
.add(TYPE_NAME, "string", nullable = false, "Type name")
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala
index cf1fdce47..e98c8b28c 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala
@@ -17,7 +17,7 @@
package org.apache.kyuubi.engine.spark.operation
-import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.types.StructType
@@ -31,11 +31,10 @@ import org.apache.kyuubi.session.Session
* Perform the statement parsing, analyzing or optimizing only without executing it
*/
class PlanOnlyStatement(
- spark: SparkSession,
session: Session,
override protected val statement: String,
mode: OperationMode)
- extends SparkOperation(spark, OperationType.EXECUTE_STATEMENT, session) {
+ extends SparkOperation(OperationType.EXECUTE_STATEMENT, session) {
private val operationLog: OperationLog = OperationLog.createOperationLog(session, getHandle)
override def getOperationLog: Option[OperationLog] = Option(operationLog)
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
index 028329f48..a6238824f 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.types.StructType
import org.apache.kyuubi.{KyuubiSQLException, Utils}
import org.apache.kyuubi.engine.spark.FetchIterator
import org.apache.kyuubi.engine.spark.operation.SparkOperation.TIMEZONE_KEY
+import org.apache.kyuubi.engine.spark.session.SparkSessionImpl
import org.apache.kyuubi.operation.{AbstractOperation, OperationState}
import org.apache.kyuubi.operation.FetchOrientation._
import org.apache.kyuubi.operation.OperationState.OperationState
@@ -36,9 +37,11 @@ import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.schema.{RowSet, SchemaHelper}
import org.apache.kyuubi.session.Session
-abstract class SparkOperation(spark: SparkSession, opType: OperationType, session: Session)
+abstract class SparkOperation(opType: OperationType, session: Session)
extends AbstractOperation(opType, session) {
+ protected val spark: SparkSession = session.asInstanceOf[SparkSessionImpl].spark
+
private val timeZone: ZoneId = {
spark.conf.getOption(TIMEZONE_KEY).map { timeZoneId =>
ZoneId.of(timeZoneId.replaceFirst("(\\+|\\-)(\\d):", "$10$2:"), ZoneId.SHORT_IDS)
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala
index ac64d60db..58b539c7e 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala
@@ -22,11 +22,10 @@ import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._
-import org.apache.spark.sql.SparkSession
-
-import org.apache.kyuubi.KyuubiSQLException
-import org.apache.kyuubi.config.KyuubiConf.{OPERATION_INCREMENTAL_COLLECT, OPERATION_PLAN_ONLY, OperationModes}
+import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.config.KyuubiConf.OperationModes._
+import org.apache.kyuubi.engine.spark.repl.KyuubiSparkILoop
+import org.apache.kyuubi.engine.spark.session.SparkSessionImpl
import org.apache.kyuubi.engine.spark.shim.SparkCatalogShim
import org.apache.kyuubi.operation.{Operation, OperationManager}
import org.apache.kyuubi.session.{Session, SessionHandle}
@@ -35,58 +34,50 @@ class SparkSQLOperationManager private (name: String) extends OperationManager(n
def this() = this(classOf[SparkSQLOperationManager].getSimpleName)
- private val sessionToSpark = new ConcurrentHashMap[SessionHandle, SparkSession]()
-
- private def getSparkSession(sessionHandle: SessionHandle): SparkSession = {
- val sparkSession = sessionToSpark.get(sessionHandle)
- if (sparkSession == null) {
- throw KyuubiSQLException(s"$sessionHandle has not been initialized or already been closed")
- }
- sparkSession
- }
-
- def setSparkSession(sessionHandle: SessionHandle, spark: SparkSession): Unit = {
- sessionToSpark.put(sessionHandle, spark)
- }
-
- def removeSparkSession(sessionHandle: SessionHandle): SparkSession = {
- sessionToSpark.remove(sessionHandle)
- }
-
- def getOpenSparkSessionCount: Int = sessionToSpark.size()
-
private lazy val operationModeDefault = getConf.get(OPERATION_PLAN_ONLY)
private lazy val operationIncrementalCollectDefault = getConf.get(OPERATION_INCREMENTAL_COLLECT)
+ private lazy val operationLanguageDefault = getConf.get(OPERATION_LANGUAGE)
+
+ private val sessionToRepl = new ConcurrentHashMap[SessionHandle, KyuubiSparkILoop]().asScala
+
+ def closeILoop(session: SessionHandle): Unit = {
+ val maybeRepl = sessionToRepl.remove(session)
+ maybeRepl.foreach(_.close())
+ }
override def newExecuteStatementOperation(
session: Session,
statement: String,
runAsync: Boolean,
queryTimeout: Long): Operation = {
- val spark = getSparkSession(session.handle)
-
- val operationModeStr =
- spark.conf.get(OPERATION_PLAN_ONLY.key, operationModeDefault).toUpperCase(Locale.ROOT)
- val incrementalCollect = spark.conf.getOption(OPERATION_INCREMENTAL_COLLECT.key)
- .map(_.toBoolean).getOrElse(operationIncrementalCollectDefault)
- val operation = OperationModes.withName(operationModeStr) match {
- case NONE =>
- new ExecuteStatement(spark, session, statement, runAsync, queryTimeout, incrementalCollect)
- case mode =>
- new PlanOnlyStatement(spark, session, statement, mode)
- }
+ val spark = session.asInstanceOf[SparkSessionImpl].spark
+ val lang = spark.conf.get(OPERATION_LANGUAGE.key, operationLanguageDefault)
+ val operation =
+ OperationLanguages.withName(lang.toUpperCase(Locale.ROOT)) match {
+ case OperationLanguages.SQL =>
+ val mode = spark.conf.get(OPERATION_PLAN_ONLY.key, operationModeDefault)
+ OperationModes.withName(mode.toUpperCase(Locale.ROOT)) match {
+ case NONE =>
+ val incrementalCollect = spark.conf.getOption(OPERATION_INCREMENTAL_COLLECT.key)
+ .map(_.toBoolean).getOrElse(operationIncrementalCollectDefault)
+ new ExecuteStatement(session, statement, runAsync, queryTimeout, incrementalCollect)
+ case mode =>
+ new PlanOnlyStatement(session, statement, mode)
+ }
+ case OperationLanguages.SCALA =>
+ val repl = sessionToRepl.getOrElseUpdate(session.handle, KyuubiSparkILoop(spark))
+ new ExecuteScala(session, repl, statement)
+ }
addOperation(operation)
}
override def newGetTypeInfoOperation(session: Session): Operation = {
- val spark = getSparkSession(session.handle)
- val op = new GetTypeInfo(spark, session)
+ val op = new GetTypeInfo(session)
addOperation(op)
}
override def newGetCatalogsOperation(session: Session): Operation = {
- val spark = getSparkSession(session.handle)
- val op = new GetCatalogs(spark, session)
+ val op = new GetCatalogs(session)
addOperation(op)
}
@@ -94,8 +85,7 @@ class SparkSQLOperationManager private (name: String) extends OperationManager(n
session: Session,
catalog: String,
schema: String): Operation = {
- val spark = getSparkSession(session.handle)
- val op = new GetSchemas(spark, session, catalog, schema)
+ val op = new GetSchemas(session, catalog, schema)
addOperation(op)
}
@@ -105,20 +95,18 @@ class SparkSQLOperationManager private (name: String) extends OperationManager(n
schemaName: String,
tableName: String,
tableTypes: java.util.List[String]): Operation = {
- val spark = getSparkSession(session.handle)
val tTypes =
if (tableTypes == null || tableTypes.isEmpty) {
SparkCatalogShim.sparkTableTypes
} else {
tableTypes.asScala.toSet
}
- val op = new GetTables(spark, session, catalogName, schemaName, tableName, tTypes)
+ val op = new GetTables(session, catalogName, schemaName, tableName, tTypes)
addOperation(op)
}
override def newGetTableTypesOperation(session: Session): Operation = {
- val spark = getSparkSession(session.handle)
- val op = new GetTableTypes(spark, session)
+ val op = new GetTableTypes(session)
addOperation(op)
}
@@ -128,8 +116,7 @@ class SparkSQLOperationManager private (name: String) extends OperationManager(n
schemaName: String,
tableName: String,
columnName: String): Operation = {
- val spark = getSparkSession(session.handle)
- val op = new GetColumns(spark, session, catalogName, schemaName, tableName, columnName)
+ val op = new GetColumns(session, catalogName, schemaName, tableName, columnName)
addOperation(op)
}
@@ -138,8 +125,7 @@ class SparkSQLOperationManager private (name: String) extends OperationManager(n
catalogName: String,
schemaName: String,
functionName: String): Operation = {
- val spark = getSparkSession(session.handle)
- val op = new GetFunctions(spark, session, catalogName, schemaName, functionName)
+ val op = new GetFunctions(session, catalogName, schemaName, functionName)
addOperation(op)
}
}
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/repl/KyuubiSparkILoop.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/repl/KyuubiSparkILoop.scala
new file mode 100644
index 000000000..e1da8f7ff
--- /dev/null
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/repl/KyuubiSparkILoop.scala
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.spark.repl
+
+import java.io.{ByteArrayOutputStream, File}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.tools.nsc.Settings
+import scala.tools.nsc.interpreter.JPrintWriter
+
+import org.apache.spark.SparkContext
+import org.apache.spark.repl.{Main, SparkILoop}
+import org.apache.spark.sql.{Dataset, Row, SparkSession}
+import org.apache.spark.util.MutableURLClassLoader
+
+private[spark] case class KyuubiSparkILoop private (
+ spark: SparkSession,
+ output: ByteArrayOutputStream)
+ extends SparkILoop(None, new JPrintWriter(output)) {
+
+ // TODO: this is a little hacky
+ val results = new ArrayBuffer[Dataset[Row]]()
+
+ private def initialize(): Unit = {
+ settings = new Settings
+ val interpArguments = List(
+ "-Yrepl-class-based",
+ "-Yrepl-outdir",
+ s"${Main.outputDir.getAbsolutePath}")
+ settings.processArguments(interpArguments, processAll = true)
+ settings.usejavacp.value = true
+ val currentClassLoader = Thread.currentThread().getContextClassLoader
+ settings.embeddedDefaults(currentClassLoader)
+ this.createInterpreter()
+ this.initializeSynchronous()
+ try {
+ this.compilerClasspath
+ this.ensureClassLoader()
+ var classLoader = Thread.currentThread().getContextClassLoader
+ while (classLoader != null) {
+ classLoader match {
+ case loader: MutableURLClassLoader =>
+ val allJars = loader.getURLs.filter { u =>
+ val file = new File(u.getPath)
+ u.getProtocol == "file" && file.isFile &&
+ file.getName.contains("scala-lang_scala-reflect")
+ }
+ this.addUrlsToClassPath(allJars: _*)
+ classLoader = null
+ case _ =>
+ classLoader = classLoader.getParent
+ }
+ }
+ } finally {
+ Thread.currentThread().setContextClassLoader(currentClassLoader)
+ }
+
+ this.beQuietDuring {
+ // SparkSession/SparkContext and their implicits
+ this.bind("spark", classOf[SparkSession].getCanonicalName, spark, List("""@transient"""))
+ this.bind(
+ "sc",
+ classOf[SparkContext].getCanonicalName,
+ spark.sparkContext,
+ List("""@transient"""))
+
+ this.interpret("import org.apache.spark.SparkContext._")
+ this.interpret("import spark.implicits._")
+ this.interpret("import spark.sql")
+ this.interpret("import org.apache.spark.sql.functions._")
+
+ // for feeding results to client, e.g. beeline
+ this.bind(
+ "results",
+ "scala.collection.mutable.ArrayBuffer[" +
+ "org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]]",
+ results)
+ }
+ }
+
+ def getOutput: String = {
+ val res = output.toString.trim
+ output.reset()
+ res
+ }
+}
+
+private[spark] object KyuubiSparkILoop {
+ def apply(spark: SparkSession): KyuubiSparkILoop = {
+ val os = new ByteArrayOutputStream()
+ val iLoop = new KyuubiSparkILoop(spark, os)
+ iLoop.initialize()
+ iLoop
+ }
+}
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala
index 046948cc3..4ab7f6097 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala
@@ -18,7 +18,7 @@
package org.apache.kyuubi.engine.spark.session
import org.apache.hive.service.rpc.thrift.TProtocolVersion
-import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.SparkSession
import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.config.KyuubiConf
@@ -26,7 +26,6 @@ import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.engine.ShareLevel
import org.apache.kyuubi.engine.spark.SparkSQLEngine
import org.apache.kyuubi.engine.spark.operation.SparkSQLOperationManager
-import org.apache.kyuubi.engine.spark.udf.KDFRegistry
import org.apache.kyuubi.session._
/**
@@ -58,60 +57,52 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession)
ipAddress: String,
conf: Map[String, String]): SessionHandle = {
info(s"Opening session for $user@$ipAddress")
- val sessionImpl = new SparkSessionImpl(protocol, user, password, ipAddress, conf, this)
- val handle = sessionImpl.handle
- try {
- val sparkSession =
+ val sparkSession =
+ try {
if (singleSparkSession) {
spark
} else {
val ss = spark.newSession()
this.conf.get(ENGINE_SESSION_INITIALIZE_SQL).foreach { sqlStr =>
- ss.sparkContext.setJobGroup(
- handle.identifier.toString,
- sqlStr,
- interruptOnCancel = true)
+ ss.sparkContext.setJobDescription(sqlStr)
ss.sql(sqlStr).isEmpty
}
ss
}
-
- sessionImpl.normalizedConf.foreach {
- case ("use:database", database) => sparkSession.catalog.setCurrentDatabase(database)
- case (key, value) => setModifiableConfig(sparkSession, key, value)
+ } catch {
+ case e: Exception => throw KyuubiSQLException(e)
}
- sessionImpl.open()
- KDFRegistry.registerAll(sparkSession)
- operationManager.setSparkSession(handle, sparkSession)
- setSession(handle, sessionImpl)
+
+ val session = new SparkSessionImpl(
+ protocol,
+ user,
+ password,
+ ipAddress,
+ conf,
+ this,
+ sparkSession)
+ try {
+ val handle = session.handle
+ session.open()
+ setSession(handle, session)
info(s"$user's session with $handle is opened, current opening sessions" +
s" $getOpenSessionCount")
handle
} catch {
case e: Exception =>
- sessionImpl.close()
+ session.close()
throw KyuubiSQLException(e)
}
}
override def closeSession(sessionHandle: SessionHandle): Unit = {
super.closeSession(sessionHandle)
- operationManager.removeSparkSession(sessionHandle)
if (conf.get(ENGINE_SHARE_LEVEL) == ShareLevel.CONNECTION.toString) {
info("Session stopped due to shared level is Connection.")
stopSession()
}
}
- private def setModifiableConfig(spark: SparkSession, key: String, value: String): Unit = {
- try {
- spark.conf.set(key, value)
- } catch {
- case e: AnalysisException =>
- warn(e.getMessage())
- }
- }
-
private def stopSession(): Unit = {
SparkSQLEngine.currentEngine.foreach(_.stop())
}
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala
index 9f3086ac3..6cc7cd947 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala
@@ -18,8 +18,11 @@
package org.apache.kyuubi.engine.spark.session
import org.apache.hive.service.rpc.thrift.TProtocolVersion
+import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.kyuubi.engine.spark.events.{EventLoggingService, SessionEvent}
+import org.apache.kyuubi.engine.spark.operation.SparkSQLOperationManager
+import org.apache.kyuubi.engine.spark.udf.KDFRegistry
import org.apache.kyuubi.operation.{Operation, OperationHandle}
import org.apache.kyuubi.session.{AbstractSession, SessionHandle, SessionManager}
@@ -29,13 +32,27 @@ class SparkSessionImpl(
password: String,
ipAddress: String,
conf: Map[String, String],
- sessionManager: SessionManager)
+ sessionManager: SessionManager,
+ val spark: SparkSession)
extends AbstractSession(protocol, user, password, ipAddress, conf, sessionManager) {
override val handle: SessionHandle = SessionHandle(protocol)
+ private def setModifiableConfig(key: String, value: String): Unit = {
+ try {
+ spark.conf.set(key, value)
+ } catch {
+ case e: AnalysisException => warn(e.getMessage())
+ }
+ }
+
private val sessionEvent = SessionEvent(this)
override def open(): Unit = {
+ normalizedConf.foreach {
+ case ("use:database", database) => spark.catalog.setCurrentDatabase(database)
+ case (key, value) => setModifiableConfig(key, value)
+ }
+ KDFRegistry.registerAll(spark)
EventLoggingService.onEvent(sessionEvent)
super.open()
}
@@ -49,6 +66,7 @@ class SparkSessionImpl(
sessionEvent.endTime = System.currentTimeMillis()
EventLoggingService.onEvent(sessionEvent)
super.close()
+ sessionManager.operationManager.asInstanceOf[SparkSQLOperationManager].closeILoop(handle)
}
}
diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkOperationSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkOperationSuite.scala
index 5e5185c13..bf42c1b41 100644
--- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkOperationSuite.scala
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkOperationSuite.scala
@@ -512,15 +512,12 @@ class SparkOperationSuite extends WithSparkSQLEngine with HiveMetadataTests with
withThriftClient { client =>
val operationManager =
engine.backendService.sessionManager.operationManager.asInstanceOf[SparkSQLOperationManager]
- assert(operationManager.getOpenSparkSessionCount === 0)
val req = new TOpenSessionReq()
req.setUsername("kentyao")
req.setPassword("anonymous")
val tOpenSessionResp = client.OpenSession(req)
- assert(operationManager.getOpenSparkSessionCount === 1)
-
val tExecuteStatementReq = new TExecuteStatementReq()
tExecuteStatementReq.setSessionHandle(tOpenSessionResp.getSessionHandle)
tExecuteStatementReq.setRunAsync(true)
@@ -546,8 +543,6 @@ class SparkOperationSuite extends WithSparkSQLEngine with HiveMetadataTests with
tCloseSessionReq.setSessionHandle(tOpenSessionResp.getSessionHandle)
val tCloseSessionResp = client.CloseSession(tCloseSessionReq)
assert(tCloseSessionResp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS)
-
- assert(operationManager.getOpenSparkSessionCount === 0)
}
}
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index 70245d69f..ef9fbd63c 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -982,10 +982,10 @@ object KyuubiConf {
.doc("A comma separated list of engine history loggers, where engine/session/operation etc" +
" events go. We use spark logger by default." +
" - SPARK: the events will be written to the spark listener bus.
" +
- s" - JSON: the events will be written to the location of" +
+ "
- JSON: the events will be written to the location of" +
s" ${ENGINE_EVENT_JSON_LOG_PATH.key}
" +
- s" - JDBC: to be done
" +
- s" - CUSTOM: to be done.
")
+ " JDBC: to be done" +
+ " CUSTOM: to be done.")
.version("1.3.0")
.stringConf
.transform(_.toUpperCase(Locale.ROOT))
@@ -1048,4 +1048,20 @@ object KyuubiConf {
.transform(_.toUpperCase(Locale.ROOT))
.checkValues(OperationModes.values.map(_.toString))
.createWithDefault(OperationModes.NONE.toString)
+
+ object OperationLanguages extends Enumeration {
+ type OperationLanguage = Value
+ val SQL, SCALA = Value
+ }
+
+ val OPERATION_LANGUAGE: ConfigEntry[String] =
+ buildConf("operation.language")
+ .doc("Choose a programing language for the following inputs" +
+ " - SQL: (Default) Run all following statements as SQL queries.
" +
+ " - SCALA: Run all following input a scala codes
")
+ .version("1.5.0")
+ .stringConf
+ .transform(_.toUpperCase(Locale.ROOT))
+ .checkValues(OperationLanguages.values.map(_.toString))
+ .createWithDefault(OperationLanguages.SQL.toString)
}
diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/SparkQueryTests.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/SparkQueryTests.scala
index 384d1af06..c68ed1a4b 100644
--- a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/SparkQueryTests.scala
+++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/SparkQueryTests.scala
@@ -412,4 +412,110 @@ trait SparkQueryTests extends HiveJDBCTestHelper {
}
}
}
+
+ test("execute simple scala code") {
+ withJdbcStatement() { statement =>
+ statement.execute("SET kyuubi.operation.language=scala")
+ val rs = statement.executeQuery("spark.version")
+ rs.next()
+ // scala repl will return resX = YYYYY, and here we only check YYYYY
+ val sparkVer = rs.getString(1).split("=")(1).trim
+ assert("\\d\\.\\d\\.\\d".r.pattern.matcher(sparkVer).matches())
+ assert(rs.getMetaData.getColumnName(1) === "output")
+ }
+ }
+
+ test("execute simple scala code with result returned") {
+ withJdbcStatement() { statement =>
+ statement.execute("SET kyuubi.operation.language=scala")
+ val code =
+ """
+ |val df = spark
+ | .range(0, 10, 2, 1)
+ | .toDF
+ |""".stripMargin
+ val rs1 = statement.executeQuery(code)
+ rs1.next()
+ assert(rs1.getString(1) startsWith "df: org.apache.spark.sql.DataFrame")
+
+ // continue
+ val rs2 = statement.executeQuery("df.count()")
+ rs2.next()
+ assert(rs2.getString(1).endsWith("5"))
+
+ // continue
+ val rs3 = statement.executeQuery("results += df")
+ for (i <- Range(0, 10, 2)) {
+ assert(rs3.next)
+ assert(rs3.getInt(1) === i)
+ }
+
+ // switch to sql
+ val set =
+ """
+ |spark.conf.set("kyuubi.operation.language", "SQL")
+ |""".stripMargin
+ val t = statement.executeQuery(set)
+ t.next()
+ val rs4 = statement.executeQuery("select 12345")
+ assert(rs4.next())
+ assert(rs4.getInt(1) === 12345)
+
+ // switch to scala again
+ statement.execute("SET kyuubi.operation.language=scala")
+ val code2 =
+ """
+ |/* this
+ | * is
+ | * a
+ | * multi-line comments
+ | */
+ |val df = spark
+ | .range(0, 10, 2, 1)
+ | .map(x => (x, x + 1, x * 2)) // this is a single-line comment
+ | .toDF
+ |""".stripMargin
+ val rs5 = statement.executeQuery(code2)
+ rs5.next()
+ assert(rs5.getString(1) startsWith "df: org.apache.spark.sql.DataFrame")
+
+ // re-assign
+ val rs6 = statement.executeQuery("results += df")
+ for (i <- Range(0, 10, 2)) {
+ assert(rs6.next)
+ assert(rs6.getInt(2) === i + 1)
+ }
+ }
+ }
+
+ test("incomplete scala code block will fail") {
+ withJdbcStatement() { statement =>
+ statement.execute("SET kyuubi.operation.language=scala")
+ // incomplete code block
+ val incompleteCode =
+ """
+ |val df = spark
+ | .range(0, 10, 2, 1)
+ | .map {
+ | x => (x, x + 1, x * 2)
+ |""".stripMargin
+ val e = intercept[SQLException](statement.executeQuery(incompleteCode))
+ assert(e.getMessage contains "Incomplete code:")
+ }
+ }
+
+ test("scala code compile error will fail") {
+ withJdbcStatement() { statement =>
+ statement.execute("SET kyuubi.operation.language=scala")
+ // incomplete code block
+ val incompleteCode =
+ """
+ |val df = spark
+ | .range(0, 10, 2, 1)
+ | .map { x => (x, x + 1, y * 2) } // y is missing
+ |""".stripMargin
+ val e = intercept[SQLException](statement.executeQuery(incompleteCode))
+ assert(e.getMessage contains "not found: value y")
+ }
+ }
}
diff --git a/pom.xml b/pom.xml
index 0ced3d029..497e55f21 100644
--- a/pom.xml
+++ b/pom.xml
@@ -248,6 +248,12 @@
${scala.version}
+
+ org.scala-lang
+ scala-compiler
+ ${scala.version}
+
+
org.scala-lang
scala-reflect
@@ -336,6 +342,18 @@
${commons-lang3.version}
+
+ org.apache.spark
+ spark-repl_${scala.binary.version}
+ ${spark.version}
+
+
+ *
+ *
+
+
+
+
org.apache.spark
spark-sql_${scala.binary.version}