[KYUUBI #3720] Support to show remaining logs with timeout if the batch failed

### _Why are the changes needed?_

If the batch failed, we shall try to show the remaining logs with timeout.

### _How was this patch tested?_
Existing UT.

Closes #3720 from turboFei/wait_log_fetched.

Closes #3720

a5d0b37f [Fei Wang] comments
220be4ee [Fei Wang] in same thread
90ada13c [Fei Wang] revert
e3ab5ec8 [Fei Wang] address comments
e94a348c [Fei Wang] refactor
c901e4b6 [Fei Wang] prevent NPE
9ed76685 [Fei Wang] save
1f3560bd [Fei Wang] save

Authored-by: Fei Wang <fwang12@ebay.com>
Signed-off-by: Fei Wang <fwang12@ebay.com>
This commit is contained in:
Fei Wang 2022-10-31 20:34:37 +08:00
parent 81d9a8f1ca
commit 528afce614
3 changed files with 38 additions and 8 deletions

View File

@ -192,6 +192,7 @@ kyuubi.credentials.update.wait.timeout|PT1M|How long to wait until credentials a
Key | Default | Meaning | Type | Since
--- | --- | --- | --- | ---
kyuubi.ctl.batch.log.on.failure.timeout|PT10S|The timeout for fetching remaining batch logs if the batch failed.|duration|1.7.0
kyuubi.ctl.batch.log.query.interval|PT3S|The interval for fetching batch logs.|duration|1.6.0
kyuubi.ctl.rest.auth.schema|basic|The authentication schema. Valid values are: basic, spnego.|string|1.6.0
kyuubi.ctl.rest.base.url|&lt;undefined&gt;|The REST API base URL, which contains the scheme (http:// or https://), host name, port number|string|1.6.0

View File

@ -85,4 +85,11 @@ object CtlConf {
.version("1.6.0")
.timeConf
.createWithDefault(Duration.ofSeconds(3).toMillis)
val CTL_BATCH_LOG_ON_FAILURE_TIMEOUT: ConfigEntry[Long] =
buildConf("kyuubi.ctl.batch.log.on.failure.timeout")
.doc("The timeout for fetching remaining batch logs if the batch failed.")
.version("1.7.0")
.timeConf
.createWithDefault(Duration.ofSeconds(10).toMillis)
}

View File

@ -55,15 +55,17 @@ class LogBatchCommand(
withKyuubiInstanceRestClient(kyuubiRestClient, kyuubiInstance) { kyuubiInstanceRestClient =>
val kyuubiInstanceBatchRestApi: BatchRestApi = new BatchRestApi(kyuubiInstanceRestClient)
def retrieveOperationLog(): OperationLog = {
val log = kyuubiInstanceBatchRestApi.getBatchLocalLog(batchId, from, size)
from += log.getLogRowSet.size
log.getLogRowSet.asScala.foreach(x => info(x))
log
}
while (!done) {
try {
log = kyuubiInstanceBatchRestApi.getBatchLocalLog(
batchId,
from,
size)
from += log.getLogRowSet.size
log.getLogRowSet.asScala.foreach(x => info(x))
log = retrieveOperationLog()
val (latestBatch, shouldFinishLog) =
checkStatus(kyuubiInstanceBatchRestApi, batchId, log)
batch = latestBatch
@ -79,7 +81,27 @@ class LogBatchCommand(
}
if (!done) {
Thread.sleep(conf.get(CTL_BATCH_LOG_QUERY_INTERVAL).toInt)
Thread.sleep(conf.get(CTL_BATCH_LOG_QUERY_INTERVAL))
}
}
// if the batch failed, show remaining batch logs with timeout
if (batch != null && BatchUtils.isTerminalState(
batch.getState) && !BatchUtils.isFinishedState(batch.getState)) {
val startTime = System.currentTimeMillis()
val timeout = conf.get(CTL_BATCH_LOG_ON_FAILURE_TIMEOUT)
var hasRemainingLogs = true
while (hasRemainingLogs && System.currentTimeMillis() - startTime < timeout) {
try {
if (retrieveOperationLog().getLogRowSet.size == 0) {
hasRemainingLogs = false
}
} catch {
case e: Exception => error(s"Error fetching batch logs: ${e.getMessage}")
}
}
if (hasRemainingLogs) {
info(s"See the complete logs with command `kyuubi-ctl log batch $batchId --forward`.")
}
}
}