[KYUUBI #1400] Expose hasMoreEngineLogs and getEngineLog for BeeLine mode

<!--
Thanks for sending a pull request!

Here are some tips for you:
  1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html
  2. If the PR is related to an issue in https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
  3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'.
-->

### _Why are the changes needed?_
<!--
Please clarify why the changes are needed. For instance,
  1. If you add a feature, you can talk about the use case of it.
  2. If you fix a bug, you can clarify why it is a bug.
-->
Expose hasMoreEngineLogs and getEngineLog for BeeLine mode.

Refer the API of HiveStatement/KyuubiStatement:
- hasMoreLogs
- getQueryLog

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [ ] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #1400 from turboFei/refactor_kyuubi_connection.

Closes #1400

9716e8f4 [fwang12] address comments
1da636d3 [fwang12] public
acaf0c7a [fwang12] refactor
54cc1125 [fwang12] save
b66f2585 [fwang12] Make preparation for beeline mode

Authored-by: fwang12 <fwang12@ebay.com>
Signed-off-by: fwang12 <fwang12@ebay.com>
This commit is contained in:
fwang12 2021-11-17 10:01:23 +08:00
parent bf9736e31b
commit 09c8a4093f
2 changed files with 104 additions and 52 deletions

View File

@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.jdbc.hive;
import java.sql.SQLException;
public class ClosedConnectionException extends SQLException {
private static final long serialVersionUID = 0;
public ClosedConnectionException(String msg) {
super(msg);
}
}

View File

@ -98,7 +98,11 @@ import org.apache.kyuubi.jdbc.hive.Utils.JdbcConnectionParams;
*/
public class KyuubiConnection implements java.sql.Connection {
public static final Logger LOG = LoggerFactory.getLogger(KyuubiConnection.class.getName());
private static final Long ENGINE_LOG_THREAD_END_DELAY = 10 * 1000L;
private static boolean isBeeLineMode = false;
public static void setBeeLineMode(boolean isBeeLineMode) {
KyuubiConnection.isBeeLineMode = isBeeLineMode;
}
private String jdbcUriString;
private String host;
@ -113,12 +117,16 @@ public class KyuubiConnection implements java.sql.Connection {
private boolean isClosed = true;
private SQLWarning warningChain = null;
private TSessionHandle sessHandle = null;
private TOperationHandle launchEngineOpHandle = null;
private final List<TProtocolVersion> supportedProtocols = new LinkedList<TProtocolVersion>();
private int loginTimeout = 0;
private TProtocolVersion protocol;
private int fetchSize = KyuubiStatement.DEFAULT_FETCH_SIZE;
private String initFile = null;
private boolean initFileCompleted = false;
private TOperationHandle launchEngineOpHandle = null;
private boolean engineLogInflight = true;
private boolean launchEngineOpCompleted = false;
public KyuubiConnection(String uri, Properties info) throws SQLException {
setupLoginTimeout();
@ -164,7 +172,7 @@ public class KyuubiConnection implements java.sql.Connection {
// open client session
openSession();
getLaunchEngineLog();
showLaunchEngineLog();
executeInitSql();
} else {
int maxRetries = 1;
@ -184,9 +192,10 @@ public class KyuubiConnection implements java.sql.Connection {
client = new TCLIService.Client(new TBinaryProtocol(transport));
// open client session
openSession();
getLaunchEngineLog();
executeInitSql();
if (!isBeeLineMode) {
showLaunchEngineLog();
executeInitSql();
}
break;
} catch (Exception e) {
LOG.warn("Failed to connect to " + connParams.getHost() + ":" + connParams.getPort());
@ -220,54 +229,32 @@ public class KyuubiConnection implements java.sql.Connection {
client = newSynchronizedClient(client);
}
private void getLaunchEngineLog() {
if (launchEngineOpHandle != null) {
LOG.info("Starting to get launch engine log.");
Thread logThread = new Thread("engine-launch-log") {
boolean launchEngineCompleted = false;
long timeToEnd = Long.MAX_VALUE;
boolean continueToFetch = true;
@Override
public void run() {
try {
while (continueToFetch && System.currentTimeMillis() < timeToEnd) {
List<String> logs = fetchEngineLogs();
if (launchEngineCompleted && logs.isEmpty()) {
continueToFetch = false;
}
for (String log: logs) {
LOG.info(log);
}
if (!launchEngineCompleted && launchEngineOpCompleted()) {
launchEngineCompleted = true;
timeToEnd = System.currentTimeMillis() + ENGINE_LOG_THREAD_END_DELAY;
}
Thread.sleep(300);
}
} catch (Exception e) {
// do nothing
}
LOG.info("Finished to get launch engine log.");
}
};
logThread.start();
}
/**
* Check whether launch engine operation might be producing more logs to be fetched.
* This method is a public API for usage outside of Kyuubi, although it is not part of the
* interface java.sql.Connection.
* @return true if launch engine operation might be producing more logs. It does not indicate
* if last log lines have been fetched by getEngineLog.
*/
public boolean hasMoreEngineLogs() {
return launchEngineOpHandle != null && (!launchEngineOpCompleted || engineLogInflight);
}
private boolean launchEngineOpCompleted() {
TGetOperationStatusReq opStatusReq = new TGetOperationStatusReq(launchEngineOpHandle);
try {
return client.GetOperationStatus(opStatusReq).getOperationCompleted() != 0;
} catch (TException e) {
return true;
/**
* Get the launch engine operation logs of current connection.
* This method is a public API for usage outside of Kyuubi, although it is not part of the
* interface java.sql.Connection.
* This method gets the incremental logs during launching engine, and uses fetchSize holden by
* KyuubiStatement object.
* @return a list of logs. It can be empty if there are no new logs to be retrieved at that time.
* @throws SQLException
* @throws ClosedConnectionException if connection has been closed
*/
public List<String> getEngineLog() throws SQLException, ClosedConnectionException {
if (isClosed()) {
throw new ClosedConnectionException("Method getEngineLog() failed. The " +
"connection has been closed.");
}
}
private List<String> fetchEngineLogs() throws SQLException {
TFetchResultsReq fetchResultsReq = new TFetchResultsReq(launchEngineOpHandle,
TFetchOrientation.FETCH_NEXT, fetchSize);
fetchResultsReq.setFetchType((short) 1);
@ -282,10 +269,45 @@ public class KyuubiConnection implements java.sql.Connection {
} catch (TException e) {
throw new SQLException("Error building result set for query log", e);
}
engineLogInflight = !logs.isEmpty();
TGetOperationStatusReq opStatusReq = new TGetOperationStatusReq(launchEngineOpHandle);
try {
launchEngineOpCompleted = client.GetOperationStatus(opStatusReq).getOperationCompleted() != 0;
} catch (TException e) {
launchEngineOpCompleted = true;
}
return Collections.unmodifiableList(logs);
}
private void executeInitSql() throws SQLException {
private void showLaunchEngineLog() {
if (launchEngineOpHandle != null) {
LOG.info("Starting to get launch engine log.");
Thread logThread = new Thread("engine-launch-log") {
@Override
public void run() {
try {
while (hasMoreEngineLogs()) {
List<String> logs = getEngineLog();
for (String log: logs) {
LOG.info(log);
}
Thread.sleep(300);
}
} catch (Exception e) {
// do nothing
}
LOG.info("Finished to get launch engine log.");
}
};
logThread.start();
}
}
public void executeInitSql() throws SQLException {
if (initFileCompleted) return;
if (initFile != null) {
try {
List<String> sqlList = parseInitFile(initFile);
@ -304,6 +326,7 @@ public class KyuubiConnection implements java.sql.Connection {
throw new SQLException(e.getMessage());
}
}
initFileCompleted = true;
}
public static List<String> parseInitFile(String initFile) throws IOException {