KAFKA-20269 [3/N]: Refactor assignment update for delayed streams assignments#21696
Conversation
3aaad34 to
49d7ae8
Compare
|
cc @lucasbru |
| if (member.get().instanceId().isPresent()) { | ||
| throw new UnsupportedOperationException("Static members are not supported yet."); | ||
| } |
There was a problem hiding this comment.
This is not present in the previous code. What's the rational for adding it here?
There was a problem hiding this comment.
I was concerned that we would forget to update this method since #21565 is already in flight. But looking at that PR, we should have a merge conflict which is enough.
98e51ff to
1e14359
Compare
|
@squah-confluent Could you please rebase? |
Refactor the streams target assignment update method to return both the target assignment epoch and target assignment. When assignment batching or assignment offload are implemented, the target assignment update method may return the last target assignment, depending on timings and the group coordinator config.
1e14359 to
8017f23
Compare
|
Rebased |
lucasbru
left a comment
There was a problem hiding this comment.
Two questions, otherwise lgtm
|
|
||
| return new UpdateTargetAssignmentResult<>( | ||
| Math.max(1, group.assignmentEpoch()), | ||
| updatedMember.map(member -> group.targetAssignment(member.memberId())) |
There was a problem hiding this comment.
Not sure why we are not returning TasksTuple.EMPTY here. Effect should be the same though.
There was a problem hiding this comment.
Fixed it, thanks!
| Optional<List<Status>> returnedStatus, | ||
| Map<String, String> assignmentConfigs | ||
| ) { | ||
| boolean initialDelayActive = timer.isScheduled(streamsInitialRebalanceKey(group.groupId())); |
There was a problem hiding this comment.
In the new code this will be called when the timer expires. Will timer.isScheduled be false then?
There was a problem hiding this comment.
I think it's fine since the timer will be removed from the set of timer tasks before we get here:
and isScheduled checks that set of tasks:
…tor-target-assignment-update-for-delayed-assignments-streams
|
@squah-confluent Is it safe to merge this one? |
…tor-target-assignment-update-for-delayed-assignments-streams
Refactor the streams target assignment update method to return both the
target assignment epoch and target assignment. When assignment batching
or assignment offload are implemented, the target assignment update
method may return the last target assignment, depending on timings and
the group coordinator config.
Reviewers: Lucas Brutschy lbrutschy@confluent.io, David Jacot
djacot@confluent.io