diff --git a/.gitignore b/.gitignore
index ef7d1d68f..feea82254 100644
--- a/.gitignore
+++ b/.gitignore
@@ -45,9 +45,11 @@ metrics/report.json
metrics/.report.json.crc
/kyuubi-ha/embedded_zookeeper/
embedded_zookeeper/
-/externals/kyuubi-spark-sql-engine/kyuubi_operation_logs/
+/externals/kyuubi-spark-sql-engine/operation_logs/
/externals/kyuubi-spark-sql-engine/spark-warehouse/
/work/
/docs/_build/
/kyuubi-common/metrics/
-kyuubi-common/kyuubi_operation_logs/
+kyuubi-common/operation_logs/
+/kyuubi-common/operation_logs/
+/kyuubi-main/operation_logs/
diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md
index b50bb7892..1479ad3e2 100644
--- a/docs/deployment/settings.md
+++ b/docs/deployment/settings.md
@@ -138,7 +138,7 @@ kyuubi\.frontend\.bind
\.host|
10009
|Port of the machine on which to run the frontend service.
|1.0.0
kyuubi\.frontend\.login
\.timeout|PT20S
|Timeout for Thrift clients during login to the frontend service.
|1.0.0
kyuubi\.frontend\.max
\.message\.size|104857600
|Maximum message size in bytes a Kyuubi server will accept.
|1.0.0
-kyuubi\.frontend\.max
\.worker\.threads|99
|Maximum number of threads in the of frontend worker thread pool for the frontend service
|1.0.0
+kyuubi\.frontend\.max
\.worker\.threads|999
|Maximum number of threads in the frontend worker thread pool for the frontend service
|1.0.0
kyuubi\.frontend\.min
\.worker\.threads|9
|Minimum number of threads in the of frontend worker thread pool for the frontend service
|1.0.0
kyuubi\.frontend
\.worker\.keepalive\.time|PT1M
|Keep-alive time (in milliseconds) for an idle worker thread
|1.0.0
diff --git a/docs/deployment/trouble_shooting.md b/docs/deployment/trouble_shooting.md
index ee5da8a11..342067cae 100644
--- a/docs/deployment/trouble_shooting.md
+++ b/docs/deployment/trouble_shooting.md
@@ -185,3 +185,13 @@ This error means that you are using incompatible version of Hive metastore clien
To fix this problem you could use a compatible version for Hive client by configuring
`spark.sql.hive.metastore.jars` and `spark.sql.hive.metastore.version` at Spark side.
+
+
+### hive.server2.thrift.max.worker.threads
+
+```java
+Unexpected end of file when reading from HS2 server. The root cause might be too many concurrent connections. Please ask the administrator to check the number of active connections, and adjust hive.server2.thrift.max.worker.threads if applicable.
+Error: org.apache.thrift.transport.TTransportException (state=08S01,code=0)
+```
+
+In Kyuubi, we should increase `kyuubi.frontend.max.worker.threads` instead of `hive.server2.thrift.max.worker.threads`.
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
index 32f9ecf34..f4e733843 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
@@ -24,8 +24,8 @@ import org.apache.spark.sql.types.StructType
import org.apache.kyuubi.{KyuubiSQLException, Logging}
import org.apache.kyuubi.engine.spark.KyuubiSparkUtil
-import org.apache.kyuubi.engine.spark.operation.log.OperationLog
import org.apache.kyuubi.operation.{OperationState, OperationType}
+import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.session.Session
class ExecuteStatement(
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
index 602127949..226127318 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
@@ -23,11 +23,11 @@ import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.StructType
import org.apache.kyuubi.KyuubiSQLException
-import org.apache.kyuubi.engine.spark.operation.log.OperationLog
import org.apache.kyuubi.operation.{AbstractOperation, OperationState}
import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
import org.apache.kyuubi.operation.OperationState.OperationState
import org.apache.kyuubi.operation.OperationType.OperationType
+import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.schema.{RowSet, SchemaHelper}
import org.apache.kyuubi.session.Session
@@ -38,8 +38,8 @@ abstract class SparkOperation(spark: SparkSession, opType: OperationType, sessio
protected final val operationLog: OperationLog =
OperationLog.createOperationLog(session.handle, getHandle)
+ override def getOperationLog: Option[OperationLog] = Option(operationLog)
- def getOperationLog: OperationLog = operationLog
protected def resultSchema: StructType
@@ -88,11 +88,12 @@ abstract class SparkOperation(spark: SparkSession, opType: OperationType, sessio
case e: Exception =>
if (cancel) spark.sparkContext.cancelJobGroup(statementId)
state.synchronized {
+ val errMsg = KyuubiSQLException.stringifyException(e)
if (isTerminalState(state)) {
- warn(s"Ignore exception in terminal state with $statementId: $e")
+ warn(s"Ignore exception in terminal state with $statementId: $errMsg")
} else {
setState(OperationState.ERROR)
- val ke = KyuubiSQLException(s"Error operating $opType: ${e.getMessage}", e)
+ val ke = KyuubiSQLException(s"Error operating $opType: $errMsg", e)
setOperationException(ke)
throw ke
}
@@ -121,7 +122,7 @@ abstract class SparkOperation(spark: SparkSession, opType: OperationType, sessio
override def close(): Unit = {
cleanup(OperationState.CLOSED)
- if (operationLog != null) operationLog.close()
+ getOperationLog.foreach(_.close())
}
override def getResultSetSchema: TTableSchema = SchemaHelper.toTTableSchema(resultSchema)
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala
index d5bbe222e..0d0ae06cf 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala
@@ -19,14 +19,10 @@ package org.apache.kyuubi.engine.spark.operation
import java.util.concurrent.ConcurrentHashMap
-import org.apache.hive.service.rpc.thrift.TRowSet
import org.apache.spark.sql.SparkSession
import org.apache.kyuubi.KyuubiSQLException
-import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.engine.spark.operation.log.LogDivertAppender
-import org.apache.kyuubi.operation.{Operation, OperationHandle, OperationManager}
-import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
+import org.apache.kyuubi.operation.{Operation, OperationManager}
import org.apache.kyuubi.session.{Session, SessionHandle}
class SparkSQLOperationManager private (name: String) extends OperationManager(name) {
@@ -122,17 +118,4 @@ class SparkSQLOperationManager private (name: String) extends OperationManager(n
val op = new GetFunctions(spark, session, catalogName, schemaName, functionName)
addOperation(op)
}
-
- override def initialize(conf: KyuubiConf): Unit = {
- LogDivertAppender.initialize()
- super.initialize(conf)
- }
-
- override def getOperationLogRowSet(
- opHandle: OperationHandle,
- order: FetchOrientation,
- maxRows: Int): TRowSet = {
- val log = getOperation(opHandle).asInstanceOf[SparkOperation].getOperationLog
- log.read(maxRows)
- }
}
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala
index b0177aded..fffcc0b09 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/session/SparkSessionImpl.scala
@@ -19,7 +19,6 @@ package org.apache.kyuubi.engine.spark.session
import org.apache.hive.service.rpc.thrift.TProtocolVersion
-import org.apache.kyuubi.engine.spark.operation.log.OperationLog
import org.apache.kyuubi.session.{AbstractSession, SessionManager}
class SparkSessionImpl(
@@ -31,7 +30,4 @@ class SparkSessionImpl(
sessionManager: SessionManager)
extends AbstractSession(protocol, user, password, ipAddress, conf, sessionManager) {
- override def open(): Unit = {
- OperationLog.createOperationLogRootDirectory(handle)
- }
}
diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkOperationSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkOperationSuite.scala
index 30e32d665..481f3c2bd 100644
--- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkOperationSuite.scala
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkOperationSuite.scala
@@ -138,12 +138,10 @@ class SparkOperationSuite extends WithSparkSQLEngine {
}
val e = intercept[HiveSQLException](metaData.getColumns(null, "*", null, null))
- assert(e.getCause.getMessage === "org.apache.kyuubi.KyuubiSQLException:" +
- "Error operating GET_COLUMNS: Dangling meta character '*' near index 0\n*\n^")
+ assert(e.getCause.getMessage contains "Dangling meta character '*' near index 0\n*\n^")
val e1 = intercept[HiveSQLException](metaData.getColumns(null, null, null, "*"))
- assert(e1.getCause.getMessage === "org.apache.kyuubi.KyuubiSQLException:" +
- "Error operating GET_COLUMNS: Dangling meta character '*' near index 0\n*\n^")
+ assert(e1.getCause.getMessage contains "Dangling meta character '*' near index 0\n*\n^")
}
}
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/KyuubiSQLException.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/KyuubiSQLException.scala
index 148181ce4..2c88fc35a 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/KyuubiSQLException.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/KyuubiSQLException.scala
@@ -17,6 +17,7 @@
package org.apache.kyuubi
+import java.io.{PrintWriter, StringWriter}
import java.sql.SQLException
import scala.collection.JavaConverters._
@@ -147,4 +148,12 @@ object KyuubiSQLException {
ex
}
+ def stringifyException(e: Throwable): String = {
+ val stm = new StringWriter
+ val wrt = new PrintWriter(stm)
+ e.printStackTrace(wrt)
+ wrt.close()
+ stm.toString
+ }
+
}
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index d12927c9e..6703a6c4c 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -27,6 +27,8 @@ import org.apache.kyuubi.{Logging, Utils}
import org.apache.kyuubi.service.authentication.{AuthTypes, SaslQOP}
case class KyuubiConf(loadSysDefault: Boolean = true) extends Logging {
+ import KyuubiConf._
+
private val settings = new ConcurrentHashMap[String, String]()
private lazy val reader: ConfigProvider = new ConfigProvider(settings)
@@ -111,6 +113,16 @@ case class KyuubiConf(loadSysDefault: Boolean = true) extends Logging {
cloned
}
+ private val serverOnlyConfEntries: Set[ConfigEntry[_]] = Set(
+ EMBEDDED_ZK_PORT,
+ EMBEDDED_ZK_TEMP_DIR,
+ FRONTEND_BIND_HOST,
+ FRONTEND_BIND_PORT,
+ AUTHENTICATION_METHOD,
+ SERVER_KEYTAB,
+ SERVER_PRINCIPAL,
+ KINIT_INTERVAL)
+
def getUserDefaults(user: String): KyuubiConf = {
val cloned = KyuubiConf(false)
@@ -121,6 +133,7 @@ case class KyuubiConf(loadSysDefault: Boolean = true) extends Logging {
for ((k, v) <- getAllWithPrefix(s"___${user}___", "")) {
cloned.set(k, v)
}
+ serverOnlyConfEntries.foreach(cloned.unset)
cloned
}
@@ -239,7 +252,7 @@ object KyuubiConf {
" service")
.version("1.0.0")
.intConf
- .createWithDefault(99)
+ .createWithDefault(999)
val FRONTEND_WORKER_KEEPALIVE_TIME: ConfigEntry[Long] =
buildConf("frontend.worker.keepalive.time")
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/Operation.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/Operation.scala
index a728e24ea..a5e84f7ae 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/Operation.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/Operation.scala
@@ -19,9 +19,11 @@ package org.apache.kyuubi.operation
import java.util.concurrent.Future
-import org.apache.hive.service.rpc.thrift.{TProtocolVersion, TRowSet, TTableSchema}
+import org.apache.hive.service.rpc.thrift.{TProtocolVersion, TRowSet, TStatus, TStatusCode, TTableSchema}
+import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
+import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.session.Session
trait Operation {
@@ -37,6 +39,7 @@ trait Operation {
def getSession: Session
def getHandle: OperationHandle
def getStatus: OperationStatus
+ def getOperationLog: Option[OperationLog]
def getBackgroundHandle: Future[_]
def shouldRunAsync: Boolean
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationManager.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationManager.scala
index d7968e706..4fadc1ecc 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationManager.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationManager.scala
@@ -20,7 +20,9 @@ package org.apache.kyuubi.operation
import org.apache.hive.service.rpc.thrift._
import org.apache.kyuubi.KyuubiSQLException
+import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
+import org.apache.kyuubi.operation.log.LogDivertAppender
import org.apache.kyuubi.service.AbstractService
import org.apache.kyuubi.session.Session
@@ -34,6 +36,11 @@ abstract class OperationManager(name: String) extends AbstractService(name) {
private final val handleToOperation = new java.util.HashMap[OperationHandle, Operation]()
+ override def initialize(conf: KyuubiConf): Unit = {
+ LogDivertAppender.initialize()
+ super.initialize(conf)
+ }
+
def newExecuteStatementOperation(
session: Session,
statement: String,
@@ -110,7 +117,12 @@ abstract class OperationManager(name: String) extends AbstractService(name) {
def getOperationLogRowSet(
opHandle: OperationHandle,
order: FetchOrientation,
- maxRows: Int): TRowSet
+ maxRows: Int): TRowSet = {
+ val operationLog = getOperation(opHandle).getOperationLog
+ operationLog.map(_.read(maxRows)).getOrElse{
+ throw KyuubiSQLException(s"$opHandle failed to generate operation log")
+ }
+ }
final def removeExpiredOperations(handles: Seq[OperationHandle]): Seq[Operation] = synchronized {
handles.map(handleToOperation.get).filter { operation =>
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/log/LogDivertAppender.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/LogDivertAppender.scala
similarity index 97%
rename from externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/log/LogDivertAppender.scala
rename to kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/LogDivertAppender.scala
index 2918aaef0..f8ad2c32d 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/log/LogDivertAppender.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/LogDivertAppender.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.kyuubi.engine.spark.operation.log
+package org.apache.kyuubi.operation.log
import java.io.CharArrayWriter
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/log/OperationLog.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/OperationLog.scala
similarity index 93%
rename from externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/log/OperationLog.scala
rename to kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/OperationLog.scala
index 50f899a67..0556c9d61 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/log/OperationLog.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/OperationLog.scala
@@ -15,27 +15,22 @@
* limitations under the License.
*/
-package org.apache.kyuubi.engine.spark.operation.log
+package org.apache.kyuubi.operation.log
import java.io.IOException
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path, Paths}
-import org.apache.commons.io.FileUtils
import org.apache.hive.service.rpc.thrift.{TColumn, TRow, TRowSet, TStringColumn}
import org.apache.kyuubi.{KyuubiSQLException, Logging}
import org.apache.kyuubi.operation.OperationHandle
import org.apache.kyuubi.session.SessionHandle
-/**
- * TODO: This part is spark-independent, mv to kyuubi-common?
- * we can do this after we decide to support other engines
- */
object OperationLog extends Logging {
- final val LOG_ROOT: String = "kyuubi_operation_logs"
+ final val LOG_ROOT: String = "operation_logs"
private final val OPERATION_LOG: InheritableThreadLocal[OperationLog] = {
new InheritableThreadLocal[OperationLog] {
override def initialValue(): OperationLog = null
@@ -132,7 +127,7 @@ class OperationLog(path: Path) extends Logging {
try {
reader.close()
writer.close()
- FileUtils.forceDelete(path.toFile)
+ Files.delete(path)
} catch {
case e: IOException =>
error(s"Failed to remove corresponding log file of operation: ${path.toAbsolutePath}", e)
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/authentication/CLIServiceProcessorFactory.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/authentication/FEServiceProcessorFactory.scala
similarity index 92%
rename from kyuubi-common/src/main/scala/org/apache/kyuubi/service/authentication/CLIServiceProcessorFactory.scala
rename to kyuubi-common/src/main/scala/org/apache/kyuubi/service/authentication/FEServiceProcessorFactory.scala
index 62ac87c19..0b3fd9325 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/authentication/CLIServiceProcessorFactory.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/authentication/FEServiceProcessorFactory.scala
@@ -22,7 +22,7 @@ import org.apache.thrift.{TProcessor, TProcessorFactory}
import org.apache.thrift.transport.TTransport
private[authentication]
-case class CLIServiceProcessorFactory(saslServer: HadoopThriftAuthBridgeServer, service: Iface)
+case class FEServiceProcessorFactory(saslServer: HadoopThriftAuthBridgeServer, service: Iface)
extends TProcessorFactory(null) {
override def getProcessor(trans: TTransport): TProcessor = {
val sqlProcessor = new Processor[Iface](service)
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/authentication/HadoopThriftAuthBridgeServer.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/authentication/HadoopThriftAuthBridgeServer.scala
index 4e73bc2c1..27dcb152a 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/authentication/HadoopThriftAuthBridgeServer.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/authentication/HadoopThriftAuthBridgeServer.scala
@@ -19,9 +19,8 @@ package org.apache.kyuubi.service.authentication
import java.io.IOException
import java.net.InetAddress
-import java.security.{PrivilegedAction, PrivilegedExceptionAction}
-import java.util.Locale
-import javax.security.auth.callback.{Callback, CallbackHandler, NameCallback, PasswordCallback, UnsupportedCallbackException}
+import java.security.PrivilegedAction
+import javax.security.auth.callback._
import javax.security.sasl.{AuthorizeCallback, RealmCallback}
import org.apache.commons.codec.binary.Base64
@@ -31,7 +30,7 @@ import org.apache.hadoop.security.SaslRpcServer.AuthMethod
import org.apache.hadoop.security.token.SecretManager.InvalidToken
import org.apache.thrift.{TException, TProcessor}
import org.apache.thrift.protocol.TProtocol
-import org.apache.thrift.transport.{TSaslServerTransport, TSocket, TTransport, TTransportException, TTransportFactory}
+import org.apache.thrift.transport._
import org.apache.kyuubi.Logging
@@ -72,20 +71,11 @@ class HadoopThriftAuthBridgeServer(secretMgr: KyuubiDelegationTokenManager) {
new TUGIAssumingTransportFactory(ugi, transFactory)
}
- /**
- * Wrap a TProcessor in such a way that, before processing any RPC, it
- * assumes the UserGroupInformation of the user authenticated by
- * the SASL transport.
- */
- def wrapProcessor(processor: TProcessor): TProcessor = {
- new TUGIAssumingProcessor(processor, secretMgr, userProxy = true)
- }
-
/**
* Wrap a TProcessor to capture the client information like connecting userid, ip etc
*/
def wrapNonAssumingProcessor(processor: TProcessor): TProcessor = {
- new TUGIAssumingProcessor(processor, secretMgr, userProxy = false)
+ new TUGIAssumingProcessor(processor, secretMgr)
}
def getRemoteAddress: InetAddress = REMOTE_ADDRESS.get
@@ -141,8 +131,7 @@ object HadoopThriftAuthBridgeServer {
*/
class TUGIAssumingProcessor(
wrapped: TProcessor,
- secretMgr: KyuubiDelegationTokenManager,
- userProxy: Boolean) extends TProcessor with Logging {
+ secretMgr: KyuubiDelegationTokenManager) extends TProcessor with Logging {
override def process(in: TProtocol, out: TProtocol): Boolean = {
val transport = in.getTransport
transport match {
@@ -155,48 +144,41 @@ object HadoopThriftAuthBridgeServer {
REMOTE_ADDRESS.set(socket.getInetAddress)
val mechanismName = saslServer.getMechanismName
USER_AUTH_MECHANISM.set(mechanismName)
- AuthMethod.valueOf(mechanismName.toUpperCase(Locale.ROOT)) match {
- case AuthMethod.PLAIN =>
- REMOTE_USER.set(endUser)
- wrapped.process(in, out)
- case other =>
- if (other.equals(AuthMethod.TOKEN)) try {
- val identifier = SaslRpcServer.getIdentifier(authId, secretMgr)
- endUser = identifier.getUser.getUserName
- } catch {
- case e: InvalidToken => throw new TException(e.getMessage)
- }
- var clientUgi: UserGroupInformation = null
+ if (AuthMethod.PLAIN.getMechanismName.equalsIgnoreCase(mechanismName)) {
+ REMOTE_USER.set(endUser)
+ wrapped.process(in, out)
+ } else {
+ if (AuthMethod.TOKEN.getMechanismName.equalsIgnoreCase(mechanismName)) {
try {
- if (userProxy) {
- clientUgi =
- UserGroupInformation.createProxyUser(endUser, UserGroupInformation.getLoginUser)
- REMOTE_USER.set(clientUgi.getShortUserName)
- clientUgi.doAs(new PrivilegedExceptionAction[Boolean] {
- override def run(): Boolean = try wrapped.process(in, out) catch {
- case e: TException => throw new RuntimeException(e)
- }
- })
- } else {
- val endUserUgi = UserGroupInformation.createRemoteUser(endUser)
- REMOTE_USER.set(endUserUgi.getShortUserName)
- debug(s"SET REMOTE USER: ${REMOTE_USER.get()} from endUser: $endUser")
- wrapped.process(in, out)
- }
+ val identifier = SaslRpcServer.getIdentifier(authId, secretMgr)
+ endUser = identifier.getUser.getUserName
} catch {
- case e: RuntimeException => e.getCause match {
- case t: TException => throw t
- case _ => throw e
- }
- case e: InterruptedException => throw new RuntimeException(e)
- case e: IOException => throw new RuntimeException(e)
- } finally if (clientUgi != null) try FileSystem.closeAllForUGI(clientUgi) catch {
- case e: IOException =>
- error(s"Could not clean up file-system handles for UGI: $clientUgi", e)
+ case e: InvalidToken => throw new TException(e.getMessage)
}
+ }
+ val clientUgi: UserGroupInformation = UserGroupInformation.createRemoteUser(endUser)
+ try {
+ REMOTE_USER.set(clientUgi.getShortUserName)
+ debug(s"SET REMOTE USER: ${REMOTE_USER.get()} from endUser: $clientUgi")
+ wrapped.process(in, out)
+ } catch {
+ case e: RuntimeException => e.getCause match {
+ case t: TException => throw t
+ case _ => throw e
+ }
+ case e: InterruptedException => throw new RuntimeException(e)
+ case e: IOException => throw new RuntimeException(e)
+ } finally {
+ try {
+ FileSystem.closeAllForUGI(clientUgi)
+ } catch {
+ case e: IOException =>
+ error(s"Could not clean up file-system handles for UGI: $clientUgi", e)
+ }
+ }
}
- case _ => throw new TException(s"Unexpected non-Sasl transport ${transport.getClass}")
+ case _ => throw new TException(s"Unexpected non-SASL transport ${transport.getClass}")
}
}
}
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/authentication/KyuubiAuthenticationFactory.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/authentication/KyuubiAuthenticationFactory.scala
index 4b21338fb..fa552236c 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/authentication/KyuubiAuthenticationFactory.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/authentication/KyuubiAuthenticationFactory.scala
@@ -61,11 +61,15 @@ class KyuubiAuthenticationFactory(conf: KyuubiConf) {
def getTTransportFactory: TTransportFactory = {
saslServer match {
- case Some(server) => try {
- server.createSaslServerTransportFactory(getSaslProperties)
- } catch {
- case e: TTransportException => throw new LoginException(e.getMessage)
- }
+ case Some(server) =>
+ val serverTransportFactory = try {
+ server.createSaslServerTransportFactory(getSaslProperties)
+ } catch {
+ case e: TTransportException => throw new LoginException(e.getMessage)
+ }
+
+ server.wrapTransportFactory(serverTransportFactory)
+
case _ => authType match {
case NOSASL => new TTransportFactory
case _ => PlainSASLHelper.getTransportFactory(authType.toString, conf)
@@ -74,7 +78,7 @@ class KyuubiAuthenticationFactory(conf: KyuubiConf) {
}
def getTProcessorFactory(fe: Iface): TProcessorFactory = saslServer match {
- case Some(server) => CLIServiceProcessorFactory(server, fe)
+ case Some(server) => FEServiceProcessorFactory(server, fe)
case _ => PlainSASLHelper.getProcessFactory(fe)
}
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala
index 66480e1b6..8ba47dfc6 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala
@@ -24,6 +24,7 @@ import org.apache.hive.service.rpc.thrift.{TGetInfoType, TGetInfoValue, TProtoco
import org.apache.kyuubi.{KyuubiSQLException, Logging}
import org.apache.kyuubi.operation.{Operation, OperationHandle}
import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
+import org.apache.kyuubi.operation.log.OperationLog
abstract class AbstractSession(
val protocol: TProtocolVersion,
@@ -222,4 +223,9 @@ abstract class AbstractSession(
}
}
}
+
+ override def open(): Unit = {
+ OperationLog.createOperationLogRootDirectory(handle)
+ }
+
}
diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/JDBCTests.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/JDBCTests.scala
index 8a4c2f353..66bc13415 100644
--- a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/JDBCTests.scala
+++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/JDBCTests.scala
@@ -122,8 +122,7 @@ trait JDBCTests extends KyuubiFunSuite {
checkResult(metaData.getSchemas(null, "db_not_exist"), Seq.empty)
val e = intercept[HiveSQLException](metaData.getSchemas(null, "*"))
- assert(e.getCause.getMessage === "org.apache.kyuubi.KyuubiSQLException:" +
- "Error operating GET_SCHEMAS: Dangling meta character '*' near index 0\n*\n^")
+ assert(e.getCause.getMessage contains "Dangling meta character '*' near index 0\n*\n^")
}
}
@@ -185,8 +184,7 @@ trait JDBCTests extends KyuubiFunSuite {
}
val e = intercept[HiveSQLException](metaData.getTables(null, "*", null, null))
- assert(e.getCause.getMessage === "org.apache.kyuubi.KyuubiSQLException:" +
- "Error operating GET_TABLES: Dangling meta character '*' near index 0\n*\n^")
+ assert(e.getCause.getMessage contains "Dangling meta character '*' near index 0\n*\n^")
}
}
diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/NoopOperation.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/NoopOperation.scala
index 5a0e4f8cc..9e249a56c 100644
--- a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/NoopOperation.scala
+++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/NoopOperation.scala
@@ -26,6 +26,7 @@ import org.apache.hive.service.rpc.thrift.{TColumn, TColumnDesc, TPrimitiveTypeE
import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
import org.apache.kyuubi.operation.OperationType.OperationType
+import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.session.Session
class NoopOperation(typ: OperationType, session: Session, shouldFail: Boolean = false)
@@ -48,7 +49,6 @@ class NoopOperation(typ: OperationType, session: Session, shouldFail: Boolean =
if (!OperationState.isTerminal(state)) {
setState(OperationState.FINISHED)
}
-
}
override def cancel(): Unit = {
@@ -81,4 +81,6 @@ class NoopOperation(typ: OperationType, session: Session, shouldFail: Boolean =
}
override def shouldRunAsync: Boolean = false
+
+ override def getOperationLog: Option[OperationLog] = None
}
diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/log/OperationLogSuite.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/log/OperationLogSuite.scala
similarity index 98%
rename from externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/log/OperationLogSuite.scala
rename to kyuubi-common/src/test/scala/org/apache/kyuubi/operation/log/OperationLogSuite.scala
index 3a595e1e5..2251a2a31 100644
--- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/log/OperationLogSuite.scala
+++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/log/OperationLogSuite.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.kyuubi.engine.spark.operation.log
+package org.apache.kyuubi.operation.log
import java.nio.file.{Files, Paths}
diff --git a/kyuubi-main/src/main/scala/org/apache/kyuubi/ThriftUtils.scala b/kyuubi-main/src/main/scala/org/apache/kyuubi/ThriftUtils.scala
new file mode 100644
index 000000000..738c0cc00
--- /dev/null
+++ b/kyuubi-main/src/main/scala/org/apache/kyuubi/ThriftUtils.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi
+
+import org.apache.hive.service.rpc.thrift.{TRow, TRowSet, TStatus, TStatusCode}
+
+object ThriftUtils {
+
+ def verifyTStatus(tStatus: TStatus): Unit = {
+ if (tStatus.getStatusCode != TStatusCode.SUCCESS_STATUS) {
+ throw KyuubiSQLException(tStatus)
+ }
+ }
+
+ val EMPTY_ROW_SET = new TRowSet(0, new java.util.ArrayList[TRow](0))
+
+}
diff --git a/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala b/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala
index 58c449f17..e019badac 100644
--- a/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala
+++ b/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala
@@ -17,12 +17,13 @@
package org.apache.kyuubi.operation
-import java.util.concurrent.RejectedExecutionException
+import scala.collection.JavaConverters._
-import org.apache.hive.service.rpc.thrift.{TCLIService, TExecuteStatementReq, TGetOperationStatusReq, TSessionHandle}
+import org.apache.hive.service.rpc.thrift.{TCLIService, TExecuteStatementReq, TFetchOrientation, TFetchResultsReq, TGetOperationStatusReq, TSessionHandle}
import org.apache.hive.service.rpc.thrift.TOperationState._
import org.apache.kyuubi.KyuubiSQLException
+import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.session.Session
class ExecuteStatement(
@@ -34,14 +35,30 @@ class ExecuteStatement(
extends KyuubiOperation(
OperationType.EXECUTE_STATEMENT, session, client, remoteSessionHandle) {
+ private final val _operationLog: OperationLog = if (shouldRunAsync) {
+ OperationLog.createOperationLog(session.handle, getHandle)
+ } else {
+ null
+ }
+
+ override def getOperationLog: Option[OperationLog] = Option(_operationLog)
+
private lazy val statusReq = new TGetOperationStatusReq(_remoteOpHandle)
+ private lazy val fetchLogReq = {
+ val req = new TFetchResultsReq(_remoteOpHandle, TFetchOrientation.FETCH_NEXT, 1000)
+ req.setFetchType(1.toShort)
+ req
+ }
override def beforeRun(): Unit = {
+ OperationLog.setCurrentOperationLog(_operationLog)
setHasResultSet(true)
setState(OperationState.PENDING)
}
- override protected def afterRun(): Unit = {}
+ override protected def afterRun(): Unit = {
+ OperationLog.removeCurrentOperationLog()
+ }
private def executeStatement(): Unit = {
try {
@@ -58,40 +75,52 @@ class ExecuteStatement(
var statusResp = client.GetOperationStatus(statusReq)
var isComplete = false
while (!isComplete) {
+ getQueryLog()
verifyTStatus(statusResp.getStatus)
val remoteState = statusResp.getOperationState
info(s"Query[$statementId] in ${remoteState.name()}")
+ isComplete = true
remoteState match {
case INITIALIZED_STATE | PENDING_STATE | RUNNING_STATE =>
+ isComplete = false
statusResp = client.GetOperationStatus(statusReq)
case FINISHED_STATE =>
setState(OperationState.FINISHED)
- isComplete = true
case CLOSED_STATE =>
setState(OperationState.CLOSED)
- isComplete = true
case CANCELED_STATE =>
setState(OperationState.CANCELED)
- isComplete = true
case TIMEDOUT_STATE =>
setState(OperationState.TIMEOUT)
- isComplete = true
case ERROR_STATE =>
setState(OperationState.ERROR)
val ke = KyuubiSQLException(statusResp.getErrorMessage)
setOperationException(ke)
- throw ke
case UKNOWN_STATE =>
setState(OperationState.ERROR)
val ke = KyuubiSQLException(s"UNKNOWN STATE for $statement")
setOperationException(ke)
- throw ke
+ }
+ }
+ // see if any more logs can be fetched
+ getQueryLog()
+ }
+
+ private def getQueryLog(): Unit = {
+ getOperationLog.foreach { logger =>
+ try {
+ val resp = client.FetchResults(fetchLogReq)
+ verifyTStatus(resp.getStatus)
+ val logs = resp.getResults.getColumns.get(0).getStringVal.getValues.asScala
+ logs.foreach(log => logger.write(log + "\n"))
+ } catch {
+ case _: Exception => // do nothing
}
}
}
@@ -107,14 +136,7 @@ class ExecuteStatement(
val backgroundOperation =
sessionManager.submitBackgroundOperation(asyncOperation)
setBackgroundHandle(backgroundOperation)
- } catch {
- case rejected: RejectedExecutionException =>
- setState(OperationState.ERROR)
- val ke = KyuubiSQLException("Error submitting query in background, query rejected",
- rejected)
- setOperationException(ke)
- throw ke
- }
+ } catch onError("submitting query in background, query rejected")
} else {
setState(OperationState.RUNNING)
executeStatement()
diff --git a/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala b/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala
index 1859c1930..5d82010a8 100644
--- a/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala
+++ b/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala
@@ -19,9 +19,10 @@ package org.apache.kyuubi.operation
import org.apache.hive.service.rpc.thrift._
-import org.apache.kyuubi.KyuubiSQLException
+import org.apache.kyuubi.{KyuubiSQLException, ThriftUtils}
import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
import org.apache.kyuubi.operation.OperationType.OperationType
+import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.session.Session
abstract class KyuubiOperation(
@@ -35,11 +36,11 @@ abstract class KyuubiOperation(
def remoteOpHandle(): TOperationHandle = _remoteOpHandle
protected def verifyTStatus(tStatus: TStatus): Unit = {
- if (tStatus.getStatusCode != TStatusCode.SUCCESS_STATUS) {
- throw KyuubiSQLException(tStatus)
- }
+ ThriftUtils.verifyTStatus(tStatus)
}
+ override def getOperationLog: Option[OperationLog] = None
+
protected def onError(action: String = "operating"): PartialFunction[Throwable, Unit] = {
case e: Exception =>
state.synchronized {
@@ -72,7 +73,7 @@ abstract class KyuubiOperation(
}
override def cancel(): Unit = {
- if (_remoteOpHandle != null && !isTerminalState(state)) {
+ if (_remoteOpHandle != null && !isClosedOrCanceled) {
try {
val req = new TCancelOperationReq(_remoteOpHandle)
val resp = client.CancelOperation(req)
@@ -83,8 +84,9 @@ abstract class KyuubiOperation(
}
override def close(): Unit = {
- if (_remoteOpHandle != null && !isTerminalState(state)) {
+ if (_remoteOpHandle != null && !isClosedOrCanceled) {
try {
+ getOperationLog.foreach(_.close())
val req = new TCloseOperationReq(_remoteOpHandle)
val resp = client.CloseOperation(req)
verifyTStatus(resp.getStatus)
diff --git a/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala b/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala
index 15d8f714f..181e30f54 100644
--- a/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala
+++ b/kyuubi-main/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala
@@ -21,7 +21,7 @@ import java.util.concurrent.ConcurrentHashMap
import org.apache.hive.service.rpc.thrift.{TCLIService, TFetchResultsReq, TRow, TRowSet, TSessionHandle}
-import org.apache.kyuubi.KyuubiSQLException
+import org.apache.kyuubi.{KyuubiSQLException, ThriftUtils}
import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
import org.apache.kyuubi.session.{Session, SessionHandle}
@@ -142,26 +142,28 @@ class KyuubiOperationManager private (name: String) extends OperationManager(nam
addOperation(operation)
}
+
override def getOperationLogRowSet(
opHandle: OperationHandle,
- order: FetchOrientation,
- maxRows: Int): TRowSet = {
- val operation = getOperation(opHandle).asInstanceOf[KyuubiOperation]
- val client = getThriftClient(operation.getSession.handle)
+ order: FetchOrientation, maxRows: Int): TRowSet = {
- // TODO:(kentyao): In async mode, if we query log like below, will put very heavy load on engine
- // side and get thrift err like:
- // org.apache.thrift.transport.TTransportException: Read a negative frame size (-2147418110)!
- val tOperationHandle = operation.remoteOpHandle()
- if (tOperationHandle == null) {
- new TRowSet(0, new java.util.ArrayList[TRow](0))
- } else {
-// val orientation = FetchOrientation.toTFetchOrientation(order)
-// val req = new TFetchResultsReq(tOperationHandle, orientation, maxRows)
-// req.setFetchType(1.toShort)
-// val resp = client.FetchResults(req)
-// resp.getResults
- new TRowSet(0, new java.util.ArrayList[TRow](0))
+ val operation = getOperation(opHandle).asInstanceOf[KyuubiOperation]
+ val operationLog = operation.getOperationLog
+ operationLog match {
+ case Some(log) => log.read(maxRows)
+ case None =>
+ val remoteHandle = operation.remoteOpHandle()
+ val client = getThriftClient(operation.getSession.handle)
+
+ if (remoteHandle != null) {
+ val or = FetchOrientation.toTFetchOrientation(order)
+ val req = new TFetchResultsReq(remoteHandle, or, maxRows)
+ val resp = client.FetchResults(req)
+ ThriftUtils.verifyTStatus(resp.getStatus)
+ resp.getResults
+ } else {
+ ThriftUtils.EMPTY_ROW_SET
+ }
}
}
}
diff --git a/kyuubi-main/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala b/kyuubi-main/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
index 9c77a9044..9db3b8dac 100644
--- a/kyuubi-main/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
+++ b/kyuubi-main/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
@@ -28,7 +28,7 @@ import org.apache.thrift.TException
import org.apache.thrift.protocol.TBinaryProtocol
import org.apache.thrift.transport.{TSocket, TTransport}
-import org.apache.kyuubi.KyuubiSQLException
+import org.apache.kyuubi.{KyuubiSQLException, ThriftUtils}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.engine.spark.SparkProcessBuilder
@@ -81,6 +81,7 @@ class KyuubiSessionImpl(
}
override def open(): Unit = {
+ super.open()
// Init zookeeper client here to capture errors
zkClient
getServerHost match {
@@ -120,24 +121,19 @@ class KyuubiSessionImpl(
req.setPassword(passwd)
req.setConfiguration(conf.asJava)
val resp = client.OpenSession(req)
- verifyTStatus(resp.getStatus)
+ ThriftUtils.verifyTStatus(resp.getStatus)
remoteSessionHandle = resp.getSessionHandle
sessionManager.operationManager.setConnection(handle, client, remoteSessionHandle)
}
- protected def verifyTStatus(tStatus: TStatus): Unit = {
- if (tStatus.getStatusCode != TStatusCode.SUCCESS_STATUS) {
- throw KyuubiSQLException(tStatus)
- }
- }
-
override def close(): Unit = {
super.close()
sessionManager.operationManager.removeConnection(handle)
try {
if (remoteSessionHandle != null) {
val req = new TCloseSessionReq(remoteSessionHandle)
- client.CloseSession(req)
+ val resp = client.CloseSession(req)
+ ThriftUtils.verifyTStatus(resp.getStatus)
}
} catch {
case e: TException =>
diff --git a/pom.xml b/pom.xml
index 36f165f80..058d2da16 100644
--- a/pom.xml
+++ b/pom.xml
@@ -65,7 +65,6 @@
3.0.3
2.12
3.6.3
- org.apache.spark
3.0.1
1.7.30