From 9acde43cc43d9a1a4c0df8acf90a1a1d7378442d Mon Sep 17 00:00:00 2001 From: gosonzhang Date: Mon, 23 Mar 2026 08:40:12 +0800 Subject: [PATCH 1/5] [fix][broker] Missing assignment of the corresponding InFlightTask in PersistentReplicator.readEntriesFailed --- .../persistent/PersistentReplicator.java | 8 +++ .../pulsar/broker/service/ReplicatorTest.java | 58 +++++++++++++++---- 2 files changed, 56 insertions(+), 10 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java index d56f214751456..1686ad8d3bc8a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java @@ -508,6 +508,9 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { return; } + InFlightTask inFlightTask = (InFlightTask) ctx; + inFlightTask.setEntries(Collections.emptyList()); + // Reduce read batch size to avoid flooding bookies with retries readBatchSize = topic.getBrokerService().pulsar().getConfiguration().getDispatcherMinReadBatchSize(); @@ -989,4 +992,9 @@ protected boolean hasPendingRead() { String getReplicatorId() { return replicatorId; } + + @VisibleForTesting + public void incrementWaitForCursorRewindingRefCnf() { + waitForCursorRewindingRefCnf++; + } } \ No newline at end of file diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java index 605e1cd7ae39f..674e71b93d456 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java @@ -660,7 +660,7 @@ public void testReplicatorClearBacklog() throws Exception { PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopicReference(dest.toString()).get(); PersistentReplicator replicator = (PersistentReplicator) spy( topic.getReplicators().get(topic.getReplicators().keySet().stream().toList().get(0))); - replicator.readEntriesFailed(new ManagedLedgerException.InvalidCursorPositionException("failed"), null); + replicator.incrementWaitForCursorRewindingRefCnf(); replicator.clearBacklog().get(); Thread.sleep(100); replicator.updateRates(); // for code-coverage @@ -690,7 +690,7 @@ public void testReplicatorExpireMsgAsync() throws Exception { PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopicReference(dest.toString()).get(); PersistentReplicator replicator = (PersistentReplicator) spy( topic.getReplicators().get(topic.getReplicators().keySet().stream().toList().get(0))); - replicator.readEntriesFailed(new ManagedLedgerException.InvalidCursorPositionException("failed"), null); + replicator.incrementWaitForCursorRewindingRefCnf(); replicator.clearBacklog().get(); Thread.sleep(100); replicator.updateRates(); // for code-coverage @@ -1653,7 +1653,7 @@ private void checkListContainExpectedTopic(PulsarAdmin admin, String namespace, @Test public void testReplicatorWithFailedAck() throws Exception { - log.info("--- Starting ReplicatorTest::testReplication ---"); + log.info("--- Starting ReplicatorTest::testReplicatorWithFailedAck ---"); String namespace = BrokerTestUtil.newUniqueName("pulsar/ns"); admin1.namespaces().createNamespace(namespace, Sets.newHashSet("r1")); @@ -1686,14 +1686,28 @@ public void testReplicatorWithFailedAck() throws Exception { }).when(spyCursor).asyncDelete(Mockito.any(Position.class), Mockito.any(AsyncCallbacks.DeleteCallback.class), Mockito.any()); + // Mock the readEntriesFailed scenario: + // Use AtomicBoolean to control whether to trigger read failure + // Initialized to true to ensure the first readMoreEntries after replicator startup is intercepted. + AtomicBoolean isMakeReadFail = new AtomicBoolean(true); + doAnswer(invocation -> { + if (isMakeReadFail.get()) { + AsyncCallbacks.ReadEntriesCallback callback = invocation.getArgument(2); + Object ctx = invocation.getArgument(3); + log.info("asyncReadEntriesOrWait will be failed"); + callback.readEntriesFailed(new ManagedLedgerException("Mocked read failure"), ctx); + return null; + } else { + log.info("asyncReadEntriesOrWait will proceed normally"); + return invocation.callRealMethod(); + } + }).when(spyCursor).asyncReadEntriesOrWait(Mockito.anyInt(), Mockito.anyLong(), + Mockito.any(AsyncCallbacks.ReadEntriesCallback.class), Mockito.any(), Mockito.any(Position.class)); + log.info("--- Starting producer --- " + url1); admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2"), false); - // Produce from cluster1 and consume from the rest - producer1.produce(2); - - MessageIdImpl lastMessageId = (MessageIdImpl) topic.getLastMessageId().get(); - Position lastPosition = PositionFactory.create(lastMessageId.getLedgerId(), lastMessageId.getEntryId()); + // Wait for replicator to start Awaitility.await().pollInterval(1, TimeUnit.SECONDS).timeout(30, TimeUnit.SECONDS) .ignoreExceptions() .untilAsserted(() -> { @@ -1703,9 +1717,33 @@ public void testReplicatorWithFailedAck() throws Exception { replicator.getState()); }); - // Make sure all the data has replicated to the remote cluster before close the cursor. - Awaitility.await().untilAsserted(() -> assertEquals(cursor.getMarkDeletedPosition(), lastPosition)); + // --- Test readEntriesFailed scenario --- + // isMakeReadFail is already true, replicator's readMoreEntries keeps failing + // Record current mark delete position + Position posBeforeReadFail = cursor.getMarkDeletedPosition(); + + // Produce messages; since reads keep failing, messages cannot be replicated + producer1.produce(2); + + MessageIdImpl lastMessageId = (MessageIdImpl) topic.getLastMessageId().get(); + Position lastPosition = PositionFactory.create(lastMessageId.getLedgerId(), lastMessageId.getEntryId()); + + // During 2 seconds of continuous read failure, mark delete position should not advance + Awaitility.await() + .during(2, TimeUnit.SECONDS) + .atMost(5, TimeUnit.SECONDS) + .untilAsserted(() -> assertEquals(cursor.getMarkDeletedPosition(), posBeforeReadFail)); + + // Disable the read failure flag; replicator will read normally on retry, thus resuming replication + isMakeReadFail.set(false); + + // Wait for replicator to recover from read failure and complete replication + // (mark delete catches up to the latest position) + Awaitility.await().timeout(30, TimeUnit.SECONDS).untilAsserted(() -> { + assertEquals(cursor.getMarkDeletedPosition(), lastPosition); + }); + // --- Test DeleteCallback scenario --- isMakeAckFail.set(true); producer1.produce(10); From 1c976a8ed8f257ef8babb3762aac2f239d32d35b Mon Sep 17 00:00:00 2001 From: gosonzhang Date: Tue, 24 Mar 2026 15:26:26 +0800 Subject: [PATCH 2/5] Remove the `incrementWaitForCursorRewindingRefCnf()` function and adjust the implementations of `testReplicatorClearBacklog()` and `testReplicatorExpireMsgAsync()` --- .../persistent/PersistentReplicator.java | 5 -- .../pulsar/broker/service/ReplicatorTest.java | 62 ++++++++++++++----- 2 files changed, 46 insertions(+), 21 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java index 1686ad8d3bc8a..de7e9d0644442 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java @@ -992,9 +992,4 @@ protected boolean hasPendingRead() { String getReplicatorId() { return replicatorId; } - - @VisibleForTesting - public void incrementWaitForCursorRewindingRefCnf() { - waitForCursorRewindingRefCnf++; - } } \ No newline at end of file diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java index 674e71b93d456..9ef72d131590f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java @@ -642,12 +642,8 @@ public void testReplicatePeekAndSkip() throws Exception { @Test(timeOut = 30000) public void testReplicatorClearBacklog() throws Exception { - // This test is to verify that reset cursor fails on global topic - SortedSet testDests = new TreeSet<>(); - final TopicName dest = TopicName .get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns/clearBacklogTopic")); - testDests.add(dest.toString()); @Cleanup MessageProducer producer1 = new MessageProducer(url1, dest); @@ -655,12 +651,31 @@ public void testReplicatorClearBacklog() throws Exception { @Cleanup MessageConsumer consumer1 = new MessageConsumer(url3, dest); - // Produce from cluster1 and consume from the rest + // Produce from cluster1 to trigger topic and replicator creation producer1.produce(2); + PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopicReference(dest.toString()).get(); - PersistentReplicator replicator = (PersistentReplicator) spy( - topic.getReplicators().get(topic.getReplicators().keySet().stream().toList().get(0))); - replicator.incrementWaitForCursorRewindingRefCnf(); + PersistentReplicator replicator = (PersistentReplicator) topic.getReplicators() + .get(topic.getReplicators().keySet().stream().toList().get(0)); + + // Wait until the first batch of messages is fully replicated (backlog = 0), + // so that disconnect() won't be rejected due to non-zero backlog + Awaitility.await().untilAsserted(() -> { + assertEquals(replicator.getNumberOfEntriesInBacklog(), 0); + }); + + // Disconnect replicator to stop geo-replication, so new messages will form backlog + pauseReplicator(replicator); + + // Produce more messages while replication is paused ¡ª these will accumulate as backlog + producer1.produce(2); + + // wait for backlog to accumulate + Awaitility.await().untilAsserted(() -> { + assertEquals(replicator.getNumberOfEntriesInBacklog(), 2); + }); + + // Clear the backlog replicator.clearBacklog().get(); Thread.sleep(100); replicator.updateRates(); // for code-coverage @@ -672,12 +687,8 @@ public void testReplicatorClearBacklog() throws Exception { @Test(timeOut = 30000) public void testReplicatorExpireMsgAsync() throws Exception { - // This test is to verify that reset cursor fails on global topic - SortedSet testDests = new TreeSet<>(); - final TopicName dest = TopicName .get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns/clearBacklogTopic")); - testDests.add(dest.toString()); @Cleanup MessageProducer producer1 = new MessageProducer(url1, dest); @@ -685,12 +696,31 @@ public void testReplicatorExpireMsgAsync() throws Exception { @Cleanup MessageConsumer consumer1 = new MessageConsumer(url3, dest); - // Produce from cluster1 and consume from the rest + // Produce from cluster1 to trigger topic and replicator creation producer1.produce(2); + PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopicReference(dest.toString()).get(); - PersistentReplicator replicator = (PersistentReplicator) spy( - topic.getReplicators().get(topic.getReplicators().keySet().stream().toList().get(0))); - replicator.incrementWaitForCursorRewindingRefCnf(); + PersistentReplicator replicator = (PersistentReplicator) topic.getReplicators() + .get(topic.getReplicators().keySet().stream().toList().get(0)); + + // Wait until the first batch of messages is fully replicated (backlog = 0), + // so that disconnect() won't be rejected due to non-zero backlog + Awaitility.await().untilAsserted(() -> { + assertEquals(replicator.getNumberOfEntriesInBacklog(), 0); + }); + + // Disconnect replicator to stop geo-replication, so new messages will form backlog + pauseReplicator(replicator); + + // Produce more messages while replication is paused ¡ª these will accumulate as backlog + producer1.produce(2); + + // wait for backlog to accumulate + Awaitility.await().untilAsserted(() -> { + assertEquals(replicator.getNumberOfEntriesInBacklog(), 2); + }); + + // Clear the backlog replicator.clearBacklog().get(); Thread.sleep(100); replicator.updateRates(); // for code-coverage From 7ef7ecedea32ba734ac093d594a792d7852fc5db Mon Sep 17 00:00:00 2001 From: gosonzhang Date: Tue, 24 Mar 2026 15:26:45 +0800 Subject: [PATCH 3/5] fix --- .../org/apache/pulsar/broker/service/ReplicatorTest.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java index 9ef72d131590f..b577ed320a337 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java @@ -667,7 +667,7 @@ public void testReplicatorClearBacklog() throws Exception { // Disconnect replicator to stop geo-replication, so new messages will form backlog pauseReplicator(replicator); - // Produce more messages while replication is paused ¡ª these will accumulate as backlog + // Produce more messages while replication is paused �� these will accumulate as backlog producer1.produce(2); // wait for backlog to accumulate @@ -689,7 +689,6 @@ public void testReplicatorExpireMsgAsync() throws Exception { final TopicName dest = TopicName .get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns/clearBacklogTopic")); - @Cleanup MessageProducer producer1 = new MessageProducer(url1, dest); @@ -712,7 +711,7 @@ public void testReplicatorExpireMsgAsync() throws Exception { // Disconnect replicator to stop geo-replication, so new messages will form backlog pauseReplicator(replicator); - // Produce more messages while replication is paused ¡ª these will accumulate as backlog + // Produce more messages while replication is paused �� these will accumulate as backlog producer1.produce(2); // wait for backlog to accumulate From cfe97e223ad7e965804f38791b51bb7eacec1d5a Mon Sep 17 00:00:00 2001 From: gosonzhang Date: Tue, 24 Mar 2026 15:35:54 +0800 Subject: [PATCH 4/5] fix --- .../java/org/apache/pulsar/broker/service/ReplicatorTest.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java index b577ed320a337..cfdc462333a5b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java @@ -44,8 +44,6 @@ import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.SortedSet; -import java.util.TreeSet; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; From 68f99f8cb077f830e8d5f892ead31e44e7fbd8f0 Mon Sep 17 00:00:00 2001 From: gosonzhang Date: Tue, 24 Mar 2026 16:26:39 +0800 Subject: [PATCH 5/5] remove garbled characters --- .../java/org/apache/pulsar/broker/service/ReplicatorTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java index cfdc462333a5b..db4c4c70a34e6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java @@ -665,7 +665,7 @@ public void testReplicatorClearBacklog() throws Exception { // Disconnect replicator to stop geo-replication, so new messages will form backlog pauseReplicator(replicator); - // Produce more messages while replication is paused �� these will accumulate as backlog + // Produce more messages while replication is paused, these will accumulate as backlog producer1.produce(2); // wait for backlog to accumulate @@ -709,7 +709,7 @@ public void testReplicatorExpireMsgAsync() throws Exception { // Disconnect replicator to stop geo-replication, so new messages will form backlog pauseReplicator(replicator); - // Produce more messages while replication is paused �� these will accumulate as backlog + // Produce more messages while replication is paused, these will accumulate as backlog producer1.produce(2); // wait for backlog to accumulate