From 627e6ea5db648abcd548aa350c1bc1d449624c80 Mon Sep 17 00:00:00 2001 From: Nissim Shiman Date: Fri, 5 Dec 2025 18:40:27 +0000 Subject: [PATCH] NIFI-12598 Allow Starting/Stopping of PG components to be recorded in Flow Configuration History when started/stopped at PG level --- .../nifi/audit/ProcessGroupAuditor.java | 38 +++++ .../nifi/audit/TestProcessGroupAuditor.java | 137 ++++++++++++++++-- 2 files changed, 165 insertions(+), 10 deletions(-) diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/ProcessGroupAuditor.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/ProcessGroupAuditor.java index 33259dac3906..a7adad13073a 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/ProcessGroupAuditor.java +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/audit/ProcessGroupAuditor.java @@ -31,8 +31,10 @@ import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.controller.service.ControllerServiceState; import org.apache.nifi.groups.ProcessGroup; +import org.apache.nifi.groups.RemoteProcessGroup; import org.apache.nifi.parameter.ParameterContext; import org.apache.nifi.registry.flow.VersionControlInformation; +import org.apache.nifi.remote.RemoteGroupPort; import org.apache.nifi.web.api.dto.ProcessGroupDTO; import org.apache.nifi.web.api.dto.VersionControlInformationDTO; import org.apache.nifi.web.dao.ProcessGroupDAO; @@ -223,6 +225,7 @@ public void scheduleComponentsAdvice(ProceedingJoinPoint proceedingJoinPoint, St } saveUpdateProcessGroupAction(groupId, operation); + saveActions(getComponentActions(groupId, componentIds, operation), logger); } /** @@ -273,6 +276,21 @@ private List getComponentActions(final String groupId, final Collection< port = processGroup.findOutputPort(componentId); if (port != null) { actions.add(generateUpdateConnectableAction(port, operation, Component.OutputPort)); + continue; + } + + ProcessGroup internalProcessGroup = processGroup.findProcessGroup(componentId); + if (internalProcessGroup != null) { + actions.add(generateUpdateProcessGroupAction(internalProcessGroup, operation)); + continue; + } + + RemoteGroupPort remoteGroupPort = processGroup.findRemoteGroupPort(componentId); + if (remoteGroupPort != null) { + RemoteProcessGroup remoteProcessGroup = remoteGroupPort.getRemoteProcessGroup(); + if (remoteProcessGroup != null) { + actions.add(generateUpdateRemoteProcessGroupAction(remoteProcessGroup, operation)); + } } } @@ -402,6 +420,26 @@ private void saveUpdateProcessGroupAction(final String groupId, final Operation saveAction(action, logger); } + private Action generateUpdateProcessGroupAction(final ProcessGroup processGroup, final Operation operation) { + final FlowChangeAction action = createFlowChangeAction(); + action.setSourceId(processGroup.getIdentifier()); + action.setSourceName(processGroup.getName()); + action.setSourceType(Component.ProcessGroup); + action.setOperation(operation); + + return action; + } + + private Action generateUpdateRemoteProcessGroupAction(final RemoteProcessGroup remoteProcessGroup, final Operation operation) { + final FlowChangeAction action = createFlowChangeAction(); + action.setSourceId(remoteProcessGroup.getIdentifier()); + action.setSourceName(remoteProcessGroup.getName()); + action.setSourceType(Component.RemoteProcessGroup); + action.setOperation(operation); + + return action; + } + private Action generateUpdateConnectableAction(final Connectable connectable, final Operation operation, final Component component) { final FlowChangeAction action = createFlowChangeAction(); action.setSourceId(connectable.getIdentifier()); diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/audit/TestProcessGroupAuditor.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/audit/TestProcessGroupAuditor.java index 0e8d78b1a33c..3d5f5bd1aca6 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/audit/TestProcessGroupAuditor.java +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/audit/TestProcessGroupAuditor.java @@ -34,6 +34,9 @@ import org.apache.nifi.controller.service.StandardControllerServiceNode; import org.apache.nifi.controller.service.StandardControllerServiceProvider; import org.apache.nifi.groups.ProcessGroup; +import org.apache.nifi.groups.RemoteProcessGroup; +import org.apache.nifi.groups.StatelessGroupNode; +import org.apache.nifi.remote.RemoteGroupPort; import org.apache.nifi.web.dao.impl.StandardProcessGroupDAO; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -73,10 +76,13 @@ public class TestProcessGroupAuditor { private static final String PG_1 = "processGroup1"; + private static final String PG_2 = "processGroup2"; private static final String PROC_1 = "processor1"; private static final String PROC_2 = "processor2"; private static final String INPUT_PORT = "inputPort"; private static final String OUTPUT_PORT = "outputPort"; + private static final String REMOTE_PG = "remotePG"; + private static final String REMOTE_GROUP_INPUT_PORT = "remoteGroupInputPort"; private static final String CS_1 = "controllerService1"; private static final String USER_ID = "user-id"; @@ -117,26 +123,137 @@ void setUp() { void testVerifyStartProcessGroupAuditing() { final ProcessorNode processor1 = mock(StandardProcessorNode.class); final ProcessorNode processor2 = mock(StandardProcessorNode.class); + final Port inputPort = mock(Port.class); + final Port outputPort = mock(Port.class); + final ProcessGroup innerProcessGroup = mock(ProcessGroup.class); + final StatelessGroupNode statelessGroup = mock(StatelessGroupNode.class); + final RemoteProcessGroup remoteProcessGroup = mock(RemoteProcessGroup.class); + final RemoteGroupPort remoteGroupInputPort = mock(RemoteGroupPort.class); + + when(processor1.getName()).thenReturn(PROC_1); when(processor1.getProcessGroup()).thenReturn(processGroup); - when(processor2.getProcessGroup()).thenReturn(processGroup); when(processor1.getConnectableType()).thenReturn(ConnectableType.PROCESSOR); + + when(processor2.getName()).thenReturn(PROC_2); + when(processor2.getProcessGroup()).thenReturn(processGroup); when(processor2.getConnectableType()).thenReturn(ConnectableType.PROCESSOR); + when(inputPort.getName()).thenReturn(INPUT_PORT); + when(inputPort.getProcessGroup()).thenReturn(processGroup); + when(inputPort.getConnectableType()).thenReturn(ConnectableType.INPUT_PORT); + + when(outputPort.getName()).thenReturn(OUTPUT_PORT); + when(outputPort.getProcessGroup()).thenReturn(processGroup); + when(outputPort.getConnectableType()).thenReturn(ConnectableType.OUTPUT_PORT); + + when(innerProcessGroup.getName()).thenReturn(PG_2); + when(statelessGroup.getProcessGroup()).thenReturn(processGroup); + when(statelessGroup.getConnectableType()).thenReturn(ConnectableType.STATELESS_GROUP); + + when(remoteProcessGroup.getName()).thenReturn(REMOTE_PG); + when(remoteGroupInputPort.getProcessGroup()).thenReturn(processGroup); + when(remoteGroupInputPort.getConnectableType()).thenReturn(ConnectableType.REMOTE_INPUT_PORT); + + when(processGroup.findProcessor(PROC_1)).thenReturn(processor1); + when(processGroup.findProcessor(PROC_2)).thenReturn(processor2); + + when(processGroup.findProcessor(INPUT_PORT)).thenReturn(null); + when(processGroup.findInputPort(INPUT_PORT)).thenReturn(inputPort); + + when(processGroup.findProcessor(OUTPUT_PORT)).thenReturn(null); + when(processGroup.findInputPort(OUTPUT_PORT)).thenReturn(null); + when(processGroup.findOutputPort(OUTPUT_PORT)).thenReturn(outputPort); + + when(processGroup.findProcessor(PG_2)).thenReturn(null); + when(processGroup.findInputPort(PG_2)).thenReturn(null); + when(processGroup.findOutputPort(PG_2)).thenReturn(null); + when(processGroup.findProcessGroup(PG_2)).thenReturn(innerProcessGroup); + + when(processGroup.findProcessor(REMOTE_GROUP_INPUT_PORT)).thenReturn(null); + when(processGroup.findInputPort(REMOTE_GROUP_INPUT_PORT)).thenReturn(null); + when(processGroup.findOutputPort(REMOTE_GROUP_INPUT_PORT)).thenReturn(null); + when(processGroup.findProcessGroup(REMOTE_GROUP_INPUT_PORT)).thenReturn(null); + when(processGroup.findRemoteGroupPort(REMOTE_GROUP_INPUT_PORT)).thenReturn(remoteGroupInputPort); + when(remoteGroupInputPort.getRemoteProcessGroup()).thenReturn(remoteProcessGroup); + when(flowManager.getGroup(eq(PG_1))).thenReturn(processGroup); when(flowManager.findConnectable(eq(PROC_1))).thenReturn(processor1); when(flowManager.findConnectable(eq(PROC_2))).thenReturn(processor2); + when(flowManager.findConnectable(eq(INPUT_PORT))).thenReturn(inputPort); + when(flowManager.findConnectable(eq(OUTPUT_PORT))).thenReturn(outputPort); + when(flowManager.findConnectable(eq(PG_2))).thenReturn(statelessGroup); + when(flowManager.findConnectable(eq(REMOTE_GROUP_INPUT_PORT))).thenReturn(remoteGroupInputPort); + when(flowController.getFlowManager()).thenReturn(flowManager); - processGroupDAO.scheduleComponents(PG_1, ScheduledState.RUNNING, new HashSet<>(Arrays.asList(PROC_1, PROC_2))); + processGroupDAO.scheduleComponents(PG_1, ScheduledState.RUNNING, new HashSet<>(Arrays.asList(PROC_1, PG_2, PROC_2, INPUT_PORT, REMOTE_GROUP_INPUT_PORT, OUTPUT_PORT))); + + verify(auditService, times(2)).addActions(argumentCaptorActions.capture()); + final List> actions = argumentCaptorActions.getAllValues(); + assertEquals(2, actions.size()); + final Iterator> actionsIterator = actions.iterator(); - verify(auditService).addActions(argumentCaptorActions.capture()); - final List actions = argumentCaptorActions.getValue(); - assertEquals(1, actions.size()); - final Action action = actions.getFirst(); - assertInstanceOf(FlowChangeAction.class, action); - assertEquals(USER_ID, action.getUserIdentity()); - assertEquals("ProcessGroup", action.getSourceType().name()); - assertEquals(Operation.Start, action.getOperation()); + // pg started + final List pgActions = actionsIterator.next(); + assertEquals(1, pgActions.size()); + final Action pgAction = pgActions.iterator().next(); + assertInstanceOf(FlowChangeAction.class, pgAction); + assertEquals(USER_ID, pgAction.getUserIdentity()); + assertEquals("ProcessGroup", pgAction.getSourceType().name()); + assertEquals(Operation.Start, pgAction.getOperation()); + + List componentActions = actionsIterator.next(); + assertEquals(6, componentActions.size()); + + componentActions.sort(Comparator.comparing(Action::getSourceName)); + + // inputPort started + final Iterator actionIterator = componentActions.iterator(); + Action componentAction = actionIterator.next(); + assertInstanceOf(FlowChangeAction.class, componentAction); + assertEquals(USER_ID, componentAction.getUserIdentity()); + assertEquals("InputPort", componentAction.getSourceType().name()); + assertEquals(INPUT_PORT, componentAction.getSourceName()); + assertEquals(Operation.Start, componentAction.getOperation()); + + // outputPort started + componentAction = actionIterator.next(); + assertInstanceOf(FlowChangeAction.class, componentAction); + assertEquals(USER_ID, componentAction.getUserIdentity()); + assertEquals("OutputPort", componentAction.getSourceType().name()); + assertEquals(OUTPUT_PORT, componentAction.getSourceName()); + assertEquals(Operation.Start, componentAction.getOperation()); + + // inner stateless pg started + componentAction = actionIterator.next(); + assertInstanceOf(FlowChangeAction.class, componentAction); + assertEquals(USER_ID, componentAction.getUserIdentity()); + assertEquals("ProcessGroup", componentAction.getSourceType().name()); + assertEquals(PG_2, componentAction.getSourceName()); + assertEquals(Operation.Start, componentAction.getOperation()); + + // processors started + componentAction = actionIterator.next(); + assertInstanceOf(FlowChangeAction.class, componentAction); + assertEquals(USER_ID, componentAction.getUserIdentity()); + assertEquals("Processor", componentAction.getSourceType().name()); + assertEquals(PROC_1, componentAction.getSourceName()); + assertEquals(Operation.Start, componentAction.getOperation()); + + componentAction = actionIterator.next(); + assertInstanceOf(FlowChangeAction.class, componentAction); + assertEquals(USER_ID, componentAction.getUserIdentity()); + assertEquals("Processor", componentAction.getSourceType().name()); + assertEquals(PROC_2, componentAction.getSourceName()); + assertEquals(Operation.Start, componentAction.getOperation()); + + // remote PG started + componentAction = actionIterator.next(); + assertInstanceOf(FlowChangeAction.class, componentAction); + assertEquals(USER_ID, componentAction.getUserIdentity()); + assertEquals("RemoteProcessGroup", componentAction.getSourceType().name()); + assertEquals(REMOTE_PG, componentAction.getSourceName()); + assertEquals(Operation.Start, componentAction.getOperation()); } @Test