Skip to content

KAFKA-20277: Add session store with headers support for IQv2#21670

Open
bbejeck wants to merge 5 commits intoapache:trunkfrom
bbejeck:KAFKA-20277_verify_existing_IQv2_methods_for_session_stores
Open

KAFKA-20277: Add session store with headers support for IQv2#21670
bbejeck wants to merge 5 commits intoapache:trunkfrom
bbejeck:KAFKA-20277_verify_existing_IQv2_methods_for_session_stores

Conversation

@bbejeck
Copy link
Member

@bbejeck bbejeck commented Mar 8, 2026

This PR is NOT about implementing IQv2s for header-store, but provides
IQv2s for headers store through the session state stores. So the results
do not contain the headers even though the headers are preserved in the
headers state store.

Part of KIP-1271.

@bbejeck bbejeck changed the title KAFKA-20277: Add session store with headers infra and support for IQv2 KAFKA-20277: Add session store with headers support for IQv2 Mar 8, 2026
@bbejeck bbejeck force-pushed the KAFKA-20277_verify_existing_IQv2_methods_for_session_stores branch from f154e45 to 6c7e81b Compare March 9, 2026 22:06
@bbejeck bbejeck force-pushed the KAFKA-20277_verify_existing_IQv2_methods_for_session_stores branch from 6c7e81b to 819d3d8 Compare March 10, 2026 13:45
@mjsax mjsax added the kip Requires or implements a KIP label Mar 11, 2026
Copy link
Member

@mjsax mjsax left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall LGTM. Just some nits/question. Feel free to merge.

public boolean isSession() {
return false;
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we revert this to get rid of the whole file in the PR?

public KeyValueIterator<Windowed<K>, AggregationWithHeaders<AGG>> findSessions(final K key,
final Instant earliestSessionEndTime,
final Instant latestSessionStartTime) {
return super.findSessions(key, earliestSessionEndTime, latestSessionStartTime);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we overriding this method if we only call super anyway?

if (query instanceof WindowRangeQuery) {
final WindowRangeQuery<?, ?> windowRangeQuery = (WindowRangeQuery<?, ?>) query;
if (windowRangeQuery.getKey().isPresent()) {
result = runRangeQueryWithHeadersUnwrap(query, positionBound, config);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
result = runRangeQueryWithHeadersUnwrap(query, positionBound, config);
result = runRangeQuery(query, positionBound, config);

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's keep the name simple -- it's a private method only, so it seems unnecessary to have a too complex name.

final QueryResult<R> result;

if (query instanceof WindowRangeQuery) {
final WindowRangeQuery<?, ?> windowRangeQuery = (WindowRangeQuery<?, ?>) query;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we using <?, ?> instead of <K, V> like we do in MeteredSessionStore.

private <R> QueryResult<R> runRangeQueryWithHeadersUnwrap(final Query<R> query,
final PositionBound positionBound,
final QueryConfig config) {
final WindowRangeQuery<K, ?> typedQuery = (WindowRangeQuery<K, ?>) query;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As above. Why ? ?

final QueryResult<KeyValueIterator<Windowed<Bytes>, byte[]>> rawResult =
wrapped().query(rawKeyQuery, positionBound, config);
if (rawResult.isSuccess()) {
final MeteredWindowedKeyValueIterator<K, ?> typedResult =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

? ?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

kip Requires or implements a KIP streams

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants