Localize remote jars so they can be added to the classpath

This commit is contained in:
Kent Yao 2019-02-20 10:21:18 +08:00
parent 05653d91f6
commit 694e3ef791
6 changed files with 148 additions and 31 deletions

View File

@ -279,7 +279,7 @@
</goals>
<configuration>
<sources>
<source>src/gen/</source>
<source>src/gen/java</source>
</sources>
</configuration>
</execution>

View File

@ -165,7 +165,6 @@ object KyuubiConf {
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefault(TimeUnit.HOURS.toMillis(6L))
val FRONTEND_IDLE_SESSION_TIMEOUT: ConfigEntry[Long] =
KyuubiConfigBuilder("spark.kyuubi.frontend.session.timeout")
.doc("The check interval for session/operation timeout, which can be disabled by setting" +
@ -373,6 +372,15 @@ object KyuubiConf {
.intConf
.createWithDefault(-1)
val OPERATION_DOWNLOADED_RESOURCES_DIR: ConfigEntry[String] = {
KyuubiConfigBuilder("spark.kyuubi.operation.downloaded.resources.dir")
.doc("Temporary local directory for added resources in the remote file system.")
.stringConf
.createWithDefault(
s"${sys.env.getOrElse("KYUUBI_HOME", System.getProperty("java.io.tmpdir"))}"
+ File.separator + "resources")
}
/////////////////////////////////////////////////////////////////////////////////////////////////
// Containerization //
/////////////////////////////////////////////////////////////////////////////////////////////////

View File

@ -25,6 +25,8 @@ import java.util.concurrent.{Future, RejectedExecutionException}
import scala.collection.JavaConverters._
import scala.util.control.NonFatal
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.fs.{FileUtil, Path}
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAccessControlException
import org.apache.hadoop.hive.ql.session.OperationLog
@ -32,7 +34,9 @@ import org.apache.hive.service.cli.thrift.TProtocolVersion
import org.apache.spark.KyuubiConf._
import org.apache.spark.KyuubiSparkUtil
import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SparkSQLUtils}
import org.apache.spark.sql.catalyst.catalog.FunctionResource
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.execution.command.AddJarCommand
import org.apache.spark.sql.types._
import yaooqinn.kyuubi.{KyuubiSQLException, Logging}
@ -40,9 +44,12 @@ import yaooqinn.kyuubi.cli.FetchOrientation
import yaooqinn.kyuubi.schema.{RowSet, RowSetBuilder}
import yaooqinn.kyuubi.session.KyuubiSession
import yaooqinn.kyuubi.ui.KyuubiServerMonitor
import yaooqinn.kyuubi.utils.ReflectUtils
class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging {
import KyuubiOperation._
private var state: OperationState = INITIALIZED
private val opHandle: OperationHandle =
new OperationHandle(EXECUTE_STATEMENT, session.getProtocolVersion)
@ -50,7 +57,7 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging
private val conf = session.sparkSession.conf
private val operationTimeout =
KyuubiSparkUtil.timeStringAsMs(conf.get(OPERATION_IDLE_TIMEOUT.key))
KyuubiSparkUtil.timeStringAsMs(conf.get(OPERATION_IDLE_TIMEOUT))
private var lastAccessTime = System.currentTimeMillis()
private var hasResultSet: Boolean = false
@ -299,6 +306,21 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging
}
}
private def localizeAndAndResource(path: String): Unit = try {
if (isResourceDownloadable(path)) {
val src = new Path(path)
val destFileName = src.getName
val destFile =
new File(session.getResourcesSessionDir + destFileName).getCanonicalPath
val fs = src.getFileSystem(session.sparkSession.sparkContext.hadoopConfiguration)
fs.copyToLocalFile(src, new Path(destFile))
FileUtil.chmod(destFile, "ugo+rx", true)
AddJarCommand(destFile).run(session.sparkSession)
}
} catch {
case e: Exception => throw new KyuubiSQLException(s"Failed to read external resource: $path", e)
}
private def execute(): Unit = {
try {
statementId = UUID.randomUUID().toString
@ -322,6 +344,18 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging
KyuubiServerMonitor.getListener(session.getUserName).foreach {
_.onStatementParsed(statementId, result.queryExecution.toString())
}
result.queryExecution.logical match {
case c if c.nodeName == "CreateFunctionCommand" =>
val resources =
ReflectUtils.getFieldValue(c, "resources").asInstanceOf[Seq[FunctionResource]]
resources.foreach { case FunctionResource(_, uri) =>
localizeAndAndResource(uri)
}
case a if a.nodeName == "AddJarCommand" =>
val path = ReflectUtils.getFieldValue(a, "path").asInstanceOf[String]
localizeAndAndResource(path)
case _ =>
}
debug(result.queryExecution.toString())
iter = if (incrementalCollect) {
info("Executing query in incremental collection mode")
@ -410,4 +444,9 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging
object KyuubiOperation {

  val DEFAULT_FETCH_ORIENTATION: FetchOrientation = FetchOrientation.FETCH_NEXT

  val DEFAULT_FETCH_MAX_ROWS = 100

  /**
   * Whether the given resource resides in a remote file system and must therefore be
   * localized before being added to the driver's classpath.
   *
   * The commit's purpose is to localize REMOTE jars: an `hdfs://` URI needs downloading,
   * while a `file://` or scheme-less (already local) path does not.
   *
   * BUG FIX: the original compared the scheme against "file", which re-localized local
   * files and skipped remote hdfs jars — the inverse of the intended behavior.
   *
   * @param resource path/URI of the resource
   * @return true iff the resource's scheme is "hdfs" (null-safe, case-insensitive)
   */
  def isResourceDownloadable(resource: String): Boolean = {
    // A plain local path has a null scheme; equalsIgnoreCase(null, "hdfs") is false.
    val scheme = new Path(resource).toUri.getScheme
    StringUtils.equalsIgnoreCase(scheme, "hdfs")
  }
}

View File

@ -63,6 +63,7 @@ private[kyuubi] class KyuubiSession(
private val opHandleSet = new MHSet[OperationHandle]
private var _isOperationLogEnabled = false
private var sessionLogDir: File = _
private var sessionResourcesDir: File = _
@volatile private var lastAccessTime: Long = System.currentTimeMillis()
private var lastIdleTime = 0L
@ -136,6 +137,15 @@ private[kyuubi] class KyuubiSession(
}
}
/**
 * Best-effort removal of this session's downloaded-resources directory.
 * Failures are logged rather than propagated so session close can proceed.
 */
private def cleanupSessionResourcesDir(): Unit = {
  try {
    FileUtils.forceDelete(sessionResourcesDir)
  } catch {
    case e: Exception =>
      // BUG FIX: message said "session log dir" but this method deletes the resources dir.
      error("Failed to cleanup session resources dir: " + sessionResourcesDir, e)
  }
}
def sparkSession: SparkSession = this.sparkSessionWithUGI.sparkSession
def ugi: UserGroupInformation = this.sessionUGI
@ -195,6 +205,8 @@ private[kyuubi] class KyuubiSession(
// Iterate through the opHandles and close their operations
opHandleSet.foreach(closeOperation)
opHandleSet.clear()
// Cleanup session resources directory
cleanupSessionResourcesDir()
// Cleanup session log directory.
cleanupSessionLogDir()
} finally {
@ -313,14 +325,14 @@ private[kyuubi] class KyuubiSession(
def isOperationLogEnabled: Boolean = _isOperationLogEnabled
/**
* Get the session dir, which is the parent dir of operation logs
* Get the session log dir, which is the parent dir of operation logs
*
* @return a file representing the parent directory of operation logs
*/
def getSessionLogDir: File = sessionLogDir
/**
* Set the session dir, which is the parent dir of operation logs
* Set the session log dir, which is the parent dir of operation logs
*
* @param operationLogRootDir the parent dir of the session dir
*/
@ -340,6 +352,32 @@ private[kyuubi] class KyuubiSession(
}
}
/**
 * Get this session's resources dir, which holds resources (e.g. jars) downloaded from
 * remote file systems for this session.
 *
 * NOTE(review): the previous scaladoc was copied from the log-dir getter; this directory
 * is unrelated to operation logs.
 *
 * @return a file representing this session's downloaded-resources directory
 */
def getResourcesSessionDir: File = sessionResourcesDir
/**
 * Create (if necessary) and record this session's downloaded-resources directory as
 * `<resourcesRootDir>/<username>/<session handle id>_resources`.
 *
 * NOTE(review): the previous scaladoc was copied from the log-dir setter; this method
 * does not touch operation logs.
 *
 * @param resourcesRootDir the server-wide root dir for downloaded resources
 * @throws RuntimeException if the path exists but is not a directory, or cannot be created
 */
def setResourcesSessionDir(resourcesRootDir: File): Unit = {
sessionResourcesDir = new File(resourcesRootDir,
username + File.separator + sessionHandle.getHandleIdentifier.toString + "_resources")
// Reject a pre-existing non-directory at the target path.
if (sessionResourcesDir.exists() && !sessionResourcesDir.isDirectory) {
throw new RuntimeException("The resources directory exists but is not a directory: " +
sessionResourcesDir)
}
// mkdirs() returning false with no directory present means creation failed.
if (!sessionResourcesDir.exists() && !sessionResourcesDir.mkdirs()) {
throw new RuntimeException("Couldn't create session resources directory " +
sessionResourcesDir)
}
}
def getSessionHandle: SessionHandle = sessionHandle
def getPassword: String = password

View File

@ -31,7 +31,7 @@ import org.apache.spark.SparkConf
import yaooqinn.kyuubi.{KyuubiSQLException, Logging}
import yaooqinn.kyuubi.operation.OperationManager
import yaooqinn.kyuubi.server.KyuubiServer
import yaooqinn.kyuubi.service.CompositeService
import yaooqinn.kyuubi.service.{CompositeService, ServiceException}
import yaooqinn.kyuubi.spark.SparkSessionCacheManager
import yaooqinn.kyuubi.ui.KyuubiServerMonitor
import yaooqinn.kyuubi.utils.NamedThreadFactory
@ -41,25 +41,26 @@ import yaooqinn.kyuubi.utils.NamedThreadFactory
*/
private[kyuubi] class SessionManager private(
name: String) extends CompositeService(name) with Logging {
private[this] val operationManager = new OperationManager()
private[this] val cacheManager = new SparkSessionCacheManager()
private[this] val handleToSession = new ConcurrentHashMap[SessionHandle, KyuubiSession]
private[this] var execPool: ThreadPoolExecutor = _
private[this] var isOperationLogEnabled = false
private[this] var operationLogRootDir: File = _
private[this] var checkInterval: Long = _
private[this] var sessionTimeout: Long = _
private[this] var checkOperation: Boolean = false
private[this] var shutdown: Boolean = false
private val operationManager = new OperationManager()
private val cacheManager = new SparkSessionCacheManager()
private val handleToSession = new ConcurrentHashMap[SessionHandle, KyuubiSession]
private var execPool: ThreadPoolExecutor = _
private var isOperationLogEnabled = false
private var operationLogRootDir: File = _
private var resourcesRootDir: File = _
private var checkInterval: Long = _
private var sessionTimeout: Long = _
private var checkOperation: Boolean = false
private var shutdown: Boolean = false
def this() = this(classOf[SessionManager].getSimpleName)
private[this] def createExecPool(): Unit = {
val poolSize = conf.get(ASYNC_EXEC_THREADS.key).toInt
private def createExecPool(): Unit = {
val poolSize = conf.get(ASYNC_EXEC_THREADS).toInt
info("Background operation thread pool size: " + poolSize)
val poolQueueSize = conf.get(ASYNC_EXEC_WAIT_QUEUE_SIZE.key).toInt
val poolQueueSize = conf.get(ASYNC_EXEC_WAIT_QUEUE_SIZE).toInt
info("Background operation thread wait queue size: " + poolQueueSize)
val keepAliveTime = conf.getTimeAsSeconds(EXEC_KEEPALIVE_TIME.key)
val keepAliveTime = conf.getTimeAsSeconds(EXEC_KEEPALIVE_TIME)
info("Background operation thread keepalive time: " + keepAliveTime + " seconds")
val threadPoolName = classOf[KyuubiServer].getSimpleName + "-Background-Pool"
execPool =
@ -71,13 +72,13 @@ private[kyuubi] class SessionManager private(
new LinkedBlockingQueue[Runnable](poolQueueSize),
new NamedThreadFactory(threadPoolName))
execPool.allowCoreThreadTimeOut(true)
checkInterval = conf.getTimeAsMs(FRONTEND_SESSION_CHECK_INTERVAL.key)
sessionTimeout = conf.getTimeAsMs(FRONTEND_IDLE_SESSION_TIMEOUT.key)
checkOperation = conf.get(FRONTEND_IDLE_SESSION_CHECK_OPERATION.key).toBoolean
checkInterval = conf.getTimeAsMs(FRONTEND_SESSION_CHECK_INTERVAL)
sessionTimeout = conf.getTimeAsMs(FRONTEND_IDLE_SESSION_TIMEOUT)
checkOperation = conf.get(FRONTEND_IDLE_SESSION_CHECK_OPERATION).toBoolean
}
private[this] def initOperationLogRootDir(): Unit = {
operationLogRootDir = new File(conf.get(LOGGING_OPERATION_LOG_DIR.key))
private def initOperationLogRootDir(): Unit = {
operationLogRootDir = new File(conf.get(LOGGING_OPERATION_LOG_DIR))
isOperationLogEnabled = true
if (operationLogRootDir.exists && !operationLogRootDir.isDirectory) {
info("The operation log root directory exists, but it is not a directory: "
@ -109,10 +110,30 @@ private[kyuubi] class SessionManager private(
}
}
/**
 * Initialize the server-wide root directory for operation-downloaded resources
 * (spark.kyuubi.operation.downloaded.resources.dir). The directory is created if absent
 * and scheduled for best-effort deletion on JVM exit.
 *
 * @throws ServiceException if the path exists but is not a directory, or cannot be created
 */
private def initResourcesRootDir(): Unit = {
  resourcesRootDir = new File(conf.get(OPERATION_DOWNLOADED_RESOURCES_DIR))
  if (resourcesRootDir.exists() && !resourcesRootDir.isDirectory) {
    // BUG FIX: the literal contained a stray " + " ("...is not a directory + /path");
    // use ": " before the interpolated path instead.
    throw new ServiceException("The operation downloaded resources directory exists but is not" +
      s" a directory: ${resourcesRootDir.getAbsolutePath}")
  }
  if (!resourcesRootDir.exists() && !resourcesRootDir.mkdirs()) {
    throw new ServiceException("Unable to create the operation downloaded resources directory" +
      s" ${resourcesRootDir.getAbsolutePath}")
  }
  try {
    // Best effort only: a failure to register exit-time cleanup must not abort server init.
    FileUtils.forceDeleteOnExit(resourcesRootDir)
  } catch {
    case e: IOException =>
      warn("Failed to schedule clean up Kyuubi Server's root directory for downloaded" +
        " resources", e)
  }
}
/**
* Periodically close idle sessions in 'spark.kyuubi.frontend.session.check.interval(default 6h)'
*/
private[this] def startTimeoutChecker(): Unit = {
private def startTimeoutChecker(): Unit = {
val interval: Long = math.max(checkInterval, 3000L)
// minimum 3 seconds
val timeoutChecker = new Runnable() {
@ -143,7 +164,7 @@ private[kyuubi] class SessionManager private(
execPool.execute(timeoutChecker)
}
private[this] def sleepInterval(interval: Long): Unit = {
private def sleepInterval(interval: Long): Unit = {
try {
Thread.sleep(interval)
} catch {
@ -152,7 +173,7 @@ private[kyuubi] class SessionManager private(
}
}
private[this] def cleanupLoggingRootDir(): Unit = {
private def cleanupLoggingRootDir(): Unit = {
if (isOperationLogEnabled) {
try {
FileUtils.forceDelete(operationLogRootDir)
@ -164,10 +185,19 @@ private[kyuubi] class SessionManager private(
}
}
/**
 * Best-effort removal of the server-wide downloaded-resources root directory at shutdown.
 * Failures are logged rather than propagated.
 */
private def cleanupResourcesRootDir(): Unit = try {
  FileUtils.forceDelete(resourcesRootDir)
} catch {
  case e: Exception =>
    // BUG FIX: the message referenced operationLogRootDir and "logging" although this
    // method deletes the downloaded-resources root.
    warn("Failed to cleanup root dir of KyuubiServer's downloaded resources: "
      + resourcesRootDir.getAbsolutePath, e)
}
override def init(conf: SparkConf): Unit = synchronized {
this.conf = conf
initResourcesRootDir()
// Create operation log root directory, if operation logging is enabled
if (conf.get(LOGGING_OPERATION_ENABLED.key).toBoolean) {
if (conf.get(LOGGING_OPERATION_ENABLED).toBoolean) {
initOperationLogRootDir()
}
createExecPool()
@ -188,7 +218,7 @@ private[kyuubi] class SessionManager private(
shutdown = true
if (execPool != null) {
execPool.shutdown()
val timeout = conf.getTimeAsSeconds(ASYNC_EXEC_SHUTDOWN_TIMEOUT.key)
val timeout = conf.getTimeAsSeconds(ASYNC_EXEC_SHUTDOWN_TIMEOUT)
try {
execPool.awaitTermination(timeout, TimeUnit.SECONDS)
} catch {
@ -198,6 +228,7 @@ private[kyuubi] class SessionManager private(
}
execPool = null
}
cleanupResourcesRootDir()
cleanupLoggingRootDir()
}
@ -227,6 +258,7 @@ private[kyuubi] class SessionManager private(
info(s"Opening session for $username")
kyuubiSession.open(sessionConf)
kyuubiSession.setResourcesSessionDir(resourcesRootDir)
if (isOperationLogEnabled) {
kyuubiSession.setOperationLogSessionDir(operationLogRootDir)
}

View File

@ -21,7 +21,7 @@ import org.apache.spark.SparkFunSuite
class SparkSQLUtilsSuite extends SparkFunSuite {
test("testGetUserJarClassLoader") {
test("get user jar class loader") {
val sparkSession = SparkSession
.builder()
.appName(classOf[SparkSQLUtilsSuite].getSimpleName)