fix(confluent): Fix KIP-848 consumer assignment and seek handling#525
fix(confluent): Fix KIP-848 consumer assignment and seek handling#525
Conversation
With the KIP-848 consumer group protocol, calling seek() inside the on_assign callback fails with _UNKNOWN_PARTITION because rdkafka hasn't fully registered the partition yet. Fix this by deferring incremental_assign until after the user's on_assign callback, so any seek() calls made in the callback are incorporated into the starting offsets passed to rdkafka. Also fix the test_kip848_e2e topic cleanup to handle pre-existing topics from unclean test teardown, remove the xfail marker from test_pause_resume_rebalancing (it now passes), and bump confluent-kafka to >=2.13.2 with Kafka 4+ (cp-kafka:8.0.0) required for KIP-848 support. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
The __pending_seeks block in poll() was never reached: seeks deferred during on_assign are cleared in the assignment callback's finally block (after incremental_assign), before poll() can execute. Remove it along with a leftover commented-out STALE_MEMBER_EPOCH line. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
After the previous cleanup, __pending_seeks was written to in seek() but never read — its values were unused since incremental_assign reads from __offsets instead. Remove the dict and all its callsites. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
| for partition, offset in offsets.items() | ||
| ] | ||
| if self.__is_cooperative_sticky: | ||
| if self.__is_kip848: |
There was a problem hiding this comment.
With cooperative-sticky, __assign is called from the assignment_callback before on_assign, and calling incremental_assign there is safe — rdkafka processes it immediately and seek() works fine afterwards inside the callback.
With KIP-848, calling incremental_assign at that point and then calling seek() inside the user's on_assign callback fails with _UNKNOWN_PARTITION. The root cause is that with KIP-848, rdkafka hasn't fully committed the incremental assignment to its internal state by the time the Python callback returns control to user code — so the partition isn't yet seekable.
The fix is to defer incremental_assign until after on_assign completes (in the finally block), at which point we use self.__offsets — which already reflects any seek() calls the user made — as the starting offsets. This way, we never need to call seek() at all; the desired position is baked directly into the incremental_assign call.
So the pass here means: skip incremental_assign for now, because it will be called with the correct final offsets later in the finally block, after on_assign has run.
From Claude
Fix several bugs when using confluent-kafka 2.13.2+ with Kafka 4+ and the KIP-848 consumer group protocol.
Core fix: KIP-848 assignment and seek handling
With the old rebalancing protocols (eager and cooperative-sticky),
incremental_assignorassignis called from within theon_assigncallback, and anyseek()calls made by the user in that callback take effect immediately via rdkafka.With KIP-848, calling
seek()insideon_assignfails with_UNKNOWN_PARTITION— rdkafka hasn't fully committed the incremental assignment to its internal state by the time user code runs in the callback. The solution is to deferincremental_assignuntil afteron_assigncompletes (in thefinallyblock), usingself.__offsetsas the starting positions. Sinceself.__offsetsalready reflects anyseek()calls the user made in the callback, the desired position is baked directly into theincremental_assigncall, and no separateseek()is needed.This required adding a separate
__is_kip848flag (distinct from__is_cooperative_sticky) so the two protocols can be handled differently in__assignandseek().Other changes
confluent-kafkato>=2.13.2; KIP-848 requires Kafka 4+, so updaterun-kafka.shto usecp-kafka:8.0.0withKAFKA_GROUP_COORDINATOR_REBALANCE_PROTOCOLS=classic,consumertest_kip848_e2eto handle a pre-existing topic from unclean test teardownxfailmarker fromTestKafkaStreamsKip848::test_pause_resume_rebalancing— it now passes