[KYUUBI #5799] [FLINK] Fix fetch timeout in session conf doesn't support ISO-8601

# 🔍 Description
## Issue References 🔗
Currently, Flink engine supports overwriting result fetch timeout in session conf, but in that way IOS-8601 time format is not supported. This PR fixes the problem.

This pull request fixes #

## Describe Your Solution 🔧

Apply ConfigOption's time value conf parsing in session conf parsing.

## Types of changes 🔖

- [x] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)

## Test Plan 🧪

#### Behavior Without This Pull Request ⚰️

#### Behavior With This Pull Request 🎉

#### Related Unit Tests

---

# Checklists
## 📝 Author Self Checklist

- [x] My code follows the [style guidelines](https://kyuubi.readthedocs.io/en/master/contributing/code/style.html) of this project
- [x] I have performed a self-review
- [ ] I have commented my code, particularly in hard-to-understand areas
- [ ] I have made corresponding changes to the documentation
- [ ] My changes generate no new warnings
- [x] I have added tests that prove my fix is effective or that my feature works
- [x] New and existing unit tests pass locally with my changes
- [x] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)

## 📝 Committer Pre-Merge Checklist

- [x] Pull request title is okay.
- [x] No license issues.
- [x] Milestone correctly set?
- [x] Test coverage is ok
- [x] Assignees are selected.
- [x] Minimum number of approvals
- [x] No changes are requested

**Be nice. Be informative.**

Closes #5799 from link3280/timeconf_parsing.

Closes #5799

417898a63 [Paul Lin] [FLINK] Use ISO-8601 time conf in unit test
99a496419 [Paul Lin] [FLINK] Fix fetch timeout in session conf doesn't support ISO-8601

Authored-by: Paul Lin <paullin3280@gmail.com>
Signed-off-by: Paul Lin <paullin3280@gmail.com>
This commit is contained in:
Paul Lin 2023-12-01 18:45:17 +08:00
parent 3677f3a26e
commit 8f529aacf1
2 changed files with 5 additions and 2 deletions

View File

@ -73,7 +73,10 @@ class FlinkSQLOperationManager extends OperationManager("FlinkSQLOperationManage
resultMaxRowsDefault.toString).toInt
val resultFetchTimeout =
flinkSession.normalizedConf.get(ENGINE_FLINK_FETCH_TIMEOUT.key).map(_.toLong milliseconds)
flinkSession.normalizedConf
.get(ENGINE_FLINK_FETCH_TIMEOUT.key)
.map(ENGINE_FLINK_FETCH_TIMEOUT.valueConverter)
.map(_.get milliseconds)
.getOrElse(resultFetchTimeoutDefault)
val op = mode match {

View File

@ -1255,7 +1255,7 @@ abstract class FlinkOperationSuite extends HiveJDBCTestHelper with WithFlinkTest
test("test result fetch timeout") {
val exception = intercept[KyuubiSQLException](
withSessionConf()(Map(ENGINE_FLINK_FETCH_TIMEOUT.key -> "60000"))() {
withSessionConf()(Map(ENGINE_FLINK_FETCH_TIMEOUT.key -> "PT60S"))() {
withJdbcStatement("tbl_a") { stmt =>
stmt.executeQuery("create table tbl_a (a int) " +
"with ('connector' = 'datagen', 'rows-per-second'='0')")