diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala index 1199987b4..44057be0a 100644 --- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala +++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala @@ -40,6 +40,7 @@ import org.apache.spark.sql.execution.datasources.DataSource import org.apache.spark.sql.hive.HiveUDFExpressionBuilder import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper._ import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION +import org.apache.spark.sql.internal.StaticSQLConf.GLOBAL_TEMP_DATABASE import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.CaseInsensitiveStringMap @@ -105,7 +106,7 @@ class HiveTableCatalog(sparkSession: SparkSession) catalogOptions = options catalog = new HiveSessionCatalog( externalCatalogBuilder = () => externalCatalog, - globalTempViewManagerBuilder = () => sparkSession.sharedState.globalTempViewManager, + globalTempViewManagerBuilder = () => globalTempViewManager, metastoreCatalog = new HiveMetastoreCatalog(sparkSession), functionRegistry = sessionState.functionRegistry, tableFunctionRegistry = sessionState.tableFunctionRegistry, @@ -115,6 +116,17 @@ class HiveTableCatalog(sparkSession: SparkSession) HiveUDFExpressionBuilder) } + private lazy val globalTempViewManager: GlobalTempViewManager = { + val globalTempDB = conf.getConf(GLOBAL_TEMP_DATABASE) + if (externalCatalog.databaseExists(globalTempDB)) { + throw KyuubiHiveConnectorException( + s"$globalTempDB is a system preserved database, please rename your existing database to " + + s"resolve the name conflict, or set a different value for ${GLOBAL_TEMP_DATABASE.key}, " + + "and launch your Spark application again.") + } + new GlobalTempViewManager(globalTempDB) + } + /** * A catalog that interacts with external systems. */ diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileIndex.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileIndex.scala index 82199e6f2..937a9557d 100644 --- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileIndex.scala +++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileIndex.scala @@ -80,7 +80,7 @@ class HiveCatalogFileIndex( val partitions = selectedPartitions.map { case BindPartition(catalogTablePartition, hivePartition) => val path = new Path(catalogTablePartition.location) - val fs = path.getFileSystem(hadoopConf) + val fs = path.getFileSystem(hiveCatalog.hadoopConfiguration()) val partPath = PartitionPath( catalogTablePartition.toRow( partitionSchema, diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveWrite.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveWrite.scala index 62db1fa0a..2ee338673 100644 --- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveWrite.scala +++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveWrite.scala @@ -76,7 +76,7 @@ case class HiveWrite( override def description(): String = "Kyuubi-Hive-Connector" override def toBatch: BatchWrite = { - val tmpLocation = HiveWriteHelper.getExternalTmpPath(sparkSession, hadoopConf, tableLocation) + val tmpLocation = HiveWriteHelper.getExternalTmpPath(externalCatalog, hadoopConf, tableLocation) val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false) handleCompression(fileSinkConf, hadoopConf) diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveWriteHelper.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveWriteHelper.scala index 68ba0bfb2..25bca911f 100644 --- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveWriteHelper.scala +++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveWriteHelper.scala @@ -27,8 +27,8 @@ import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.hive.common.FileUtils import org.apache.hadoop.hive.ql.exec.TaskRunner import org.apache.spark.internal.Logging -import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils +import org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.{hive, HiveExternalCatalog, HiveVersion} import org.apache.kyuubi.spark.connector.hive.KyuubiHiveConnectorException @@ -47,7 +47,7 @@ object HiveWriteHelper extends Logging { private val hiveScratchDir = "hive.exec.scratchdir" def getExternalTmpPath( - sparkSession: SparkSession, + externalCatalog: ExternalCatalogWithListener, hadoopConf: Configuration, path: Path): Path = { @@ -70,7 +70,6 @@ object HiveWriteHelper extends Logging { assert(hiveVersionsUsingNewExternalTempPath ++ hiveVersionsUsingOldExternalTempPath == allSupportedHiveVersions) - val externalCatalog = sparkSession.sharedState.externalCatalog val hiveVersion = externalCatalog.unwrapped.asInstanceOf[HiveExternalCatalog].client.version val stagingDir = hadoopConf.get(hiveStagingDir, ".hive-staging") val scratchDir = hadoopConf.get(hiveScratchDir, "/tmp/hive")