diff --git a/externals/kyuubi-download/pom.xml b/externals/kyuubi-download/pom.xml
index f4019f4d0..9a356b44c 100644
--- a/externals/kyuubi-download/pom.xml
+++ b/externals/kyuubi-download/pom.xml
@@ -23,6 +23,7 @@
kyuubi
org.apache.kyuubi
1.0.0-SNAPSHOT
+ ../../pom.xml
4.0.0
@@ -34,7 +35,7 @@
spark-${spark.version}-bin-hadoop2.7.tgz
https://mirrors.bfsu.edu.cn
- f5652835094d9f69eb3260e20ca9c2d58e8bdf85a8ed15797549a518b23c862b75a329b38d4248f8427e4310718238c60fae0f9d1afb3c70fb390d3e9cce2e49
+
@@ -53,7 +54,7 @@
${spark.archive.mirror}/apache/spark/spark-${spark.version}/${spark.archive.name}
${spark.archive.sha512}
- ${project.build.directory}
+
60000
3
true
diff --git a/kyuubi-common/pom.xml b/kyuubi-common/pom.xml
index 3271a20a6..9140f6f8f 100644
--- a/kyuubi-common/pom.xml
+++ b/kyuubi-common/pom.xml
@@ -46,21 +46,6 @@
org.apache.hive
hive-service-rpc
- ${hive.version}
-
-
- commons-codec
- commons-codec
-
-
- commons-cli
- commons-cli
-
-
- tomcat
- *
-
-
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/KyuubiSQLException.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/KyuubiSQLException.scala
index 9111f1568..f71d5be4b 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/KyuubiSQLException.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/KyuubiSQLException.scala
@@ -41,12 +41,24 @@ case class KyuubiSQLException(msg: String, cause: Throwable) extends SQLExceptio
object KyuubiSQLException {
+ private final val HEAD_MARK: String = "*"
+ private final val SEPARATOR: Char = ':'
+
def apply(cause: Throwable): KyuubiSQLException = {
new KyuubiSQLException(cause.getMessage, cause)
}
def apply(msg: String): KyuubiSQLException = new KyuubiSQLException(msg, null)
+ def apply(tStatus: TStatus): KyuubiSQLException = {
+ val msg = tStatus.getErrorMessage
+ val cause = toCause(tStatus.getInfoMessages.asScala)
+ cause match {
+ case k: KyuubiSQLException if k.getMessage == msg => k
+ case _ => apply(msg, cause)
+ }
+ }
+
def toTStatus(e: Exception): TStatus = e match {
case k: KyuubiSQLException => k.toTStatus
case _ =>
@@ -81,16 +93,56 @@ object KyuubiSQLException {
trace: Array[StackTraceElement],
max: Int): List[String] = {
val builder = new StringBuilder
- builder.append('*').append(ex.getClass.getName).append(':')
- builder.append(ex.getMessage).append(':')
- builder.append(trace.length).append(':').append(max)
+ builder.append(HEAD_MARK).append(ex.getClass.getName).append(SEPARATOR)
+ builder.append(ex.getMessage).append(SEPARATOR)
+ builder.append(trace.length).append(SEPARATOR).append(max)
List(builder.toString) ++ (0 to max).map { i =>
builder.setLength(0)
- builder.append(trace(i).getClassName).append(":")
- builder.append(trace(i).getMethodName).append(":")
- builder.append(Option(trace(i).getFileName).getOrElse("")).append(':')
+ builder.append(trace(i).getClassName).append(SEPARATOR)
+ builder.append(trace(i).getMethodName).append(SEPARATOR)
+ builder.append(Option(trace(i).getFileName).getOrElse("")).append(SEPARATOR)
builder.append(trace(i).getLineNumber)
builder.toString
}.toList
}
+
+ private def newInstance(className: String, message: String, cause: Throwable): Throwable = {
+ try {
+ Class.forName(className)
+ .getConstructor(classOf[String], classOf[Throwable])
+ .newInstance(message, cause).asInstanceOf[Throwable]
+ } catch {
+ case e: Exception => throw new RuntimeException(className + ":" + message, e)
+ }
+ }
+
+ private def getCoordinates(line: String): (Int, Int, Int) = {
+ val i1 = line.indexOf(SEPARATOR)
+ val i3 = line.lastIndexOf(SEPARATOR)
+ val i2 = line.substring(0, i3).lastIndexOf(SEPARATOR)
+ (i1, i2, i3)
+ }
+
+ private def toCause(details: Seq[String]): Throwable = {
+ var ex: Throwable = null
+ if (details != null && details.nonEmpty) {
+ val head = details.head
+ val (i1, i2, i3) = getCoordinates(head)
+ val exClz = head.substring(1, i1)
+ val msg = head.substring(i1 + 1, i2)
+ val length = head.substring(i3 + 1).toInt
+ val stackTraceElements = details.tail.take(length + 1).map { line =>
+ val (i1, i2, i3) = getCoordinates(line)
+ val clzName = line.substring(0, i1)
+ val methodName = line.substring(i1 + 1, i2)
+ val fileName = line.substring(i2 + 1, i3)
+ val lineNum = line.substring(i3 + 1).toInt
+ new StackTraceElement(clzName, methodName, fileName, lineNum)
+ }
+ ex = newInstance(exClz, msg, toCause(details.slice(length + 2, details.length)))
+ ex.setStackTrace(stackTraceElements.toArray)
+ }
+ ex
+ }
+
}
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala
index 11a7c88ac..0a4934649 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala
@@ -46,6 +46,8 @@ abstract class AbstractOperation(opType: OperationType, session: Session)
@volatile protected var operationException: KyuubiSQLException = _
@volatile protected var hasResultSet: Boolean = false
+ protected def statement: String = opType.toString
+
protected def setHasResultSet(hasResultSet: Boolean): Unit = {
this.hasResultSet = hasResultSet
handle.setHasResultSet(hasResultSet)
@@ -56,6 +58,8 @@ abstract class AbstractOperation(opType: OperationType, session: Session)
}
protected def setState(newState: OperationState): Unit = {
+ info(s"Processing ${session.user}'s query[$statementId]: ${state.name} -> ${newState.name}," +
+ s" statement: $statement")
OperationState.validateTransition(state, newState)
state = newState
@@ -72,6 +76,10 @@ abstract class AbstractOperation(opType: OperationType, session: Session)
state == OperationState.CLOSED || state == OperationState.CANCELED
}
+ protected def isTerminalState(operationState: OperationState): Boolean = {
+ OperationState.isTerminal(operationState)
+ }
+
protected def assertState(state: OperationState): Unit = {
if (this.state ne state) {
throw new IllegalStateException(s"Expected state $state, but found ${this.state}")
diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/KyuubiSQLExceptionSuite.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/KyuubiSQLExceptionSuite.scala
new file mode 100644
index 000000000..b884bde18
--- /dev/null
+++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/KyuubiSQLExceptionSuite.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi
+
+class KyuubiSQLExceptionSuite extends KyuubiFunSuite {
+
+ test("KyuubiSQLException") {
+ val msg0 = "this is just a dummy msg 0"
+ val msg1 = "this is just a dummy msg 1"
+ val msg2 = "this is just a dummy msg 2"
+
+ val e0 = new RuntimeException(msg0)
+ val e1 = new KyuubiException(msg1, e0)
+ val e2 = new KyuubiSQLException(msg2, e1)
+ assert(e2.toTStatus === KyuubiSQLException.toTStatus(e2))
+ val e3 = KyuubiSQLException(e2.toTStatus)
+ assert(e3.getMessage === e2.getMessage)
+ assert(e3.getStackTrace === e2.getStackTrace)
+ assert(e3.getCause.getMessage === e1.getMessage)
+ assert(e3.getCause.getCause.getMessage === e0.getMessage)
+ }
+
+}
diff --git a/kyuubi-main/pom.xml b/kyuubi-main/pom.xml
index 9d644798e..0124cc0a8 100644
--- a/kyuubi-main/pom.xml
+++ b/kyuubi-main/pom.xml
@@ -36,6 +36,7 @@
kyuubi-common
${project.version}
+
org.apache.kyuubi
kyuubi-ha
diff --git a/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala b/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala
new file mode 100644
index 000000000..5a7219b0f
--- /dev/null
+++ b/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.operation
+
+import org.apache.hive.service.rpc.thrift.{TCLIService, TExecuteStatementReq, TSessionHandle}
+
+import org.apache.kyuubi.session.Session
+
+class ExecuteStatement(
+ session: Session,
+ client: TCLIService.Iface,
+ remoteSessionHandle: TSessionHandle,
+ statement: String)
+ extends KyuubiOperation(
+ OperationType.EXECUTE_STATEMENT, session, client, remoteSessionHandle) {
+
+ override protected def runInternal(): Unit = {
+ try {
+ val req = new TExecuteStatementReq(remoteSessionHandle, statement)
+ val resp = client.ExecuteStatement(req)
+ verifyTStatus(resp.getStatus)
+ _remoteOpHandle = resp.getOperationHandle
+ } catch onError()
+ }
+}
diff --git a/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/GetCatalogs.scala b/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/GetCatalogs.scala
new file mode 100644
index 000000000..c41c9f703
--- /dev/null
+++ b/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/GetCatalogs.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.operation
+
+import org.apache.hive.service.rpc.thrift.{TCLIService, TGetCatalogsReq, TSessionHandle}
+
+import org.apache.kyuubi.session.Session
+
+class GetCatalogs(
+ session: Session,
+ client: TCLIService.Iface,
+ remoteSessionHandle: TSessionHandle)
+ extends KyuubiOperation(
+ OperationType.GET_CATALOGS, session, client, remoteSessionHandle) {
+
+ override protected def runInternal(): Unit = {
+ try {
+ val req = new TGetCatalogsReq(remoteSessionHandle)
+ val resp = client.GetCatalogs(req)
+ verifyTStatus(resp.getStatus)
+ _remoteOpHandle = resp.getOperationHandle
+ } catch onError()
+ }
+}
diff --git a/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/GetColumns.scala b/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/GetColumns.scala
new file mode 100644
index 000000000..5c250c9c1
--- /dev/null
+++ b/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/GetColumns.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.operation
+
+import org.apache.hive.service.rpc.thrift.{TCLIService, TGetColumnsReq, TSessionHandle}
+
+import org.apache.kyuubi.session.Session
+
+class GetColumns(
+ session: Session,
+ client: TCLIService.Iface,
+ remoteSessionHandle: TSessionHandle,
+ catalogName: String,
+ schemaName: String,
+ tableName: String,
+ columnName: String)
+ extends KyuubiOperation(
+ OperationType.GET_COLUMNS, session, client, remoteSessionHandle) {
+
+ override protected def runInternal(): Unit = {
+ try {
+ val req = new TGetColumnsReq(remoteSessionHandle)
+ req.setCatalogName(catalogName)
+ req.setSchemaName(schemaName)
+ req.setTableName(tableName)
+ req.setColumnName(columnName)
+ val resp = client.GetColumns(req)
+ verifyTStatus(resp.getStatus)
+ _remoteOpHandle = resp.getOperationHandle
+ } catch onError()
+ }
+}
diff --git a/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/GetFunctions.scala b/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/GetFunctions.scala
new file mode 100644
index 000000000..31f959507
--- /dev/null
+++ b/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/GetFunctions.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.operation
+
+import org.apache.hive.service.rpc.thrift.{TCLIService, TGetFunctionsReq, TSessionHandle}
+
+import org.apache.kyuubi.session.Session
+
+class GetFunctions(
+ session: Session,
+ client: TCLIService.Iface,
+ remoteSessionHandle: TSessionHandle,
+ catalogName: String,
+ schemaName: String,
+ functionName: String)
+ extends KyuubiOperation(
+ OperationType.GET_FUNCTIONS, session, client, remoteSessionHandle) {
+
+ override protected def runInternal(): Unit = {
+ try {
+ val req = new TGetFunctionsReq()
+ req.setSessionHandle(remoteSessionHandle)
+ req.setCatalogName(catalogName)
+ req.setSchemaName(schemaName)
+ req.setFunctionName(functionName)
+ val resp = client.GetFunctions(req)
+ verifyTStatus(resp.getStatus)
+ _remoteOpHandle = resp.getOperationHandle
+ } catch onError()
+ }
+}
diff --git a/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/GetSchemas.scala b/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/GetSchemas.scala
new file mode 100644
index 000000000..c46369b3b
--- /dev/null
+++ b/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/GetSchemas.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.operation
+
+import org.apache.hive.service.rpc.thrift.{TCLIService, TGetSchemasReq, TSessionHandle}
+
+import org.apache.kyuubi.session.Session
+
+class GetSchemas(
+ session: Session,
+ client: TCLIService.Iface,
+ remoteSessionHandle: TSessionHandle,
+ catalogName: String,
+ schemaName: String)
+ extends KyuubiOperation(
+ OperationType.GET_SCHEMAS, session, client, remoteSessionHandle) {
+
+ override protected def runInternal(): Unit = {
+ try {
+ val req = new TGetSchemasReq()
+ req.setSessionHandle(remoteSessionHandle)
+ req.setCatalogName(catalogName)
+ req.setSchemaName(schemaName)
+ val resp = client.GetSchemas(req)
+ verifyTStatus(resp.getStatus)
+ _remoteOpHandle = resp.getOperationHandle
+ } catch onError()
+ }
+}
diff --git a/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/GetTableTypes.scala b/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/GetTableTypes.scala
new file mode 100644
index 000000000..0f80de6cc
--- /dev/null
+++ b/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/GetTableTypes.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.operation
+
+import org.apache.hive.service.rpc.thrift.{TCLIService, TGetTableTypesReq, TSessionHandle}
+
+import org.apache.kyuubi.session.Session
+
+class GetTableTypes(
+ session: Session,
+ client: TCLIService.Iface,
+ remoteSessionHandle: TSessionHandle)
+ extends KyuubiOperation(
+ OperationType.GET_TABLE_TYPES, session, client, remoteSessionHandle) {
+
+ override protected def runInternal(): Unit = {
+ try {
+ val req = new TGetTableTypesReq()
+ req.setSessionHandle(remoteSessionHandle)
+ val resp = client.GetTableTypes(req)
+ verifyTStatus(resp.getStatus)
+ _remoteOpHandle = resp.getOperationHandle
+ } catch onError()
+ }
+}
diff --git a/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/GetTables.scala b/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/GetTables.scala
new file mode 100644
index 000000000..fbf5a97a0
--- /dev/null
+++ b/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/GetTables.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.operation
+
+import org.apache.hive.service.rpc.thrift.{TCLIService, TGetTablesReq, TSessionHandle}
+
+import org.apache.kyuubi.session.Session
+
+class GetTables(
+ session: Session,
+ client: TCLIService.Iface,
+ remoteSessionHandle: TSessionHandle,
+ catalogName: String,
+ schemaName: String,
+ tableName: String,
+ tableTypes: java.util.List[String])
+ extends KyuubiOperation(
+ OperationType.GET_TABLES, session, client, remoteSessionHandle) {
+
+ override protected def runInternal(): Unit = {
+ try {
+ val req = new TGetTablesReq()
+ req.setSessionHandle(remoteSessionHandle)
+ req.setCatalogName(catalogName)
+ req.setSchemaName(schemaName)
+ req.setTableName(tableName)
+ req.setTableTypes(tableTypes)
+ val resp = client.GetTables(req)
+ verifyTStatus(resp.getStatus)
+ _remoteOpHandle = resp.getOperationHandle
+ } catch onError()
+ }
+}
diff --git a/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/GetTypeInfo.scala b/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/GetTypeInfo.scala
new file mode 100644
index 000000000..d32fca46a
--- /dev/null
+++ b/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/GetTypeInfo.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.operation
+
+import org.apache.hive.service.rpc.thrift.{TCLIService, TGetTypeInfoReq, TSessionHandle}
+
+import org.apache.kyuubi.session.Session
+
+class GetTypeInfo(
+ session: Session,
+ client: TCLIService.Iface,
+ remoteSessionHandle: TSessionHandle)
+ extends KyuubiOperation(
+ OperationType.GET_TYPE_INFO, session, client, remoteSessionHandle) {
+
+ override protected def runInternal(): Unit = {
+ try {
+ val req = new TGetTypeInfoReq(remoteSessionHandle)
+ val resp = client.GetTypeInfo(req)
+ verifyTStatus(resp.getStatus)
+ _remoteOpHandle = resp.getOperationHandle
+ } catch onError()
+ }
+}
diff --git a/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala b/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala
new file mode 100644
index 000000000..caf8e9398
--- /dev/null
+++ b/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.operation
+
+import org.apache.hive.service.rpc.thrift._
+
+import org.apache.kyuubi.KyuubiSQLException
+import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
+import org.apache.kyuubi.operation.OperationType.OperationType
+import org.apache.kyuubi.session.Session
+
+abstract class KyuubiOperation(
+ opType: OperationType,
+ session: Session,
+ client: TCLIService.Iface,
+ remoteSessionHandle: TSessionHandle) extends AbstractOperation(opType, session) {
+
+ @volatile protected var _remoteOpHandle: TOperationHandle = _
+
+ def remoteOpHandle(): TOperationHandle = _remoteOpHandle
+
+ protected def verifyTStatus(tStatus: TStatus): Unit = {
+ if (tStatus.getStatusCode != TStatusCode.SUCCESS_STATUS) {
+ throw KyuubiSQLException(tStatus)
+ }
+ }
+
+ protected def onError(action: String = "running"): PartialFunction[Throwable, Unit] = {
+ case e: Exception =>
+ state.synchronized {
+ if (isTerminalState(state)) {
+ warn(s"Ignore exception in terminal state with $statementId: $e")
+ } else {
+ setState(OperationState.ERROR)
+ e match {
+ case kse: KyuubiSQLException => throw kse
+ case _ =>
+ throw KyuubiSQLException(s"Error $action $opType: ${e.getMessage}", e)
+ }
+ }
+ }
+ }
+
+ override protected def beforeRun(): Unit = {
+ setHasResultSet(true)
+ setState(OperationState.RUNNING)
+ }
+
+ override protected def afterRun(): Unit = {
+ state.synchronized {
+ if (!isTerminalState(state)) {
+ setState(OperationState.FINISHED)
+ }
+ }
+ }
+
+ override def cancel(): Unit = {
+ if (_remoteOpHandle != null && !isTerminalState(state)) {
+ try {
+ val req = new TCancelOperationReq(_remoteOpHandle)
+ val resp = client.CancelOperation(req)
+ verifyTStatus(resp.getStatus)
+ setState(OperationState.CANCELED)
+ } catch onError("cancelling")
+ }
+ }
+
+ override def close(): Unit = {
+ if (_remoteOpHandle != null && !isTerminalState(state)) {
+ try {
+ val req = new TCloseOperationReq(_remoteOpHandle)
+ val resp = client.CloseOperation(req)
+ verifyTStatus(resp.getStatus)
+ setState(OperationState.CLOSED)
+ } catch onError("closing")
+ }
+ }
+
+ override def getResultSetSchema: TTableSchema = {
+ val req = new TGetResultSetMetadataReq(_remoteOpHandle)
+ val resp = client.GetResultSetMetadata(req)
+ verifyTStatus(resp.getStatus)
+ resp.getSchema
+ }
+
+ override def getNextRowSet(order: FetchOrientation, rowSetSize: Int): TRowSet = {
+ validateDefaultFetchOrientation(order)
+ assertState(OperationState.FINISHED)
+ setHasResultSet(true)
+ val req = new TFetchResultsReq(
+ _remoteOpHandle, FetchOrientation.toTFetchOrientation(order), rowSetSize)
+ val resp = client.FetchResults(req)
+ resp.getResults
+ }
+
+ override def shouldRunAsync: Boolean = false
+}
diff --git a/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala b/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala
new file mode 100644
index 000000000..d481f139f
--- /dev/null
+++ b/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.operation
+import java.util.concurrent.ConcurrentHashMap
+
+import org.apache.hive.service.rpc.thrift.{TCLIService, TFetchResultsReq, TRowSet, TSessionHandle}
+
+import org.apache.kyuubi.KyuubiSQLException
+import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
+import org.apache.kyuubi.session.{Session, SessionHandle}
+
+class KyuubiOperationManager private (name: String) extends OperationManager(name) {
+
+ def this() = this(classOf[KyuubiOperationManager].getSimpleName)
+
+ private val handleToClient = new ConcurrentHashMap[SessionHandle, TCLIService.Iface]()
+ private val handleToTSessionHandle = new ConcurrentHashMap[SessionHandle, TSessionHandle]()
+
+ private def getThriftClient(sessionHandle: SessionHandle): TCLIService.Iface = {
+ val client = handleToClient.get(sessionHandle)
+ if (client == null) {
+ throw KyuubiSQLException(s"$sessionHandle has not been initialized or already been closed")
+ }
+ client
+ }
+
+ private def getRemoteTSessionHandle(sessionHandle: SessionHandle): TSessionHandle = {
+ val tSessionHandle = handleToTSessionHandle.get(sessionHandle)
+ if (tSessionHandle == null) {
+ throw KyuubiSQLException(s"$sessionHandle has not been initialized or already been closed")
+ }
+ tSessionHandle
+ }
+
+ def setConnection(
+ sessionHandle: SessionHandle,
+ client: TCLIService.Iface,
+ remoteSessionHandle: TSessionHandle): Unit = {
+ handleToClient.put(sessionHandle, client)
+ handleToTSessionHandle.put(sessionHandle, remoteSessionHandle)
+ }
+
+ override def newExecuteStatementOperation(
+ session: Session,
+ statement: String,
+ runAsync: Boolean,
+ queryTimeout: Long): Operation = {
+ val client = getThriftClient(session.handle)
+ val remoteSessionHandle = getRemoteTSessionHandle(session.handle)
+ val operation = new ExecuteStatement(session, client, remoteSessionHandle, statement)
+ addOperation(operation)
+
+ }
+
+ override def newGetTypeInfoOperation(session: Session): Operation = {
+ val client = getThriftClient(session.handle)
+ val remoteSessionHandle = getRemoteTSessionHandle(session.handle)
+ val operation = new GetTypeInfo(session, client, remoteSessionHandle)
+ addOperation(operation)
+ }
+
+ override def newGetCatalogsOperation(session: Session): Operation = {
+ val client = getThriftClient(session.handle)
+ val remoteSessionHandle = getRemoteTSessionHandle(session.handle)
+ val operation = new GetCatalogs(session, client, remoteSessionHandle)
+ addOperation(operation)
+ }
+
+ override def newGetSchemasOperation(
+ session: Session,
+ catalog: String,
+ schema: String): Operation = {
+ val client = getThriftClient(session.handle)
+ val remoteSessionHandle = getRemoteTSessionHandle(session.handle)
+ val operation = new GetSchemas(session, client, remoteSessionHandle, catalog, schema)
+ addOperation(operation)
+ }
+
+ override def newGetTablesOperation(
+ session: Session,
+ catalogName: String,
+ schemaName: String,
+ tableName: String,
+ tableTypes: java.util.List[String]): Operation = {
+ val client = getThriftClient(session.handle)
+ val remoteSessionHandle = getRemoteTSessionHandle(session.handle)
+ val operation = new GetTables(
+ session, client, remoteSessionHandle, catalogName, schemaName, tableName, tableTypes)
+ addOperation(operation)
+ }
+
+ override def newGetTableTypesOperation(session: Session): Operation = {
+ val client = getThriftClient(session.handle)
+ val remoteSessionHandle = getRemoteTSessionHandle(session.handle)
+ val operation = new GetTableTypes(session, client, remoteSessionHandle)
+ addOperation(operation)
+ }
+
+ override def newGetColumnsOperation(
+ session: Session,
+ catalogName: String,
+ schemaName: String,
+ tableName: String,
+ columnName: String): Operation = {
+ val client = getThriftClient(session.handle)
+ val remoteSessionHandle = getRemoteTSessionHandle(session.handle)
+ val operation = new GetColumns(
+ session, client, remoteSessionHandle, catalogName, schemaName, tableName, columnName)
+ addOperation(operation)
+ }
+
+ override def newGetFunctionsOperation(
+ session: Session,
+ catalogName: String,
+ schemaName: String,
+ functionName: String): Operation = {
+ val client = getThriftClient(session.handle)
+ val remoteSessionHandle = getRemoteTSessionHandle(session.handle)
+ val operation = new GetFunctions(
+ session, client, remoteSessionHandle, catalogName, schemaName, functionName)
+ addOperation(operation)
+ }
+
+ override def getOperationLogRowSet(
+ opHandle: OperationHandle,
+ order: FetchOrientation,
+ maxRows: Int): TRowSet = {
+ val operation = getOperation(opHandle).asInstanceOf[KyuubiOperation]
+ val client = getThriftClient(operation.getSession.handle)
+
+ val orientation = FetchOrientation.toTFetchOrientation(order)
+ val req = new TFetchResultsReq(operation.remoteOpHandle(), orientation, maxRows)
+ val resp = client.FetchResults(req)
+ resp.getResults
+ }
+}
diff --git a/kyuubi-server/pom.xml b/kyuubi-server/pom.xml
index 59f98800d..27535388d 100644
--- a/kyuubi-server/pom.xml
+++ b/kyuubi-server/pom.xml
@@ -57,8 +57,8 @@
org.apache.spark
- spark-core_2.12
-
+ spark-core_${scala.binary.version}
+ ${spark.version2}
${spark.scope}
diff --git a/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala b/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
index e1a469874..d8fde67f5 100644
--- a/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
+++ b/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
@@ -41,14 +41,8 @@ abstract class SparkOperation(spark: SparkSession, opType: OperationType, sessio
def getOperationLog: OperationLog = operationLog
- protected def statement: String = opType.toString
-
protected def resultSchema: StructType
- protected def isTerminalState(operationState: OperationState): Boolean = {
- OperationState.isTerminal(operationState)
- }
-
protected def cleanup(targetState: OperationState): Unit = synchronized {
if (!isTerminalState(state)) {
setState(targetState)
@@ -106,12 +100,6 @@ abstract class SparkOperation(spark: SparkSession, opType: OperationType, sessio
}
}
- override def setState(newState: OperationState): Unit = {
- info(s"Processing ${session.user}'s query[$statementId]: ${state.name} -> ${newState.name}," +
- s" statement: $statement")
- super.setState(newState)
- }
-
override protected def beforeRun(): Unit = {
setHasResultSet(true)
setState(OperationState.RUNNING)
diff --git a/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala b/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala
index fb89d6e2c..7f014cf3b 100644
--- a/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala
+++ b/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.SparkSession
import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.engine.spark.operation.log.{LogDivertAppender, OperationLog}
+import org.apache.kyuubi.engine.spark.operation.log.LogDivertAppender
import org.apache.kyuubi.operation.{Operation, OperationHandle, OperationManager}
import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
import org.apache.kyuubi.session.{Session, SessionHandle}
diff --git a/pom.xml b/pom.xml
index 97b693cf0..d43d5a295 100644
--- a/pom.xml
+++ b/pom.xml
@@ -214,6 +214,47 @@
provided
+
+ org.apache.hive
+ hive-service-rpc
+ ${hive.version}
+
+
+ commons-codec
+ commons-codec
+
+
+ commons-cli
+ commons-cli
+
+
+ tomcat
+ *
+
+
+
+
+
+
+ org.apache.hive
+ hive-jdbc
+ ${hive.version}
+
+
+ commons-codec
+ commons-codec
+
+
+ commons-cli
+ commons-cli
+
+
+ tomcat
+ *
+
+
+
+
org.eclipse.jetty
jetty-servlet