From da91b742e3424287195cf92c599b16d088ff7842 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Thu, 15 Jan 2026 15:53:26 +0100 Subject: [PATCH 01/12] Event filtering now records resource action and previous resource MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This is important to have a correct further event propagation Signed-off-by: Attila Mészáros --- .../processing/event/EventProcessor.java | 2 +- .../processing/event/EventSourceManager.java | 2 +- .../{controller => }/ResourceAction.java | 2 +- .../controller/ControllerEventSource.java | 7 +++- .../controller/ResourceDeleteEvent.java | 1 + .../source/controller/ResourceEvent.java | 1 + .../informer/ExtendedResourceEvent.java | 42 +++++++++++++++++++ .../source/informer/InformerEventSource.java | 27 +++++------- .../informer/ManagedInformerEventSource.java | 22 +++++++--- .../informer/TemporaryResourceCache.java | 19 +++++---- .../processing/event/EventProcessorTest.java | 2 +- .../event/ResourceStateManagerTest.java | 2 +- .../controller/ControllerEventSourceTest.java | 1 + .../informer/InformerEventSourceTest.java | 15 ++++--- .../TemporaryPrimaryResourceCacheTest.java | 37 +++++++++++----- 15 files changed, 130 insertions(+), 52 deletions(-) rename operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/{controller => }/ResourceAction.java (90%) create mode 100644 operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ExtendedResourceEvent.java diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java index 3685b509aa..b476c39614 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java @@ -37,7 +37,7 @@ import io.javaoperatorsdk.operator.processing.event.rate.RateLimiter; import io.javaoperatorsdk.operator.processing.event.rate.RateLimiter.RateLimitState; import io.javaoperatorsdk.operator.processing.event.source.Cache; -import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceAction; +import io.javaoperatorsdk.operator.processing.event.source.ResourceAction; import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceDeleteEvent; import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEvent; import io.javaoperatorsdk.operator.processing.event.source.timer.TimerEventSource; diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java index 411fc10e31..62e19394c8 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java @@ -37,9 +37,9 @@ import io.javaoperatorsdk.operator.processing.LifecycleAware; import io.javaoperatorsdk.operator.processing.event.source.EventSource; import io.javaoperatorsdk.operator.processing.event.source.EventSourceStartPriority; +import io.javaoperatorsdk.operator.processing.event.source.ResourceAction; import io.javaoperatorsdk.operator.processing.event.source.ResourceEventAware; import io.javaoperatorsdk.operator.processing.event.source.controller.ControllerEventSource; -import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceAction; import io.javaoperatorsdk.operator.processing.event.source.informer.ManagedInformerEventSource; import io.javaoperatorsdk.operator.processing.event.source.timer.TimerEventSource; diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ResourceAction.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/ResourceAction.java similarity index 90% rename from operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ResourceAction.java rename to operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/ResourceAction.java index 33c4c5a2d6..fff8680913 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ResourceAction.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/ResourceAction.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.javaoperatorsdk.operator.processing.event.source.controller; +package io.javaoperatorsdk.operator.processing.event.source; public enum ResourceAction { ADDED, diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java index db80c0f4a9..c92d8a0c5b 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java @@ -28,6 +28,7 @@ import io.javaoperatorsdk.operator.processing.Controller; import io.javaoperatorsdk.operator.processing.MDCUtils; import io.javaoperatorsdk.operator.processing.event.ResourceID; +import io.javaoperatorsdk.operator.processing.event.source.ResourceAction; import io.javaoperatorsdk.operator.processing.event.source.filter.OnDeleteFilter; import io.javaoperatorsdk.operator.processing.event.source.filter.OnUpdateFilter; import io.javaoperatorsdk.operator.processing.event.source.informer.ManagedInformerEventSource; @@ -139,13 +140,15 @@ private boolean isAcceptedByFilters(ResourceAction action, T resource, T oldReso @Override public void onAdd(T resource) { - var handling = temporaryResourceCache.onAddOrUpdateEvent(resource); + var handling = temporaryResourceCache.onAddOrUpdateEvent(ResourceAction.ADDED, resource, null); handleEvent(ResourceAction.ADDED, resource, null, null, handling != EventHandling.NEW); } @Override public void onUpdate(T oldCustomResource, T newCustomResource) { - var handling = temporaryResourceCache.onAddOrUpdateEvent(newCustomResource); + var handling = + temporaryResourceCache.onAddOrUpdateEvent( + ResourceAction.UPDATED, newCustomResource, oldCustomResource); handleEvent( ResourceAction.UPDATED, newCustomResource, diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ResourceDeleteEvent.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ResourceDeleteEvent.java index ac21250051..6219207faf 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ResourceDeleteEvent.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ResourceDeleteEvent.java @@ -19,6 +19,7 @@ import io.fabric8.kubernetes.api.model.HasMetadata; import io.javaoperatorsdk.operator.processing.event.ResourceID; +import io.javaoperatorsdk.operator.processing.event.source.ResourceAction; /** * Extends ResourceEvent for informer Delete events, it holds also information if the final state is diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ResourceEvent.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ResourceEvent.java index 395f3755fb..88f9bf8716 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ResourceEvent.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ResourceEvent.java @@ -21,6 +21,7 @@ import io.fabric8.kubernetes.api.model.HasMetadata; import io.javaoperatorsdk.operator.processing.event.Event; import io.javaoperatorsdk.operator.processing.event.ResourceID; +import io.javaoperatorsdk.operator.processing.event.source.ResourceAction; public class ResourceEvent extends Event { diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ExtendedResourceEvent.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ExtendedResourceEvent.java new file mode 100644 index 0000000000..4ae476a3de --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ExtendedResourceEvent.java @@ -0,0 +1,42 @@ +/* + * Copyright Java Operator SDK Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.javaoperatorsdk.operator.processing.event.source.informer; + +import java.util.Optional; + +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.javaoperatorsdk.operator.processing.event.ResourceID; +import io.javaoperatorsdk.operator.processing.event.source.ResourceAction; +import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEvent; + +/** Used only for resource event filtering. */ +public class ExtendedResourceEvent extends ResourceEvent { + + private HasMetadata previousResource; + + public ExtendedResourceEvent( + ResourceAction action, + ResourceID resourceID, + HasMetadata latestResource, + HasMetadata previousResource) { + super(action, resourceID, latestResource); + this.previousResource = previousResource; + } + + public Optional getPreviousResource() { + return Optional.ofNullable(previousResource); + } +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java index 247a471df2..2cb81dede4 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java @@ -32,7 +32,7 @@ import io.javaoperatorsdk.operator.processing.event.EventHandler; import io.javaoperatorsdk.operator.processing.event.ResourceID; import io.javaoperatorsdk.operator.processing.event.source.PrimaryToSecondaryMapper; -import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceAction; +import io.javaoperatorsdk.operator.processing.event.source.ResourceAction; import io.javaoperatorsdk.operator.processing.event.source.informer.TemporaryResourceCache.EventHandling; import static io.javaoperatorsdk.operator.api.reconciler.Constants.DEFAULT_COMPARABLE_RESOURCE_VERSION; @@ -107,7 +107,7 @@ public void onAdd(R newResource) { resourceType().getSimpleName(), newResource.getMetadata().getResourceVersion()); } - onAddOrUpdate(Operation.ADD, newResource, null); + onAddOrUpdate(ResourceAction.ADDED, newResource, null); } @Override @@ -120,7 +120,7 @@ public void onUpdate(R oldObject, R newObject) { newObject.getMetadata().getResourceVersion(), oldObject.getMetadata().getResourceVersion()); } - onAddOrUpdate(Operation.UPDATE, newObject, oldObject); + onAddOrUpdate(ResourceAction.UPDATED, newObject, oldObject); } @Override @@ -156,27 +156,27 @@ public synchronized void start() { manager().list().forEach(primaryToSecondaryIndex::onAddOrUpdate); } - private synchronized void onAddOrUpdate(Operation operation, R newObject, R oldObject) { + private synchronized void onAddOrUpdate(ResourceAction action, R newObject, R oldObject) { primaryToSecondaryIndex.onAddOrUpdate(newObject); var resourceID = ResourceID.fromResource(newObject); - var eventHandling = temporaryResourceCache.onAddOrUpdateEvent(newObject); + var eventHandling = temporaryResourceCache.onAddOrUpdateEvent(action, newObject, oldObject); if (eventHandling != EventHandling.NEW) { log.debug( "{} event propagation for {}. Resource ID: {}", eventHandling == EventHandling.DEFER ? "Deferring" : "Skipping", - operation, + action, ResourceID.fromResource(newObject)); - } else if (eventAcceptedByFilter(operation, newObject, oldObject)) { + } else if (eventAcceptedByFilter(action, newObject, oldObject)) { log.debug( "Propagating event for {}, resource with same version not result of a reconciliation." + " Resource ID: {}", - operation, + action, resourceID); propagateEvent(newObject); } else { - log.debug("Event filtered out for operation: {}, resourceID: {}", operation, resourceID); + log.debug("Event filtered out for operation: {}, resourceID: {}", action, resourceID); } } @@ -251,11 +251,11 @@ public boolean allowsNamespaceChanges() { return configuration().followControllerNamespaceChanges(); } - private boolean eventAcceptedByFilter(Operation operation, R newObject, R oldObject) { + private boolean eventAcceptedByFilter(ResourceAction operation, R newObject, R oldObject) { if (genericFilter != null && !genericFilter.accept(newObject)) { return false; } - if (operation == Operation.ADD) { + if (operation == ResourceAction.ADDED) { return onAddFilter == null || onAddFilter.accept(newObject); } else { return onUpdateFilter == null || onUpdateFilter.accept(newObject, oldObject); @@ -266,9 +266,4 @@ private boolean acceptedByDeleteFilters(R resource, boolean b) { return (onDeleteFilter == null || onDeleteFilter.accept(resource, b)) && (genericFilter == null || genericFilter.accept(resource)); } - - private enum Operation { - ADD, - UPDATE - } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java index 620edd729e..4a33d23bfd 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java @@ -42,7 +42,7 @@ import io.javaoperatorsdk.operator.health.Status; import io.javaoperatorsdk.operator.processing.event.ResourceID; import io.javaoperatorsdk.operator.processing.event.source.*; -import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceAction; +import io.javaoperatorsdk.operator.processing.event.source.ResourceAction; import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceDeleteEvent; @SuppressWarnings("rawtypes") @@ -90,6 +90,7 @@ public void changeNamespaces(Set namespaces) { * Also makes sure that the even produced by this update is filtered, thus does not trigger the * reconciliation. */ + @SuppressWarnings("unchecked") public R eventFilteringUpdateAndCacheResource(R resourceToUpdate, UnaryOperator updateMethod) { ResourceID id = ResourceID.fromResource(resourceToUpdate); if (log.isDebugEnabled()) { @@ -110,12 +111,21 @@ public R eventFilteringUpdateAndCacheResource(R resourceToUpdate, UnaryOperator< res.ifPresent( r -> { R latestResource = (R) r.getResource().orElseThrow(); - // for update we need to have a historic resource, this might be improved to mimic more - // realistic scenario + + // as previous resource version we use the one from successful update, since + // we process new event here only if that is more recent then the event from our update. + // Note that this is equivalent with the scenario when an informer watch connection + // would + // reconnect and loose some events in between. + // If that update was not successful we still record the previous version from the + // actual + // event in the ExtendedResourceEvent. + R extendedResourcePrevVersion = + (r instanceof ExtendedResourceEvent) + ? (R) ((ExtendedResourceEvent) r).getPreviousResource().orElse(null) + : null; R prevVersionOfResource = - updatedForLambda != null - ? updatedForLambda - : (r.getAction() == ResourceAction.UPDATED ? latestResource : null); + updatedForLambda != null ? updatedForLambda : extendedResourcePrevVersion; handleEvent( r.getAction(), latestResource, diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java index f8254c1bf4..7e46dcf060 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java @@ -27,7 +27,7 @@ import io.javaoperatorsdk.operator.api.reconciler.ReconcileUtils; import io.javaoperatorsdk.operator.processing.dependent.kubernetes.KubernetesDependentResource; import io.javaoperatorsdk.operator.processing.event.ResourceID; -import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceAction; +import io.javaoperatorsdk.operator.processing.event.source.ResourceAction; import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceDeleteEvent; import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEvent; @@ -94,17 +94,23 @@ public synchronized Optional doneEventFilterModify( } public void onDeleteEvent(T resource, boolean unknownState) { - onEvent(resource, unknownState, true); + onEvent(ResourceAction.DELETED, resource, null, unknownState, true); } /** * @return true if the resourceVersion was obsolete */ - public EventHandling onAddOrUpdateEvent(T resource) { - return onEvent(resource, false, false); + public EventHandling onAddOrUpdateEvent( + ResourceAction action, T resource, T prevResourceVersion) { + return onEvent(action, resource, prevResourceVersion, false, false); } - private synchronized EventHandling onEvent(T resource, boolean unknownState, boolean delete) { + private synchronized EventHandling onEvent( + ResourceAction action, + T resource, + T prevResourceVersion, + boolean unknownState, + boolean delete) { if (!comparableResourceVersions) { return EventHandling.NEW; } @@ -139,8 +145,7 @@ private synchronized EventHandling onEvent(T resource, boolean unknownState, boo ed.setLastEvent( delete ? new ResourceDeleteEvent(ResourceAction.DELETED, resourceId, resource, unknownState) - : new ResourceEvent( - ResourceAction.UPDATED, resourceId, resource)); // todo true action + : new ExtendedResourceEvent(action, resourceId, resource, prevResourceVersion)); return EventHandling.DEFER; } else { return result; diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java index ac187d7eb9..bff9ef3dbd 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java @@ -38,8 +38,8 @@ import io.javaoperatorsdk.operator.processing.event.rate.LinearRateLimiter; import io.javaoperatorsdk.operator.processing.event.rate.RateLimiter; import io.javaoperatorsdk.operator.processing.event.rate.RateLimiter.RateLimitState; +import io.javaoperatorsdk.operator.processing.event.source.ResourceAction; import io.javaoperatorsdk.operator.processing.event.source.controller.ControllerEventSource; -import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceAction; import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceDeleteEvent; import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEvent; import io.javaoperatorsdk.operator.processing.event.source.timer.TimerEventSource; diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/ResourceStateManagerTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/ResourceStateManagerTest.java index 25e93a813c..d480dd06f8 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/ResourceStateManagerTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/ResourceStateManagerTest.java @@ -20,7 +20,7 @@ import org.junit.jupiter.api.Test; import io.javaoperatorsdk.operator.TestUtils; -import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceAction; +import io.javaoperatorsdk.operator.processing.event.source.ResourceAction; import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEvent; import static org.assertj.core.api.Assertions.assertThat; diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSourceTest.java index baef2110df..ef3e56ce8b 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSourceTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSourceTest.java @@ -34,6 +34,7 @@ import io.javaoperatorsdk.operator.processing.event.EventHandler; import io.javaoperatorsdk.operator.processing.event.EventSourceManager; import io.javaoperatorsdk.operator.processing.event.source.AbstractEventSourceTestBase; +import io.javaoperatorsdk.operator.processing.event.source.ResourceAction; import io.javaoperatorsdk.operator.processing.event.source.filter.GenericFilter; import io.javaoperatorsdk.operator.processing.event.source.filter.OnAddFilter; import io.javaoperatorsdk.operator.processing.event.source.filter.OnUpdateFilter; diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java index 0fc721cccb..84fa15ac51 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java @@ -99,7 +99,8 @@ void skipsEventPropagation() { when(temporaryResourceCacheMock.getResourceFromCache(any())) .thenReturn(Optional.of(testDeployment())); - when(temporaryResourceCacheMock.onAddOrUpdateEvent(any())).thenReturn(EventHandling.OBSOLETE); + when(temporaryResourceCacheMock.onAddOrUpdateEvent(any(), any(), any())) + .thenReturn(EventHandling.OBSOLETE); informerEventSource.onAdd(testDeployment()); informerEventSource.onUpdate(testDeployment(), testDeployment()); @@ -109,7 +110,8 @@ void skipsEventPropagation() { @Test void processEventPropagationWithoutAnnotation() { - when(temporaryResourceCacheMock.onAddOrUpdateEvent(any())).thenReturn(EventHandling.NEW); + when(temporaryResourceCacheMock.onAddOrUpdateEvent(any(), any(), any())) + .thenReturn(EventHandling.NEW); informerEventSource.onUpdate(testDeployment(), testDeployment()); verify(eventHandlerMock, times(1)).handleEvent(any()); @@ -117,7 +119,8 @@ void processEventPropagationWithoutAnnotation() { @Test void processEventPropagationWithIncorrectAnnotation() { - when(temporaryResourceCacheMock.onAddOrUpdateEvent(any())).thenReturn(EventHandling.NEW); + when(temporaryResourceCacheMock.onAddOrUpdateEvent(any(), any(), any())) + .thenReturn(EventHandling.NEW); informerEventSource.onAdd( new DeploymentBuilder(testDeployment()) .editMetadata() @@ -134,12 +137,14 @@ void propagateEventAndRemoveResourceFromTempCacheIfResourceVersionMismatch() { cachedDeployment.getMetadata().setResourceVersion(PREV_RESOURCE_VERSION); when(temporaryResourceCacheMock.getResourceFromCache(any())) .thenReturn(Optional.of(cachedDeployment)); - when(temporaryResourceCacheMock.onAddOrUpdateEvent(any())).thenReturn(EventHandling.NEW); + when(temporaryResourceCacheMock.onAddOrUpdateEvent(any(), any(), any())) + .thenReturn(EventHandling.NEW); informerEventSource.onUpdate(cachedDeployment, testDeployment()); verify(eventHandlerMock, times(1)).handleEvent(any()); - verify(temporaryResourceCacheMock, times(1)).onAddOrUpdateEvent(testDeployment()); + verify(temporaryResourceCacheMock, times(1)) + .onAddOrUpdateEvent(any(), eq(testDeployment()), any()); } @Test diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryPrimaryResourceCacheTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryPrimaryResourceCacheTest.java index 4c5d137fd3..0d58b45a29 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryPrimaryResourceCacheTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryPrimaryResourceCacheTest.java @@ -28,6 +28,7 @@ import io.fabric8.kubernetes.api.model.ConfigMapBuilder; import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; import io.javaoperatorsdk.operator.processing.event.ResourceID; +import io.javaoperatorsdk.operator.processing.event.source.ResourceAction; import io.javaoperatorsdk.operator.processing.event.source.informer.TemporaryResourceCache.EventHandling; import static org.assertj.core.api.Assertions.assertThat; @@ -61,7 +62,9 @@ void updateNotAddsTheResourceIntoCacheIfLaterVersionKnown() { var testResource = testResource(); temporaryResourceCache.onAddOrUpdateEvent( - testResource.toBuilder().editMetadata().withResourceVersion("3").endMetadata().build()); + ResourceAction.ADDED, + testResource.toBuilder().editMetadata().withResourceVersion("3").endMetadata().build(), + null); temporaryResourceCache.putResource(testResource); @@ -101,11 +104,13 @@ void removesResourceFromCache() { ConfigMap testResource = propagateTestResourceToCache(); temporaryResourceCache.onAddOrUpdateEvent( + ResourceAction.ADDED, new ConfigMapBuilder(testResource) .editMetadata() .withResourceVersion("3") .endMetadata() - .build()); + .build(), + null); assertThat(temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource))) .isNotPresent(); @@ -130,7 +135,11 @@ void lockedEventBeforePut() throws Exception { ExecutorService ex = Executors.newSingleThreadExecutor(); try { - var result = ex.submit(() -> temporaryResourceCache.onAddOrUpdateEvent(testResource)); + var result = + ex.submit( + () -> + temporaryResourceCache.onAddOrUpdateEvent( + ResourceAction.ADDED, testResource, null)); temporaryResourceCache.putResource(testResource); assertThat(result.isDone()).isFalse(); @@ -146,7 +155,8 @@ void putBeforeEvent() { var testResource = testResource(); // first ensure an event is not known - var result = temporaryResourceCache.onAddOrUpdateEvent(testResource); + var result = + temporaryResourceCache.onAddOrUpdateEvent(ResourceAction.ADDED, testResource, null); assertThat(result).isEqualTo(EventHandling.NEW); var nextResource = testResource(); @@ -154,7 +164,7 @@ void putBeforeEvent() { temporaryResourceCache.putResource(nextResource); // the result is obsolete - result = temporaryResourceCache.onAddOrUpdateEvent(nextResource); + result = temporaryResourceCache.onAddOrUpdateEvent(ResourceAction.UPDATED, nextResource, null); assertThat(result).isEqualTo(EventHandling.OBSOLETE); } @@ -163,7 +173,8 @@ void putBeforeEventWithEventFiltering() { var testResource = testResource(); // first ensure an event is not known - var result = temporaryResourceCache.onAddOrUpdateEvent(testResource); + var result = + temporaryResourceCache.onAddOrUpdateEvent(ResourceAction.ADDED, testResource, null); assertThat(result).isEqualTo(EventHandling.NEW); var nextResource = testResource(); @@ -175,7 +186,7 @@ void putBeforeEventWithEventFiltering() { temporaryResourceCache.doneEventFilterModify(resourceId, "3"); // the result is obsolete - result = temporaryResourceCache.onAddOrUpdateEvent(nextResource); + result = temporaryResourceCache.onAddOrUpdateEvent(ResourceAction.UPDATED, nextResource, null); assertThat(result).isEqualTo(EventHandling.OBSOLETE); } @@ -184,7 +195,8 @@ void putAfterEventWithEventFilteringNoPost() { var testResource = testResource(); // first ensure an event is not known - var result = temporaryResourceCache.onAddOrUpdateEvent(testResource); + var result = + temporaryResourceCache.onAddOrUpdateEvent(ResourceAction.ADDED, testResource, null); assertThat(result).isEqualTo(EventHandling.NEW); var nextResource = testResource(); @@ -192,7 +204,9 @@ void putAfterEventWithEventFilteringNoPost() { var resourceId = ResourceID.fromResource(testResource); temporaryResourceCache.startEventFilteringModify(resourceId); - result = temporaryResourceCache.onAddOrUpdateEvent(nextResource); + result = + temporaryResourceCache.onAddOrUpdateEvent( + ResourceAction.UPDATED, nextResource, testResource); // the result is deferred assertThat(result).isEqualTo(EventHandling.DEFER); temporaryResourceCache.putResource(nextResource); @@ -213,7 +227,8 @@ void putAfterEventWithEventFilteringWithPost() { // completing with the 3 rv. var nextResource = testResource(); nextResource.getMetadata().setResourceVersion("4"); - var result = temporaryResourceCache.onAddOrUpdateEvent(nextResource); + var result = + temporaryResourceCache.onAddOrUpdateEvent(ResourceAction.ADDED, nextResource, null); assertThat(result).isEqualTo(EventHandling.DEFER); var postEvent = temporaryResourceCache.doneEventFilterModify(resourceId, "3"); @@ -225,7 +240,7 @@ void putAfterEventWithEventFilteringWithPost() { void rapidDeletion() { var testResource = testResource(); - temporaryResourceCache.onAddOrUpdateEvent(testResource); + temporaryResourceCache.onAddOrUpdateEvent(ResourceAction.ADDED, testResource, null); temporaryResourceCache.onDeleteEvent( new ConfigMapBuilder(testResource) .editMetadata() From 43fcce366f88a2a1eb222e1b2f4e4d8322e88a49 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Fri, 16 Jan 2026 11:50:10 +0100 Subject: [PATCH 02/12] wip MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../processing/event/source/informer/InformerEventSource.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java index 2cb81dede4..98f230255b 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java @@ -251,11 +251,11 @@ public boolean allowsNamespaceChanges() { return configuration().followControllerNamespaceChanges(); } - private boolean eventAcceptedByFilter(ResourceAction operation, R newObject, R oldObject) { + private boolean eventAcceptedByFilter(ResourceAction action, R newObject, R oldObject) { if (genericFilter != null && !genericFilter.accept(newObject)) { return false; } - if (operation == ResourceAction.ADDED) { + if (action == ResourceAction.ADDED) { return onAddFilter == null || onAddFilter.accept(newObject); } else { return onUpdateFilter == null || onUpdateFilter.accept(newObject, oldObject); From 8e3ad5075f319e9b8feaf747c9ba76a88356cf2c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Fri, 16 Jan 2026 16:47:20 +0100 Subject: [PATCH 03/12] tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../controller/ControllerEventSource.java | 2 +- .../source/informer/InformerEventSource.java | 2 +- .../informer/ManagedInformerEventSource.java | 7 +- .../informer/InformerEventSourceTest.java | 247 ++++++++++++++++-- 4 files changed, 234 insertions(+), 24 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java index c92d8a0c5b..b4784e1b6d 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java @@ -84,7 +84,7 @@ public synchronized void start() { } @Override - public synchronized void handleEvent( + protected synchronized void handleEvent( ResourceAction action, T resource, T oldResource, diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java index 98f230255b..24a95e7f67 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java @@ -139,7 +139,7 @@ public synchronized void onDelete(R resource, boolean b) { } @Override - public void handleEvent( + protected void handleEvent( ResourceAction action, R resource, R oldResource, diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java index 4a33d23bfd..f198184468 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java @@ -130,14 +130,15 @@ public R eventFilteringUpdateAndCacheResource(R resourceToUpdate, UnaryOperator< r.getAction(), latestResource, prevVersionOfResource, - !(r instanceof ResourceDeleteEvent) - || ((ResourceDeleteEvent) r).isDeletedFinalStateUnknown(), + (r instanceof ResourceDeleteEvent) + ? ((ResourceDeleteEvent) r).isDeletedFinalStateUnknown() + : null, false); }); } } - public abstract void handleEvent( + protected abstract void handleEvent( ResourceAction action, R resource, R oldResource, diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java index 84fa15ac51..57a69f6182 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java @@ -15,16 +15,22 @@ */ package io.javaoperatorsdk.operator.processing.event.source.informer; +import java.time.Duration; import java.util.Optional; import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import io.fabric8.kubernetes.api.model.HasMetadata; import io.fabric8.kubernetes.api.model.ObjectMeta; import io.fabric8.kubernetes.api.model.apps.Deployment; import io.fabric8.kubernetes.api.model.apps.DeploymentBuilder; import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.KubernetesClientException; import io.javaoperatorsdk.operator.MockKubernetesClient; import io.javaoperatorsdk.operator.OperatorException; import io.javaoperatorsdk.operator.api.config.BaseConfigurationService; @@ -35,17 +41,24 @@ import io.javaoperatorsdk.operator.api.config.informer.InformerEventSourceConfiguration; import io.javaoperatorsdk.operator.processing.event.EventHandler; import io.javaoperatorsdk.operator.processing.event.ResourceID; +import io.javaoperatorsdk.operator.processing.event.source.ResourceAction; import io.javaoperatorsdk.operator.processing.event.source.SecondaryToPrimaryMapper; import io.javaoperatorsdk.operator.processing.event.source.informer.TemporaryResourceCache.EventHandling; import io.javaoperatorsdk.operator.sample.simple.TestCustomResource; import static io.javaoperatorsdk.operator.api.reconciler.Constants.DEFAULT_NAMESPACES_SET; +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.isNull; import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -56,9 +69,11 @@ class InformerEventSourceTest { private static final String PREV_RESOURCE_VERSION = "0"; private static final String DEFAULT_RESOURCE_VERSION = "1"; + ExecutorService executorService = Executors.newSingleThreadExecutor(); + private InformerEventSource informerEventSource; private final KubernetesClient clientMock = MockKubernetesClient.client(Deployment.class); - private final TemporaryResourceCache temporaryResourceCacheMock = + private TemporaryResourceCache temporaryResourceCache = mock(TemporaryResourceCache.class); private final EventHandler eventHandlerMock = mock(EventHandler.class); private final InformerEventSourceConfiguration informerEventSourceConfiguration = @@ -74,11 +89,12 @@ void setup() { when(informerEventSourceConfiguration.getResourceClass()).thenReturn(Deployment.class); informerEventSource = - new InformerEventSource<>(informerEventSourceConfiguration, clientMock) { - // mocking start - @Override - public synchronized void start() {} - }; + spy( + new InformerEventSource<>(informerEventSourceConfiguration, clientMock) { + // mocking start + @Override + public synchronized void start() {} + }); var mockControllerConfig = mock(ControllerConfiguration.class); when(mockControllerConfig.getConfigurationService()).thenReturn(new BaseConfigurationService()); @@ -91,15 +107,15 @@ public synchronized void start() {} when(secondaryToPrimaryMapper.toPrimaryResourceIDs(any())) .thenReturn(Set.of(ResourceID.fromResource(testDeployment()))); informerEventSource.start(); - informerEventSource.setTemporalResourceCache(temporaryResourceCacheMock); + informerEventSource.setTemporalResourceCache(temporaryResourceCache); } @Test void skipsEventPropagation() { - when(temporaryResourceCacheMock.getResourceFromCache(any())) + when(temporaryResourceCache.getResourceFromCache(any())) .thenReturn(Optional.of(testDeployment())); - when(temporaryResourceCacheMock.onAddOrUpdateEvent(any(), any(), any())) + when(temporaryResourceCache.onAddOrUpdateEvent(any(), any(), any())) .thenReturn(EventHandling.OBSOLETE); informerEventSource.onAdd(testDeployment()); @@ -110,7 +126,7 @@ void skipsEventPropagation() { @Test void processEventPropagationWithoutAnnotation() { - when(temporaryResourceCacheMock.onAddOrUpdateEvent(any(), any(), any())) + when(temporaryResourceCache.onAddOrUpdateEvent(any(), any(), any())) .thenReturn(EventHandling.NEW); informerEventSource.onUpdate(testDeployment(), testDeployment()); @@ -119,7 +135,7 @@ void processEventPropagationWithoutAnnotation() { @Test void processEventPropagationWithIncorrectAnnotation() { - when(temporaryResourceCacheMock.onAddOrUpdateEvent(any(), any(), any())) + when(temporaryResourceCache.onAddOrUpdateEvent(any(), any(), any())) .thenReturn(EventHandling.NEW); informerEventSource.onAdd( new DeploymentBuilder(testDeployment()) @@ -135,22 +151,21 @@ void processEventPropagationWithIncorrectAnnotation() { void propagateEventAndRemoveResourceFromTempCacheIfResourceVersionMismatch() { Deployment cachedDeployment = testDeployment(); cachedDeployment.getMetadata().setResourceVersion(PREV_RESOURCE_VERSION); - when(temporaryResourceCacheMock.getResourceFromCache(any())) + when(temporaryResourceCache.getResourceFromCache(any())) .thenReturn(Optional.of(cachedDeployment)); - when(temporaryResourceCacheMock.onAddOrUpdateEvent(any(), any(), any())) + when(temporaryResourceCache.onAddOrUpdateEvent(any(), any(), any())) .thenReturn(EventHandling.NEW); informerEventSource.onUpdate(cachedDeployment, testDeployment()); verify(eventHandlerMock, times(1)).handleEvent(any()); - verify(temporaryResourceCacheMock, times(1)) - .onAddOrUpdateEvent(any(), eq(testDeployment()), any()); + verify(temporaryResourceCache, times(1)).onAddOrUpdateEvent(any(), eq(testDeployment()), any()); } @Test void genericFilterForEvents() { informerEventSource.setGenericFilter(r -> false); - when(temporaryResourceCacheMock.getResourceFromCache(any())).thenReturn(Optional.empty()); + when(temporaryResourceCache.getResourceFromCache(any())).thenReturn(Optional.empty()); informerEventSource.onAdd(testDeployment()); informerEventSource.onUpdate(testDeployment(), testDeployment()); @@ -162,7 +177,7 @@ void genericFilterForEvents() { @Test void filtersOnAddEvents() { informerEventSource.setOnAddFilter(r -> false); - when(temporaryResourceCacheMock.getResourceFromCache(any())).thenReturn(Optional.empty()); + when(temporaryResourceCache.getResourceFromCache(any())).thenReturn(Optional.empty()); informerEventSource.onAdd(testDeployment()); @@ -172,7 +187,7 @@ void filtersOnAddEvents() { @Test void filtersOnUpdateEvents() { informerEventSource.setOnUpdateFilter((r1, r2) -> false); - when(temporaryResourceCacheMock.getResourceFromCache(any())).thenReturn(Optional.empty()); + when(temporaryResourceCache.getResourceFromCache(any())).thenReturn(Optional.empty()); informerEventSource.onUpdate(testDeployment(), testDeployment()); @@ -182,13 +197,207 @@ void filtersOnUpdateEvents() { @Test void filtersOnDeleteEvents() { informerEventSource.setOnDeleteFilter((r, b) -> false); - when(temporaryResourceCacheMock.getResourceFromCache(any())).thenReturn(Optional.empty()); + when(temporaryResourceCache.getResourceFromCache(any())).thenReturn(Optional.empty()); informerEventSource.onDelete(testDeployment(), true); verify(eventHandlerMock, never()).handleEvent(any()); } + @Test + void handlesPrevResourceVersionForUpdate() { + withRealTemporaryResourceCache(); + var deployment = testDeployment(); + CountDownLatch latch = new CountDownLatch(1); + + executorService.submit( + () -> + informerEventSource.eventFilteringUpdateAndCacheResource( + deployment, + r -> { + var resp = testDeployment(); + incResourceVersion(resp, 1); + try { + latch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + return resp; + })); + informerEventSource.onUpdate(deployment, incResourceVersion(testDeployment(), 2)); + latch.countDown(); + + await() + .untilAsserted( + () -> { + verify(informerEventSource, times(1)) + .handleEvent( + eq(ResourceAction.UPDATED), + argThat( + newResource -> { + assertThat(newResource.getMetadata().getResourceVersion()) + .isEqualTo("3"); + return true; + }), + argThat( + newResource -> { + assertThat(newResource.getMetadata().getResourceVersion()) + .isEqualTo("2"); + return true; + }), + isNull(), + eq(false)); + }); + } + + @Test + void handlesPrevResourceVersionForUpdateInCaseOfException() { + withRealTemporaryResourceCache(); + + withRealTemporaryResourceCache(); + var deployment = testDeployment(); + CountDownLatch latch = new CountDownLatch(1); + + executorService.submit( + () -> + informerEventSource.eventFilteringUpdateAndCacheResource( + deployment, + r -> { + try { + latch.await(); + throw new KubernetesClientException("fake"); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + })); + informerEventSource.onUpdate(deployment, incResourceVersion(testDeployment(), 1)); + latch.countDown(); + + await() + .untilAsserted( + () -> { + verify(informerEventSource, times(1)) + .handleEvent( + eq(ResourceAction.UPDATED), + argThat( + newResource -> { + assertThat(newResource.getMetadata().getResourceVersion()) + .isEqualTo("2"); + return true; + }), + argThat( + newResource -> { + assertThat(newResource.getMetadata().getResourceVersion()) + .isEqualTo("1"); + return true; + }), + isNull(), + eq(false)); + }); + } + + @Test + void handlesPrevResourceVersionForUpdateInCaseOfMultipleUpdates() { + withRealTemporaryResourceCache(); + + withRealTemporaryResourceCache(); + var deployment = testDeployment(); + CountDownLatch latch = new CountDownLatch(1); + + executorService.submit( + () -> + informerEventSource.eventFilteringUpdateAndCacheResource( + deployment, + r -> { + var resp = testDeployment(); + incResourceVersion(resp, 1); + try { + latch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + return resp; + })); + + informerEventSource.onUpdate( + incResourceVersion(testDeployment(), 1), incResourceVersion(testDeployment(), 2)); + informerEventSource.onUpdate( + incResourceVersion(testDeployment(), 2), incResourceVersion(testDeployment(), 3)); + latch.countDown(); + + await() + .untilAsserted( + () -> { + verify(informerEventSource, times(1)) + .handleEvent( + eq(ResourceAction.UPDATED), + argThat( + newResource -> { + assertThat(newResource.getMetadata().getResourceVersion()) + .isEqualTo("4"); + return true; + }), + argThat( + newResource -> { + assertThat(newResource.getMetadata().getResourceVersion()) + .isEqualTo("2"); + return true; + }), + isNull(), + eq(false)); + }); + } + + @Test + void doesNotPropagateEventIfReceivedBeforeUpdate() { + withRealTemporaryResourceCache(); + var deployment = testDeployment(); + CountDownLatch latch = new CountDownLatch(1); + + executorService.submit( + () -> + informerEventSource.eventFilteringUpdateAndCacheResource( + deployment, + r -> { + var resp = testDeployment(); + incResourceVersion(resp, 2); + try { + latch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + return resp; + })); + informerEventSource.onUpdate(deployment, incResourceVersion(testDeployment(), 1)); + latch.countDown(); + + await() + .pollDelay(Duration.ofMillis(100)) + .untilAsserted( + () -> { + verify(informerEventSource, never()) + .handleEvent(any(), any(), any(), any(), anyBoolean()); + }); + } + + private void withRealTemporaryResourceCache() { + temporaryResourceCache = new TemporaryResourceCache<>(true); + informerEventSource.setTemporalResourceCache(temporaryResourceCache); + } + + R incResourceVersion(R resource, int increment) { + var v = resource.getMetadata().getResourceVersion(); + if (v == null) { + throw new IllegalArgumentException("Resource version is null"); + } + resource.getMetadata().setResourceVersion(versionPlus(v, increment)); + return resource; + } + + String versionPlus(String resourceVersion, int increment) { + return "" + (Integer.parseInt(resourceVersion) + increment); + } + @Test void informerStoppedHandlerShouldBeCalledWhenInformerStops() { final var exception = new RuntimeException("Informer stopped exceptionally!"); From 074d42028fdcef8fd721de4d9db399a3c89fb24f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Fri, 16 Jan 2026 20:17:15 +0100 Subject: [PATCH 04/12] wip MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../informer/ManagedInformerEventSource.java | 5 +++-- .../informer/InformerEventSourceTest.java | 21 ++++++++++++------- 2 files changed, 17 insertions(+), 9 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java index f198184468..fa04f6c03f 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java @@ -108,7 +108,7 @@ public R eventFilteringUpdateAndCacheResource(R resourceToUpdate, UnaryOperator< id, updatedResource == null ? null : updatedResource.getMetadata().getResourceVersion()); var updatedForLambda = updatedResource; - res.ifPresent( + res.ifPresentOrElse( r -> { R latestResource = (R) r.getResource().orElseThrow(); @@ -134,7 +134,8 @@ public R eventFilteringUpdateAndCacheResource(R resourceToUpdate, UnaryOperator< ? ((ResourceDeleteEvent) r).isDeletedFinalStateUnknown() : null, false); - }); + }, + () -> log.debug("No new event present after the filtering update; id: {}", id)); } } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java index 57a69f6182..8807088082 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java @@ -24,6 +24,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; import io.fabric8.kubernetes.api.model.HasMetadata; import io.fabric8.kubernetes.api.model.ObjectMeta; @@ -64,12 +65,13 @@ import static org.mockito.Mockito.when; @SuppressWarnings({"rawtypes", "unchecked"}) +@TestInstance(value = TestInstance.Lifecycle.PER_METHOD) class InformerEventSourceTest { private static final String PREV_RESOURCE_VERSION = "0"; private static final String DEFAULT_RESOURCE_VERSION = "1"; - ExecutorService executorService = Executors.newSingleThreadExecutor(); + ExecutorService executorService = Executors.newCachedThreadPool(); private InformerEventSource informerEventSource; private final KubernetesClient clientMock = MockKubernetesClient.client(Deployment.class); @@ -205,7 +207,7 @@ void filtersOnDeleteEvents() { } @Test - void handlesPrevResourceVersionForUpdate() { + void handlesPrevResourceVersionForUpdate() throws InterruptedException { withRealTemporaryResourceCache(); var deployment = testDeployment(); CountDownLatch latch = new CountDownLatch(1); @@ -224,7 +226,10 @@ void handlesPrevResourceVersionForUpdate() { } return resp; })); - informerEventSource.onUpdate(deployment, incResourceVersion(testDeployment(), 2)); + Thread.sleep(50); + informerEventSource.onUpdate( + incResourceVersion(deployment, 1), incResourceVersion(testDeployment(), 2)); + latch.countDown(); await() @@ -251,7 +256,7 @@ void handlesPrevResourceVersionForUpdate() { } @Test - void handlesPrevResourceVersionForUpdateInCaseOfException() { + void handlesPrevResourceVersionForUpdateInCaseOfException() throws InterruptedException { withRealTemporaryResourceCache(); withRealTemporaryResourceCache(); @@ -270,6 +275,7 @@ void handlesPrevResourceVersionForUpdateInCaseOfException() { throw new RuntimeException(e); } })); + Thread.sleep(50); informerEventSource.onUpdate(deployment, incResourceVersion(testDeployment(), 1)); latch.countDown(); @@ -297,7 +303,7 @@ void handlesPrevResourceVersionForUpdateInCaseOfException() { } @Test - void handlesPrevResourceVersionForUpdateInCaseOfMultipleUpdates() { + void handlesPrevResourceVersionForUpdateInCaseOfMultipleUpdates() throws InterruptedException { withRealTemporaryResourceCache(); withRealTemporaryResourceCache(); @@ -318,7 +324,7 @@ void handlesPrevResourceVersionForUpdateInCaseOfMultipleUpdates() { } return resp; })); - + Thread.sleep(50); informerEventSource.onUpdate( incResourceVersion(testDeployment(), 1), incResourceVersion(testDeployment(), 2)); informerEventSource.onUpdate( @@ -349,7 +355,7 @@ void handlesPrevResourceVersionForUpdateInCaseOfMultipleUpdates() { } @Test - void doesNotPropagateEventIfReceivedBeforeUpdate() { + void doesNotPropagateEventIfReceivedBeforeUpdate() throws InterruptedException { withRealTemporaryResourceCache(); var deployment = testDeployment(); CountDownLatch latch = new CountDownLatch(1); @@ -368,6 +374,7 @@ void doesNotPropagateEventIfReceivedBeforeUpdate() { } return resp; })); + Thread.sleep(50); informerEventSource.onUpdate(deployment, incResourceVersion(testDeployment(), 1)); latch.countDown(); From 5e6937e5b33d2e20b61b29e0aedd005ad8991f33 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Tue, 20 Jan 2026 15:31:49 +0100 Subject: [PATCH 05/12] wip MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../informer/InformerEventSourceTest.java | 118 ++++++++---------- 1 file changed, 53 insertions(+), 65 deletions(-) diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java index 8807088082..4e7bcf0b93 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java @@ -21,6 +21,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.function.UnaryOperator; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -207,29 +208,12 @@ void filtersOnDeleteEvents() { } @Test - void handlesPrevResourceVersionForUpdate() throws InterruptedException { + void handlesPrevResourceVersionForUpdate() { withRealTemporaryResourceCache(); - var deployment = testDeployment(); - CountDownLatch latch = new CountDownLatch(1); - executorService.submit( - () -> - informerEventSource.eventFilteringUpdateAndCacheResource( - deployment, - r -> { - var resp = testDeployment(); - incResourceVersion(resp, 1); - try { - latch.await(); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - return resp; - })); - Thread.sleep(50); + CountDownLatch latch = sendForEventFilteringUpdate(2); informerEventSource.onUpdate( - incResourceVersion(deployment, 1), incResourceVersion(testDeployment(), 2)); - + deploymentWithResourceVersion(2), deploymentWithResourceVersion(3)); latch.countDown(); await() @@ -276,7 +260,7 @@ void handlesPrevResourceVersionForUpdateInCaseOfException() throws InterruptedEx } })); Thread.sleep(50); - informerEventSource.onUpdate(deployment, incResourceVersion(testDeployment(), 1)); + informerEventSource.onUpdate(deployment, withResourceVersion(testDeployment(), 2)); latch.countDown(); await() @@ -303,32 +287,16 @@ void handlesPrevResourceVersionForUpdateInCaseOfException() throws InterruptedEx } @Test - void handlesPrevResourceVersionForUpdateInCaseOfMultipleUpdates() throws InterruptedException { + void handlesPrevResourceVersionForUpdateInCaseOfMultipleUpdates() { withRealTemporaryResourceCache(); withRealTemporaryResourceCache(); var deployment = testDeployment(); - CountDownLatch latch = new CountDownLatch(1); - - executorService.submit( - () -> - informerEventSource.eventFilteringUpdateAndCacheResource( - deployment, - r -> { - var resp = testDeployment(); - incResourceVersion(resp, 1); - try { - latch.await(); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - return resp; - })); - Thread.sleep(50); + CountDownLatch latch = sendForEventFilteringUpdate(deployment, 2); informerEventSource.onUpdate( - incResourceVersion(testDeployment(), 1), incResourceVersion(testDeployment(), 2)); + withResourceVersion(testDeployment(), 2), withResourceVersion(testDeployment(), 3)); informerEventSource.onUpdate( - incResourceVersion(testDeployment(), 2), incResourceVersion(testDeployment(), 3)); + withResourceVersion(testDeployment(), 3), withResourceVersion(testDeployment(), 4)); latch.countDown(); await() @@ -355,27 +323,12 @@ void handlesPrevResourceVersionForUpdateInCaseOfMultipleUpdates() throws Interru } @Test - void doesNotPropagateEventIfReceivedBeforeUpdate() throws InterruptedException { + void doesNotPropagateEventIfReceivedBeforeUpdate() { withRealTemporaryResourceCache(); var deployment = testDeployment(); - CountDownLatch latch = new CountDownLatch(1); + CountDownLatch latch = sendForEventFilteringUpdate(deployment, 2); - executorService.submit( - () -> - informerEventSource.eventFilteringUpdateAndCacheResource( - deployment, - r -> { - var resp = testDeployment(); - incResourceVersion(resp, 2); - try { - latch.await(); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - return resp; - })); - Thread.sleep(50); - informerEventSource.onUpdate(deployment, incResourceVersion(testDeployment(), 1)); + informerEventSource.onUpdate(deployment, deploymentWithResourceVersion(2)); latch.countDown(); await() @@ -387,24 +340,59 @@ void doesNotPropagateEventIfReceivedBeforeUpdate() throws InterruptedException { }); } + private CountDownLatch sendForEventFilteringUpdate(int resourceVersion) { + return sendForEventFilteringUpdate(testDeployment(), resourceVersion); + } + + private CountDownLatch sendForEventFilteringUpdate(Deployment deployment, int resourceVersion) { + return sendForEventFilteringUpdate( + deployment, r -> withResourceVersion(deployment, resourceVersion)); + } + + private CountDownLatch sendForEventFilteringUpdate( + Deployment resource, UnaryOperator updateMethod) { + try { + CountDownLatch latch = new CountDownLatch(1); + CountDownLatch sendOnGoingLatch = new CountDownLatch(1); + executorService.submit( + () -> + informerEventSource.eventFilteringUpdateAndCacheResource( + resource, + r -> { + try { + sendOnGoingLatch.countDown(); + latch.await(); + var resp = updateMethod.apply(r); + return resp; + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + })); + sendOnGoingLatch.await(); + return latch; + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + private void withRealTemporaryResourceCache() { temporaryResourceCache = new TemporaryResourceCache<>(true); informerEventSource.setTemporalResourceCache(temporaryResourceCache); } - R incResourceVersion(R resource, int increment) { + Deployment deploymentWithResourceVersion(int resourceVersion) { + return withResourceVersion(testDeployment(), resourceVersion); + } + + R withResourceVersion(R resource, int resourceVersion) { var v = resource.getMetadata().getResourceVersion(); if (v == null) { throw new IllegalArgumentException("Resource version is null"); } - resource.getMetadata().setResourceVersion(versionPlus(v, increment)); + resource.getMetadata().setResourceVersion("" + resourceVersion); return resource; } - String versionPlus(String resourceVersion, int increment) { - return "" + (Integer.parseInt(resourceVersion) + increment); - } - @Test void informerStoppedHandlerShouldBeCalledWhenInformerStops() { final var exception = new RuntimeException("Informer stopped exceptionally!"); From 2a4c8f1e63543ab9256c975c7f11e527acd7cb0c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Tue, 20 Jan 2026 15:37:40 +0100 Subject: [PATCH 06/12] wip MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../informer/InformerEventSourceTest.java | 114 +++++------------- 1 file changed, 33 insertions(+), 81 deletions(-) diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java index 4e7bcf0b93..2354caf3d3 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java @@ -216,74 +216,23 @@ void handlesPrevResourceVersionForUpdate() { deploymentWithResourceVersion(2), deploymentWithResourceVersion(3)); latch.countDown(); - await() - .untilAsserted( - () -> { - verify(informerEventSource, times(1)) - .handleEvent( - eq(ResourceAction.UPDATED), - argThat( - newResource -> { - assertThat(newResource.getMetadata().getResourceVersion()) - .isEqualTo("3"); - return true; - }), - argThat( - newResource -> { - assertThat(newResource.getMetadata().getResourceVersion()) - .isEqualTo("2"); - return true; - }), - isNull(), - eq(false)); - }); + expectHandleEvent(3, 2); } @Test - void handlesPrevResourceVersionForUpdateInCaseOfException() throws InterruptedException { + void handlesPrevResourceVersionForUpdateInCaseOfException() { withRealTemporaryResourceCache(); - withRealTemporaryResourceCache(); - var deployment = testDeployment(); - CountDownLatch latch = new CountDownLatch(1); - - executorService.submit( - () -> - informerEventSource.eventFilteringUpdateAndCacheResource( - deployment, - r -> { - try { - latch.await(); - throw new KubernetesClientException("fake"); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - })); - Thread.sleep(50); - informerEventSource.onUpdate(deployment, withResourceVersion(testDeployment(), 2)); + CountDownLatch latch = + sendForEventFilteringUpdate( + testDeployment(), + r -> { + throw new KubernetesClientException("fake"); + }); + informerEventSource.onUpdate(testDeployment(), withResourceVersion(testDeployment(), 2)); latch.countDown(); - await() - .untilAsserted( - () -> { - verify(informerEventSource, times(1)) - .handleEvent( - eq(ResourceAction.UPDATED), - argThat( - newResource -> { - assertThat(newResource.getMetadata().getResourceVersion()) - .isEqualTo("2"); - return true; - }), - argThat( - newResource -> { - assertThat(newResource.getMetadata().getResourceVersion()) - .isEqualTo("1"); - return true; - }), - isNull(), - eq(false)); - }); + expectHandleEvent(2, 1); } @Test @@ -299,6 +248,27 @@ void handlesPrevResourceVersionForUpdateInCaseOfMultipleUpdates() { withResourceVersion(testDeployment(), 3), withResourceVersion(testDeployment(), 4)); latch.countDown(); + expectHandleEvent(4, 2); + } + + @Test + void doesNotPropagateEventIfReceivedBeforeUpdate() { + withRealTemporaryResourceCache(); + var deployment = testDeployment(); + CountDownLatch latch = sendForEventFilteringUpdate(deployment, 2); + informerEventSource.onUpdate(deployment, deploymentWithResourceVersion(2)); + latch.countDown(); + + await() + .pollDelay(Duration.ofMillis(100)) + .untilAsserted( + () -> { + verify(informerEventSource, never()) + .handleEvent(any(), any(), any(), any(), anyBoolean()); + }); + } + + private void expectHandleEvent(int newResourceVersion, int oldResourceVersion) { await() .untilAsserted( () -> { @@ -308,13 +278,13 @@ void handlesPrevResourceVersionForUpdateInCaseOfMultipleUpdates() { argThat( newResource -> { assertThat(newResource.getMetadata().getResourceVersion()) - .isEqualTo("4"); + .isEqualTo("" + newResourceVersion); return true; }), argThat( newResource -> { assertThat(newResource.getMetadata().getResourceVersion()) - .isEqualTo("2"); + .isEqualTo("" + oldResourceVersion); return true; }), isNull(), @@ -322,24 +292,6 @@ void handlesPrevResourceVersionForUpdateInCaseOfMultipleUpdates() { }); } - @Test - void doesNotPropagateEventIfReceivedBeforeUpdate() { - withRealTemporaryResourceCache(); - var deployment = testDeployment(); - CountDownLatch latch = sendForEventFilteringUpdate(deployment, 2); - - informerEventSource.onUpdate(deployment, deploymentWithResourceVersion(2)); - latch.countDown(); - - await() - .pollDelay(Duration.ofMillis(100)) - .untilAsserted( - () -> { - verify(informerEventSource, never()) - .handleEvent(any(), any(), any(), any(), anyBoolean()); - }); - } - private CountDownLatch sendForEventFilteringUpdate(int resourceVersion) { return sendForEventFilteringUpdate(testDeployment(), resourceVersion); } From 62c66cac1590bb3ef5e4eebab04e49bcb27ef146 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Tue, 20 Jan 2026 16:41:33 +0100 Subject: [PATCH 07/12] wip MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../controller/ControllerEventSource.java | 6 +- .../javaoperatorsdk/operator/TestUtils.java | 8 +- .../event/source/EventFilterTestUtils.java | 64 ++++++++++++++++ .../controller/ControllerEventSourceTest.java | 32 ++++++++ .../informer/InformerEventSourceTest.java | 73 ++++--------------- 5 files changed, 118 insertions(+), 65 deletions(-) create mode 100644 operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/EventFilterTestUtils.java diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java index b4784e1b6d..5305189b2c 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java @@ -139,13 +139,13 @@ private boolean isAcceptedByFilters(ResourceAction action, T resource, T oldReso } @Override - public void onAdd(T resource) { + public synchronized void onAdd(T resource) { var handling = temporaryResourceCache.onAddOrUpdateEvent(ResourceAction.ADDED, resource, null); handleEvent(ResourceAction.ADDED, resource, null, null, handling != EventHandling.NEW); } @Override - public void onUpdate(T oldCustomResource, T newCustomResource) { + public synchronized void onUpdate(T oldCustomResource, T newCustomResource) { var handling = temporaryResourceCache.onAddOrUpdateEvent( ResourceAction.UPDATED, newCustomResource, oldCustomResource); @@ -158,7 +158,7 @@ public void onUpdate(T oldCustomResource, T newCustomResource) { } @Override - public void onDelete(T resource, boolean deletedFinalStateUnknown) { + public synchronized void onDelete(T resource, boolean deletedFinalStateUnknown) { temporaryResourceCache.onDeleteEvent(resource, deletedFinalStateUnknown); // delete event is quite special here, that requires special care, since we clean up caches on // delete event. diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/TestUtils.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/TestUtils.java index 956b3d9475..24e36cbe33 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/TestUtils.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/TestUtils.java @@ -32,6 +32,10 @@ public static TestCustomResource testCustomResource() { return testCustomResource(new ResourceID(UUID.randomUUID().toString(), "test")); } + public static TestCustomResource testCustomResource1() { + return testCustomResource(new ResourceID("test1", "default")); + } + public static CustomResourceDefinition testCRD(String scope) { return new CustomResourceDefinitionBuilder() .editOrNewSpec() @@ -43,10 +47,6 @@ public static CustomResourceDefinition testCRD(String scope) { .build(); } - public static TestCustomResource testCustomResource1() { - return testCustomResource(new ResourceID("test1", "default")); - } - public static ResourceID testCustomResource1Id() { return new ResourceID("test1", "default"); } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/EventFilterTestUtils.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/EventFilterTestUtils.java new file mode 100644 index 0000000000..72bcac0f54 --- /dev/null +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/EventFilterTestUtils.java @@ -0,0 +1,64 @@ +/* + * Copyright Java Operator SDK Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.javaoperatorsdk.operator.processing.event.source; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.function.UnaryOperator; + +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.javaoperatorsdk.operator.processing.event.source.informer.ManagedInformerEventSource; + +public class EventFilterTestUtils { + + static ExecutorService executorService = Executors.newCachedThreadPool(); + + public static CountDownLatch sendForEventFilteringUpdate( + ManagedInformerEventSource eventSource, R resource, UnaryOperator updateMethod) { + try { + CountDownLatch latch = new CountDownLatch(1); + CountDownLatch sendOnGoingLatch = new CountDownLatch(1); + executorService.submit( + () -> + eventSource.eventFilteringUpdateAndCacheResource( + resource, + r -> { + try { + sendOnGoingLatch.countDown(); + latch.await(); + var resp = updateMethod.apply(r); + return resp; + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + })); + sendOnGoingLatch.await(); + return latch; + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + public static R withResourceVersion(R resource, int resourceVersion) { + var v = resource.getMetadata().getResourceVersion(); + if (v == null) { + throw new IllegalArgumentException("Resource version is null"); + } + resource.getMetadata().setResourceVersion("" + resourceVersion); + return resource; + } +} diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSourceTest.java index ef3e56ce8b..19f94402e6 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSourceTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSourceTest.java @@ -17,8 +17,10 @@ import java.time.LocalDateTime; import java.util.List; +import java.util.concurrent.CountDownLatch; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import io.javaoperatorsdk.operator.MockKubernetesClient; @@ -34,12 +36,14 @@ import io.javaoperatorsdk.operator.processing.event.EventHandler; import io.javaoperatorsdk.operator.processing.event.EventSourceManager; import io.javaoperatorsdk.operator.processing.event.source.AbstractEventSourceTestBase; +import io.javaoperatorsdk.operator.processing.event.source.EventFilterTestUtils; import io.javaoperatorsdk.operator.processing.event.source.ResourceAction; import io.javaoperatorsdk.operator.processing.event.source.filter.GenericFilter; import io.javaoperatorsdk.operator.processing.event.source.filter.OnAddFilter; import io.javaoperatorsdk.operator.processing.event.source.filter.OnUpdateFilter; import io.javaoperatorsdk.operator.sample.simple.TestCustomResource; +import static io.javaoperatorsdk.operator.processing.event.source.EventFilterTestUtils.withResourceVersion; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.*; @@ -164,6 +168,33 @@ void genericFilterFiltersOutAddUpdateAndDeleteEvents() { verify(eventHandler, never()).handleEvent(any()); } + @Disabled + @Test + void testEventFiltering() throws InterruptedException { + source = spy(new ControllerEventSource<>(new TestController(null, null, null))); + setUpSource(source, true, controllerConfig); + + var latch = sendForEventFilteringUpdate(2); + source.onUpdate(testResourceWithVersion(1), testResourceWithVersion(2)); + latch.countDown(); + Thread.sleep(100); + verify(source, never()).handleEvent(any(), any(), any(), any(), anyBoolean()); + } + + private TestCustomResource testResourceWithVersion(int v) { + return withResourceVersion(TestUtils.testCustomResource1(), v); + } + + private CountDownLatch sendForEventFilteringUpdate(int v) { + return sendForEventFilteringUpdate(TestUtils.testCustomResource1(), v); + } + + private CountDownLatch sendForEventFilteringUpdate( + TestCustomResource testResource, int resourceVersion) { + return EventFilterTestUtils.sendForEventFilteringUpdate( + source, testResource, r -> withResourceVersion(testResource, resourceVersion)); + } + @SuppressWarnings("unchecked") private static class TestController extends Controller { @@ -224,6 +255,7 @@ public TestConfiguration( .withOnAddFilter(onAddFilter) .withOnUpdateFilter(onUpdateFilter) .withGenericFilter(genericFilter) + .withComparableResourceVersions(true) .buildForController(), false); } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java index 2354caf3d3..c7f5db58f1 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java @@ -15,19 +15,14 @@ */ package io.javaoperatorsdk.operator.processing.event.source.informer; -import java.time.Duration; import java.util.Optional; import java.util.Set; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.function.UnaryOperator; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; -import io.fabric8.kubernetes.api.model.HasMetadata; import io.fabric8.kubernetes.api.model.ObjectMeta; import io.fabric8.kubernetes.api.model.apps.Deployment; import io.fabric8.kubernetes.api.model.apps.DeploymentBuilder; @@ -43,12 +38,14 @@ import io.javaoperatorsdk.operator.api.config.informer.InformerEventSourceConfiguration; import io.javaoperatorsdk.operator.processing.event.EventHandler; import io.javaoperatorsdk.operator.processing.event.ResourceID; +import io.javaoperatorsdk.operator.processing.event.source.EventFilterTestUtils; import io.javaoperatorsdk.operator.processing.event.source.ResourceAction; import io.javaoperatorsdk.operator.processing.event.source.SecondaryToPrimaryMapper; import io.javaoperatorsdk.operator.processing.event.source.informer.TemporaryResourceCache.EventHandling; import io.javaoperatorsdk.operator.sample.simple.TestCustomResource; import static io.javaoperatorsdk.operator.api.reconciler.Constants.DEFAULT_NAMESPACES_SET; +import static io.javaoperatorsdk.operator.processing.event.source.EventFilterTestUtils.withResourceVersion; import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -72,8 +69,6 @@ class InformerEventSourceTest { private static final String PREV_RESOURCE_VERSION = "0"; private static final String DEFAULT_RESOURCE_VERSION = "1"; - ExecutorService executorService = Executors.newCachedThreadPool(); - private InformerEventSource informerEventSource; private final KubernetesClient clientMock = MockKubernetesClient.client(Deployment.class); private TemporaryResourceCache temporaryResourceCache = @@ -224,12 +219,14 @@ void handlesPrevResourceVersionForUpdateInCaseOfException() { withRealTemporaryResourceCache(); CountDownLatch latch = - sendForEventFilteringUpdate( + EventFilterTestUtils.sendForEventFilteringUpdate( + informerEventSource, testDeployment(), r -> { throw new KubernetesClientException("fake"); }); - informerEventSource.onUpdate(testDeployment(), withResourceVersion(testDeployment(), 2)); + informerEventSource.onUpdate( + deploymentWithResourceVersion(1), deploymentWithResourceVersion(2)); latch.countDown(); expectHandleEvent(2, 1); @@ -239,7 +236,6 @@ void handlesPrevResourceVersionForUpdateInCaseOfException() { void handlesPrevResourceVersionForUpdateInCaseOfMultipleUpdates() { withRealTemporaryResourceCache(); - withRealTemporaryResourceCache(); var deployment = testDeployment(); CountDownLatch latch = sendForEventFilteringUpdate(deployment, 2); informerEventSource.onUpdate( @@ -252,20 +248,16 @@ void handlesPrevResourceVersionForUpdateInCaseOfMultipleUpdates() { } @Test - void doesNotPropagateEventIfReceivedBeforeUpdate() { + void doesNotPropagateEventIfReceivedBeforeUpdate() throws InterruptedException { withRealTemporaryResourceCache(); - var deployment = testDeployment(); - CountDownLatch latch = sendForEventFilteringUpdate(deployment, 2); - informerEventSource.onUpdate(deployment, deploymentWithResourceVersion(2)); + + CountDownLatch latch = sendForEventFilteringUpdate(2); + informerEventSource.onUpdate( + deploymentWithResourceVersion(1), deploymentWithResourceVersion(2)); latch.countDown(); - await() - .pollDelay(Duration.ofMillis(100)) - .untilAsserted( - () -> { - verify(informerEventSource, never()) - .handleEvent(any(), any(), any(), any(), anyBoolean()); - }); + Thread.sleep(100); + verify(informerEventSource, never()).handleEvent(any(), any(), any(), any(), anyBoolean()); } private void expectHandleEvent(int newResourceVersion, int oldResourceVersion) { @@ -297,34 +289,8 @@ private CountDownLatch sendForEventFilteringUpdate(int resourceVersion) { } private CountDownLatch sendForEventFilteringUpdate(Deployment deployment, int resourceVersion) { - return sendForEventFilteringUpdate( - deployment, r -> withResourceVersion(deployment, resourceVersion)); - } - - private CountDownLatch sendForEventFilteringUpdate( - Deployment resource, UnaryOperator updateMethod) { - try { - CountDownLatch latch = new CountDownLatch(1); - CountDownLatch sendOnGoingLatch = new CountDownLatch(1); - executorService.submit( - () -> - informerEventSource.eventFilteringUpdateAndCacheResource( - resource, - r -> { - try { - sendOnGoingLatch.countDown(); - latch.await(); - var resp = updateMethod.apply(r); - return resp; - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - })); - sendOnGoingLatch.await(); - return latch; - } catch (InterruptedException e) { - throw new RuntimeException(e); - } + return EventFilterTestUtils.sendForEventFilteringUpdate( + informerEventSource, deployment, r -> withResourceVersion(deployment, resourceVersion)); } private void withRealTemporaryResourceCache() { @@ -336,15 +302,6 @@ Deployment deploymentWithResourceVersion(int resourceVersion) { return withResourceVersion(testDeployment(), resourceVersion); } - R withResourceVersion(R resource, int resourceVersion) { - var v = resource.getMetadata().getResourceVersion(); - if (v == null) { - throw new IllegalArgumentException("Resource version is null"); - } - resource.getMetadata().setResourceVersion("" + resourceVersion); - return resource; - } - @Test void informerStoppedHandlerShouldBeCalledWhenInformerStops() { final var exception = new RuntimeException("Informer stopped exceptionally!"); From 6bb19e2aaec97be7ad6bf5e1e64cd8eecad50aac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Tue, 20 Jan 2026 17:23:26 +0100 Subject: [PATCH 08/12] wip MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../controller/ControllerEventSource.java | 25 +++--- .../controller/ControllerEventSourceTest.java | 78 ++++++++++++++++++- 2 files changed, 90 insertions(+), 13 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java index 5305189b2c..ff63693e46 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java @@ -140,21 +140,26 @@ private boolean isAcceptedByFilters(ResourceAction action, T resource, T oldReso @Override public synchronized void onAdd(T resource) { - var handling = temporaryResourceCache.onAddOrUpdateEvent(ResourceAction.ADDED, resource, null); - handleEvent(ResourceAction.ADDED, resource, null, null, handling != EventHandling.NEW); + handleOnAddOrUpdate(ResourceAction.ADDED, null, resource); } @Override public synchronized void onUpdate(T oldCustomResource, T newCustomResource) { + handleOnAddOrUpdate(ResourceAction.UPDATED, oldCustomResource, newCustomResource); + } + + private void handleOnAddOrUpdate( + ResourceAction action, T oldCustomResource, T newCustomResource) { var handling = - temporaryResourceCache.onAddOrUpdateEvent( - ResourceAction.UPDATED, newCustomResource, oldCustomResource); - handleEvent( - ResourceAction.UPDATED, - newCustomResource, - oldCustomResource, - null, - handling != EventHandling.NEW); + temporaryResourceCache.onAddOrUpdateEvent(action, newCustomResource, oldCustomResource); + if (handling != EventHandling.NEW) { + handleEvent( + ResourceAction.UPDATED, + newCustomResource, + oldCustomResource, + null, + handling != EventHandling.NEW); + } } @Override diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSourceTest.java index 19f94402e6..e04b0c9807 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSourceTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSourceTest.java @@ -23,6 +23,7 @@ import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; +import io.fabric8.kubernetes.client.KubernetesClientException; import io.javaoperatorsdk.operator.MockKubernetesClient; import io.javaoperatorsdk.operator.ReconcilerUtilsInternal; import io.javaoperatorsdk.operator.TestUtils; @@ -44,6 +45,8 @@ import io.javaoperatorsdk.operator.sample.simple.TestCustomResource; import static io.javaoperatorsdk.operator.processing.event.source.EventFilterTestUtils.withResourceVersion; +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.*; @@ -168,17 +171,86 @@ void genericFilterFiltersOutAddUpdateAndDeleteEvents() { verify(eventHandler, never()).handleEvent(any()); } - @Disabled @Test - void testEventFiltering() throws InterruptedException { + void testEventFilteringBasicScenario() throws InterruptedException { source = spy(new ControllerEventSource<>(new TestController(null, null, null))); setUpSource(source, true, controllerConfig); var latch = sendForEventFilteringUpdate(2); source.onUpdate(testResourceWithVersion(1), testResourceWithVersion(2)); latch.countDown(); + Thread.sleep(100); - verify(source, never()).handleEvent(any(), any(), any(), any(), anyBoolean()); + verify(eventHandler, never()).handleEvent(any()); + } + + @Test + void eventFilteringNewEventDuringUpdate() { + source = spy(new ControllerEventSource<>(new TestController(null, null, null))); + setUpSource(source, true, controllerConfig); + + var latch = sendForEventFilteringUpdate(2); + source.onUpdate(testResourceWithVersion(2), testResourceWithVersion(3)); + latch.countDown(); + + await().untilAsserted(() -> expectHandleEvent(3, 2)); + } + + @Disabled("todo") + @Test + void eventFilteringMoreNewEventsDuringUpdate() { + source = spy(new ControllerEventSource<>(new TestController(null, null, null))); + setUpSource(source, true, controllerConfig); + + var latch = sendForEventFilteringUpdate(2); + source.onUpdate(testResourceWithVersion(2), testResourceWithVersion(3)); + source.onUpdate(testResourceWithVersion(3), testResourceWithVersion(4)); + latch.countDown(); + + await().untilAsserted(() -> expectHandleEvent(4, 2)); + } + + @Test + void eventFilteringExceptionDuringUpdate() { + source = spy(new ControllerEventSource<>(new TestController(null, null, null))); + setUpSource(source, true, controllerConfig); + + var latch = + EventFilterTestUtils.sendForEventFilteringUpdate( + source, + TestUtils.testCustomResource1(), + r -> { + throw new KubernetesClientException("fake"); + }); + source.onUpdate(testResourceWithVersion(1), testResourceWithVersion(2)); + latch.countDown(); + + expectHandleEvent(2, 1); + } + + private void expectHandleEvent(int newResourceVersion, int oldResourceVersion) { + await() + .untilAsserted( + () -> { + verify(eventHandler, times(1)).handleEvent(any()); + verify(source, times(1)) + .handleEvent( + eq(ResourceAction.UPDATED), + argThat( + r -> { + assertThat(r.getMetadata().getResourceVersion()) + .isEqualTo("" + newResourceVersion); + return true; + }), + argThat( + r -> { + assertThat(r.getMetadata().getResourceVersion()) + .isEqualTo("" + oldResourceVersion); + return true; + }), + isNull(), + eq(false)); + }); } private TestCustomResource testResourceWithVersion(int v) { From cce78e44c30d054c680f2944934248d222f4a3f7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Tue, 20 Jan 2026 17:44:06 +0100 Subject: [PATCH 09/12] wip MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../controller/ControllerEventSource.java | 22 ++++-------- .../source/informer/InformerEventSource.java | 6 +--- .../informer/ManagedInformerEventSource.java | 9 ++--- .../controller/ControllerEventSourceTest.java | 35 +++++++++---------- .../informer/InformerEventSourceTest.java | 6 ++-- 5 files changed, 27 insertions(+), 51 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java index ff63693e46..0a6ff9d8bd 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java @@ -85,25 +85,20 @@ public synchronized void start() { @Override protected synchronized void handleEvent( - ResourceAction action, - T resource, - T oldResource, - Boolean deletedFinalStateUnknown, - boolean filterEvent) { + ResourceAction action, T resource, T oldResource, Boolean deletedFinalStateUnknown) { try { if (log.isDebugEnabled()) { log.debug( - "Event received for resource: {} version: {} uuid: {} action: {} filter event: {}", + "Event received for resource: {} version: {} uuid: {} action: {}", ResourceID.fromResource(resource), getVersion(resource), resource.getMetadata().getUid(), - action, - filterEvent); + action); log.trace("Event Old resource: {},\n new resource: {}", oldResource, resource); } MDCUtils.addResourceInfo(resource); controller.getEventSourceManager().broadcastOnResourceEvent(action, resource, oldResource); - if (isAcceptedByFilters(action, resource, oldResource) && !filterEvent) { + if (isAcceptedByFilters(action, resource, oldResource)) { if (deletedFinalStateUnknown != null) { getEventHandler() .handleEvent( @@ -153,12 +148,7 @@ private void handleOnAddOrUpdate( var handling = temporaryResourceCache.onAddOrUpdateEvent(action, newCustomResource, oldCustomResource); if (handling != EventHandling.NEW) { - handleEvent( - ResourceAction.UPDATED, - newCustomResource, - oldCustomResource, - null, - handling != EventHandling.NEW); + handleEvent(ResourceAction.UPDATED, newCustomResource, oldCustomResource, null); } } @@ -167,7 +157,7 @@ public synchronized void onDelete(T resource, boolean deletedFinalStateUnknown) temporaryResourceCache.onDeleteEvent(resource, deletedFinalStateUnknown); // delete event is quite special here, that requires special care, since we clean up caches on // delete event. - handleEvent(ResourceAction.DELETED, resource, null, deletedFinalStateUnknown, false); + handleEvent(ResourceAction.DELETED, resource, null, deletedFinalStateUnknown); } @Override diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java index 24a95e7f67..6743ff436a 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java @@ -140,11 +140,7 @@ public synchronized void onDelete(R resource, boolean b) { @Override protected void handleEvent( - ResourceAction action, - R resource, - R oldResource, - Boolean deletedFinalStateUnknown, - boolean filterEvent) { + ResourceAction action, R resource, R oldResource, Boolean deletedFinalStateUnknown) { propagateEvent(resource); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java index fa04f6c03f..9278400dde 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java @@ -132,19 +132,14 @@ public R eventFilteringUpdateAndCacheResource(R resourceToUpdate, UnaryOperator< prevVersionOfResource, (r instanceof ResourceDeleteEvent) ? ((ResourceDeleteEvent) r).isDeletedFinalStateUnknown() - : null, - false); + : null); }, () -> log.debug("No new event present after the filtering update; id: {}", id)); } } protected abstract void handleEvent( - ResourceAction action, - R resource, - R oldResource, - Boolean deletedFinalStateUnknown, - boolean filterEvent); + ResourceAction action, R resource, R oldResource, Boolean deletedFinalStateUnknown); @SuppressWarnings("unchecked") @Override diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSourceTest.java index e04b0c9807..df450b29a6 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSourceTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSourceTest.java @@ -20,7 +20,6 @@ import java.util.concurrent.CountDownLatch; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import io.fabric8.kubernetes.client.KubernetesClientException; @@ -76,10 +75,10 @@ void skipsEventHandlingIfGenerationNotIncreased() { TestCustomResource oldCustomResource = TestUtils.testCustomResource(); oldCustomResource.getMetadata().setFinalizers(List.of(FINALIZER)); - source.handleEvent(ResourceAction.UPDATED, customResource, oldCustomResource, null, false); + source.handleEvent(ResourceAction.UPDATED, customResource, oldCustomResource, null); verify(eventHandler, times(1)).handleEvent(any()); - source.handleEvent(ResourceAction.UPDATED, customResource, customResource, null, false); + source.handleEvent(ResourceAction.UPDATED, customResource, customResource, null); verify(eventHandler, times(1)).handleEvent(any()); } @@ -87,12 +86,12 @@ void skipsEventHandlingIfGenerationNotIncreased() { void dontSkipEventHandlingIfMarkedForDeletion() { TestCustomResource customResource1 = TestUtils.testCustomResource(); - source.handleEvent(ResourceAction.UPDATED, customResource1, customResource1, null, false); + source.handleEvent(ResourceAction.UPDATED, customResource1, customResource1, null); verify(eventHandler, times(1)).handleEvent(any()); // mark for deletion customResource1.getMetadata().setDeletionTimestamp(LocalDateTime.now().toString()); - source.handleEvent(ResourceAction.UPDATED, customResource1, customResource1, null, false); + source.handleEvent(ResourceAction.UPDATED, customResource1, customResource1, null); verify(eventHandler, times(2)).handleEvent(any()); } @@ -100,11 +99,11 @@ void dontSkipEventHandlingIfMarkedForDeletion() { void normalExecutionIfGenerationChanges() { TestCustomResource customResource1 = TestUtils.testCustomResource(); - source.handleEvent(ResourceAction.UPDATED, customResource1, customResource1, null, false); + source.handleEvent(ResourceAction.UPDATED, customResource1, customResource1, null); verify(eventHandler, times(1)).handleEvent(any()); customResource1.getMetadata().setGeneration(2L); - source.handleEvent(ResourceAction.UPDATED, customResource1, customResource1, null, false); + source.handleEvent(ResourceAction.UPDATED, customResource1, customResource1, null); verify(eventHandler, times(2)).handleEvent(any()); } @@ -115,10 +114,10 @@ void handlesAllEventIfNotGenerationAware() { TestCustomResource customResource1 = TestUtils.testCustomResource(); - source.handleEvent(ResourceAction.UPDATED, customResource1, customResource1, null, false); + source.handleEvent(ResourceAction.UPDATED, customResource1, customResource1, null); verify(eventHandler, times(1)).handleEvent(any()); - source.handleEvent(ResourceAction.UPDATED, customResource1, customResource1, null, false); + source.handleEvent(ResourceAction.UPDATED, customResource1, customResource1, null); verify(eventHandler, times(2)).handleEvent(any()); } @@ -126,7 +125,7 @@ void handlesAllEventIfNotGenerationAware() { void eventWithNoGenerationProcessedIfNoFinalizer() { TestCustomResource customResource1 = TestUtils.testCustomResource(); - source.handleEvent(ResourceAction.UPDATED, customResource1, customResource1, null, false); + source.handleEvent(ResourceAction.UPDATED, customResource1, customResource1, null); verify(eventHandler, times(1)).handleEvent(any()); } @@ -135,7 +134,7 @@ void eventWithNoGenerationProcessedIfNoFinalizer() { void callsBroadcastsOnResourceEvents() { TestCustomResource customResource1 = TestUtils.testCustomResource(); - source.handleEvent(ResourceAction.UPDATED, customResource1, customResource1, null, false); + source.handleEvent(ResourceAction.UPDATED, customResource1, customResource1, null); verify(testController.getEventSourceManager(), times(1)) .broadcastOnResourceEvent( @@ -151,8 +150,8 @@ void filtersOutEventsOnAddAndUpdate() { source = new ControllerEventSource<>(new TestController(onAddFilter, onUpdatePredicate, null)); setUpSource(source, true, controllerConfig); - source.handleEvent(ResourceAction.ADDED, cr, null, null, false); - source.handleEvent(ResourceAction.UPDATED, cr, cr, null, false); + source.handleEvent(ResourceAction.ADDED, cr, null, null); + source.handleEvent(ResourceAction.UPDATED, cr, cr, null); verify(eventHandler, never()).handleEvent(any()); } @@ -164,9 +163,9 @@ void genericFilterFiltersOutAddUpdateAndDeleteEvents() { source = new ControllerEventSource<>(new TestController(null, null, res -> false)); setUpSource(source, true, controllerConfig); - source.handleEvent(ResourceAction.ADDED, cr, null, null, false); - source.handleEvent(ResourceAction.UPDATED, cr, cr, null, false); - source.handleEvent(ResourceAction.DELETED, cr, cr, true, false); + source.handleEvent(ResourceAction.ADDED, cr, null, null); + source.handleEvent(ResourceAction.UPDATED, cr, cr, null); + source.handleEvent(ResourceAction.DELETED, cr, cr, true); verify(eventHandler, never()).handleEvent(any()); } @@ -196,7 +195,6 @@ void eventFilteringNewEventDuringUpdate() { await().untilAsserted(() -> expectHandleEvent(3, 2)); } - @Disabled("todo") @Test void eventFilteringMoreNewEventsDuringUpdate() { source = spy(new ControllerEventSource<>(new TestController(null, null, null))); @@ -248,8 +246,7 @@ private void expectHandleEvent(int newResourceVersion, int oldResourceVersion) { .isEqualTo("" + oldResourceVersion); return true; }), - isNull(), - eq(false)); + isNull()); }); } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java index c7f5db58f1..6361704605 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java @@ -50,7 +50,6 @@ import static org.awaitility.Awaitility.await; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isNull; @@ -257,7 +256,7 @@ void doesNotPropagateEventIfReceivedBeforeUpdate() throws InterruptedException { latch.countDown(); Thread.sleep(100); - verify(informerEventSource, never()).handleEvent(any(), any(), any(), any(), anyBoolean()); + verify(informerEventSource, never()).handleEvent(any(), any(), any(), any()); } private void expectHandleEvent(int newResourceVersion, int oldResourceVersion) { @@ -279,8 +278,7 @@ private void expectHandleEvent(int newResourceVersion, int oldResourceVersion) { .isEqualTo("" + oldResourceVersion); return true; }), - isNull(), - eq(false)); + isNull()); }); } From e51d43923cc24192c887f21a0361791aca4b069d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Tue, 20 Jan 2026 20:11:58 +0100 Subject: [PATCH 10/12] wip MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../event/source/controller/ControllerEventSource.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java index 0a6ff9d8bd..65005e586b 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java @@ -147,8 +147,14 @@ private void handleOnAddOrUpdate( ResourceAction action, T oldCustomResource, T newCustomResource) { var handling = temporaryResourceCache.onAddOrUpdateEvent(action, newCustomResource, oldCustomResource); - if (handling != EventHandling.NEW) { + if (handling == EventHandling.NEW) { handleEvent(ResourceAction.UPDATED, newCustomResource, oldCustomResource, null); + } else if (log.isDebugEnabled()) { + log.debug( + "{} event propagation for action: {} resource id: {} ", + handling, + action, + ResourceID.fromResource(newCustomResource)); } } From e23172f563d670695d7645d80b8b52d506d82ea5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Tue, 20 Jan 2026 20:21:34 +0100 Subject: [PATCH 11/12] wip MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../event/source/controller/ControllerEventSource.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java index 65005e586b..8412e1ccbe 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java @@ -148,7 +148,7 @@ private void handleOnAddOrUpdate( var handling = temporaryResourceCache.onAddOrUpdateEvent(action, newCustomResource, oldCustomResource); if (handling == EventHandling.NEW) { - handleEvent(ResourceAction.UPDATED, newCustomResource, oldCustomResource, null); + handleEvent(action, newCustomResource, oldCustomResource, null); } else if (log.isDebugEnabled()) { log.debug( "{} event propagation for action: {} resource id: {} ", From 0f5ec200c092180a0c2d4571c313ce79bb70d44f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Wed, 21 Jan 2026 12:55:28 +0100 Subject: [PATCH 12/12] wip MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../informer/TemporaryResourceCache.java | 3 +- .../informer/InformerEventSourceTest.java | 38 ++++++--- .../TemporaryPrimaryResourceCacheTest.java | 82 ++++++++++++++----- 3 files changed, 90 insertions(+), 33 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java index 7e46dcf060..eb76387a80 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java @@ -127,9 +127,8 @@ private synchronized EventHandling onEvent( } var cached = cache.get(resourceId); EventHandling result = EventHandling.NEW; - int comp = 0; if (cached != null) { - comp = ReconcileUtils.compareResourceVersions(resource, cached); + int comp = ReconcileUtils.compareResourceVersions(resource, cached); if (comp >= 0 || unknownState) { cache.remove(resourceId); // we propagate event only for our update or newer other can be discarded since we know we diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java index 6361704605..e2c3de8975 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java @@ -15,13 +15,13 @@ */ package io.javaoperatorsdk.operator.processing.event.source.informer; +import java.time.Duration; import java.util.Optional; import java.util.Set; import java.util.concurrent.CountDownLatch; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.TestInstance; import io.fabric8.kubernetes.api.model.ObjectMeta; import io.fabric8.kubernetes.api.model.apps.Deployment; @@ -62,7 +62,6 @@ import static org.mockito.Mockito.when; @SuppressWarnings({"rawtypes", "unchecked"}) -@TestInstance(value = TestInstance.Lifecycle.PER_METHOD) class InformerEventSourceTest { private static final String PREV_RESOURCE_VERSION = "0"; @@ -146,12 +145,11 @@ void processEventPropagationWithIncorrectAnnotation() { @Test void propagateEventAndRemoveResourceFromTempCacheIfResourceVersionMismatch() { + withRealTemporaryResourceCache(); + Deployment cachedDeployment = testDeployment(); cachedDeployment.getMetadata().setResourceVersion(PREV_RESOURCE_VERSION); - when(temporaryResourceCache.getResourceFromCache(any())) - .thenReturn(Optional.of(cachedDeployment)); - when(temporaryResourceCache.onAddOrUpdateEvent(any(), any(), any())) - .thenReturn(EventHandling.NEW); + temporaryResourceCache.putResource(cachedDeployment); informerEventSource.onUpdate(cachedDeployment, testDeployment()); @@ -247,7 +245,7 @@ void handlesPrevResourceVersionForUpdateInCaseOfMultipleUpdates() { } @Test - void doesNotPropagateEventIfReceivedBeforeUpdate() throws InterruptedException { + void doesNotPropagateEventIfReceivedBeforeUpdate() { withRealTemporaryResourceCache(); CountDownLatch latch = sendForEventFilteringUpdate(2); @@ -255,8 +253,28 @@ void doesNotPropagateEventIfReceivedBeforeUpdate() throws InterruptedException { deploymentWithResourceVersion(1), deploymentWithResourceVersion(2)); latch.countDown(); - Thread.sleep(100); - verify(informerEventSource, never()).handleEvent(any(), any(), any(), any()); + assertNoEventProduced(); + } + + @Test + void filterAddEventBeforeUpdate() { + withRealTemporaryResourceCache(); + + CountDownLatch latch = sendForEventFilteringUpdate(2); + informerEventSource.onAdd(deploymentWithResourceVersion(1)); + latch.countDown(); + + assertNoEventProduced(); + } + + private void assertNoEventProduced() { + await() + .pollDelay(Duration.ofMillis(50)) + .timeout(Duration.ofMillis(51)) + .untilAsserted( + () -> { + verify(informerEventSource, never()).handleEvent(any(), any(), any(), any()); + }); } private void expectHandleEvent(int newResourceVersion, int oldResourceVersion) { @@ -292,7 +310,7 @@ private CountDownLatch sendForEventFilteringUpdate(Deployment deployment, int re } private void withRealTemporaryResourceCache() { - temporaryResourceCache = new TemporaryResourceCache<>(true); + temporaryResourceCache = spy(new TemporaryResourceCache<>(true)); informerEventSource.setTemporalResourceCache(temporaryResourceCache); } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryPrimaryResourceCacheTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryPrimaryResourceCacheTest.java index 0d58b45a29..592a552433 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryPrimaryResourceCacheTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryPrimaryResourceCacheTest.java @@ -16,12 +16,8 @@ package io.javaoperatorsdk.operator.processing.event.source.informer; import java.util.Map; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import io.fabric8.kubernetes.api.model.ConfigMap; @@ -126,28 +122,72 @@ void nonComparableResourceVersionsDisables() { .isEmpty(); } - @Disabled("todo") @Test - void lockedEventBeforePut() throws Exception { + void eventReceivedDuringFiltering() throws Exception { var testResource = testResource(); temporaryResourceCache.startEventFilteringModify(ResourceID.fromResource(testResource)); - ExecutorService ex = Executors.newSingleThreadExecutor(); - try { - var result = - ex.submit( - () -> - temporaryResourceCache.onAddOrUpdateEvent( - ResourceAction.ADDED, testResource, null)); - - temporaryResourceCache.putResource(testResource); - assertThat(result.isDone()).isFalse(); - temporaryResourceCache.doneEventFilterModify(ResourceID.fromResource(testResource), "3"); - assertThat(result.get(10, TimeUnit.SECONDS)).isEqualTo(EventHandling.NEW); - } finally { - ex.shutdownNow(); - } + temporaryResourceCache.putResource(testResource); + assertThat(temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource))) + .isPresent(); + + temporaryResourceCache.onAddOrUpdateEvent(ResourceAction.ADDED, testResource, null); + assertThat(temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource))) + .isEmpty(); + + var doneRes = + temporaryResourceCache.doneEventFilterModify(ResourceID.fromResource(testResource), "2"); + + assertThat(doneRes).isEmpty(); + assertThat(temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource))) + .isEmpty(); + } + + @Test + void newerEventDuringFiltering() { + var testResource = testResource(); + + temporaryResourceCache.startEventFilteringModify(ResourceID.fromResource(testResource)); + + temporaryResourceCache.putResource(testResource); + assertThat(temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource))) + .isPresent(); + + var testResource2 = testResource(); + testResource2.getMetadata().setResourceVersion("3"); + temporaryResourceCache.onAddOrUpdateEvent(ResourceAction.UPDATED, testResource2, testResource); + assertThat(temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource))) + .isEmpty(); + + var doneRes = + temporaryResourceCache.doneEventFilterModify(ResourceID.fromResource(testResource), "2"); + + assertThat(doneRes).isPresent(); + assertThat(temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource))) + .isEmpty(); + } + + @Test + void eventAfterFiltering() { + var testResource = testResource(); + + temporaryResourceCache.startEventFilteringModify(ResourceID.fromResource(testResource)); + + temporaryResourceCache.putResource(testResource); + assertThat(temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource))) + .isPresent(); + + var doneRes = + temporaryResourceCache.doneEventFilterModify(ResourceID.fromResource(testResource), "2"); + + assertThat(doneRes).isEmpty(); + assertThat(temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource))) + .isPresent(); + + temporaryResourceCache.onAddOrUpdateEvent(ResourceAction.ADDED, testResource, null); + assertThat(temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource))) + .isEmpty(); } @Test