diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala index 2bd855403..e15645133 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala @@ -315,8 +315,16 @@ private[kyuubi] class EngineRef( */ def deregister(discoveryClient: DiscoveryClient, hostPort: (String, Int)): Unit = tryWithLock(discoveryClient) { - if (discoveryClient.getServerHost(engineSpace) == Option(hostPort)) { - discoveryClient.delete(engineSpace) + // refer the DiscoveryClient::getServerHost implementation + discoveryClient.getServiceNodesInfo(engineSpace, Some(1), silent = true) match { + case Seq(sn) => + if ((sn.host, sn.port) == hostPort) { + info(s"Deleting engine node:$sn") + discoveryClient.delete(s"$engineSpace/${sn.nodeName}") + } else { + warn(s"Engine node:$sn is not matched with host&port[$hostPort]") + } + case _ => warn(s"No engine node found in $engineSpace") } } diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefTests.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefTests.scala index 08b36b84a..af927960d 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefTests.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefTests.scala @@ -341,4 +341,25 @@ trait EngineRefTests extends KyuubiFunSuite { val engine4 = new EngineRef(conf, user, PluginLoader.loadGroupProvider(conf), id, null) assert(engine4.subdomain.startsWith("engine-pool-")) } + + test("deregister engine with existing host port") { + val id = UUID.randomUUID().toString + conf.set(KyuubiConf.ENGINE_SHARE_LEVEL, USER.toString) + conf.set(KyuubiConf.ENGINE_TYPE, SPARK_SQL.toString) + conf.set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0) + conf.set(HighAvailabilityConf.HA_NAMESPACE, "engine_test") + conf.set(HighAvailabilityConf.HA_ADDRESSES, getConnectString()) + conf.set(KyuubiConf.GROUP_PROVIDER, "hadoop") + + val engine = new EngineRef(conf, user, PluginLoader.loadGroupProvider(conf), id, null) + + DiscoveryClientProvider.withDiscoveryClient(conf) { client => + val hp = engine.getOrCreate(client) + assert(client.getServerHost(engine.engineSpace) == Option(hp)) + engine.deregister(client, ("non_existing_host", 0)) + assert(client.getServerHost(engine.engineSpace) == Option(hp)) + engine.deregister(client, hp) + assert(client.getServerHost(engine.engineSpace).isEmpty) + } + } }