From c84ea87c8163173edefc3b086d553a2ffb4ed6ab Mon Sep 17 00:00:00 2001 From: benjobs Date: Wed, 25 May 2022 11:31:19 +0800 Subject: [PATCH] [KYUUBI #2730] [WIP][KYUUBI #2238] Support Flink 1.15 ### Why are the changes needed? Flink 1.15.0 is out, and Kyuubi needs to support it. This patch: 1. adds Flink 1.15 support 2. adds `flink-1.14` and `flink-1.15` Maven profiles to support both Flink 1.14 and 1.15. For Flink 1.14, all functions and code are kept unchanged, with no impact. Flink 1.15 support is still partly in progress: at present, 32 test cases pass and 5 fail. You can review and put forward suggestions for improvement; I will continue to improve it. ### How to build with Flink 1.14 ``` mvn clean install -DskipTests -Pflink-1.14 ``` ### How to build with Flink 1.15 ``` mvn clean install -DskipTests -Pflink-1.15 ``` ### How to run test cases with Flink 1.14 Enable the `flink-1.14` Maven profile and run the test cases. ### How to run test cases with Flink 1.15 Enable the `flink-1.15` Maven profile and run the test cases. ### _How was this patch tested?_ - [x] Add some test cases that check the changes thoroughly, including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request Closes #2730 from wolfboys/master. 
Closes #2730 Closes #2238 ba9716f8 [benjobs] Improve check flink version 0616e74d [benjobs] [KYUUBI #2238] Support Flink 1.15 Authored-by: benjobs Signed-off-by: Cheng Pan --- externals/kyuubi-flink-sql-engine/pom.xml | 12 +- .../engine/flink/FlinkEngineUtils.scala | 11 +- .../flink/operation/ExecuteStatement.scala | 107 +++++++++++++++++- .../flink/operation/FlinkOperationSuite.scala | 16 +-- integration-tests/kyuubi-flink-it/pom.xml | 2 +- pom.xml | 34 ++++-- 6 files changed, 151 insertions(+), 31 deletions(-) diff --git a/externals/kyuubi-flink-sql-engine/pom.xml b/externals/kyuubi-flink-sql-engine/pom.xml index d406027d7..39e38acad 100644 --- a/externals/kyuubi-flink-sql-engine/pom.xml +++ b/externals/kyuubi-flink-sql-engine/pom.xml @@ -61,19 +61,19 @@ org.apache.flink - flink-streaming-java_${scala.binary.version} + flink-streaming-java${flink.module.scala.suffix} provided org.apache.flink - flink-clients_${scala.binary.version} + flink-clients${flink.module.scala.suffix} provided org.apache.flink - flink-sql-client_${scala.binary.version} + flink-sql-client${flink.module.scala.suffix} provided @@ -91,7 +91,7 @@ org.apache.flink - flink-table-api-java-bridge_${scala.binary.version} + flink-table-api-java-bridge${flink.module.scala.suffix} provided @@ -103,7 +103,7 @@ org.apache.flink - flink-table-runtime_${scala.binary.version} + flink-table-runtime${flink.module.scala.suffix} provided @@ -137,7 +137,7 @@ org.apache.flink - flink-test-utils_${scala.binary.version} + flink-test-utils${flink.module.scala.suffix} test diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkEngineUtils.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkEngineUtils.scala index 561a4c245..c12b221f0 100644 --- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkEngineUtils.scala +++ 
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkEngineUtils.scala @@ -31,7 +31,7 @@ import org.apache.flink.table.client.cli.CliOptionsParser._ import org.apache.flink.table.client.gateway.context.SessionContext import org.apache.flink.table.client.gateway.local.LocalExecutor -import org.apache.kyuubi.Logging +import org.apache.kyuubi.{Logging, Utils} object FlinkEngineUtils extends Logging { @@ -40,8 +40,13 @@ object FlinkEngineUtils extends Logging { def checkFlinkVersion(): Unit = { val flinkVersion = EnvironmentInformation.getVersion - if (!flinkVersion.startsWith("1.14")) { - throw new RuntimeException("Only Flink-1.14.x is supported now!") + Utils.majorMinorVersion(flinkVersion) match { + case (1, 14 | 15) => + logger.info(s"The current Flink version is $flinkVersion") + case _ => + throw new UnsupportedOperationException( + s"The current Flink version is $flinkVersion, " + + s"Only Flink 1.14.x and 1.15 are supported, not supported in other versions") } } diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala index 98e4a4a32..681400e4b 100644 --- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala +++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala @@ -17,6 +17,8 @@ package org.apache.kyuubi.engine.flink.operation +import java.time.LocalDate +import java.util import java.util.concurrent.RejectedExecutionException import scala.collection.JavaConverters._ @@ -26,15 +28,21 @@ import com.google.common.annotations.VisibleForTesting import org.apache.calcite.rel.metadata.{DefaultRelMetadataProvider, JaninoRelMetadataProvider, RelMetadataQueryBase} import org.apache.flink.table.api.ResultKind import 
org.apache.flink.table.client.gateway.{Executor, TypedResult} +import org.apache.flink.table.data.{GenericArrayData, GenericMapData, RowData} +import org.apache.flink.table.data.binary.{BinaryArrayData, BinaryMapData} import org.apache.flink.table.operations.{Operation, QueryOperation} import org.apache.flink.table.operations.command._ +import org.apache.flink.table.types.DataType +import org.apache.flink.table.types.logical._ import org.apache.flink.types.Row import org.apache.kyuubi.{KyuubiSQLException, Logging} import org.apache.kyuubi.engine.flink.result.ResultSet +import org.apache.kyuubi.engine.flink.schema.RowSet.toHiveString import org.apache.kyuubi.operation.{OperationState, OperationType} import org.apache.kyuubi.operation.log.OperationLog import org.apache.kyuubi.session.Session +import org.apache.kyuubi.util.RowSetUtils class ExecuteStatement( session: Session, @@ -127,6 +135,7 @@ class ExecuteStatement( var resultId: String = null try { val resultDescriptor = executor.executeQuery(sessionId, operation) + val dataTypes = resultDescriptor.getResultSchema.getColumnDataTypes.asScala.toList resultId = resultDescriptor.getResultId @@ -142,7 +151,19 @@ class ExecuteStatement( case TypedResult.ResultType.PAYLOAD => (1 to result.getPayload).foreach { page => if (rows.size < resultMaxRows) { - rows ++= executor.retrieveResultPage(resultId, page).asScala + // FLINK-24461 retrieveResultPage method changes the return type from Row to RowData + val result = executor.retrieveResultPage(resultId, page).asScala.toList + result.headOption match { + case None => + case Some(r) => + // for flink 1.14 + if (r.getClass == classOf[Row]) { + rows ++= result.asInstanceOf[List[Row]] + } else { + // for flink 1.15+ + rows ++= result.map(r => convertToRow(r.asInstanceOf[RowData], dataTypes)) + } + } } else { loop = false } @@ -178,4 +199,88 @@ class ExecuteStatement( warn(s"Failed to clean result set $resultId in session $sessionId", t) } } + + private[this] def convertToRow(r: 
RowData, dataTypes: List[DataType]): Row = { + val row = Row.withPositions(r.getRowKind, r.getArity) + for (i <- 0 until r.getArity) { + val dataType = dataTypes(i) + dataType.getLogicalType match { + case arrayType: ArrayType => + val arrayData = r.getArray(i) + if (arrayData == null) { + row.setField(i, null) + } + arrayData match { + case d: GenericArrayData => + row.setField(i, d.toObjectArray) + case d: BinaryArrayData => + row.setField(i, d.toObjectArray(arrayType.getElementType)) + case _ => + } + case _: BinaryType => + row.setField(i, r.getBinary(i)) + case _: BigIntType => + row.setField(i, r.getLong(i)) + case _: BooleanType => + row.setField(i, r.getBoolean(i)) + case _: VarCharType | _: CharType => + row.setField(i, r.getString(i)) + case t: DecimalType => + row.setField(i, r.getDecimal(i, t.getPrecision, t.getScale).toBigDecimal) + case _: DateType => + val date = RowSetUtils.formatLocalDate(LocalDate.ofEpochDay(r.getInt(i))) + row.setField(i, date) + case t: TimestampType => + val ts = RowSetUtils + .formatLocalDateTime(r.getTimestamp(i, t.getPrecision) + .toLocalDateTime) + row.setField(i, ts) + case _: TinyIntType => + row.setField(i, r.getByte(i)) + case _: SmallIntType => + row.setField(i, r.getShort(i)) + case _: IntType => + row.setField(i, r.getInt(i)) + case _: FloatType => + row.setField(i, r.getFloat(i)) + case mapType: MapType => + val mapData = r.getMap(i) + if (mapData != null && mapData.size > 0) { + val keyType = mapType.getKeyType + val valueType = mapType.getValueType + mapData match { + case d: BinaryMapData => + val kvArray = toArray(keyType, valueType, d) + val map: util.Map[Any, Any] = new util.HashMap[Any, Any] + for (i <- 0 until kvArray._1.length) { + val value: Any = kvArray._2(i) + map.put(kvArray._1(i), value) + } + row.setField(i, map) + case d: GenericMapData => // TODO + } + } else { + row.setField(i, null) + } + case _: DoubleType => + row.setField(i, r.getDouble(i)) + case t: RowType => + val v = r.getRow(i, 
t.getFieldCount) + row.setField(i, v) + case t => + val hiveString = toHiveString((row.getField(i), t)) + row.setField(i, hiveString) + } + } + row + } + + private[this] def toArray( + keyType: LogicalType, + valueType: LogicalType, + arrayData: BinaryMapData): (Array[_], Array[_]) = { + + arrayData.keyArray().toObjectArray(keyType) -> arrayData.valueArray().toObjectArray(valueType) + } + } diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala index 419982387..f96cc61bc 100644 --- a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala +++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala @@ -21,8 +21,6 @@ import java.nio.file.Files import java.sql.DatabaseMetaData import java.util.UUID -import org.apache.flink.table.api.EnvironmentSettings.DEFAULT_BUILTIN_CATALOG -import org.apache.flink.table.api.EnvironmentSettings.DEFAULT_BUILTIN_DATABASE import org.apache.flink.table.types.logical.LogicalTypeRoot import org.apache.hive.service.rpc.thrift.{TExecuteStatementReq, TFetchResultsReq, TOpenSessionReq} import org.scalatest.concurrent.PatienceConfiguration.Timeout @@ -349,16 +347,18 @@ class FlinkOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper { withJdbcStatement() { statement => val metaData = statement.getConnection.getMetaData var resultSet = metaData.getSchemas(null, null) + val defaultCatalog = "default_catalog" + val defaultDatabase = "default_database" while (resultSet.next()) { - assert(resultSet.getString(TABLE_SCHEM) === DEFAULT_BUILTIN_DATABASE) - assert(resultSet.getString(TABLE_CATALOG) === DEFAULT_BUILTIN_CATALOG) + assert(resultSet.getString(TABLE_SCHEM) === defaultDatabase) + assert(resultSet.getString(TABLE_CATALOG) 
=== defaultCatalog) } resultSet = metaData.getSchemas( - DEFAULT_BUILTIN_CATALOG.split("_").apply(0), - DEFAULT_BUILTIN_DATABASE.split("_").apply(0)) + defaultCatalog.split("_").apply(0), + defaultDatabase.split("_").apply(0)) while (resultSet.next()) { - assert(resultSet.getString(TABLE_SCHEM) === DEFAULT_BUILTIN_DATABASE) - assert(resultSet.getString(TABLE_CATALOG) === DEFAULT_BUILTIN_CATALOG) + assert(resultSet.getString(TABLE_SCHEM) === defaultDatabase) + assert(resultSet.getString(TABLE_CATALOG) === defaultCatalog) } } } diff --git a/integration-tests/kyuubi-flink-it/pom.xml b/integration-tests/kyuubi-flink-it/pom.xml index 9530cb052..88c868b47 100644 --- a/integration-tests/kyuubi-flink-it/pom.xml +++ b/integration-tests/kyuubi-flink-it/pom.xml @@ -77,7 +77,7 @@ org.apache.flink - flink-table-runtime_${scala.binary.version} + flink-table-runtime${flink.module.scala.suffix} test diff --git a/pom.xml b/pom.xml index d79a80112..654f7edeb 100644 --- a/pom.xml +++ b/pom.xml @@ -111,6 +111,7 @@ 1.2.1 0.9.3 1.14.4 + _${scala.binary.version} flink-${flink.version}-bin-scala_${scala.binary.version}.tgz ${apache.archive.dist}/flink/flink-${flink.version} false @@ -1295,13 +1296,13 @@ org.apache.flink - flink-streaming-java_${scala.binary.version} + flink-streaming-java${flink.module.scala.suffix} ${flink.version} org.apache.flink - flink-clients_${scala.binary.version} + flink-clients${flink.module.scala.suffix} ${flink.version} @@ -1319,7 +1320,7 @@ org.apache.flink - flink-table-api-java-bridge_${scala.binary.version} + flink-table-api-java-bridge${flink.module.scala.suffix} ${flink.version} @@ -1331,7 +1332,7 @@ org.apache.flink - flink-table-runtime_${scala.binary.version} + flink-table-runtime${flink.module.scala.suffix} ${flink.version} provided @@ -1344,19 +1345,13 @@ org.apache.flink - flink-yarn_${scala.binary.version} + flink-sql-client${flink.module.scala.suffix} ${flink.version} org.apache.flink - flink-sql-client_${scala.binary.version} - ${flink.version} 
- - - - org.apache.flink - flink-test-utils_${scala.binary.version} + flink-test-utils${flink.module.scala.suffix} ${flink.version} @@ -2027,6 +2022,21 @@ + + flink-1.14 + + 1.14.4 + _${scala.binary.version} + + + + flink-1.15 + + 1.15.0 + + + + spark-provided