[KYUUBI #2333][KYUUBI #2554] Configuring Flink Engine heap memory and java opts

### _Why are the changes needed?_

fix #2554

### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request

Closes #2579 from jiaoqingbo/kyuubi2554.

Closes #2333

Closes #2554

f0365c91 [jiaoqingbo] code review
1700aab9 [jiaoqingbo] code review
1ca10a65 [jiaoqingbo] fix ut failed
b53dcdd4 [jiaoqingbo] code review
f9ceb72c [jiaoqingbo] [KYUUBI #2554] Configuring Flink Engine heap memory and java opts

Authored-by: jiaoqingbo <1178404354@qq.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
jiaoqingbo 2022-05-10 12:31:01 +08:00 committed by Cheng Pan
parent b8fd378526
commit 6b6da1f4d0
7 changed files with 88 additions and 19 deletions


@@ -46,6 +46,7 @@
# - FLINK_HOME Flink distribution which you would like to use in Kyuubi.
# - FLINK_CONF_DIR Optional directory where the Flink configuration lives.
# (Default: $FLINK_HOME/conf)
# - FLINK_HADOOP_CLASSPATH Required Hadoop jars when you use the Kyuubi Flink engine.
# - HIVE_HOME Hive distribution which you would like to use in Kyuubi.
# - HIVE_CONF_DIR Optional directory where the Hive configuration lives.
# (Default: $HIVE_HOME/conf)
@@ -59,6 +60,7 @@
# export SPARK_HOME=/opt/spark
# export FLINK_HOME=/opt/flink
# export HIVE_HOME=/opt/hive
# export FLINK_HADOOP_CLASSPATH=/path/to/hadoop-client-runtime-3.3.2.jar:/path/to/hadoop-client-api-3.3.2.jar
# export HIVE_HADOOP_CLASSPATH=${HADOOP_HOME}/share/hadoop/common/lib/commons-collections-3.2.2.jar:${HADOOP_HOME}/share/hadoop/client/hadoop-client-runtime-3.1.0.jar:${HADOOP_HOME}/share/hadoop/client/hadoop-client-api-3.1.0.jar:${HADOOP_HOME}/share/hadoop/common/lib/htrace-core4-4.1.0-incubating.jar
# export HADOOP_CONF_DIR=/usr/ndp/current/mapreduce_client/conf
# export YARN_CONF_DIR=/usr/ndp/current/yarn/conf


@@ -169,12 +169,33 @@ export HADOOP_CLASSPATH=`hadoop classpath`
echo "stop" | ./bin/yarn-session.sh -id application_XXXXX_XXX
```
If the `TopSpeedWindowing` passes, configure it in `$KYUUBI_HOME/conf/kyuubi-env.sh` or `$FLINK_HOME/bin/config.sh`, e.g.
If the `TopSpeedWindowing` passes, configure it in `$KYUUBI_HOME/conf/kyuubi-env.sh`
```bash
$ echo "export HADOOP_CONF_DIR=/path/to/hadoop/conf" >> $KYUUBI_HOME/conf/kyuubi-env.sh
```
#### Required Environment Variable
The `FLINK_HADOOP_CLASSPATH` is required, too.
For users on Hadoop 3.x, the Hadoop shaded client is recommended instead of the Hadoop vanilla jars.
For users on Hadoop 2.x, `FLINK_HADOOP_CLASSPATH` should be set to `hadoop classpath` to use the Hadoop
vanilla jars. For users who do not use Hadoop services (e.g. HDFS, YARN) at all, Hadoop client jars
are still required, and the Hadoop shaded client is recommended, as for Hadoop 3.x users.
See [HADOOP-11656](https://issues.apache.org/jira/browse/HADOOP-11656) for details of the Hadoop shaded client.
To use the Hadoop shaded client, configure `$KYUUBI_HOME/conf/kyuubi-env.sh` as follows:
```bash
$ echo "export FLINK_HADOOP_CLASSPATH=/path/to/hadoop-client-runtime-3.3.2.jar:/path/to/hadoop-client-api-3.3.2.jar" >> $KYUUBI_HOME/conf/kyuubi-env.sh
```
To use the Hadoop vanilla jars, configure `$KYUUBI_HOME/conf/kyuubi-env.sh` as follows:
```bash
$ echo "export FLINK_HADOOP_CLASSPATH=`hadoop classpath`" >> $KYUUBI_HOME/conf/kyuubi-env.sh
```
### Deployment Modes Supported by Flink on YARN
For experimental use, we recommend deploying the Kyuubi Flink SQL engine in [Session Mode](https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/resource-providers/yarn/#session-mode).


@@ -81,6 +81,7 @@ You can configure the environment variables in `$KYUUBI_HOME/conf/kyuubi-env.sh`
# - FLINK_HOME Flink distribution which you would like to use in Kyuubi.
# - FLINK_CONF_DIR Optional directory where the Flink configuration lives.
# (Default: $FLINK_HOME/conf)
# - FLINK_HADOOP_CLASSPATH Required Hadoop jars when you use the Kyuubi Flink engine.
# - HIVE_HOME Hive distribution which you would like to use in Kyuubi.
# - HIVE_CONF_DIR Optional directory where the Hive configuration lives.
# (Default: $HIVE_HOME/conf)
@@ -94,6 +95,7 @@ You can configure the environment variables in `$KYUUBI_HOME/conf/kyuubi-env.sh`
# export SPARK_HOME=/opt/spark
# export FLINK_HOME=/opt/flink
# export HIVE_HOME=/opt/hive
# export FLINK_HADOOP_CLASSPATH=/path/to/hadoop-client-runtime-3.3.2.jar:/path/to/hadoop-client-api-3.3.2.jar
# export HIVE_HADOOP_CLASSPATH=${HADOOP_HOME}/share/hadoop/common/lib/commons-collections-3.2.2.jar:${HADOOP_HOME}/share/hadoop/client/hadoop-client-runtime-3.1.0.jar:${HADOOP_HOME}/share/hadoop/client/hadoop-client-api-3.1.0.jar:${HADOOP_HOME}/share/hadoop/common/lib/htrace-core4-4.1.0-incubating.jar
# export HADOOP_CONF_DIR=/usr/ndp/current/mapreduce_client/conf
# export YARN_CONF_DIR=/usr/ndp/current/yarn/conf
@@ -207,6 +209,9 @@ Key | Default | Meaning | Type | Since
<code>kyuubi.engine.deregister.job.max.failures</code>|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>4</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>Number of failures of job before deregistering the engine.</div>|<div style='width: 30pt'>int</div>|<div style='width: 20pt'>1.2.0</div>
<code>kyuubi.engine.event.json.log.path</code>|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>file:///tmp/kyuubi/events</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>The location of all the engine events go for the builtin JSON logger.<ul><li>Local Path: start with 'file://'</li><li>HDFS Path: start with 'hdfs://'</li></ul></div>|<div style='width: 30pt'>string</div>|<div style='width: 20pt'>1.3.0</div>
<code>kyuubi.engine.event.loggers</code>|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>SPARK</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>A comma separated list of engine history loggers, where engine/session/operation etc events go. We use spark logger by default.<ul> <li>SPARK: the events will be written to the spark listener bus.</li> <li>JSON: the events will be written to the location of kyuubi.engine.event.json.log.path</li> <li>JDBC: to be done</li> <li>CUSTOM: to be done.</li></ul></div>|<div style='width: 30pt'>seq</div>|<div style='width: 20pt'>1.3.0</div>
<code>kyuubi.engine.flink.extra.classpath</code>|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>&lt;undefined&gt;</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>The extra classpath for the flink sql engine, for configuring location of hadoop client jars, etc</div>|<div style='width: 30pt'>string</div>|<div style='width: 20pt'>1.6.0</div>
<code>kyuubi.engine.flink.java.options</code>|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>&lt;undefined&gt;</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>The extra java options for the flink sql engine</div>|<div style='width: 30pt'>string</div>|<div style='width: 20pt'>1.6.0</div>
<code>kyuubi.engine.flink.memory</code>|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>1g</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>The heap memory for the flink sql engine</div>|<div style='width: 30pt'>string</div>|<div style='width: 20pt'>1.6.0</div>
<code>kyuubi.engine.hive.extra.classpath</code>|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>&lt;undefined&gt;</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>The extra classpath for the hive query engine, for configuring location of hadoop client jars, etc</div>|<div style='width: 30pt'>string</div>|<div style='width: 20pt'>1.6.0</div>
<code>kyuubi.engine.hive.java.options</code>|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>&lt;undefined&gt;</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>The extra java options for the hive query engine</div>|<div style='width: 30pt'>string</div>|<div style='width: 20pt'>1.6.0</div>
<code>kyuubi.engine.hive.memory</code>|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>1g</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>The heap memory for the hive query engine</div>|<div style='width: 30pt'>string</div>|<div style='width: 20pt'>1.6.0</div>
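As a quick sketch of wiring the new Flink engine options together, the following appends them to `kyuubi-defaults.conf` (the directory, option values, and jar path here are hypothetical — adjust them to your deployment):

```shell
# Hypothetical demo directory -- in a real deployment this would be $KYUUBI_HOME/conf.
CONF_DIR=/tmp/kyuubi-conf-demo
mkdir -p "$CONF_DIR"
{
  # The three options introduced in 1.6.0 for the Flink engine:
  echo "kyuubi.engine.flink.memory=2g"
  echo "kyuubi.engine.flink.java.options=-XX:+IgnoreUnrecognizedVMOptions"
  echo "kyuubi.engine.flink.extra.classpath=/path/to/hadoop-client-api-3.3.2.jar"
} > "$CONF_DIR/kyuubi-defaults.conf"
cat "$CONF_DIR/kyuubi-defaults.conf"
```

`kyuubi.engine.flink.extra.classpath` can substitute for the `FLINK_HADOOP_CLASSPATH` environment variable; at least one of the two must point at the Hadoop client jars.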


@@ -40,7 +40,7 @@ class FlinkOperationSuite extends WithKyuubiServerAndFlinkMiniCluster with HiveJ
.set(ENGINE_TYPE, "FLINK_SQL")
.set("flink.parallelism.default", "6")
.set(
s"$KYUUBI_ENGINE_ENV_PREFIX.HADOOP_CLASSPATH",
s"$KYUUBI_ENGINE_ENV_PREFIX.FLINK_HADOOP_CLASSPATH",
s"$hadoopClasspath${File.separator}" +
s"hadoop-client-api-$HADOOP_COMPILE_VERSION.jar${File.pathSeparator}" +
s"$hadoopClasspath${File.separator}hadoop-client-runtime-$HADOOP_COMPILE_VERSION.jar")


@@ -1437,6 +1437,28 @@ object KyuubiConf {
.stringConf
.createOptional
val ENGINE_FLINK_MEMORY: ConfigEntry[String] =
buildConf("kyuubi.engine.flink.memory")
.doc("The heap memory for the flink sql engine")
.version("1.6.0")
.stringConf
.createWithDefault("1g")
val ENGINE_FLINK_JAVA_OPTIONS: OptionalConfigEntry[String] =
buildConf("kyuubi.engine.flink.java.options")
.doc("The extra java options for the flink sql engine")
.version("1.6.0")
.stringConf
.createOptional
val ENGINE_FLINK_EXTRA_CLASSPATH: OptionalConfigEntry[String] =
buildConf("kyuubi.engine.flink.extra.classpath")
.doc("The extra classpath for the flink sql engine, for configuring location" +
" of hadoop client jars, etc")
.version("1.6.0")
.stringConf
.createOptional
val SERVER_LIMIT_CONNECTIONS_PER_USER: OptionalConfigEntry[Int] =
buildConf("kyuubi.server.limit.connections.per.user")
.doc("Maximum kyuubi server connections per user." +


@@ -26,6 +26,7 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.kyuubi._
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY
import org.apache.kyuubi.engine.ProcBuilder
import org.apache.kyuubi.operation.log.OperationLog
@@ -41,6 +42,8 @@ class FlinkProcessBuilder(
private val flinkHome: String = getEngineHome(shortName)
private val FLINK_HADOOP_CLASSPATH: String = "FLINK_HADOOP_CLASSPATH"
override protected def module: String = "kyuubi-flink-sql-engine"
override protected def mainClass: String = "org.apache.kyuubi.engine.flink.FlinkSQLEngine"
@@ -49,9 +52,12 @@ class FlinkProcessBuilder(
val buffer = new ArrayBuffer[String]()
buffer += executable
// TODO: add Kyuubi.engineEnv.FLINK_ENGINE_MEMORY or kyuubi.engine.flink.memory to configure
// -Xmx5g
// java options
val memory = conf.get(ENGINE_FLINK_MEMORY)
buffer += s"-Xmx$memory"
val javaOptions = conf.get(ENGINE_FLINK_JAVA_OPTIONS)
if (javaOptions.isDefined) {
buffer += javaOptions.get
}
buffer += "-cp"
val classpathEntries = new LinkedHashSet[String]
@@ -77,13 +83,17 @@ class FlinkProcessBuilder(
env.get("HADOOP_CONF_DIR").foreach(classpathEntries.add)
env.get("YARN_CONF_DIR").foreach(classpathEntries.add)
env.get("HBASE_CONF_DIR").foreach(classpathEntries.add)
val hadoopClasspath = env.get("HADOOP_CLASSPATH")
if (hadoopClasspath.isEmpty) {
throw KyuubiSQLException("HADOOP_CLASSPATH is not set! " +
"For more detail information on configuring HADOOP_CLASSPATH" +
"https://kyuubi.apache.org/docs/latest/deployment/settings.html#environments")
val hadoopCp = env.get(FLINK_HADOOP_CLASSPATH)
hadoopCp.foreach(classpathEntries.add)
val extraCp = conf.get(ENGINE_FLINK_EXTRA_CLASSPATH)
extraCp.foreach(classpathEntries.add)
if (hadoopCp.isEmpty && extraCp.isEmpty) {
throw new KyuubiException(s"The conf of ${FLINK_HADOOP_CLASSPATH} and " +
s"${ENGINE_FLINK_EXTRA_CLASSPATH.key} is empty." +
s"Please set ${FLINK_HADOOP_CLASSPATH} or ${ENGINE_FLINK_EXTRA_CLASSPATH.key} for " +
s"configuring location of hadoop client jars, etc")
}
classpathEntries.add(hadoopClasspath.get)
buffer += classpathEntries.asScala.mkString(File.pathSeparator)
buffer += mainClass
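Put together, the builder now emits a launch command shaped roughly like the sketch below (all values and jar paths are hypothetical, chosen only to illustrate the ordering):

```shell
# Hypothetical values; sketches the order FlinkProcessBuilder assembles the
# launch command after this change: executable, -Xmx taken from
# kyuubi.engine.flink.memory, optional extra java options, then the classpath
# gathered from FLINK_HADOOP_CLASSPATH and/or kyuubi.engine.flink.extra.classpath.
MEMORY="1g"
JAVA_OPTIONS="-XX:+IgnoreUnrecognizedVMOptions"
FLINK_HADOOP_CP="/path/to/hadoop-client-api-3.3.2.jar"
EXTRA_CP="/path/to/extra.jar"

# Mirrors the new KyuubiException: at least one classpath source must be set.
if [ -z "$FLINK_HADOOP_CP" ] && [ -z "$EXTRA_CP" ]; then
  echo "Set FLINK_HADOOP_CLASSPATH or kyuubi.engine.flink.extra.classpath" >&2
  exit 1
fi

CMD="java -Xmx$MEMORY $JAVA_OPTIONS -cp $FLINK_HADOOP_CP:$EXTRA_CP org.apache.kyuubi.engine.flink.FlinkSQLEngine"
echo "$CMD"
```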


@@ -22,26 +22,33 @@ import java.io.File
import scala.collection.JavaConverters._
import scala.collection.immutable.ListMap
import org.apache.kyuubi.{FLINK_COMPILE_VERSION, KyuubiFunSuite, KyuubiSQLException, SCALA_COMPILE_VERSION}
import org.apache.kyuubi.{FLINK_COMPILE_VERSION, KyuubiException, KyuubiFunSuite, SCALA_COMPILE_VERSION}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.{ENGINE_FLINK_EXTRA_CLASSPATH, ENGINE_FLINK_JAVA_OPTIONS, ENGINE_FLINK_MEMORY}
class FlinkProcessBuilderSuite extends KyuubiFunSuite {
private def conf = KyuubiConf().set("kyuubi.on", "off")
.set(ENGINE_FLINK_MEMORY, "512m")
.set(
ENGINE_FLINK_JAVA_OPTIONS,
"-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005")
private def envDefault: ListMap[String, String] = ListMap(
"JAVA_HOME" -> s"${File.separator}jdk1.8.0_181")
"JAVA_HOME" -> s"${File.separator}jdk")
private def envWithoutHadoopCLASSPATH: ListMap[String, String] = envDefault +
("HADOOP_CONF_DIR" -> s"${File.separator}hadoop${File.separator}conf") +
("YARN_CONF_DIR" -> s"${File.separator}yarn${File.separator}conf") +
("HBASE_CONF_DIR" -> s"${File.separator}hbase${File.separator}conf")
private def envWithAllHadoop: ListMap[String, String] = envWithoutHadoopCLASSPATH +
("HADOOP_CLASSPATH" -> s"${File.separator}hadoop")
("FLINK_HADOOP_CLASSPATH" -> s"${File.separator}hadoop")
private def confStr: String = {
conf.getAll.map { case (k, v) => s"\\\n\t--conf $k=$v" }.mkString(" ")
}
private def compareActualAndExpected(builder: FlinkProcessBuilder) = {
val actualCommands = builder.toString
val classpathStr: String = constructClasspathStr(builder)
val expectedCommands = s"$javaPath " +
val expectedCommands = s"$javaPath -Xmx512m " +
s"-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005 " +
s"-cp $classpathStr $mainClassStr \\\n\t--conf kyuubi.session.user=vinoyang " +
s"$confStr"
info(s"\n\n actualCommands $actualCommands")
@@ -71,6 +78,8 @@ class FlinkProcessBuilderSuite extends KyuubiFunSuite {
classpathEntries.add(v)
}
}
val extraCp = conf.get(ENGINE_FLINK_EXTRA_CLASSPATH)
extraCp.foreach(classpathEntries.add)
val classpathStr = classpathEntries.asScala.mkString(File.pathSeparator)
classpathStr
}
@@ -90,17 +99,17 @@ class FlinkProcessBuilderSuite extends KyuubiFunSuite {
compareActualAndExpected(builder)
}
test("all hadoop related environment variables are configured except HADOOP_CLASSPATH") {
test("all hadoop related environment variables are configured except FLINK_HADOOP_CLASSPATH") {
val builder = new FlinkProcessBuilder("vinoyang", conf) {
override def env: Map[String, String] = envWithoutHadoopCLASSPATH
}
assertThrows[KyuubiSQLException](builder.toString)
assertThrows[KyuubiException](builder.toString)
}
test("only HADOOP_CLASSPATH environment variables are configured") {
test("only FLINK_HADOOP_CLASSPATH environment variables are configured") {
val builder = new FlinkProcessBuilder("vinoyang", conf) {
override def env: Map[String, String] = envDefault +
("HADOOP_CLASSPATH" -> s"${File.separator}hadoop")
("FLINK_HADOOP_CLASSPATH" -> s"${File.separator}hadoop")
}
compareActualAndExpected(builder)
}