KAFKA-20066: Implement KIP-1251: Assignment epochs for consumer groups [3/N]#21692
lucliu1108 wants to merge 8 commits into apache:trunk
Pull request overview
Updates consumer-group offset commit fencing to use per-partition assignment epochs (per KIP-1251) so that modern-protocol consumers aren’t falsely fenced when their member epoch lags behind the broker’s current epoch.
Changes:
- Relax modern consumer-group OffsetCommit validation from strict `clientEpoch == brokerEpoch` to per-partition `assignmentEpoch <= clientEpoch <= brokerEpoch`.
- Resolve `Uuid.ZERO_UUID` topic IDs from coordinator metadata during OffsetCommit validation to support assignment-epoch checks.
- Add/adjust unit tests covering assignment-epoch validation across assigned, pending-revocation, and unassigned partitions.
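The relaxed rule from the first bullet can be sketched as a small standalone check. This is a minimal illustration under stated assumptions, not the code in the PR: `EpochCheckSketch`, `isCommitAllowed`, and the use of a nullable `Integer` to mean "partition neither assigned nor pending revocation" are hypothetical choices made here. It also assumes, following the review discussion, that a client epoch equal to the broker epoch passes without any per-partition check.

```java
// Hypothetical sketch of the relaxed fencing rule; names are illustrative only.
final class EpochCheckSketch {
    /**
     * @param assignmentEpoch epoch at which the partition was assigned to the member,
     *                        or null if the member neither owns it nor is revoking it
     * @param clientEpoch     member epoch sent in the offset commit request
     * @param brokerEpoch     member epoch currently known to the coordinator
     * @return true if the commit for this partition would be accepted
     */
    static boolean isCommitAllowed(Integer assignmentEpoch, int clientEpoch, int brokerEpoch) {
        if (clientEpoch > brokerEpoch) {
            return false; // The client claims an epoch the broker never issued.
        }
        if (clientEpoch == brokerEpoch) {
            return true; // Epochs match, so no per-partition check is needed (assumption).
        }
        if (assignmentEpoch == null) {
            return false; // Lagging client and the partition is not owned by the member.
        }
        // Lagging client: accept only if the partition was assigned at or before the client epoch.
        return assignmentEpoch <= clientEpoch;
    }
}
```

With a broker epoch of 10 and an assignment epoch of 7, a client at epoch 8 is accepted under this sketch, while a client at epoch 6 is rejected.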
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java | Implements per-partition assignment-epoch validation for modern-protocol offset commits when the client epoch is older than the broker epoch. |
| group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java | Resolves ZERO_UUID topic IDs from metadata before per-partition validation during OffsetCommit handling. |
| group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java | Adds parameterized tests for relaxed offset commit validation using assignment epochs. |
| group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMemberTest.java | Adds tests verifying assigned vs pending-revocation epoch lookup behavior. |
| group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java | Adds a regression test ensuring ZERO_UUID topic IDs are resolved for assignment-epoch validation. |
| group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java | Updates test fixtures to use assignment-with-epochs structure. |
| return (topicName, topicId, partitionId) -> { | ||
| // Search for the partition in the assigned partitions, then in partitions pending revocation. | ||
| Integer assignmentEpoch = member.assignmentEpoch(topicId, partitionId); | ||
| if (assignmentEpoch == null) { | ||
| assignmentEpoch = member.pendingRevocationEpoch(topicId, partitionId); | ||
| } | ||
|
|
||
| if (assignmentEpoch == null) { | ||
| throw new StaleMemberEpochException(String.format( | ||
| "Partition %s-%d is not assigned or pending revocation for member.", | ||
| topicName, partitionId)); | ||
| } | ||
|
|
||
| if (receivedMemberEpoch < assignmentEpoch) { | ||
| throw new StaleMemberEpochException( | ||
| String.format("The received member epoch %d is older than the assignment epoch %d for partition %s-%d.", |
createAssignmentEpochValidator relies on a non-ZERO_UUID topicId to find assignment epochs (it indexes member assignments by topic ID). However, OffsetMetadataManager.commitTransactionalOffset(...) currently invokes validator.validate(...) with Uuid.ZERO_UUID for TxnOffsetCommit requests, which will cause assignment-epoch validation to fail as "not assigned" whenever the member epoch is older than the broker epoch. To keep behavior consistent with the non-transactional OffsetCommit path, resolve the topic ID from metadata (topic name -> id) for TxnOffsetCommit as well, and pass the resolved ID into the validator.
| // Validate commit per-partition | ||
| validator.validate( | ||
| topic.name(), | ||
| topic.topicId(), | ||
| resolvedTopicId, | ||
| partition.partitionIndex() | ||
| ); |
CommitPartitionValidator.validate(...) can now throw StaleMemberEpochException on a per-partition basis (assignment-epoch checks). In commitOffset, that exception currently bubbles out of the per-partition loop, causing the whole OffsetCommit request to fail (and typically returning the same error for all partitions) instead of returning per-partition error codes and still committing valid partitions. Catch StaleMemberEpochException around validator.validate(...), set the partition error (e.g., Errors.STALE_MEMBER_EPOCH), and continue without appending a record for that partition.
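The per-partition handling suggested in this comment could look roughly like the sketch below. It is a simplified, self-contained model rather than the coordinator code: `PerPartitionErrorSketch`, `StaleEpochException`, `PartitionValidator`, and the string error codes are hypothetical stand-ins for `StaleMemberEpochException`, `CommitPartitionValidator`, and `Errors`.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of per-partition error handling; not the actual commitOffset code.
final class PerPartitionErrorSketch {
    /** Stand-in for StaleMemberEpochException. */
    static final class StaleEpochException extends RuntimeException {
        StaleEpochException(String message) {
            super(message);
        }
    }

    /** Stand-in for CommitPartitionValidator, reduced to a partition index. */
    interface PartitionValidator {
        void validate(int partitionIndex);
    }

    /** Returns an error code per partition instead of failing the whole request. */
    static Map<Integer, String> commit(List<Integer> partitions, PartitionValidator validator) {
        Map<Integer, String> results = new LinkedHashMap<>();
        for (int partition : partitions) {
            try {
                validator.validate(partition);
                // In the real code path a record would be appended here.
                results.put(partition, "NONE");
            } catch (StaleEpochException e) {
                // Record the error for this partition only and keep going.
                results.put(partition, "STALE_MEMBER_EPOCH");
            }
        }
        return results;
    }
}
```

The design point is that one stale partition no longer prevents the remaining partitions in the same request from being committed.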
| // Resolve topic ID if it's ZERO_UUID | ||
| Uuid resolvedTopicId = topic.topicId(); | ||
| if (resolvedTopicId.equals(Uuid.ZERO_UUID)) { | ||
| resolvedTopicId = groupMetadataManager.image() | ||
| .topicMetadata(topic.name()) | ||
| .map(CoordinatorMetadataImage.TopicMetadata::id) | ||
| .orElse(Uuid.ZERO_UUID); | ||
| } |
Topic ID resolution for Uuid.ZERO_UUID is done inside the partition loop, but the resolved value is identical for every partition in the topic. Resolve once per topic (before iterating partitions) to avoid redundant metadata lookups and make the control flow clearer.
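A minimal sketch of resolving once per topic is shown below. It assumes a plain `Map<String, UUID>` in place of the coordinator metadata image and `java.util.UUID` in place of Kafka's `Uuid`; `ResolveOncePerTopicSketch`, `validateTopic`, and `PartitionValidator` are hypothetical names introduced only for illustration.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

// Hypothetical sketch: resolve the topic ID before the partition loop, not inside it.
final class ResolveOncePerTopicSketch {
    /** Stand-in for Uuid.ZERO_UUID. */
    static final UUID ZERO = new UUID(0L, 0L);

    /** Stand-in for CommitPartitionValidator. */
    interface PartitionValidator {
        void validate(String topicName, UUID topicId, int partitionIndex);
    }

    static UUID validateTopic(
        String topicName,
        UUID requestTopicId,
        List<Integer> partitions,
        Map<String, UUID> metadata,
        PartitionValidator validator
    ) {
        // One metadata lookup per topic; fall back to ZERO if the topic is unknown.
        UUID resolvedTopicId = requestTopicId;
        if (resolvedTopicId.equals(ZERO)) {
            resolvedTopicId = Optional.ofNullable(metadata.get(topicName)).orElse(ZERO);
        }
        // Every partition of the topic is validated with the same resolved ID.
        for (int partition : partitions) {
            validator.validate(topicName, resolvedTopicId, partition);
        }
        return resolvedTopicId;
    }
}
```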
| // Resolve topic ID if it's ZERO_UUID | ||
| Uuid resolvedTopicId = topic.topicId(); | ||
| if (resolvedTopicId.equals(Uuid.ZERO_UUID)) { | ||
| resolvedTopicId = groupMetadataManager.image() | ||
| .topicMetadata(topic.name()) | ||
| .map(CoordinatorMetadataImage.TopicMetadata::id) | ||
| .orElse(Uuid.ZERO_UUID); | ||
| } | ||
| // Validate commit per-partition | ||
| validator.validate( | ||
| topic.name(), | ||
| topic.topicId(), | ||
| resolvedTopicId, | ||
| partition.partitionIndex() | ||
| ); | ||
|
|
resolvedTopicId is used for validation, but below this block the code still logs, responds, and persists offsets using topic.topicId() from the request. When the request uses ZERO_UUID, this will keep writing legacy records with ZERO_UUID even though the coordinator can resolve the real topic ID, weakening topic-id mismatch detection in offset fetch/delete logic. Consider using resolvedTopicId consistently for the stored OffsetAndMetadata (and potentially the response/log fields) once it has been resolved.
lucasbru left a comment
Hi @lucliu1108, thanks for the PR! I left three comments.
| return (topicName, topicId, partitionId) -> { | ||
| // Resolve topic ID if it's ZERO_UUID (for older API versions without topic ID). | ||
| Uuid resolvedTopicId = topicId; | ||
| if (topicId.equals(Uuid.ZERO_UUID) && metadataImageSupplier != null) { |
Why do we need this null check?
| /** | ||
| * Creates a validator that checks if the received member epoch is valid for each partition's assignment epoch. | ||
| * A commit is rejected if the partition is not assigned to the member | ||
| * or if the received client-side epoch is older than the partition's assignment epoch(KIP-1251). |
| * or if the received client-side epoch is older than the partition's assignment epoch(KIP-1251). | |
| * or if the received client-side epoch is older than the partition's assignment epoch (KIP-1251). |
| /** | ||
| * The supplier for the metadata image, used to resolve topic names to IDs. | ||
| */ | ||
| private final Supplier<CoordinatorMetadataImage> metadataImageSupplier; |
Not sure I like this. There are two problems with this:
- we need to re-resolve each topic ID for each partition, causing performance overhead.
- we complicate the ConsumerGroup object quite a bit.
How about we already resolve topic IDs in OffsetMetadataManager.commitOffset() before calling validator.validate()?
Uuid resolvedTopicId = topic.topicId();
if (resolvedTopicId.equals(Uuid.ZERO_UUID)) {
resolvedTopicId = groupMetadataManager.image()
.topicMetadata(topic.name())
.map(CoordinatorMetadataImage.TopicMetadata::id)
.orElse(Uuid.ZERO_UUID);
}
squah-confluent left a comment
Thanks for the PR!
| // For members using the consumer protocol, the epoch must either match the last epoch sent | ||
| // in a heartbeat or be greater than or equal to the partition's assignment epoch. |
This comment looks misplaced.
| // For members using the consumer protocol, the epoch must either match the last epoch sent | |
| // in a heartbeat or be greater than or equal to the partition's assignment epoch. | |
| // For members using the classic protocol, the epoch must match the last epoch sent | |
| // in a heartbeat. |
| return CommitPartitionValidator.NO_OP; | ||
| } | ||
|
|
||
| // For members using the consumer protocol |
| // For members using the consumer protocol | |
| // For members using the consumer protocol, the epoch must either match the last epoch sent | |
| // in a heartbeat or be greater than or equal to the partition's assignment epoch. |
| } | ||
| if (memberEpoch > member.memberEpoch()) { |
nit: newline
| } | |
| if (memberEpoch > member.memberEpoch()) { | |
| } | |
| if (memberEpoch > member.memberEpoch()) { |
| * @param member The consumer group member. | ||
| * @param receivedMemberEpoch The member epoch from the offset commit request. | ||
| * @return A validator that checks each partition's assignment epoch. |
Could we re-use the streams javadoc wording?
| * @param member The consumer group member. | |
| * @param receivedMemberEpoch The member epoch from the offset commit request. | |
| * @return A validator that checks each partition's assignment epoch. | |
| * @param member The member whose assignments are being validated. | |
| * @param receivedMemberEpoch The received member epoch. | |
| * @return A validator for per-partition validation. |
|
|
||
| final OffsetAndMetadata offsetAndMetadata = OffsetAndMetadata.fromRequest( | ||
| topic.topicId(), | ||
| resolvedTopicId, |
We shouldn't bake the topic id when the offset commit specifies a topic name only. This impacts behavior when topics are recreated.
| resolvedTopicId, | |
| topic.topicId(), |
| int partitionId = 0; | ||
| int memberEpoch = 10; | ||
| int assignmentEpoch = 7; | ||
| boolean isTransactional = false; |
Since the main driver for the KIP was transactional offset commits, it's best that we test both transactional and non-transactional offset commits.
| "member-id", "", assignmentEpoch, isTransactional, version | ||
| ); | ||
| StaleMemberEpochException ex = assertThrows(StaleMemberEpochException.class, () -> | ||
| validator.validate(unassignedTopicName, unassignedTopicId, partitionId)); |
Can we test both an unassigned topic and unassigned partition?
| assertThrows(UnsupportedVersionException.class, () -> | ||
| group.validateOffsetCommit("member-id", "", memberEpoch, isTransactional, version)); | ||
| } | ||
| // client epoch (11) > broker epoch (10) - exception thrown directly from validateOffsetCommit |
missing newline
| // client epoch (11) > broker epoch (10) - exception thrown directly from validateOffsetCommit | |
| // client epoch (11) > broker epoch (10) - exception thrown directly from validateOffsetCommit |
| mkTopicAssignmentWithEpochs(topicId, assignmentEpoch, partitionId))) | ||
| .build()); | ||
|
|
||
| // client epoch = broker epoch |
style nit: Could we write these as short sentences?
| .setAssignedPartitions(mkAssignmentWithEpochs( | ||
| mkTopicAssignmentWithEpochs(assignedTopicId, assignmentEpoch, partitionId))) | ||
| .setPartitionsPendingRevocation(mkAssignmentWithEpochs( | ||
| mkTopicAssignmentWithEpochs(assignedTopicId, assignmentEpoch, partitionId + 1))) |
| mkTopicAssignmentWithEpochs(assignedTopicId, assignmentEpoch, partitionId + 1))) | |
| mkTopicAssignmentWithEpochs(assignedTopicId, assignmentEpoch, partitionId + 1))) |
Summary
This PR changes the offset commit fencing logic, as a follow-up to
#21558, which changed the assignment structure to include epoch
information.
It introduces per-partition assignment epochs to relax the strict member
epoch validation for consumer group offset commits. When receiving an
offset commit request that includes the client-side member epoch and a
member ID, we previously required `clientEpoch == brokerEpoch`
for a valid offset commit, which could lead to false fencing.
We now allow a relaxed offset commit check using an assignment epoch for
each assigned partition and each member:
`assignmentEpoch <= clientEpoch <= brokerEpoch`.
This prevents false rejections of legitimate offset commits when a
member's epoch is bumped but the client hasn't received the update yet.
Reviewers: Sean Quah <squah@confluent.io>, Lucas Brutschy <lbrutschy@confluent.io>