[KYUUBI #7113] Skip Hadoop classpath check if flink-shaded-hadoop jar exists in Flink lib directory

### Why are the changes needed?

This change addresses an issue where the Flink engine in Kyuubi would perform a Hadoop classpath check even when a `flink-shaded-hadoop` jar is already present in the Flink `lib` directory. In such cases, the check is unnecessary and may cause confusion or warnings in environments where the shaded jar is used instead of a full Hadoop classpath. By skipping the check when a `flink-shaded-hadoop` jar exists, we improve compatibility and reduce unnecessary log output.
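The detection described above can be sketched as a small standalone Scala program. This is a minimal illustration mirroring the patch's file-name filter, not the actual Kyuubi code; `HadoopJarCheck` and `hasHadoopJar` are illustrative names, and the temp directory stands in for a real `$FLINK_HOME`:

```scala
import java.io.{File, FilenameFilter}
import java.nio.file.{Files, Paths}

object HadoopJarCheck {
  // Returns true if a hadoop-client or flink-shaded-hadoop jar sits in <flinkHome>/lib.
  def hasHadoopJar(flinkHome: String): Boolean = {
    val files = Paths.get(flinkHome)
      .resolve("lib")
      .toFile
      .listFiles(new FilenameFilter {
        override def accept(dir: File, name: String): Boolean =
          name.startsWith("hadoop-client") || name.startsWith("flink-shaded-hadoop")
      })
    // listFiles returns null when the lib directory does not exist,
    // so guard against null before calling nonEmpty.
    files != null && files.nonEmpty
  }

  def main(args: Array[String]): Unit = {
    val home = Files.createTempDirectory("flink-home")
    Files.createDirectory(home.resolve("lib"))
    println(hasHadoopJar(home.toString)) // prints false: lib is empty
    Files.createFile(home.resolve("lib").resolve("flink-shaded-hadoop-2-uber-2.8.3-10.0.jar"))
    println(hasHadoopJar(home.toString)) // prints true: shaded jar found
  }
}
```

When this returns true, appending `FLINK_HADOOP_CLASSPATH` (and warning about its absence) is unnecessary, which is exactly what the patch skips.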

### How was this patch tested?

The patch was tested by deploying Kyuubi with a Flink environment that includes a `flink-shaded-hadoop` jar in the `lib` directory and verifying that the classpath check is correctly skipped. Additional tests ensured that the check still occurs when neither the Hadoop classpath nor the shaded jar is present. Unit tests and manual verification steps were performed to confirm the fix.
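The two scenarios above reduce to a small decision rule, sketched here as a hedged model rather than the actual Kyuubi test suite; `shouldWarn` and `shouldAddHadoopCp` are illustrative names:

```scala
object ClasspathCheckModel {
  // The warning (and dev-environment fallback) fires only when no Hadoop jar
  // sits in $FLINK_HOME/lib AND both classpath configs are empty.
  def shouldWarn(hasHadoopJar: Boolean, hadoopCp: Option[String], extraCp: Option[String]): Boolean =
    !hasHadoopJar && hadoopCp.isEmpty && extraCp.isEmpty

  // FLINK_HADOOP_CLASSPATH is only appended when no shaded/client jar is found.
  def shouldAddHadoopCp(hasHadoopJar: Boolean, hadoopCp: Option[String]): Boolean =
    !hasHadoopJar && hadoopCp.nonEmpty

  def main(args: Array[String]): Unit = {
    // Shaded jar present: check is skipped regardless of the configs.
    assert(!shouldWarn(hasHadoopJar = true, None, None))
    // Neither the jar nor any config present: check must still fire.
    assert(shouldWarn(hasHadoopJar = false, None, None))
    // Jar present: the env classpath is not appended on top of it.
    assert(!shouldAddHadoopCp(hasHadoopJar = true, Some("/opt/hadoop/*")))
    println("ok") // prints ok
  }
}
```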

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #7113 from cutiechi/fix/flink-classpath-missing-hadoop-check.

Closes #7113

99a4bf834 [cutiechi] fix(flink): fix process builder suite
7b9998760 [cutiechi] fix(flink): remove hadoop cp add
ea33258a3 [cutiechi] fix(flink): update flink hadoop classpath doc
6bb3b1dfa [cutiechi] fix(flink): optimize hadoop class path messages
c548ed6a1 [cutiechi] fix(flink): simplify classpath detection by merging hasHadoopJar conditions
9c16d5436 [cutiechi] Update kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
0f729dcf9 [cutiechi] fix(flink): skip hadoop classpath check if flink-shaded-hadoop jar exists

Authored-by: cutiechi <superchijinpeng@gmail.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
Commit: 4717987e37 (parent 8c5f461dfb)
Author: cutiechi, 2025-07-02 17:33:07 +08:00; committed by Cheng Pan
GPG Key ID: 8001952629BCC75D (no known key found for this signature in database)
2 changed files with 32 additions and 8 deletions


```diff
@@ -219,7 +219,11 @@ $ echo "export HADOOP_CONF_DIR=/path/to/hadoop/conf" >> $KYUUBI_HOME/conf/kyuubi
 #### Required Environment Variable
-The `FLINK_HADOOP_CLASSPATH` is required, too.
+The `FLINK_HADOOP_CLASSPATH` is required unless the necessary Hadoop client jars (such as `hadoop-client` or
+`flink-shaded-hadoop`) have already been placed in the Flink lib directory (`$FLINK_HOME/lib`).
+If the jars are not present in `$FLINK_HOME/lib`, you must set `FLINK_HADOOP_CLASSPATH` to include the appropriate
+Hadoop client jars.
 For users who are using Hadoop 3.x, Hadoop shaded client is recommended instead of Hadoop vanilla jars.
 For users who are using Hadoop 2.x, `FLINK_HADOOP_CLASSPATH` should be set to hadoop classpath to use Hadoop
```

kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala

```diff
@@ -225,21 +225,41 @@ class FlinkProcessBuilder(
     env.get("HBASE_CONF_DIR").foreach(classpathEntries.add)
     env.get("HIVE_CONF_DIR").foreach(classpathEntries.add)
     val hadoopCp = env.get(FLINK_HADOOP_CLASSPATH_KEY)
-    hadoopCp.foreach(classpathEntries.add)
     val extraCp = conf.get(ENGINE_FLINK_EXTRA_CLASSPATH)
     extraCp.foreach(classpathEntries.add)
-    if (hadoopCp.isEmpty && extraCp.isEmpty) {
-      warn(s"The conf of ${FLINK_HADOOP_CLASSPATH_KEY} and " +
-        s"${ENGINE_FLINK_EXTRA_CLASSPATH.key} is empty.")
+    val hasHadoopJar = {
+      val files = Paths.get(flinkHome)
+        .resolve("lib")
+        .toFile
+        .listFiles(new FilenameFilter {
+          override def accept(dir: File, name: String): Boolean = {
+            name.startsWith("hadoop-client") ||
+            name.startsWith("flink-shaded-hadoop")
+          }
+        })
+      files != null && files.nonEmpty
+    }
+    if (!hasHadoopJar) {
+      hadoopCp.foreach(classpathEntries.add)
+    }
+    if (!hasHadoopJar && hadoopCp.isEmpty && extraCp.isEmpty) {
+      warn(s"No Hadoop client jars found in $flinkHome/lib, and the conf of " +
+        s"$FLINK_HADOOP_CLASSPATH_KEY and ${ENGINE_FLINK_EXTRA_CLASSPATH.key} is empty.")
       debug("Detected development environment.")
       mainResource.foreach { path =>
         val devHadoopJars = Paths.get(path).getParent
           .resolve(s"scala-$SCALA_COMPILE_VERSION")
           .resolve("jars")
         if (!Files.exists(devHadoopJars)) {
-          throw new KyuubiException(s"The path $devHadoopJars does not exists. " +
-            s"Please set ${FLINK_HADOOP_CLASSPATH_KEY} or ${ENGINE_FLINK_EXTRA_CLASSPATH.key}" +
-            s" for configuring location of hadoop client jars, etc.")
+          throw new KyuubiException(
+            s"The path $devHadoopJars does not exist. Please set " +
+              s"${FLINK_HADOOP_CLASSPATH_KEY} or ${ENGINE_FLINK_EXTRA_CLASSPATH.key} " +
+              s"to configure the location of Hadoop client jars. Alternatively, " +
+              s"you can place the required hadoop-client or flink-shaded-hadoop jars " +
+              s"directly into the Flink lib directory: $flinkHome/lib.")
         }
         classpathEntries.add(s"$devHadoopJars${File.separator}*")
       }
```