Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2023,8 +2023,8 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
assertTrue(testGroupDescription.groupEpoch.isEmpty)
assertTrue(testGroupDescription.targetAssignmentEpoch.isEmpty)
} else {
assertEquals(Optional.of(3), testGroupDescription.groupEpoch)
assertEquals(Optional.of(3), testGroupDescription.targetAssignmentEpoch)
assertEquals(Optional.of(4), testGroupDescription.groupEpoch)
assertEquals(Optional.of(4), testGroupDescription.targetAssignmentEpoch)
}

assertEquals(testGroupId, testGroupDescription.groupId())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,8 @@ class ConsumerGroupDescribeRequestTest(cluster: ClusterInstance) extends GroupCo
new DescribedGroup()
.setGroupId("grp-1")
.setGroupState(ConsumerGroupState.STABLE.toString)
.setGroupEpoch(1)
.setAssignmentEpoch(1)
.setGroupEpoch(2)
.setAssignmentEpoch(2)
.setAssignorName("uniform")
.setAuthorizedOperations(authorizedOperationsInt)
.setMembers(List(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) extends GroupC

// Verify the response.
assertNotNull(consumerGroupHeartbeatResponse.data.memberId)
assertEquals(1, consumerGroupHeartbeatResponse.data.memberEpoch)
assertEquals(2, consumerGroupHeartbeatResponse.data.memberEpoch)
assertEquals(new ConsumerGroupHeartbeatResponseData.Assignment(), consumerGroupHeartbeatResponse.data.assignment)

// Create the topic.
Expand Down Expand Up @@ -132,7 +132,7 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) extends GroupC
}, msg = s"Could not get partitions assigned. Last response $consumerGroupHeartbeatResponse.")

// Verify the response.
assertEquals(2, consumerGroupHeartbeatResponse.data.memberEpoch)
assertEquals(3, consumerGroupHeartbeatResponse.data.memberEpoch)
assertEquals(expectedAssignment, consumerGroupHeartbeatResponse.data.assignment)

// Leave the group.
Expand Down Expand Up @@ -382,7 +382,7 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) extends GroupC

// Verify the response.
assertNotNull(consumerGroupHeartbeatResponse.data.memberId)
assertEquals(1, consumerGroupHeartbeatResponse.data.memberEpoch)
assertEquals(2, consumerGroupHeartbeatResponse.data.memberEpoch)
assertEquals(new ConsumerGroupHeartbeatResponseData.Assignment(), consumerGroupHeartbeatResponse.data.assignment)

// Create the topic.
Expand Down Expand Up @@ -416,7 +416,7 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) extends GroupC

// Verify the response.
assertNotNull(consumerGroupHeartbeatResponse.data.memberId)
assertEquals(2, consumerGroupHeartbeatResponse.data.memberEpoch)
assertEquals(3, consumerGroupHeartbeatResponse.data.memberEpoch)
assertEquals(expectedAssignment, consumerGroupHeartbeatResponse.data.assignment)

val oldMemberId = consumerGroupHeartbeatResponse.data.memberId
Expand Down Expand Up @@ -451,7 +451,7 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) extends GroupC

// Verify the response.
assertNotNull(consumerGroupHeartbeatResponse.data.memberId)
assertEquals(2, consumerGroupHeartbeatResponse.data.memberEpoch)
assertEquals(3, consumerGroupHeartbeatResponse.data.memberEpoch)
assertEquals(expectedAssignment, consumerGroupHeartbeatResponse.data.assignment)
// The 2 member IDs should be different
assertNotEquals(oldMemberId, consumerGroupHeartbeatResponse.data.memberId)
Expand Down Expand Up @@ -493,7 +493,7 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) extends GroupC

// Verify the response.
assertNotNull(consumerGroupHeartbeatResponse.data.memberId)
assertEquals(1, consumerGroupHeartbeatResponse.data.memberEpoch)
assertEquals(2, consumerGroupHeartbeatResponse.data.memberEpoch)
assertEquals(new ConsumerGroupHeartbeatResponseData.Assignment(), consumerGroupHeartbeatResponse.data.assignment)

// Create the topic.
Expand Down Expand Up @@ -526,7 +526,7 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) extends GroupC
}, msg = s"Could not get partitions assigned. Last response $consumerGroupHeartbeatResponse.")

// Verify the response.
assertEquals(2, consumerGroupHeartbeatResponse.data.memberEpoch)
assertEquals(3, consumerGroupHeartbeatResponse.data.memberEpoch)
assertEquals(expectedAssignment, consumerGroupHeartbeatResponse.data.assignment)

// A new static member tries to join the group with an inuse instanceid.
Expand Down Expand Up @@ -554,8 +554,8 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) extends GroupC
consumerGroupHeartbeatResponse.data.assignment == expectedAssignment
}, msg = s"Could not re-join the group successfully. Last response $consumerGroupHeartbeatResponse.")

// Verify the response. The group epoch bumps upto 4 which eventually reflects in the new member epoch.
assertEquals(4, consumerGroupHeartbeatResponse.data.memberEpoch)
// Verify the response. The group epoch bumps upto 5 which eventually reflects in the new member epoch.
assertEquals(5, consumerGroupHeartbeatResponse.data.memberEpoch)
assertEquals(expectedAssignment, consumerGroupHeartbeatResponse.data.assignment)
}

Expand Down Expand Up @@ -598,7 +598,7 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) extends GroupC

// Verify the response.
assertNotNull(consumerGroupHeartbeatResponse.data.memberId)
assertEquals(1, consumerGroupHeartbeatResponse.data.memberEpoch)
assertEquals(2, consumerGroupHeartbeatResponse.data.memberEpoch)
assertEquals(5000, consumerGroupHeartbeatResponse.data.heartbeatIntervalMs)

// Alter consumer heartbeat interval config
Expand Down Expand Up @@ -706,7 +706,7 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) extends GroupC

// Verify initial join success.
assertNotNull(consumerGroupHeartbeatResponse.data.memberId)
assertEquals(1, consumerGroupHeartbeatResponse.data.memberEpoch)
assertEquals(2, consumerGroupHeartbeatResponse.data.memberEpoch)

// Create the topic to trigger partition assignment.
val topicId = createTopic(
Expand Down Expand Up @@ -735,8 +735,8 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) extends GroupC
consumerGroupHeartbeatResponse.data.assignment == expectedAssignment
}, msg = s"Could not get partitions assigned. Last response $consumerGroupHeartbeatResponse.")

// Verify member has epoch > 0 (should be 2).
assertEquals(2, consumerGroupHeartbeatResponse.data.memberEpoch)
// Verify member has epoch > 0 (should be 3).
assertEquals(3, consumerGroupHeartbeatResponse.data.memberEpoch)
assertEquals(expectedAssignment, consumerGroupHeartbeatResponse.data.assignment)

// Simulate a fenced member attempting to rejoin with epoch=0.
Expand All @@ -754,11 +754,11 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) extends GroupC

// Verify the full response.
// Since the subscription/metadata hasn't changed, the member should get
// their current state back with the same epoch (2) and assignment.
// their current state back with the same epoch (3) and assignment.
val expectedRejoinResponse = new ConsumerGroupHeartbeatResponseData()
.setErrorCode(Errors.NONE.code)
.setMemberId(memberId)
.setMemberEpoch(2)
.setMemberEpoch(3)
.setHeartbeatIntervalMs(rejoinResponse.data.heartbeatIntervalMs)
.setAssignment(expectedAssignment)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,14 +299,14 @@ class ConsumerProtocolMigrationTest(cluster: ClusterInstance) extends GroupCoord
syncGroupWithOldProtocol(
groupId = groupId,
memberId = memberId2,
generationId = 2
generationId = 3
)
)

// Member 2 heartbeats.
heartbeat(
groupId = groupId,
generationId = 2,
generationId = 3,
memberId = memberId2
)

Expand All @@ -323,7 +323,7 @@ class ConsumerProtocolMigrationTest(cluster: ClusterInstance) extends GroupCoord
// Member 2 heartbeats and gets REBALANCE_IN_PROGRESS.
heartbeat(
groupId = groupId,
generationId = 2,
generationId = 3,
memberId = memberId2,
expectedError = Errors.REBALANCE_IN_PROGRESS
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,8 @@ class ShareGroupDescribeRequestTest(cluster: ClusterInstance) extends GroupCoord
new DescribedGroup()
.setGroupId("grp-1")
.setGroupState(GroupState.STABLE.toString)
.setGroupEpoch(1)
.setAssignmentEpoch(1)
.setGroupEpoch(2)
.setAssignmentEpoch(2)
.setAssignorName("simple")
.setAuthorizedOperations(authorizedOperationsInt)
.setMembers(List(
Expand Down
Loading
Loading