|
15 | 15 | */ |
16 | 16 | package io.javaoperatorsdk.operator.api.reconciler; |
17 | 17 |
|
| 18 | +import java.util.HashMap; |
18 | 19 | import java.util.Map; |
19 | 20 | import java.util.Optional; |
20 | 21 | import java.util.Set; |
|
29 | 30 |
|
30 | 31 | import io.fabric8.kubernetes.api.model.HasMetadata; |
31 | 32 | import io.fabric8.kubernetes.client.KubernetesClient; |
| 33 | +import io.javaoperatorsdk.operator.ReconcilerUtilsInternal; |
32 | 34 | import io.javaoperatorsdk.operator.api.config.ControllerConfiguration; |
33 | 35 | import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource; |
34 | 36 | import io.javaoperatorsdk.operator.api.reconciler.dependent.managed.DefaultManagedWorkflowAndDependentResourceContext; |
|
38 | 40 | import io.javaoperatorsdk.operator.processing.event.NoEventSourceForClassException; |
39 | 41 | import io.javaoperatorsdk.operator.processing.event.ResourceID; |
40 | 42 |
|
41 | | -import static io.javaoperatorsdk.operator.ReconcilerUtilsInternal.compareResourceVersions; |
42 | | - |
43 | 43 | public class DefaultContext<P extends HasMetadata> implements Context<P> { |
44 | 44 | private static final Logger log = LoggerFactory.getLogger(DefaultContext.class); |
45 | 45 |
|
@@ -84,22 +84,47 @@ public <R> Stream<R> getSecondaryResourcesAsStream(Class<R> expectedType, boolea |
84 | 84 | throw new IllegalArgumentException("Can only de-duplicate HasMetadata descendants"); |
85 | 85 | } |
86 | 86 |
|
| 87 | + final var idToLatest = deduplicate ? new HashMap<ResourceID, String>() : null; |
87 | 88 | final var stream = |
88 | 89 | controller.getEventSourceManager().getEventSourcesFor(expectedType).stream() |
89 | 90 | .<R>mapMulti( |
90 | | - (es, consumer) -> es.getSecondaryResources(primaryResource).forEach(consumer)); |
| 91 | + (es, consumer) -> |
| 92 | + es.getSecondaryResources(primaryResource) |
| 93 | + .forEach( |
| 94 | + r -> { |
| 95 | + var reject = false; |
| 96 | + if (deduplicate) { |
| 97 | + final boolean[] rejectAr = new boolean[1]; |
| 98 | + final var hm = (HasMetadata) r; |
| 99 | + final var resourceVersion = hm.getMetadata().getResourceVersion(); |
| 100 | + idToLatest.merge( |
| 101 | + ResourceID.fromResource(hm), |
| 102 | + resourceVersion, |
| 103 | + (existing, replacement) -> { |
| 104 | + final var comparison = |
| 105 | + ReconcilerUtilsInternal.compareResourceVersions( |
| 106 | + existing, replacement); |
| 107 | + rejectAr[0] = |
| 108 | + comparison == 0; // rejecting resource if version is equal |
| 109 | + return comparison >= 0 ? existing : replacement; |
| 110 | + }); |
| 111 | + reject = rejectAr[0]; |
| 112 | + } |
| 113 | + // only keep resources that don't have the same id and resource |
| 114 | + // version |
| 115 | + if (!reject) { |
| 116 | + consumer.accept(r); |
| 117 | + } |
| 118 | + })); |
91 | 119 | if (deduplicate) { |
92 | 120 | //noinspection unchecked |
93 | 121 | return stream |
94 | 122 | .map(HasMetadata.class::cast) |
95 | | - .collect( |
96 | | - Collectors.toUnmodifiableMap( |
97 | | - ResourceID::fromResource, |
98 | | - Function.identity(), |
99 | | - (existing, replacement) -> |
100 | | - compareResourceVersions(existing, replacement) >= 0 ? existing : replacement)) |
101 | | - .values() |
102 | | - .stream() |
| 123 | + .filter( |
| 124 | + hm -> { |
| 125 | + final var resourceVersion = hm.getMetadata().getResourceVersion(); |
| 126 | + return resourceVersion.equals(idToLatest.get(ResourceID.fromResource(hm))); |
| 127 | + }) |
103 | 128 | .map(hasMetadata -> (R) hasMetadata); |
104 | 129 | } else { |
105 | 130 | return stream; |
|
0 commit comments