diff --git a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java index e13a81b144d..55a347b89a5 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java @@ -19,6 +19,23 @@ import com.alibaba.fastjson2.JSON; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Stopwatch; +import java.nio.ByteBuffer; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Objects; +import java.util.Queue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Triple; import org.apache.rocketmq.broker.BrokerController; @@ -52,24 +69,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.nio.ByteBuffer; -import java.nio.file.Paths; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Objects; -import java.util.Queue; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; - public class PopConsumerService extends ServiceThread { private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME); @@ -287,6 +286,12 @@ public void setFifoBlocked(PopConsumerContext context, } public boolean isFifoBlocked(PopConsumerContext context, String groupId, String topicId, int queueId) { + // If server-side reset offset is enabled, and there is a reset offset, + // then return false to make sure that the reset offset takes effect. + if (brokerController.getBrokerConfig().isUseServerSideResetOffset() && + this.brokerController.getConsumerOffsetManager().hasOffsetReset(topicId, groupId, queueId)) { + return false; + } return brokerController.getConsumerOrderInfoManager().checkBlock( context.getAttemptId(), topicId, groupId, queueId, context.getInvisibleTime()); }