[KYUUBI #3790] Avoid using SchemaResolver directly in GetColumns operation

### _Why are the changes needed?_

close #3790

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #3793 from df-Liu/flink_table.

Closes #3790

317434d2 [df_liu] flink table from

Authored-by: df_liu <df_liu@trip.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
This commit is contained in:
df_liu 2022-11-11 10:08:05 +08:00 committed by Cheng Pan
parent 72c1f53dd0
commit a58816609b
No known key found for this signature in database
GPG Key ID: 8001952629BCC75D

View File

@ -21,7 +21,6 @@ import scala.collection.JavaConverters._
import org.apache.commons.lang3.StringUtils
import org.apache.flink.table.api.{DataTypes, ResultKind}
import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl
import org.apache.flink.table.catalog.Column
import org.apache.flink.table.types.logical._
import org.apache.flink.types.Row
@ -42,14 +41,6 @@ class GetColumns(
override protected def runInternal(): Unit = {
try {
val tableEnv = sessionContext.getExecutionContext.getTableEnvironment
val resolver = tableEnv match {
case impl: StreamTableEnvironmentImpl =>
impl.getCatalogManager.getSchemaResolver
case _ =>
throw new UnsupportedOperationException(
"Unsupported Operation type GetColumns. You can execute " +
"DESCRIBE statement instead to get column infos.")
}
val catalogName =
if (StringUtils.isEmpty(catalogNameOrEmpty)) tableEnv.getCurrentCatalog
@ -68,8 +59,9 @@ class GetColumns(
schemaName,
tableNameRegex)
.filter { _._2.isDefined }
.flatMap { case (tableName, flinkTable) =>
val resolvedSchema = flinkTable.get.getUnresolvedSchema.resolve(resolver)
.flatMap { case (tableName, _) =>
val flinkTable = tableEnv.from(s"`$catalogName`.`$schemaName`.`$tableName`")
val resolvedSchema = flinkTable.getResolvedSchema
resolvedSchema.getColumns.asScala.toArray.zipWithIndex
.filter { case (column, _) =>
columnNameRegex.pattern.matcher(column.getName).matches()