/**
 * Configuration entry `spark.kyuubi.operation.downloaded.resources.dir`: the temporary
 * local directory used for added resources that live in a remote file system.
 *
 * The default value is `<base>/resources`, where `<base>` is the `KYUUBI_HOME`
 * environment variable when it is set and the `java.io.tmpdir` system property otherwise.
 */
val OPERATION_DOWNLOADED_RESOURCES_DIR: ConfigEntry[String] = {
  // Prefer the Kyuubi installation directory; fall back to the JVM temp directory.
  val baseDir = sys.env.getOrElse("KYUUBI_HOME", System.getProperty("java.io.tmpdir"))
  KyuubiConfigBuilder("spark.kyuubi.operation.downloaded.resources.dir")
    .doc("Temporary local directory for added resources in the remote file system.")
    .stringConf
    .createWithDefault(s"$baseDir${File.separator}resources")
}
/**
 * Copy a downloadable resource into this session's local resources directory and
 * register the local copy with the session through `AddJarCommand`.
 *
 * Nothing happens when [[KyuubiOperation.isResourceDownloadable]] rejects `path`.
 *
 * @param path location of the resource as written in the statement
 * @throws KyuubiSQLException wrapping any exception raised while copying or adding
 */
private def localizeAndAndResource(path: String): Unit = try {
  if (isResourceDownloadable(path)) {
    val src = new Path(path)
    // Use the (parent, child) constructor: concatenating the directory and the file name
    // would drop the separator and place the copy next to the session directory.
    val destFile = new File(session.getResourcesSessionDir, src.getName).getCanonicalPath
    val fs = src.getFileSystem(session.sparkSession.sparkContext.hadoopConfiguration)
    fs.copyToLocalFile(src, new Path(destFile))
    FileUtil.chmod(destFile, "ugo+rx", true)
    AddJarCommand(destFile).run(session.sparkSession)
  }
} catch {
  case e: Exception => throw new KyuubiSQLException(s"Failed to read external resource: $path", e)
}

// Companion object of KyuubiOperation (declared after the end of the class body).
object KyuubiOperation {
  val DEFAULT_FETCH_ORIENTATION: FetchOrientation = FetchOrientation.FETCH_NEXT
  val DEFAULT_FETCH_MAX_ROWS = 100

  /**
   * Tell whether a resource must be copied to the local file system before it is added.
   *
   * Only resources whose URI scheme is `hdfs` (compared case-insensitively) qualify;
   * local `file` resources and scheme-less paths are already readable in place.
   *
   * @param resource path or URI of the resource
   * @return true when the resource's scheme is `hdfs`
   */
  def isResourceDownloadable(resource: String): Boolean = {
    val scheme = new Path(resource).toUri.getScheme
    StringUtils.equalsIgnoreCase(scheme, "hdfs")
  }
}
/**
 * Delete this session's resources directory (`sessionResourcesDir`).
 *
 * Failures are logged and swallowed rather than rethrown, so the caller can carry on.
 */
private def cleanupSessionResourcesDir(): Unit = {
  // The field stays null until setResourcesSessionDir is called; nothing to delete then.
  Option(sessionResourcesDir).foreach { dir =>
    try {
      FileUtils.forceDelete(dir)
    } catch {
      case e: Exception =>
        error("Failed to cleanup session resources dir: " + dir, e)
    }
  }
}
/**
 * Get the session resources dir.
 *
 * @return the directory assigned by [[setResourcesSessionDir]], or null if it has not
 *         been assigned yet
 */
def getResourcesSessionDir: File = sessionResourcesDir

/**
 * Set the session resources dir to
 * `<resourcesRootDir>/<username>/<session handle identifier>_resources`,
 * creating the directory when it does not exist yet.
 *
 * @param resourcesRootDir the root dir under which the session resources dir is placed
 * @throws RuntimeException if the path exists but is not a directory, or if the
 *                          directory cannot be created
 */
def setResourcesSessionDir(resourcesRootDir: File): Unit = {
  val sessionId = sessionHandle.getHandleIdentifier.toString
  val dir = new File(resourcesRootDir, s"$username${File.separator}${sessionId}_resources")
  // Assign the field before validating, so it is set even when validation throws.
  sessionResourcesDir = dir

  if (dir.exists()) {
    if (!dir.isDirectory) {
      throw new RuntimeException(
        "The resources directory exists but is not a directory: " + dir)
    }
  } else if (!dir.mkdirs()) {
    throw new RuntimeException("Couldn't create session resources directory " + dir)
  }
}
b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/session/SessionManager.scala @@ -31,7 +31,7 @@ import org.apache.spark.SparkConf import yaooqinn.kyuubi.{KyuubiSQLException, Logging} import yaooqinn.kyuubi.operation.OperationManager import yaooqinn.kyuubi.server.KyuubiServer -import yaooqinn.kyuubi.service.CompositeService +import yaooqinn.kyuubi.service.{CompositeService, ServiceException} import yaooqinn.kyuubi.spark.SparkSessionCacheManager import yaooqinn.kyuubi.ui.KyuubiServerMonitor import yaooqinn.kyuubi.utils.NamedThreadFactory @@ -41,25 +41,26 @@ import yaooqinn.kyuubi.utils.NamedThreadFactory */ private[kyuubi] class SessionManager private( name: String) extends CompositeService(name) with Logging { - private[this] val operationManager = new OperationManager() - private[this] val cacheManager = new SparkSessionCacheManager() - private[this] val handleToSession = new ConcurrentHashMap[SessionHandle, KyuubiSession] - private[this] var execPool: ThreadPoolExecutor = _ - private[this] var isOperationLogEnabled = false - private[this] var operationLogRootDir: File = _ - private[this] var checkInterval: Long = _ - private[this] var sessionTimeout: Long = _ - private[this] var checkOperation: Boolean = false - private[this] var shutdown: Boolean = false + private val operationManager = new OperationManager() + private val cacheManager = new SparkSessionCacheManager() + private val handleToSession = new ConcurrentHashMap[SessionHandle, KyuubiSession] + private var execPool: ThreadPoolExecutor = _ + private var isOperationLogEnabled = false + private var operationLogRootDir: File = _ + private var resourcesRootDir: File = _ + private var checkInterval: Long = _ + private var sessionTimeout: Long = _ + private var checkOperation: Boolean = false + private var shutdown: Boolean = false def this() = this(classOf[SessionManager].getSimpleName) - private[this] def createExecPool(): Unit = { - val poolSize = conf.get(ASYNC_EXEC_THREADS.key).toInt + private def 
/**
 * Resolve the root directory for downloaded resources from
 * `OPERATION_DOWNLOADED_RESOURCES_DIR`, create it when missing, and schedule it for
 * deletion on JVM exit.
 *
 * @throws ServiceException if the path exists but is not a directory, or if the
 *                          directory cannot be created
 */
private def initResourcesRootDir(): Unit = {
  resourcesRootDir = new File(conf.get(OPERATION_DOWNLOADED_RESOURCES_DIR))
  if (resourcesRootDir.exists() && !resourcesRootDir.isDirectory) {
    throw new ServiceException("The operation downloaded resources directory exists but is not" +
      s" a directory: ${resourcesRootDir.getAbsolutePath}")
  }
  if (!resourcesRootDir.exists() && !resourcesRootDir.mkdirs()) {
    throw new ServiceException("Unable to create the operation downloaded resources directory" +
      s" ${resourcesRootDir.getAbsolutePath}")
  }

  // Best effort only: failing to register the exit hook must not prevent start-up.
  try {
    FileUtils.forceDeleteOnExit(resourcesRootDir)
  } catch {
    case e: IOException =>
      warn("Failed to schedule clean up Kyuubi Server's root directory for downloaded" +
        " resources", e)
  }
}

/**
 * Delete the root directory for downloaded resources.
 *
 * Failures are logged as warnings and swallowed rather than rethrown.
 */
private def cleanupResourcesRootDir(): Unit = try {
  FileUtils.forceDelete(resourcesRootDir)
} catch {
  case e: Exception =>
    // Report the directory that was being deleted. String concatenation is used instead of
    // getAbsolutePath so the handler itself cannot throw when the field was never set.
    warn("Failed to cleanup root dir of KyuubiServer downloaded resources: " +
      resourcesRootDir, e)
}
initOperationLogRootDir() } createExecPool() @@ -188,7 +218,7 @@ private[kyuubi] class SessionManager private( shutdown = true if (execPool != null) { execPool.shutdown() - val timeout = conf.getTimeAsSeconds(ASYNC_EXEC_SHUTDOWN_TIMEOUT.key) + val timeout = conf.getTimeAsSeconds(ASYNC_EXEC_SHUTDOWN_TIMEOUT) try { execPool.awaitTermination(timeout, TimeUnit.SECONDS) } catch { @@ -198,6 +228,7 @@ private[kyuubi] class SessionManager private( } execPool = null } + cleanupResourcesRootDir() cleanupLoggingRootDir() } @@ -227,6 +258,7 @@ private[kyuubi] class SessionManager private( info(s"Opening session for $username") kyuubiSession.open(sessionConf) + kyuubiSession.setResourcesSessionDir(resourcesRootDir) if (isOperationLogEnabled) { kyuubiSession.setOperationLogSessionDir(operationLogRootDir) } diff --git a/kyuubi-server/src/test/scala/org/apache/spark/sql/SparkSQLUtilsSuite.scala b/kyuubi-server/src/test/scala/org/apache/spark/sql/SparkSQLUtilsSuite.scala index bbd65b04c..6109eff26 100644 --- a/kyuubi-server/src/test/scala/org/apache/spark/sql/SparkSQLUtilsSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/spark/sql/SparkSQLUtilsSuite.scala @@ -21,7 +21,7 @@ import org.apache.spark.SparkFunSuite class SparkSQLUtilsSuite extends SparkFunSuite { - test("testGetUserJarClassLoader") { + test("get user jar class loader") { val sparkSession = SparkSession .builder() .appName(classOf[SparkSQLUtilsSuite].getSimpleName)