[KYUUBI #544] Deregister engine when meeting specified exception class

<!--
Thanks for sending a pull request!

Here are some tips for you:
  1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html
  2. If the PR is related to an issue in https://github.com/NetEase/kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
  3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'.
-->

### _Why are the changes needed?_
<!--
Please clarify why the changes are needed. For instance,
  1. If you add a feature, you can talk about the use case of it.
  2. If you fix a bug, you can clarify why it is a bug.
-->
Introduce `kyuubi.engine.deregister.exception.classes` to deregister the engine when a job fails because of one of the specified exception classes

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [ ] [Run test](https://kyuubi.readthedocs.io/en/latest/tools/testing.html#running-tests) locally before making a pull request

Closes #549 from turboFei/KYUUBI-544-exception-class.

Closes #544

9ff83b80 [fwang12] use lazy val to avoid npe
d80d4436 [fwang12] [KYUUBI #544] Add kyuubi.engine.deregister.exception.classes to deregister engine when job failed because of specified exception class

Authored-by: fwang12 <fwang12@ebay.com>
Signed-off-by: Cheng Pan <379377944@qq.com>
This commit is contained in:
fwang12 2021-04-21 09:36:58 +08:00 committed by Cheng Pan
parent 41c3e683f7
commit 3d3d004e2d
8 changed files with 37 additions and 7 deletions

View File

@ -137,6 +137,7 @@ kyuubi\.delegation<br>\.token\.renew\.interval|<div style='width: 65pt;word-wrap
Key | Default | Meaning | Type | Since
--- | --- | --- | --- | ---
kyuubi\.engine<br>\.deregister\.exception<br>\.classes|<div style='width: 65pt;word-wrap: break-word;white-space: normal'></div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>A comma separated list of exception classes. If there is any exception thrown, whose class matches the specified classes, the engine would deregister itself.</div>|<div style='width: 30pt'>seq</div>|<div style='width: 20pt'>1.2.0</div>
kyuubi\.engine<br>\.initialize\.sql|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>SHOW DATABASES</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>SemiColon-separated list of SQL statements to be initialized in the newly created engine before queries.</div>|<div style='width: 30pt'>string</div>|<div style='width: 20pt'>1.2.0</div>

View File

@ -21,6 +21,7 @@ import java.time.Instant
import java.util.concurrent.CountDownLatch
import org.apache.spark.SparkConf
import org.apache.spark.kyuubi.SparkSQLEngineListener
import org.apache.spark.sql.SparkSession
import org.apache.kyuubi.{Logging, Utils}
@ -41,7 +42,7 @@ private[spark] final class SparkSQLEngine(name: String, spark: SparkSession)
ServiceDiscovery.supportServiceDiscovery(conf)
}
override protected val discoveryService: Service = new EngineServiceDiscovery(this)
override val discoveryService: Service = new EngineServiceDiscovery(this)
override def initialize(conf: KyuubiConf): Unit = {
val listener = new SparkSQLEngineListener(this)

View File

@ -15,15 +15,20 @@
* limitations under the License.
*/
package org.apache.kyuubi.engine.spark
// Some objects, such as `JobFailed`, are only accessible from within the org.apache.spark package
package org.apache.spark.kyuubi
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
import org.apache.spark.scheduler.{JobFailed, SparkListener, SparkListenerApplicationEnd, SparkListenerJobEnd}
import org.apache.kyuubi.Logging
import org.apache.kyuubi.config.KyuubiConf.ENGINE_DEREGISTER_EXCEPTION_CLASSES
import org.apache.kyuubi.ha.client.EngineServiceDiscovery
import org.apache.kyuubi.service.{Serverable, ServiceState}
class SparkSQLEngineListener(server: Serverable) extends SparkListener with Logging {
lazy val deregisterExceptions = server.getConf.get(ENGINE_DEREGISTER_EXCEPTION_CLASSES)
override def onApplicationEnd(event: SparkListenerApplicationEnd): Unit = {
server.getServiceState match {
case ServiceState.STOPPED => debug("Received ApplicationEnd Message form Spark after the" +
@ -34,4 +39,18 @@ class SparkSQLEngineListener(server: Serverable) extends SparkListener with Logg
}
}
/**
 * Reacts to the end of a Spark job. When the job failed with an exception whose canonical
 * class name is listed in `ENGINE_DEREGISTER_EXCEPTION_CLASSES`, the engine's discovery
 * service is stopped; any other job result is ignored.
 *
 * @param jobEnd the job-end event delivered by Spark
 */
override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
  jobEnd.jobResult match {
    // `contains` is null-safe, so a null canonical name (e.g. an anonymous class) never matches.
    case JobFailed(e)
        if e != null && deregisterExceptions.contains(e.getClass.getCanonicalName) =>
      error(
        s"""
           | Job failed exception class is in the set of
           | ${ENGINE_DEREGISTER_EXCEPTION_CLASSES.key}, stopping the engine.
         """.stripMargin, e)
      // Match on the runtime type instead of casting, so that a server backed by a different
      // discovery service is reported rather than raising a ClassCastException in the listener.
      server.discoveryService match {
        case discovery: EngineServiceDiscovery => discovery.stop()
        case other =>
          error(s"Unable to deregister the engine: $other is not an EngineServiceDiscovery", e)
      }
    case _ =>
  }
}
}

View File

@ -560,6 +560,15 @@ object KyuubiConf {
.stringConf
.createWithDefault("SHOW DATABASES")
/**
 * Exception class names that cause the engine to deregister itself.
 * The value is a sequence of strings and defaults to `Nil`, so nothing matches unless configured.
 * `SparkSQLEngineListener.onJobEnd` compares each entry by exact equality with the canonical
 * class name of a failed job's exception.
 */
val ENGINE_DEREGISTER_EXCEPTION_CLASSES: ConfigEntry[Seq[String]] =
buildConf("engine.deregister.exception.classes")
.doc("A comma separated list of exception classes. If there is any exception thrown," +
" whose class matches the specified classes, the engine would deregister itself.")
.version("1.2.0")
.stringConf
.toSequence
.createWithDefault(Nil)
val OPERATION_SCHEDULER_POOL: OptionalConfigEntry[String] = buildConf("operation.scheduler.pool")
.doc("The scheduler pool of job. Note that, this config should be used after change Spark " +
"config spark.scheduler.mode=FAIR.")

View File

@ -29,7 +29,7 @@ abstract class Serverable(name: String) extends CompositeService(name) {
private[kyuubi] val backendService: AbstractBackendService
private lazy val frontendService = new FrontendService(backendService, OOMHook)
protected def supportsServiceDiscovery: Boolean
protected val discoveryService: Service
val discoveryService: Service
def connectionUrl: String = frontendService.connectionUrl

View File

@ -20,7 +20,7 @@ package org.apache.kyuubi.service
/**
* Service states
*/
private[kyuubi] object ServiceState extends Enumeration {
object ServiceState extends Enumeration {
type ServiceState = Value
val

View File

@ -34,6 +34,6 @@ class NoopServer extends Serverable("noop") {
}
override protected val discoveryService: Service = backendService
override val discoveryService: Service = backendService
override protected val supportsServiceDiscovery: Boolean = false
}

View File

@ -83,7 +83,7 @@ class KyuubiServer(name: String) extends Serverable(name) {
override protected def supportsServiceDiscovery: Boolean = {
ServiceDiscovery.supportServiceDiscovery(conf)
}
override protected val discoveryService = new KyuubiServiceDiscovery(this)
override val discoveryService = new KyuubiServiceDiscovery(this)
override def initialize(conf: KyuubiConf): Unit = synchronized {
val kinit = new KinitAuxiliaryService()