diff --git a/.github/PULL_REQUEST_TEMPLATE b/.github/PULL_REQUEST_TEMPLATE
index b8ef7e061..6ba9b7da1 100644
--- a/.github/PULL_REQUEST_TEMPLATE
+++ b/.github/PULL_REQUEST_TEMPLATE
@@ -1,20 +1,31 @@
-### Please add issue ID here?
-
-Fixes #${issue ID}
+### _Which issue are you going to fix?_
+
-### Why are the changes needed?
+Fixes #${ID}
+
+### _Why are the changes needed?_
-### Test Plan:
-- Add some test cases that check the changes thoroughly including negative and positive cases if possible
-- Add screenshots for manual tests if appropriate
-- [Run test](https://kyuubi.readthedocs.io/en/latest/tools/testing.html#running-tests) locally before make a pull request
+
+### _How was this patch tested?_
+- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
+
+- [ ] Add screenshots for manual tests if appropriate
+
+- [ ] [Run test](https://kyuubi.readthedocs.io/en/latest/tools/testing.html#running-tests) locally before make a pull request
diff --git a/.github/pr-badge.yml b/.github/pr-badge.yml
index d211fcc82..bac3441b8 100644
--- a/.github/pr-badge.yml
+++ b/.github/pr-badge.yml
@@ -3,10 +3,22 @@
color: "green"
- label: "Preview"
- message: "PR $prNumber"
+ message: "Closes%20#$prNumber"
color: "blue"
url: "https://github.com/yaooqinn/kyuubi/pull/$prNumber"
+- label: "+"
+ message: "$additions"
+ color: "red"
+
+- label: "-"
+ message: "$deletions"
+ color: "green"
+
+- label: "commits"
+ message: "$commits"
+ color: "yellow"
+
- label: "Missing"
message: "Target Issue"
color: "#ff0000"
@@ -15,7 +27,7 @@
- label: "Missing"
message: "Test Plan"
color: "#ff0000"
- when: "$payload.pull_request.body.includes('## Test Plan') === false"
+ when: "$payload.pull_request.body.includes('- [x]') === false"
- label: "Label"
message: "Feature"
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetCatalogs.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetCatalogs.scala
index 3485f54d7..eec959e08 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetCatalogs.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetCatalogs.scala
@@ -17,9 +17,10 @@
package org.apache.kyuubi.engine.spark.operation
-import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType
+import org.apache.kyuubi.engine.spark.shim.SparkShim
import org.apache.kyuubi.operation.OperationType
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant.TABLE_CAT
import org.apache.kyuubi.session.Session
@@ -33,8 +34,8 @@ class GetCatalogs(spark: SparkSession, session: Session)
}
override protected def runInternal(): Unit = {
- iter = Seq(
- Row(spark.sessionState.catalogManager.currentCatalog.name())
- ).toList.iterator
+ try {
+ iter = SparkShim().getCatalogs(spark).toIterator
+ } catch onError()
}
}
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetTableTypes.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetTableTypes.scala
index 40ea77b72..46cef7f32 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetTableTypes.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/GetTableTypes.scala
@@ -18,7 +18,6 @@
package org.apache.kyuubi.engine.spark.operation
import org.apache.spark.sql.{Row, SparkSession}
-import org.apache.spark.sql.catalyst.catalog.CatalogTableType
import org.apache.spark.sql.types.StructType
import org.apache.kyuubi.operation.OperationType
@@ -33,6 +32,6 @@ class GetTableTypes(spark: SparkSession, session: Session)
}
override protected def runInternal(): Unit = {
- iter = CatalogTableType.tableTypes.map(t => Row(t.name)).toList.iterator
+ iter = Seq("EXTERNAL", "MANAGED", "VIEW").map(Row(_)).toList.iterator
}
}
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/shim/Shim_v2_4.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/shim/Shim_v2_4.scala
new file mode 100644
index 000000000..2a841b035
--- /dev/null
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/shim/Shim_v2_4.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.spark.shim
+
+import org.apache.spark.sql.{Row, SparkSession}
+
+class Shim_v2_4 extends SparkShim {
+ override def getCatalogs(ss: SparkSession): Seq[Row] = {
+ Seq(Row("spark_catalog"))
+ }
+}
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/shim/Shim_v3_0.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/shim/Shim_v3_0.scala
new file mode 100644
index 000000000..39cae2f4d
--- /dev/null
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/shim/Shim_v3_0.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.spark.shim
+
+import org.apache.spark.sql.{Row, SparkSession}
+
+class Shim_v3_0 extends Shim_v2_4 {
+
+ override def getCatalogs(ss: SparkSession): Seq[Row] = {
+ val sessionState = getSessionState(ss)
+
+ // A [[CatalogManager]] is session unique
+ val catalogMgr = invoke(sessionState, "catalogManager")
+ // get the custom v2 session catalog or default spark_catalog
+ val sessionCatalog = invoke(catalogMgr, "v2SessionCatalog")
+ val defaultCatalog = invoke(catalogMgr, "currentCatalog")
+
+ val defaults = Seq(sessionCatalog, defaultCatalog).distinct
+ .map(invoke(_, "name").asInstanceOf[String])
+ val catalogs = getField(catalogMgr, "catalogs")
+ .asInstanceOf[scala.collection.Map[String, _]]
+ (catalogs.keys ++: defaults).distinct.map(Row(_))
+ }
+}
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/shim/SparkShim.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/shim/SparkShim.scala
new file mode 100644
index 000000000..51f75d55d
--- /dev/null
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/shim/SparkShim.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.spark.shim
+
+import org.apache.spark.sql.{Row, SparkSession}
+
+import org.apache.kyuubi.{Logging, Utils}
+
+/**
+ * A shim that defines the interface interact with Spark's catalogs
+ */
+trait SparkShim extends Logging {
+
+ /**
+ * Get all register catalogs in Spark's `CatalogManager`
+ */
+ def getCatalogs(ss: SparkSession): Seq[Row]
+
+ protected def getSessionState(ss: SparkSession): Any = {
+ invoke(classOf[SparkSession], ss, "sessionState")
+ }
+
+ protected def invoke(
+ obj: Any,
+ methodName: String,
+ args: (Class[_], AnyRef)*): Any = {
+ val (types, values) = args.unzip
+ val method = obj.getClass.getDeclaredMethod(methodName, types: _*)
+ method.setAccessible(true)
+ method.invoke(obj, values.toSeq: _*)
+ }
+
+ protected def invoke(
+ clazz: Class[_],
+ obj: AnyRef,
+ methodName: String,
+ args: (Class[_], AnyRef)*): AnyRef = {
+ val (types, values) = args.unzip
+ val method = clazz.getDeclaredMethod(methodName, types: _*)
+ method.setAccessible(true)
+ method.invoke(obj, values.toSeq: _*)
+ }
+
+ protected def getField(o: Any, fieldName: String): Any = {
+ val field = o.getClass.getDeclaredField(fieldName)
+ field.setAccessible(true)
+ field.get(o)
+ }
+}
+
+object SparkShim {
+ def apply(): SparkShim = {
+ val runtimeSparkVer = org.apache.spark.SPARK_VERSION
+ val (major, minor) = Utils.majorMinorVersion(runtimeSparkVer)
+ (major, minor) match {
+ case (3, _) => new Shim_v3_0
+ case (2, _) => new Shim_v2_4
+ case _ => throw new IllegalArgumentException(s"Not Support spark version $runtimeSparkVer")
+ }
+ }
+}
diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/SparkSQLEngineListenerSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/SparkSQLEngineListenerSuite.scala
index d65734e76..ba4ecf39d 100644
--- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/SparkSQLEngineListenerSuite.scala
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/SparkSQLEngineListenerSuite.scala
@@ -36,7 +36,7 @@ class SparkSQLEngineListenerSuite extends KyuubiFunSuite {
.builder().master("local").config("spark.ui.port", "0").getOrCreate()
val engine = new SparkSQLEngine(spark)
- engine.initialize(KyuubiConf())
+ engine.initialize(KyuubiConf().set(KyuubiConf.FRONTEND_BIND_PORT, 0))
engine.start()
assert(engine.getServiceState === ServiceState.STARTED)
spark.stop()
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/package.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/package.scala
index 421c0d052..ab8fa7cc9 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/package.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/package.scala
@@ -24,4 +24,10 @@ package object session {
val HIVE_VAR_PREFIX: Regex = """set:hivevar:([^=]+)""".r
val HIVE_CONF_PREFIX: Regex = """set:hiveconf:([^=]+)""".r
+ val ENV_PREFIX = "env:"
+ val SYSTEM_PREFIX = "system:"
+ val HIVECONF_PREFIX = "hiveconf:"
+ val HIVEVAR_PREFIX = "hivevar:"
+ val METACONF_PREFIX = "metaconf:"
+
}
diff --git a/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/ServiceDiscoverySuite.scala b/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/ServiceDiscoverySuite.scala
index d68c08d60..f3505d9f8 100644
--- a/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/ServiceDiscoverySuite.scala
+++ b/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/client/ServiceDiscoverySuite.scala
@@ -59,6 +59,7 @@ class ServiceDiscoverySuite extends KerberizedTestHelper {
.unset(KyuubiConf.SERVER_PRINCIPAL)
.set(HA_ZK_QUORUM, zkServer.getConnectString)
.set(HA_ZK_NAMESPACE, namespace)
+ .set(KyuubiConf.FRONTEND_BIND_PORT, 0)
val server: Serverable = new NoopServer()
server.initialize(conf)
diff --git a/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/server/EmbeddedZkServerSuite.scala b/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/server/EmbeddedZkServerSuite.scala
index fb1121179..825ee9f79 100644
--- a/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/server/EmbeddedZkServerSuite.scala
+++ b/kyuubi-ha/src/test/scala/org/apache/kyuubi/ha/server/EmbeddedZkServerSuite.scala
@@ -33,15 +33,14 @@ class EmbeddedZkServerSuite extends KyuubiFunSuite {
assert(zkServer.getName === zkServer.getClass.getSimpleName)
assert(zkServer.getServiceState === LATENT)
val conf = KyuubiConf()
+ conf.set(KyuubiConf.EMBEDDED_ZK_PORT, 0)
zkServer.stop() // only for test coverage
zkServer.initialize(conf)
assert(zkServer.getConf === conf)
assert(zkServer.getServiceState === INITIALIZED)
- assert(zkServer.getConnectString.endsWith("2181"))
assert(zkServer.getStartTime === 0)
zkServer.start()
assert(zkServer.getServiceState === STARTED)
- assert(zkServer.getConnectString.endsWith("2181"))
assert(zkServer.getStartTime !== 0)
zkServer.stop()
assert(zkServer.getServiceState === STOPPED)
@@ -50,7 +49,7 @@ class EmbeddedZkServerSuite extends KyuubiFunSuite {
test("connect test with embedded zookeeper") {
val zkServer = new EmbeddedZkServer()
assert(zkServer.getConnectString === null)
- zkServer.initialize(KyuubiConf())
+ zkServer.initialize(KyuubiConf().set(KyuubiConf.EMBEDDED_ZK_PORT, 0))
zkServer.start()
val zkClient = CuratorFrameworkFactory.builder()
diff --git a/kyuubi-main/pom.xml b/kyuubi-main/pom.xml
index 4318fa4ab..b265a524e 100644
--- a/kyuubi-main/pom.xml
+++ b/kyuubi-main/pom.xml
@@ -100,6 +100,12 @@
netty
test
+
+
+ org.apache.iceberg
+ ${iceberg.name}
+ test
+
diff --git a/kyuubi-main/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala b/kyuubi-main/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
index d6267f42d..68d719226 100644
--- a/kyuubi-main/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
+++ b/kyuubi-main/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
@@ -51,8 +51,21 @@ class KyuubiSessionImpl(
private def mergeConf(): Unit = {
conf.foreach {
- case (HIVE_VAR_PREFIX(key), value) => sessionConf.set(key, value)
- case (HIVE_CONF_PREFIX(key), value) => sessionConf.set(key, value)
+ case (k, v) if k.startsWith("set:") =>
+ val newKey = k.substring(4)
+ if (newKey.startsWith(SYSTEM_PREFIX)) {
+ sessionConf.set(newKey.substring(SYSTEM_PREFIX.length), v)
+ } else if (newKey.startsWith(HIVECONF_PREFIX)) {
+ sessionConf.set(newKey.substring(HIVECONF_PREFIX.length), v)
+ } else if (newKey.startsWith(HIVEVAR_PREFIX)) {
+ sessionConf.set(newKey.substring(HIVEVAR_PREFIX.length), v)
+ } else if (newKey.startsWith(METACONF_PREFIX)) {
+ sessionConf.set(newKey.substring(METACONF_PREFIX.length), v)
+ } else if (newKey.startsWith(SYSTEM_PREFIX)) {
+ // do nothing
+ } else {
+ sessionConf.set(k, v)
+ }
case ("use:database", _) =>
case (key, value) => sessionConf.set(key, value)
}
diff --git a/kyuubi-main/src/test/scala/org/apache/kyuubi/operation/datalake/IcebergOperationSuite.scala b/kyuubi-main/src/test/scala/org/apache/kyuubi/operation/datalake/IcebergOperationSuite.scala
new file mode 100644
index 000000000..1e633349e
--- /dev/null
+++ b/kyuubi-main/src/test/scala/org/apache/kyuubi/operation/datalake/IcebergOperationSuite.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.operation.datalake
+
+import org.apache.kyuubi.Utils
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.operation.{JDBCTestUtils, WithKyuubiServer}
+import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant.TABLE_CAT
+
+class IcebergOperationSuite extends WithKyuubiServer with JDBCTestUtils {
+
+ protected def catalog: String = "hadoop_prod"
+
+ private val iceberg: String = {
+ System.getProperty("java.class.path")
+ .split(":")
+ .filter(_.contains("iceberg-spark")).head
+ }
+
+ private val warehouse = Utils.createTempDir()
+
+ override def jdbcUrl: String = getJdbcUrl +
+ "#" +
+ s"spark.sql.defaultCatalog=$catalog;" +
+ "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions;" +
+ "spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog;" +
+ "spark.sql.catalog.spark_catalog.type=hive;" +
+ s"spark.sql.catalog.$catalog=org.apache.iceberg.spark.SparkCatalog;" +
+ s"spark.sql.catalog.$catalog.type=hadoop;" +
+ s"spark.sql.catalog.$catalog.warehouse=$warehouse;" +
+ s"spark.jars=$iceberg;"
+
+ override protected val conf: KyuubiConf = KyuubiConf()
+
+ test("get catalogs") {
+ withJdbcStatement() { statement =>
+ val metaData = statement.getConnection.getMetaData
+ val catalogs = metaData.getCatalogs
+ catalogs.next()
+ assert(catalogs.getString(TABLE_CAT) === "spark_catalog")
+ catalogs.next()
+ assert(catalogs.getString(TABLE_CAT) === catalog)
+ }
+ }
+}
diff --git a/pom.xml b/pom.xml
index 6f54c26e7..cbb223510 100644
--- a/pom.xml
+++ b/pom.xml
@@ -70,6 +70,8 @@
3.4.14
3.0.3
+ iceberg-spark3-runtime
+ 0.11.0
UTF-8
${project.build.directory}/scala-${scala.binary.version}/jars
@@ -507,6 +509,12 @@
netty
${netty3.version}
+
+
+ org.apache.iceberg
+ ${iceberg.name}
+ ${iceberg.version}
+
@@ -815,6 +823,7 @@
2.4.7
2.11.12
2.11
+ iceberg-spark-runtime