Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1367,9 +1367,9 @@ public TSStatus createPipe(TCreatePipeReq req) {
}
}

public TSStatus alterPipe(TAlterPipeReq req) {
public TSStatus alterPipe(final TAlterPipeReq req) {
try {
AlterPipeProcedureV2 procedure = new AlterPipeProcedureV2(req);
final AlterPipeProcedureV2 procedure = new AlterPipeProcedureV2(req);
executor.submitProcedure(procedure);
TSStatus status = waitingProcedureFinished(procedure);
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ public void checkPipePluginExistence(
String.format(
"Failed to create or alter pipe, the pipe extractor plugin %s does not exist",
extractorPluginName);
LOGGER.warn(exceptionMessage);
LOGGER.info(exceptionMessage);
throw new PipeException(exceptionMessage);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ private boolean checkBeforeCreatePipeInternal(final TCreatePipeReq createPipeReq
String.format(
"Failed to create pipe %s, %s",
createPipeRequest.getPipeName(), PIPE_ALREADY_EXIST_MSG);
LOGGER.warn(exceptionMessage);
LOGGER.info(exceptionMessage);
throw new PipeException(exceptionMessage);
}

Expand All @@ -209,7 +209,7 @@ private boolean checkAndUpdateRequestBeforeAlterPipeInternal(final TAlterPipeReq
final String exceptionMessage =
String.format(
"Failed to alter pipe %s, %s", alterPipeRequest.getPipeName(), PIPE_NOT_EXIST_MSG);
LOGGER.warn(exceptionMessage);
LOGGER.info(exceptionMessage);
throw new PipeException(exceptionMessage);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ private Flow executeFromLock(ConfigNodeProcedureEnv env) {
}
} catch (PipeException e) {
// The pipe plugin has already created, we should end the procedure
LOGGER.warn(
LOGGER.info(
"Pipe plugin {} is already created, end the CreatePipePluginProcedure({})",
pluginName,
pluginName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ private Flow executeFromLock(ConfigNodeProcedureEnv env) {
subscriptionInfo.validatePipePluginUsageByTopic(pluginName);
} catch (PipeException e) {
// if the pipe plugin is a built-in plugin, we should not drop it
LOGGER.warn(e.getMessage());
LOGGER.info(e.getMessage());
pipePluginCoordinator.unlock();
pipeTaskCoordinator.unlock();
setFailure(new ProcedureException(e.getMessage()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -957,10 +957,12 @@ public SettableFuture<ConfigTaskResult> dropPipePlugin(
.setPluginName(dropPipePluginStatement.getPluginName().toUpperCase())
.setIfExistsCondition(dropPipePluginStatement.hasIfExistsCondition()));
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != executionStatus.getCode()) {
LOGGER.warn(
"[{}] Failed to drop pipe plugin {}.",
executionStatus,
dropPipePluginStatement.getPluginName());
if (TSStatusCode.PIPE_NOT_EXIST_ERROR.getStatusCode() != executionStatus.getCode()) {
LOGGER.warn(
"[{}] Failed to drop pipe plugin {}.",
executionStatus,
dropPipePluginStatement.getPluginName());
}
future.setException(new IoTDBException(executionStatus));
} else {
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ public static TSStatus onQueryException(Exception e, String operation, TSStatusC
|| status.getCode() == TSStatusCode.CANNOT_FETCH_FI_STATE.getStatusCode()
|| status.getCode() == TSStatusCode.TEMPLATE_INCOMPATIBLE.getStatusCode()
|| status.getCode() == TSStatusCode.PATH_ALREADY_EXIST.getStatusCode()
|| status.getCode() == TSStatusCode.PIPE_NOT_EXIST_ERROR.getStatusCode()
|| status.getCode() == TSStatusCode.QUERY_TIMEOUT.getStatusCode()) {
LOGGER.info(message);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@

package org.apache.iotdb.commons.pipe.agent.task.subtask;

import org.apache.iotdb.commons.exception.pipe.PipeRuntimeOutOfMemoryCriticalException;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeSinkCriticalException;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeSinkNonReportTimeConfigurableException;
import org.apache.iotdb.commons.pipe.agent.task.execution.PipeSubtaskScheduler;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
import org.apache.iotdb.commons.utils.ErrorHandlingCommonUtils;
import org.apache.iotdb.pipe.api.PipeConnector;
import org.apache.iotdb.pipe.api.event.Event;
Expand All @@ -33,6 +35,7 @@
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -268,7 +271,13 @@ public void sleep4NonReportException() {

@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
protected void handleException(final Event event, final Exception e) {
if (e instanceof PipeRuntimeSinkNonReportTimeConfigurableException) {
if (e instanceof PipeRuntimeOutOfMemoryCriticalException
|| ExceptionUtils.getRootCause(e) instanceof PipeRuntimeOutOfMemoryCriticalException) {
PipeLogger.log(
LOGGER::info,
e,
"Temporarily out of memory in pipe event transferring, will wait for the memory to release.");
} else if (e instanceof PipeRuntimeSinkNonReportTimeConfigurableException) {
if (lastExceptionTime == Long.MAX_VALUE) {
lastExceptionTime = System.currentTimeMillis();
}
Expand Down
Loading