From 864542118dc087d5b32c39eec63e3cab69748c84 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Thu, 18 Jan 2018 15:43:56 +0800 Subject: [PATCH] 1. set operation log to rootlogdir/operation_logs/username/sessionid 2. fix delete log dir bug: java.io.FileNotFoundException: File does not exist: /home/hadoop/data/apache-spark/spark-2.1.2-bin-2.1.2/operation_logs/hzyaoqin/7c8789b1-5dfc-4eb2-bc9e-243e1ad6446e/f8ef9615-5a13-471d-a091-8ea83f32cd30 at org.apache.commons.io.FileUtils.forceDelete(FileUtils.java:2275) at org.apache.hadoop.hive.ql.session.OperationLog$LogFile.remove(OperationLog.java:163) at org.apache.hadoop.hive.ql.session.OperationLog.close(OperationLog.java:121) at yaooqinn.kyuubi.operation.KyuubiOperation.cleanupOperationLog(KyuubiOperation.scala:190) at yaooqinn.kyuubi.operation.KyuubiOperation.close(KyuubiOperation.scala:199) at yaooqinn.kyuubi.operation.OperationManager.closeOperation(OperationManager.scala:131) at yaooqinn.kyuubi.session.KyuubiSession$$anonfun$close$1.apply(KyuubiSession.scala:319) at yaooqinn.kyuubi.session.KyuubiSession$$anonfun$close$1.apply(KyuubiSession.scala:318) at scala.collection.mutable.HashSet.foreach(HashSet.scala:78) at yaooqinn.kyuubi.session.KyuubiSession.close(KyuubiSession.scala:318) at yaooqinn.kyuubi.session.SessionManager.closeSession(SessionManager.scala:264) at yaooqinn.kyuubi.server.BackendService.closeSession(BackendService.scala:78) at yaooqinn.kyuubi.server.FrontendService$FeTServerEventHandler$$anonfun$deleteContext$1$$anonfun$1.apply$mcV$sp(FrontendService.scala:96) at yaooqinn.kyuubi.server.FrontendService$FeTServerEventHandler$$anonfun$deleteContext$1$$anonfun$1.apply(FrontendService.scala:96) at yaooqinn.kyuubi.server.FrontendService$FeTServerEventHandler$$anonfun$deleteContext$1$$anonfun$1.apply(FrontendService.scala:96) at scala.util.Try$.apply(Try.scala:192) at yaooqinn.kyuubi.server.FrontendService$FeTServerEventHandler$$anonfun$deleteContext$1.apply(FrontendService.scala:96) at yaooqinn.kyuubi.server.FrontendService$FeTServerEventHandler$$anonfun$deleteContext$1.apply(FrontendService.scala:93) at scala.Option.foreach(Option.scala:257) at yaooqinn.kyuubi.server.FrontendService$FeTServerEventHandler.deleteContext(FrontendService.scala:93) at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:300) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) --- .../yaooqinn/kyuubi/operation/OperationManager.scala | 8 ++++++-- .../yaooqinn/kyuubi/server/BackendService.scala | 6 +++--- .../yaooqinn/kyuubi/session/KyuubiSession.scala | 12 ++++++------ .../yaooqinn/kyuubi/session/SessionManager.scala | 6 +++--- 4 files changed, 18 insertions(+), 14 deletions(-) diff --git a/src/main/scala/yaooqinn/kyuubi/operation/OperationManager.scala b/src/main/scala/yaooqinn/kyuubi/operation/OperationManager.scala index 030082b53..b236dbe73 100644 --- a/src/main/scala/yaooqinn/kyuubi/operation/OperationManager.scala +++ b/src/main/scala/yaooqinn/kyuubi/operation/OperationManager.scala @@ -148,12 +148,12 @@ private[kyuubi] class OperationManager private(name: String) } try { // read logs - val logs = operationLog.readOperationLog(isFetchFirst(orientation), maxRows) + val logs = operationLog.readOperationLog(isFetchFirst(orientation), maxRows).asScala // convert logs to RowSet val tableSchema: TableSchema = new TableSchema(getLogSchema) val rowSet: RowSet = RowSetFactory.create(tableSchema, getOperation(opHandle).getProtocolVersion) - for (log <- logs.asScala) { + for (log <- logs) { rowSet.addRow(Array[AnyRef](log)) } rowSet @@ -163,6 +163,10 @@ private[kyuubi] class OperationManager private(name: String) } } + def getResultSetSchema(opHandle: OperationHandle): TableSchema = { + getOperation(opHandle).getResultSetSchema + } + private[this] def isFetchFirst(fetchOrientation: FetchOrientation): Boolean = { fetchOrientation == FetchOrientation.FETCH_FIRST } diff --git a/src/main/scala/yaooqinn/kyuubi/server/BackendService.scala b/src/main/scala/yaooqinn/kyuubi/server/BackendService.scala index 6be6ed8e6..d47ef1dd0 100644 --- a/src/main/scala/yaooqinn/kyuubi/server/BackendService.scala +++ b/src/main/scala/yaooqinn/kyuubi/server/BackendService.scala @@ -141,15 +141,15 @@ private[server] class BackendService private(name: String) } override def cancelOperation(opHandle: OperationHandle): Unit = { - sessionManager.getOperationMgr.getOperation(opHandle).cancel() + sessionManager.getOperationMgr.getOperation(opHandle).getSession.cancelOperation(opHandle) } override def closeOperation(opHandle: OperationHandle): Unit = { - sessionManager.getOperationMgr.getOperation(opHandle).close() + sessionManager.getOperationMgr.getOperation(opHandle).getSession.closeOperation(opHandle) } override def getResultSetMetadata(opHandle: OperationHandle): TableSchema = { - sessionManager.getOperationMgr.getOperation(opHandle).getResultSetSchema + sessionManager.getOperationMgr.getOperation(opHandle).getSession.getResultSetMetadata(opHandle) } override def fetchResults(opHandle: OperationHandle): RowSet = { diff --git a/src/main/scala/yaooqinn/kyuubi/session/KyuubiSession.scala b/src/main/scala/yaooqinn/kyuubi/session/KyuubiSession.scala index 2e3cb7b84..3f5b331bf 100644 --- a/src/main/scala/yaooqinn/kyuubi/session/KyuubiSession.scala +++ b/src/main/scala/yaooqinn/kyuubi/session/KyuubiSession.scala @@ -116,7 +116,7 @@ private[kyuubi] class KyuubiSession( } private[this] def startSparkContextInNewThread(): Unit = { - new Thread("name") { + new Thread(s"Start-SparkContext-$getUserName") { override def run(): Unit = { promisedSparkContext.trySuccess(new SparkContext(conf)) } @@ -316,7 +316,7 @@ private[kyuubi] class KyuubiSession( try { // Iterate through the opHandles and close their operations for (opHandle <- opHandleSet) { - operationManager.closeOperation(opHandle) + closeOperation(opHandle) } opHandleSet.clear() // Cleanup session log directory. @@ -338,8 +338,7 @@ private[kyuubi] class KyuubiSession( acquire(true) try { operationManager.cancelOperation(opHandle) - } - finally { + } finally { release(true) } } @@ -357,7 +356,7 @@ private[kyuubi] class KyuubiSession( def getResultSetMetadata(opHandle: OperationHandle): TableSchema = { acquire(true) try { - operationManager.getOperation(opHandle).getResultSetSchema + operationManager.getResultSetSchema(opHandle) } finally { release(true) } @@ -408,7 +407,8 @@ private[kyuubi] class KyuubiSession( * @param operationLogRootDir the parent dir of the session dir */ def setOperationLogSessionDir(operationLogRootDir: File): Unit = { - sessionLogDir = new File(operationLogRootDir, sessionHandle.getHandleIdentifier.toString) + sessionLogDir = new File(operationLogRootDir, + username + File.separator + sessionHandle.getHandleIdentifier.toString) _isOperationLogEnabled = true if (!sessionLogDir.exists) { if (!sessionLogDir.mkdirs) { diff --git a/src/main/scala/yaooqinn/kyuubi/session/SessionManager.scala b/src/main/scala/yaooqinn/kyuubi/session/SessionManager.scala index 9046b217e..6e7be3c25 100644 --- a/src/main/scala/yaooqinn/kyuubi/session/SessionManager.scala +++ b/src/main/scala/yaooqinn/kyuubi/session/SessionManager.scala @@ -18,12 +18,12 @@ package yaooqinn.kyuubi.session import java.io.{File, IOException} -import java.util.{Date, Map => JMap} +import java.util.Date import java.util.concurrent._ import java.util.concurrent.atomic.AtomicInteger import scala.collection.JavaConverters._ -import scala.collection.mutable.HashSet +import scala.collection.mutable.{HashSet => MHSet} import org.apache.commons.io.FileUtils import org.apache.hive.service.cli.{HiveSQLException, SessionHandle} @@ -49,7 +49,7 @@ private[kyuubi] class SessionManager private( private[this] val handleToSessionUser = new ConcurrentHashMap[SessionHandle, String] private[this] val userToSparkSession = new ConcurrentHashMap[String, (SparkSession, AtomicInteger)] - private[this] val userSparkContextBeingConstruct = new HashSet[String]() + private[this] val userSparkContextBeingConstruct = new MHSet[String]() private[this] var execPool: ThreadPoolExecutor = _ private[this] var isOperationLogEnabled = false private[this] var operationLogRootDir: File = _