[KYUUBI #6250] Drop support for Spark 3.1
# 🔍 Description

## Issue References 🔗

This pull request fixes #6250

## Describe Your Solution 🔧

Please include a summary of the change and which issue is fixed. Please also include relevant motivation and context. List any dependencies that are required for this change.

## Types of changes 🔖

- [ ] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [x] Breaking change (fix or feature that would cause existing functionality to change)

## Test Plan 🧪

Pass CI

---

# Checklist 📝

- [ ] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)

**Be nice. Be informative.**

Closes #6273 from liuxiaocs7/issue-6250.

Closes #6250

c6ba1e88a [liuxiao] remove unused import
db887ef9b [liuxiao] inline method
769da013b [Cheng Pan] Update externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
21dbd37a7 [liuxiao] remove unused import
e869d571e [liuxiao] update for miss
7d755a879 [liuxiao] Drop support for Spark 3.1

Lead-authored-by: liuxiao <liuxiao2103@qq.com>
Co-authored-by: Cheng Pan <pan3793@gmail.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
This commit is contained in:
parent 35d4b5f0c7
commit 42c619a5ac

.github/workflows/master.yml (vendored, 5 changed lines)
@@ -56,11 +56,6 @@ jobs:
         exclude-tags: [""]
         comment: ["normal"]
         include:
           - java: 8
             spark: '3.5'
-            spark-archive: '-Dspark.archive.mirror=https://archive.apache.org/dist/spark/spark-3.1.3 -Dspark.archive.name=spark-3.1.3-bin-hadoop3.2.tgz -Pzookeeper-3.6'
-            exclude-tags: '-Dmaven.plugin.scalatest.exclude.tags=org.scalatest.tags.Slow,org.apache.kyuubi.tags.DeltaTest,org.apache.kyuubi.tags.IcebergTest,org.apache.kyuubi.tags.DeltaTest,org.apache.kyuubi.tags.HudiTest,org.apache.kyuubi.tags.SparkLocalClusterTest'
-            comment: 'verify-on-spark-3.1-binary'
-          - java: 8
-            spark: '3.5'
             spark-archive: '-Dspark.archive.mirror=https://archive.apache.org/dist/spark/spark-3.2.4 -Dspark.archive.name=spark-3.2.4-bin-hadoop3.2.tgz -Pzookeeper-3.6'
@@ -20,6 +20,7 @@

 ## Upgrading from Kyuubi 1.9 to 1.10

 * Since Kyuubi 1.10, `beeline` is deprecated and will be removed in the future, please use `kyuubi-beeline` instead.
+* Since Kyuubi 1.10, the support of Spark engine for Spark 3.1 is removed.
 * Since Kyuubi 1.10, the support of Flink engine for Flink 1.16 is removed.

 ## Upgrading from Kyuubi 1.8 to 1.9
@@ -43,7 +43,7 @@ pre-installed and the ``JAVA_HOME`` is correctly set to each component.
 **Kyuubi**   Gateway      \ |release| \    - Kyuubi Server
              Engine lib                    - Kyuubi Engine
              Beeline                       - Kyuubi Beeline
-**Spark**    Engine       3.1 to 3.5       A Spark distribution
+**Spark**    Engine       3.2 to 3.5       A Spark distribution
 **Flink**    Engine       1.17 to 1.19     A Flink distribution
 **Trino**    Engine       N/A              A Trino cluster allows to access via trino-client v411
 **Doris**    Engine       N/A              A Doris cluster
@@ -381,9 +381,6 @@ object SparkSQLEngine extends Logging {
   }

   def main(args: Array[String]): Unit = {
-    if (KyuubiSparkUtil.SPARK_ENGINE_RUNTIME_VERSION === "3.1") {
-      warn("The support for Spark 3.1 is deprecated, and will be removed in the next version.")
-    }
     val startedTime = System.currentTimeMillis()
     val submitTime = kyuubiConf.getOption(KYUUBI_ENGINE_SUBMIT_TIME_KEY) match {
       case Some(t) => t.toLong
@@ -185,14 +185,13 @@ class ExecuteStatement(
       // Rename all col name to avoid duplicate columns
       val colName = range(0, result.schema.size).map(x => "col" + x)

-      val codec = if (SPARK_ENGINE_RUNTIME_VERSION >= "3.2") "zstd" else "zlib"
       // df.write will introduce an extra shuffle for the outermost limit, and hurt performance
       if (resultMaxRows > 0) {
         result.toDF(colName: _*).limit(resultMaxRows).write
-          .option("compression", codec).format("orc").save(saveFileName.get)
+          .option("compression", "zstd").format("orc").save(saveFileName.get)
       } else {
         result.toDF(colName: _*).write
-          .option("compression", codec).format("orc").save(saveFileName.get)
+          .option("compression", "zstd").format("orc").save(saveFileName.get)
       }
       info(s"Save result to ${saveFileName.get}")
       fetchOrcStatement = Some(new FetchOrcStatement(spark))
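With Spark 3.2 as the new baseline, the engine no longer needs the `zlib` fallback and always persists the incremental result set as zstd-compressed ORC. A minimal, self-contained sketch of the same write/read pattern (standalone example with a hypothetical output path, not Kyuubi code):

```scala
import org.apache.spark.sql.SparkSession

object ZstdOrcExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("zstd-orc").getOrCreate()
    import spark.implicits._

    val df = Seq((1, "a"), (2, "b")).toDF("id", "value")
    // Hypothetical output path, for illustration only.
    val path = "/tmp/kyuubi-result-orc"

    // On Spark 3.2+ the ORC data source accepts "zstd" directly, so no version check is needed.
    df.write.option("compression", "zstd").format("orc").mode("overwrite").save(path)

    // Read the saved files back, mirroring what FetchOrcStatement later does when fetching rows.
    spark.read.orc(path).show()
    spark.stop()
  }
}
```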
@@ -35,10 +35,7 @@ import org.apache.spark.sql.execution.datasources.RecordReaderIterator
 import org.apache.spark.sql.execution.datasources.orc.OrcDeserializer
 import org.apache.spark.sql.types.StructType

-import org.apache.kyuubi.KyuubiException
-import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.SPARK_ENGINE_RUNTIME_VERSION
 import org.apache.kyuubi.operation.{FetchIterator, IterableFetchIterator}
-import org.apache.kyuubi.util.reflect.DynConstructors

 class FetchOrcStatement(spark: SparkSession) {

@@ -62,7 +59,7 @@ class FetchOrcStatement(spark: SparkSession) {
     val fullSchema = orcSchema.map(f =>
       AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())
     val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, fullSchema)
-    val deserializer = getOrcDeserializer(orcSchema, colId)
+    val deserializer = new OrcDeserializer(orcSchema, colId)
     orcIter = new OrcFileIterator(list)
     val iterRow = orcIter.map(value =>
       unsafeProjection(deserializer.deserialize(value)))
@@ -73,35 +70,6 @@ class FetchOrcStatement(spark: SparkSession) {
   def close(): Unit = {
     orcIter.close()
   }
-
-  private def getOrcDeserializer(orcSchema: StructType, colId: Array[Int]): OrcDeserializer = {
-    try {
-      if (SPARK_ENGINE_RUNTIME_VERSION >= "3.2") {
-        // SPARK-34535 changed the constructor signature of OrcDeserializer
-        DynConstructors.builder()
-          .impl(classOf[OrcDeserializer], classOf[StructType], classOf[Array[Int]])
-          .build[OrcDeserializer]()
-          .newInstance(
-            orcSchema,
-            colId)
-      } else {
-        DynConstructors.builder()
-          .impl(
-            classOf[OrcDeserializer],
-            classOf[StructType],
-            classOf[StructType],
-            classOf[Array[Int]])
-          .build[OrcDeserializer]()
-          .newInstance(
-            new StructType,
-            orcSchema,
-            colId)
-      }
-    } catch {
-      case e: Throwable =>
-        throw new KyuubiException("Failed to create OrcDeserializer", e)
-    }
-  }
 }

 class OrcFileIterator(fileList: ListBuffer[LocatedFileStatus]) extends Iterator[OrcStruct] {
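The deleted `getOrcDeserializer` existed only because SPARK-34535 changed the `OrcDeserializer` constructor between Spark 3.1 and 3.2, so Kyuubi had to pick the right signature at runtime via `DynConstructors`. A rough sketch of that general fallback pattern, using a hypothetical `Widget` class and plain Java reflection instead of Kyuubi's helper:

```scala
import scala.util.Try

// Hypothetical class standing in for OrcDeserializer: it offers both a "new" 2-arg
// constructor and a "legacy" 3-arg constructor, like Spark before/after SPARK-34535.
class Widget(val schemaName: String, val columnIds: Array[Int]) {
  def this(ignored: String, schemaName: String, columnIds: Array[Int]) =
    this(schemaName, columnIds)
}

object ConstructorFallbackExample {
  def newWidget(schemaName: String, columnIds: Array[Int]): Widget = {
    val cls = classOf[Widget]
    // Probe for the newer 2-arg constructor first ...
    Try(cls.getConstructor(classOf[String], classOf[Array[Int]])
        .newInstance(schemaName, columnIds))
      // ... and fall back to the older 3-arg shape if it does not exist.
      .getOrElse(cls.getConstructor(classOf[String], classOf[String], classOf[Array[Int]])
        .newInstance("", schemaName, columnIds))
  }

  def main(args: Array[String]): Unit =
    println(newWidget("demo", Array(0, 1)).schemaName) // prints "demo"
}
```

With Spark 3.2+ as the only supported line, the direct `new OrcDeserializer(orcSchema, colId)` call in the hunk above replaces all of this machinery.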
@@ -30,7 +30,6 @@ import org.apache.arrow.vector.ipc.message.{IpcOption, MessageSerializer}
 import org.apache.arrow.vector.types.pojo.{Schema => ArrowSchema}
 import org.apache.spark.TaskContext
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.{InternalRow, SQLConfHelper}
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.execution.CollectLimitExec
@@ -158,9 +157,7 @@ object KyuubiArrowConverters extends SQLConfHelper with Logging {
       val partsToScan =
         partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts))

-      // TODO: SparkPlan.session introduced in SPARK-35798, replace with SparkPlan.session once we
-      // drop Spark 3.1 support.
-      val sc = SparkSession.active.sparkContext
+      val sc = collectLimitExec.session.sparkContext
       val res = sc.runJob(
         childRDD,
         (it: Iterator[InternalRow]) => {
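This and the later `SparkDatasetHelper` hunks all make the same substitution: since SPARK-35798 (Spark 3.2) a physical plan carries the `SparkSession` that planned it, so the thread-local `SparkSession.active` lookup can go. A small sketch of the idea; the object and method names are illustrative only, and it is placed in the same `org.apache.spark.sql.kyuubi` package where Kyuubi's real helper lives:

```scala
package org.apache.spark.sql.kyuubi

import org.apache.spark.sql.execution.SparkPlan

object PlanSessionSketch {
  /**
   * Arrow batching settings read from the session that actually planned `plan`
   * (SPARK-35798), instead of the active session that Spark 3.1 forced Kyuubi to use.
   */
  def arrowBatchSettings(plan: SparkPlan): (Int, String) = {
    val conf = plan.session.sessionState.conf
    (conf.arrowMaxRecordsPerBatch, conf.sessionLocalTimeZone)
  }
}
```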
@@ -347,6 +344,6 @@ object KyuubiArrowConverters extends SQLConfHelper with Logging {
       largeVarTypes)
   }

-  // IpcOption.DEFAULT was introduced in ARROW-11081(ARROW-4.0.0), add this for adapt Spark 3.1/3.2
+  // IpcOption.DEFAULT was introduced in ARROW-11081(ARROW-4.0.0), add this for adapt Spark 3.2
   final private val ARROW_IPC_OPTION_DEFAULT = new IpcOption()
 }
@@ -23,11 +23,10 @@ import org.apache.spark.SparkContext
 import org.apache.spark.internal.Logging
 import org.apache.spark.network.util.{ByteUnit, JavaUtils}
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
-import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.{DataFrame, Dataset, Row}
 import org.apache.spark.sql.catalyst.plans.logical.GlobalLimit
 import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils
-import org.apache.spark.sql.execution.{CollectLimitExec, HiveResult, LocalTableScanExec, QueryExecution, SparkPlan, SQLExecution}
+import org.apache.spark.sql.execution.{CollectLimitExec, CommandResultExec, HiveResult, LocalTableScanExec, QueryExecution, SparkPlan, SQLExecution}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
 import org.apache.spark.sql.execution.arrow.KyuubiArrowConverters
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
@@ -38,7 +37,6 @@ import org.apache.kyuubi.engine.spark.KyuubiSparkUtil
 import org.apache.kyuubi.engine.spark.schema.RowSet
 import org.apache.kyuubi.engine.spark.util.SparkCatalogUtils.quoteIfNeeded
-import org.apache.kyuubi.util.reflect.DynMethods
 import org.apache.kyuubi.util.reflect.ReflectUtils._

 object SparkDatasetHelper extends Logging {

@@ -48,7 +46,7 @@ object SparkDatasetHelper extends Logging {

   def executeArrowBatchCollect: SparkPlan => Array[Array[Byte]] = {
     case adaptiveSparkPlan: AdaptiveSparkPlanExec =>
-      executeArrowBatchCollect(finalPhysicalPlan(adaptiveSparkPlan))
+      executeArrowBatchCollect(adaptiveSparkPlan.finalPhysicalPlan)
     // TODO: avoid extra shuffle if `offset` > 0
     case collectLimit: CollectLimitExec if offset(collectLimit) > 0 =>
       logWarning("unsupported offset > 0, an extra shuffle will be introduced.")
@@ -57,9 +55,8 @@ object SparkDatasetHelper extends Logging {
       doCollectLimit(collectLimit)
     case collectLimit: CollectLimitExec if collectLimit.limit < 0 =>
       executeArrowBatchCollect(collectLimit.child)
-    // TODO: replace with pattern match once we drop Spark 3.1 support.
-    case command: SparkPlan if isCommandResultExec(command) =>
-      doCommandResultExec(command)
+    case commandResult: CommandResultExec =>
+      doCommandResultExec(commandResult)
     case localTableScan: LocalTableScanExec =>
       doLocalTableScan(localTableScan)
     case plan: SparkPlan =>
@@ -76,10 +73,8 @@ object SparkDatasetHelper extends Logging {
    */
  def toArrowBatchRdd(plan: SparkPlan): RDD[Array[Byte]] = {
    val schemaCaptured = plan.schema
-    // TODO: SparkPlan.session introduced in SPARK-35798, replace with SparkPlan.session once we
-    // drop Spark 3.1 support.
-    val maxRecordsPerBatch = SparkSession.active.sessionState.conf.arrowMaxRecordsPerBatch
-    val timeZoneId = SparkSession.active.sessionState.conf.sessionLocalTimeZone
+    val maxRecordsPerBatch = plan.session.sessionState.conf.arrowMaxRecordsPerBatch
+    val timeZoneId = plan.session.sessionState.conf.sessionLocalTimeZone
    // note that, we can't pass the lazy variable `maxBatchSize` directly, this is because input
    // arguments are serialized and sent to the executor side for execution.
    val maxBatchSizePerBatch = maxBatchSize
@@ -160,10 +155,8 @@ object SparkDatasetHelper extends Logging {
  }

  private def doCollectLimit(collectLimit: CollectLimitExec): Array[Array[Byte]] = {
-    // TODO: SparkPlan.session introduced in SPARK-35798, replace with SparkPlan.session once we
-    // drop Spark 3.1 support.
-    val timeZoneId = SparkSession.active.sessionState.conf.sessionLocalTimeZone
-    val maxRecordsPerBatch = SparkSession.active.sessionState.conf.arrowMaxRecordsPerBatch
+    val timeZoneId = collectLimit.session.sessionState.conf.sessionLocalTimeZone
+    val maxRecordsPerBatch = collectLimit.session.sessionState.conf.arrowMaxRecordsPerBatch

    val batches = KyuubiArrowConverters.takeAsArrowBatches(
      collectLimit,
@@ -191,19 +184,13 @@ object SparkDatasetHelper extends Logging {
    result.toArray
  }

-  private lazy val commandResultExecRowsMethod = DynMethods.builder("rows")
-    .impl("org.apache.spark.sql.execution.CommandResultExec")
-    .build()
-
-  private def doCommandResultExec(command: SparkPlan): Array[Array[Byte]] = {
-    val spark = SparkSession.active
-    // TODO: replace with `command.rows` once we drop Spark 3.1 support.
-    val rows = commandResultExecRowsMethod.invoke[Seq[InternalRow]](command)
-    command.longMetric("numOutputRows").add(rows.size)
-    sendDriverMetrics(spark.sparkContext, command.metrics)
+  private def doCommandResultExec(commandResult: CommandResultExec): Array[Array[Byte]] = {
+    val spark = commandResult.session
+    commandResult.longMetric("numOutputRows").add(commandResult.rows.size)
+    sendDriverMetrics(spark.sparkContext, commandResult.metrics)
    KyuubiArrowConverters.toBatchIterator(
-      rows.iterator,
-      command.schema,
+      commandResult.rows.iterator,
+      commandResult.schema,
      spark.sessionState.conf.arrowMaxRecordsPerBatch,
      maxBatchSize,
      -1,
@@ -211,7 +198,7 @@ object SparkDatasetHelper extends Logging {
  }

  private def doLocalTableScan(localTableScan: LocalTableScanExec): Array[Array[Byte]] = {
-    val spark = SparkSession.active
+    val spark = localTableScan.session
    localTableScan.longMetric("numOutputRows").add(localTableScan.rows.size)
    sendDriverMetrics(spark.sparkContext, localTableScan.metrics)
    KyuubiArrowConverters.toBatchIterator(
@@ -224,31 +211,7 @@ object SparkDatasetHelper extends Logging {
  }

-  /**
-   * This method provides a reflection-based implementation of
-   * [[AdaptiveSparkPlanExec.finalPhysicalPlan]] that enables us to adapt to the Spark runtime
-   * without patching SPARK-41914.
-   *
-   * TODO: Once we drop support for Spark 3.1.x, we can directly call
-   * [[AdaptiveSparkPlanExec.finalPhysicalPlan]].
-   */
-  def finalPhysicalPlan(adaptiveSparkPlanExec: AdaptiveSparkPlanExec): SparkPlan = {
-    withFinalPlanUpdate(adaptiveSparkPlanExec, identity)
-  }
-
-  /**
-   * A reflection-based implementation of [[AdaptiveSparkPlanExec.withFinalPlanUpdate]].
-   */
-  private def withFinalPlanUpdate[T](
-      adaptiveSparkPlanExec: AdaptiveSparkPlanExec,
-      fun: SparkPlan => T): T = {
-    val plan = invokeAs[SparkPlan](adaptiveSparkPlanExec, "getFinalPhysicalPlan")
-    val result = fun(plan)
-    invokeAs[Unit](adaptiveSparkPlanExec, "finalPlanUpdate")
-    result
-  }
-
  /**
-   * offset support was add since Spark-3.4(set SPARK-28330), to ensure backward compatibility with
+   * offset support was add in SPARK-28330(3.4.0), to ensure backward compatibility with
   * earlier versions of Spark, this function uses reflective calls to the "offset".
   */
  private def offset(collectLimitExec: CollectLimitExec): Int = {
@@ -261,24 +224,6 @@ object SparkDatasetHelper extends Logging {
      .getOrElse(0)
  }

-  private def isCommandResultExec(sparkPlan: SparkPlan): Boolean = {
-    // scalastyle:off line.size.limit
-    // the CommandResultExec was introduced in SPARK-35378 (Spark 3.2), after SPARK-35378 the
-    // physical plan of runnable command is CommandResultExec.
-    // for instance:
-    // ```
-    // scala> spark.sql("show tables").queryExecution.executedPlan
-    // res0: org.apache.spark.sql.execution.SparkPlan =
-    // CommandResult <empty>, [namespace#0, tableName#1, isTemporary#2]
-    // +- ShowTables [namespace#0, tableName#1, isTemporary#2], V2SessionCatalog(spark_catalog), [default]
-    //
-    // scala > spark.sql("show tables").queryExecution.executedPlan.getClass
-    // res1: Class[_ <: org.apache.spark.sql.execution.SparkPlan] = class org.apache.spark.sql.execution.CommandResultExec
-    // ```
-    // scalastyle:on line.size.limit
-    sparkPlan.getClass.getName == "org.apache.spark.sql.execution.CommandResultExec"
-  }
-
  /**
   * refer to org.apache.spark.sql.Dataset#withAction(), assign a new execution id for arrow-based
   * operation, so that we can track the arrow-based queries on the UI tab.
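The class-name comparison and the reflective `rows` lookup above were only needed while Spark 3.1, which predates `CommandResultExec` (SPARK-35378), was supported. On Spark 3.2+ a runnable command's executed plan can simply be pattern matched, as the updated `executeArrowBatchCollect` does. A minimal sketch of that direct match (illustrative standalone example, not Kyuubi code):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.CommandResultExec

object CommandResultMatchExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("command-result").getOrCreate()

    // On Spark 3.2+ the executed plan of a command like SHOW TABLES is a CommandResultExec.
    spark.sql("SHOW TABLES").queryExecution.executedPlan match {
      case commandResult: CommandResultExec =>
        // `rows` holds the command output that Kyuubi then converts into Arrow batches.
        println(s"command produced ${commandResult.rows.size} row(s)")
      case other =>
        println(s"unexpected plan: ${other.getClass.getName}")
    }
    spark.stop()
  }
}
```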
@@ -21,7 +21,6 @@ import org.apache.spark.sql.SparkSession

 import org.apache.kyuubi.{KyuubiFunSuite, Utils}
 import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.SPARK_ENGINE_RUNTIME_VERSION

 trait WithSparkSQLEngine extends KyuubiFunSuite {
   protected var spark: SparkSession = _
@@ -35,7 +34,7 @@ trait WithSparkSQLEngine extends KyuubiFunSuite {
   // Affected by such configuration' default value
   // engine.initialize.sql='SHOW DATABASES'
   // SPARK-35378
-  protected lazy val initJobId: Int = if (SPARK_ENGINE_RUNTIME_VERSION >= "3.2") 1 else 0
+  protected val initJobId: Int = 1

   override def beforeAll(): Unit = {
     startSparkEngine()
@@ -19,7 +19,7 @@ package org.apache.kyuubi.engine.spark.operation

 import java.lang.{Boolean => JBoolean}
 import java.sql.Statement
-import java.util.{Locale, Set => JSet}
+import java.util.Locale

 import org.apache.spark.{KyuubiSparkContextHelper, TaskContext}
 import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
@@ -43,7 +43,6 @@ import org.apache.kyuubi.engine.spark.{SparkSQLEngine, WithSparkSQLEngine}
 import org.apache.kyuubi.engine.spark.session.SparkSessionImpl
 import org.apache.kyuubi.operation.SparkDataTypeTests
-import org.apache.kyuubi.util.reflect.{DynFields, DynMethods}
 import org.apache.kyuubi.util.reflect.ReflectUtils._

 class SparkArrowbasedOperationSuite extends WithSparkSQLEngine with SparkDataTypeTests
   with SparkMetricsTestUtils {
@@ -188,12 +187,9 @@ class SparkArrowbasedOperationSuite extends WithSparkSQLEngine with SparkDataTyp
    returnSize.foreach { size =>
      val df = spark.sql(s"select * from t_1 limit $size")
      val headPlan = df.queryExecution.executedPlan.collectLeaves().head
-      if (SPARK_ENGINE_RUNTIME_VERSION >= "3.2") {
-        assert(headPlan.isInstanceOf[AdaptiveSparkPlanExec])
-        val finalPhysicalPlan =
-          SparkDatasetHelper.finalPhysicalPlan(headPlan.asInstanceOf[AdaptiveSparkPlanExec])
-        assert(finalPhysicalPlan.isInstanceOf[CollectLimitExec])
-      }
+      assert(headPlan.isInstanceOf[AdaptiveSparkPlanExec])
+      val finalPhysicalPlan = headPlan.asInstanceOf[AdaptiveSparkPlanExec].finalPhysicalPlan
+      assert(finalPhysicalPlan.isInstanceOf[CollectLimitExec])
      if (size > 1000) {
        runAndCheck(df.queryExecution.executedPlan, 1000)
      } else {
@@ -298,11 +294,7 @@ class SparkArrowbasedOperationSuite extends WithSparkSQLEngine with SparkDataTyp
    val listener = new JobCountListener
    val l2 = new SQLMetricsListener
    val nodeName = spark.sql("SHOW TABLES").queryExecution.executedPlan.getClass.getName
-    if (SPARK_ENGINE_RUNTIME_VERSION < "3.2") {
-      assert(nodeName == "org.apache.spark.sql.execution.command.ExecutedCommandExec")
-    } else {
-      assert(nodeName == "org.apache.spark.sql.execution.CommandResultExec")
-    }
+    assert(nodeName == "org.apache.spark.sql.execution.CommandResultExec")
    withJdbcStatement("table_1") { statement =>
      statement.executeQuery("CREATE TABLE table_1 (id bigint) USING parquet")
      withSparkListener(listener) {
@@ -314,15 +306,8 @@ class SparkArrowbasedOperationSuite extends WithSparkSQLEngine with SparkDataTyp
        }
      }

-    if (SPARK_ENGINE_RUNTIME_VERSION < "3.2") {
-      // Note that before Spark 3.2, a LocalTableScan SparkPlan will be submitted, and the issue of
-      // preventing LocalTableScan from triggering a job submission was addressed in [KYUUBI #4710].
-      assert(l2.queryExecution.executedPlan.getClass.getName ==
-        "org.apache.spark.sql.execution.LocalTableScanExec")
-    } else {
-      assert(l2.queryExecution.executedPlan.getClass.getName ==
-        "org.apache.spark.sql.execution.CommandResultExec")
-    }
+    assert(l2.queryExecution.executedPlan.getClass.getName ==
+      "org.apache.spark.sql.execution.CommandResultExec")
    assert(listener.numJobs == 0)
  }

@@ -378,7 +363,6 @@ class SparkArrowbasedOperationSuite extends WithSparkSQLEngine with SparkDataTyp

  test("post CommandResultExec driver-side metrics") {
    spark.sql("show tables").show(truncate = false)
-    assume(SPARK_ENGINE_RUNTIME_VERSION >= "3.2")
    val expectedMetrics = Map(
      0L -> (("CommandResult", Map("number of output rows" -> "2"))))
    withTables("table_1", "table_2") {
@@ -493,7 +477,7 @@ class SparkArrowbasedOperationSuite extends WithSparkSQLEngine with SparkDataTyp
      }
    }
    (keys, values).zipped.foreach { (k, v) =>
-      if (isStaticConfigKey(k)) {
+      if (SQLConf.isStaticConfigKey(k)) {
        throw new KyuubiException(s"Cannot modify the value of a static config: $k")
      }
      conf.setConfString(k, v)
@@ -521,16 +505,6 @@ class SparkArrowbasedOperationSuite extends WithSparkSQLEngine with SparkDataTyp
    }
  }

-  /**
-   * This method provides a reflection-based implementation of [[SQLConf.isStaticConfigKey]] to
-   * adapt Spark 3.1
-   *
-   * TODO: Once we drop support for Spark 3.1, we can directly call
-   * [[SQLConf.isStaticConfigKey()]].
-   */
-  private def isStaticConfigKey(key: String): Boolean =
-    getField[JSet[String]]((SQLConf.getClass, SQLConf), "staticConfKeys").contains(key)
-
  // the signature of function [[ArrowConverters.fromBatchIterator]] is changed in SPARK-43528
  // (since Spark 3.5)
  private lazy val fromBatchIteratorMethod = DynMethods.builder("fromBatchIterator")
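`SQLConf.isStaticConfigKey` can be called directly on Spark 3.2+, which is why the reflective read of `staticConfKeys` is dropped here. A minimal sketch of the direct call (illustrative standalone example, not Kyuubi code):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.internal.SQLConf

object StaticConfKeyExample {
  def main(args: Array[String]): Unit = {
    // Build a local session first so all built-in (including static) SQL conf entries are registered.
    val spark = SparkSession.builder().master("local[1]").appName("static-conf").getOrCreate()

    Seq("spark.sql.warehouse.dir", "spark.sql.shuffle.partitions").foreach { k =>
      // Static SQL configs cannot be changed on a running session; runtime ones can.
      println(s"$k -> static = ${SQLConf.isStaticConfigKey(k)}")
    }
    spark.stop()
  }
}
```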