diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala index ff08b4d18..d2fb52a2b 100644 --- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala +++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala @@ -133,4 +133,15 @@ class FlinkSQLOperationManager extends OperationManager("FlinkSQLOperationManage tableName: String): Operation = { throw KyuubiSQLException.featureNotSupported() } + + override def newGetCrossReferenceOperation( + session: Session, + primaryCatalog: String, + primarySchema: String, + primaryTable: String, + foreignCatalog: String, + foreignSchema: String, + foreignTable: String): Operation = { + throw KyuubiSQLException.featureNotSupported() + } } diff --git a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/GetCrossReference.scala b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/GetCrossReference.scala new file mode 100644 index 000000000..22009e01d --- /dev/null +++ b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/GetCrossReference.scala @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.engine.hive.operation + +import org.apache.hive.service.cli.operation.Operation + +import org.apache.kyuubi.operation.OperationType +import org.apache.kyuubi.session.Session + +class GetCrossReference( + session: Session, + primaryCatalog: String, + primarySchema: String, + primaryTable: String, + foreignCatalog: String, + foreignSchema: String, + foreignTable: String) + extends HiveOperation(OperationType.GET_FUNCTIONS, session) { + + override val internalHiveOperation: Operation = + delegatedOperationManager.newGetCrossReferenceOperation( + hive, + primaryCatalog, + primarySchema, + primaryTable, + foreignCatalog, + foreignSchema, + foreignTable) +} diff --git a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationManager.scala b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationManager.scala index fe6842831..04aca79ad 100644 --- a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationManager.scala +++ b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationManager.scala @@ -102,6 +102,25 @@ class HiveOperationManager() extends OperationManager("HiveOperationManager") { addOperation(operation) } + override def newGetCrossReferenceOperation( + session: Session, + primaryCatalog: String, + primarySchema: String, + primaryTable: String, + foreignCatalog: String, + foreignSchema: String, + foreignTable: String): Operation = { + val operation = new GetCrossReference( + session, + primaryCatalog, + primarySchema, + primaryTable, + foreignCatalog, + foreignSchema, + foreignTable) + addOperation(operation) + } + override def getOperationLogRowSet( opHandle: OperationHandle, order: FetchOrientation, diff --git a/externals/kyuubi-hive-sql-engine/src/test/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationSuite.scala b/externals/kyuubi-hive-sql-engine/src/test/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationSuite.scala index 6b17d8f58..7e2a63db0 100644 --- a/externals/kyuubi-hive-sql-engine/src/test/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationSuite.scala +++ b/externals/kyuubi-hive-sql-engine/src/test/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationSuite.scala @@ -241,7 +241,7 @@ class HiveOperationSuite extends HiveJDBCTestHelper { withDatabases("test_schema") { statement => statement.execute("CREATE SCHEMA IF NOT EXISTS test_schema") statement.execute("CREATE TABLE IF NOT EXISTS test_schema.test_table(a string, " + - "PRIMARY KEY(a) disable novalidate)") + "PRIMARY KEY(a) DISABLE NOVALIDATE)") try { val meta = statement.getConnection.getMetaData @@ -261,6 +261,52 @@ class HiveOperationSuite extends HiveJDBCTestHelper { } } + test("get cross reference") { + withDatabases("test_schema") { statement => + statement.execute("CREATE SCHEMA IF NOT EXISTS test_schema") + statement.execute("CREATE TABLE IF NOT EXISTS test_schema.test_table1(a string, " + + "PRIMARY KEY(a) DISABLE NOVALIDATE)") + statement.execute("CREATE TABLE IF NOT EXISTS test_schema.test_table2(a string, b string, " + + "FOREIGN KEY(b) REFERENCES test_schema.test_table1(a) DISABLE NOVALIDATE)") + + try { + val meta = statement.getConnection.getMetaData + val resultSet = meta.getCrossReference( + null, + "test_schema", + "test_table1", + null, + "test_schema", + "test_table2") + val resultSetBuffer = + ArrayBuffer[(String, String, String, String, String, String, String, String)]() + while (resultSet.next()) { + resultSetBuffer += Tuple8( + resultSet.getString("PKTABLE_CAT"), + resultSet.getString("PKTABLE_SCHEM"), + resultSet.getString("PKTABLE_NAME"), + resultSet.getString("PKCOLUMN_NAME"), + resultSet.getString("FKTABLE_CAT"), + resultSet.getString("FKTABLE_SCHEM"), + resultSet.getString("FKTABLE_NAME"), + resultSet.getString("FKCOLUMN_NAME")) + } + assert(resultSetBuffer.contains(( + null, + "test_schema", + "test_table1", + "a", + null, + "test_schema", + "test_table2", + "b"))) + } finally { + statement.execute("DROP TABLE test_schema.test_table2") + statement.execute("DROP TABLE test_schema.test_table1") + } + } + } + test("basic execute statements, create, insert query") { withJdbcStatement("hive_engine_test") { statement => statement.execute("CREATE TABLE hive_engine_test(id int, value string) stored as orc") diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala index c0c7899bd..e35dbbc1a 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala @@ -140,4 +140,15 @@ class SparkSQLOperationManager private (name: String) extends OperationManager(n tableName: String): Operation = { throw KyuubiSQLException.featureNotSupported() } + + override def newGetCrossReferenceOperation( + session: Session, + primaryCatalog: String, + primarySchema: String, + primaryTable: String, + foreignCatalog: String, + foreignSchema: String, + foreignTable: String): Operation = { + throw KyuubiSQLException.featureNotSupported() + } } diff --git a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperationManager.scala b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperationManager.scala index 9441024f8..67367bba6 100644 --- a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperationManager.scala +++ b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperationManager.scala @@ -104,4 +104,15 @@ class TrinoOperationManager extends OperationManager("TrinoOperationManager") { tableName: String): Operation = { throw KyuubiSQLException.featureNotSupported() } + + override def newGetCrossReferenceOperation( + session: Session, + primaryCatalog: String, + primarySchema: String, + primaryTable: String, + foreignCatalog: String, + foreignSchema: String, + foreignTable: String): Operation = { + throw KyuubiSQLException.featureNotSupported() + } } diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationManager.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationManager.scala index e14941c52..3015e3427 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationManager.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationManager.scala @@ -75,6 +75,14 @@ abstract class OperationManager(name: String) extends AbstractService(name) { catalogName: String, schemaName: String, tableName: String): Operation + def newGetCrossReferenceOperation( + session: Session, + primaryCatalog: String, + primarySchema: String, + primaryTable: String, + foreignCatalog: String, + foreignSchema: String, + foreignTable: String): Operation final def addOperation(operation: Operation): Operation = synchronized { handleToOperation.put(operation.getHandle, operation) diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/AbstractBackendService.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/AbstractBackendService.scala index 985ce8c80..b24123ab3 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/AbstractBackendService.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/AbstractBackendService.scala @@ -131,6 +131,25 @@ abstract class AbstractBackendService(name: String) .getPrimaryKeys(catalogName, schemaName, tableName) } + override def getCrossReference( + sessionHandle: SessionHandle, + primaryCatalog: String, + primarySchema: String, + primaryTable: String, + foreignCatalog: String, + foreignSchema: String, + foreignTable: String): OperationHandle = { + sessionManager + .getSession(sessionHandle) + .getCrossReference( + primaryCatalog, + primarySchema, + primaryTable, + foreignCatalog, + foreignSchema, + foreignTable) + } + override def getOperationStatus(operationHandle: OperationHandle): OperationStatus = { val operation = sessionManager.operationManager.getOperation(operationHandle) if (operation.shouldRunAsync) { diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/BackendService.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/BackendService.scala index 18ce237e3..638b7e66b 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/BackendService.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/BackendService.scala @@ -81,6 +81,14 @@ trait BackendService { catalogName: String, schemaName: String, tableName: String): OperationHandle + def getCrossReference( + sessionHandle: SessionHandle, + primaryCatalog: String, + primarySchema: String, + primaryTable: String, + foreignCatalog: String, + foreignSchema: String, + foreignTable: String): OperationHandle def getOperationStatus(operationHandle: OperationHandle): OperationStatus def cancelOperation(operationHandle: OperationHandle): Unit diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/TFrontendService.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/TFrontendService.scala index d2d12695e..2f07e3ab6 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/TFrontendService.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/TFrontendService.scala @@ -384,8 +384,29 @@ abstract class TFrontendService(name: String) override def GetCrossReference(req: TGetCrossReferenceReq): TGetCrossReferenceResp = { debug(req.toString) val resp = new TGetCrossReferenceResp - val errStatus = KyuubiSQLException.featureNotSupported().toTStatus - resp.setStatus(errStatus) + try { + val sessionHandle = SessionHandle(req.getSessionHandle) + val primaryCatalog = req.getParentCatalogName + val primarySchema = req.getParentSchemaName + val primaryTable = req.getParentTableName + val foreignCatalog = req.getForeignCatalogName + val foreignSchema = req.getForeignSchemaName + val foreignTable = req.getForeignTableName + val opHandle = be.getCrossReference( + sessionHandle, + primaryCatalog, + primarySchema, + primaryTable, + foreignCatalog, + foreignSchema, + foreignTable) + resp.setOperationHandle(opHandle.toTOperationHandle) + resp.setStatus(OK_STATUS) + } catch { + case e: Exception => + error("Error getting primary keys: ", e) + resp.setStatus(KyuubiSQLException.toTStatus(e)) + } resp } diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala index d11e1aa38..8300795ad 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala @@ -182,6 +182,25 @@ abstract class AbstractSession( runOperation(operation) } + override def getCrossReference( + primaryCatalog: String, + primarySchema: String, + primaryTable: String, + foreignCatalog: String, + foreignSchema: String, + foreignTable: String): OperationHandle = { + val operation = sessionManager.operationManager + .newGetCrossReferenceOperation( + this, + primaryCatalog, + primarySchema, + primaryTable, + foreignCatalog, + foreignSchema, + foreignTable) + runOperation(operation) + } + override def cancelOperation(operationHandle: OperationHandle): Unit = withAcquireRelease() { sessionManager.operationManager.cancelOperation(operationHandle) } diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/Session.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/Session.scala index af84ad9d3..0218b6580 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/Session.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/Session.scala @@ -75,6 +75,13 @@ trait Session { catalogName: String, schemaName: String, tableName: String): OperationHandle + def getCrossReference( + primaryCatalog: String, + primarySchema: String, + primaryTable: String, + foreignCatalog: String, + foreignSchema: String, + foreignTable: String): OperationHandle def cancelOperation(operationHandle: OperationHandle): Unit def closeOperation(operationHandle: OperationHandle): Unit diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/NoopOperationManager.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/NoopOperationManager.scala index 428845640..ccd384466 100644 --- a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/NoopOperationManager.scala +++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/NoopOperationManager.scala @@ -101,6 +101,19 @@ class NoopOperationManager extends OperationManager("noop") { addOperation(operation) } + override def newGetCrossReferenceOperation( + session: Session, + primaryCatalog: String, + primarySchema: String, + primaryTable: String, + foreignCatalog: String, + foreignSchema: String, + foreignTable: String): Operation = { + val operation = + new NoopOperation(OperationType.GET_FUNCTIONS, session, primarySchema == invalid) + addOperation(operation) + } + override def getOperationLogRowSet( opHandle: OperationHandle, order: FetchOrientation, diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/SparkMetadataTests.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/SparkMetadataTests.scala index 66ed51bc0..8ce1cb749 100644 --- a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/SparkMetadataTests.scala +++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/SparkMetadataTests.scala @@ -464,8 +464,12 @@ trait SparkMetadataTests extends HiveJDBCTestHelper { assert(e.getMessage.contains(KyuubiSQLException.featureNotSupported().getMessage)) } assert(!metaData.getImportedKeys("", "default", "").next()) - intercept[SQLException] { - metaData.getCrossReference("", "default", "src", "", "default", "src2") + try { + assert(!metaData.getCrossReference("", "default", "src", "", "default", "src2").next()) + } catch { + case e: Exception => + assert(e.isInstanceOf[SQLException]) + assert(e.getMessage.contains(KyuubiSQLException.featureNotSupported().getMessage)) } assert(!metaData.getIndexInfo("", "default", "src", true, true).next()) diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/service/TFrontendServiceSuite.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/service/TFrontendServiceSuite.scala index cfb99e1b5..ebf62d194 100644 --- a/kyuubi-common/src/test/scala/org/apache/kyuubi/service/TFrontendServiceSuite.scala +++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/service/TFrontendServiceSuite.scala @@ -342,10 +342,17 @@ class TFrontendServiceSuite extends KyuubiFunSuite { withSessionHandle { (client, handle) => val req = new TGetCrossReferenceReq(handle) val resp = client.GetCrossReference(req) - assert(resp.getOperationHandle === null) - assert(resp.getStatus.getStatusCode === TStatusCode.ERROR_STATUS) - assert(resp.getStatus.getSqlState === "0A000") - assert(resp.getStatus.getErrorMessage startsWith "feature not supported") + val opHandle = resp.getOperationHandle + assert(opHandle.getOperationType === TOperationType.GET_FUNCTIONS) + assert(resp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS) + checkOperationResult(client, opHandle) + + req.setSessionHandle(SessionHandle(SERVER_VERSION).toTSessionHandle) + val resp1 = client.GetCrossReference(req) + assert(resp1.getOperationHandle === null) + assert(resp1.getStatus.getStatusCode === TStatusCode.ERROR_STATUS) + assert(resp1.getStatus.getSqlState === null) + assert(resp1.getStatus.getErrorMessage startsWith "Invalid SessionHandle") } } diff --git a/kyuubi-metrics/src/main/scala/org/apache/kyuubi/metrics/MetricsConstants.scala b/kyuubi-metrics/src/main/scala/org/apache/kyuubi/metrics/MetricsConstants.scala index b72254932..68116f4db 100644 --- a/kyuubi-metrics/src/main/scala/org/apache/kyuubi/metrics/MetricsConstants.scala +++ b/kyuubi-metrics/src/main/scala/org/apache/kyuubi/metrics/MetricsConstants.scala @@ -62,6 +62,7 @@ object MetricsConstants { final val BS_GET_COLUMNS = BACKEND_SERVICE + "get_columns" final val BS_GET_FUNCTIONS = BACKEND_SERVICE + "get_functions" final val BS_GET_PRIMARY_KEY = BACKEND_SERVICE + "get_primary_keys" + final val BS_GET_CROSS_REFERENCE = BACKEND_SERVICE + "get_cross_reference" final val BS_GET_OPERATION_STATUS = BACKEND_SERVICE + "get_operation_status" final val BS_CANCEL_OPERATION = BACKEND_SERVICE + "cancel_operation" final val BS_CLOSE_OPERATION = BACKEND_SERVICE + "close_operation" diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala index 98843f687..1ec359b49 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala @@ -197,6 +197,26 @@ class KyuubiSyncThriftClient private (protocol: TProtocol) resp.getOperationHandle } + def getCrossReference( + primaryCatalog: String, + primarySchema: String, + primaryTable: String, + foreignCatalog: String, + foreignSchema: String, + foreignTable: String): TOperationHandle = { + val req = new TGetCrossReferenceReq() + req.setSessionHandle(_remoteSessionHandle) + req.setParentCatalogName(primaryCatalog) + req.setParentSchemaName(primarySchema) + req.setParentTableName(primaryTable) + req.setForeignCatalogName(foreignCatalog) + req.setForeignSchemaName(foreignSchema) + req.setForeignTableName(foreignTable) + val resp = withLockAcquired(GetCrossReference(req)) + ThriftUtils.verifyTStatus(resp.getStatus) + resp.getOperationHandle + } + def getOperationStatus(operationHandle: TOperationHandle): TGetOperationStatusResp = { val req = new TGetOperationStatusReq(operationHandle) val resp = withLockAcquired(GetOperationStatus(req)) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/GetCrossReference.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/GetCrossReference.scala new file mode 100644 index 000000000..cdc032171 --- /dev/null +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/GetCrossReference.scala @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.operation + +import org.apache.kyuubi.session.Session + +class GetCrossReference( + session: Session, + primaryCatalog: String, + primarySchema: String, + primaryTable: String, + foreignCatalog: String, + foreignSchema: String, + foreignTable: String) + extends KyuubiOperation(OperationType.GET_FUNCTIONS, session) { + + override protected def runInternal(): Unit = { + try { + _remoteOpHandle = client.getCrossReference( + primaryCatalog, + primarySchema, + primaryTable, + foreignCatalog, + foreignSchema, + foreignTable) + } catch onError() + } +} diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala index c4dd96061..db2898b3e 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala @@ -119,7 +119,26 @@ class KyuubiOperationManager private (name: String) extends OperationManager(nam catalogName: String, schemaName: String, tableName: String): Operation = { - val operation = new GetFunctions(session, catalogName, schemaName, tableName) + val operation = new GetPrimaryKeys(session, catalogName, schemaName, tableName) + addOperation(operation) + } + + override def newGetCrossReferenceOperation( + session: Session, + primaryCatalog: String, + primarySchema: String, + primaryTable: String, + foreignCatalog: String, + foreignSchema: String, + foreignTable: String): Operation = { + val operation = new GetCrossReference( + session, + primaryCatalog, + primarySchema, + primaryTable, + foreignCatalog, + foreignSchema, + foreignTable) addOperation(operation) } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/BackendServiceMetric.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/BackendServiceMetric.scala index 8a2bc7730..684f2114c 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/BackendServiceMetric.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/BackendServiceMetric.scala @@ -132,6 +132,26 @@ trait BackendServiceMetric extends BackendService { } } + abstract override def getCrossReference( + sessionHandle: SessionHandle, + primaryCatalog: String, + primarySchema: String, + primaryTable: String, + foreignCatalog: String, + foreignSchema: String, + foreignTable: String): OperationHandle = { + MetricsSystem.timerTracing(MetricsConstants.BS_GET_CROSS_REFERENCE) { + super.getCrossReference( + sessionHandle, + primaryCatalog, + primarySchema, + primaryTable, + foreignCatalog, + foreignSchema, + foreignTable) + } + } + abstract override def getOperationStatus(operationHandle: OperationHandle): OperationStatus = { MetricsSystem.timerTracing(MetricsConstants.BS_GET_OPERATION_STATUS) { super.getOperationStatus(operationHandle) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/SessionsResource.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/SessionsResource.scala index 00a54637c..5a37f6f46 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/SessionsResource.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/SessionsResource.scala @@ -351,4 +351,31 @@ private[v1] class SessionsResource extends ApiRequestContext with Logging { throw new NotFoundException(errorMsg) } } + + @ApiResponse( + responseCode = "200", + content = Array(new Content( + mediaType = MediaType.APPLICATION_JSON)), + description = "Create an operation with GET_FUNCTIONS type") + @POST + @Path("{sessionHandle}/operations/crossReference") + def getCrossReference( + @PathParam("sessionHandle") sessionHandleStr: String, + request: GetCrossReferenceRequest): OperationHandle = { + try { + fe.be.getCrossReference( + parseSessionHandle(sessionHandleStr), + request.primaryCatalog, + request.primarySchema, + request.primaryTable, + request.foreignCatalog, + request.foreignSchema, + request.foreignTable) + } catch { + case NonFatal(e) => + val errorMsg = "Error getting cross reference" + error(errorMsg, e) + throw new NotFoundException(errorMsg) + } + } } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/dto.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/dto.scala index eb4a5bf08..7158469bb 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/dto.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/dto.scala @@ -76,6 +76,14 @@ case class GetPrimaryKeysRequest( schemaName: String, tableName: String) +case class GetCrossReferenceRequest( + primaryCatalog: String, + primarySchema: String, + primaryTable: String, + foreignCatalog: String, + foreignSchema: String, + foreignTable: String) + case class OpActionRequest(action: String) case class ResultSetMetaData(columns: Seq[ColumnDesc]) diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/SessionsResourceSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/SessionsResourceSuite.scala index d6fd60c22..5fe1361a2 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/SessionsResourceSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/SessionsResourceSuite.scala @@ -251,8 +251,18 @@ class SessionsResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper { response = webTarget.path(s"$pathPrefix/operations/primaryKeys") .request(MediaType.APPLICATION_JSON_TYPE) .post(Entity.entity(getPrimaryKeysReq, MediaType.APPLICATION_JSON_TYPE)) - assert(200 == response.getStatus) - operationHandle = response.readEntity(classOf[OperationHandle]) - assert(operationHandle.typ == OperationType.GET_FUNCTIONS) + assert(404 == response.getStatus) + + val getCrossReferenceReq = GetCrossReferenceRequest( + "spark_catalog", + "default", + "default", + "spark_catalog", + "default", + "default") + response = webTarget.path(s"$pathPrefix/operations/crossReference") + .request(MediaType.APPLICATION_JSON_TYPE) + .post(Entity.entity(getCrossReferenceReq, MediaType.APPLICATION_JSON_TYPE)) + assert(404 == response.getStatus) } }