KAFKA-20269: Refactor target assignment update for delayed assignments#21664
Conversation
When assignment batching or assignment offload are implemented, the methods to update the target assignment will no longer always return a new target assignment, depending on timings and the group coordinator config.
|
@lucasbru Could you check the streams part of this change? |
| @@ -3937,7 +3956,7 @@ private Assignment updateTargetAssignment( | |||
| * @param records The list to accumulate any new records. | |||
| * @return The new target assignment for the updated member, or EMPTY if no member specified. | |||
There was a problem hiding this comment.
nit: do we not want to update the return javadocs? I suppose this goes to the next PR?
There was a problem hiding this comment.
Thanks for the review.
After applying #21664 (comment) I'm not sure it makes sense to update the return javadocs.
| returnedStatus.add( | ||
| new Status() | ||
| .setStatusCode(StreamsGroupHeartbeatResponse.Status.ASSIGNMENT_DELAYED.code()) | ||
| .setStatusDetail("Assignment delayed due to the configured initial rebalance delay.") |
There was a problem hiding this comment.
I think we can also get here also if the initial assignment is offloaded. Should we reflect this in the status detail?
There was a problem hiding this comment.
After applying #21664 (comment) it no longer makes sense to add the status in this PR.
| Optional<Assignment> updatedTargetAssignment = Optional.empty(); | ||
| if (groupEpoch > group.assignmentEpoch()) { | ||
| targetAssignment = updateTargetAssignment( | ||
| updatedTargetAssignment = maybeUpdateTargetAssignment( |
There was a problem hiding this comment.
I wonder whether we should go a step further here. Have you considered pushing all the logic into maybeUpdateTargetAssignment? For instance, maybeUpdateTargetAssignment could return a record(epoch, assignment) and handle all the conditions that we have here. Thoughts?
…d epoch and assignment pair
Refactor target assignment update methods to return both the target
assignment epoch and target assignment. When assignment batching or
assignment offload are implemented, the target assignment update methods
may return the last target assignment, depending on timings and the
group coordinator config.