diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java index d56f214751456..de7e9d0644442 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java @@ -508,6 +508,9 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { return; } + InFlightTask inFlightTask = (InFlightTask) ctx; + inFlightTask.setEntries(Collections.emptyList()); + // Reduce read batch size to avoid flooding bookies with retries readBatchSize = topic.getBrokerService().pulsar().getConfiguration().getDispatcherMinReadBatchSize(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java index 605e1cd7ae39f..db4c4c70a34e6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java @@ -44,8 +44,6 @@ import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.SortedSet; -import java.util.TreeSet; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; @@ -642,12 +640,8 @@ public void testReplicatePeekAndSkip() throws Exception { @Test(timeOut = 30000) public void testReplicatorClearBacklog() throws Exception { - // This test is to verify that reset cursor fails on global topic - SortedSet testDests = new TreeSet<>(); - final TopicName dest = TopicName .get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns/clearBacklogTopic")); - testDests.add(dest.toString()); @Cleanup MessageProducer producer1 = new MessageProducer(url1, dest); @@ -655,12 +649,31 @@ public void testReplicatorClearBacklog() throws Exception { @Cleanup MessageConsumer consumer1 = new MessageConsumer(url3, dest); - // Produce from cluster1 and consume from the rest + // Produce from cluster1 to trigger topic and replicator creation producer1.produce(2); + PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopicReference(dest.toString()).get(); - PersistentReplicator replicator = (PersistentReplicator) spy( - topic.getReplicators().get(topic.getReplicators().keySet().stream().toList().get(0))); - replicator.readEntriesFailed(new ManagedLedgerException.InvalidCursorPositionException("failed"), null); + PersistentReplicator replicator = (PersistentReplicator) topic.getReplicators() + .get(topic.getReplicators().keySet().stream().toList().get(0)); + + // Wait until the first batch of messages is fully replicated (backlog = 0), + // so that disconnect() won't be rejected due to non-zero backlog + Awaitility.await().untilAsserted(() -> { + assertEquals(replicator.getNumberOfEntriesInBacklog(), 0); + }); + + // Disconnect replicator to stop geo-replication, so new messages will form backlog + pauseReplicator(replicator); + + // Produce more messages while replication is paused, these will accumulate as backlog + producer1.produce(2); + + // wait for backlog to accumulate + Awaitility.await().untilAsserted(() -> { + assertEquals(replicator.getNumberOfEntriesInBacklog(), 2); + }); + + // Clear the backlog replicator.clearBacklog().get(); Thread.sleep(100); replicator.updateRates(); // for code-coverage @@ -672,25 +685,39 @@ public void testReplicatorClearBacklog() throws Exception { @Test(timeOut = 30000) public void testReplicatorExpireMsgAsync() throws Exception { - // This test is to verify that reset cursor fails on global topic - SortedSet testDests = new TreeSet<>(); - final TopicName dest = TopicName .get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns/clearBacklogTopic")); - testDests.add(dest.toString()); - @Cleanup MessageProducer producer1 = new MessageProducer(url1, dest); @Cleanup MessageConsumer consumer1 = new MessageConsumer(url3, dest); - // Produce from cluster1 and consume from the rest + // Produce from cluster1 to trigger topic and replicator creation producer1.produce(2); + PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopicReference(dest.toString()).get(); - PersistentReplicator replicator = (PersistentReplicator) spy( - topic.getReplicators().get(topic.getReplicators().keySet().stream().toList().get(0))); - replicator.readEntriesFailed(new ManagedLedgerException.InvalidCursorPositionException("failed"), null); + PersistentReplicator replicator = (PersistentReplicator) topic.getReplicators() + .get(topic.getReplicators().keySet().stream().toList().get(0)); + + // Wait until the first batch of messages is fully replicated (backlog = 0), + // so that disconnect() won't be rejected due to non-zero backlog + Awaitility.await().untilAsserted(() -> { + assertEquals(replicator.getNumberOfEntriesInBacklog(), 0); + }); + + // Disconnect replicator to stop geo-replication, so new messages will form backlog + pauseReplicator(replicator); + + // Produce more messages while replication is paused, these will accumulate as backlog + producer1.produce(2); + + // wait for backlog to accumulate + Awaitility.await().untilAsserted(() -> { + assertEquals(replicator.getNumberOfEntriesInBacklog(), 2); + }); + + // Clear the backlog replicator.clearBacklog().get(); Thread.sleep(100); replicator.updateRates(); // for code-coverage @@ -1653,7 +1680,7 @@ private void checkListContainExpectedTopic(PulsarAdmin admin, String namespace, @Test public void testReplicatorWithFailedAck() throws Exception { - log.info("--- Starting ReplicatorTest::testReplication ---"); + log.info("--- Starting ReplicatorTest::testReplicatorWithFailedAck ---"); String namespace = BrokerTestUtil.newUniqueName("pulsar/ns"); admin1.namespaces().createNamespace(namespace, Sets.newHashSet("r1")); @@ -1686,14 +1713,28 @@ public void testReplicatorWithFailedAck() throws Exception { }).when(spyCursor).asyncDelete(Mockito.any(Position.class), Mockito.any(AsyncCallbacks.DeleteCallback.class), Mockito.any()); + // Mock the readEntriesFailed scenario: + // Use AtomicBoolean to control whether to trigger read failure + // Initialized to true to ensure the first readMoreEntries after replicator startup is intercepted. + AtomicBoolean isMakeReadFail = new AtomicBoolean(true); + doAnswer(invocation -> { + if (isMakeReadFail.get()) { + AsyncCallbacks.ReadEntriesCallback callback = invocation.getArgument(2); + Object ctx = invocation.getArgument(3); + log.info("asyncReadEntriesOrWait will be failed"); + callback.readEntriesFailed(new ManagedLedgerException("Mocked read failure"), ctx); + return null; + } else { + log.info("asyncReadEntriesOrWait will proceed normally"); + return invocation.callRealMethod(); + } + }).when(spyCursor).asyncReadEntriesOrWait(Mockito.anyInt(), Mockito.anyLong(), + Mockito.any(AsyncCallbacks.ReadEntriesCallback.class), Mockito.any(), Mockito.any(Position.class)); + log.info("--- Starting producer --- " + url1); admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2"), false); - // Produce from cluster1 and consume from the rest - producer1.produce(2); - - MessageIdImpl lastMessageId = (MessageIdImpl) topic.getLastMessageId().get(); - Position lastPosition = PositionFactory.create(lastMessageId.getLedgerId(), lastMessageId.getEntryId()); + // Wait for replicator to start Awaitility.await().pollInterval(1, TimeUnit.SECONDS).timeout(30, TimeUnit.SECONDS) .ignoreExceptions() .untilAsserted(() -> { @@ -1703,9 +1744,33 @@ public void testReplicatorWithFailedAck() throws Exception { replicator.getState()); }); - // Make sure all the data has replicated to the remote cluster before close the cursor. - Awaitility.await().untilAsserted(() -> assertEquals(cursor.getMarkDeletedPosition(), lastPosition)); + // --- Test readEntriesFailed scenario --- + // isMakeReadFail is already true, replicator's readMoreEntries keeps failing + // Record current mark delete position + Position posBeforeReadFail = cursor.getMarkDeletedPosition(); + + // Produce messages; since reads keep failing, messages cannot be replicated + producer1.produce(2); + + MessageIdImpl lastMessageId = (MessageIdImpl) topic.getLastMessageId().get(); + Position lastPosition = PositionFactory.create(lastMessageId.getLedgerId(), lastMessageId.getEntryId()); + + // During 2 seconds of continuous read failure, mark delete position should not advance + Awaitility.await() + .during(2, TimeUnit.SECONDS) + .atMost(5, TimeUnit.SECONDS) + .untilAsserted(() -> assertEquals(cursor.getMarkDeletedPosition(), posBeforeReadFail)); + + // Disable the read failure flag; replicator will read normally on retry, thus resuming replication + isMakeReadFail.set(false); + + // Wait for replicator to recover from read failure and complete replication + // (mark delete catches up to the latest position) + Awaitility.await().timeout(30, TimeUnit.SECONDS).untilAsserted(() -> { + assertEquals(cursor.getMarkDeletedPosition(), lastPosition); + }); + // --- Test DeleteCallback scenario --- isMakeAckFail.set(true); producer1.produce(10);