From 0468fd4d4e40659eb0dddb8a914ed54d6cd38262 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Fri, 21 Aug 2020 10:51:55 +0800 Subject: [PATCH] Implementaton of KyuubiOperationManager --- externals/kyuubi-download/pom.xml | 5 +- kyuubi-common/pom.xml | 15 -- .../apache/kyuubi/KyuubiSQLException.scala | 64 +++++++- .../kyuubi/operation/AbstractOperation.scala | 8 + .../kyuubi/KyuubiSQLExceptionSuite.scala | 38 +++++ kyuubi-main/pom.xml | 1 + .../kyuubi/operation/ExecuteStatement.scala | 40 +++++ .../apache/kyuubi/operation/GetCatalogs.scala | 39 +++++ .../apache/kyuubi/operation/GetColumns.scala | 47 ++++++ .../kyuubi/operation/GetFunctions.scala | 46 ++++++ .../apache/kyuubi/operation/GetSchemas.scala | 44 +++++ .../kyuubi/operation/GetTableTypes.scala | 40 +++++ .../apache/kyuubi/operation/GetTables.scala | 48 ++++++ .../apache/kyuubi/operation/GetTypeInfo.scala | 39 +++++ .../kyuubi/operation/KyuubiOperation.scala | 112 +++++++++++++ .../operation/KyuubiOperationManager.scala | 151 ++++++++++++++++++ kyuubi-server/pom.xml | 4 +- .../spark/operation/SparkOperation.scala | 12 -- .../operation/SparkSQLOperationManager.scala | 2 +- pom.xml | 41 +++++ 20 files changed, 758 insertions(+), 38 deletions(-) create mode 100644 kyuubi-common/src/test/scala/org/apache/kyuubi/KyuubiSQLExceptionSuite.scala create mode 100644 kyuubi-main/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala create mode 100644 kyuubi-main/src/main/scala/org/apache/kyuubi/operation/GetCatalogs.scala create mode 100644 kyuubi-main/src/main/scala/org/apache/kyuubi/operation/GetColumns.scala create mode 100644 kyuubi-main/src/main/scala/org/apache/kyuubi/operation/GetFunctions.scala create mode 100644 kyuubi-main/src/main/scala/org/apache/kyuubi/operation/GetSchemas.scala create mode 100644 kyuubi-main/src/main/scala/org/apache/kyuubi/operation/GetTableTypes.scala create mode 100644 kyuubi-main/src/main/scala/org/apache/kyuubi/operation/GetTables.scala create mode 100644 kyuubi-main/src/main/scala/org/apache/kyuubi/operation/GetTypeInfo.scala create mode 100644 kyuubi-main/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala create mode 100644 kyuubi-main/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala diff --git a/externals/kyuubi-download/pom.xml b/externals/kyuubi-download/pom.xml index f4019f4d0..9a356b44c 100644 --- a/externals/kyuubi-download/pom.xml +++ b/externals/kyuubi-download/pom.xml @@ -23,6 +23,7 @@ kyuubi org.apache.kyuubi 1.0.0-SNAPSHOT + ../../pom.xml 4.0.0 @@ -34,7 +35,7 @@ spark-${spark.version}-bin-hadoop2.7.tgz https://mirrors.bfsu.edu.cn - f5652835094d9f69eb3260e20ca9c2d58e8bdf85a8ed15797549a518b23c862b75a329b38d4248f8427e4310718238c60fae0f9d1afb3c70fb390d3e9cce2e49 + @@ -53,7 +54,7 @@ ${spark.archive.mirror}/apache/spark/spark-${spark.version}/${spark.archive.name} ${spark.archive.sha512} - ${project.build.directory} + 60000 3 true diff --git a/kyuubi-common/pom.xml b/kyuubi-common/pom.xml index 3271a20a6..9140f6f8f 100644 --- a/kyuubi-common/pom.xml +++ b/kyuubi-common/pom.xml @@ -46,21 +46,6 @@ org.apache.hive hive-service-rpc - ${hive.version} - - - commons-codec - commons-codec - - - commons-cli - commons-cli - - - tomcat - * - - diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/KyuubiSQLException.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/KyuubiSQLException.scala index 9111f1568..f71d5be4b 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/KyuubiSQLException.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/KyuubiSQLException.scala @@ -41,12 +41,24 @@ case class KyuubiSQLException(msg: String, cause: Throwable) extends SQLExceptio object KyuubiSQLException { + private final val HEAD_MARK: String = "*" + private final val SEPARATOR: Char = ':' + def apply(cause: Throwable): KyuubiSQLException = { new KyuubiSQLException(cause.getMessage, cause) } def apply(msg: String): KyuubiSQLException = new KyuubiSQLException(msg, null) + def apply(tStatus: TStatus): KyuubiSQLException = { + val msg = tStatus.getErrorMessage + val cause = toCause(tStatus.getInfoMessages.asScala) + cause match { + case k: KyuubiSQLException if k.getMessage == msg => k + case _ => apply(msg, cause) + } + } + def toTStatus(e: Exception): TStatus = e match { case k: KyuubiSQLException => k.toTStatus case _ => @@ -81,16 +93,56 @@ object KyuubiSQLException { trace: Array[StackTraceElement], max: Int): List[String] = { val builder = new StringBuilder - builder.append('*').append(ex.getClass.getName).append(':') - builder.append(ex.getMessage).append(':') - builder.append(trace.length).append(':').append(max) + builder.append(HEAD_MARK).append(ex.getClass.getName).append(SEPARATOR) + builder.append(ex.getMessage).append(SEPARATOR) + builder.append(trace.length).append(SEPARATOR).append(max) List(builder.toString) ++ (0 to max).map { i => builder.setLength(0) - builder.append(trace(i).getClassName).append(":") - builder.append(trace(i).getMethodName).append(":") - builder.append(Option(trace(i).getFileName).getOrElse("")).append(':') + builder.append(trace(i).getClassName).append(SEPARATOR) + builder.append(trace(i).getMethodName).append(SEPARATOR) + builder.append(Option(trace(i).getFileName).getOrElse("")).append(SEPARATOR) builder.append(trace(i).getLineNumber) builder.toString }.toList } + + private def newInstance(className: String, message: String, cause: Throwable): Throwable = { + try { + Class.forName(className) + .getConstructor(classOf[String], classOf[Throwable]) + .newInstance(message, cause).asInstanceOf[Throwable] + } catch { + case e: Exception => throw new RuntimeException(className + ":" + message, e) + } + } + + private def getCoordinates(line: String): (Int, Int, Int) = { + val i1 = line.indexOf(SEPARATOR) + val i3 = line.lastIndexOf(SEPARATOR) + val i2 = line.substring(0, i3).lastIndexOf(SEPARATOR) + (i1, i2, i3) + } + + private def toCause(details: Seq[String]): Throwable = { + var ex: Throwable = null + if (details != null && details.nonEmpty) { + val head = details.head + val (i1, i2, i3) = getCoordinates(head) + val exClz = head.substring(1, i1) + val msg = head.substring(i1 + 1, i2) + val length = head.substring(i3 + 1).toInt + val stackTraceElements = details.tail.take(length + 1).map { line => + val (i1, i2, i3) = getCoordinates(line) + val clzName = line.substring(0, i1) + val methodName = line.substring(i1 + 1, i2) + val fileName = line.substring(i2 + 1, i3) + val lineNum = line.substring(i3 + 1).toInt + new StackTraceElement(clzName, methodName, fileName, lineNum) + } + ex = newInstance(exClz, msg, toCause(details.slice(length + 2, details.length))) + ex.setStackTrace(stackTraceElements.toArray) + } + ex + } + } diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala index 11a7c88ac..0a4934649 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala @@ -46,6 +46,8 @@ abstract class AbstractOperation(opType: OperationType, session: Session) @volatile protected var operationException: KyuubiSQLException = _ @volatile protected var hasResultSet: Boolean = false + protected def statement: String = opType.toString + protected def setHasResultSet(hasResultSet: Boolean): Unit = { this.hasResultSet = hasResultSet handle.setHasResultSet(hasResultSet) @@ -56,6 +58,8 @@ abstract class AbstractOperation(opType: OperationType, session: Session) } protected def setState(newState: OperationState): Unit = { + info(s"Processing ${session.user}'s query[$statementId]: ${state.name} -> ${newState.name}," + + s" statement: $statement") OperationState.validateTransition(state, newState) state = newState @@ -72,6 +76,10 @@ abstract class AbstractOperation(opType: OperationType, session: Session) state == OperationState.CLOSED || state == OperationState.CANCELED } + protected def isTerminalState(operationState: OperationState): Boolean = { + OperationState.isTerminal(operationState) + } + protected def assertState(state: OperationState): Unit = { if (this.state ne state) { throw new IllegalStateException(s"Expected state $state, but found ${this.state}") diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/KyuubiSQLExceptionSuite.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/KyuubiSQLExceptionSuite.scala new file mode 100644 index 000000000..b884bde18 --- /dev/null +++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/KyuubiSQLExceptionSuite.scala @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi + +class KyuubiSQLExceptionSuite extends KyuubiFunSuite { + + test("KyuubiSQLException") { + val msg0 = "this is just a dummy msg 0" + val msg1 = "this is just a dummy msg 1" + val msg2 = "this is just a dummy msg 2" + + val e0 = new RuntimeException(msg0) + val e1 = new KyuubiException(msg1, e0) + val e2 = new KyuubiSQLException(msg2, e1) + assert(e2.toTStatus === KyuubiSQLException.toTStatus(e2)) + val e3 = KyuubiSQLException(e2.toTStatus) + assert(e3.getMessage === e2.getMessage) + assert(e3.getStackTrace === e2.getStackTrace) + assert(e3.getCause.getMessage === e1.getMessage) + assert(e3.getCause.getCause.getMessage === e0.getMessage) + } + +} diff --git a/kyuubi-main/pom.xml b/kyuubi-main/pom.xml index 9d644798e..0124cc0a8 100644 --- a/kyuubi-main/pom.xml +++ b/kyuubi-main/pom.xml @@ -36,6 +36,7 @@ kyuubi-common ${project.version} + org.apache.kyuubi kyuubi-ha diff --git a/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala b/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala new file mode 100644 index 000000000..5a7219b0f --- /dev/null +++ b/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.operation + +import org.apache.hive.service.rpc.thrift.{TCLIService, TExecuteStatementReq, TSessionHandle} + +import org.apache.kyuubi.session.Session + +class ExecuteStatement( + session: Session, + client: TCLIService.Iface, + remoteSessionHandle: TSessionHandle, + statement: String) + extends KyuubiOperation( + OperationType.EXECUTE_STATEMENT, session, client, remoteSessionHandle) { + + override protected def runInternal(): Unit = { + try { + val req = new TExecuteStatementReq(remoteSessionHandle, statement) + val resp = client.ExecuteStatement(req) + verifyTStatus(resp.getStatus) + _remoteOpHandle = resp.getOperationHandle + } catch onError() + } +} diff --git a/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/GetCatalogs.scala b/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/GetCatalogs.scala new file mode 100644 index 000000000..c41c9f703 --- /dev/null +++ b/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/GetCatalogs.scala @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.operation + +import org.apache.hive.service.rpc.thrift.{TCLIService, TGetCatalogsReq, TSessionHandle} + +import org.apache.kyuubi.session.Session + +class GetCatalogs( + session: Session, + client: TCLIService.Iface, + remoteSessionHandle: TSessionHandle) + extends KyuubiOperation( + OperationType.GET_CATALOGS, session, client, remoteSessionHandle) { + + override protected def runInternal(): Unit = { + try { + val req = new TGetCatalogsReq(remoteSessionHandle) + val resp = client.GetCatalogs(req) + verifyTStatus(resp.getStatus) + _remoteOpHandle = resp.getOperationHandle + } catch onError() + } +} diff --git a/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/GetColumns.scala b/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/GetColumns.scala new file mode 100644 index 000000000..5c250c9c1 --- /dev/null +++ b/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/GetColumns.scala @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.operation + +import org.apache.hive.service.rpc.thrift.{TCLIService, TGetColumnsReq, TSessionHandle} + +import org.apache.kyuubi.session.Session + +class GetColumns( + session: Session, + client: TCLIService.Iface, + remoteSessionHandle: TSessionHandle, + catalogName: String, + schemaName: String, + tableName: String, + columnName: String) + extends KyuubiOperation( + OperationType.GET_COLUMNS, session, client, remoteSessionHandle) { + + override protected def runInternal(): Unit = { + try { + val req = new TGetColumnsReq(remoteSessionHandle) + req.setCatalogName(catalogName) + req.setSchemaName(schemaName) + req.setTableName(tableName) + req.setColumnName(columnName) + val resp = client.GetColumns(req) + verifyTStatus(resp.getStatus) + _remoteOpHandle = resp.getOperationHandle + } catch onError() + } +} diff --git a/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/GetFunctions.scala b/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/GetFunctions.scala new file mode 100644 index 000000000..31f959507 --- /dev/null +++ b/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/GetFunctions.scala @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.operation + +import org.apache.hive.service.rpc.thrift.{TCLIService, TGetFunctionsReq, TSessionHandle} + +import org.apache.kyuubi.session.Session + +class GetFunctions( + session: Session, + client: TCLIService.Iface, + remoteSessionHandle: TSessionHandle, + catalogName: String, + schemaName: String, + functionName: String) + extends KyuubiOperation( + OperationType.GET_FUNCTIONS, session, client, remoteSessionHandle) { + + override protected def runInternal(): Unit = { + try { + val req = new TGetFunctionsReq() + req.setSessionHandle(remoteSessionHandle) + req.setCatalogName(catalogName) + req.setSchemaName(schemaName) + req.setFunctionName(functionName) + val resp = client.GetFunctions(req) + verifyTStatus(resp.getStatus) + _remoteOpHandle = resp.getOperationHandle + } catch onError() + } +} diff --git a/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/GetSchemas.scala b/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/GetSchemas.scala new file mode 100644 index 000000000..c46369b3b --- /dev/null +++ b/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/GetSchemas.scala @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.operation + +import org.apache.hive.service.rpc.thrift.{TCLIService, TGetSchemasReq, TSessionHandle} + +import org.apache.kyuubi.session.Session + +class GetSchemas( + session: Session, + client: TCLIService.Iface, + remoteSessionHandle: TSessionHandle, + catalogName: String, + schemaName: String) + extends KyuubiOperation( + OperationType.GET_SCHEMAS, session, client, remoteSessionHandle) { + + override protected def runInternal(): Unit = { + try { + val req = new TGetSchemasReq() + req.setSessionHandle(remoteSessionHandle) + req.setCatalogName(catalogName) + req.setSchemaName(schemaName) + val resp = client.GetSchemas(req) + verifyTStatus(resp.getStatus) + _remoteOpHandle = resp.getOperationHandle + } catch onError() + } +} diff --git a/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/GetTableTypes.scala b/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/GetTableTypes.scala new file mode 100644 index 000000000..0f80de6cc --- /dev/null +++ b/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/GetTableTypes.scala @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.operation + +import org.apache.hive.service.rpc.thrift.{TCLIService, TGetTableTypesReq, TSessionHandle} + +import org.apache.kyuubi.session.Session + +class GetTableTypes( + session: Session, + client: TCLIService.Iface, + remoteSessionHandle: TSessionHandle) + extends KyuubiOperation( + OperationType.GET_TABLE_TYPES, session, client, remoteSessionHandle) { + + override protected def runInternal(): Unit = { + try { + val req = new TGetTableTypesReq() + req.setSessionHandle(remoteSessionHandle) + val resp = client.GetTableTypes(req) + verifyTStatus(resp.getStatus) + _remoteOpHandle = resp.getOperationHandle + } catch onError() + } +} diff --git a/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/GetTables.scala b/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/GetTables.scala new file mode 100644 index 000000000..fbf5a97a0 --- /dev/null +++ b/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/GetTables.scala @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.operation + +import org.apache.hive.service.rpc.thrift.{TCLIService, TGetTablesReq, TSessionHandle} + +import org.apache.kyuubi.session.Session + +class GetTables( + session: Session, + client: TCLIService.Iface, + remoteSessionHandle: TSessionHandle, + catalogName: String, + schemaName: String, + tableName: String, + tableTypes: java.util.List[String]) + extends KyuubiOperation( + OperationType.GET_TABLES, session, client, remoteSessionHandle) { + + override protected def runInternal(): Unit = { + try { + val req = new TGetTablesReq() + req.setSessionHandle(remoteSessionHandle) + req.setCatalogName(catalogName) + req.setSchemaName(schemaName) + req.setTableName(tableName) + req.setTableTypes(tableTypes) + val resp = client.GetTables(req) + verifyTStatus(resp.getStatus) + _remoteOpHandle = resp.getOperationHandle + } catch onError() + } +} diff --git a/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/GetTypeInfo.scala b/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/GetTypeInfo.scala new file mode 100644 index 000000000..d32fca46a --- /dev/null +++ b/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/GetTypeInfo.scala @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.operation + +import org.apache.hive.service.rpc.thrift.{TCLIService, TGetTypeInfoReq, TSessionHandle} + +import org.apache.kyuubi.session.Session + +class GetTypeInfo( + session: Session, + client: TCLIService.Iface, + remoteSessionHandle: TSessionHandle) + extends KyuubiOperation( + OperationType.GET_TYPE_INFO, session, client, remoteSessionHandle) { + + override protected def runInternal(): Unit = { + try { + val req = new TGetTypeInfoReq(remoteSessionHandle) + val resp = client.GetTypeInfo(req) + verifyTStatus(resp.getStatus) + _remoteOpHandle = resp.getOperationHandle + } catch onError() + } +} diff --git a/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala b/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala new file mode 100644 index 000000000..caf8e9398 --- /dev/null +++ b/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.operation + +import org.apache.hive.service.rpc.thrift._ + +import org.apache.kyuubi.KyuubiSQLException +import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation +import org.apache.kyuubi.operation.OperationType.OperationType +import org.apache.kyuubi.session.Session + +abstract class KyuubiOperation( + opType: OperationType, + session: Session, + client: TCLIService.Iface, + remoteSessionHandle: TSessionHandle) extends AbstractOperation(opType, session) { + + @volatile protected var _remoteOpHandle: TOperationHandle = _ + + def remoteOpHandle(): TOperationHandle = _remoteOpHandle + + protected def verifyTStatus(tStatus: TStatus): Unit = { + if (tStatus.getStatusCode != TStatusCode.SUCCESS_STATUS) { + throw KyuubiSQLException(tStatus) + } + } + + protected def onError(action: String = "running"): PartialFunction[Throwable, Unit] = { + case e: Exception => + state.synchronized { + if (isTerminalState(state)) { + warn(s"Ignore exception in terminal state with $statementId: $e") + } else { + setState(OperationState.ERROR) + e match { + case kse: KyuubiSQLException => throw kse + case _ => + throw KyuubiSQLException(s"Error $action $opType: ${e.getMessage}", e) + } + } + } + } + + override protected def beforeRun(): Unit = { + setHasResultSet(true) + setState(OperationState.RUNNING) + } + + override protected def afterRun(): Unit = { + state.synchronized { + if (!isTerminalState(state)) { + setState(OperationState.FINISHED) + } + } + } + + override def cancel(): Unit = { + if (_remoteOpHandle != null && !isTerminalState(state)) { + try { + val req = new TCancelOperationReq(_remoteOpHandle) + val resp = client.CancelOperation(req) + verifyTStatus(resp.getStatus) + setState(OperationState.CANCELED) + } catch onError("cancelling") + } + } + + override def close(): Unit = { + if (_remoteOpHandle != null && !isTerminalState(state)) { + try { + val req = new TCloseOperationReq(_remoteOpHandle) + val resp = client.CloseOperation(req) + verifyTStatus(resp.getStatus) + setState(OperationState.CLOSED) + } catch onError("closing") + } + } + + override def getResultSetSchema: TTableSchema = { + val req = new TGetResultSetMetadataReq(_remoteOpHandle) + val resp = client.GetResultSetMetadata(req) + verifyTStatus(resp.getStatus) + resp.getSchema + } + + override def getNextRowSet(order: FetchOrientation, rowSetSize: Int): TRowSet = { + validateDefaultFetchOrientation(order) + assertState(OperationState.FINISHED) + setHasResultSet(true) + val req = new TFetchResultsReq( + _remoteOpHandle, FetchOrientation.toTFetchOrientation(order), rowSetSize) + val resp = client.FetchResults(req) + resp.getResults + } + + override def shouldRunAsync: Boolean = false +} diff --git a/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala b/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala new file mode 100644 index 000000000..d481f139f --- /dev/null +++ b/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.operation +import java.util.concurrent.ConcurrentHashMap + +import org.apache.hive.service.rpc.thrift.{TCLIService, TFetchResultsReq, TRowSet, TSessionHandle} + +import org.apache.kyuubi.KyuubiSQLException +import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation +import org.apache.kyuubi.session.{Session, SessionHandle} + +class KyuubiOperationManager private (name: String) extends OperationManager(name) { + + def this() = this(classOf[KyuubiOperationManager].getSimpleName) + + private val handleToClient = new ConcurrentHashMap[SessionHandle, TCLIService.Iface]() + private val handleToTSessionHandle = new ConcurrentHashMap[SessionHandle, TSessionHandle]() + + private def getThriftClient(sessionHandle: SessionHandle): TCLIService.Iface = { + val client = handleToClient.get(sessionHandle) + if (client == null) { + throw KyuubiSQLException(s"$sessionHandle has not been initialized or already been closed") + } + client + } + + private def getRemoteTSessionHandle(sessionHandle: SessionHandle): TSessionHandle = { + val tSessionHandle = handleToTSessionHandle.get(sessionHandle) + if (tSessionHandle == null) { + throw KyuubiSQLException(s"$sessionHandle has not been initialized or already been closed") + } + tSessionHandle + } + + def setConnection( + sessionHandle: SessionHandle, + client: TCLIService.Iface, + remoteSessionHandle: TSessionHandle): Unit = { + handleToClient.put(sessionHandle, client) + handleToTSessionHandle.put(sessionHandle, remoteSessionHandle) + } + + override def newExecuteStatementOperation( + session: Session, + statement: String, + runAsync: Boolean, + queryTimeout: Long): Operation = { + val client = getThriftClient(session.handle) + val remoteSessionHandle = getRemoteTSessionHandle(session.handle) + val operation = new ExecuteStatement(session, client, remoteSessionHandle, statement) + addOperation(operation) + + } + + override def newGetTypeInfoOperation(session: Session): Operation = { + val client = getThriftClient(session.handle) + val remoteSessionHandle = getRemoteTSessionHandle(session.handle) + val operation = new GetTypeInfo(session, client, remoteSessionHandle) + addOperation(operation) + } + + override def newGetCatalogsOperation(session: Session): Operation = { + val client = getThriftClient(session.handle) + val remoteSessionHandle = getRemoteTSessionHandle(session.handle) + val operation = new GetCatalogs(session, client, remoteSessionHandle) + addOperation(operation) + } + + override def newGetSchemasOperation( + session: Session, + catalog: String, + schema: String): Operation = { + val client = getThriftClient(session.handle) + val remoteSessionHandle = getRemoteTSessionHandle(session.handle) + val operation = new GetSchemas(session, client, remoteSessionHandle, catalog, schema) + addOperation(operation) + } + + override def newGetTablesOperation( + session: Session, + catalogName: String, + schemaName: String, + tableName: String, + tableTypes: java.util.List[String]): Operation = { + val client = getThriftClient(session.handle) + val remoteSessionHandle = getRemoteTSessionHandle(session.handle) + val operation = new GetTables( + session, client, remoteSessionHandle, catalogName, schemaName, tableName, tableTypes) + addOperation(operation) + } + + override def newGetTableTypesOperation(session: Session): Operation = { + val client = getThriftClient(session.handle) + val remoteSessionHandle = getRemoteTSessionHandle(session.handle) + val operation = new GetTableTypes(session, client, remoteSessionHandle) + addOperation(operation) + } + + override def newGetColumnsOperation( + session: Session, + catalogName: String, + schemaName: String, + tableName: String, + columnName: String): Operation = { + val client = getThriftClient(session.handle) + val remoteSessionHandle = getRemoteTSessionHandle(session.handle) + val operation = new GetColumns( + session, client, remoteSessionHandle, catalogName, schemaName, tableName, columnName) + addOperation(operation) + } + + override def newGetFunctionsOperation( + session: Session, + catalogName: String, + schemaName: String, + functionName: String): Operation = { + val client = getThriftClient(session.handle) + val remoteSessionHandle = getRemoteTSessionHandle(session.handle) + val operation = new GetFunctions( + session, client, remoteSessionHandle, catalogName, schemaName, functionName) + addOperation(operation) + } + + override def getOperationLogRowSet( + opHandle: OperationHandle, + order: FetchOrientation, + maxRows: Int): TRowSet = { + val operation = getOperation(opHandle).asInstanceOf[KyuubiOperation] + val client = getThriftClient(operation.getSession.handle) + + val orientation = FetchOrientation.toTFetchOrientation(order) + val req = new TFetchResultsReq(operation.remoteOpHandle(), orientation, maxRows) + val resp = client.FetchResults(req) + resp.getResults + } +} diff --git a/kyuubi-server/pom.xml b/kyuubi-server/pom.xml index 59f98800d..27535388d 100644 --- a/kyuubi-server/pom.xml +++ b/kyuubi-server/pom.xml @@ -57,8 +57,8 @@ org.apache.spark - spark-core_2.12 - + spark-core_${scala.binary.version} + ${spark.version2} ${spark.scope} diff --git a/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala b/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala index e1a469874..d8fde67f5 100644 --- a/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala +++ b/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala @@ -41,14 +41,8 @@ abstract class SparkOperation(spark: SparkSession, opType: OperationType, sessio def getOperationLog: OperationLog = operationLog - protected def statement: String = opType.toString - protected def resultSchema: StructType - protected def isTerminalState(operationState: OperationState): Boolean = { - OperationState.isTerminal(operationState) - } - protected def cleanup(targetState: OperationState): Unit = synchronized { if (!isTerminalState(state)) { setState(targetState) @@ -106,12 +100,6 @@ abstract class SparkOperation(spark: SparkSession, opType: OperationType, sessio } } - override def setState(newState: OperationState): Unit = { - info(s"Processing ${session.user}'s query[$statementId]: ${state.name} -> ${newState.name}," + - s" statement: $statement") - super.setState(newState) - } - override protected def beforeRun(): Unit = { setHasResultSet(true) setState(OperationState.RUNNING) diff --git a/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala b/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala index fb89d6e2c..7f014cf3b 100644 --- a/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala +++ b/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.SparkSession import org.apache.kyuubi.KyuubiSQLException import org.apache.kyuubi.config.KyuubiConf -import org.apache.kyuubi.engine.spark.operation.log.{LogDivertAppender, OperationLog} +import org.apache.kyuubi.engine.spark.operation.log.LogDivertAppender import org.apache.kyuubi.operation.{Operation, OperationHandle, OperationManager} import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation import org.apache.kyuubi.session.{Session, SessionHandle} diff --git a/pom.xml b/pom.xml index 97b693cf0..d43d5a295 100644 --- a/pom.xml +++ b/pom.xml @@ -214,6 +214,47 @@ provided + + org.apache.hive + hive-service-rpc + ${hive.version} + + + commons-codec + commons-codec + + + commons-cli + commons-cli + + + tomcat + * + + + + + + + org.apache.hive + hive-jdbc + ${hive.version} + + + commons-codec + commons-codec + + + commons-cli + commons-cli + + + tomcat + * + + + + org.eclipse.jetty jetty-servlet