Skip to content

KAFKA-20134: Fix header-based key deserialization in MeteredTimestampedWindowStoreWithHeaders iterator methods#21705

Open
frankvicky wants to merge 2 commits intoapache:trunkfrom
frankvicky:fix-window-fetchall-bug
Open

KAFKA-20134: Fix header-based key deserialization in MeteredTimestampedWindowStoreWithHeaders iterator methods#21705
frankvicky wants to merge 2 commits intoapache:trunkfrom
frankvicky:fix-window-fetchall-bug

Conversation

@frankvicky
Copy link
Contributor

@frankvicky frankvicky commented Mar 10, 2026

Fixes a bug where MeteredTimestampedWindowStoreWithHeaders iterator
methods fail when key deserializers require headers.

The class inherits iterator-returning methods from MeteredWindowStore:

  • fetchAll(long, long) / backwardFetchAll(long, long) - all() /
    backwardAll() - fetch(K, K, long, long) / backwardFetch(K, K, long, long)

    These methods use the deprecated serdes::keyFrom(rawKey) which
    provides empty headers, causing deserialization failures when headers
    are required.

@github-actions github-actions bot added the triage PRs from the community label Mar 10, 2026
@frankvicky frankvicky changed the title Fix header-based key deserialization in MeteredTimestampedWindowStoreWithHeaders iterator methods MINOR: Fix header-based key deserialization in MeteredTimestampedWindowStoreWithHeaders iterator methods Mar 10, 2026
@mjsax mjsax added kip Requires or implements a KIP and removed triage PRs from the community labels Mar 10, 2026
@mjsax mjsax changed the title MINOR: Fix header-based key deserialization in MeteredTimestampedWindowStoreWithHeaders iterator methods KAFKA-20134: Fix header-based key deserialization in MeteredTimestampedWindowStoreWithHeaders iterator methods Mar 10, 2026

@Override
public KeyValue<Windowed<K>, ValueTimestampHeaders<V>> next() {
final KeyValue<Windowed<Bytes>, byte[]> next = iter.next();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need null check for next ?

@Override
public Windowed<K> peekNextKey() {
final Windowed<Bytes> bytesKey = iter.peekNextKey();
final K key = serdes.keyFrom(bytesKey.key().get(), new RecordHeaders());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems this needs to get updated? We also need to pass in headers here into keyFrom (not just empty headers)? So we need to deserialize the valueTimestampeHeader byte array to parse out the headers.

Not 100% sure what the best way to do this will be... Maybe we cannot call iter.peekNextKey() but need to call iter.next(), and remember that we did this, for the next next() call to not step over a record incorrectly.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

kip Requires or implements a KIP streams

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants