Compare commits

...

10 Commits

Bowen Liang
d45a60de6e [KYUUBI #5754] [MINOR] Fix engine id of the engine probe failures logging messages
# 🔍 Description
## Issue References 🔗

As described.

## Describe Your Solution 🔧

Fix the engine id in the engine probe failure log messages: `engineId` is an `Option`, so interpolating it directly renders as `Some(...)`; unwrap it before logging.

![image](https://github.com/apache/kyuubi/assets/1935105/6de038e7-efab-4197-a425-a5298259174d)
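The gist of the fix can be shown with a small, self-contained Java sketch; the class and method names are illustrative only, since Kyuubi's actual code is Scala and unwraps with `engineId.getOrElse("")`:

```java
import java.util.Optional;

public class EngineProbeLogDemo {
    // Broken variant: interpolating the Optional itself renders its wrapper,
    // e.g. "Optional[app_123]" (Scala's Option prints as "Some(app_123)").
    static String brokenMessage(Optional<String> engineId) {
        return "The engine[" + engineId + "] alive probe fails";
    }

    // Fixed variant: unwrap with a default first, mirroring the PR's
    // `val engineIdStr = engineId.getOrElse("")`.
    static String fixedMessage(Optional<String> engineId) {
        String engineIdStr = engineId.orElse("");
        return "The engine[" + engineIdStr + "] alive probe fails";
    }
}
```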

## Types of changes 🔖

- [x] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)

## Test Plan 🧪

#### Behavior Without This Pull Request ⚰️

#### Behavior With This Pull Request 🎉

#### Related Unit Tests

---

# Checklists
## 📝 Author Self Checklist

- [x] My code follows the [style guidelines](https://kyuubi.readthedocs.io/en/master/contributing/code/style.html) of this project
- [x] I have performed a self-review
- [x] I have commented my code, particularly in hard-to-understand areas
- [ ] I have made corresponding changes to the documentation
- [ ] My changes generate no new warnings
- [ ] I have added tests that prove my fix is effective or that my feature works
- [ ] New and existing unit tests pass locally with my changes
- [x] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)

## 📝 Committer Pre-Merge Checklist

- [ ] Pull request title is okay.
- [ ] No license issues.
- [ ] Milestone correctly set?
- [ ] Test coverage is ok
- [ ] Assignees are selected.
- [ ] Minimum number of approvals
- [ ] No changes are requested

**Be nice. Be informative.**

Closes #5754 from bowenliang123/engineid-some.

Closes #5754

893678472 [Bowen Liang] fix engine id of the engine probe failures logging messages

Authored-by: Bowen Liang <liangbowen@gf.com.cn>
Signed-off-by: Bowen Liang <liangbowen@gf.com.cn>
(cherry picked from commit f7bbe5b9de)
Signed-off-by: Bowen Liang <liangbowen@gf.com.cn>
2023-11-23 15:56:41 +08:00
maruilei
39df2d974a
[KYUUBI #4627] [Docs] Fix a typo in rest_api.md.
### _Why are the changes needed?_

`GET /batches/{batchId}` should be `GET /batches/${batchId}`

Affected versions 1.6/1.7/master

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [ ] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request

Closes #4627 from merrily01/master-typo.

Closes #4627

ac386f640 [maruilei] [Docs] Fix a typo in rest_api.md.

Authored-by: maruilei <maruilei@58.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
(cherry picked from commit 6d8fe6c381)
Signed-off-by: Cheng Pan <chengpan@apache.org>
2023-03-28 18:15:15 +08:00
liangbowen
0101167df4 [KYUUBI #4489] [REST] Fix missing org.apache.commons.logging by including jcl-over-slf4j dependency in REST client
### _Why are the changes needed?_

- To fix the missing package `org.apache.commons.logging`, used by `org.apache.httpcomponents:httpclient` in `kyuubi-rest-client`

![image](https://user-images.githubusercontent.com/1935105/224203940-246db855-0ffa-469e-8a67-58143e6e99e3.png)

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [ ] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request

Closes #4489 from bowenliang123/httpclient-commonlogging.

Closes #4489

ccaff9b62 [liangbowen] Include `jcl-over-slf4j` dependency
740e56158 [liangbowen] Revert "skip excluding commons-logging in rest-client"
4d9ad5dcf [liangbowen] skip excluding commons-logging in rest-client

Authored-by: liangbowen <liangbowen@gf.com.cn>
Signed-off-by: liangbowen <liangbowen@gf.com.cn>
(cherry picked from commit 61a6096043)
Signed-off-by: liangbowen <liangbowen@gf.com.cn>
2023-03-10 11:20:16 +08:00
Paul Lin
efc0b1c066
[KYUUBI #4446] Fix connections blocked by Flink insert statements
### _Why are the changes needed?_
Flink 1.15 refactored the result fetching of insert statements, so `TableResult.await()` now blocks until the insert finishes. We can remove this call because the insert results are immediately available, as with other non-job statements.

Flink JIRA: https://issues.apache.org/jira/browse/FLINK-24461
Critical changes: https://github.com/apache/flink/pull/17441/files#diff-ec88f0e06d880b53e2f152113ab1a4240a820cbb7248815c5f9ecf9ab4fce4caR108

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [ ] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request

Closes #4485 from link3280/KYUUBI-4446.

Closes #4446

256176c3b [Paul Lin] [KYUUBI #4446] Update comments
3cb982ca4 [Paul Lin] [KYUUBI #4446] Add comments
d4c194ee5 [Paul Lin] [KYUUBI #4446] Fix connections blocked by Flink insert statements

Authored-by: Paul Lin <paullin3280@gmail.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
(cherry picked from commit 85b2736cc3)
Signed-off-by: Cheng Pan <chengpan@apache.org>
2023-03-09 13:42:21 +08:00
Yikf
e5239f919c
[KYUUBI #4462] Fix variable usage issue in SessionManager#stop
### _Why are the changes needed?_

Fix a variable usage issue in `SessionManager#stop`: the server and engine exec-pool shutdown-timeout configs were used in swapped branches.
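In essence, the bug picked the engine timeout for servers and vice versa. A minimal sketch of the corrected selection (the timeout values here are illustrative, not Kyuubi defaults, which come from `KyuubiConf`):

```java
public class ShutdownTimeoutSketch {
    // Illustrative values only; the real values come from configuration.
    static final long SERVER_EXEC_POOL_SHUTDOWN_TIMEOUT = 5000L;
    static final long ENGINE_EXEC_POOL_SHUTDOWN_TIMEOUT = 1000L;

    // Corrected selection: a server uses the server timeout and an engine
    // uses the engine timeout; the original code had the branches swapped.
    static long shutdownTimeout(boolean isServer) {
        return isServer ? SERVER_EXEC_POOL_SHUTDOWN_TIMEOUT
                        : ENGINE_EXEC_POOL_SHUTDOWN_TIMEOUT;
    }
}
```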

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [ ] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request

Closes #4462 from Yikf/sessionmanager.

Closes #4462

d4340d4ec [Yikf] fix variable usage issue

Authored-by: Yikf <yikaifei@apache.org>
Signed-off-by: Cheng Pan <chengpan@apache.org>
(cherry picked from commit 8af07fa47d)
Signed-off-by: Cheng Pan <chengpan@apache.org>
2023-03-07 13:59:16 +08:00
Hanna Liashchuk
bd7d1e200e
[KYUUBI #4267] Show warning if SessionHandle is invalid
### _Why are the changes needed?_
If SessionManager tries to close a session that was previously closed, it throws `org.apache.kyuubi.KyuubiSQLException: Invalid SessionHandle`, which causes the Spark session to exit with a non-zero code.
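The spirit of the change is to make session close tolerant of a double close by downgrading the error to a warning. A hedged standalone sketch, with `IllegalStateException` standing in for `KyuubiSQLException` and a plain `Set` standing in for the session registry:

```java
import java.util.HashSet;
import java.util.Set;

public class SessionCloseSketch {
    private final Set<String> openSessions = new HashSet<>();

    public void open(String handle) { openSessions.add(handle); }

    public boolean isOpen(String handle) { return openSessions.contains(handle); }

    private void doClose(String handle) {
        if (!openSessions.remove(handle)) {
            // stand-in for org.apache.kyuubi.KyuubiSQLException
            throw new IllegalStateException("Invalid SessionHandle " + handle);
        }
    }

    // The fix in spirit: log a warning instead of propagating, so a
    // double-close no longer makes the session exit with a non-zero code.
    public void closeSession(String handle) {
        try {
            doClose(handle);
        } catch (IllegalStateException e) {
            System.err.println("Error closing session " + handle + ": " + e.getMessage());
        }
    }
}
```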

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [X] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request

Closes #4386 from hanna-liashchuk/catch-invalid-sessionhandle.

Closes #4267

bf364bad [Hanna Liashchuk] Show warning if SessionHandle is invalid

Authored-by: Hanna Liashchuk <g.liashchuk@temabit.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
(cherry picked from commit 89cc8c1679)
Signed-off-by: Cheng Pan <chengpan@apache.org>
2023-02-21 10:18:16 +08:00
Cheng Pan
36b91ba2eb
[KYUUBI #4282] Fix ConcurrentModificationException when log4j2 async enabled
This PR proposes to fix #4282. Since log4j2 supports async mode, we need to make sure `Log4j2DivertAppender#append` is thread-safe.

This PR also changes the return type of `OperationLog.getCurrentOperationLog` from `OperationLog` to `Option[OperationLog]`
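The thread-safety part of the change guards the shared writer with the appender's write lock. A minimal standalone Java sketch of that pattern (not Kyuubi's actual appender; the class and method names are illustrative):

```java
import java.io.CharArrayWriter;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class DivertAppenderSketch {
    private final CharArrayWriter writer = new CharArrayWriter();
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final StringBuilder delivered = new StringBuilder();

    // With log4j2 in async mode, append() may run concurrently, so the
    // read-then-reset of the shared writer must happen under the write lock
    // to avoid a ConcurrentModificationException or lost output.
    public void append(String event) {
        lock.writeLock().lock();
        try {
            writer.append(event);
            String logOutput = writer.toString(); // snapshot the buffered output
            writer.reset();                       // and clear it atomically
            deliver(logOutput);
        } finally {
            lock.writeLock().unlock();
        }
    }

    private void deliver(String s) { delivered.append(s); }

    public String deliveredText() { return delivered.toString(); }
}
```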

- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request

Closes #4300 from pan3793/log.

Closes #4282

010e34b0 [Cheng Pan] fix
068405b2 [Cheng Pan] fix compile
c79dedd5 [Cheng Pan] Use write lock instead
3daf8a4d [Cheng Pan] nit
94176a04 [Cheng Pan] [KYUUBI #4282] Fix ConcurrentModificationException when log4j2 async enabled

Authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: Cheng Pan <chengpan@apache.org>
(cherry picked from commit 0be3cbff6e)
Signed-off-by: Cheng Pan <chengpan@apache.org>
2023-02-17 11:56:06 +08:00
senmiaoliu
31483c1dc4 [KYUUBI #4305][Bug] Backport HIVE-15820: comment at the head of beeline -e
Closes [#4305](https://github.com/apache/kyuubi/issues/4305)

```sql
bin/beeline -u jdbc:hive2://X:10009 -e "
--asd
select 1 as a
"
```

![image](https://user-images.githubusercontent.com/18713676/218910222-b829d447-e5b7-4d80-842b-2ddd4f47a26d.png)
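The backported behavior strips full-line `--` comments before the statement is dispatched, so `beeline -e` input beginning with a comment still executes. This standalone helper only mimics the idea; it is not `HiveStringUtils.removeComments`:

```java
public class BeelineCommentSketch {
    // Hypothetical stand-in for HiveStringUtils.removeComments: drop lines
    // whose first non-blank characters are "--", so input like
    // "--asd\nselect 1 as a" still reaches the SQL dispatcher.
    static String removeComments(String line) {
        StringBuilder out = new StringBuilder();
        boolean first = true;
        for (String l : line.split("\n", -1)) {
            if (l.trim().startsWith("--")) continue; // full-line comment
            if (!first) out.append('\n');
            out.append(l);
            first = false;
        }
        return out.toString();
    }
}
```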

- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [x] Add screenshots for manual tests if appropriate

- [ ] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request

Closes #4333 from lsm1/fix/beeline_comment_header.

Closes #4305

f932d4ead [senmiaoliu] reformat
9a071fb40 [senmiaoliu] added multi line ut
425c53631 [senmiaoliu] added ut
d4dc21a61 [senmiaoliu] comment at the head of beeline -e

Authored-by: senmiaoliu <senmiaoliu@trip.com>
Signed-off-by: Fu Chen <cfmcgrady@gmail.com>
(cherry picked from commit c489e29697)
Signed-off-by: Fu Chen <cfmcgrady@gmail.com>
2023-02-16 17:09:10 +08:00
Cheng Pan
143dac95bf
[KYUUBI #4336] Avoid listing all schemas for Spark session catalog on schema pruning
### _Why are the changes needed?_

Some DBMS tools, like DBeaver and HUE, call the Thrift metadata API to list catalogs, databases, and tables. The current implementation of `CatalogShim_v3_0#getSchemas` calls `listAllNamespaces` first and does schema pruning on the Spark driver, which may cause a "permission denied" exception when HMS enforces permission control, e.g. with the Ranger plugin.

This PR proposes to call the HMS API (through the v1 session catalog) directly for `spark_catalog`, to avoid the above issue.

```
2023-02-15 20:02:13.048 ERROR org.apache.kyuubi.server.KyuubiTBinaryFrontendService: Error getting schemas:
org.apache.kyuubi.KyuubiSQLException: Error operating GetSchemas: org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:Permission denied: user [user1] does not have [SELECT] privilege on [userdb1])
        at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:134)
        at org.apache.spark.sql.hive.HiveExternalCatalog.databaseExists(HiveExternalCatalog.scala:249)
        at org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener.databaseExists(ExternalCatalogWithListener.scala:69)
        at org.apache.spark.sql.catalyst.catalog.SessionCatalog.databaseExists(SessionCatalog.scala:294)
        at org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog.listNamespaces(V2SessionCatalog.scala:212)
        at org.apache.kyuubi.engine.spark.shim.CatalogShim_v3_0.$anonfun$listAllNamespaces$1(CatalogShim_v3_0.scala:74)
        at org.apache.kyuubi.engine.spark.shim.CatalogShim_v3_0.$anonfun$listAllNamespaces$1$adapted(CatalogShim_v3_0.scala:73)
        at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:293)
        at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
        at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
        at scala.collection.TraversableLike.flatMap(TraversableLike.scala:293)
        at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:290)
        at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:198)
        at org.apache.kyuubi.engine.spark.shim.CatalogShim_v3_0.listAllNamespaces(CatalogShim_v3_0.scala:73)
        at org.apache.kyuubi.engine.spark.shim.CatalogShim_v3_0.listAllNamespaces(CatalogShim_v3_0.scala:90)
        at org.apache.kyuubi.engine.spark.shim.CatalogShim_v3_0.getSchemasWithPattern(CatalogShim_v3_0.scala:118)
        at org.apache.kyuubi.engine.spark.shim.CatalogShim_v3_0.getSchemas(CatalogShim_v3_0.scala:133)
        at org.apache.kyuubi.engine.spark.operation.GetSchemas.runInternal(GetSchemas.scala:43)
        at org.apache.kyuubi.operation.AbstractOperation.run(AbstractOperation.scala:164)
        at org.apache.kyuubi.session.AbstractSession.runOperation(AbstractSession.scala:99)
        at org.apache.kyuubi.engine.spark.session.SparkSessionImpl.runOperation(SparkSessionImpl.scala:78)
        at org.apache.kyuubi.session.AbstractSession.getSchemas(AbstractSession.scala:150)
        at org.apache.kyuubi.service.AbstractBackendService.getSchemas(AbstractBackendService.scala:83)
        at org.apache.kyuubi.service.TFrontendService.GetSchemas(TFrontendService.scala:294)
        at org.apache.kyuubi.shade.org.apache.hive.service.rpc.thrift.TCLIService$Processor$GetSchemas.getResult(TCLIService.java:1617)
        at org.apache.kyuubi.shade.org.apache.hive.service.rpc.thrift.TCLIService$Processor$GetSchemas.getResult(TCLIService.java:1602)
        at org.apache.kyuubi.shade.org.apache.thrift.ProcessFunction.process(ProcessFunction.java:39)
        at org.apache.kyuubi.shade.org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39)
        at org.apache.kyuubi.service.authentication.TSetIpAddressProcessor.process(TSetIpAddressProcessor.scala:36)
        at org.apache.kyuubi.shade.org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:286)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
```

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [ ] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request

Closes #4336 from pan3793/list-schemas.

Closes #4336

9ece864c [Cheng Pan] fix
f71587e9 [Cheng Pan] Avoid listing all schemas for Spark session catalog on schema prunning

Authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: Cheng Pan <chengpan@apache.org>
(cherry picked from commit 89fe835b93)
Signed-off-by: Cheng Pan <chengpan@apache.org>
2023-02-16 15:06:28 +08:00
wxmimperio
7f2f5a4d20 [KYUUBI #4311] Fix the wrong parsing of jvm parameters in jdbc url.
### _Why are the changes needed?_
JVM parameters in the JDBC URL are parsed incorrectly.

For example:
```shell
jdbc:kyuubi://127.0.0.1:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=kyuubi;user=tom?spark.driver.memory=8G;spark.driver.extraJavaOptions=-Xss256m -XX:+PrintGCDetails -XX:HeapDumpPath=/heap.hprof
```
Because the greedy regex `([^;]*)=([^;]*);?` captures up to the last `=` in a segment, the parsing goes wrong:
```shell
<spark.driver.memory, 8G>
<spark.driver.extraJavaOptions=-Xss256m -XX:+PrintGCDetails -XX:HeapDumpPath, /heap.hprof>
```

The correct parsing should be:
```shell
<spark.driver.memory, 8G>
<spark.driver.extraJavaOptions, -Xss256m -XX:+PrintGCDetails -XX:HeapDumpPath=/heap.hprof>
```

This PR changes `org.apache.kyuubi.jdbc.hive.Utils` and adds a unit test in `org.apache.kyuubi.jdbc.hive.UtilsTest`.
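The corrected parsing can be sketched as: split the query string on `;`, then split each segment at the *first* `=` only, so values containing `=` (e.g. `-XX:HeapDumpPath=/heap.hprof`) survive intact. The class and method names below are illustrative, not Kyuubi's actual `Utils` API:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class JdbcConfParseSketch {
    // Split each ';'-separated segment at its first '=' and keep the rest
    // of the segment, including any later '=' characters, as the value.
    static Map<String, String> parseConfs(String confStr) {
        Map<String, String> confs = new LinkedHashMap<>();
        for (String param : confStr.split(";")) {
            int eq = param.indexOf('=');
            if (eq > 0) {
                confs.put(param.substring(0, eq), param.substring(eq + 1));
            }
        }
        return confs;
    }
}
```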

### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request

Closes #4311 from imperio-wxm/master.

Closes #4311

d9e124ca [liangbowen] fix percent encoding problem
31ae63e0 [liangbowen] change `com.sun.jndi.toolkit.url.UrlUtil` to `java.net.URLEncoder`
f8271ec2 [wxm] Fix the wrong parsing of jvm parameters in jdbc url.
2e93df57 [wxmimperio] Merge branch 'apache:master' into master
4c236447 [wxmimperio] Merge branch 'apache:master' into master
3e0b7764 [wxmimperio] Scala repl output log level adjusted to debug
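The follow-up percent-encoding commits swapped `com.sun.jndi.toolkit.url.UrlUtil` for `java.net.URLEncoder`, which has one wrinkle worth noting: `URLEncoder` implements `application/x-www-form-urlencoded`, so it emits `+` for spaces, and a URL query component then needs `+` rewritten to `%20` (the same trick the PR's `UtilsTest` applies). A minimal demonstration:

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

public class PercentEncodingDemo {
    // Form-encode the string, then convert '+' (form encoding for a space)
    // to the percent-encoded "%20" expected in a URL query component.
    static String encodeQueryComponent(String s) {
        return URLEncoder.encode(s, StandardCharsets.UTF_8).replaceAll("\\+", "%20");
    }
}
```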

Lead-authored-by: wxmimperio <wxmimperio@outlook.com>
Co-authored-by: liangbowen <liangbowen@gf.com.cn>
Co-authored-by: wxm <wxmimperio@outlook.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
(cherry picked from commit b79ca8c4ab)
Signed-off-by: Cheng Pan <chengpan@apache.org>
2023-02-15 11:04:39 +00:00
18 changed files with 214 additions and 52 deletions

View File

@@ -63,7 +63,7 @@ Returns all the batches.
The created [Batch](#batch) object.
### GET /batches/{batchId}
### GET /batches/${batchId}
Returns the batch information.

View File

@@ -161,7 +161,8 @@ class ExecuteStatement(
.build(executor)
val result = executeOperation.invoke[TableResult](sessionId, operation)
jobId = result.getJobClient.asScala.map(_.getJobID)
result.await()
// after FLINK-24461, TableResult#await() would block insert statements
// until the job finishes, instead of returning row affected immediately
resultSet = ResultSet.fromTableResult(result)
}

View File

@@ -164,7 +164,12 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession)
}
}
}
super.closeSession(sessionHandle)
try {
super.closeSession(sessionHandle)
} catch {
case e: KyuubiSQLException =>
warn(s"Error closing session ${sessionHandle}", e)
}
if (shareLevel == ShareLevel.CONNECTION) {
info("Session stopped due to shared level is Connection.")
stopSession()

View File

@@ -41,7 +41,7 @@ class CatalogShim_v2_4 extends SparkCatalogShim {
catalogName: String,
schemaPattern: String): Seq[Row] = {
(spark.sessionState.catalog.listDatabases(schemaPattern) ++
getGlobalTempViewManager(spark, schemaPattern)).map(Row(_, ""))
getGlobalTempViewManager(spark, schemaPattern)).map(Row(_, SparkCatalogShim.SESSION_CATALOG))
}
def setCurrentDatabase(spark: SparkSession, databaseName: String): Unit = {

View File

@@ -129,13 +129,12 @@ class CatalogShim_v3_0 extends CatalogShim_v2_4 {
spark: SparkSession,
catalogName: String,
schemaPattern: String): Seq[Row] = {
val catalog = getCatalog(spark, catalogName)
var schemas = getSchemasWithPattern(catalog, schemaPattern)
if (catalogName == SparkCatalogShim.SESSION_CATALOG) {
val viewMgr = getGlobalTempViewManager(spark, schemaPattern)
schemas = schemas ++ viewMgr
super.getSchemas(spark, catalogName, schemaPattern)
} else {
val catalog = getCatalog(spark, catalogName)
getSchemasWithPattern(catalog, schemaPattern).map(Row(_, catalog.name))
}
schemas.map(Row(_, catalog.name))
}
override def setCurrentDatabase(spark: SparkSession, databaseName: String): Unit = {

View File

@@ -39,7 +39,7 @@ class Log4j12DivertAppender extends WriterAppender {
setLayout(lo)
addFilter { _: LoggingEvent =>
if (OperationLog.getCurrentOperationLog == null) Filter.DENY else Filter.NEUTRAL
if (OperationLog.getCurrentOperationLog.isDefined) Filter.NEUTRAL else Filter.DENY
}
/**
@@ -51,8 +51,7 @@ class Log4j12DivertAppender extends WriterAppender {
// That should've gone into our writer. Notify the LogContext.
val logOutput = writer.toString
writer.reset()
val log = OperationLog.getCurrentOperationLog
if (log != null) log.write(logOutput)
OperationLog.getCurrentOperationLog.foreach(_.write(logOutput))
}
}

View File

@@ -18,6 +18,7 @@
package org.apache.kyuubi.operation.log
import java.io.CharArrayWriter
import java.util.concurrent.locks.ReadWriteLock
import scala.collection.JavaConverters._
@@ -27,6 +28,8 @@ import org.apache.logging.log4j.core.appender.{AbstractWriterAppender, ConsoleAp
import org.apache.logging.log4j.core.filter.AbstractFilter
import org.apache.logging.log4j.core.layout.PatternLayout
import org.apache.kyuubi.reflection.DynFields
class Log4j2DivertAppender(
name: String,
layout: StringLayout,
@@ -52,22 +55,19 @@ class Log4j2DivertAppender(
addFilter(new AbstractFilter() {
override def filter(event: LogEvent): Filter.Result = {
if (OperationLog.getCurrentOperationLog == null) {
Filter.Result.DENY
} else {
if (OperationLog.getCurrentOperationLog.isDefined) {
Filter.Result.NEUTRAL
} else {
Filter.Result.DENY
}
}
})
def initLayout(): StringLayout = {
LogManager.getRootLogger.asInstanceOf[org.apache.logging.log4j.core.Logger]
.getAppenders.values().asScala
.find(ap => ap.isInstanceOf[ConsoleAppender] && ap.getLayout.isInstanceOf[StringLayout])
.map(_.getLayout.asInstanceOf[StringLayout])
.getOrElse(PatternLayout.newBuilder().withPattern(
"%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n").build())
}
private val writeLock = DynFields.builder()
.hiddenImpl(classOf[AbstractWriterAppender[_]], "readWriteLock")
.build[ReadWriteLock](this)
.get()
.writeLock
/**
* Overrides AbstractWriterAppender.append(), which does the real logging. No need
@@ -75,11 +75,15 @@
*/
override def append(event: LogEvent): Unit = {
super.append(event)
// That should've gone into our writer. Notify the LogContext.
val logOutput = writer.toString
writer.reset()
val log = OperationLog.getCurrentOperationLog
if (log != null) log.write(logOutput)
writeLock.lock()
try {
// That should've gone into our writer. Notify the LogContext.
val logOutput = writer.toString
writer.reset()
OperationLog.getCurrentOperationLog.foreach(_.write(logOutput))
} finally {
writeLock.unlock()
}
}
}
@@ -95,7 +99,7 @@ object Log4j2DivertAppender {
def initialize(): Unit = {
val ap = new Log4j2DivertAppender()
org.apache.logging.log4j.LogManager.getRootLogger()
org.apache.logging.log4j.LogManager.getRootLogger
.asInstanceOf[org.apache.logging.log4j.core.Logger].addAppender(ap)
ap.start()
}

View File

@@ -44,7 +44,7 @@ object OperationLog extends Logging {
OPERATION_LOG.set(operationLog)
}
def getCurrentOperationLog: OperationLog = OPERATION_LOG.get()
def getCurrentOperationLog: Option[OperationLog] = Option(OPERATION_LOG.get)
def removeCurrentOperationLog(): Unit = OPERATION_LOG.remove()

View File

@@ -282,9 +282,9 @@ abstract class SessionManager(name: String) extends CompositeService(name) {
shutdown = true
val shutdownTimeout: Long =
if (isServer) {
conf.get(ENGINE_EXEC_POOL_SHUTDOWN_TIMEOUT)
} else {
conf.get(SERVER_EXEC_POOL_SHUTDOWN_TIMEOUT)
} else {
conf.get(ENGINE_EXEC_POOL_SHUTDOWN_TIMEOUT)
}
ThreadUtils.shutdown(timeoutChecker, Duration(shutdownTimeout, TimeUnit.MILLISECONDS))

View File

@@ -61,10 +61,10 @@ class OperationLogSuite extends KyuubiFunSuite {
assert(!Files.exists(logFile))
OperationLog.setCurrentOperationLog(operationLog)
assert(OperationLog.getCurrentOperationLog === operationLog)
assert(OperationLog.getCurrentOperationLog === Some(operationLog))
OperationLog.removeCurrentOperationLog()
assert(OperationLog.getCurrentOperationLog === null)
assert(OperationLog.getCurrentOperationLog.isEmpty)
operationLog.write(msg1 + "\n")
assert(Files.exists(logFile))

View File

@@ -29,6 +29,7 @@ import java.util.List;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.hive.common.util.HiveStringUtils;
public class KyuubiBeeLine extends BeeLine {
public static final String KYUUBI_BEELINE_DEFAULT_JDBC_DRIVER =
@@ -191,4 +192,9 @@
}
return code;
}
@Override
boolean dispatch(String line) {
return super.dispatch(HiveStringUtils.removeComments(line));
}
}

View File

@@ -21,6 +21,7 @@ import java.io.*;
import java.sql.*;
import java.util.*;
import org.apache.hive.beeline.logs.KyuubiBeelineInPlaceUpdateStream;
import org.apache.hive.common.util.HiveStringUtils;
import org.apache.kyuubi.jdbc.hive.JdbcConnectionParams;
import org.apache.kyuubi.jdbc.hive.KyuubiStatement;
import org.apache.kyuubi.jdbc.hive.Utils;
@@ -505,6 +506,59 @@ public class KyuubiCommands extends Commands {
}
}
@Override
public String handleMultiLineCmd(String line) throws IOException {
line = HiveStringUtils.removeComments(line);
Character mask =
(System.getProperty("jline.terminal", "").equals("jline.UnsupportedTerminal"))
? null
: jline.console.ConsoleReader.NULL_MASK;
while (isMultiLine(line) && beeLine.getOpts().isAllowMultiLineCommand()) {
StringBuilder prompt = new StringBuilder(beeLine.getPrompt());
if (!beeLine.getOpts().isSilent()) {
for (int i = 0; i < prompt.length() - 1; i++) {
if (prompt.charAt(i) != '>') {
prompt.setCharAt(i, i % 2 == 0 ? '.' : ' ');
}
}
}
String extra;
// avoid NPE below if for some reason -e argument has multi-line command
if (beeLine.getConsoleReader() == null) {
throw new RuntimeException(
"Console reader not initialized. This could happen when there "
+ "is a multi-line command using -e option and which requires further reading from console");
}
if (beeLine.getOpts().isSilent() && beeLine.getOpts().getScriptFile() != null) {
extra = beeLine.getConsoleReader().readLine(null, mask);
} else {
extra = beeLine.getConsoleReader().readLine(prompt.toString());
}
if (extra == null) { // it happens when using -f and the line of cmds does not end with ;
break;
}
extra = HiveStringUtils.removeComments(extra);
if (extra != null && !extra.isEmpty()) {
line += "\n" + extra;
}
}
return line;
}
// returns true if statement represented by line is not complete and needs additional reading from
// console. Used in handleMultiLineCmd method assumes line would never be null when this method is
// called
private boolean isMultiLine(String line) {
if (line.endsWith(beeLine.getOpts().getDelimiter()) || beeLine.isComment(line)) {
return false;
}
// handles the case like line = show tables; --test comment
List<String> cmds = getCmdList(line, false);
return cmds.isEmpty() || !cmds.get(cmds.size() - 1).startsWith("--");
}
static class KyuubiLogRunnable implements Runnable {
private final KyuubiCommands commands;
private final KyuubiLoggable kyuubiLoggable;

View File

@@ -29,4 +29,17 @@ public class KyuubiBeeLineTest {
int result = kyuubiBeeLine.initArgs(new String[0]);
assertEquals(0, result);
}
@Test
public void testKyuubiBeelineComment() {
KyuubiBeeLine kyuubiBeeLine = new KyuubiBeeLine();
int result = kyuubiBeeLine.initArgsFromCliVars(new String[] {"-e", "--comment show database;"});
assertEquals(0, result);
result = kyuubiBeeLine.initArgsFromCliVars(new String[] {"-e", "--comment\n show database;"});
assertEquals(1, result);
result =
kyuubiBeeLine.initArgsFromCliVars(
new String[] {"-e", "--comment line 1 \n --comment line 2 \n show database;"});
assertEquals(1, result);
}
}

View File

@@ -26,6 +26,7 @@ import java.sql.SQLException;
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.lang3.StringUtils;
import org.apache.hive.service.rpc.thrift.TStatus;
import org.apache.hive.service.rpc.thrift.TStatusCode;
import org.slf4j.Logger;
@@ -190,12 +191,20 @@ public class Utils {
}
}
Pattern confPattern = Pattern.compile("([^;]*)([^;]*);?");
// parse hive conf settings
String confStr = jdbcURI.getQuery();
if (confStr != null) {
Matcher confMatcher = pattern.matcher(confStr);
Matcher confMatcher = confPattern.matcher(confStr);
while (confMatcher.find()) {
connParams.getHiveConfs().put(confMatcher.group(1), confMatcher.group(2));
String connParam = confMatcher.group(1);
if (StringUtils.isNotBlank(connParam) && connParam.contains("=")) {
int symbolIndex = connParam.indexOf('=');
connParams
.getHiveConfs()
.put(connParam.substring(0, symbolIndex), connParam.substring(symbolIndex + 1));
}
}
}

View File

@@ -21,8 +21,13 @@ package org.apache.kyuubi.jdbc.hive;
import static org.apache.kyuubi.jdbc.hive.Utils.extractURLComponents;
import static org.junit.Assert.assertEquals;
import com.google.common.collect.ImmutableMap;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.Properties;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -35,23 +40,76 @@ public class UtilsTest {
private String expectedPort;
private String expectedCatalog;
private String expectedDb;
private Map<String, String> expectedHiveConf;
private String uri;
@Parameterized.Parameters
public static Collection<String[]> data() {
public static Collection<Object[]> data() throws UnsupportedEncodingException {
return Arrays.asList(
new String[][] {
{"localhost", "10009", null, "db", "jdbc:hive2:///db;k1=v1?k2=v2#k3=v3"},
{"localhost", "10009", null, "default", "jdbc:hive2:///"},
{"localhost", "10009", null, "default", "jdbc:kyuubi://"},
{"localhost", "10009", null, "default", "jdbc:hive2://"},
{"hostname", "10018", null, "db", "jdbc:hive2://hostname:10018/db;k1=v1?k2=v2#k3=v3"},
new Object[][] {
{
"localhost",
"10009",
null,
"db",
new ImmutableMap.Builder<String, String>().put("k2", "v2").build(),
"jdbc:hive2:///db;k1=v1?k2=v2#k3=v3"
},
{
"localhost",
"10009",
null,
"default",
new ImmutableMap.Builder<String, String>().build(),
"jdbc:hive2:///"
},
{
"localhost",
"10009",
null,
"default",
new ImmutableMap.Builder<String, String>().build(),
"jdbc:kyuubi://"
},
{
"localhost",
"10009",
null,
"default",
new ImmutableMap.Builder<String, String>().build(),
"jdbc:hive2://"
},
{
"hostname",
"10018",
null,
"db",
new ImmutableMap.Builder<String, String>().put("k2", "v2").build(),
"jdbc:hive2://hostname:10018/db;k1=v1?k2=v2#k3=v3"
},
{
"hostname",
"10018",
"catalog",
"db",
new ImmutableMap.Builder<String, String>().put("k2", "v2").build(),
"jdbc:hive2://hostname:10018/catalog/db;k1=v1?k2=v2#k3=v3"
},
{
"hostname",
"10018",
"catalog",
"db",
new ImmutableMap.Builder<String, String>()
.put("k2", "v2")
.put("k3", "-Xmx2g -XX:+PrintGCDetails -XX:HeapDumpPath=/heap.hprof")
.build(),
"jdbc:hive2://hostname:10018/catalog/db;k1=v1?"
+ URLEncoder.encode(
"k2=v2;k3=-Xmx2g -XX:+PrintGCDetails -XX:HeapDumpPath=/heap.hprof",
StandardCharsets.UTF_8.toString())
.replaceAll("\\+", "%20")
+ "#k4=v4"
}
});
}
@@ -61,11 +119,13 @@
String expectedPort,
String expectedCatalog,
String expectedDb,
Map<String, String> expectedHiveConf,
String uri) {
this.expectedHost = expectedHost;
this.expectedPort = expectedPort;
this.expectedCatalog = expectedCatalog;
this.expectedDb = expectedDb;
this.expectedHiveConf = expectedHiveConf;
this.uri = uri;
}
@@ -76,5 +136,6 @@
assertEquals(Integer.parseInt(expectedPort), jdbcConnectionParams1.getPort());
assertEquals(expectedCatalog, jdbcConnectionParams1.getCatalogName());
assertEquals(expectedDb, jdbcConnectionParams1.getDbName());
assertEquals(expectedHiveConf, jdbcConnectionParams1.getHiveConfs());
}
}

View File

@@ -79,6 +79,11 @@
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>

View File

@@ -92,10 +92,11 @@ class KyuubiSyncThriftClient private (
remoteEngineBroken = false
} catch {
case e: Throwable =>
warn(s"The engine[$engineId] alive probe fails", e)
val engineIdStr = engineId.getOrElse("")
warn(s"The engine[$engineIdStr] alive probe fails", e)
val now = System.currentTimeMillis()
if (now - engineLastAlive > engineAliveTimeout) {
error(s"Mark the engine[$engineId] not alive with no recent alive probe" +
error(s"Mark the engine[$engineIdStr] not alive with no recent alive probe" +
s" success: ${now - engineLastAlive} ms exceeds timeout $engineAliveTimeout ms")
remoteEngineBroken = true
}

View File

@@ -17,7 +17,8 @@
package org.apache.kyuubi.server.api.v1
import java.util.Locale
import java.util
import java.util.{Collections, Locale}
import java.util.concurrent.ConcurrentHashMap
import javax.ws.rs._
import javax.ws.rs.core.MediaType
@@ -270,12 +271,16 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
Option(sessionManager.getBatchSessionImpl(sessionHandle)).map { batchSession =>
try {
val submissionOp = batchSession.batchJobSubmissionOp
val rowSet = submissionOp.getOperationLogRowSet(
FetchOrientation.FETCH_NEXT,
from,
size)
val logRowSet = rowSet.getColumns.get(0).getStringVal.getValues.asScala
new OperationLog(logRowSet.asJava, logRowSet.size)
val rowSet = submissionOp.getOperationLogRowSet(FetchOrientation.FETCH_NEXT, from, size)
val columns = rowSet.getColumns
val logRowSet: util.List[String] =
if (columns == null || columns.size == 0) {
Collections.emptyList()
} else {
assert(columns.size == 1)
columns.get(0).getStringVal.getValues
}
new OperationLog(logRowSet, logRowSet.size)
} catch {
case NonFatal(e) =>
val errorMsg = s"Error getting operation log for batchId: $batchId"