From d6290a4dede9e43ab0918473bf32d77c14cd0ba3 Mon Sep 17 00:00:00 2001 From: fwang12 Date: Wed, 19 Jul 2023 15:44:57 +0800 Subject: [PATCH] [KYUUBI #5063] Support to filter batch with batch name ### _Why are the changes needed?_ Support to filter batch with batch name filter condition. ### _How was this patch tested?_ - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [x] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before make a pull request Closes #5063 from turboFei/batch_name. Closes #5063 63915a56d [fwang12] ut 2548815a2 [fwang12] ut 34a9229b0 [fwang12] style Authored-by: fwang12 Signed-off-by: fwang12 --- .../kyuubi/ctl/cli/ControlCliArguments.scala | 1 + .../ctl/cmd/list/ListBatchCommand.scala | 1 + .../org/apache/kyuubi/ctl/opt/CliConfig.scala | 1 + .../apache/kyuubi/ctl/opt/CommandLine.scala | 3 ++ .../kyuubi/ctl/ControlCliArgumentsSuite.scala | 1 + .../apache/kyuubi/client/BatchRestApi.java | 13 ++++++ .../server/api/v1/BatchesResource.scala | 24 ++++++----- .../server/metadata/MetadataManager.scala | 16 +------ .../server/metadata/api/MetadataFilter.scala | 1 + .../metadata/jdbc/JDBCMetadataStore.scala | 4 ++ .../kyuubi/session/KyuubiSessionManager.scala | 15 ++----- .../server/api/v1/BatchesResourceSuite.scala | 42 ++++++++++++++----- .../metadata/MetadataManagerSuite.scala | 4 +- .../server/rest/client/BatchCliSuite.scala | 8 ++-- 14 files changed, 80 insertions(+), 54 deletions(-) diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cli/ControlCliArguments.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cli/ControlCliArguments.scala index 35b4ccacf..10bb99296 100644 --- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cli/ControlCliArguments.scala +++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cli/ControlCliArguments.scala @@ -112,6 +112,7 @@ class ControlCliArguments(args: Seq[String], env: Map[String, String] = sys.env) | batchType ${cliConfig.batchOpts.batchType} | batchUser ${cliConfig.batchOpts.batchUser} | batchState ${cliConfig.batchOpts.batchState} + | batchName ${cliConfig.batchOpts.batchName} | createTime ${cliConfig.batchOpts.createTime} | endTime ${cliConfig.batchOpts.endTime} | from ${cliConfig.batchOpts.from} diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/list/ListBatchCommand.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/list/ListBatchCommand.scala index 4ce1b49b2..db781da38 100644 --- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/list/ListBatchCommand.scala +++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/list/ListBatchCommand.scala @@ -46,6 +46,7 @@ class ListBatchCommand(cliConfig: CliConfig) extends Command[GetBatchesResponse] batchOpts.batchType, batchOpts.batchUser, batchOpts.batchState, + batchOpts.batchName, batchOpts.createTime, batchOpts.endTime, if (batchOpts.from < 0) 0 else batchOpts.from, diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/opt/CliConfig.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/opt/CliConfig.scala index 38284c595..7818f694a 100644 --- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/opt/CliConfig.scala +++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/opt/CliConfig.scala @@ -66,6 +66,7 @@ case class BatchOpts( batchType: String = null, batchUser: String = null, batchState: String = null, + batchName: String = null, createTime: Long = 0, endTime: Long = 0, from: Int = -1, diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/opt/CommandLine.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/opt/CommandLine.scala index 478c439a4..271bb06ab 100644 --- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/opt/CommandLine.scala +++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/opt/CommandLine.scala @@ -222,6 +222,9 @@ object CommandLine extends CommonCommandLine { opt[String]("batchState") .action((v, c) => c.copy(batchOpts = c.batchOpts.copy(batchState = v))) .text("Batch state."), + opt[String]("batchName") + .action((v, c) => c.copy(batchOpts = c.batchOpts.copy(batchName = v))) + .text("Batch name."), opt[String]("createTime") .action((v, c) => c.copy(batchOpts = c.batchOpts.copy(createTime = diff --git a/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/ControlCliArgumentsSuite.scala b/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/ControlCliArgumentsSuite.scala index 1b973c0eb..bd5b2ac45 100644 --- a/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/ControlCliArgumentsSuite.scala +++ b/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/ControlCliArgumentsSuite.scala @@ -429,6 +429,7 @@ class ControlCliArgumentsSuite extends KyuubiFunSuite with TestPrematureExit { | --batchType Batch type. | --batchUser Batch user. | --batchState Batch state. + | --batchName Batch name. | --createTime Batch create time, should be in yyyyMMddHHmmss format. | --endTime Batch end time, should be in yyyyMMddHHmmss format. | --from Specify which record to start from retrieving info. diff --git a/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/BatchRestApi.java b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/BatchRestApi.java index f5099568b..afcfe77d3 100644 --- a/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/BatchRestApi.java +++ b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/BatchRestApi.java @@ -63,10 +63,23 @@ public class BatchRestApi { Long endTime, int from, int size) { + return listBatches(batchType, batchUser, batchState, null, createTime, endTime, from, size); + } + + public GetBatchesResponse listBatches( + String batchType, + String batchUser, + String batchState, + String batchName, + Long createTime, + Long endTime, + int from, + int size) { Map params = new HashMap<>(); params.put("batchType", batchType); params.put("batchUser", batchUser); params.put("batchState", batchState); + params.put("batchName", batchName); if (null != createTime && createTime > 0) { params.put("createTime", createTime); } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala index ba043f071..e5ac23905 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala @@ -45,8 +45,8 @@ import org.apache.kyuubi.operation.{BatchJobSubmission, FetchOrientation, Operat import org.apache.kyuubi.server.api.ApiRequestContext import org.apache.kyuubi.server.api.v1.BatchesResource._ import org.apache.kyuubi.server.metadata.MetadataManager -import org.apache.kyuubi.server.metadata.api.Metadata -import org.apache.kyuubi.session.{KyuubiBatchSession, KyuubiSessionManager, SessionHandle} +import org.apache.kyuubi.server.metadata.api.{Metadata, MetadataFilter} +import org.apache.kyuubi.session.{KyuubiBatchSession, KyuubiSessionManager, SessionHandle, SessionType} import org.apache.kyuubi.util.JdbcUtils @Tag(name = "Batch") @@ -315,6 +315,7 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging { @QueryParam("batchType") batchType: String, @QueryParam("batchState") batchState: String, @QueryParam("batchUser") batchUser: String, + @QueryParam("batchName") batchName: String, @QueryParam("createTime") createTime: Long, @QueryParam("endTime") endTime: Long, @QueryParam("from") from: Int, @@ -327,15 +328,16 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging { validBatchState(batchState), s"The valid batch state can be one of the following: ${VALID_BATCH_STATES.mkString(",")}") } - val batches = - sessionManager.getBatchesFromMetadataStore( - batchType, - batchUser, - batchState, - createTime, - endTime, - from, - size) + + val filter = MetadataFilter( + sessionType = SessionType.BATCH, + engineType = batchType, + username = batchUser, + state = batchState, + requestName = batchName, + createTime = createTime, + endTime = endTime) + val batches = sessionManager.getBatchesFromMetadataStore(filter, from, size) new GetBatchesResponse(from, batches.size, batches.asJava) } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataManager.scala index 17beeb62a..e2648076d 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataManager.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/MetadataManager.scala @@ -133,21 +133,7 @@ class MetadataManager extends AbstractService("MetadataManager") { .filter(_.sessionType == SessionType.BATCH) } - def getBatches( - batchType: String, - batchUser: String, - batchState: String, - createTime: Long, - endTime: Long, - from: Int, - size: Int): Seq[Batch] = { - val filter = MetadataFilter( - sessionType = SessionType.BATCH, - engineType = batchType, - username = batchUser, - state = batchState, - createTime = createTime, - endTime = endTime) + def getBatches(filter: MetadataFilter, from: Int, size: Int): Seq[Batch] = { withMetadataRequestMetrics(_metadataStore.getMetadataList(filter, from, size, true)).map( buildBatch) } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/api/MetadataFilter.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/api/MetadataFilter.scala index 6213f8e64..d4f7f2b63 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/api/MetadataFilter.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/api/MetadataFilter.scala @@ -27,6 +27,7 @@ case class MetadataFilter( engineType: String = null, username: String = null, state: String = null, + requestName: String = null, kyuubiInstance: String = null, createTime: Long = 0L, endTime: Long = 0L, diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala index 3f9463bf7..9f0bd6843 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala @@ -238,6 +238,10 @@ class JDBCMetadataStore(conf: KyuubiConf) extends MetadataStore with Logging { whereConditions += "state = ?" params += state.toUpperCase(Locale.ROOT) } + Option(filter.requestName).filter(_.nonEmpty).foreach { requestName => + whereConditions += "request_name = ?" + params += requestName + } Option(filter.kyuubiInstance).filter(_.nonEmpty).foreach { kyuubiInstance => whereConditions += "kyuubi_instance = ?" params += kyuubiInstance diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala index d2547bca9..8d63dfbf7 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala @@ -36,7 +36,7 @@ import org.apache.kyuubi.metrics.MetricsSystem import org.apache.kyuubi.operation.{KyuubiOperationManager, OperationState} import org.apache.kyuubi.plugin.{GroupProvider, PluginLoader, SessionConfAdvisor} import org.apache.kyuubi.server.metadata.{MetadataManager, MetadataRequestsRetryRef} -import org.apache.kyuubi.server.metadata.api.Metadata +import org.apache.kyuubi.server.metadata.api.{Metadata, MetadataFilter} import org.apache.kyuubi.sql.parser.server.KyuubiParser import org.apache.kyuubi.util.{SignUtils, ThreadUtils} @@ -240,17 +240,8 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) { metadataManager.flatMap(mm => mm.getBatch(batchId)) } - def getBatchesFromMetadataStore( - batchType: String, - batchUser: String, - batchState: String, - createTime: Long, - endTime: Long, - from: Int, - size: Int): Seq[Batch] = { - metadataManager.map { mm => - mm.getBatches(batchType, batchUser, batchState, createTime, endTime, from, size) - }.getOrElse(Seq.empty) + def getBatchesFromMetadataStore(filter: MetadataFilter, from: Int, size: Int): Seq[Batch] = { + metadataManager.map(_.getBatches(filter, from, size)).getOrElse(Seq.empty) } def getBatchMetadata(batchId: String): Option[Metadata] = { diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala index 8a797f842..b6bc1af52 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala @@ -44,7 +44,7 @@ import org.apache.kyuubi.operation.{BatchJobSubmission, OperationState} import org.apache.kyuubi.operation.OperationState.OperationState import org.apache.kyuubi.server.KyuubiRestFrontendService import org.apache.kyuubi.server.http.authentication.AuthenticationHandler.AUTHORIZATION_HEADER -import org.apache.kyuubi.server.metadata.api.Metadata +import org.apache.kyuubi.server.metadata.api.{Metadata, MetadataFilter} import org.apache.kyuubi.service.authentication.KyuubiAuthenticationFactory import org.apache.kyuubi.session.{KyuubiBatchSession, KyuubiSessionManager, SessionHandle, SessionType} @@ -62,10 +62,9 @@ class BatchesResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper wi sessionManager.allSessions().foreach { session => sessionManager.closeSession(session.handle) } - sessionManager.getBatchesFromMetadataStore(null, null, null, 0, 0, 0, Int.MaxValue).foreach { - batch => - sessionManager.applicationManager.killApplication(ApplicationManagerInfo(None), batch.getId) - sessionManager.cleanupMetadata(batch.getId) + sessionManager.getBatchesFromMetadataStore(MetadataFilter(), 0, Int.MaxValue).foreach { batch => + sessionManager.applicationManager.killApplication(ApplicationManagerInfo(None), batch.getId) + sessionManager.cleanupMetadata(batch.getId) } } @@ -518,11 +517,7 @@ class BatchesResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper wi } assert(sessionManager.getBatchesFromMetadataStore( - "SPARK", - null, - null, - 0, - 0, + MetadataFilter(engineType = "SPARK"), 0, Int.MaxValue).size == 2) } @@ -711,4 +706,31 @@ class BatchesResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper wi .replaceAll("\\[", "").replaceAll("\\]", "") assert(sessionManager.getBatchMetadata(batchId).map(_.state).contains("CANCELED")) } + + test("get batch list with batch name filter condition") { + val sessionManager = server.frontendServices.head + .be.sessionManager.asInstanceOf[KyuubiSessionManager] + sessionManager.allSessions().foreach(_.close()) + + val uniqueName = UUID.randomUUID().toString + sessionManager.openBatchSession( + "kyuubi", + "kyuubi", + InetAddress.getLocalHost.getCanonicalHostName, + Map(KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString), + newBatchRequest( + "spark", + sparkBatchTestResource.get, + "", + uniqueName)) + + val response = webTarget.path("api/v1/batches") + .queryParam("batchName", uniqueName) + .request(MediaType.APPLICATION_JSON_TYPE) + .get() + + assert(response.getStatus == 200) + val getBatchListResponse = response.readEntity(classOf[GetBatchesResponse]) + assert(getBatchListResponse.getTotal == 1) + } } diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/metadata/MetadataManagerSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/metadata/MetadataManagerSuite.scala index 8064b7f1f..aca416dac 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/metadata/MetadataManagerSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/metadata/MetadataManagerSuite.scala @@ -28,7 +28,7 @@ import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.config.KyuubiConf._ import org.apache.kyuubi.metrics.{MetricsConstants, MetricsSystem} import org.apache.kyuubi.metrics.MetricsConstants._ -import org.apache.kyuubi.server.metadata.api.Metadata +import org.apache.kyuubi.server.metadata.api.{Metadata, MetadataFilter} import org.apache.kyuubi.session.SessionType class MetadataManagerSuite extends KyuubiFunSuite { @@ -157,7 +157,7 @@ class MetadataManagerSuite extends KyuubiFunSuite { metadataManager.start() f(metadataManager) } finally { - metadataManager.getBatches(null, null, null, 0, 0, 0, Int.MaxValue).foreach { batch => + metadataManager.getBatches(MetadataFilter(), 0, Int.MaxValue).foreach { batch => metadataManager.cleanupMetadataById(batch.getId) } // ensure no metadata request leak diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchCliSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchCliSuite.scala index 7cf939910..0c44fc3a8 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchCliSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchCliSuite.scala @@ -34,6 +34,7 @@ import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.ctl.{CtlConf, TestPrematureExit} import org.apache.kyuubi.engine.ApplicationManagerInfo import org.apache.kyuubi.metrics.{MetricsConstants, MetricsSystem} +import org.apache.kyuubi.server.metadata.api.MetadataFilter import org.apache.kyuubi.session.KyuubiSessionManager class BatchCliSuite extends RestClientTestHelper with TestPrematureExit with BatchTestHelper { @@ -101,10 +102,9 @@ class BatchCliSuite extends RestClientTestHelper with TestPrematureExit with Bat sessionManager.allSessions().foreach { session => sessionManager.closeSession(session.handle) } - sessionManager.getBatchesFromMetadataStore(null, null, null, 0, 0, 0, Int.MaxValue).foreach { - batch => - sessionManager.applicationManager.killApplication(ApplicationManagerInfo(None), batch.getId) - sessionManager.cleanupMetadata(batch.getId) + sessionManager.getBatchesFromMetadataStore(MetadataFilter(), 0, Int.MaxValue).foreach { batch => + sessionManager.applicationManager.killApplication(ApplicationManagerInfo(None), batch.getId) + sessionManager.cleanupMetadata(batch.getId) } }