Create Kyuubi Project Spark SQL Engine

This commit is contained in:
Kent Yao 2020-06-09 10:34:47 +08:00
parent 85689b6b64
commit 15d179507f
13 changed files with 312 additions and 22 deletions

View File

@ -23,7 +23,7 @@
<parent>
<artifactId>kyuubi</artifactId>
<groupId>yaooqinn</groupId>
<groupId>org.apache.kyuubi</groupId>
<version>0.8.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
@ -34,9 +34,9 @@
<dependencies>
<dependency>
<groupId>yaooqinn</groupId>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi-common</artifactId>
<version>${project.version}</version>
<version>0.8.0-SNAPSHOT</version>
</dependency>
<dependency>

View File

@ -20,8 +20,8 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi</artifactId>
<groupId>yaooqinn</groupId>
<version>0.8.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

View File

@ -19,5 +19,5 @@ package org.apache.kyuubi
// Shared configuration key prefixes for the Kyuubi project.
package object config {
// Prefix for all Kyuubi-specific configuration keys (e.g. "kyuubi.frontend.bind.port").
final val KYUUBI_PREFIX = "kyuubi."
// Prefix for Spark configuration keys passed through to the engine.
final val SPARK_PREFIX = "spark."
}

View File

@ -21,7 +21,7 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>kyuubi</artifactId>
<groupId>yaooqinn</groupId>
<groupId>org.apache.kyuubi</groupId>
<version>0.8.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@ -21,7 +21,7 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>kyuubi</artifactId>
<groupId>yaooqinn</groupId>
<groupId>org.apache.kyuubi</groupId>
<version>0.8.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
@ -34,13 +34,13 @@
<dependencies>
<dependency>
<groupId>yaooqinn</groupId>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>yaooqinn</groupId>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi-thrift</artifactId>
<version>${project.version}</version>
</dependency>
@ -48,16 +48,19 @@
<dependency>
<groupId>${spark.group}</groupId>
<artifactId>spark-yarn_${scala.binary.version}</artifactId>
<version>2.4.5</version>
</dependency>
<dependency>
<groupId>${spark.group}</groupId>
<artifactId>spark-hive_${scala.binary.version}</artifactId>
<version>2.4.5</version>
</dependency>
<dependency>
<groupId>${spark.group}</groupId>
<artifactId>spark-tags_${scala.binary.version}</artifactId>
<version>2.4.5</version>
</dependency>
<dependency>
@ -101,7 +104,7 @@
</dependency>
<dependency>
<groupId>yaooqinn</groupId>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi-common</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
@ -110,16 +113,19 @@
<dependency>
<groupId>${spark.group}</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>2.4.5</version>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>${spark.group}</groupId>
<artifactId>spark-catalyst_${scala.binary.version}</artifactId>
<version>2.4.5</version>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>${spark.group}</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<version>2.4.5</version>
<type>test-jar</type>
</dependency>
<dependency>
@ -247,8 +253,8 @@
<shadedArtifactAttached>false</shadedArtifactAttached>
<artifactSet>
<includes>
<include>yaooqinn:kyuubi-common</include>
<include>yaooqinn:kyuubi-thrift</include>
<include>org.apache.kyuubi:kyuubi-common</include>
<include>org.apache.kyuubi:kyuubi-thrift</include>
</includes>
</artifactSet>
</configuration>

View File

@ -17,8 +17,6 @@
package yaooqinn.kyuubi.server
import org.apache.spark.KyuubiConf._
import org.apache.spark.deploy.SparkHadoopUtil
import java.net.{InetAddress, ServerSocket}
import java.util.concurrent.TimeUnit
@ -27,9 +25,9 @@ import scala.util.{Failure, Try}
import org.apache.hadoop.conf.Configuration
import org.apache.hive.service.cli.thrift._
import org.apache.kyuubi.Logging
import org.apache.kyuubi.util.ExecutorPoolCaptureOom
import org.apache.spark.{KyuubiConf, SparkConf}
import org.apache.spark.KyuubiConf._
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.thrift.protocol.{TBinaryProtocol, TProtocol}
import org.apache.thrift.server.{ServerContext, TServer, TServerEventHandler, TThreadPoolServer}
import org.apache.thrift.transport.{TServerSocket, TTransport}
@ -42,6 +40,9 @@ import yaooqinn.kyuubi.schema.{SchemaMapper, SparkTableTypes}
import yaooqinn.kyuubi.service.{AbstractService, ServiceException, ServiceUtils}
import yaooqinn.kyuubi.session.SessionHandle
import org.apache.kyuubi.Logging
import org.apache.kyuubi.util.ExecutorPoolCaptureOom
/**
* [[FrontendService]] keeps compatible with all kinds of Hive JDBC/Thrift Client Connections
*

View File

@ -19,15 +19,14 @@ package yaooqinn.kyuubi.server
import java.util.concurrent.atomic.AtomicBoolean
import org.apache.kyuubi.Logging
import org.apache.spark.{KyuubiSparkUtil, SparkConf}
import org.apache.spark.KyuubiConf._
import yaooqinn.kyuubi._
import yaooqinn.kyuubi.ha.{FailoverService, HighAvailableService, LoadBalanceService}
import yaooqinn.kyuubi.metrics.MetricsSystem
import yaooqinn.kyuubi.service.{CompositeService, ServiceException}
import org.apache.kyuubi.Logging
/**
* Main entrance of Kyuubi Server
*/

View File

@ -0,0 +1,63 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>kyuubi</artifactId>
<groupId>org.apache.kyuubi</groupId>
<version>0.8.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>kyuubi-spark-sql-engine</artifactId>
<name>Kyuubi Project Spark SQL Engine</name>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi-common</artifactId>
<version>${project.version}</version>
</dependency>
<!-- NOTE(review): version is pinned to the 3.0.0-preview2 literal while other
     modules in this commit pin Spark 2.4.5; confirm this engine intentionally
     targets Spark 3 and consider extracting the version into a property. -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive-thriftserver_${scala.binary.version}</artifactId>
<version>3.0.0-preview2</version>
</dependency>
</dependencies>
<build>
<!-- Mirror sbt's scala-versioned layout so mixed maven/sbt builds agree on paths. -->
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,158 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.spark;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hive.service.auth.HiveAuthFactory;
import org.apache.hive.service.cli.CLIService;
import org.apache.hive.service.cli.thrift.ThriftCLIService;
import org.apache.kyuubi.util.ExecutorPoolCaptureOom;
import org.apache.thrift.TProcessorFactory;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.TSSLTransportFactory;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TTransportException;
import org.apache.thrift.transport.TTransportFactory;
import javax.net.ssl.SSLServerSocket;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
public class KyuubiThriftBinaryCliService extends ThriftCLIService {
public KyuubiThriftBinaryCliService(CLIService cliService) {
super(cliService, KyuubiThriftBinaryCliService.class.getSimpleName());
}
@Override
public void run() {
// a hook stop the app when oom occurs
Runnable hook = new Runnable() {
@Override
public void run() {
stop();
}
};
try {
ExecutorPoolCaptureOom executorService = new ExecutorPoolCaptureOom(
"threadPoolName",
minWorkerThreads,
maxWorkerThreads,
workerKeepAliveTime, hook);
hiveAuthFactory = new HiveAuthFactory(hiveConf);
TTransportFactory transportFactory = hiveAuthFactory.getAuthTransFactory();
TProcessorFactory processorFactory = hiveAuthFactory.getAuthProcFactory(this);
TServerSocket serverSocket = null;
List<String> sslVersionBlacklist = new ArrayList<String>();
if (!hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_USE_SSL)) {
serverSocket = getServerSocket(hiveHost, portNum);
} else {
String keyStorePath = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PATH).trim();
if (keyStorePath.isEmpty()) {
throw new IllegalArgumentException(HiveConf.ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PATH.varname
+ " Not configured for SSL connection");
}
String keyStorePassword = ShimLoader.getHadoopShims().getPassword(hiveConf,
HiveConf.ConfVars.HIVE_SERVER2_SSL_KEYSTORE_PASSWORD.varname);
serverSocket = getServerSSLSocket(hiveHost, portNum, keyStorePath,
keyStorePassword, sslVersionBlacklist);
}
portNum = serverSocket.getServerSocket().getLocalPort();
// Server args
int maxMessageSize = hiveConf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_MAX_MESSAGE_SIZE);
int requestTimeout = (int) hiveConf.getTimeVar(
HiveConf.ConfVars.HIVE_SERVER2_THRIFT_LOGIN_TIMEOUT, TimeUnit.SECONDS);
int beBackoffSlotLength = (int) hiveConf.getTimeVar(
HiveConf.ConfVars.HIVE_SERVER2_THRIFT_LOGIN_BEBACKOFF_SLOT_LENGTH, TimeUnit.MILLISECONDS);
TThreadPoolServer.Args sargs = new TThreadPoolServer.Args(serverSocket)
.processorFactory(processorFactory).transportFactory(transportFactory)
.protocolFactory(new TBinaryProtocol.Factory())
.inputProtocolFactory(new TBinaryProtocol.Factory(true, true, maxMessageSize, maxMessageSize))
.requestTimeout(requestTimeout).requestTimeoutUnit(TimeUnit.SECONDS)
.beBackoffSlotLength(beBackoffSlotLength).beBackoffSlotLengthUnit(TimeUnit.MILLISECONDS)
.executorService(executorService);
// TCP Server
server = new TThreadPoolServer(sargs);
server.setServerEventHandler(serverEventHandler);
String msg = "Starting " + getName() + " on port "
+ portNum + " with " + minWorkerThreads + "..." + maxWorkerThreads + " worker threads";
LOG.info(msg);
server.serve();
} catch (Throwable t) {
LOG.error("Error starting" + getName(), t);
stop();
}
}
public static TServerSocket getServerSocket(String hiveHost, int portNum)
throws TTransportException {
InetSocketAddress serverAddress;
if (hiveHost == null || hiveHost.isEmpty()) {
// Wildcard bind
serverAddress = new InetSocketAddress(portNum);
} else {
serverAddress = new InetSocketAddress(hiveHost, portNum);
}
return new TServerSocket(serverAddress);
}
public static TServerSocket getServerSSLSocket(String hiveHost, int portNum, String keyStorePath,
String keyStorePassWord, List<String> sslVersionBlacklist) throws TTransportException, UnknownHostException {
TSSLTransportFactory.TSSLTransportParameters params =
new TSSLTransportFactory.TSSLTransportParameters();
params.setKeyStore(keyStorePath, keyStorePassWord);
InetSocketAddress serverAddress;
if (hiveHost == null || hiveHost.isEmpty()) {
// Wildcard bind
serverAddress = new InetSocketAddress(portNum);
} else {
serverAddress = new InetSocketAddress(hiveHost, portNum);
}
TServerSocket thriftServerSocket =
TSSLTransportFactory.getServerSocket(portNum, 0, serverAddress.getAddress(), params);
if (thriftServerSocket.getServerSocket() instanceof SSLServerSocket) {
List<String> sslVersionBlacklistLocal = new ArrayList<String>();
for (String sslVersion : sslVersionBlacklist) {
sslVersionBlacklistLocal.add(sslVersion.trim().toLowerCase());
}
SSLServerSocket sslServerSocket = (SSLServerSocket) thriftServerSocket.getServerSocket();
List<String> enabledProtocols = new ArrayList<String>();
for (String protocol : sslServerSocket.getEnabledProtocols()) {
if (sslVersionBlacklistLocal.contains(protocol.toLowerCase())) {
LOG.debug("Disabling SSL Protocol: " + protocol);
} else {
enabledProtocols.add(protocol);
}
}
sslServerSocket.setEnabledProtocols(enabledProtocols.toArray(new String[0]));
LOG.info("SSL Server Socket Enabled Protocols: "
+ Arrays.toString(sslServerSocket.getEnabledProtocols()));
}
return thriftServerSocket;
}
}

View File

@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.spark
import scala.collection.JavaConverters._
import scala.util.control.NonFatal

import org.apache.hive.service.cli.CLIService
import org.apache.hive.service.cli.thrift.ThriftCLIService
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2
/**
 * Entry point of the Spark SQL engine application.
 *
 * Starts a SparkSession, launches Spark's built-in HiveThriftServer2 on top of
 * its SQLContext, and — if the server did not come up with a usable Thrift
 * frontend — replaces it with a [[KyuubiThriftBinaryCliService]].
 */
object SparkSQLEngineApp {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(loadDefaults = true)
    val session = SparkSession.builder()
      .config(conf)
      .appName("Kyuubi Spark SQL Engine App")
      .getOrCreate()
    val server = HiveThriftServer2.startWithContext(session.sqlContext)

    // Locate the Thrift frontend and the CLI service among the server's services.
    var thriftCLIService: ThriftCLIService = null
    var cliService: CLIService = null
    server.getServices.asScala.foreach {
      case t: ThriftCLIService => thriftCLIService = t
      case c: CLIService => cliService = c
      case _ =>
    }

    // FIX: the previous guard (`if t.getPortNumber > 0`) could leave
    // thriftCLIService null and NPE on the port check below. Treat both
    // "no Thrift service found" and "bound to an invalid port" as the
    // signal to install our own binary frontend.
    if (thriftCLIService == null || thriftCLIService.getPortNumber <= 0) {
      if (thriftCLIService != null) {
        thriftCLIService.stop()
      }
      try {
        thriftCLIService = new KyuubiThriftBinaryCliService(cliService)
        thriftCLIService.init(server.getHiveConf)
        thriftCLIService.start()
      } catch {
        // FIX: the original `try` had no catch/finally; on failure, shut the
        // SparkSession down instead of leaving a frontend-less engine running.
        case NonFatal(e) =>
          session.stop()
          throw e
      }
    }

    // TODO(review): port/hostName appear intended for service-discovery
    // registration in a follow-up change; they are currently unused.
    val port = thriftCLIService.getPortNumber
    val hostName = thriftCLIService.getServerIPAddress.getHostName
  }
}

View File

@ -21,7 +21,7 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>kyuubi</artifactId>
<groupId>yaooqinn</groupId>
<groupId>org.apache.kyuubi</groupId>
<version>0.8.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@ -20,7 +20,7 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>yaooqinn</groupId>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi</artifactId>
<name>Kyuubi Project Parent</name>
<version>0.8.0-SNAPSHOT</version>
@ -30,6 +30,7 @@
<module>kyuubi-server</module>
<module>kyuubi-assembly</module>
<module>kyuubi-hive-thrift</module>
<module>kyuubi-spark-sql-engine</module>
</modules>
<packaging>pom</packaging>

View File

@ -206,7 +206,7 @@ This file is divided into 3 sections:
<parameter name="groups">java,scala,3rdParty,kyuubi</parameter>
<parameter name="group.java">javax?\..*</parameter>
<parameter name="group.scala">scala\..*</parameter>
<parameter name="group.3rdParty">(?!org\.apache\.spark\.).*</parameter>
<parameter name="group.3rdParty">(?!org\.apache\.kyuubi\.).*</parameter>
<parameter name="group.kyuubi">org\.apache\.kyuubi\..*</parameter>
</parameters>
</check>