[KYUUBI #1055] Refine zorder plan

<!--
Thanks for sending a pull request!

Here are some tips for you:
  1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html
  2. If the PR is related to an issue in https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
  3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'.
-->

### _Why are the changes needed?_
<!--
Please clarify why the changes are needed. For instance,
  1. If you add a feature, you can talk about the use case of it.
  2. If you fix a bug, you can clarify why it is a bug.
-->
- Change the original `OptimizeZorderCommand` to `OptimizeZorderStatement`, which holds the parsed plan
- Add `OptimizeZorderCommand` which delegates to another command (e.g. `InsertIntoHiveTable`) to execute; the reason we should add a new command (rather than reuse `InsertIntoStatement`) is that we may need to implement reading and overwriting the same table in the future. It is also tricky to add `InsertIntoStatement` during analysis.
- Change `ZorderBeforeWrite` to `ResolveZorder`, so we can resolve `OptimizeZorderStatement` to `OptimizeZorderCommand`

So the code should look cleaner

### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [ ] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #1055 from ulysses-you/zorder-insert.

Closes #1055

d252d35e [ulysses-you] simplify
223f68ed [ulysses-you] refine zorder plan

Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: ulysses-you <ulyssesyou@apache.org>
This commit is contained in:
ulysses-you 2021-09-09 10:15:49 +08:00 committed by ulysses-you
parent 85057366f4
commit ba2880cd66
No known key found for this signature in database
GPG Key ID: 4C500BC62D576766
10 changed files with 200 additions and 108 deletions

View File

@ -15,6 +15,14 @@
* limitations under the License.
*/
package org.apache.kyuubi.sql.zorder
package org.apache.kyuubi.sql
class ZorderException(message: String, cause: Throwable = null) extends Exception(message, cause)
import java.sql.SQLException
// Base exception for Kyuubi SQL extension failures. Extends java.sql.SQLException
// so callers can treat extension errors like any other SQL error.
class KyuubiSQLExtensionException(reason: String, cause: Throwable)
extends SQLException(reason, cause) {
// Convenience constructor for errors without an underlying cause.
def this(reason: String) = {
this(reason, null)
}
}

View File

@ -20,7 +20,7 @@ package org.apache.kyuubi.sql
import org.apache.spark.sql.SparkSessionExtensions
import org.apache.kyuubi.sql.sqlclassification.KyuubiSqlClassification
import org.apache.kyuubi.sql.zorder.ZorderBeforeWrite
import org.apache.kyuubi.sql.zorder.ResolveZorder
import org.apache.kyuubi.sql.zorder.ZorderSparkSqlExtensionsParser
// scalastyle:off line.size.limit
@ -34,7 +34,7 @@ class KyuubiSparkSQLExtension extends (SparkSessionExtensions => Unit) {
override def apply(extensions: SparkSessionExtensions): Unit = {
// inject zorder parser and related rules
extensions.injectParser{ case (_, parser) => new ZorderSparkSqlExtensionsParser(parser) }
extensions.injectResolutionRule(ZorderBeforeWrite)
extensions.injectResolutionRule(ResolveZorder)
extensions.injectPostHocResolutionRule(KyuubiSqlClassification)
extensions.injectPostHocResolutionRule(RepartitionBeforeWrite)

View File

@ -17,27 +17,51 @@
package org.apache.kyuubi.sql.zorder
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation}
import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, Expression, NullsLast, SortOrder}
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Sort, UnaryNode}
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.command.DataWritingCommand
import org.apache.spark.sql.hive.execution.InsertIntoHiveTable
case class OptimizeZorderCommand(child: LogicalPlan) extends UnaryNode {
override def output: Seq[Attribute] = child.output
}
import org.apache.kyuubi.sql.KyuubiSQLExtensionException
object OptimizeZorderCommand {
/**
* A runnable command for zorder; we delegate to a real command (e.g. `InsertIntoHiveTable`) to execute
*/
case class OptimizeZorderCommand(
catalogTable: CatalogTable,
query: LogicalPlan) extends DataWritingCommand {
override def outputColumnNames: Seq[String] = query.output.map(_.name)
def apply(tableIdent: Seq[String],
whereExp: Option[Expression],
sortArr: Seq[UnresolvedAttribute]): OptimizeZorderCommand = {
val table = UnresolvedRelation(tableIdent)
val child = whereExp match {
case Some(x) => Filter(x, table)
case None => table
private def isHiveTable: Boolean = {
catalogTable.provider.isEmpty ||
(catalogTable.provider.isDefined && "hive".equalsIgnoreCase(catalogTable.provider.get))
}
private def getWritingCommand(session: SparkSession): DataWritingCommand = {
// TODO: Support convert hive relation to datasource relation, can see
// [[org.apache.spark.sql.hive.RelationConversions]]
InsertIntoHiveTable(
catalogTable,
catalogTable.partitionColumnNames.map(p => (p, None)).toMap,
query,
overwrite = true,
ifPartitionNotExists = false,
outputColumnNames
)
}
override def run(session: SparkSession, child: SparkPlan): Seq[Row] = {
// TODO: Support datasource relation
// TODO: Support read and insert overwrite the same table for some table format
if (!isHiveTable) {
throw new KyuubiSQLExtensionException("only support hive table")
}
val sortOrder = SortOrder(Zorder(sortArr), Ascending, NullsLast, Seq.empty)
val zorderSort = Sort(Seq(sortOrder), true, child)
OptimizeZorderCommand(zorderSort)
val command = getWritingCommand(session)
command.run(session, child)
DataWritingCommand.propogateMetrics(session.sparkContext, command, metrics)
Seq.empty
}
}

View File

@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.sql.zorder
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryNode}
/**
* A zorder statement that contains the plan we parsed from SQL.
* We should convert this plan to certain command at Analyzer.
*/
case class OptimizeZorderStatement(
tableIdentifier: Seq[String], // multi-part table name parsed from SQL, e.g. Seq("db", "tbl")
child: LogicalPlan) extends UnaryNode {
// Pass the child's output through unchanged; this node adds no columns of its own.
override def output: Seq[Attribute] = child.output
}

View File

@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.sql.zorder
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
import org.apache.spark.sql.catalyst.expressions.AttributeSet
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, SubqueryAlias}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.kyuubi.sql.KyuubiSQLExtensionException
/**
* Resolve `OptimizeZorderStatement` to `OptimizeZorderCommand`
*/
case class ResolveZorder(session: SparkSession) extends Rule[LogicalPlan] {
// Reject filters that are not restricted to partition columns: the statement's
// query feeds an overwrite of the same table, so filtering on a non-partitioned
// table or on data columns is disallowed and fails fast here.
private def checkQueryAllowed(query: LogicalPlan): Unit = query foreach {
case Filter(condition, SubqueryAlias(_, tableRelation: HiveTableRelation)) =>
if (tableRelation.partitionCols.isEmpty) {
throw new KyuubiSQLExtensionException("Filters are only supported for partitioned table")
}
val partitionKeyIds = AttributeSet(tableRelation.partitionCols)
// Every attribute referenced by the condition must be a partition column.
if (condition.references.isEmpty || !condition.references.subsetOf(partitionKeyIds)) {
throw new KyuubiSQLExtensionException("Only partition column filters are allowed")
}
case _ =>
}
// Build a session-catalog TableIdentifier from the parsed multi-part name;
// only the `tbl` and `db.tbl` forms are supported (no catalog qualifier).
private def getTableIdentifier(tableIdent: Seq[String]): TableIdentifier = tableIdent match {
case Seq(tbl) => TableIdentifier.apply(tbl)
case Seq(db, tbl) => TableIdentifier.apply(tbl, Some(db))
case _ => throw new KyuubiSQLExtensionException(
"only support session catalog table, please use db.table instead")
}
// Once the statement's query is fully resolved, validate it and convert the
// statement into a runnable OptimizeZorderCommand bound to the catalog table.
override def apply(plan: LogicalPlan): LogicalPlan = plan match {
case OptimizeZorderStatement(tableIdent, query) if query.resolved =>
checkQueryAllowed(query)
val tableIdentifier = getTableIdentifier(tableIdent)
val catalogTable = session.sessionState.catalog.getTableMetadata(tableIdentifier)
OptimizeZorderCommand(catalogTable, query)
case _ => plan
}
}

View File

@ -22,6 +22,8 @@ import org.apache.spark.sql.catalyst.expressions.{BoundReference, Expression}
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
import org.apache.spark.sql.types.{BinaryType, BooleanType, ByteType, DataType, DateType, DecimalType, DoubleType, FloatType, IntegerType, LongType, ShortType, StringType, TimestampType}
import org.apache.kyuubi.sql.KyuubiSQLExtensionException
case class Zorder(children: Seq[Expression]) extends Expression with CodegenFallback {
private lazy val defaultNullValues = {
children.map {
@ -50,10 +52,10 @@ case class Zorder(children: Seq[Expression]) extends Expression with CodegenFall
case d: DecimalType =>
Long.MaxValue
case other: Any =>
throw new ZorderException("Unsupported z-order type: " + other.getClass)
throw new KyuubiSQLExtensionException("Unsupported z-order type: " + other.getClass)
}
case other: Any =>
throw new ZorderException("Unknown z-order column: " + other)
throw new KyuubiSQLExtensionException("Unknown z-order column: " + other)
}
}

View File

@ -1,67 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.sql.zorder
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
import org.apache.spark.sql.catalyst.expressions.AttributeSet
import org.apache.spark.sql.catalyst.plans.logical.{Filter, InsertIntoStatement, LogicalPlan, SubqueryAlias}
import org.apache.spark.sql.catalyst.rules.Rule
// NOTE(review): this is the pre-refactor rule deleted by this commit; it is
// superseded by ResolveZorder + OptimizeZorderCommand.
case class ZorderBeforeWrite(session: SparkSession) extends Rule[LogicalPlan] {
// Rewrite a resolved OptimizeZorderCommand into an InsertIntoStatement that
// overwrites the scanned Hive table with the z-ordered rows.
override def apply(plan: LogicalPlan): LogicalPlan = {
plan match {
case o @ OptimizeZorderCommand(child) =>
// Collected while walking the child plan: the Hive relation to write back
// to, and a dynamic partition spec derived from its partition columns.
var relation: Option[HiveTableRelation] = None
var partitionSpec: Map[String, Option[String]] = Map.empty
val newChild = child.resolveOperatorsUp {
case f @ Filter(condition,
SubqueryAlias(_, tableRelation: HiveTableRelation)) if f.resolved =>
if (!tableRelation.isPartitioned) {
throw new ZorderException("Filters are only supported for partitioned table")
}
val partitionKeyIds = AttributeSet(tableRelation.partitionCols)
// Every attribute referenced by the filter must be a partition column.
if (condition.references.isEmpty || !condition.references.subsetOf(partitionKeyIds)) {
throw new ZorderException("Only partition column filters are allowed")
}
// All partitions are written dynamically (value None for every key).
val partitions = tableRelation.partitionCols.map { p =>
p.name -> None
}
partitionSpec = Map(partitions: _*)
f
case r: HiveTableRelation =>
relation = Option(r)
r
}
if (o.resolved) {
relation match {
case Some(table) =>
// Positional args: overwrite = true, ifPartitionNotExists = false.
InsertIntoStatement(table, partitionSpec, Nil, newChild, true, false)
case None => plan
}
} else {
plan
}
case _ => plan
}
}
}

View File

@ -22,6 +22,8 @@ import java.nio.charset.Charset
import org.apache.spark.sql.types.Decimal
import org.apache.kyuubi.sql.KyuubiSQLExtensionException
object ZorderBytesUtils {
def interleaveMultiByteArray(arrays: Array[Array[Byte]]): Array[Byte] = {
var totalLength = 0
@ -87,7 +89,7 @@ object ZorderBytesUtils {
case dec: Decimal =>
ZorderBytesUtils.longToByte(dec.toLong)
case other: Any =>
throw new ZorderException("Unsupported z-order type: " + other.getClass)
throw new KyuubiSQLExtensionException("Unsupported z-order type: " + other.getClass)
}
}

View File

@ -26,11 +26,11 @@ import scala.collection.mutable.{ArrayBuffer, ListBuffer}
import org.antlr.v4.runtime.ParserRuleContext
import org.antlr.v4.runtime.tree.{ParseTree, TerminalNode}
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.{And, EqualNullSafe, EqualTo, Expression, GreaterThan, GreaterThanOrEqual, LessThan, LessThanOrEqual, Literal, Not, Or}
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation, UnresolvedStar}
import org.apache.spark.sql.catalyst.expressions.{And, Ascending, EqualNullSafe, EqualTo, Expression, GreaterThan, GreaterThanOrEqual, LessThan, LessThanOrEqual, Literal, Not, NullsLast, Or, SortOrder}
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.parser.ParserUtils.{string, stringWithoutUnescape, withOrigin}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project, Sort}
import org.apache.spark.sql.catalyst.util.DateTimeUtils.{getZoneId, stringToDate, stringToTimestamp}
import org.apache.spark.sql.catalyst.util.IntervalUtils
import org.apache.spark.sql.hive.HiveAnalysis.conf
@ -49,24 +49,40 @@ class ZorderSqlAstBuilder extends ZorderSqlExtensionsBaseVisitor[AnyRef] {
protected def multiPart(ctx: ParserRuleContext): Seq[String] = typedVisit(ctx)
protected def zorder(ctx: ParserRuleContext): Seq[UnresolvedAttribute] = typedVisit(ctx)
override def visitSingleStatement(ctx: SingleStatementContext): LogicalPlan = {
visit(ctx.statement()).asInstanceOf[LogicalPlan]
}
override def visitPassThrough(ctx: PassThroughContext): LogicalPlan = null
override def visitOptimizeZorder(ctx: OptimizeZorderContext):
OptimizeZorderCommand = withOrigin(ctx) {
val tableIdent = multiPart(ctx.multipartIdentifier())
val whereItem = if (ctx.whereClause() == null) {
None
} else {
Option(expression(ctx.whereClause().booleanExpression()))
}
override def visitOptimizeZorder(
ctx: OptimizeZorderContext): OptimizeZorderStatement = withOrigin(ctx) {
val tableIdent = multiPart(ctx.multipartIdentifier())
val table = UnresolvedRelation(tableIdent)
OptimizeZorderCommand(tableIdent, whereItem, zorder(ctx.zorderClause()))
val whereClause = if (ctx.whereClause() == null) {
None
} else {
Option(expression(ctx.whereClause().booleanExpression()))
}
val tableWithFilter = whereClause match {
case Some(expr) => Filter(expr, table)
case None => table
}
val zorderCols = ctx.zorderClause().order.asScala
.map(visitMultipartIdentifier)
.map(UnresolvedAttribute(_))
.toSeq
val query =
Sort(
SortOrder(Zorder(zorderCols), Ascending, NullsLast, Seq.empty) :: Nil,
true,
Project(Seq(UnresolvedStar(None)), tableWithFilter))
OptimizeZorderStatement(tableIdent, query)
}
override def visitQuery(ctx: QueryContext): Expression = withOrigin(ctx) {

View File

@ -28,7 +28,7 @@ import org.apache.spark.sql.test.SQLTestData.TestData
import org.apache.spark.sql.test.SQLTestUtils
import org.apache.kyuubi.sql.{FinalStageConfigIsolation, KyuubiSQLConf}
import org.apache.kyuubi.sql.zorder.ZorderException
import org.apache.kyuubi.sql.KyuubiSQLExtensionException
class KyuubiExtensionSuite extends QueryTest with SQLTestUtils with AdaptiveSparkPlanHelper {
@ -908,7 +908,9 @@ class KyuubiExtensionSuite extends QueryTest with SQLTestUtils with AdaptiveSpar
"(2,0,2),(2,1,1),(2,2,5),(2,3,5)," +
"(3,0,3),(3,1,4),(3,2,9),(3,3,0)")
val e = intercept[ZorderException](sql("OPTIMIZE up WHERE c1 > 1 ZORDER BY c1, c2"))
val e = intercept[KyuubiSQLExtensionException] {
sql("OPTIMIZE up WHERE c1 > 1 ZORDER BY c1, c2")
}
assert(e.getMessage == "Filters are only supported for partitioned table")
sql("OPTIMIZE up ZORDER BY c1, c2")
@ -999,7 +1001,7 @@ class KyuubiExtensionSuite extends QueryTest with SQLTestUtils with AdaptiveSpar
"(2,0,2),(2,1,1),(2,2,5),(2,3,5)," +
"(3,0,3),(3,1,4),(3,2,9),(3,3,0)")
val e = intercept[ZorderException](
val e = intercept[KyuubiSQLExtensionException](
sql(s"OPTIMIZE p WHERE id = 1 AND c1 > 1 ZORDER BY c1, c2")
)
assert(e.getMessage == "Only partition column filters are allowed")
@ -1026,4 +1028,15 @@ class KyuubiExtensionSuite extends QueryTest with SQLTestUtils with AdaptiveSpar
}
}
}
test("optimize zorder with datasource table") {
// TODO remove this if we support datasource table
withTable("t") {
sql("CREATE TABLE t (c1 int, c2 int) USING PARQUET")
val msg = intercept[KyuubiSQLExtensionException] {
sql("OPTIMIZE t ZORDER BY c1, c2")
}.getMessage
assert(msg.contains("only support hive table"))
}
}
}