[KYUUBI #1555][FLINK] Bump flink version to 1.14.0

<!--
Thanks for sending a pull request!

Here are some tips for you:
  1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html
  2. If the PR is related to an issue in https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
  3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'.
-->

### _Why are the changes needed?_
<!--
Please clarify why the changes are needed. For instance,
  1. If you add a feature, you can talk about the use case of it.
  2. If you fix a bug, you can clarify why it is a bug.
-->

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #1573 from yanghua/KYUUBI-1555.

Closes #1555

4f5ce746 [yanghua] [KYUUBI #1555] [SUB-TASK][KPIP-2] Bump flink version to 1.14.1

Authored-by: yanghua <yanghua1127@gmail.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
This commit is contained in:
yanghua 2021-12-16 16:59:38 +08:00 committed by Cheng Pan
parent be3a964f37
commit 4b58b0ab9b
No known key found for this signature in database
GPG Key ID: 8001952629BCC75D
3 changed files with 18 additions and 29 deletions

View File

@ -54,7 +54,7 @@
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime_${scala.binary.version}</artifactId>
<artifactId>flink-runtime</artifactId>
<scope>provided</scope>
</dependency>
@ -90,13 +90,13 @@
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime-blink_${scala.binary.version}</artifactId>
<artifactId>flink-table-runtime_${scala.binary.version}</artifactId>
<scope>provided</scope>
</dependency>

View File

@ -22,7 +22,6 @@ import java.lang.reflect.Method;
import java.net.URL;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;
import javax.annotation.Nullable;
@ -43,8 +42,8 @@ import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.table.delegation.Executor;
import org.apache.flink.table.delegation.ExecutorFactory;
import org.apache.flink.table.delegation.Planner;
import org.apache.flink.table.delegation.PlannerFactory;
import org.apache.flink.table.factories.ComponentFactoryService;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.factories.PlannerFactoryUtil;
import org.apache.flink.table.module.ModuleManager;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TemporaryClassLoaderContext;
@ -146,10 +145,9 @@ public class ExecutionContext<ClusterID> {
CatalogManager catalogManager,
ModuleManager moduleManager,
FunctionCatalog functionCatalog) {
final Map<String, String> plannerProperties = settings.toPlannerProperties();
final Planner planner =
ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
.create(plannerProperties, executor, config, functionCatalog, catalogManager);
PlannerFactoryUtil.createPlanner(
settings.getPlanner(), executor, config, catalogManager, functionCatalog);
return new StreamTableEnvironmentImpl(
catalogManager,
@ -163,18 +161,16 @@ public class ExecutionContext<ClusterID> {
classLoader);
}
private static Executor lookupExecutor(
Map<String, String> executorProperties, StreamExecutionEnvironment executionEnvironment) {
private Executor lookupExecutor(
EnvironmentSettings settings, StreamExecutionEnvironment executionEnvironment) {
try {
ExecutorFactory executorFactory =
ComponentFactoryService.find(ExecutorFactory.class, executorProperties);
Method createMethod =
executorFactory
.getClass()
.getMethod("create", Map.class, StreamExecutionEnvironment.class);
FactoryUtil.discoverFactory(classLoader, ExecutorFactory.class, settings.getExecutor());
return (Executor)
createMethod.invoke(executorFactory, executorProperties, executionEnvironment);
Method createMethod =
executorFactory.getClass().getMethod("create", StreamExecutionEnvironment.class);
return (Executor) createMethod.invoke(executorFactory, executionEnvironment);
} catch (Exception e) {
throw new TableException(
"Could not instantiate the executor. Make sure a planner module is on the classpath", e);
@ -213,8 +209,7 @@ public class ExecutionContext<ClusterID> {
if (engineEnvironment.getExecution().isStreamingPlanner()) {
streamExecEnv = createStreamExecutionEnvironment();
final Map<String, String> executorProperties = settings.toExecutorProperties();
executor = lookupExecutor(executorProperties, streamExecEnv);
executor = lookupExecutor(settings, streamExecEnv);
tableEnv =
createStreamTableEnvironment(
streamExecEnv,

12
pom.xml
View File

@ -100,7 +100,7 @@
<commons-lang3.version>3.10</commons-lang3.version>
<curator.version>2.12.0</curator.version>
<delta.version>1.0.0</delta.version>
<flink.version>1.12.5</flink.version>
<flink.version>1.14.0</flink.version>
<flink.archive.name>flink-${flink.version}-bin-scala_${scala.binary.version}.tgz</flink.archive.name>
<flink.archive.mirror>https://archive.apache.org/dist/flink/flink-${flink.version}</flink.archive.mirror>
<flink.archive.download.skip>false</flink.archive.download.skip>
@ -1072,7 +1072,7 @@
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime_${scala.binary.version}</artifactId>
<artifactId>flink-runtime</artifactId>
<version>${flink.version}</version>
</dependency>
@ -1114,13 +1114,7 @@
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime-blink_${scala.binary.version}</artifactId>
<artifactId>flink-table-runtime_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>