Compare commits

...

14 Commits

Author SHA1 Message Date
ulysses-you
9355c999d3 [KYUUBI #543] [TESTS] Add test for scheduler pool
<!--
Thanks for sending a pull request!

Here are some tips for you:
  1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html
  2. If the PR is related to an issue in https://github.com/NetEase/kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
  3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'.
-->

### _Why are the changes needed?_
<!--
Please clarify why the changes are needed. For instance,
  1. If you add a feature, you can talk about the use case of it.
  2. If you fix a bug, you can clarify why it is a bug.
-->
Improve test coverage.

### _How was this patch tested?_
Pass `org.apache.kyuubi.engine.spark.SchedulerPoolSuite`.

Closes #543 from ulysses-you/add-scheduler-pool-test.

Closes #543

f5563b1 [ulysses-you] trait
57938a1 [ulysses-you] init

Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: ulysses-you <ulyssesyou18@gmail.com>
(cherry picked from commit cacc2b05b3)
Signed-off-by: ulysses-you <ulyssesyou18@gmail.com>
2021-04-19 17:10:12 +08:00
ulysses-you
b15b42cadf [KYUUBI #526][BRANCH-1.1] Support kyuubi.operation.scheduler.pool for queries

backport #528 for branch-1.1

Closes #530 from ulysses-you/support-spark-scheduler-mode.

2ca4104 [ulysses-you] backport #528

Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: ulysses-you <ulyssesyou18@gmail.com>
2021-04-15 11:40:03 +08:00
BetaCat
12fbef030d [KYUUBI #525] fix: broken link

Will fix https://github.com/yaooqinn/kyuubi/issues/515

### _Why are the changes needed?_
To fix a broken image ref link

### _How was this patch tested?_
Not necessary

Closes #525 from BetaCat0/broken-link.

a2ea566 [BetaCat] fix: broken link

Authored-by: BetaCat <outman99@hotmail.com>
Signed-off-by: ulysses-you <ulyssesyou18@gmail.com>
(cherry picked from commit cf9f256b3d)
Signed-off-by: ulysses-you <ulyssesyou18@gmail.com>
2021-04-14 15:43:27 +08:00
fwang12
14b2d82630
[KYUUBI #479][TEST] Fix flaky test "BindException: Cannot assign requested address"


### _Why are the changes needed?_
To fix flaky test "BindException: Cannot assign requested address".
https://github.com/yaooqinn/kyuubi/runs/2258848387?check_suite_focus=true

![image](https://user-images.githubusercontent.com/6757692/113473923-2c5dfc80-949f-11eb-9e3d-1f5b179e709c.png)
It might be caused by a build environment issue.
After setting FRONTEND_BIND_HOST to 127.0.0.1, the canonical host name resolves to localhost, which should make the build job more stable.
```
scala> val serverHost = None
val serverHost: None.type = None

scala> serverHost.map(InetAddress.getByName).getOrElse(InetAddress.getLocalHost).getCanonicalHostName
val res7: String = 192.168.3.20

scala> val serverHost = Some("127.0.0.1")
val serverHost: Some[String] = Some(127.0.0.1)

scala> serverHost.map(InetAddress.getByName).getOrElse(InetAddress.getLocalHost).getCanonicalHostName
val res6: String = localhost
```
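The fix above amounts to pinning the frontend bind address; as a config fragment it could look like the following (the property name follows Kyuubi's frontend settings but should be treated as illustrative):

```
# Bind the test frontend to loopback so getCanonicalHostName resolves
# to "localhost" instead of the machine's LAN address.
kyuubi.frontend.bind.host=127.0.0.1
```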

### _How was this patch tested?_
Existing UT.

- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.readthedocs.io/en/latest/tools/testing.html#running-tests) locally before making a pull request

Closes #480 from turboFei/bind_exception.

Closes #479

d214f70 [fwang12] [KYUUBI #479][TEST] FIx flaky test "BindException: Cannot assign requested address"

Authored-by: fwang12 <fwang12@ebay.com>
Signed-off-by: Kent Yao <yao@apache.org>
(cherry picked from commit f0525a5eb7)
Signed-off-by: Kent Yao <yao@apache.org>
2021-04-03 21:11:52 +08:00
ulysses-you
a00b9fa9d0
[KYUUBI #456] Can not cancel SQL job if the timeout value is small


### _Why are the changes needed?_
* Avoid a race condition
* Avoid leaking jobs

### _How was this patch tested?_
Pass `org.apache.kyuubi.engine.spark.SparkEngineSuites`.
Closes #456.

Closes #461 from ulysses-you/thread.

970ba7a [ulysses-you] fix
8786779 [ulysses-you] npe
d82321e [ulysses-you] remove
c28e671 [ulysses-you] interrupt execute statement
8dccf73 [ulysses-you] cleanup
29857e3 [ulysses-you] sync

Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: Kent Yao <yao@apache.org>
(cherry picked from commit 066075ac74)
Signed-off-by: Kent Yao <yao@apache.org>
2021-03-26 23:50:41 +08:00
ulysses-you
1503af361b
[KYUUBI #451] Support query auto timeout cancel on thriftserver


### _Why are the changes needed?_
Manually cherry-pick some Spark patches into Kyuubi:
1. [Support query auto timeout cancel on thriftserver](https://github.com/apache/spark/pull/29933)
2. [Add config to control if cancel invoke interrupt task on thriftserver](https://github.com/apache/spark/pull/30481)

To keep backward compatibility with earlier Spark versions, we hard-code the config key instead of referring to Spark's SQLConf.

Note that the existing operation timeout (`kyuubi.operation.idle.timeout`) only cancels operations that the client has stopped accessing; if a query runs for a long time while the client stays alive, it is never cancelled. The newly added config `spark.sql.thriftServer.queryTimeout` handles that case.
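The precedence between the session-level timeout and a per-statement `java.sql.Statement.setQueryTimeout` value can be sketched as follows (a minimal illustration of the rule described above; the helper name is invented, not Kyuubi's API):

```scala
// Effective timeout in seconds, where 0 (or negative) means "no timeout".
// A positive statement-level timeout takes precedence when it is smaller
// than the session-level one, per the config description.
def effectiveTimeoutSecs(confTimeout: Long, statementTimeout: Long): Long =
  if (statementTimeout > 0 && (confTimeout <= 0 || statementTimeout < confTimeout))
    statementTimeout
  else
    confTimeout
```

For example, with a session-level timeout of 60s, a statement-level `setQueryTimeout(10)` yields an effective timeout of 10s.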

### _How was this patch tested?_
Add new test.

Closes #451 from ulysses-you/query-timeout.

212f579 [ulysses-you] docs
9206538 [ulysses-you] empty flaky test
ddab9bf [ulysses-you] flaty test
1da02a0 [ulysses-you] flaty test
edfadf1 [ulysses-you] nit
3f9920b [ulysses-you] address comment
9492c48 [ulysses-you] correct timeout
5df997e [ulysses-you] nit
2124952 [ulysses-you] address comment
192fdcc [ulysses-you] fix tets
d684af6 [ulysses-you] global config
1d1adda [ulysses-you] empty
967a63e [ulysses-you] correct import
128948e [ulysses-you] add session conf in session
144d51b [ulysses-you] fix
a90248b [ulysses-you] unused import
c90386f [ulysses-you] timeout move to operation manager
d780965 [ulysses-you] update docs
a5f7138 [ulysses-you] fix test
f7c7308 [ulysses-you] config name
7f3fb3d [ulysses-you] change conf place
97a011e [ulysses-you] unnecessary change
0953a76 [ulysses-you] move test
38ac0c0 [ulysses-you] Merge branch 'master' of https://github.com/yaooqinn/kyuubi into query-timeout
71bea97 [ulysses-you] refector implementation
35ef6f9 [ulysses-you] update conf
0cad8e2 [ulysses-you] Support query auto timeout cancel on thriftserver

Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: Kent Yao <yao@apache.org>
(cherry picked from commit fecdba32a7)
Signed-off-by: Kent Yao <yao@apache.org>
2021-03-26 14:04:27 +08:00
wangsheng
68cf158279
[KYUUBI #442] [MINOR] Sort out the code logic of kyuubi/tpcds


### _Why are the changes needed?_
There is a TPC-DS data generator in kyuubi/dev/kyuubi-tpcds.
This change sorts out its code logic and tidies the pom dependencies of kyuubi-tpcds.
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.readthedocs.io/en/latest/tools/testing.html#running-tests) locally before making a pull request

Closes #442 from zwangsheng/change_dev_tpcds.

cdc5604 [wangsheng] Unify the style of adding parameters to the string
e7c7879 [wangsheng] Sort out the code logic and modify the pom dependency

Authored-by: wangsheng <2213335496@qq.com>
Signed-off-by: Kent Yao <yao@apache.org>
(cherry picked from commit 9f5c752402)
Signed-off-by: Kent Yao <yao@apache.org>
2021-03-17 22:34:51 +08:00
Cheng Pan
b81f53d6c2
[KYUUBI #441] [BUILD] Downgrade scala-maven-plugin to 4.3.0


### _Why are the changes needed?_
There are several known `zinc`-related issues with `scala-maven-plugin:4.4.0`.

On CentOS 7, with Oracle JDK 8 installed via RPM, `/bin/java` links to `jdk/jre/bin/java` but `/bin/javac` links to `jdk/bin/javac`:
```
[opsbd-ck-uat-30007-vm ~]$ ll -l /bin/java
lrwxrwxrwx 1 root root 22 Mar 12 14:03 /bin/java -> /etc/alternatives/java
[opsbd-ck-uat-30007-vm ~]$ ll -l /etc/alternatives/java
lrwxrwxrwx 1 root root 41 Mar 12 14:03 /etc/alternatives/java -> /usr/java/jdk1.8.0_251-amd64/jre/bin/java
[opsbd-ck-uat-30007-vm ~]$ ll -l /bin/javac
lrwxrwxrwx 1 root root 23 Mar 12 14:03 /bin/javac -> /etc/alternatives/javac
[opsbd-ck-uat-30007-vm ~]$ ll -l /etc/alternatives/javac
lrwxrwxrwx 1 root root 38 Mar 12 14:03 /etc/alternatives/javac -> /usr/java/jdk1.8.0_251-amd64/bin/javac
```
Running `build/dist --tgz` then fails with:
```
[ERROR] Failed to execute goal net.alchim31.maven:scala-maven-plugin:4.4.0:testCompile (scala-test-compile-first) on project kyuubi-common: wrap: java.io.IOException: Cannot run program "/usr/java/jdk1.8.0_221-amd64/jre/bin/javac" (in directory "/data/ansible/ci/kyuubi"): error=2, No such file or directory -> [Help 1]
```
And there is another issue reported by Apache Spark: https://github.com/apache/spark/pull/31031

Please also merge it into branch-1.1

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [x] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.readthedocs.io/en/latest/tools/testing.html#running-tests) locally before making a pull request

Closes #441 from pan3793/build.

6f6c34d [Cheng Pan] add comments
312b2c5 [Cheng Pan] [BUILD] Downgrade scala-maven-plugin to 4.3.0

Authored-by: Cheng Pan <379377944@qq.com>
Signed-off-by: Kent Yao <yao@apache.org>
(cherry picked from commit 794a75395f)
Signed-off-by: Kent Yao <yao@apache.org>
2021-03-16 22:59:34 +08:00
ulysses-you
6afacdcdb8 release v1.1.0 2021-03-10 15:30:47 +08:00
MyLanPangzi
46725901c6 [KYUUBI #413] [MINOR]: remove unused class import

### _Why are the changes needed?_
remove unused class import

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [ ] [Run test](https://kyuubi.readthedocs.io/en/latest/tools/testing.html#running-tests) locally before making a pull request

Closes #413 from MyLanPangzi/master.

dc6f65a [hiscat] [MINOR]: remove unused class import

Lead-authored-by: MyLanPangzi <1251723871@qq.com>
Co-authored-by: hiscat <46845236+MyLanPangzi@users.noreply.github.com>
Signed-off-by: Cheng Pan <379377944@qq.com>
(cherry picked from commit 8f44d0b4cf)
Signed-off-by: Cheng Pan <379377944@qq.com>
2021-03-10 11:39:29 +08:00
ulysses-you
5e551bdd71 [KYUUBI #408] Fix engine log does not be overwrite


### _Why are the changes needed?_
Bug fix: without it, the engine log keeps growing because the file is opened in append mode.
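The failure mode is the standard `FileOutputStream` append flag; a minimal, self-contained sketch of the before/after behavior (illustrative, not Kyuubi's actual code):

```scala
import java.io.{File, FileOutputStream}
import java.nio.file.Files

// Opening the log with append = true makes every engine launch add to the
// old file; append = false (the fix) truncates it first.
def writeEngineLog(file: File, append: Boolean): Unit = {
  val out = new FileOutputStream(file, append)
  try out.write("engine started\n".getBytes("UTF-8"))
  finally out.close()
}

val appended = File.createTempFile("engine-append", ".log")
val overwritten = File.createTempFile("engine-overwrite", ".log")
(1 to 3).foreach { _ =>
  writeEngineLog(appended, append = true)     // grows on every launch
  writeEngineLog(overwritten, append = false) // holds only the last launch
}
```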

### _How was this patch tested?_
Add new test.

Closes #408 from ulysses-you/fix-engine-log-overwrite.

7e4fd1d [ulysses-you] simply code
b8d92ee [ulysses-you] config
821891c [ulysses-you] init

Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: ulysses-you <ulyssesyou18@gmail.com>
(cherry picked from commit 57ed76f48d)
Signed-off-by: ulysses-you <ulyssesyou18@gmail.com>
2021-03-09 16:29:54 +08:00
ulysses-you
7107666f1b [KYUUBI #407] Auto update release note


### _Why are the changes needed?_

### _How was this patch tested?_

Closes #407 from ulysses-you/auto-update-release-note.

4dc09ae [ulysses-you] Release Changelog Builder
cfbe23c [ulysses-you] remove
8fcb56c [ulysses-you] changelog
28e8618 [ulysses-you] release
e953874 [ulysses-you] config
d792eea [ulysses-you] init

Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: ulysses-you <ulyssesyou18@gmail.com>
(cherry picked from commit e971732e1d)
Signed-off-by: ulysses-you <ulyssesyou18@gmail.com>
2021-03-09 10:39:41 +08:00
Kent Yao
48e359b4a7
[HOTFIX] Use hadoop2.7 as default for spark built in 2021-03-09 09:51:42 +08:00
Kent Yao
49094f7768
[KYUUBI #406] Create GitHub Release only once



- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.readthedocs.io/en/latest/tools/testing.html#running-tests) locally before making a pull request

Closes #406 from yaooqinn/release.

2fc8754 [Kent Yao] Create GitHub Release only once

Authored-by: Kent Yao <yao@apache.org>
Signed-off-by: Kent Yao <yao@apache.org>
(cherry picked from commit 016277d183)
Signed-off-by: Kent Yao <yao@apache.org>
2021-03-08 20:11:19 +08:00
29 changed files with 446 additions and 58 deletions

View File

@@ -9,14 +9,6 @@ jobs:
build:
name: Create and Publish Release
runs-on: ubuntu-latest
strategy:
matrix:
profiles:
- '-Pspark-3.0 -Pspark-hadoop-2.7'
- '-Pspark-3.0 -Pspark-hadoop-3.2'
- '-Pspark-3.1 -Pspark-hadoop-2.7'
- '-Pspark-3.1 -Pspark-hadoop-3.2'
- '--spark-provided'
steps:
- uses: actions/checkout@master
# We split caches because GitHub Action Cache has a 400MB-size limit.
@@ -42,8 +34,16 @@ jobs:
uses: actions/setup-java@v1
with:
java-version: '1.8'
- name: Make Distribution
run: ./build/dist --tgz ${{ matrix.profiles }}
- name: Make Distribution -Pspark-3.0
run: ./build/dist --tgz -Pspark-3.0
- name: Make Distribution -Pspark-3.0 -Pspark-hadoop-3.2
run: ./build/dist --tgz -Pspark-3.0 -Pspark-hadoop-3.2
- name: Make Distribution -Pspark-3.1
run: ./build/dist --tgz -Pspark-3.1
- name: Make Distribution -Pspark-3.1 -Pspark-hadoop-3.2
run: ./build/dist --tgz -Pspark-3.1 -Pspark-hadoop-3.2
- name: Make Distribution --spark-provided
run: ./build/dist --tgz --spark-provided
- name: Create Release
id: create_release
uses: actions/create-release@v1
@@ -60,3 +60,8 @@ jobs:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
with:
asset_paths: '["./kyuubi-*tar.gz"]'
- name: Update Release Note
id: github_release_changelog
uses: mikepenz/release-changelog-builder-action@v1
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}

View File

@@ -22,7 +22,7 @@
<parent>
<artifactId>kyuubi</artifactId>
<groupId>org.apache.kyuubi</groupId>
<version>1.1.0-SNAPSHOT</version>
<version>1.1.0</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@@ -22,7 +22,7 @@
<parent>
<artifactId>kyuubi</artifactId>
<groupId>org.apache.kyuubi</groupId>
<version>1.1.0-SNAPSHOT</version>
<version>1.1.0</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
@@ -32,6 +32,10 @@
<name>Kyuubi Project Dev TPCDS Generator</name>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>

View File

@@ -52,7 +52,6 @@ case class TableGenerator(
private def toDF: DataFrame = {
val rawRDD = ss.sparkContext.parallelize(1 to parallelism, parallelism).flatMap { i =>
val parallel = s"-PARALLEL $parallelism -child $i"
val os = System.getProperty("os.name").split(' ')(0).toLowerCase
val loader = Thread.currentThread().getContextClassLoader
@@ -85,20 +84,21 @@ case class TableGenerator(
val cmd = s"./dsdgen" +
s" -TABLE $name" +
s" -SCALE $scaleFactor" +
s" $parallel" +
s" -PARALLEL $parallelism" +
s" -child $i" +
s" -DISTRIBUTIONS tpcds.idx" +
s" -FORCE Y" +
s" -QUIET Y"
val builder = new ProcessBuilder(cmd.split(" "): _*)
builder.directory(tempDir)
logger.info(s"Start $cmd at " + builder.directory())
builder.redirectError(Redirect.INHERIT)
val data = Paths.get(tempDir.toString, s"${name}_${i}_$parallelism.dat")
logger.info(s"Start $cmd at ${builder.directory()}")
val process = builder.start()
val res = process.waitFor()
logger.info(s"Finish w/ $res" + cmd)
logger.info(s"Finish w/ $res ${cmd}")
val data = Paths.get(tempDir.toString, s"${name}_${i}_$parallelism.dat")
val iterator = if (Files.exists(data)) {
new BufferedSource(new FileInputStream(data.toFile), 8192).getLines()
} else {

View File

@@ -285,6 +285,9 @@ kyuubi\.kinit<br>\.principal|<div style='width: 80pt;word-wrap: break-word;white
Key | Default | Meaning | Since
--- | --- | --- | ---
kyuubi\.operation\.idle<br>\.timeout|<div style='width: 80pt;word-wrap: break-word;white-space: normal'>PT3H</div>|<div style='width: 200pt;word-wrap: break-word;white-space: normal'>Operation will be closed when it's not accessed for this duration of time</div>|<div style='width: 20pt'>1.0.0</div>
kyuubi\.operation<br>\.interrupt\.on\.cancel|<div style='width: 80pt;word-wrap: break-word;white-space: normal'>true</div>|<div style='width: 200pt;word-wrap: break-word;white-space: normal'>When true, all running tasks will be interrupted if one cancels a query. When false, all running tasks will remain until finished.</div>|<div style='width: 20pt'>1.2.0</div>
kyuubi\.operation<br>\.query\.timeout|<div style='width: 80pt;word-wrap: break-word;white-space: normal'>PT0S</div>|<div style='width: 200pt;word-wrap: break-word;white-space: normal'>Set a query duration timeout in seconds in Kyuubi. If the timeout is set to a positive value, a running query will be cancelled automatically when the timeout elapses; otherwise the query runs to completion. If a timeout is set per statement via `java.sql.Statement.setQueryTimeout` and it is smaller than this configuration value, it takes precedence. If you set this timeout and prefer to cancel queries right away without waiting for tasks to finish, consider enabling kyuubi.operation.interrupt.on.cancel together.</div>|<div style='width: 20pt'>1.2.0</div>
kyuubi\.operation<br>\.scheduler\.pool|<div style='width: 80pt;word-wrap: break-word;white-space: normal'>&lt;undefined&gt;</div>|<div style='width: 200pt;word-wrap: break-word;white-space: normal'>The scheduler pool for a job. Note that this config takes effect only after setting the Spark config spark.scheduler.mode=FAIR.</div>|<div style='width: 20pt'>1.1.1</div>
kyuubi\.operation<br>\.status\.polling<br>\.timeout|<div style='width: 80pt;word-wrap: break-word;white-space: normal'>PT5S</div>|<div style='width: 200pt;word-wrap: break-word;white-space: normal'>Timeout(ms) for long polling asynchronous running sql query's status</div>|<div style='width: 20pt'>1.0.0</div>
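The `kyuubi.operation.scheduler.pool` entry above only takes effect with Spark FAIR scheduling enabled on the engine; a minimal illustrative setup (the pool name `pool_a` is an assumption) looks like:

```
# Engine side: enable FAIR scheduling when launching the Spark engine
spark.scheduler.mode=FAIR

# Session side (e.g. via beeline): route subsequent queries to a pool
SET kyuubi.operation.scheduler.pool=pool_a;
```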
### Session

View File

@@ -12,7 +12,7 @@ Welcome to Kyuubi's documentation!
Kyuubi™ is a unified multi-tenant JDBC interface for large-scale data processing and analytics, built on top of `Apache Spark™ <http://spark.apache.org/>`_.
.. image:: ./imgs/kyuubi_layers.*
.. image:: ./imgs/kyuubi_layers.png
In general, the complete ecosystem of Kyuubi falls into the hierarchies shown in the above figure, with each layer loosely coupled to the other.

View File

@@ -22,7 +22,7 @@
<parent>
<artifactId>kyuubi</artifactId>
<groupId>org.apache.kyuubi</groupId>
<version>1.1.0-SNAPSHOT</version>
<version>1.1.0</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@@ -22,7 +22,7 @@
<parent>
<artifactId>kyuubi</artifactId>
<groupId>org.apache.kyuubi</groupId>
<version>1.1.0-SNAPSHOT</version>
<version>1.1.0</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@@ -17,24 +17,34 @@
package org.apache.kyuubi.engine.spark.operation
import java.util.concurrent.RejectedExecutionException
import java.util.concurrent.{RejectedExecutionException, TimeUnit}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types._
import org.apache.kyuubi.{KyuubiSQLException, Logging}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.engine.spark.{ArrayFetchIterator, KyuubiSparkUtil}
import org.apache.kyuubi.operation.{OperationState, OperationType}
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.session.Session
import org.apache.kyuubi.util.ThreadUtils
class ExecuteStatement(
spark: SparkSession,
session: Session,
protected override val statement: String,
override val shouldRunAsync: Boolean)
override val shouldRunAsync: Boolean,
queryTimeout: Long)
extends SparkOperation(spark, OperationType.EXECUTE_STATEMENT, session) with Logging {
private val forceCancel =
session.sessionManager.getConf.get(KyuubiConf.OPERATION_FORCE_CANCEL)
private val schedulerPool =
spark.conf.getOption(KyuubiConf.OPERATION_SCHEDULER_POOL.key).orElse(
session.sessionManager.getConf.get(KyuubiConf.OPERATION_SCHEDULER_POOL))
private val operationLog: OperationLog =
OperationLog.createOperationLog(session.handle, getHandle)
override def getOperationLog: Option[OperationLog] = Option(operationLog)
@@ -58,24 +68,22 @@ class ExecuteStatement(
OperationLog.removeCurrentOperationLog()
}
private def executeStatement(): Unit = {
private def executeStatement(): Unit = withLocalProperties {
try {
setState(OperationState.RUNNING)
info(KyuubiSparkUtil.diagnostics(spark))
Thread.currentThread().setContextClassLoader(spark.sharedState.jarClassLoader)
spark.sparkContext.setJobGroup(statementId, statement)
result = spark.sql(statement)
debug(result.queryExecution)
iter = new ArrayFetchIterator(result.collect())
setState(OperationState.FINISHED)
} catch {
onError(cancel = true)
} finally {
spark.sparkContext.clearJobGroup()
}
}
override protected def runInternal(): Unit = {
addTimeoutMonitor()
if (shouldRunAsync) {
val asyncOperation = new Runnable {
override def run(): Unit = {
@@ -100,4 +108,33 @@ class ExecuteStatement(
executeStatement()
}
}
private def withLocalProperties[T](f: => T): T = {
try {
spark.sparkContext.setJobGroup(statementId, statement, forceCancel)
schedulerPool match {
case Some(pool) =>
spark.sparkContext.setLocalProperty("spark.scheduler.pool", pool)
case None =>
}
f
} finally {
spark.sparkContext.setLocalProperty("spark.scheduler.pool", null)
spark.sparkContext.clearJobGroup()
}
}
private def addTimeoutMonitor(): Unit = {
if (queryTimeout > 0) {
val timeoutExecutor =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("query-timeout-thread")
timeoutExecutor.schedule(new Runnable {
override def run(): Unit = {
cleanup(OperationState.TIMEOUT)
timeoutExecutor.shutdown()
}
}, queryTimeout, TimeUnit.SECONDS)
}
}
}
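The `addTimeoutMonitor` change above arms a one-shot daemon task that drives the operation into the `TIMEOUT` state once `queryTimeout` seconds elapse, and only when the timeout is positive. A self-contained sketch of the same pattern, assuming the class and field names here are illustrative stand-ins, not Kyuubi's API:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class TimeoutMonitorSketch {
    static final AtomicBoolean timedOut = new AtomicBoolean(false);

    // Mirrors the pattern: schedule a one-shot cancellation on a named
    // daemon thread, but only when a positive timeout was requested.
    static void addTimeoutMonitor(long timeoutMillis) {
        if (timeoutMillis > 0) {
            ScheduledExecutorService executor =
                Executors.newSingleThreadScheduledExecutor(r -> {
                    Thread t = new Thread(r, "query-timeout-thread");
                    t.setDaemon(true);
                    return t;
                });
            executor.schedule(() -> {
                timedOut.set(true); // stand-in for cleanup(OperationState.TIMEOUT)
                executor.shutdown();
            }, timeoutMillis, TimeUnit.MILLISECONDS);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        addTimeoutMonitor(50);
        Thread.sleep(200); // give the scheduled task time to fire
        System.out.println(timedOut.get()); // true
    }
}
```

Shutting the executor down inside its own task keeps the thread from lingering after a single-use timer fires, which matches how a per-statement monitor should behave.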

View File

@@ -49,9 +49,10 @@ abstract class SparkOperation(spark: SparkSession, opType: OperationType, sessio
protected def resultSchema: StructType
protected def cleanup(targetState: OperationState): Unit = synchronized {
protected def cleanup(targetState: OperationState): Unit = state.synchronized {
if (!isTerminalState(state)) {
setState(targetState)
Option(getBackgroundHandle).foreach(_.cancel(true))
spark.sparkContext.cancelJobGroup(statementId)
}
}
@@ -92,7 +93,11 @@ abstract class SparkOperation(spark: SparkSession, opType: OperationType, sessio
if (cancel) spark.sparkContext.cancelJobGroup(statementId)
state.synchronized {
val errMsg = KyuubiSQLException.stringifyException(e)
if (isTerminalState(state)) {
if (state == OperationState.TIMEOUT) {
val ke = KyuubiSQLException(s"Timeout operating $opType: $errMsg")
setOperationException(ke)
throw ke
} else if (isTerminalState(state)) {
warn(s"Ignore exception in terminal state with $statementId: $errMsg")
} else {
setState(OperationState.ERROR)

View File

@@ -58,7 +58,7 @@ class SparkSQLOperationManager private (name: String) extends OperationManager(n
runAsync: Boolean,
queryTimeout: Long): Operation = {
val spark = getSparkSession(session.handle)
val operation = new ExecuteStatement(spark, session, statement, runAsync)
val operation = new ExecuteStatement(spark, session, statement, runAsync, queryTimeout)
addOperation(operation)
}

View File

@@ -0,0 +1,30 @@
<?xml version="1.0"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<allocations>
<pool name="p0">
<minShare>2</minShare>
<weight>1</weight>
<schedulingMode>FAIR</schedulingMode>
</pool>
<pool name="p1">
<minShare>0</minShare>
<weight>1</weight>
<schedulingMode>FAIR</schedulingMode>
</pool>
</allocations>

View File

@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.spark
import java.sql.{SQLTimeoutException, Statement}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
import org.apache.spark.TaskKilled
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import org.apache.spark.sql.SparkSession
import org.scalatest.concurrent.PatienceConfiguration.Timeout
import org.scalatest.time.SpanSugar._
import org.apache.kyuubi.KyuubiFunSuite
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.operation.JDBCTestUtils
class SparkEngineSuites extends KyuubiFunSuite {
test("Add config to control if cancel invoke interrupt task on engine") {
Seq(true, false).foreach { force =>
withSparkJdbcStatement(Map(KyuubiConf.OPERATION_FORCE_CANCEL.key -> force.toString)) {
case (statement, spark) =>
val index = new AtomicInteger(0)
val forceCancel = new AtomicBoolean(false)
val listener = new SparkListener {
override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
assert(taskEnd.reason.isInstanceOf[TaskKilled])
if (forceCancel.get()) {
assert(System.currentTimeMillis() - taskEnd.taskInfo.launchTime < 3000)
index.incrementAndGet()
} else {
assert(System.currentTimeMillis() - taskEnd.taskInfo.launchTime >= 4000)
index.incrementAndGet()
}
}
}
spark.sparkContext.addSparkListener(listener)
try {
statement.setQueryTimeout(3)
forceCancel.set(force)
val e1 = intercept[SQLTimeoutException] {
statement.execute("select java_method('java.lang.Thread', 'sleep', 5000L)")
}.getMessage
assert(e1.contains("Query timed out"))
eventually(Timeout(30.seconds)) {
assert(index.get() == 1)
}
} finally {
spark.sparkContext.removeSparkListener(listener)
}
}
}
}
private def withSparkJdbcStatement(
conf: Map[String, String] = Map.empty)(
statement: (Statement, SparkSession) => Unit): Unit = {
val spark = new WithSparkSuite {
override def withKyuubiConf: Map[String, String] = conf
override protected def jdbcUrl: String = getJdbcUrl
}
spark.startSparkEngine()
val tmp: Statement => Unit = { tmpStatement =>
statement(tmpStatement, spark.getSpark)
}
try {
spark.withJdbcStatement()(tmp)
} finally {
spark.stopSparkEngine()
}
}
}
trait WithSparkSuite extends WithSparkSQLEngine with JDBCTestUtils

View File

@@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.spark
import java.util.concurrent.Executors
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd, SparkListenerJobStart}
import org.scalatest.concurrent.PatienceConfiguration.Timeout
import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
import org.apache.kyuubi.operation.JDBCTestUtils
class SchedulerPoolSuite extends WithSparkSQLEngine with JDBCTestUtils {
override protected def jdbcUrl: String = getJdbcUrl
override def withKyuubiConf: Map[String, String] = {
val poolFile =
Thread.currentThread().getContextClassLoader.getResource("test-scheduler-pool.xml")
Map("spark.scheduler.mode" -> "FAIR",
"spark.scheduler.allocation.file" -> poolFile.getFile,
"spark.master" -> "local[2]")
}
test("Scheduler pool") {
@volatile var job0Started = false
@volatile var job1StartTime = 0L
@volatile var job2StartTime = 0L
@volatile var job1FinishTime = 0L
@volatile var job2FinishTime = 0L
val listener = new SparkListener {
override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
info(jobStart)
jobStart.jobId match {
case 1 => job1StartTime = jobStart.time
case 2 => job2StartTime = jobStart.time
case 0 => job0Started = true
}
}
override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
info(jobEnd)
jobEnd.jobId match {
case 1 => job1FinishTime = jobEnd.time
case 2 => job2FinishTime = jobEnd.time
case _ =>
}
}
}
spark.sparkContext.addSparkListener(listener)
try {
val threads = Executors.newFixedThreadPool(3)
threads.execute(new Runnable {
override def run(): Unit = {
withJdbcStatement() { statement =>
statement.execute("SET kyuubi.operation.scheduler.pool=p0")
statement.execute("SELECT java_method('java.lang.Thread', 'sleep', 5000l)" +
" FROM range(1, 3, 1, 2)")
}
}
})
// make sure job0 has started, so that no free resources remain for the next jobs
eventually(Timeout(3.seconds)) {
assert(job0Started)
}
Seq(1, 0).foreach { priority =>
threads.execute(new Runnable {
override def run(): Unit = {
priority match {
case 0 =>
withJdbcStatement() { statement =>
statement.execute("SET kyuubi.operation.scheduler.pool=p0")
statement.execute("SELECT java_method('java.lang.Thread', 'sleep', 1500l)" +
" FROM range(1, 3, 1, 2)")
}
case 1 =>
withJdbcStatement() { statement =>
statement.execute("SET kyuubi.operation.scheduler.pool=p1")
statement.execute("SELECT java_method('java.lang.Thread', 'sleep', 1500l)" +
" FROM range(1, 3, 1, 2)")
}
}
}
})
}
threads.shutdown()
eventually(Timeout(10.seconds)) {
// We cannot guarantee that job1 starts before job2, so compare the absolute difference.
assert(Math.abs(job1StartTime - job2StartTime) < 1000)
// Job1's pool has minShare 2 (all available cores), so job2 should only be
// allocated tasks after job1 finishes.
assert(job2FinishTime - job1FinishTime >= 1000)
}
} finally {
spark.sparkContext.removeSparkListener(listener)
}
}
}

View File

@@ -36,7 +36,7 @@ trait WithSparkSQLEngine extends KyuubiFunSuite {
super.beforeAll()
}
protected def startSparkEngine(): Unit = {
def startSparkEngine(): Unit = {
val warehousePath = Utils.createTempDir()
val metastorePath = Utils.createTempDir()
warehousePath.toFile.delete()
@@ -63,7 +63,7 @@ trait WithSparkSQLEngine extends KyuubiFunSuite {
stopSparkEngine()
}
protected def stopSparkEngine(): Unit = {
def stopSparkEngine(): Unit = {
// we need to clean up conf since it's the global config in same jvm.
withKyuubiConf.foreach { case (k, _) =>
System.clearProperty(k)
@@ -83,4 +83,5 @@ trait WithSparkSQLEngine extends KyuubiFunSuite {
}
protected def getJdbcUrl: String = s"jdbc:hive2://$connectionUrl/;"
def getSpark: SparkSession = spark
}

View File

@@ -24,7 +24,7 @@
<parent>
<artifactId>kyuubi</artifactId>
<groupId>org.apache.kyuubi</groupId>
<version>1.1.0-SNAPSHOT</version>
<version>1.1.0</version>
<relativePath>../pom.xml</relativePath>
</parent>

View File

@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi</artifactId>
<version>1.1.0-SNAPSHOT</version>
<version>1.1.0</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@@ -502,6 +502,27 @@ object KyuubiConf {
.timeConf
.createWithDefault(Duration.ofSeconds(5).toMillis)
val OPERATION_FORCE_CANCEL: ConfigEntry[Boolean] =
buildConf("operation.interrupt.on.cancel")
.doc("When true, all running tasks will be interrupted if one cancels a query. " +
"When false, all running tasks will remain until finished.")
.version("1.2.0")
.booleanConf
.createWithDefault(true)
val OPERATION_QUERY_TIMEOUT: ConfigEntry[Long] =
buildConf("operation.query.timeout")
.doc("Set a query duration timeout in seconds in Kyuubi. If the timeout is set to " +
"a positive value, a running query will be cancelled automatically when the " +
"timeout is exceeded; otherwise, the query continues to run till completion. " +
"If timeout values are set for each statement via `java.sql.Statement.setQueryTimeout` " +
"and they are smaller than this configuration value, they take precedence. " +
"If you set this timeout and prefer to cancel queries right away without " +
s"waiting for tasks to finish, consider enabling ${OPERATION_FORCE_CANCEL.key} together.")
.version("1.2.0")
.timeConf
.createWithDefault(Duration.ZERO.toMillis)
val ENGINE_SHARED_LEVEL: ConfigEntry[String] = buildConf("session.engine.share.level")
.doc("The SQL engine App will be shared in different levels, available configs are: <ul>" +
" <li>CONNECTION: the App will not be shared but only used by the current client" +
@@ -514,4 +535,11 @@ object KyuubiConf {
.transform(_.toUpperCase(Locale.ROOT))
.checkValues(ShareLevel.values.map(_.toString))
.createWithDefault(ShareLevel.USER.toString)
val OPERATION_SCHEDULER_POOL: OptionalConfigEntry[String] = buildConf("operation.scheduler.pool")
.doc("The scheduler pool of the job. Note that this config only takes effect after " +
"setting the Spark config spark.scheduler.mode=FAIR.")
.version("1.1.1")
.stringConf
.createOptional
}

View File

@@ -74,7 +74,7 @@ abstract class AbstractOperation(opType: OperationType, session: Session)
var timeCost = ""
newState match {
case RUNNING => startTime = System.currentTimeMillis()
case ERROR | FINISHED | CANCELED =>
case ERROR | FINISHED | CANCELED | TIMEOUT =>
completedTime = System.currentTimeMillis()
timeCost = s", time taken: ${(completedTime - startTime) / 1000.0} seconds"
case _ =>

View File

@@ -34,7 +34,7 @@ trait JDBCTestUtils extends KyuubiFunSuite {
protected val patterns = Seq("", "*", "%", null, ".*", "_*", "_%", ".%")
protected def jdbcUrl: String
protected def withMultipleConnectionJdbcStatement(
def withMultipleConnectionJdbcStatement(
tableNames: String*)(fs: (Statement => Unit)*): Unit = {
val connections = fs.map { _ => DriverManager.getConnection(jdbcUrl, user, "") }
val statements = connections.map(_.createStatement())
@@ -57,7 +57,7 @@ trait JDBCTestUtils extends KyuubiFunSuite {
}
}
protected def withDatabases(dbNames: String*)(fs: (Statement => Unit)*): Unit = {
def withDatabases(dbNames: String*)(fs: (Statement => Unit)*): Unit = {
val connections = fs.map { _ => DriverManager.getConnection(jdbcUrl, user, "") }
val statements = connections.map(_.createStatement())
@@ -75,11 +75,11 @@ trait JDBCTestUtils extends KyuubiFunSuite {
}
}
protected def withJdbcStatement(tableNames: String*)(f: Statement => Unit): Unit = {
def withJdbcStatement(tableNames: String*)(f: Statement => Unit): Unit = {
withMultipleConnectionJdbcStatement(tableNames: _*)(f)
}
protected def withThriftClient(f: TCLIService.Iface => Unit): Unit = {
def withThriftClient(f: TCLIService.Iface => Unit): Unit = {
val hostAndPort = jdbcUrl.stripPrefix("jdbc:hive2://").split("/;").head.split(":")
val host = hostAndPort.head
val port = hostAndPort(1).toInt
@@ -96,7 +96,7 @@ trait JDBCTestUtils extends KyuubiFunSuite {
}
}
protected def withSessionHandle(f: (TCLIService.Iface, TSessionHandle) => Unit): Unit = {
def withSessionHandle(f: (TCLIService.Iface, TSessionHandle) => Unit): Unit = {
withThriftClient { client =>
val req = new TOpenSessionReq()
req.setUsername(user)
@@ -117,7 +117,7 @@ trait JDBCTestUtils extends KyuubiFunSuite {
}
}
protected def checkGetSchemas(
def checkGetSchemas(
rs: ResultSet, dbNames: Seq[String], catalogName: String = ""): Unit = {
var count = 0
while(rs.next()) {

View File

@@ -17,7 +17,7 @@
package org.apache.kyuubi.operation
import java.sql.{Date, SQLException, Timestamp}
import java.sql.{Date, SQLException, SQLTimeoutException, Timestamp}
import scala.collection.JavaConverters._
@@ -327,4 +327,26 @@ trait JDBCTests extends BasicJDBCTests {
assert(metaData.getScale(1) === 0)
}
}
test("Support query auto timeout cancel on thriftserver - setQueryTimeout") {
withJdbcStatement() { statement =>
statement.setQueryTimeout(1)
val e = intercept[SQLTimeoutException] {
statement.execute("select java_method('java.lang.Thread', 'sleep', 10000L)")
}.getMessage
assert(e.contains("Query timed out after"))
statement.setQueryTimeout(0)
val rs1 = statement.executeQuery(
"select 'test', java_method('java.lang.Thread', 'sleep', 3000L)")
rs1.next()
assert(rs1.getString(1) == "test")
statement.setQueryTimeout(-1)
val rs2 = statement.executeQuery(
"select 'test', java_method('java.lang.Thread', 'sleep', 3000L)")
rs2.next()
assert(rs2.getString(1) == "test")
}
}
}

View File

@@ -37,6 +37,7 @@ class FrontendServiceSuite extends KyuubiFunSuite {
protected val server = new NoopServer()
protected val conf = KyuubiConf()
.set(KyuubiConf.FRONTEND_BIND_HOST, "127.0.0.1")
.set(KyuubiConf.FRONTEND_BIND_PORT, 0)
.set("kyuubi.test.server.should.fail", "false")

View File

@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi</artifactId>
<version>1.1.0-SNAPSHOT</version>
<version>1.1.0</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@@ -22,7 +22,7 @@
<parent>
<artifactId>kyuubi</artifactId>
<groupId>org.apache.kyuubi</groupId>
<version>1.1.0-SNAPSHOT</version>
<version>1.1.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@@ -17,7 +17,7 @@
package org.apache.kyuubi.engine
import java.io.{File, FilenameFilter, IOException}
import java.io.{File, IOException}
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}
@@ -65,15 +65,11 @@ trait ProcBuilder {
// Visible for test
private[kyuubi] var logCaptureThread: Thread = _
private lazy val engineLog: File = ProcBuilder.synchronized {
private[kyuubi] lazy val engineLog: File = ProcBuilder.synchronized {
val engineLogTimeout = conf.get(KyuubiConf.ENGINE_LOG_TIMEOUT)
val currentTime = System.currentTimeMillis()
val processLogPath = workingDir
val totalExistsFile = processLogPath.toFile.listFiles(new FilenameFilter() {
override def accept(dir: File, name: String): Boolean = {
name.startsWith(module)
}
})
val totalExistsFile = processLogPath.toFile.listFiles { (_, name) => name.startsWith(module) }
val sorted = totalExistsFile.sortBy(_.getName.split("\\.").last.toInt)
val nextIndex = if (sorted.isEmpty) {
0
@@ -81,6 +77,18 @@
sorted.last.getName.split("\\.").last.toInt + 1
}
val file = sorted.find(_.lastModified() < currentTime - engineLogTimeout)
.map { existsFile =>
try {
// Here we want to overwrite the existing log file
existsFile.delete()
existsFile.createNewFile()
existsFile
} catch {
case e: Exception =>
warn(s"failed to delete engine log file: ${existsFile.getAbsolutePath}", e)
null
}
}
.getOrElse {
Files.createDirectories(processLogPath)
val newLogFile = new File(processLogPath.toFile, s"$module.log.$nextIndex")

View File

@@ -31,7 +31,8 @@ class ExecuteStatement(
client: TCLIService.Iface,
remoteSessionHandle: TSessionHandle,
override val statement: String,
override val shouldRunAsync: Boolean)
override val shouldRunAsync: Boolean,
queryTimeout: Long)
extends KyuubiOperation(
OperationType.EXECUTE_STATEMENT, session, client, remoteSessionHandle) {
@@ -64,6 +65,7 @@
try {
val req = new TExecuteStatementReq(remoteSessionHandle, statement)
req.setRunAsync(shouldRunAsync)
req.setQueryTimeout(queryTimeout)
val resp = client.ExecuteStatement(req)
verifyTStatus(resp.getStatus)
_remoteOpHandle = resp.getOperationHandle

View File

@@ -22,6 +22,7 @@ import java.util.concurrent.ConcurrentHashMap
import org.apache.hive.service.rpc.thrift.{TCLIService, TFetchResultsReq, TRow, TRowSet, TSessionHandle}
import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
import org.apache.kyuubi.session.{Session, SessionHandle}
import org.apache.kyuubi.util.ThriftUtils
@@ -49,6 +50,18 @@ class KyuubiOperationManager private (name: String) extends OperationManager(nam
tSessionHandle
}
private def getQueryTimeout(clientQueryTimeout: Long): Long = {
// Use the client-specified timeout when it is positive and stricter than
// the system-level kyuubi.operation.query.timeout.
val systemQueryTimeout = getConf.get(KyuubiConf.OPERATION_QUERY_TIMEOUT)
if (clientQueryTimeout > 0 &&
(systemQueryTimeout <= 0 || clientQueryTimeout < systemQueryTimeout)) {
clientQueryTimeout
} else {
systemQueryTimeout
}
}
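The selection rule in `getQueryTimeout` above (a positive client-side `setQueryTimeout` wins whenever the server-side `kyuubi.operation.query.timeout` is unset or looser) can be sketched in isolation; `effectiveQueryTimeout` is an illustrative name, not Kyuubi's API:

```java
public class QueryTimeoutPrecedence {
    // Mirror of getQueryTimeout's logic: a positive client timeout wins when
    // the system timeout is unset (<= 0) or larger than the client's.
    static long effectiveQueryTimeout(long clientTimeout, long systemTimeout) {
        if (clientTimeout > 0 && (systemTimeout <= 0 || clientTimeout < systemTimeout)) {
            return clientTimeout;
        }
        return systemTimeout;
    }

    public static void main(String[] args) {
        System.out.println(effectiveQueryTimeout(5, 30));  // 5: stricter client value wins
        System.out.println(effectiveQueryTimeout(0, 30));  // 30: no client timeout set
        System.out.println(effectiveQueryTimeout(60, 30)); // 30: system timeout is stricter
        System.out.println(effectiveQueryTimeout(10, 0));  // 10: only the client set one
    }
}
```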
def setConnection(
sessionHandle: SessionHandle,
client: TCLIService.Iface,
@@ -69,9 +82,9 @@ class KyuubiOperationManager private (name: String) extends OperationManager(nam
queryTimeout: Long): Operation = {
val client = getThriftClient(session.handle)
val remoteSessionHandle = getRemoteTSessionHandle(session.handle)
val operation = new ExecuteStatement(session, client, remoteSessionHandle, statement, runAsync)
val operation = new ExecuteStatement(session, client, remoteSessionHandle, statement, runAsync,
getQueryTimeout(queryTimeout))
addOperation(operation)
}
override def newGetTypeInfoOperation(session: Session): Operation = {
@@ -143,7 +156,6 @@ class KyuubiOperationManager private (name: String) extends OperationManager(nam
addOperation(operation)
}
override def getOperationLogRowSet(
opHandle: OperationHandle,
order: FetchOrientation, maxRows: Int): TRowSet = {

View File

@@ -18,13 +18,15 @@
package org.apache.kyuubi.engine.spark
import java.io.File
import java.nio.file.{Files, Path, Paths}
import java.nio.file.{Files, Path, Paths, StandardOpenOption}
import java.time.Duration
import java.util.concurrent.{Executors, TimeUnit}
import org.scalatest.time.SpanSugar._
import org.apache.kyuubi.{KerberizedTestHelper, KyuubiSQLException, Utils}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.ENGINE_LOG_TIMEOUT
import org.apache.kyuubi.service.ServiceUtils
class SparkProcessBuilderSuite extends KerberizedTestHelper {
@@ -179,6 +181,29 @@ class SparkProcessBuilderSuite extends KerberizedTestHelper {
atomicTest()
}
}
test("overwrite log file should cleanup before write") {
val fakeWorkDir = Files.createTempDirectory("fake")
val conf = KyuubiConf()
conf.set(ENGINE_LOG_TIMEOUT, Duration.ofDays(1).toMillis)
val builder1 = new FakeSparkProcessBuilder(conf) {
override val workingDir: Path = fakeWorkDir
}
val file1 = builder1.engineLog
Files.write(file1.toPath, "a".getBytes(), StandardOpenOption.APPEND)
assert(file1.length() == 1)
Files.write(file1.toPath, "a".getBytes(), StandardOpenOption.APPEND)
assert(file1.length() == 2)
file1.setLastModified(System.currentTimeMillis() - Duration.ofDays(1).toMillis - 1000)
val builder2 = new FakeSparkProcessBuilder(conf) {
override val workingDir: Path = fakeWorkDir
}
val file2 = builder2.engineLog
assert(file1.getAbsolutePath == file2.getAbsolutePath)
Files.write(file2.toPath, "a".getBytes(), StandardOpenOption.APPEND)
assert(file2.length() == 1)
}
}
class FakeSparkProcessBuilder(config: KyuubiConf)

View File

@@ -23,7 +23,7 @@
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi</artifactId>
<name>Kyuubi Project Parent</name>
<version>1.1.0-SNAPSHOT</version>
<version>1.1.0</version>
<modules>
<module>dev/kyuubi-codecov</module>
<module>externals/kyuubi-download</module>
@@ -68,7 +68,7 @@
<hadoop.version>3.2.2</hadoop.version>
<hive.version>2.3.7</hive.version>
<spark.version>3.0.2</spark.version>
<spark.hadoop.binary.version>3.2</spark.hadoop.binary.version>
<spark.hadoop.binary.version>2.7</spark.hadoop.binary.version>
<spark.archive.name>spark-${spark.version}-bin-hadoop${spark.hadoop.binary.version}.tgz</spark.archive.name>
<spark.archive.mirror>https://archive.apache.org/dist/spark/spark-${spark.version}</spark.archive.mirror>
<spark.archive.download.skip>false</spark.archive.download.skip>
@@ -85,7 +85,8 @@
<maven.plugin.antrun.version>3.0.0</maven.plugin.antrun.version>
<maven.plugin.compiler.version>3.8.1</maven.plugin.compiler.version>
<maven.plugin.scala.version>4.4.0</maven.plugin.scala.version>
<!-- DO NOT bump 4.4.0, see https://github.com/yaooqinn/kyuubi/pull/441 -->
<maven.plugin.scala.version>4.3.0</maven.plugin.scala.version>
<maven.plugin.surefire.version>2.22.0</maven.plugin.surefire.version>
<maven.plugin.scalatest.version>2.0.0</maven.plugin.scalatest.version>
<maven.plugin.scalatest.exclude.tags></maven.plugin.scalatest.exclude.tags>