[KYUUBI #6787] Improve the compatibility of queryTimeout in more version clients

# 🔍 Description
## Issue References 🔗

This pull request fixes #2112

## Describe Your Solution 🔧

Similar to #2113, the query-timeout-thread should verify the Thrift protocol version. For protocol versions <= HIVE_CLI_SERVICE_PROTOCOL_V8, it should convert TIMEDOUT_STATE to CANCELED.

## Types of changes 🔖

- [ ] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)

## Test Plan 🧪

#### Behavior Without This Pull Request ⚰️

#### Behavior With This Pull Request 🎉

#### Related Unit Tests

---

# Checklist 📝

- [ ] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)

**Be nice. Be informative.**

Closes #6787 from lsm1/branch-timer-checker-set-cancel.

Closes #6787

9fbe1ac97 [senmiaoliu] add isHive21OrLower method
0c77c6f6f [senmiaoliu] time checker set cancel state

Authored-by: senmiaoliu <senmiaoliu@trip.com>
Signed-off-by: senmiaoliu <senmiaoliu@trip.com>
This commit is contained in:
senmiaoliu 2024-11-04 19:12:48 +08:00
parent d3520ddbce
commit c9d9433f74
2 changed files with 18 additions and 5 deletions

View File

@ -62,7 +62,18 @@ abstract class AbstractOperation(session: Session) extends Operation with Loggin
if (queryTimeout > 0) {
val timeoutExecutor =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("query-timeout-thread", false)
val action: Runnable = () => cleanup(OperationState.TIMEOUT)
val action: Runnable = () =>
// Clients less than version 2.1 have no HIVE-4924 Patch,
// no queryTimeout parameter and no TIMEOUT status.
// When the server enables kyuubi.operation.query.timeout,
// this will cause the client of the lower version to get stuck.
// Check thrift protocol version <= HIVE_CLI_SERVICE_PROTOCOL_V8(Hive 2.1.0),
// convert TIMEDOUT_STATE to CANCELED.
if (isHive21OrLower) {
cleanup(OperationState.CANCELED)
} else {
cleanup(OperationState.TIMEOUT)
}
timeoutExecutor.schedule(action, queryTimeout, TimeUnit.SECONDS)
statementTimeoutCleaner = Some(timeoutExecutor)
}
@ -275,4 +286,8 @@ abstract class AbstractOperation(session: Session) extends Operation with Loggin
}
}
}
protected def isHive21OrLower: Boolean = {
getProtocolVersion.getValue <= TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V8.getValue
}
}

View File

@ -28,7 +28,7 @@ import org.apache.kyuubi.metrics.{MetricsConstants, MetricsSystem}
import org.apache.kyuubi.operation.FetchOrientation.FETCH_NEXT
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.session.Session
import org.apache.kyuubi.shaded.hive.service.rpc.thrift.{TGetOperationStatusResp, TOperationState, TProtocolVersion}
import org.apache.kyuubi.shaded.hive.service.rpc.thrift.{TGetOperationStatusResp, TOperationState}
import org.apache.kyuubi.shaded.hive.service.rpc.thrift.TOperationState._
class ExecuteStatement(
@ -124,9 +124,7 @@ class ExecuteStatement(
// this will cause the client of the lower version to get stuck.
// Check thrift protocol version <= HIVE_CLI_SERVICE_PROTOCOL_V8(Hive 2.1.0),
// convert TIMEDOUT_STATE to CANCELED.
if getProtocolVersion.getValue <=
TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V8.getValue =>
setState(OperationState.CANCELED)
if isHive21OrLower => setState(OperationState.CANCELED)
case TIMEDOUT_STATE =>
setState(OperationState.TIMEOUT)