[KYUUBI #4995] Use hadoop conf and hive conf from catalog options
### _Why are the changes needed?_

There are hdfs-site.xml, hive-site.xml, etc. on the Spark job classpath, but the connector should use the Hadoop conf and Hive conf supplied through the catalog options instead.

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before making a pull request

Closes #4995 from zhaomin1423/fix_hive_connector.

Closes #4995

64429fdcb [Xiao Zhao] Update extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala
d921be750 [zhaomin] fix
375934d65 [zhaomin] Using hadoop conf and hive conf from catalog options

Lead-authored-by: zhaomin <zhaomin1423@163.com>
Co-authored-by: Xiao Zhao <zhaomin1423@163.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
parent 75c7a74e52
commit 80bc028e6d
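For context, a minimal sketch of the setup this change targets. The catalog name `hive2` and all option values below are assumptions for illustration, not from this PR; only the connector class `org.apache.kyuubi.spark.connector.hive.HiveTableCatalog` is taken from the diff.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("kyuubi-hive-connector-example")
  // Register the Kyuubi Hive connector as a v2 catalog named "hive2".
  .config("spark.sql.catalog.hive2",
    "org.apache.kyuubi.spark.connector.hive.HiveTableCatalog")
  // Hadoop/Hive settings passed as catalog options; with this change they take
  // effect for catalog "hive2" even if hdfs-site.xml/hive-site.xml on the job
  // classpath say otherwise. Hosts and URIs here are made up.
  .config("spark.sql.catalog.hive2.hive.metastore.uris", "thrift://metastore-b:9083")
  .config("spark.sql.catalog.hive2.fs.defaultFS", "hdfs://cluster-b")
  .getOrCreate()

spark.sql("SHOW NAMESPACES IN hive2").show()
```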
`extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala`:

```diff
@@ -40,6 +40,7 @@ import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.hive.HiveUDFExpressionBuilder
 import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper._
 import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
+import org.apache.spark.sql.internal.StaticSQLConf.GLOBAL_TEMP_DATABASE
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
```
```diff
@@ -105,7 +106,7 @@ class HiveTableCatalog(sparkSession: SparkSession)
     catalogOptions = options
     catalog = new HiveSessionCatalog(
       externalCatalogBuilder = () => externalCatalog,
-      globalTempViewManagerBuilder = () => sparkSession.sharedState.globalTempViewManager,
+      globalTempViewManagerBuilder = () => globalTempViewManager,
       metastoreCatalog = new HiveMetastoreCatalog(sparkSession),
       functionRegistry = sessionState.functionRegistry,
       tableFunctionRegistry = sessionState.tableFunctionRegistry,
```
```diff
@@ -115,6 +116,17 @@ class HiveTableCatalog(sparkSession: SparkSession)
       HiveUDFExpressionBuilder)
   }
 
+  private lazy val globalTempViewManager: GlobalTempViewManager = {
+    val globalTempDB = conf.getConf(GLOBAL_TEMP_DATABASE)
+    if (externalCatalog.databaseExists(globalTempDB)) {
+      throw KyuubiHiveConnectorException(
+        s"$globalTempDB is a system preserved database, please rename your existing database to " +
+          s"resolve the name conflict, or set a different value for ${GLOBAL_TEMP_DATABASE.key}, " +
+          "and launch your Spark application again.")
+    }
+    new GlobalTempViewManager(globalTempDB)
+  }
+
   /**
    * A catalog that interacts with external systems.
    */
```
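Replacing `sparkSession.sharedState.globalTempViewManager` with a catalog-local manager means the reserved-database check now runs against this catalog's own external catalog (built from the catalog options) rather than the session-wide one. A small illustration of the conf involved, assuming Spark defaults: `GLOBAL_TEMP_DATABASE` is the static conf `spark.sql.globalTempDatabase` (default `global_temp`), and if the metastore behind this catalog already owns a database with that name, the lazy val above fails fast.

```scala
import org.apache.spark.sql.SparkSession

// Picking a non-conflicting global-temp database name at startup avoids the
// name clash the guard above reports. This is a usage sketch, not PR code.
val sparkWithRenamedGlobalTemp = SparkSession.builder()
  .config("spark.sql.globalTempDatabase", "spark_global_temp")
  .getOrCreate()
```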
`HiveCatalogFileIndex.scala`:

```diff
@@ -80,7 +80,7 @@ class HiveCatalogFileIndex(
     val partitions = selectedPartitions.map {
       case BindPartition(catalogTablePartition, hivePartition) =>
         val path = new Path(catalogTablePartition.location)
-        val fs = path.getFileSystem(hadoopConf)
+        val fs = path.getFileSystem(hiveCatalog.hadoopConfiguration())
        val partPath = PartitionPath(
          catalogTablePartition.toRow(
            partitionSchema,
```
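`hiveCatalog.hadoopConfiguration()`, used above, supplies a Hadoop conf scoped to the catalog. A hypothetical sketch of how such a conf can be derived from catalog options — the helper name and merge order are assumptions, not this PR's implementation:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Hypothetical helper: copy a base Hadoop conf, then overlay every catalog
// option, so keys like "fs.defaultFS" set via spark.sql.catalog.<name>.* win
// over hdfs-site.xml/hive-site.xml found on the classpath.
def hadoopConfFromCatalogOptions(
    base: Configuration,
    options: CaseInsensitiveStringMap): Configuration = {
  val conf = new Configuration(base) // copy; never mutate the shared conf
  options.entrySet().forEach(e => conf.set(e.getKey, e.getValue))
  conf
}
```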
`HiveWrite.scala`:

```diff
@@ -76,7 +76,7 @@ case class HiveWrite(
   override def description(): String = "Kyuubi-Hive-Connector"
 
   override def toBatch: BatchWrite = {
-    val tmpLocation = HiveWriteHelper.getExternalTmpPath(sparkSession, hadoopConf, tableLocation)
+    val tmpLocation = HiveWriteHelper.getExternalTmpPath(externalCatalog, hadoopConf, tableLocation)
 
     val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)
     handleCompression(fileSinkConf, hadoopConf)
```
`HiveWriteHelper.scala`:

```diff
@@ -27,8 +27,8 @@ import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.hive.common.FileUtils
 import org.apache.hadoop.hive.ql.exec.TaskRunner
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
+import org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener
 import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.{hive, HiveExternalCatalog, HiveVersion}
 
 import org.apache.kyuubi.spark.connector.hive.KyuubiHiveConnectorException
```
```diff
@@ -47,7 +47,7 @@ object HiveWriteHelper extends Logging {
   private val hiveScratchDir = "hive.exec.scratchdir"
 
   def getExternalTmpPath(
-      sparkSession: SparkSession,
+      externalCatalog: ExternalCatalogWithListener,
       hadoopConf: Configuration,
       path: Path): Path = {
 
```
```diff
@@ -70,7 +70,6 @@
     assert(hiveVersionsUsingNewExternalTempPath ++ hiveVersionsUsingOldExternalTempPath ==
       allSupportedHiveVersions)
 
-    val externalCatalog = sparkSession.sharedState.externalCatalog
     val hiveVersion = externalCatalog.unwrapped.asInstanceOf[HiveExternalCatalog].client.version
     val stagingDir = hadoopConf.get(hiveStagingDir, ".hive-staging")
     val scratchDir = hadoopConf.get(hiveScratchDir, "/tmp/hive")
```
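With the `SparkSession` parameter gone, the caller decides which external catalog's Hive client version drives the temp-path layout. A hedged sketch of the call-site shape — the wrapper name and accessor comments are assumptions, and it presumes `HiveWriteHelper` (the object modified above) is in scope:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener

// Hypothetical call site: the connector threads its own catalog and Hadoop
// conf through, so the Hive version check and the staging/scratch dirs all
// come from the catalog options rather than session-level state.
def stagingPathFor(
    catalogExternal: ExternalCatalogWithListener, // connector-scoped catalog
    catalogHadoopConf: Configuration, // built from catalog options
    tableLocation: Path): Path =
  HiveWriteHelper.getExternalTmpPath(catalogExternal, catalogHadoopConf, tableLocation)
```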