diff --git a/src/main/scala/yaooqinn/kyuubi/operation/OperationManager.scala b/src/main/scala/yaooqinn/kyuubi/operation/OperationManager.scala index 030082b53..b236dbe73 100644 --- a/src/main/scala/yaooqinn/kyuubi/operation/OperationManager.scala +++ b/src/main/scala/yaooqinn/kyuubi/operation/OperationManager.scala @@ -148,12 +148,12 @@ private[kyuubi] class OperationManager private(name: String) } try { // read logs - val logs = operationLog.readOperationLog(isFetchFirst(orientation), maxRows) + val logs = operationLog.readOperationLog(isFetchFirst(orientation), maxRows).asScala // convert logs to RowSet val tableSchema: TableSchema = new TableSchema(getLogSchema) val rowSet: RowSet = RowSetFactory.create(tableSchema, getOperation(opHandle).getProtocolVersion) - for (log <- logs.asScala) { + for (log <- logs) { rowSet.addRow(Array[AnyRef](log)) } rowSet @@ -163,6 +163,10 @@ private[kyuubi] class OperationManager private(name: String) } } + def getResultSetSchema(opHandle: OperationHandle): TableSchema = { + getOperation(opHandle).getResultSetSchema + } + private[this] def isFetchFirst(fetchOrientation: FetchOrientation): Boolean = { fetchOrientation == FetchOrientation.FETCH_FIRST } diff --git a/src/main/scala/yaooqinn/kyuubi/server/BackendService.scala b/src/main/scala/yaooqinn/kyuubi/server/BackendService.scala index 6be6ed8e6..d47ef1dd0 100644 --- a/src/main/scala/yaooqinn/kyuubi/server/BackendService.scala +++ b/src/main/scala/yaooqinn/kyuubi/server/BackendService.scala @@ -141,15 +141,15 @@ private[server] class BackendService private(name: String) } override def cancelOperation(opHandle: OperationHandle): Unit = { - sessionManager.getOperationMgr.getOperation(opHandle).cancel() + sessionManager.getOperationMgr.getOperation(opHandle).getSession.cancelOperation(opHandle) } override def closeOperation(opHandle: OperationHandle): Unit = { - sessionManager.getOperationMgr.getOperation(opHandle).close() + sessionManager.getOperationMgr.getOperation(opHandle).getSession.closeOperation(opHandle) } override def getResultSetMetadata(opHandle: OperationHandle): TableSchema = { - sessionManager.getOperationMgr.getOperation(opHandle).getResultSetSchema + sessionManager.getOperationMgr.getOperation(opHandle).getSession.getResultSetMetadata(opHandle) } override def fetchResults(opHandle: OperationHandle): RowSet = { diff --git a/src/main/scala/yaooqinn/kyuubi/session/KyuubiSession.scala b/src/main/scala/yaooqinn/kyuubi/session/KyuubiSession.scala index 2e3cb7b84..3f5b331bf 100644 --- a/src/main/scala/yaooqinn/kyuubi/session/KyuubiSession.scala +++ b/src/main/scala/yaooqinn/kyuubi/session/KyuubiSession.scala @@ -116,7 +116,7 @@ private[kyuubi] class KyuubiSession( } private[this] def startSparkContextInNewThread(): Unit = { - new Thread("name") { + new Thread(s"Start-SparkContext-$getUserName") { override def run(): Unit = { promisedSparkContext.trySuccess(new SparkContext(conf)) } @@ -316,7 +316,7 @@ private[kyuubi] class KyuubiSession( try { // Iterate through the opHandles and close their operations for (opHandle <- opHandleSet) { - operationManager.closeOperation(opHandle) + closeOperation(opHandle) } opHandleSet.clear() // Cleanup session log directory. @@ -338,8 +338,7 @@ private[kyuubi] class KyuubiSession( acquire(true) try { operationManager.cancelOperation(opHandle) - } - finally { + } finally { release(true) } } @@ -357,7 +356,7 @@ private[kyuubi] class KyuubiSession( def getResultSetMetadata(opHandle: OperationHandle): TableSchema = { acquire(true) try { - operationManager.getOperation(opHandle).getResultSetSchema + operationManager.getResultSetSchema(opHandle) } finally { release(true) } @@ -408,7 +407,8 @@ private[kyuubi] class KyuubiSession( * @param operationLogRootDir the parent dir of the session dir */ def setOperationLogSessionDir(operationLogRootDir: File): Unit = { - sessionLogDir = new File(operationLogRootDir, sessionHandle.getHandleIdentifier.toString) + sessionLogDir = new File(operationLogRootDir, + username + File.separator + sessionHandle.getHandleIdentifier.toString) _isOperationLogEnabled = true if (!sessionLogDir.exists) { if (!sessionLogDir.mkdirs) { diff --git a/src/main/scala/yaooqinn/kyuubi/session/SessionManager.scala b/src/main/scala/yaooqinn/kyuubi/session/SessionManager.scala index 9046b217e..6e7be3c25 100644 --- a/src/main/scala/yaooqinn/kyuubi/session/SessionManager.scala +++ b/src/main/scala/yaooqinn/kyuubi/session/SessionManager.scala @@ -18,12 +18,12 @@ package yaooqinn.kyuubi.session import java.io.{File, IOException} -import java.util.{Date, Map => JMap} +import java.util.Date import java.util.concurrent._ import java.util.concurrent.atomic.AtomicInteger import scala.collection.JavaConverters._ -import scala.collection.mutable.HashSet +import scala.collection.mutable.{HashSet => MHSet} import org.apache.commons.io.FileUtils import org.apache.hive.service.cli.{HiveSQLException, SessionHandle} @@ -49,7 +49,7 @@ private[kyuubi] class SessionManager private( private[this] val handleToSessionUser = new ConcurrentHashMap[SessionHandle, String] private[this] val userToSparkSession = new ConcurrentHashMap[String, (SparkSession, AtomicInteger)] - private[this] val userSparkContextBeingConstruct = new HashSet[String]() + private[this] val userSparkContextBeingConstruct = new MHSet[String]() private[this] var execPool: ThreadPoolExecutor = _ private[this] var isOperationLogEnabled = false private[this] var operationLogRootDir: File = _