[KYUUBI #3653][REST] AdminResource add list kyuubi server api

### _Why are the changes needed?_

Add List Kyuubi Server Api for `AdminResource`

### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [ ] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #4670 from zwangsheng/KYUUBI_3653.

Closes #3653

b91a6c617 [zwangsheng] fxi
4271d0fd0 [zwangsheng] fix comments
e14f8cd55 [zwangsheng] [KYUUBI #3653][REST] AdminResource add list server api

Authored-by: zwangsheng <2213335496@qq.com>
Signed-off-by: fwang12 <fwang12@ebay.com>
This commit is contained in:
zwangsheng 2023-04-18 13:44:23 +08:00 committed by fwang12
parent 8c7b457d88
commit f6331a2a0f
4 changed files with 219 additions and 59 deletions

View File

@ -0,0 +1,129 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.client.api.v1.dto;
import java.util.Map;
import java.util.Objects;
public class ServerData {
private String nodeName;
private String namespace;
private String instance;
private String host;
private int port;
private Map<String, String> attributes;
private String status;
public ServerData(
String nodeName,
String namespace,
String instance,
String host,
int port,
Map<String, String> attributes,
String status) {
this.nodeName = nodeName;
this.namespace = namespace;
this.instance = instance;
this.host = host;
this.port = port;
this.attributes = attributes;
this.status = status;
}
public String getNodeName() {
return nodeName;
}
public ServerData setNodeName(String nodeName) {
this.nodeName = nodeName;
return this;
}
public String getNamespace() {
return namespace;
}
public ServerData setNamespace(String namespace) {
this.namespace = namespace;
return this;
}
public String getInstance() {
return instance;
}
public ServerData setInstance(String instance) {
this.instance = instance;
return this;
}
public String getHost() {
return host;
}
public ServerData setHost(String host) {
this.host = host;
return this;
}
public int getPort() {
return port;
}
public ServerData setPort(int port) {
this.port = port;
return this;
}
public Map<String, String> getAttributes() {
return attributes;
}
public ServerData setAttributes(Map<String, String> attributes) {
this.attributes = attributes;
return this;
}
public String getStatus() {
return status;
}
public ServerData setStatus(String status) {
this.status = status;
return this;
}
@Override
public int hashCode() {
return Objects.hash(nodeName, namespace, instance, port, attributes, status);
}
@Override
public boolean equals(Object obj) {
if (this == obj) return true;
if (obj == null || getClass() != obj.getClass()) return false;
ServerData server = (ServerData) obj;
return port == server.port
&& Objects.equals(nodeName, server.nodeName)
&& Objects.equals(namespace, server.namespace)
&& Objects.equals(instance, server.instance)
&& Objects.equals(host, server.host)
&& Objects.equals(status, server.status);
}
}

View File

@ -20,8 +20,9 @@ package org.apache.kyuubi.server.api
import scala.collection.JavaConverters._
import org.apache.kyuubi.Utils
import org.apache.kyuubi.client.api.v1.dto.{OperationData, SessionData}
import org.apache.kyuubi.client.api.v1.dto.{OperationData, ServerData, SessionData}
import org.apache.kyuubi.events.KyuubiOperationEvent
import org.apache.kyuubi.ha.client.ServiceNodeInfo
import org.apache.kyuubi.operation.KyuubiOperation
import org.apache.kyuubi.session.KyuubiSession
@ -58,4 +59,15 @@ object ApiUtils {
opEvent.sessionType,
operation.getSession.asInstanceOf[KyuubiSession].connectionUrl)
}
def serverData(nodeInfo: ServiceNodeInfo): ServerData = {
new ServerData(
nodeInfo.nodeName,
nodeInfo.namespace,
nodeInfo.instance,
nodeInfo.host,
nodeInfo.port,
nodeInfo.attributes.asJava,
"Running")
}
}

View File

@ -31,7 +31,7 @@ import org.apache.commons.lang3.StringUtils
import org.apache.zookeeper.KeeperException.NoNodeException
import org.apache.kyuubi.{KYUUBI_VERSION, Logging, Utils}
import org.apache.kyuubi.client.api.v1.dto.{Engine, OperationData, SessionData}
import org.apache.kyuubi.client.api.v1.dto.{Engine, OperationData, ServerData, SessionData}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.ha.HighAvailabilityConf.HA_NAMESPACE
@ -296,6 +296,35 @@ private[v1] class AdminResource extends ApiRequestContext with Logging {
node.attributes.asJava))
}
@ApiResponse(
responseCode = "200",
content = Array(
new Content(
mediaType = MediaType.APPLICATION_JSON,
array = new ArraySchema(schema = new Schema(implementation =
classOf[OperationData])))),
description = "list all live kyuubi servers")
@GET
@Path("server")
def listServers(): Seq[ServerData] = {
val userName = fe.getSessionUser(Map.empty[String, String])
val ipAddress = fe.getIpAddress
info(s"Received list all live kyuubi servers request from $userName/$ipAddress")
if (!isAdministrator(userName)) {
throw new NotAllowedException(
s"$userName is not allowed to list all live kyuubi servers")
}
val kyuubiConf = fe.getConf
val servers = ListBuffer[ServerData]()
val serverSpec = DiscoveryPaths.makePath(null, kyuubiConf.get(HA_NAMESPACE))
withDiscoveryClient(kyuubiConf) { discoveryClient =>
discoveryClient.getServiceNodesInfo(serverSpec).map(nodeInfo => {
servers += ApiUtils.serverData(nodeInfo)
})
}
servers
}
private def getEngine(
userName: String,
engineType: String,

View File

@ -17,6 +17,7 @@
package org.apache.kyuubi.server.api.v1
import java.nio.charset.StandardCharsets
import java.util.{Base64, UUID}
import javax.ws.rs.client.Entity
import javax.ws.rs.core.{GenericType, MediaType}
@ -24,19 +25,22 @@ import javax.ws.rs.core.{GenericType, MediaType}
import scala.collection.JavaConverters._
import org.apache.hive.service.rpc.thrift.TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V2
import org.mockito.Mockito.lenient
import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
import org.scalatestplus.mockito.MockitoSugar.mock
import org.apache.kyuubi.{KYUUBI_VERSION, KyuubiFunSuite, RestFrontendTestHelper, Utils}
import org.apache.kyuubi.client.api.v1.dto.{Engine, OperationData, SessionData, SessionHandle, SessionOpenRequest}
import org.apache.kyuubi.client.api.v1.dto.{Engine, OperationData, ServerData, SessionData, SessionHandle, SessionOpenRequest}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_CONNECTION_URL_KEY
import org.apache.kyuubi.engine.{ApplicationState, EngineRef, KyuubiApplicationManager}
import org.apache.kyuubi.engine.EngineType.SPARK_SQL
import org.apache.kyuubi.engine.ShareLevel.{CONNECTION, USER}
import org.apache.kyuubi.ha.HighAvailabilityConf
import org.apache.kyuubi.ha.client.{DiscoveryPaths, ServiceDiscovery}
import org.apache.kyuubi.ha.client.DiscoveryClientProvider.withDiscoveryClient
import org.apache.kyuubi.ha.client.DiscoveryPaths
import org.apache.kyuubi.plugin.PluginLoader
import org.apache.kyuubi.server.KyuubiRestFrontendService
import org.apache.kyuubi.server.http.authentication.AuthenticationHandler.AUTHORIZATION_HEADER
class AdminResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
@ -46,6 +50,13 @@ class AdminResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
override protected lazy val conf: KyuubiConf = KyuubiConf()
.set(KyuubiConf.SERVER_ADMINISTRATORS, Seq("admin001"))
private val encodeAuthorization: String = {
new String(
Base64.getEncoder.encode(
s"${Utils.currentUser}:".getBytes()),
StandardCharsets.UTF_8)
}
override def beforeAll(): Unit = {
super.beforeAll()
engineMgr.initialize(KyuubiConf())
@ -63,11 +74,6 @@ class AdminResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
.post(null)
assert(405 == response.getStatus)
val adminUser = Utils.currentUser
val encodeAuthorization = new String(
Base64.getEncoder.encode(
s"$adminUser:".getBytes()),
"UTF-8")
response = webTarget.path("api/v1/admin/refresh/hadoop_conf")
.request()
.header(AUTHORIZATION_HEADER, s"BASIC $encodeAuthorization")
@ -76,7 +82,7 @@ class AdminResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
val admin001AuthHeader = new String(
Base64.getEncoder.encode("admin001".getBytes()),
"UTF-8")
StandardCharsets.UTF_8)
response = webTarget.path("api/v1/admin/refresh/hadoop_conf")
.request()
.header(AUTHORIZATION_HEADER, s"BASIC $admin001AuthHeader")
@ -85,7 +91,7 @@ class AdminResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
val admin002AuthHeader = new String(
Base64.getEncoder.encode("admin002".getBytes()),
"UTF-8")
StandardCharsets.UTF_8)
response = webTarget.path("api/v1/admin/refresh/hadoop_conf")
.request()
.header(AUTHORIZATION_HEADER, s"BASIC $admin002AuthHeader")
@ -99,11 +105,6 @@ class AdminResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
.post(null)
assert(405 == response.getStatus)
val adminUser = Utils.currentUser
val encodeAuthorization = new String(
Base64.getEncoder.encode(
s"$adminUser:".getBytes()),
"UTF-8")
response = webTarget.path("api/v1/admin/refresh/user_defaults_conf")
.request()
.header(AUTHORIZATION_HEADER, s"BASIC $encodeAuthorization")
@ -117,11 +118,6 @@ class AdminResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
.post(null)
assert(405 == response.getStatus)
val adminUser = Utils.currentUser
val encodeAuthorization = new String(
Base64.getEncoder.encode(
s"$adminUser:".getBytes()),
"UTF-8")
response = webTarget.path("api/v1/admin/refresh/unlimited_users")
.request()
.header(AUTHORIZATION_HEADER, s"BASIC $encodeAuthorization")
@ -136,12 +132,6 @@ class AdminResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
.request(MediaType.APPLICATION_JSON_TYPE)
.post(Entity.entity(requestObj, MediaType.APPLICATION_JSON_TYPE))
val adminUser = Utils.currentUser
val encodeAuthorization = new String(
Base64.getEncoder.encode(
s"$adminUser:".getBytes()),
"UTF-8")
// get session list
var response2 = webTarget.path("api/v1/admin/sessions").request()
.header(AUTHORIZATION_HEADER, s"BASIC $encodeAuthorization")
@ -196,12 +186,6 @@ class AdminResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
"localhost",
Map("testConfig" -> "testValue"))
val adminUser = Utils.currentUser
val encodeAuthorization = new String(
Base64.getEncoder.encode(
s"$adminUser:".getBytes()),
"UTF-8")
// list sessions
var response = webTarget.path("api/v1/admin/sessions")
.queryParam("users", "admin")
@ -249,12 +233,6 @@ class AdminResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
Map("testConfig" -> "testValue"))
val operation = fe.be.getCatalogs(sessionHandle)
val adminUser = Utils.currentUser
val encodeAuthorization = new String(
Base64.getEncoder.encode(
s"$adminUser:".getBytes()),
"UTF-8")
// list operations
var response = webTarget.path("api/v1/admin/operations").request()
.header(AUTHORIZATION_HEADER, s"BASIC $encodeAuthorization")
@ -301,11 +279,6 @@ class AdminResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
assert(client.pathExists(engineSpace))
assert(client.getChildren(engineSpace).size == 1)
val adminUser = Utils.currentUser
val encodeAuthorization = new String(
Base64.getEncoder.encode(
s"$adminUser:".getBytes()),
"UTF-8")
val response = webTarget.path("api/v1/admin/engine")
.queryParam("sharelevel", "USER")
.queryParam("type", "spark_sql")
@ -349,11 +322,6 @@ class AdminResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
assert(client.pathExists(engineSpace))
assert(client.getChildren(engineSpace).size == 1)
val adminUser = Utils.currentUser
val encodeAuthorization = new String(
Base64.getEncoder.encode(
s"$adminUser:".getBytes()),
"UTF-8")
val response = webTarget.path("api/v1/admin/engine")
.queryParam("sharelevel", "connection")
.queryParam("type", "spark_sql")
@ -389,11 +357,6 @@ class AdminResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
assert(client.pathExists(engineSpace))
assert(client.getChildren(engineSpace).size == 1)
val adminUser = Utils.currentUser
val encodeAuthorization = new String(
Base64.getEncoder.encode(
s"$adminUser:".getBytes()),
"UTF-8")
val response = webTarget.path("api/v1/admin/engine")
.queryParam("type", "spark_sql")
.request(MediaType.APPLICATION_JSON_TYPE)
@ -453,11 +416,6 @@ class AdminResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
assert(client.pathExists(engineSpace1))
assert(client.pathExists(engineSpace2))
val adminUser = Utils.currentUser
val encodeAuthorization = new String(
Base64.getEncoder.encode(
s"$adminUser:".getBytes()),
"UTF-8")
val response = webTarget.path("api/v1/admin/engine")
.queryParam("type", "spark_sql")
.request(MediaType.APPLICATION_JSON_TYPE)
@ -488,4 +446,36 @@ class AdminResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
}
}
}
test("list server") {
// Mock Kyuubi Server
val serverDiscovery = mock[ServiceDiscovery]
lenient.when(serverDiscovery.fe).thenReturn(fe)
val namespace = conf.get(HighAvailabilityConf.HA_NAMESPACE)
withDiscoveryClient(conf) { client =>
client.registerService(conf, namespace, serverDiscovery)
val response = webTarget.path("api/v1/admin/server")
.request()
.header(AUTHORIZATION_HEADER, s"BASIC $encodeAuthorization")
.get
assert(200 == response.getStatus)
val result = response.readEntity(new GenericType[Seq[ServerData]]() {})
assert(result.size == 1)
val testServer = result.head
val export = fe.asInstanceOf[KyuubiRestFrontendService]
assert(namespace.equals(testServer.getNamespace.replaceFirst("/", "")))
assert(export.host.equals(testServer.getHost))
assert(export.connectionUrl.equals(testServer.getInstance()))
assert(!testServer.getAttributes.isEmpty)
val attributes = testServer.getAttributes
assert(attributes.containsKey("serviceUri") &&
attributes.get("serviceUri").equals(fe.connectionUrl))
assert(attributes.containsKey("version"))
assert(attributes.containsKey("sequence"))
assert("Running".equals(testServer.getStatus))
}
}
}