Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -835,7 +835,12 @@ public void skipEntriesFailed(ManagedLedgerException exception, Object ctx) {

@Override
public CompletableFuture<Void> resetCursor(long timestamp) {
CompletableFuture<Void> future = new CompletableFuture<>();
if (!IS_FENCED_UPDATER.compareAndSet(PersistentSubscription.this, FALSE, TRUE)) {
return CompletableFuture.failedFuture(new SubscriptionBusyException("Failed to fence subscription"));
}

final CompletableFuture<Void> future = new CompletableFuture<>();
inProgressResetCursorFuture = future;
PersistentMessageFinder persistentMessageFinder = new PersistentMessageFinder(topicName, cursor,
config.getManagedLedgerCursorResetLedgerCloseTimestampMaxClockSkewMillis());

Expand All @@ -854,6 +859,8 @@ public void findEntryComplete(Position position, Object ctx) {
log.warn("[{}][{}] Unable to find position for timestamp {}."
+ " Unable to reset cursor to first position",
topicName, subName, timestamp);
IS_FENCED_UPDATER.set(PersistentSubscription.this, FALSE);
inProgressResetCursorFuture = null;
future.completeExceptionally(
new SubscriptionInvalidCursorPosition(
"Unable to find position for specified timestamp"));
Expand All @@ -866,14 +873,16 @@ public void findEntryComplete(Position position, Object ctx) {
} else {
finalPosition = position.getNext();
}
CompletableFuture<Void> resetCursorFuture = resetCursor(finalPosition);
CompletableFuture<Void> resetCursorFuture = resetCursorInternal(finalPosition, future, true);
FutureUtil.completeAfter(future, resetCursorFuture);
}

@Override
public void findEntryFailed(ManagedLedgerException exception,
Optional<Position> failedReadPosition, Object ctx) {
// todo - what can go wrong here that needs to be retried?
IS_FENCED_UPDATER.set(PersistentSubscription.this, FALSE);
inProgressResetCursorFuture = null;
if (exception instanceof ConcurrentFindCursorPositionException) {
future.completeExceptionally(new SubscriptionBusyException(exception.getMessage()));
} else {
Expand All @@ -887,11 +896,17 @@ public void findEntryFailed(ManagedLedgerException exception,

@Override
public CompletableFuture<Void> resetCursor(Position finalPosition) {
if (!IS_FENCED_UPDATER.compareAndSet(PersistentSubscription.this, FALSE, TRUE)) {
final CompletableFuture<Void> future = new CompletableFuture<>();
return resetCursorInternal(finalPosition, future, false);
}

private CompletableFuture<Void> resetCursorInternal(Position finalPosition, CompletableFuture<Void> future,
boolean alreadyFenced) {
if (!alreadyFenced
&& !IS_FENCED_UPDATER.compareAndSet(PersistentSubscription.this, FALSE, TRUE)) {
return CompletableFuture.failedFuture(new SubscriptionBusyException("Failed to fence subscription"));
}

final CompletableFuture<Void> future = new CompletableFuture<>();
inProgressResetCursorFuture = future;
final CompletableFuture<Void> disconnectFuture;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,17 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
Expand All @@ -36,6 +41,7 @@
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -457,6 +463,62 @@ public void run() {
}
}

@Test
public void testConcurrentResetCursorByTimestamp() throws Exception {
final String topicName = "persistent://prop/use/ns-abc/testConcurrentResetTimestamp_"
+ System.currentTimeMillis();
final String subscriptionName = "test-sub-name";

@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();

admin.topics().createSubscription(topicName, subscriptionName, MessageId.earliest);

PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
assertNotNull(topicRef);
assertEquals(topicRef.getProducers().size(), 1);

for (int i = 0; i < 10; i++) {
String message = "my-message-" + i;
producer.send(message.getBytes());
}

PersistentSubscription subscription = (PersistentSubscription) topicRef.getSubscription(subscriptionName);

ManagedCursor originalCursor = subscription.getCursor();
ManagedCursor spyCursor = spy(originalCursor);

Field cursorField = PersistentSubscription.class.getDeclaredField("cursor");
cursorField.setAccessible(true);
cursorField.set(subscription, spyCursor);

AtomicInteger findCallCount = new AtomicInteger(0);
doAnswer(invocation -> {
findCallCount.incrementAndGet();
return invocation.callRealMethod();
}).when(spyCursor).asyncFindNewestMatching(any(), any(), any(), any(), any(), any(), anyBoolean());

long resetTimestamp = System.currentTimeMillis();
List<CompletableFuture<Void>> futures = new ArrayList<>();
CyclicBarrier barrier = new CyclicBarrier(4);

for (int i = 0; i < 4; i++) {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
try {
barrier.await();
admin.topics().resetCursor(topicName, subscriptionName, resetTimestamp);
} catch (Exception e) {
}
});
futures.add(future);
}

CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();

assertEquals(findCallCount.get(), 1,
"asyncFindNewestMatching should only be called once due to subscription fencing");
}

@Test
public void testSeekOnPartitionedTopic() throws Exception {
final String topicName = "persistent://prop/use/ns-abc/testSeekPartitions";
Expand Down
Loading