[KYUUBI #1629] Flink backend implementation

### _Why are the changes needed?_

This PR covers #1619.

Overall, this PR contains the following changs,

1. change `build/dist` script to support flink sql engine
2. enhance `externals/flink-sql-engine/pom.xml` to support create a shaded jar
3. simplify `externals/kyuubi-flink-sql-engine/bin/flink-sql-engine.sh`
4. introduce `FlinkSQLEngine`(flink sql engine entrypoint) and `FlinkProcessBuilder`(kyuubi server launcher)
5. add ut in kyuubi server side

After this PR, we can run the basic query e.g. `select now()` from beeline and get result, and Kyuubi Server can auto launch flink engine if there is no proper one. The Flink engine also supports other engine share levels defined in Kyuubi.

The implementation based on Flink 1.14 codebase.

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #1629 from pan3793/flink-backend.

Closes #1629

b7e5f0e3 [Cheng Pan] revert tgz name change
a4496c37 [Cheng Pan] Fix reflection
3b4e86a1 [Cheng Pan] deps
68efa42c [Cheng Pan] log
8a9e37f3 [Cheng Pan] nit
10fb2bc3 [Cheng Pan] CI
c1560fdb [Cheng Pan] nit
303e2f1e [Cheng Pan] Restore log conf
d84720b1 [Cheng Pan] SessionContext
b258d81a [Cheng Pan] cleanup
16edd528 [Cheng Pan] Cleanup
9ae54557 [Cheng Pan] Fix CI
25b6b57d [Cheng Pan] hadoop-client-api
c12b5ca4 [Cheng Pan] Server UT pass
502d3f08 [Cheng Pan] pass
dac4323b [yanghua] Laungh local flink engine container successfully

Lead-authored-by: Cheng Pan <chengpan@apache.org>
Co-authored-by: yanghua <yanghua1127@gmail.com>
Signed-off-by: Kent Yao <yao@apache.org>
This commit is contained in:
Cheng Pan 2021-12-28 17:20:26 +08:00 committed by Kent Yao
parent e1587eeaf4
commit cb8721fef1
No known key found for this signature in database
GPG Key ID: F7051850A0AF904D
32 changed files with 1245 additions and 234 deletions

View File

@ -110,6 +110,7 @@ jobs:
name: unit-tests-log
path: |
**/target/unit-tests.log
**/kyuubi-flink-sql-engine.log*
**/kyuubi-spark-sql-engine.log*
**/target/scalastyle-output.xml

View File

@ -54,4 +54,5 @@ jobs:
name: unit-tests-log
path: |
**/target/unit-tests.log
**/kyuubi-flink-sql-engine.log*
**/kyuubi-spark-sql-engine.log*

View File

@ -30,7 +30,7 @@ set -e
KYUUBI_HOME="$(cd "`dirname "$0"`/.."; pwd)"
DISTDIR="$KYUUBI_HOME/dist"
MAKE_TGZ=false
# TODO: add FLINK_PROVIDED option
FLINK_PROVIDED=false
SPARK_PROVIDED=false
NAME=none
MVN="$KYUUBI_HOME/build/mvn"
@ -62,6 +62,9 @@ while (( "$#" )); do
--tgz)
MAKE_TGZ=true
;;
--flink-provided)
FLINK_PROVIDED=true
;;
--spark-provided)
SPARK_PROVIDED=true
;;
@ -124,6 +127,11 @@ SCALA_VERSION=$("$MVN" help:evaluate -Dexpression=scala.binary.version $@ 2>/dev
| grep -v "WARNING"\
| tail -n 1)
FLINK_VERSION=$("$MVN" help:evaluate -Dexpression=flink.version $@ 2>/dev/null\
| grep -v "INFO"\
| grep -v "WARNING"\
| tail -n 1)
SPARK_VERSION=$("$MVN" help:evaluate -Dexpression=spark.version $@ 2>/dev/null\
| grep -v "INFO"\
| grep -v "WARNING"\
@ -144,7 +152,7 @@ HIVE_VERSION=$("$MVN" help:evaluate -Dexpression=hive.version $@ 2>/dev/null\
| grep -v "WARNING"\
| tail -n 1)
echo "Building Kyuubi package of version $VERSION against Spark version - $SPARK_VERSION"
echo "Building Kyuubi package of version $VERSION against Flink $FLINK_VERSION, Spark $SPARK_VERSION"
SUFFIX="-$NAME"
if [[ "$NAME" == "none" ]]; then
@ -163,7 +171,7 @@ fi
MVN_DIST_OPT="-DskipTests"
if [[ "$SPARK_PROVIDED" == "true" ]]; then
MVN_DIST_OPT="$MVN_DIST_OPT -Pspark-provided"
MVN_DIST_OPT="$MVN_DIST_OPT -Pflink-provided,spark-provided"
fi
BUILD_COMMAND=("$MVN" clean install $MVN_DIST_OPT $@)
@ -178,6 +186,8 @@ rm -rf "$DISTDIR"
mkdir -p "$DISTDIR/pid"
mkdir -p "$DISTDIR/logs"
mkdir -p "$DISTDIR/work"
mkdir -p "$DISTDIR/externals/engines/flink"
mkdir -p "$DISTDIR/externals/engines/flink/lib"
mkdir -p "$DISTDIR/externals/engines/spark"
mkdir -p "$DISTDIR/beeline-jars"
echo "Kyuubi $VERSION $GITREVSTRING built for" > "$DISTDIR/RELEASE"
@ -205,6 +215,12 @@ for jar in $(ls "$DISTDIR/jars/"); do
done
cd -
# Copy flink engines
cp -r "$KYUUBI_HOME/externals/kyuubi-flink-sql-engine/bin/" "$DISTDIR/externals/engines/flink/bin/"
chmod a+x "$DISTDIR/externals/engines/flink/bin/flink-sql-engine.sh"
cp -r "$KYUUBI_HOME/externals/kyuubi-flink-sql-engine/conf/" "$DISTDIR/externals/engines/flink/conf/"
cp "$KYUUBI_HOME/externals/kyuubi-flink-sql-engine/target/kyuubi-flink-sql-engine_${SCALA_VERSION}-${VERSION}.jar" "$DISTDIR/externals/engines/flink/lib"
# Copy spark engines
cp "$KYUUBI_HOME/externals/kyuubi-spark-sql-engine/target/kyuubi-spark-sql-engine_${SCALA_VERSION}-${VERSION}.jar" "$DISTDIR/externals/engines/spark"
@ -225,6 +241,12 @@ for SPARK_EXTENSION_VERSION in ${SPARK_EXTENSION_VERSIONS[@]}; do
fi
done
if [[ "$FLINK_PROVIDED" != "true" ]]; then
# Copy flink binary dist
cp -r "$KYUUBI_HOME/externals/kyuubi-download/target/flink-$FLINK_VERSION/" \
"$DISTDIR/externals/flink-$FLINK_VERSION/"
fi
if [[ "$SPARK_PROVIDED" != "true" ]]; then
# Copy spark binary dist
cp -r "$KYUUBI_HOME/externals/kyuubi-download/target/spark-$SPARK_VERSION-bin-hadoop${SPARK_HADOOP_VERSION}$HIVE_VERSION_SUFFIX/" \

View File

@ -300,6 +300,7 @@ kyuubi\.session\.check<br>\.interval|<div style='width: 65pt;word-wrap: break-wo
kyuubi\.session\.conf<br>\.ignore\.list|<div style='width: 65pt;word-wrap: break-word;white-space: normal'></div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>A comma separated list of ignored keys. If the client connection contains any of them, the key and the corresponding value will be removed silently during engine bootstrap and connection setup. Note that this rule is for server-side protection defined via administrators to prevent some essential configs from tampering but will not forbid users to set dynamic configurations via SET syntax.</div>|<div style='width: 30pt'>seq</div>|<div style='width: 20pt'>1.2.0</div>
kyuubi\.session\.conf<br>\.restrict\.list|<div style='width: 65pt;word-wrap: break-word;white-space: normal'></div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>A comma separated list of restricted keys. If the client connection contains any of them, the connection will be rejected explicitly during engine bootstrap and connection setup. Note that this rule is for server-side protection defined via administrators to prevent some essential configs from tampering but will not forbid users to set dynamic configurations via SET syntax.</div>|<div style='width: 30pt'>seq</div>|<div style='width: 20pt'>1.2.0</div>
kyuubi\.session\.engine<br>\.check\.interval|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>PT1M</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>The check interval for engine timeout</div>|<div style='width: 30pt'>duration</div>|<div style='width: 20pt'>1.0.0</div>
kyuubi\.session\.engine<br>\.flink\.main\.resource|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>&lt;undefined&gt;</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>The package used to create Flink SQL engine remote job. If it is undefined, Kyuubi will use the default</div>|<div style='width: 30pt'>string</div>|<div style='width: 20pt'>1.4.0</div>
kyuubi\.session\.engine<br>\.idle\.timeout|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>PT30M</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>engine timeout, the engine will self-terminate when it's not accessed for this duration. 0 or negative means not to self-terminate.</div>|<div style='width: 30pt'>duration</div>|<div style='width: 20pt'>1.0.0</div>
kyuubi\.session\.engine<br>\.initialize\.timeout|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>PT3M</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>Timeout for starting the background engine, e.g. SparkSQLEngine.</div>|<div style='width: 30pt'>duration</div>|<div style='width: 20pt'>1.0.0</div>
kyuubi\.session\.engine<br>\.launch\.async|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>true</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>When opening kyuubi session, whether to launch backend engine asynchronously. When true, the Kyuubi server will set up the connection with the client without delay as the backend engine will be created asynchronously.</div>|<div style='width: 30pt'>boolean</div>|<div style='width: 20pt'>1.4.0</div>

View File

@ -0,0 +1,69 @@
#!/usr/bin/env bash
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
################################################################################
# Adopted from "flink" bash script
################################################################################
if [[ -z "$FLINK_HOME" || ! -d "$FLINK_HOME" ]]; then
(>&2 echo "Invalid FLINK_HOME: ${FLINK_HOME:-unset}")
exit 1
fi
FLINK_SQL_ENGINE_HOME="$(cd `dirname $0`/..; pwd)"
if [[ "$FLINK_SQL_ENGINE_HOME" == "$KYUUBI_HOME/externals/engines/flink" ]]; then
FLINK_SQL_ENGINE_CONF_DIR="$FLINK_SQL_ENGINE_HOME/conf"
FLINK_SQL_ENGINE_LIB_DIR="$FLINK_SQL_ENGINE_HOME/lib"
FLINK_SQL_ENGINE_LOG_DIR="$KYUUBI_LOG_DIR"
FLINK_SQL_ENGINE_JAR=$(find "$FLINK_SQL_ENGINE_LIB_DIR" -regex ".*/kyuubi-flink-sql-engine_.*\.jar")
FLINK_HADOOP_CLASSPATH="$INTERNAL_HADOOP_CLASSPATHS"
else
echo -e "\nFLINK_SQL_ENGINE_HOME $FLINK_SQL_ENGINE_HOME doesn't match production directory, assuming in development environment..."
FLINK_SQL_ENGINE_CONF_DIR="$FLINK_SQL_ENGINE_HOME/conf"
FLINK_SQL_ENGINE_LIB_DIR="$FLINK_SQL_ENGINE_HOME/target"
FLINK_SQL_ENGINE_LOG_DIR="$FLINK_SQL_ENGINE_HOME/target"
FLINK_SQL_ENGINE_JAR=$(find "$FLINK_SQL_ENGINE_LIB_DIR" -regex '.*/kyuubi-flink-sql-engine_.*\.jar$' | grep -v '\-javadoc.jar$' | grep -v '\-tests.jar$')
_FLINK_SQL_ENGINE_HADOOP_CLIENT_JARS=$(find $FLINK_SQL_ENGINE_LIB_DIR -regex '.*/hadoop-client-.*\.jar$' | tr '\n' ':')
FLINK_HADOOP_CLASSPATH="${_FLINK_SQL_ENGINE_HADOOP_CLIENT_JARS%:}"
fi
# do NOT let config.sh detect FLINK_HOME
_FLINK_HOME_DETERMINED=1 . "$FLINK_HOME/bin/config.sh"
FLINK_IDENT_STRING=${FLINK_IDENT_STRING:-"$USER"}
FLINK_SQL_CLIENT_JAR=$(find "$FLINK_OPT_DIR" -regex ".*flink-sql-client.*.jar")
CC_CLASSPATH=`constructFlinkClassPath`
FULL_CLASSPATH="$FLINK_SQL_ENGINE_JAR:$FLINK_SQL_CLIENT_JAR:$CC_CLASSPATH:$FLINK_HADOOP_CLASSPATH"
log_file="$FLINK_SQL_ENGINE_LOG_DIR/kyuubi-flink-sql-engine-$FLINK_IDENT_STRING-$HOSTNAME.log"
log_setting=(
-Dlog.file="$log_file"
-Dlog4j.configurationFile=file:"$FLINK_SQL_ENGINE_CONF_DIR/log4j.properties"
-Dlog4j.configuration=file:"$FLINK_SQL_ENGINE_CONF_DIR/log4j.properties"
-Dlogback.configurationFile=file:"$FLINK_SQL_ENGINE_CONF_DIR/logback.xml"
)
if [ -n "$FLINK_SQL_ENGINE_JAR" ]; then
exec $JAVA_RUN ${FLINK_SQL_ENGINE_DYNAMIC_ARGS} "${log_setting[@]}" -cp ${FULL_CLASSPATH} \
org.apache.kyuubi.engine.flink.FlinkSQLEngine "$@"
else
(>&2 echo "[ERROR] Flink SQL Engine JAR file 'kyuubi-flink-sql-engine*.jar' should be located in $FLINK_SQL_ENGINE_LIB_DIR.")
exit 1
fi

View File

@ -0,0 +1,43 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
# This affects logging for both kyuubi-flink-sql-engine and Flink
log4j.rootLogger=INFO, CA
#Console Appender
log4j.appender.CA=org.apache.log4j.ConsoleAppender
log4j.appender.CA.layout=org.apache.log4j.PatternLayout
log4j.appender.CA.layout.ConversionPattern=%d{HH:mm:ss.SSS} %p %c: %m%n
log4j.appender.CA.Threshold = FATAL
# Log all infos in the given file
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.file=${log.file}
log4j.appender.file.append=false
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
#File Appender
log4j.appender.FA=org.apache.log4j.FileAppender
log4j.appender.FA.append=false
log4j.appender.FA.file=target/unit-tests.log
log4j.appender.FA.layout=org.apache.log4j.PatternLayout
log4j.appender.FA.layout.ConversionPattern=%d{HH:mm:ss.SSS} %t %p %c{2}: %m%n
# Set the logger level of File Appender to DEBUG
log4j.appender.FA.Threshold = DEBUG

View File

@ -0,0 +1,32 @@
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<configuration>
<appender name="file" class="ch.qos.logback.core.FileAppender">
<file>${log.file}</file>
<append>false</append>
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
</encoder>
</appender>
<!-- # This affects logging for both kyuubi-flink-sql-engine and Flink -->
<root level="INFO">
<appender-ref ref="file"/>
</root>
</configuration>

View File

@ -147,4 +147,116 @@
</dependency>
</dependencies>
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<configuration>
<shadedArtifactAttached>false</shadedArtifactAttached>
<artifactSet>
<includes>
<include>org.apache.kyuubi:kyuubi-common_${scala.binary.version}</include>
<include>org.apache.kyuubi:kyuubi-ha_${scala.binary.version}</include>
<include>com.fasterxml.jackson.core:*</include>
<include>com.fasterxml.jackson.module:*</include>
<include>com.google.guava:failureaccess</include>
<include>com.google.guava:guava</include>
<include>commons-codec:commons-codec</include>
<include>io.netty:netty-all</include>
<include>org.apache.commons:commons-lang3</include>
<include>org.apache.curator:curator-client</include>
<include>org.apache.curator:curator-framework</include>
<include>org.apache.curator:curator-recipes</include>\
<include>org.apache.hive:hive-service-rpc</include>
<include>org.apache.thrift:*</include>
<include>org.apache.zookeeper:*</include>
</includes>
</artifactSet>
<relocations>
<relocation>
<pattern>com.fasterxml.jackson</pattern>
<shadedPattern>${kyuubi.shade.packageName}.com.fasterxml.jackson</shadedPattern>
<includes>
<include>com.fasterxml.jackson.**</include>
</includes>
</relocation>
<relocation>
<pattern>org.apache.curator</pattern>
<shadedPattern>${kyuubi.shade.packageName}.org.apache.curator</shadedPattern>
<includes>
<include>org.apache.curator.**</include>
</includes>
</relocation>
<relocation>
<pattern>com.google.common</pattern>
<shadedPattern>${kyuubi.shade.packageName}.com.google.common</shadedPattern>
<includes>
<include>com.google.common.**</include>
</includes>
</relocation>
<relocation>
<pattern>org.apache.commons</pattern>
<shadedPattern>${kyuubi.shade.packageName}.org.apache.commons</shadedPattern>
<includes>
<include>org.apache.commons.**</include>
</includes>
</relocation>
<relocation>
<pattern>io.netty</pattern>
<shadedPattern>${kyuubi.shade.packageName}.io.netty</shadedPattern>
<includes>
<include>io.netty.**</include>
</includes>
</relocation>
<relocation>
<pattern>org.apache.hive.service.rpc.thrift</pattern>
<shadedPattern>${kyuubi.shade.packageName}.org.apache.hive.service.rpc.thrift</shadedPattern>
<includes>
<include>org.apache.hive.service.rpc.thrift.**</include>
</includes>
</relocation>
<relocation>
<pattern>org.apache.thrift</pattern>
<shadedPattern>${kyuubi.shade.packageName}.org.apache.thrift</shadedPattern>
<includes>
<include>org.apache.thrift.**</include>
</includes>
</relocation>
<relocation>
<pattern>org.apache.zookeeper</pattern>
<shadedPattern>${kyuubi.shade.packageName}.org.apache.zookeeper</shadedPattern>
<includes>
<include>org.apache.zookeeper.**</include>
</includes>
</relocation>
</relocations>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<executions>
<execution>
<id>prepare-test-jar</id>
<phase>test-compile</phase>
<goals>
<goal>test-jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@ -21,5 +21,40 @@ package org.apache.kyuubi.engine.flink.result;
/** Constant column names. */
public class Constants {
// for statement execution
public static final String JOB_ID = "job_id";
// for results with SUCCESS result kind
public static final String RESULT = "result";
public static final String OK = "OK";
public static final String SHOW_MODULES_RESULT = "modules";
public static final String SHOW_CURRENT_CATALOG_RESULT = "catalog";
public static final String SHOW_CATALOGS_RESULT = "catalogs";
public static final String SHOW_CURRENT_DATABASE_RESULT = "database";
public static final String SHOW_DATABASES_RESULT = "databases";
public static final String SHOW_FUNCTIONS_RESULT = "functions";
public static final String EXPLAIN_RESULT = "explanation";
public static final String DESCRIBE_NAME = "name";
public static final String DESCRIBE_TYPE = "type";
public static final String DESCRIBE_NULL = "null";
public static final String DESCRIBE_KEY = "key";
public static final String DESCRIBE_COMPUTED_COLUMN = "computed_column";
public static final String DESCRIBE_WATERMARK = "watermark";
public static final String SHOW_TABLES_RESULT = "tables";
public static final String SHOW_VIEWS_RESULT = "views";
public static final String SET_KEY = "key";
public static final String SET_VALUE = "value";
public static final String[] SUPPORTED_TABLE_TYPES = new String[] {"TABLE", "VIEW"};
}

View File

@ -20,6 +20,9 @@ package org.apache.kyuubi.engine.flink.result;
/** ResultKind defines the types of the result. */
public enum ResultKind {
// for DDL, DCL and statements with a simple "OK"
SUCCESS,
// rows with important content are available (DML, DQL)
SUCCESS_WITH_CONTENT
}

View File

@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.flink
import org.apache.flink.runtime.util.EnvironmentInformation
import org.apache.flink.table.client.SqlClientException
import org.apache.flink.table.client.cli.{CliOptions, CliOptionsParser}
import org.apache.flink.table.client.gateway.context.SessionContext
import org.apache.flink.table.client.gateway.local.LocalExecutor
import org.apache.kyuubi.Logging
object FlinkEngineUtils extends Logging {
val MODE_EMBEDDED = "embedded"
def checkFlinkVersion(): Unit = {
val flinkVersion = EnvironmentInformation.getVersion
if (!flinkVersion.startsWith("1.14")) {
throw new RuntimeException("Only Flink-1.14.x is supported now!")
}
}
def parseCliOptions(args: Array[String]): CliOptions = {
val (mode, modeArgs) =
if (args.isEmpty || args(0).startsWith("-")) (MODE_EMBEDDED, args)
else (args(0), args.drop(1))
// TODO remove requirement of flink-python
val options = CliOptionsParser.parseEmbeddedModeClient(modeArgs)
mode match {
case MODE_EMBEDDED if options.isPrintHelp => CliOptionsParser.printHelpEmbeddedModeClient()
case MODE_EMBEDDED =>
case _ => throw new SqlClientException("Other mode is not supported yet.")
}
options
}
def getSessionContext(localExecutor: LocalExecutor, sessionId: String): SessionContext = {
val method = classOf[LocalExecutor].getDeclaredMethod("getSessionContext", classOf[String])
method.setAccessible(true)
method.invoke(localExecutor, sessionId).asInstanceOf[SessionContext]
}
}

View File

@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.flink
import org.apache.flink.table.client.gateway.context.DefaultContext
import org.apache.kyuubi.engine.flink.session.FlinkSQLSessionManager
import org.apache.kyuubi.service.AbstractBackendService
import org.apache.kyuubi.session.SessionManager
class FlinkSQLBackendService(engineContext: DefaultContext)
extends AbstractBackendService("FlinkSQLBackendService") {
override val sessionManager: SessionManager = new FlinkSQLSessionManager(engineContext)
}

View File

@ -0,0 +1,106 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.flink
import java.util.concurrent.CountDownLatch
import scala.collection.JavaConverters._
import org.apache.flink.client.cli.{CliFrontend, CustomCommandLine, DefaultCLI}
import org.apache.flink.configuration.GlobalConfiguration
import org.apache.flink.table.client.gateway.context.DefaultContext
import org.apache.kyuubi.Logging
import org.apache.kyuubi.Utils.{addShutdownHook, FLINK_ENGINE_SHUTDOWN_PRIORITY}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.engine.flink.FlinkSQLEngine.{countDownLatch, currentEngine}
import org.apache.kyuubi.service.Serverable
import org.apache.kyuubi.util.SignalRegister
case class FlinkSQLEngine(engineContext: DefaultContext) extends Serverable("FlinkSQLEngine") {
override val backendService = new FlinkSQLBackendService(engineContext)
override val frontendServices = Seq(new FlinkThriftBinaryFrontendService(this))
override def initialize(conf: KyuubiConf): Unit = super.initialize(conf)
override protected def stopServer(): Unit = {
countDownLatch.countDown()
}
override def start(): Unit = {
super.start()
backendService.sessionManager.startTerminatingChecker { () =>
assert(currentEngine.isDefined)
currentEngine.get.stop()
}
}
override def stop(): Unit = {
super.stop()
}
}
object FlinkSQLEngine extends Logging {
val kyuubiConf: KyuubiConf = KyuubiConf()
var currentEngine: Option[FlinkSQLEngine] = None
private val countDownLatch = new CountDownLatch(1)
def main(args: Array[String]): Unit = {
SignalRegister.registerLogger(logger)
FlinkEngineUtils.checkFlinkVersion()
try {
val flinkConfDir = CliFrontend.getConfigurationDirectoryFromEnv
val flinkConf = GlobalConfiguration.loadConfiguration(flinkConfDir)
val engineContext = new DefaultContext(
List.empty.asJava,
flinkConf,
List[CustomCommandLine](new DefaultCLI).asJava)
kyuubiConf.setIfMissing(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
startEngine(engineContext)
info("started engine...")
// blocking main thread
countDownLatch.await()
} catch {
case t: Throwable if currentEngine.isDefined =>
currentEngine.foreach { engine =>
error(t)
engine.stop()
}
case t: Throwable =>
error("Create FlinkSQL Engine Failed", t)
}
}
def startEngine(engineContext: DefaultContext): Unit = {
currentEngine = Some(new FlinkSQLEngine(engineContext))
currentEngine.foreach { engine =>
engine.initialize(kyuubiConf)
engine.start()
addShutdownHook(() => engine.stop(), FLINK_ENGINE_SHUTDOWN_PRIORITY + 1)
}
}
}

View File

@ -54,9 +54,7 @@ class ExecuteStatement(
override def getOperationLog: Option[OperationLog] = Option(operationLog)
@VisibleForTesting
override def setExecutor(executor: Executor): Unit = {
this.executor = executor
}
override def setExecutor(executor: Executor): Unit = super.setExecutor(executor)
def setSessionId(sessionId: String): Unit = {
this.sessionId = sessionId

View File

@ -42,17 +42,15 @@ abstract class FlinkOperation(
extends AbstractOperation(opType, session) {
protected val sessionContext: SessionContext = {
session.asInstanceOf[FlinkSessionImpl].getSessionContext
session.asInstanceOf[FlinkSessionImpl].sessionContext
}
protected var executor: Executor = _
protected var executor: Executor = session.asInstanceOf[FlinkSessionImpl].executor
protected def setExecutor(executor: Executor): Unit = {
this.executor = session.asInstanceOf[FlinkSessionImpl].getExecutor
}
protected def setExecutor(executor: Executor): Unit = this.executor = executor
protected var sessionId: String = {
session.asInstanceOf[FlinkSessionImpl].getSessionId
session.asInstanceOf[FlinkSessionImpl].sessionId
}
protected var resultSet: ResultSet = _

View File

@ -17,11 +17,11 @@
package org.apache.kyuubi.engine.flink.schema
import java.{lang, util}
import java.nio.ByteBuffer
import java.util
import java.util.Collections
import scala.collection.JavaConverters.{collectionAsScalaIterableConverter, mapAsJavaMapConverter, seqAsJavaListConverter}
import scala.collection.JavaConverters._
import scala.language.implicitConversions
import org.apache.flink.table.types.logical._
@ -71,60 +71,61 @@ object RowSet {
val logicalType = resultSet.getColumns.get(ordinal).getLogicalType
if (logicalType.isInstanceOf[BooleanType]) {
val boolValue = new TBoolValue
if (row.getField(ordinal) != null) {
boolValue.setValue(row.getField(ordinal).asInstanceOf[Boolean])
}
TColumnValue.boolVal(boolValue)
} else if (logicalType.isInstanceOf[TinyIntType]) {
val tI16Value = new TI16Value
if (row.getField(ordinal) != null) {
tI16Value.setValue(row.getField(ordinal).asInstanceOf[Short])
}
TColumnValue.i16Val(tI16Value)
} else if (logicalType.isInstanceOf[IntType]) {
val tI32Value = new TI32Value
if (row.getField(ordinal) != null) {
tI32Value.setValue(row.getField(ordinal).asInstanceOf[Short])
}
TColumnValue.i32Val(tI32Value)
} else if (logicalType.isInstanceOf[BigIntType]) {
val tI64Value = new TI64Value
if (row.getField(ordinal) != null) {
tI64Value.setValue(row.getField(ordinal).asInstanceOf[Long])
}
TColumnValue.i64Val(tI64Value)
} else if (logicalType.isInstanceOf[FloatType]) {
val tDoubleValue = new TDoubleValue
if (row.getField(ordinal) != null) {
tDoubleValue.setValue(row.getField(ordinal).asInstanceOf[Float])
}
TColumnValue.doubleVal(tDoubleValue)
} else if (logicalType.isInstanceOf[DoubleType]) {
val tDoubleValue = new TDoubleValue
if (row.getField(ordinal) != null) {
tDoubleValue.setValue(row.getField(ordinal).asInstanceOf[Double])
}
TColumnValue.doubleVal(tDoubleValue)
} else if (logicalType.isInstanceOf[VarCharType]) {
val tStringValue = new TStringValue
if (row.getField(ordinal) != null) {
tStringValue.setValue(row.getField(ordinal).asInstanceOf[String])
}
TColumnValue.stringVal(tStringValue)
} else if (logicalType.isInstanceOf[CharType]) {
val tStringValue = new TStringValue
if (row.getField(ordinal) != null) {
tStringValue.setValue(row.getField(ordinal).asInstanceOf[String])
}
TColumnValue.stringVal(tStringValue)
} else {
val tStrValue = new TStringValue
if (row.getField(ordinal) != null) {
// TODO to be done
}
TColumnValue.stringVal(tStrValue)
logicalType match {
case _: BooleanType =>
val boolValue = new TBoolValue
if (row.getField(ordinal) != null) {
boolValue.setValue(row.getField(ordinal).asInstanceOf[Boolean])
}
TColumnValue.boolVal(boolValue)
case _: TinyIntType =>
val tI16Value = new TI16Value
if (row.getField(ordinal) != null) {
tI16Value.setValue(row.getField(ordinal).asInstanceOf[Short])
}
TColumnValue.i16Val(tI16Value)
case _: IntType =>
val tI32Value = new TI32Value
if (row.getField(ordinal) != null) {
tI32Value.setValue(row.getField(ordinal).asInstanceOf[Short])
}
TColumnValue.i32Val(tI32Value)
case _: BigIntType =>
val tI64Value = new TI64Value
if (row.getField(ordinal) != null) {
tI64Value.setValue(row.getField(ordinal).asInstanceOf[Long])
}
TColumnValue.i64Val(tI64Value)
case _: FloatType =>
val tDoubleValue = new TDoubleValue
if (row.getField(ordinal) != null) {
tDoubleValue.setValue(row.getField(ordinal).asInstanceOf[Float])
}
TColumnValue.doubleVal(tDoubleValue)
case _: DoubleType =>
val tDoubleValue = new TDoubleValue
if (row.getField(ordinal) != null) {
tDoubleValue.setValue(row.getField(ordinal).asInstanceOf[Double])
}
TColumnValue.doubleVal(tDoubleValue)
case _: VarCharType =>
val tStringValue = new TStringValue
if (row.getField(ordinal) != null) {
tStringValue.setValue(row.getField(ordinal).asInstanceOf[String])
}
TColumnValue.stringVal(tStringValue)
case _: CharType =>
val tStringValue = new TStringValue
if (row.getField(ordinal) != null) {
tStringValue.setValue(row.getField(ordinal).asInstanceOf[String])
}
TColumnValue.stringVal(tStringValue)
case _ =>
val tStrValue = new TStringValue
if (row.getField(ordinal) != null) {
// TODO to be done
}
TColumnValue.stringVal(tStrValue)
}
}
@ -132,41 +133,23 @@ object RowSet {
ByteBuffer.wrap(bitSet.toByteArray)
}
private def toTColumn(
rows: Seq[Row],
ordinal: Int,
logicalType: LogicalType): TColumn = {
private def toTColumn(rows: Seq[Row], ordinal: Int, logicalType: LogicalType): TColumn = {
val nulls = new java.util.BitSet()
if (logicalType.isInstanceOf[BooleanType]) {
val values = getOrSetAsNull[java.lang.Boolean](
rows,
ordinal,
nulls,
true)
TColumn.boolVal(new TBoolColumn(values, nulls))
} else if (logicalType.isInstanceOf[TinyIntType]) {
val values = getOrSetAsNull[java.lang.Short](
rows,
ordinal,
nulls,
0.toShort)
TColumn.i16Val(new TI16Column(values, nulls))
} else if (logicalType.isInstanceOf[VarCharType]) {
val values = getOrSetAsNull[java.lang.String](
rows,
ordinal,
nulls,
"")
TColumn.stringVal(new TStringColumn(values, nulls))
} else if (logicalType.isInstanceOf[CharType]) {
val values = getOrSetAsNull[java.lang.String](
rows,
ordinal,
nulls,
"")
TColumn.stringVal(new TStringColumn(values, nulls))
} else {
null
logicalType match {
case _: BooleanType =>
val values = getOrSetAsNull[lang.Boolean](rows, ordinal, nulls, true)
TColumn.boolVal(new TBoolColumn(values, nulls))
case _: TinyIntType =>
val values = getOrSetAsNull[lang.Short](rows, ordinal, nulls, 0.toShort)
TColumn.i16Val(new TI16Column(values, nulls))
case _: VarCharType =>
val values = getOrSetAsNull[String](rows, ordinal, nulls, "")
TColumn.stringVal(new TStringColumn(values, nulls))
case _: CharType =>
val values = getOrSetAsNull[String](rows, ordinal, nulls, "")
TColumn.stringVal(new TStringColumn(values, nulls))
case _ =>
null
}
}
@ -222,21 +205,13 @@ object RowSet {
ret
}
def toTTypeId(typ: LogicalType): TTypeId =
if (typ.isInstanceOf[NullType]) {
TTypeId.NULL_TYPE
} else if (typ.isInstanceOf[BooleanType]) {
TTypeId.BOOLEAN_TYPE
} else if (typ.isInstanceOf[FloatType]) {
TTypeId.FLOAT_TYPE
} else if (typ.isInstanceOf[DoubleType]) {
TTypeId.DOUBLE_TYPE
} else if (typ.isInstanceOf[VarCharType]) {
TTypeId.STRING_TYPE
} else if (typ.isInstanceOf[CharType]) {
TTypeId.STRING_TYPE
} else {
null
}
def toTTypeId(typ: LogicalType): TTypeId = typ match {
case _: NullType => TTypeId.NULL_TYPE
case _: BooleanType => TTypeId.BOOLEAN_TYPE
case _: FloatType => TTypeId.FLOAT_TYPE
case _: DoubleType => TTypeId.DOUBLE_TYPE
case _: VarCharType => TTypeId.STRING_TYPE
case _: CharType => TTypeId.STRING_TYPE
case _ => null
}
}

View File

@ -17,11 +17,11 @@
package org.apache.kyuubi.engine.flink.session
import org.apache.flink.table.client.gateway.Executor
import org.apache.flink.table.client.gateway.context.DefaultContext
import org.apache.flink.table.client.gateway.local.LocalExecutor
import org.apache.hive.service.rpc.thrift.TProtocolVersion
import org.apache.kyuubi.engine.flink.FlinkEngineUtils
import org.apache.kyuubi.engine.flink.operation.FlinkSQLOperationManager
import org.apache.kyuubi.session.{SessionHandle, SessionManager}
@ -31,7 +31,7 @@ class FlinkSQLSessionManager(engineContext: DefaultContext)
override protected def isServer: Boolean = false
val operationManager = new FlinkSQLOperationManager()
val executor: Executor = new LocalExecutor(engineContext)
val executor = new LocalExecutor(engineContext)
override def start(): Unit = {
super.start()
@ -44,8 +44,25 @@ class FlinkSQLSessionManager(engineContext: DefaultContext)
password: String,
ipAddress: String,
conf: Map[String, String]): SessionHandle = {
executor.openSession("")
null
val sessionHandle = SessionHandle(protocol)
val sessionId = sessionHandle.identifier.toString
executor.openSession(sessionId)
val sessionContext = FlinkEngineUtils.getSessionContext(executor, sessionId)
val sessionImpl = new FlinkSessionImpl(
protocol,
user,
password,
ipAddress,
conf,
this,
sessionHandle,
sessionContext)
setSession(sessionHandle, sessionImpl)
sessionHandle
}
override def closeSession(sessionHandle: SessionHandle): Unit = {

View File

@ -30,15 +30,11 @@ class FlinkSessionImpl(
ipAddress: String,
conf: Map[String, String],
sessionManager: SessionManager,
sessionContext: SessionContext)
val handle: SessionHandle,
val sessionContext: SessionContext)
extends AbstractSession(protocol, user, password, ipAddress, conf, sessionManager) {
override val handle: SessionHandle = SessionHandle(protocol)
def getSessionContext: SessionContext = sessionContext
def getExecutor: Executor = sessionManager.asInstanceOf[FlinkSQLSessionManager].executor
def getSessionId: String = handle.toString
def executor: Executor = sessionManager.asInstanceOf[FlinkSQLSessionManager].executor
def sessionId: String = handle.identifier.toString
}

View File

@ -0,0 +1,40 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Set everything to be logged to the file target/unit-tests.log
log4j.rootLogger=DEBUG, CA, FA
# Console Appender
log4j.appender.CA=org.apache.log4j.ConsoleAppender
log4j.appender.CA.layout=org.apache.log4j.PatternLayout
log4j.appender.CA.layout.ConversionPattern=%d{HH:mm:ss.SSS} %p %c: %m%n
log4j.appender.CA.Threshold = FATAL
# File Appender
log4j.appender.FA=org.apache.log4j.FileAppender
log4j.appender.FA.append=false
log4j.appender.FA.file=target/unit-tests.log
log4j.appender.FA.layout=org.apache.log4j.PatternLayout
log4j.appender.FA.layout.ConversionPattern=%d{HH:mm:ss.SSS} %t %p %c{2}: %m%n
# Set the logger level of File Appender to WARN
log4j.appender.FA.Threshold = DEBUG
# SPARK-34128Suppress undesirable TTransportException warnings involved in THRIFT-4805
log4j.appender.console.filter.1=org.apache.log4j.varia.StringMatchFilter
log4j.appender.console.filter.1.StringToMatch=Thrift error occurred during processing of message
log4j.appender.console.filter.1.AcceptOnMatch=false

View File

@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.flink
import scala.collection.JavaConverters._
import org.apache.flink.client.cli.{CustomCommandLine, DefaultCLI}
import org.apache.flink.configuration.{Configuration, RestOptions}
import org.apache.flink.runtime.minicluster.{MiniCluster, MiniClusterConfiguration}
import org.apache.flink.table.client.gateway.context.DefaultContext
import org.apache.kyuubi.KyuubiFunSuite
import org.apache.kyuubi.config.KyuubiConf
trait WithFlinkSQLEngine extends KyuubiFunSuite {
protected val flinkConfig = new Configuration()
protected var miniCluster: MiniCluster = _
protected var engine: FlinkSQLEngine = _
// conf will be loaded until start flink engine
def withKyuubiConf: Map[String, String]
val kyuubiConf: KyuubiConf = FlinkSQLEngine.kyuubiConf
protected var connectionUrl: String = _
override def beforeAll(): Unit = {
startMiniCluster()
startFlinkEngine()
super.beforeAll()
}
override def afterAll(): Unit = {
super.afterAll()
stopFlinkEngine()
miniCluster.close()
}
def startFlinkEngine(): Unit = {
withKyuubiConf.foreach { case (k, v) =>
System.setProperty(k, v)
kyuubiConf.set(k, v)
}
val engineContext = new DefaultContext(
List.empty.asJava,
flinkConfig,
List[CustomCommandLine](new DefaultCLI).asJava)
FlinkSQLEngine.startEngine(engineContext)
engine = FlinkSQLEngine.currentEngine.get
connectionUrl = engine.frontendServices.head.connectionUrl
}
def stopFlinkEngine(): Unit = {
if (engine != null) {
engine.stop()
engine = null
}
}
private def startMiniCluster(): Unit = {
val cfg = new MiniClusterConfiguration.Builder()
.setConfiguration(flinkConfig)
.setNumSlotsPerTaskManager(1)
.build
miniCluster = new MiniCluster(cfg)
miniCluster.start()
flinkConfig.setString(RestOptions.ADDRESS, miniCluster.getRestAddress.get().getHost)
flinkConfig.setInteger(RestOptions.PORT, miniCluster.getRestAddress.get().getPort)
}
protected def getJdbcUrl: String = s"jdbc:hive2://$connectionUrl/;"
}

View File

@ -17,113 +17,46 @@
package org.apache.kyuubi.engine.flink.operation
import java.net.URL
import java.util
import java.util.Collections
import org.scalatest.concurrent.PatienceConfiguration.Timeout
import org.scalatest.time.SpanSugar._
import org.apache.flink.client.cli.DefaultCLI
import org.apache.flink.client.program.ClusterClient
import org.apache.flink.configuration.{ConfigConstants, Configuration, MemorySize, TaskManagerOptions, WebOptions}
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration
import org.apache.flink.table.client.gateway.context.{DefaultContext, SessionContext}
import org.apache.flink.table.client.gateway.local.LocalExecutor
import org.apache.flink.test.util.MiniClusterWithClientResource
import org.apache.hive.service.rpc.thrift.TProtocolVersion
import org.apache.kyuubi.engine.flink.WithFlinkSQLEngine
import org.apache.kyuubi.operation.HiveJDBCTestHelper
import org.apache.kyuubi.service.ServiceState._
import org.apache.kyuubi.{KyuubiFunSuite, Utils}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.engine.flink.session.{FlinkSessionImpl, FlinkSQLSessionManager}
import org.apache.kyuubi.operation.FetchOrientation
class FlinkOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper {
override def withKyuubiConf: Map[String, String] = Map()
class FlinkOperationSuite extends KyuubiFunSuite {
override protected def jdbcUrl: String =
s"jdbc:hive2://${engine.frontendServices.head.connectionUrl}/"
val user: String = Utils.currentUser
val password = "anonymous"
val NUM_TMS = 2
val NUM_SLOTS_PER_TM = 2
private def getConfig = {
val config = new Configuration
config.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.parse("4m"))
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS)
config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, NUM_SLOTS_PER_TM)
config.setBoolean(WebOptions.SUBMIT_ENABLE, false)
config
}
val MINI_CLUSTER_RESOURCE =
new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
.setConfiguration(getConfig)
.setNumberTaskManagers(NUM_TMS)
.setNumberSlotsPerTaskManager(NUM_SLOTS_PER_TM).build)
var clusterClient: ClusterClient[_] = _
var engineContext = new DefaultContext(
Collections.emptyList(),
new Configuration,
Collections.singletonList(new DefaultCLI))
var sessionContext: SessionContext = _
var flinkSession: FlinkSessionImpl = _
private def createLocalExecutor: LocalExecutor =
createLocalExecutor(Collections.emptyList[URL], new Configuration)
private def createLocalExecutor(
dependencies: util.List[URL],
configuration: Configuration): LocalExecutor = {
configuration.addAll(clusterClient.getFlinkConfiguration)
val defaultContext: DefaultContext = new DefaultContext(
dependencies,
configuration,
Collections.singletonList(new DefaultCLI))
new LocalExecutor(defaultContext)
}
override def beforeAll(): Unit = {
MINI_CLUSTER_RESOURCE.before()
clusterClient = MINI_CLUSTER_RESOURCE.getClusterClient
sessionContext = SessionContext.create(engineContext, "test-session-id");
val flinkSQLSessionManager = new FlinkSQLSessionManager(engineContext)
flinkSQLSessionManager.initialize(KyuubiConf())
flinkSession = new FlinkSessionImpl(
TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1,
user,
password,
"localhost",
Map(),
flinkSQLSessionManager,
sessionContext)
super.beforeAll()
}
override def afterAll(): Unit = {
super.afterAll()
ignore("release session if shared level is CONNECTION") {
logger.info(s"jdbc url is $jdbcUrl")
assert(engine.getServiceState == STARTED)
withJdbcStatement() { _ => }
eventually(Timeout(20.seconds)) {
assert(engine.getServiceState == STOPPED)
}
}
test("get catalogs for flink sql") {
val getCatalogOperation = new GetCatalogs(flinkSession)
getCatalogOperation.run()
val resultSet = getCatalogOperation.getNextRowSet(FetchOrientation.FETCH_FIRST, 10)
assert(1 == resultSet.getRowsSize)
assert(resultSet.getRows.get(0).getColVals.get(0).getStringVal.getValue === "default_catalog")
withJdbcStatement() { statement =>
val meta = statement.getConnection.getMetaData
val catalogs = meta.getCatalogs
val expected = Set("default_catalog").toIterator
while (catalogs.next()) {
assert(catalogs.getString("catalogs") === expected.next())
}
assert(!expected.hasNext)
assert(!catalogs.next())
}
}
test("execute statement - select column name with dots") {
val executeStatementOp = new ExecuteStatement(flinkSession, "select 'tmp.hello'", false, -1)
val executor = createLocalExecutor
executor.openSession("test-session")
executeStatementOp.setExecutor(executor)
executeStatementOp.setSessionId("test-session")
executeStatementOp.run()
val resultSet = executeStatementOp.getNextRowSet(FetchOrientation.FETCH_FIRST, 10)
assert(1 == resultSet.getRowsSize)
assert(resultSet.getRows.get(0).getColVals.get(0).getStringVal.getValue === "tmp.hello")
test("execute statement - select column name with dots") {
withJdbcStatement() { statement =>
val resultSet = statement.executeQuery("select 'tmp.hello'")
assert(resultSet.next())
assert(resultSet.getString(1) === "tmp.hello")
}
}
}

View File

@ -0,0 +1,131 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.flink.operation
import java.net.URL
import java.util
import java.util.Collections
import org.apache.flink.client.cli.DefaultCLI
import org.apache.flink.client.program.ClusterClient
import org.apache.flink.configuration.{ConfigConstants, Configuration, MemorySize, TaskManagerOptions, WebOptions}
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration
import org.apache.flink.table.client.gateway.context.{DefaultContext, SessionContext}
import org.apache.flink.table.client.gateway.local.LocalExecutor
import org.apache.flink.test.util.MiniClusterWithClientResource
import org.apache.hive.service.rpc.thrift.TProtocolVersion
import org.apache.kyuubi.{KyuubiFunSuite, Utils}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.engine.flink.session.{FlinkSessionImpl, FlinkSQLSessionManager}
import org.apache.kyuubi.operation.FetchOrientation
import org.apache.kyuubi.session.SessionHandle
class LegacyFlinkOperationSuite extends KyuubiFunSuite {
val user: String = Utils.currentUser
val password = "anonymous"
val NUM_TMS = 2
val NUM_SLOTS_PER_TM = 2
private def getConfig = {
val config = new Configuration
config.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.parse("4m"))
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS)
config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, NUM_SLOTS_PER_TM)
config.setBoolean(WebOptions.SUBMIT_ENABLE, false)
config
}
val MINI_CLUSTER_RESOURCE =
new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
.setConfiguration(getConfig)
.setNumberTaskManagers(NUM_TMS)
.setNumberSlotsPerTaskManager(NUM_SLOTS_PER_TM).build)
var clusterClient: ClusterClient[_] = _
var engineContext = new DefaultContext(
Collections.emptyList(),
new Configuration,
Collections.singletonList(new DefaultCLI))
var sessionContext: SessionContext = _
var flinkSession: FlinkSessionImpl = _
private def createLocalExecutor: LocalExecutor =
createLocalExecutor(Collections.emptyList[URL], new Configuration)
private def createLocalExecutor(
dependencies: util.List[URL],
configuration: Configuration): LocalExecutor = {
configuration.addAll(clusterClient.getFlinkConfiguration)
val defaultContext: DefaultContext = new DefaultContext(
dependencies,
configuration,
Collections.singletonList(new DefaultCLI))
new LocalExecutor(defaultContext)
}
override def beforeAll(): Unit = {
MINI_CLUSTER_RESOURCE.before()
clusterClient = MINI_CLUSTER_RESOURCE.getClusterClient
sessionContext = SessionContext.create(engineContext, "test-session-id");
val flinkSQLSessionManager = new FlinkSQLSessionManager(engineContext)
flinkSQLSessionManager.initialize(KyuubiConf())
flinkSession = new FlinkSessionImpl(
TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1,
user,
password,
"localhost",
Map(),
flinkSQLSessionManager,
SessionHandle(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V6),
sessionContext)
super.beforeAll()
}
override def afterAll(): Unit = {
super.afterAll()
}
test("get catalogs for flink sql") {
val getCatalogOperation = new GetCatalogs(flinkSession)
getCatalogOperation.run()
val resultSet = getCatalogOperation.getNextRowSet(FetchOrientation.FETCH_FIRST, 10)
assert(1 == resultSet.getRowsSize)
assert(resultSet.getRows.get(0).getColVals.get(0).getStringVal.getValue === "default_catalog")
}
test("execute statement - select column name with dots") {
val executeStatementOp = new ExecuteStatement(flinkSession, "select 'tmp.hello'", false, -1)
val executor = createLocalExecutor
executor.openSession("test-session")
executeStatementOp.setExecutor(executor)
executeStatementOp.setSessionId("test-session")
executeStatementOp.run()
val resultSet = executeStatementOp.getNextRowSet(FetchOrientation.FETCH_FIRST, 10)
assert(1 == resultSet.getRowsSize)
assert(resultSet.getRows.get(0).getColVals.get(0).getStringVal.getValue === "tmp.hello")
}
}

View File

@ -201,6 +201,7 @@ object Utils extends Logging {
// The value follows org.apache.spark.util.ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY
// Hooks need to be invoked before the SparkContext stopped shall use a higher priority.
val SPARK_CONTEXT_SHUTDOWN_PRIORITY = 50
val FLINK_ENGINE_SHUTDOWN_PRIORITY = 50
/**
* Add some operations that you want into ShutdownHook

View File

@ -577,6 +577,14 @@ object KyuubiConf {
.stringConf
.createOptional
val ENGINE_FLINK_MAIN_RESOURCE: OptionalConfigEntry[String] =
buildConf("session.engine.flink.main.resource")
.doc("The package used to create Flink SQL engine remote job. If it is undefined," +
" Kyuubi will use the default")
.version("1.4.0")
.stringConf
.createOptional
val ENGINE_LOGIN_TIMEOUT: ConfigEntry[Long] = buildConf("session.engine.login.timeout")
.doc("The timeout of creating the connection to remote sql query engine")
.version("1.0.0")

View File

@ -105,6 +105,7 @@ abstract class ThriftBinaryFrontendService(name: String)
s" [$minThreads, $maxThreads] worker threads")
} catch {
case e: Throwable =>
error(e)
throw new KyuubiException(
s"Failed to initialize frontend service on $serverAddr:$portNum.",
e)

View File

@ -267,6 +267,13 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi-flink-sql-engine_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi-spark-sql-engine_${scala.binary.version}</artifactId>

View File

@ -32,8 +32,9 @@ import org.apache.hadoop.security.UserGroupInformation
import org.apache.kyuubi.{KYUUBI_VERSION, KyuubiSQLException, Logging, Utils}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.engine.EngineType.{EngineType, SPARK_SQL}
import org.apache.kyuubi.engine.EngineType.{EngineType, FLINK_SQL, SPARK_SQL}
import org.apache.kyuubi.engine.ShareLevel.{CONNECTION, GROUP, SERVER, ShareLevel}
import org.apache.kyuubi.engine.flink.FlinkEngineProcessBuilder
import org.apache.kyuubi.engine.spark.SparkProcessBuilder
import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_ENGINE_REF_ID
import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_NAMESPACE
@ -184,8 +185,17 @@ private[kyuubi] class EngineRef(
SparkProcessBuilder.TAG_KEY,
conf.getOption(SparkProcessBuilder.TAG_KEY).map(_ + ",").getOrElse("") + "KYUUBI")
new SparkProcessBuilder(appUser, conf, extraEngineLog)
case _ => throw new UnsupportedOperationException(s"Unsupported engine type: ${engineType}")
case FLINK_SQL =>
conf.setIfMissing(FlinkEngineProcessBuilder.APP_KEY, defaultEngineName)
// tag is a seq type with comma-separated
conf.set(
FlinkEngineProcessBuilder.TAG_KEY,
conf.getOption(FlinkEngineProcessBuilder.TAG_KEY).map(_ + ",").getOrElse("") + "KYUUBI")
conf.set(HA_ZK_NAMESPACE, engineSpace)
conf.set(HA_ZK_ENGINE_REF_ID, engineRefId)
new FlinkEngineProcessBuilder(appUser, conf, extraEngineLog)
}
MetricsSystem.tracing(_.incCount(ENGINE_TOTAL))
try {
info(s"Launching engine:\n$builder")

View File

@ -53,6 +53,8 @@ trait ProcBuilder {
protected def env: Map[String, String] = conf.getEnvs
protected def childProcEnv: Map[String, String] = env
protected val extraEngineLog: Option[OperationLog]
protected val workingDir: Path
@ -61,7 +63,7 @@ trait ProcBuilder {
val pb = new ProcessBuilder(commands: _*)
val envs = pb.environment()
envs.putAll(env.asJava)
envs.putAll(childProcEnv.asJava)
pb.directory(workingDir.toFile)
pb.redirectError(engineLog)
pb.redirectOutput(engineLog)
@ -169,7 +171,7 @@ trait ProcBuilder {
case Some(kyuubiHome) =>
val pb = new ProcessBuilder("/bin/sh", s"$kyuubiHome/bin/stop-application.sh", appId)
pb.environment()
.putAll(env.asJava)
.putAll(childProcEnv.asJava)
pb.redirectError(Redirect.appendTo(engineLog))
pb.redirectOutput(Redirect.appendTo(engineLog))
val process = pb.start()

View File

@ -0,0 +1,172 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.flink
import java.io.{File, FilenameFilter}
import java.net.URI
import java.nio.file.{Files, Path, Paths}
import com.google.common.annotations.VisibleForTesting
import org.apache.kyuubi._
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.ENGINE_FLINK_MAIN_RESOURCE
import org.apache.kyuubi.engine.ProcBuilder
import org.apache.kyuubi.engine.flink.FlinkEngineProcessBuilder.FLINK_ENGINE_BINARY_FILE
import org.apache.kyuubi.operation.log.OperationLog
/**
* A builder to build flink sql engine progress.
*/
class FlinkEngineProcessBuilder(
override val proxyUser: String,
override val conf: KyuubiConf,
val extraEngineLog: Option[OperationLog] = None)
extends ProcBuilder with Logging {
override protected def executable: String = {
val flinkEngineHomeOpt = env.get("FLINK_ENGINE_HOME").orElse {
val cwd = getClass.getProtectionDomain.getCodeSource.getLocation.getPath
.split("kyuubi-server")
assert(cwd.length > 1)
Option(
Paths.get(cwd.head)
.resolve("externals")
.resolve("kyuubi-flink-sql-engine")
.toFile)
.map(_.getAbsolutePath)
}
flinkEngineHomeOpt.map { dir =>
Paths.get(dir, "bin", FLINK_ENGINE_BINARY_FILE).toAbsolutePath.toFile.getCanonicalPath
} getOrElse {
throw KyuubiSQLException("FLINK_ENGINE_HOME is not set! " +
"For more detail information on installing and configuring Flink, please visit " +
"https://kyuubi.apache.org/docs/stable/deployment/settings.html#environments")
}
}
override protected def mainResource: Option[String] = {
val jarName = s"${module}_$SCALA_COMPILE_VERSION-$KYUUBI_VERSION.jar"
// 1. get the main resource jar for user specified config first
conf.get(ENGINE_FLINK_MAIN_RESOURCE).filter { userSpecified =>
// skip check exist if not local file.
val uri = new URI(userSpecified)
val schema = if (uri.getScheme != null) uri.getScheme else "file"
schema match {
case "file" => Files.exists(Paths.get(userSpecified))
case _ => true
}
}.orElse {
// 2. get the main resource jar from system build default
env.get(KyuubiConf.KYUUBI_HOME)
.map { Paths.get(_, "externals", "engines", "flink", jarName) }
.filter(Files.exists(_)).map(_.toAbsolutePath.toFile.getCanonicalPath)
}.orElse {
// 3. get the main resource from dev environment
Option(Paths.get("externals", module, "target", jarName))
.filter(Files.exists(_)).orElse {
Some(Paths.get("..", "externals", module, "target", jarName))
}.map(_.toAbsolutePath.toFile.getCanonicalPath)
}
}
override protected def module: String = "kyuubi-flink-sql-engine"
override protected def mainClass: String = "org.apache.kyuubi.engine.flink.FlinkSQLEngine"
override protected def childProcEnv: Map[String, String] = conf.getEnvs +
("FLINK_HOME" -> FLINK_HOME) +
("FLINK_CONF_DIR" -> s"$FLINK_HOME/conf") +
("FLINK_SQL_ENGINE_JAR" -> mainResource.get) +
("FLINK_SQL_ENGINE_DYNAMIC_ARGS" ->
conf.getAll.map { case (k, v) => s"-D$k=$v" }.mkString(" "))
override protected def commands: Array[String] = Array(executable)
override protected val workingDir: Path = {
env.get("KYUUBI_WORK_DIR_ROOT").map { root =>
val workingRoot = Paths.get(root).toAbsolutePath
if (!Files.exists(workingRoot)) {
debug(s"Creating KYUUBI_WORK_DIR_ROOT at $workingRoot")
Files.createDirectories(workingRoot)
}
if (Files.isDirectory(workingRoot)) {
workingRoot.toString
} else null
}.map { rootAbs =>
val working = Paths.get(rootAbs, proxyUser)
if (!Files.exists(working)) {
debug(s"Creating $proxyUser's working directory at $working")
Files.createDirectories(working)
}
if (Files.isDirectory(working)) {
working
} else {
Utils.createTempDir(rootAbs, proxyUser)
}
}.getOrElse {
Utils.createTempDir(namePrefix = proxyUser)
}
}
override def toString: String = commands.map {
case arg if arg.startsWith("--") => s"\\\n\t$arg"
case arg => arg
}.mkString(" ")
@VisibleForTesting
def FLINK_HOME: String = {
// prepare FLINK_HOME
val flinkHomeOpt = env.get("FLINK_HOME").orElse {
val cwd = getClass.getProtectionDomain.getCodeSource.getLocation.getPath
.split("kyuubi-server")
assert(cwd.length > 1)
Option(
Paths.get(cwd.head)
.resolve("externals")
.resolve("kyuubi-download")
.resolve("target")
.toFile
.listFiles(new FilenameFilter {
override def accept(dir: File, name: String): Boolean = {
dir.isDirectory && name.startsWith("flink-")
}
}))
.flatMap(_.headOption)
.map(_.getAbsolutePath)
}
flinkHomeOpt.map { dir =>
dir
} getOrElse {
throw KyuubiSQLException("FLINK_HOME is not set! " +
"For more detail information on installing and configuring Flink, please visit " +
"https://kyuubi.apache.org/docs/stable/deployment/settings.html#environments")
}
}
private def useKeytab(): Boolean = false
}
object FlinkEngineProcessBuilder {
final val APP_KEY = "yarn.application.name"
final val TAG_KEY = "yarn.tags"
final private val FLINK_ENGINE_BINARY_FILE = "flink-sql-engine.sh"
}

View File

@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi
import scala.sys.process._
import org.apache.kyuubi.engine.flink.FlinkEngineProcessBuilder
trait WithKyuubiServerAndFlinkLocalCluster extends WithKyuubiServer {
private lazy val FLINK_HOME: String =
new FlinkEngineProcessBuilder(Utils.currentUser, conf).FLINK_HOME
override def beforeAll(): Unit = {
s"$FLINK_HOME/bin/start-cluster.sh".!
super.beforeAll()
}
override def afterAll(): Unit = {
super.afterAll()
s"$FLINK_HOME/bin/stop-cluster.sh".!
}
}

View File

@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.flink
import org.apache.kyuubi.KyuubiFunSuite
import org.apache.kyuubi.config.KyuubiConf
class FlinkEngineProcessBuilderSuite extends KyuubiFunSuite {
private def conf = KyuubiConf().set("kyuubi.on", "off")
test("flink engine process builder") {
val builder = new FlinkEngineProcessBuilder("vinoyang", conf)
val commands = builder.toString.split(' ')
assert(commands.exists(_ endsWith "flink-sql-engine.sh"))
}
}

View File

@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.operation.flink
import org.apache.kyuubi.WithKyuubiServerAndFlinkLocalCluster
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.{ENGINE_TYPE, FRONTEND_THRIFT_BINARY_BIND_PORT}
import org.apache.kyuubi.operation.HiveJDBCTestHelper
class FlinkOperationSuite extends WithKyuubiServerAndFlinkLocalCluster with HiveJDBCTestHelper {
override val conf: KyuubiConf = KyuubiConf()
.set(ENGINE_TYPE, "FLINK_SQL")
.set(FRONTEND_THRIFT_BINARY_BIND_PORT, 10019)
override protected def jdbcUrl: String = getJdbcUrl
test("get catalogs for flink sql") {
withJdbcStatement() { statement =>
val meta = statement.getConnection.getMetaData
val catalogs = meta.getCatalogs
val expected = Set("default_catalog").toIterator
while (catalogs.next()) {
assert(catalogs.getString("catalogs") === expected.next())
}
assert(!expected.hasNext)
assert(!catalogs.next())
}
}
test("execute statement - select column name with dots") {
withJdbcStatement() { statement =>
val resultSet = statement.executeQuery("select 'tmp.hello'")
assert(resultSet.next())
assert(resultSet.getString(1) === "tmp.hello")
}
}
}