
KAFKA-20179 : Avoid headers deserialization during changelogging#21676

Draft
muralibasani wants to merge 2 commits intoapache:trunkfrom
muralibasani:KAFKA-20179

Conversation

@muralibasani (Contributor) commented Mar 8, 2026

Changelog stores were eagerly deserializing header bytes on every put, only for the producer to re-serialize them.

Added SerializedHeaders, a lazy Headers wrapper that holds the raw bytes and defers parsing until the producer actually needs them via toArray().

The three changelog store wrappers (KV, window, session) now use rawHeaderBytes() and SerializedHeaders instead of the eager headers() call, and vector clock entries are appended without triggering deserialization.
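The lazy-wrapper idea can be sketched roughly as below. This is a hypothetical, simplified illustration, not the PR's actual code: the class and method names mirror the description above, but the wire format here is a toy `key=value;` list rather than Kafka's real header encoding.

```java
// Hypothetical sketch of a lazy header wrapper: raw bytes are stored
// as-is and only parsed when toArray() is first called.
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class LazyHeadersSketch {

    // Toy header: a string key and a byte[] value.
    public record Header(String key, byte[] value) {}

    public static class SerializedHeaders {
        private final byte[] rawBytes;      // serialized form, stored untouched
        private List<Header> materialized;  // parsed lazily, at most once

        public SerializedHeaders(byte[] rawBytes) {
            this.rawBytes = rawBytes;
        }

        // Fast path used while changelogging: no parsing at all.
        public byte[] rawBytes() {
            return rawBytes;
        }

        // Slow path: only a caller that actually iterates the headers
        // (e.g. the producer) pays the parsing cost.
        public List<Header> toArray() {
            if (materialized == null) {
                materialized = parse(rawBytes);
            }
            return materialized;
        }

        // Toy decoding: ';'-separated "key=value" pairs.
        private static List<Header> parse(byte[] bytes) {
            List<Header> out = new ArrayList<>();
            for (String pair : new String(bytes, StandardCharsets.UTF_8).split(";")) {
                int eq = pair.indexOf('=');
                out.add(new Header(pair.substring(0, eq),
                        pair.substring(eq + 1).getBytes(StandardCharsets.UTF_8)));
            }
            return out;
        }
    }

    public static void main(String[] args) {
        byte[] raw = "traceId=abc;source=kv".getBytes(StandardCharsets.UTF_8);
        SerializedHeaders headers = new SerializedHeaders(raw);
        System.out.println(headers.rawBytes().length);      // no parsing happened
        System.out.println(headers.toArray().get(0).key()); // parsing on demand
    }
}
```

The store wrappers would only ever touch `rawBytes()`, so a record that is written to the changelog and never inspected by user code skips header parsing entirely.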

@github-actions bot added the triage, PRs from the community, and streams labels Mar 8, 2026
@mjsax (Member) commented Mar 9, 2026

This PR does not really address the problem. It just shifts the deserialization we try to avoid to a different place.

To really solve the problem, we need to change the KafkaProducer code, i.e., the code that builds the "record batches" which are sent to Kafka over the wire. For example, there is the DefaultRecord#writeTo method, which iterates over all headers to serialize them -- it uses a for loop, so it implicitly calls toArray(), which triggers the materialize() step introduced in this PR, just to serialize the headers again...

We need to change the whole call stack, to be able to literally pass a byte[] array instead of a Headers object into the Producer (maybe via ProducerRecord?), and change all code that currently works on the Headers object to handle the case that Headers is null and the new byte[] rawHeaders is present.
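The shape of the change being asked for might look something like the following. This is illustrative only, not the actual DefaultRecord code: `writeHeaders` is a hypothetical helper, the one-of-two-arguments contract is the suggested ProducerRecord change, and the `key=value;` encoding is a toy stand-in for Kafka's varint-based header format.

```java
// Sketch: when raw header bytes are already available, copy them through
// unchanged; only fall back to serializing Header objects when they are not.
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class RawHeaderWriteSketch {

    public record Header(String key, byte[] value) {}

    // Hypothetical contract: exactly one of 'headers' / 'rawHeaders' is non-null.
    public static void writeHeaders(ByteArrayOutputStream out,
                                    Header[] headers,
                                    byte[] rawHeaders) throws IOException {
        if (rawHeaders != null) {
            out.write(rawHeaders);      // fast path: bytes pass straight through
            return;
        }
        for (Header h : headers) {      // slow path: re-serialize each header
            out.write(h.key().getBytes(StandardCharsets.UTF_8));
            out.write('=');
            out.write(h.value());
            out.write(';');
        }
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream fast = new ByteArrayOutputStream();
        writeHeaders(fast, null, "x=1;".getBytes(StandardCharsets.UTF_8));

        ByteArrayOutputStream slow = new ByteArrayOutputStream();
        writeHeaders(slow, new Header[] {
                new Header("x", "1".getBytes(StandardCharsets.UTF_8)) }, null);

        // Both paths must produce identical wire bytes.
        System.out.println(fast.toString(StandardCharsets.UTF_8)
                .equals(slow.toString(StandardCharsets.UTF_8)));
    }
}
```

The key property is the last line of `main`: a correct fast path is byte-for-byte indistinguishable from the existing serialization path, so downstream consumers see no difference.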

@muralibasani (Contributor, Author) commented Mar 10, 2026


@mjsax thank you for taking a look.

Tried to make a few changes. They turned out to be fairly involved, mainly around ProcessorContextImpl's logChange and the vector clock handling.

Passed raw header bytes through the producer call stack, from the changelog stores all the way down to DefaultRecord.writeTo(), so we never deserialize and re-serialize headers just to write them to the changelog topic.

When the vector clock is enabled, we splice the new entries directly into the raw byte array instead of materializing a Headers object.
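The splicing step could look like the sketch below. This is hypothetical: `appendEntry` is an invented helper, and for illustration it assumes a toy `key=value;` framing, whereas Kafka's real header encoding is varint-based and would also need its header count updated. The point it demonstrates is that the existing bytes are only copied, never parsed.

```java
// Sketch: append one new header entry to raw header bytes by byte copying,
// without deserializing any of the existing entries.
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class HeaderSpliceSketch {

    // Append one "key=value;" entry to the raw header bytes.
    public static byte[] appendEntry(byte[] rawHeaders, String key, byte[] value) {
        byte[] k = key.getBytes(StandardCharsets.UTF_8);
        byte[] out = Arrays.copyOf(rawHeaders,
                rawHeaders.length + k.length + value.length + 2);
        int pos = rawHeaders.length;           // existing bytes copied verbatim
        System.arraycopy(k, 0, out, pos, k.length);
        pos += k.length;
        out[pos++] = '=';
        System.arraycopy(value, 0, out, pos, value.length);
        pos += value.length;
        out[pos] = ';';
        return out;
    }

    public static void main(String[] args) {
        byte[] raw = "x=1;".getBytes(StandardCharsets.UTF_8);
        byte[] spliced = appendEntry(raw, "vc",
                "node1:42".getBytes(StandardCharsets.UTF_8));
        System.out.println(new String(spliced, StandardCharsets.UTF_8));
        // → x=1;vc=node1:42;
    }
}
```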
