Skip to content

KAFKA-15852: Move UncleanLeaderElectionTest to server module#21689

Open
mimaison wants to merge 3 commits intoapache:trunkfrom
mimaison:kafka-15852
Open

KAFKA-15852: Move UncleanLeaderElectionTest to server module#21689
mimaison wants to merge 3 commits intoapache:trunkfrom
mimaison:kafka-15852

Conversation

@mimaison
Copy link
Member

@mimaison mimaison commented Mar 9, 2026

Convert the test from QuorumTestHarness to @ClusterTest.

Since we can have @ParameterizedTest with @ClusterTest I created
Classic and Consumer protocol versions of all the tests that use a
consumer.

@github-actions github-actions bot added the core Kafka Broker label Mar 9, 2026
assertInstanceOf(InvalidConfigurationException.class, e.getCause());
}

public void verifyUncleanLeaderElectionEnabled(GroupProtocol groupProtocol) throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Doesn't need to be public.

Or we can better move it to utils to be reused.

assertEquals(List.of("first", "third"), consumeAllMessages(2, groupProtocol));
}

public void verifyUncleanLeaderElectionDisabled(GroupProtocol groupProtocol) throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above.

Copy link
Contributor

@akhileshchg akhileshchg left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PR looks great. LGTM with minor comments.

Copy link
Member

@chia7712 chia7712 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mimaison thanks for this migration

new IllegalStateException("Cannot get topic: " + topic + ", partition: " + partition + " in server metadata cache"));
}

private static <K, V> List<ConsumerRecord<K, V>> pollUntilAtLeastNumRecords(Consumer<K, V> consumer, int numRecords) throws Exception {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you merge pollRecordsUntilTrue with pollUntilAtLeastNumRecords? For example:

    private static <K, V> List<ConsumerRecord<K, V>> pollUntilAtLeastNumRecords(Consumer<K, V> consumer, int numRecords) throws Exception {
        final List<ConsumerRecord<K, V>> records = new ArrayList<>();
        waitForCondition(() -> {
            consumer.poll(Duration.ofMillis(100L)).forEach(records::add);
            return records.size() >= numRecords;
        }, DEFAULT_MAX_WAIT_MS, () -> "Consumed " + records.size() + " records before timeout instead of the expected " + numRecords + " records");
        return records;
    }

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The original methods in kafka.utils.TestUtils are called from multiple places hence why I kept them separate there too.

* @return The current leader for the topic partition
* @throws InterruptedException If waitForCondition is interrupted
*/
public static int awaitLeaderChange(ClusterInstance cluster, TopicPartition tp, Optional<Integer> expectedLeaderOpt, long timeout) throws InterruptedException {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

    public static int awaitLeaderChange(ClusterInstance cluster, TopicPartition tp, Optional<Integer> expectedLeaderOpt, long timeout) throws InterruptedException {
        if (expectedLeaderOpt.isPresent()) {
            LOG.debug("Waiting for leader of {} to change to {}", tp, expectedLeaderOpt.get());
        } else {
            LOG.debug("Waiting for a leader to be elected for {}", tp);
        }

        Supplier<Optional<Integer>> newLeaderExists = () -> cluster.brokers().values().stream()
            .filter(broker -> expectedLeaderOpt.isEmpty() || broker.config().brokerId() == expectedLeaderOpt.get())
            .filter(broker -> broker.replicaManager().onlinePartition(tp).exists(partition -> partition.leaderLogIfLocal().isDefined()))
            .map(broker -> broker.config().brokerId())
            .findFirst();

        waitForCondition(
            () -> newLeaderExists.get().isPresent(),
            timeout,
            "Did not observe leader change for partition " + tp + " after " + timeout + " ms"
        );

        return newLeaderExists.get().get();
    }

return admin.incrementalAlterConfigs(Map.of(configResource, configEntries));
}

private void waitForNoLeaderAndIsrHasOldLeaderId(MetadataCache metadataCache, int leaderId) throws InterruptedException {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

    private void waitForNoLeaderAndIsrHasOldLeaderId(MetadataCache metadataCache, int leaderId) throws InterruptedException {
        waitForCondition(
            () -> metadataCache.getLeaderAndIsr(TOPIC, PARTITION_ID)
                    .filter(leaderAndIsr -> leaderAndIsr.leader() == LeaderConstants.NO_LEADER)
                    .filter(leaderAndIsr -> leaderAndIsr.isr().equals(Set.of(leaderId)))
                    .isPresent(),
                DEFAULT_MAX_WAIT_MS,
                "Timed out waiting for broker metadata cache updates the info for topic partition:" + TOPIC_PARTITION);
    }

private AlterConfigsResult alterTopicConfigs(Map<String, String> configs) {
ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, TOPIC);

Collection<AlterConfigOp> configEntries = configs.entrySet().stream()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

        var configEntries = configs.entrySet().stream()
            .map(e -> new AlterConfigOp(new ConfigEntry(e.getKey(), e.getValue()),
                    AlterConfigOp.OpType.SET))
            .toList();

We could use a single map operation to create the AlterConfigOp instances

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea, I pushed an update. Thanks

private static final int BROKER_ID_0 = 0;
private static final int BROKER_ID_1 = 1;
private static final Random RANDOM = new Random();
private static final String TOPIC = "topic" + RANDOM.nextLong();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems the previous behaviour was to generate a different topic name for each test case

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Each test uses a new cluster, so instead I was thinking of removing the randomness altogether to make things simpler.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

core Kafka Broker

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants