From 4f74aba50e683fc7d58e793f92ff6db75e09f7b8 Mon Sep 17 00:00:00 2001 From: fwang12 Date: Wed, 29 Sep 2021 00:43:23 +0800 Subject: [PATCH] [KYUUBI #1160] Support to config operation log root dir both for kyuubi server and engine sides ### _Why are the changes needed?_ Support to config operation log root dir with kyuubi configuration both for server and engine sides. ### _How was this patch tested?_ - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [x] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before make a pull request Closes #1160 from turboFei/support_config_operationlog_dir. Closes #1160 ed424f46 [fwang12] fix ut 5edfd4ae [fwang12] refactor ut b7026b2f [fwang12] add ut 6ff4d891 [fwang12] save 30cd5952 [fwang12] fix ut f058619a [fwang12] fix ut 96eb020d [fwang12] refactor ae0ded96 [fwang12] set config value for testing 83d932f4 [fwang12] exclude 33e3e214 [fwang12] update 6f030cba [fwang12] fix 77ec9ca2 [fwang12] save 711ececf [fwang12] Fix code style cfa3bfac [fwang12] refactor b299f8cf [fwang12] complete ut 26de3712 [fwang12] Support to config operation log root dir for kyuubi server and engine side Authored-by: fwang12 Signed-off-by: fwang12 --- .gitignore | 3 + docs/deployment/settings.md | 2 + .../spark/operation/ExecuteStatement.scala | 3 +- .../spark/operation/PlanOnlyStatement.scala | 3 +- .../session/SparkSQLSessionManager.scala | 6 ++ .../org/apache/kyuubi/config/KyuubiConf.scala | 14 ++++ .../kyuubi/operation/log/OperationLog.scala | 59 ++++++------- .../kyuubi/session/AbstractSession.scala | 3 +- .../kyuubi/session/SessionManager.scala | 20 +++++ .../kyuubi/config/KyuubiConfSuite.scala | 2 +- .../operation/log/OperationLogSuite.scala | 84 ++++++++++++++----- .../kyuubi/session/NoopSessionManager.scala | 10 +++ .../kyuubi/operation/ExecuteStatement.scala | 2 +- .../kyuubi/session/KyuubiSessionManager.scala | 2 + pom.xml | 4 + 15 files changed, 157 insertions(+), 60 deletions(-) diff --git a/.gitignore b/.gitignore index 9bef5972c..7bc6a7f2b 100644 --- a/.gitignore +++ b/.gitignore @@ -64,8 +64,11 @@ metrics/.report.json.crc /kyuubi-ha/embedded_zookeeper/ embedded_zookeeper/ /externals/kyuubi-spark-sql-engine/operation_logs/ +/externals/kyuubi-spark-sql-engine/engine_operation_logs/ /externals/kyuubi-spark-sql-engine/spark-warehouse/ /work/ /docs/_build/ /kyuubi-common/metrics/ **/operation_logs/ +**/server_operation_logs/ +**/engine_operation_logs/ diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md index 43050b589..f0f08f179 100644 --- a/docs/deployment/settings.md +++ b/docs/deployment/settings.md @@ -180,6 +180,7 @@ kyuubi\.engine
\.deregister\.job\.max
\.failures|
file:/tmp/kyuubi/events
|
The location of all the engine events go for the builtin JSON logger.
  • Local Path: start with 'file:'
  • HDFS Path: start with 'hdfs:'
|
string
|
1.3.0
kyuubi\.engine\.event
\.loggers|
|
A comma separated list of engine history loggers, where engine/session/operation etc events go.
  • SPARK: the events will be written to the spark history events
  • JSON: the events will be written to the location of kyuubi.engine.event.json.log.path
  • JDBC: to be done
  • CUSTOM: to be done.
|
seq
|
1.3.0
kyuubi\.engine
\.initialize\.sql|
SHOW DATABASES
|
SemiColon-separated list of SQL statements to be initialized in the newly created engine before queries. This configuration can not be used in JDBC url due to the limitation of Beeline/JDBC driver.
|
seq
|
1.2.0
+kyuubi\.engine
\.operation\.log\.dir
\.root|
engine_operation_logs
|
Root directory for query operation log at engine-side.
|
string
|
1.4.0
kyuubi\.engine\.pool
\.size|
-1
|
The size of engine pool. Note that, if the size is less than 1, the engine pool will not be enabled; otherwise, the size of the engine pool will be min(this, kyuubi.engine.pool.size.threshold).
|
int
|
1.4.0
kyuubi\.engine\.pool
\.size\.threshold|
9
|
This parameter is introduced as a server-side parameter, and controls the upper limit of the engine pool.
|
int
|
1.4.0
kyuubi\.engine\.session
\.initialize\.sql|
SHOW DATABASES
|
SemiColon-separated list of SQL statements to be initialized in the newly created engine session before queries. This configuration can not be used in JDBC url due to the limitation of Beeline/JDBC driver.
|
seq
|
1.3.0
@@ -261,6 +262,7 @@ Key | Default | Meaning | Type | Since --- | --- | --- | --- | --- kyuubi\.operation\.idle
\.timeout|
PT3H
|
Operation will be closed when it's not accessed for this duration of time
|
duration
|
1.0.0
kyuubi\.operation
\.interrupt\.on\.cancel|
true
|
When true, all running tasks will be interrupted if one cancels a query. When false, all running tasks will remain until finished.
|
boolean
|
1.2.0
+kyuubi\.operation\.log
\.dir\.root|
server_operation_logs
|
Root directory for query operation log at server-side.
|
string
|
1.4.0
kyuubi\.operation\.plan
\.only\.mode|
NONE
|
Whether to perform the statement in a PARSE, ANALYZE, OPTIMIZE only way without executing the query. When it is NONE, the statement will be fully executed
|
string
|
1.4.0
kyuubi\.operation
\.query\.timeout|
<undefined>
|
Timeout for query executions at server-side, take affect with client-side timeout(`java.sql.Statement.setQueryTimeout`) together, a running query will be cancelled automatically if timeout. It's off by default, which means only client-side take fully control whether the query should timeout or not. If set, client-side timeout capped at this point. To cancel the queries right away without waiting task to finish, consider enabling kyuubi.operation.interrupt.on.cancel together.
|
duration
|
1.2.0
kyuubi\.operation
\.scheduler\.pool|
<undefined>
|
The scheduler pool of job. Note that, this config should be used after change Spark config spark.scheduler.mode=FAIR.
|
string
|
1.1.1
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala index 6e2471f9e..796d87b65 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala @@ -52,8 +52,7 @@ class ExecuteStatement( private var statementTimeoutCleaner: Option[ScheduledExecutorService] = None - private val operationLog: OperationLog = - OperationLog.createOperationLog(session.handle, getHandle) + private val operationLog: OperationLog = OperationLog.createOperationLog(session, getHandle) override def getOperationLog: Option[OperationLog] = Option(operationLog) private val operationListener: SQLOperationListener = new SQLOperationListener(this, spark) diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala index 07b8cf938..529f620e0 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala @@ -37,8 +37,7 @@ class PlanOnlyStatement( mode: OperationMode) extends SparkOperation(spark, OperationType.EXECUTE_STATEMENT, session) { - private val operationLog: OperationLog = - OperationLog.createOperationLog(session.handle, getHandle) + private val operationLog: OperationLog = OperationLog.createOperationLog(session, getHandle) override def getOperationLog: Option[OperationLog] = Option(operationLog) override protected def resultSchema: StructType = if (result == null) { diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala index d91947991..96cdeca4b 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala @@ -21,6 +21,7 @@ import org.apache.hive.service.rpc.thrift.TProtocolVersion import org.apache.spark.sql.{AnalysisException, SparkSession} import org.apache.kyuubi.KyuubiSQLException +import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.config.KyuubiConf._ import org.apache.kyuubi.engine.ShareLevel import org.apache.kyuubi.engine.spark.SparkSQLEngine @@ -41,6 +42,11 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession) def this(spark: SparkSession) = this(classOf[SparkSQLSessionManager].getSimpleName, spark) + override def initialize(conf: KyuubiConf): Unit = { + _operationLogRoot = Some(conf.get(ENGINE_OPERATION_LOG_DIR_ROOT)) + super.initialize(conf) + } + val operationManager = new SparkSQLOperationManager() private lazy val singleSparkSession = conf.get(ENGINE_SINGLE_SPARK_SESSION) diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala index 1d583b05f..dd734662f 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala @@ -678,6 +678,13 @@ object KyuubiConf { .checkValue(_ >= 1000, "must >= 1s if set") .createOptional + val SERVER_OPERATION_LOG_DIR_ROOT: ConfigEntry[String] = + buildConf("operation.log.dir.root") + .doc("Root directory for query operation log at server-side.") + .version("1.4.0") + .stringConf + .createWithDefault("server_operation_logs") + @deprecated(s"using kyuubi.engine.share.level instead", "1.2.0") val LEGACY_ENGINE_SHARE_LEVEL: ConfigEntry[String] = buildConf("session.engine.share.level") .doc(s"(deprecated) - Using kyuubi.engine.share.level instead") @@ -885,6 +892,13 @@ object KyuubiConf { .checkValue(_ > 0, "retained sessions must be positive.") .createWithDefault(200) + val ENGINE_OPERATION_LOG_DIR_ROOT: ConfigEntry[String] = + buildConf("engine.operation.log.dir.root") + .doc("Root directory for query operation log at engine-side.") + .version("1.4.0") + .stringConf + .createWithDefault("engine_operation_logs") + val SESSION_NAME: OptionalConfigEntry[String] = buildConf("session.name") .doc("A human readable name of session and we use empty string by default. " + diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/OperationLog.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/OperationLog.scala index 1e485532a..45a08784d 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/OperationLog.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/OperationLog.scala @@ -24,17 +24,11 @@ import java.nio.file.{Files, Path, Paths} import org.apache.hive.service.rpc.thrift.{TColumn, TRow, TRowSet, TStringColumn} -import org.apache.kyuubi.{KyuubiSQLException, Logging, Utils} +import org.apache.kyuubi.{KyuubiSQLException, Logging} import org.apache.kyuubi.operation.OperationHandle -import org.apache.kyuubi.session.SessionHandle +import org.apache.kyuubi.session.Session object OperationLog extends Logging { - - def LOG_ROOT: String = if (Utils.isTesting) { - "target/operation_logs" - } else { - "operation_logs" - } private final val OPERATION_LOG: InheritableThreadLocal[OperationLog] = { new InheritableThreadLocal[OperationLog] { override def initialValue(): OperationLog = null @@ -50,36 +44,37 @@ object OperationLog extends Logging { def removeCurrentOperationLog(): Unit = OPERATION_LOG.remove() /** - * The operation log root directory, here we choose $PWD/[[LOG_ROOT]]/$sessionId/ as the root per - * session, this directory will delete when JVM exit. + * The operation log root directory, this directory will delete when JVM exit. */ - def createOperationLogRootDirectory(sessionHandle: SessionHandle): Unit = { - val path = Paths.get(LOG_ROOT, sessionHandle.identifier.toString) - try { - Files.createDirectories(path) - path.toFile.deleteOnExit() - } catch { - case e: IOException => - error(s"Failed to create operation log root directory: $path", e) + def createOperationLogRootDirectory(session: Session): Unit = { + session.sessionManager.operationLogRoot.foreach { operationLogRoot => + val path = Paths.get(operationLogRoot, session.handle.identifier.toString) + try { + Files.createDirectories(path) + path.toFile.deleteOnExit() + } catch { + case e: IOException => + error(s"Failed to create operation log root directory: $path", e) + } } } /** - * Create the OperationLog for each operation, the log file will be located at - * $PWD/[[LOG_ROOT]]/$sessionId/$operationId - * @return + * Create the OperationLog for each operation. */ - def createOperationLog(sessionHandle: SessionHandle, opHandle: OperationHandle): OperationLog = { - try { - val logPath = Paths.get(LOG_ROOT, sessionHandle.identifier.toString) - val logFile = Paths.get(logPath.toAbsolutePath.toString, opHandle.identifier.toString) - info(s"Creating operation log file $logFile") - new OperationLog(logFile) - } catch { - case e: IOException => - error(s"Failed to create operation log for $opHandle in $sessionHandle", e) - null - } + def createOperationLog(session: Session, opHandle: OperationHandle): OperationLog = { + session.sessionManager.operationLogRoot.map { operationLogRoot => + try { + val logPath = Paths.get(operationLogRoot, session.handle.identifier.toString) + val logFile = Paths.get(logPath.toAbsolutePath.toString, opHandle.identifier.toString) + info(s"Creating operation log file $logFile") + new OperationLog(logFile) + } catch { + case e: IOException => + error(s"Failed to create operation log for $opHandle in ${session.handle}", e) + null + } + }.getOrElse(null) } } diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala index f558195f0..687c79fbe 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala @@ -213,7 +213,6 @@ abstract class AbstractSession( } override def open(): Unit = { - OperationLog.createOperationLogRootDirectory(handle) + OperationLog.createOperationLogRootDirectory(this) } - } diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala index d6a79ef9c..81eee2239 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala @@ -17,6 +17,8 @@ package org.apache.kyuubi.session +import java.io.IOException +import java.nio.file.{Files, Paths} import java.util.concurrent.{ConcurrentHashMap, Future, ThreadPoolExecutor, TimeUnit} import scala.collection.JavaConverters._ @@ -41,6 +43,23 @@ abstract class SessionManager(name: String) extends CompositeService(name) { @volatile private var shutdown = false + protected var _operationLogRoot: Option[String] = None + + def operationLogRoot: Option[String] = _operationLogRoot + + private def initOperationLogRootDir(): Unit = { + try { + _operationLogRoot.foreach { logRoot => + val rootPath = Paths.get(logRoot) + Files.createDirectories(rootPath) + } + } catch { + case e: IOException => + error(s"Failed to initialize operation log root directory: ${_operationLogRoot}", e) + _operationLogRoot = None + } + } + @volatile private var _latestLogoutTime: Long = System.currentTimeMillis() def latestLogoutTime: Long = _latestLogoutTime @@ -139,6 +158,7 @@ abstract class SessionManager(name: String) extends CompositeService(name) { override def initialize(conf: KyuubiConf): Unit = synchronized { addService(operationManager) + initOperationLogRootDir() val poolSize: Int = if (isServer) { conf.get(SERVER_EXEC_POOL_SIZE) diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/config/KyuubiConfSuite.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/config/KyuubiConfSuite.scala index 9e859c827..944316db2 100644 --- a/kyuubi-common/src/test/scala/org/apache/kyuubi/config/KyuubiConfSuite.scala +++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/config/KyuubiConfSuite.scala @@ -46,7 +46,7 @@ class KyuubiConfSuite extends KyuubiFunSuite { } test("set and unset conf") { - val conf = new KyuubiConf() + val conf = new KyuubiConf(false) val key = "kyuubi.conf.abc" conf.set(key, "opq") diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/log/OperationLogSuite.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/log/OperationLogSuite.scala index 2251a2a31..5870153f4 100644 --- a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/log/OperationLogSuite.scala +++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/log/OperationLogSuite.scala @@ -23,27 +23,38 @@ import scala.collection.JavaConverters._ import org.apache.hive.service.rpc.thrift.TProtocolVersion -import org.apache.kyuubi.{KyuubiFunSuite, KyuubiSQLException} +import org.apache.kyuubi.{KyuubiFunSuite, KyuubiSQLException, Utils} +import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.operation.{OperationHandle, OperationType} -import org.apache.kyuubi.session.SessionHandle +import org.apache.kyuubi.session.NoopSessionManager class OperationLogSuite extends KyuubiFunSuite { val msg1 = "This is just a dummy log message 1" val msg2 = "This is just a dummy log message 2" - test("create, delete, read and write to operation log") { - val sHandle = SessionHandle(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10) + test("create, delete, read and write to server operation log") { + val sessionManager = new NoopSessionManager + sessionManager.initialize(KyuubiConf()) + val sHandle = sessionManager.openSession( + TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10, + "kyuubi", + "passwd", + "localhost", + Map.empty) + val session = sessionManager.getSession(sHandle) val oHandle = OperationHandle( OperationType.EXECUTE_STATEMENT, TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10) + assert(sessionManager.operationLogRoot.isDefined) + val operationLogRoot = sessionManager.operationLogRoot.get - OperationLog.createOperationLogRootDirectory(sHandle) - assert(Files.exists(Paths.get(OperationLog.LOG_ROOT, sHandle.identifier.toString))) - assert(Files.isDirectory(Paths.get(OperationLog.LOG_ROOT, sHandle.identifier.toString))) + OperationLog.createOperationLogRootDirectory(session) + assert(Files.exists(Paths.get(operationLogRoot, sHandle.identifier.toString))) + assert(Files.isDirectory(Paths.get(operationLogRoot, sHandle.identifier.toString))) - val operationLog = OperationLog.createOperationLog(sHandle, oHandle) - val logFile = - Paths.get(OperationLog.LOG_ROOT, sHandle.identifier.toString, oHandle.identifier.toString) + val operationLog = OperationLog.createOperationLog(session, oHandle) + val logFile = Paths.get(operationLogRoot, sHandle.identifier.toString, + oHandle.identifier.toString) assert(Files.exists(logFile)) OperationLog.setCurrentOperationLog(operationLog) @@ -69,12 +80,20 @@ class OperationLogSuite extends KyuubiFunSuite { } test("log divert appender") { - val sHandle = SessionHandle(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10) + val sessionManager = new NoopSessionManager + sessionManager.initialize(KyuubiConf()) + val sHandle = sessionManager.openSession( + TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10, + "kyuubi", + "passwd", + "localhost", + Map.empty) + val session = sessionManager.getSession(sHandle) val oHandle = OperationHandle( OperationType.EXECUTE_STATEMENT, TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10) - OperationLog.createOperationLogRootDirectory(sHandle) - val operationLog = OperationLog.createOperationLog(sHandle, oHandle) + OperationLog.createOperationLogRootDirectory(session) + val operationLog = OperationLog.createOperationLog(session, oHandle) OperationLog.setCurrentOperationLog(operationLog) LogDivertAppender.initialize() @@ -102,25 +121,50 @@ class OperationLogSuite extends KyuubiFunSuite { } test("exception when creating log files") { - val sHandle = SessionHandle(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10) - val logRoot = Paths.get(OperationLog.LOG_ROOT, sHandle.identifier.toString).toFile + val sessionManager = new NoopSessionManager + sessionManager.initialize(KyuubiConf()) + val sHandle = sessionManager.openSession( + TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10, + "kyuubi", + "passwd", + "localhost", + Map.empty) + val session = sessionManager.getSession(sHandle) + assert(sessionManager.operationLogRoot.isDefined) + val operationLogRoot = sessionManager.operationLogRoot.get + + val logRoot = Paths.get(operationLogRoot, sHandle.identifier.toString).toFile logRoot.deleteOnExit() - Files.createFile(Paths.get(OperationLog.LOG_ROOT, sHandle.identifier.toString)) + Files.createFile(Paths.get(operationLogRoot, sHandle.identifier.toString)) assert(logRoot.exists()) - OperationLog.createOperationLogRootDirectory(sHandle) + OperationLog.createOperationLogRootDirectory(session) assert(logRoot.isFile) val oHandle = OperationHandle( OperationType.EXECUTE_STATEMENT, TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10) - val log = OperationLog.createOperationLog(sHandle, oHandle) + val log = OperationLog.createOperationLog(session, oHandle) assert(log === null) logRoot.delete() - OperationLog.createOperationLogRootDirectory(sHandle) - val log1 = OperationLog.createOperationLog(sHandle, oHandle) + OperationLog.createOperationLogRootDirectory(session) + val log1 = OperationLog.createOperationLog(session, oHandle) log1.write("some msg here \n") log1.close() log1.write("some msg here again") val e = intercept[KyuubiSQLException](log1.read(-1)) assert(e.getMessage.contains(s"${sHandle.identifier}/${oHandle.identifier}")) } + + test("test fail to init operation log root dir") { + val sessionManager = new NoopSessionManager + val tempDir = Utils.createTempDir().toFile + tempDir.setExecutable(false) + + sessionManager.setOperationLogRootDir(tempDir.getAbsolutePath + "/operation_logs") + assert(sessionManager.operationLogRoot.isDefined) + sessionManager.initialize(KyuubiConf()) + assert(sessionManager.operationLogRoot.isEmpty) + + tempDir.setExecutable(true) + tempDir.delete() + } } diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/session/NoopSessionManager.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/session/NoopSessionManager.scala index 11f10fc91..5d14c045d 100644 --- a/kyuubi-common/src/test/scala/org/apache/kyuubi/session/NoopSessionManager.scala +++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/session/NoopSessionManager.scala @@ -20,11 +20,21 @@ package org.apache.kyuubi.session import org.apache.hive.service.rpc.thrift.TProtocolVersion import org.apache.kyuubi.KyuubiSQLException +import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.operation.{NoopOperationManager, OperationManager} class NoopSessionManager extends SessionManager("noop") { override val operationManager: OperationManager = new NoopOperationManager() + def setOperationLogRootDir(logRoot: String): Unit = { + _operationLogRoot = Some(logRoot) + } + + override def initialize(conf: KyuubiConf): Unit = { + _operationLogRoot = _operationLogRoot.orElse(Some("target/operation_logs")) + super.initialize(conf) + } + override def openSession( protocol: TProtocolVersion, user: String, diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala index 1f731bc87..40a072fab 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala @@ -45,7 +45,7 @@ class ExecuteStatement( KyuubiStatementEvent(this, statementId, state, lastAccessTime) private final val _operationLog: OperationLog = if (shouldRunAsync) { - OperationLog.createOperationLog(session.handle, getHandle) + OperationLog.createOperationLog(session, getHandle) } else { null } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala index 8bbb23b6b..1c31b6432 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala @@ -22,6 +22,7 @@ import org.apache.hive.service.rpc.thrift.TProtocolVersion import org.apache.kyuubi.KyuubiSQLException import org.apache.kyuubi.config.KyuubiConf +import org.apache.kyuubi.config.KyuubiConf._ import org.apache.kyuubi.credentials.HadoopCredentialsManager import org.apache.kyuubi.metrics.MetricsConstants._ import org.apache.kyuubi.metrics.MetricsSystem @@ -36,6 +37,7 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) { override def initialize(conf: KyuubiConf): Unit = { addService(credentialsManager) + _operationLogRoot = Some(conf.get(SERVER_OPERATION_LOG_DIR_ROOT)) super.initialize(conf) } diff --git a/pom.xml b/pom.xml index 64c0c59e4..b921c8fc2 100644 --- a/pom.xml +++ b/pom.xml @@ -1523,6 +1523,8 @@ ${project.build.directory}/metrics localhost false + target/server_operation_logs + target/engine_operation_logs ${maven.plugin.scalatest.exclude.tags} ${maven.plugin.scalatest.include.tags} @@ -1742,6 +1744,8 @@ build/apache-maven-*/** build/scala-*/** **/**/operation_logs/**/** + **/**/server_operation_logs/**/** + **/**/engine_operation_logs/**/** **/*.output.schema **/apache-kyuubi-*-bin*/** **/benchmarks/**