[KYUUBI #5063] Support to filter batch with batch name

### _Why are the changes needed?_

Support to filter batch with batch name filter condition.
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before make a pull request

Closes #5063 from turboFei/batch_name.

Closes #5063

63915a56d [fwang12] ut
2548815a2 [fwang12] ut
34a9229b0 [fwang12] style

Authored-by: fwang12 <fwang12@ebay.com>
Signed-off-by: fwang12 <fwang12@ebay.com>
This commit is contained in:
fwang12 2023-07-19 15:44:57 +08:00
parent f66b0e98dc
commit d6290a4ded
14 changed files with 80 additions and 54 deletions

View File

@ -112,6 +112,7 @@ class ControlCliArguments(args: Seq[String], env: Map[String, String] = sys.env)
| batchType ${cliConfig.batchOpts.batchType}
| batchUser ${cliConfig.batchOpts.batchUser}
| batchState ${cliConfig.batchOpts.batchState}
| batchName ${cliConfig.batchOpts.batchName}
| createTime ${cliConfig.batchOpts.createTime}
| endTime ${cliConfig.batchOpts.endTime}
| from ${cliConfig.batchOpts.from}

View File

@ -46,6 +46,7 @@ class ListBatchCommand(cliConfig: CliConfig) extends Command[GetBatchesResponse]
batchOpts.batchType,
batchOpts.batchUser,
batchOpts.batchState,
batchOpts.batchName,
batchOpts.createTime,
batchOpts.endTime,
if (batchOpts.from < 0) 0 else batchOpts.from,

View File

@ -66,6 +66,7 @@ case class BatchOpts(
batchType: String = null,
batchUser: String = null,
batchState: String = null,
batchName: String = null,
createTime: Long = 0,
endTime: Long = 0,
from: Int = -1,

View File

@ -222,6 +222,9 @@ object CommandLine extends CommonCommandLine {
opt[String]("batchState")
.action((v, c) => c.copy(batchOpts = c.batchOpts.copy(batchState = v)))
.text("Batch state."),
opt[String]("batchName")
.action((v, c) => c.copy(batchOpts = c.batchOpts.copy(batchName = v)))
.text("Batch name."),
opt[String]("createTime")
.action((v, c) =>
c.copy(batchOpts = c.batchOpts.copy(createTime =

View File

@ -429,6 +429,7 @@ class ControlCliArgumentsSuite extends KyuubiFunSuite with TestPrematureExit {
| --batchType <value> Batch type.
| --batchUser <value> Batch user.
| --batchState <value> Batch state.
| --batchName <value> Batch name.
| --createTime <value> Batch create time, should be in yyyyMMddHHmmss format.
| --endTime <value> Batch end time, should be in yyyyMMddHHmmss format.
| --from <value> Specify which record to start from retrieving info.

View File

@ -63,10 +63,23 @@ public class BatchRestApi {
Long endTime,
int from,
int size) {
return listBatches(batchType, batchUser, batchState, null, createTime, endTime, from, size);
}
public GetBatchesResponse listBatches(
String batchType,
String batchUser,
String batchState,
String batchName,
Long createTime,
Long endTime,
int from,
int size) {
Map<String, Object> params = new HashMap<>();
params.put("batchType", batchType);
params.put("batchUser", batchUser);
params.put("batchState", batchState);
params.put("batchName", batchName);
if (null != createTime && createTime > 0) {
params.put("createTime", createTime);
}

View File

@ -45,8 +45,8 @@ import org.apache.kyuubi.operation.{BatchJobSubmission, FetchOrientation, Operat
import org.apache.kyuubi.server.api.ApiRequestContext
import org.apache.kyuubi.server.api.v1.BatchesResource._
import org.apache.kyuubi.server.metadata.MetadataManager
import org.apache.kyuubi.server.metadata.api.Metadata
import org.apache.kyuubi.session.{KyuubiBatchSession, KyuubiSessionManager, SessionHandle}
import org.apache.kyuubi.server.metadata.api.{Metadata, MetadataFilter}
import org.apache.kyuubi.session.{KyuubiBatchSession, KyuubiSessionManager, SessionHandle, SessionType}
import org.apache.kyuubi.util.JdbcUtils
@Tag(name = "Batch")
@ -315,6 +315,7 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
@QueryParam("batchType") batchType: String,
@QueryParam("batchState") batchState: String,
@QueryParam("batchUser") batchUser: String,
@QueryParam("batchName") batchName: String,
@QueryParam("createTime") createTime: Long,
@QueryParam("endTime") endTime: Long,
@QueryParam("from") from: Int,
@ -327,15 +328,16 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
validBatchState(batchState),
s"The valid batch state can be one of the following: ${VALID_BATCH_STATES.mkString(",")}")
}
val batches =
sessionManager.getBatchesFromMetadataStore(
batchType,
batchUser,
batchState,
createTime,
endTime,
from,
size)
val filter = MetadataFilter(
sessionType = SessionType.BATCH,
engineType = batchType,
username = batchUser,
state = batchState,
requestName = batchName,
createTime = createTime,
endTime = endTime)
val batches = sessionManager.getBatchesFromMetadataStore(filter, from, size)
new GetBatchesResponse(from, batches.size, batches.asJava)
}

View File

@ -133,21 +133,7 @@ class MetadataManager extends AbstractService("MetadataManager") {
.filter(_.sessionType == SessionType.BATCH)
}
def getBatches(
batchType: String,
batchUser: String,
batchState: String,
createTime: Long,
endTime: Long,
from: Int,
size: Int): Seq[Batch] = {
val filter = MetadataFilter(
sessionType = SessionType.BATCH,
engineType = batchType,
username = batchUser,
state = batchState,
createTime = createTime,
endTime = endTime)
def getBatches(filter: MetadataFilter, from: Int, size: Int): Seq[Batch] = {
withMetadataRequestMetrics(_metadataStore.getMetadataList(filter, from, size, true)).map(
buildBatch)
}

View File

@ -27,6 +27,7 @@ case class MetadataFilter(
engineType: String = null,
username: String = null,
state: String = null,
requestName: String = null,
kyuubiInstance: String = null,
createTime: Long = 0L,
endTime: Long = 0L,

View File

@ -238,6 +238,10 @@ class JDBCMetadataStore(conf: KyuubiConf) extends MetadataStore with Logging {
whereConditions += "state = ?"
params += state.toUpperCase(Locale.ROOT)
}
Option(filter.requestName).filter(_.nonEmpty).foreach { requestName =>
whereConditions += "request_name = ?"
params += requestName
}
Option(filter.kyuubiInstance).filter(_.nonEmpty).foreach { kyuubiInstance =>
whereConditions += "kyuubi_instance = ?"
params += kyuubiInstance

View File

@ -36,7 +36,7 @@ import org.apache.kyuubi.metrics.MetricsSystem
import org.apache.kyuubi.operation.{KyuubiOperationManager, OperationState}
import org.apache.kyuubi.plugin.{GroupProvider, PluginLoader, SessionConfAdvisor}
import org.apache.kyuubi.server.metadata.{MetadataManager, MetadataRequestsRetryRef}
import org.apache.kyuubi.server.metadata.api.Metadata
import org.apache.kyuubi.server.metadata.api.{Metadata, MetadataFilter}
import org.apache.kyuubi.sql.parser.server.KyuubiParser
import org.apache.kyuubi.util.{SignUtils, ThreadUtils}
@ -240,17 +240,8 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
metadataManager.flatMap(mm => mm.getBatch(batchId))
}
def getBatchesFromMetadataStore(
batchType: String,
batchUser: String,
batchState: String,
createTime: Long,
endTime: Long,
from: Int,
size: Int): Seq[Batch] = {
metadataManager.map { mm =>
mm.getBatches(batchType, batchUser, batchState, createTime, endTime, from, size)
}.getOrElse(Seq.empty)
def getBatchesFromMetadataStore(filter: MetadataFilter, from: Int, size: Int): Seq[Batch] = {
metadataManager.map(_.getBatches(filter, from, size)).getOrElse(Seq.empty)
}
def getBatchMetadata(batchId: String): Option[Metadata] = {

View File

@ -44,7 +44,7 @@ import org.apache.kyuubi.operation.{BatchJobSubmission, OperationState}
import org.apache.kyuubi.operation.OperationState.OperationState
import org.apache.kyuubi.server.KyuubiRestFrontendService
import org.apache.kyuubi.server.http.authentication.AuthenticationHandler.AUTHORIZATION_HEADER
import org.apache.kyuubi.server.metadata.api.Metadata
import org.apache.kyuubi.server.metadata.api.{Metadata, MetadataFilter}
import org.apache.kyuubi.service.authentication.KyuubiAuthenticationFactory
import org.apache.kyuubi.session.{KyuubiBatchSession, KyuubiSessionManager, SessionHandle, SessionType}
@ -62,10 +62,9 @@ class BatchesResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper wi
sessionManager.allSessions().foreach { session =>
sessionManager.closeSession(session.handle)
}
sessionManager.getBatchesFromMetadataStore(null, null, null, 0, 0, 0, Int.MaxValue).foreach {
batch =>
sessionManager.applicationManager.killApplication(ApplicationManagerInfo(None), batch.getId)
sessionManager.cleanupMetadata(batch.getId)
sessionManager.getBatchesFromMetadataStore(MetadataFilter(), 0, Int.MaxValue).foreach { batch =>
sessionManager.applicationManager.killApplication(ApplicationManagerInfo(None), batch.getId)
sessionManager.cleanupMetadata(batch.getId)
}
}
@ -518,11 +517,7 @@ class BatchesResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper wi
}
assert(sessionManager.getBatchesFromMetadataStore(
"SPARK",
null,
null,
0,
0,
MetadataFilter(engineType = "SPARK"),
0,
Int.MaxValue).size == 2)
}
@ -711,4 +706,31 @@ class BatchesResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper wi
.replaceAll("\\[", "").replaceAll("\\]", "")
assert(sessionManager.getBatchMetadata(batchId).map(_.state).contains("CANCELED"))
}
test("get batch list with batch name filter condition") {
val sessionManager = server.frontendServices.head
.be.sessionManager.asInstanceOf[KyuubiSessionManager]
sessionManager.allSessions().foreach(_.close())
val uniqueName = UUID.randomUUID().toString
sessionManager.openBatchSession(
"kyuubi",
"kyuubi",
InetAddress.getLocalHost.getCanonicalHostName,
Map(KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString),
newBatchRequest(
"spark",
sparkBatchTestResource.get,
"",
uniqueName))
val response = webTarget.path("api/v1/batches")
.queryParam("batchName", uniqueName)
.request(MediaType.APPLICATION_JSON_TYPE)
.get()
assert(response.getStatus == 200)
val getBatchListResponse = response.readEntity(classOf[GetBatchesResponse])
assert(getBatchListResponse.getTotal == 1)
}
}

View File

@ -28,7 +28,7 @@ import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.metrics.{MetricsConstants, MetricsSystem}
import org.apache.kyuubi.metrics.MetricsConstants._
import org.apache.kyuubi.server.metadata.api.Metadata
import org.apache.kyuubi.server.metadata.api.{Metadata, MetadataFilter}
import org.apache.kyuubi.session.SessionType
class MetadataManagerSuite extends KyuubiFunSuite {
@ -157,7 +157,7 @@ class MetadataManagerSuite extends KyuubiFunSuite {
metadataManager.start()
f(metadataManager)
} finally {
metadataManager.getBatches(null, null, null, 0, 0, 0, Int.MaxValue).foreach { batch =>
metadataManager.getBatches(MetadataFilter(), 0, Int.MaxValue).foreach { batch =>
metadataManager.cleanupMetadataById(batch.getId)
}
// ensure no metadata request leak

View File

@ -34,6 +34,7 @@ import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.ctl.{CtlConf, TestPrematureExit}
import org.apache.kyuubi.engine.ApplicationManagerInfo
import org.apache.kyuubi.metrics.{MetricsConstants, MetricsSystem}
import org.apache.kyuubi.server.metadata.api.MetadataFilter
import org.apache.kyuubi.session.KyuubiSessionManager
class BatchCliSuite extends RestClientTestHelper with TestPrematureExit with BatchTestHelper {
@ -101,10 +102,9 @@ class BatchCliSuite extends RestClientTestHelper with TestPrematureExit with Bat
sessionManager.allSessions().foreach { session =>
sessionManager.closeSession(session.handle)
}
sessionManager.getBatchesFromMetadataStore(null, null, null, 0, 0, 0, Int.MaxValue).foreach {
batch =>
sessionManager.applicationManager.killApplication(ApplicationManagerInfo(None), batch.getId)
sessionManager.cleanupMetadata(batch.getId)
sessionManager.getBatchesFromMetadataStore(MetadataFilter(), 0, Int.MaxValue).foreach { batch =>
sessionManager.applicationManager.killApplication(ApplicationManagerInfo(None), batch.getId)
sessionManager.cleanupMetadata(batch.getId)
}
}