[KYUUBI #4359] Workaround for SPARK-41448 to keep FileWriterFactory serializable
### _Why are the changes needed?_

[SPARK-41448](https://issues.apache.org/jira/browse/SPARK-41448) makes the MR job IDs in `FileBatchWriter` and `FileFormatWriter` consistent in Apache Spark 3.3.2, but it introduces a serialization issue: `JobID` is not serializable. This PR rewrites `FileWriterFactory` to circumvent the problem.

### _How was this patch tested?_

- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request

Closes #4359 from Yikf/FileWriterFactory.

Closes #4359

dd8c90fe [Cheng Pan] Update extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/FileWriterFactory.scala
1e5164ec [Yikf] Make a serializable jobTrackerId instead of a non-serializable JobID in FileWriterFactory

Lead-authored-by: Yikf <yikaifei@apache.org>
Co-authored-by: Cheng Pan <pan3793@gmail.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
parent e60855fbcb · commit 4feb83d0f3
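For context, the failure mode is plain Java serialization: Hadoop's `JobID` implements `Writable`, not `java.io.Serializable`, so any object shipped to executors that captures one fails to serialize, while the tracker-ID string it is built from round-trips fine. A minimal sketch of the problem (illustration only, not part of the patch):

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

import org.apache.hadoop.mapreduce.JobID

object JobIdSerializationDemo extends App {
  val oos = new ObjectOutputStream(new ByteArrayOutputStream())

  // A plain String (what the patched factory stores) serializes fine.
  oos.writeObject("20230216144030")

  // JobID implements Hadoop's Writable, not java.io.Serializable, so
  // Java serialization rejects it with a NotSerializableException.
  oos.writeObject(new JobID("20230216144030", 0))
}
```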
@@ -0,0 +1,78 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.kyuubi.spark.connector.hive.write

import java.util.Date

import org.apache.hadoop.mapred.JobID
import org.apache.hadoop.mapreduce.{TaskAttemptID, TaskID, TaskType}
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.write.{DataWriter, DataWriterFactory}
import org.apache.spark.sql.execution.datasources.{DynamicPartitionDataSingleWriter, SingleDirectoryDataWriter, WriteJobDescription}
import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.sparkHadoopWriterUtils

/**
 * This class is rewritten because of SPARK-42478, which affects Spark 3.3.2
 */
case class FileWriterFactory(
    description: WriteJobDescription,
    committer: FileCommitProtocol) extends DataWriterFactory {

  private val jobTrackerId = sparkHadoopWriterUtils.createJobTrackerID(new Date)

  override def createWriter(partitionId: Int, realTaskId: Long): DataWriter[InternalRow] = {
    val taskAttemptContext = createTaskAttemptContext(partitionId)
    committer.setupTask(taskAttemptContext)
    if (description.partitionColumns.isEmpty) {
      new SingleDirectoryDataWriter(description, taskAttemptContext, committer)
    } else {
      new DynamicPartitionDataSingleWriter(description, taskAttemptContext, committer)
    }
  }

  private def createTaskAttemptContext(partitionId: Int): TaskAttemptContextImpl = {
    val jobId = createJobID(jobTrackerId, 0)
    val taskId = new TaskID(jobId, TaskType.MAP, partitionId)
    val taskAttemptId = new TaskAttemptID(taskId, 0)
    // Set up the configuration object
    val hadoopConf = description.serializableHadoopConf.value
    hadoopConf.set("mapreduce.job.id", jobId.toString)
    hadoopConf.set("mapreduce.task.id", taskId.toString)
    hadoopConf.set("mapreduce.task.attempt.id", taskAttemptId.toString)
    hadoopConf.setBoolean("mapreduce.task.ismap", true)
    hadoopConf.setInt("mapreduce.task.partition", 0)

    new TaskAttemptContextImpl(hadoopConf, taskAttemptId)
  }

  /**
   * Create a job ID.
   *
   * @param jobTrackerID unique job tracker ID
   * @param id job number
   * @return a job ID
   */
  def createJobID(jobTrackerID: String, id: Int): JobID = {
    if (id < 0) {
      throw new IllegalArgumentException("Job number is negative")
    }
    new JobID(jobTrackerID, id)
  }
}
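The key move above is storing `jobTrackerId` (a `String` produced by `createJobTrackerID`) in the factory and rebuilding the Hadoop IDs on demand. Because the string is fixed when the factory is constructed on the driver, every serialized copy on every executor derives the same `JobID` — the consistency SPARK-41448 was after — without ever shipping a `JobID` object. A small illustration of the IDs this yields (the timestamp value is made up; the `toString` formats are the standard ones from Hadoop's ID classes):

```scala
import org.apache.hadoop.mapred.JobID
import org.apache.hadoop.mapreduce.{TaskAttemptID, TaskID, TaskType}

object IdReconstructionDemo extends App {
  val jobTrackerId = "20230216144030" // example createJobTrackerID output

  // Each executor can rebuild identical IDs from the shared string.
  val jobId = new JobID(jobTrackerId, 0)
  val taskId = new TaskID(jobId, TaskType.MAP, 3)
  val attemptId = new TaskAttemptID(taskId, 0)

  println(jobId)     // job_20230216144030_0000
  println(attemptId) // attempt_20230216144030_0000_m_000003_0
}
```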
@@ -23,12 +23,13 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.io.FileCommitProtocol
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.connector.write.{BatchWrite, DataWriterFactory, PhysicalWriteInfo, WriterCommitMessage}
 import org.apache.spark.sql.execution.command.CommandUtils
-import org.apache.spark.sql.execution.datasources.WriteTaskResult
+import org.apache.spark.sql.execution.datasources.{WriteJobDescription, WriteTaskResult}
 import org.apache.spark.sql.execution.datasources.v2.FileBatchWrite
 import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.{hive, toSQLValue, HiveExternalCatalog}
 import org.apache.spark.sql.types.StringType
@@ -47,10 +48,12 @@ class HiveBatchWrite(
     ifPartitionNotExists: Boolean,
     hadoopConf: Configuration,
     fileBatchWrite: FileBatchWrite,
-    externalCatalog: ExternalCatalog) extends BatchWrite with Logging {
+    externalCatalog: ExternalCatalog,
+    description: WriteJobDescription,
+    committer: FileCommitProtocol) extends BatchWrite with Logging {

   override def createBatchWriterFactory(info: PhysicalWriteInfo): DataWriterFactory = {
-    fileBatchWrite.createBatchWriterFactory(info)
+    FileWriterFactory(description, committer)
   }

   override def commit(messages: Array[WriterCommitMessage]): Unit = {
@@ -112,7 +112,9 @@ case class HiveWrite(
       ifPartitionNotExists,
       hadoopConf,
       new FileBatchWrite(job, description, committer),
-      externalCatalog)
+      externalCatalog,
+      description,
+      committer)
   }

   private def createWriteJobDescription(
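Taken together, the two hunks above thread `description` and `committer` from `HiveWrite` into `HiveBatchWrite`, so the connector builds the serializable `FileWriterFactory` itself instead of delegating to `FileBatchWrite`'s factory (which, on Spark 3.3.2, captures the non-serializable `JobID`). A stripped-down sketch of the resulting flow, with the surrounding class abridged to a hypothetical stand-in:

```scala
import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.sql.connector.write.DataWriterFactory
import org.apache.spark.sql.execution.datasources.WriteJobDescription

import org.apache.kyuubi.spark.connector.hive.write.FileWriterFactory

// Hypothetical stand-in for the HiveBatchWrite wiring shown above: the
// factory is constructed on the driver, then java-serialized and shipped
// to executors, so everything it captures must serialize.
class BatchWriteSketch(
    description: WriteJobDescription,
    committer: FileCommitProtocol) {

  def createBatchWriterFactory(): DataWriterFactory =
    FileWriterFactory(description, committer)
}
```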
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hive.kyuubi.connector
 import scala.collection.mutable

 import org.apache.spark.SparkContext
+import org.apache.spark.internal.io.SparkHadoopWriterUtils
 import org.apache.spark.rdd.InputFileBlockHolder
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, ExternalCatalogEvent}
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Literal}
@@ -43,6 +44,7 @@ object HiveBridgeHelper {
   val hive = org.apache.spark.sql.hive.client.hive
   val logicalExpressions: LogicalExpressions.type = LogicalExpressions
   val hiveClientImpl: HiveClientImpl.type = HiveClientImpl
+  val sparkHadoopWriterUtils: SparkHadoopWriterUtils.type = SparkHadoopWriterUtils
   val catalogV2Util: CatalogV2Util.type = CatalogV2Util
   val hiveTableUtil: HiveTableUtil.type = HiveTableUtil
   val hiveShim: HiveShim.type = HiveShim
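The bridge entry exists because `SparkHadoopWriterUtils` is package-private to Spark (`private[spark]`, going by the Spark source rather than this diff): `HiveBridgeHelper` lives under an `org.apache.spark.*` package, so it can re-export the object to connector code that lives outside it. Connector-side usage then looks like:

```scala
import java.util.Date

import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.sparkHadoopWriterUtils

object TrackerIdExample extends App {
  // Resolves the package-private Spark utility through the bridge alias.
  val jobTrackerId: String = sparkHadoopWriterUtils.createJobTrackerID(new Date)
  println(jobTrackerId) // a timestamp-style ID, e.g. 20230216144030
}
```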