diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md index a57fc5639..7946f0e74 100644 --- a/docs/deployment/settings.md +++ b/docs/deployment/settings.md @@ -137,6 +137,7 @@ kyuubi\.delegation
\.token\.renew\.interval|
|
A comma separated list of exception classes. If there is any exception thrown, whose class matches the specified classes, the engine would deregister itself.
|
seq
|
1.2.0
kyuubi\.engine
\.initialize\.sql|
SHOW DATABASES
|
SemiColon-separated list of SQL statements to be initialized in the newly created engine before queries.
|
string
|
1.2.0
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala index cf313ef2f..672b45a50 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala @@ -21,6 +21,7 @@ import java.time.Instant import java.util.concurrent.CountDownLatch import org.apache.spark.SparkConf +import org.apache.spark.kyuubi.SparkSQLEngineListener import org.apache.spark.sql.SparkSession import org.apache.kyuubi.{Logging, Utils} @@ -41,7 +42,7 @@ private[spark] final class SparkSQLEngine(name: String, spark: SparkSession) ServiceDiscovery.supportServiceDiscovery(conf) } - override protected val discoveryService: Service = new EngineServiceDiscovery(this) + override val discoveryService: Service = new EngineServiceDiscovery(this) override def initialize(conf: KyuubiConf): Unit = { val listener = new SparkSQLEngineListener(this) diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngineListener.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkSQLEngineListener.scala similarity index 58% rename from externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngineListener.scala rename to externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkSQLEngineListener.scala index b7970c71a..050b2f27d 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngineListener.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkSQLEngineListener.scala @@ -15,15 +15,20 @@ * limitations under the License. */ -package org.apache.kyuubi.engine.spark +// Some object likes `JobFailed` is only accessible in org.apache.spark package +package org.apache.spark.kyuubi -import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd} +import org.apache.spark.scheduler.{JobFailed, SparkListener, SparkListenerApplicationEnd, SparkListenerJobEnd} import org.apache.kyuubi.Logging +import org.apache.kyuubi.config.KyuubiConf.ENGINE_DEREGISTER_EXCEPTION_CLASSES +import org.apache.kyuubi.ha.client.EngineServiceDiscovery import org.apache.kyuubi.service.{Serverable, ServiceState} class SparkSQLEngineListener(server: Serverable) extends SparkListener with Logging { + lazy val deregisterExceptions = server.getConf.get(ENGINE_DEREGISTER_EXCEPTION_CLASSES) + override def onApplicationEnd(event: SparkListenerApplicationEnd): Unit = { server.getServiceState match { case ServiceState.STOPPED => debug("Received ApplicationEnd Message form Spark after the" + @@ -34,4 +39,18 @@ class SparkSQLEngineListener(server: Serverable) extends SparkListener with Logg } } + override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = { + jobEnd.jobResult match { + case JobFailed(e) => + if (e != null && deregisterExceptions.exists(_.equals(e.getClass.getCanonicalName))) { + error( + s""" + | Job failed exception class is in the set of + | ${ENGINE_DEREGISTER_EXCEPTION_CLASSES.key}, stopping the engine. + """.stripMargin, e) + server.discoveryService.asInstanceOf[EngineServiceDiscovery].stop() + } + case _ => + } + } } diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala index 0ae3b27dc..b425d3c89 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala @@ -560,6 +560,15 @@ object KyuubiConf { .stringConf .createWithDefault("SHOW DATABASES") + val ENGINE_DEREGISTER_EXCEPTION_CLASSES: ConfigEntry[Seq[String]] = + buildConf("engine.deregister.exception.classes") + .doc("A comma separated list of exception classes. If there is any exception thrown," + + " whose class matches the specified classes, the engine would deregister itself.") + .version("1.2.0") + .stringConf + .toSequence + .createWithDefault(Nil) + val OPERATION_SCHEDULER_POOL: OptionalConfigEntry[String] = buildConf("operation.scheduler.pool") .doc("The scheduler pool of job. Note that, this config should be used after change Spark " + "config spark.scheduler.mode=FAIR.") diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/Serverable.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/Serverable.scala index 8e4b85006..c0077db6f 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/Serverable.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/Serverable.scala @@ -29,7 +29,7 @@ abstract class Serverable(name: String) extends CompositeService(name) { private[kyuubi] val backendService: AbstractBackendService private lazy val frontendService = new FrontendService(backendService, OOMHook) protected def supportsServiceDiscovery: Boolean - protected val discoveryService: Service + val discoveryService: Service def connectionUrl: String = frontendService.connectionUrl diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/ServiceState.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/ServiceState.scala index 50235d8ad..e60ee92b4 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/ServiceState.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/ServiceState.scala @@ -20,7 +20,7 @@ package org.apache.kyuubi.service /** * Service states */ -private[kyuubi] object ServiceState extends Enumeration { +object ServiceState extends Enumeration { type ServiceState = Value val diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/service/NoopServer.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/service/NoopServer.scala index a05834836..c0851772a 100644 --- a/kyuubi-common/src/test/scala/org/apache/kyuubi/service/NoopServer.scala +++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/service/NoopServer.scala @@ -34,6 +34,6 @@ class NoopServer extends Serverable("noop") { } - override protected val discoveryService: Service = backendService + override val discoveryService: Service = backendService override protected val supportsServiceDiscovery: Boolean = false } diff --git a/kyuubi-main/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala b/kyuubi-main/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala index b47985343..eb9bc0503 100644 --- a/kyuubi-main/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala +++ b/kyuubi-main/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala @@ -83,7 +83,7 @@ class KyuubiServer(name: String) extends Serverable(name) { override protected def supportsServiceDiscovery: Boolean = { ServiceDiscovery.supportServiceDiscovery(conf) } - override protected val discoveryService = new KyuubiServiceDiscovery(this) + override val discoveryService = new KyuubiServiceDiscovery(this) override def initialize(conf: KyuubiConf): Unit = synchronized { val kinit = new KinitAuxiliaryService()