From 80bc028e6dfa3bfbbaa78e1d3c8e9c698b235ba7 Mon Sep 17 00:00:00 2001
From: zhaomin
Date: Mon, 26 Jun 2023 15:04:39 +0800
Subject: [PATCH] [KYUUBI #4995] Use hadoop conf and hive conf from catalog
 options

### _Why are the changes needed?_

There are hdfs-site.xml, hive-site.xml, etc. on the Spark job classpath, but the connector should use the Hadoop conf and Hive conf carried in the catalog options instead.
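
For illustration, each catalog can then carry its own Hive/Hadoop settings. A minimal sketch, assuming a catalog registered under the placeholder name `hive_remote` with a placeholder metastore URI (neither is part of this patch):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  // Register the connector as a DataSourceV2 catalog; Spark forwards every
  // "spark.sql.catalog.hive_remote.*" entry to the catalog as its options.
  .config(
    "spark.sql.catalog.hive_remote",
    "org.apache.kyuubi.spark.connector.hive.HiveTableCatalog")
  // Catalog-scoped Hive conf: with this patch the connector reads it from the
  // catalog options rather than from hive-site.xml/hdfs-site.xml on the classpath.
  .config("spark.sql.catalog.hive_remote.hive.metastore.uris", "thrift://metastore:9083")
  .getOrCreate()
```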

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before making a pull request

Closes #4995 from zhaomin1423/fix_hive_connector.

Closes #4995

64429fdcb [Xiao Zhao] Update extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala
d921be750 [zhaomin] fix
375934d65 [zhaomin] Using hadoop conf and hive conf from catalog options

Lead-authored-by: zhaomin
Co-authored-by: Xiao Zhao
Signed-off-by: Cheng Pan
---
 .../spark/connector/hive/HiveTableCatalog.scala    | 14 +++++++++++++-
 .../spark/connector/hive/read/HiveFileIndex.scala  |  2 +-
 .../spark/connector/hive/write/HiveWrite.scala     |  2 +-
 .../connector/hive/write/HiveWriteHelper.scala     |  5 ++---
 4 files changed, 17 insertions(+), 6 deletions(-)

diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala
index 1199987b4..44057be0a 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala
@@ -40,6 +40,7 @@ import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.hive.HiveUDFExpressionBuilder
 import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper._
 import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
+import org.apache.spark.sql.internal.StaticSQLConf.GLOBAL_TEMP_DATABASE
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
@@ -105,7 +106,7 @@ class HiveTableCatalog(sparkSession: SparkSession)
     catalogOptions = options
     catalog = new HiveSessionCatalog(
       externalCatalogBuilder = () => externalCatalog,
-      globalTempViewManagerBuilder = () => sparkSession.sharedState.globalTempViewManager,
+      globalTempViewManagerBuilder = () => globalTempViewManager,
       metastoreCatalog = new HiveMetastoreCatalog(sparkSession),
       functionRegistry = sessionState.functionRegistry,
       tableFunctionRegistry = sessionState.tableFunctionRegistry,
@@ -115,6 +116,17 @@ class HiveTableCatalog(sparkSession: SparkSession)
       HiveUDFExpressionBuilder)
   }
 
+  private lazy val globalTempViewManager: GlobalTempViewManager = {
+    val globalTempDB = conf.getConf(GLOBAL_TEMP_DATABASE)
+    if (externalCatalog.databaseExists(globalTempDB)) {
+      throw KyuubiHiveConnectorException(
+        s"$globalTempDB is a system preserved database, please rename your existing database to " +
+          s"resolve the name conflict, or set a different value for ${GLOBAL_TEMP_DATABASE.key}, " +
+          "and launch your Spark application again.")
+    }
+    new GlobalTempViewManager(globalTempDB)
+  }
+
   /**
    * A catalog that interacts with external systems.
    */
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileIndex.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileIndex.scala
index 82199e6f2..937a9557d 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileIndex.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileIndex.scala
@@ -80,7 +80,7 @@ class HiveCatalogFileIndex(
     val partitions = selectedPartitions.map {
       case BindPartition(catalogTablePartition, hivePartition) =>
         val path = new Path(catalogTablePartition.location)
-        val fs = path.getFileSystem(hadoopConf)
+        val fs = path.getFileSystem(hiveCatalog.hadoopConfiguration())
         val partPath = PartitionPath(
           catalogTablePartition.toRow(
             partitionSchema,
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveWrite.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveWrite.scala
index 62db1fa0a..2ee338673 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveWrite.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveWrite.scala
@@ -76,7 +76,7 @@ case class HiveWrite(
   override def description(): String = "Kyuubi-Hive-Connector"
 
   override def toBatch: BatchWrite = {
-    val tmpLocation = HiveWriteHelper.getExternalTmpPath(sparkSession, hadoopConf, tableLocation)
+    val tmpLocation = HiveWriteHelper.getExternalTmpPath(externalCatalog, hadoopConf, tableLocation)
     val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)
 
     handleCompression(fileSinkConf, hadoopConf)
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveWriteHelper.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveWriteHelper.scala
index 68ba0bfb2..25bca911f 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveWriteHelper.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveWriteHelper.scala
@@ -27,8 +27,8 @@ import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.hive.common.FileUtils
 import org.apache.hadoop.hive.ql.exec.TaskRunner
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
+import org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener
 import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.{hive, HiveExternalCatalog, HiveVersion}
 
 import org.apache.kyuubi.spark.connector.hive.KyuubiHiveConnectorException
@@ -47,7 +47,7 @@ object HiveWriteHelper extends Logging {
   private val hiveScratchDir = "hive.exec.scratchdir"
 
   def getExternalTmpPath(
-      sparkSession: SparkSession,
+      externalCatalog: ExternalCatalogWithListener,
       hadoopConf: Configuration,
       path: Path): Path = {
 
@@ -70,7 +70,6 @@ object HiveWriteHelper extends Logging {
     assert(hiveVersionsUsingNewExternalTempPath ++ hiveVersionsUsingOldExternalTempPath ==
       allSupportedHiveVersions)
 
-    val externalCatalog = sparkSession.sharedState.externalCatalog
     val hiveVersion = externalCatalog.unwrapped.asInstanceOf[HiveExternalCatalog].client.version
     val stagingDir = hadoopConf.get(hiveStagingDir, ".hive-staging")
     val scratchDir = hadoopConf.get(hiveScratchDir, "/tmp/hive")
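
As a usage sketch (assuming the placeholder `hive_remote` catalog configured above), queries routed through the connector now resolve FileSystems and staging paths from the catalog's own Hadoop configuration rather than the session-wide one; the table name below is hypothetical:

```scala
// Hypothetical usage: both the scan path (HiveCatalogFileIndex) and the write
// path (HiveWrite/HiveWriteHelper) pick up the catalog-scoped conf.
spark.sql("SELECT * FROM hive_remote.default.src").show()
spark.sql("INSERT INTO hive_remote.default.src SELECT 1")
```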