[KYUUBI #6554] Delete redundant code related to zorder

# 🔍 Description
## Issue References 🔗

This pull request fixes #6554

## Describe Your Solution 🔧

- Delete `/kyuubi/extensions/spark/kyuubi-extension-spark-3-x/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWritingBase.scala` file
- Rename `InsertZorderBeforeWriting33.scala` to `InsertZorderBeforeWriting.scala`
- Rename `InsertZorderHelper33, InsertZorderBeforeWritingDatasource33, InsertZorderBeforeWritingHive33, ZorderSuiteSpark33` to `InsertZorderHelper, InsertZorderBeforeWritingDatasource, InsertZorderBeforeWritingHive, ZorderSuiteSpark`

## Types of changes 🔖

- [ ] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)

## Test Plan 🧪

#### Behavior Without This Pull Request ⚰️

#### Behavior With This Pull Request 🎉

#### Related Unit Tests

---

# Checklist 📝

- [x] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)

**Be nice. Be informative.**

Closes #6555 from huangxiaopingRD/6554.

Closes #6554

26de4fa09 [huangxiaoping] [KYUUBI #6554] Delete redundant code related to zorder

Authored-by: huangxiaoping <1754789345@qq.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
This commit is contained in:
huangxiaoping 2024-07-23 12:14:55 +08:00 committed by Cheng Pan
parent 3aaa1d6b1e
commit 0f6d7643ae
No known key found for this signature in database
GPG Key ID: 8001952629BCC75D
10 changed files with 39 additions and 525 deletions

View File

@@ -19,7 +19,7 @@ package org.apache.kyuubi.sql
import org.apache.spark.sql.SparkSessionExtensions
import org.apache.kyuubi.sql.zorder.{InsertZorderBeforeWritingDatasource33, InsertZorderBeforeWritingHive33, ResolveZorder}
import org.apache.kyuubi.sql.zorder.{InsertZorderBeforeWritingDatasource, InsertZorderBeforeWritingHive, ResolveZorder}
class KyuubiSparkSQLCommonExtension extends (SparkSessionExtensions => Unit) {
override def apply(extensions: SparkSessionExtensions): Unit = {
@@ -38,8 +38,8 @@ object KyuubiSparkSQLCommonExtension {
// should be applied before
// RepartitionBeforeWriting and RebalanceBeforeWriting
// because we can only apply one of them (i.e. Global Sort or Repartition/Rebalance)
extensions.injectPostHocResolutionRule(InsertZorderBeforeWritingDatasource33)
extensions.injectPostHocResolutionRule(InsertZorderBeforeWritingHive33)
extensions.injectPostHocResolutionRule(InsertZorderBeforeWritingDatasource)
extensions.injectPostHocResolutionRule(InsertZorderBeforeWritingHive)
extensions.injectPostHocResolutionRule(FinalStageConfigIsolationCleanRule)
extensions.injectQueryStagePrepRule(_ => InsertShuffleNodeBeforeJoin)

View File

@@ -28,7 +28,11 @@ import org.apache.spark.sql.hive.execution.{CreateHiveTableAsSelectCommand, Inse
import org.apache.kyuubi.sql.{KyuubiSQLConf, KyuubiSQLExtensionException}
trait InsertZorderHelper33 extends Rule[LogicalPlan] with ZorderBuilder {
trait ZorderBuilder {
def buildZorder(children: Seq[Expression]): ZorderBase
}
trait InsertZorderHelper extends Rule[LogicalPlan] with ZorderBuilder {
private val KYUUBI_ZORDER_ENABLED = "kyuubi.zorder.enabled"
private val KYUUBI_ZORDER_COLS = "kyuubi.zorder.cols"
@@ -140,8 +144,8 @@ trait InsertZorderHelper33 extends Rule[LogicalPlan] with ZorderBuilder {
}
}
case class InsertZorderBeforeWritingDatasource33(session: SparkSession)
extends InsertZorderHelper33 {
case class InsertZorderBeforeWritingDatasource(session: SparkSession)
extends InsertZorderHelper {
override def applyInternal(plan: LogicalPlan): LogicalPlan = plan match {
case insert: InsertIntoHadoopFsRelationCommand
if insert.query.resolved &&
@@ -172,8 +176,8 @@ case class InsertZorderBeforeWritingDatasource33(session: SparkSession)
}
}
case class InsertZorderBeforeWritingHive33(session: SparkSession)
extends InsertZorderHelper33 {
case class InsertZorderBeforeWritingHive(session: SparkSession)
extends InsertZorderHelper {
override def applyInternal(plan: LogicalPlan): LogicalPlan = plan match {
case insert: InsertIntoHiveTable
if insert.query.resolved &&

View File

@@ -1,188 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.sql.zorder
import java.util.Locale
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.{Ascending, Expression, NullsLast, SortOrder}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Repartition, RepartitionByExpression, Sort}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand
import org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand
import org.apache.spark.sql.hive.execution.{CreateHiveTableAsSelectCommand, InsertIntoHiveTable, OptimizedCreateHiveTableAsSelectCommand}
import org.apache.kyuubi.sql.KyuubiSQLConf
/**
* TODO: shall we forbid zorder if it's dynamic partition inserts ?
* Insert zorder before writing datasource if the target table properties has zorder properties
*/
abstract class InsertZorderBeforeWritingDatasourceBase
extends InsertZorderHelper {
override def applyInternal(plan: LogicalPlan): LogicalPlan = plan match {
case insert: InsertIntoHadoopFsRelationCommand
if insert.query.resolved && insert.bucketSpec.isEmpty && insert.catalogTable.isDefined &&
isZorderEnabled(insert.catalogTable.get.properties) =>
val newQuery = insertZorder(insert.catalogTable.get, insert.query)
if (newQuery.eq(insert.query)) {
insert
} else {
insert.copy(query = newQuery)
}
case ctas: CreateDataSourceTableAsSelectCommand
if ctas.query.resolved && ctas.table.bucketSpec.isEmpty &&
isZorderEnabled(ctas.table.properties) =>
val newQuery = insertZorder(ctas.table, ctas.query)
if (newQuery.eq(ctas.query)) {
ctas
} else {
ctas.copy(query = newQuery)
}
case _ => plan
}
}
/**
* TODO: shall we forbid zorder if it's dynamic partition inserts ?
* Insert zorder before writing hive if the target table properties has zorder properties
*/
abstract class InsertZorderBeforeWritingHiveBase
extends InsertZorderHelper {
override def applyInternal(plan: LogicalPlan): LogicalPlan = plan match {
case insert: InsertIntoHiveTable
if insert.query.resolved && insert.table.bucketSpec.isEmpty &&
isZorderEnabled(insert.table.properties) =>
val newQuery = insertZorder(insert.table, insert.query)
if (newQuery.eq(insert.query)) {
insert
} else {
insert.copy(query = newQuery)
}
case ctas: CreateHiveTableAsSelectCommand
if ctas.query.resolved && ctas.tableDesc.bucketSpec.isEmpty &&
isZorderEnabled(ctas.tableDesc.properties) =>
val newQuery = insertZorder(ctas.tableDesc, ctas.query)
if (newQuery.eq(ctas.query)) {
ctas
} else {
ctas.copy(query = newQuery)
}
case octas: OptimizedCreateHiveTableAsSelectCommand
if octas.query.resolved && octas.tableDesc.bucketSpec.isEmpty &&
isZorderEnabled(octas.tableDesc.properties) =>
val newQuery = insertZorder(octas.tableDesc, octas.query)
if (newQuery.eq(octas.query)) {
octas
} else {
octas.copy(query = newQuery)
}
case _ => plan
}
}
trait ZorderBuilder {
def buildZorder(children: Seq[Expression]): ZorderBase
}
trait InsertZorderHelper extends Rule[LogicalPlan] with ZorderBuilder {
private val KYUUBI_ZORDER_ENABLED = "kyuubi.zorder.enabled"
private val KYUUBI_ZORDER_COLS = "kyuubi.zorder.cols"
def isZorderEnabled(props: Map[String, String]): Boolean = {
props.contains(KYUUBI_ZORDER_ENABLED) &&
"true".equalsIgnoreCase(props(KYUUBI_ZORDER_ENABLED)) &&
props.contains(KYUUBI_ZORDER_COLS)
}
def getZorderColumns(props: Map[String, String]): Seq[String] = {
val cols = props.get(KYUUBI_ZORDER_COLS)
assert(cols.isDefined)
cols.get.split(",").map(_.trim.toLowerCase(Locale.ROOT))
}
def canInsertZorder(query: LogicalPlan): Boolean = query match {
case Project(_, child) => canInsertZorder(child)
// TODO: actually, we can force zorder even if existed some shuffle
case _: Sort => false
case _: RepartitionByExpression => false
case _: Repartition => false
case _ => true
}
def insertZorder(catalogTable: CatalogTable, plan: LogicalPlan): LogicalPlan = {
if (!canInsertZorder(plan)) {
return plan
}
val cols = getZorderColumns(catalogTable.properties)
val attrs = plan.output.map(attr => (attr.name, attr)).toMap
if (cols.exists(!attrs.contains(_))) {
logWarning(s"target table does not contain all zorder cols: ${cols.mkString(",")}, " +
s"please check your table properties ${KYUUBI_ZORDER_COLS}.")
plan
} else {
val bound = cols.map(attrs(_))
val orderExpr =
if (bound.length == 1) {
bound.head
} else {
buildZorder(bound)
}
// TODO: We can do rebalance partitions before local sort of zorder after SPARK 3.3
// see https://github.com/apache/spark/pull/34542
Sort(
SortOrder(orderExpr, Ascending, NullsLast, Seq.empty) :: Nil,
conf.getConf(KyuubiSQLConf.ZORDER_GLOBAL_SORT_ENABLED),
plan)
}
}
def applyInternal(plan: LogicalPlan): LogicalPlan
final override def apply(plan: LogicalPlan): LogicalPlan = {
if (conf.getConf(KyuubiSQLConf.INSERT_ZORDER_BEFORE_WRITING)) {
applyInternal(plan)
} else {
plan
}
}
}
/**
* TODO: shall we forbid zorder if it's dynamic partition inserts ?
* Insert zorder before writing datasource if the target table properties has zorder properties
*/
case class InsertZorderBeforeWritingDatasource(session: SparkSession)
extends InsertZorderBeforeWritingDatasourceBase {
override def buildZorder(children: Seq[Expression]): ZorderBase = Zorder(children)
}
/**
* TODO: shall we forbid zorder if it's dynamic partition inserts ?
* Insert zorder before writing hive if the target table properties has zorder properties
*/
case class InsertZorderBeforeWritingHive(session: SparkSession)
extends InsertZorderBeforeWritingHiveBase {
override def buildZorder(children: Seq[Expression]): ZorderBase = Zorder(children)
}

View File

@@ -24,7 +24,7 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.kyuubi.sql.{KyuubiSQLConf, SparkKyuubiSparkSQLParser}
import org.apache.kyuubi.sql.zorder.Zorder
trait ZorderSuiteSpark33 extends ZorderSuiteBase {
trait ZorderSuiteSpark extends ZorderSuiteBase {
test("Add rebalance before zorder") {
Seq("true" -> false, "false" -> true).foreach { case (useOriginalOrdering, zorder) =>
@@ -115,10 +115,10 @@ trait ParserSuite { self: ZorderSuiteBase =>
class ZorderWithCodegenEnabledSuite
extends ZorderWithCodegenEnabledSuiteBase
with ZorderSuiteSpark33
with ZorderSuiteSpark
with ParserSuite {}
class ZorderWithCodegenDisabledSuite
extends ZorderWithCodegenDisabledSuiteBase
with ZorderSuiteSpark33
with ZorderSuiteSpark
with ParserSuite {}

View File

@@ -19,7 +19,7 @@ package org.apache.kyuubi.sql
import org.apache.spark.sql.SparkSessionExtensions
import org.apache.kyuubi.sql.zorder.{InsertZorderBeforeWritingDatasource33, InsertZorderBeforeWritingHive33, ResolveZorder}
import org.apache.kyuubi.sql.zorder.{InsertZorderBeforeWritingDatasource, InsertZorderBeforeWritingHive, ResolveZorder}
class KyuubiSparkSQLCommonExtension extends (SparkSessionExtensions => Unit) {
override def apply(extensions: SparkSessionExtensions): Unit = {
@@ -38,8 +38,8 @@ object KyuubiSparkSQLCommonExtension {
// should be applied before
// RepartitionBeforeWriting and RebalanceBeforeWriting
// because we can only apply one of them (i.e. Global Sort or Repartition/Rebalance)
extensions.injectPostHocResolutionRule(InsertZorderBeforeWritingDatasource33)
extensions.injectPostHocResolutionRule(InsertZorderBeforeWritingHive33)
extensions.injectPostHocResolutionRule(InsertZorderBeforeWritingDatasource)
extensions.injectPostHocResolutionRule(InsertZorderBeforeWritingHive)
extensions.injectPostHocResolutionRule(FinalStageConfigIsolationCleanRule)
extensions.injectQueryStagePrepRule(_ => InsertShuffleNodeBeforeJoin)

View File

@@ -27,7 +27,11 @@ import org.apache.spark.sql.hive.execution.InsertIntoHiveTable
import org.apache.kyuubi.sql.{KyuubiSQLConf, KyuubiSQLExtensionException}
trait InsertZorderHelper33 extends Rule[LogicalPlan] with ZorderBuilder {
trait ZorderBuilder {
def buildZorder(children: Seq[Expression]): ZorderBase
}
trait InsertZorderHelper extends Rule[LogicalPlan] with ZorderBuilder {
private val KYUUBI_ZORDER_ENABLED = "kyuubi.zorder.enabled"
private val KYUUBI_ZORDER_COLS = "kyuubi.zorder.cols"
@@ -139,8 +143,8 @@ trait InsertZorderHelper33 extends Rule[LogicalPlan] with ZorderBuilder {
}
}
case class InsertZorderBeforeWritingDatasource33(session: SparkSession)
extends InsertZorderHelper33 {
case class InsertZorderBeforeWritingDatasource(session: SparkSession)
extends InsertZorderHelper {
override def applyInternal(plan: LogicalPlan): LogicalPlan = plan match {
case insert: InsertIntoHadoopFsRelationCommand
if insert.query.resolved &&
@@ -159,8 +163,8 @@ case class InsertZorderBeforeWritingDatasource33(session: SparkSession)
}
}
case class InsertZorderBeforeWritingHive33(session: SparkSession)
extends InsertZorderHelper33 {
case class InsertZorderBeforeWritingHive(session: SparkSession)
extends InsertZorderHelper {
override def applyInternal(plan: LogicalPlan): LogicalPlan = plan match {
case insert: InsertIntoHiveTable
if insert.query.resolved &&

View File

@@ -1,155 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.sql.zorder
import java.util.Locale
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.{Ascending, Expression, NullsLast, SortOrder}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand
import org.apache.spark.sql.hive.execution.InsertIntoHiveTable
import org.apache.kyuubi.sql.KyuubiSQLConf
/**
* TODO: shall we forbid zorder if it's dynamic partition inserts ?
* Insert zorder before writing datasource if the target table properties has zorder properties
*/
abstract class InsertZorderBeforeWritingDatasourceBase
extends InsertZorderHelper {
override def applyInternal(plan: LogicalPlan): LogicalPlan = plan match {
case insert: InsertIntoHadoopFsRelationCommand
if insert.query.resolved && insert.bucketSpec.isEmpty && insert.catalogTable.isDefined &&
isZorderEnabled(insert.catalogTable.get.properties) =>
val newQuery = insertZorder(insert.catalogTable.get, insert.query)
if (newQuery.eq(insert.query)) {
insert
} else {
insert.copy(query = newQuery)
}
case _ => plan
}
}
/**
* TODO: shall we forbid zorder if it's dynamic partition inserts ?
* Insert zorder before writing hive if the target table properties has zorder properties
*/
abstract class InsertZorderBeforeWritingHiveBase
extends InsertZorderHelper {
override def applyInternal(plan: LogicalPlan): LogicalPlan = plan match {
case insert: InsertIntoHiveTable
if insert.query.resolved && insert.table.bucketSpec.isEmpty &&
isZorderEnabled(insert.table.properties) =>
val newQuery = insertZorder(insert.table, insert.query)
if (newQuery.eq(insert.query)) {
insert
} else {
insert.copy(query = newQuery)
}
case _ => plan
}
}
trait ZorderBuilder {
def buildZorder(children: Seq[Expression]): ZorderBase
}
trait InsertZorderHelper extends Rule[LogicalPlan] with ZorderBuilder {
private val KYUUBI_ZORDER_ENABLED = "kyuubi.zorder.enabled"
private val KYUUBI_ZORDER_COLS = "kyuubi.zorder.cols"
def isZorderEnabled(props: Map[String, String]): Boolean = {
props.contains(KYUUBI_ZORDER_ENABLED) &&
"true".equalsIgnoreCase(props(KYUUBI_ZORDER_ENABLED)) &&
props.contains(KYUUBI_ZORDER_COLS)
}
def getZorderColumns(props: Map[String, String]): Seq[String] = {
val cols = props.get(KYUUBI_ZORDER_COLS)
assert(cols.isDefined)
cols.get.split(",").map(_.trim.toLowerCase(Locale.ROOT))
}
def canInsertZorder(query: LogicalPlan): Boolean = query match {
case Project(_, child) => canInsertZorder(child)
// TODO: actually, we can force zorder even if existed some shuffle
case _: Sort => false
case _: RepartitionByExpression => false
case _: Repartition => false
case _ => true
}
def insertZorder(catalogTable: CatalogTable, plan: LogicalPlan): LogicalPlan = {
if (!canInsertZorder(plan)) {
return plan
}
val cols = getZorderColumns(catalogTable.properties)
val attrs = plan.output.map(attr => (attr.name, attr)).toMap
if (cols.exists(!attrs.contains(_))) {
logWarning(s"target table does not contain all zorder cols: ${cols.mkString(",")}, " +
s"please check your table properties ${KYUUBI_ZORDER_COLS}.")
plan
} else {
val bound = cols.map(attrs(_))
val orderExpr =
if (bound.length == 1) {
bound.head
} else {
buildZorder(bound)
}
// TODO: We can do rebalance partitions before local sort of zorder after SPARK 3.3
// see https://github.com/apache/spark/pull/34542
Sort(
SortOrder(orderExpr, Ascending, NullsLast, Seq.empty) :: Nil,
conf.getConf(KyuubiSQLConf.ZORDER_GLOBAL_SORT_ENABLED),
plan)
}
}
def applyInternal(plan: LogicalPlan): LogicalPlan
final override def apply(plan: LogicalPlan): LogicalPlan = {
if (conf.getConf(KyuubiSQLConf.INSERT_ZORDER_BEFORE_WRITING)) {
applyInternal(plan)
} else {
plan
}
}
}
/**
* TODO: shall we forbid zorder if it's dynamic partition inserts ?
* Insert zorder before writing datasource if the target table properties has zorder properties
*/
case class InsertZorderBeforeWritingDatasource(session: SparkSession)
extends InsertZorderBeforeWritingDatasourceBase {
override def buildZorder(children: Seq[Expression]): ZorderBase = Zorder(children)
}
/**
* TODO: shall we forbid zorder if it's dynamic partition inserts ?
* Insert zorder before writing hive if the target table properties has zorder properties
*/
case class InsertZorderBeforeWritingHive(session: SparkSession)
extends InsertZorderBeforeWritingHiveBase {
override def buildZorder(children: Seq[Expression]): ZorderBase = Zorder(children)
}

View File

@@ -19,7 +19,7 @@ package org.apache.kyuubi.sql
import org.apache.spark.sql.SparkSessionExtensions
import org.apache.kyuubi.sql.zorder.{InsertZorderBeforeWritingDatasource33, InsertZorderBeforeWritingHive33, ResolveZorder}
import org.apache.kyuubi.sql.zorder.{InsertZorderBeforeWritingDatasource, InsertZorderBeforeWritingHive, ResolveZorder}
class KyuubiSparkSQLCommonExtension extends (SparkSessionExtensions => Unit) {
override def apply(extensions: SparkSessionExtensions): Unit = {
@@ -38,8 +38,8 @@ object KyuubiSparkSQLCommonExtension {
// should be applied before
// RepartitionBeforeWriting and RebalanceBeforeWriting
// because we can only apply one of them (i.e. Global Sort or Repartition/Rebalance)
extensions.injectPostHocResolutionRule(InsertZorderBeforeWritingDatasource33)
extensions.injectPostHocResolutionRule(InsertZorderBeforeWritingHive33)
extensions.injectPostHocResolutionRule(InsertZorderBeforeWritingDatasource)
extensions.injectPostHocResolutionRule(InsertZorderBeforeWritingHive)
extensions.injectPostHocResolutionRule(FinalStageConfigIsolationCleanRule)
extensions.injectQueryStagePrepRule(_ => InsertShuffleNodeBeforeJoin)

View File

@@ -27,7 +27,11 @@ import org.apache.spark.sql.hive.execution.InsertIntoHiveTable
import org.apache.kyuubi.sql.{KyuubiSQLConf, KyuubiSQLExtensionException}
trait InsertZorderHelper33 extends Rule[LogicalPlan] with ZorderBuilder {
trait ZorderBuilder {
def buildZorder(children: Seq[Expression]): ZorderBase
}
trait InsertZorderHelper extends Rule[LogicalPlan] with ZorderBuilder {
private val KYUUBI_ZORDER_ENABLED = "kyuubi.zorder.enabled"
private val KYUUBI_ZORDER_COLS = "kyuubi.zorder.cols"
@@ -139,8 +143,8 @@ trait InsertZorderHelper33 extends Rule[LogicalPlan] with ZorderBuilder {
}
}
case class InsertZorderBeforeWritingDatasource33(session: SparkSession)
extends InsertZorderHelper33 {
case class InsertZorderBeforeWritingDatasource(session: SparkSession)
extends InsertZorderHelper {
override def applyInternal(plan: LogicalPlan): LogicalPlan = plan match {
case insert: InsertIntoHadoopFsRelationCommand
if insert.query.resolved &&
@@ -159,8 +163,8 @@ case class InsertZorderBeforeWritingDatasource33(session: SparkSession)
}
}
case class InsertZorderBeforeWritingHive33(session: SparkSession)
extends InsertZorderHelper33 {
case class InsertZorderBeforeWritingHive(session: SparkSession)
extends InsertZorderHelper {
override def applyInternal(plan: LogicalPlan): LogicalPlan = plan match {
case insert: InsertIntoHiveTable
if insert.query.resolved &&

View File

@@ -1,155 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.sql.zorder
import java.util.Locale
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.{Ascending, Expression, NullsLast, SortOrder}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand
import org.apache.spark.sql.hive.execution.InsertIntoHiveTable
import org.apache.kyuubi.sql.KyuubiSQLConf
/**
* TODO: shall we forbid zorder if it's dynamic partition inserts ?
* Insert zorder before writing datasource if the target table properties has zorder properties
*/
abstract class InsertZorderBeforeWritingDatasourceBase
extends InsertZorderHelper {
override def applyInternal(plan: LogicalPlan): LogicalPlan = plan match {
case insert: InsertIntoHadoopFsRelationCommand
if insert.query.resolved && insert.bucketSpec.isEmpty && insert.catalogTable.isDefined &&
isZorderEnabled(insert.catalogTable.get.properties) =>
val newQuery = insertZorder(insert.catalogTable.get, insert.query)
if (newQuery.eq(insert.query)) {
insert
} else {
insert.copy(query = newQuery)
}
case _ => plan
}
}
/**
* TODO: shall we forbid zorder if it's dynamic partition inserts ?
* Insert zorder before writing hive if the target table properties has zorder properties
*/
abstract class InsertZorderBeforeWritingHiveBase
extends InsertZorderHelper {
override def applyInternal(plan: LogicalPlan): LogicalPlan = plan match {
case insert: InsertIntoHiveTable
if insert.query.resolved && insert.table.bucketSpec.isEmpty &&
isZorderEnabled(insert.table.properties) =>
val newQuery = insertZorder(insert.table, insert.query)
if (newQuery.eq(insert.query)) {
insert
} else {
insert.copy(query = newQuery)
}
case _ => plan
}
}
trait ZorderBuilder {
def buildZorder(children: Seq[Expression]): ZorderBase
}
trait InsertZorderHelper extends Rule[LogicalPlan] with ZorderBuilder {
private val KYUUBI_ZORDER_ENABLED = "kyuubi.zorder.enabled"
private val KYUUBI_ZORDER_COLS = "kyuubi.zorder.cols"
def isZorderEnabled(props: Map[String, String]): Boolean = {
props.contains(KYUUBI_ZORDER_ENABLED) &&
"true".equalsIgnoreCase(props(KYUUBI_ZORDER_ENABLED)) &&
props.contains(KYUUBI_ZORDER_COLS)
}
def getZorderColumns(props: Map[String, String]): Seq[String] = {
val cols = props.get(KYUUBI_ZORDER_COLS)
assert(cols.isDefined)
cols.get.split(",").map(_.trim.toLowerCase(Locale.ROOT))
}
def canInsertZorder(query: LogicalPlan): Boolean = query match {
case Project(_, child) => canInsertZorder(child)
// TODO: actually, we can force zorder even if existed some shuffle
case _: Sort => false
case _: RepartitionByExpression => false
case _: Repartition => false
case _ => true
}
def insertZorder(catalogTable: CatalogTable, plan: LogicalPlan): LogicalPlan = {
if (!canInsertZorder(plan)) {
return plan
}
val cols = getZorderColumns(catalogTable.properties)
val attrs = plan.output.map(attr => (attr.name, attr)).toMap
if (cols.exists(!attrs.contains(_))) {
logWarning(s"target table does not contain all zorder cols: ${cols.mkString(",")}, " +
s"please check your table properties ${KYUUBI_ZORDER_COLS}.")
plan
} else {
val bound = cols.map(attrs(_))
val orderExpr =
if (bound.length == 1) {
bound.head
} else {
buildZorder(bound)
}
// TODO: We can do rebalance partitions before local sort of zorder after SPARK 3.3
// see https://github.com/apache/spark/pull/34542
Sort(
SortOrder(orderExpr, Ascending, NullsLast, Seq.empty) :: Nil,
conf.getConf(KyuubiSQLConf.ZORDER_GLOBAL_SORT_ENABLED),
plan)
}
}
def applyInternal(plan: LogicalPlan): LogicalPlan
final override def apply(plan: LogicalPlan): LogicalPlan = {
if (conf.getConf(KyuubiSQLConf.INSERT_ZORDER_BEFORE_WRITING)) {
applyInternal(plan)
} else {
plan
}
}
}
/**
* TODO: shall we forbid zorder if it's dynamic partition inserts ?
* Insert zorder before writing datasource if the target table properties has zorder properties
*/
case class InsertZorderBeforeWritingDatasource(session: SparkSession)
extends InsertZorderBeforeWritingDatasourceBase {
override def buildZorder(children: Seq[Expression]): ZorderBase = Zorder(children)
}
/**
* TODO: shall we forbid zorder if it's dynamic partition inserts ?
* Insert zorder before writing hive if the target table properties has zorder properties
*/
case class InsertZorderBeforeWritingHive(session: SparkSession)
extends InsertZorderBeforeWritingHiveBase {
override def buildZorder(children: Seq[Expression]): ZorderBase = Zorder(children)
}