diff --git a/externals/kyuubi-flink-sql-engine/pom.xml b/externals/kyuubi-flink-sql-engine/pom.xml
index d406027d7..39e38acad 100644
--- a/externals/kyuubi-flink-sql-engine/pom.xml
+++ b/externals/kyuubi-flink-sql-engine/pom.xml
@@ -61,19 +61,19 @@
org.apache.flink
- flink-streaming-java_${scala.binary.version}
+ flink-streaming-java${flink.module.scala.suffix}
provided
org.apache.flink
- flink-clients_${scala.binary.version}
+ flink-clients${flink.module.scala.suffix}
provided
org.apache.flink
- flink-sql-client_${scala.binary.version}
+ flink-sql-client${flink.module.scala.suffix}
provided
@@ -91,7 +91,7 @@
org.apache.flink
- flink-table-api-java-bridge_${scala.binary.version}
+ flink-table-api-java-bridge${flink.module.scala.suffix}
provided
@@ -103,7 +103,7 @@
org.apache.flink
- flink-table-runtime_${scala.binary.version}
+ flink-table-runtime${flink.module.scala.suffix}
provided
@@ -137,7 +137,7 @@
org.apache.flink
- flink-test-utils_${scala.binary.version}
+ flink-test-utils${flink.module.scala.suffix}
test
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkEngineUtils.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkEngineUtils.scala
index 561a4c245..c12b221f0 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkEngineUtils.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkEngineUtils.scala
@@ -31,7 +31,7 @@ import org.apache.flink.table.client.cli.CliOptionsParser._
import org.apache.flink.table.client.gateway.context.SessionContext
import org.apache.flink.table.client.gateway.local.LocalExecutor
-import org.apache.kyuubi.Logging
+import org.apache.kyuubi.{Logging, Utils}
object FlinkEngineUtils extends Logging {
@@ -40,8 +40,13 @@ object FlinkEngineUtils extends Logging {
def checkFlinkVersion(): Unit = {
val flinkVersion = EnvironmentInformation.getVersion
- if (!flinkVersion.startsWith("1.14")) {
- throw new RuntimeException("Only Flink-1.14.x is supported now!")
+    Utils.majorMinorVersion(flinkVersion) match {
+      case (1, 14 | 15) =>
+        logger.info(s"The current Flink version is $flinkVersion")
+      case _ =>
+        throw new UnsupportedOperationException(
+          s"Unsupported Flink version $flinkVersion: " +
+            s"only Flink 1.14 and 1.15 are supported")
}
}
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
index 98e4a4a32..681400e4b 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
@@ -17,6 +17,8 @@
package org.apache.kyuubi.engine.flink.operation
+import java.time.LocalDate
+import java.util
import java.util.concurrent.RejectedExecutionException
import scala.collection.JavaConverters._
@@ -26,15 +28,21 @@ import com.google.common.annotations.VisibleForTesting
import org.apache.calcite.rel.metadata.{DefaultRelMetadataProvider, JaninoRelMetadataProvider, RelMetadataQueryBase}
import org.apache.flink.table.api.ResultKind
import org.apache.flink.table.client.gateway.{Executor, TypedResult}
+import org.apache.flink.table.data.{GenericArrayData, GenericMapData, RowData}
+import org.apache.flink.table.data.binary.{BinaryArrayData, BinaryMapData}
import org.apache.flink.table.operations.{Operation, QueryOperation}
import org.apache.flink.table.operations.command._
+import org.apache.flink.table.types.DataType
+import org.apache.flink.table.types.logical._
import org.apache.flink.types.Row
import org.apache.kyuubi.{KyuubiSQLException, Logging}
import org.apache.kyuubi.engine.flink.result.ResultSet
+import org.apache.kyuubi.engine.flink.schema.RowSet.toHiveString
import org.apache.kyuubi.operation.{OperationState, OperationType}
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.session.Session
+import org.apache.kyuubi.util.RowSetUtils
class ExecuteStatement(
session: Session,
@@ -127,6 +135,7 @@ class ExecuteStatement(
var resultId: String = null
try {
val resultDescriptor = executor.executeQuery(sessionId, operation)
+ val dataTypes = resultDescriptor.getResultSchema.getColumnDataTypes.asScala.toList
resultId = resultDescriptor.getResultId
@@ -142,7 +151,19 @@ class ExecuteStatement(
case TypedResult.ResultType.PAYLOAD =>
(1 to result.getPayload).foreach { page =>
if (rows.size < resultMaxRows) {
- rows ++= executor.retrieveResultPage(resultId, page).asScala
+                // FLINK-24461: retrieveResultPage returns Row in Flink 1.14
+                // but RowData in Flink 1.15+; convert the latter explicitly.
+                val pageRows = executor.retrieveResultPage(resultId, page).asScala.toList
+                pageRows.headOption match {
+                  case Some(_: Row) =>
+                    // Flink 1.14: already external rows
+                    rows ++= pageRows.asInstanceOf[List[Row]]
+                  case Some(_) =>
+                    // Flink 1.15+: decode internal rows via the result schema
+                    rows ++= pageRows.map(r => convertToRow(r.asInstanceOf[RowData], dataTypes))
+                  case None =>
+                    // empty page: nothing to append
+                }
} else {
loop = false
}
@@ -178,4 +199,88 @@ class ExecuteStatement(
warn(s"Failed to clean result set $resultId in session $sessionId", t)
}
}
+
+  /**
+   * Converts a Flink internal [[RowData]] (returned by Flink 1.15+) into an external
+   * [[Row]], decoding each field according to the result schema's data types.
+   */
+  private[this] def convertToRow(r: RowData, dataTypes: List[DataType]): Row = {
+    val row = Row.withPositions(r.getRowKind, r.getArity)
+    for (i <- 0 until r.getArity) {
+      val dataType = dataTypes(i)
+      dataType.getLogicalType match {
+        case arrayType: ArrayType =>
+          r.getArray(i) match {
+            case null =>
+              row.setField(i, null)
+            case d: GenericArrayData =>
+              row.setField(i, d.toObjectArray)
+            case d: BinaryArrayData =>
+              row.setField(i, d.toObjectArray(arrayType.getElementType))
+            case _ =>
+          }
+        case _: BinaryType =>
+          row.setField(i, r.getBinary(i))
+        case _: BigIntType =>
+          row.setField(i, r.getLong(i))
+        case _: BooleanType =>
+          row.setField(i, r.getBoolean(i))
+        case _: VarCharType | _: CharType =>
+          row.setField(i, r.getString(i))
+        case t: DecimalType =>
+          row.setField(i, r.getDecimal(i, t.getPrecision, t.getScale).toBigDecimal)
+        case _: DateType =>
+          // external representation: formatted date string
+          row.setField(i, RowSetUtils.formatLocalDate(LocalDate.ofEpochDay(r.getInt(i))))
+        case t: TimestampType =>
+          val ldt = r.getTimestamp(i, t.getPrecision).toLocalDateTime
+          row.setField(i, RowSetUtils.formatLocalDateTime(ldt))
+        case _: TinyIntType =>
+          row.setField(i, r.getByte(i))
+        case _: SmallIntType =>
+          row.setField(i, r.getShort(i))
+        case _: IntType =>
+          row.setField(i, r.getInt(i))
+        case _: FloatType =>
+          row.setField(i, r.getFloat(i))
+        case mapType: MapType =>
+          val mapData = r.getMap(i)
+          if (mapData != null && mapData.size > 0) {
+            mapData match {
+              case d: BinaryMapData =>
+                val (keys, values) = toArray(mapType.getKeyType, mapType.getValueType, d)
+                val map: util.Map[Any, Any] = new util.HashMap[Any, Any]
+                // j indexes map entries; i is the enclosing row-field index
+                for (j <- keys.indices) {
+                  map.put(keys(j), values(j))
+                }
+                row.setField(i, map)
+              case _: GenericMapData => // TODO: support GenericMapData
+              case _ =>
+            }
+          } else {
+            row.setField(i, null)
+          }
+        case _: DoubleType =>
+          row.setField(i, r.getDouble(i))
+        case t: RowType =>
+          row.setField(i, r.getRow(i, t.getFieldCount))
+        case t =>
+          // Read the value from the source RowData; the original code read
+          // row.getField(i), which is always null because it is not yet set.
+          val v = RowData.createFieldGetter(t, i).getFieldOrNull(r)
+          row.setField(i, toHiveString((v, t)))
+      }
+    }
+    row
+  }
+
+  /** Unpacks a BinaryMapData into parallel (keys, values) object arrays. */
+  private[this] def toArray(
+      keyType: LogicalType,
+      valueType: LogicalType,
+      mapData: BinaryMapData): (Array[_], Array[_]) = {
+    mapData.keyArray().toObjectArray(keyType) -> mapData.valueArray().toObjectArray(valueType)
+  }
+
}
diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
index 419982387..f96cc61bc 100644
--- a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
+++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
@@ -21,8 +21,6 @@ import java.nio.file.Files
import java.sql.DatabaseMetaData
import java.util.UUID
-import org.apache.flink.table.api.EnvironmentSettings.DEFAULT_BUILTIN_CATALOG
-import org.apache.flink.table.api.EnvironmentSettings.DEFAULT_BUILTIN_DATABASE
import org.apache.flink.table.types.logical.LogicalTypeRoot
import org.apache.hive.service.rpc.thrift.{TExecuteStatementReq, TFetchResultsReq, TOpenSessionReq}
import org.scalatest.concurrent.PatienceConfiguration.Timeout
@@ -349,16 +347,18 @@ class FlinkOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper {
withJdbcStatement() { statement =>
val metaData = statement.getConnection.getMetaData
var resultSet = metaData.getSchemas(null, null)
+ val defaultCatalog = "default_catalog"
+ val defaultDatabase = "default_database"
while (resultSet.next()) {
- assert(resultSet.getString(TABLE_SCHEM) === DEFAULT_BUILTIN_DATABASE)
- assert(resultSet.getString(TABLE_CATALOG) === DEFAULT_BUILTIN_CATALOG)
+ assert(resultSet.getString(TABLE_SCHEM) === defaultDatabase)
+ assert(resultSet.getString(TABLE_CATALOG) === defaultCatalog)
}
resultSet = metaData.getSchemas(
- DEFAULT_BUILTIN_CATALOG.split("_").apply(0),
- DEFAULT_BUILTIN_DATABASE.split("_").apply(0))
+ defaultCatalog.split("_").apply(0),
+ defaultDatabase.split("_").apply(0))
while (resultSet.next()) {
- assert(resultSet.getString(TABLE_SCHEM) === DEFAULT_BUILTIN_DATABASE)
- assert(resultSet.getString(TABLE_CATALOG) === DEFAULT_BUILTIN_CATALOG)
+ assert(resultSet.getString(TABLE_SCHEM) === defaultDatabase)
+ assert(resultSet.getString(TABLE_CATALOG) === defaultCatalog)
}
}
}
diff --git a/integration-tests/kyuubi-flink-it/pom.xml b/integration-tests/kyuubi-flink-it/pom.xml
index 9530cb052..88c868b47 100644
--- a/integration-tests/kyuubi-flink-it/pom.xml
+++ b/integration-tests/kyuubi-flink-it/pom.xml
@@ -77,7 +77,7 @@
org.apache.flink
- flink-table-runtime_${scala.binary.version}
+ flink-table-runtime${flink.module.scala.suffix}
test
diff --git a/pom.xml b/pom.xml
index d79a80112..654f7edeb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -111,6 +111,7 @@
1.2.1
0.9.3
1.14.4
+ _${scala.binary.version}
flink-${flink.version}-bin-scala_${scala.binary.version}.tgz
${apache.archive.dist}/flink/flink-${flink.version}
false
@@ -1295,13 +1296,13 @@
org.apache.flink
- flink-streaming-java_${scala.binary.version}
+ flink-streaming-java${flink.module.scala.suffix}
${flink.version}
org.apache.flink
- flink-clients_${scala.binary.version}
+ flink-clients${flink.module.scala.suffix}
${flink.version}
@@ -1319,7 +1320,7 @@
org.apache.flink
- flink-table-api-java-bridge_${scala.binary.version}
+ flink-table-api-java-bridge${flink.module.scala.suffix}
${flink.version}
@@ -1331,7 +1332,7 @@
org.apache.flink
- flink-table-runtime_${scala.binary.version}
+ flink-table-runtime${flink.module.scala.suffix}
${flink.version}
provided
@@ -1344,19 +1345,13 @@
org.apache.flink
- flink-yarn_${scala.binary.version}
+ flink-sql-client${flink.module.scala.suffix}
${flink.version}
org.apache.flink
- flink-sql-client_${scala.binary.version}
- ${flink.version}
-
-
-
- org.apache.flink
- flink-test-utils_${scala.binary.version}
+ flink-test-utils${flink.module.scala.suffix}
${flink.version}
@@ -2027,6 +2022,21 @@
+
+ flink-1.14
+
+ 1.14.4
+ _${scala.binary.version}
+
+
+
+ flink-1.15
+
+ 1.15.0
+
+
+
+
spark-provided