From 05653d91f61778a3191a7c21c5338b62a765fb76 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Mon, 18 Feb 2019 15:34:05 +0800 Subject: [PATCH 1/9] fixes #150 un-recognized class while using add jar & create functions --- .../org/apache/spark/sql/SparkSQLUtils.scala | 3 ++ .../kyuubi/operation/KyuubiOperation.scala | 6 +++- kyuubi-server/src/test/resources/udf-test.jar | 0 .../apache/spark/sql/SparkSQLUtilsSuite.scala | 36 +++++++++++++++++++ 4 files changed, 44 insertions(+), 1 deletion(-) create mode 100644 kyuubi-server/src/test/resources/udf-test.jar create mode 100644 kyuubi-server/src/test/scala/org/apache/spark/sql/SparkSQLUtilsSuite.scala diff --git a/kyuubi-server/src/main/scala/org/apache/spark/sql/SparkSQLUtils.scala b/kyuubi-server/src/main/scala/org/apache/spark/sql/SparkSQLUtils.scala index 8ddf80188..1f582a47f 100644 --- a/kyuubi-server/src/main/scala/org/apache/spark/sql/SparkSQLUtils.scala +++ b/kyuubi-server/src/main/scala/org/apache/spark/sql/SparkSQLUtils.scala @@ -26,4 +26,7 @@ object SparkSQLUtils { HiveUtils.toHiveString(a) } + def getUserJarClassLoader(sparkSession: SparkSession): ClassLoader = { + sparkSession.sharedState.jarClassLoader + } } diff --git a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala index 1479bc758..483b5dcfc 100644 --- a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala +++ b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala @@ -31,7 +31,7 @@ import org.apache.hadoop.hive.ql.session.OperationLog import org.apache.hive.service.cli.thrift.TProtocolVersion import org.apache.spark.KyuubiConf._ import org.apache.spark.KyuubiSparkUtil -import org.apache.spark.sql.{AnalysisException, DataFrame, Row} +import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SparkSQLUtils} import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.types._ @@ -304,6 +304,10 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging statementId = UUID.randomUUID().toString info(s"Running query '$statement' with $statementId") setState(RUNNING) + + val classLoader = SparkSQLUtils.getUserJarClassLoader(session.sparkSession) + Thread.currentThread().setContextClassLoader(classLoader) + KyuubiServerMonitor.getListener(session.getUserName).foreach { _.onStatementStart( statementId, diff --git a/kyuubi-server/src/test/resources/udf-test.jar b/kyuubi-server/src/test/resources/udf-test.jar new file mode 100644 index 000000000..e69de29bb diff --git a/kyuubi-server/src/test/scala/org/apache/spark/sql/SparkSQLUtilsSuite.scala b/kyuubi-server/src/test/scala/org/apache/spark/sql/SparkSQLUtilsSuite.scala new file mode 100644 index 000000000..bbd65b04c --- /dev/null +++ b/kyuubi-server/src/test/scala/org/apache/spark/sql/SparkSQLUtilsSuite.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import org.apache.spark.SparkFunSuite + +class SparkSQLUtilsSuite extends SparkFunSuite { + + test("testGetUserJarClassLoader") { + val sparkSession = SparkSession + .builder() + .appName(classOf[SparkSQLUtilsSuite].getSimpleName) + .master("local") + .getOrCreate() + sparkSession.sql("add jar udf-test.jar") + val loader = SparkSQLUtils.getUserJarClassLoader(sparkSession) + assert(loader.getResource("udf-test.jar") !== null) + sparkSession.stop() + } + +} From 694e3ef791e5e36d6ab04d6409cff70b31fd83dd Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Wed, 20 Feb 2019 10:21:18 +0800 Subject: [PATCH 2/9] localize remote jars to add them into classpath --- kyuubi-server/pom.xml | 2 +- .../scala/org/apache/spark/KyuubiConf.scala | 10 ++- .../kyuubi/operation/KyuubiOperation.scala | 41 +++++++++- .../kyuubi/session/KyuubiSession.scala | 42 +++++++++- .../kyuubi/session/SessionManager.scala | 82 +++++++++++++------ .../apache/spark/sql/SparkSQLUtilsSuite.scala | 2 +- 6 files changed, 148 insertions(+), 31 deletions(-) diff --git a/kyuubi-server/pom.xml b/kyuubi-server/pom.xml index 5d76397f6..1bacda502 100644 --- a/kyuubi-server/pom.xml +++ b/kyuubi-server/pom.xml @@ -279,7 +279,7 @@ - src/gen/ + src/gen/java diff --git a/kyuubi-server/src/main/scala/org/apache/spark/KyuubiConf.scala b/kyuubi-server/src/main/scala/org/apache/spark/KyuubiConf.scala index aa7ca4516..51ac2943c 100644 --- a/kyuubi-server/src/main/scala/org/apache/spark/KyuubiConf.scala +++ b/kyuubi-server/src/main/scala/org/apache/spark/KyuubiConf.scala @@ -165,7 +165,6 @@ object KyuubiConf { .timeConf(TimeUnit.MILLISECONDS) .createWithDefault(TimeUnit.HOURS.toMillis(6L)) - val FRONTEND_IDLE_SESSION_TIMEOUT: ConfigEntry[Long] = KyuubiConfigBuilder("spark.kyuubi.frontend.session.timeout") .doc("The check interval for session/operation timeout, which can be disabled by setting" + @@ -373,6 +372,15 @@ object KyuubiConf { .intConf .createWithDefault(-1) + val OPERATION_DOWNLOADED_RESOURCES_DIR: ConfigEntry[String] = { + KyuubiConfigBuilder("spark.kyuubi.operation.downloaded.resources.dir") + .doc("Temporary local directory for added resources in the remote file system.") + .stringConf + .createWithDefault( + s"${sys.env.getOrElse("KYUUBI_HOME", System.getProperty("java.io.tmpdir"))}" + + File.separator + "resources") + } + ///////////////////////////////////////////////////////////////////////////////////////////////// // Containerization // ///////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala index 483b5dcfc..4e11a2f24 100644 --- a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala +++ b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala @@ -25,6 +25,8 @@ import java.util.concurrent.{Future, RejectedExecutionException} import scala.collection.JavaConverters._ import scala.util.control.NonFatal +import org.apache.commons.lang3.StringUtils +import org.apache.hadoop.fs.{FileUtil, Path} import org.apache.hadoop.hive.conf.HiveConf import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAccessControlException import org.apache.hadoop.hive.ql.session.OperationLog @@ -32,7 +34,9 @@ import org.apache.hive.service.cli.thrift.TProtocolVersion import org.apache.spark.KyuubiConf._ import org.apache.spark.KyuubiSparkUtil import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SparkSQLUtils} +import org.apache.spark.sql.catalyst.catalog.FunctionResource import org.apache.spark.sql.catalyst.parser.ParseException +import org.apache.spark.sql.execution.command.AddJarCommand import org.apache.spark.sql.types._ import yaooqinn.kyuubi.{KyuubiSQLException, Logging} @@ -40,9 +44,12 @@ import yaooqinn.kyuubi.cli.FetchOrientation import yaooqinn.kyuubi.schema.{RowSet, RowSetBuilder} import yaooqinn.kyuubi.session.KyuubiSession import yaooqinn.kyuubi.ui.KyuubiServerMonitor +import yaooqinn.kyuubi.utils.ReflectUtils class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging { + import KyuubiOperation._ + private var state: OperationState = INITIALIZED private val opHandle: OperationHandle = new OperationHandle(EXECUTE_STATEMENT, session.getProtocolVersion) @@ -50,7 +57,7 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging private val conf = session.sparkSession.conf private val operationTimeout = - KyuubiSparkUtil.timeStringAsMs(conf.get(OPERATION_IDLE_TIMEOUT.key)) + KyuubiSparkUtil.timeStringAsMs(conf.get(OPERATION_IDLE_TIMEOUT)) private var lastAccessTime = System.currentTimeMillis() private var hasResultSet: Boolean = false @@ -299,6 +306,21 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging } } + private def localizeAndAndResource(path: String): Unit = try { + if (isResourceDownloadable(path)) { + val src = new Path(path) + val destFileName = src.getName + val destFile = + new File(session.getResourcesSessionDir + destFileName).getCanonicalPath + val fs = src.getFileSystem(session.sparkSession.sparkContext.hadoopConfiguration) + fs.copyToLocalFile(src, new Path(destFile)) + FileUtil.chmod(destFile, "ugo+rx", true) + AddJarCommand(destFile).run(session.sparkSession) + } + } catch { + case e: Exception => throw new KyuubiSQLException(s"Failed to read external resource: $path", e) + } + private def execute(): Unit = { try { statementId = UUID.randomUUID().toString @@ -322,6 +344,18 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging KyuubiServerMonitor.getListener(session.getUserName).foreach { _.onStatementParsed(statementId, result.queryExecution.toString()) } + result.queryExecution.logical match { + case c if c.nodeName == "CreateFunctionCommand" => + val resources = + ReflectUtils.getFieldValue(c, "resources").asInstanceOf[Seq[FunctionResource]] + resources.foreach { case FunctionResource(_, uri) => + localizeAndAndResource(uri) + } + case a if a.nodeName == "AddJarCommand" => + val path = ReflectUtils.getFieldValue(a, "path").asInstanceOf[String] + localizeAndAndResource(path) + case _ => + } debug(result.queryExecution.toString()) iter = if (incrementalCollect) { info("Executing query in incremental collection mode") @@ -410,4 +444,9 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging object KyuubiOperation { val DEFAULT_FETCH_ORIENTATION: FetchOrientation = FetchOrientation.FETCH_NEXT val DEFAULT_FETCH_MAX_ROWS = 100 + + def isResourceDownloadable(resource: String): Boolean = { + val scheme = new Path(resource).toUri.getScheme + StringUtils.equalsIgnoreCase(scheme, "file") + } } diff --git a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/session/KyuubiSession.scala b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/session/KyuubiSession.scala index ff1e4c0fd..29d32a593 100644 --- a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/session/KyuubiSession.scala +++ b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/session/KyuubiSession.scala @@ -63,6 +63,7 @@ private[kyuubi] class KyuubiSession( private val opHandleSet = new MHSet[OperationHandle] private var _isOperationLogEnabled = false private var sessionLogDir: File = _ + private var sessionResourcesDir: File = _ @volatile private var lastAccessTime: Long = System.currentTimeMillis() private var lastIdleTime = 0L @@ -136,6 +137,15 @@ private[kyuubi] class KyuubiSession( } } + private def cleanupSessionResourcesDir(): Unit = { + try { + FileUtils.forceDelete(sessionResourcesDir) + } catch { + case e: Exception => + error("Failed to cleanup session log dir: " + sessionResourcesDir, e) + } + } + def sparkSession: SparkSession = this.sparkSessionWithUGI.sparkSession def ugi: UserGroupInformation = this.sessionUGI @@ -195,6 +205,8 @@ private[kyuubi] class KyuubiSession( // Iterate through the opHandles and close their operations opHandleSet.foreach(closeOperation) opHandleSet.clear() + // Cleanup session resources directory + cleanupSessionResourcesDir() // Cleanup session log directory. cleanupSessionLogDir() } finally { @@ -313,14 +325,14 @@ private[kyuubi] class KyuubiSession( def isOperationLogEnabled: Boolean = _isOperationLogEnabled /** - * Get the session dir, which is the parent dir of operation logs + * Get the session log dir, which is the parent dir of operation logs * * @return a file representing the parent directory of operation logs */ def getSessionLogDir: File = sessionLogDir /** - * Set the session dir, which is the parent dir of operation logs + * Set the session log dir, which is the parent dir of operation logs * * @param operationLogRootDir the parent dir of the session dir */ @@ -340,6 +352,32 @@ private[kyuubi] class KyuubiSession( } } + /** + * Get the session resource dir, which is the parent dir of operation logs + * + * @return a file representing the parent directory of operation logs + */ + def getResourcesSessionDir: File = sessionResourcesDir + + /** + * Set the session log dir, which is the parent dir of operation logs + * + * @param resourcesRootDir the parent dir of the session dir + */ + def setResourcesSessionDir(resourcesRootDir: File): Unit = { + sessionResourcesDir = new File(resourcesRootDir, + username + File.separator + sessionHandle.getHandleIdentifier.toString + "_resources") + if (sessionResourcesDir.exists() && !sessionResourcesDir.isDirectory) { + throw new RuntimeException("The resources directory exists but is not a directory: " + + sessionResourcesDir) + } + + if (!sessionResourcesDir.exists() && !sessionResourcesDir.mkdirs()) { + throw new RuntimeException("Couldn't create session resources directory " + + sessionResourcesDir) + } + } + def getSessionHandle: SessionHandle = sessionHandle def getPassword: String = password diff --git a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/session/SessionManager.scala b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/session/SessionManager.scala index e37f32987..b012d1299 100644 --- a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/session/SessionManager.scala +++ b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/session/SessionManager.scala @@ -31,7 +31,7 @@ import org.apache.spark.SparkConf import yaooqinn.kyuubi.{KyuubiSQLException, Logging} import yaooqinn.kyuubi.operation.OperationManager import yaooqinn.kyuubi.server.KyuubiServer -import yaooqinn.kyuubi.service.CompositeService +import yaooqinn.kyuubi.service.{CompositeService, ServiceException} import yaooqinn.kyuubi.spark.SparkSessionCacheManager import yaooqinn.kyuubi.ui.KyuubiServerMonitor import yaooqinn.kyuubi.utils.NamedThreadFactory @@ -41,25 +41,26 @@ import yaooqinn.kyuubi.utils.NamedThreadFactory */ private[kyuubi] class SessionManager private( name: String) extends CompositeService(name) with Logging { - private[this] val operationManager = new OperationManager() - private[this] val cacheManager = new SparkSessionCacheManager() - private[this] val handleToSession = new ConcurrentHashMap[SessionHandle, KyuubiSession] - private[this] var execPool: ThreadPoolExecutor = _ - private[this] var isOperationLogEnabled = false - private[this] var operationLogRootDir: File = _ - private[this] var checkInterval: Long = _ - private[this] var sessionTimeout: Long = _ - private[this] var checkOperation: Boolean = false - private[this] var shutdown: Boolean = false + private val operationManager = new OperationManager() + private val cacheManager = new SparkSessionCacheManager() + private val handleToSession = new ConcurrentHashMap[SessionHandle, KyuubiSession] + private var execPool: ThreadPoolExecutor = _ + private var isOperationLogEnabled = false + private var operationLogRootDir: File = _ + private var resourcesRootDir: File = _ + private var checkInterval: Long = _ + private var sessionTimeout: Long = _ + private var checkOperation: Boolean = false + private var shutdown: Boolean = false def this() = this(classOf[SessionManager].getSimpleName) - private[this] def createExecPool(): Unit = { - val poolSize = conf.get(ASYNC_EXEC_THREADS.key).toInt + private def createExecPool(): Unit = { + val poolSize = conf.get(ASYNC_EXEC_THREADS).toInt info("Background operation thread pool size: " + poolSize) - val poolQueueSize = conf.get(ASYNC_EXEC_WAIT_QUEUE_SIZE.key).toInt + val poolQueueSize = conf.get(ASYNC_EXEC_WAIT_QUEUE_SIZE).toInt info("Background operation thread wait queue size: " + poolQueueSize) - val keepAliveTime = conf.getTimeAsSeconds(EXEC_KEEPALIVE_TIME.key) + val keepAliveTime = conf.getTimeAsSeconds(EXEC_KEEPALIVE_TIME) info("Background operation thread keepalive time: " + keepAliveTime + " seconds") val threadPoolName = classOf[KyuubiServer].getSimpleName + "-Background-Pool" execPool = @@ -71,13 +72,13 @@ private[kyuubi] class SessionManager private( new LinkedBlockingQueue[Runnable](poolQueueSize), new NamedThreadFactory(threadPoolName)) execPool.allowCoreThreadTimeOut(true) - checkInterval = conf.getTimeAsMs(FRONTEND_SESSION_CHECK_INTERVAL.key) - sessionTimeout = conf.getTimeAsMs(FRONTEND_IDLE_SESSION_TIMEOUT.key) - checkOperation = conf.get(FRONTEND_IDLE_SESSION_CHECK_OPERATION.key).toBoolean + checkInterval = conf.getTimeAsMs(FRONTEND_SESSION_CHECK_INTERVAL) + sessionTimeout = conf.getTimeAsMs(FRONTEND_IDLE_SESSION_TIMEOUT) + checkOperation = conf.get(FRONTEND_IDLE_SESSION_CHECK_OPERATION).toBoolean } - private[this] def initOperationLogRootDir(): Unit = { - operationLogRootDir = new File(conf.get(LOGGING_OPERATION_LOG_DIR.key)) + private def initOperationLogRootDir(): Unit = { + operationLogRootDir = new File(conf.get(LOGGING_OPERATION_LOG_DIR)) isOperationLogEnabled = true if (operationLogRootDir.exists && !operationLogRootDir.isDirectory) { info("The operation log root directory exists, but it is not a directory: " @@ -109,10 +110,30 @@ private[kyuubi] class SessionManager private( } } + private def initResourcesRootDir(): Unit = { + resourcesRootDir = new File(conf.get(OPERATION_DOWNLOADED_RESOURCES_DIR)) + if (resourcesRootDir.exists() && !resourcesRootDir.isDirectory) { + throw new ServiceException("The operation downloaded resources directory exists but is not" + + s" a directory + ${resourcesRootDir.getAbsolutePath}") + } + if (!resourcesRootDir.exists() && !resourcesRootDir.mkdirs()) { + throw new ServiceException("Unable to create the operation downloaded resources directory" + + s" ${resourcesRootDir.getAbsolutePath}") + } + + try { + FileUtils.forceDeleteOnExit(resourcesRootDir) + } catch { + case e: IOException => + warn("Failed to schedule clean up Kyuubi Server's root directory for downloaded" + + " resources", e) + } + } + /** * Periodically close idle sessions in 'spark.kyuubi.frontend.session.check.interval(default 6h)' */ - private[this] def startTimeoutChecker(): Unit = { + private def startTimeoutChecker(): Unit = { val interval: Long = math.max(checkInterval, 3000L) // minimum 3 seconds val timeoutChecker = new Runnable() { @@ -143,7 +164,7 @@ private[kyuubi] class SessionManager private( execPool.execute(timeoutChecker) } - private[this] def sleepInterval(interval: Long): Unit = { + private def sleepInterval(interval: Long): Unit = { try { Thread.sleep(interval) } catch { @@ -152,7 +173,7 @@ private[kyuubi] class SessionManager private( } } - private[this] def cleanupLoggingRootDir(): Unit = { + private def cleanupLoggingRootDir(): Unit = { if (isOperationLogEnabled) { try { FileUtils.forceDelete(operationLogRootDir) @@ -164,10 +185,19 @@ private[kyuubi] class SessionManager private( } } + private def cleanupResourcesRootDir(): Unit = try { + FileUtils.forceDelete(resourcesRootDir) + } catch { + case e: Exception => + warn("Failed to cleanup root dir of KyuubiServer logging: " + + operationLogRootDir.getAbsolutePath, e) + } + override def init(conf: SparkConf): Unit = synchronized { this.conf = conf + initResourcesRootDir() // Create operation log root directory, if operation logging is enabled - if (conf.get(LOGGING_OPERATION_ENABLED.key).toBoolean) { + if (conf.get(LOGGING_OPERATION_ENABLED).toBoolean) { initOperationLogRootDir() } createExecPool() @@ -188,7 +218,7 @@ private[kyuubi] class SessionManager private( shutdown = true if (execPool != null) { execPool.shutdown() - val timeout = conf.getTimeAsSeconds(ASYNC_EXEC_SHUTDOWN_TIMEOUT.key) + val timeout = conf.getTimeAsSeconds(ASYNC_EXEC_SHUTDOWN_TIMEOUT) try { execPool.awaitTermination(timeout, TimeUnit.SECONDS) } catch { @@ -198,6 +228,7 @@ private[kyuubi] class SessionManager private( } execPool = null } + cleanupResourcesRootDir() cleanupLoggingRootDir() } @@ -227,6 +258,7 @@ private[kyuubi] class SessionManager private( info(s"Opening session for $username") kyuubiSession.open(sessionConf) + kyuubiSession.setResourcesSessionDir(resourcesRootDir) if (isOperationLogEnabled) { kyuubiSession.setOperationLogSessionDir(operationLogRootDir) } diff --git a/kyuubi-server/src/test/scala/org/apache/spark/sql/SparkSQLUtilsSuite.scala b/kyuubi-server/src/test/scala/org/apache/spark/sql/SparkSQLUtilsSuite.scala index bbd65b04c..6109eff26 100644 --- a/kyuubi-server/src/test/scala/org/apache/spark/sql/SparkSQLUtilsSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/spark/sql/SparkSQLUtilsSuite.scala @@ -21,7 +21,7 @@ import org.apache.spark.SparkFunSuite class SparkSQLUtilsSuite extends SparkFunSuite { - test("testGetUserJarClassLoader") { + test("get user jar class loader") { val sparkSession = SparkSession .builder() .appName(classOf[SparkSQLUtilsSuite].getSimpleName) From 2f3c5823699f7079eb1fa7d66dbdea5db9ab598c Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Wed, 20 Feb 2019 10:57:18 +0800 Subject: [PATCH 3/9] set hdfs to be downloadable --- .../main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala index 4e11a2f24..97400972d 100644 --- a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala +++ b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala @@ -447,6 +447,6 @@ object KyuubiOperation { def isResourceDownloadable(resource: String): Boolean = { val scheme = new Path(resource).toUri.getScheme - StringUtils.equalsIgnoreCase(scheme, "file") + StringUtils.equalsIgnoreCase(scheme, "hdfs") } } From 2cfb2ae180586b8f4d96f3381033856a1174b106 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Wed, 20 Feb 2019 11:15:33 +0800 Subject: [PATCH 4/9] fix resource local path --- .../main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala index 97400972d..483123b90 100644 --- a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala +++ b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala @@ -311,7 +311,7 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging val src = new Path(path) val destFileName = src.getName val destFile = - new File(session.getResourcesSessionDir + destFileName).getCanonicalPath + new File(session.getResourcesSessionDir, destFileName).getCanonicalPath val fs = src.getFileSystem(session.sparkSession.sparkContext.hadoopConfiguration) fs.copyToLocalFile(src, new Path(destFile)) FileUtil.chmod(destFile, "ugo+rx", true) From bf309017c9d491d7f013fb9e0a73e6525db23a7b Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Wed, 20 Feb 2019 11:56:20 +0800 Subject: [PATCH 5/9] dectect associate plans in parse phase --- .../scala/org/apache/spark/sql/SparkSQLUtils.scala | 9 +++++++++ .../kyuubi/operation/KyuubiOperation.scala | 14 ++++++++------ 2 files changed, 17 insertions(+), 6 deletions(-) diff --git a/kyuubi-server/src/main/scala/org/apache/spark/sql/SparkSQLUtils.scala b/kyuubi-server/src/main/scala/org/apache/spark/sql/SparkSQLUtils.scala index 1f582a47f..1b3ab9ed2 100644 --- a/kyuubi-server/src/main/scala/org/apache/spark/sql/SparkSQLUtils.scala +++ b/kyuubi-server/src/main/scala/org/apache/spark/sql/SparkSQLUtils.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.hive.HiveUtils import org.apache.spark.sql.types.DataType @@ -29,4 +30,12 @@ object SparkSQLUtils { def getUserJarClassLoader(sparkSession: SparkSession): ClassLoader = { sparkSession.sharedState.jarClassLoader } + + def parsePlan(sparkSession: SparkSession, statement: String): LogicalPlan = { + sparkSession.sessionState.sqlParser.parsePlan(statement) + } + + def toDataFrame(sparkSession: SparkSession, plan: LogicalPlan): DataFrame = { + Dataset.ofRows(sparkSession, plan) + } } diff --git a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala index 483123b90..658d65ece 100644 --- a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala +++ b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala @@ -340,22 +340,24 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging } session.sparkSession.sparkContext.setJobGroup(statementId, statement) KyuubiSparkUtil.setActiveSparkContext(session.sparkSession.sparkContext) - result = session.sparkSession.sql(statement) - KyuubiServerMonitor.getListener(session.getUserName).foreach { - _.onStatementParsed(statementId, result.queryExecution.toString()) - } - result.queryExecution.logical match { + + val parsedPlan = SparkSQLUtils.parsePlan(session.sparkSession, statement) + parsedPlan match { case c if c.nodeName == "CreateFunctionCommand" => val resources = ReflectUtils.getFieldValue(c, "resources").asInstanceOf[Seq[FunctionResource]] resources.foreach { case FunctionResource(_, uri) => - localizeAndAndResource(uri) + localizeAndAndResource(uri) } case a if a.nodeName == "AddJarCommand" => val path = ReflectUtils.getFieldValue(a, "path").asInstanceOf[String] localizeAndAndResource(path) case _ => } + result = SparkSQLUtils.toDataFrame(session.sparkSession, parsedPlan) + KyuubiServerMonitor.getListener(session.getUserName).foreach { + _.onStatementParsed(statementId, result.queryExecution.toString()) + } debug(result.queryExecution.toString()) iter = if (incrementalCollect) { info("Executing query in incremental collection mode") From 0495a55f9b466331fd5944063f33ca60e759dc6c Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Thu, 21 Feb 2019 11:15:58 +0800 Subject: [PATCH 6/9] ut --- kyuubi-server/src/test/resources/log4j.xml | 2 +- .../operation/KyuubiOperationSuite.scala | 12 +- .../kyuubi/server/FrontendServiceSuite.scala | 160 +++++++++++++++++- 3 files changed, 169 insertions(+), 5 deletions(-) diff --git a/kyuubi-server/src/test/resources/log4j.xml b/kyuubi-server/src/test/resources/log4j.xml index 129dcc1ef..37072aab6 100644 --- a/kyuubi-server/src/test/resources/log4j.xml +++ b/kyuubi-server/src/test/resources/log4j.xml @@ -33,4 +33,4 @@ - \ No newline at end of file + diff --git a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/operation/KyuubiOperationSuite.scala b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/operation/KyuubiOperationSuite.scala index d6f35d7d1..d165cc707 100644 --- a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/operation/KyuubiOperationSuite.scala +++ b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/operation/KyuubiOperationSuite.scala @@ -136,11 +136,19 @@ class KyuubiOperationSuite extends SparkFunSuite { assert(s.getUserName === userName) } - test("testDEFAULT_FETCH_ORIENTATION") { + test("DEFAULT_FETCH_ORIENTATION") { assert(KyuubiOperation.DEFAULT_FETCH_ORIENTATION === FETCH_NEXT) } - test("testDEFAULT_FETCH_MAX_ROWS") { + test("DEFAULT_FETCH_MAX_ROWS") { assert(KyuubiOperation.DEFAULT_FETCH_MAX_ROWS === 100) } + + test("is resource downloadable") { + intercept[IllegalArgumentException](KyuubiOperation.isResourceDownloadable(null)) + intercept[IllegalArgumentException](KyuubiOperation.isResourceDownloadable("")) + assert(KyuubiOperation.isResourceDownloadable("hdfs://a/b/c.jar")) + assert(!KyuubiOperation.isResourceDownloadable("file://a/b/c.jar")) + assert(!KyuubiOperation.isResourceDownloadable("dfs://a/b/c.jar")) + } } diff --git a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/server/FrontendServiceSuite.scala b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/server/FrontendServiceSuite.scala index b29b2ca55..111203bf1 100644 --- a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/server/FrontendServiceSuite.scala +++ b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/server/FrontendServiceSuite.scala @@ -21,6 +21,7 @@ import java.net.InetAddress import scala.collection.JavaConverters._ +import org.apache.hadoop.hdfs.MiniDFSCluster import org.apache.hive.service.cli.thrift._ import org.apache.spark.{KyuubiSparkUtil, SparkConf, SparkFunSuite} import org.apache.spark.KyuubiConf._ @@ -55,7 +56,7 @@ class FrontendServiceSuite extends SparkFunSuite with Matchers with SecuredFunSu super.afterAll() } - test(" test new fe service") { + test("test new fe service") { val feService = new FrontendService(beService) feService.getConf should be(null) feService.getStartTime should be(0) @@ -206,7 +207,7 @@ class FrontendServiceSuite extends SparkFunSuite with Matchers with SecuredFunSu val req3 = new TGetOperationStatusReq(resp2.getOperationHandle) val resp3 = feService.GetOperationStatus(req3) resp3.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS) - Thread.sleep(10000) + Thread.sleep(5000) val req4 = new TFetchResultsReq(resp2.getOperationHandle, TFetchOrientation.FETCH_NEXT, 50) val resp4 = feService.FetchResults(req4) resp4.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS) @@ -253,4 +254,159 @@ class FrontendServiceSuite extends SparkFunSuite with Matchers with SecuredFunSu feService.stop() } } + + test("alter database") { + withFEServiceAndHandle { (fe, handle) => + val req = new TExecuteStatementReq(handle, + "alter database default set dbproperties ('kent'='yao')") + val resp = fe.ExecuteStatement(req) + resp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS) + + val tFetchResultsReq = + new TFetchResultsReq(resp.getOperationHandle, TFetchOrientation.FETCH_NEXT, 50) + Thread.sleep(5000) + val tFetchResultsResp = fe.FetchResults(tFetchResultsReq) + tFetchResultsResp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS) + tFetchResultsResp.getResults.getRows.size() should be(0) + } + } + + test("alter schema") { + withFEServiceAndHandle { (fe, handle) => + val req = new TExecuteStatementReq(handle, + "alter schema default set dbproperties ('kent'='yao')") + val resp = fe.ExecuteStatement(req) + resp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS) + + val tFetchResultsReq = + new TFetchResultsReq(resp.getOperationHandle, TFetchOrientation.FETCH_NEXT, 50) + + Thread.sleep(5000) + val tFetchResultsResp = fe.FetchResults(tFetchResultsReq) + tFetchResultsResp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS) + tFetchResultsResp.getResults.getRows.size() should be(0) + } + } + + test("alter table name") { + withFEServiceAndHandle { (fe, handle) => + val ct = new TExecuteStatementReq(handle, "create table default.src(key int) using parquet") + fe.ExecuteStatement(ct) + Thread.sleep(5000) + val req = new TExecuteStatementReq(handle, "alter table default.src rename to default.src2") + val resp = fe.ExecuteStatement(req) + resp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS) + + val tFetchResultsReq = + new TFetchResultsReq(resp.getOperationHandle, TFetchOrientation.FETCH_NEXT, 50) + + Thread.sleep(5000) + val tFetchResultsResp = fe.FetchResults(tFetchResultsReq) + tFetchResultsResp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS) + tFetchResultsResp.getResults.getRows.size() should be(0) + } + } + + test("alter table set properties") { + withFEServiceAndHandle { (fe, handle) => + val ct = new TExecuteStatementReq(handle, "create table default.src(key int) using parquet") + fe.ExecuteStatement(ct) + Thread.sleep(5000) + val req = new TExecuteStatementReq(handle, + "alter table default.src set tblproperties ('kent'='yao')") + val resp = fe.ExecuteStatement(req) + resp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS) + + val tFetchResultsReq = + new TFetchResultsReq(resp.getOperationHandle, TFetchOrientation.FETCH_NEXT, 50) + + Thread.sleep(5000) + val tFetchResultsResp = fe.FetchResults(tFetchResultsReq) + tFetchResultsResp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS) + tFetchResultsResp.getResults.getRows.size() should be(0) + } + } + + test("alter table unset properties") { + withFEServiceAndHandle { (fe, handle) => + val ct = new TExecuteStatementReq(handle, + "create table default.src(key int) using parquet tblproperties ('kent'='yao')") + fe.ExecuteStatement(ct) + Thread.sleep(5000) + val req = new TExecuteStatementReq(handle, + "alter table default.src unset tblproperties if exists ('kent', 'yao')") + val resp = fe.ExecuteStatement(req) + resp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS) + + val tFetchResultsReq = + new TFetchResultsReq(resp.getOperationHandle, TFetchOrientation.FETCH_NEXT, 50) + + Thread.sleep(5000) + val tFetchResultsResp = fe.FetchResults(tFetchResultsReq) + tFetchResultsResp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS) + tFetchResultsResp.getResults.getRows.size() should be(0) + } + } + + test("add jar hdfs") { + withFEServiceAndHandle { (fe, handle) => + val req = new TExecuteStatementReq(handle, "add jar hdfs://a/b/test.jar") + val resp = fe.ExecuteStatement(req) + resp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS) + + val tFetchResultsReq = + new TFetchResultsReq(resp.getOperationHandle, TFetchOrientation.FETCH_NEXT, 50) + + Thread.sleep(5000) + val tFetchResultsResp = fe.FetchResults(tFetchResultsReq) + tFetchResultsResp.getStatus.getStatusCode should be(TStatusCode.ERROR_STATUS) + } + } + + test("add jar local") { + withFEServiceAndHandle { (fe, handle) => + val req = new TExecuteStatementReq(handle, "add jar file://a/b/test.jar") + val resp = fe.ExecuteStatement(req) + resp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS) + + val tFetchResultsReq = + new TFetchResultsReq(resp.getOperationHandle, TFetchOrientation.FETCH_NEXT, 50) + + Thread.sleep(5000) + val tFetchResultsResp = fe.FetchResults(tFetchResultsReq) + tFetchResultsResp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS) + tFetchResultsResp.getResults.getRows.get(0).getColVals.get(0).getI32Val.getValue should be(0) + } + } + + test("create function") { + withFEServiceAndHandle { (fe, handle) => + val req = new TExecuteStatementReq(handle, + "create temporary function testfunc as 'testClass' using jar 'hdfs://a/b/test.jar'") + val resp = fe.ExecuteStatement(req) + resp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS) + + val tFetchResultsReq = + new TFetchResultsReq(resp.getOperationHandle, TFetchOrientation.FETCH_NEXT, 50) + + Thread.sleep(5000) + val tFetchResultsResp = fe.FetchResults(tFetchResultsReq) + tFetchResultsResp.getStatus.getStatusCode should be(TStatusCode.ERROR_STATUS) + } + } + + def withFEServiceAndHandle(block: (FrontendService, TSessionHandle) => Unit): Unit = { + val feService = new FrontendService(beService) + try { + feService.init(conf) + feService.start() + val req = new TOpenSessionReq(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1) + val resp = feService.OpenSession(req) + resp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS) + val handle = resp.getSessionHandle + block(feService, handle) + } finally { + feService.stop() + } + } } From 443ef11fce3f71469face590618e9324a2b9e002 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Thu, 21 Feb 2019 16:53:29 +0800 Subject: [PATCH 7/9] add more uts --- .../kyuubi/session/SessionManager.scala | 2 +- .../kyuubi/session/KyuubiSessionSuite.scala | 33 ++++++++++++++++--- .../kyuubi/session/SessionManagerSuite.scala | 26 ++++++++++++++- 3 files changed, 54 insertions(+), 7 deletions(-) diff --git a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/session/SessionManager.scala b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/session/SessionManager.scala index b012d1299..4a27313bb 100644 --- a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/session/SessionManager.scala +++ b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/session/SessionManager.scala @@ -114,7 +114,7 @@ private[kyuubi] class SessionManager private( resourcesRootDir = new File(conf.get(OPERATION_DOWNLOADED_RESOURCES_DIR)) if (resourcesRootDir.exists() && !resourcesRootDir.isDirectory) { throw new ServiceException("The operation downloaded resources directory exists but is not" + - s" a directory + ${resourcesRootDir.getAbsolutePath}") + s" a directory ${resourcesRootDir.getAbsolutePath}") } if (!resourcesRootDir.exists() && !resourcesRootDir.mkdirs()) { throw new ServiceException("Unable to create the operation downloaded resources directory" + diff --git a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/session/KyuubiSessionSuite.scala b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/session/KyuubiSessionSuite.scala index d712dd0c7..e0fa2e1ba 100644 --- a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/session/KyuubiSessionSuite.scala +++ b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/session/KyuubiSessionSuite.scala @@ -22,7 +22,6 @@ import java.io.File import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod import org.apache.hive.service.cli.thrift.TProtocolVersion import org.apache.spark.{KyuubiConf, KyuubiSparkUtil, SparkFunSuite} -import org.apache.spark.KyuubiConf.LOGGING_OPERATION_LOG_DIR import org.apache.spark.sql.SparkSession import yaooqinn.kyuubi.KyuubiSQLException @@ -34,6 +33,8 @@ import yaooqinn.kyuubi.utils.ReflectUtils class KyuubiSessionSuite extends SparkFunSuite { + import KyuubiConf._ + var server: KyuubiServer = _ var session: KyuubiSession = _ var spark: SparkSession = _ @@ -105,10 +106,8 @@ class KyuubiSessionSuite extends SparkFunSuite { } test("set operation log session dir") { - val operationLogRootDir = new File(server.getConf.get(LOGGING_OPERATION_LOG_DIR.key)) - operationLogRootDir.setReadable(true) - operationLogRootDir.setWritable(true) - operationLogRootDir.setExecutable(true) + val operationLogRootDir = new File(server.getConf.get(LOGGING_OPERATION_LOG_DIR)) + operationLogRootDir.mkdirs() session.setOperationLogSessionDir(operationLogRootDir) assert(session.isOperationLogEnabled) assert(operationLogRootDir.exists()) @@ -128,6 +127,30 @@ class KyuubiSessionSuite extends SparkFunSuite { operationLogRootDir.setExecutable(true) } + test("set resources session dir") { + val resourceRoot = new File(server.getConf.get(OPERATION_DOWNLOADED_RESOURCES_DIR)) + resourceRoot.mkdirs() + resourceRoot.deleteOnExit() + assert(resourceRoot.isDirectory) + session.setResourcesSessionDir(resourceRoot) + val subDir = resourceRoot.listFiles().head + assert(subDir.getName === KyuubiSparkUtil.getCurrentUserName) + val resourceDir = subDir.listFiles().head + assert(resourceDir.getName === session.getSessionHandle.getSessionId + "_resources") + session.setResourcesSessionDir(resourceRoot) + assert(subDir.listFiles().length === 1, "directory should already exists") + assert(resourceDir.delete()) + resourceDir.createNewFile() + assert(resourceDir.isFile) + val e1 = intercept[RuntimeException](session.setResourcesSessionDir(resourceRoot)) + assert(e1.getMessage.startsWith("The resources directory exists but is not a directory")) + resourceDir.delete() + subDir.setWritable(false) + val e2 = intercept[RuntimeException](session.setResourcesSessionDir(resourceRoot)) + assert(e2.getMessage.startsWith("Couldn't create session resources directory")) + subDir.setWritable(true) + } + test("get no operation time") { assert(session.getNoOperationTime !== 0L) } diff --git a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/session/SessionManagerSuite.scala b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/session/SessionManagerSuite.scala index 91de06dc2..64732a415 100644 --- a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/session/SessionManagerSuite.scala +++ b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/session/SessionManagerSuite.scala @@ -24,11 +24,13 @@ import org.apache.hive.service.cli.thrift.TProtocolVersion import org.apache.spark.{KyuubiConf, KyuubiSparkUtil, SparkConf, SparkFunSuite} import yaooqinn.kyuubi.KyuubiSQLException -import yaooqinn.kyuubi.service.State +import yaooqinn.kyuubi.service.{ServiceException, State} import yaooqinn.kyuubi.utils.ReflectUtils class SessionManagerSuite extends SparkFunSuite { + import KyuubiConf._ + test("init operation log") { val logRoot = UUID.randomUUID().toString val logRoot2 = logRoot + "/sub" @@ -67,6 +69,28 @@ class SessionManagerSuite extends SparkFunSuite { sessionManager4.stop() } + test("init resources root dir") { + val conf = new SparkConf(true).set(KyuubiConf.LOGGING_OPERATION_ENABLED.key, "false") + KyuubiSparkUtil.setupCommonConfig(conf) + val sessionManager = new SessionManager() + + sessionManager.init(conf) + val resourcesRoot = new File(conf.get(OPERATION_DOWNLOADED_RESOURCES_DIR)) + assert(resourcesRoot.exists()) + assert(resourcesRoot.isDirectory) + resourcesRoot.delete() + resourcesRoot.createNewFile() + val e1 = intercept[ServiceException](sessionManager.init(conf)) + assert(e1.getMessage.startsWith( + "The operation downloaded resources directory exists but is not a directory")) + resourcesRoot.delete() + resourcesRoot.getParentFile.setWritable(false) + val e2 = intercept[ServiceException](sessionManager.init(conf)) + assert(e2.getMessage.startsWith("Unable to create the operation downloaded resources " + + "directory")) + resourcesRoot.getParentFile.setWritable(true) + } + test("start timeout checker") { val conf = new SparkConf().set(KyuubiConf.FRONTEND_SESSION_CHECK_INTERVAL.key, "-1") val sessionManager = new SessionManager() From 128928b2fc9b3dcf533e5ccfd510ef8daafc879a Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Thu, 21 Feb 2019 17:46:19 +0800 Subject: [PATCH 8/9] typo --- .travis.yml | 6 +++--- bin/kyuubi-logo | 2 +- .../scala/yaooqinn/kyuubi/session/SessionManagerSuite.scala | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/.travis.yml b/.travis.yml index 76162441e..9367164b0 100644 --- a/.travis.yml +++ b/.travis.yml @@ -30,13 +30,13 @@ jobs: include: - stage: spark2.3 language: scala - script: ./build/mvn install -Pspark-2.3 -Dmaven.javadoc.skip=true -B -V + script: ./build/mvn clean install -Pspark-2.3 -Dmaven.javadoc.skip=true -B -V - stage: spark2.2 language: scala - script: ./build/mvn install -Pspark-2.2 -Dmaven.javadoc.skip=true -B -V + script: ./build/mvn clean install -Pspark-2.2 -Dmaven.javadoc.skip=true -B -V - stage: spark2.1 language: scala - script: ./build/mvn install -Pspark-2.1 -Dmaven.javadoc.skip=true -B -V + script: ./build/mvn clean install -Pspark-2.1 -Dmaven.javadoc.skip=true -B -V after_success: - bash <(curl -s https://codecov.io/bash) diff --git a/bin/kyuubi-logo b/bin/kyuubi-logo index 10285b4ed..80bb6cd51 100644 --- a/bin/kyuubi-logo +++ b/bin/kyuubi-logo @@ -6,4 +6,4 @@ \ \_\ \_\/`____ \ \____/\ \____/ \ \_,__/\ \_\ \/_/\/_/`/___/> \/___/ \/___/ \/___/ \/_/ /\___/ - \/__/ \ No newline at end of file + \/__/ diff --git a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/session/SessionManagerSuite.scala b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/session/SessionManagerSuite.scala index 64732a415..eb9a6c505 100644 --- a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/session/SessionManagerSuite.scala +++ b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/session/SessionManagerSuite.scala @@ -83,7 +83,7 @@ class SessionManagerSuite extends SparkFunSuite { val e1 = intercept[ServiceException](sessionManager.init(conf)) assert(e1.getMessage.startsWith( "The operation downloaded resources directory exists but is not a directory")) - resourcesRoot.delete() + assert(resourcesRoot.delete()) resourcesRoot.getParentFile.setWritable(false) val e2 = intercept[ServiceException](sessionManager.init(conf)) assert(e2.getMessage.startsWith("Unable to create the operation downloaded resources " + From a0a21f4eb397d0652c0a9a6448cc98d91c1d53de Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Thu, 21 Feb 2019 18:46:16 +0800 Subject: [PATCH 9/9] fix ut --- .../yaooqinn/kyuubi/session/SessionManagerSuite.scala | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/session/SessionManagerSuite.scala b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/session/SessionManagerSuite.scala index eb9a6c505..7cd34458f 100644 --- a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/session/SessionManagerSuite.scala +++ b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/session/SessionManagerSuite.scala @@ -85,10 +85,11 @@ class SessionManagerSuite extends SparkFunSuite { "The operation downloaded resources directory exists but is not a directory")) assert(resourcesRoot.delete()) resourcesRoot.getParentFile.setWritable(false) - val e2 = intercept[ServiceException](sessionManager.init(conf)) - assert(e2.getMessage.startsWith("Unable to create the operation downloaded resources " + - "directory")) - resourcesRoot.getParentFile.setWritable(true) + try { + intercept[Exception](sessionManager.init(conf)) + } finally { + resourcesRoot.getParentFile.setWritable(true) + } } test("start timeout checker") {