From c34d6f9e5fea4db3999001f72cff02508691c2cb Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Thu, 5 Feb 2026 17:21:26 +0800 Subject: [PATCH 1/2] Pipe: Optimized the logger semantic && the retry logic of memory error at sink subtask (#17166) * shop * fix * sit * logger --- .../iotdb/confignode/manager/ProcedureManager.java | 4 ++-- .../confignode/persistence/pipe/PipePluginInfo.java | 4 ++-- .../confignode/persistence/pipe/PipeTaskInfo.java | 4 ++-- .../impl/pipe/plugin/CreatePipePluginProcedure.java | 2 +- .../impl/pipe/plugin/DropPipePluginProcedure.java | 2 +- .../config/executor/ClusterConfigTaskExecutor.java | 10 ++++++---- .../org/apache/iotdb/db/utils/ErrorHandlingUtils.java | 1 + .../agent/task/subtask/PipeAbstractSinkSubtask.java | 11 ++++++++++- 8 files changed, 25 insertions(+), 13 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java index 5f6450769f583..f0f1b9e6bfbff 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java @@ -1367,9 +1367,9 @@ public TSStatus createPipe(TCreatePipeReq req) { } } - public TSStatus alterPipe(TAlterPipeReq req) { + public TSStatus alterPipe(final TAlterPipeReq req) { try { - AlterPipeProcedureV2 procedure = new AlterPipeProcedureV2(req); + final AlterPipeProcedureV2 procedure = new AlterPipeProcedureV2(req); executor.submitProcedure(procedure); TSStatus status = waitingProcedureFinished(procedure); if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java index 36206c05fcd16..eae8eea1cbb1f 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java @@ -155,8 +155,8 @@ public void checkPipePluginExistence( final String exceptionMessage = String.format( "Failed to create or alter pipe, the pipe extractor plugin %s does not exist", - extractorPluginName); - LOGGER.warn(exceptionMessage); + extractorPluginName); + LOGGER.info(exceptionMessage); throw new PipeException(exceptionMessage); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java index 08f96a25a6192..2bfdbf9e49a3b 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java @@ -185,7 +185,7 @@ private boolean checkBeforeCreatePipeInternal(final TCreatePipeReq createPipeReq String.format( "Failed to create pipe %s, %s", createPipeRequest.getPipeName(), PIPE_ALREADY_EXIST_MSG); - LOGGER.warn(exceptionMessage); + LOGGER.info(exceptionMessage); throw new PipeException(exceptionMessage); } @@ -209,7 +209,7 @@ private boolean checkAndUpdateRequestBeforeAlterPipeInternal(final TAlterPipeReq final String exceptionMessage = String.format( "Failed to alter pipe %s, %s", alterPipeRequest.getPipeName(), PIPE_NOT_EXIST_MSG); - LOGGER.warn(exceptionMessage); + LOGGER.info(exceptionMessage); throw new PipeException(exceptionMessage); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java index e3a4719a71925..f4fa738428d43 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java @@ -145,7 +145,7 @@ private Flow executeFromLock(ConfigNodeProcedureEnv env) { } } catch (PipeException e) { // The pipe plugin has already created, we should end the procedure - LOGGER.warn( + LOGGER.info( "Pipe plugin {} is already created, end the CreatePipePluginProcedure({})", pluginName, pluginName); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java index 665a3782a91d3..efbe1ee6ccdda 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java @@ -143,7 +143,7 @@ private Flow executeFromLock(ConfigNodeProcedureEnv env) { subscriptionInfo.validatePipePluginUsageByTopic(pluginName); } catch (PipeException e) { // if the pipe plugin is a built-in plugin, we should not drop it - LOGGER.warn(e.getMessage()); + LOGGER.info(e.getMessage()); pipePluginCoordinator.unlock(); pipeTaskCoordinator.unlock(); setFailure(new ProcedureException(e.getMessage())); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java index 784e890b1e5bf..7b5b23e644a23 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java @@ -957,10 +957,12 @@ public SettableFuture dropPipePlugin( .setPluginName(dropPipePluginStatement.getPluginName().toUpperCase()) .setIfExistsCondition(dropPipePluginStatement.hasIfExistsCondition())); if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != executionStatus.getCode()) { - LOGGER.warn( - "[{}] Failed to drop pipe plugin {}.", - executionStatus, - dropPipePluginStatement.getPluginName()); + if (TSStatusCode.PIPE_NOT_EXIST_ERROR.getStatusCode() != executionStatus.getCode()) { + LOGGER.warn( + "[{}] Failed to drop pipe plugin {}.", + executionStatus, + dropPipePluginStatement.getPluginName()); + } future.setException(new IoTDBException(executionStatus)); } else { future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java index 072cba8b55ce2..963cecceb104c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java @@ -109,6 +109,7 @@ public static TSStatus onQueryException(Exception e, String operation, TSStatusC || status.getCode() == TSStatusCode.CANNOT_FETCH_FI_STATE.getStatusCode() || status.getCode() == TSStatusCode.TEMPLATE_INCOMPATIBLE.getStatusCode() || status.getCode() == TSStatusCode.PATH_ALREADY_EXIST.getStatusCode() + || status.getCode() == TSStatusCode.PIPE_NOT_EXIST_ERROR.getStatusCode() || status.getCode() == TSStatusCode.QUERY_TIMEOUT.getStatusCode()) { LOGGER.info(message); } else { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java index 56e142d6efd04..577b57e3cf727 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java @@ -19,11 +19,13 @@ package org.apache.iotdb.commons.pipe.agent.task.subtask; +import org.apache.iotdb.commons.exception.pipe.PipeRuntimeOutOfMemoryCriticalException; import org.apache.iotdb.commons.exception.pipe.PipeRuntimeSinkCriticalException; import org.apache.iotdb.commons.exception.pipe.PipeRuntimeSinkNonReportTimeConfigurableException; import org.apache.iotdb.commons.pipe.agent.task.execution.PipeSubtaskScheduler; import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; +import org.apache.iotdb.commons.pipe.resource.log.PipeLogger; import org.apache.iotdb.commons.utils.ErrorHandlingCommonUtils; import org.apache.iotdb.pipe.api.PipeConnector; import org.apache.iotdb.pipe.api.event.Event; @@ -33,6 +35,7 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; +import org.apache.tsfile.external.commons.lang3.exception.ExceptionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -268,7 +271,13 @@ public void sleep4NonReportException() { @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning protected void handleException(final Event event, final Exception e) { - if (e instanceof PipeRuntimeSinkNonReportTimeConfigurableException) { + if (e instanceof PipeRuntimeOutOfMemoryCriticalException + || ExceptionUtils.getRootCause(e) instanceof PipeRuntimeOutOfMemoryCriticalException) { + PipeLogger.log( + LOGGER::info, + e, + "Temporarily out of memory in pipe event transferring, will wait for the memory to release."); + } else if (e instanceof PipeRuntimeSinkNonReportTimeConfigurableException) { if (lastExceptionTime == Long.MAX_VALUE) { lastExceptionTime = System.currentTimeMillis(); } From a19d37661d9fdc9420b8db7911f4b70adb4e2b21 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Fri, 6 Feb 2026 09:53:17 +0800 Subject: [PATCH 2/2] ger --- .../iotdb/confignode/persistence/pipe/PipePluginInfo.java | 2 +- .../pipe/agent/task/subtask/PipeAbstractSinkSubtask.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java index eae8eea1cbb1f..9d4f0ed201e64 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java @@ -155,7 +155,7 @@ public void checkPipePluginExistence( final String exceptionMessage = String.format( "Failed to create or alter pipe, the pipe extractor plugin %s does not exist", - extractorPluginName); + extractorPluginName); LOGGER.info(exceptionMessage); throw new PipeException(exceptionMessage); } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java index 577b57e3cf727..2c66adf7d9150 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractSinkSubtask.java @@ -35,7 +35,7 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; -import org.apache.tsfile.external.commons.lang3.exception.ExceptionUtils; +import org.apache.commons.lang3.exception.ExceptionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory;