[KYUUBI #1800] Remove oom hook

<!--
Thanks for sending a pull request!

Here are some tips for you:
  1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html
  2. If the PR is related to an issue in https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
  3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'.
-->

### _Why are the changes needed?_
<!--
Please clarify why the changes are needed. For instance,
  1. If you add a feature, you can talk about the use case of it.
  2. If you fix a bug, you can clarify why it is a bug.
-->
closes #1800

### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [ ] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #1803 from ulysses-you/KYUUBI#1800.

Closes #1800

416bf9f3 [ulysses-you] comment
98437694 [ulysses-you] style git diff
ad9d21d3 [ulysses-you] remove oom hook

Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: Kent Yao <yao@apache.org>
This commit is contained in:
ulysses-you 2022-01-19 19:30:23 +08:00 committed by Kent Yao
parent dd4c2fa7fc
commit bb1cfde773
No known key found for this signature in database
GPG Key ID: F7051850A0AF904D
6 changed files with 13 additions and 200 deletions

View File

@ -17,7 +17,7 @@
package org.apache.kyuubi.service
import java.util.concurrent.TimeUnit
import java.util.concurrent.{SynchronousQueue, ThreadPoolExecutor, TimeUnit}
import org.apache.hive.service.rpc.thrift._
import org.apache.thrift.protocol.TBinaryProtocol
@ -26,7 +26,7 @@ import org.apache.thrift.transport.TServerSocket
import org.apache.kyuubi.{KyuubiException, Logging}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.util.ExecutorPoolCaptureOom
import org.apache.kyuubi.util.NamedThreadFactory
/**
* Apache Thrift based hive service rpc
@ -46,14 +46,7 @@ abstract class TBinaryFrontendService(name: String)
private var server: Option[TServer] = None
// When a OOM occurs, here we de-register the engine by stop its discoveryService.
// Then the current engine will not be connected by new client anymore but keep the existing ones
// alive. In this case we can reduce the engine's overhead and make it possible recover from that.
// We shall not tear down the whole engine by serverable.stop to make the engine unreachable for
// the existing clients which are still getting statuses and reporting to the end-users.
protected def oomHook: Runnable = {
() => discoveryService.foreach(_.stop())
}
// Removed OOM hook since Kyuubi #1800 to respect the hive server2 #2383
override def initialize(conf: KyuubiConf): Unit = synchronized {
this.conf = conf
@ -61,12 +54,13 @@ abstract class TBinaryFrontendService(name: String)
val minThreads = conf.get(FRONTEND_THRIFT_MIN_WORKER_THREADS)
val maxThreads = conf.get(FRONTEND_THRIFT_MAX_WORKER_THREADS)
val keepAliveTime = conf.get(FRONTEND_THRIFT_WORKER_KEEPALIVE_TIME)
val executor = ExecutorPoolCaptureOom(
name + "Handler-Pool",
val executor = new ThreadPoolExecutor(
minThreads,
maxThreads,
keepAliveTime,
oomHook)
TimeUnit.MILLISECONDS,
new SynchronousQueue[Runnable](),
new NamedThreadFactory(name + "Handler-Pool", false))
val transFactory = authFactory.getTTransportFactory
val tProcFactory = authFactory.getTProcessorFactory(this)
val tServerSocket = new TServerSocket(serverSocket)

View File

@ -1,53 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.util
import java.util.concurrent.{Future, SynchronousQueue, ThreadPoolExecutor, TimeUnit}
case class ExecutorPoolCaptureOom(
poolName: String,
corePoolSize: Int,
maximumPoolSize: Int,
keepAliveSeconds: Long,
hook: Runnable)
extends ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
keepAliveSeconds,
TimeUnit.MILLISECONDS,
new SynchronousQueue[Runnable](),
new NamedThreadFactory(poolName, false)) {
override def afterExecute(r: Runnable, t: Throwable): Unit = {
super.afterExecute(r, t)
t match {
case _: OutOfMemoryError => hook.run()
case null => r match {
case f: Future[_] =>
try {
if (f.isDone) f.get()
} catch {
case _: InterruptedException => Thread.currentThread().interrupt()
case _: OutOfMemoryError => hook.run()
}
case _ =>
}
case _ =>
}
}
}

View File

@ -21,6 +21,4 @@ class NoopTBinaryFrontendService(override val serverable: Serverable)
extends TBinaryFrontendService("NoopThriftBinaryFrontend") {
override val discoveryService: Option[Service] = None
override protected def oomHook: Runnable = () => serverable.stop()
}

View File

@ -1,121 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.util
import java.util.concurrent.{RunnableFuture, TimeUnit}
import org.apache.kyuubi.KyuubiFunSuite
class ExecutorPoolCaptureOomSuite extends KyuubiFunSuite {
@volatile var flag = false
private val oomHook = new Runnable {
override def run(): Unit = {
flag = true
}
}
override def beforeEach(): Unit = {
flag = false
}
private val pool = ExecutorPoolCaptureOom(getClass.getName, 10, 10, 10, oomHook)
test("t is RuntimeException, r is not future") {
val exception = new RuntimeException()
pool.execute(() => {
throw exception
})
checkFalse()
pool.afterExecute(null, exception)
checkFalse()
}
test("t is OutOfMemoryError, r is not future") {
val error = new OutOfMemoryError()
pool.execute(() => {
throw error
})
checkTrue()
flag = false
pool.afterExecute(null, error)
checkTrue()
}
test("t is null, r is not future") {
pool.execute(() => ())
checkFalse()
pool.afterExecute(null, null)
checkFalse()
}
test("t is null, r is future with no exception") {
val future = new TestRunnableFuture(1)
pool.execute(future)
wait(future)
checkFalse()
}
test("t is null, r is future throw InterruptedException") {
val future = new TestRunnableFuture(throw new InterruptedException())
pool.execute(future)
checkFalse()
}
test("t is null, r is future throw OutOfMemoryError") {
val future = new TestRunnableFuture(throw new OutOfMemoryError())
pool.execute(future)
wait(future)
checkTrue()
}
test("t is null, r is future throw RuntimeException") {
val future = new TestRunnableFuture(throw new RuntimeException)
pool.execute(future)
wait(future)
checkFalse()
}
def wait(future: RunnableFuture[_]): Unit = {
while (!future.isDone) {
Thread.sleep(10)
}
}
def checkFalse(): Unit = {
Thread.sleep(50)
assert(!flag)
}
def checkTrue(): Unit = {
Thread.sleep(50)
assert(flag)
}
}
class TestRunnableFuture[T](f: => T, isDone: Boolean = true) extends RunnableFuture[T] {
override def run(): Unit = {}
override def cancel(mayInterruptIfRunning: Boolean): Boolean = !isDone
override def isCancelled: Boolean = isDone
override def isDone: Boolean = isDone
override def get(): T = f
override def get(timeout: Long, unit: TimeUnit): T = f
}

View File

@ -18,7 +18,7 @@
package org.apache.kyuubi.server
import java.net.{InetAddress, InetSocketAddress}
import java.util.concurrent.{ThreadPoolExecutor, TimeUnit}
import java.util.concurrent.{SynchronousQueue, ThreadPoolExecutor, TimeUnit}
import io.netty.bootstrap.ServerBootstrap
import io.netty.buffer.PooledByteBufAllocator
@ -32,7 +32,7 @@ import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.server.mysql._
import org.apache.kyuubi.server.mysql.authentication.MySQLAuthHandler
import org.apache.kyuubi.service.{AbstractFrontendService, Serverable, Service}
import org.apache.kyuubi.util.ExecutorPoolCaptureOom
import org.apache.kyuubi.util.NamedThreadFactory
import org.apache.kyuubi.util.NettyUtils._
/**
@ -50,18 +50,17 @@ class KyuubiMySQLFrontendService(override val serverable: Serverable)
@volatile protected var isStarted = false
protected def oomHook: Runnable = () => serverable.stop()
override def initialize(conf: KyuubiConf): Unit = synchronized {
val minThreads = conf.get(FRONTEND_MYSQL_MIN_WORKER_THREADS)
val maxThreads = conf.get(FRONTEND_MYSQL_MAX_WORKER_THREADS)
val keepAliveMs = conf.get(FRONTEND_MYSQL_WORKER_KEEPALIVE_TIME)
execPool = ExecutorPoolCaptureOom(
"mysql-exec-pool",
execPool = new ThreadPoolExecutor(
minThreads,
maxThreads,
keepAliveMs,
oomHook)
TimeUnit.MILLISECONDS,
new SynchronousQueue[Runnable](),
new NamedThreadFactory("mysql-exec-pool", false))
serverAddr = conf.get(FRONTEND_MYSQL_BIND_HOST)
.map(InetAddress.getByName)

View File

@ -69,9 +69,5 @@ final class KyuubiTBinaryFrontendService(
resp
}
override protected def oomHook: Runnable = {
() => serverable.stop()
}
override protected def isServer(): Boolean = true
}