From c92c36c6541fdaacf45705417d467ad4b315e504 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Fri, 6 Feb 2026 10:50:39 +0800 Subject: [PATCH 1/5] idemp --- .../visitor/PipeStatementTSStatusVisitor.java | 221 +++++++++--------- 1 file changed, 106 insertions(+), 115 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java index 7d1b08b123894..49b9b9a87c77c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.pipe.receiver.visitor; import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.pipe.receiver.PipeReceiverStatusHandler; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException; @@ -43,6 +44,8 @@ import org.apache.iotdb.db.queryengine.plan.statement.metadata.view.CreateLogicalViewStatement; import org.apache.iotdb.rpc.TSStatusCode; +import java.util.stream.Collectors; + /** * This visitor translated some {@link TSStatus} to pipe related status to help sender classify them * and apply different error handling tactics. Please DO NOT modify the {@link TSStatus} returned by @@ -53,227 +56,215 @@ public class PipeStatementTSStatusVisitor extends StatementVisitor node.accept(this, subStatus)) + .collect(Collectors.toList())) + : node.accept(this, status); + } + + @Override + public TSStatus visitNode(final StatementNode node, final TSStatus status) { + return status; } @Override public TSStatus visitLoadFile( - final LoadTsFileStatement loadTsFileStatement, final TSStatus context) { - if (context.getCode() == TSStatusCode.SYSTEM_READ_ONLY.getStatusCode() - || context.getCode() == TSStatusCode.LOAD_FILE_ERROR.getStatusCode() - && context.getMessage() != null - && context.getMessage().contains("memory")) { + final LoadTsFileStatement loadTsFileStatement, final TSStatus status) { + if (status.getCode() == TSStatusCode.SYSTEM_READ_ONLY.getStatusCode() + || status.getCode() == TSStatusCode.LOAD_FILE_ERROR.getStatusCode() + && status.getMessage() != null + && status.getMessage().contains("memory")) { return new TSStatus( TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode()) - .setMessage(context.getMessage()); + .setMessage(status.getMessage()); } - return super.visitLoadFile(loadTsFileStatement, context); + return super.visitLoadFile(loadTsFileStatement, status); } @Override public TSStatus visitInsertTablet( - final InsertTabletStatement insertTabletStatement, final TSStatus context) { - return visitInsertBase(insertTabletStatement, context); + final InsertTabletStatement insertTabletStatement, final TSStatus status) { + return visitInsertBase(insertTabletStatement, status); } @Override public TSStatus visitInsertRow( - final InsertRowStatement insertRowStatement, final TSStatus context) { - return visitInsertBase(insertRowStatement, context); + final InsertRowStatement insertRowStatement, final TSStatus status) { + return visitInsertBase(insertRowStatement, status); } @Override public TSStatus visitInsertRows( - final InsertRowsStatement insertRowsStatement, final TSStatus context) { - return visitInsertBase(insertRowsStatement, context); + final InsertRowsStatement insertRowsStatement, final TSStatus status) { + return visitInsertBase(insertRowsStatement, status); } @Override public TSStatus visitInsertMultiTablets( - final InsertMultiTabletsStatement insertMultiTabletsStatement, final TSStatus context) { - return visitInsertBase(insertMultiTabletsStatement, context); + final InsertMultiTabletsStatement insertMultiTabletsStatement, final TSStatus status) { + return visitInsertBase(insertMultiTabletsStatement, status); } @Override public TSStatus visitInsertBase( - final InsertBaseStatement insertBaseStatement, final TSStatus context) { + final InsertBaseStatement insertBaseStatement, final TSStatus status) { // If the system is read-only, we shall not classify it into temporary unavailable exception to // avoid to many logs - if (context.getCode() == TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode()) { + if (status.getCode() == TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode()) { return new TSStatus( TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode()) - .setMessage(context.getMessage()); - } else if (context.getCode() == TSStatusCode.OUT_OF_TTL.getStatusCode()) { + .setMessage(status.getMessage()); + } else if (status.getCode() == TSStatusCode.OUT_OF_TTL.getStatusCode()) { return new TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode()) - .setMessage(context.getMessage()); - } else if (context.getCode() == TSStatusCode.DATABASE_NOT_EXIST.getStatusCode()) { + .setMessage(status.getMessage()); + } else if (status.getCode() == TSStatusCode.DATABASE_NOT_EXIST.getStatusCode()) { return new TSStatus( TSStatusCode.PIPE_RECEIVER_PARALLEL_OR_USER_CONFLICT_EXCEPTION.getStatusCode()) - .setMessage(context.getMessage()); - } else if (context.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode()) { - if (context.getMessage().contains(DataTypeMismatchException.REGISTERED_TYPE_STRING) + .setMessage(status.getMessage()); + } else if (status.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode()) { + if (status.getMessage().contains(DataTypeMismatchException.REGISTERED_TYPE_STRING) && config.isEnablePartialInsert()) { return new TSStatus( TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode()) - .setMessage(context.getMessage()); + .setMessage(status.getMessage()); } - if (context.getMessage().contains("does not exist")) { + if (status.getMessage().contains("does not exist")) { return new TSStatus( TSStatusCode.PIPE_RECEIVER_PARALLEL_OR_USER_CONFLICT_EXCEPTION.getStatusCode()) - .setMessage(context.getMessage()); + .setMessage(status.getMessage()); } } - return visitStatement(insertBaseStatement, context); + return visitStatement(insertBaseStatement, status); } @Override public TSStatus visitCreateTimeseries( - final CreateTimeSeriesStatement statement, final TSStatus context) { - return visitGeneralCreateTimeSeries(statement, context); + final CreateTimeSeriesStatement statement, final TSStatus status) { + return visitGeneralCreateTimeSeries(statement, status); } @Override public TSStatus visitCreateAlignedTimeseries( - final CreateAlignedTimeSeriesStatement statement, final TSStatus context) { - return visitGeneralCreateTimeSeries(statement, context); + final CreateAlignedTimeSeriesStatement statement, final TSStatus status) { + return visitGeneralCreateTimeSeries(statement, status); } - private TSStatus visitGeneralCreateTimeSeries(final Statement statement, final TSStatus context) { - if (context.getCode() == TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode() - || context.getCode() == TSStatusCode.ALIAS_ALREADY_EXIST.getStatusCode()) { + private TSStatus visitGeneralCreateTimeSeries(final Statement statement, final TSStatus status) { + if (status.getCode() == TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode() + || status.getCode() == TSStatusCode.ALIAS_ALREADY_EXIST.getStatusCode()) { return new TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode()) - .setMessage(context.getMessage()); - } else if (context.getCode() == TSStatusCode.PATH_ALREADY_EXIST.getStatusCode() - || context.getCode() == TSStatusCode.SCHEMA_QUOTA_EXCEEDED.getStatusCode()) { + .setMessage(status.getMessage()); + } else if (status.getCode() == TSStatusCode.PATH_ALREADY_EXIST.getStatusCode() + || status.getCode() == TSStatusCode.SCHEMA_QUOTA_EXCEEDED.getStatusCode()) { return new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode()) - .setMessage(context.getMessage()); + .setMessage(status.getMessage()); } - return visitStatement(statement, context); + return visitStatement(statement, status); } @Override public TSStatus visitCreateMultiTimeSeries( - final CreateMultiTimeSeriesStatement createMultiTimeSeriesStatement, final TSStatus context) { - return visitGeneralCreateMultiTimeseries(createMultiTimeSeriesStatement, context); + final CreateMultiTimeSeriesStatement createMultiTimeSeriesStatement, final TSStatus status) { + return visitGeneralCreateMultiTimeSeries(createMultiTimeSeriesStatement, status); } @Override public TSStatus visitInternalCreateTimeseries( final InternalCreateTimeSeriesStatement internalCreateTimeSeriesStatement, - final TSStatus context) { - return visitGeneralCreateMultiTimeseries(internalCreateTimeSeriesStatement, context); + final TSStatus status) { + return visitGeneralCreateMultiTimeSeries(internalCreateTimeSeriesStatement, status); } @Override public TSStatus visitInternalCreateMultiTimeSeries( final InternalCreateMultiTimeSeriesStatement internalCreateMultiTimeSeriesStatement, - final TSStatus context) { - return visitGeneralCreateMultiTimeseries(internalCreateMultiTimeSeriesStatement, context); + final TSStatus status) { + return visitGeneralCreateMultiTimeSeries(internalCreateMultiTimeSeriesStatement, status); } - private TSStatus visitGeneralCreateMultiTimeseries( - final Statement statement, final TSStatus context) { - if (context.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) { - for (final TSStatus status : context.getSubStatus()) { - if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode() - && status.getCode() != TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode() - && status.getCode() != TSStatusCode.ALIAS_ALREADY_EXIST.getStatusCode()) { - return visitStatement(statement, context); - } - } + private TSStatus visitGeneralCreateMultiTimeSeries( + final Statement statement, final TSStatus status) { + if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode() + || status.getCode() == TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode() + || status.getCode() == TSStatusCode.ALIAS_ALREADY_EXIST.getStatusCode()) { return new TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode()) - .setMessage(context.getMessage()); - } else if (context.getCode() == TSStatusCode.SCHEMA_QUOTA_EXCEEDED.getStatusCode()) { + .setMessage(status.getMessage()); + } else if (status.getCode() == TSStatusCode.SCHEMA_QUOTA_EXCEEDED.getStatusCode()) { return new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode()) - .setMessage(context.getMessage()); + .setMessage(status.getMessage()); } - return visitStatement(statement, context); + return visitStatement(statement, status); } @Override public TSStatus visitAlterTimeSeries( - final AlterTimeSeriesStatement alterTimeSeriesStatement, final TSStatus context) { - if (context.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode()) { - if (context.getMessage().contains("already")) { + final AlterTimeSeriesStatement alterTimeSeriesStatement, final TSStatus status) { + if (status.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode()) { + if (status.getMessage().contains("already")) { return new TSStatus( TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode()) - .setMessage(context.getMessage()); - } else if (context.getMessage().contains("does not")) { + .setMessage(status.getMessage()); + } else if (status.getMessage().contains("does not")) { return new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode()) - .setMessage(context.getMessage()); + .setMessage(status.getMessage()); } - } else if (context.getCode() == TSStatusCode.PATH_NOT_EXIST.getStatusCode()) { + } else if (status.getCode() == TSStatusCode.PATH_NOT_EXIST.getStatusCode()) { return new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode()) - .setMessage(context.getMessage()); + .setMessage(status.getMessage()); } - return visitStatement(alterTimeSeriesStatement, context); + return visitStatement(alterTimeSeriesStatement, status); } @Override public TSStatus visitCreateLogicalView( - final CreateLogicalViewStatement createLogicalViewStatement, final TSStatus context) { - if (context.getCode() == TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) { - return new TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode()) - .setMessage(context.getMessage()); - } else if (context.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) { - for (final TSStatus status : context.getSubStatus()) { - if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode() - && status.getCode() != TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) { - return visitStatement(createLogicalViewStatement, context); - } - } + final CreateLogicalViewStatement createLogicalViewStatement, final TSStatus status) { + if (status.getCode() == TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) { return new TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode()) - .setMessage(context.getMessage()); + .setMessage(status.getMessage()); } - return super.visitCreateLogicalView(createLogicalViewStatement, context); + return super.visitCreateLogicalView(createLogicalViewStatement, status); } @Override public TSStatus visitActivateTemplate( - final ActivateTemplateStatement activateTemplateStatement, final TSStatus context) { - return visitGeneralActivateTemplate(activateTemplateStatement, context); + final ActivateTemplateStatement activateTemplateStatement, final TSStatus status) { + return visitGeneralActivateTemplate(activateTemplateStatement, status); } @Override public TSStatus visitBatchActivateTemplate( - final BatchActivateTemplateStatement batchActivateTemplateStatement, final TSStatus context) { - boolean userConflict = false; - if (context.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) { - for (final TSStatus status : context.getSubStatus()) { - if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode() - && status.getCode() != TSStatusCode.TEMPLATE_IS_IN_USE.getStatusCode()) { - return visitStatement(batchActivateTemplateStatement, context); - } - if (context.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode() - && context.isSetMessage() - && context.getMessage().contains("has not been set any template")) { - userConflict = true; - } - } - return (userConflict - ? new TSStatus( - TSStatusCode.PIPE_RECEIVER_PARALLEL_OR_USER_CONFLICT_EXCEPTION.getStatusCode()) - : new TSStatus( - TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())) - .setMessage(context.getMessage()); + final BatchActivateTemplateStatement batchActivateTemplateStatement, final TSStatus status) { + if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode() + || status.getCode() == TSStatusCode.TEMPLATE_IS_IN_USE.getStatusCode()) { + return new TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode()) + .setMessage(status.getMessage()); + } + if (status.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode() + && status.isSetMessage() + && status.getMessage().contains("has not been set any template")) { + return new TSStatus( + TSStatusCode.PIPE_RECEIVER_PARALLEL_OR_USER_CONFLICT_EXCEPTION.getStatusCode()) + .setMessage(status.getMessage()); } - return visitGeneralActivateTemplate(batchActivateTemplateStatement, context); + return visitGeneralActivateTemplate(batchActivateTemplateStatement, status); } private TSStatus visitGeneralActivateTemplate( - final Statement activateTemplateStatement, final TSStatus context) { - if (context.getCode() == TSStatusCode.TEMPLATE_IS_IN_USE.getStatusCode()) { + final Statement activateTemplateStatement, final TSStatus status) { + if (status.getCode() == TSStatusCode.TEMPLATE_IS_IN_USE.getStatusCode()) { return new TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode()) - .setMessage(context.getMessage()); + .setMessage(status.getMessage()); } - if (context.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode() - && context.isSetMessage() - && context.getMessage().contains("has not been set any template")) { + if (status.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode() + && status.isSetMessage() + && status.getMessage().contains("has not been set any template")) { return new TSStatus( TSStatusCode.PIPE_RECEIVER_PARALLEL_OR_USER_CONFLICT_EXCEPTION.getStatusCode()) - .setMessage(context.getMessage()); + .setMessage(status.getMessage()); } - return visitStatement(activateTemplateStatement, context); + return visitStatement(activateTemplateStatement, status); } } From 9dfa5c7feee94cd3b7c940eac28acb3917ae8247 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Fri, 6 Feb 2026 11:03:56 +0800 Subject: [PATCH 2/5] grass --- .../it/dual/treemodel/auto/enhanced/IoTDBPipeIdempotentIT.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeIdempotentIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeIdempotentIT.java index 53da5d3f498ad..fb499ae5e43a5 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeIdempotentIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeIdempotentIT.java @@ -400,8 +400,6 @@ public void testDropRoleIdempotent() throws Exception { Collections.singleton("2,")); } - // Table model - private void testIdempotent( final List beforeSqlList, final String testSql, From 9f0ee6e42f40e6d6642cbd2dc5a255298eaba3a5 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Fri, 6 Feb 2026 12:13:39 +0800 Subject: [PATCH 3/5] fix --- .../java/org/apache/iotdb/SessionExample.java | 22 +++++++++---------- .../thrift/IoTDBDataNodeReceiver.java | 4 ++-- ...tementDataTypeConvertExecutionVisitor.java | 12 +++++----- ...tementDataTypeConvertExecutionVisitor.java | 10 ++++----- 4 files changed, 22 insertions(+), 26 deletions(-) diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java index c9e8d42aacf59..bcbf7a61adf34 100644 --- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java +++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java @@ -90,24 +90,24 @@ public static void main(String[] args) } // createTemplate(); - createTimeseries(); - createMultiTimeseries(); - createMultiTimeseriesWithNullPartical(); - insertRecord(); - insertTablet(); + // createTimeseries(); + // createMultiTimeseries(); + // createMultiTimeseriesWithNullPartical(); + // insertRecord(); + // insertTablet(); // insertTabletWithNullValues(); // insertTablets(); - // insertRecords(); + insertRecords(); // insertText(); // selectInto(); // createAndDropContinuousQueries(); // nonQuery(); - queryByIterator(); + // queryByIterator(); // queryWithTimeout(); - rawDataQuery(); - lastDataQuery(); - aggregationQuery(); - groupByQuery(); + // rawDataQuery(); + // lastDataQuery(); + // aggregationQuery(); + // groupByQuery(); // deleteData(); // deleteTimeseries(); // setTimeout(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java index f0796a3b1a038..f161a2496669b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java @@ -840,7 +840,7 @@ private TSStatus executeStatementAndClassifyExceptions( receiverId.get(), statement.getPipeLoggingString(), result); - return statement.accept(STATEMENT_STATUS_VISITOR, result); + return STATEMENT_STATUS_VISITOR.process(statement, result); } } catch (final Exception e) { PipeLogger.log( @@ -849,7 +849,7 @@ private TSStatus executeStatementAndClassifyExceptions( "Receiver id = %s: Exception encountered while executing statement %s: ", receiverId.get(), statement.getPipeLoggingString()); - return statement.accept(STATEMENT_EXCEPTION_VISITOR, e); + return STATEMENT_EXCEPTION_VISITOR.process(statement, e); } finally { if (Objects.nonNull(allocatedMemoryBlock)) { allocatedMemoryBlock.close(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTableStatementDataTypeConvertExecutionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTableStatementDataTypeConvertExecutionVisitor.java index d19aa71c23478..10d0423e6fe93 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTableStatementDataTypeConvertExecutionVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTableStatementDataTypeConvertExecutionVisitor.java @@ -154,9 +154,8 @@ public Optional visitLoadFile( TSStatus result; try { result = - statement.accept( - IoTDBDataNodeReceiver.STATEMENT_STATUS_VISITOR, - statementExecutor.execute(statement, databaseName)); + IoTDBDataNodeReceiver.STATEMENT_STATUS_VISITOR.process( + statement, statementExecutor.execute(statement, databaseName)); // Retry max 5 times if the write process is rejected for (int i = 0; @@ -167,15 +166,14 @@ public Optional visitLoadFile( i++) { Thread.sleep(100L * (i + 1)); result = - statement.accept( - IoTDBDataNodeReceiver.STATEMENT_STATUS_VISITOR, - statementExecutor.execute(statement, databaseName)); + IoTDBDataNodeReceiver.STATEMENT_STATUS_VISITOR.process( + statement, statementExecutor.execute(statement, databaseName)); } } catch (final Exception e) { if (e instanceof InterruptedException) { Thread.currentThread().interrupt(); } - result = statement.accept(IoTDBDataNodeReceiver.STATEMENT_EXCEPTION_VISITOR, e); + result = IoTDBDataNodeReceiver.STATEMENT_EXCEPTION_VISITOR.process(statement, e); } if (!(result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode() diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTreeStatementDataTypeConvertExecutionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTreeStatementDataTypeConvertExecutionVisitor.java index 282b378a2d268..ae60b87450aa8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTreeStatementDataTypeConvertExecutionVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTreeStatementDataTypeConvertExecutionVisitor.java @@ -114,9 +114,8 @@ file, new IoTDBTreePattern(null), Long.MIN_VALUE, Long.MAX_VALUE, null, null, tr TSStatus result; try { result = - statement.accept( - IoTDBDataNodeReceiver.STATEMENT_STATUS_VISITOR, - statementExecutor.execute(statement)); + IoTDBDataNodeReceiver.STATEMENT_STATUS_VISITOR.process( + statement, statementExecutor.execute(statement)); // Retry max 5 times if the write process is rejected for (int i = 0; @@ -127,9 +126,8 @@ file, new IoTDBTreePattern(null), Long.MIN_VALUE, Long.MAX_VALUE, null, null, tr i++) { Thread.sleep(100L * (i + 1)); result = - statement.accept( - IoTDBDataNodeReceiver.STATEMENT_STATUS_VISITOR, - statementExecutor.execute(statement)); + IoTDBDataNodeReceiver.STATEMENT_STATUS_VISITOR.process( + statement, statementExecutor.execute(statement)); } } catch (final Exception e) { if (e instanceof InterruptedException) { From 652c8c3c1e1958db6877bb3b4b2686bcfba0fb2c Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Fri, 6 Feb 2026 12:27:53 +0800 Subject: [PATCH 4/5] fix --- .../java/org/apache/iotdb/SessionExample.java | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java index bcbf7a61adf34..c9e8d42aacf59 100644 --- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java +++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java @@ -90,24 +90,24 @@ public static void main(String[] args) } // createTemplate(); - // createTimeseries(); - // createMultiTimeseries(); - // createMultiTimeseriesWithNullPartical(); - // insertRecord(); - // insertTablet(); + createTimeseries(); + createMultiTimeseries(); + createMultiTimeseriesWithNullPartical(); + insertRecord(); + insertTablet(); // insertTabletWithNullValues(); // insertTablets(); - insertRecords(); + // insertRecords(); // insertText(); // selectInto(); // createAndDropContinuousQueries(); // nonQuery(); - // queryByIterator(); + queryByIterator(); // queryWithTimeout(); - // rawDataQuery(); - // lastDataQuery(); - // aggregationQuery(); - // groupByQuery(); + rawDataQuery(); + lastDataQuery(); + aggregationQuery(); + groupByQuery(); // deleteData(); // deleteTimeseries(); // setTimeout(); From 85b7c0f51daae50337a2200d1067c2f3d86629d4 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Fri, 6 Feb 2026 14:21:50 +0800 Subject: [PATCH 5/5] coverage --- .../PipeStatementTsStatusVisitorTest.java | 48 +++++++++++++++++++ 1 file changed, 48 insertions(+) create mode 100644 iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/PipeStatementTsStatusVisitorTest.java diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/PipeStatementTsStatusVisitorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/PipeStatementTsStatusVisitorTest.java new file mode 100644 index 0000000000000..81715716a495b --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/PipeStatementTsStatusVisitorTest.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.receiver; + +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.utils.StatusUtils; +import org.apache.iotdb.db.pipe.receiver.protocol.thrift.IoTDBDataNodeReceiver; +import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement; +import org.apache.iotdb.rpc.TSStatusCode; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.Arrays; + +public class PipeStatementTsStatusVisitorTest { + + @Test + public void testTTLIdempotency() { + Assert.assertEquals( + TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode(), + IoTDBDataNodeReceiver.STATEMENT_STATUS_VISITOR + .process( + new InsertRowsStatement(), + new TSStatus(TSStatusCode.MULTIPLE_ERROR.getStatusCode()) + .setSubStatus( + Arrays.asList( + StatusUtils.OK, new TSStatus(TSStatusCode.OUT_OF_TTL.getStatusCode())))) + .getCode()); + } +}