1. Set the operation log directory to rootlogdir/operation_logs/username/sessionid.

2. Fix the log-directory deletion bug:

java.io.FileNotFoundException: File does not exist: /home/hadoop/data/apache-spark/spark-2.1.2-bin-2.1.2/operation_logs/hzyaoqin/7c8789b1-5dfc-4eb2-bc9e-243e1ad6446e/f8ef9615-5a13-471d-a091-8ea83f32cd30
	at org.apache.commons.io.FileUtils.forceDelete(FileUtils.java:2275)
	at org.apache.hadoop.hive.ql.session.OperationLog$LogFile.remove(OperationLog.java:163)
	at org.apache.hadoop.hive.ql.session.OperationLog.close(OperationLog.java:121)
	at yaooqinn.kyuubi.operation.KyuubiOperation.cleanupOperationLog(KyuubiOperation.scala:190)
	at yaooqinn.kyuubi.operation.KyuubiOperation.close(KyuubiOperation.scala:199)
	at yaooqinn.kyuubi.operation.OperationManager.closeOperation(OperationManager.scala:131)
	at yaooqinn.kyuubi.session.KyuubiSession$$anonfun$close$1.apply(KyuubiSession.scala:319)
	at yaooqinn.kyuubi.session.KyuubiSession$$anonfun$close$1.apply(KyuubiSession.scala:318)
	at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
	at yaooqinn.kyuubi.session.KyuubiSession.close(KyuubiSession.scala:318)
	at yaooqinn.kyuubi.session.SessionManager.closeSession(SessionManager.scala:264)
	at yaooqinn.kyuubi.server.BackendService.closeSession(BackendService.scala:78)
	at yaooqinn.kyuubi.server.FrontendService$FeTServerEventHandler$$anonfun$deleteContext$1$$anonfun$1.apply$mcV$sp(FrontendService.scala:96)
	at yaooqinn.kyuubi.server.FrontendService$FeTServerEventHandler$$anonfun$deleteContext$1$$anonfun$1.apply(FrontendService.scala:96)
	at yaooqinn.kyuubi.server.FrontendService$FeTServerEventHandler$$anonfun$deleteContext$1$$anonfun$1.apply(FrontendService.scala:96)
	at scala.util.Try$.apply(Try.scala:192)
	at yaooqinn.kyuubi.server.FrontendService$FeTServerEventHandler$$anonfun$deleteContext$1.apply(FrontendService.scala:96)
	at yaooqinn.kyuubi.server.FrontendService$FeTServerEventHandler$$anonfun$deleteContext$1.apply(FrontendService.scala:93)
	at scala.Option.foreach(Option.scala:257)
	at yaooqinn.kyuubi.server.FrontendService$FeTServerEventHandler.deleteContext(FrontendService.scala:93)
	at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:300)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
This commit is contained in:
Kent Yao 2018-01-18 15:43:56 +08:00
parent 1e40804c9a
commit 864542118d
4 changed files with 18 additions and 14 deletions

View File

@ -148,12 +148,12 @@ private[kyuubi] class OperationManager private(name: String)
}
try {
// read logs
val logs = operationLog.readOperationLog(isFetchFirst(orientation), maxRows)
val logs = operationLog.readOperationLog(isFetchFirst(orientation), maxRows).asScala
// convert logs to RowSet
val tableSchema: TableSchema = new TableSchema(getLogSchema)
val rowSet: RowSet =
RowSetFactory.create(tableSchema, getOperation(opHandle).getProtocolVersion)
for (log <- logs.asScala) {
for (log <- logs) {
rowSet.addRow(Array[AnyRef](log))
}
rowSet
@ -163,6 +163,10 @@ private[kyuubi] class OperationManager private(name: String)
}
}
def getResultSetSchema(opHandle: OperationHandle): TableSchema = {
getOperation(opHandle).getResultSetSchema
}
private[this] def isFetchFirst(fetchOrientation: FetchOrientation): Boolean = {
fetchOrientation == FetchOrientation.FETCH_FIRST
}

View File

@ -141,15 +141,15 @@ private[server] class BackendService private(name: String)
}
override def cancelOperation(opHandle: OperationHandle): Unit = {
sessionManager.getOperationMgr.getOperation(opHandle).cancel()
sessionManager.getOperationMgr.getOperation(opHandle).getSession.cancelOperation(opHandle)
}
override def closeOperation(opHandle: OperationHandle): Unit = {
sessionManager.getOperationMgr.getOperation(opHandle).close()
sessionManager.getOperationMgr.getOperation(opHandle).getSession.closeOperation(opHandle)
}
override def getResultSetMetadata(opHandle: OperationHandle): TableSchema = {
sessionManager.getOperationMgr.getOperation(opHandle).getResultSetSchema
sessionManager.getOperationMgr.getOperation(opHandle).getSession.getResultSetMetadata(opHandle)
}
override def fetchResults(opHandle: OperationHandle): RowSet = {

View File

@ -116,7 +116,7 @@ private[kyuubi] class KyuubiSession(
}
private[this] def startSparkContextInNewThread(): Unit = {
new Thread("name") {
new Thread(s"Start-SparkContext-$getUserName") {
override def run(): Unit = {
promisedSparkContext.trySuccess(new SparkContext(conf))
}
@ -316,7 +316,7 @@ private[kyuubi] class KyuubiSession(
try {
// Iterate through the opHandles and close their operations
for (opHandle <- opHandleSet) {
operationManager.closeOperation(opHandle)
closeOperation(opHandle)
}
opHandleSet.clear()
// Cleanup session log directory.
@ -338,8 +338,7 @@ private[kyuubi] class KyuubiSession(
acquire(true)
try {
operationManager.cancelOperation(opHandle)
}
finally {
} finally {
release(true)
}
}
@ -357,7 +356,7 @@ private[kyuubi] class KyuubiSession(
def getResultSetMetadata(opHandle: OperationHandle): TableSchema = {
acquire(true)
try {
operationManager.getOperation(opHandle).getResultSetSchema
operationManager.getResultSetSchema(opHandle)
} finally {
release(true)
}
@ -408,7 +407,8 @@ private[kyuubi] class KyuubiSession(
* @param operationLogRootDir the parent dir of the session dir
*/
def setOperationLogSessionDir(operationLogRootDir: File): Unit = {
sessionLogDir = new File(operationLogRootDir, sessionHandle.getHandleIdentifier.toString)
sessionLogDir = new File(operationLogRootDir,
username + File.separator + sessionHandle.getHandleIdentifier.toString)
_isOperationLogEnabled = true
if (!sessionLogDir.exists) {
if (!sessionLogDir.mkdirs) {

View File

@ -18,12 +18,12 @@
package yaooqinn.kyuubi.session
import java.io.{File, IOException}
import java.util.{Date, Map => JMap}
import java.util.Date
import java.util.concurrent._
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.JavaConverters._
import scala.collection.mutable.HashSet
import scala.collection.mutable.{HashSet => MHSet}
import org.apache.commons.io.FileUtils
import org.apache.hive.service.cli.{HiveSQLException, SessionHandle}
@ -49,7 +49,7 @@ private[kyuubi] class SessionManager private(
private[this] val handleToSessionUser = new ConcurrentHashMap[SessionHandle, String]
private[this] val userToSparkSession =
new ConcurrentHashMap[String, (SparkSession, AtomicInteger)]
private[this] val userSparkContextBeingConstruct = new HashSet[String]()
private[this] val userSparkContextBeingConstruct = new MHSet[String]()
private[this] var execPool: ThreadPoolExecutor = _
private[this] var isOperationLogEnabled = false
private[this] var operationLogRootDir: File = _