From efc0b1c066d2e12c25ddf5cf53441c1ee8ae1f94 Mon Sep 17 00:00:00 2001 From: Paul Lin Date: Thu, 9 Mar 2023 13:41:31 +0800 Subject: [PATCH] [KYUUBI #4446] Fix connections blocked by Flink insert statements ### _Why are the changes needed?_ Flink 1.15 refactors the result fetching of insert statements and now `TableResult.await()` would block till the insert finishes. We could remove this line because the insert results are immediately available as other non-job statements. Flink JIRA: https://issues.apache.org/jira/browse/FLINK-24461 Critical changes: https://github.com/apache/flink/pull/17441/files#diff-ec88f0e06d880b53e2f152113ab1a4240a820cbb7248815c5f9ecf9ab4fce4caR108 ### _How was this patch tested?_ - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [ ] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before make a pull request Closes #4485 from link3280/KYUUBI-4446. Closes #4446 256176c3b [Paul Lin] [KYUUBI #4446] Update comments 3cb982ca4 [Paul Lin] [KYUUBI #4446] Add comments d4c194ee5 [Paul Lin] [KYUUBI #4446] Fix connections blocked by Flink insert statements Authored-by: Paul Lin Signed-off-by: Cheng Pan (cherry picked from commit 85b2736cc38b2ad1d31900970b2531e03d1a6440) Signed-off-by: Cheng Pan --- .../kyuubi/engine/flink/operation/ExecuteStatement.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala index 182e3e3ef..a84a17796 100644 --- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala +++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala @@ -161,7 +161,8 @@ class ExecuteStatement( .build(executor) val result = executeOperation.invoke[TableResult](sessionId, operation) jobId = result.getJobClient.asScala.map(_.getJobID) - result.await() + // after FLINK-24461, TableResult#await() would block insert statements + // until the job finishes, instead of returning row affected immediately resultSet = ResultSet.fromTableResult(result) }