Skip to content

KAFKA-20173: Metered layer of KV-stores needs to pass Headers#21684

Open
mjsax wants to merge 6 commits intoapache:trunkfrom
mjsax:kafka-20173-meteredkeyvaluestore-headers
Open

KAFKA-20173: Metered layer of KV-stores needs to pass Headers#21684
mjsax wants to merge 6 commits intoapache:trunkfrom
mjsax:kafka-20173-meteredkeyvaluestore-headers

Conversation

@mjsax
Copy link
Member

@mjsax mjsax commented Mar 9, 2026

Updates the metered ks-stores layer to pass the context headers into
serdes. Simplifies the code with some refactoring.

@mjsax mjsax added streams kip Requires or implements a KIP labels Mar 9, 2026
protected V outerValue(final byte[] value) {
return value != null ? serdes.valueFrom(value, new RecordHeaders()) : null;
protected byte[] serializeValue(final V value) {
return value != null ? serdes.rawValue(value, internalContext.headers()) : null;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the key question -- should we pass context.headers() or new RecordHeaders for non-header stores. Both solutions have advantages and disadvantages.

Using new RecordHeaders is strictly more backward compatible; the idea to pass in context.headers() feels like a "bug fix" though -- we should have always done this IMHO. That's why I opted for this solution.

Of course, we can also "fix" this "bug" by arguing: we stay 100% backward compatible and pass in new RecordHeaders and user get the fix by enabling the new headers stores...

Let me know what you think.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMHO. I think new RecordHeaders looks more correct (from backward compatibility POV), but essentially that's okay to do context.headers() because:

  • If user doesn't use headers aware serializer, headers will be ignored (ether empty or not)
  • The idea of this ticket is to actually fix code base to propagate headers, and if we can propagate original headers instead of "mocked" one we go for it
  • It doesn't make any difference if users aren't using headers state stores / serializers, but helps us to consistently apply simple paradigm: if there is a access to original headers -> go for it, if not fallback to new RecordHeaders

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mjsax
Regarding backward compability, I think this risk is very low because, most serdes ignore headers - the default interface methods delegate to the non-headers versions. Using internalContext.headers() is semantically correct behavior - serdes should have access to record context.
1Q: should we check if internalContext != null?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1Q: should we check if internalContext != null?

I would say no? If internalContext would be null, it would be a bug (I believe), so we should rather expose such an issue directly, and fail fast by crashing, and not "mask" the bug by not failing?

}

public RawAndDeserializedValue<V> getWithBinary(final K key) {
RawAndDeserializedValue<V> getWithBinary(final K key) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Side cleanup

@mjsax mjsax changed the title KAFKA-20173: Metered layer of kv-stores need to pass Headers KAFKA-20173: Metered layer of KV-stores needs to pass Headers Mar 9, 2026
Copy link
Contributor

@UladzislauBlok UladzislauBlok left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM Overall. Left few small comments
FYI:

Found 3 test failures:
FAILED ❌ MockProcessorContextAPITest > shouldStoreAndReturnStateStores()
FAILED ❌ WordCountProcessorTest > test()
FAILED ❌ WordCountTransformerTest > test()

Comment on lines +119 to +125
MeteredKeyValueStore(
final KeyValueStore<Bytes, byte[]> inner,
final String metricsScope,
final Time time,
final Serde<K> keySerde,
final Serde<V> valueSerde
) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Offtopic

Question about formatting fix: do we have consistent code style across code base? I mean literally cli formatted or ide setting
I was able to find this: https://kafka.apache.org/community/developer/#streams-api , but I don't think that's enough to keep code style consistent

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No we don't have anything that would do strict enforcement...

protected V outerValue(final byte[] value) {
return value != null ? serdes.valueFrom(value, new RecordHeaders()) : null;
protected byte[] serializeValue(final V value) {
return value != null ? serdes.rawValue(value, internalContext.headers()) : null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMHO. I think new RecordHeaders looks more correct (from backward compatibility POV), but essentially that's okay to do context.headers() because:

  • If user doesn't use headers aware serializer, headers will be ignored (ether empty or not)
  • The idea of this ticket is to actually fix code base to propagate headers, and if we can propagate original headers instead of "mocked" one we go for it
  • It doesn't make any difference if users aren't using headers state stores / serializers, but helps us to consistently apply simple paradigm: if there is a access to original headers -> go for it, if not fallback to new RecordHeaders

throw new UnsupportedOperationException("Position is not supported for " + getClass().getSimpleName());
}

protected Bytes keyBytes(final K key, final Headers headers) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was focusing on MeteredKeyValueStore only, as the PR is already large enough. -- Would need to revisit both MeteredTimestampedKeyValueStore and MeteredTimestampedKeyValueStoreWithHeaders as follow up to further cleanup, in a follow up PR.

Objects.requireNonNull(key, "key cannot be null");
try {
final long validTo = maybeMeasureLatency(() -> inner.put(keyBytes(key), plainValueSerdes.rawValue(value), timestamp), time, putSensor);
final long validTo = maybeMeasureLatency(() -> inner.put(serializeKey(key), plainValueSerdes.rawValue(value), timestamp), time, putSensor);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

plainValueSerdes.rawValue(value). do we need headers there?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch -- similar to my other comment -- focus of this PR was MeteredKeyValueStore -- we haven't even implemented "header-versioned-store" yet. Will address in follow up PR.


@Test
public void shouldThrowIfIncompatibleSerdeForKey() throws ClassNotFoundException {
@SuppressWarnings("rawtypes")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

out of curiosity: why new RecordHeaders() leads to @SuppressWarnings("rawtypes")?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't -- this is additional side cleanup. We use Class below, which is the offender (and we cannot switch to Class<?> either to avoid the rawtype, because we want to test the wrong type condition...)

mjsax added 4 commits March 9, 2026 12:14
Updates the metered ks-stores layer to pass the context headers into serdes.
Simplifies the code with some refactoring.
@mjsax mjsax force-pushed the kafka-20173-meteredkeyvaluestore-headers branch from 6f5fc81 to ee99c2c Compare March 9, 2026 19:42
Comment on lines +49 to +70
store.init(context.getStateStoreContext(), store);
store.init(
new AbstractProcessorContext<>(new TaskId(0, 0), new StreamsConfig(context.appConfigs()), (StreamsMetricsImpl) context.metrics(), null) {
Copy link
Contributor

@UladzislauBlok UladzislauBlok Mar 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume MockProcessorContext doesn't have a method to mock headers, and that's the reason why you re-implemented it, right?

btw, don't think this comment is still correct:

/**
 * Demonstrate the use of {@link MockProcessorContext} for testing the {@link Processor} in the {@link WordCountProcessorDemo}.
 */

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh. That's a good point. I totally missed that this is example code... Need to do this differently.

Copy link
Member Author

@mjsax mjsax Mar 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.withLoggingDisabled() // Changelog is not supported by MockProcessorContext.
// Caching is disabled by default, but FYI: caching is also not supported by MockProcessorContext.
.build();
store.init(context.getStateStoreContext(), store);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

Copy link
Contributor

@aliehsaeedii aliehsaeedii left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, thanks.

protected V outerValue(final byte[] value) {
return value != null ? serdes.valueFrom(value, new RecordHeaders()) : null;
protected byte[] serializeValue(final V value) {
return value != null ? serdes.rawValue(value, internalContext.headers()) : null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mjsax
Regarding backward compability, I think this risk is very low because, most serdes ignore headers - the default interface methods delegate to the non-headers versions. Using internalContext.headers() is semantically correct behavior - serdes should have access to record context.
1Q: should we check if internalContext != null?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

kip Requires or implements a KIP streams

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants