[KYUUBI-170]persist and coalesce before to local iterator while using incremental result collect (#171)

* fix #170 persist and coalesce before to local iterator while using incremental result collect

*  add log

* use df.rdd.getPartitionNum as upper limit

* log & ut

* bypass coalesce if df.rdd.partition.size is small

* add ut

* code cov

* code cov
This commit is contained in:
Kent Yao 2019-03-26 17:54:40 +08:00 committed by GitHub
parent 7790bbd9df
commit 32c95eacf1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 183 additions and 25 deletions

View File

@ -385,6 +385,27 @@ object KyuubiConf {
.booleanConf
.createWithDefault(false)
val OPERATION_INCREMENTAL_RDD_PARTITIONS_LIMIT: ConfigEntry[Int] =
KyuubiConfigBuilder("spark.kyuubi.operation.incremental.rdd.partitions.limit")
.doc("In incremental result collection, when the partition number of the rdd underlying the" +
" query is great than this setting, Kyuubi will try to coalesce first before calling" +
" toLocalIterator")
.intConf
.createWithDefault(50)
val OPERATION_INCREMENTAL_PARTITION_ROWS: ConfigEntry[Int] =
KyuubiConfigBuilder("spark.kyuubi.operation.incremental.partition.rows")
.doc("In incremental result collection, Spark will run job not task on a single partition," +
" which sequentially get results one partition by one to the driver. we use this" +
" configuration and the total size of the query output to calculate the partition number" +
" to coalesce to. Use Math.min(`total size of the query output` / `spark.kyuubi.operation" +
".incremental.collect.partition.rows`, `df.rdd.partition.size`) to limit the total number" +
" of jobs. In case of OutOfMemoryError happens frequently in executor side which lead to" +
" job failures, we suggest to decrease this number. Otherwise for performance reasons," +
" increase this setting to reduce the size of sequential Spark jobs")
.intConf
.createWithDefault(20000)
val OPERATION_RESULT_LIMIT: ConfigEntry[Int] =
KyuubiConfigBuilder("spark.kyuubi.operation.result.limit")
.doc("In non-incremental result collection mode, set this to a positive value to limit the" +

View File

@ -252,6 +252,7 @@ object KyuubiSparkUtil extends Logging {
}
def setActiveSparkContext(sc: SparkContext): Unit = {
info(s"Application ${sc.applicationId} has been activated")
SparkContext.setActiveContext(sc, allowMultipleContexts = true)
}

View File

@ -23,6 +23,7 @@ import java.util.UUID
import java.util.concurrent.{Future, RejectedExecutionException}
import scala.collection.JavaConverters._
import scala.util.{Success, Try}
import scala.util.control.NonFatal
import org.apache.commons.lang3.StringUtils
@ -200,7 +201,7 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging
}
}
private[this] def cleanupOperationLog(): Unit = {
private def cleanupOperationLog(): Unit = {
if (isOperationLogEnabled) {
if (operationLog == null) {
error("Operation [ " + opHandle.getHandleIdentifier + " ] " +
@ -381,8 +382,27 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging
}
debug(result.queryExecution.toString())
iter = if (incrementalCollect) {
info("Executing query in incremental collection mode")
result.toLocalIterator().asScala
val numParts = result.rdd.getNumPartitions
info(s"Executing query in incremental mode, running $numParts jobs before optimization")
val limit = conf.get(OPERATION_INCREMENTAL_RDD_PARTITIONS_LIMIT).toInt
if (numParts > limit) {
val partRows = conf.get(OPERATION_INCREMENTAL_PARTITION_ROWS).toInt
val count = Try { result.persist.count() } match {
case Success(outputSize) =>
val num = math.min(math.max(outputSize / partRows, 1), numParts)
info(s"The total query output is $outputSize and will be coalesced to $num of" +
s" partitions with $partRows rows on average")
num
case _ =>
warn("Failed to calculate the query output size, do not coalesce")
numParts
}
info(s"Executing query in incremental mode, running $count jobs after optimization")
result.coalesce(count.toInt).toLocalIterator().asScala
} else {
info(s"Executing query in incremental mode, running $numParts jobs without optimization")
result.toLocalIterator().asScala
}
} else {
val resultLimit = conf.get(OPERATION_RESULT_LIMIT).toInt
if (resultLimit >= 0) {
@ -396,7 +416,8 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging
} catch {
case e: KyuubiSQLException =>
if (!isClosedOrCanceled) {
onStatementError(statementId, e.getMessage, KyuubiSparkUtil.exceptionString(e))
val err = KyuubiSparkUtil.exceptionString(e)
onStatementError(statementId, e.getMessage, err)
throw e
}
case e: ParseException =>

View File

@ -172,7 +172,7 @@ class SparkSessionWithUGI(
} catch {
case e: Exception =>
if (conf.getOption("spark.master").contains("yarn")) {
KyuubiHadoopUtil.doAs(user) {
KyuubiHadoopUtil.doAsAndLogNonFatal(user) {
KyuubiHadoopUtil.killYarnAppByName(appName)
}
}
@ -181,11 +181,9 @@ class SparkSessionWithUGI(
val msg =
s"""
|Get SparkSession for [$userName] failed
|Diagnosis:
|${sparkException.map(_.getMessage).getOrElse(cause.getMessage)}
|
|Please check if the specified yarn queue [${conf.getOption(QUEUE).getOrElse("")}]
|is available or has sufficient resources left
|Diagnosis: ${sparkException.map(_.getMessage).getOrElse(cause.getMessage)}
|Please check if the specified yarn queue [${conf.getOption(QUEUE)
.getOrElse("")}] is available or has sufficient resources left
""".stripMargin
val ke = new KyuubiSQLException(msg, "08S01", 1001, cause)
sparkException.foreach(ke.addSuppressed)

View File

@ -21,12 +21,14 @@ import java.lang.reflect.UndeclaredThrowableException
import java.security.PrivilegedExceptionAction
import scala.collection.JavaConverters._
import scala.util.control.NonFatal
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.api.records.ApplicationReport
import org.apache.hadoop.yarn.api.records.YarnApplicationState._
import org.apache.hadoop.yarn.client.api.YarnClient
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.spark.KyuubiSparkUtil
import yaooqinn.kyuubi.Logging
@ -67,7 +69,15 @@ private[kyuubi] object KyuubiHadoopUtil extends Logging {
override def run(): T = f
})
} catch {
case e: UndeclaredThrowableException => throw Option(e.getCause).getOrElse(e)
case NonFatal(e) => throw KyuubiSparkUtil.findCause(e)
}
}
def doAsAndLogNonFatal(user: UserGroupInformation)(f: => Unit): Unit = {
try {
doAs(user)(f)
} catch {
case NonFatal(e) => error(s"Failed to operate as ${user.getShortUserName}", e)
}
}

View File

@ -190,4 +190,21 @@ class KyuubiOperationSuite extends SparkFunSuite with MockitoSugar {
val e3 = intercept[KyuubiSQLException](op.transform(plan5))
assert(e3.getMessage.startsWith("Resource Type"))
}
test("is closed or canceled") {
val op = sessionMgr.getOperationMgr.newExecuteStatementOperation(session, statement)
assert(!op.isClosedOrCanceled)
op.cancel()
assert(op.isClosedOrCanceled)
op.close()
assert(op.isClosedOrCanceled)
val op2 = sessionMgr.getOperationMgr.newExecuteStatementOperation(session, statement)
op2.close()
assert(op2.isClosedOrCanceled)
val op3 = sessionMgr.getOperationMgr.newExecuteStatementOperation(session, null)
op3.cancel()
op3.close()
assert(op3.isClosedOrCanceled)
}
}

View File

@ -18,12 +18,11 @@
package yaooqinn.kyuubi.server
import java.net.InetAddress
import java.util.UUID
import scala.collection.JavaConverters._
import org.apache.hive.service.cli.thrift._
import org.apache.spark.{KyuubiSparkUtil, SparkConf, SparkFunSuite}
import org.apache.spark.{KyuubiConf, KyuubiSparkUtil, SparkConf, SparkFunSuite}
import org.apache.spark.KyuubiConf._
import org.scalatest.Matchers
@ -256,7 +255,7 @@ class FrontendServiceSuite extends SparkFunSuite with Matchers with SecuredFunSu
}
test("alter database") {
withFEServiceAndHandle { (fe, handle) =>
val block: (FrontendService, TSessionHandle) => Unit = (fe, handle) => {
val req = new TExecuteStatementReq(handle,
"alter database default set dbproperties ('kent'='yao')")
val resp = fe.ExecuteStatement(req)
@ -269,10 +268,13 @@ class FrontendServiceSuite extends SparkFunSuite with Matchers with SecuredFunSu
tFetchResultsResp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS)
tFetchResultsResp.getResults.getRows.size() should be(0)
}
withFEServiceAndHandle(block)
withFEServiceAndHandleInc(block)
withFEServiceAndHandleIncAndCal(block)
}
test("alter schema") {
withFEServiceAndHandle { (fe, handle) =>
val block: (FrontendService, TSessionHandle) => Unit = (fe, handle) => {
val req = new TExecuteStatementReq(handle,
"alter schema default set dbproperties ('kent'='yao')")
val resp = fe.ExecuteStatement(req)
@ -286,11 +288,15 @@ class FrontendServiceSuite extends SparkFunSuite with Matchers with SecuredFunSu
tFetchResultsResp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS)
tFetchResultsResp.getResults.getRows.size() should be(0)
}
withFEServiceAndHandle(block)
withFEServiceAndHandleInc(block)
withFEServiceAndHandleIncAndCal(block)
}
test("alter table name") {
withFEServiceAndHandle { (fe, handle) =>
val ct = new TExecuteStatementReq(handle, "create table default.src(key int) using parquet")
val block: (FrontendService, TSessionHandle) => Unit = (fe, handle) => {
val ct = new TExecuteStatementReq(handle,
"create table if not exists default.src(key int) using parquet")
fe.ExecuteStatement(ct)
Thread.sleep(5000)
val req = new TExecuteStatementReq(handle, "alter table default.src rename to default.src2")
@ -306,10 +312,33 @@ class FrontendServiceSuite extends SparkFunSuite with Matchers with SecuredFunSu
tFetchResultsResp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS)
tFetchResultsResp.getResults.getRows.size() should be(0)
}
withFEServiceAndHandle(block)
}
test("alter table name inc") {
val block: (FrontendService, TSessionHandle) => Unit = (fe, handle) => {
val ct = new TExecuteStatementReq(handle,
"create table if not exists default.src3(key int) using parquet")
fe.ExecuteStatement(ct)
Thread.sleep(5000)
val req = new TExecuteStatementReq(handle, "alter table default.src3 rename to default.src4")
val resp = fe.ExecuteStatement(req)
resp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS)
val tFetchResultsReq =
new TFetchResultsReq(resp.getOperationHandle, TFetchOrientation.FETCH_NEXT, 50)
val dt = new TExecuteStatementReq(handle, "drop table src4")
fe.ExecuteStatement(dt)
Thread.sleep(5000)
val tFetchResultsResp = fe.FetchResults(tFetchResultsReq)
tFetchResultsResp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS)
tFetchResultsResp.getResults.getRows.size() should be(0)
}
withFEServiceAndHandleInc(block)
}
test("alter table set properties") {
withFEServiceAndHandle { (fe, handle) =>
val block: (FrontendService, TSessionHandle) => Unit = (fe, handle) => {
val ct = new TExecuteStatementReq(handle, "create table default.src(key int) using parquet")
fe.ExecuteStatement(ct)
Thread.sleep(5000)
@ -326,6 +355,8 @@ class FrontendServiceSuite extends SparkFunSuite with Matchers with SecuredFunSu
tFetchResultsResp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS)
tFetchResultsResp.getResults.getRows.size() should be(0)
}
withFEServiceAndHandle(block)
withFEServiceAndHandleInc(block)
}
test("alter table unset properties") {
@ -380,7 +411,7 @@ class FrontendServiceSuite extends SparkFunSuite with Matchers with SecuredFunSu
}
}
test("create function") {
test("create temporary function") {
withFEServiceAndHandle { (fe, handle) =>
val req = new TExecuteStatementReq(handle,
"create temporary function testfunc as 'testClass' using jar 'hdfs://a/b/test.jar'")
@ -402,6 +433,45 @@ class FrontendServiceSuite extends SparkFunSuite with Matchers with SecuredFunSu
feService.init(conf)
feService.start()
val req = new TOpenSessionReq(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1)
req.setUsername(user)
val resp = feService.OpenSession(req)
resp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS)
val handle = resp.getSessionHandle
block(feService, handle)
} finally {
feService.stop()
}
}
def withFEServiceAndHandleInc(block: (FrontendService, TSessionHandle) => Unit): Unit = {
val feService = new FrontendService(beService)
try {
feService.init(conf)
feService.start()
val req = new TOpenSessionReq(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1)
req.setUsername(user)
req.setConfiguration(
Map("set:hivevar:" + KyuubiConf.OPERATION_INCREMENTAL_COLLECT.key -> "true").asJava)
val resp = feService.OpenSession(req)
resp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS)
val handle = resp.getSessionHandle
block(feService, handle)
} finally {
feService.stop()
}
}
def withFEServiceAndHandleIncAndCal(block: (FrontendService, TSessionHandle) => Unit): Unit = {
val feService = new FrontendService(beService)
try {
feService.init(conf)
feService.start()
val req = new TOpenSessionReq(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1)
req.setUsername(user)
req.setConfiguration(
Map("set:hivevar:" + KyuubiConf.OPERATION_INCREMENTAL_COLLECT.key -> "true",
"set:hivevar:" + KyuubiConf.OPERATION_INCREMENTAL_RDD_PARTITIONS_LIMIT.key -> "-1")
.asJava)
val resp = feService.OpenSession(req)
resp.getStatus.getStatusCode should be(TStatusCode.SUCCESS_STATUS)
val handle = resp.getSessionHandle

View File

@ -18,7 +18,8 @@
package yaooqinn.kyuubi.utils
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.api.records.{ApplicationId, ContainerLaunchContext, Resource, YarnApplicationState}
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationReportPBImpl
import org.apache.hadoop.yarn.client.api.YarnClient
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.server.MiniYARNCluster
@ -52,6 +53,12 @@ class KyuubiHadoopUtilSuite extends SparkFunSuite with BeforeAndAfterEach {
super.beforeEach()
}
test("kill yarn app") {
val report = new ApplicationReportPBImpl()
KyuubiHadoopUtil.killYarnApp(report)
intercept[NullPointerException](KyuubiHadoopUtil.killYarnApp(null))
}
test("kill yarn application by name") {
withYarnApplication { id =>
KyuubiHadoopUtil.killYarnAppByName(id.toString)
@ -67,7 +74,6 @@ class KyuubiHadoopUtilSuite extends SparkFunSuite with BeforeAndAfterEach {
}
test("do as") {
val user1 = UserGroupInformation.getCurrentUser
val userName1 = user1.getShortUserName
val userName2 = "test"
@ -77,14 +83,28 @@ class KyuubiHadoopUtilSuite extends SparkFunSuite with BeforeAndAfterEach {
UserGroupInformation.getCurrentUser.getShortUserName == expectedUser
}
KyuubiHadoopUtil.doAs(user1) {
assert(testf(userName1))
def testf2(expectedUser: String): Boolean = {
throw new RuntimeException("testf2")
}
KyuubiHadoopUtil.doAs(user2) {
assert(testf(userName2))
def testf3(expectedUser: String): Boolean = {
throw new OutOfMemoryError("testf3")
}
KyuubiHadoopUtil.doAs(user1)(assert(testf(userName1)))
val e1 = intercept[RuntimeException](KyuubiHadoopUtil.doAs(user1)(assert(testf2(userName1))))
assert(e1.getMessage === "testf2")
val e2 = intercept[OutOfMemoryError](KyuubiHadoopUtil.doAs(user1)(assert(testf3(userName1))))
assert(e2.getMessage === "testf3")
KyuubiHadoopUtil.doAsAndLogNonFatal(user1)(assert(testf(userName1)))
KyuubiHadoopUtil.doAsAndLogNonFatal(user1)(assert(testf2(userName1)))
val e3 = intercept[OutOfMemoryError](
KyuubiHadoopUtil.doAsAndLogNonFatal(user1)(assert(testf3(userName1))))
assert(e3.getMessage === "testf3")
KyuubiHadoopUtil.doAs(user2)(assert(testf(userName2)))
KyuubiHadoopUtil.doAs(user1) {
KyuubiHadoopUtil.doAsRealUser {
assert(testf(userName1))