Compare commits
10 Commits
3405586590
...
d45a60de6e
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d45a60de6e | ||
|
|
39df2d974a | ||
|
|
0101167df4 | ||
|
|
efc0b1c066 | ||
|
|
e5239f919c | ||
|
|
bd7d1e200e | ||
|
|
36b91ba2eb | ||
|
|
31483c1dc4 | ||
|
|
143dac95bf | ||
|
|
7f2f5a4d20 |
@ -63,7 +63,7 @@ Returns all the batches.
|
||||
|
||||
The created [Batch](#batch) object.
|
||||
|
||||
### GET /batches/{batchId}
|
||||
### GET /batches/${batchId}
|
||||
|
||||
Returns the batch information.
|
||||
|
||||
|
||||
@ -161,7 +161,8 @@ class ExecuteStatement(
|
||||
.build(executor)
|
||||
val result = executeOperation.invoke[TableResult](sessionId, operation)
|
||||
jobId = result.getJobClient.asScala.map(_.getJobID)
|
||||
result.await()
|
||||
// after FLINK-24461, TableResult#await() would block insert statements
|
||||
// until the job finishes, instead of returning row affected immediately
|
||||
resultSet = ResultSet.fromTableResult(result)
|
||||
}
|
||||
|
||||
|
||||
@ -164,7 +164,12 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession)
|
||||
}
|
||||
}
|
||||
}
|
||||
super.closeSession(sessionHandle)
|
||||
try {
|
||||
super.closeSession(sessionHandle)
|
||||
} catch {
|
||||
case e: KyuubiSQLException =>
|
||||
warn(s"Error closing session ${sessionHandle}", e)
|
||||
}
|
||||
if (shareLevel == ShareLevel.CONNECTION) {
|
||||
info("Session stopped due to shared level is Connection.")
|
||||
stopSession()
|
||||
|
||||
@ -41,7 +41,7 @@ class CatalogShim_v2_4 extends SparkCatalogShim {
|
||||
catalogName: String,
|
||||
schemaPattern: String): Seq[Row] = {
|
||||
(spark.sessionState.catalog.listDatabases(schemaPattern) ++
|
||||
getGlobalTempViewManager(spark, schemaPattern)).map(Row(_, ""))
|
||||
getGlobalTempViewManager(spark, schemaPattern)).map(Row(_, SparkCatalogShim.SESSION_CATALOG))
|
||||
}
|
||||
|
||||
def setCurrentDatabase(spark: SparkSession, databaseName: String): Unit = {
|
||||
|
||||
@ -129,13 +129,12 @@ class CatalogShim_v3_0 extends CatalogShim_v2_4 {
|
||||
spark: SparkSession,
|
||||
catalogName: String,
|
||||
schemaPattern: String): Seq[Row] = {
|
||||
val catalog = getCatalog(spark, catalogName)
|
||||
var schemas = getSchemasWithPattern(catalog, schemaPattern)
|
||||
if (catalogName == SparkCatalogShim.SESSION_CATALOG) {
|
||||
val viewMgr = getGlobalTempViewManager(spark, schemaPattern)
|
||||
schemas = schemas ++ viewMgr
|
||||
super.getSchemas(spark, catalogName, schemaPattern)
|
||||
} else {
|
||||
val catalog = getCatalog(spark, catalogName)
|
||||
getSchemasWithPattern(catalog, schemaPattern).map(Row(_, catalog.name))
|
||||
}
|
||||
schemas.map(Row(_, catalog.name))
|
||||
}
|
||||
|
||||
override def setCurrentDatabase(spark: SparkSession, databaseName: String): Unit = {
|
||||
|
||||
@ -39,7 +39,7 @@ class Log4j12DivertAppender extends WriterAppender {
|
||||
setLayout(lo)
|
||||
|
||||
addFilter { _: LoggingEvent =>
|
||||
if (OperationLog.getCurrentOperationLog == null) Filter.DENY else Filter.NEUTRAL
|
||||
if (OperationLog.getCurrentOperationLog.isDefined) Filter.NEUTRAL else Filter.DENY
|
||||
}
|
||||
|
||||
/**
|
||||
@ -51,8 +51,7 @@ class Log4j12DivertAppender extends WriterAppender {
|
||||
// That should've gone into our writer. Notify the LogContext.
|
||||
val logOutput = writer.toString
|
||||
writer.reset()
|
||||
val log = OperationLog.getCurrentOperationLog
|
||||
if (log != null) log.write(logOutput)
|
||||
OperationLog.getCurrentOperationLog.foreach(_.write(logOutput))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -18,6 +18,7 @@
|
||||
package org.apache.kyuubi.operation.log
|
||||
|
||||
import java.io.CharArrayWriter
|
||||
import java.util.concurrent.locks.ReadWriteLock
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
|
||||
@ -27,6 +28,8 @@ import org.apache.logging.log4j.core.appender.{AbstractWriterAppender, ConsoleAp
|
||||
import org.apache.logging.log4j.core.filter.AbstractFilter
|
||||
import org.apache.logging.log4j.core.layout.PatternLayout
|
||||
|
||||
import org.apache.kyuubi.reflection.DynFields
|
||||
|
||||
class Log4j2DivertAppender(
|
||||
name: String,
|
||||
layout: StringLayout,
|
||||
@ -52,22 +55,19 @@ class Log4j2DivertAppender(
|
||||
|
||||
addFilter(new AbstractFilter() {
|
||||
override def filter(event: LogEvent): Filter.Result = {
|
||||
if (OperationLog.getCurrentOperationLog == null) {
|
||||
Filter.Result.DENY
|
||||
} else {
|
||||
if (OperationLog.getCurrentOperationLog.isDefined) {
|
||||
Filter.Result.NEUTRAL
|
||||
} else {
|
||||
Filter.Result.DENY
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
def initLayout(): StringLayout = {
|
||||
LogManager.getRootLogger.asInstanceOf[org.apache.logging.log4j.core.Logger]
|
||||
.getAppenders.values().asScala
|
||||
.find(ap => ap.isInstanceOf[ConsoleAppender] && ap.getLayout.isInstanceOf[StringLayout])
|
||||
.map(_.getLayout.asInstanceOf[StringLayout])
|
||||
.getOrElse(PatternLayout.newBuilder().withPattern(
|
||||
"%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n").build())
|
||||
}
|
||||
private val writeLock = DynFields.builder()
|
||||
.hiddenImpl(classOf[AbstractWriterAppender[_]], "readWriteLock")
|
||||
.build[ReadWriteLock](this)
|
||||
.get()
|
||||
.writeLock
|
||||
|
||||
/**
|
||||
* Overrides AbstractWriterAppender.append(), which does the real logging. No need
|
||||
@ -75,11 +75,15 @@ class Log4j2DivertAppender(
|
||||
*/
|
||||
override def append(event: LogEvent): Unit = {
|
||||
super.append(event)
|
||||
// That should've gone into our writer. Notify the LogContext.
|
||||
val logOutput = writer.toString
|
||||
writer.reset()
|
||||
val log = OperationLog.getCurrentOperationLog
|
||||
if (log != null) log.write(logOutput)
|
||||
writeLock.lock()
|
||||
try {
|
||||
// That should've gone into our writer. Notify the LogContext.
|
||||
val logOutput = writer.toString
|
||||
writer.reset()
|
||||
OperationLog.getCurrentOperationLog.foreach(_.write(logOutput))
|
||||
} finally {
|
||||
writeLock.unlock()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -95,7 +99,7 @@ object Log4j2DivertAppender {
|
||||
|
||||
def initialize(): Unit = {
|
||||
val ap = new Log4j2DivertAppender()
|
||||
org.apache.logging.log4j.LogManager.getRootLogger()
|
||||
org.apache.logging.log4j.LogManager.getRootLogger
|
||||
.asInstanceOf[org.apache.logging.log4j.core.Logger].addAppender(ap)
|
||||
ap.start()
|
||||
}
|
||||
|
||||
@ -44,7 +44,7 @@ object OperationLog extends Logging {
|
||||
OPERATION_LOG.set(operationLog)
|
||||
}
|
||||
|
||||
def getCurrentOperationLog: OperationLog = OPERATION_LOG.get()
|
||||
def getCurrentOperationLog: Option[OperationLog] = Option(OPERATION_LOG.get)
|
||||
|
||||
def removeCurrentOperationLog(): Unit = OPERATION_LOG.remove()
|
||||
|
||||
|
||||
@ -282,9 +282,9 @@ abstract class SessionManager(name: String) extends CompositeService(name) {
|
||||
shutdown = true
|
||||
val shutdownTimeout: Long =
|
||||
if (isServer) {
|
||||
conf.get(ENGINE_EXEC_POOL_SHUTDOWN_TIMEOUT)
|
||||
} else {
|
||||
conf.get(SERVER_EXEC_POOL_SHUTDOWN_TIMEOUT)
|
||||
} else {
|
||||
conf.get(ENGINE_EXEC_POOL_SHUTDOWN_TIMEOUT)
|
||||
}
|
||||
|
||||
ThreadUtils.shutdown(timeoutChecker, Duration(shutdownTimeout, TimeUnit.MILLISECONDS))
|
||||
|
||||
@ -61,10 +61,10 @@ class OperationLogSuite extends KyuubiFunSuite {
|
||||
assert(!Files.exists(logFile))
|
||||
|
||||
OperationLog.setCurrentOperationLog(operationLog)
|
||||
assert(OperationLog.getCurrentOperationLog === operationLog)
|
||||
assert(OperationLog.getCurrentOperationLog === Some(operationLog))
|
||||
|
||||
OperationLog.removeCurrentOperationLog()
|
||||
assert(OperationLog.getCurrentOperationLog === null)
|
||||
assert(OperationLog.getCurrentOperationLog.isEmpty)
|
||||
|
||||
operationLog.write(msg1 + "\n")
|
||||
assert(Files.exists(logFile))
|
||||
|
||||
@ -29,6 +29,7 @@ import java.util.List;
|
||||
import org.apache.commons.cli.CommandLine;
|
||||
import org.apache.commons.cli.Options;
|
||||
import org.apache.commons.cli.ParseException;
|
||||
import org.apache.hive.common.util.HiveStringUtils;
|
||||
|
||||
public class KyuubiBeeLine extends BeeLine {
|
||||
public static final String KYUUBI_BEELINE_DEFAULT_JDBC_DRIVER =
|
||||
@ -191,4 +192,9 @@ public class KyuubiBeeLine extends BeeLine {
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
@Override
|
||||
boolean dispatch(String line) {
|
||||
return super.dispatch(HiveStringUtils.removeComments(line));
|
||||
}
|
||||
}
|
||||
|
||||
@ -21,6 +21,7 @@ import java.io.*;
|
||||
import java.sql.*;
|
||||
import java.util.*;
|
||||
import org.apache.hive.beeline.logs.KyuubiBeelineInPlaceUpdateStream;
|
||||
import org.apache.hive.common.util.HiveStringUtils;
|
||||
import org.apache.kyuubi.jdbc.hive.JdbcConnectionParams;
|
||||
import org.apache.kyuubi.jdbc.hive.KyuubiStatement;
|
||||
import org.apache.kyuubi.jdbc.hive.Utils;
|
||||
@ -505,6 +506,59 @@ public class KyuubiCommands extends Commands {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String handleMultiLineCmd(String line) throws IOException {
|
||||
line = HiveStringUtils.removeComments(line);
|
||||
Character mask =
|
||||
(System.getProperty("jline.terminal", "").equals("jline.UnsupportedTerminal"))
|
||||
? null
|
||||
: jline.console.ConsoleReader.NULL_MASK;
|
||||
|
||||
while (isMultiLine(line) && beeLine.getOpts().isAllowMultiLineCommand()) {
|
||||
StringBuilder prompt = new StringBuilder(beeLine.getPrompt());
|
||||
if (!beeLine.getOpts().isSilent()) {
|
||||
for (int i = 0; i < prompt.length() - 1; i++) {
|
||||
if (prompt.charAt(i) != '>') {
|
||||
prompt.setCharAt(i, i % 2 == 0 ? '.' : ' ');
|
||||
}
|
||||
}
|
||||
}
|
||||
String extra;
|
||||
// avoid NPE below if for some reason -e argument has multi-line command
|
||||
if (beeLine.getConsoleReader() == null) {
|
||||
throw new RuntimeException(
|
||||
"Console reader not initialized. This could happen when there "
|
||||
+ "is a multi-line command using -e option and which requires further reading from console");
|
||||
}
|
||||
if (beeLine.getOpts().isSilent() && beeLine.getOpts().getScriptFile() != null) {
|
||||
extra = beeLine.getConsoleReader().readLine(null, mask);
|
||||
} else {
|
||||
extra = beeLine.getConsoleReader().readLine(prompt.toString());
|
||||
}
|
||||
|
||||
if (extra == null) { // it happens when using -f and the line of cmds does not end with ;
|
||||
break;
|
||||
}
|
||||
extra = HiveStringUtils.removeComments(extra);
|
||||
if (extra != null && !extra.isEmpty()) {
|
||||
line += "\n" + extra;
|
||||
}
|
||||
}
|
||||
return line;
|
||||
}
|
||||
|
||||
// returns true if statement represented by line is not complete and needs additional reading from
|
||||
// console. Used in handleMultiLineCmd method assumes line would never be null when this method is
|
||||
// called
|
||||
private boolean isMultiLine(String line) {
|
||||
if (line.endsWith(beeLine.getOpts().getDelimiter()) || beeLine.isComment(line)) {
|
||||
return false;
|
||||
}
|
||||
// handles the case like line = show tables; --test comment
|
||||
List<String> cmds = getCmdList(line, false);
|
||||
return cmds.isEmpty() || !cmds.get(cmds.size() - 1).startsWith("--");
|
||||
}
|
||||
|
||||
static class KyuubiLogRunnable implements Runnable {
|
||||
private final KyuubiCommands commands;
|
||||
private final KyuubiLoggable kyuubiLoggable;
|
||||
|
||||
@ -29,4 +29,17 @@ public class KyuubiBeeLineTest {
|
||||
int result = kyuubiBeeLine.initArgs(new String[0]);
|
||||
assertEquals(0, result);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testKyuubiBeelineComment() {
|
||||
KyuubiBeeLine kyuubiBeeLine = new KyuubiBeeLine();
|
||||
int result = kyuubiBeeLine.initArgsFromCliVars(new String[] {"-e", "--comment show database;"});
|
||||
assertEquals(0, result);
|
||||
result = kyuubiBeeLine.initArgsFromCliVars(new String[] {"-e", "--comment\n show database;"});
|
||||
assertEquals(1, result);
|
||||
result =
|
||||
kyuubiBeeLine.initArgsFromCliVars(
|
||||
new String[] {"-e", "--comment line 1 \n --comment line 2 \n show database;"});
|
||||
assertEquals(1, result);
|
||||
}
|
||||
}
|
||||
|
||||
@ -26,6 +26,7 @@ import java.sql.SQLException;
|
||||
import java.util.*;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.hive.service.rpc.thrift.TStatus;
|
||||
import org.apache.hive.service.rpc.thrift.TStatusCode;
|
||||
import org.slf4j.Logger;
|
||||
@ -190,12 +191,20 @@ public class Utils {
|
||||
}
|
||||
}
|
||||
|
||||
Pattern confPattern = Pattern.compile("([^;]*)([^;]*);?");
|
||||
|
||||
// parse hive conf settings
|
||||
String confStr = jdbcURI.getQuery();
|
||||
if (confStr != null) {
|
||||
Matcher confMatcher = pattern.matcher(confStr);
|
||||
Matcher confMatcher = confPattern.matcher(confStr);
|
||||
while (confMatcher.find()) {
|
||||
connParams.getHiveConfs().put(confMatcher.group(1), confMatcher.group(2));
|
||||
String connParam = confMatcher.group(1);
|
||||
if (StringUtils.isNotBlank(connParam) && connParam.contains("=")) {
|
||||
int symbolIndex = connParam.indexOf('=');
|
||||
connParams
|
||||
.getHiveConfs()
|
||||
.put(connParam.substring(0, symbolIndex), connParam.substring(symbolIndex + 1));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -21,8 +21,13 @@ package org.apache.kyuubi.jdbc.hive;
|
||||
import static org.apache.kyuubi.jdbc.hive.Utils.extractURLComponents;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import java.io.UnsupportedEncodingException;
|
||||
import java.net.URLEncoder;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
@ -35,23 +40,76 @@ public class UtilsTest {
|
||||
private String expectedPort;
|
||||
private String expectedCatalog;
|
||||
private String expectedDb;
|
||||
private Map<String, String> expectedHiveConf;
|
||||
private String uri;
|
||||
|
||||
@Parameterized.Parameters
|
||||
public static Collection<String[]> data() {
|
||||
public static Collection<Object[]> data() throws UnsupportedEncodingException {
|
||||
return Arrays.asList(
|
||||
new String[][] {
|
||||
{"localhost", "10009", null, "db", "jdbc:hive2:///db;k1=v1?k2=v2#k3=v3"},
|
||||
{"localhost", "10009", null, "default", "jdbc:hive2:///"},
|
||||
{"localhost", "10009", null, "default", "jdbc:kyuubi://"},
|
||||
{"localhost", "10009", null, "default", "jdbc:hive2://"},
|
||||
{"hostname", "10018", null, "db", "jdbc:hive2://hostname:10018/db;k1=v1?k2=v2#k3=v3"},
|
||||
new Object[][] {
|
||||
{
|
||||
"localhost",
|
||||
"10009",
|
||||
null,
|
||||
"db",
|
||||
new ImmutableMap.Builder<String, String>().put("k2", "v2").build(),
|
||||
"jdbc:hive2:///db;k1=v1?k2=v2#k3=v3"
|
||||
},
|
||||
{
|
||||
"localhost",
|
||||
"10009",
|
||||
null,
|
||||
"default",
|
||||
new ImmutableMap.Builder<String, String>().build(),
|
||||
"jdbc:hive2:///"
|
||||
},
|
||||
{
|
||||
"localhost",
|
||||
"10009",
|
||||
null,
|
||||
"default",
|
||||
new ImmutableMap.Builder<String, String>().build(),
|
||||
"jdbc:kyuubi://"
|
||||
},
|
||||
{
|
||||
"localhost",
|
||||
"10009",
|
||||
null,
|
||||
"default",
|
||||
new ImmutableMap.Builder<String, String>().build(),
|
||||
"jdbc:hive2://"
|
||||
},
|
||||
{
|
||||
"hostname",
|
||||
"10018",
|
||||
null,
|
||||
"db",
|
||||
new ImmutableMap.Builder<String, String>().put("k2", "v2").build(),
|
||||
"jdbc:hive2://hostname:10018/db;k1=v1?k2=v2#k3=v3"
|
||||
},
|
||||
{
|
||||
"hostname",
|
||||
"10018",
|
||||
"catalog",
|
||||
"db",
|
||||
new ImmutableMap.Builder<String, String>().put("k2", "v2").build(),
|
||||
"jdbc:hive2://hostname:10018/catalog/db;k1=v1?k2=v2#k3=v3"
|
||||
},
|
||||
{
|
||||
"hostname",
|
||||
"10018",
|
||||
"catalog",
|
||||
"db",
|
||||
new ImmutableMap.Builder<String, String>()
|
||||
.put("k2", "v2")
|
||||
.put("k3", "-Xmx2g -XX:+PrintGCDetails -XX:HeapDumpPath=/heap.hprof")
|
||||
.build(),
|
||||
"jdbc:hive2://hostname:10018/catalog/db;k1=v1?"
|
||||
+ URLEncoder.encode(
|
||||
"k2=v2;k3=-Xmx2g -XX:+PrintGCDetails -XX:HeapDumpPath=/heap.hprof",
|
||||
StandardCharsets.UTF_8.toString())
|
||||
.replaceAll("\\+", "%20")
|
||||
+ "#k4=v4"
|
||||
}
|
||||
});
|
||||
}
|
||||
@ -61,11 +119,13 @@ public class UtilsTest {
|
||||
String expectedPort,
|
||||
String expectedCatalog,
|
||||
String expectedDb,
|
||||
Map<String, String> expectedHiveConf,
|
||||
String uri) {
|
||||
this.expectedHost = expectedHost;
|
||||
this.expectedPort = expectedPort;
|
||||
this.expectedCatalog = expectedCatalog;
|
||||
this.expectedDb = expectedDb;
|
||||
this.expectedHiveConf = expectedHiveConf;
|
||||
this.uri = uri;
|
||||
}
|
||||
|
||||
@ -76,5 +136,6 @@ public class UtilsTest {
|
||||
assertEquals(Integer.parseInt(expectedPort), jdbcConnectionParams1.getPort());
|
||||
assertEquals(expectedCatalog, jdbcConnectionParams1.getCatalogName());
|
||||
assertEquals(expectedDb, jdbcConnectionParams1.getDbName());
|
||||
assertEquals(expectedHiveConf, jdbcConnectionParams1.getHiveConfs());
|
||||
}
|
||||
}
|
||||
|
||||
@ -79,6 +79,11 @@
|
||||
<artifactId>slf4j-api</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>jcl-over-slf4j</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.logging.log4j</groupId>
|
||||
<artifactId>log4j-slf4j-impl</artifactId>
|
||||
|
||||
@ -92,10 +92,11 @@ class KyuubiSyncThriftClient private (
|
||||
remoteEngineBroken = false
|
||||
} catch {
|
||||
case e: Throwable =>
|
||||
warn(s"The engine[$engineId] alive probe fails", e)
|
||||
val engineIdStr = engineId.getOrElse("")
|
||||
warn(s"The engine[$engineIdStr] alive probe fails", e)
|
||||
val now = System.currentTimeMillis()
|
||||
if (now - engineLastAlive > engineAliveTimeout) {
|
||||
error(s"Mark the engine[$engineId] not alive with no recent alive probe" +
|
||||
error(s"Mark the engine[$engineIdStr] not alive with no recent alive probe" +
|
||||
s" success: ${now - engineLastAlive} ms exceeds timeout $engineAliveTimeout ms")
|
||||
remoteEngineBroken = true
|
||||
}
|
||||
|
||||
@ -17,7 +17,8 @@
|
||||
|
||||
package org.apache.kyuubi.server.api.v1
|
||||
|
||||
import java.util.Locale
|
||||
import java.util
|
||||
import java.util.{Collections, Locale}
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import javax.ws.rs._
|
||||
import javax.ws.rs.core.MediaType
|
||||
@ -270,12 +271,16 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
|
||||
Option(sessionManager.getBatchSessionImpl(sessionHandle)).map { batchSession =>
|
||||
try {
|
||||
val submissionOp = batchSession.batchJobSubmissionOp
|
||||
val rowSet = submissionOp.getOperationLogRowSet(
|
||||
FetchOrientation.FETCH_NEXT,
|
||||
from,
|
||||
size)
|
||||
val logRowSet = rowSet.getColumns.get(0).getStringVal.getValues.asScala
|
||||
new OperationLog(logRowSet.asJava, logRowSet.size)
|
||||
val rowSet = submissionOp.getOperationLogRowSet(FetchOrientation.FETCH_NEXT, from, size)
|
||||
val columns = rowSet.getColumns
|
||||
val logRowSet: util.List[String] =
|
||||
if (columns == null || columns.size == 0) {
|
||||
Collections.emptyList()
|
||||
} else {
|
||||
assert(columns.size == 1)
|
||||
columns.get(0).getStringVal.getValues
|
||||
}
|
||||
new OperationLog(logRowSet, logRowSet.size)
|
||||
} catch {
|
||||
case NonFatal(e) =>
|
||||
val errorMsg = s"Error getting operation log for batchId: $batchId"
|
||||
|
||||
Loading…
Reference in New Issue
Block a user