From efdd9fafc8ef76cbc850ab14b8d04f9bdf569630 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Fri, 29 Jan 2021 10:27:48 +0800 Subject: [PATCH] [KYUUBI #307]GetCatalogs supports DSv2 and keeps its backward compatibility (#307) ![yaooqinn](https://badgen.net/badge/Hello/yaooqinn/green) [![PR 307](https://badgen.net/badge/Preview/PR%20307/blue)](https://github.com/yaooqinn/kyuubi/pull/307) ![Feature](https://badgen.net/badge/Label/Feature/) [❨?❩](https://pullrequestbadge.com/?utm_medium=github&utm_source=yaooqinn&utm_campaign=badge_info) ### Please add issue ID here? Fixes #307 ### Why are the changes needed? GetCatalogs supports DSv2 and keeps its backward compatibility ### Test Plan: - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [x] Add screenshots for manual tests if appropriate ![image](https://user-images.githubusercontent.com/8326978/106161043-7259c400-61c1-11eb-9beb-3326f6093284.png) - [x] [Run test](https://kyuubi.readthedocs.io/en/latest/tools/testing.html#running-tests) locally before make a pull request * GetCatalogs supports DSv2 * pr template * nit * nit * shim * add iceberg tests * nit --- .github/PULL_REQUEST_TEMPLATE | 31 +++++--- .github/pr-badge.yml | 16 +++- .../engine/spark/operation/GetCatalogs.scala | 9 ++- .../spark/operation/GetTableTypes.scala | 3 +- .../kyuubi/engine/spark/shim/Shim_v2_4.scala | 26 +++++++ .../kyuubi/engine/spark/shim/Shim_v3_0.scala | 39 ++++++++++ .../kyuubi/engine/spark/shim/SparkShim.scala | 76 +++++++++++++++++++ .../spark/SparkSQLEngineListenerSuite.scala | 2 +- .../org/apache/kyuubi/session/package.scala | 6 ++ .../ha/client/ServiceDiscoverySuite.scala | 1 + .../ha/server/EmbeddedZkServerSuite.scala | 5 +- kyuubi-main/pom.xml | 6 ++ .../kyuubi/session/KyuubiSessionImpl.scala | 17 ++++- .../datalake/IcebergOperationSuite.scala | 60 +++++++++++++++ pom.xml | 9 +++ 15 files changed, 282 insertions(+), 24 deletions(-) create mode 100644 
externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/shim/Shim_v2_4.scala create mode 100644 externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/shim/Shim_v3_0.scala create mode 100644 externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/shim/SparkShim.scala create mode 100644 kyuubi-main/src/test/scala/org/apache/kyuubi/operation/datalake/IcebergOperationSuite.scala diff --git a/.github/PULL_REQUEST_TEMPLATE b/.github/PULL_REQUEST_TEMPLATE index b8ef7e061..6ba9b7da1 100644 --- a/.github/PULL_REQUEST_TEMPLATE +++ b/.github/PULL_REQUEST_TEMPLATE @@ -1,20 +1,31 @@ -### Please add issue ID here? - -Fixes #${issue ID} +### _Which issue are you going to fix?_ + -### Why are the changes needed? +Fixes #${ID} + +### _Why are the changes needed?_ -### Test Plan: -- Add some test cases that check the changes thoroughly including negative and positive cases if possible -- Add screenshots for manual tests if appropriate -- [Run test](https://kyuubi.readthedocs.io/en/latest/tools/testing.html#running-tests) locally before make a pull request + +### _How was this patch tested?_ +- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible + +- [ ] Add screenshots for manual tests if appropriate + +- [ ] [Run test](https://kyuubi.readthedocs.io/en/latest/tools/testing.html#running-tests) locally before make a pull request diff --git a/.github/pr-badge.yml b/.github/pr-badge.yml index d211fcc82..bac3441b8 100644 --- a/.github/pr-badge.yml +++ b/.github/pr-badge.yml @@ -3,10 +3,22 @@ color: "green" - label: "Preview" - message: "PR $prNumber" + message: "Closes%20#$prNumber" color: "blue" url: "https://github.com/yaooqinn/kyuubi/pull/$prNumber" +- label: "+" + message: "$additions" + color: "red" + +- label: "-" + message: "$deletions" + color: "green" + +- label: "commits" + message: "$commits" + color: "yellow" + - label: "Missing" message: "Target 
Issue" color: "#ff0000" @@ -15,7 +27,7 @@ - label: "Missing" message: "Test Plan" color: "#ff0000" - when: "$payload.pull_request.body.includes('## Test Plan') === false" + when: "$payload.pull_request.body.includes('- [x]') === false" - label: "Label" message: "Feature" diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetCatalogs.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetCatalogs.scala index 3485f54d7..eec959e08 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetCatalogs.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetCatalogs.scala @@ -17,9 +17,10 @@ package org.apache.kyuubi.engine.spark.operation -import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.SparkSession import org.apache.spark.sql.types.StructType +import org.apache.kyuubi.engine.spark.shim.SparkShim import org.apache.kyuubi.operation.OperationType import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant.TABLE_CAT import org.apache.kyuubi.session.Session @@ -33,8 +34,8 @@ class GetCatalogs(spark: SparkSession, session: Session) } override protected def runInternal(): Unit = { - iter = Seq( - Row(spark.sessionState.catalogManager.currentCatalog.name()) - ).toList.iterator + try { + iter = SparkShim().getCatalogs(spark).toIterator + } catch onError() } } diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetTableTypes.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetTableTypes.scala index 40ea77b72..46cef7f32 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetTableTypes.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetTableTypes.scala @@ 
-18,7 +18,6 @@ package org.apache.kyuubi.engine.spark.operation import org.apache.spark.sql.{Row, SparkSession} -import org.apache.spark.sql.catalyst.catalog.CatalogTableType import org.apache.spark.sql.types.StructType import org.apache.kyuubi.operation.OperationType @@ -33,6 +32,6 @@ class GetTableTypes(spark: SparkSession, session: Session) } override protected def runInternal(): Unit = { - iter = CatalogTableType.tableTypes.map(t => Row(t.name)).toList.iterator + iter = Seq("EXTERNAL", "MANAGED", "VIEW").map(Row(_)).toList.iterator } } diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/shim/Shim_v2_4.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/shim/Shim_v2_4.scala new file mode 100644 index 000000000..2a841b035 --- /dev/null +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/shim/Shim_v2_4.scala @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kyuubi.engine.spark.shim + +import org.apache.spark.sql.{Row, SparkSession} + +class Shim_v2_4 extends SparkShim { + override def getCatalogs(ss: SparkSession): Seq[Row] = { + Seq(Row("spark_catalog")) + } +} diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/shim/Shim_v3_0.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/shim/Shim_v3_0.scala new file mode 100644 index 000000000..39cae2f4d --- /dev/null +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/shim/Shim_v3_0.scala @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kyuubi.engine.spark.shim + +import org.apache.spark.sql.{Row, SparkSession} + +class Shim_v3_0 extends Shim_v2_4 { + + override def getCatalogs(ss: SparkSession): Seq[Row] = { + val sessionState = getSessionState(ss) + + // A [[CatalogManager]] is session unique + val catalogMgr = invoke(sessionState, "catalogManager") + // get the custom v2 session catalog or default spark_catalog + val sessionCatalog = invoke(catalogMgr, "v2SessionCatalog") + val defaultCatalog = invoke(catalogMgr, "currentCatalog") + + val defaults = Seq(sessionCatalog, defaultCatalog).distinct + .map(invoke(_, "name").asInstanceOf[String]) + val catalogs = getField(catalogMgr, "catalogs") + .asInstanceOf[scala.collection.Map[String, _]] + (catalogs.keys ++: defaults).distinct.map(Row(_)) + } +} diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/shim/SparkShim.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/shim/SparkShim.scala new file mode 100644 index 000000000..51f75d55d --- /dev/null +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/shim/SparkShim.scala @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.engine.spark.shim + +import org.apache.spark.sql.{Row, SparkSession} + +import org.apache.kyuubi.{Logging, Utils} + +/** + * A shim that defines the interface used to interact with Spark's catalogs + */ +trait SparkShim extends Logging { + + /** + * Get all registered catalogs in Spark's `CatalogManager` + */ + def getCatalogs(ss: SparkSession): Seq[Row] + + protected def getSessionState(ss: SparkSession): Any = { + invoke(classOf[SparkSession], ss, "sessionState") + } + + protected def invoke( + obj: Any, + methodName: String, + args: (Class[_], AnyRef)*): Any = { + val (types, values) = args.unzip + val method = obj.getClass.getDeclaredMethod(methodName, types: _*) + method.setAccessible(true) + method.invoke(obj, values.toSeq: _*) + } + + protected def invoke( + clazz: Class[_], + obj: AnyRef, + methodName: String, + args: (Class[_], AnyRef)*): AnyRef = { + val (types, values) = args.unzip + val method = clazz.getDeclaredMethod(methodName, types: _*) + method.setAccessible(true) + method.invoke(obj, values.toSeq: _*) + } + + protected def getField(o: Any, fieldName: String): Any = { + val field = o.getClass.getDeclaredField(fieldName) + field.setAccessible(true) + field.get(o) + } +} + +object SparkShim { + def apply(): SparkShim = { + val runtimeSparkVer = org.apache.spark.SPARK_VERSION + val (major, minor) = Utils.majorMinorVersion(runtimeSparkVer) + (major, minor) match { + case (3, _) => new Shim_v3_0 + case (2, _) => new Shim_v2_4 + case _ => throw new IllegalArgumentException(s"Not Support spark version $runtimeSparkVer") + } + } +} diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/SparkSQLEngineListenerSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/SparkSQLEngineListenerSuite.scala index d65734e76..ba4ecf39d 100644 --- 
a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/SparkSQLEngineListenerSuite.scala +++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/SparkSQLEngineListenerSuite.scala @@ -36,7 +36,7 @@ class SparkSQLEngineListenerSuite extends KyuubiFunSuite { .builder().master("local").config("spark.ui.port", "0").getOrCreate() val engine = new SparkSQLEngine(spark) - engine.initialize(KyuubiConf()) + engine.initialize(KyuubiConf().set(KyuubiConf.FRONTEND_BIND_PORT, 0)) engine.start() assert(engine.getServiceState === ServiceState.STARTED) spark.stop() diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/package.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/package.scala index 421c0d052..ab8fa7cc9 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/package.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/package.scala @@ -24,4 +24,10 @@ package object session { val HIVE_VAR_PREFIX: Regex = """set:hivevar:([^=]+)""".r val HIVE_CONF_PREFIX: Regex = """set:hiveconf:([^=]+)""".r + val ENV_PREFIX = "env:" + val SYSTEM_PREFIX = "system:" + val HIVECONF_PREFIX = "hiveconf:" + val HIVEVAR_PREFIX = "hivevar:" + val METACONF_PREFIX = "metaconf:" + } diff --git a/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/ServiceDiscoverySuite.scala b/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/ServiceDiscoverySuite.scala index d68c08d60..f3505d9f8 100644 --- a/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/ServiceDiscoverySuite.scala +++ b/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/ServiceDiscoverySuite.scala @@ -59,6 +59,7 @@ class ServiceDiscoverySuite extends KerberizedTestHelper { .unset(KyuubiConf.SERVER_PRINCIPAL) .set(HA_ZK_QUORUM, zkServer.getConnectString) .set(HA_ZK_NAMESPACE, namespace) + .set(KyuubiConf.FRONTEND_BIND_PORT, 0) val server: Serverable = new NoopServer() server.initialize(conf) diff --git 
a/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/server/EmbeddedZkServerSuite.scala b/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/server/EmbeddedZkServerSuite.scala index fb1121179..825ee9f79 100644 --- a/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/server/EmbeddedZkServerSuite.scala +++ b/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/server/EmbeddedZkServerSuite.scala @@ -33,15 +33,14 @@ class EmbeddedZkServerSuite extends KyuubiFunSuite { assert(zkServer.getName === zkServer.getClass.getSimpleName) assert(zkServer.getServiceState === LATENT) val conf = KyuubiConf() + conf.set(KyuubiConf.EMBEDDED_ZK_PORT, 0) zkServer.stop() // only for test coverage zkServer.initialize(conf) assert(zkServer.getConf === conf) assert(zkServer.getServiceState === INITIALIZED) - assert(zkServer.getConnectString.endsWith("2181")) assert(zkServer.getStartTime === 0) zkServer.start() assert(zkServer.getServiceState === STARTED) - assert(zkServer.getConnectString.endsWith("2181")) assert(zkServer.getStartTime !== 0) zkServer.stop() assert(zkServer.getServiceState === STOPPED) @@ -50,7 +49,7 @@ class EmbeddedZkServerSuite extends KyuubiFunSuite { test("connect test with embedded zookeeper") { val zkServer = new EmbeddedZkServer() assert(zkServer.getConnectString === null) - zkServer.initialize(KyuubiConf()) + zkServer.initialize(KyuubiConf().set(KyuubiConf.EMBEDDED_ZK_PORT, 0)) zkServer.start() val zkClient = CuratorFrameworkFactory.builder() diff --git a/kyuubi-main/pom.xml b/kyuubi-main/pom.xml index 4318fa4ab..b265a524e 100644 --- a/kyuubi-main/pom.xml +++ b/kyuubi-main/pom.xml @@ -100,6 +100,12 @@ netty test + + + org.apache.iceberg + ${iceberg.name} + test + diff --git a/kyuubi-main/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala b/kyuubi-main/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala index d6267f42d..68d719226 100644 --- a/kyuubi-main/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala +++ 
b/kyuubi-main/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala @@ -51,8 +51,21 @@ class KyuubiSessionImpl( private def mergeConf(): Unit = { conf.foreach { - case (HIVE_VAR_PREFIX(key), value) => sessionConf.set(key, value) - case (HIVE_CONF_PREFIX(key), value) => sessionConf.set(key, value) + case (k, v) if k.startsWith("set:") => + val newKey = k.substring(4) + if (newKey.startsWith(SYSTEM_PREFIX)) { + sessionConf.set(newKey.substring(SYSTEM_PREFIX.length), v) + } else if (newKey.startsWith(HIVECONF_PREFIX)) { + sessionConf.set(newKey.substring(HIVECONF_PREFIX.length), v) + } else if (newKey.startsWith(HIVEVAR_PREFIX)) { + sessionConf.set(newKey.substring(HIVEVAR_PREFIX.length), v) + } else if (newKey.startsWith(METACONF_PREFIX)) { + sessionConf.set(newKey.substring(METACONF_PREFIX.length), v) + } else if (newKey.startsWith(ENV_PREFIX)) { + // `env:` entries are deliberately not copied into sessionConf + } else { + sessionConf.set(k, v) + } case ("use:database", _) => case (key, value) => sessionConf.set(key, value) } diff --git a/kyuubi-main/src/test/scala/org/apache/kyuubi/operation/datalake/IcebergOperationSuite.scala b/kyuubi-main/src/test/scala/org/apache/kyuubi/operation/datalake/IcebergOperationSuite.scala new file mode 100644 index 000000000..1e633349e --- /dev/null +++ b/kyuubi-main/src/test/scala/org/apache/kyuubi/operation/datalake/IcebergOperationSuite.scala @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.operation.datalake + +import org.apache.kyuubi.Utils +import org.apache.kyuubi.config.KyuubiConf +import org.apache.kyuubi.operation.{JDBCTestUtils, WithKyuubiServer} +import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant.TABLE_CAT + +class IcebergOperationSuite extends WithKyuubiServer with JDBCTestUtils { + + protected def catalog: String = "hadoop_prod" + + private val iceberg: String = { + System.getProperty("java.class.path") + .split(java.io.File.pathSeparator) // platform separator: ':' on Unix, ';' on Windows + .filter(_.contains("iceberg-spark")).head + } + + private val warehouse = Utils.createTempDir() + + override def jdbcUrl: String = getJdbcUrl + + "#" + + s"spark.sql.defaultCatalog=$catalog;" + + "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions;" + + "spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog;" + + "spark.sql.catalog.spark_catalog.type=hive;" + + s"spark.sql.catalog.$catalog=org.apache.iceberg.spark.SparkCatalog;" + + s"spark.sql.catalog.$catalog.type=hadoop;" + + s"spark.sql.catalog.$catalog.warehouse=$warehouse;" + + s"spark.jars=$iceberg;" + + override protected val conf: KyuubiConf = KyuubiConf() + + test("get catalogs") { + withJdbcStatement() { statement => + val metaData = statement.getConnection.getMetaData + val catalogs = metaData.getCatalogs + assert(catalogs.next()) + assert(catalogs.getString(TABLE_CAT) === "spark_catalog") + assert(catalogs.next()) + assert(catalogs.getString(TABLE_CAT) === catalog) + } + } +} diff --git a/pom.xml b/pom.xml index 6f54c26e7..cbb223510 100644 --- a/pom.xml +++ 
b/pom.xml @@ -70,6 +70,8 @@ 3.4.14 3.0.3 + iceberg-spark3-runtime + 0.11.0 UTF-8 ${project.build.directory}/scala-${scala.binary.version}/jars @@ -507,6 +509,12 @@ netty ${netty3.version} + + + org.apache.iceberg + ${iceberg.name} + ${iceberg.version} + @@ -815,6 +823,7 @@ 2.4.7 2.11.12 2.11 + iceberg-spark-runtime