From 4feb83d0f38f0eb3fd39ecf669772ba8d3780e99 Mon Sep 17 00:00:00 2001
From: Yikf
Date: Sat, 18 Feb 2023 22:12:53 +0800
Subject: [PATCH] [KYUUBI #4359] Workaround for SPARK-41448 to keep
 FileWriterFactory serializable

### _Why are the changes needed?_

[SPARK-41448](https://issues.apache.org/jira/browse/SPARK-41448) makes the MR job IDs used by FileBatchWriter and FileFormatWriter consistent in Apache Spark 3.3.2, but it introduces a serialization issue: `JobID` is not serializable, yet the write factory that holds it is shipped to executors. This PR rewrites `FileWriterFactory` to work around the problem by keeping a serializable job tracker ID instead of a `JobID` (a minimal repro sketch follows the patch).

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly, including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request

Closes #4359 from Yikf/FileWriterFactory.

Closes #4359

dd8c90fe [Cheng Pan] Update extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/FileWriterFactory.scala
1e5164ec [Yikf] Make a serializable jobTrackerId instead of a non-serializable JobID in FileWriterFactory

Lead-authored-by: Yikf
Co-authored-by: Cheng Pan
Signed-off-by: Cheng Pan
---
 .../hive/write/FileWriterFactory.scala        | 78 +++++++++++++++++++
 .../connector/hive/write/HiveBatchWrite.scala |  9 ++-
 .../connector/hive/write/HiveWrite.scala      |  4 +-
 .../kyuubi/connector/HiveBridgeHelper.scala   |  2 +
 4 files changed, 89 insertions(+), 4 deletions(-)
 create mode 100644 extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/FileWriterFactory.scala

diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/FileWriterFactory.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/FileWriterFactory.scala
new file mode 100644
index 000000000..c8e8f9b69
--- /dev/null
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/FileWriterFactory.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.spark.connector.hive.write
+
+import java.util.Date
+
+import org.apache.hadoop.mapred.JobID
+import org.apache.hadoop.mapreduce.{TaskAttemptID, TaskID, TaskType}
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.spark.internal.io.FileCommitProtocol
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.connector.write.{DataWriter, DataWriterFactory}
+import org.apache.spark.sql.execution.datasources.{DynamicPartitionDataSingleWriter, SingleDirectoryDataWriter, WriteJobDescription}
+import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.sparkHadoopWriterUtils
+
+/**
+ * This class is rewritten because of SPARK-42478, which affects Spark 3.3.2
+ */
+case class FileWriterFactory(
+    description: WriteJobDescription,
+    committer: FileCommitProtocol) extends DataWriterFactory {
+
+  private val jobTrackerId = sparkHadoopWriterUtils.createJobTrackerID(new Date)
+
+  override def createWriter(partitionId: Int, realTaskId: Long): DataWriter[InternalRow] = {
+    val taskAttemptContext = createTaskAttemptContext(partitionId)
+    committer.setupTask(taskAttemptContext)
+    if (description.partitionColumns.isEmpty) {
+      new SingleDirectoryDataWriter(description, taskAttemptContext, committer)
+    } else {
+      new DynamicPartitionDataSingleWriter(description, taskAttemptContext, committer)
+    }
+  }
+
+  private def createTaskAttemptContext(partitionId: Int): TaskAttemptContextImpl = {
+    val jobId = createJobID(jobTrackerId, 0)
+    val taskId = new TaskID(jobId, TaskType.MAP, partitionId)
+    val taskAttemptId = new TaskAttemptID(taskId, 0)
+    // Set up the configuration object
+    val hadoopConf = description.serializableHadoopConf.value
+    hadoopConf.set("mapreduce.job.id", jobId.toString)
+    hadoopConf.set("mapreduce.task.id", taskId.toString)
+    hadoopConf.set("mapreduce.task.attempt.id", taskAttemptId.toString)
+    hadoopConf.setBoolean("mapreduce.task.ismap", true)
+    hadoopConf.setInt("mapreduce.task.partition", 0)
+
+    new TaskAttemptContextImpl(hadoopConf, taskAttemptId)
+  }
+
+  /**
+   * Create a job ID.
+   *
+   * @param jobTrackerID unique job track id
+   * @param id job number
+   * @return a job ID
+   */
+  def createJobID(jobTrackerID: String, id: Int): JobID = {
+    if (id < 0) {
+      throw new IllegalArgumentException("Job number is negative")
+    }
+    new JobID(jobTrackerID, id)
+  }
+}
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveBatchWrite.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveBatchWrite.scala
index c4e473ff2..625d79d0c 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveBatchWrite.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveBatchWrite.scala
@@ -23,12 +23,13 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.io.FileCommitProtocol
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.connector.write.{BatchWrite, DataWriterFactory, PhysicalWriteInfo, WriterCommitMessage}
 import org.apache.spark.sql.execution.command.CommandUtils
-import org.apache.spark.sql.execution.datasources.WriteTaskResult
+import org.apache.spark.sql.execution.datasources.{WriteJobDescription, WriteTaskResult}
 import org.apache.spark.sql.execution.datasources.v2.FileBatchWrite
 import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.{hive, toSQLValue, HiveExternalCatalog}
 import org.apache.spark.sql.types.StringType
@@ -47,10 +48,12 @@ class HiveBatchWrite(
     ifPartitionNotExists: Boolean,
     hadoopConf: Configuration,
     fileBatchWrite: FileBatchWrite,
-    externalCatalog: ExternalCatalog) extends BatchWrite with Logging {
+    externalCatalog: ExternalCatalog,
+    description: WriteJobDescription,
+    committer: FileCommitProtocol) extends BatchWrite with Logging {
 
   override def createBatchWriterFactory(info: PhysicalWriteInfo): DataWriterFactory = {
-    fileBatchWrite.createBatchWriterFactory(info)
+    FileWriterFactory(description, committer)
   }
 
   override def commit(messages: Array[WriterCommitMessage]): Unit = {
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveWrite.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveWrite.scala
index 486a7aa22..2d72327b4 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveWrite.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveWrite.scala
@@ -112,7 +112,9 @@ case class HiveWrite(
       ifPartitionNotExists,
       hadoopConf,
       new FileBatchWrite(job, description, committer),
-      externalCatalog)
+      externalCatalog,
+      description,
+      committer)
   }
 
   private def createWriteJobDescription(
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/spark/sql/hive/kyuubi/connector/HiveBridgeHelper.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/spark/sql/hive/kyuubi/connector/HiveBridgeHelper.scala
index 88b0305dd..1a11790d8 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/spark/sql/hive/kyuubi/connector/HiveBridgeHelper.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/spark/sql/hive/kyuubi/connector/HiveBridgeHelper.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hive.kyuubi.connector
 import scala.collection.mutable
 
 import org.apache.spark.SparkContext
+import org.apache.spark.internal.io.SparkHadoopWriterUtils
 import org.apache.spark.rdd.InputFileBlockHolder
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, ExternalCatalogEvent}
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Literal}
@@ -43,6 +44,7 @@ object HiveBridgeHelper {
   val hive = org.apache.spark.sql.hive.client.hive
   val logicalExpressions: LogicalExpressions.type = LogicalExpressions
   val hiveClientImpl: HiveClientImpl.type = HiveClientImpl
+  val sparkHadoopWriterUtils: SparkHadoopWriterUtils.type = SparkHadoopWriterUtils
   val catalogV2Util: CatalogV2Util.type = CatalogV2Util
   val hiveTableUtil: HiveTableUtil.type = HiveTableUtil
   val hiveShim: HiveShim.type = HiveShim
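For context, the sketch below illustrates the failure mode the patch works around. It is a minimal, standalone example and not part of the patch: `JobIdFactory` and `TrackerIdFactory` are hypothetical stand-ins for a `DataWriterFactory`, and plain Java serialization is used to mimic how Spark ships the factory from the driver to executor tasks. A factory that captures a Hadoop `JobID` fails to serialize, while one that captures only the job tracker ID string, as `FileWriterFactory` above does, serializes fine.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}
import java.text.SimpleDateFormat
import java.util.{Date, Locale}

import org.apache.hadoop.mapred.JobID

object SerializationCheck {

  // Hypothetical stand-ins for a DataWriterFactory, used only for this illustration.
  case class JobIdFactory(jobId: JobID)              // captures a non-serializable Hadoop JobID
  case class TrackerIdFactory(jobTrackerId: String)  // captures only a serializable String

  // Round-trip a value through plain Java serialization, mimicking how Spark
  // serializes a DataWriterFactory before sending it to executors.
  private def trySerialize(value: AnyRef): String =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(value)
      out.close()
      "serialized OK"
    } catch {
      case e: NotSerializableException => s"failed: $e"
    }

  def main(args: Array[String]): Unit = {
    // Same "yyyyMMddHHmmss" shape that SparkHadoopWriterUtils.createJobTrackerID produces.
    val trackerId = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(new Date)

    // Fails: JobID does not implement java.io.Serializable.
    println(trySerialize(JobIdFactory(new JobID(trackerId, 0))))

    // Works: the factory carries only the tracker-id string; each task rebuilds
    // the JobID locally, as FileWriterFactory.createTaskAttemptContext does.
    println(trySerialize(TrackerIdFactory(trackerId)))
  }
}
```

Carrying only the tracker ID and rebuilding the `JobID` per task mirrors the upstream change referenced as SPARK-42478 in the class comment.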