From 32c95eacf1897e78dda5e7607de8d1c170e61342 Mon Sep 17 00:00:00 2001 From: Kent Yao <11215016@zju.edu.cn> Date: Tue, 26 Mar 2019 17:54:40 +0800 Subject: [PATCH] [KYUUBI-170]persist and coalesce before to local iterator while using incremental result collect (#171) * fix #170 persist and coalesce before to local iterator while using incremental result collect * add log * use df.rdd.getPartitionNum as upper limit * log & ut * bypass coalesce if df.rdd.partition.size is small * add ut * code cov * code cov --- .../scala/org/apache/spark/KyuubiConf.scala | 21 +++++ .../org/apache/spark/KyuubiSparkUtil.scala | 1 + .../kyuubi/operation/KyuubiOperation.scala | 29 ++++++- .../kyuubi/spark/SparkSessionWithUGI.scala | 10 +-- .../kyuubi/utils/KyuubiHadoopUtil.scala | 12 ++- .../operation/KyuubiOperationSuite.scala | 17 ++++ .../kyuubi/server/FrontendServiceSuite.scala | 86 +++++++++++++++++-- .../kyuubi/utils/KyuubiHadoopUtilSuite.scala | 32 +++++-- 8 files changed, 183 insertions(+), 25 deletions(-) diff --git a/kyuubi-server/src/main/scala/org/apache/spark/KyuubiConf.scala b/kyuubi-server/src/main/scala/org/apache/spark/KyuubiConf.scala index 6d5805ca5..ba733e9a3 100644 --- a/kyuubi-server/src/main/scala/org/apache/spark/KyuubiConf.scala +++ b/kyuubi-server/src/main/scala/org/apache/spark/KyuubiConf.scala @@ -385,6 +385,27 @@ object KyuubiConf { .booleanConf .createWithDefault(false) + val OPERATION_INCREMENTAL_RDD_PARTITIONS_LIMIT: ConfigEntry[Int] = + KyuubiConfigBuilder("spark.kyuubi.operation.incremental.rdd.partitions.limit") + .doc("In incremental result collection, when the partition number of the rdd underlying the" + + " query is great than this setting, Kyuubi will try to coalesce first before calling" + + " toLocalIterator") + .intConf + .createWithDefault(50) + + val OPERATION_INCREMENTAL_PARTITION_ROWS: ConfigEntry[Int] = + KyuubiConfigBuilder("spark.kyuubi.operation.incremental.partition.rows") + .doc("In incremental result collection, Spark will run job not task on a single partition," + + " which sequentially get results one partition by one to the driver. we use this" + + " configuration and the total size of the query output to calculate the partition number" + + " to coalesce to. Use Math.min(`total size of the query output` / `spark.kyuubi.operation" + + ".incremental.collect.partition.rows`, `df.rdd.partition.size`) to limit the total number" + + " of jobs. In case of OutOfMemoryError happens frequently in executor side which lead to" + + " job failures, we suggest to decrease this number. Otherwise for performance reasons," + + " increase this setting to reduce the size of sequential Spark jobs") + .intConf + .createWithDefault(20000) + val OPERATION_RESULT_LIMIT: ConfigEntry[Int] = KyuubiConfigBuilder("spark.kyuubi.operation.result.limit") .doc("In non-incremental result collection mode, set this to a positive value to limit the" + diff --git a/kyuubi-server/src/main/scala/org/apache/spark/KyuubiSparkUtil.scala b/kyuubi-server/src/main/scala/org/apache/spark/KyuubiSparkUtil.scala index 82e17e58f..a6ff30082 100644 --- a/kyuubi-server/src/main/scala/org/apache/spark/KyuubiSparkUtil.scala +++ b/kyuubi-server/src/main/scala/org/apache/spark/KyuubiSparkUtil.scala @@ -252,6 +252,7 @@ object KyuubiSparkUtil extends Logging { } def setActiveSparkContext(sc: SparkContext): Unit = { + info(s"Application ${sc.applicationId} has been activated") SparkContext.setActiveContext(sc, allowMultipleContexts = true) } diff --git a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala index 98ad355c5..dc5d7c0e0 100644 --- a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala +++ b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/operation/KyuubiOperation.scala @@ -23,6 +23,7 @@ import java.util.UUID import java.util.concurrent.{Future, RejectedExecutionException} import scala.collection.JavaConverters._ +import scala.util.{Success, Try} import scala.util.control.NonFatal import org.apache.commons.lang3.StringUtils @@ -200,7 +201,7 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging } } - private[this] def cleanupOperationLog(): Unit = { + private def cleanupOperationLog(): Unit = { if (isOperationLogEnabled) { if (operationLog == null) { error("Operation [ " + opHandle.getHandleIdentifier + " ] " + @@ -381,8 +382,27 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging } debug(result.queryExecution.toString()) iter = if (incrementalCollect) { - info("Executing query in incremental collection mode") - result.toLocalIterator().asScala + val numParts = result.rdd.getNumPartitions + info(s"Executing query in incremental mode, running $numParts jobs before optimization") + val limit = conf.get(OPERATION_INCREMENTAL_RDD_PARTITIONS_LIMIT).toInt + if (numParts > limit) { + val partRows = conf.get(OPERATION_INCREMENTAL_PARTITION_ROWS).toInt + val count = Try { result.persist.count() } match { + case Success(outputSize) => + val num = math.min(math.max(outputSize / partRows, 1), numParts) + info(s"The total query output is $outputSize and will be coalesced to $num of" + + s" partitions with $partRows rows on average") + num + case _ => + warn("Failed to calculate the query output size, do not coalesce") + numParts + } + info(s"Executing query in incremental mode, running $count jobs after optimization") + result.coalesce(count.toInt).toLocalIterator().asScala + } else { + info(s"Executing query in incremental mode, running $numParts jobs without optimization") + result.toLocalIterator().asScala + } } else { val resultLimit = conf.get(OPERATION_RESULT_LIMIT).toInt if (resultLimit >= 0) { @@ -396,7 +416,8 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging } catch { case e: KyuubiSQLException => if (!isClosedOrCanceled) { - onStatementError(statementId, e.getMessage, KyuubiSparkUtil.exceptionString(e)) + val err = KyuubiSparkUtil.exceptionString(e) + onStatementError(statementId, e.getMessage, err) throw e } case e: ParseException => diff --git a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/spark/SparkSessionWithUGI.scala b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/spark/SparkSessionWithUGI.scala index 625f707b8..1a92c1a41 100644 --- a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/spark/SparkSessionWithUGI.scala +++ b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/spark/SparkSessionWithUGI.scala @@ -172,7 +172,7 @@ class SparkSessionWithUGI( } catch { case e: Exception => if (conf.getOption("spark.master").contains("yarn")) { - KyuubiHadoopUtil.doAs(user) { + KyuubiHadoopUtil.doAsAndLogNonFatal(user) { KyuubiHadoopUtil.killYarnAppByName(appName) } } @@ -181,11 +181,9 @@ class SparkSessionWithUGI( val msg = s""" |Get SparkSession for [$userName] failed - |Diagnosis: - |${sparkException.map(_.getMessage).getOrElse(cause.getMessage)} - | - |Please check if the specified yarn queue [${conf.getOption(QUEUE).getOrElse("")}] - |is available or has sufficient resources left + |Diagnosis: ${sparkException.map(_.getMessage).getOrElse(cause.getMessage)} + |Please check if the specified yarn queue [${conf.getOption(QUEUE) + .getOrElse("")}] is available or has sufficient resources left """.stripMargin val ke = new KyuubiSQLException(msg, "08S01", 1001, cause) sparkException.foreach(ke.addSuppressed) diff --git a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/utils/KyuubiHadoopUtil.scala b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/utils/KyuubiHadoopUtil.scala index a0cf8e697..cf057a6c8 100644 --- a/kyuubi-server/src/main/scala/yaooqinn/kyuubi/utils/KyuubiHadoopUtil.scala +++ b/kyuubi-server/src/main/scala/yaooqinn/kyuubi/utils/KyuubiHadoopUtil.scala @@ -21,12 +21,14 @@ import java.lang.reflect.UndeclaredThrowableException import java.security.PrivilegedExceptionAction import scala.collection.JavaConverters._ +import scala.util.control.NonFatal import org.apache.hadoop.security.UserGroupInformation import org.apache.hadoop.yarn.api.records.ApplicationReport import org.apache.hadoop.yarn.api.records.YarnApplicationState._ import org.apache.hadoop.yarn.client.api.YarnClient import org.apache.hadoop.yarn.conf.YarnConfiguration +import org.apache.spark.KyuubiSparkUtil import yaooqinn.kyuubi.Logging @@ -67,7 +69,15 @@ private[kyuubi] object KyuubiHadoopUtil extends Logging { override def run(): T = f }) } catch { - case e: UndeclaredThrowableException => throw Option(e.getCause).getOrElse(e) + case NonFatal(e) => throw KyuubiSparkUtil.findCause(e) + } + } + + def doAsAndLogNonFatal(user: UserGroupInformation)(f: => Unit): Unit = { + try { + doAs(user)(f) + } catch { + case NonFatal(e) => error(s"Failed to operate as ${user.getShortUserName}", e) } } diff --git a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/operation/KyuubiOperationSuite.scala b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/operation/KyuubiOperationSuite.scala index 0fbb2e373..069b5190c 100644 --- a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/operation/KyuubiOperationSuite.scala +++ b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/operation/KyuubiOperationSuite.scala @@ -190,4 +190,21 @@ class KyuubiOperationSuite extends SparkFunSuite with MockitoSugar { val e3 = intercept[KyuubiSQLException](op.transform(plan5)) assert(e3.getMessage.startsWith("Resource Type")) } + + test("is closed or canceled") { + val op = sessionMgr.getOperationMgr.newExecuteStatementOperation(session, statement) + assert(!op.isClosedOrCanceled) + op.cancel() + assert(op.isClosedOrCanceled) + op.close() + assert(op.isClosedOrCanceled) + val op2 = sessionMgr.getOperationMgr.newExecuteStatementOperation(session, statement) + op2.close() + assert(op2.isClosedOrCanceled) + val op3 = sessionMgr.getOperationMgr.newExecuteStatementOperation(session, null) + op3.cancel() + op3.close() + assert(op3.isClosedOrCanceled) + + } } diff --git a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/server/FrontendServiceSuite.scala b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/server/FrontendServiceSuite.scala index abc8bdc68..b86f4b84f 100644 --- a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/server/FrontendServiceSuite.scala +++ b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/server/FrontendServiceSuite.scala @@ -18,12 +18,11 @@ package yaooqinn.kyuubi.server import java.net.InetAddress -import java.util.UUID import scala.collection.JavaConverters._ import org.apache.hive.service.cli.thrift._ -import org.apache.spark.{KyuubiSparkUtil, SparkConf, SparkFunSuite} +import org.apache.spark.{KyuubiConf, KyuubiSparkUtil, SparkConf, SparkFunSuite} import org.apache.spark.KyuubiConf._ import org.scalatest.Matchers @@ -256,7 +255,7 @@ class FrontendServiceSuite extends SparkFunSuite with Matchers with SecuredFunSu } test("alter database") { - withFEServiceAndHandle { (fe, handle) => + val block: (FrontendService, TSessionHandle) => Unit = (fe, handle) => { val req = new TExecuteStatementReq(handle, "alter database default set dbproperties ('kent'='yao')") val resp = fe.ExecuteStatement(req) @@ -269,10 +268,13 @@ class FrontendServiceSuite extends SparkFunSuite with Matchers with SecuredFunSu tFetchResultsResp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS) tFetchResultsResp.getResults.getRows.size() should be(0) } + withFEServiceAndHandle(block) + withFEServiceAndHandleInc(block) + withFEServiceAndHandleIncAndCal(block) } test("alter schema") { - withFEServiceAndHandle { (fe, handle) => + val block: (FrontendService, TSessionHandle) => Unit = (fe, handle) => { val req = new TExecuteStatementReq(handle, "alter schema default set dbproperties ('kent'='yao')") val resp = fe.ExecuteStatement(req) @@ -286,11 +288,15 @@ class FrontendServiceSuite extends SparkFunSuite with Matchers with SecuredFunSu tFetchResultsResp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS) tFetchResultsResp.getResults.getRows.size() should be(0) } + withFEServiceAndHandle(block) + withFEServiceAndHandleInc(block) + withFEServiceAndHandleIncAndCal(block) } test("alter table name") { - withFEServiceAndHandle { (fe, handle) => - val ct = new TExecuteStatementReq(handle, "create table default.src(key int) using parquet") + val block: (FrontendService, TSessionHandle) => Unit = (fe, handle) => { + val ct = new TExecuteStatementReq(handle, + "create table if not exists default.src(key int) using parquet") fe.ExecuteStatement(ct) Thread.sleep(5000) val req = new TExecuteStatementReq(handle, "alter table default.src rename to default.src2") @@ -306,10 +312,33 @@ class FrontendServiceSuite extends SparkFunSuite with Matchers with SecuredFunSu tFetchResultsResp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS) tFetchResultsResp.getResults.getRows.size() should be(0) } + withFEServiceAndHandle(block) + } + + test("alter table name inc") { + val block: (FrontendService, TSessionHandle) => Unit = (fe, handle) => { + val ct = new TExecuteStatementReq(handle, + "create table if not exists default.src3(key int) using parquet") + fe.ExecuteStatement(ct) + Thread.sleep(5000) + val req = new TExecuteStatementReq(handle, "alter table default.src3 rename to default.src4") + val resp = fe.ExecuteStatement(req) + resp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS) + + val tFetchResultsReq = + new TFetchResultsReq(resp.getOperationHandle, TFetchOrientation.FETCH_NEXT, 50) + val dt = new TExecuteStatementReq(handle, "drop table src4") + fe.ExecuteStatement(dt) + Thread.sleep(5000) + val tFetchResultsResp = fe.FetchResults(tFetchResultsReq) + tFetchResultsResp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS) + tFetchResultsResp.getResults.getRows.size() should be(0) + } + withFEServiceAndHandleInc(block) } test("alter table set properties") { - withFEServiceAndHandle { (fe, handle) => + val block: (FrontendService, TSessionHandle) => Unit = (fe, handle) => { val ct = new TExecuteStatementReq(handle, "create table default.src(key int) using parquet") fe.ExecuteStatement(ct) Thread.sleep(5000) @@ -326,6 +355,8 @@ class FrontendServiceSuite extends SparkFunSuite with Matchers with SecuredFunSu tFetchResultsResp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS) tFetchResultsResp.getResults.getRows.size() should be(0) } + withFEServiceAndHandle(block) + withFEServiceAndHandleInc(block) } test("alter table unset properties") { @@ -380,7 +411,7 @@ class FrontendServiceSuite extends SparkFunSuite with Matchers with SecuredFunSu } } - test("create function") { + test("create temporary function") { withFEServiceAndHandle { (fe, handle) => val req = new TExecuteStatementReq(handle, "create temporary function testfunc as 'testClass' using jar 'hdfs://a/b/test.jar'") @@ -402,6 +433,45 @@ class FrontendServiceSuite extends SparkFunSuite with Matchers with SecuredFunSu feService.init(conf) feService.start() val req = new TOpenSessionReq(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1) + req.setUsername(user) + val resp = feService.OpenSession(req) + resp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS) + val handle = resp.getSessionHandle + block(feService, handle) + } finally { + feService.stop() + } + } + + def withFEServiceAndHandleInc(block: (FrontendService, TSessionHandle) => Unit): Unit = { + val feService = new FrontendService(beService) + try { + feService.init(conf) + feService.start() + val req = new TOpenSessionReq(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1) + req.setUsername(user) + req.setConfiguration( + Map("set:hivevar:" + KyuubiConf.OPERATION_INCREMENTAL_COLLECT.key -> "true").asJava) + val resp = feService.OpenSession(req) + resp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS) + val handle = resp.getSessionHandle + block(feService, handle) + } finally { + feService.stop() + } + } + + def withFEServiceAndHandleIncAndCal(block: (FrontendService, TSessionHandle) => Unit): Unit = { + val feService = new FrontendService(beService) + try { + feService.init(conf) + feService.start() + val req = new TOpenSessionReq(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1) + req.setUsername(user) + req.setConfiguration( + Map("set:hivevar:" + KyuubiConf.OPERATION_INCREMENTAL_COLLECT.key -> "true", + "set:hivevar:" + KyuubiConf.OPERATION_INCREMENTAL_RDD_PARTITIONS_LIMIT.key -> "-1") + .asJava) val resp = feService.OpenSession(req) resp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS) val handle = resp.getSessionHandle diff --git a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/utils/KyuubiHadoopUtilSuite.scala b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/utils/KyuubiHadoopUtilSuite.scala index ada11ec03..e2d640cd2 100644 --- a/kyuubi-server/src/test/scala/yaooqinn/kyuubi/utils/KyuubiHadoopUtilSuite.scala +++ b/kyuubi-server/src/test/scala/yaooqinn/kyuubi/utils/KyuubiHadoopUtilSuite.scala @@ -18,7 +18,8 @@ package yaooqinn.kyuubi.utils import org.apache.hadoop.security.UserGroupInformation -import org.apache.hadoop.yarn.api.records.{ApplicationId, ContainerLaunchContext, Resource, YarnApplicationState} +import org.apache.hadoop.yarn.api.records._ +import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationReportPBImpl import org.apache.hadoop.yarn.client.api.YarnClient import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.server.MiniYARNCluster @@ -52,6 +53,12 @@ class KyuubiHadoopUtilSuite extends SparkFunSuite with BeforeAndAfterEach { super.beforeEach() } + test("kill yarn app") { + val report = new ApplicationReportPBImpl() + KyuubiHadoopUtil.killYarnApp(report) + intercept[NullPointerException](KyuubiHadoopUtil.killYarnApp(null)) + } + test("kill yarn application by name") { withYarnApplication { id => KyuubiHadoopUtil.killYarnAppByName(id.toString) @@ -67,7 +74,6 @@ class KyuubiHadoopUtilSuite extends SparkFunSuite with BeforeAndAfterEach { } test("do as") { - val user1 = UserGroupInformation.getCurrentUser val userName1 = user1.getShortUserName val userName2 = "test" @@ -77,14 +83,28 @@ class KyuubiHadoopUtilSuite extends SparkFunSuite with BeforeAndAfterEach { UserGroupInformation.getCurrentUser.getShortUserName == expectedUser } - KyuubiHadoopUtil.doAs(user1) { - assert(testf(userName1)) + def testf2(expectedUser: String): Boolean = { + throw new RuntimeException("testf2") } - KyuubiHadoopUtil.doAs(user2) { - assert(testf(userName2)) + def testf3(expectedUser: String): Boolean = { + throw new OutOfMemoryError("testf3") } + KyuubiHadoopUtil.doAs(user1)(assert(testf(userName1))) + val e1 = intercept[RuntimeException](KyuubiHadoopUtil.doAs(user1)(assert(testf2(userName1)))) + assert(e1.getMessage === "testf2") + val e2 = intercept[OutOfMemoryError](KyuubiHadoopUtil.doAs(user1)(assert(testf3(userName1)))) + assert(e2.getMessage === "testf3") + + KyuubiHadoopUtil.doAsAndLogNonFatal(user1)(assert(testf(userName1))) + KyuubiHadoopUtil.doAsAndLogNonFatal(user1)(assert(testf2(userName1))) + val e3 = intercept[OutOfMemoryError]( + KyuubiHadoopUtil.doAsAndLogNonFatal(user1)(assert(testf3(userName1)))) + assert(e3.getMessage === "testf3") + + KyuubiHadoopUtil.doAs(user2)(assert(testf(userName2))) + KyuubiHadoopUtil.doAs(user1) { KyuubiHadoopUtil.doAsRealUser { assert(testf(userName1))