From 9b5a7ac6036cfe2e4e796cda8509d8557e4cc555 Mon Sep 17 00:00:00 2001 From: gosonzhang Date: Thu, 9 Apr 2026 16:01:04 +0800 Subject: [PATCH] [improve][broker] Enhanced process logic of `PersistentReplicator.cancellationPendingReadTasks` --- .../persistent/PersistentReplicator.java | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java index d56f214751456..9167607853fef 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java @@ -945,24 +945,21 @@ protected void doRewindCursor(boolean triggerReadMoreEntries) { } private void cancelPendingReadTasks(boolean canceledPendingRead) { - InFlightTask readingTask = null; + int pendingReadCount = 0; synchronized (inFlightTasks) { for (InFlightTask task : inFlightTasks) { task.setSkipReadResultDueToCursorRewind(true); if (task.entries == null) { - if (readingTask != null) { - log.error("Unexpected state because there are more than one tasks' state is pending read. {}", - inFlightTasks); + if (canceledPendingRead) { + task.setEntries(Collections.emptyList()); } - readingTask = task; + pendingReadCount++; } } - // Correct state to avoid a replicate stuck because a pending reading task occupies permits. - // There is at most one reading task. - // The task will never receive a read completed callback if cancel pending reading successfully. - if (canceledPendingRead && readingTask != null) { - readingTask.setEntries(Collections.emptyList()); - } + } + if (pendingReadCount > 1) { + log.error("Found Geo ({}) unexpected state {} tasks are pending read. {}", + replicatorId, pendingReadCount, inFlightTasks); } }