From 4db39e8fdd9f52ed78a53150e5214f97b9d94bc3 Mon Sep 17 00:00:00 2001 From: Penghui Li Date: Thu, 15 Jan 2026 19:23:54 -0800 Subject: [PATCH 1/3] [fix][broker] Fence reset cursor by timestamp to avoid concurrent resets --- .../persistent/PersistentSubscription.java | 23 +++++++-- .../broker/service/SubscriptionSeekTest.java | 51 +++++++++++++++++++ 2 files changed, 70 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 786b902b3baca..a5cd832e99f2c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -835,7 +835,12 @@ public void skipEntriesFailed(ManagedLedgerException exception, Object ctx) { @Override public CompletableFuture resetCursor(long timestamp) { - CompletableFuture future = new CompletableFuture<>(); + if (!IS_FENCED_UPDATER.compareAndSet(PersistentSubscription.this, FALSE, TRUE)) { + return CompletableFuture.failedFuture(new SubscriptionBusyException("Failed to fence subscription")); + } + + final CompletableFuture future = new CompletableFuture<>(); + inProgressResetCursorFuture = future; PersistentMessageFinder persistentMessageFinder = new PersistentMessageFinder(topicName, cursor, config.getManagedLedgerCursorResetLedgerCloseTimestampMaxClockSkewMillis()); @@ -854,6 +859,8 @@ public void findEntryComplete(Position position, Object ctx) { log.warn("[{}][{}] Unable to find position for timestamp {}." + " Unable to reset cursor to first position", topicName, subName, timestamp); + IS_FENCED_UPDATER.set(PersistentSubscription.this, FALSE); + inProgressResetCursorFuture = null; future.completeExceptionally( new SubscriptionInvalidCursorPosition( "Unable to find position for specified timestamp")); @@ -866,7 +873,7 @@ public void findEntryComplete(Position position, Object ctx) { } else { finalPosition = position.getNext(); } - CompletableFuture resetCursorFuture = resetCursor(finalPosition); + CompletableFuture resetCursorFuture = resetCursorInternal(finalPosition, future, true); FutureUtil.completeAfter(future, resetCursorFuture); } @@ -874,6 +881,8 @@ public void findEntryComplete(Position position, Object ctx) { public void findEntryFailed(ManagedLedgerException exception, Optional failedReadPosition, Object ctx) { // todo - what can go wrong here that needs to be retried? + IS_FENCED_UPDATER.set(PersistentSubscription.this, FALSE); + inProgressResetCursorFuture = null; if (exception instanceof ConcurrentFindCursorPositionException) { future.completeExceptionally(new SubscriptionBusyException(exception.getMessage())); } else { @@ -887,11 +896,17 @@ public void findEntryFailed(ManagedLedgerException exception, @Override public CompletableFuture resetCursor(Position finalPosition) { - if (!IS_FENCED_UPDATER.compareAndSet(PersistentSubscription.this, FALSE, TRUE)) { + final CompletableFuture future = new CompletableFuture<>(); + return resetCursorInternal(finalPosition, future, false); + } + + private CompletableFuture resetCursorInternal(Position finalPosition, CompletableFuture future, + boolean alreadyFenced) { + if (!alreadyFenced + && !IS_FENCED_UPDATER.compareAndSet(PersistentSubscription.this, FALSE, TRUE)) { return CompletableFuture.failedFuture(new SubscriptionBusyException("Failed to fence subscription")); } - final CompletableFuture future = new CompletableFuture<>(); inProgressResetCursorFuture = future; final CompletableFuture disconnectFuture; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java index 2b9924d1d5b41..7f3ac60917b07 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java @@ -36,6 +36,7 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -457,6 +458,56 @@ public void run() { } } + @Test + public void testConcurrentResetCursorByTimestamp() throws Exception { + final String topicName = "persistent://prop/use/ns-abc/testConcurrentResetTimestamp_" + + System.currentTimeMillis(); + final String subscriptionName = "test-sub-name"; + + @Cleanup + Producer producer = pulsarClient.newProducer().topic(topicName).create(); + + admin.topics().createSubscription(topicName, subscriptionName, MessageId.earliest); + + PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); + assertNotNull(topicRef); + assertEquals(topicRef.getProducers().size(), 1); + + for (int i = 0; i < 10; i++) { + String message = "my-message-" + i; + producer.send(message.getBytes()); + } + + long resetTimestamp = System.currentTimeMillis(); + List exceptions = new CopyOnWriteArrayList<>(); + class ResetCursorThread extends Thread { + public void run() { + try { + admin.topics().resetCursor(topicName, subscriptionName, resetTimestamp); + } catch (PulsarAdminException e) { + exceptions.add(e); + } + } + } + + List resetCursorThreads = new ArrayList<>(); + for (int i = 0; i < 4; i++) { + ResetCursorThread thread = new ResetCursorThread(); + resetCursorThreads.add(thread); + } + for (int i = 0; i < 4; i++) { + resetCursorThreads.get(i).start(); + } + for (int i = 0; i < 4; i++) { + resetCursorThreads.get(i).join(); + } + + assertTrue(exceptions.size() > 0); + for (PulsarAdminException exception : exceptions) { + assertTrue(exception.getMessage().contains("Failed to fence subscription")); + } + } + @Test public void testSeekOnPartitionedTopic() throws Exception { final String topicName = "persistent://prop/use/ns-abc/testSeekPartitions"; From 6b08dcd5f335f40561accb4b6f55e67803acf21f Mon Sep 17 00:00:00 2001 From: Penghui Li Date: Thu, 15 Jan 2026 19:48:22 -0800 Subject: [PATCH 2/3] Improve test reliability by verifying findMessages call count --- .../broker/service/SubscriptionSeekTest.java | 54 +++++++++++-------- 1 file changed, 33 insertions(+), 21 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java index 7f3ac60917b07..5f9b6b291d44e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java @@ -20,12 +20,17 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.spy; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; +import java.lang.reflect.Field; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -37,6 +42,7 @@ import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -478,34 +484,40 @@ public void testConcurrentResetCursorByTimestamp() throws Exception { producer.send(message.getBytes()); } + PersistentSubscription subscription = (PersistentSubscription) topicRef.getSubscription(subscriptionName); + + ManagedCursor originalCursor = subscription.getCursor(); + ManagedCursor spyCursor = spy(originalCursor); + + Field cursorField = PersistentSubscription.class.getDeclaredField("cursor"); + cursorField.setAccessible(true); + cursorField.set(subscription, spyCursor); + + AtomicInteger findCallCount = new AtomicInteger(0); + doAnswer(invocation -> { + findCallCount.incrementAndGet(); + return invocation.callRealMethod(); + }).when(spyCursor).asyncFindNewestMatching(any(), any(), any(), any(), any(), any(), anyBoolean()); + long resetTimestamp = System.currentTimeMillis(); - List exceptions = new CopyOnWriteArrayList<>(); - class ResetCursorThread extends Thread { - public void run() { + List> futures = new ArrayList<>(); + CyclicBarrier barrier = new CyclicBarrier(4); + + for (int i = 0; i < 4; i++) { + CompletableFuture future = CompletableFuture.runAsync(() -> { try { + barrier.await(); admin.topics().resetCursor(topicName, subscriptionName, resetTimestamp); - } catch (PulsarAdminException e) { - exceptions.add(e); + } catch (Exception e) { } - } + }); + futures.add(future); } - List resetCursorThreads = new ArrayList<>(); - for (int i = 0; i < 4; i++) { - ResetCursorThread thread = new ResetCursorThread(); - resetCursorThreads.add(thread); - } - for (int i = 0; i < 4; i++) { - resetCursorThreads.get(i).start(); - } - for (int i = 0; i < 4; i++) { - resetCursorThreads.get(i).join(); - } + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join(); - assertTrue(exceptions.size() > 0); - for (PulsarAdminException exception : exceptions) { - assertTrue(exception.getMessage().contains("Failed to fence subscription")); - } + assertEquals(findCallCount.get(), 1, + "asyncFindNewestMatching should only be called once due to subscription fencing"); } @Test From 70b15f4dae00084f0e58c154c3c2e963d7fc9827 Mon Sep 17 00:00:00 2001 From: Penghui Li Date: Thu, 15 Jan 2026 19:53:58 -0800 Subject: [PATCH 3/3] Fix code style issue --- .../org/apache/pulsar/broker/service/SubscriptionSeekTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java index 5f9b6b291d44e..556a3124522c7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java @@ -41,7 +41,6 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit;