Deploy yaooqinn/kyuubi to github.com/yaooqinn/kyuubi.git:gh-pages

This commit is contained in:
Kent Yao (from Travis CI) 2020-11-28 02:58:36 +00:00
parent f802057c0f
commit 9b4f19ea0b
27 changed files with 231 additions and 164 deletions

6
.gitignore vendored
View File

@ -45,9 +45,11 @@ metrics/report.json
metrics/.report.json.crc
/kyuubi-ha/embedded_zookeeper/
embedded_zookeeper/
/externals/kyuubi-spark-sql-engine/kyuubi_operation_logs/
/externals/kyuubi-spark-sql-engine/operation_logs/
/externals/kyuubi-spark-sql-engine/spark-warehouse/
/work/
/docs/_build/
/kyuubi-common/metrics/
kyuubi-common/kyuubi_operation_logs/
kyuubi-common/operation_logs/
/kyuubi-common/operation_logs/
/kyuubi-main/operation_logs/

View File

@ -138,7 +138,7 @@ kyuubi\.frontend\.bind<br>\.host|<div style='width: 80pt;word-wrap: break-word;w
kyuubi\.frontend\.bind<br>\.port|<div style='width: 80pt;word-wrap: break-word;white-space: normal'>10009</div>|<div style='width: 200pt;word-wrap: break-word;white-space: normal'>Port of the machine on which to run the frontend service.</div>|<div style='width: 20pt'>1.0.0</div>
kyuubi\.frontend\.login<br>\.timeout|<div style='width: 80pt;word-wrap: break-word;white-space: normal'>PT20S</div>|<div style='width: 200pt;word-wrap: break-word;white-space: normal'>Timeout for Thrift clients during login to the frontend service.</div>|<div style='width: 20pt'>1.0.0</div>
kyuubi\.frontend\.max<br>\.message\.size|<div style='width: 80pt;word-wrap: break-word;white-space: normal'>104857600</div>|<div style='width: 200pt;word-wrap: break-word;white-space: normal'>Maximum message size in bytes a Kyuubi server will accept.</div>|<div style='width: 20pt'>1.0.0</div>
kyuubi\.frontend\.max<br>\.worker\.threads|<div style='width: 80pt;word-wrap: break-word;white-space: normal'>99</div>|<div style='width: 200pt;word-wrap: break-word;white-space: normal'>Maximum number of threads in the of frontend worker thread pool for the frontend service</div>|<div style='width: 20pt'>1.0.0</div>
kyuubi\.frontend\.max<br>\.worker\.threads|<div style='width: 80pt;word-wrap: break-word;white-space: normal'>999</div>|<div style='width: 200pt;word-wrap: break-word;white-space: normal'>Maximum number of threads in the of frontend worker thread pool for the frontend service</div>|<div style='width: 20pt'>1.0.0</div>
kyuubi\.frontend\.min<br>\.worker\.threads|<div style='width: 80pt;word-wrap: break-word;white-space: normal'>9</div>|<div style='width: 200pt;word-wrap: break-word;white-space: normal'>Minimum number of threads in the of frontend worker thread pool for the frontend service</div>|<div style='width: 20pt'>1.0.0</div>
kyuubi\.frontend<br>\.worker\.keepalive\.time|<div style='width: 80pt;word-wrap: break-word;white-space: normal'>PT1M</div>|<div style='width: 200pt;word-wrap: break-word;white-space: normal'>Keep-alive time (in milliseconds) for an idle worker thread</div>|<div style='width: 20pt'>1.0.0</div>

View File

@ -185,3 +185,13 @@ This error means that you are using incompatible version of Hive metastore clien
To fix this problem you could use a compatible version for Hive client by configuring
`spark.sql.hive.metastore.jars` and `spark.sql.hive.metastore.version` at Spark side.
### hive.server2.thrift.max.worker.threads
```java
Unexpected end of file when reading from HS2 server. The root cause might be too many concurrent connections. Please ask the administrator to check the number of active connections, and adjust hive.server2.thrift.max.worker.threads if applicable.
Error: org.apache.thrift.transport.TTransportException (state=08S01,code=0)
```
In Kyuubi, we should increase `kyuubi.frontend.max.worker.threads` instead of `hive.server2.thrift.max.worker.threads`.

View File

@ -24,8 +24,8 @@ import org.apache.spark.sql.types.StructType
import org.apache.kyuubi.{KyuubiSQLException, Logging}
import org.apache.kyuubi.engine.spark.KyuubiSparkUtil
import org.apache.kyuubi.engine.spark.operation.log.OperationLog
import org.apache.kyuubi.operation.{OperationState, OperationType}
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.session.Session
class ExecuteStatement(

View File

@ -23,11 +23,11 @@ import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.StructType
import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.engine.spark.operation.log.OperationLog
import org.apache.kyuubi.operation.{AbstractOperation, OperationState}
import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
import org.apache.kyuubi.operation.OperationState.OperationState
import org.apache.kyuubi.operation.OperationType.OperationType
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.schema.{RowSet, SchemaHelper}
import org.apache.kyuubi.session.Session
@ -38,8 +38,8 @@ abstract class SparkOperation(spark: SparkSession, opType: OperationType, sessio
protected final val operationLog: OperationLog =
OperationLog.createOperationLog(session.handle, getHandle)
override def getOperationLog: Option[OperationLog] = Option(operationLog)
def getOperationLog: OperationLog = operationLog
protected def resultSchema: StructType
@ -88,11 +88,12 @@ abstract class SparkOperation(spark: SparkSession, opType: OperationType, sessio
case e: Exception =>
if (cancel) spark.sparkContext.cancelJobGroup(statementId)
state.synchronized {
val errMsg = KyuubiSQLException.stringifyException(e)
if (isTerminalState(state)) {
warn(s"Ignore exception in terminal state with $statementId: $e")
warn(s"Ignore exception in terminal state with $statementId: $errMsg")
} else {
setState(OperationState.ERROR)
val ke = KyuubiSQLException(s"Error operating $opType: ${e.getMessage}", e)
val ke = KyuubiSQLException(s"Error operating $opType: $errMsg", e)
setOperationException(ke)
throw ke
}
@ -121,7 +122,7 @@ abstract class SparkOperation(spark: SparkSession, opType: OperationType, sessio
override def close(): Unit = {
cleanup(OperationState.CLOSED)
if (operationLog != null) operationLog.close()
getOperationLog.foreach(_.close())
}
override def getResultSetSchema: TTableSchema = SchemaHelper.toTTableSchema(resultSchema)

View File

@ -19,14 +19,10 @@ package org.apache.kyuubi.engine.spark.operation
import java.util.concurrent.ConcurrentHashMap
import org.apache.hive.service.rpc.thrift.TRowSet
import org.apache.spark.sql.SparkSession
import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.engine.spark.operation.log.LogDivertAppender
import org.apache.kyuubi.operation.{Operation, OperationHandle, OperationManager}
import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
import org.apache.kyuubi.operation.{Operation, OperationManager}
import org.apache.kyuubi.session.{Session, SessionHandle}
class SparkSQLOperationManager private (name: String) extends OperationManager(name) {
@ -122,17 +118,4 @@ class SparkSQLOperationManager private (name: String) extends OperationManager(n
val op = new GetFunctions(spark, session, catalogName, schemaName, functionName)
addOperation(op)
}
override def initialize(conf: KyuubiConf): Unit = {
LogDivertAppender.initialize()
super.initialize(conf)
}
override def getOperationLogRowSet(
opHandle: OperationHandle,
order: FetchOrientation,
maxRows: Int): TRowSet = {
val log = getOperation(opHandle).asInstanceOf[SparkOperation].getOperationLog
log.read(maxRows)
}
}

View File

@ -19,7 +19,6 @@ package org.apache.kyuubi.engine.spark.session
import org.apache.hive.service.rpc.thrift.TProtocolVersion
import org.apache.kyuubi.engine.spark.operation.log.OperationLog
import org.apache.kyuubi.session.{AbstractSession, SessionManager}
class SparkSessionImpl(
@ -31,7 +30,4 @@ class SparkSessionImpl(
sessionManager: SessionManager)
extends AbstractSession(protocol, user, password, ipAddress, conf, sessionManager) {
override def open(): Unit = {
OperationLog.createOperationLogRootDirectory(handle)
}
}

View File

@ -138,12 +138,10 @@ class SparkOperationSuite extends WithSparkSQLEngine {
}
val e = intercept[HiveSQLException](metaData.getColumns(null, "*", null, null))
assert(e.getCause.getMessage === "org.apache.kyuubi.KyuubiSQLException:" +
"Error operating GET_COLUMNS: Dangling meta character '*' near index 0\n*\n^")
assert(e.getCause.getMessage contains "Dangling meta character '*' near index 0\n*\n^")
val e1 = intercept[HiveSQLException](metaData.getColumns(null, null, null, "*"))
assert(e1.getCause.getMessage === "org.apache.kyuubi.KyuubiSQLException:" +
"Error operating GET_COLUMNS: Dangling meta character '*' near index 0\n*\n^")
assert(e1.getCause.getMessage contains "Dangling meta character '*' near index 0\n*\n^")
}
}

View File

@ -17,6 +17,7 @@
package org.apache.kyuubi
import java.io.{PrintWriter, StringWriter}
import java.sql.SQLException
import scala.collection.JavaConverters._
@ -147,4 +148,12 @@ object KyuubiSQLException {
ex
}
/**
 * Renders the complete stack trace of `e` (including causes, as printed by
 * [[Throwable#printStackTrace]]) into a single string, suitable for logging
 * or for embedding in an error response.
 */
def stringifyException(e: Throwable): String = {
  val buffer = new StringWriter
  val printer = new PrintWriter(buffer)
  e.printStackTrace(printer)
  printer.close()
  buffer.toString
}
}

View File

@ -27,6 +27,8 @@ import org.apache.kyuubi.{Logging, Utils}
import org.apache.kyuubi.service.authentication.{AuthTypes, SaslQOP}
case class KyuubiConf(loadSysDefault: Boolean = true) extends Logging {
import KyuubiConf._
private val settings = new ConcurrentHashMap[String, String]()
private lazy val reader: ConfigProvider = new ConfigProvider(settings)
@ -111,6 +113,16 @@ case class KyuubiConf(loadSysDefault: Boolean = true) extends Logging {
cloned
}
private val serverOnlyConfEntries: Set[ConfigEntry[_]] = Set(
EMBEDDED_ZK_PORT,
EMBEDDED_ZK_TEMP_DIR,
FRONTEND_BIND_HOST,
FRONTEND_BIND_PORT,
AUTHENTICATION_METHOD,
SERVER_KEYTAB,
SERVER_PRINCIPAL,
KINIT_INTERVAL)
def getUserDefaults(user: String): KyuubiConf = {
val cloned = KyuubiConf(false)
@ -121,6 +133,7 @@ case class KyuubiConf(loadSysDefault: Boolean = true) extends Logging {
for ((k, v) <- getAllWithPrefix(s"___${user}___", "")) {
cloned.set(k, v)
}
serverOnlyConfEntries.foreach(cloned.unset)
cloned
}
@ -239,7 +252,7 @@ object KyuubiConf {
" service")
.version("1.0.0")
.intConf
.createWithDefault(99)
.createWithDefault(999)
val FRONTEND_WORKER_KEEPALIVE_TIME: ConfigEntry[Long] =
buildConf("frontend.worker.keepalive.time")

View File

@ -19,9 +19,11 @@ package org.apache.kyuubi.operation
import java.util.concurrent.Future
import org.apache.hive.service.rpc.thrift.{TProtocolVersion, TRowSet, TTableSchema}
import org.apache.hive.service.rpc.thrift.{TProtocolVersion, TRowSet, TStatus, TStatusCode, TTableSchema}
import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.session.Session
trait Operation {
@ -37,6 +39,7 @@ trait Operation {
def getSession: Session
def getHandle: OperationHandle
def getStatus: OperationStatus
def getOperationLog: Option[OperationLog]
def getBackgroundHandle: Future[_]
def shouldRunAsync: Boolean

View File

@ -20,7 +20,9 @@ package org.apache.kyuubi.operation
import org.apache.hive.service.rpc.thrift._
import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
import org.apache.kyuubi.operation.log.LogDivertAppender
import org.apache.kyuubi.service.AbstractService
import org.apache.kyuubi.session.Session
@ -34,6 +36,11 @@ abstract class OperationManager(name: String) extends AbstractService(name) {
private final val handleToOperation = new java.util.HashMap[OperationHandle, Operation]()
override def initialize(conf: KyuubiConf): Unit = {
LogDivertAppender.initialize()
super.initialize(conf)
}
def newExecuteStatementOperation(
session: Session,
statement: String,
@ -110,7 +117,12 @@ abstract class OperationManager(name: String) extends AbstractService(name) {
def getOperationLogRowSet(
opHandle: OperationHandle,
order: FetchOrientation,
maxRows: Int): TRowSet
maxRows: Int): TRowSet = {
val operationLog = getOperation(opHandle).getOperationLog
operationLog.map(_.read(maxRows)).getOrElse{
throw KyuubiSQLException(s"$opHandle failed to generate operation log")
}
}
final def removeExpiredOperations(handles: Seq[OperationHandle]): Seq[Operation] = synchronized {
handles.map(handleToOperation.get).filter { operation =>

View File

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.kyuubi.engine.spark.operation.log
package org.apache.kyuubi.operation.log
import java.io.CharArrayWriter

View File

@ -15,27 +15,22 @@
* limitations under the License.
*/
package org.apache.kyuubi.engine.spark.operation.log
package org.apache.kyuubi.operation.log
import java.io.IOException
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path, Paths}
import org.apache.commons.io.FileUtils
import org.apache.hive.service.rpc.thrift.{TColumn, TRow, TRowSet, TStringColumn}
import org.apache.kyuubi.{KyuubiSQLException, Logging}
import org.apache.kyuubi.operation.OperationHandle
import org.apache.kyuubi.session.SessionHandle
/**
* TODO: This part is spark-independent, mv to kyuubi-common?
* we can do this after we decide to support other engines
*/
object OperationLog extends Logging {
final val LOG_ROOT: String = "kyuubi_operation_logs"
final val LOG_ROOT: String = "operation_logs"
private final val OPERATION_LOG: InheritableThreadLocal[OperationLog] = {
new InheritableThreadLocal[OperationLog] {
override def initialValue(): OperationLog = null
@ -132,7 +127,7 @@ class OperationLog(path: Path) extends Logging {
try {
reader.close()
writer.close()
FileUtils.forceDelete(path.toFile)
Files.delete(path)
} catch {
case e: IOException =>
error(s"Failed to remove corresponding log file of operation: ${path.toAbsolutePath}", e)

View File

@ -22,7 +22,7 @@ import org.apache.thrift.{TProcessor, TProcessorFactory}
import org.apache.thrift.transport.TTransport
private[authentication]
case class CLIServiceProcessorFactory(saslServer: HadoopThriftAuthBridgeServer, service: Iface)
case class FEServiceProcessorFactory(saslServer: HadoopThriftAuthBridgeServer, service: Iface)
extends TProcessorFactory(null) {
override def getProcessor(trans: TTransport): TProcessor = {
val sqlProcessor = new Processor[Iface](service)

View File

@ -19,9 +19,8 @@ package org.apache.kyuubi.service.authentication
import java.io.IOException
import java.net.InetAddress
import java.security.{PrivilegedAction, PrivilegedExceptionAction}
import java.util.Locale
import javax.security.auth.callback.{Callback, CallbackHandler, NameCallback, PasswordCallback, UnsupportedCallbackException}
import java.security.PrivilegedAction
import javax.security.auth.callback._
import javax.security.sasl.{AuthorizeCallback, RealmCallback}
import org.apache.commons.codec.binary.Base64
@ -31,7 +30,7 @@ import org.apache.hadoop.security.SaslRpcServer.AuthMethod
import org.apache.hadoop.security.token.SecretManager.InvalidToken
import org.apache.thrift.{TException, TProcessor}
import org.apache.thrift.protocol.TProtocol
import org.apache.thrift.transport.{TSaslServerTransport, TSocket, TTransport, TTransportException, TTransportFactory}
import org.apache.thrift.transport._
import org.apache.kyuubi.Logging
@ -72,20 +71,11 @@ class HadoopThriftAuthBridgeServer(secretMgr: KyuubiDelegationTokenManager) {
new TUGIAssumingTransportFactory(ugi, transFactory)
}
/**
* Wrap a TProcessor in such a way that, before processing any RPC, it
* assumes the UserGroupInformation of the user authenticated by
* the SASL transport.
*/
def wrapProcessor(processor: TProcessor): TProcessor = {
new TUGIAssumingProcessor(processor, secretMgr, userProxy = true)
}
/**
* Wrap a TProcessor to capture the client information like connecting userid, ip etc
*/
def wrapNonAssumingProcessor(processor: TProcessor): TProcessor = {
new TUGIAssumingProcessor(processor, secretMgr, userProxy = false)
new TUGIAssumingProcessor(processor, secretMgr)
}
def getRemoteAddress: InetAddress = REMOTE_ADDRESS.get
@ -141,8 +131,7 @@ object HadoopThriftAuthBridgeServer {
*/
class TUGIAssumingProcessor(
wrapped: TProcessor,
secretMgr: KyuubiDelegationTokenManager,
userProxy: Boolean) extends TProcessor with Logging {
secretMgr: KyuubiDelegationTokenManager) extends TProcessor with Logging {
override def process(in: TProtocol, out: TProtocol): Boolean = {
val transport = in.getTransport
transport match {
@ -155,48 +144,41 @@ object HadoopThriftAuthBridgeServer {
REMOTE_ADDRESS.set(socket.getInetAddress)
val mechanismName = saslServer.getMechanismName
USER_AUTH_MECHANISM.set(mechanismName)
AuthMethod.valueOf(mechanismName.toUpperCase(Locale.ROOT)) match {
case AuthMethod.PLAIN =>
REMOTE_USER.set(endUser)
wrapped.process(in, out)
case other =>
if (other.equals(AuthMethod.TOKEN)) try {
val identifier = SaslRpcServer.getIdentifier(authId, secretMgr)
endUser = identifier.getUser.getUserName
} catch {
case e: InvalidToken => throw new TException(e.getMessage)
}
var clientUgi: UserGroupInformation = null
if (AuthMethod.PLAIN.getMechanismName.equalsIgnoreCase(mechanismName)) {
REMOTE_USER.set(endUser)
wrapped.process(in, out)
} else {
if (AuthMethod.TOKEN.getMechanismName.equalsIgnoreCase(mechanismName)) {
try {
if (userProxy) {
clientUgi =
UserGroupInformation.createProxyUser(endUser, UserGroupInformation.getLoginUser)
REMOTE_USER.set(clientUgi.getShortUserName)
clientUgi.doAs(new PrivilegedExceptionAction[Boolean] {
override def run(): Boolean = try wrapped.process(in, out) catch {
case e: TException => throw new RuntimeException(e)
}
})
} else {
val endUserUgi = UserGroupInformation.createRemoteUser(endUser)
REMOTE_USER.set(endUserUgi.getShortUserName)
debug(s"SET REMOTE USER: ${REMOTE_USER.get()} from endUser: $endUser")
wrapped.process(in, out)
}
val identifier = SaslRpcServer.getIdentifier(authId, secretMgr)
endUser = identifier.getUser.getUserName
} catch {
case e: RuntimeException => e.getCause match {
case t: TException => throw t
case _ => throw e
}
case e: InterruptedException => throw new RuntimeException(e)
case e: IOException => throw new RuntimeException(e)
} finally if (clientUgi != null) try FileSystem.closeAllForUGI(clientUgi) catch {
case e: IOException =>
error(s"Could not clean up file-system handles for UGI: $clientUgi", e)
case e: InvalidToken => throw new TException(e.getMessage)
}
}
val clientUgi: UserGroupInformation = UserGroupInformation.createRemoteUser(endUser)
try {
REMOTE_USER.set(clientUgi.getShortUserName)
debug(s"SET REMOTE USER: ${REMOTE_USER.get()} from endUser: $clientUgi")
wrapped.process(in, out)
} catch {
case e: RuntimeException => e.getCause match {
case t: TException => throw t
case _ => throw e
}
case e: InterruptedException => throw new RuntimeException(e)
case e: IOException => throw new RuntimeException(e)
} finally {
try {
FileSystem.closeAllForUGI(clientUgi)
} catch {
case e: IOException =>
error(s"Could not clean up file-system handles for UGI: $clientUgi", e)
}
}
}
case _ => throw new TException(s"Unexpected non-Sasl transport ${transport.getClass}")
case _ => throw new TException(s"Unexpected non-SASL transport ${transport.getClass}")
}
}
}

View File

@ -61,11 +61,15 @@ class KyuubiAuthenticationFactory(conf: KyuubiConf) {
def getTTransportFactory: TTransportFactory = {
saslServer match {
case Some(server) => try {
server.createSaslServerTransportFactory(getSaslProperties)
} catch {
case e: TTransportException => throw new LoginException(e.getMessage)
}
case Some(server) =>
val serverTransportFactory = try {
server.createSaslServerTransportFactory(getSaslProperties)
} catch {
case e: TTransportException => throw new LoginException(e.getMessage)
}
server.wrapTransportFactory(serverTransportFactory)
case _ => authType match {
case NOSASL => new TTransportFactory
case _ => PlainSASLHelper.getTransportFactory(authType.toString, conf)
@ -74,7 +78,7 @@ class KyuubiAuthenticationFactory(conf: KyuubiConf) {
}
def getTProcessorFactory(fe: Iface): TProcessorFactory = saslServer match {
case Some(server) => CLIServiceProcessorFactory(server, fe)
case Some(server) => FEServiceProcessorFactory(server, fe)
case _ => PlainSASLHelper.getProcessFactory(fe)
}

View File

@ -24,6 +24,7 @@ import org.apache.hive.service.rpc.thrift.{TGetInfoType, TGetInfoValue, TProtoco
import org.apache.kyuubi.{KyuubiSQLException, Logging}
import org.apache.kyuubi.operation.{Operation, OperationHandle}
import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
import org.apache.kyuubi.operation.log.OperationLog
abstract class AbstractSession(
val protocol: TProtocolVersion,
@ -222,4 +223,9 @@ abstract class AbstractSession(
}
}
}
override def open(): Unit = {
OperationLog.createOperationLogRootDirectory(handle)
}
}

View File

@ -122,8 +122,7 @@ trait JDBCTests extends KyuubiFunSuite {
checkResult(metaData.getSchemas(null, "db_not_exist"), Seq.empty)
val e = intercept[HiveSQLException](metaData.getSchemas(null, "*"))
assert(e.getCause.getMessage === "org.apache.kyuubi.KyuubiSQLException:" +
"Error operating GET_SCHEMAS: Dangling meta character '*' near index 0\n*\n^")
assert(e.getCause.getMessage contains "Dangling meta character '*' near index 0\n*\n^")
}
}
@ -185,8 +184,7 @@ trait JDBCTests extends KyuubiFunSuite {
}
val e = intercept[HiveSQLException](metaData.getTables(null, "*", null, null))
assert(e.getCause.getMessage === "org.apache.kyuubi.KyuubiSQLException:" +
"Error operating GET_TABLES: Dangling meta character '*' near index 0\n*\n^")
assert(e.getCause.getMessage contains "Dangling meta character '*' near index 0\n*\n^")
}
}

View File

@ -26,6 +26,7 @@ import org.apache.hive.service.rpc.thrift.{TColumn, TColumnDesc, TPrimitiveTypeE
import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
import org.apache.kyuubi.operation.OperationType.OperationType
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.session.Session
class NoopOperation(typ: OperationType, session: Session, shouldFail: Boolean = false)
@ -48,7 +49,6 @@ class NoopOperation(typ: OperationType, session: Session, shouldFail: Boolean =
if (!OperationState.isTerminal(state)) {
setState(OperationState.FINISHED)
}
}
override def cancel(): Unit = {
@ -81,4 +81,6 @@ class NoopOperation(typ: OperationType, session: Session, shouldFail: Boolean =
}
override def shouldRunAsync: Boolean = false
override def getOperationLog: Option[OperationLog] = None
}

View File

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.kyuubi.engine.spark.operation.log
package org.apache.kyuubi.operation.log
import java.nio.file.{Files, Paths}

View File

@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi
import org.apache.hive.service.rpc.thrift.{TRow, TRowSet, TStatus, TStatusCode}
/** Small helpers shared by code that talks to a remote Thrift endpoint. */
object ThriftUtils {

  /**
   * Verifies the status of a Thrift RPC response: does nothing when the
   * remote call reported SUCCESS_STATUS, otherwise throws a
   * [[KyuubiSQLException]] built from the returned status.
   */
  def verifyTStatus(tStatus: TStatus): Unit = {
    tStatus.getStatusCode match {
      case TStatusCode.SUCCESS_STATUS => // remote call succeeded; nothing to do
      case _ => throw KyuubiSQLException(tStatus)
    }
  }

  /** A shared zero-row [[TRowSet]] returned when there is no data to send. */
  val EMPTY_ROW_SET = new TRowSet(0, new java.util.ArrayList[TRow](0))
}

View File

@ -17,12 +17,13 @@
package org.apache.kyuubi.operation
import java.util.concurrent.RejectedExecutionException
import scala.collection.JavaConverters._
import org.apache.hive.service.rpc.thrift.{TCLIService, TExecuteStatementReq, TGetOperationStatusReq, TSessionHandle}
import org.apache.hive.service.rpc.thrift.{TCLIService, TExecuteStatementReq, TFetchOrientation, TFetchResultsReq, TGetOperationStatusReq, TSessionHandle}
import org.apache.hive.service.rpc.thrift.TOperationState._
import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.session.Session
class ExecuteStatement(
@ -34,14 +35,30 @@ class ExecuteStatement(
extends KyuubiOperation(
OperationType.EXECUTE_STATEMENT, session, client, remoteSessionHandle) {
private final val _operationLog: OperationLog = if (shouldRunAsync) {
OperationLog.createOperationLog(session.handle, getHandle)
} else {
null
}
override def getOperationLog: Option[OperationLog] = Option(_operationLog)
private lazy val statusReq = new TGetOperationStatusReq(_remoteOpHandle)
private lazy val fetchLogReq = {
val req = new TFetchResultsReq(_remoteOpHandle, TFetchOrientation.FETCH_NEXT, 1000)
req.setFetchType(1.toShort)
req
}
override def beforeRun(): Unit = {
OperationLog.setCurrentOperationLog(_operationLog)
setHasResultSet(true)
setState(OperationState.PENDING)
}
override protected def afterRun(): Unit = {}
override protected def afterRun(): Unit = {
OperationLog.removeCurrentOperationLog()
}
private def executeStatement(): Unit = {
try {
@ -58,40 +75,52 @@ class ExecuteStatement(
var statusResp = client.GetOperationStatus(statusReq)
var isComplete = false
while (!isComplete) {
getQueryLog()
verifyTStatus(statusResp.getStatus)
val remoteState = statusResp.getOperationState
info(s"Query[$statementId] in ${remoteState.name()}")
isComplete = true
remoteState match {
case INITIALIZED_STATE | PENDING_STATE | RUNNING_STATE =>
isComplete = false
statusResp = client.GetOperationStatus(statusReq)
case FINISHED_STATE =>
setState(OperationState.FINISHED)
isComplete = true
case CLOSED_STATE =>
setState(OperationState.CLOSED)
isComplete = true
case CANCELED_STATE =>
setState(OperationState.CANCELED)
isComplete = true
case TIMEDOUT_STATE =>
setState(OperationState.TIMEOUT)
isComplete = true
case ERROR_STATE =>
setState(OperationState.ERROR)
val ke = KyuubiSQLException(statusResp.getErrorMessage)
setOperationException(ke)
throw ke
case UKNOWN_STATE =>
setState(OperationState.ERROR)
val ke = KyuubiSQLException(s"UNKNOWN STATE for $statement")
setOperationException(ke)
throw ke
}
}
// see if anymore log could be fetched
getQueryLog()
}
/**
 * Best-effort pull of the remote engine's operation log: fetches one batch of
 * log lines over Thrift (via the pre-built `fetchLogReq`) and appends them to
 * the local operation log. Any failure is swallowed on purpose — log
 * retrieval must never fail the statement itself. No-op when this operation
 * has no local operation log (synchronous mode).
 */
private def getQueryLog(): Unit = {
  getOperationLog.foreach { writer =>
    try {
      val resp = client.FetchResults(fetchLogReq)
      verifyTStatus(resp.getStatus)
      // Log lines arrive as the string column of a one-column row set.
      val lines = resp.getResults.getColumns.get(0).getStringVal.getValues.asScala
      for (line <- lines) {
        writer.write(line + "\n")
      }
    } catch {
      case _: Exception => // best effort: ignore failures while polling logs
    }
  }
}
@ -107,14 +136,7 @@ class ExecuteStatement(
val backgroundOperation =
sessionManager.submitBackgroundOperation(asyncOperation)
setBackgroundHandle(backgroundOperation)
} catch {
case rejected: RejectedExecutionException =>
setState(OperationState.ERROR)
val ke = KyuubiSQLException("Error submitting query in background, query rejected",
rejected)
setOperationException(ke)
throw ke
}
} catch onError("submitting query in background, query rejected")
} else {
setState(OperationState.RUNNING)
executeStatement()

View File

@ -19,9 +19,10 @@ package org.apache.kyuubi.operation
import org.apache.hive.service.rpc.thrift._
import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.{KyuubiSQLException, ThriftUtils}
import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
import org.apache.kyuubi.operation.OperationType.OperationType
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.session.Session
abstract class KyuubiOperation(
@ -35,11 +36,11 @@ abstract class KyuubiOperation(
def remoteOpHandle(): TOperationHandle = _remoteOpHandle
protected def verifyTStatus(tStatus: TStatus): Unit = {
if (tStatus.getStatusCode != TStatusCode.SUCCESS_STATUS) {
throw KyuubiSQLException(tStatus)
}
ThriftUtils.verifyTStatus(tStatus)
}
override def getOperationLog: Option[OperationLog] = None
protected def onError(action: String = "operating"): PartialFunction[Throwable, Unit] = {
case e: Exception =>
state.synchronized {
@ -72,7 +73,7 @@ abstract class KyuubiOperation(
}
override def cancel(): Unit = {
if (_remoteOpHandle != null && !isTerminalState(state)) {
if (_remoteOpHandle != null && !isClosedOrCanceled) {
try {
val req = new TCancelOperationReq(_remoteOpHandle)
val resp = client.CancelOperation(req)
@ -83,8 +84,9 @@ abstract class KyuubiOperation(
}
override def close(): Unit = {
if (_remoteOpHandle != null && !isTerminalState(state)) {
if (_remoteOpHandle != null && !isClosedOrCanceled) {
try {
getOperationLog.foreach(_.close())
val req = new TCloseOperationReq(_remoteOpHandle)
val resp = client.CloseOperation(req)
verifyTStatus(resp.getStatus)

View File

@ -21,7 +21,7 @@ import java.util.concurrent.ConcurrentHashMap
import org.apache.hive.service.rpc.thrift.{TCLIService, TFetchResultsReq, TRow, TRowSet, TSessionHandle}
import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.{KyuubiSQLException, ThriftUtils}
import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
import org.apache.kyuubi.session.{Session, SessionHandle}
@ -142,26 +142,28 @@ class KyuubiOperationManager private (name: String) extends OperationManager(nam
addOperation(operation)
}
override def getOperationLogRowSet(
opHandle: OperationHandle,
order: FetchOrientation,
maxRows: Int): TRowSet = {
val operation = getOperation(opHandle).asInstanceOf[KyuubiOperation]
val client = getThriftClient(operation.getSession.handle)
order: FetchOrientation, maxRows: Int): TRowSet = {
// TODO:(kentyao): In async mode, if we query log like below, will put very heavy load on engine
// side and get thrift err like:
// org.apache.thrift.transport.TTransportException: Read a negative frame size (-2147418110)!
val tOperationHandle = operation.remoteOpHandle()
if (tOperationHandle == null) {
new TRowSet(0, new java.util.ArrayList[TRow](0))
} else {
// val orientation = FetchOrientation.toTFetchOrientation(order)
// val req = new TFetchResultsReq(tOperationHandle, orientation, maxRows)
// req.setFetchType(1.toShort)
// val resp = client.FetchResults(req)
// resp.getResults
new TRowSet(0, new java.util.ArrayList[TRow](0))
val operation = getOperation(opHandle).asInstanceOf[KyuubiOperation]
val operationLog = operation.getOperationLog
operationLog match {
case Some(log) => log.read(maxRows)
case None =>
val remoteHandle = operation.remoteOpHandle()
val client = getThriftClient(operation.getSession.handle)
if (remoteHandle != null) {
val or = FetchOrientation.toTFetchOrientation(order)
val req = new TFetchResultsReq(remoteHandle, or, maxRows)
val resp = client.FetchResults(req)
ThriftUtils.verifyTStatus(resp.getStatus)
resp.getResults
} else {
ThriftUtils.EMPTY_ROW_SET
}
}
}
}

View File

@ -28,7 +28,7 @@ import org.apache.thrift.TException
import org.apache.thrift.protocol.TBinaryProtocol
import org.apache.thrift.transport.{TSocket, TTransport}
import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.{KyuubiSQLException, ThriftUtils}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.engine.spark.SparkProcessBuilder
@ -81,6 +81,7 @@ class KyuubiSessionImpl(
}
override def open(): Unit = {
super.open()
// Init zookeeper client here to capture errors
zkClient
getServerHost match {
@ -120,24 +121,19 @@ class KyuubiSessionImpl(
req.setPassword(passwd)
req.setConfiguration(conf.asJava)
val resp = client.OpenSession(req)
verifyTStatus(resp.getStatus)
ThriftUtils.verifyTStatus(resp.getStatus)
remoteSessionHandle = resp.getSessionHandle
sessionManager.operationManager.setConnection(handle, client, remoteSessionHandle)
}
protected def verifyTStatus(tStatus: TStatus): Unit = {
if (tStatus.getStatusCode != TStatusCode.SUCCESS_STATUS) {
throw KyuubiSQLException(tStatus)
}
}
override def close(): Unit = {
super.close()
sessionManager.operationManager.removeConnection(handle)
try {
if (remoteSessionHandle != null) {
val req = new TCloseSessionReq(remoteSessionHandle)
client.CloseSession(req)
val resp = client.CloseSession(req)
ThriftUtils.verifyTStatus(resp.getStatus)
}
} catch {
case e: TException =>

View File

@ -65,7 +65,6 @@
<scalatest.version>3.0.3</scalatest.version>
<scala.binary.version>2.12</scala.binary.version>
<maven.version>3.6.3</maven.version>
<spark.group>org.apache.spark</spark.group>
<spark.version>3.0.1</spark.version>
<log4j.version>1.7.30</log4j.version>
</properties>