[KYUUBI #5238] Fix credentials may break Flink engine launch command

### _Why are the changes needed?_
Currently, Flink engine doesn't use delegation tokens and these tokens need to be filtered out from the Flink engine launch command, or the command may be corrupted because the credentials could contain new lines.

### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [ ] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before make a pull request

### _Was this patch authored or co-authored using generative AI tooling?_
No.

Closes #5238 from link3280/filter_engine_credential.

Closes #5238

5e2403a53 [Paul Lin] Optimize code style
41df6e2a4 [Paul Lin] Fix test error
524189443 [Paul Lin] Fix credentials may break Flink engine launch command

Authored-by: Paul Lin <paullin3280@gmail.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
This commit is contained in:
Paul Lin 2023-09-05 04:23:28 +08:00 committed by Cheng Pan
parent 466d35d169
commit 32c5033568
No known key found for this signature in database
GPG Key ID: 8001952629BCC75D
2 changed files with 7 additions and 2 deletions

View File

@ -26,7 +26,7 @@ import scala.collection.mutable.{ArrayBuffer, ListBuffer}
import com.google.common.annotations.VisibleForTesting
import org.apache.kyuubi._
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.{KyuubiConf, KyuubiReservedKeys}
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY
import org.apache.kyuubi.engine.{ApplicationManagerInfo, KyuubiApplicationManager, ProcBuilder}
@ -79,7 +79,8 @@ class FlinkProcessBuilder(
override protected val commands: Array[String] = {
KyuubiApplicationManager.tagApplication(engineRefId, shortName, clusterManager(), conf)
// unset engine credentials because Flink doesn't support them at the moment
conf.unset(KyuubiReservedKeys.KYUUBI_ENGINE_CREDENTIALS_KEY)
// flink.execution.target are required in Kyuubi conf currently
executionTarget match {
case Some("yarn-application") =>

View File

@ -27,6 +27,7 @@ import scala.util.matching.Regex
import org.apache.kyuubi.KyuubiFunSuite
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.{ENGINE_FLINK_APPLICATION_JARS, ENGINE_FLINK_EXTRA_CLASSPATH, ENGINE_FLINK_JAVA_OPTIONS, ENGINE_FLINK_MEMORY}
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_ENGINE_CREDENTIALS_KEY
import org.apache.kyuubi.engine.flink.FlinkProcessBuilder._
class FlinkProcessBuilderSuite extends KyuubiFunSuite {
@ -37,12 +38,14 @@ class FlinkProcessBuilderSuite extends KyuubiFunSuite {
.set(
ENGINE_FLINK_JAVA_OPTIONS,
"-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005")
.set(KYUUBI_ENGINE_CREDENTIALS_KEY, "should-not-be-used")
private def applicationModeConf = KyuubiConf()
.set("flink.execution.target", "yarn-application")
.set(ENGINE_FLINK_APPLICATION_JARS, tempUdfJar.toString)
.set(APP_KEY, "kyuubi_connection_flink_paul")
.set("kyuubi.on", "off")
.set(KYUUBI_ENGINE_CREDENTIALS_KEY, "should-not-be-used")
private val tempFlinkHome = Files.createTempDirectory("flink-home").toFile
private val tempOpt =
@ -65,6 +68,7 @@ class FlinkProcessBuilderSuite extends KyuubiFunSuite {
(FLINK_HADOOP_CLASSPATH_KEY -> s"${File.separator}hadoop")
private def confStr: String = {
sessionModeConf.clone.getAll
.filter(!_._1.equals(KYUUBI_ENGINE_CREDENTIALS_KEY))
.map { case (k, v) => s"\\\\\\n\\t--conf $k=$v" }
.mkString(" ")
}