[KYUUBI #307] GetCatalogs supports DSv2 and keeps its backward compatibility (#307)

![yaooqinn](https://badgen.net/badge/Hello/yaooqinn/green) [![PR 307](https://badgen.net/badge/Preview/PR%20307/blue)](https://github.com/yaooqinn/kyuubi/pull/307) ![Feature](https://badgen.net/badge/Label/Feature/) [&#10088;?&#10089;](https://pullrequestbadge.com/?utm_medium=github&utm_source=yaooqinn&utm_campaign=badge_info)<!-- PR-BADGE: PLEASE DO NOT REMOVE THIS COMMENT -->

<!--
Thanks for sending a pull request!  Here are some tips for you:
  1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html
-->

### Please add issue ID here?
<!-- replace ${issue ID} with the actual issue id -->
Fixes #307

### Why are the changes needed?
<!--
Please clarify why the changes are needed. For instance,
  1. If you add a feature, you can talk about the use case of it.
  2. If you fix a bug, you can clarify why it is a bug.
-->

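On Spark 3.x, GetCatalogs now returns the catalogs known to the DSv2 `CatalogManager` (the loaded catalogs plus the session and current catalogs). On Spark 2.4 it returns only `spark_catalog`, which keeps backward compatibility.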

### Test Plan:
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [x] Add screenshots for manual tests if appropriate
![image](https://user-images.githubusercontent.com/8326978/106161043-7259c400-61c1-11eb-9beb-3326f6093284.png)

- [x] [Run test](https://kyuubi.readthedocs.io/en/latest/tools/testing.html#running-tests) locally before making a pull request

* GetCatalogs supports DSv2

* pr template

* nit

* nit

* shim

* add iceberg tests

* nit
Kent Yao 2021-01-29 10:27:48 +08:00 committed by GitHub
parent 05c64feaf2
commit efdd9fafc8
15 changed files with 282 additions and 24 deletions

@ -1,20 +1,31 @@
<!--
Thanks for sending a pull request! Here are some tips for you:
1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html
Thanks for sending a pull request!
Here are some tips for you:
1. If this is your first time, please read our contributor guidelines:
https://kyuubi.readthedocs.io/en/latest/community/contributions.html
-->
### Please add issue ID here?
<!-- replace ${issue ID} with the actual issue id -->
Fixes #${issue ID}
### _Which issue are you going to fix?_
<!--
Replace ${ID} below with the actual issue id from
https://github.com/yaooqinn/kyuubi/issues,
so that the issue will be linked and automatically closed after merging
-->
### Why are the changes needed?
Fixes #${ID}
### _Why are the changes needed?_
<!--
Please clarify why the changes are needed. For instance,
1. If you add a feature, you can talk about the use case of it.
2. If you fix a bug, you can clarify why it is a bug.
-->
### Test Plan:
- Add some test cases that check the changes thoroughly including negative and positive cases if possible
- Add screenshots for manual tests if appropriate
- [Run test](https://kyuubi.readthedocs.io/en/latest/tools/testing.html#running-tests) locally before making a pull request
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run test](https://kyuubi.readthedocs.io/en/latest/tools/testing.html#running-tests) locally before making a pull request

.github/pr-badge.yml

@ -3,10 +3,22 @@
color: "green"
- label: "Preview"
message: "PR $prNumber"
message: "Closes%20#$prNumber"
color: "blue"
url: "https://github.com/yaooqinn/kyuubi/pull/$prNumber"
- label: "+"
message: "$additions"
color: "red"
- label: "-"
message: "$deletions"
color: "green"
- label: "commits"
message: "$commits"
color: "yellow"
- label: "Missing"
message: "Target Issue"
color: "#ff0000"
@ -15,7 +27,7 @@
- label: "Missing"
message: "Test Plan"
color: "#ff0000"
when: "$payload.pull_request.body.includes('## Test Plan') === false"
when: "$payload.pull_request.body.includes('- [x]') === false"
- label: "Label"
message: "Feature"

@ -17,9 +17,10 @@
package org.apache.kyuubi.engine.spark.operation
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType
import org.apache.kyuubi.engine.spark.shim.SparkShim
import org.apache.kyuubi.operation.OperationType
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant.TABLE_CAT
import org.apache.kyuubi.session.Session
@ -33,8 +34,8 @@ class GetCatalogs(spark: SparkSession, session: Session)
}
override protected def runInternal(): Unit = {
iter = Seq(
Row(spark.sessionState.catalogManager.currentCatalog.name())
).toList.iterator
try {
iter = SparkShim().getCatalogs(spark).toIterator
} catch onError()
}
}

@ -18,7 +18,6 @@
package org.apache.kyuubi.engine.spark.operation
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.catalog.CatalogTableType
import org.apache.spark.sql.types.StructType
import org.apache.kyuubi.operation.OperationType
@ -33,6 +32,6 @@ class GetTableTypes(spark: SparkSession, session: Session)
}
override protected def runInternal(): Unit = {
iter = CatalogTableType.tableTypes.map(t => Row(t.name)).toList.iterator
iter = Seq("EXTERNAL", "MANAGED", "VIEW").map(Row(_)).toList.iterator
}
}

@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.spark.shim
import org.apache.spark.sql.{Row, SparkSession}
class Shim_v2_4 extends SparkShim {
override def getCatalogs(ss: SparkSession): Seq[Row] = {
Seq(Row("spark_catalog"))
}
}

@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.spark.shim
import org.apache.spark.sql.{Row, SparkSession}
class Shim_v3_0 extends Shim_v2_4 {
override def getCatalogs(ss: SparkSession): Seq[Row] = {
val sessionState = getSessionState(ss)
// A [[CatalogManager]] is unique per session
val catalogMgr = invoke(sessionState, "catalogManager")
// get the custom v2 session catalog or default spark_catalog
val sessionCatalog = invoke(catalogMgr, "v2SessionCatalog")
val defaultCatalog = invoke(catalogMgr, "currentCatalog")
val defaults = Seq(sessionCatalog, defaultCatalog).distinct
.map(invoke(_, "name").asInstanceOf[String])
val catalogs = getField(catalogMgr, "catalogs")
.asInstanceOf[scala.collection.Map[String, _]]
(catalogs.keys ++: defaults).distinct.map(Row(_))
}
}
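The merge step in `Shim_v3_0.getCatalogs` can be sketched without Spark. This is a minimal illustration only: `CatalogNamesSketch` and `allCatalogNames` are hypothetical names, and plain strings stand in for the values the shim reads reflectively from the `CatalogManager`.

```scala
// Hypothetical helper, not part of Kyuubi: combines already-loaded catalog names
// with the session and current catalog names, dropping duplicates.
object CatalogNamesSketch {
  def allCatalogNames(
      loaded: Iterable[String],
      sessionCatalog: String,
      currentCatalog: String): Seq[String] = {
    // The session and current catalogs may be the same one, so de-duplicate first.
    val defaults = Seq(sessionCatalog, currentCatalog).distinct
    // Loaded catalogs come first; a default already among them is not repeated.
    (loaded.toSeq ++ defaults).distinct
  }
}
```

In this sketch the result order follows the order of the inputs. In the shim itself the loaded names come from a map's keys, so their order depends on that map's iteration order.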

@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.spark.shim
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.kyuubi.{Logging, Utils}
/**
* A shim that defines the interface for interacting with Spark's catalogs
*/
trait SparkShim extends Logging {
/**
* Get all registered catalogs in Spark's `CatalogManager`
*/
def getCatalogs(ss: SparkSession): Seq[Row]
protected def getSessionState(ss: SparkSession): Any = {
invoke(classOf[SparkSession], ss, "sessionState")
}
protected def invoke(
obj: Any,
methodName: String,
args: (Class[_], AnyRef)*): Any = {
val (types, values) = args.unzip
val method = obj.getClass.getDeclaredMethod(methodName, types: _*)
method.setAccessible(true)
method.invoke(obj, values.toSeq: _*)
}
protected def invoke(
clazz: Class[_],
obj: AnyRef,
methodName: String,
args: (Class[_], AnyRef)*): AnyRef = {
val (types, values) = args.unzip
val method = clazz.getDeclaredMethod(methodName, types: _*)
method.setAccessible(true)
method.invoke(obj, values.toSeq: _*)
}
protected def getField(o: Any, fieldName: String): Any = {
val field = o.getClass.getDeclaredField(fieldName)
field.setAccessible(true)
field.get(o)
}
}
object SparkShim {
def apply(): SparkShim = {
val runtimeSparkVer = org.apache.spark.SPARK_VERSION
val (major, minor) = Utils.majorMinorVersion(runtimeSparkVer)
(major, minor) match {
case (3, _) => new Shim_v3_0
case (2, _) => new Shim_v2_4
case _ => throw new IllegalArgumentException(s"Unsupported Spark version $runtimeSparkVer")
}
}
}
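The two ideas in `SparkShim`, choosing an implementation from the runtime version and calling members reflectively, can be tried in isolation. The following is a rough sketch under stated assumptions: `ShimSketch`, `majorMinor`, `shimNameFor`, `invokeNoArg`, and `HiddenCatalog` are hypothetical names, and `majorMinor` is only a guess at what `Utils.majorMinorVersion` does, since its body is not shown in this change.

```scala
// Hypothetical stand-ins; none of these names come from Kyuubi or Spark.
object ShimSketch {
  private val VersionPattern = """^(\d+)\.(\d+)(\..*)?$""".r

  // Extracts (major, minor) from strings such as "3.0.1" or "2.4.7".
  def majorMinor(version: String): (Int, Int) = version match {
    case VersionPattern(major, minor, _) => (major.toInt, minor.toInt)
    case _ => throw new IllegalArgumentException(s"Cannot parse version $version")
  }

  // Picks a shim by major version only, mirroring the match in SparkShim.apply().
  def shimNameFor(version: String): String = majorMinor(version) match {
    case (3, _) => "Shim_v3_0"
    case (2, _) => "Shim_v2_4"
    case _ => throw new IllegalArgumentException(s"Unsupported Spark version $version")
  }

  // Calls a no-argument method by name, even if it is private.
  def invokeNoArg(obj: AnyRef, methodName: String): AnyRef = {
    val method = obj.getClass.getDeclaredMethod(methodName)
    method.setAccessible(true)
    method.invoke(obj)
  }
}

// A class with a private method, used only to exercise invokeNoArg.
class HiddenCatalog {
  private def name(): String = "spark_catalog"
}
```

Note that `getDeclaredMethod` only finds methods declared on the object's own runtime class, not inherited ones, so a lookup like this can fail when the member lives on a superclass.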

@ -36,7 +36,7 @@ class SparkSQLEngineListenerSuite extends KyuubiFunSuite {
.builder().master("local").config("spark.ui.port", "0").getOrCreate()
val engine = new SparkSQLEngine(spark)
engine.initialize(KyuubiConf())
engine.initialize(KyuubiConf().set(KyuubiConf.FRONTEND_BIND_PORT, 0))
engine.start()
assert(engine.getServiceState === ServiceState.STARTED)
spark.stop()

@ -24,4 +24,10 @@ package object session {
val HIVE_VAR_PREFIX: Regex = """set:hivevar:([^=]+)""".r
val HIVE_CONF_PREFIX: Regex = """set:hiveconf:([^=]+)""".r
val ENV_PREFIX = "env:"
val SYSTEM_PREFIX = "system:"
val HIVECONF_PREFIX = "hiveconf:"
val HIVEVAR_PREFIX = "hivevar:"
val METACONF_PREFIX = "metaconf:"
}

@ -59,6 +59,7 @@ class ServiceDiscoverySuite extends KerberizedTestHelper {
.unset(KyuubiConf.SERVER_PRINCIPAL)
.set(HA_ZK_QUORUM, zkServer.getConnectString)
.set(HA_ZK_NAMESPACE, namespace)
.set(KyuubiConf.FRONTEND_BIND_PORT, 0)
val server: Serverable = new NoopServer()
server.initialize(conf)

@ -33,15 +33,14 @@ class EmbeddedZkServerSuite extends KyuubiFunSuite {
assert(zkServer.getName === zkServer.getClass.getSimpleName)
assert(zkServer.getServiceState === LATENT)
val conf = KyuubiConf()
conf.set(KyuubiConf.EMBEDDED_ZK_PORT, 0)
zkServer.stop() // only for test coverage
zkServer.initialize(conf)
assert(zkServer.getConf === conf)
assert(zkServer.getServiceState === INITIALIZED)
assert(zkServer.getConnectString.endsWith("2181"))
assert(zkServer.getStartTime === 0)
zkServer.start()
assert(zkServer.getServiceState === STARTED)
assert(zkServer.getConnectString.endsWith("2181"))
assert(zkServer.getStartTime !== 0)
zkServer.stop()
assert(zkServer.getServiceState === STOPPED)
@ -50,7 +49,7 @@ class EmbeddedZkServerSuite extends KyuubiFunSuite {
test("connect test with embedded zookeeper") {
val zkServer = new EmbeddedZkServer()
assert(zkServer.getConnectString === null)
zkServer.initialize(KyuubiConf())
zkServer.initialize(KyuubiConf().set(KyuubiConf.EMBEDDED_ZK_PORT, 0))
zkServer.start()
val zkClient = CuratorFrameworkFactory.builder()

@ -100,6 +100,12 @@
<artifactId>netty</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>${iceberg.name}</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>

@ -51,8 +51,21 @@ class KyuubiSessionImpl(
private def mergeConf(): Unit = {
conf.foreach {
case (HIVE_VAR_PREFIX(key), value) => sessionConf.set(key, value)
case (HIVE_CONF_PREFIX(key), value) => sessionConf.set(key, value)
case (k, v) if k.startsWith("set:") =>
val newKey = k.substring(4)
if (newKey.startsWith(SYSTEM_PREFIX)) {
sessionConf.set(newKey.substring(SYSTEM_PREFIX.length), v)
} else if (newKey.startsWith(HIVECONF_PREFIX)) {
sessionConf.set(newKey.substring(HIVECONF_PREFIX.length), v)
} else if (newKey.startsWith(HIVEVAR_PREFIX)) {
sessionConf.set(newKey.substring(HIVEVAR_PREFIX.length), v)
} else if (newKey.startsWith(METACONF_PREFIX)) {
sessionConf.set(newKey.substring(METACONF_PREFIX.length), v)
} else if (newKey.startsWith(ENV_PREFIX)) {
// do nothing
} else {
sessionConf.set(k, v)
}
case ("use:database", _) =>
case (key, value) => sessionConf.set(key, value)
}
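The `set:` handling in `mergeConf` amounts to a key-normalization rule, which can be written as a pure function for testing. This is a sketch, not the session code: `ConfKeySketch` and `normalize` are hypothetical names, and it assumes the do-nothing branch is meant for `env:` keys.

```scala
// Hypothetical helper, not part of Kyuubi.
object ConfKeySketch {
  private val StrippedPrefixes = Seq("system:", "hiveconf:", "hivevar:", "metaconf:")
  private val EnvPrefix = "env:" // assumption: env: keys are the ones skipped

  // Returns Some(key to store) or None when the entry should be ignored.
  def normalize(key: String): Option[String] = {
    if (!key.startsWith("set:")) {
      Some(key)
    } else {
      val newKey = key.substring("set:".length)
      StrippedPrefixes.find(p => newKey.startsWith(p)) match {
        case Some(prefix) => Some(newKey.substring(prefix.length))
        case None if newKey.startsWith(EnvPrefix) => None
        // Unknown namespaces keep the full original key, as in the else branch above.
        case None => Some(key)
      }
    }
  }
}
```

For example, `set:hiveconf:spark.sql.shuffle.partitions` would be stored as `spark.sql.shuffle.partitions`, while a key with no `set:` prefix passes through unchanged.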

@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.operation.datalake
import org.apache.kyuubi.Utils
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.operation.{JDBCTestUtils, WithKyuubiServer}
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant.TABLE_CAT
class IcebergOperationSuite extends WithKyuubiServer with JDBCTestUtils {
protected def catalog: String = "hadoop_prod"
private val iceberg: String = {
System.getProperty("java.class.path")
.split(java.io.File.pathSeparator)
.filter(_.contains("iceberg-spark")).head
}
private val warehouse = Utils.createTempDir()
override def jdbcUrl: String = getJdbcUrl +
"#" +
s"spark.sql.defaultCatalog=$catalog;" +
"spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions;" +
"spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog;" +
"spark.sql.catalog.spark_catalog.type=hive;" +
s"spark.sql.catalog.$catalog=org.apache.iceberg.spark.SparkCatalog;" +
s"spark.sql.catalog.$catalog.type=hadoop;" +
s"spark.sql.catalog.$catalog.warehouse=$warehouse;" +
s"spark.jars=$iceberg;"
override protected val conf: KyuubiConf = KyuubiConf()
test("get catalogs") {
withJdbcStatement() { statement =>
val metaData = statement.getConnection.getMetaData
val catalogs = metaData.getCatalogs
catalogs.next()
assert(catalogs.getString(TABLE_CAT) === "spark_catalog")
catalogs.next()
assert(catalogs.getString(TABLE_CAT) === catalog)
}
}
}
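The `jdbcUrl` override above appends Spark settings after `#` as `key=value;` pairs. A small sketch of that string construction follows. `JdbcUrlSketch` and `withSessionConfs` are hypothetical names, and the host and port in the usage note are placeholders, not values from this change.

```scala
// Hypothetical helper, not part of Kyuubi's test utilities.
object JdbcUrlSketch {
  // Appends each (key, value) pair as "key=value;" after a '#' separator.
  def withSessionConfs(baseUrl: String, confs: Seq[(String, String)]): String = {
    baseUrl + "#" + confs.map { case (k, v) => s"$k=$v;" }.mkString
  }
}
```

For instance, `JdbcUrlSketch.withSessionConfs("jdbc:hive2://localhost:10009/default", Seq("spark.sql.defaultCatalog" -> "hadoop_prod"))` yields a URL ending in `#spark.sql.defaultCatalog=hadoop_prod;`.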

@ -70,6 +70,8 @@
<zookeeper.version>3.4.14</zookeeper.version>
<scalatest.version>3.0.3</scalatest.version>
<iceberg.name>iceberg-spark3-runtime</iceberg.name>
<iceberg.version>0.11.0</iceberg.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<jars.target.dir>${project.build.directory}/scala-${scala.binary.version}/jars</jars.target.dir>
@ -507,6 +509,12 @@
<artifactId>netty</artifactId>
<version>${netty3.version}</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>${iceberg.name}</artifactId>
<version>${iceberg.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
@ -815,6 +823,7 @@
<spark.version>2.4.7</spark.version>
<scala.version>2.11.12</scala.version>
<scala.binary.version>2.11</scala.binary.version>
<iceberg.name>iceberg-spark-runtime</iceberg.name>
</properties>
</profile>
</profiles>