Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,59 @@

public abstract class AbstractCli {

/**
* Returns true if the SQLException is likely due to connection loss. Used so that CLI can rethrow
* and trigger reconnection.
*/
static boolean isConnectionRelated(SQLException e) {
if (e == null) {
return false;
}
if (matchesConnectionFailure(e.getMessage())) {
return true;
}
Throwable cause = e.getCause();
return cause != null && matchesConnectionFailure(cause.getMessage());
}

private static boolean matchesConnectionFailure(String msg) {
if (msg == null) {
return false;
}
String lower = msg.toLowerCase();
return lower.contains("connection")
|| lower.contains("refused")
|| lower.contains("timeout")
|| lower.contains("closed")
|| lower.contains("reset")
|| lower.contains("network")
|| lower.contains("broken pipe");
}

/**
* Returns true if the SQLException indicates a session/statement state error (e.g. statement ID
* no longer valid after reconnect). Used to show a friendly message instead of the raw exception.
*/
static boolean isSessionOrStatementError(SQLException e) {
if (e == null) {
return false;
}
if (e.getMessage() != null && matchesSessionOrStatementFailure(e.getMessage())) {
return true;
}
Throwable cause = e.getCause();
return cause != null
&& cause.getMessage() != null
&& matchesSessionOrStatementFailure(cause.getMessage());
}

private static boolean matchesSessionOrStatementFailure(String msg) {
String lower = msg.toLowerCase();
return lower.contains("doesn't exist in this session")
|| lower.contains("statementid")
|| lower.contains("statement id");
}

static final String HOST_ARGS = "h";
static final String HOST_NAME = "host";

Expand Down Expand Up @@ -395,7 +448,8 @@ static void echoStarting(CliContext ctx) {
ctx.getPrinter().println("---------------------");
}

static OperationResult handleInputCmd(CliContext ctx, String cmd, IoTDBConnection connection) {
static OperationResult handleInputCmd(CliContext ctx, String cmd, IoTDBConnection connection)
throws SQLException {
lastProcessStatus = CODE_OK;
String specialCmd = cmd.toLowerCase().trim();

Expand Down Expand Up @@ -496,7 +550,8 @@ private static int setTimestampDisplay(CliContext ctx, String specialCmd, String
* @return execute result code
*/
private static int setTimeZone(
CliContext ctx, String specialCmd, String cmd, IoTDBConnection connection) {
CliContext ctx, String specialCmd, String cmd, IoTDBConnection connection)
throws SQLException {
String[] values = specialCmd.split("=");
if (values.length != 2) {
ctx.getPrinter()
Expand All @@ -506,6 +561,12 @@ private static int setTimeZone(
}
try {
connection.setTimeZone(cmd.split("=")[1].trim());
} catch (SQLException e) {
if (isConnectionRelated(e)) {
throw e;
}
ctx.getPrinter().println(String.format("Time zone format error: %s", e.getMessage()));
return CODE_ERROR;
} catch (Exception e) {
ctx.getPrinter().println(String.format("Time zone format error: %s", e.getMessage()));
return CODE_ERROR;
Expand All @@ -514,10 +575,13 @@ private static int setTimeZone(
return CODE_OK;
}

private static int showTimeZone(CliContext ctx, IoTDBConnection connection) {
private static int showTimeZone(CliContext ctx, IoTDBConnection connection) throws SQLException {
try {
ctx.getPrinter().println("Current time zone: " + connection.getTimeZone());
} catch (Exception e) {
if (e instanceof SQLException && isConnectionRelated((SQLException) e)) {
throw (SQLException) e;
}
ctx.getPrinter().println("Cannot get time zone from server side because: " + e.getMessage());
return CODE_ERROR;
}
Expand Down Expand Up @@ -550,7 +614,8 @@ private static int importCmd(
}

@SuppressWarnings({"squid:S3776"}) // Suppress high Cognitive Complexity warning
private static int executeQuery(CliContext ctx, IoTDBConnection connection, String cmd) {
private static int executeQuery(CliContext ctx, IoTDBConnection connection, String cmd)
throws SQLException {
int executeStatus = CODE_OK;
long startTime = System.currentTimeMillis();
try (Statement statement = connection.createStatement()) {
Expand Down Expand Up @@ -611,6 +676,18 @@ private static int executeQuery(CliContext ctx, IoTDBConnection connection, Stri
} else {
ctx.getPrinter().println("Msg: " + SUCCESS_MESSAGE);
}
} catch (SQLException e) {
if (isConnectionRelated(e)) {
throw e;
}
if (isSessionOrStatementError(e)) {
ctx.getPrinter()
.println(
"Reconnected, but the previous command could not be completed. Please run your command again.");
} else {
ctx.getPrinter().println("Msg: " + e);
}
executeStatus = CODE_ERROR;
} catch (Exception e) {
ctx.getPrinter().println("Msg: " + e);
executeStatus = CODE_ERROR;
Expand Down Expand Up @@ -859,7 +936,8 @@ enum OperationResult {
NO_OPER
}

static boolean processCommand(CliContext ctx, String s, IoTDBConnection connection) {
static boolean processCommand(CliContext ctx, String s, IoTDBConnection connection)
throws SQLException {
if (s == null) {
return true;
}
Expand Down
137 changes: 120 additions & 17 deletions iotdb-client/cli/src/main/java/org/apache/iotdb/cli/Cli.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,35 @@ public class Cli extends AbstractCli {
// TODO: Make non-static
private static final Properties info = new Properties();

/** Number of reconnection attempts when connection is lost during interactive session. */
private static final int RECONNECT_RETRY_NUM = 3;

/** Delay in ms between reconnection attempts. */
private static final long RECONNECT_RETRY_INTERVAL_MS = 1000;

/** Result of reading and processing one line; used to support reconnection. */
private static class ReadLineResult {
final boolean stop;
final String failedCommand;

ReadLineResult(boolean stop, String failedCommand) {
this.stop = stop;
this.failedCommand = failedCommand;
}

static ReadLineResult continueLoop() {
return new ReadLineResult(false, null);
}

static ReadLineResult stopLoop() {
return new ReadLineResult(true, null);
}

static ReadLineResult reconnectAndRetry(String command) {
return new ReadLineResult(false, command);
}
}

/**
* IoTDB Client main function.
*
Expand Down Expand Up @@ -155,6 +184,29 @@ private static boolean parseCommandLine(
return true;
}

private static IoTDBConnection openConnection() throws SQLException {
return (IoTDBConnection)
DriverManager.getConnection(Config.IOTDB_URL_PREFIX + host + ":" + port + "/", info);
}

private static void setupConnection(IoTDBConnection connection)
throws java.sql.SQLException, org.apache.thrift.TException {
connection.setQueryTimeout(queryTimeout);
properties = connection.getServerProperties();
AGGREGRATE_TIME_LIST.addAll(properties.getSupportedTimeAggregationOperations());
timestampPrecision = properties.getTimestampPrecision();
}

private static void closeConnectionQuietly(IoTDBConnection connection) {
if (connection != null) {
try {
connection.close();
} catch (SQLException ignored) {
// ignore
}
}
}

private static void serve(CliContext ctx) {
try {
useSsl = commandLine.getOptionValue(USE_SSL_ARGS);
Expand Down Expand Up @@ -195,36 +247,87 @@ private static void executeSql(CliContext ctx) throws TException {
}

private static void receiveCommands(CliContext ctx) throws TException {
try (IoTDBConnection connection =
(IoTDBConnection)
DriverManager.getConnection(Config.IOTDB_URL_PREFIX + host + ":" + port + "/", info)) {
connection.setQueryTimeout(queryTimeout);
properties = connection.getServerProperties();
AGGREGRATE_TIME_LIST.addAll(properties.getSupportedTimeAggregationOperations());
timestampPrecision = properties.getTimestampPrecision();

IoTDBConnection connection = null;
try {
connection = openConnection();
setupConnection(connection);
echoStarting(ctx);
displayLogo(ctx, properties.getLogo(), properties.getVersion(), properties.getBuildInfo());
ctx.getPrinter().println(String.format("Successfully login at %s:%s", host, port));
while (true) {
boolean readLine = readerReadLine(ctx, connection);
if (readLine) {
ReadLineResult result = readerReadLine(ctx, connection);
if (result.stop) {
break;
}
if (result.failedCommand != null) {
// Connection failed during processCommand; try to reconnect and retry the command.
closeConnectionQuietly(connection);
connection = null;
boolean reconnected = false;
for (int attempt = 1; attempt <= RECONNECT_RETRY_NUM; attempt++) {
if (attempt > 1) {
try {
Thread.sleep(RECONNECT_RETRY_INTERVAL_MS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
ctx.getErr().printf("%s: Reconnection interrupted.%n", IOTDB_ERROR_PREFIX);
ctx.exit(CODE_ERROR);
}
}
try {
connection = openConnection();
setupConnection(connection);
ctx.getPrinter().println("Connection lost. Reconnected. Retrying command.");
processCommand(ctx, result.failedCommand, connection);
reconnected = true;
break;
} catch (SQLException e) {
if (isSessionOrStatementError(e)) {
// Reconnect succeeded but retry failed due to session/statement state; ask user to
// run the command again.
ctx.getPrinter()
.println(
"Reconnected, but the previous command could not be completed. Please run your command again.");
reconnected = true;
break;
}
if (attempt == RECONNECT_RETRY_NUM) {
ctx.getErr()
.printf(
"%s: Could not reconnect after %d attempts. Please check that the server is running and try again.%n",
IOTDB_ERROR_PREFIX, RECONNECT_RETRY_NUM);
ctx.exit(CODE_ERROR);
}
}
}
if (!reconnected) {
break;
}
}
}
} catch (SQLException e) {
ctx.getErr().printf("%s: %s%n", IOTDB_ERROR_PREFIX, e.getMessage());
ctx.exit(CODE_ERROR);
} finally {
closeConnectionQuietly(connection);
}
}

private static boolean readerReadLine(CliContext ctx, IoTDBConnection connection) {
private static ReadLineResult readerReadLine(CliContext ctx, IoTDBConnection connection) {
String s;
try {
s = ctx.getLineReader().readLine(cliPrefix + "> ", null);
boolean continues = processCommand(ctx, s, connection);
if (!continues) {
return true;
try {
boolean continues = processCommand(ctx, s, connection);
if (!continues) {
return ReadLineResult.stopLoop();
}
} catch (SQLException e) {
if (isConnectionRelated(e)) {
return ReadLineResult.reconnectAndRetry(s);
}
ctx.getErr().printf("%s: %s%n", IOTDB_ERROR_PREFIX, e.getMessage());
return ReadLineResult.stopLoop();
}
} catch (UserInterruptException e) {
// Exit on signal INT requires confirmation.
Expand All @@ -233,12 +336,12 @@ private static boolean readerReadLine(CliContext ctx, IoTDBConnection connection
// Exit on EOF (usually by pressing CTRL+D).
ctx.exit(CODE_OK);
} catch (IllegalArgumentException e) {
if (e.getMessage().contains("history")) {
return false;
if (e.getMessage() != null && e.getMessage().contains("history")) {
return ReadLineResult.continueLoop();
}
throw e;
}
return false;
return ReadLineResult.continueLoop();
}

private static void readLine(CliContext ctx) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.SQLException;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -150,7 +152,7 @@ private void isTwoStringArrayEqual(String[] expected, String[] actual) {
}

@Test
public void testHandleInputInputCmd() {
public void testHandleInputInputCmd() throws SQLException {
CliContext ctx = new CliContext(System.in, System.out, System.err, ExitType.EXCEPTION);
assertEquals(
OperationResult.STOP_OPER,
Expand Down