Implementaton of KyuubiOperationManager

This commit is contained in:
Kent Yao 2020-08-21 10:51:55 +08:00
parent 86e1a7aac3
commit 0468fd4d4e
20 changed files with 758 additions and 38 deletions

View File

@ -23,6 +23,7 @@
<artifactId>kyuubi</artifactId>
<groupId>org.apache.kyuubi</groupId>
<version>1.0.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
@ -34,7 +35,7 @@
<spark.archive.name>spark-${spark.version}-bin-hadoop2.7.tgz</spark.archive.name>
<!-- see more at http://www.apache.org/mirrors/, e.g. https://mirror.bit.edu.cn, https://mirrors.tuna.tsinghua.edu.cn e.t.c -->
<spark.archive.mirror>https://mirrors.bfsu.edu.cn</spark.archive.mirror>
<spark.archive.sha512>f5652835094d9f69eb3260e20ca9c2d58e8bdf85a8ed15797549a518b23c862b75a329b38d4248f8427e4310718238c60fae0f9d1afb3c70fb390d3e9cce2e49</spark.archive.sha512>
<!-- spark.archive.sha512>f5652835094d9f69eb3260e20ca9c2d58e8bdf85a8ed15797549a518b23c862b75a329b38d4248f8427e4310718238c60fae0f9d1afb3c70fb390d3e9cce2e49</spark.archive.sha512 -->
</properties>
<build>
@ -53,7 +54,7 @@
<configuration>
<url>${spark.archive.mirror}/apache/spark/spark-${spark.version}/${spark.archive.name}</url>
<sha512>${spark.archive.sha512}</sha512>
<outputDirectory>${project.build.directory}</outputDirectory>
<!-- outputDirectory>${project.build.directory}</outputDirectory -->
<readTimeOut>60000</readTimeOut>
<retries>3</retries>
<unpack>true</unpack>

View File

@ -46,21 +46,6 @@
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-service-rpc</artifactId>
<version>${hive.version}</version>
<exclusions>
<exclusion>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</exclusion>
<exclusion>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
</exclusion>
<exclusion>
<groupId>tomcat</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- ut -->

View File

@ -41,12 +41,24 @@ case class KyuubiSQLException(msg: String, cause: Throwable) extends SQLExceptio
object KyuubiSQLException {
private final val HEAD_MARK: String = "*"
private final val SEPARATOR: Char = ':'
def apply(cause: Throwable): KyuubiSQLException = {
new KyuubiSQLException(cause.getMessage, cause)
}
def apply(msg: String): KyuubiSQLException = new KyuubiSQLException(msg, null)
def apply(tStatus: TStatus): KyuubiSQLException = {
val msg = tStatus.getErrorMessage
val cause = toCause(tStatus.getInfoMessages.asScala)
cause match {
case k: KyuubiSQLException if k.getMessage == msg => k
case _ => apply(msg, cause)
}
}
def toTStatus(e: Exception): TStatus = e match {
case k: KyuubiSQLException => k.toTStatus
case _ =>
@ -81,16 +93,56 @@ object KyuubiSQLException {
trace: Array[StackTraceElement],
max: Int): List[String] = {
val builder = new StringBuilder
builder.append('*').append(ex.getClass.getName).append(':')
builder.append(ex.getMessage).append(':')
builder.append(trace.length).append(':').append(max)
builder.append(HEAD_MARK).append(ex.getClass.getName).append(SEPARATOR)
builder.append(ex.getMessage).append(SEPARATOR)
builder.append(trace.length).append(SEPARATOR).append(max)
List(builder.toString) ++ (0 to max).map { i =>
builder.setLength(0)
builder.append(trace(i).getClassName).append(":")
builder.append(trace(i).getMethodName).append(":")
builder.append(Option(trace(i).getFileName).getOrElse("")).append(':')
builder.append(trace(i).getClassName).append(SEPARATOR)
builder.append(trace(i).getMethodName).append(SEPARATOR)
builder.append(Option(trace(i).getFileName).getOrElse("")).append(SEPARATOR)
builder.append(trace(i).getLineNumber)
builder.toString
}.toList
}
private def newInstance(className: String, message: String, cause: Throwable): Throwable = {
try {
Class.forName(className)
.getConstructor(classOf[String], classOf[Throwable])
.newInstance(message, cause).asInstanceOf[Throwable]
} catch {
case e: Exception => throw new RuntimeException(className + ":" + message, e)
}
}
private def getCoordinates(line: String): (Int, Int, Int) = {
val i1 = line.indexOf(SEPARATOR)
val i3 = line.lastIndexOf(SEPARATOR)
val i2 = line.substring(0, i3).lastIndexOf(SEPARATOR)
(i1, i2, i3)
}
private def toCause(details: Seq[String]): Throwable = {
var ex: Throwable = null
if (details != null && details.nonEmpty) {
val head = details.head
val (i1, i2, i3) = getCoordinates(head)
val exClz = head.substring(1, i1)
val msg = head.substring(i1 + 1, i2)
val length = head.substring(i3 + 1).toInt
val stackTraceElements = details.tail.take(length + 1).map { line =>
val (i1, i2, i3) = getCoordinates(line)
val clzName = line.substring(0, i1)
val methodName = line.substring(i1 + 1, i2)
val fileName = line.substring(i2 + 1, i3)
val lineNum = line.substring(i3 + 1).toInt
new StackTraceElement(clzName, methodName, fileName, lineNum)
}
ex = newInstance(exClz, msg, toCause(details.slice(length + 2, details.length)))
ex.setStackTrace(stackTraceElements.toArray)
}
ex
}
}

View File

@ -46,6 +46,8 @@ abstract class AbstractOperation(opType: OperationType, session: Session)
@volatile protected var operationException: KyuubiSQLException = _
@volatile protected var hasResultSet: Boolean = false
protected def statement: String = opType.toString
protected def setHasResultSet(hasResultSet: Boolean): Unit = {
this.hasResultSet = hasResultSet
handle.setHasResultSet(hasResultSet)
@ -56,6 +58,8 @@ abstract class AbstractOperation(opType: OperationType, session: Session)
}
protected def setState(newState: OperationState): Unit = {
info(s"Processing ${session.user}'s query[$statementId]: ${state.name} -> ${newState.name}," +
s" statement: $statement")
OperationState.validateTransition(state, newState)
state = newState
@ -72,6 +76,10 @@ abstract class AbstractOperation(opType: OperationType, session: Session)
state == OperationState.CLOSED || state == OperationState.CANCELED
}
protected def isTerminalState(operationState: OperationState): Boolean = {
OperationState.isTerminal(operationState)
}
protected def assertState(state: OperationState): Unit = {
if (this.state ne state) {
throw new IllegalStateException(s"Expected state $state, but found ${this.state}")

View File

@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi
class KyuubiSQLExceptionSuite extends KyuubiFunSuite {
test("KyuubiSQLException") {
val msg0 = "this is just a dummy msg 0"
val msg1 = "this is just a dummy msg 1"
val msg2 = "this is just a dummy msg 2"
val e0 = new RuntimeException(msg0)
val e1 = new KyuubiException(msg1, e0)
val e2 = new KyuubiSQLException(msg2, e1)
assert(e2.toTStatus === KyuubiSQLException.toTStatus(e2))
val e3 = KyuubiSQLException(e2.toTStatus)
assert(e3.getMessage === e2.getMessage)
assert(e3.getStackTrace === e2.getStackTrace)
assert(e3.getCause.getMessage === e1.getMessage)
assert(e3.getCause.getCause.getMessage === e0.getMessage)
}
}

View File

@ -36,6 +36,7 @@
<artifactId>kyuubi-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi-ha</artifactId>

View File

@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.operation
import org.apache.hive.service.rpc.thrift.{TCLIService, TExecuteStatementReq, TSessionHandle}
import org.apache.kyuubi.session.Session
class ExecuteStatement(
session: Session,
client: TCLIService.Iface,
remoteSessionHandle: TSessionHandle,
statement: String)
extends KyuubiOperation(
OperationType.EXECUTE_STATEMENT, session, client, remoteSessionHandle) {
override protected def runInternal(): Unit = {
try {
val req = new TExecuteStatementReq(remoteSessionHandle, statement)
val resp = client.ExecuteStatement(req)
verifyTStatus(resp.getStatus)
_remoteOpHandle = resp.getOperationHandle
} catch onError()
}
}

View File

@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.operation
import org.apache.hive.service.rpc.thrift.{TCLIService, TGetCatalogsReq, TSessionHandle}
import org.apache.kyuubi.session.Session
class GetCatalogs(
session: Session,
client: TCLIService.Iface,
remoteSessionHandle: TSessionHandle)
extends KyuubiOperation(
OperationType.GET_CATALOGS, session, client, remoteSessionHandle) {
override protected def runInternal(): Unit = {
try {
val req = new TGetCatalogsReq(remoteSessionHandle)
val resp = client.GetCatalogs(req)
verifyTStatus(resp.getStatus)
_remoteOpHandle = resp.getOperationHandle
} catch onError()
}
}

View File

@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.operation
import org.apache.hive.service.rpc.thrift.{TCLIService, TGetColumnsReq, TSessionHandle}
import org.apache.kyuubi.session.Session
class GetColumns(
session: Session,
client: TCLIService.Iface,
remoteSessionHandle: TSessionHandle,
catalogName: String,
schemaName: String,
tableName: String,
columnName: String)
extends KyuubiOperation(
OperationType.GET_COLUMNS, session, client, remoteSessionHandle) {
override protected def runInternal(): Unit = {
try {
val req = new TGetColumnsReq(remoteSessionHandle)
req.setCatalogName(catalogName)
req.setSchemaName(schemaName)
req.setTableName(tableName)
req.setColumnName(columnName)
val resp = client.GetColumns(req)
verifyTStatus(resp.getStatus)
_remoteOpHandle = resp.getOperationHandle
} catch onError()
}
}

View File

@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.operation
import org.apache.hive.service.rpc.thrift.{TCLIService, TGetFunctionsReq, TSessionHandle}
import org.apache.kyuubi.session.Session
class GetFunctions(
session: Session,
client: TCLIService.Iface,
remoteSessionHandle: TSessionHandle,
catalogName: String,
schemaName: String,
functionName: String)
extends KyuubiOperation(
OperationType.GET_FUNCTIONS, session, client, remoteSessionHandle) {
override protected def runInternal(): Unit = {
try {
val req = new TGetFunctionsReq()
req.setSessionHandle(remoteSessionHandle)
req.setCatalogName(catalogName)
req.setSchemaName(schemaName)
req.setFunctionName(functionName)
val resp = client.GetFunctions(req)
verifyTStatus(resp.getStatus)
_remoteOpHandle = resp.getOperationHandle
} catch onError()
}
}

View File

@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.operation
import org.apache.hive.service.rpc.thrift.{TCLIService, TGetSchemasReq, TSessionHandle}
import org.apache.kyuubi.session.Session
class GetSchemas(
session: Session,
client: TCLIService.Iface,
remoteSessionHandle: TSessionHandle,
catalogName: String,
schemaName: String)
extends KyuubiOperation(
OperationType.GET_SCHEMAS, session, client, remoteSessionHandle) {
override protected def runInternal(): Unit = {
try {
val req = new TGetSchemasReq()
req.setSessionHandle(remoteSessionHandle)
req.setCatalogName(catalogName)
req.setSchemaName(schemaName)
val resp = client.GetSchemas(req)
verifyTStatus(resp.getStatus)
_remoteOpHandle = resp.getOperationHandle
} catch onError()
}
}

View File

@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.operation
import org.apache.hive.service.rpc.thrift.{TCLIService, TGetTableTypesReq, TSessionHandle}
import org.apache.kyuubi.session.Session
class GetTableTypes(
session: Session,
client: TCLIService.Iface,
remoteSessionHandle: TSessionHandle)
extends KyuubiOperation(
OperationType.GET_TABLE_TYPES, session, client, remoteSessionHandle) {
override protected def runInternal(): Unit = {
try {
val req = new TGetTableTypesReq()
req.setSessionHandle(remoteSessionHandle)
val resp = client.GetTableTypes(req)
verifyTStatus(resp.getStatus)
_remoteOpHandle = resp.getOperationHandle
} catch onError()
}
}

View File

@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.operation
import org.apache.hive.service.rpc.thrift.{TCLIService, TGetTablesReq, TSessionHandle}
import org.apache.kyuubi.session.Session
class GetTables(
session: Session,
client: TCLIService.Iface,
remoteSessionHandle: TSessionHandle,
catalogName: String,
schemaName: String,
tableName: String,
tableTypes: java.util.List[String])
extends KyuubiOperation(
OperationType.GET_TABLES, session, client, remoteSessionHandle) {
override protected def runInternal(): Unit = {
try {
val req = new TGetTablesReq()
req.setSessionHandle(remoteSessionHandle)
req.setCatalogName(catalogName)
req.setSchemaName(schemaName)
req.setTableName(tableName)
req.setTableTypes(tableTypes)
val resp = client.GetTables(req)
verifyTStatus(resp.getStatus)
_remoteOpHandle = resp.getOperationHandle
} catch onError()
}
}

View File

@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.operation
import org.apache.hive.service.rpc.thrift.{TCLIService, TGetTypeInfoReq, TSessionHandle}
import org.apache.kyuubi.session.Session
class GetTypeInfo(
session: Session,
client: TCLIService.Iface,
remoteSessionHandle: TSessionHandle)
extends KyuubiOperation(
OperationType.GET_TYPE_INFO, session, client, remoteSessionHandle) {
override protected def runInternal(): Unit = {
try {
val req = new TGetTypeInfoReq(remoteSessionHandle)
val resp = client.GetTypeInfo(req)
verifyTStatus(resp.getStatus)
_remoteOpHandle = resp.getOperationHandle
} catch onError()
}
}

View File

@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.operation
import org.apache.hive.service.rpc.thrift._
import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
import org.apache.kyuubi.operation.OperationType.OperationType
import org.apache.kyuubi.session.Session
abstract class KyuubiOperation(
opType: OperationType,
session: Session,
client: TCLIService.Iface,
remoteSessionHandle: TSessionHandle) extends AbstractOperation(opType, session) {
@volatile protected var _remoteOpHandle: TOperationHandle = _
def remoteOpHandle(): TOperationHandle = _remoteOpHandle
protected def verifyTStatus(tStatus: TStatus): Unit = {
if (tStatus.getStatusCode != TStatusCode.SUCCESS_STATUS) {
throw KyuubiSQLException(tStatus)
}
}
protected def onError(action: String = "running"): PartialFunction[Throwable, Unit] = {
case e: Exception =>
state.synchronized {
if (isTerminalState(state)) {
warn(s"Ignore exception in terminal state with $statementId: $e")
} else {
setState(OperationState.ERROR)
e match {
case kse: KyuubiSQLException => throw kse
case _ =>
throw KyuubiSQLException(s"Error $action $opType: ${e.getMessage}", e)
}
}
}
}
override protected def beforeRun(): Unit = {
setHasResultSet(true)
setState(OperationState.RUNNING)
}
override protected def afterRun(): Unit = {
state.synchronized {
if (!isTerminalState(state)) {
setState(OperationState.FINISHED)
}
}
}
override def cancel(): Unit = {
if (_remoteOpHandle != null && !isTerminalState(state)) {
try {
val req = new TCancelOperationReq(_remoteOpHandle)
val resp = client.CancelOperation(req)
verifyTStatus(resp.getStatus)
setState(OperationState.CANCELED)
} catch onError("cancelling")
}
}
override def close(): Unit = {
if (_remoteOpHandle != null && !isTerminalState(state)) {
try {
val req = new TCloseOperationReq(_remoteOpHandle)
val resp = client.CloseOperation(req)
verifyTStatus(resp.getStatus)
setState(OperationState.CLOSED)
} catch onError("closing")
}
}
override def getResultSetSchema: TTableSchema = {
val req = new TGetResultSetMetadataReq(_remoteOpHandle)
val resp = client.GetResultSetMetadata(req)
verifyTStatus(resp.getStatus)
resp.getSchema
}
override def getNextRowSet(order: FetchOrientation, rowSetSize: Int): TRowSet = {
validateDefaultFetchOrientation(order)
assertState(OperationState.FINISHED)
setHasResultSet(true)
val req = new TFetchResultsReq(
_remoteOpHandle, FetchOrientation.toTFetchOrientation(order), rowSetSize)
val resp = client.FetchResults(req)
resp.getResults
}
override def shouldRunAsync: Boolean = false
}

View File

@ -0,0 +1,151 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.operation
import java.util.concurrent.ConcurrentHashMap
import org.apache.hive.service.rpc.thrift.{TCLIService, TFetchResultsReq, TRowSet, TSessionHandle}
import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
import org.apache.kyuubi.session.{Session, SessionHandle}
class KyuubiOperationManager private (name: String) extends OperationManager(name) {
def this() = this(classOf[KyuubiOperationManager].getSimpleName)
private val handleToClient = new ConcurrentHashMap[SessionHandle, TCLIService.Iface]()
private val handleToTSessionHandle = new ConcurrentHashMap[SessionHandle, TSessionHandle]()
private def getThriftClient(sessionHandle: SessionHandle): TCLIService.Iface = {
val client = handleToClient.get(sessionHandle)
if (client == null) {
throw KyuubiSQLException(s"$sessionHandle has not been initialized or already been closed")
}
client
}
private def getRemoteTSessionHandle(sessionHandle: SessionHandle): TSessionHandle = {
val tSessionHandle = handleToTSessionHandle.get(sessionHandle)
if (tSessionHandle == null) {
throw KyuubiSQLException(s"$sessionHandle has not been initialized or already been closed")
}
tSessionHandle
}
def setConnection(
sessionHandle: SessionHandle,
client: TCLIService.Iface,
remoteSessionHandle: TSessionHandle): Unit = {
handleToClient.put(sessionHandle, client)
handleToTSessionHandle.put(sessionHandle, remoteSessionHandle)
}
override def newExecuteStatementOperation(
session: Session,
statement: String,
runAsync: Boolean,
queryTimeout: Long): Operation = {
val client = getThriftClient(session.handle)
val remoteSessionHandle = getRemoteTSessionHandle(session.handle)
val operation = new ExecuteStatement(session, client, remoteSessionHandle, statement)
addOperation(operation)
}
override def newGetTypeInfoOperation(session: Session): Operation = {
val client = getThriftClient(session.handle)
val remoteSessionHandle = getRemoteTSessionHandle(session.handle)
val operation = new GetTypeInfo(session, client, remoteSessionHandle)
addOperation(operation)
}
override def newGetCatalogsOperation(session: Session): Operation = {
val client = getThriftClient(session.handle)
val remoteSessionHandle = getRemoteTSessionHandle(session.handle)
val operation = new GetCatalogs(session, client, remoteSessionHandle)
addOperation(operation)
}
override def newGetSchemasOperation(
session: Session,
catalog: String,
schema: String): Operation = {
val client = getThriftClient(session.handle)
val remoteSessionHandle = getRemoteTSessionHandle(session.handle)
val operation = new GetSchemas(session, client, remoteSessionHandle, catalog, schema)
addOperation(operation)
}
override def newGetTablesOperation(
session: Session,
catalogName: String,
schemaName: String,
tableName: String,
tableTypes: java.util.List[String]): Operation = {
val client = getThriftClient(session.handle)
val remoteSessionHandle = getRemoteTSessionHandle(session.handle)
val operation = new GetTables(
session, client, remoteSessionHandle, catalogName, schemaName, tableName, tableTypes)
addOperation(operation)
}
override def newGetTableTypesOperation(session: Session): Operation = {
val client = getThriftClient(session.handle)
val remoteSessionHandle = getRemoteTSessionHandle(session.handle)
val operation = new GetTableTypes(session, client, remoteSessionHandle)
addOperation(operation)
}
override def newGetColumnsOperation(
session: Session,
catalogName: String,
schemaName: String,
tableName: String,
columnName: String): Operation = {
val client = getThriftClient(session.handle)
val remoteSessionHandle = getRemoteTSessionHandle(session.handle)
val operation = new GetColumns(
session, client, remoteSessionHandle, catalogName, schemaName, tableName, columnName)
addOperation(operation)
}
override def newGetFunctionsOperation(
session: Session,
catalogName: String,
schemaName: String,
functionName: String): Operation = {
val client = getThriftClient(session.handle)
val remoteSessionHandle = getRemoteTSessionHandle(session.handle)
val operation = new GetFunctions(
session, client, remoteSessionHandle, catalogName, schemaName, functionName)
addOperation(operation)
}
override def getOperationLogRowSet(
opHandle: OperationHandle,
order: FetchOrientation,
maxRows: Int): TRowSet = {
val operation = getOperation(opHandle).asInstanceOf[KyuubiOperation]
val client = getThriftClient(operation.getSession.handle)
val orientation = FetchOrientation.toTFetchOrientation(order)
val req = new TFetchResultsReq(operation.remoteOpHandle(), orientation, maxRows)
val resp = client.FetchResults(req)
resp.getResults
}
}

View File

@ -57,8 +57,8 @@
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version></version>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${spark.version2}</version>
<scope>${spark.scope}</scope>
</dependency>

View File

@ -41,14 +41,8 @@ abstract class SparkOperation(spark: SparkSession, opType: OperationType, sessio
def getOperationLog: OperationLog = operationLog
protected def statement: String = opType.toString
protected def resultSchema: StructType
protected def isTerminalState(operationState: OperationState): Boolean = {
OperationState.isTerminal(operationState)
}
protected def cleanup(targetState: OperationState): Unit = synchronized {
if (!isTerminalState(state)) {
setState(targetState)
@ -106,12 +100,6 @@ abstract class SparkOperation(spark: SparkSession, opType: OperationType, sessio
}
}
override def setState(newState: OperationState): Unit = {
info(s"Processing ${session.user}'s query[$statementId]: ${state.name} -> ${newState.name}," +
s" statement: $statement")
super.setState(newState)
}
override protected def beforeRun(): Unit = {
setHasResultSet(true)
setState(OperationState.RUNNING)

View File

@ -24,7 +24,7 @@ import org.apache.spark.sql.SparkSession
import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.engine.spark.operation.log.{LogDivertAppender, OperationLog}
import org.apache.kyuubi.engine.spark.operation.log.LogDivertAppender
import org.apache.kyuubi.operation.{Operation, OperationHandle, OperationManager}
import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
import org.apache.kyuubi.session.{Session, SessionHandle}

41
pom.xml
View File

@ -214,6 +214,47 @@
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-service-rpc</artifactId>
<version>${hive.version}</version>
<exclusions>
<exclusion>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</exclusion>
<exclusion>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
</exclusion>
<exclusion>
<groupId>tomcat</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<version>${hive.version}</version>
<exclusions>
<exclusion>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</exclusion>
<exclusion>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
</exclusion>
<exclusion>
<groupId>tomcat</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlet</artifactId>