[KYUUBI #2730] [WIP][KYUUBI #2238] Support Flink 1.15

### Why are the changes needed?

Flink 1.15.0 is out, and Kyuubi needs to support it. Specifically, this PR:

1. Adds Flink 1.15 support.
2. Adds `flink-1.14` and `flink-1.15` Maven profiles to support both Flink 1.14 and 1.15.

All Flink 1.14 functionality and code are kept unchanged, with no impact on existing behavior.

The Flink 1.15 support is still a work in progress: at present, 32 test cases pass and 5 fail. Please review and suggest improvements; I will continue to improve it.

### How to build with Flink 1.14

```
mvn clean install -DskipTests -Pflink-1.14
```

### How to build with Flink 1.15

```
mvn clean install -DskipTests -Pflink-1.15
```

### How to run test cases with Flink 1.14

Enable the `flink-1.14` Maven profile and run the test cases.
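For example, from the command line (the `externals/kyuubi-flink-sql-engine` module path here is an assumption about where the Flink engine tests live and may differ in your checkout; drop `-pl` to run the whole build's tests):

```
mvn test -Pflink-1.14 -pl externals/kyuubi-flink-sql-engine
```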

### How to run test cases with Flink 1.15

Enable the `flink-1.15` Maven profile and run the test cases.
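Likewise, under the same module-path assumption as above:

```
mvn test -Pflink-1.15 -pl externals/kyuubi-flink-sql-engine
```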

### _How was this patch tested?_

- [x]  Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request

Closes #2730 from wolfboys/master.

Closes #2730

Closes #2238

ba9716f8 [benjobs] Improve check flink version
0616e74d [benjobs] [KYUUBI #2238] Support Flink 1.15

Authored-by: benjobs <benjobs@qq.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
benjobs 2022-05-25 11:31:19 +08:00 committed by Cheng Pan
parent 81c48b0c61
commit c84ea87c81
6 changed files with 151 additions and 31 deletions

View File

@@ -61,19 +61,19 @@
     <dependency>
       <groupId>org.apache.flink</groupId>
-      <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+      <artifactId>flink-streaming-java${flink.module.scala.suffix}</artifactId>
       <scope>provided</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.flink</groupId>
-      <artifactId>flink-clients_${scala.binary.version}</artifactId>
+      <artifactId>flink-clients${flink.module.scala.suffix}</artifactId>
       <scope>provided</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.flink</groupId>
-      <artifactId>flink-sql-client_${scala.binary.version}</artifactId>
+      <artifactId>flink-sql-client${flink.module.scala.suffix}</artifactId>
       <scope>provided</scope>
     </dependency>
@@ -91,7 +91,7 @@
     <dependency>
       <groupId>org.apache.flink</groupId>
-      <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
+      <artifactId>flink-table-api-java-bridge${flink.module.scala.suffix}</artifactId>
       <scope>provided</scope>
     </dependency>
@@ -103,7 +103,7 @@
     <dependency>
       <groupId>org.apache.flink</groupId>
-      <artifactId>flink-table-runtime_${scala.binary.version}</artifactId>
+      <artifactId>flink-table-runtime${flink.module.scala.suffix}</artifactId>
       <scope>provided</scope>
     </dependency>
@@ -137,7 +137,7 @@
     <dependency>
       <groupId>org.apache.flink</groupId>
-      <artifactId>flink-test-utils_${scala.binary.version}</artifactId>
+      <artifactId>flink-test-utils${flink.module.scala.suffix}</artifactId>
       <scope>test</scope>
     </dependency>
   </dependencies>

View File

@@ -31,7 +31,7 @@ import org.apache.flink.table.client.cli.CliOptionsParser._
 import org.apache.flink.table.client.gateway.context.SessionContext
 import org.apache.flink.table.client.gateway.local.LocalExecutor

-import org.apache.kyuubi.Logging
+import org.apache.kyuubi.{Logging, Utils}

 object FlinkEngineUtils extends Logging {
@@ -40,8 +40,13 @@ object FlinkEngineUtils extends Logging {
   def checkFlinkVersion(): Unit = {
     val flinkVersion = EnvironmentInformation.getVersion
-    if (!flinkVersion.startsWith("1.14")) {
-      throw new RuntimeException("Only Flink-1.14.x is supported now!")
+    Utils.majorMinorVersion(flinkVersion) match {
+      case (1, 14 | 15) =>
+        logger.info(s"The current Flink version is $flinkVersion")
+      case _ =>
+        throw new UnsupportedOperationException(
+          s"The current Flink version is $flinkVersion, " +
+            s"Only Flink 1.14.x and 1.15 are supported, not supported in other versions")
     }
   }
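For reference, the version gate above relies on Kyuubi's existing `Utils.majorMinorVersion` helper. Below is a minimal, self-contained sketch of the same `(major, minor)` match; the regex and object name are illustrative stand-ins, not Kyuubi's actual implementation:

```
object VersionCheckSketch {
  // Illustrative stand-in for Utils.majorMinorVersion: extract the leading
  // "major.minor" pair from a version string such as "1.15.0".
  private val VersionPattern = """^(\d+)\.(\d+).*$""".r

  def majorMinorVersion(version: String): (Int, Int) = version match {
    case VersionPattern(major, minor) => (major.toInt, minor.toInt)
    case _ => throw new IllegalArgumentException(s"Unparsable version string: $version")
  }

  def main(args: Array[String]): Unit = {
    // Mirrors the accepted versions in checkFlinkVersion above.
    Seq("1.14.4", "1.15.0", "1.13.2").foreach { v =>
      majorMinorVersion(v) match {
        case (1, 14 | 15) => println(s"Flink $v is supported")
        case _ => println(s"Flink $v is not supported")
      }
    }
  }
}
```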

View File

@@ -17,6 +17,8 @@
 package org.apache.kyuubi.engine.flink.operation

+import java.time.LocalDate
+import java.util
 import java.util.concurrent.RejectedExecutionException

 import scala.collection.JavaConverters._
@@ -26,15 +28,21 @@ import com.google.common.annotations.VisibleForTesting
 import org.apache.calcite.rel.metadata.{DefaultRelMetadataProvider, JaninoRelMetadataProvider, RelMetadataQueryBase}
 import org.apache.flink.table.api.ResultKind
 import org.apache.flink.table.client.gateway.{Executor, TypedResult}
+import org.apache.flink.table.data.{GenericArrayData, GenericMapData, RowData}
+import org.apache.flink.table.data.binary.{BinaryArrayData, BinaryMapData}
 import org.apache.flink.table.operations.{Operation, QueryOperation}
 import org.apache.flink.table.operations.command._
+import org.apache.flink.table.types.DataType
+import org.apache.flink.table.types.logical._
 import org.apache.flink.types.Row

 import org.apache.kyuubi.{KyuubiSQLException, Logging}
 import org.apache.kyuubi.engine.flink.result.ResultSet
+import org.apache.kyuubi.engine.flink.schema.RowSet.toHiveString
 import org.apache.kyuubi.operation.{OperationState, OperationType}
 import org.apache.kyuubi.operation.log.OperationLog
 import org.apache.kyuubi.session.Session
+import org.apache.kyuubi.util.RowSetUtils

 class ExecuteStatement(
     session: Session,
@@ -127,6 +135,7 @@
     var resultId: String = null
     try {
       val resultDescriptor = executor.executeQuery(sessionId, operation)
+      val dataTypes = resultDescriptor.getResultSchema.getColumnDataTypes.asScala.toList

       resultId = resultDescriptor.getResultId
@@ -142,7 +151,19 @@
           case TypedResult.ResultType.PAYLOAD =>
             (1 to result.getPayload).foreach { page =>
               if (rows.size < resultMaxRows) {
-                rows ++= executor.retrieveResultPage(resultId, page).asScala
+                // FLINK-24461 retrieveResultPage method changes the return type from Row to RowData
+                val result = executor.retrieveResultPage(resultId, page).asScala.toList
+                result.headOption match {
+                  case None =>
+                  case Some(r) =>
+                    // for flink 1.14
+                    if (r.getClass == classOf[Row]) {
+                      rows ++= result.asInstanceOf[List[Row]]
+                    } else {
+                      // for flink 1.15+
+                      rows ++= result.map(r => convertToRow(r.asInstanceOf[RowData], dataTypes))
+                    }
+                }
               } else {
                 loop = false
               }
@@ -178,4 +199,88 @@
         warn(s"Failed to clean result set $resultId in session $sessionId", t)
     }
   }
+
+  private[this] def convertToRow(r: RowData, dataTypes: List[DataType]): Row = {
+    val row = Row.withPositions(r.getRowKind, r.getArity)
+    for (i <- 0 until r.getArity) {
+      val dataType = dataTypes(i)
+      dataType.getLogicalType match {
+        case arrayType: ArrayType =>
+          val arrayData = r.getArray(i)
+          if (arrayData == null) {
+            row.setField(i, null)
+          }
+          arrayData match {
+            case d: GenericArrayData =>
+              row.setField(i, d.toObjectArray)
+            case d: BinaryArrayData =>
+              row.setField(i, d.toObjectArray(arrayType.getElementType))
+            case _ =>
+          }
+        case _: BinaryType =>
+          row.setField(i, r.getBinary(i))
+        case _: BigIntType =>
+          row.setField(i, r.getLong(i))
+        case _: BooleanType =>
+          row.setField(i, r.getBoolean(i))
+        case _: VarCharType | _: CharType =>
+          row.setField(i, r.getString(i))
+        case t: DecimalType =>
+          row.setField(i, r.getDecimal(i, t.getPrecision, t.getScale).toBigDecimal)
+        case _: DateType =>
+          val date = RowSetUtils.formatLocalDate(LocalDate.ofEpochDay(r.getInt(i)))
+          row.setField(i, date)
+        case t: TimestampType =>
+          val ts = RowSetUtils
+            .formatLocalDateTime(r.getTimestamp(i, t.getPrecision)
+              .toLocalDateTime)
+          row.setField(i, ts)
+        case _: TinyIntType =>
+          row.setField(i, r.getByte(i))
+        case _: SmallIntType =>
+          row.setField(i, r.getShort(i))
+        case _: IntType =>
+          row.setField(i, r.getInt(i))
+        case _: FloatType =>
+          row.setField(i, r.getFloat(i))
+        case mapType: MapType =>
+          val mapData = r.getMap(i)
+          if (mapData != null && mapData.size > 0) {
+            val keyType = mapType.getKeyType
+            val valueType = mapType.getValueType
+            mapData match {
+              case d: BinaryMapData =>
+                val kvArray = toArray(keyType, valueType, d)
+                val map: util.Map[Any, Any] = new util.HashMap[Any, Any]
+                for (i <- 0 until kvArray._1.length) {
+                  val value: Any = kvArray._2(i)
+                  map.put(kvArray._1(i), value)
+                }
+                row.setField(i, map)
+              case d: GenericMapData => // TODO
+            }
+          } else {
+            row.setField(i, null)
+          }
+        case _: DoubleType =>
+          row.setField(i, r.getDouble(i))
+        case t: RowType =>
+          val v = r.getRow(i, t.getFieldCount)
+          row.setField(i, v)
+        case t =>
+          val hiveString = toHiveString((row.getField(i), t))
+          row.setField(i, hiveString)
+      }
+    }
+    row
+  }
+
+  private[this] def toArray(
+      keyType: LogicalType,
+      valueType: LogicalType,
+      arrayData: BinaryMapData): (Array[_], Array[_]) = {
+    arrayData.keyArray().toObjectArray(keyType) -> arrayData.valueArray().toObjectArray(valueType)
+  }
 }

View File

@@ -21,8 +21,6 @@ import java.nio.file.Files
 import java.sql.DatabaseMetaData
 import java.util.UUID

-import org.apache.flink.table.api.EnvironmentSettings.DEFAULT_BUILTIN_CATALOG
-import org.apache.flink.table.api.EnvironmentSettings.DEFAULT_BUILTIN_DATABASE
 import org.apache.flink.table.types.logical.LogicalTypeRoot
 import org.apache.hive.service.rpc.thrift.{TExecuteStatementReq, TFetchResultsReq, TOpenSessionReq}
 import org.scalatest.concurrent.PatienceConfiguration.Timeout
@@ -349,16 +347,18 @@ class FlinkOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper {
     withJdbcStatement() { statement =>
       val metaData = statement.getConnection.getMetaData
       var resultSet = metaData.getSchemas(null, null)
+      val defaultCatalog = "default_catalog"
+      val defaultDatabase = "default_database"
       while (resultSet.next()) {
-        assert(resultSet.getString(TABLE_SCHEM) === DEFAULT_BUILTIN_DATABASE)
-        assert(resultSet.getString(TABLE_CATALOG) === DEFAULT_BUILTIN_CATALOG)
+        assert(resultSet.getString(TABLE_SCHEM) === defaultDatabase)
+        assert(resultSet.getString(TABLE_CATALOG) === defaultCatalog)
       }
       resultSet = metaData.getSchemas(
-        DEFAULT_BUILTIN_CATALOG.split("_").apply(0),
-        DEFAULT_BUILTIN_DATABASE.split("_").apply(0))
+        defaultCatalog.split("_").apply(0),
+        defaultDatabase.split("_").apply(0))
       while (resultSet.next()) {
-        assert(resultSet.getString(TABLE_SCHEM) === DEFAULT_BUILTIN_DATABASE)
-        assert(resultSet.getString(TABLE_CATALOG) === DEFAULT_BUILTIN_CATALOG)
+        assert(resultSet.getString(TABLE_SCHEM) === defaultDatabase)
+        assert(resultSet.getString(TABLE_CATALOG) === defaultCatalog)
       }
     }
   }

View File

@@ -77,7 +77,7 @@
     <dependency>
       <groupId>org.apache.flink</groupId>
-      <artifactId>flink-table-runtime_${scala.binary.version}</artifactId>
+      <artifactId>flink-table-runtime${flink.module.scala.suffix}</artifactId>
      <scope>test</scope>
     </dependency>

pom.xml
View File

@@ -111,6 +111,7 @@
     <delta.version>1.2.1</delta.version>
     <fb303.version>0.9.3</fb303.version>
     <flink.version>1.14.4</flink.version>
+    <flink.module.scala.suffix>_${scala.binary.version}</flink.module.scala.suffix>
     <flink.archive.name>flink-${flink.version}-bin-scala_${scala.binary.version}.tgz</flink.archive.name>
     <flink.archive.mirror>${apache.archive.dist}/flink/flink-${flink.version}</flink.archive.mirror>
     <flink.archive.download.skip>false</flink.archive.download.skip>
@@ -1295,13 +1296,13 @@
      <dependency>
        <groupId>org.apache.flink</groupId>
-        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+        <artifactId>flink-streaming-java${flink.module.scala.suffix}</artifactId>
        <version>${flink.version}</version>
      </dependency>
      <dependency>
        <groupId>org.apache.flink</groupId>
-        <artifactId>flink-clients_${scala.binary.version}</artifactId>
+        <artifactId>flink-clients${flink.module.scala.suffix}</artifactId>
        <version>${flink.version}</version>
      </dependency>
@@ -1319,7 +1320,7 @@
      <dependency>
        <groupId>org.apache.flink</groupId>
-        <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
+        <artifactId>flink-table-api-java-bridge${flink.module.scala.suffix}</artifactId>
        <version>${flink.version}</version>
      </dependency>
@@ -1331,7 +1332,7 @@
      <dependency>
        <groupId>org.apache.flink</groupId>
-        <artifactId>flink-table-runtime_${scala.binary.version}</artifactId>
+        <artifactId>flink-table-runtime${flink.module.scala.suffix}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
      </dependency>
@@ -1344,19 +1345,13 @@
      <dependency>
        <groupId>org.apache.flink</groupId>
-        <artifactId>flink-yarn_${scala.binary.version}</artifactId>
+        <artifactId>flink-sql-client${flink.module.scala.suffix}</artifactId>
        <version>${flink.version}</version>
      </dependency>
-      <dependency>
-        <groupId>org.apache.flink</groupId>
-        <artifactId>flink-sql-client_${scala.binary.version}</artifactId>
-        <version>${flink.version}</version>
-      </dependency>
      <dependency>
        <groupId>org.apache.flink</groupId>
-        <artifactId>flink-test-utils_${scala.binary.version}</artifactId>
+        <artifactId>flink-test-utils${flink.module.scala.suffix}</artifactId>
        <version>${flink.version}</version>
        <exclusions>
          <exclusion>
@@ -2027,6 +2022,21 @@
      </properties>
    </profile>
+    <profile>
+      <id>flink-1.14</id>
+      <properties>
+        <flink.version>1.14.4</flink.version>
+        <flink.module.scala.suffix>_${scala.binary.version}</flink.module.scala.suffix>
+      </properties>
+    </profile>
+    <profile>
+      <id>flink-1.15</id>
+      <properties>
+        <flink.version>1.15.0</flink.version>
+        <flink.module.scala.suffix></flink.module.scala.suffix>
+      </properties>
+    </profile>
    <profile>
      <id>spark-provided</id>
      <properties>
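As a quick sanity check of the profile mechanism above, one can inspect how the new property resolves under each profile; a sketch using the standard Maven help plugin (output formatting varies, and under `flink-1.15` the suffix resolves to an empty value):

```
mvn help:evaluate -Dexpression=flink.module.scala.suffix -Pflink-1.14
mvn help:evaluate -Dexpression=flink.module.scala.suffix -Pflink-1.15
```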