diff --git a/docs/deployment/engine_on_yarn.md b/docs/deployment/engine_on_yarn.md
index e8c0deceb..0fd270b34 100644
--- a/docs/deployment/engine_on_yarn.md
+++ b/docs/deployment/engine_on_yarn.md
@@ -219,7 +219,11 @@ $ echo "export HADOOP_CONF_DIR=/path/to/hadoop/conf" >> $KYUUBI_HOME/conf/kyuubi
 
 #### Required Environment Variable
 
-The `FLINK_HADOOP_CLASSPATH` is required, too.
+The `FLINK_HADOOP_CLASSPATH` is required unless the necessary Hadoop client jars (such as `hadoop-client` or
+`flink-shaded-hadoop`) have already been placed in the Flink lib directory (`$FLINK_HOME/lib`).
+
+If the jars are not present in `$FLINK_HOME/lib`, you must set `FLINK_HADOOP_CLASSPATH` to include the appropriate
+Hadoop client jars.
 
 For users who are using Hadoop 3.x, Hadoop shaded client is recommended instead of Hadoop vanilla jars.
 For users who are using Hadoop 2.x, `FLINK_HADOOP_CLASSPATH` should be set to hadoop classpath to use Hadoop
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
index e5ead4e53..b83089bc3 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
@@ -225,21 +225,41 @@ class FlinkProcessBuilder(
         env.get("HBASE_CONF_DIR").foreach(classpathEntries.add)
         env.get("HIVE_CONF_DIR").foreach(classpathEntries.add)
         val hadoopCp = env.get(FLINK_HADOOP_CLASSPATH_KEY)
-        hadoopCp.foreach(classpathEntries.add)
         val extraCp = conf.get(ENGINE_FLINK_EXTRA_CLASSPATH)
         extraCp.foreach(classpathEntries.add)
-        if (hadoopCp.isEmpty && extraCp.isEmpty) {
-          warn(s"The conf of ${FLINK_HADOOP_CLASSPATH_KEY} and " +
-            s"${ENGINE_FLINK_EXTRA_CLASSPATH.key} is empty.")
+
+        val hasHadoopJar = {
+          val files = Paths.get(flinkHome)
+            .resolve("lib")
+            .toFile
+            .listFiles(new FilenameFilter {
+              override def accept(dir: File, name: String): Boolean = {
+                name.startsWith("hadoop-client") ||
+                name.startsWith("flink-shaded-hadoop")
+              }
+            })
+          files != null && files.nonEmpty
+        }
+
+        if (!hasHadoopJar) {
+          hadoopCp.foreach(classpathEntries.add)
+        }
+
+        if (!hasHadoopJar && hadoopCp.isEmpty && extraCp.isEmpty) {
+          warn(s"No Hadoop client jars found in $flinkHome/lib, and the conf of " +
+            s"$FLINK_HADOOP_CLASSPATH_KEY and ${ENGINE_FLINK_EXTRA_CLASSPATH.key} is empty.")
           debug("Detected development environment.")
           mainResource.foreach { path =>
             val devHadoopJars = Paths.get(path).getParent
               .resolve(s"scala-$SCALA_COMPILE_VERSION")
               .resolve("jars")
             if (!Files.exists(devHadoopJars)) {
-              throw new KyuubiException(s"The path $devHadoopJars does not exists. " +
-                s"Please set ${FLINK_HADOOP_CLASSPATH_KEY} or ${ENGINE_FLINK_EXTRA_CLASSPATH.key}" +
-                s" for configuring location of hadoop client jars, etc.")
+              throw new KyuubiException(
+                s"The path $devHadoopJars does not exist. Please set " +
+                  s"${FLINK_HADOOP_CLASSPATH_KEY} or ${ENGINE_FLINK_EXTRA_CLASSPATH.key} " +
+                  s"to configure the location of Hadoop client jars. Alternatively, " +
+                  s"you can place the required hadoop-client or flink-shaded-hadoop jars " +
+                  s"directly into the Flink lib directory: $flinkHome/lib.")
             }
             classpathEntries.add(s"$devHadoopJars${File.separator}*")
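
Note for reviewers: below is a minimal sketch of the two ways to satisfy the Hadoop dependency that the doc change above describes. The jar names, paths, and the `kyuubi-env.sh` location are illustrative assumptions, not taken from the patch:

```bash
# Option 1 (illustrative paths): export FLINK_HADOOP_CLASSPATH in Kyuubi's env file.
# Hadoop 3.x: prefer the shaded Hadoop client jars.
echo "export FLINK_HADOOP_CLASSPATH=/path/to/hadoop-client-api.jar:/path/to/hadoop-client-runtime.jar" \
  >> "$KYUUBI_HOME/conf/kyuubi-env.sh"

# Hadoop 2.x: set FLINK_HADOOP_CLASSPATH to the full hadoop classpath.
echo "export FLINK_HADOOP_CLASSPATH=\$(hadoop classpath)" >> "$KYUUBI_HOME/conf/kyuubi-env.sh"

# Option 2 (the new code path added in this patch): place hadoop-client or
# flink-shaded-hadoop jars directly into $FLINK_HOME/lib; FlinkProcessBuilder
# then skips FLINK_HADOOP_CLASSPATH when building the engine classpath.
cp /path/to/hadoop-client-*.jar "$FLINK_HOME/lib/"
```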