Skip to content

KAFKA-19983: Fix MockProcessorContext doesn't work with WindowStores.#21677

Draft
chickenchickenlove wants to merge 1 commit intoapache:trunkfrom
chickenchickenlove:KAFKA-19983
Draft

KAFKA-19983: Fix MockProcessorContext doesn't work with WindowStores.#21677
chickenchickenlove wants to merge 1 commit intoapache:trunkfrom
chickenchickenlove:KAFKA-19983

Conversation

@chickenchickenlove
Copy link
Contributor

Description

After Kafka 4.0.0, MockProcessorContext doesn't work with WindowStores.
This is a regression caused by #16906.

Some StateStores accept a StateStoreContext, but internally require an InternalProcessorContext.
Because of this, tests using MockProcessorContext no longer work with non-logging, non-caching WindowStores.

This PR is draft based on my current understanding of the problem and one possible direction for fixing it.
I would like to use this PR as a starting point for discussion and align on the right approach with the community.

@github-actions github-actions bot added triage PRs from the community streams labels Mar 8, 2026
@chickenchickenlove
Copy link
Contributor Author

@mjsax
Sorry for the sudden mention. 🙇‍♂️

I opened a draft PR and wanted to use it as a starting point for discussion.
While working on this, I came up with two possible directions.

#1
The first option is to treat the fact that MockProcessorContext no longer supports non-logging, non-caching WindowStore as a regression.

Because of the changes introduced in #16906, a path that previously appears to have worked with only StateStoreContext now requires InternalProcessorContext, which means it can no longer be tested with MockProcessorContext.
In that case, it throws the following error:

"This component requires internal features of Kafka Streams and must be disabled for unit tests."

However, this path seems to have been testable before, so I think there is a reasonable argument that this should be considered a regression. also, this direction is aligned with what InMemorySessionStore already do.

#2
The second option is to treat the current behavior as intended and instead adjust MockProcessorContext#getStateStoreContext() so that it returns a StateStoreContext implementation that also implements InternalProcessorContext.

With that approach, we could implement only the InternalProcessorContext methods that are actually required for non-logging, non-caching WindowStore support, and let the other methods throw UnsupportedOperationException if they are called.
The StateStoreContext methods required by InternalProcessorContext could simply delegate to the existing StateStoreContext implementation.

However, in that case, MockProcessorContext.getStateStoreContext() would return an InternalProcessorContext, which means asInternalProcessorContext() would always succeed. Therefore, if asInternalProcessorContext() is intended to serve as a guardrail to block certain usage in tests, this approach may not be appropriate, since it would also always pass in paths that are supposed to work in practice.

At the moment, the draft PR is based on option #1, but I would really appreciate your thoughts on which direction would be more appropriate! Thanks for taking the time to look at this. 🙇‍♂️

@javad87
Copy link

javad87 commented Mar 9, 2026

@mjsax Thanks for addressing this regression! I have many services that use MockProcessorContext for unit tests. It’s very useful because I only need to test the processor logic itself, rather than setting up a full TopologyTestDriver. However, due to this regression, many of my tests are now failing.

I have two questions:

  1. Do you have an approximate timeline for when a fix might be available?

  2. If it takes longer to resolve, what would be the cleanest workaround that avoids changing a large number of unit tests?

For example, I currently have the following setup for my tests:

@BeforeEach
void setUp() throws Exception {
    var properties = new KafkaStreamsApplicationConfig();
    properties.setSchemaRegistryUrl("mock://localhost:8081");
    properties.setSchemaRegistryAutoRegisterSchemas(true);
    properties.setServiceId("notification-Id");

    this.ts = Instant.now().toEpochMilli();
    this.eventContext = new KafkaEventContext(new MockProducerFactory());
    this.eventContext.configure(properties);
    this.eventContext.getMessageTable().registerLexicon(Messages.class);
    this.channel = eventContext.getEventChannel();

    var notificationTopology = new NotificationTopology(properties, eventContext);

    this.context = new MockProcessorContext<>(properties);
    this.windicatorStore = Stores.keyValueStoreBuilder(
            Stores.inMemoryKeyValueStore(WINDICATOR_STORE),
            Serdes.String(), notificationTopology.getWindicatorSerde())
        .withLoggingDisabled()
        .build();

    this.windowEventStore = Stores.windowStoreBuilder(
        Stores.inMemoryWindowStore(WINDOW_EVENT_STORE, Duration.ofHours(30), Duration.ofHours(30), false),
        Serdes.String(), notificationTopology.getClassifiedEventSerde()).withLoggingDisabled().build();

    this.windowEventStore.init(this.context.getStateStoreContext(), windowEventStore);
    this.windicatorStore.init(this.context.getStateStoreContext(), windicatorStore);
    this.context.addStateStore(windicatorStore);
    this.context.addStateStore(windowEventStore);

    this.aggregator = new Aggregator(this.channel);
    aggregator.init(this.context);
}

and using MockProcessorContext for scheduling Punctuator and capturing forwarded record, e.g.:

final MockProcessorContext.CapturedPunctuator capturedPunctuator = context.scheduledPunctuators().get(0);
final Punctuator punctuator = capturedPunctuator.getPunctuator();
punctuator.punctuate(ts + Duration.ofMinutes(3 * 60 + 1).toMillis());
// then
assertEquals(1, context.forwarded().size());

@mjsax
Copy link
Member

mjsax commented Mar 11, 2026

Just hitting the same issue on #21684 -- The PR goes the route of proposal (2) on this to, to change add InternalProcessorContext to the returned StateStoreContext to allow for the cast.

I was not aware of this PR until I cycled back to the Jira ticket...

My PR does not fully address the ticket though I believe? So we would still need this PR to complete it, and also remove all code which becomes unnecessary as we can always cast to InternalProcessorContext going forward?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

streams triage PRs from the community

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants