diff --git a/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/api/v1/dto/ServerData.java b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/api/v1/dto/ServerData.java new file mode 100644 index 000000000..d64d43a72 --- /dev/null +++ b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/api/v1/dto/ServerData.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.client.api.v1.dto; + +import java.util.Map; +import java.util.Objects; + +public class ServerData { + private String nodeName; + private String namespace; + private String instance; + private String host; + private int port; + private Map attributes; + private String status; + + public ServerData( + String nodeName, + String namespace, + String instance, + String host, + int port, + Map attributes, + String status) { + this.nodeName = nodeName; + this.namespace = namespace; + this.instance = instance; + this.host = host; + this.port = port; + this.attributes = attributes; + this.status = status; + } + + public String getNodeName() { + return nodeName; + } + + public ServerData setNodeName(String nodeName) { + this.nodeName = nodeName; + return this; + } + + public String getNamespace() { + return namespace; + } + + public ServerData setNamespace(String namespace) { + this.namespace = namespace; + return this; + } + + public String getInstance() { + return instance; + } + + public ServerData setInstance(String instance) { + this.instance = instance; + return this; + } + + public String getHost() { + return host; + } + + public ServerData setHost(String host) { + this.host = host; + return this; + } + + public int getPort() { + return port; + } + + public ServerData setPort(int port) { + this.port = port; + return this; + } + + public Map getAttributes() { + return attributes; + } + + public ServerData setAttributes(Map attributes) { + this.attributes = attributes; + return this; + } + + public String getStatus() { + return status; + } + + public ServerData setStatus(String status) { + this.status = status; + return this; + } + + @Override + public int hashCode() { + return Objects.hash(nodeName, namespace, instance, port, attributes, status); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (obj == null || getClass() != obj.getClass()) return false; + ServerData server = (ServerData) obj; + return port == server.port + && Objects.equals(nodeName, server.nodeName) + && Objects.equals(namespace, server.namespace) + && Objects.equals(instance, server.instance) + && Objects.equals(host, server.host) + && Objects.equals(status, server.status); + } +} diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/ApiUtils.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/ApiUtils.scala index ebbf04c90..1f2cb309b 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/ApiUtils.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/ApiUtils.scala @@ -20,8 +20,9 @@ package org.apache.kyuubi.server.api import scala.collection.JavaConverters._ import org.apache.kyuubi.Utils -import org.apache.kyuubi.client.api.v1.dto.{OperationData, SessionData} +import org.apache.kyuubi.client.api.v1.dto.{OperationData, ServerData, SessionData} import org.apache.kyuubi.events.KyuubiOperationEvent +import org.apache.kyuubi.ha.client.ServiceNodeInfo import org.apache.kyuubi.operation.KyuubiOperation import org.apache.kyuubi.session.KyuubiSession @@ -58,4 +59,15 @@ object ApiUtils { opEvent.sessionType, operation.getSession.asInstanceOf[KyuubiSession].connectionUrl) } + + def serverData(nodeInfo: ServiceNodeInfo): ServerData = { + new ServerData( + nodeInfo.nodeName, + nodeInfo.namespace, + nodeInfo.instance, + nodeInfo.host, + nodeInfo.port, + nodeInfo.attributes.asJava, + "Running") + } } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala index 0d8b31b2c..113660a41 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala @@ -31,7 +31,7 @@ import org.apache.commons.lang3.StringUtils import org.apache.zookeeper.KeeperException.NoNodeException import org.apache.kyuubi.{KYUUBI_VERSION, Logging, Utils} -import org.apache.kyuubi.client.api.v1.dto.{Engine, OperationData, SessionData} +import org.apache.kyuubi.client.api.v1.dto.{Engine, OperationData, ServerData, SessionData} import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.config.KyuubiConf._ import org.apache.kyuubi.ha.HighAvailabilityConf.HA_NAMESPACE @@ -296,6 +296,35 @@ private[v1] class AdminResource extends ApiRequestContext with Logging { node.attributes.asJava)) } + @ApiResponse( + responseCode = "200", + content = Array( + new Content( + mediaType = MediaType.APPLICATION_JSON, + array = new ArraySchema(schema = new Schema(implementation = + classOf[OperationData])))), + description = "list all live kyuubi servers") + @GET + @Path("server") + def listServers(): Seq[ServerData] = { + val userName = fe.getSessionUser(Map.empty[String, String]) + val ipAddress = fe.getIpAddress + info(s"Received list all live kyuubi servers request from $userName/$ipAddress") + if (!isAdministrator(userName)) { + throw new NotAllowedException( + s"$userName is not allowed to list all live kyuubi servers") + } + val kyuubiConf = fe.getConf + val servers = ListBuffer[ServerData]() + val serverSpec = DiscoveryPaths.makePath(null, kyuubiConf.get(HA_NAMESPACE)) + withDiscoveryClient(kyuubiConf) { discoveryClient => + discoveryClient.getServiceNodesInfo(serverSpec).map(nodeInfo => { + servers += ApiUtils.serverData(nodeInfo) + }) + } + servers + } + private def getEngine( userName: String, engineType: String, diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala index a10994d7e..b7650627e 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala @@ -17,6 +17,7 @@ package org.apache.kyuubi.server.api.v1 +import java.nio.charset.StandardCharsets import java.util.{Base64, UUID} import javax.ws.rs.client.Entity import javax.ws.rs.core.{GenericType, MediaType} @@ -24,19 +25,22 @@ import javax.ws.rs.core.{GenericType, MediaType} import scala.collection.JavaConverters._ import org.apache.hive.service.rpc.thrift.TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V2 +import org.mockito.Mockito.lenient import org.scalatest.time.SpanSugar.convertIntToGrainOfTime +import org.scalatestplus.mockito.MockitoSugar.mock import org.apache.kyuubi.{KYUUBI_VERSION, KyuubiFunSuite, RestFrontendTestHelper, Utils} -import org.apache.kyuubi.client.api.v1.dto.{Engine, OperationData, SessionData, SessionHandle, SessionOpenRequest} +import org.apache.kyuubi.client.api.v1.dto.{Engine, OperationData, ServerData, SessionData, SessionHandle, SessionOpenRequest} import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_CONNECTION_URL_KEY import org.apache.kyuubi.engine.{ApplicationState, EngineRef, KyuubiApplicationManager} import org.apache.kyuubi.engine.EngineType.SPARK_SQL import org.apache.kyuubi.engine.ShareLevel.{CONNECTION, USER} import org.apache.kyuubi.ha.HighAvailabilityConf +import org.apache.kyuubi.ha.client.{DiscoveryPaths, ServiceDiscovery} import org.apache.kyuubi.ha.client.DiscoveryClientProvider.withDiscoveryClient -import org.apache.kyuubi.ha.client.DiscoveryPaths import org.apache.kyuubi.plugin.PluginLoader +import org.apache.kyuubi.server.KyuubiRestFrontendService import org.apache.kyuubi.server.http.authentication.AuthenticationHandler.AUTHORIZATION_HEADER class AdminResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper { @@ -46,6 +50,13 @@ class AdminResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper { override protected lazy val conf: KyuubiConf = KyuubiConf() .set(KyuubiConf.SERVER_ADMINISTRATORS, Seq("admin001")) + private val encodeAuthorization: String = { + new String( + Base64.getEncoder.encode( + s"${Utils.currentUser}:".getBytes()), + StandardCharsets.UTF_8) + } + override def beforeAll(): Unit = { super.beforeAll() engineMgr.initialize(KyuubiConf()) @@ -63,11 +74,6 @@ class AdminResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper { .post(null) assert(405 == response.getStatus) - val adminUser = Utils.currentUser - val encodeAuthorization = new String( - Base64.getEncoder.encode( - s"$adminUser:".getBytes()), - "UTF-8") response = webTarget.path("api/v1/admin/refresh/hadoop_conf") .request() .header(AUTHORIZATION_HEADER, s"BASIC $encodeAuthorization") @@ -76,7 +82,7 @@ class AdminResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper { val admin001AuthHeader = new String( Base64.getEncoder.encode("admin001".getBytes()), - "UTF-8") + StandardCharsets.UTF_8) response = webTarget.path("api/v1/admin/refresh/hadoop_conf") .request() .header(AUTHORIZATION_HEADER, s"BASIC $admin001AuthHeader") @@ -85,7 +91,7 @@ class AdminResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper { val admin002AuthHeader = new String( Base64.getEncoder.encode("admin002".getBytes()), - "UTF-8") + StandardCharsets.UTF_8) response = webTarget.path("api/v1/admin/refresh/hadoop_conf") .request() .header(AUTHORIZATION_HEADER, s"BASIC $admin002AuthHeader") @@ -99,11 +105,6 @@ class AdminResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper { .post(null) assert(405 == response.getStatus) - val adminUser = Utils.currentUser - val encodeAuthorization = new String( - Base64.getEncoder.encode( - s"$adminUser:".getBytes()), - "UTF-8") response = webTarget.path("api/v1/admin/refresh/user_defaults_conf") .request() .header(AUTHORIZATION_HEADER, s"BASIC $encodeAuthorization") @@ -117,11 +118,6 @@ class AdminResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper { .post(null) assert(405 == response.getStatus) - val adminUser = Utils.currentUser - val encodeAuthorization = new String( - Base64.getEncoder.encode( - s"$adminUser:".getBytes()), - "UTF-8") response = webTarget.path("api/v1/admin/refresh/unlimited_users") .request() .header(AUTHORIZATION_HEADER, s"BASIC $encodeAuthorization") @@ -136,12 +132,6 @@ class AdminResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper { .request(MediaType.APPLICATION_JSON_TYPE) .post(Entity.entity(requestObj, MediaType.APPLICATION_JSON_TYPE)) - val adminUser = Utils.currentUser - val encodeAuthorization = new String( - Base64.getEncoder.encode( - s"$adminUser:".getBytes()), - "UTF-8") - // get session list var response2 = webTarget.path("api/v1/admin/sessions").request() .header(AUTHORIZATION_HEADER, s"BASIC $encodeAuthorization") @@ -196,12 +186,6 @@ class AdminResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper { "localhost", Map("testConfig" -> "testValue")) - val adminUser = Utils.currentUser - val encodeAuthorization = new String( - Base64.getEncoder.encode( - s"$adminUser:".getBytes()), - "UTF-8") - // list sessions var response = webTarget.path("api/v1/admin/sessions") .queryParam("users", "admin") @@ -249,12 +233,6 @@ class AdminResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper { Map("testConfig" -> "testValue")) val operation = fe.be.getCatalogs(sessionHandle) - val adminUser = Utils.currentUser - val encodeAuthorization = new String( - Base64.getEncoder.encode( - s"$adminUser:".getBytes()), - "UTF-8") - // list operations var response = webTarget.path("api/v1/admin/operations").request() .header(AUTHORIZATION_HEADER, s"BASIC $encodeAuthorization") @@ -301,11 +279,6 @@ class AdminResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper { assert(client.pathExists(engineSpace)) assert(client.getChildren(engineSpace).size == 1) - val adminUser = Utils.currentUser - val encodeAuthorization = new String( - Base64.getEncoder.encode( - s"$adminUser:".getBytes()), - "UTF-8") val response = webTarget.path("api/v1/admin/engine") .queryParam("sharelevel", "USER") .queryParam("type", "spark_sql") @@ -349,11 +322,6 @@ class AdminResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper { assert(client.pathExists(engineSpace)) assert(client.getChildren(engineSpace).size == 1) - val adminUser = Utils.currentUser - val encodeAuthorization = new String( - Base64.getEncoder.encode( - s"$adminUser:".getBytes()), - "UTF-8") val response = webTarget.path("api/v1/admin/engine") .queryParam("sharelevel", "connection") .queryParam("type", "spark_sql") @@ -389,11 +357,6 @@ class AdminResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper { assert(client.pathExists(engineSpace)) assert(client.getChildren(engineSpace).size == 1) - val adminUser = Utils.currentUser - val encodeAuthorization = new String( - Base64.getEncoder.encode( - s"$adminUser:".getBytes()), - "UTF-8") val response = webTarget.path("api/v1/admin/engine") .queryParam("type", "spark_sql") .request(MediaType.APPLICATION_JSON_TYPE) @@ -453,11 +416,6 @@ class AdminResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper { assert(client.pathExists(engineSpace1)) assert(client.pathExists(engineSpace2)) - val adminUser = Utils.currentUser - val encodeAuthorization = new String( - Base64.getEncoder.encode( - s"$adminUser:".getBytes()), - "UTF-8") val response = webTarget.path("api/v1/admin/engine") .queryParam("type", "spark_sql") .request(MediaType.APPLICATION_JSON_TYPE) @@ -488,4 +446,36 @@ class AdminResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper { } } } + + test("list server") { + // Mock Kyuubi Server + val serverDiscovery = mock[ServiceDiscovery] + lenient.when(serverDiscovery.fe).thenReturn(fe) + val namespace = conf.get(HighAvailabilityConf.HA_NAMESPACE) + withDiscoveryClient(conf) { client => + client.registerService(conf, namespace, serverDiscovery) + + val response = webTarget.path("api/v1/admin/server") + .request() + .header(AUTHORIZATION_HEADER, s"BASIC $encodeAuthorization") + .get + + assert(200 == response.getStatus) + val result = response.readEntity(new GenericType[Seq[ServerData]]() {}) + assert(result.size == 1) + val testServer = result.head + val export = fe.asInstanceOf[KyuubiRestFrontendService] + + assert(namespace.equals(testServer.getNamespace.replaceFirst("/", ""))) + assert(export.host.equals(testServer.getHost)) + assert(export.connectionUrl.equals(testServer.getInstance())) + assert(!testServer.getAttributes.isEmpty) + val attributes = testServer.getAttributes + assert(attributes.containsKey("serviceUri") && + attributes.get("serviceUri").equals(fe.connectionUrl)) + assert(attributes.containsKey("version")) + assert(attributes.containsKey("sequence")) + assert("Running".equals(testServer.getStatus)) + } + } }