diff --git a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/ClosedConnectionException.java b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/ClosedConnectionException.java new file mode 100644 index 000000000..ee159aaf3 --- /dev/null +++ b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/ClosedConnectionException.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.jdbc.hive; + +import java.sql.SQLException; + +public class ClosedConnectionException extends SQLException { + + private static final long serialVersionUID = 0; + + public ClosedConnectionException(String msg) { + super(msg); + } +} diff --git a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiConnection.java b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiConnection.java index ca5974aea..9acc9591e 100644 --- a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiConnection.java +++ b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiConnection.java @@ -98,7 +98,11 @@ import org.apache.kyuubi.jdbc.hive.Utils.JdbcConnectionParams; */ public class KyuubiConnection implements java.sql.Connection { public static final Logger LOG = LoggerFactory.getLogger(KyuubiConnection.class.getName()); - private static final Long ENGINE_LOG_THREAD_END_DELAY = 10 * 1000L; + private static boolean isBeeLineMode = false; + + public static void setBeeLineMode(boolean isBeeLineMode) { + KyuubiConnection.isBeeLineMode = isBeeLineMode; + } private String jdbcUriString; private String host; @@ -113,12 +117,16 @@ public class KyuubiConnection implements java.sql.Connection { private boolean isClosed = true; private SQLWarning warningChain = null; private TSessionHandle sessHandle = null; - private TOperationHandle launchEngineOpHandle = null; private final List supportedProtocols = new LinkedList(); private int loginTimeout = 0; private TProtocolVersion protocol; private int fetchSize = KyuubiStatement.DEFAULT_FETCH_SIZE; private String initFile = null; + private boolean initFileCompleted = false; + + private TOperationHandle launchEngineOpHandle = null; + private boolean engineLogInflight = true; + private boolean launchEngineOpCompleted = false; public KyuubiConnection(String uri, Properties info) throws SQLException { setupLoginTimeout(); @@ -164,7 +172,7 @@ public class KyuubiConnection implements java.sql.Connection { // open client session openSession(); - getLaunchEngineLog(); + showLaunchEngineLog(); executeInitSql(); } else { int maxRetries = 1; @@ -184,9 +192,10 @@ public class KyuubiConnection implements java.sql.Connection { client = new TCLIService.Client(new TBinaryProtocol(transport)); // open client session openSession(); - getLaunchEngineLog(); - executeInitSql(); - + if (!isBeeLineMode) { + showLaunchEngineLog(); + executeInitSql(); + } break; } catch (Exception e) { LOG.warn("Failed to connect to " + connParams.getHost() + ":" + connParams.getPort()); @@ -220,54 +229,32 @@ public class KyuubiConnection implements java.sql.Connection { client = newSynchronizedClient(client); } - private void getLaunchEngineLog() { - if (launchEngineOpHandle != null) { - LOG.info("Starting to get launch engine log."); - Thread logThread = new Thread("engine-launch-log") { - boolean launchEngineCompleted = false; - long timeToEnd = Long.MAX_VALUE; - boolean continueToFetch = true; - - @Override - public void run() { - try { - while (continueToFetch && System.currentTimeMillis() < timeToEnd) { - List logs = fetchEngineLogs(); - if (launchEngineCompleted && logs.isEmpty()) { - continueToFetch = false; - } - - for (String log: logs) { - LOG.info(log); - } - - if (!launchEngineCompleted && launchEngineOpCompleted()) { - launchEngineCompleted = true; - timeToEnd = System.currentTimeMillis() + ENGINE_LOG_THREAD_END_DELAY; - } - - Thread.sleep(300); - } - } catch (Exception e) { - // do nothing - } - LOG.info("Finished to get launch engine log."); - } - }; - logThread.start(); - } + /** + * Check whether launch engine operation might be producing more logs to be fetched. + * This method is a public API for usage outside of Kyuubi, although it is not part of the + * interface java.sql.Connection. + * @return true if launch engine operation might be producing more logs. It does not indicate + * if last log lines have been fetched by getEngineLog. + */ + public boolean hasMoreEngineLogs() { + return launchEngineOpHandle != null && (!launchEngineOpCompleted || engineLogInflight); } - private boolean launchEngineOpCompleted() { - TGetOperationStatusReq opStatusReq = new TGetOperationStatusReq(launchEngineOpHandle); - try { - return client.GetOperationStatus(opStatusReq).getOperationCompleted() != 0; - } catch (TException e) { - return true; + /** + * Get the launch engine operation logs of current connection. + * This method is a public API for usage outside of Kyuubi, although it is not part of the + * interface java.sql.Connection. + * This method gets the incremental logs during launching engine, and uses fetchSize holden by + * KyuubiStatement object. + * @return a list of logs. It can be empty if there are no new logs to be retrieved at that time. + * @throws SQLException + * @throws ClosedConnectionException if connection has been closed + */ + public List getEngineLog() throws SQLException, ClosedConnectionException { + if (isClosed()) { + throw new ClosedConnectionException("Method getEngineLog() failed. The " + + "connection has been closed."); } - } - - private List fetchEngineLogs() throws SQLException { TFetchResultsReq fetchResultsReq = new TFetchResultsReq(launchEngineOpHandle, TFetchOrientation.FETCH_NEXT, fetchSize); fetchResultsReq.setFetchType((short) 1); @@ -282,10 +269,45 @@ public class KyuubiConnection implements java.sql.Connection { } catch (TException e) { throw new SQLException("Error building result set for query log", e); } + engineLogInflight = !logs.isEmpty(); + + TGetOperationStatusReq opStatusReq = new TGetOperationStatusReq(launchEngineOpHandle); + try { + launchEngineOpCompleted = client.GetOperationStatus(opStatusReq).getOperationCompleted() != 0; + } catch (TException e) { + launchEngineOpCompleted = true; + } + return Collections.unmodifiableList(logs); } - private void executeInitSql() throws SQLException { + private void showLaunchEngineLog() { + if (launchEngineOpHandle != null) { + LOG.info("Starting to get launch engine log."); + Thread logThread = new Thread("engine-launch-log") { + + @Override + public void run() { + try { + while (hasMoreEngineLogs()) { + List logs = getEngineLog(); + for (String log: logs) { + LOG.info(log); + } + Thread.sleep(300); + } + } catch (Exception e) { + // do nothing + } + LOG.info("Finished to get launch engine log."); + } + }; + logThread.start(); + } + } + + public void executeInitSql() throws SQLException { + if (initFileCompleted) return; if (initFile != null) { try { List sqlList = parseInitFile(initFile); @@ -304,6 +326,7 @@ public class KyuubiConnection implements java.sql.Connection { throw new SQLException(e.getMessage()); } } + initFileCompleted = true; } public static List parseInitFile(String initFile) throws IOException {