[KYUUBI #2033] Hive Backend Engine - GetCrossReference

### _Why are the changes needed?_

Hive Backend Engine - GetCrossReference.

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #2194 from KenjiFujima/KYUUBI-2033.

Closes #2033

01a0065f [KenjiFujima] [KYUUBI #2033] Hive Backend Engine - GetCrossReference

Authored-by: KenjiFujima <thanosxnicholas@gmail.com>
Signed-off-by: Kent Yao <yao@apache.org>
This commit is contained in:
KenjiFujima 2022-03-22 15:22:23 +08:00 committed by Kent Yao
parent e390f34c83
commit 4e01f9b9b0
No known key found for this signature in database
GPG Key ID: F7051850A0AF904D
23 changed files with 409 additions and 13 deletions

View File

@ -133,4 +133,15 @@ class FlinkSQLOperationManager extends OperationManager("FlinkSQLOperationManage
tableName: String): Operation = {
throw KyuubiSQLException.featureNotSupported()
}
override def newGetCrossReferenceOperation(
session: Session,
primaryCatalog: String,
primarySchema: String,
primaryTable: String,
foreignCatalog: String,
foreignSchema: String,
foreignTable: String): Operation = {
throw KyuubiSQLException.featureNotSupported()
}
}

View File

@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.hive.operation
import org.apache.hive.service.cli.operation.Operation
import org.apache.kyuubi.operation.OperationType
import org.apache.kyuubi.session.Session
class GetCrossReference(
session: Session,
primaryCatalog: String,
primarySchema: String,
primaryTable: String,
foreignCatalog: String,
foreignSchema: String,
foreignTable: String)
extends HiveOperation(OperationType.GET_FUNCTIONS, session) {
override val internalHiveOperation: Operation =
delegatedOperationManager.newGetCrossReferenceOperation(
hive,
primaryCatalog,
primarySchema,
primaryTable,
foreignCatalog,
foreignSchema,
foreignTable)
}

View File

@ -102,6 +102,25 @@ class HiveOperationManager() extends OperationManager("HiveOperationManager") {
addOperation(operation)
}
override def newGetCrossReferenceOperation(
session: Session,
primaryCatalog: String,
primarySchema: String,
primaryTable: String,
foreignCatalog: String,
foreignSchema: String,
foreignTable: String): Operation = {
val operation = new GetCrossReference(
session,
primaryCatalog,
primarySchema,
primaryTable,
foreignCatalog,
foreignSchema,
foreignTable)
addOperation(operation)
}
override def getOperationLogRowSet(
opHandle: OperationHandle,
order: FetchOrientation,

View File

@ -241,7 +241,7 @@ class HiveOperationSuite extends HiveJDBCTestHelper {
withDatabases("test_schema") { statement =>
statement.execute("CREATE SCHEMA IF NOT EXISTS test_schema")
statement.execute("CREATE TABLE IF NOT EXISTS test_schema.test_table(a string, " +
"PRIMARY KEY(a) disable novalidate)")
"PRIMARY KEY(a) DISABLE NOVALIDATE)")
try {
val meta = statement.getConnection.getMetaData
@ -261,6 +261,52 @@ class HiveOperationSuite extends HiveJDBCTestHelper {
}
}
test("get cross reference") {
withDatabases("test_schema") { statement =>
statement.execute("CREATE SCHEMA IF NOT EXISTS test_schema")
statement.execute("CREATE TABLE IF NOT EXISTS test_schema.test_table1(a string, " +
"PRIMARY KEY(a) DISABLE NOVALIDATE)")
statement.execute("CREATE TABLE IF NOT EXISTS test_schema.test_table2(a string, b string, " +
"FOREIGN KEY(b) REFERENCES test_schema.test_table1(a) DISABLE NOVALIDATE)")
try {
val meta = statement.getConnection.getMetaData
val resultSet = meta.getCrossReference(
null,
"test_schema",
"test_table1",
null,
"test_schema",
"test_table2")
val resultSetBuffer =
ArrayBuffer[(String, String, String, String, String, String, String, String)]()
while (resultSet.next()) {
resultSetBuffer += Tuple8(
resultSet.getString("PKTABLE_CAT"),
resultSet.getString("PKTABLE_SCHEM"),
resultSet.getString("PKTABLE_NAME"),
resultSet.getString("PKCOLUMN_NAME"),
resultSet.getString("FKTABLE_CAT"),
resultSet.getString("FKTABLE_SCHEM"),
resultSet.getString("FKTABLE_NAME"),
resultSet.getString("FKCOLUMN_NAME"))
}
assert(resultSetBuffer.contains((
null,
"test_schema",
"test_table1",
"a",
null,
"test_schema",
"test_table2",
"b")))
} finally {
statement.execute("DROP TABLE test_schema.test_table2")
statement.execute("DROP TABLE test_schema.test_table1")
}
}
}
test("basic execute statements, create, insert query") {
withJdbcStatement("hive_engine_test") { statement =>
statement.execute("CREATE TABLE hive_engine_test(id int, value string) stored as orc")

View File

@ -140,4 +140,15 @@ class SparkSQLOperationManager private (name: String) extends OperationManager(n
tableName: String): Operation = {
throw KyuubiSQLException.featureNotSupported()
}
override def newGetCrossReferenceOperation(
session: Session,
primaryCatalog: String,
primarySchema: String,
primaryTable: String,
foreignCatalog: String,
foreignSchema: String,
foreignTable: String): Operation = {
throw KyuubiSQLException.featureNotSupported()
}
}

View File

@ -104,4 +104,15 @@ class TrinoOperationManager extends OperationManager("TrinoOperationManager") {
tableName: String): Operation = {
throw KyuubiSQLException.featureNotSupported()
}
override def newGetCrossReferenceOperation(
session: Session,
primaryCatalog: String,
primarySchema: String,
primaryTable: String,
foreignCatalog: String,
foreignSchema: String,
foreignTable: String): Operation = {
throw KyuubiSQLException.featureNotSupported()
}
}

View File

@ -75,6 +75,14 @@ abstract class OperationManager(name: String) extends AbstractService(name) {
catalogName: String,
schemaName: String,
tableName: String): Operation
def newGetCrossReferenceOperation(
session: Session,
primaryCatalog: String,
primarySchema: String,
primaryTable: String,
foreignCatalog: String,
foreignSchema: String,
foreignTable: String): Operation
final def addOperation(operation: Operation): Operation = synchronized {
handleToOperation.put(operation.getHandle, operation)

View File

@ -131,6 +131,25 @@ abstract class AbstractBackendService(name: String)
.getPrimaryKeys(catalogName, schemaName, tableName)
}
override def getCrossReference(
sessionHandle: SessionHandle,
primaryCatalog: String,
primarySchema: String,
primaryTable: String,
foreignCatalog: String,
foreignSchema: String,
foreignTable: String): OperationHandle = {
sessionManager
.getSession(sessionHandle)
.getCrossReference(
primaryCatalog,
primarySchema,
primaryTable,
foreignCatalog,
foreignSchema,
foreignTable)
}
override def getOperationStatus(operationHandle: OperationHandle): OperationStatus = {
val operation = sessionManager.operationManager.getOperation(operationHandle)
if (operation.shouldRunAsync) {

View File

@ -81,6 +81,14 @@ trait BackendService {
catalogName: String,
schemaName: String,
tableName: String): OperationHandle
def getCrossReference(
sessionHandle: SessionHandle,
primaryCatalog: String,
primarySchema: String,
primaryTable: String,
foreignCatalog: String,
foreignSchema: String,
foreignTable: String): OperationHandle
def getOperationStatus(operationHandle: OperationHandle): OperationStatus
def cancelOperation(operationHandle: OperationHandle): Unit

View File

@ -384,8 +384,29 @@ abstract class TFrontendService(name: String)
override def GetCrossReference(req: TGetCrossReferenceReq): TGetCrossReferenceResp = {
debug(req.toString)
val resp = new TGetCrossReferenceResp
val errStatus = KyuubiSQLException.featureNotSupported().toTStatus
resp.setStatus(errStatus)
try {
val sessionHandle = SessionHandle(req.getSessionHandle)
val primaryCatalog = req.getParentCatalogName
val primarySchema = req.getParentSchemaName
val primaryTable = req.getParentTableName
val foreignCatalog = req.getForeignCatalogName
val foreignSchema = req.getForeignSchemaName
val foreignTable = req.getForeignTableName
val opHandle = be.getCrossReference(
sessionHandle,
primaryCatalog,
primarySchema,
primaryTable,
foreignCatalog,
foreignSchema,
foreignTable)
resp.setOperationHandle(opHandle.toTOperationHandle)
resp.setStatus(OK_STATUS)
} catch {
case e: Exception =>
error("Error getting primary keys: ", e)
resp.setStatus(KyuubiSQLException.toTStatus(e))
}
resp
}

View File

@ -182,6 +182,25 @@ abstract class AbstractSession(
runOperation(operation)
}
override def getCrossReference(
primaryCatalog: String,
primarySchema: String,
primaryTable: String,
foreignCatalog: String,
foreignSchema: String,
foreignTable: String): OperationHandle = {
val operation = sessionManager.operationManager
.newGetCrossReferenceOperation(
this,
primaryCatalog,
primarySchema,
primaryTable,
foreignCatalog,
foreignSchema,
foreignTable)
runOperation(operation)
}
override def cancelOperation(operationHandle: OperationHandle): Unit = withAcquireRelease() {
sessionManager.operationManager.cancelOperation(operationHandle)
}

View File

@ -75,6 +75,13 @@ trait Session {
catalogName: String,
schemaName: String,
tableName: String): OperationHandle
def getCrossReference(
primaryCatalog: String,
primarySchema: String,
primaryTable: String,
foreignCatalog: String,
foreignSchema: String,
foreignTable: String): OperationHandle
def cancelOperation(operationHandle: OperationHandle): Unit
def closeOperation(operationHandle: OperationHandle): Unit

View File

@ -101,6 +101,19 @@ class NoopOperationManager extends OperationManager("noop") {
addOperation(operation)
}
override def newGetCrossReferenceOperation(
session: Session,
primaryCatalog: String,
primarySchema: String,
primaryTable: String,
foreignCatalog: String,
foreignSchema: String,
foreignTable: String): Operation = {
val operation =
new NoopOperation(OperationType.GET_FUNCTIONS, session, primarySchema == invalid)
addOperation(operation)
}
override def getOperationLogRowSet(
opHandle: OperationHandle,
order: FetchOrientation,

View File

@ -464,8 +464,12 @@ trait SparkMetadataTests extends HiveJDBCTestHelper {
assert(e.getMessage.contains(KyuubiSQLException.featureNotSupported().getMessage))
}
assert(!metaData.getImportedKeys("", "default", "").next())
intercept[SQLException] {
metaData.getCrossReference("", "default", "src", "", "default", "src2")
try {
assert(!metaData.getCrossReference("", "default", "src", "", "default", "src2").next())
} catch {
case e: Exception =>
assert(e.isInstanceOf[SQLException])
assert(e.getMessage.contains(KyuubiSQLException.featureNotSupported().getMessage))
}
assert(!metaData.getIndexInfo("", "default", "src", true, true).next())

View File

@ -342,10 +342,17 @@ class TFrontendServiceSuite extends KyuubiFunSuite {
withSessionHandle { (client, handle) =>
val req = new TGetCrossReferenceReq(handle)
val resp = client.GetCrossReference(req)
assert(resp.getOperationHandle === null)
assert(resp.getStatus.getStatusCode === TStatusCode.ERROR_STATUS)
assert(resp.getStatus.getSqlState === "0A000")
assert(resp.getStatus.getErrorMessage startsWith "feature not supported")
val opHandle = resp.getOperationHandle
assert(opHandle.getOperationType === TOperationType.GET_FUNCTIONS)
assert(resp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS)
checkOperationResult(client, opHandle)
req.setSessionHandle(SessionHandle(SERVER_VERSION).toTSessionHandle)
val resp1 = client.GetCrossReference(req)
assert(resp1.getOperationHandle === null)
assert(resp1.getStatus.getStatusCode === TStatusCode.ERROR_STATUS)
assert(resp1.getStatus.getSqlState === null)
assert(resp1.getStatus.getErrorMessage startsWith "Invalid SessionHandle")
}
}

View File

@ -62,6 +62,7 @@ object MetricsConstants {
final val BS_GET_COLUMNS = BACKEND_SERVICE + "get_columns"
final val BS_GET_FUNCTIONS = BACKEND_SERVICE + "get_functions"
final val BS_GET_PRIMARY_KEY = BACKEND_SERVICE + "get_primary_keys"
final val BS_GET_CROSS_REFERENCE = BACKEND_SERVICE + "get_cross_reference"
final val BS_GET_OPERATION_STATUS = BACKEND_SERVICE + "get_operation_status"
final val BS_CANCEL_OPERATION = BACKEND_SERVICE + "cancel_operation"
final val BS_CLOSE_OPERATION = BACKEND_SERVICE + "close_operation"

View File

@ -197,6 +197,26 @@ class KyuubiSyncThriftClient private (protocol: TProtocol)
resp.getOperationHandle
}
def getCrossReference(
primaryCatalog: String,
primarySchema: String,
primaryTable: String,
foreignCatalog: String,
foreignSchema: String,
foreignTable: String): TOperationHandle = {
val req = new TGetCrossReferenceReq()
req.setSessionHandle(_remoteSessionHandle)
req.setParentCatalogName(primaryCatalog)
req.setParentSchemaName(primarySchema)
req.setParentTableName(primaryTable)
req.setForeignCatalogName(foreignCatalog)
req.setForeignSchemaName(foreignSchema)
req.setForeignTableName(foreignTable)
val resp = withLockAcquired(GetCrossReference(req))
ThriftUtils.verifyTStatus(resp.getStatus)
resp.getOperationHandle
}
def getOperationStatus(operationHandle: TOperationHandle): TGetOperationStatusResp = {
val req = new TGetOperationStatusReq(operationHandle)
val resp = withLockAcquired(GetOperationStatus(req))

View File

@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.operation
import org.apache.kyuubi.session.Session
class GetCrossReference(
session: Session,
primaryCatalog: String,
primarySchema: String,
primaryTable: String,
foreignCatalog: String,
foreignSchema: String,
foreignTable: String)
extends KyuubiOperation(OperationType.GET_FUNCTIONS, session) {
override protected def runInternal(): Unit = {
try {
_remoteOpHandle = client.getCrossReference(
primaryCatalog,
primarySchema,
primaryTable,
foreignCatalog,
foreignSchema,
foreignTable)
} catch onError()
}
}

View File

@ -119,7 +119,26 @@ class KyuubiOperationManager private (name: String) extends OperationManager(nam
catalogName: String,
schemaName: String,
tableName: String): Operation = {
val operation = new GetFunctions(session, catalogName, schemaName, tableName)
val operation = new GetPrimaryKeys(session, catalogName, schemaName, tableName)
addOperation(operation)
}
override def newGetCrossReferenceOperation(
session: Session,
primaryCatalog: String,
primarySchema: String,
primaryTable: String,
foreignCatalog: String,
foreignSchema: String,
foreignTable: String): Operation = {
val operation = new GetCrossReference(
session,
primaryCatalog,
primarySchema,
primaryTable,
foreignCatalog,
foreignSchema,
foreignTable)
addOperation(operation)
}

View File

@ -132,6 +132,26 @@ trait BackendServiceMetric extends BackendService {
}
}
abstract override def getCrossReference(
sessionHandle: SessionHandle,
primaryCatalog: String,
primarySchema: String,
primaryTable: String,
foreignCatalog: String,
foreignSchema: String,
foreignTable: String): OperationHandle = {
MetricsSystem.timerTracing(MetricsConstants.BS_GET_CROSS_REFERENCE) {
super.getCrossReference(
sessionHandle,
primaryCatalog,
primarySchema,
primaryTable,
foreignCatalog,
foreignSchema,
foreignTable)
}
}
abstract override def getOperationStatus(operationHandle: OperationHandle): OperationStatus = {
MetricsSystem.timerTracing(MetricsConstants.BS_GET_OPERATION_STATUS) {
super.getOperationStatus(operationHandle)

View File

@ -351,4 +351,31 @@ private[v1] class SessionsResource extends ApiRequestContext with Logging {
throw new NotFoundException(errorMsg)
}
}
@ApiResponse(
responseCode = "200",
content = Array(new Content(
mediaType = MediaType.APPLICATION_JSON)),
description = "Create an operation with GET_FUNCTIONS type")
@POST
@Path("{sessionHandle}/operations/crossReference")
def getCrossReference(
@PathParam("sessionHandle") sessionHandleStr: String,
request: GetCrossReferenceRequest): OperationHandle = {
try {
fe.be.getCrossReference(
parseSessionHandle(sessionHandleStr),
request.primaryCatalog,
request.primarySchema,
request.primaryTable,
request.foreignCatalog,
request.foreignSchema,
request.foreignTable)
} catch {
case NonFatal(e) =>
val errorMsg = "Error getting cross reference"
error(errorMsg, e)
throw new NotFoundException(errorMsg)
}
}
}

View File

@ -76,6 +76,14 @@ case class GetPrimaryKeysRequest(
schemaName: String,
tableName: String)
case class GetCrossReferenceRequest(
primaryCatalog: String,
primarySchema: String,
primaryTable: String,
foreignCatalog: String,
foreignSchema: String,
foreignTable: String)
case class OpActionRequest(action: String)
case class ResultSetMetaData(columns: Seq[ColumnDesc])

View File

@ -251,8 +251,18 @@ class SessionsResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
response = webTarget.path(s"$pathPrefix/operations/primaryKeys")
.request(MediaType.APPLICATION_JSON_TYPE)
.post(Entity.entity(getPrimaryKeysReq, MediaType.APPLICATION_JSON_TYPE))
assert(200 == response.getStatus)
operationHandle = response.readEntity(classOf[OperationHandle])
assert(operationHandle.typ == OperationType.GET_FUNCTIONS)
assert(404 == response.getStatus)
val getCrossReferenceReq = GetCrossReferenceRequest(
"spark_catalog",
"default",
"default",
"spark_catalog",
"default",
"default")
response = webTarget.path(s"$pathPrefix/operations/crossReference")
.request(MediaType.APPLICATION_JSON_TYPE)
.post(Entity.entity(getCrossReferenceReq, MediaType.APPLICATION_JSON_TYPE))
assert(404 == response.getStatus)
}
}