diff --git a/externals/kyuubi-spark-sql-engine/pom.xml b/externals/kyuubi-spark-sql-engine/pom.xml index 78da15bd2..3bf12ee42 100644 --- a/externals/kyuubi-spark-sql-engine/pom.xml +++ b/externals/kyuubi-spark-sql-engine/pom.xml @@ -213,6 +213,20 @@ + + + org.apache.maven.plugins + maven-jar-plugin + + + prepare-test-jar + test-compile + + test-jar + + + + diff --git a/kyuubi-hive-jdbc/README.md b/kyuubi-hive-jdbc/README.md new file mode 100644 index 000000000..7826161f0 --- /dev/null +++ b/kyuubi-hive-jdbc/README.md @@ -0,0 +1,7 @@ +# Kyuubi Hive JDBC Module + + +Aiming to make a better supported client for Kyuubi and Spark + +- Add catalog to getTables meta function for DataLakes (DONE) +- Deploy to maven central (PENDING) diff --git a/kyuubi-hive-jdbc/pom.xml b/kyuubi-hive-jdbc/pom.xml new file mode 100644 index 000000000..cd3ea851a --- /dev/null +++ b/kyuubi-hive-jdbc/pom.xml @@ -0,0 +1,113 @@ + + + + + + kyuubi + org.apache.kyuubi + 1.3.0-SNAPSHOT + + 4.0.0 + + kyuubi-hive-jdbc + Kyuubi Project Hive JDBC Client + jar + + + + + org.apache.hive + hive-jdbc + + + + org.apache.hive + hive-service-rpc + + + + org.apache.hive + hive-service + + + + org.apache.hive + hive-common + + + + org.apache.hive + hive-serde + + + + commons-lang + commons-lang + + + + org.apache.kyuubi + kyuubi-common + ${project.version} + test-jar + + + + org.apache.kyuubi + kyuubi-spark-sql-engine + ${project.version} + test + + + org.apache.kyuubi + kyuubi-spark-sql-engine + ${project.version} + test-jar + + + + org.apache.spark + spark-sql_${scala.binary.version} + test + + + + org.apache.spark + spark-core_${scala.binary.version} + test + + + commons-collections + commons-collections + test + + + + commons-io + commons-io + test + + + + + target/scala-${scala.binary.version}/classes + target/scala-${scala.binary.version}/test-classes + + diff --git a/kyuubi-hive-jdbc/src/main/scala/org/apache/kyuubi/jdbc/KyuubiConnection.scala b/kyuubi-hive-jdbc/src/main/scala/org/apache/kyuubi/jdbc/KyuubiConnection.scala new file mode 100644 index 000000000..2646815cf --- /dev/null +++ b/kyuubi-hive-jdbc/src/main/scala/org/apache/kyuubi/jdbc/KyuubiConnection.scala @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.jdbc + +import java.sql.{DatabaseMetaData, SQLException} +import java.util.Properties + +import org.apache.hive.jdbc.HiveConnection +import org.apache.hive.service.rpc.thrift.{TCLIService, TSessionHandle} + +class KyuubiConnection(url: String, info: Properties) extends HiveConnection(url, info) { + + override def getMetaData: DatabaseMetaData = { + if (isClosed) { + throw new SQLException("Connection is closed") + } else { + val clientField = classOf[HiveConnection].getDeclaredField("client") + clientField.setAccessible(true) + val client = clientField.get(this).asInstanceOf[TCLIService.Iface] + val handleField = classOf[HiveConnection].getDeclaredField("sessHandle") + handleField.setAccessible(true) + val sessionHandle = handleField.get(this).asInstanceOf[TSessionHandle] + new KyuubiDatabaseMetaData(this, client, sessionHandle) + } + } +} diff --git a/kyuubi-hive-jdbc/src/main/scala/org/apache/kyuubi/jdbc/KyuubiDatabaseMetaData.scala b/kyuubi-hive-jdbc/src/main/scala/org/apache/kyuubi/jdbc/KyuubiDatabaseMetaData.scala new file mode 100644 index 000000000..77691b84c --- /dev/null +++ b/kyuubi-hive-jdbc/src/main/scala/org/apache/kyuubi/jdbc/KyuubiDatabaseMetaData.scala @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.jdbc + +import java.sql.{ResultSet, SQLException} + +import scala.collection.JavaConverters._ + +import org.apache.hive.jdbc.{HiveDatabaseMetaData, HiveQueryResultSet} +import org.apache.hive.service.cli.HiveSQLException +import org.apache.hive.service.rpc.thrift.{TGetTablesReq, TSessionHandle, TStatusCode} +import org.apache.hive.service.rpc.thrift.TCLIService.Iface +import org.apache.thrift.TException + +class KyuubiDatabaseMetaData(conn: KyuubiConnection, client: Iface, sessHandle: TSessionHandle) + extends HiveDatabaseMetaData(conn, client, sessHandle) { + override def getTables( + catalog: String, + schemaPattern: String, + tableNamePattern: String, + types: Array[String]): ResultSet = { + val getTableReq: TGetTablesReq = new TGetTablesReq(sessHandle) + getTableReq.setCatalogName(catalog) + getTableReq.setSchemaName(if (schemaPattern == null) "%" else schemaPattern) + getTableReq.setTableName(tableNamePattern) + if (types != null) { + getTableReq.setTableTypes(types.toList.asJava) + } + val getTableResp = try { + client.GetTables(getTableReq) + } catch { + case e: TException => throw new SQLException(e.getMessage, "08S01", e) + } + val tStatus = getTableResp.getStatus + if (tStatus.getStatusCode != TStatusCode.SUCCESS_STATUS) { + throw new HiveSQLException(tStatus) + } + new HiveQueryResultSet.Builder(conn) + .setClient(client) + .setSessionHandle(sessHandle) + .setStmtHandle(getTableResp.getOperationHandle) + .build + } +} diff --git a/kyuubi-hive-jdbc/src/main/scala/org/apache/kyuubi/jdbc/KyuubiDriver.scala b/kyuubi-hive-jdbc/src/main/scala/org/apache/kyuubi/jdbc/KyuubiDriver.scala new file mode 100644 index 000000000..2e6ff1a65 --- /dev/null +++ b/kyuubi-hive-jdbc/src/main/scala/org/apache/kyuubi/jdbc/KyuubiDriver.scala @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.jdbc + +import java.sql.{Connection, DriverManager, SQLException} +import java.util.Properties + +import org.apache.hive.jdbc.HiveDriver + +class KyuubiDriver extends HiveDriver { + override def connect(url: String, info: Properties): Connection = { + if (acceptsURL(url)) { + new KyuubiConnection(url, info) + } else null + } +} + +object KyuubiDriver { + try { + DriverManager.registerDriver(new KyuubiDriver) + } catch { + case e: SQLException => throw new RuntimeException("Failed to register driver", e) + } +} diff --git a/kyuubi-hive-jdbc/src/test/resources/log4j.properties b/kyuubi-hive-jdbc/src/test/resources/log4j.properties new file mode 100644 index 000000000..958c9c8d1 --- /dev/null +++ b/kyuubi-hive-jdbc/src/test/resources/log4j.properties @@ -0,0 +1,40 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Set everything to be logged to the file target/unit-tests.log +log4j.rootLogger=INFO, CA, FA + +#Console Appender +log4j.appender.CA=org.apache.log4j.ConsoleAppender +log4j.appender.CA.layout=org.apache.log4j.PatternLayout +log4j.appender.CA.layout.ConversionPattern=%d{HH:mm:ss.SSS} %p %c: %m%n +log4j.appender.CA.Threshold = FATAL + +#File Appender +log4j.appender.FA=org.apache.log4j.FileAppender +log4j.appender.FA.append=false +log4j.appender.FA.file=target/unit-tests.log +log4j.appender.FA.layout=org.apache.log4j.PatternLayout +log4j.appender.FA.layout.ConversionPattern=%d{HH:mm:ss.SSS} %t %p %c{2}: %m%n + +# Set the logger level of File Appender to WARN +log4j.appender.FA.Threshold = INFO + +# SPARK-34128:Suppress undesirable TTransportException warnings involved in THRIFT-4805 +log4j.appender.console.filter.1=org.apache.log4j.varia.StringMatchFilter +log4j.appender.console.filter.1.StringToMatch=Thrift error occurred during processing of message +log4j.appender.console.filter.1.AcceptOnMatch=false diff --git a/kyuubi-hive-jdbc/src/test/scala/org/apache/kyuubi/jdbc/KyuubiDriverSuite.scala b/kyuubi-hive-jdbc/src/test/scala/org/apache/kyuubi/jdbc/KyuubiDriverSuite.scala new file mode 100644 index 000000000..e92806d3b --- /dev/null +++ b/kyuubi-hive-jdbc/src/test/scala/org/apache/kyuubi/jdbc/KyuubiDriverSuite.scala @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.jdbc + +import java.util.Properties + +import org.apache.kyuubi.engine.spark.WithSparkSQLEngine +import org.apache.kyuubi.engine.spark.shim.SparkCatalogShim + +class KyuubiDriverSuite extends WithSparkSQLEngine { + + test("get tables with kyuubi driver") { + val kyuubiDriver = new KyuubiDriver() + val connection = kyuubiDriver.connect(getJdbcUrl, new Properties()) + val statement = connection.createStatement() + val table = s"${SparkCatalogShim.SESSION_CATALOG}.default.kyuubi_hive_jdbc" + try { + statement.execute(s"CREATE TABLE ${table}(key int) using parquet") + assert(connection.isInstanceOf[KyuubiConnection]) + val metaData = connection.getMetaData + assert(metaData.isInstanceOf[KyuubiDatabaseMetaData]) + val resultSet = metaData.getTables(SparkCatalogShim.SESSION_CATALOG, "default", "%", null) + assert(resultSet.next()) + assert(resultSet.getString(1) === SparkCatalogShim.SESSION_CATALOG) + assert(resultSet.getString(2) === "default") + assert(resultSet.getString(3) === "kyuubi_hive_jdbc") + } finally { + statement.execute(s"DROP TABLE ${table}") + statement.close() + connection.close() + } + connection + } + + override def withKyuubiConf: Map[String, String] = Map.empty +} diff --git a/pom.xml b/pom.xml index 49d7e4631..ca6bad6e8 100644 --- a/pom.xml +++ b/pom.xml @@ -33,6 +33,7 @@ kyuubi-common kyuubi-ctl kyuubi-ha + kyuubi-hive-jdbc kyuubi-main kyuubi-metrics kyuubi-zookeeper @@ -66,6 +67,8 @@ 4.1.1 1.15 3.2.2 + 2.8.0 + 2.6 3.10 2.12.0 1.0.0 @@ -291,12 +294,25 @@ ${commons-collections.version} + + commons-io + commons-io + ${commons-io.version} + + + + commons-lang + commons-lang + ${commons-lang.version} + + org.apache.commons commons-lang3 ${commons-lang3.version} + org.apache.spark spark-core_${scala.binary.version}