Skip to content

Commit 4ada736

Browse files
authored
fix: concurrency issue with filtering and caching update (#3191)
Signed-off-by: Attila Mészáros <a_meszaros@apple.com>
1 parent 0afd4da commit 4ada736

File tree

9 files changed

+248
-12
lines changed

9 files changed

+248
-12
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/AbstractWorkflowExecutor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -174,10 +174,10 @@ protected <R> void submit(
174174
DependentResourceNode<R, P> dependentResourceNode,
175175
NodeExecutor<R, P> nodeExecutor,
176176
String operation) {
177+
logger()
178+
.debug("Submitting to {}: {} primaryID: {}", operation, dependentResourceNode, primaryID);
177179
final Future<?> future = executorService.submit(nodeExecutor);
178180
markAsExecuting(dependentResourceNode, future);
179-
logger()
180-
.debug("Submitted to {}: {} primaryID: {}", operation, dependentResourceNode, primaryID);
181181
}
182182

183183
protected <R> void registerOrDeregisterEventSourceBasedOnActivation(

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,8 +183,13 @@ public void handleRecentResourceCreate(ResourceID resourceID, R resource) {
183183

184184
@Override
185185
public Optional<R> get(ResourceID resourceID) {
186-
var res = cache.get(resourceID);
186+
// The order of these two lookups matters. If we queried the informer cache first,
187+
// a race condition could occur: we might not find the resource there yet, then
188+
// process an informer event that evicts the temporary resource cache entry. At that
189+
// point the resource would already be present in the informer cache, but we would
190+
// have missed it in both caches during this call.
187191
Optional<R> resource = temporaryResourceCache.getResourceFromCache(resourceID);
192+
var res = cache.get(resourceID);
188193
if (comparableResourceVersions
189194
&& resource.isPresent()
190195
&& res.filter(

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -78,12 +78,14 @@ class InformerEventSourceTest {
7878
@BeforeEach
7979
void setup() {
8080
final var informerConfig = mock(InformerConfiguration.class);
81+
SecondaryToPrimaryMapper secondaryToPrimaryMapper = mock(SecondaryToPrimaryMapper.class);
82+
when(informerEventSourceConfiguration.getSecondaryToPrimaryMapper())
83+
.thenReturn(secondaryToPrimaryMapper);
84+
when(secondaryToPrimaryMapper.toPrimaryResourceIDs(any()))
85+
.thenReturn(Set.of(ResourceID.fromResource(testDeployment())));
8186
when(informerEventSourceConfiguration.getInformerConfig()).thenReturn(informerConfig);
8287
when(informerConfig.getEffectiveNamespaces(any())).thenReturn(DEFAULT_NAMESPACES_SET);
83-
when(informerEventSourceConfiguration.getSecondaryToPrimaryMapper())
84-
.thenReturn(mock(SecondaryToPrimaryMapper.class));
8588
when(informerEventSourceConfiguration.getResourceClass()).thenReturn(Deployment.class);
86-
8789
informerEventSource =
8890
spy(
8991
new InformerEventSource<>(informerEventSourceConfiguration, clientMock) {
@@ -97,11 +99,6 @@ public synchronized void start() {}
9799

98100
informerEventSource.setEventHandler(eventHandlerMock);
99101
informerEventSource.setControllerConfiguration(mockControllerConfig);
100-
SecondaryToPrimaryMapper secondaryToPrimaryMapper = mock(SecondaryToPrimaryMapper.class);
101-
when(informerEventSourceConfiguration.getSecondaryToPrimaryMapper())
102-
.thenReturn(secondaryToPrimaryMapper);
103-
when(secondaryToPrimaryMapper.toPrimaryResourceIDs(any()))
104-
.thenReturn(Set.of(ResourceID.fromResource(testDeployment())));
105102
informerEventSource.start();
106103
informerEventSource.setTemporalResourceCache(temporaryResourceCache);
107104
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Copyright Java Operator SDK Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.javaoperatorsdk.operator.baseapi.cachingfilteringupdate;
17+
18+
import io.fabric8.kubernetes.api.model.Namespaced;
19+
import io.fabric8.kubernetes.client.CustomResource;
20+
import io.fabric8.kubernetes.model.annotation.Group;
21+
import io.fabric8.kubernetes.model.annotation.ShortNames;
22+
import io.fabric8.kubernetes.model.annotation.Version;
23+
24+
@Group("sample.javaoperatorsdk")
25+
@Version("v1")
26+
@ShortNames("cfu")
27+
public class CachingFilteringUpdateCustomResource
28+
extends CustomResource<Void, CachingFilteringUpdateStatus> implements Namespaced {}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
* Copyright Java Operator SDK Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.javaoperatorsdk.operator.baseapi.cachingfilteringupdate;
17+
18+
import java.time.Duration;
19+
20+
import org.junit.jupiter.api.Test;
21+
import org.junit.jupiter.api.extension.RegisterExtension;
22+
23+
import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
24+
import io.javaoperatorsdk.operator.junit.LocallyRunOperatorExtension;
25+
26+
import static org.assertj.core.api.Assertions.assertThat;
27+
import static org.awaitility.Awaitility.await;
28+
29+
class CachingFilteringUpdateIT {
30+
31+
public static final int RESOURCE_NUMBER = 250;
32+
CachingFilteringUpdateReconciler reconciler = new CachingFilteringUpdateReconciler();
33+
34+
@RegisterExtension
35+
LocallyRunOperatorExtension operator =
36+
LocallyRunOperatorExtension.builder().withReconciler(reconciler).build();
37+
38+
@Test
39+
void testResourceAccessAfterUpdate() {
40+
for (int i = 0; i < RESOURCE_NUMBER; i++) {
41+
operator.create(createCustomResource(i));
42+
}
43+
await()
44+
.pollDelay(Duration.ofSeconds(5))
45+
.atMost(Duration.ofMinutes(1))
46+
.until(
47+
() -> {
48+
if (reconciler.isIssueFound()) {
49+
// Stop waiting as soon as an issue is detected.
50+
return true;
51+
}
52+
// Use a single representative resource to detect that updates have completed.
53+
var res =
54+
operator.get(
55+
CachingFilteringUpdateCustomResource.class,
56+
"resource" + (RESOURCE_NUMBER - 1));
57+
return res != null
58+
&& res.getStatus() != null
59+
&& Boolean.TRUE.equals(res.getStatus().getUpdated());
60+
});
61+
62+
if (operator.getReconcilerOfType(CachingFilteringUpdateReconciler.class).isIssueFound()) {
63+
throw new IllegalStateException("Error already found.");
64+
}
65+
66+
for (int i = 0; i < RESOURCE_NUMBER; i++) {
67+
var res = operator.get(CachingFilteringUpdateCustomResource.class, "resource" + i);
68+
assertThat(res.getStatus()).isNotNull();
69+
assertThat(res.getStatus().getUpdated()).isTrue();
70+
}
71+
}
72+
73+
public CachingFilteringUpdateCustomResource createCustomResource(int i) {
74+
CachingFilteringUpdateCustomResource resource = new CachingFilteringUpdateCustomResource();
75+
resource.setMetadata(
76+
new ObjectMetaBuilder()
77+
.withName("resource" + i)
78+
.withNamespace(operator.getNamespace())
79+
.build());
80+
return resource;
81+
}
82+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
/*
2+
* Copyright Java Operator SDK Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.javaoperatorsdk.operator.baseapi.cachingfilteringupdate;
17+
18+
import java.util.List;
19+
import java.util.Map;
20+
import java.util.concurrent.atomic.AtomicBoolean;
21+
22+
import io.fabric8.kubernetes.api.model.ConfigMap;
23+
import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
24+
import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
25+
import io.javaoperatorsdk.operator.api.config.informer.InformerEventSourceConfiguration;
26+
import io.javaoperatorsdk.operator.api.reconciler.Context;
27+
import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration;
28+
import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
29+
import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
30+
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
31+
import io.javaoperatorsdk.operator.processing.event.ResourceID;
32+
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
33+
import io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource;
34+
35+
@ControllerConfiguration
36+
public class CachingFilteringUpdateReconciler
37+
implements Reconciler<CachingFilteringUpdateCustomResource> {
38+
39+
private final AtomicBoolean issueFound = new AtomicBoolean(false);
40+
41+
@Override
42+
public UpdateControl<CachingFilteringUpdateCustomResource> reconcile(
43+
CachingFilteringUpdateCustomResource resource,
44+
Context<CachingFilteringUpdateCustomResource> context) {
45+
46+
context.resourceOperations().serverSideApply(prepareCM(resource));
47+
var cachedCM = context.getSecondaryResource(ConfigMap.class);
48+
if (cachedCM.isEmpty()) {
49+
issueFound.set(true);
50+
throw new IllegalStateException("Error for resource: " + ResourceID.fromResource(resource));
51+
}
52+
53+
ensureStatusExists(resource);
54+
resource.getStatus().setUpdated(true);
55+
return UpdateControl.patchStatus(resource);
56+
}
57+
58+
private static ConfigMap prepareCM(CachingFilteringUpdateCustomResource p) {
59+
var cm =
60+
new ConfigMapBuilder()
61+
.withMetadata(
62+
new ObjectMetaBuilder()
63+
.withName(p.getMetadata().getName())
64+
.withNamespace(p.getMetadata().getNamespace())
65+
.build())
66+
.withData(Map.of("name", p.getMetadata().getName()))
67+
.build();
68+
cm.addOwnerReference(p);
69+
return cm;
70+
}
71+
72+
@Override
73+
public List<EventSource<?, CachingFilteringUpdateCustomResource>> prepareEventSources(
74+
EventSourceContext<CachingFilteringUpdateCustomResource> context) {
75+
InformerEventSource<ConfigMap, CachingFilteringUpdateCustomResource> cmES =
76+
new InformerEventSource<>(
77+
InformerEventSourceConfiguration.from(
78+
ConfigMap.class, CachingFilteringUpdateCustomResource.class)
79+
.build(),
80+
context);
81+
return List.of(cmES);
82+
}
83+
84+
private void ensureStatusExists(CachingFilteringUpdateCustomResource resource) {
85+
CachingFilteringUpdateStatus status = resource.getStatus();
86+
if (status == null) {
87+
status = new CachingFilteringUpdateStatus();
88+
resource.setStatus(status);
89+
}
90+
}
91+
92+
public boolean isIssueFound() {
93+
return issueFound.get();
94+
}
95+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Copyright Java Operator SDK Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.javaoperatorsdk.operator.baseapi.cachingfilteringupdate;
17+
18+
public class CachingFilteringUpdateStatus {
19+
20+
private Boolean updated;
21+
22+
public Boolean getUpdated() {
23+
return updated;
24+
}
25+
26+
public void setUpdated(Boolean updated) {
27+
this.updated = updated;
28+
}
29+
}

operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/createupdateeventfilter/ComparableResourceVersionsDisabledIT.java renamed to operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/createupdateeventfilter/PreviousAnnotationDisabledIT.java

File renamed without changes.

sample-operators/mysql-schema/src/test/java/io/javaoperatorsdk/operator/sample/MySQLSchemaOperatorE2E.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ void test() {
127127
.delete();
128128

129129
await()
130-
.atMost(2, MINUTES)
130+
.atMost(4, MINUTES)
131131
.ignoreExceptions()
132132
.untilAsserted(
133133
() -> {

0 commit comments

Comments
 (0)