diff --git a/.travis.yml b/.travis.yml index 76162441e..9367164b0 100644 --- a/.travis.yml +++ b/.travis.yml @@ -30,13 +30,13 @@ jobs: include: - stage: spark2.3 language: scala - script: ./build/mvn install -Pspark-2.3 -Dmaven.javadoc.skip=true -B -V + script: ./build/mvn clean install -Pspark-2.3 -Dmaven.javadoc.skip=true -B -V - stage: spark2.2 language: scala - script: ./build/mvn install -Pspark-2.2 -Dmaven.javadoc.skip=true -B -V + script: ./build/mvn clean install -Pspark-2.2 -Dmaven.javadoc.skip=true -B -V - stage: spark2.1 language: scala - script: ./build/mvn install -Pspark-2.1 -Dmaven.javadoc.skip=true -B -V + script: ./build/mvn clean install -Pspark-2.1 -Dmaven.javadoc.skip=true -B -V after_success: - bash <(curl -s https://codecov.io/bash) diff --git a/bin/kyuubi-logo b/bin/kyuubi-logo index 10285b4ed..80bb6cd51 100644 --- a/bin/kyuubi-logo +++ b/bin/kyuubi-logo @@ -6,4 +6,4 @@ \ \_\ \_\/`____ \ \____/\ \____/ \ \_,__/\ \_\ \/_/\/_/`/___/> \/___/ \/___/ \/___/ \/_/ /\___/ - \/__/ \ No newline at end of file + \/__/ diff --git a/kyuubi-server/pom.xml b/kyuubi-server/pom.xml index 5d76397f6..1bacda502 100644 --- a/kyuubi-server/pom.xml +++ b/kyuubi-server/pom.xml @@ -279,7 +279,7 @@ - src/gen/ + src/gen/java diff --git a/kyuubi-server/src/main/scala/org/apache/spark/KyuubiConf.scala b/kyuubi-server/src/main/scala/org/apache/spark/KyuubiConf.scala index aa7ca4516..51ac2943c 100644 --- a/kyuubi-server/src/main/scala/org/apache/spark/KyuubiConf.scala +++ b/kyuubi-server/src/main/scala/org/apache/spark/KyuubiConf.scala @@ -165,7 +165,6 @@ object KyuubiConf { .timeConf(TimeUnit.MILLISECONDS) .createWithDefault(TimeUnit.HOURS.toMillis(6L)) - val FRONTEND_IDLE_SESSION_TIMEOUT: ConfigEntry[Long] = KyuubiConfigBuilder("spark.kyuubi.frontend.session.timeout") .doc("The check interval for session/operation timeout, which can be disabled by setting" + @@ -373,6 +372,15 @@ object KyuubiConf { .intConf .createWithDefault(-1) + val OPERATION_DOWNLOADED_RESOURCES_DIR: 
ConfigEntry[String] = { + KyuubiConfigBuilder("spark.kyuubi.operation.downloaded.resources.dir") + .doc("Temporary local directory for added resources in the remote file system.") + .stringConf + .createWithDefault( + s"${sys.env.getOrElse("KYUUBI_HOME", System.getProperty("java.io.tmpdir"))}" + + File.separator + "resources") + } + ///////////////////////////////////////////////////////////////////////////////////////////////// // Containerization // ///////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/kyuubi-server/src/main/scala/org/apache/spark/sql/SparkSQLUtils.scala b/kyuubi-server/src/main/scala/org/apache/spark/sql/SparkSQLUtils.scala index 8ddf80188..1b3ab9ed2 100644 --- a/kyuubi-server/src/main/scala/org/apache/spark/sql/SparkSQLUtils.scala +++ b/kyuubi-server/src/main/scala/org/apache/spark/sql/SparkSQLUtils.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.hive.HiveUtils import org.apache.spark.sql.types.DataType @@ -26,4 +27,15 @@ object SparkSQLUtils { HiveUtils.toHiveString(a) } + def getUserJarClassLoader(sparkSession: SparkSession): ClassLoader = { + sparkSession.sharedState.jarClassLoader + } + + def parsePlan(sparkSession: SparkSession, statement: String): LogicalPlan = { + sparkSession.sessionState.sqlParser.parsePlan(statement) + } + + def toDataFrame(sparkSession: SparkSession, plan: LogicalPlan): DataFrame = { + Dataset.ofRows(sparkSession, plan) + } } diff --git a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala index 1479bc758..658d65ece 100644 --- a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala +++ b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala @@ -25,14 +25,18 @@ import java.util.concurrent.{Future, 
RejectedExecutionException} import scala.collection.JavaConverters._ import scala.util.control.NonFatal +import org.apache.commons.lang3.StringUtils +import org.apache.hadoop.fs.{FileUtil, Path} import org.apache.hadoop.hive.conf.HiveConf import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAccessControlException import org.apache.hadoop.hive.ql.session.OperationLog import org.apache.hive.service.cli.thrift.TProtocolVersion import org.apache.spark.KyuubiConf._ import org.apache.spark.KyuubiSparkUtil -import org.apache.spark.sql.{AnalysisException, DataFrame, Row} +import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SparkSQLUtils} +import org.apache.spark.sql.catalyst.catalog.FunctionResource import org.apache.spark.sql.catalyst.parser.ParseException +import org.apache.spark.sql.execution.command.AddJarCommand import org.apache.spark.sql.types._ import yaooqinn.kyuubi.{KyuubiSQLException, Logging} @@ -40,9 +44,12 @@ import yaooqinn.kyuubi.cli.FetchOrientation import yaooqinn.kyuubi.schema.{RowSet, RowSetBuilder} import yaooqinn.kyuubi.session.KyuubiSession import yaooqinn.kyuubi.ui.KyuubiServerMonitor +import yaooqinn.kyuubi.utils.ReflectUtils class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging { + import KyuubiOperation._ + private var state: OperationState = INITIALIZED private val opHandle: OperationHandle = new OperationHandle(EXECUTE_STATEMENT, session.getProtocolVersion) @@ -50,7 +57,7 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging private val conf = session.sparkSession.conf private val operationTimeout = - KyuubiSparkUtil.timeStringAsMs(conf.get(OPERATION_IDLE_TIMEOUT.key)) + KyuubiSparkUtil.timeStringAsMs(conf.get(OPERATION_IDLE_TIMEOUT)) private var lastAccessTime = System.currentTimeMillis() private var hasResultSet: Boolean = false @@ -299,11 +306,30 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging } } + private def 
localizeAndAndResource(path: String): Unit = try { + if (isResourceDownloadable(path)) { + val src = new Path(path) + val destFileName = src.getName + val destFile = + new File(session.getResourcesSessionDir, destFileName).getCanonicalPath + val fs = src.getFileSystem(session.sparkSession.sparkContext.hadoopConfiguration) + fs.copyToLocalFile(src, new Path(destFile)) + FileUtil.chmod(destFile, "ugo+rx", true) + AddJarCommand(destFile).run(session.sparkSession) + } + } catch { + case e: Exception => throw new KyuubiSQLException(s"Failed to read external resource: $path", e) + } + private def execute(): Unit = { try { statementId = UUID.randomUUID().toString info(s"Running query '$statement' with $statementId") setState(RUNNING) + + val classLoader = SparkSQLUtils.getUserJarClassLoader(session.sparkSession) + Thread.currentThread().setContextClassLoader(classLoader) + KyuubiServerMonitor.getListener(session.getUserName).foreach { _.onStatementStart( statementId, @@ -314,7 +340,21 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging } session.sparkSession.sparkContext.setJobGroup(statementId, statement) KyuubiSparkUtil.setActiveSparkContext(session.sparkSession.sparkContext) - result = session.sparkSession.sql(statement) + + val parsedPlan = SparkSQLUtils.parsePlan(session.sparkSession, statement) + parsedPlan match { + case c if c.nodeName == "CreateFunctionCommand" => + val resources = + ReflectUtils.getFieldValue(c, "resources").asInstanceOf[Seq[FunctionResource]] + resources.foreach { case FunctionResource(_, uri) => + localizeAndAndResource(uri) + } + case a if a.nodeName == "AddJarCommand" => + val path = ReflectUtils.getFieldValue(a, "path").asInstanceOf[String] + localizeAndAndResource(path) + case _ => + } + result = SparkSQLUtils.toDataFrame(session.sparkSession, parsedPlan) KyuubiServerMonitor.getListener(session.getUserName).foreach { _.onStatementParsed(statementId, result.queryExecution.toString()) } @@ -406,4 +446,9 @@ 
class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging object KyuubiOperation { val DEFAULT_FETCH_ORIENTATION: FetchOrientation = FetchOrientation.FETCH_NEXT val DEFAULT_FETCH_MAX_ROWS = 100 + + def isResourceDownloadable(resource: String): Boolean = { + val scheme = new Path(resource).toUri.getScheme + StringUtils.equalsIgnoreCase(scheme, "hdfs") + } } diff --git a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/session/KyuubiSession.scala b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/session/KyuubiSession.scala index ff1e4c0fd..29d32a593 100644 --- a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/session/KyuubiSession.scala +++ b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/session/KyuubiSession.scala @@ -63,6 +63,7 @@ private[kyuubi] class KyuubiSession( private val opHandleSet = new MHSet[OperationHandle] private var _isOperationLogEnabled = false private var sessionLogDir: File = _ + private var sessionResourcesDir: File = _ @volatile private var lastAccessTime: Long = System.currentTimeMillis() private var lastIdleTime = 0L @@ -136,6 +137,15 @@ private[kyuubi] class KyuubiSession( } } + private def cleanupSessionResourcesDir(): Unit = { + try { + FileUtils.forceDelete(sessionResourcesDir) + } catch { + case e: Exception => + error("Failed to cleanup session log dir: " + sessionResourcesDir, e) + } + } + def sparkSession: SparkSession = this.sparkSessionWithUGI.sparkSession def ugi: UserGroupInformation = this.sessionUGI @@ -195,6 +205,8 @@ private[kyuubi] class KyuubiSession( // Iterate through the opHandles and close their operations opHandleSet.foreach(closeOperation) opHandleSet.clear() + // Cleanup session resources directory + cleanupSessionResourcesDir() // Cleanup session log directory. 
cleanupSessionLogDir() } finally { @@ -313,14 +325,14 @@ private[kyuubi] class KyuubiSession( def isOperationLogEnabled: Boolean = _isOperationLogEnabled /** - * Get the session dir, which is the parent dir of operation logs + * Get the session log dir, which is the parent dir of operation logs * * @return a file representing the parent directory of operation logs */ def getSessionLogDir: File = sessionLogDir /** - * Set the session dir, which is the parent dir of operation logs + * Set the session log dir, which is the parent dir of operation logs * * @param operationLogRootDir the parent dir of the session dir */ @@ -340,6 +352,32 @@ private[kyuubi] class KyuubiSession( } } + /** + * Get the session resources dir, which is the parent dir of downloaded resources + * + * @return a file representing the parent directory of downloaded resources + */ + def getResourcesSessionDir: File = sessionResourcesDir + + /** + * Set the session resources dir, which is the parent dir of downloaded resources + * + * @param resourcesRootDir the parent dir of the session resources dir + */ + def setResourcesSessionDir(resourcesRootDir: File): Unit = { + sessionResourcesDir = new File(resourcesRootDir, + username + File.separator + sessionHandle.getHandleIdentifier.toString + "_resources") + if (sessionResourcesDir.exists() && !sessionResourcesDir.isDirectory) { + throw new RuntimeException("The resources directory exists but is not a directory: " + + sessionResourcesDir) + } + + if (!sessionResourcesDir.exists() && !sessionResourcesDir.mkdirs()) { + throw new RuntimeException("Couldn't create session resources directory " + + sessionResourcesDir) + } + } + def getSessionHandle: SessionHandle = sessionHandle def getPassword: String = password diff --git a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/session/SessionManager.scala b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/session/SessionManager.scala index e37f32987..4a27313bb 100644 --- a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/session/SessionManager.scala +++ 
b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/session/SessionManager.scala @@ -31,7 +31,7 @@ import org.apache.spark.SparkConf import yaooqinn.kyuubi.{KyuubiSQLException, Logging} import yaooqinn.kyuubi.operation.OperationManager import yaooqinn.kyuubi.server.KyuubiServer -import yaooqinn.kyuubi.service.CompositeService +import yaooqinn.kyuubi.service.{CompositeService, ServiceException} import yaooqinn.kyuubi.spark.SparkSessionCacheManager import yaooqinn.kyuubi.ui.KyuubiServerMonitor import yaooqinn.kyuubi.utils.NamedThreadFactory @@ -41,25 +41,26 @@ import yaooqinn.kyuubi.utils.NamedThreadFactory */ private[kyuubi] class SessionManager private( name: String) extends CompositeService(name) with Logging { - private[this] val operationManager = new OperationManager() - private[this] val cacheManager = new SparkSessionCacheManager() - private[this] val handleToSession = new ConcurrentHashMap[SessionHandle, KyuubiSession] - private[this] var execPool: ThreadPoolExecutor = _ - private[this] var isOperationLogEnabled = false - private[this] var operationLogRootDir: File = _ - private[this] var checkInterval: Long = _ - private[this] var sessionTimeout: Long = _ - private[this] var checkOperation: Boolean = false - private[this] var shutdown: Boolean = false + private val operationManager = new OperationManager() + private val cacheManager = new SparkSessionCacheManager() + private val handleToSession = new ConcurrentHashMap[SessionHandle, KyuubiSession] + private var execPool: ThreadPoolExecutor = _ + private var isOperationLogEnabled = false + private var operationLogRootDir: File = _ + private var resourcesRootDir: File = _ + private var checkInterval: Long = _ + private var sessionTimeout: Long = _ + private var checkOperation: Boolean = false + private var shutdown: Boolean = false def this() = this(classOf[SessionManager].getSimpleName) - private[this] def createExecPool(): Unit = { - val poolSize = conf.get(ASYNC_EXEC_THREADS.key).toInt + private def 
createExecPool(): Unit = { + val poolSize = conf.get(ASYNC_EXEC_THREADS).toInt info("Background operation thread pool size: " + poolSize) - val poolQueueSize = conf.get(ASYNC_EXEC_WAIT_QUEUE_SIZE.key).toInt + val poolQueueSize = conf.get(ASYNC_EXEC_WAIT_QUEUE_SIZE).toInt info("Background operation thread wait queue size: " + poolQueueSize) - val keepAliveTime = conf.getTimeAsSeconds(EXEC_KEEPALIVE_TIME.key) + val keepAliveTime = conf.getTimeAsSeconds(EXEC_KEEPALIVE_TIME) info("Background operation thread keepalive time: " + keepAliveTime + " seconds") val threadPoolName = classOf[KyuubiServer].getSimpleName + "-Background-Pool" execPool = @@ -71,13 +72,13 @@ private[kyuubi] class SessionManager private( new LinkedBlockingQueue[Runnable](poolQueueSize), new NamedThreadFactory(threadPoolName)) execPool.allowCoreThreadTimeOut(true) - checkInterval = conf.getTimeAsMs(FRONTEND_SESSION_CHECK_INTERVAL.key) - sessionTimeout = conf.getTimeAsMs(FRONTEND_IDLE_SESSION_TIMEOUT.key) - checkOperation = conf.get(FRONTEND_IDLE_SESSION_CHECK_OPERATION.key).toBoolean + checkInterval = conf.getTimeAsMs(FRONTEND_SESSION_CHECK_INTERVAL) + sessionTimeout = conf.getTimeAsMs(FRONTEND_IDLE_SESSION_TIMEOUT) + checkOperation = conf.get(FRONTEND_IDLE_SESSION_CHECK_OPERATION).toBoolean } - private[this] def initOperationLogRootDir(): Unit = { - operationLogRootDir = new File(conf.get(LOGGING_OPERATION_LOG_DIR.key)) + private def initOperationLogRootDir(): Unit = { + operationLogRootDir = new File(conf.get(LOGGING_OPERATION_LOG_DIR)) isOperationLogEnabled = true if (operationLogRootDir.exists && !operationLogRootDir.isDirectory) { info("The operation log root directory exists, but it is not a directory: " @@ -109,10 +110,30 @@ private[kyuubi] class SessionManager private( } } + private def initResourcesRootDir(): Unit = { + resourcesRootDir = new File(conf.get(OPERATION_DOWNLOADED_RESOURCES_DIR)) + if (resourcesRootDir.exists() && !resourcesRootDir.isDirectory) { + throw new 
ServiceException("The operation downloaded resources directory exists but is not" + + s" a directory ${resourcesRootDir.getAbsolutePath}") + } + if (!resourcesRootDir.exists() && !resourcesRootDir.mkdirs()) { + throw new ServiceException("Unable to create the operation downloaded resources directory" + + s" ${resourcesRootDir.getAbsolutePath}") + } + + try { + FileUtils.forceDeleteOnExit(resourcesRootDir) + } catch { + case e: IOException => + warn("Failed to schedule clean up Kyuubi Server's root directory for downloaded" + + " resources", e) + } + } + /** * Periodically close idle sessions in 'spark.kyuubi.frontend.session.check.interval(default 6h)' */ - private[this] def startTimeoutChecker(): Unit = { + private def startTimeoutChecker(): Unit = { val interval: Long = math.max(checkInterval, 3000L) // minimum 3 seconds val timeoutChecker = new Runnable() { @@ -143,7 +164,7 @@ private[kyuubi] class SessionManager private( execPool.execute(timeoutChecker) } - private[this] def sleepInterval(interval: Long): Unit = { + private def sleepInterval(interval: Long): Unit = { try { Thread.sleep(interval) } catch { @@ -152,7 +173,7 @@ private[kyuubi] class SessionManager private( } } - private[this] def cleanupLoggingRootDir(): Unit = { + private def cleanupLoggingRootDir(): Unit = { if (isOperationLogEnabled) { try { FileUtils.forceDelete(operationLogRootDir) @@ -164,10 +185,19 @@ private[kyuubi] class SessionManager private( } } + private def cleanupResourcesRootDir(): Unit = try { + FileUtils.forceDelete(resourcesRootDir) + } catch { + case e: Exception => + warn("Failed to cleanup root dir of KyuubiServer logging: " + + operationLogRootDir.getAbsolutePath, e) + } + override def init(conf: SparkConf): Unit = synchronized { this.conf = conf + initResourcesRootDir() // Create operation log root directory, if operation logging is enabled - if (conf.get(LOGGING_OPERATION_ENABLED.key).toBoolean) { + if (conf.get(LOGGING_OPERATION_ENABLED).toBoolean) { 
initOperationLogRootDir() } createExecPool() @@ -188,7 +218,7 @@ private[kyuubi] class SessionManager private( shutdown = true if (execPool != null) { execPool.shutdown() - val timeout = conf.getTimeAsSeconds(ASYNC_EXEC_SHUTDOWN_TIMEOUT.key) + val timeout = conf.getTimeAsSeconds(ASYNC_EXEC_SHUTDOWN_TIMEOUT) try { execPool.awaitTermination(timeout, TimeUnit.SECONDS) } catch { @@ -198,6 +228,7 @@ private[kyuubi] class SessionManager private( } execPool = null } + cleanupResourcesRootDir() cleanupLoggingRootDir() } @@ -227,6 +258,7 @@ private[kyuubi] class SessionManager private( info(s"Opening session for $username") kyuubiSession.open(sessionConf) + kyuubiSession.setResourcesSessionDir(resourcesRootDir) if (isOperationLogEnabled) { kyuubiSession.setOperationLogSessionDir(operationLogRootDir) } diff --git a/kyuubi-server/src/test/resources/log4j.xml b/kyuubi-server/src/test/resources/log4j.xml index 129dcc1ef..37072aab6 100644 --- a/kyuubi-server/src/test/resources/log4j.xml +++ b/kyuubi-server/src/test/resources/log4j.xml @@ -33,4 +33,4 @@ - \ No newline at end of file + diff --git a/kyuubi-server/src/test/resources/udf-test.jar b/kyuubi-server/src/test/resources/udf-test.jar new file mode 100644 index 000000000..e69de29bb diff --git a/kyuubi-server/src/test/scala/org/apache/spark/sql/SparkSQLUtilsSuite.scala b/kyuubi-server/src/test/scala/org/apache/spark/sql/SparkSQLUtilsSuite.scala new file mode 100644 index 000000000..6109eff26 --- /dev/null +++ b/kyuubi-server/src/test/scala/org/apache/spark/sql/SparkSQLUtilsSuite.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import org.apache.spark.SparkFunSuite + +class SparkSQLUtilsSuite extends SparkFunSuite { + + test("get user jar class loader") { + val sparkSession = SparkSession + .builder() + .appName(classOf[SparkSQLUtilsSuite].getSimpleName) + .master("local") + .getOrCreate() + sparkSession.sql("add jar udf-test.jar") + val loader = SparkSQLUtils.getUserJarClassLoader(sparkSession) + assert(loader.getResource("udf-test.jar") !== null) + sparkSession.stop() + } + +} diff --git a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/operation/KyuubiOperationSuite.scala b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/operation/KyuubiOperationSuite.scala index d6f35d7d1..d165cc707 100644 --- a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/operation/KyuubiOperationSuite.scala +++ b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/operation/KyuubiOperationSuite.scala @@ -136,11 +136,19 @@ class KyuubiOperationSuite extends SparkFunSuite { assert(s.getUserName === userName) } - test("testDEFAULT_FETCH_ORIENTATION") { + test("DEFAULT_FETCH_ORIENTATION") { assert(KyuubiOperation.DEFAULT_FETCH_ORIENTATION === FETCH_NEXT) } - test("testDEFAULT_FETCH_MAX_ROWS") { + test("DEFAULT_FETCH_MAX_ROWS") { assert(KyuubiOperation.DEFAULT_FETCH_MAX_ROWS === 100) } + + test("is resource downloadable") { + intercept[IllegalArgumentException](KyuubiOperation.isResourceDownloadable(null)) + intercept[IllegalArgumentException](KyuubiOperation.isResourceDownloadable("")) + assert(KyuubiOperation.isResourceDownloadable("hdfs://a/b/c.jar")) + 
assert(!KyuubiOperation.isResourceDownloadable("file://a/b/c.jar")) + assert(!KyuubiOperation.isResourceDownloadable("dfs://a/b/c.jar")) + } } diff --git a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/server/FrontendServiceSuite.scala b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/server/FrontendServiceSuite.scala index b29b2ca55..111203bf1 100644 --- a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/server/FrontendServiceSuite.scala +++ b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/server/FrontendServiceSuite.scala @@ -21,6 +21,7 @@ import java.net.InetAddress import scala.collection.JavaConverters._ +import org.apache.hadoop.hdfs.MiniDFSCluster import org.apache.hive.service.cli.thrift._ import org.apache.spark.{KyuubiSparkUtil, SparkConf, SparkFunSuite} import org.apache.spark.KyuubiConf._ @@ -55,7 +56,7 @@ class FrontendServiceSuite extends SparkFunSuite with Matchers with SecuredFunSu super.afterAll() } - test(" test new fe service") { + test("test new fe service") { val feService = new FrontendService(beService) feService.getConf should be(null) feService.getStartTime should be(0) @@ -206,7 +207,7 @@ class FrontendServiceSuite extends SparkFunSuite with Matchers with SecuredFunSu val req3 = new TGetOperationStatusReq(resp2.getOperationHandle) val resp3 = feService.GetOperationStatus(req3) resp3.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS) - Thread.sleep(10000) + Thread.sleep(5000) val req4 = new TFetchResultsReq(resp2.getOperationHandle, TFetchOrientation.FETCH_NEXT, 50) val resp4 = feService.FetchResults(req4) resp4.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS) @@ -253,4 +254,159 @@ class FrontendServiceSuite extends SparkFunSuite with Matchers with SecuredFunSu feService.stop() } } + + test("alter database") { + withFEServiceAndHandle { (fe, handle) => + val req = new TExecuteStatementReq(handle, + "alter database default set dbproperties ('kent'='yao')") + val resp = fe.ExecuteStatement(req) + resp.getStatus.getStatusCode 
should be(TStatusCode.SUCCESS_STATUS) + + val tFetchResultsReq = + new TFetchResultsReq(resp.getOperationHandle, TFetchOrientation.FETCH_NEXT, 50) + Thread.sleep(5000) + val tFetchResultsResp = fe.FetchResults(tFetchResultsReq) + tFetchResultsResp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS) + tFetchResultsResp.getResults.getRows.size() should be(0) + } + } + + test("alter schema") { + withFEServiceAndHandle { (fe, handle) => + val req = new TExecuteStatementReq(handle, + "alter schema default set dbproperties ('kent'='yao')") + val resp = fe.ExecuteStatement(req) + resp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS) + + val tFetchResultsReq = + new TFetchResultsReq(resp.getOperationHandle, TFetchOrientation.FETCH_NEXT, 50) + + Thread.sleep(5000) + val tFetchResultsResp = fe.FetchResults(tFetchResultsReq) + tFetchResultsResp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS) + tFetchResultsResp.getResults.getRows.size() should be(0) + } + } + + test("alter table name") { + withFEServiceAndHandle { (fe, handle) => + val ct = new TExecuteStatementReq(handle, "create table default.src(key int) using parquet") + fe.ExecuteStatement(ct) + Thread.sleep(5000) + val req = new TExecuteStatementReq(handle, "alter table default.src rename to default.src2") + val resp = fe.ExecuteStatement(req) + resp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS) + + val tFetchResultsReq = + new TFetchResultsReq(resp.getOperationHandle, TFetchOrientation.FETCH_NEXT, 50) + + Thread.sleep(5000) + val tFetchResultsResp = fe.FetchResults(tFetchResultsReq) + tFetchResultsResp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS) + tFetchResultsResp.getResults.getRows.size() should be(0) + } + } + + test("alter table set properties") { + withFEServiceAndHandle { (fe, handle) => + val ct = new TExecuteStatementReq(handle, "create table default.src(key int) using parquet") + fe.ExecuteStatement(ct) + Thread.sleep(5000) + val req = new 
TExecuteStatementReq(handle, + "alter table default.src set tblproperties ('kent'='yao')") + val resp = fe.ExecuteStatement(req) + resp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS) + + val tFetchResultsReq = + new TFetchResultsReq(resp.getOperationHandle, TFetchOrientation.FETCH_NEXT, 50) + + Thread.sleep(5000) + val tFetchResultsResp = fe.FetchResults(tFetchResultsReq) + tFetchResultsResp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS) + tFetchResultsResp.getResults.getRows.size() should be(0) + } + } + + test("alter table unset properties") { + withFEServiceAndHandle { (fe, handle) => + val ct = new TExecuteStatementReq(handle, + "create table default.src(key int) using parquet tblproperties ('kent'='yao')") + fe.ExecuteStatement(ct) + Thread.sleep(5000) + val req = new TExecuteStatementReq(handle, + "alter table default.src unset tblproperties if exists ('kent', 'yao')") + val resp = fe.ExecuteStatement(req) + resp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS) + + val tFetchResultsReq = + new TFetchResultsReq(resp.getOperationHandle, TFetchOrientation.FETCH_NEXT, 50) + + Thread.sleep(5000) + val tFetchResultsResp = fe.FetchResults(tFetchResultsReq) + tFetchResultsResp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS) + tFetchResultsResp.getResults.getRows.size() should be(0) + } + } + + test("add jar hdfs") { + withFEServiceAndHandle { (fe, handle) => + val req = new TExecuteStatementReq(handle, "add jar hdfs://a/b/test.jar") + val resp = fe.ExecuteStatement(req) + resp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS) + + val tFetchResultsReq = + new TFetchResultsReq(resp.getOperationHandle, TFetchOrientation.FETCH_NEXT, 50) + + Thread.sleep(5000) + val tFetchResultsResp = fe.FetchResults(tFetchResultsReq) + tFetchResultsResp.getStatus.getStatusCode should be(TStatusCode.ERROR_STATUS) + } + } + + test("add jar local") { + withFEServiceAndHandle { (fe, handle) => + val req = new 
TExecuteStatementReq(handle, "add jar file://a/b/test.jar") + val resp = fe.ExecuteStatement(req) + resp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS) + + val tFetchResultsReq = + new TFetchResultsReq(resp.getOperationHandle, TFetchOrientation.FETCH_NEXT, 50) + + Thread.sleep(5000) + val tFetchResultsResp = fe.FetchResults(tFetchResultsReq) + tFetchResultsResp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS) + tFetchResultsResp.getResults.getRows.get(0).getColVals.get(0).getI32Val.getValue should be(0) + } + } + + test("create function") { + withFEServiceAndHandle { (fe, handle) => + val req = new TExecuteStatementReq(handle, + "create temporary function testfunc as 'testClass' using jar 'hdfs://a/b/test.jar'") + val resp = fe.ExecuteStatement(req) + resp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS) + + val tFetchResultsReq = + new TFetchResultsReq(resp.getOperationHandle, TFetchOrientation.FETCH_NEXT, 50) + + Thread.sleep(5000) + val tFetchResultsResp = fe.FetchResults(tFetchResultsReq) + tFetchResultsResp.getStatus.getStatusCode should be(TStatusCode.ERROR_STATUS) + } + } + + def withFEServiceAndHandle(block: (FrontendService, TSessionHandle) => Unit): Unit = { + val feService = new FrontendService(beService) + try { + feService.init(conf) + feService.start() + val req = new TOpenSessionReq(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1) + val resp = feService.OpenSession(req) + resp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS) + val handle = resp.getSessionHandle + block(feService, handle) + } finally { + feService.stop() + } + } } diff --git a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/session/KyuubiSessionSuite.scala b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/session/KyuubiSessionSuite.scala index d712dd0c7..e0fa2e1ba 100644 --- a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/session/KyuubiSessionSuite.scala +++ b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/session/KyuubiSessionSuite.scala @@ 
-22,7 +22,6 @@ import java.io.File import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod import org.apache.hive.service.cli.thrift.TProtocolVersion import org.apache.spark.{KyuubiConf, KyuubiSparkUtil, SparkFunSuite} -import org.apache.spark.KyuubiConf.LOGGING_OPERATION_LOG_DIR import org.apache.spark.sql.SparkSession import yaooqinn.kyuubi.KyuubiSQLException @@ -34,6 +33,8 @@ import yaooqinn.kyuubi.utils.ReflectUtils class KyuubiSessionSuite extends SparkFunSuite { + import KyuubiConf._ + var server: KyuubiServer = _ var session: KyuubiSession = _ var spark: SparkSession = _ @@ -105,10 +106,8 @@ class KyuubiSessionSuite extends SparkFunSuite { } test("set operation log session dir") { - val operationLogRootDir = new File(server.getConf.get(LOGGING_OPERATION_LOG_DIR.key)) - operationLogRootDir.setReadable(true) - operationLogRootDir.setWritable(true) - operationLogRootDir.setExecutable(true) + val operationLogRootDir = new File(server.getConf.get(LOGGING_OPERATION_LOG_DIR)) + operationLogRootDir.mkdirs() session.setOperationLogSessionDir(operationLogRootDir) assert(session.isOperationLogEnabled) assert(operationLogRootDir.exists()) @@ -128,6 +127,30 @@ class KyuubiSessionSuite extends SparkFunSuite { operationLogRootDir.setExecutable(true) } + test("set resources session dir") { + val resourceRoot = new File(server.getConf.get(OPERATION_DOWNLOADED_RESOURCES_DIR)) + resourceRoot.mkdirs() + resourceRoot.deleteOnExit() + assert(resourceRoot.isDirectory) + session.setResourcesSessionDir(resourceRoot) + val subDir = resourceRoot.listFiles().head + assert(subDir.getName === KyuubiSparkUtil.getCurrentUserName) + val resourceDir = subDir.listFiles().head + assert(resourceDir.getName === session.getSessionHandle.getSessionId + "_resources") + session.setResourcesSessionDir(resourceRoot) + assert(subDir.listFiles().length === 1, "directory should already exists") + assert(resourceDir.delete()) + resourceDir.createNewFile() + assert(resourceDir.isFile) + 
val e1 = intercept[RuntimeException](session.setResourcesSessionDir(resourceRoot)) + assert(e1.getMessage.startsWith("The resources directory exists but is not a directory")) + resourceDir.delete() + subDir.setWritable(false) + val e2 = intercept[RuntimeException](session.setResourcesSessionDir(resourceRoot)) + assert(e2.getMessage.startsWith("Couldn't create session resources directory")) + subDir.setWritable(true) + } + test("get no operation time") { assert(session.getNoOperationTime !== 0L) } diff --git a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/session/SessionManagerSuite.scala b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/session/SessionManagerSuite.scala index 91de06dc2..7cd34458f 100644 --- a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/session/SessionManagerSuite.scala +++ b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/session/SessionManagerSuite.scala @@ -24,11 +24,13 @@ import org.apache.hive.service.cli.thrift.TProtocolVersion import org.apache.spark.{KyuubiConf, KyuubiSparkUtil, SparkConf, SparkFunSuite} import yaooqinn.kyuubi.KyuubiSQLException -import yaooqinn.kyuubi.service.State +import yaooqinn.kyuubi.service.{ServiceException, State} import yaooqinn.kyuubi.utils.ReflectUtils class SessionManagerSuite extends SparkFunSuite { + import KyuubiConf._ + test("init operation log") { val logRoot = UUID.randomUUID().toString val logRoot2 = logRoot + "/sub" @@ -67,6 +69,29 @@ class SessionManagerSuite extends SparkFunSuite { sessionManager4.stop() } + test("init resources root dir") { + val conf = new SparkConf(true).set(KyuubiConf.LOGGING_OPERATION_ENABLED.key, "false") + KyuubiSparkUtil.setupCommonConfig(conf) + val sessionManager = new SessionManager() + + sessionManager.init(conf) + val resourcesRoot = new File(conf.get(OPERATION_DOWNLOADED_RESOURCES_DIR)) + assert(resourcesRoot.exists()) + assert(resourcesRoot.isDirectory) + resourcesRoot.delete() + resourcesRoot.createNewFile() + val e1 = intercept[ServiceException](sessionManager.init(conf)) + 
assert(e1.getMessage.startsWith( + "The operation downloaded resources directory exists but is not a directory")) + assert(resourcesRoot.delete()) + resourcesRoot.getParentFile.setWritable(false) + try { + intercept[Exception](sessionManager.init(conf)) + } finally { + resourcesRoot.getParentFile.setWritable(true) + } + } + test("start timeout checker") { val conf = new SparkConf().set(KyuubiConf.FRONTEND_SESSION_CHECK_INTERVAL.key, "-1") val sessionManager = new SessionManager()