[KYUUBI #5039] [Improvement] Use semantic versions and remove redundant version comparison methods

### _Why are the changes needed?_

- Support initializing or comparing version with major version only, e.g "3" equivalent to  "3.0"
- Remove redundant version comparison methods by using semantic versions of Spark, Flink and Kyuubi
- adding common `toDouble` method

### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before make a pull request

Closes #5039 from bowenliang123/improve-semanticversion.

Closes #5039

b6868264f [liangbowen] nit
d39646b7d [liangbowen] SPARK_ENGINE_RUNTIME_VERSION
9148caad0 [liangbowen] use semantic versions
ecc3b4af6 [mans2singh] [KYUUBI #5086] [KYUUBI # 5085] Update config section of deploy on kubernetes

Lead-authored-by: liangbowen <liangbowen@gf.com.cn>
Co-authored-by: mans2singh <mans2singh@yahoo.com>
Signed-off-by: liangbowen <liangbowen@gf.com.cn>
This commit is contained in:
liangbowen 2023-07-25 18:04:45 +08:00
parent 026b88e87b
commit 6ec326adb4
25 changed files with 122 additions and 144 deletions

View File

@ -83,18 +83,10 @@ private[authz] object AuthZUtils {
}
}
private lazy val sparkSemanticVersion: SemanticVersion = SemanticVersion(SPARK_VERSION)
lazy val isSparkV31OrGreater: Boolean = isSparkVersionAtLeast("3.1")
lazy val isSparkV32OrGreater: Boolean = isSparkVersionAtLeast("3.2")
lazy val isSparkV33OrGreater: Boolean = isSparkVersionAtLeast("3.3")
def isSparkVersionAtMost(targetVersionString: String): Boolean = {
sparkSemanticVersion.isVersionAtMost(targetVersionString)
}
def isSparkVersionAtLeast(targetVersionString: String): Boolean = {
sparkSemanticVersion.isVersionAtLeast(targetVersionString)
}
lazy val SPARK_RUNTIME_VERSION: SemanticVersion = SemanticVersion(SPARK_VERSION)
lazy val isSparkV31OrGreater: Boolean = SPARK_RUNTIME_VERSION >= "3.1"
lazy val isSparkV32OrGreater: Boolean = SPARK_RUNTIME_VERSION >= "3.2"
lazy val isSparkV33OrGreater: Boolean = SPARK_RUNTIME_VERSION >= "3.3"
def quoteIfNeeded(part: String): String = {
if (part.matches("[a-zA-Z0-9_]+") && !part.matches("\\d+")) {

View File

@ -24,7 +24,7 @@ import org.scalatest.funsuite.AnyFunSuite
import org.apache.kyuubi.plugin.spark.authz.OperationType.QUERY
import org.apache.kyuubi.plugin.spark.authz.ranger.AccessType
import org.apache.kyuubi.plugin.spark.authz.util.AuthZUtils.isSparkVersionAtMost
import org.apache.kyuubi.plugin.spark.authz.util.AuthZUtils.SPARK_RUNTIME_VERSION
abstract class FunctionPrivilegesBuilderSuite extends AnyFunSuite
with SparkSessionProvider with BeforeAndAfterAll with BeforeAndAfterEach {
@ -112,7 +112,7 @@ class HiveFunctionPrivilegesBuilderSuite extends FunctionPrivilegesBuilderSuite
override protected val catalogImpl: String = "hive"
test("Function Call Query") {
assume(isSparkVersionAtMost("3.3"))
assume(SPARK_RUNTIME_VERSION <= "3.3")
val plan = sql(s"SELECT kyuubi_fun_1('data'), " +
s"kyuubi_fun_2(value), " +
s"${reusedDb}.kyuubi_fun_0(value), " +
@ -132,7 +132,7 @@ class HiveFunctionPrivilegesBuilderSuite extends FunctionPrivilegesBuilderSuite
}
test("Function Call Query with Quoted Name") {
assume(isSparkVersionAtMost("3.3"))
assume(SPARK_RUNTIME_VERSION <= "3.3")
val plan = sql(s"SELECT `kyuubi_fun_1`('data'), " +
s"`kyuubi_fun_2`(value), " +
s"`${reusedDb}`.`kyuubi_fun_0`(value), " +
@ -152,7 +152,7 @@ class HiveFunctionPrivilegesBuilderSuite extends FunctionPrivilegesBuilderSuite
}
test("Simple Function Call Query") {
assume(isSparkVersionAtMost("3.3"))
assume(SPARK_RUNTIME_VERSION <= "3.3")
val plan = sql(s"SELECT kyuubi_fun_1('data'), " +
s"kyuubi_fun_0('value'), " +
s"${reusedDb}.kyuubi_fun_0('value'), " +
@ -172,7 +172,7 @@ class HiveFunctionPrivilegesBuilderSuite extends FunctionPrivilegesBuilderSuite
}
test("Function Call In CAST Command") {
assume(isSparkVersionAtMost("3.3"))
assume(SPARK_RUNTIME_VERSION <= "3.3")
val table = "castTable"
withTable(table) { table =>
val plan = sql(s"CREATE TABLE ${table} " +

View File

@ -112,7 +112,7 @@ abstract class PrivilegesBuilderSuite extends AnyFunSuite
}
test("AlterDatabasePropertiesCommand") {
assume(isSparkVersionAtMost("3.2"))
assume(SPARK_RUNTIME_VERSION <= "3.2")
val plan = sql("ALTER DATABASE default SET DBPROPERTIES (abc = '123')").queryExecution.analyzed
val (in, out, operationType) = PrivilegesBuilder.build(plan, spark)
assertResult(plan.getClass.getName)(
@ -160,7 +160,7 @@ abstract class PrivilegesBuilderSuite extends AnyFunSuite
}
test("CreateDatabaseCommand") {
assume(isSparkVersionAtMost("3.2"))
assume(SPARK_RUNTIME_VERSION <= "3.2")
withDatabase("CreateDatabaseCommand") { db =>
val plan = sql(s"CREATE DATABASE $db").queryExecution.analyzed
val (in, out, operationType) = PrivilegesBuilder.build(plan, spark)
@ -182,7 +182,7 @@ abstract class PrivilegesBuilderSuite extends AnyFunSuite
}
test("DropDatabaseCommand") {
assume(isSparkVersionAtMost("3.2"))
assume(SPARK_RUNTIME_VERSION <= "3.2")
withDatabase("DropDatabaseCommand") { db =>
sql(s"CREATE DATABASE $db")
val plan = sql(s"DROP DATABASE DropDatabaseCommand").queryExecution.analyzed
@ -759,7 +759,7 @@ abstract class PrivilegesBuilderSuite extends AnyFunSuite
}
test("DescribeDatabaseCommand") {
assume(isSparkVersionAtMost("3.2"))
assume(SPARK_RUNTIME_VERSION <= "3.2")
val plan = sql(s"DESC DATABASE $reusedDb").queryExecution.analyzed
val (in, out, operationType) = PrivilegesBuilder.build(plan, spark)
assert(operationType === DESCDATABASE)
@ -1253,7 +1253,7 @@ class InMemoryPrivilegeBuilderSuite extends PrivilegesBuilderSuite {
// some hive version does not support set database location
test("AlterDatabaseSetLocationCommand") {
assume(isSparkVersionAtMost("3.2"))
assume(SPARK_RUNTIME_VERSION <= "3.2")
val newLoc = spark.conf.get("spark.sql.warehouse.dir") + "/new_db_location"
val plan = sql(s"ALTER DATABASE default SET LOCATION '$newLoc'")
.queryExecution.analyzed

View File

@ -22,16 +22,5 @@ import org.apache.spark.SPARK_VERSION
import org.apache.kyuubi.util.SemanticVersion
object SparkUtils {
def isSparkVersionAtMost(targetVersionString: String): Boolean = {
SemanticVersion(SPARK_VERSION).isVersionAtMost(targetVersionString)
}
def isSparkVersionAtLeast(targetVersionString: String): Boolean = {
SemanticVersion(SPARK_VERSION).isVersionAtLeast(targetVersionString)
}
def isSparkVersionEqualTo(targetVersionString: String): Boolean = {
SemanticVersion(SPARK_VERSION).isVersionEqualTo(targetVersionString)
}
lazy val SPARK_RUNTIME_VERSION: SemanticVersion = SemanticVersion(SPARK_VERSION)
}

View File

@ -29,15 +29,15 @@ import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{ArrayType, MapType, StructField, StructType}
import org.apache.kyuubi.spark.connector.common.SparkUtils
import org.apache.kyuubi.spark.connector.common.SparkUtils.SPARK_RUNTIME_VERSION
import org.apache.kyuubi.util.reflect.ReflectUtils.invokeAs
object HiveConnectorUtils extends Logging {
def partitionedFilePath(file: PartitionedFile): String = {
if (SparkUtils.isSparkVersionAtLeast("3.4")) {
if (SPARK_RUNTIME_VERSION >= "3.4") {
invokeAs[String](file, "urlEncodedPath")
} else if (SparkUtils.isSparkVersionAtLeast("3.3")) {
} else if (SPARK_RUNTIME_VERSION >= "3.3") {
invokeAs[String](file, "filePath")
} else {
throw KyuubiHiveConnectorException(s"Spark version $SPARK_VERSION " +

View File

@ -23,7 +23,7 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.kyuubi.KyuubiFunSuite
import org.apache.kyuubi.spark.connector.common.LocalSparkSession.withSparkSession
import org.apache.kyuubi.spark.connector.common.SparkUtils
import org.apache.kyuubi.spark.connector.common.SparkUtils.SPARK_RUNTIME_VERSION
class TPCDSCatalogSuite extends KyuubiFunSuite {
@ -126,7 +126,7 @@ class TPCDSCatalogSuite extends KyuubiFunSuite {
val stats = spark.table(tableName).queryExecution.analyzed.stats
assert(stats.sizeInBytes == sizeInBytes)
// stats.rowCount only has value after SPARK-33954
if (SparkUtils.isSparkVersionAtLeast("3.2")) {
if (SPARK_RUNTIME_VERSION >= "3.2") {
assert(stats.rowCount.contains(rowCount), tableName)
}
}

View File

@ -23,7 +23,7 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.kyuubi.KyuubiFunSuite
import org.apache.kyuubi.spark.connector.common.LocalSparkSession.withSparkSession
import org.apache.kyuubi.spark.connector.common.SparkUtils
import org.apache.kyuubi.spark.connector.common.SparkUtils.SPARK_RUNTIME_VERSION
class TPCHCatalogSuite extends KyuubiFunSuite {
@ -130,7 +130,7 @@ class TPCHCatalogSuite extends KyuubiFunSuite {
val stats = spark.table(tableName).queryExecution.analyzed.stats
assert(stats.sizeInBytes == sizeInBytes)
// stats.rowCount only has value after SPARK-33954
if (SparkUtils.isSparkVersionAtLeast("3.2")) {
if (SPARK_RUNTIME_VERSION >= "3.2") {
assert(stats.rowCount.contains(rowCount), tableName)
}
}

View File

@ -25,23 +25,7 @@ import org.apache.kyuubi.util.SemanticVersion
object SparkListenerHelper {
lazy val sparkMajorMinorVersion: (Int, Int) = {
val runtimeSparkVer = org.apache.spark.SPARK_VERSION
val runtimeVersion = SemanticVersion(runtimeSparkVer)
(runtimeVersion.majorVersion, runtimeVersion.minorVersion)
}
def isSparkVersionAtMost(targetVersionString: String): Boolean = {
SemanticVersion(SPARK_VERSION).isVersionAtMost(targetVersionString)
}
def isSparkVersionAtLeast(targetVersionString: String): Boolean = {
SemanticVersion(SPARK_VERSION).isVersionAtLeast(targetVersionString)
}
def isSparkVersionEqualTo(targetVersionString: String): Boolean = {
SemanticVersion(SPARK_VERSION).isVersionEqualTo(targetVersionString)
}
lazy val SPARK_RUNTIME_VERSION: SemanticVersion = SemanticVersion(SPARK_VERSION)
def currentUser: String = UserGroupInformation.getCurrentUser.getShortUserName

View File

@ -37,7 +37,7 @@ import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanRelation}
import org.apache.kyuubi.plugin.lineage.Lineage
import org.apache.kyuubi.plugin.lineage.helper.SparkListenerHelper.isSparkVersionAtMost
import org.apache.kyuubi.plugin.lineage.helper.SparkListenerHelper.SPARK_RUNTIME_VERSION
import org.apache.kyuubi.util.reflect.ReflectUtils._
trait LineageParser {
@ -194,7 +194,7 @@ trait LineageParser {
extractColumnsLineage(commandPlan, parentColumnsLineage)
case p if p.nodeName == "AlterViewAsCommand" =>
val query =
if (isSparkVersionAtMost("3.1")) {
if (SPARK_RUNTIME_VERSION <= "3.1") {
sparkSession.sessionState.analyzer.execute(getQuery(plan))
} else {
getQuery(plan)
@ -211,7 +211,7 @@ trait LineageParser {
val outputCols =
getField[Seq[(String, Option[String])]](plan, "userSpecifiedColumns").map(_._1)
val query =
if (isSparkVersionAtMost("3.1")) {
if (SPARK_RUNTIME_VERSION <= "3.1") {
sparkSession.sessionState.analyzer.execute(getField[LogicalPlan](plan, "child"))
} else {
getField[LogicalPlan](plan, "plan")
@ -240,7 +240,7 @@ trait LineageParser {
if p.nodeName == "CreateTableAsSelect" ||
p.nodeName == "ReplaceTableAsSelect" =>
val (table, namespace, catalog) =
if (isSparkVersionAtMost("3.2")) {
if (SPARK_RUNTIME_VERSION <= "3.2") {
(
getField[Identifier](plan, "tableName").name,
getField[Identifier](plan, "tableName").namespace.mkString("."),

View File

@ -34,11 +34,11 @@ import org.scalatest.time.SpanSugar._
import org.apache.kyuubi.KyuubiFunSuite
import org.apache.kyuubi.plugin.lineage.Lineage
import org.apache.kyuubi.plugin.lineage.dispatcher.atlas.AtlasEntityHelper.{COLUMN_LINEAGE_TYPE, PROCESS_TYPE}
import org.apache.kyuubi.plugin.lineage.helper.SparkListenerHelper.isSparkVersionAtMost
import org.apache.kyuubi.plugin.lineage.helper.SparkListenerHelper.SPARK_RUNTIME_VERSION
class AtlasLineageDispatcherSuite extends KyuubiFunSuite with SparkListenerExtensionTest {
val catalogName =
if (isSparkVersionAtMost("3.1")) "org.apache.spark.sql.connector.InMemoryTableCatalog"
if (SPARK_RUNTIME_VERSION <= "3.1") "org.apache.spark.sql.connector.InMemoryTableCatalog"
else "org.apache.spark.sql.connector.catalog.InMemoryTableCatalog"
override protected val catalogImpl: String = "hive"

View File

@ -30,12 +30,12 @@ import org.apache.kyuubi.KyuubiFunSuite
import org.apache.kyuubi.events.EventBus
import org.apache.kyuubi.plugin.lineage.Lineage
import org.apache.kyuubi.plugin.lineage.dispatcher.{OperationLineageKyuubiEvent, OperationLineageSparkEvent}
import org.apache.kyuubi.plugin.lineage.helper.SparkListenerHelper.isSparkVersionAtMost
import org.apache.kyuubi.plugin.lineage.helper.SparkListenerHelper.SPARK_RUNTIME_VERSION
class OperationLineageEventSuite extends KyuubiFunSuite with SparkListenerExtensionTest {
val catalogName =
if (isSparkVersionAtMost("3.1")) "org.apache.spark.sql.connector.InMemoryTableCatalog"
if (SPARK_RUNTIME_VERSION <= "3.1") "org.apache.spark.sql.connector.InMemoryTableCatalog"
else "org.apache.spark.sql.connector.catalog.InMemoryTableCatalog"
override protected val catalogImpl: String = "hive"

View File

@ -29,13 +29,13 @@ import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
import org.apache.kyuubi.KyuubiFunSuite
import org.apache.kyuubi.plugin.lineage.Lineage
import org.apache.kyuubi.plugin.lineage.helper.SparkListenerHelper.isSparkVersionAtMost
import org.apache.kyuubi.plugin.lineage.helper.SparkListenerHelper.SPARK_RUNTIME_VERSION
class SparkSQLLineageParserHelperSuite extends KyuubiFunSuite
with SparkListenerExtensionTest {
val catalogName =
if (isSparkVersionAtMost("3.1")) "org.apache.spark.sql.connector.InMemoryTableCatalog"
if (SPARK_RUNTIME_VERSION <= "3.1") "org.apache.spark.sql.connector.InMemoryTableCatalog"
else "org.apache.spark.sql.connector.catalog.InMemoryTableCatalog"
override protected val catalogImpl: String = "hive"

View File

@ -48,12 +48,13 @@ object FlinkEngineUtils extends Logging {
val EMBEDDED_MODE_CLIENT_OPTIONS: Options = getEmbeddedModeClientOptions(new Options)
val SUPPORTED_FLINK_VERSIONS: Array[SemanticVersion] =
Array("1.16", "1.17").map(SemanticVersion.apply)
private def SUPPORTED_FLINK_VERSIONS = Set("1.16", "1.17").map(SemanticVersion.apply)
val FLINK_RUNTIME_VERSION: SemanticVersion = SemanticVersion(EnvironmentInformation.getVersion)
def checkFlinkVersion(): Unit = {
val flinkVersion = EnvironmentInformation.getVersion
if (SUPPORTED_FLINK_VERSIONS.contains(SemanticVersion(flinkVersion))) {
if (SUPPORTED_FLINK_VERSIONS.contains(FLINK_RUNTIME_VERSION)) {
info(s"The current Flink version is $flinkVersion")
} else {
throw new UnsupportedOperationException(
@ -62,15 +63,6 @@ object FlinkEngineUtils extends Logging {
}
}
def isFlinkVersionAtMost(targetVersionString: String): Boolean =
SemanticVersion(EnvironmentInformation.getVersion).isVersionAtMost(targetVersionString)
def isFlinkVersionAtLeast(targetVersionString: String): Boolean =
SemanticVersion(EnvironmentInformation.getVersion).isVersionAtLeast(targetVersionString)
def isFlinkVersionEqualTo(targetVersionString: String): Boolean =
SemanticVersion(EnvironmentInformation.getVersion).isVersionEqualTo(targetVersionString)
/**
* Copied and modified from [[org.apache.flink.table.client.cli.CliOptionsParser]]
* to avoid loading flink-python classes which we doesn't support yet.
@ -116,7 +108,7 @@ object FlinkEngineUtils extends Logging {
val libDirs: JList[URL] = Option(checkUrls(line, CliOptionsParser.OPTION_LIBRARY))
.getOrElse(JCollections.emptyList())
val dependencies: JList[URL] = discoverDependencies(jars, libDirs)
if (FlinkEngineUtils.isFlinkVersionEqualTo("1.16")) {
if (FLINK_RUNTIME_VERSION === "1.16") {
val commandLines: JList[CustomCommandLine] =
Seq(new GenericCLI(flinkConf, flinkConfDir), new DefaultCLI).asJava
DynConstructors.builder()
@ -127,7 +119,7 @@ object FlinkEngineUtils extends Logging {
.build()
.newInstance(flinkConf, commandLines)
.asInstanceOf[DefaultContext]
} else if (FlinkEngineUtils.isFlinkVersionEqualTo("1.17")) {
} else if (FLINK_RUNTIME_VERSION === "1.17") {
invokeAs[DefaultContext](
classOf[DefaultContext],
"load",
@ -144,7 +136,7 @@ object FlinkEngineUtils extends Logging {
def getSessionContext(session: Session): SessionContext = getField(session, "sessionContext")
def getResultJobId(resultFetch: ResultFetcher): Option[JobID] = {
if (FlinkEngineUtils.isFlinkVersionAtMost("1.16")) {
if (FLINK_RUNTIME_VERSION <= "1.16") {
return None
}
try {

View File

@ -21,14 +21,14 @@ import org.apache.flink.table.gateway.api.session.{SessionEnvironment, SessionHa
import org.apache.flink.table.gateway.service.context.DefaultContext
import org.apache.flink.table.gateway.service.session.Session
import org.apache.kyuubi.engine.flink.FlinkEngineUtils
import org.apache.kyuubi.engine.flink.FlinkEngineUtils.FLINK_RUNTIME_VERSION
import org.apache.kyuubi.util.reflect._
import org.apache.kyuubi.util.reflect.ReflectUtils._
class FlinkSessionManager(engineContext: DefaultContext) {
val sessionManager: AnyRef = {
if (FlinkEngineUtils.isFlinkVersionEqualTo("1.16")) {
if (FLINK_RUNTIME_VERSION === "1.16") {
DynConstructors.builder().impl(
"org.apache.flink.table.gateway.service.session.SessionManager",
classOf[DefaultContext])

View File

@ -27,7 +27,7 @@ import org.apache.flink.table.gateway.service.context.SessionContext
import org.apache.kyuubi.{KYUUBI_VERSION, Utils}
import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_ENGINE_NAME, KYUUBI_SESSION_USER_KEY}
import org.apache.kyuubi.engine.flink.FlinkEngineUtils
import org.apache.kyuubi.engine.flink.FlinkEngineUtils.FLINK_RUNTIME_VERSION
import org.apache.kyuubi.util.reflect.DynMethods
object KDFRegistry {
@ -37,7 +37,7 @@ object KDFRegistry {
val kyuubiDefinedFunctions = new ArrayBuffer[KyuubiDefinedFunction]
val flinkConfigMap: util.Map[String, String] = {
if (FlinkEngineUtils.isFlinkVersionEqualTo("1.16")) {
if (FLINK_RUNTIME_VERSION === "1.16") {
DynMethods
.builder("getConfigMap")
.impl(classOf[SessionContext])

View File

@ -30,7 +30,8 @@ import org.apache.hive.service.rpc.thrift._
import org.apache.kyuubi.Utils
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.engine.flink.{FlinkEngineUtils, WithFlinkTestResources}
import org.apache.kyuubi.engine.flink.FlinkEngineUtils.FLINK_RUNTIME_VERSION
import org.apache.kyuubi.engine.flink.WithFlinkTestResources
import org.apache.kyuubi.engine.flink.result.Constants
import org.apache.kyuubi.engine.flink.util.TestUserClassLoaderJar
import org.apache.kyuubi.jdbc.hive.KyuubiStatement
@ -635,7 +636,7 @@ abstract class FlinkOperationSuite extends HiveJDBCTestHelper with WithFlinkTest
}
test("execute statement - show/stop jobs") {
if (FlinkEngineUtils.isFlinkVersionAtLeast("1.17")) {
if (FLINK_RUNTIME_VERSION >= "1.17") {
withSessionConf()(Map(ENGINE_FLINK_MAX_ROWS.key -> "10"))(Map.empty) {
withMultipleConnectionJdbcStatement()({ statement =>
statement.executeQuery(
@ -1055,7 +1056,7 @@ abstract class FlinkOperationSuite extends HiveJDBCTestHelper with WithFlinkTest
val jobId = resultSet.getString(1)
assert(jobId.length == 32)
if (FlinkEngineUtils.isFlinkVersionAtLeast("1.17")) {
if (FLINK_RUNTIME_VERSION >= "1.17") {
val stopResult = statement.executeQuery(s"stop job '$jobId'")
assert(stopResult.next())
assert(stopResult.getString(1) === "OK")
@ -1244,7 +1245,7 @@ abstract class FlinkOperationSuite extends HiveJDBCTestHelper with WithFlinkTest
stmt.executeQuery("insert into tbl_a values (1)")
val queryId = stmt.asInstanceOf[KyuubiStatement].getQueryId
// Flink 1.16 doesn't support query id via ResultFetcher
if (FlinkEngineUtils.isFlinkVersionAtLeast("1.17")) {
if (FLINK_RUNTIME_VERSION >= "1.17") {
assert(queryId !== null)
// parse the string to check if it's valid Flink job id
assert(JobID.fromHexString(queryId) !== null)

View File

@ -21,7 +21,7 @@ import java.time.{Instant, LocalDateTime, ZoneId}
import scala.annotation.meta.getter
import org.apache.spark.SparkContext
import org.apache.spark.{SPARK_VERSION, SparkContext}
import org.apache.spark.sql.SparkSession
import org.apache.spark.util.kvstore.KVIndex
@ -97,5 +97,5 @@ object KyuubiSparkUtil extends Logging {
// Given that we are on the Spark SQL engine side, the [[org.apache.spark.SPARK_VERSION]] can be
// represented as the runtime version of the Spark SQL engine.
lazy val SPARK_ENGINE_RUNTIME_VERSION = SemanticVersion(org.apache.spark.SPARK_VERSION)
lazy val SPARK_ENGINE_RUNTIME_VERSION: SemanticVersion = SemanticVersion(SPARK_VERSION)
}

View File

@ -258,7 +258,7 @@ class SparkArrowbasedOperationSuite extends WithSparkSQLEngine with SparkDataTyp
}
test("result offset support") {
assume(SPARK_ENGINE_RUNTIME_VERSION > "3.3")
assume(SPARK_ENGINE_RUNTIME_VERSION >= "3.4")
var numStages = 0
val listener = new SparkListener {
override def onJobStart(jobStart: SparkListenerJobStart): Unit = {

View File

@ -24,9 +24,8 @@ import org.apache.spark.sql.internal.SQLConf.ANSI_ENABLED
import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.engine.spark.{WithDiscoverySparkSQLEngine, WithEmbeddedZookeeper}
import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.SPARK_ENGINE_RUNTIME_VERSION
import org.apache.kyuubi.engine.spark.WithDiscoverySparkSQLEngine
import org.apache.kyuubi.engine.spark.WithEmbeddedZookeeper
import org.apache.kyuubi.service.ServiceState
abstract class SparkSQLEngineDeregisterSuite
@ -61,13 +60,14 @@ abstract class SparkSQLEngineDeregisterSuite
class SparkSQLEngineDeregisterExceptionSuite extends SparkSQLEngineDeregisterSuite {
override def withKyuubiConf: Map[String, String] = {
super.withKyuubiConf ++ Map(ENGINE_DEREGISTER_EXCEPTION_CLASSES.key -> {
if (SPARK_ENGINE_RUNTIME_VERSION > "3.2") {
if (SPARK_ENGINE_RUNTIME_VERSION >= "3.3") {
// see https://issues.apache.org/jira/browse/SPARK-35958
"org.apache.spark.SparkArithmeticException"
} else {
classOf[ArithmeticException].getCanonicalName
}
})
}
}
@ -94,7 +94,7 @@ class SparkSQLEngineDeregisterExceptionTTLSuite
zookeeperConf ++ Map(
ANSI_ENABLED.key -> "true",
ENGINE_DEREGISTER_EXCEPTION_CLASSES.key -> {
if (SPARK_ENGINE_RUNTIME_VERSION > "3.2") {
if (SPARK_ENGINE_RUNTIME_VERSION >= "3.3") {
// see https://issues.apache.org/jira/browse/SPARK-35958
"org.apache.spark.SparkArithmeticException"
} else {

View File

@ -126,10 +126,7 @@ object PlainSASLServer {
}
}
final private val version: Double = {
val runtimeVersion = SemanticVersion(KYUUBI_VERSION)
runtimeVersion.majorVersion + runtimeVersion.minorVersion.toDouble / 10
}
final private val version = SemanticVersion(KYUUBI_VERSION).toDouble
class SaslPlainProvider
extends Provider("KyuubiSaslPlain", version, "Kyuubi Plain SASL provider") {

View File

@ -245,7 +245,7 @@ trait SparkDataTypeTests extends HiveJDBCTestHelper with SparkVersionUtil {
assert(resultSet.next())
val result = resultSet.getString("col")
val metaData = resultSet.getMetaData
if (SPARK_ENGINE_RUNTIME_VERSION < "3.2") {
if (SPARK_ENGINE_RUNTIME_VERSION <= "3.1") {
// for spark 3.1 and backwards
assert(result === kv._2._2)
assert(metaData.getPrecision(1) === Int.MaxValue)
@ -276,7 +276,7 @@ trait SparkDataTypeTests extends HiveJDBCTestHelper with SparkVersionUtil {
assert(resultSet.next())
val result = resultSet.getString("col")
val metaData = resultSet.getMetaData
if (SPARK_ENGINE_RUNTIME_VERSION < "3.2") {
if (SPARK_ENGINE_RUNTIME_VERSION <= "3.1") {
// for spark 3.1 and backwards
assert(result === kv._2._2)
assert(metaData.getPrecision(1) === Int.MaxValue)

View File

@ -62,10 +62,6 @@ class PlainSASLHelperSuite extends KyuubiFunSuite {
val saslPlainProvider = new SaslPlainProvider()
assert(saslPlainProvider.containsKey("SaslServerFactory.PLAIN"))
assert(saslPlainProvider.getName === "KyuubiSaslPlain")
val version: Double = {
val ver = SemanticVersion(KYUUBI_VERSION)
ver.majorVersion + ver.minorVersion.toDouble / 10
}
assert(saslPlainProvider.getVersion === version)
assertResult(saslPlainProvider.getVersion)(SemanticVersion(KYUUBI_VERSION).toDouble)
}
}

View File

@ -22,9 +22,7 @@ import org.apache.kyuubi.operation.HiveJDBCTestHelper
trait SparkVersionUtil {
this: HiveJDBCTestHelper =>
protected lazy val SPARK_ENGINE_RUNTIME_VERSION = sparkEngineMajorMinorVersion
def sparkEngineMajorMinorVersion: SemanticVersion = {
protected lazy val SPARK_ENGINE_RUNTIME_VERSION: SemanticVersion = {
var sparkRuntimeVer = ""
withJdbcStatement() { stmt =>
val result = stmt.executeQuery("SELECT version()")

View File

@ -20,7 +20,10 @@ package org.apache.kyuubi.util
/**
* Encapsulate a component version for the convenience of version checks.
*/
case class SemanticVersion(majorVersion: Int, minorVersion: Int) {
case class SemanticVersion(majorVersion: Int, minorVersion: Int)
extends Comparable[SemanticVersion] {
def ===(targetVersionString: String): Boolean = isVersionEqualTo(targetVersionString)
def <=(targetVersionString: String): Boolean = isVersionAtMost(targetVersionString)
@ -30,49 +33,46 @@ case class SemanticVersion(majorVersion: Int, minorVersion: Int) {
def <(targetVersionString: String): Boolean = !isVersionAtLeast(targetVersionString)
def isVersionAtMost(targetVersionString: String): Boolean = {
this.compareVersion(
targetVersionString,
(targetMajor: Int, targetMinor: Int, runtimeMajor: Int, runtimeMinor: Int) =>
(runtimeMajor < targetMajor) || {
runtimeMajor == targetMajor && runtimeMinor <= targetMinor
})
}
def isVersionAtMost(targetVersionString: String): Boolean =
compareTo(SemanticVersion(targetVersionString)) <= 0
def isVersionAtLeast(targetVersionString: String): Boolean = {
this.compareVersion(
targetVersionString,
(targetMajor: Int, targetMinor: Int, runtimeMajor: Int, runtimeMinor: Int) =>
(runtimeMajor > targetMajor) || {
runtimeMajor == targetMajor && runtimeMinor >= targetMinor
})
}
def isVersionAtLeast(targetVersionString: String): Boolean =
compareTo(SemanticVersion(targetVersionString)) >= 0
def isVersionEqualTo(targetVersionString: String): Boolean = {
this.compareVersion(
targetVersionString,
(targetMajor: Int, targetMinor: Int, runtimeMajor: Int, runtimeMinor: Int) =>
runtimeMajor == targetMajor && runtimeMinor == targetMinor)
}
def isVersionEqualTo(targetVersionString: String): Boolean =
compareTo(SemanticVersion(targetVersionString)) == 0
def compareVersion(
targetVersionString: String,
callback: (Int, Int, Int, Int) => Boolean): Boolean = {
val targetVersion = SemanticVersion(targetVersionString)
val targetMajor = targetVersion.majorVersion
val targetMinor = targetVersion.minorVersion
callback(targetMajor, targetMinor, this.majorVersion, this.minorVersion)
override def compareTo(v: SemanticVersion): Int = {
if (majorVersion > v.majorVersion) {
1
} else if (majorVersion < v.majorVersion) {
-1
} else {
minorVersion - v.minorVersion
}
}
override def toString: String = s"$majorVersion.$minorVersion"
/**
* Returning a double in format of "majorVersion.minorVersion".
* Note: Not suitable for version comparison, only for logging.
* @return
*/
def toDouble: Double = toString.toDouble
}
object SemanticVersion {
private val semanticVersionRegex = """^(\d+)(?:\.(\d+))?(\..*)?$""".r
def apply(versionString: String): SemanticVersion = {
"""^(\d+)\.(\d+)(\..*)?$""".r.findFirstMatchIn(versionString) match {
semanticVersionRegex.findFirstMatchIn(versionString) match {
case Some(m) =>
SemanticVersion(m.group(1).toInt, m.group(2).toInt)
val major = m.group(1).toInt
val minor = Option(m.group(2)).getOrElse("0").toInt
SemanticVersion(major, minor)
case None =>
throw new IllegalArgumentException(s"Tried to parse '$versionString' as a project" +
s" version string, but it could not find the major and minor version numbers.")

View File

@ -41,6 +41,11 @@ class SemanticVersionSuite extends AnyFunSuite {
assert(version.minorVersion === 9)
}
test("reject parsing illegal formatted version") {
assertThrows[IllegalArgumentException](SemanticVersion("v1.0"))
assertThrows[IllegalArgumentException](SemanticVersion(".1.0"))
}
test("companion class compare version at most") {
assert(SemanticVersion("1.12").isVersionAtMost("2.8.8-SNAPSHOT"))
val runtimeVersion = SemanticVersion("1.12.4")
@ -73,4 +78,28 @@ class SemanticVersionSuite extends AnyFunSuite {
assert(!runtimeVersion.isVersionEqualTo("1.10.4"))
assert(!runtimeVersion.isVersionEqualTo("2.12.8"))
}
test("compare version to major version only") {
val versionFromMajorOnly = SemanticVersion("3")
assert(versionFromMajorOnly === "3.0")
assert(versionFromMajorOnly < "3.1")
assert(!(versionFromMajorOnly > "3.0"))
val runtimeVersion = SemanticVersion("2.3.4")
assert(runtimeVersion > "1")
assert(runtimeVersion > "2")
assert(runtimeVersion >= "2")
assert(!(runtimeVersion === "2"))
assert(runtimeVersion < "3")
assert(runtimeVersion <= "4")
}
test("semantic version to double") {
assertResult(1.0d)(SemanticVersion("1").toDouble)
assertResult(1.2d)(SemanticVersion("1.2").toDouble)
assertResult(1.2d)(SemanticVersion("1.2.3").toDouble)
assertResult(1.2d)(SemanticVersion("1.2.3-SNAPSHOT").toDouble)
assertResult(1.234d)(SemanticVersion("1.234").toDouble)
assertResult(1.234d)(SemanticVersion("1.234.567").toDouble)
}
}