[KYUUBI #1034] Engine may deadlock when close operationLog
<!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html 2. If the PR is related to an issue in https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'. 3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'. --> ### _Why are the changes needed?_ <!-- Please clarify why the changes are needed. For instance, 1. If you add a feature, you can talk about the use case of it. 2. If you fix a bug, you can clarify why it is a bug. --> ### _How was this patch tested?_ - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [ ] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before make a pull request Closes #1062 from qiuliang988/1034. Closes #1034 41d510f5 [qiuliang] add comments dbd7c770 [qiuliang] [KYUUBI #1034]Engine may deadlock when close operationLog Authored-by: qiuliang <qlcumt@163.com> Signed-off-by: Cheng Pan <chengpan@apache.org>
This commit is contained in:
parent
ba2880cd66
commit
d83674248e
@ -17,6 +17,7 @@
|
|||||||
|
|
||||||
package org.apache.kyuubi.engine.spark.operation
|
package org.apache.kyuubi.engine.spark.operation
|
||||||
|
|
||||||
|
import java.io.IOException
|
||||||
import java.time.ZoneId
|
import java.time.ZoneId
|
||||||
|
|
||||||
import org.apache.commons.lang3.StringUtils
|
import org.apache.commons.lang3.StringUtils
|
||||||
@ -122,7 +123,12 @@ abstract class SparkOperation(spark: SparkSession, opType: OperationType, sessio
|
|||||||
|
|
||||||
override def close(): Unit = {
|
override def close(): Unit = {
|
||||||
cleanup(OperationState.CLOSED)
|
cleanup(OperationState.CLOSED)
|
||||||
getOperationLog.foreach(_.close())
|
try {
|
||||||
|
getOperationLog.foreach(_.close())
|
||||||
|
} catch {
|
||||||
|
case e: IOException =>
|
||||||
|
error(e.getMessage, e)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
override def getResultSetSchema: TTableSchema = SchemaHelper.toTTableSchema(resultSchema)
|
override def getResultSetSchema: TTableSchema = SchemaHelper.toTTableSchema(resultSchema)
|
||||||
|
|||||||
@ -83,7 +83,7 @@ object OperationLog extends Logging {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
class OperationLog(path: Path) extends Logging {
|
class OperationLog(path: Path) {
|
||||||
|
|
||||||
private val writer = Files.newBufferedWriter(path, StandardCharsets.UTF_8)
|
private val writer = Files.newBufferedWriter(path, StandardCharsets.UTF_8)
|
||||||
private val reader = Files.newBufferedReader(path, StandardCharsets.UTF_8)
|
private val reader = Files.newBufferedReader(path, StandardCharsets.UTF_8)
|
||||||
@ -134,7 +134,12 @@ class OperationLog(path: Path) extends Logging {
|
|||||||
Files.delete(path)
|
Files.delete(path)
|
||||||
} catch {
|
} catch {
|
||||||
case e: IOException =>
|
case e: IOException =>
|
||||||
error(s"Failed to remove corresponding log file of operation: ${path.toAbsolutePath}", e)
|
// Printing log here may cause a deadlock. The lock order of OperationLog.write
|
||||||
|
// is RootLogger -> LogDivertAppender -> OperationLog. If printing log here, the
|
||||||
|
// lock order is OperationLog -> RootLogger. So the exception is thrown and
|
||||||
|
// processing at the invocations
|
||||||
|
throw new IOException(
|
||||||
|
s"Failed to remove corresponding log file of operation: ${path.toAbsolutePath}", e)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -17,6 +17,8 @@
|
|||||||
|
|
||||||
package org.apache.kyuubi.operation
|
package org.apache.kyuubi.operation
|
||||||
|
|
||||||
|
import java.io.IOException
|
||||||
|
|
||||||
import com.codahale.metrics.MetricRegistry
|
import com.codahale.metrics.MetricRegistry
|
||||||
import org.apache.commons.lang3.StringUtils
|
import org.apache.commons.lang3.StringUtils
|
||||||
import org.apache.hive.service.rpc.thrift._
|
import org.apache.hive.service.rpc.thrift._
|
||||||
@ -97,6 +99,11 @@ abstract class KyuubiOperation(
|
|||||||
if (_remoteOpHandle != null && !isClosedOrCanceled) {
|
if (_remoteOpHandle != null && !isClosedOrCanceled) {
|
||||||
try {
|
try {
|
||||||
getOperationLog.foreach(_.close())
|
getOperationLog.foreach(_.close())
|
||||||
|
} catch {
|
||||||
|
case e: IOException =>
|
||||||
|
error(e.getMessage, e)
|
||||||
|
}
|
||||||
|
try {
|
||||||
client.closeOperation(_remoteOpHandle)
|
client.closeOperation(_remoteOpHandle)
|
||||||
setState(OperationState.CLOSED)
|
setState(OperationState.CLOSED)
|
||||||
} catch onError("closing")
|
} catch onError("closing")
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user