[KYUUBI #1160] Support to config operation log root dir both for kyuubi server and engine sides
<!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html 2. If the PR is related to an issue in https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'. 3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'. --> ### _Why are the changes needed?_ <!-- Please clarify why the changes are needed. For instance, 1. If you add a feature, you can talk about the use case of it. 2. If you fix a bug, you can clarify why it is a bug. --> Support to config operation log root dir with kyuubi configuration both for server and engine sides. ### _How was this patch tested?_ - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [x] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before make a pull request Closes #1160 from turboFei/support_config_operationlog_dir. Closes #1160 ed424f46 [fwang12] fix ut 5edfd4ae [fwang12] refactor ut b7026b2f [fwang12] add ut 6ff4d891 [fwang12] save 30cd5952 [fwang12] fix ut f058619a [fwang12] fix ut 96eb020d [fwang12] refactor ae0ded96 [fwang12] set config value for testing 83d932f4 [fwang12] exclude 33e3e214 [fwang12] update 6f030cba [fwang12] fix 77ec9ca2 [fwang12] save 711ececf [fwang12] Fix code style cfa3bfac [fwang12] refactor b299f8cf [fwang12] complete ut 26de3712 [fwang12] Support to config operation log root dir for kyuubi server and engine side Authored-by: fwang12 <fwang12@ebay.com> Signed-off-by: fwang12 <fwang12@ebay.com>
This commit is contained in:
parent
be0b9c1e69
commit
4f74aba50e
3
.gitignore
vendored
3
.gitignore
vendored
@ -64,8 +64,11 @@ metrics/.report.json.crc
|
||||
/kyuubi-ha/embedded_zookeeper/
|
||||
embedded_zookeeper/
|
||||
/externals/kyuubi-spark-sql-engine/operation_logs/
|
||||
/externals/kyuubi-spark-sql-engine/engine_operation_logs/
|
||||
/externals/kyuubi-spark-sql-engine/spark-warehouse/
|
||||
/work/
|
||||
/docs/_build/
|
||||
/kyuubi-common/metrics/
|
||||
**/operation_logs/
|
||||
**/server_operation_logs/
|
||||
**/engine_operation_logs/
|
||||
|
||||
@ -180,6 +180,7 @@ kyuubi\.engine<br>\.deregister\.job\.max<br>\.failures|<div style='width: 65pt;w
|
||||
kyuubi\.engine\.event<br>\.json\.log\.path|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>file:/tmp/kyuubi/events</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>The location of all the engine events go for the builtin JSON logger.<ul><li>Local Path: start with 'file:'</li><li>HDFS Path: start with 'hdfs:'</li></ul></div>|<div style='width: 30pt'>string</div>|<div style='width: 20pt'>1.3.0</div>
|
||||
kyuubi\.engine\.event<br>\.loggers|<div style='width: 65pt;word-wrap: break-word;white-space: normal'></div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>A comma separated list of engine history loggers, where engine/session/operation etc events go.<ul> <li>SPARK: the events will be written to the spark history events</li> <li>JSON: the events will be written to the location of kyuubi.engine.event.json.log.path</li> <li>JDBC: to be done</li> <li>CUSTOM: to be done.</li></ul></div>|<div style='width: 30pt'>seq</div>|<div style='width: 20pt'>1.3.0</div>
|
||||
kyuubi\.engine<br>\.initialize\.sql|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>SHOW DATABASES</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>SemiColon-separated list of SQL statements to be initialized in the newly created engine before queries. This configuration can not be used in JDBC url due to the limitation of Beeline/JDBC driver.</div>|<div style='width: 30pt'>seq</div>|<div style='width: 20pt'>1.2.0</div>
|
||||
kyuubi\.engine<br>\.operation\.log\.dir<br>\.root|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>engine_operation_logs</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>Root directory for query operation log at engine-side.</div>|<div style='width: 30pt'>string</div>|<div style='width: 20pt'>1.4.0</div>
|
||||
kyuubi\.engine\.pool<br>\.size|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>-1</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>The size of engine pool. Note that, if the size is less than 1, the engine pool will not be enabled; otherwise, the size of the engine pool will be min(this, kyuubi.engine.pool.size.threshold).</div>|<div style='width: 30pt'>int</div>|<div style='width: 20pt'>1.4.0</div>
|
||||
kyuubi\.engine\.pool<br>\.size\.threshold|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>9</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>This parameter is introduced as a server-side parameter, and controls the upper limit of the engine pool.</div>|<div style='width: 30pt'>int</div>|<div style='width: 20pt'>1.4.0</div>
|
||||
kyuubi\.engine\.session<br>\.initialize\.sql|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>SHOW DATABASES</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>SemiColon-separated list of SQL statements to be initialized in the newly created engine session before queries. This configuration can not be used in JDBC url due to the limitation of Beeline/JDBC driver.</div>|<div style='width: 30pt'>seq</div>|<div style='width: 20pt'>1.3.0</div>
|
||||
@ -261,6 +262,7 @@ Key | Default | Meaning | Type | Since
|
||||
--- | --- | --- | --- | ---
|
||||
kyuubi\.operation\.idle<br>\.timeout|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>PT3H</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>Operation will be closed when it's not accessed for this duration of time</div>|<div style='width: 30pt'>duration</div>|<div style='width: 20pt'>1.0.0</div>
|
||||
kyuubi\.operation<br>\.interrupt\.on\.cancel|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>true</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>When true, all running tasks will be interrupted if one cancels a query. When false, all running tasks will remain until finished.</div>|<div style='width: 30pt'>boolean</div>|<div style='width: 20pt'>1.2.0</div>
|
||||
kyuubi\.operation\.log<br>\.dir\.root|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>server_operation_logs</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>Root directory for query operation log at server-side.</div>|<div style='width: 30pt'>string</div>|<div style='width: 20pt'>1.4.0</div>
|
||||
kyuubi\.operation\.plan<br>\.only\.mode|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>NONE</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>Whether to perform the statement in a PARSE, ANALYZE, OPTIMIZE only way without executing the query. When it is NONE, the statement will be fully executed</div>|<div style='width: 30pt'>string</div>|<div style='width: 20pt'>1.4.0</div>
|
||||
kyuubi\.operation<br>\.query\.timeout|<div style='width: 65pt;word-wrap: break-word;white-space: normal'><undefined></div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>Timeout for query executions at server-side, take affect with client-side timeout(`java.sql.Statement.setQueryTimeout`) together, a running query will be cancelled automatically if timeout. It's off by default, which means only client-side take fully control whether the query should timeout or not. If set, client-side timeout capped at this point. To cancel the queries right away without waiting task to finish, consider enabling kyuubi.operation.interrupt.on.cancel together.</div>|<div style='width: 30pt'>duration</div>|<div style='width: 20pt'>1.2.0</div>
|
||||
kyuubi\.operation<br>\.scheduler\.pool|<div style='width: 65pt;word-wrap: break-word;white-space: normal'><undefined></div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>The scheduler pool of job. Note that, this config should be used after change Spark config spark.scheduler.mode=FAIR.</div>|<div style='width: 30pt'>string</div>|<div style='width: 20pt'>1.1.1</div>
|
||||
|
||||
@ -52,8 +52,7 @@ class ExecuteStatement(
|
||||
|
||||
private var statementTimeoutCleaner: Option[ScheduledExecutorService] = None
|
||||
|
||||
private val operationLog: OperationLog =
|
||||
OperationLog.createOperationLog(session.handle, getHandle)
|
||||
private val operationLog: OperationLog = OperationLog.createOperationLog(session, getHandle)
|
||||
override def getOperationLog: Option[OperationLog] = Option(operationLog)
|
||||
|
||||
private val operationListener: SQLOperationListener = new SQLOperationListener(this, spark)
|
||||
|
||||
@ -37,8 +37,7 @@ class PlanOnlyStatement(
|
||||
mode: OperationMode)
|
||||
extends SparkOperation(spark, OperationType.EXECUTE_STATEMENT, session) {
|
||||
|
||||
private val operationLog: OperationLog =
|
||||
OperationLog.createOperationLog(session.handle, getHandle)
|
||||
private val operationLog: OperationLog = OperationLog.createOperationLog(session, getHandle)
|
||||
override def getOperationLog: Option[OperationLog] = Option(operationLog)
|
||||
|
||||
override protected def resultSchema: StructType = if (result == null) {
|
||||
|
||||
@ -21,6 +21,7 @@ import org.apache.hive.service.rpc.thrift.TProtocolVersion
|
||||
import org.apache.spark.sql.{AnalysisException, SparkSession}
|
||||
|
||||
import org.apache.kyuubi.KyuubiSQLException
|
||||
import org.apache.kyuubi.config.KyuubiConf
|
||||
import org.apache.kyuubi.config.KyuubiConf._
|
||||
import org.apache.kyuubi.engine.ShareLevel
|
||||
import org.apache.kyuubi.engine.spark.SparkSQLEngine
|
||||
@ -41,6 +42,11 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession)
|
||||
|
||||
def this(spark: SparkSession) = this(classOf[SparkSQLSessionManager].getSimpleName, spark)
|
||||
|
||||
override def initialize(conf: KyuubiConf): Unit = {
|
||||
_operationLogRoot = Some(conf.get(ENGINE_OPERATION_LOG_DIR_ROOT))
|
||||
super.initialize(conf)
|
||||
}
|
||||
|
||||
val operationManager = new SparkSQLOperationManager()
|
||||
|
||||
private lazy val singleSparkSession = conf.get(ENGINE_SINGLE_SPARK_SESSION)
|
||||
|
||||
@ -678,6 +678,13 @@ object KyuubiConf {
|
||||
.checkValue(_ >= 1000, "must >= 1s if set")
|
||||
.createOptional
|
||||
|
||||
val SERVER_OPERATION_LOG_DIR_ROOT: ConfigEntry[String] =
|
||||
buildConf("operation.log.dir.root")
|
||||
.doc("Root directory for query operation log at server-side.")
|
||||
.version("1.4.0")
|
||||
.stringConf
|
||||
.createWithDefault("server_operation_logs")
|
||||
|
||||
@deprecated(s"using kyuubi.engine.share.level instead", "1.2.0")
|
||||
val LEGACY_ENGINE_SHARE_LEVEL: ConfigEntry[String] = buildConf("session.engine.share.level")
|
||||
.doc(s"(deprecated) - Using kyuubi.engine.share.level instead")
|
||||
@ -885,6 +892,13 @@ object KyuubiConf {
|
||||
.checkValue(_ > 0, "retained sessions must be positive.")
|
||||
.createWithDefault(200)
|
||||
|
||||
val ENGINE_OPERATION_LOG_DIR_ROOT: ConfigEntry[String] =
|
||||
buildConf("engine.operation.log.dir.root")
|
||||
.doc("Root directory for query operation log at engine-side.")
|
||||
.version("1.4.0")
|
||||
.stringConf
|
||||
.createWithDefault("engine_operation_logs")
|
||||
|
||||
val SESSION_NAME: OptionalConfigEntry[String] =
|
||||
buildConf("session.name")
|
||||
.doc("A human readable name of session and we use empty string by default. " +
|
||||
|
||||
@ -24,17 +24,11 @@ import java.nio.file.{Files, Path, Paths}
|
||||
|
||||
import org.apache.hive.service.rpc.thrift.{TColumn, TRow, TRowSet, TStringColumn}
|
||||
|
||||
import org.apache.kyuubi.{KyuubiSQLException, Logging, Utils}
|
||||
import org.apache.kyuubi.{KyuubiSQLException, Logging}
|
||||
import org.apache.kyuubi.operation.OperationHandle
|
||||
import org.apache.kyuubi.session.SessionHandle
|
||||
import org.apache.kyuubi.session.Session
|
||||
|
||||
object OperationLog extends Logging {
|
||||
|
||||
def LOG_ROOT: String = if (Utils.isTesting) {
|
||||
"target/operation_logs"
|
||||
} else {
|
||||
"operation_logs"
|
||||
}
|
||||
private final val OPERATION_LOG: InheritableThreadLocal[OperationLog] = {
|
||||
new InheritableThreadLocal[OperationLog] {
|
||||
override def initialValue(): OperationLog = null
|
||||
@ -50,36 +44,37 @@ object OperationLog extends Logging {
|
||||
def removeCurrentOperationLog(): Unit = OPERATION_LOG.remove()
|
||||
|
||||
/**
|
||||
* The operation log root directory, here we choose $PWD/[[LOG_ROOT]]/$sessionId/ as the root per
|
||||
* session, this directory will delete when JVM exit.
|
||||
* The operation log root directory, this directory will delete when JVM exit.
|
||||
*/
|
||||
def createOperationLogRootDirectory(sessionHandle: SessionHandle): Unit = {
|
||||
val path = Paths.get(LOG_ROOT, sessionHandle.identifier.toString)
|
||||
try {
|
||||
Files.createDirectories(path)
|
||||
path.toFile.deleteOnExit()
|
||||
} catch {
|
||||
case e: IOException =>
|
||||
error(s"Failed to create operation log root directory: $path", e)
|
||||
def createOperationLogRootDirectory(session: Session): Unit = {
|
||||
session.sessionManager.operationLogRoot.foreach { operationLogRoot =>
|
||||
val path = Paths.get(operationLogRoot, session.handle.identifier.toString)
|
||||
try {
|
||||
Files.createDirectories(path)
|
||||
path.toFile.deleteOnExit()
|
||||
} catch {
|
||||
case e: IOException =>
|
||||
error(s"Failed to create operation log root directory: $path", e)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Create the OperationLog for each operation, the log file will be located at
|
||||
* $PWD/[[LOG_ROOT]]/$sessionId/$operationId
|
||||
* @return
|
||||
* Create the OperationLog for each operation.
|
||||
*/
|
||||
def createOperationLog(sessionHandle: SessionHandle, opHandle: OperationHandle): OperationLog = {
|
||||
try {
|
||||
val logPath = Paths.get(LOG_ROOT, sessionHandle.identifier.toString)
|
||||
val logFile = Paths.get(logPath.toAbsolutePath.toString, opHandle.identifier.toString)
|
||||
info(s"Creating operation log file $logFile")
|
||||
new OperationLog(logFile)
|
||||
} catch {
|
||||
case e: IOException =>
|
||||
error(s"Failed to create operation log for $opHandle in $sessionHandle", e)
|
||||
null
|
||||
}
|
||||
def createOperationLog(session: Session, opHandle: OperationHandle): OperationLog = {
|
||||
session.sessionManager.operationLogRoot.map { operationLogRoot =>
|
||||
try {
|
||||
val logPath = Paths.get(operationLogRoot, session.handle.identifier.toString)
|
||||
val logFile = Paths.get(logPath.toAbsolutePath.toString, opHandle.identifier.toString)
|
||||
info(s"Creating operation log file $logFile")
|
||||
new OperationLog(logFile)
|
||||
} catch {
|
||||
case e: IOException =>
|
||||
error(s"Failed to create operation log for $opHandle in ${session.handle}", e)
|
||||
null
|
||||
}
|
||||
}.getOrElse(null)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -213,7 +213,6 @@ abstract class AbstractSession(
|
||||
}
|
||||
|
||||
override def open(): Unit = {
|
||||
OperationLog.createOperationLogRootDirectory(handle)
|
||||
OperationLog.createOperationLogRootDirectory(this)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -17,6 +17,8 @@
|
||||
|
||||
package org.apache.kyuubi.session
|
||||
|
||||
import java.io.IOException
|
||||
import java.nio.file.{Files, Paths}
|
||||
import java.util.concurrent.{ConcurrentHashMap, Future, ThreadPoolExecutor, TimeUnit}
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
@ -41,6 +43,23 @@ abstract class SessionManager(name: String) extends CompositeService(name) {
|
||||
|
||||
@volatile private var shutdown = false
|
||||
|
||||
protected var _operationLogRoot: Option[String] = None
|
||||
|
||||
def operationLogRoot: Option[String] = _operationLogRoot
|
||||
|
||||
private def initOperationLogRootDir(): Unit = {
|
||||
try {
|
||||
_operationLogRoot.foreach { logRoot =>
|
||||
val rootPath = Paths.get(logRoot)
|
||||
Files.createDirectories(rootPath)
|
||||
}
|
||||
} catch {
|
||||
case e: IOException =>
|
||||
error(s"Failed to initialize operation log root directory: ${_operationLogRoot}", e)
|
||||
_operationLogRoot = None
|
||||
}
|
||||
}
|
||||
|
||||
@volatile private var _latestLogoutTime: Long = System.currentTimeMillis()
|
||||
def latestLogoutTime: Long = _latestLogoutTime
|
||||
|
||||
@ -139,6 +158,7 @@ abstract class SessionManager(name: String) extends CompositeService(name) {
|
||||
|
||||
override def initialize(conf: KyuubiConf): Unit = synchronized {
|
||||
addService(operationManager)
|
||||
initOperationLogRootDir()
|
||||
|
||||
val poolSize: Int = if (isServer) {
|
||||
conf.get(SERVER_EXEC_POOL_SIZE)
|
||||
|
||||
@ -46,7 +46,7 @@ class KyuubiConfSuite extends KyuubiFunSuite {
|
||||
}
|
||||
|
||||
test("set and unset conf") {
|
||||
val conf = new KyuubiConf()
|
||||
val conf = new KyuubiConf(false)
|
||||
|
||||
val key = "kyuubi.conf.abc"
|
||||
conf.set(key, "opq")
|
||||
|
||||
@ -23,27 +23,38 @@ import scala.collection.JavaConverters._
|
||||
|
||||
import org.apache.hive.service.rpc.thrift.TProtocolVersion
|
||||
|
||||
import org.apache.kyuubi.{KyuubiFunSuite, KyuubiSQLException}
|
||||
import org.apache.kyuubi.{KyuubiFunSuite, KyuubiSQLException, Utils}
|
||||
import org.apache.kyuubi.config.KyuubiConf
|
||||
import org.apache.kyuubi.operation.{OperationHandle, OperationType}
|
||||
import org.apache.kyuubi.session.SessionHandle
|
||||
import org.apache.kyuubi.session.NoopSessionManager
|
||||
|
||||
class OperationLogSuite extends KyuubiFunSuite {
|
||||
|
||||
val msg1 = "This is just a dummy log message 1"
|
||||
val msg2 = "This is just a dummy log message 2"
|
||||
|
||||
test("create, delete, read and write to operation log") {
|
||||
val sHandle = SessionHandle(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10)
|
||||
test("create, delete, read and write to server operation log") {
|
||||
val sessionManager = new NoopSessionManager
|
||||
sessionManager.initialize(KyuubiConf())
|
||||
val sHandle = sessionManager.openSession(
|
||||
TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10,
|
||||
"kyuubi",
|
||||
"passwd",
|
||||
"localhost",
|
||||
Map.empty)
|
||||
val session = sessionManager.getSession(sHandle)
|
||||
val oHandle = OperationHandle(
|
||||
OperationType.EXECUTE_STATEMENT, TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10)
|
||||
assert(sessionManager.operationLogRoot.isDefined)
|
||||
val operationLogRoot = sessionManager.operationLogRoot.get
|
||||
|
||||
OperationLog.createOperationLogRootDirectory(sHandle)
|
||||
assert(Files.exists(Paths.get(OperationLog.LOG_ROOT, sHandle.identifier.toString)))
|
||||
assert(Files.isDirectory(Paths.get(OperationLog.LOG_ROOT, sHandle.identifier.toString)))
|
||||
OperationLog.createOperationLogRootDirectory(session)
|
||||
assert(Files.exists(Paths.get(operationLogRoot, sHandle.identifier.toString)))
|
||||
assert(Files.isDirectory(Paths.get(operationLogRoot, sHandle.identifier.toString)))
|
||||
|
||||
val operationLog = OperationLog.createOperationLog(sHandle, oHandle)
|
||||
val logFile =
|
||||
Paths.get(OperationLog.LOG_ROOT, sHandle.identifier.toString, oHandle.identifier.toString)
|
||||
val operationLog = OperationLog.createOperationLog(session, oHandle)
|
||||
val logFile = Paths.get(operationLogRoot, sHandle.identifier.toString,
|
||||
oHandle.identifier.toString)
|
||||
assert(Files.exists(logFile))
|
||||
|
||||
OperationLog.setCurrentOperationLog(operationLog)
|
||||
@ -69,12 +80,20 @@ class OperationLogSuite extends KyuubiFunSuite {
|
||||
}
|
||||
|
||||
test("log divert appender") {
|
||||
val sHandle = SessionHandle(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10)
|
||||
val sessionManager = new NoopSessionManager
|
||||
sessionManager.initialize(KyuubiConf())
|
||||
val sHandle = sessionManager.openSession(
|
||||
TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10,
|
||||
"kyuubi",
|
||||
"passwd",
|
||||
"localhost",
|
||||
Map.empty)
|
||||
val session = sessionManager.getSession(sHandle)
|
||||
val oHandle = OperationHandle(
|
||||
OperationType.EXECUTE_STATEMENT, TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10)
|
||||
|
||||
OperationLog.createOperationLogRootDirectory(sHandle)
|
||||
val operationLog = OperationLog.createOperationLog(sHandle, oHandle)
|
||||
OperationLog.createOperationLogRootDirectory(session)
|
||||
val operationLog = OperationLog.createOperationLog(session, oHandle)
|
||||
OperationLog.setCurrentOperationLog(operationLog)
|
||||
|
||||
LogDivertAppender.initialize()
|
||||
@ -102,25 +121,50 @@ class OperationLogSuite extends KyuubiFunSuite {
|
||||
}
|
||||
|
||||
test("exception when creating log files") {
|
||||
val sHandle = SessionHandle(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10)
|
||||
val logRoot = Paths.get(OperationLog.LOG_ROOT, sHandle.identifier.toString).toFile
|
||||
val sessionManager = new NoopSessionManager
|
||||
sessionManager.initialize(KyuubiConf())
|
||||
val sHandle = sessionManager.openSession(
|
||||
TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10,
|
||||
"kyuubi",
|
||||
"passwd",
|
||||
"localhost",
|
||||
Map.empty)
|
||||
val session = sessionManager.getSession(sHandle)
|
||||
assert(sessionManager.operationLogRoot.isDefined)
|
||||
val operationLogRoot = sessionManager.operationLogRoot.get
|
||||
|
||||
val logRoot = Paths.get(operationLogRoot, sHandle.identifier.toString).toFile
|
||||
logRoot.deleteOnExit()
|
||||
Files.createFile(Paths.get(OperationLog.LOG_ROOT, sHandle.identifier.toString))
|
||||
Files.createFile(Paths.get(operationLogRoot, sHandle.identifier.toString))
|
||||
assert(logRoot.exists())
|
||||
OperationLog.createOperationLogRootDirectory(sHandle)
|
||||
OperationLog.createOperationLogRootDirectory(session)
|
||||
assert(logRoot.isFile)
|
||||
val oHandle = OperationHandle(
|
||||
OperationType.EXECUTE_STATEMENT, TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10)
|
||||
val log = OperationLog.createOperationLog(sHandle, oHandle)
|
||||
val log = OperationLog.createOperationLog(session, oHandle)
|
||||
assert(log === null)
|
||||
logRoot.delete()
|
||||
|
||||
OperationLog.createOperationLogRootDirectory(sHandle)
|
||||
val log1 = OperationLog.createOperationLog(sHandle, oHandle)
|
||||
OperationLog.createOperationLogRootDirectory(session)
|
||||
val log1 = OperationLog.createOperationLog(session, oHandle)
|
||||
log1.write("some msg here \n")
|
||||
log1.close()
|
||||
log1.write("some msg here again")
|
||||
val e = intercept[KyuubiSQLException](log1.read(-1))
|
||||
assert(e.getMessage.contains(s"${sHandle.identifier}/${oHandle.identifier}"))
|
||||
}
|
||||
|
||||
test("test fail to init operation log root dir") {
|
||||
val sessionManager = new NoopSessionManager
|
||||
val tempDir = Utils.createTempDir().toFile
|
||||
tempDir.setExecutable(false)
|
||||
|
||||
sessionManager.setOperationLogRootDir(tempDir.getAbsolutePath + "/operation_logs")
|
||||
assert(sessionManager.operationLogRoot.isDefined)
|
||||
sessionManager.initialize(KyuubiConf())
|
||||
assert(sessionManager.operationLogRoot.isEmpty)
|
||||
|
||||
tempDir.setExecutable(true)
|
||||
tempDir.delete()
|
||||
}
|
||||
}
|
||||
|
||||
@ -20,11 +20,21 @@ package org.apache.kyuubi.session
|
||||
import org.apache.hive.service.rpc.thrift.TProtocolVersion
|
||||
|
||||
import org.apache.kyuubi.KyuubiSQLException
|
||||
import org.apache.kyuubi.config.KyuubiConf
|
||||
import org.apache.kyuubi.operation.{NoopOperationManager, OperationManager}
|
||||
|
||||
class NoopSessionManager extends SessionManager("noop") {
|
||||
override val operationManager: OperationManager = new NoopOperationManager()
|
||||
|
||||
def setOperationLogRootDir(logRoot: String): Unit = {
|
||||
_operationLogRoot = Some(logRoot)
|
||||
}
|
||||
|
||||
override def initialize(conf: KyuubiConf): Unit = {
|
||||
_operationLogRoot = _operationLogRoot.orElse(Some("target/operation_logs"))
|
||||
super.initialize(conf)
|
||||
}
|
||||
|
||||
override def openSession(
|
||||
protocol: TProtocolVersion,
|
||||
user: String,
|
||||
|
||||
@ -45,7 +45,7 @@ class ExecuteStatement(
|
||||
KyuubiStatementEvent(this, statementId, state, lastAccessTime)
|
||||
|
||||
private final val _operationLog: OperationLog = if (shouldRunAsync) {
|
||||
OperationLog.createOperationLog(session.handle, getHandle)
|
||||
OperationLog.createOperationLog(session, getHandle)
|
||||
} else {
|
||||
null
|
||||
}
|
||||
|
||||
@ -22,6 +22,7 @@ import org.apache.hive.service.rpc.thrift.TProtocolVersion
|
||||
|
||||
import org.apache.kyuubi.KyuubiSQLException
|
||||
import org.apache.kyuubi.config.KyuubiConf
|
||||
import org.apache.kyuubi.config.KyuubiConf._
|
||||
import org.apache.kyuubi.credentials.HadoopCredentialsManager
|
||||
import org.apache.kyuubi.metrics.MetricsConstants._
|
||||
import org.apache.kyuubi.metrics.MetricsSystem
|
||||
@ -36,6 +37,7 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
|
||||
|
||||
override def initialize(conf: KyuubiConf): Unit = {
|
||||
addService(credentialsManager)
|
||||
_operationLogRoot = Some(conf.get(SERVER_OPERATION_LOG_DIR_ROOT))
|
||||
super.initialize(conf)
|
||||
}
|
||||
|
||||
|
||||
4
pom.xml
4
pom.xml
@ -1523,6 +1523,8 @@
|
||||
<kyuubi.metrics.json.location>${project.build.directory}/metrics</kyuubi.metrics.json.location>
|
||||
<kyuubi.frontend.bind.host>localhost</kyuubi.frontend.bind.host>
|
||||
<sun.security.krb5.debug>false</sun.security.krb5.debug>
|
||||
<kyuubi.operation.log.dir.root>target/server_operation_logs</kyuubi.operation.log.dir.root>
|
||||
<kyuubi.engine.operation.log.dir.root>target/engine_operation_logs</kyuubi.engine.operation.log.dir.root>
|
||||
</systemProperties>
|
||||
<tagsToExclude>${maven.plugin.scalatest.exclude.tags}</tagsToExclude>
|
||||
<tagsToInclude>${maven.plugin.scalatest.include.tags}</tagsToInclude>
|
||||
@ -1742,6 +1744,8 @@
|
||||
<exclude>build/apache-maven-*/**</exclude>
|
||||
<exclude>build/scala-*/**</exclude>
|
||||
<exclude>**/**/operation_logs/**/**</exclude>
|
||||
<exclude>**/**/server_operation_logs/**/**</exclude>
|
||||
<exclude>**/**/engine_operation_logs/**/**</exclude>
|
||||
<exclude>**/*.output.schema</exclude>
|
||||
<exclude>**/apache-kyuubi-*-bin*/**</exclude>
|
||||
<exclude>**/benchmarks/**</exclude>
|
||||
|
||||
Loading…
Reference in New Issue
Block a user