diff --git a/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/AbstractCli.java b/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/AbstractCli.java index 8fd0a179063c7..9b06d524da110 100644 --- a/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/AbstractCli.java +++ b/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/AbstractCli.java @@ -55,6 +55,59 @@ public abstract class AbstractCli { + /** + * Returns true if the SQLException is likely due to connection loss. Used so that CLI can rethrow + * and trigger reconnection. + */ + static boolean isConnectionRelated(SQLException e) { + if (e == null) { + return false; + } + if (matchesConnectionFailure(e.getMessage())) { + return true; + } + Throwable cause = e.getCause(); + return cause != null && matchesConnectionFailure(cause.getMessage()); + } + + private static boolean matchesConnectionFailure(String msg) { + if (msg == null) { + return false; + } + String lower = msg.toLowerCase(); + return lower.contains("connection") + || lower.contains("refused") + || lower.contains("timeout") + || lower.contains("closed") + || lower.contains("reset") + || lower.contains("network") + || lower.contains("broken pipe"); + } + + /** + * Returns true if the SQLException indicates a session/statement state error (e.g. statement ID + * no longer valid after reconnect). Used to show a friendly message instead of the raw exception. + */ + static boolean isSessionOrStatementError(SQLException e) { + if (e == null) { + return false; + } + if (e.getMessage() != null && matchesSessionOrStatementFailure(e.getMessage())) { + return true; + } + Throwable cause = e.getCause(); + return cause != null + && cause.getMessage() != null + && matchesSessionOrStatementFailure(cause.getMessage()); + } + + private static boolean matchesSessionOrStatementFailure(String msg) { + String lower = msg.toLowerCase(); + return lower.contains("doesn't exist in this session") + || lower.contains("statementid") + || lower.contains("statement id"); + } + static final String HOST_ARGS = "h"; static final String HOST_NAME = "host"; @@ -395,7 +448,8 @@ static void echoStarting(CliContext ctx) { ctx.getPrinter().println("---------------------"); } - static OperationResult handleInputCmd(CliContext ctx, String cmd, IoTDBConnection connection) { + static OperationResult handleInputCmd(CliContext ctx, String cmd, IoTDBConnection connection) + throws SQLException { lastProcessStatus = CODE_OK; String specialCmd = cmd.toLowerCase().trim(); @@ -496,7 +550,8 @@ private static int setTimestampDisplay(CliContext ctx, String specialCmd, String * @return execute result code */ private static int setTimeZone( - CliContext ctx, String specialCmd, String cmd, IoTDBConnection connection) { + CliContext ctx, String specialCmd, String cmd, IoTDBConnection connection) + throws SQLException { String[] values = specialCmd.split("="); if (values.length != 2) { ctx.getPrinter() @@ -506,6 +561,12 @@ private static int setTimeZone( } try { connection.setTimeZone(cmd.split("=")[1].trim()); + } catch (SQLException e) { + if (isConnectionRelated(e)) { + throw e; + } + ctx.getPrinter().println(String.format("Time zone format error: %s", e.getMessage())); + return CODE_ERROR; } catch (Exception e) { ctx.getPrinter().println(String.format("Time zone format error: %s", e.getMessage())); return CODE_ERROR; @@ -514,10 +575,13 @@ private static int setTimeZone( return CODE_OK; } - private static int showTimeZone(CliContext ctx, IoTDBConnection connection) { + private static int showTimeZone(CliContext ctx, IoTDBConnection connection) throws SQLException { try { ctx.getPrinter().println("Current time zone: " + connection.getTimeZone()); } catch (Exception e) { + if (e instanceof SQLException && isConnectionRelated((SQLException) e)) { + throw (SQLException) e; + } ctx.getPrinter().println("Cannot get time zone from server side because: " + e.getMessage()); return CODE_ERROR; } @@ -550,7 +614,8 @@ private static int importCmd( } @SuppressWarnings({"squid:S3776"}) // Suppress high Cognitive Complexity warning - private static int executeQuery(CliContext ctx, IoTDBConnection connection, String cmd) { + private static int executeQuery(CliContext ctx, IoTDBConnection connection, String cmd) + throws SQLException { int executeStatus = CODE_OK; long startTime = System.currentTimeMillis(); try (Statement statement = connection.createStatement()) { @@ -611,6 +676,18 @@ private static int executeQuery(CliContext ctx, IoTDBConnection connection, Stri } else { ctx.getPrinter().println("Msg: " + SUCCESS_MESSAGE); } + } catch (SQLException e) { + if (isConnectionRelated(e)) { + throw e; + } + if (isSessionOrStatementError(e)) { + ctx.getPrinter() + .println( + "Reconnected, but the previous command could not be completed. Please run your command again."); + } else { + ctx.getPrinter().println("Msg: " + e); + } + executeStatus = CODE_ERROR; } catch (Exception e) { ctx.getPrinter().println("Msg: " + e); executeStatus = CODE_ERROR; @@ -859,7 +936,8 @@ enum OperationResult { NO_OPER } - static boolean processCommand(CliContext ctx, String s, IoTDBConnection connection) { + static boolean processCommand(CliContext ctx, String s, IoTDBConnection connection) + throws SQLException { if (s == null) { return true; } diff --git a/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/Cli.java b/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/Cli.java index b34cfe249c4a6..4fbbfd9d0a400 100644 --- a/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/Cli.java +++ b/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/Cli.java @@ -51,6 +51,35 @@ public class Cli extends AbstractCli { // TODO: Make non-static private static final Properties info = new Properties(); + /** Number of reconnection attempts when connection is lost during interactive session. */ + private static final int RECONNECT_RETRY_NUM = 3; + + /** Delay in ms between reconnection attempts. */ + private static final long RECONNECT_RETRY_INTERVAL_MS = 1000; + + /** Result of reading and processing one line; used to support reconnection. */ + private static class ReadLineResult { + final boolean stop; + final String failedCommand; + + ReadLineResult(boolean stop, String failedCommand) { + this.stop = stop; + this.failedCommand = failedCommand; + } + + static ReadLineResult continueLoop() { + return new ReadLineResult(false, null); + } + + static ReadLineResult stopLoop() { + return new ReadLineResult(true, null); + } + + static ReadLineResult reconnectAndRetry(String command) { + return new ReadLineResult(false, command); + } + } + /** * IoTDB Client main function. * @@ -155,6 +184,29 @@ private static boolean parseCommandLine( return true; } + private static IoTDBConnection openConnection() throws SQLException { + return (IoTDBConnection) + DriverManager.getConnection(Config.IOTDB_URL_PREFIX + host + ":" + port + "/", info); + } + + private static void setupConnection(IoTDBConnection connection) + throws java.sql.SQLException, org.apache.thrift.TException { + connection.setQueryTimeout(queryTimeout); + properties = connection.getServerProperties(); + AGGREGRATE_TIME_LIST.addAll(properties.getSupportedTimeAggregationOperations()); + timestampPrecision = properties.getTimestampPrecision(); + } + + private static void closeConnectionQuietly(IoTDBConnection connection) { + if (connection != null) { + try { + connection.close(); + } catch (SQLException ignored) { + // ignore + } + } + } + private static void serve(CliContext ctx) { try { useSsl = commandLine.getOptionValue(USE_SSL_ARGS); @@ -195,36 +247,87 @@ private static void executeSql(CliContext ctx) throws TException { } private static void receiveCommands(CliContext ctx) throws TException { - try (IoTDBConnection connection = - (IoTDBConnection) - DriverManager.getConnection(Config.IOTDB_URL_PREFIX + host + ":" + port + "/", info)) { - connection.setQueryTimeout(queryTimeout); - properties = connection.getServerProperties(); - AGGREGRATE_TIME_LIST.addAll(properties.getSupportedTimeAggregationOperations()); - timestampPrecision = properties.getTimestampPrecision(); - + IoTDBConnection connection = null; + try { + connection = openConnection(); + setupConnection(connection); echoStarting(ctx); displayLogo(ctx, properties.getLogo(), properties.getVersion(), properties.getBuildInfo()); ctx.getPrinter().println(String.format("Successfully login at %s:%s", host, port)); while (true) { - boolean readLine = readerReadLine(ctx, connection); - if (readLine) { + ReadLineResult result = readerReadLine(ctx, connection); + if (result.stop) { break; } + if (result.failedCommand != null) { + // Connection failed during processCommand; try to reconnect and retry the command. + closeConnectionQuietly(connection); + connection = null; + boolean reconnected = false; + for (int attempt = 1; attempt <= RECONNECT_RETRY_NUM; attempt++) { + if (attempt > 1) { + try { + Thread.sleep(RECONNECT_RETRY_INTERVAL_MS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + ctx.getErr().printf("%s: Reconnection interrupted.%n", IOTDB_ERROR_PREFIX); + ctx.exit(CODE_ERROR); + } + } + try { + connection = openConnection(); + setupConnection(connection); + ctx.getPrinter().println("Connection lost. Reconnected. Retrying command."); + processCommand(ctx, result.failedCommand, connection); + reconnected = true; + break; + } catch (SQLException e) { + if (isSessionOrStatementError(e)) { + // Reconnect succeeded but retry failed due to session/statement state; ask user to + // run the command again. + ctx.getPrinter() + .println( + "Reconnected, but the previous command could not be completed. Please run your command again."); + reconnected = true; + break; + } + if (attempt == RECONNECT_RETRY_NUM) { + ctx.getErr() + .printf( + "%s: Could not reconnect after %d attempts. Please check that the server is running and try again.%n", + IOTDB_ERROR_PREFIX, RECONNECT_RETRY_NUM); + ctx.exit(CODE_ERROR); + } + } + } + if (!reconnected) { + break; + } + } } } catch (SQLException e) { ctx.getErr().printf("%s: %s%n", IOTDB_ERROR_PREFIX, e.getMessage()); ctx.exit(CODE_ERROR); + } finally { + closeConnectionQuietly(connection); } } - private static boolean readerReadLine(CliContext ctx, IoTDBConnection connection) { + private static ReadLineResult readerReadLine(CliContext ctx, IoTDBConnection connection) { String s; try { s = ctx.getLineReader().readLine(cliPrefix + "> ", null); - boolean continues = processCommand(ctx, s, connection); - if (!continues) { - return true; + try { + boolean continues = processCommand(ctx, s, connection); + if (!continues) { + return ReadLineResult.stopLoop(); + } + } catch (SQLException e) { + if (isConnectionRelated(e)) { + return ReadLineResult.reconnectAndRetry(s); + } + ctx.getErr().printf("%s: %s%n", IOTDB_ERROR_PREFIX, e.getMessage()); + return ReadLineResult.stopLoop(); } } catch (UserInterruptException e) { // Exit on signal INT requires confirmation. @@ -233,12 +336,12 @@ private static boolean readerReadLine(CliContext ctx, IoTDBConnection connection // Exit on EOF (usually by pressing CTRL+D). ctx.exit(CODE_OK); } catch (IllegalArgumentException e) { - if (e.getMessage().contains("history")) { - return false; + if (e.getMessage() != null && e.getMessage().contains("history")) { + return ReadLineResult.continueLoop(); } throw e; } - return false; + return ReadLineResult.continueLoop(); } private static void readLine(CliContext ctx) { diff --git a/iotdb-client/cli/src/test/java/org/apache/iotdb/cli/AbstractCliTest.java b/iotdb-client/cli/src/test/java/org/apache/iotdb/cli/AbstractCliTest.java index 635676ce2af8e..2b23f19760eee 100644 --- a/iotdb-client/cli/src/test/java/org/apache/iotdb/cli/AbstractCliTest.java +++ b/iotdb-client/cli/src/test/java/org/apache/iotdb/cli/AbstractCliTest.java @@ -39,6 +39,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.sql.SQLException; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; import static org.mockito.Mockito.when; @@ -150,7 +152,7 @@ private void isTwoStringArrayEqual(String[] expected, String[] actual) { } @Test - public void testHandleInputInputCmd() { + public void testHandleInputInputCmd() throws SQLException { CliContext ctx = new CliContext(System.in, System.out, System.err, ExitType.EXCEPTION); assertEquals( OperationResult.STOP_OPER,