Compare commits

...

23 Commits

Author SHA1 Message Date
ulysses-you
3268a7e588
[KYUUBI #357] [Kyuubi-1.0] Update built-in Spark version to 3.0.2
Some checks failed
Kyuubi / Build (push) Has been cancelled
SL Scan / Scan-Build (push) Has been cancelled
![ulysses-you](https://badgen.net/badge/Hello/ulysses-you/green) [![Closes #357](https://badgen.net/badge/Preview/Closes%20%23357/blue)](https://github.com/yaooqinn/kyuubi/pull/357) ![12](https://badgen.net/badge/%2B/12/red) ![3](https://badgen.net/badge/-/3/green) ![2](https://badgen.net/badge/commits/2/yellow) ![Target Issue](https://badgen.net/badge/Missing/Target%20Issue/ff0000) ![Test Plan](https://badgen.net/badge/Missing/Test%20Plan/ff0000) [&#10088;?&#10089;](https://pullrequestbadge.com/?utm_medium=github&utm_source=yaooqinn&utm_campaign=badge_info)<!-- PR-BADGE: PLEASE DO NOT REMOVE THIS COMMENT -->

backport [#356](https://github.com/yaooqinn/kyuubi/pull/356) for branch-1.0.
<!--
Thanks for sending a pull request!

Here are some tips for you:
  1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html
  2. If the PR is related to an issue in https://github.com/yaooqinn/kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
  3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'.
-->

### _Why are the changes needed?_
<!--
Please clarify why the changes are needed. For instance,
  1. If you add a feature, you can talk about the use case of it.
  2. If you fix a bug, you can clarify why it is a bug.
-->
Spark 3.0.2 has been released now.

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [ ] [Run test](https://kyuubi.readthedocs.io/en/latest/tools/testing.html#running-tests) locally before make a pull request

Closes #357 from ulysses-you/branch-1.0-spark-3.0.2.

b5d4442 [ulysses-you] sync
2c5ee8e [ulysses-you] 3.0.2

Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: Kent Yao <yao@apache.org>
2021-02-23 21:00:20 +08:00
Honglun
b1a53a9eb2
[KYUUBI #347]Closing expired operations does not require updating 'lastAccessTime'
![Honglun](https://badgen.net/badge/Hello/Honglun/green) [![Closes #348](https://badgen.net/badge/Preview/Closes%20%23348/blue)](https://github.com/yaooqinn/kyuubi/pull/348) ![5](https://badgen.net/badge/%2B/5/red) ![4](https://badgen.net/badge/-/4/green) ![2](https://badgen.net/badge/commits/2/yellow) [&#10088;?&#10089;](https://pullrequestbadge.com/?utm_medium=github&utm_source=yaooqinn&utm_campaign=badge_info)<!-- PR-BADGE: PLEASE DO NOT REMOVE THIS COMMENT -->

… and 'lastTidleTime'

<!--
Thanks for sending a pull request!

Here are some tips for you:
  1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html
  2. If the PR is related to an issue in https://github.com/yaooqinn/kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
  3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'.
-->

### _Why are the changes needed?_
<!--
Please clarify why the changes are needed. For instance,
  1. If you add a feature, you can talk about the use case of it.
  2. If you fix a bug, you can clarify why it is a bug.
-->
Method 'session.closeExpiredOperations' will constantly update the values of '_lastAccessTime' and '_lastIdleTime', so the session can't be closed. Method 'closeExpiredOperations' should remove 'withAcquirerease'.

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.readthedocs.io/en/latest/tools/testing.html#running-tests) locally before make a pull request
conf.set(KyuubiConf.SESSION_CHECK_INTERVAL,Duration.ofSeconds(5).toMillis)
conf.set(KyuubiConf.SESSION_TIMEOUT,Duration.ofMinutes(1L).toMillis)
![image](https://user-images.githubusercontent.com/3722800/106875807-0cfe5980-6712-11eb-8a2e-d899d9c96bc1.png)

Closes #348 from Honglun/master.

6cff013 [Honglun] Closing expired operations does not require updating 'lastAccessTime'.
a6330d7 [Honglun] Closing expired operations does not require updating 'lastAccessTime' and 'lastTidleTime'

Authored-by: Honglun <283471011@qq.com>
Signed-off-by: Kent Yao <yao@apache.org>
(cherry picked from commit c1a71b7d00)
Signed-off-by: Kent Yao <yao@apache.org>
2021-02-04 20:56:24 +08:00
Kent Yao
024cdf4414
Prepare Kyuubi v1.0.4 2021-02-03 19:47:58 +08:00
Kent Yao
9964d17b81
Release v1.0.3 2021-02-03 19:46:21 +08:00
ulysses-you
ca6fb53a9b
[KYUUBI #345] Catch Throwable instead of Exception
![ulysses-you](https://badgen.net/badge/Hello/ulysses-you/green) [![Closes #345](https://badgen.net/badge/Preview/Closes%20%23345/blue)](https://github.com/yaooqinn/kyuubi/pull/345) ![3](https://badgen.net/badge/%2B/3/red) ![1](https://badgen.net/badge/-/1/green) ![1](https://badgen.net/badge/commits/1/yellow) ![Target Issue](https://badgen.net/badge/Missing/Target%20Issue/ff0000) ![Test Plan](https://badgen.net/badge/Missing/Test%20Plan/ff0000) ![Bug](https://badgen.net/badge/Label/Bug/) [&#10088;?&#10089;](https://pullrequestbadge.com/?utm_medium=github&utm_source=yaooqinn&utm_campaign=badge_info)<!-- PR-BADGE: PLEASE DO NOT REMOVE THIS COMMENT -->

<!--
Thanks for sending a pull request!

Here are some tips for you:
  1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html
  2. If the PR is related to an issue in https://github.com/yaooqinn/kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
  3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'.
-->

### _Why are the changes needed?_
<!--
Please clarify why the changes are needed. For instance,
  1. If you add a feature, you can talk about the use case of it.
  2. If you fix a bug, you can clarify why it is a bug.
-->

We might get this error when running spark sql,so it's more reasonable to catch `Throwable`.
```
java.lang.NoClassDefFoundError: org/apache/hadoop/hive/serde2/SerDe
	at java.lang.ClassLoader.defineClass1(Native Method)
	at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
	at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
	at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
	at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
	at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
	at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
	at java.security.AccessController.doPrivileged(Native Method)
	at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:411)
```

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [ ] [Run test](https://kyuubi.readthedocs.io/en/latest/tools/testing.html#running-tests) locally before make a pull request

Closes #345 from ulysses-you/throwable.

b6ffacd [ulysses-you] throw

Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: Kent Yao <yao@apache.org>
(cherry picked from commit fbf1e3506c)
Signed-off-by: Kent Yao <yao@apache.org>
2021-02-03 19:31:40 +08:00
jhx1008
1932f6a06c
[KYUUBI #333] Fix hanging due to disordered service stopping
![yaooqinn](https://badgen.net/badge/Hello/yaooqinn/green) [![Closes #333](https://badgen.net/badge/Preview/Closes%20%23333/blue)](https://github.com/yaooqinn/kyuubi/pull/333) ![10](https://badgen.net/badge/%2B/10/red) ![6](https://badgen.net/badge/-/6/green) ![1](https://badgen.net/badge/commits/1/yellow) ![Target Issue](https://badgen.net/badge/Missing/Target%20Issue/ff0000) [&#10088;?&#10089;](https://pullrequestbadge.com/?utm_medium=github&utm_source=yaooqinn&utm_campaign=badge_info)<!-- PR-BADGE: PLEASE DO NOT REMOVE THIS COMMENT -->

<!--
Thanks for sending a pull request!

Here are some tips for you:
  1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html
  2. If the PR is related to an issue in https://github.com/yaooqinn/kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
  3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'.
-->

### _Why are the changes needed?_
<!--
Please clarify why the changes are needed. For instance,
  1. If you add a feature, you can talk about the use case of it.
  2. If you fix a bug, you can clarify why it is a bug.
-->

the service discovery should stop before the embedded zk server.

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [x] Add screenshots for manual tests if appropriate

```before
2021-01-31 11:26:15.800 ERROR server.KyuubiServer: RECEIVED SIGNAL 15: TERM
2021-01-31 11:26:15.801 INFO server.NIOServerCnxn: Closed socket connection for client /127.0.0.1:58707 which had sessionid 0x1000fbbfad70002
2021-01-31 11:26:15.801 INFO server.NIOServerCnxn: Closed socket connection for client /127.0.0.1:58490 which had sessionid 0x1000fbbfad70000
2021-01-31 11:26:15.801 INFO server.NIOServerCnxnFactory: NIOServerCnxn factory exited run method
2021-01-31 11:26:15.801 INFO zookeeper.ClientCnxn: Unable to read additional data from server sessionid 0x1000fbbfad70000, likely server has closed socket, closing socket connection and attempting reconnect
2021-01-31 11:26:15.801 INFO server.ZooKeeperServer: shutting down
2021-01-31 11:26:15.801 INFO server.SessionTrackerImpl: Shutting down
2021-01-31 11:26:15.801 INFO server.PrepRequestProcessor: Shutting down
2021-01-31 11:26:15.802 INFO server.KyuubiServer: Service: [ServiceDiscovery] is stopping.
2021-01-31 11:26:15.802 INFO server.SyncRequestProcessor: Shutting down
2021-01-31 11:26:15.802 INFO server.PrepRequestProcessor: PrepRequestProcessor exited loop!
2021-01-31 11:26:15.802 INFO server.SyncRequestProcessor: SyncRequestProcessor exited!
2021-01-31 11:26:15.802 INFO server.FinalRequestProcessor: shutdown of request processor complete
2021-01-31 11:26:15.804 INFO server.EmbeddedZkServer: Service[EmbeddedZkServer] is stopped.
2021-01-31 11:26:15.906 INFO state.ConnectionStateManager: State change: SUSPENDED
2021-01-31 11:26:15.907 INFO client.ServiceDiscovery: Zookeeper client connection state changed to: SUSPENDED
2021-01-31 11:26:17.615 INFO zookeeper.ClientCnxn: Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
2021-01-31 11:26:17.615 INFO zookeeper.ClientCnxn: Socket error occurred: localhost/127.0.0.1:2181: Connection refused
2021-01-31 11:26:18.793 INFO server.SessionTrackerImpl: SessionTrackerImpl exited loop!
2021-01-31 11:26:19.126 INFO zookeeper.ClientCnxn: Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
2021-01-31 11:26:19.126 INFO zookeeper.ClientCnxn: Socket error occurred: localhost/127.0.0.1:2181: Connection refused
2021-01-31 11:26:20.815 INFO zookeeper.ClientCnxn: Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
2021-01-31 11:26:20.815 INFO zookeeper.ClientCnxn: Socket error occurred: localhost/127.0.0.1:2181: Connection refused
2021-01-31 11:26:21.947 INFO zookeeper.ClientCnxn: Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
2021-01-31 11:26:21.948 INFO zookeeper.ClientCnxn: Socket error occurred: localhost/127.0.0.1:2181: Connection refused
2021-01-31 11:26:23.288 INFO zookeeper.ClientCnxn: Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
2021-01-31 11:26:23.289 INFO zookeeper.ClientCnxn: Socket error occurred: localhost/127.0.0.1:2181: Connection refused
2021-01-31 11:26:25.075 INFO zookeeper.ClientCnxn: Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
2021-01-31 11:26:25.075 INFO zookeeper.ClientCnxn: Socket error occurred: localhost/127.0.0.1:2181: Connection refused
2021-01-31 11:26:26.965 INFO zookeeper.ClientCnxn: Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
2021-01-31 11:26:26.965 INFO zookeeper.ClientCnxn: Socket error occurred: localhost/127.0.0.1:2181: Connection refused
2021-01-31 11:26:28.412 INFO zookeeper.ClientCnxn: Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
2021-01-31 11:26:28.412 INFO zookeeper.ClientCnxn: Socket error occurred: localhost/127.0.0.1:2181: Connection refused
2021-01-31 11:26:30.356 INFO zookeeper.ClientCnxn: Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
2021-01-31 11:26:30.356 INFO zookeeper.ClientCnxn: Socket error occurred: localhost/127.0.0.1:2181: Connection refused
2021-01-31 11:26:31.751 ERROR curator.ConnectionState: Connection timed out for connection string (127.0.0.1:2181) and timeout (15000) / elapsed (15841)
org.apache.curator.CuratorConnectionLossException: KeeperErrorCode = ConnectionLoss
```
```after
2021-01-31 11:49:15.070 ERROR server.KyuubiServer: RECEIVED SIGNAL 15: TERM
2021-01-31 11:49:15.071 INFO server.KyuubiServer: Service: [ServiceDiscovery] is stopping.
2021-01-31 11:49:15.074 INFO server.PrepRequestProcessor: Processed session termination for sessionid: 0x1000fd1db370000
2021-01-31 11:49:15.074 WARN client.ServiceDiscovery: This Kyuubi instance localhost:10009 is now de-registered from ZooKeeper. The server will be shut down after the last client session completes.
2021-01-31 11:49:15.074 INFO client.ServiceDiscovery: Service[ServiceDiscovery] is stopped.
2021-01-31 11:49:15.075 INFO server.NIOServerCnxn: Closed socket connection for client /127.0.0.1:63940 which had sessionid 0x1000fd1db370000
2021-01-31 11:49:15.075 INFO zookeeper.ZooKeeper: Session: 0x1000fd1db370000 closed
2021-01-31 11:49:15.076 WARN client.ServiceDiscovery: Service[ServiceDiscovery] is not started(STOPPED) yet.
2021-01-31 11:49:15.076 INFO server.KyuubiServer: Service: [FrontendService] is stopping.
2021-01-31 11:49:15.078 INFO service.FrontendService: FrontendService has stopped
2021-01-31 11:49:15.078 INFO service.FrontendService: Service[FrontendService] is stopped.
2021-01-31 11:49:15.078 INFO server.KyuubiServer: Service: [KyuubiBackendService] is stopping.
2021-01-31 11:49:15.078 INFO server.KyuubiBackendService: Service: [KyuubiSessionManager] is stopping.
2021-01-31 11:49:15.078 INFO session.KyuubiSessionManager: Service: [KyuubiOperationManager] is stopping.
2021-01-31 11:49:15.078 INFO operation.KyuubiOperationManager: Service[KyuubiOperationManager] is stopped.
2021-01-31 11:49:15.078 INFO session.KyuubiSessionManager: Service[KyuubiSessionManager] is stopped.
2021-01-31 11:49:15.079 INFO server.KyuubiBackendService: Service[KyuubiBackendService] is stopped.
2021-01-31 11:49:15.079 INFO server.KyuubiServer: Service: [KinitAuxiliaryService] is stopping.
2021-01-31 11:49:15.079 INFO service.KinitAuxiliaryService: Service[KinitAuxiliaryService] is stopped.
2021-01-31 11:49:15.079 INFO server.KyuubiServer: Service[KyuubiServer] is stopped.
2021-01-31 11:49:15.079 INFO server.NIOServerCnxn: Closed socket connection for client /127.0.0.1:64039 which had sessionid 0x1000fd1db370002
2021-01-31 11:49:15.079 INFO server.NIOServerCnxnFactory: NIOServerCnxn factory exited run method
2021-01-31 11:49:15.079 INFO server.ZooKeeperServer: shutting down
2021-01-31 11:49:15.079 INFO server.SessionTrackerImpl: Shutting down
2021-01-31 11:49:15.079 INFO server.PrepRequestProcessor: Shutting down
2021-01-31 11:49:15.080 INFO server.SyncRequestProcessor: Shutting down
2021-01-31 11:49:15.080 INFO server.PrepRequestProcessor: PrepRequestProcessor exited loop!
2021-01-31 11:49:15.080 INFO server.SyncRequestProcessor: SyncRequestProcessor exited!
2021-01-31 11:49:15.080 INFO server.FinalRequestProcessor: shutdown of request processor complete
2021-01-31 11:49:15.081 INFO server.EmbeddedZkServer: Service[EmbeddedZkServer] is stopped.
```

- [x] [Run test](https://kyuubi.readthedocs.io/en/latest/tools/testing.html#running-tests) locally before make a pull request

Closes #333 from yaooqinn/hang.

945e1ba [jhx1008] Fix hanging due to disordered service stopping

Authored-by: jhx1008 <jhx1008@gmail.com>
Signed-off-by: Kent Yao <yao@apache.org>
(cherry picked from commit 9db0088ae1)
Signed-off-by: Kent Yao <yao@apache.org>
2021-01-31 13:30:55 +08:00
ulysses-you
45f6bb310b
[KYUUBI #303][FOLLOWUP] Fix test
fix #305
Squashed commit of the following:

commit f67ca84bafcd7fa43ae62a13edc6b044e9e7916f
Author: ulysses-you <ulyssesyou18@gmail.com>
Date:   Tue Jan 26 20:36:10 2021 +0800

    init
2021-01-26 21:56:05 +08:00
ulysses-you
0ac6dffe81
[KYUUBI #303] Add host:port msg if we failed to initialize
![ulysses-you](https://badgen.net/badge/Hello/ulysses-you/green) [![PR 303](https://badgen.net/badge/Preview/PR%20303/blue)](https://github.com/yaooqinn/kyuubi/pull/303) [&#10088;?&#10089;](https://pullrequestbadge.com/?utm_medium=github&utm_source=yaooqinn&utm_campaign=badge_info)<!-- PR-BADGE: PLEASE DO NOT REMOVE THIS COMMENT -->

<!--
Thanks for sending a pull request!  Here are some tips for you:
  1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html
-->

<!-- replace ${issue ID} with the actual issue id -->
Fixes #303

<!--
Please clarify why the changes are needed. For instance,
  1. If you add a feature, you can talk about the user case of it.
  2. If you fix a bug, you can clarify why it is a bug.
-->

Before this PR, if we start Kyuubi with an already used port, we will get this error msg.
It's better to print host:port which we bind.
```
Exception in thread "main" org.apache.kyuubi.KyuubiException: Failed to initialize frontend service
	at org.apache.kyuubi.service.FrontendService.initialize(FrontendService.scala:102)
	at org.apache.kyuubi.service.CompositeService.$anonfun$initialize$1(CompositeService.scala:40)
	at org.apache.kyuubi.service.CompositeService.$anonfun$initialize$1$adapted(CompositeService.scala:40)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.kyuubi.service.CompositeService.initialize(CompositeService.scala:40)
	at org.apache.kyuubi.service.Serverable.initialize(Serverable.scala:37)
	at org.apache.kyuubi.server.KyuubiServer.initialize(KyuubiServer.scala:88)
	at org.apache.kyuubi.server.KyuubiServer$.startServer(KyuubiServer.scala:45)
	at org.apache.kyuubi.server.KyuubiServer$.main(KyuubiServer.scala:74)
	at org.apache.kyuubi.server.KyuubiServer.main(KyuubiServer.scala)
Caused by: java.net.BindException: Address already in use
	at java.net.PlainSocketImpl.socketBind(Native Method)
	at java.net.AbstractPlainSocketImpl.bind(AbstractPlainSocketImpl.java:387)
	at java.net.ServerSocket.bind(ServerSocket.java:375)
	at java.net.ServerSocket.<init>(ServerSocket.java:237)
	at org.apache.kyuubi.service.FrontendService.initialize(FrontendService.scala:77)
	... 11 more
```

- Add some test cases that check the changes thoroughly including negative and positive cases if possible
- Add screenshots for manual tests if appropriate
- [Run test](https://kyuubi.readthedocs.io/en/latest/tools/testing.html#running-tests) locally before make a pull request
Squashed commit of the following:

commit c3f5ffa3dae5d9822ae9b0d8eec879e971ccdea2
Author: ulysses-you <ulyssesyou18@gmail.com>
Date:   Tue Jan 26 18:23:40 2021 +0800

    fix

commit 42d333de2c1edc0373c737053ff6c28b48a41cf7
Author: ulysses-you <ulyssesyou18@gmail.com>
Date:   Tue Jan 26 14:44:35 2021 +0800

    err msg
2021-01-26 18:40:46 +08:00
Kent Yao
a6545a4e45
Remove dulicated error msg for open sessions at engine side (#293) 2021-01-20 17:19:29 +08:00
ulysses-you
982041e416
[KYUUBI #302]fix zk client not release if open session failed
![ulysses-you](https://badgen.net/badge/Hello/ulysses-you/green) [![PR 302](https://badgen.net/badge/Preview/PR%20302/blue)](https://github.com/yaooqinn/kyuubi/pull/302) [&#10088;?&#10089;](https://pullrequestbadge.com/?utm_medium=github&utm_source=yaooqinn&utm_campaign=badge_info)<!-- PR-BADGE: PLEASE DO NOT REMOVE THIS COMMENT -->

<!--
Thanks for sending a pull request!  Here are some tips for you:
  1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html
-->

<!-- replace ${issue ID} with the actual issue id -->
Fixes #302

<!--
Please clarify why the changes are needed. For instance,
  1. If you add a feature, you can talk about the user case of it.
  2. If you fix a bug, you can clarify why it is a bug.
-->
Fix zk client leak problem.

- Add some test cases that check the changes thoroughly including negative and positive cases if possible
- Add screenshots for manual tests if appropriate
- [Run test](https://kyuubi.readthedocs.io/en/latest/tools/testing.html#running-tests) locally before make a pull request
2021-01-20 14:44:44 +08:00
Kent Yao
db84c06e32
Prepare Kyuubi v1.0.3 2021-01-13 17:43:30 +08:00
Kent Yao
546163cbf1
Release v1.0.2 2021-01-13 17:42:01 +08:00
ulysses-you
70c28cbc61
[KYUUBI#209][Server] Fix high cpu load due to log capture thread not release
fixes #279
Squashed commit of the following:

commit adc811370380a74783cc22c21c02de0d15ea73fd
Author: ulysses-you <ulyssesyou18@gmail.com>
Date:   Wed Jan 13 14:26:38 2021 +0800

    remove sleep

commit 41ad1172e9be59a9d88d0cdf22d5462690308b06
Author: ulysses-you <ulyssesyou18@gmail.com>
Date:   Wed Jan 13 11:58:43 2021 +0800

    comment

commit fa7a7e90637818662aadc59c4f349316d66aa0a7
Author: ulysses-you <ulyssesyou18@gmail.com>
Date:   Wed Jan 13 11:54:46 2021 +0800

    add test

commit 9f579e89a999aca9c98f424f5a748f2cd0fae3fb
Author: ulysses-you <ulyssesyou18@gmail.com>
Date:   Wed Jan 13 11:54:38 2021 +0800

    release log capture thread
2021-01-13 17:38:12 +08:00
Kent Yao
eb31f471e0
Handle InvocationTargetException and UndeclaredThrowableException in KSE (#274)
* Handle InvocationTargetException and UndeclaredThrowableException

* nit
2021-01-13 11:34:31 +08:00
Kent Yao
33bdbbdab0
Support launch SparkProcessBuilder with keytab and principal (#269)
* Support launch SparkProcessBuilder with keytab and principal

* fix tests
2021-01-13 11:34:14 +08:00
ulysses-you
8a997956ab
Fix EmbeddedZkServerSuite embedded zookeeper server NPE problem
fixes #267
Squashed commit of the following:

commit 38b3278e2bd4285dabeee093df855a1497ad35f8
Author: ulysses-you <ulyssesyou18@gmail.com>
Date:   Fri Jan 8 12:43:14 2021 +0800

    user name

commit 76c5691f4a9dc419668d8610f5cf148d266ff46c
Author: cathy <cathy@cathydeMacBook-Pro.local>
Date:   Fri Jan 8 12:11:31 2021 +0800

    move to stop

commit 15e0ca8f7ec5e3fa27fda3572dc44ab3fa83c9ac
Author: cathy <cathy@cathydeMacBook-Pro.local>
Date:   Fri Jan 8 11:50:49 2021 +0800

    init
2021-01-08 13:10:55 +08:00
Kent Yao
357c12a90f
Prepare Kyuubi v1.0.2 2021-01-06 14:00:35 +08:00
Kent Yao
37ff5028d5
Release v1.0.1 2021-01-06 13:59:41 +08:00
Kent Yao
90cc63ec84
ignore (#265) 2021-01-04 16:39:02 +08:00
Kent Yao
e1b39c811a
Serverable suites 2021-01-04 16:38:24 +08:00
Kent Yao
3613b9c7a1
Add SparkSQLEngineListener 2021-01-03 02:10:01 +08:00
Kent Yao
b1f5461676
Prepare Kyuubi v1.0.1 2020-12-31 15:50:26 +08:00
Kent Yao
d03235a2cc
Release Kyuubi v1.0.0 2020-12-31 15:48:56 +08:00
31 changed files with 308 additions and 81 deletions

View File

@ -66,7 +66,16 @@ fi
SPARK_BUILTIN="${KYUUBI_HOME}/externals/spark-$SPARK_VERSION_BUILD-bin-hadoop${HADOOP_VERSION_BUILD:0:3}$HIVE_VERSION_SUFFIX"
if [[ ! -d ${SPARK_BUILTIN} ]]; then
SPARK_BUILTIN="${KYUUBI_HOME}/externals/kyuubi-download/target/spark-3.0.1-bin-hadoop2.7"
MVN="${MVN:-"${KYUUBI_HOME}/build/mvn"}"
SPARK_VERSION_BUILD=$("$MVN" help:evaluate -Dexpression=spark.version 2>/dev/null\
| grep -v "INFO"\
| grep -v "WARNING"\
| tail -n 1)
HADOOP_VERSION_BUILD=$("$MVN" help:evaluate -Dexpression=hadoop.version 2>/dev/null\
| grep -v "INFO"\
| grep -v "WARNING"\
| tail -n 1)
SPARK_BUILTIN="${KYUUBI_HOME}/externals/kyuubi-download/target/spark-$SPARK_VERSION_BUILD-bin-hadoop${HADOOP_VERSION_BUILD:0:3}"
fi
export SPARK_HOME="${SPARK_HOME:-"${SPARK_BUILTIN}"}"

View File

@ -22,7 +22,7 @@
<parent>
<artifactId>kyuubi</artifactId>
<groupId>org.apache.kyuubi</groupId>
<version>1.0.0-SNAPSHOT</version>
<version>1.0.4-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@ -22,7 +22,7 @@
<parent>
<artifactId>kyuubi</artifactId>
<groupId>org.apache.kyuubi</groupId>
<version>1.0.0-SNAPSHOT</version>
<version>1.0.4-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@ -54,7 +54,7 @@ limitations under the License.
author = 'Kent Yao'
# The full version, including alpha/beta/rc tags
release = '1.0.0'
release = '1.0.4'
# -- General configuration ---------------------------------------------------

View File

@ -28,4 +28,4 @@ For instance,
./build/dist --name custom-name --tgz
```
This results a Kyuubi distribution named `kyuubi-{version}-bin-custom-name-spark-3.0.1.tar.gz` for you.
This results a Kyuubi distribution named `kyuubi-{version}-bin-custom-name.tar.gz` for you.

View File

@ -22,7 +22,7 @@
<parent>
<artifactId>kyuubi</artifactId>
<groupId>org.apache.kyuubi</groupId>
<version>1.0.0-SNAPSHOT</version>
<version>1.0.4-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@ -22,7 +22,7 @@
<parent>
<artifactId>kyuubi</artifactId>
<groupId>org.apache.kyuubi</groupId>
<version>1.0.0-SNAPSHOT</version>
<version>1.0.4-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@ -40,6 +40,8 @@ private[spark] final class SparkSQLEngine(name: String, spark: SparkSession)
private val discoveryService = new ServiceDiscovery(this)
override def initialize(conf: KyuubiConf): Unit = {
val listener = new SparkSQLEngineListener(this)
spark.sparkContext.addSparkListener(listener)
super.initialize(conf)
if (ServiceDiscovery.supportServiceDiscovery(conf)) {
addService(discoveryService)
@ -49,7 +51,6 @@ private[spark] final class SparkSQLEngine(name: String, spark: SparkSession)
override protected def stopServer(): Unit = {
countDownLatch.countDown()
spark.stop()
}
}
@ -112,9 +113,11 @@ object SparkSQLEngine extends Logging {
error("Error start SparkSQLEngine", t)
if (engine != null) {
engine.stop()
} else if (spark != null) {
spark.stop()
}
} finally {
if (spark != null) {
spark.stop()
}
}
}
}

View File

@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.spark
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
import org.apache.kyuubi.Logging
import org.apache.kyuubi.service.{Serverable, ServiceState}
class SparkSQLEngineListener(server: Serverable) extends SparkListener with Logging {
override def onApplicationEnd(event: SparkListenerApplicationEnd): Unit = {
server.getServiceState match {
case ServiceState.STOPPED => debug("Received ApplicationEnd Message form Spark after the" +
" engine has stopped")
case state =>
info(s"Received ApplicationEnd Message from Spark at $state, stopping")
server.stop()
}
}
}

View File

@ -85,7 +85,9 @@ abstract class SparkOperation(spark: SparkSession, opType: OperationType, sessio
}
protected def onError(cancel: Boolean = false): PartialFunction[Throwable, Unit] = {
case e: Exception =>
// We should use Throwable instead of Exception since `java.lang.NoClassDefFoundError`
// could be thrown.
case e: Throwable =>
if (cancel) spark.sparkContext.cancelJobGroup(statementId)
state.synchronized {
val errMsg = KyuubiSQLException.stringifyException(e)

View File

@ -71,7 +71,7 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession)
} catch {
case e: Exception =>
sessionImpl.close()
throw KyuubiSQLException(s"Error opening session $handle for $user: ${e.getMessage}", e)
throw KyuubiSQLException(e)
}
}

View File

@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.spark
import org.apache.spark.sql.SparkSession
import org.apache.kyuubi.KyuubiFunSuite
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.service.ServiceState
class SparkSQLEngineListenerSuite extends KyuubiFunSuite {
override def beforeAll(): Unit = {
SparkSession.clearActiveSession()
SparkSession.clearDefaultSession()
super.beforeAll()
}
test("application end") {
val spark = SparkSession
.builder().master("local").config("spark.ui.port", "0").getOrCreate()
val engine = new SparkSQLEngine(spark)
engine.initialize(KyuubiConf())
engine.start()
assert(engine.getServiceState === ServiceState.STARTED)
spark.stop()
assert(engine.getServiceState === ServiceState.STOPPED)
}
}

View File

@ -39,6 +39,8 @@ trait WithSparkSQLEngine extends JDBCTests {
System.setProperty("spark.sql.warehouse.dir", warehousePath.toString)
System.setProperty("spark.sql.hive.metastore.sharedPrefixes", "org.apache.hive.jdbc")
SparkSession.clearActiveSession()
SparkSession.clearDefaultSession()
protected val spark: SparkSession = SparkSQLEngine.createSpark()
protected var engine: SparkSQLEngine = _
@ -56,6 +58,7 @@ trait WithSparkSQLEngine extends JDBCTests {
if (engine != null) {
engine.stop()
}
spark.stop()
SessionState.detachSession()
Hive.closeCurrent()
}

View File

@ -24,7 +24,7 @@
<parent>
<artifactId>kyuubi</artifactId>
<groupId>org.apache.kyuubi</groupId>
<version>1.0.0-SNAPSHOT</version>
<version>1.0.4-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

View File

@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi</artifactId>
<version>1.0.0-SNAPSHOT</version>
<version>1.0.4-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@ -18,8 +18,10 @@
package org.apache.kyuubi
import java.io.{PrintWriter, StringWriter}
import java.lang.reflect.{InvocationTargetException, UndeclaredThrowableException}
import java.sql.SQLException
import scala.annotation.tailrec
import scala.collection.JavaConverters._
import org.apache.hive.service.rpc.thrift.{TStatus, TStatusCode}
@ -46,10 +48,11 @@ object KyuubiSQLException {
private final val SEPARATOR: Char = ':'
def apply(msg: String, throwable: Throwable): KyuubiSQLException = {
new KyuubiSQLException(msg, throwable)
new KyuubiSQLException(msg, findCause(throwable))
}
def apply(cause: Throwable): KyuubiSQLException = {
new KyuubiSQLException(cause.getMessage, cause)
val theCause = findCause(cause)
new KyuubiSQLException(theCause.getMessage, theCause)
}
def apply(msg: String): KyuubiSQLException = new KyuubiSQLException(msg, null)
@ -157,4 +160,10 @@ object KyuubiSQLException {
stm.toString
}
@tailrec
def findCause(t: Throwable): Throwable = t match {
case e @ (_: UndeclaredThrowableException | _: InvocationTargetException)
if e.getCause != null => findCause(e.getCause)
case e => e
}
}

View File

@ -99,7 +99,8 @@ class FrontendService private (name: String, be: BackendService, oomHook: Runnab
s" [$minThreads, $maxThreads] worker threads")
} catch {
case e: Throwable =>
throw new KyuubiException("Failed to initialize frontend service", e)
throw new KyuubiException(
s"Failed to initialize frontend service on $serverAddr:$portNum.", e)
}
super.initialize(conf)
}

View File

@ -47,14 +47,19 @@ abstract class Serverable(name: String) extends CompositeService(name) {
override def stop(): Unit = synchronized {
try {
stopServer()
} catch {
case t: Throwable =>
warn(s"Error stopping spark ${t.getMessage}", t)
} finally {
if (started.getAndSet(false)) {
super.stop()
}
} catch {
case t: Throwable =>
warn(s"Error stopping $name ${t.getMessage}", t)
} finally {
try {
stopServer()
} catch {
case t: Throwable =>
warn(s"Error stopping spark ${t.getMessage}", t)
}
}
}
}

View File

@ -61,10 +61,11 @@ abstract class AbstractSession(
private def release(userAccess: Boolean): Unit = {
if (userAccess) {
_lastAccessTime = System.currentTimeMillis
if (opHandleSet.isEmpty) {
_lastIdleTime = System.currentTimeMillis
}
}
if (opHandleSet.isEmpty) {
_lastIdleTime = System.currentTimeMillis
} else {
if (!opHandleSet.isEmpty) {
_lastIdleTime = 0
}
}
@ -211,7 +212,7 @@ abstract class AbstractSession(
}
}
override def closeExpiredOperations: Unit = withAcquireRelease() {
override def closeExpiredOperations: Unit = withAcquireRelease(false) {
val operations = sessionManager.operationManager
.removeExpiredOperations(opHandleSet.asScala.toSeq)
operations.foreach { op =>

View File

@ -17,6 +17,8 @@
package org.apache.kyuubi
import java.lang.reflect.{InvocationTargetException, UndeclaredThrowableException}
import org.apache.hive.service.rpc.thrift.TStatusCode
class KyuubiSQLExceptionSuite extends KyuubiFunSuite {
@ -50,4 +52,18 @@ class KyuubiSQLExceptionSuite extends KyuubiFunSuite {
assert(e5.getMessage === msg0)
assert(e5.getCause === e0)
}
test("find the root cause") {
val theCause = new RuntimeException("this is just a dummy message but shall be seen")
val ite1 = new InvocationTargetException(theCause)
val ute1 = new UndeclaredThrowableException(ite1)
val ute2 = new UndeclaredThrowableException(ute1)
val ite2 = new InvocationTargetException(ute2)
val ke = KyuubiSQLException(ite2)
assert(ke.getMessage == theCause.getMessage)
assert(ke.getCause == theCause)
val cornerCase = new InvocationTargetException(null)
assert(KyuubiSQLException(cornerCase).getCause === cornerCase)
}
}

View File

@ -22,16 +22,7 @@ import org.apache.kyuubi.config.KyuubiConf
class ServerableSuite extends KyuubiFunSuite {
test("Serverable") {
val serverable = new NoopServer()
serverable.stop()
assert(serverable.getStartTime === 0)
assert(serverable.getConf === null)
assert(serverable.getName === "noop")
intercept[IllegalStateException](serverable.connectionUrl)
assert(serverable.getServiceState === ServiceState.LATENT)
intercept[IllegalStateException](serverable.start())
ignore("Serverable") {
val serverable1 = new NoopServer()
val conf = KyuubiConf().set(KyuubiConf.FRONTEND_BIND_PORT, 0)
serverable1.initialize(conf)
@ -54,7 +45,7 @@ class ServerableSuite extends KyuubiFunSuite {
test("invalid port") {
val conf = KyuubiConf().set(KyuubiConf.FRONTEND_BIND_PORT, 100)
val e = intercept[KyuubiException](new NoopServer().initialize(conf))
assert(e.getMessage === "Failed to initialize frontend service")
assert(e.getMessage.contains("Failed to initialize frontend service"))
assert(e.getCause.getMessage === "Invalid Port number")
}

View File

@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi</artifactId>
<version>1.0.0-SNAPSHOT</version>
<version>1.0.4-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@ -58,6 +58,12 @@ class EmbeddedZkServer private(name: String) extends AbstractService(name) with
override def stop(): Unit = {
if (server != null) {
// Just a tradeoff, otherwise we may get NPE if we call stop() immediately.
// (e.g. the unit test EmbeddedZkServerSuite)
// TestingZooKeeperMain has a bug that the CountDownLatch released before cnxnFactory inited.
// More details could see TestingZooKeeperMain.runFromConfig.
while (getStartTime > 0 && System.currentTimeMillis() - getStartTime < 5000) {
}
server.close()
server = null
}

View File

@ -22,7 +22,7 @@
<parent>
<artifactId>kyuubi</artifactId>
<groupId>org.apache.kyuubi</groupId>
<version>1.0.0-SNAPSHOT</version>
<version>1.0.4-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
@ -65,6 +65,16 @@
<artifactId>hive-jdbc</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-minikdc</artifactId>
</dependency>
<dependency>
<groupId>org.apache.directory.server</groupId>
<artifactId>apacheds-service</artifactId>
</dependency>
</dependencies>
<build>

View File

@ -56,6 +56,8 @@ trait ProcBuilder {
}
@volatile private var error: Throwable = UNCAUGHT_ERROR
// Visible for test
private[kyuubi] var logCaptureThread: Thread = null
final def start: Process = synchronized {
val procLog = Paths.get(workingDir.toAbsolutePath.toString,
@ -86,15 +88,23 @@ trait ProcBuilder {
}
} catch {
case _: IOException =>
case _: InterruptedException =>
} finally {
reader.close()
}
}
PROC_BUILD_LOGGER.newThread(redirect).start()
logCaptureThread = PROC_BUILD_LOGGER.newThread(redirect)
logCaptureThread.start()
proc
}
def close(): Unit = {
if (logCaptureThread != null) {
logCaptureThread.interrupt()
}
}
def getError: Throwable = synchronized {
if (error == UNCAUGHT_ERROR) {
Thread.sleep(3000)

View File

@ -17,10 +17,13 @@
package org.apache.kyuubi.engine.spark
import java.io.IOException
import java.nio.file.{Files, Path, Paths}
import scala.collection.mutable.ArrayBuffer
import org.apache.hadoop.security.UserGroupInformation
import org.apache.kyuubi._
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.ENGINE_SPARK_MAIN_RESOURCE
@ -108,8 +111,11 @@ class SparkProcessBuilder(
buffer += CONF
buffer += s"$k=$v"
}
buffer += PROXY_USER
buffer += proxyUser
// iff the keytab is specified, PROXY_USER is not supported
if (!useKeytab()) {
buffer += PROXY_USER
buffer += proxyUser
}
mainResource.foreach { r => buffer += r }
@ -119,11 +125,32 @@ class SparkProcessBuilder(
override def toString: String = commands.mkString(" ")
override protected def module: String = "kyuubi-spark-sql-engine"
private def useKeytab(): Boolean = {
val principal = conf.get(PRINCIPAL)
val keytab = conf.get(KEYTAB)
if (principal.isEmpty || keytab.isEmpty) {
false
} else {
try {
val ugi = UserGroupInformation
.loginUserFromKeytabAndReturnUGI(principal.get, keytab.get)
ugi.getShortUserName == proxyUser
} catch {
case e: IOException =>
error(s"Failed to login for ${principal.get}", e)
false
}
}
}
}
object SparkProcessBuilder {
final val APP_KEY = "spark.app.name"
private final val CONF = "--conf"
private final val CLASS = "--class"
private final val PROXY_USER = "--proxy-user"
final val APP_KEY = "spark.app.name"
private final val PRINCIPAL = "spark.kerberos.principal"
private final val KEYTAB = "spark.kerberos.keytab"
}

View File

@ -36,7 +36,6 @@ object KyuubiServer extends Logging {
if (!ServiceDiscovery.supportServiceDiscovery(conf)) {
zkServer.initialize(conf)
zkServer.start()
sys.addShutdownHook(zkServer.stop())
conf.set(HA_ZK_QUORUM, zkServer.getConnectString)
conf.set(HA_ZK_ACL_ENABLED, false)
}
@ -82,7 +81,7 @@ class KyuubiServer(name: String) extends Serverable(name) {
override private[kyuubi] val backendService: AbstractBackendService = new KyuubiBackendService()
private val discoveryService = new ServiceDiscovery(this)
override def initialize(conf: KyuubiConf): Unit = {
override def initialize(conf: KyuubiConf): Unit = synchronized {
val kinit = new KinitAuxiliaryService()
addService(kinit)
super.initialize(conf)

View File

@ -100,38 +100,47 @@ class KyuubiSessionImpl(
super.open()
// Init zookeeper client here to capture errors
zkClient
getServerHost match {
case Some((host, port)) => openSession(host, port)
case None =>
sessionConf.set(SparkProcessBuilder.APP_KEY, boundAppName.toString)
sessionConf.set(HA_ZK_NAMESPACE, appZkNamespace)
val builder = new SparkProcessBuilder(appUser, sessionConf.toSparkPrefixedConf)
val process = builder.start
info(s"Launching SQL engine: $builder")
var sh = getServerHost
val started = System.currentTimeMillis()
var exitValue: Option[Int] = None
while (sh.isEmpty) {
if (exitValue.isEmpty && process.waitFor(1, TimeUnit.SECONDS)) {
exitValue = Some(process.exitValue())
if (exitValue.get != 0) {
throw builder.getError
}
}
if (started + timeout <= System.currentTimeMillis()) {
process.destroyForcibly()
throw KyuubiSQLException(s"Timed out($timeout ms) to launched Spark with $builder",
builder.getError)
}
sh = getServerHost
}
val Some((host, port)) = sh
openSession(host, port)
}
try {
zkClient.close()
} catch {
case e: IOException => error("Failed to release the zkClient after session established", e)
getServerHost match {
case Some((host, port)) => openSession(host, port)
case None =>
sessionConf.set(SparkProcessBuilder.APP_KEY, boundAppName.toString)
sessionConf.set(HA_ZK_NAMESPACE, appZkNamespace)
val builder = new SparkProcessBuilder(appUser, sessionConf.toSparkPrefixedConf)
try {
val process = builder.start
info(s"Launching SQL engine: $builder")
var sh = getServerHost
val started = System.currentTimeMillis()
var exitValue: Option[Int] = None
while (sh.isEmpty) {
if (exitValue.isEmpty && process.waitFor(1, TimeUnit.SECONDS)) {
exitValue = Some(process.exitValue())
if (exitValue.get != 0) {
throw builder.getError
}
}
if (started + timeout <= System.currentTimeMillis()) {
process.destroyForcibly()
throw KyuubiSQLException(s"Timed out($timeout ms) to launched Spark with $builder",
builder.getError)
}
sh = getServerHost
}
val Some((host, port)) = sh
openSession(host, port)
} finally {
// we must close the process builder whether session open is success or failure since
// we have a log capture thread in process builder.
builder.close()
}
}
} finally {
try {
zkClient.close()
} catch {
case e: IOException => error("Failed to release the zkClient after session established", e)
}
}
}

View File

@ -20,11 +20,12 @@ package org.apache.kyuubi.engine.spark
import java.nio.file.{Files, Paths}
import java.util.concurrent.TimeUnit
import org.apache.kyuubi.{KyuubiFunSuite, KyuubiSQLException}
import org.apache.kyuubi.{KerberizedTestHelper, KyuubiSQLException, Utils}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.service.ServiceUtils
class SparkProcessBuilderSuite extends KyuubiFunSuite {
class SparkProcessBuilderSuite extends KerberizedTestHelper {
private val conf = KyuubiConf()
.set(EMBEDDED_ZK_PORT, 5555)
.set(EMBEDDED_ZK_TEMP_DIR, "spark_process_test")
@ -71,4 +72,47 @@ class SparkProcessBuilderSuite extends KyuubiFunSuite {
error1.getMessage.contains("Caused by: org.apache.hadoop.hive.ql.metadata.HiveException:"))
}
test("proxy user or keytab") {
val b1 = new SparkProcessBuilder("kentyao", conf)
assert(b1.toString.contains("--proxy-user kentyao"))
val conf1 = conf ++ Map("spark.kerberos.principal" -> testPrincipal)
val b2 = new SparkProcessBuilder("kentyao", conf1)
assert(b2.toString.contains("--proxy-user kentyao"))
val conf2 = conf ++ Map("spark.kerberos.keytab" -> testKeytab)
val b3 = new SparkProcessBuilder("kentyao", conf2)
assert(b3.toString.contains("--proxy-user kentyao"))
tryWithSecurityEnabled {
val conf3 = conf ++ Map("spark.kerberos.principal" -> testPrincipal,
"spark.kerberos.keytab" -> "testKeytab")
val b4 = new SparkProcessBuilder(Utils.currentUser, conf3)
assert(b4.toString.contains(s"--proxy-user ${Utils.currentUser}"))
val conf4 = conf ++ Map("spark.kerberos.principal" -> testPrincipal,
"spark.kerberos.keytab" -> testKeytab)
val b5 = new SparkProcessBuilder("kentyao", conf4)
assert(b5.toString.contains("--proxy-user kentyao"))
val b6 = new SparkProcessBuilder(ServiceUtils.getShortName(testPrincipal), conf4)
assert(!b6.toString.contains("--proxy-user kentyao"))
}
}
test("log capture should release after close") {
val process = new FakeSparkProcessBuilder
try {
val subProcess = process.start
assert(!process.logCaptureThread.isInterrupted)
subProcess.waitFor(3, TimeUnit.SECONDS)
} finally {
process.close()
}
assert(process.logCaptureThread.isInterrupted)
}
}
class FakeSparkProcessBuilder extends SparkProcessBuilder("fake", Map.empty) {
override protected def commands: Array[String] = Array("ls")
}

View File

@ -60,7 +60,7 @@ class KyuubiServerSuite extends KyuubiFunSuite {
ignore("invalid port") {
val conf = KyuubiConf().set(KyuubiConf.FRONTEND_BIND_PORT, 100)
val e = intercept[KyuubiException](new KyuubiServer().initialize(conf))
assert(e.getMessage === "Failed to initialize frontend service")
assert(e.getMessage.contains("Failed to initialize frontend service"))
assert(e.getCause.getMessage === "Invalid Port number")
}
}

View File

@ -23,7 +23,7 @@
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi</artifactId>
<name>Kyuubi Project Parent</name>
<version>1.0.0-SNAPSHOT</version>
<version>1.0.4-SNAPSHOT</version>
<modules>
<module>kyuubi-common</module>
<module>kyuubi-ha</module>
@ -65,7 +65,7 @@
<scalatest.version>3.0.3</scalatest.version>
<scala.binary.version>2.12</scala.binary.version>
<maven.version>3.6.3</maven.version>
<spark.version>3.0.1</spark.version>
<spark.version>3.0.2</spark.version>
<log4j.version>1.7.30</log4j.version>
</properties>