diff --git a/.gitignore b/.gitignore
index 9bef5972c..7bc6a7f2b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -64,8 +64,11 @@ metrics/.report.json.crc
/kyuubi-ha/embedded_zookeeper/
embedded_zookeeper/
/externals/kyuubi-spark-sql-engine/operation_logs/
+/externals/kyuubi-spark-sql-engine/engine_operation_logs/
/externals/kyuubi-spark-sql-engine/spark-warehouse/
/work/
/docs/_build/
/kyuubi-common/metrics/
**/operation_logs/
+**/server_operation_logs/
+**/engine_operation_logs/
diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md
index 43050b589..f0f08f179 100644
--- a/docs/deployment/settings.md
+++ b/docs/deployment/settings.md
@@ -180,6 +180,7 @@ kyuubi\.engine
\.deregister\.job\.max
\.failures|
file:/tmp/kyuubi/events
|The location of all the engine events go for the builtin JSON logger.
- Local Path: start with 'file:'
- HDFS Path: start with 'hdfs:'
|string
|1.3.0
kyuubi\.engine\.event
\.loggers||A comma separated list of engine history loggers, where engine/session/operation etc events go.
- SPARK: the events will be written to the spark history events
- JSON: the events will be written to the location of kyuubi.engine.event.json.log.path
- JDBC: to be done
- CUSTOM: to be done.
|seq
|1.3.0
kyuubi\.engine
\.initialize\.sql|SHOW DATABASES
|SemiColon-separated list of SQL statements to be initialized in the newly created engine before queries. This configuration can not be used in JDBC url due to the limitation of Beeline/JDBC driver.
|seq
|1.2.0
+kyuubi\.engine
\.operation\.log\.dir
\.root|engine_operation_logs
|Root directory for query operation log at engine-side.
|string
|1.4.0
kyuubi\.engine\.pool
\.size|-1
|The size of engine pool. Note that, if the size is less than 1, the engine pool will not be enabled; otherwise, the size of the engine pool will be min(this, kyuubi.engine.pool.size.threshold).
|int
|1.4.0
kyuubi\.engine\.pool
\.size\.threshold|9
|This parameter is introduced as a server-side parameter, and controls the upper limit of the engine pool.
|int
|1.4.0
kyuubi\.engine\.session
\.initialize\.sql|SHOW DATABASES
|SemiColon-separated list of SQL statements to be initialized in the newly created engine session before queries. This configuration can not be used in JDBC url due to the limitation of Beeline/JDBC driver.
|seq
|1.3.0
@@ -261,6 +262,7 @@ Key | Default | Meaning | Type | Since
--- | --- | --- | --- | ---
kyuubi\.operation\.idle
\.timeout|PT3H
|Operation will be closed when it's not accessed for this duration of time
|duration
|1.0.0
kyuubi\.operation
\.interrupt\.on\.cancel|true
|When true, all running tasks will be interrupted if one cancels a query. When false, all running tasks will remain until finished.
|boolean
|1.2.0
+kyuubi\.operation\.log
\.dir\.root|server_operation_logs
|Root directory for query operation log at server-side.
|string
|1.4.0
kyuubi\.operation\.plan
\.only\.mode|NONE
|Whether to perform the statement in a PARSE, ANALYZE, OPTIMIZE only way without executing the query. When it is NONE, the statement will be fully executed
|string
|1.4.0
kyuubi\.operation
\.query\.timeout|<undefined>
|Timeout for query executions at server-side, take affect with client-side timeout(`java.sql.Statement.setQueryTimeout`) together, a running query will be cancelled automatically if timeout. It's off by default, which means only client-side take fully control whether the query should timeout or not. If set, client-side timeout capped at this point. To cancel the queries right away without waiting task to finish, consider enabling kyuubi.operation.interrupt.on.cancel together.
|duration
|1.2.0
kyuubi\.operation
\.scheduler\.pool|<undefined>
|The scheduler pool of job. Note that, this config should be used after change Spark config spark.scheduler.mode=FAIR.
|string
|1.1.1
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
index 6e2471f9e..796d87b65 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
@@ -52,8 +52,7 @@ class ExecuteStatement(
private var statementTimeoutCleaner: Option[ScheduledExecutorService] = None
- private val operationLog: OperationLog =
- OperationLog.createOperationLog(session.handle, getHandle)
+ private val operationLog: OperationLog = OperationLog.createOperationLog(session, getHandle)
override def getOperationLog: Option[OperationLog] = Option(operationLog)
private val operationListener: SQLOperationListener = new SQLOperationListener(this, spark)
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala
index 07b8cf938..529f620e0 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala
@@ -37,8 +37,7 @@ class PlanOnlyStatement(
mode: OperationMode)
extends SparkOperation(spark, OperationType.EXECUTE_STATEMENT, session) {
- private val operationLog: OperationLog =
- OperationLog.createOperationLog(session.handle, getHandle)
+ private val operationLog: OperationLog = OperationLog.createOperationLog(session, getHandle)
override def getOperationLog: Option[OperationLog] = Option(operationLog)
override protected def resultSchema: StructType = if (result == null) {
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala
index d91947991..96cdeca4b 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala
@@ -21,6 +21,7 @@ import org.apache.hive.service.rpc.thrift.TProtocolVersion
import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.kyuubi.KyuubiSQLException
+import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.engine.ShareLevel
import org.apache.kyuubi.engine.spark.SparkSQLEngine
@@ -41,6 +42,11 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession)
def this(spark: SparkSession) = this(classOf[SparkSQLSessionManager].getSimpleName, spark)
+ override def initialize(conf: KyuubiConf): Unit = {
+ _operationLogRoot = Some(conf.get(ENGINE_OPERATION_LOG_DIR_ROOT))
+ super.initialize(conf)
+ }
+
val operationManager = new SparkSQLOperationManager()
private lazy val singleSparkSession = conf.get(ENGINE_SINGLE_SPARK_SESSION)
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index 1d583b05f..dd734662f 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -678,6 +678,13 @@ object KyuubiConf {
.checkValue(_ >= 1000, "must >= 1s if set")
.createOptional
+ val SERVER_OPERATION_LOG_DIR_ROOT: ConfigEntry[String] =
+ buildConf("operation.log.dir.root")
+ .doc("Root directory for query operation log at server-side.")
+ .version("1.4.0")
+ .stringConf
+ .createWithDefault("server_operation_logs")
+
@deprecated(s"using kyuubi.engine.share.level instead", "1.2.0")
val LEGACY_ENGINE_SHARE_LEVEL: ConfigEntry[String] = buildConf("session.engine.share.level")
.doc(s"(deprecated) - Using kyuubi.engine.share.level instead")
@@ -885,6 +892,13 @@ object KyuubiConf {
.checkValue(_ > 0, "retained sessions must be positive.")
.createWithDefault(200)
+ val ENGINE_OPERATION_LOG_DIR_ROOT: ConfigEntry[String] =
+ buildConf("engine.operation.log.dir.root")
+ .doc("Root directory for query operation log at engine-side.")
+ .version("1.4.0")
+ .stringConf
+ .createWithDefault("engine_operation_logs")
+
val SESSION_NAME: OptionalConfigEntry[String] =
buildConf("session.name")
.doc("A human readable name of session and we use empty string by default. " +
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/OperationLog.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/OperationLog.scala
index 1e485532a..45a08784d 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/OperationLog.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/OperationLog.scala
@@ -24,17 +24,11 @@ import java.nio.file.{Files, Path, Paths}
import org.apache.hive.service.rpc.thrift.{TColumn, TRow, TRowSet, TStringColumn}
-import org.apache.kyuubi.{KyuubiSQLException, Logging, Utils}
+import org.apache.kyuubi.{KyuubiSQLException, Logging}
import org.apache.kyuubi.operation.OperationHandle
-import org.apache.kyuubi.session.SessionHandle
+import org.apache.kyuubi.session.Session
object OperationLog extends Logging {
-
- def LOG_ROOT: String = if (Utils.isTesting) {
- "target/operation_logs"
- } else {
- "operation_logs"
- }
private final val OPERATION_LOG: InheritableThreadLocal[OperationLog] = {
new InheritableThreadLocal[OperationLog] {
override def initialValue(): OperationLog = null
@@ -50,36 +44,37 @@ object OperationLog extends Logging {
def removeCurrentOperationLog(): Unit = OPERATION_LOG.remove()
/**
- * The operation log root directory, here we choose $PWD/[[LOG_ROOT]]/$sessionId/ as the root per
- * session, this directory will delete when JVM exit.
+ * The operation log root directory, this directory will delete when JVM exit.
*/
- def createOperationLogRootDirectory(sessionHandle: SessionHandle): Unit = {
- val path = Paths.get(LOG_ROOT, sessionHandle.identifier.toString)
- try {
- Files.createDirectories(path)
- path.toFile.deleteOnExit()
- } catch {
- case e: IOException =>
- error(s"Failed to create operation log root directory: $path", e)
+ def createOperationLogRootDirectory(session: Session): Unit = {
+ session.sessionManager.operationLogRoot.foreach { operationLogRoot =>
+ val path = Paths.get(operationLogRoot, session.handle.identifier.toString)
+ try {
+ Files.createDirectories(path)
+ path.toFile.deleteOnExit()
+ } catch {
+ case e: IOException =>
+ error(s"Failed to create operation log root directory: $path", e)
+ }
}
}
/**
- * Create the OperationLog for each operation, the log file will be located at
- * $PWD/[[LOG_ROOT]]/$sessionId/$operationId
- * @return
+ * Create the OperationLog for each operation.
*/
- def createOperationLog(sessionHandle: SessionHandle, opHandle: OperationHandle): OperationLog = {
- try {
- val logPath = Paths.get(LOG_ROOT, sessionHandle.identifier.toString)
- val logFile = Paths.get(logPath.toAbsolutePath.toString, opHandle.identifier.toString)
- info(s"Creating operation log file $logFile")
- new OperationLog(logFile)
- } catch {
- case e: IOException =>
- error(s"Failed to create operation log for $opHandle in $sessionHandle", e)
- null
- }
+ def createOperationLog(session: Session, opHandle: OperationHandle): OperationLog = {
+ session.sessionManager.operationLogRoot.map { operationLogRoot =>
+ try {
+ val logPath = Paths.get(operationLogRoot, session.handle.identifier.toString)
+ val logFile = Paths.get(logPath.toAbsolutePath.toString, opHandle.identifier.toString)
+ info(s"Creating operation log file $logFile")
+ new OperationLog(logFile)
+ } catch {
+ case e: IOException =>
+ error(s"Failed to create operation log for $opHandle in ${session.handle}", e)
+ null
+ }
+ }.getOrElse(null)
}
}
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala
index f558195f0..687c79fbe 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala
@@ -213,7 +213,6 @@ abstract class AbstractSession(
}
override def open(): Unit = {
- OperationLog.createOperationLogRootDirectory(handle)
+ OperationLog.createOperationLogRootDirectory(this)
}
-
}
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala
index d6a79ef9c..81eee2239 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala
@@ -17,6 +17,8 @@
package org.apache.kyuubi.session
+import java.io.IOException
+import java.nio.file.{Files, Paths}
import java.util.concurrent.{ConcurrentHashMap, Future, ThreadPoolExecutor, TimeUnit}
import scala.collection.JavaConverters._
@@ -41,6 +43,23 @@ abstract class SessionManager(name: String) extends CompositeService(name) {
@volatile private var shutdown = false
+ protected var _operationLogRoot: Option[String] = None
+
+ def operationLogRoot: Option[String] = _operationLogRoot
+
+ private def initOperationLogRootDir(): Unit = {
+ try {
+ _operationLogRoot.foreach { logRoot =>
+ val rootPath = Paths.get(logRoot)
+ Files.createDirectories(rootPath)
+ }
+ } catch {
+ case e: IOException =>
+ error(s"Failed to initialize operation log root directory: ${_operationLogRoot}", e)
+ _operationLogRoot = None
+ }
+ }
+
@volatile private var _latestLogoutTime: Long = System.currentTimeMillis()
def latestLogoutTime: Long = _latestLogoutTime
@@ -139,6 +158,7 @@ abstract class SessionManager(name: String) extends CompositeService(name) {
override def initialize(conf: KyuubiConf): Unit = synchronized {
addService(operationManager)
+ initOperationLogRootDir()
val poolSize: Int = if (isServer) {
conf.get(SERVER_EXEC_POOL_SIZE)
diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/config/KyuubiConfSuite.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/config/KyuubiConfSuite.scala
index 9e859c827..944316db2 100644
--- a/kyuubi-common/src/test/scala/org/apache/kyuubi/config/KyuubiConfSuite.scala
+++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/config/KyuubiConfSuite.scala
@@ -46,7 +46,7 @@ class KyuubiConfSuite extends KyuubiFunSuite {
}
test("set and unset conf") {
- val conf = new KyuubiConf()
+ val conf = new KyuubiConf(false)
val key = "kyuubi.conf.abc"
conf.set(key, "opq")
diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/log/OperationLogSuite.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/log/OperationLogSuite.scala
index 2251a2a31..5870153f4 100644
--- a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/log/OperationLogSuite.scala
+++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/log/OperationLogSuite.scala
@@ -23,27 +23,38 @@ import scala.collection.JavaConverters._
import org.apache.hive.service.rpc.thrift.TProtocolVersion
-import org.apache.kyuubi.{KyuubiFunSuite, KyuubiSQLException}
+import org.apache.kyuubi.{KyuubiFunSuite, KyuubiSQLException, Utils}
+import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.operation.{OperationHandle, OperationType}
-import org.apache.kyuubi.session.SessionHandle
+import org.apache.kyuubi.session.NoopSessionManager
class OperationLogSuite extends KyuubiFunSuite {
val msg1 = "This is just a dummy log message 1"
val msg2 = "This is just a dummy log message 2"
- test("create, delete, read and write to operation log") {
- val sHandle = SessionHandle(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10)
+ test("create, delete, read and write to server operation log") {
+ val sessionManager = new NoopSessionManager
+ sessionManager.initialize(KyuubiConf())
+ val sHandle = sessionManager.openSession(
+ TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10,
+ "kyuubi",
+ "passwd",
+ "localhost",
+ Map.empty)
+ val session = sessionManager.getSession(sHandle)
val oHandle = OperationHandle(
OperationType.EXECUTE_STATEMENT, TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10)
+ assert(sessionManager.operationLogRoot.isDefined)
+ val operationLogRoot = sessionManager.operationLogRoot.get
- OperationLog.createOperationLogRootDirectory(sHandle)
- assert(Files.exists(Paths.get(OperationLog.LOG_ROOT, sHandle.identifier.toString)))
- assert(Files.isDirectory(Paths.get(OperationLog.LOG_ROOT, sHandle.identifier.toString)))
+ OperationLog.createOperationLogRootDirectory(session)
+ assert(Files.exists(Paths.get(operationLogRoot, sHandle.identifier.toString)))
+ assert(Files.isDirectory(Paths.get(operationLogRoot, sHandle.identifier.toString)))
- val operationLog = OperationLog.createOperationLog(sHandle, oHandle)
- val logFile =
- Paths.get(OperationLog.LOG_ROOT, sHandle.identifier.toString, oHandle.identifier.toString)
+ val operationLog = OperationLog.createOperationLog(session, oHandle)
+ val logFile = Paths.get(operationLogRoot, sHandle.identifier.toString,
+ oHandle.identifier.toString)
assert(Files.exists(logFile))
OperationLog.setCurrentOperationLog(operationLog)
@@ -69,12 +80,20 @@ class OperationLogSuite extends KyuubiFunSuite {
}
test("log divert appender") {
- val sHandle = SessionHandle(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10)
+ val sessionManager = new NoopSessionManager
+ sessionManager.initialize(KyuubiConf())
+ val sHandle = sessionManager.openSession(
+ TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10,
+ "kyuubi",
+ "passwd",
+ "localhost",
+ Map.empty)
+ val session = sessionManager.getSession(sHandle)
val oHandle = OperationHandle(
OperationType.EXECUTE_STATEMENT, TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10)
- OperationLog.createOperationLogRootDirectory(sHandle)
- val operationLog = OperationLog.createOperationLog(sHandle, oHandle)
+ OperationLog.createOperationLogRootDirectory(session)
+ val operationLog = OperationLog.createOperationLog(session, oHandle)
OperationLog.setCurrentOperationLog(operationLog)
LogDivertAppender.initialize()
@@ -102,25 +121,50 @@ class OperationLogSuite extends KyuubiFunSuite {
}
test("exception when creating log files") {
- val sHandle = SessionHandle(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10)
- val logRoot = Paths.get(OperationLog.LOG_ROOT, sHandle.identifier.toString).toFile
+ val sessionManager = new NoopSessionManager
+ sessionManager.initialize(KyuubiConf())
+ val sHandle = sessionManager.openSession(
+ TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10,
+ "kyuubi",
+ "passwd",
+ "localhost",
+ Map.empty)
+ val session = sessionManager.getSession(sHandle)
+ assert(sessionManager.operationLogRoot.isDefined)
+ val operationLogRoot = sessionManager.operationLogRoot.get
+
+ val logRoot = Paths.get(operationLogRoot, sHandle.identifier.toString).toFile
logRoot.deleteOnExit()
- Files.createFile(Paths.get(OperationLog.LOG_ROOT, sHandle.identifier.toString))
+ Files.createFile(Paths.get(operationLogRoot, sHandle.identifier.toString))
assert(logRoot.exists())
- OperationLog.createOperationLogRootDirectory(sHandle)
+ OperationLog.createOperationLogRootDirectory(session)
assert(logRoot.isFile)
val oHandle = OperationHandle(
OperationType.EXECUTE_STATEMENT, TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10)
- val log = OperationLog.createOperationLog(sHandle, oHandle)
+ val log = OperationLog.createOperationLog(session, oHandle)
assert(log === null)
logRoot.delete()
- OperationLog.createOperationLogRootDirectory(sHandle)
- val log1 = OperationLog.createOperationLog(sHandle, oHandle)
+ OperationLog.createOperationLogRootDirectory(session)
+ val log1 = OperationLog.createOperationLog(session, oHandle)
log1.write("some msg here \n")
log1.close()
log1.write("some msg here again")
val e = intercept[KyuubiSQLException](log1.read(-1))
assert(e.getMessage.contains(s"${sHandle.identifier}/${oHandle.identifier}"))
}
+
+ test("test fail to init operation log root dir") {
+ val sessionManager = new NoopSessionManager
+ val tempDir = Utils.createTempDir().toFile
+ tempDir.setExecutable(false)
+
+ sessionManager.setOperationLogRootDir(tempDir.getAbsolutePath + "/operation_logs")
+ assert(sessionManager.operationLogRoot.isDefined)
+ sessionManager.initialize(KyuubiConf())
+ assert(sessionManager.operationLogRoot.isEmpty)
+
+ tempDir.setExecutable(true)
+ tempDir.delete()
+ }
}
diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/session/NoopSessionManager.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/session/NoopSessionManager.scala
index 11f10fc91..5d14c045d 100644
--- a/kyuubi-common/src/test/scala/org/apache/kyuubi/session/NoopSessionManager.scala
+++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/session/NoopSessionManager.scala
@@ -20,11 +20,21 @@ package org.apache.kyuubi.session
import org.apache.hive.service.rpc.thrift.TProtocolVersion
import org.apache.kyuubi.KyuubiSQLException
+import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.operation.{NoopOperationManager, OperationManager}
class NoopSessionManager extends SessionManager("noop") {
override val operationManager: OperationManager = new NoopOperationManager()
+ def setOperationLogRootDir(logRoot: String): Unit = {
+ _operationLogRoot = Some(logRoot)
+ }
+
+ override def initialize(conf: KyuubiConf): Unit = {
+ _operationLogRoot = _operationLogRoot.orElse(Some("target/operation_logs"))
+ super.initialize(conf)
+ }
+
override def openSession(
protocol: TProtocolVersion,
user: String,
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala
index 1f731bc87..40a072fab 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala
@@ -45,7 +45,7 @@ class ExecuteStatement(
KyuubiStatementEvent(this, statementId, state, lastAccessTime)
private final val _operationLog: OperationLog = if (shouldRunAsync) {
- OperationLog.createOperationLog(session.handle, getHandle)
+ OperationLog.createOperationLog(session, getHandle)
} else {
null
}
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
index 8bbb23b6b..1c31b6432 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
@@ -22,6 +22,7 @@ import org.apache.hive.service.rpc.thrift.TProtocolVersion
import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.credentials.HadoopCredentialsManager
import org.apache.kyuubi.metrics.MetricsConstants._
import org.apache.kyuubi.metrics.MetricsSystem
@@ -36,6 +37,7 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
override def initialize(conf: KyuubiConf): Unit = {
addService(credentialsManager)
+ _operationLogRoot = Some(conf.get(SERVER_OPERATION_LOG_DIR_ROOT))
super.initialize(conf)
}
diff --git a/pom.xml b/pom.xml
index 64c0c59e4..b921c8fc2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1523,6 +1523,8 @@
${project.build.directory}/metrics
localhost
false
+ target/server_operation_logs
+ target/engine_operation_logs
${maven.plugin.scalatest.exclude.tags}
${maven.plugin.scalatest.include.tags}
@@ -1742,6 +1744,8 @@
build/apache-maven-*/**
build/scala-*/**
**/**/operation_logs/**/**
+ **/**/server_operation_logs/**/**
+ **/**/engine_operation_logs/**/**
**/*.output.schema
**/apache-kyuubi-*-bin*/**
**/benchmarks/**