diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java index 38946fd1611..42391e79a30 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java @@ -310,6 +310,9 @@ public long getMinOffsetInQueue(String topic, int queueId) { if (minOffsetInTieredStore < 0) { return minOffsetInNextStore; } + if (minOffsetInNextStore < 0) { + return minOffsetInTieredStore; + } return Math.min(minOffsetInNextStore, minOffsetInTieredStore); } diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java index f88779f09b2..5134a016f69 100644 --- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java +++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java @@ -240,6 +240,12 @@ public void testGetMinOffsetInQueue() { Mockito.when(flatFile.getConsumeQueueMinOffset()).thenReturn(10L); Assert.assertEquals(10L, currentStore.getMinOffsetInQueue(mq.getTopic(), mq.getQueueId())); + + // When local store returns -1 (no valid offset), tiered store offset should be used + long tieredOffset = flatFile.getConsumeQueueMinOffset(); + Assert.assertTrue("tiered offset should be valid for this test", tieredOffset >= 0); + Mockito.when(defaultStore.getMinOffsetInQueue(anyString(), anyInt())).thenReturn(-1L); + Assert.assertEquals(tieredOffset, currentStore.getMinOffsetInQueue(mq.getTopic(), mq.getQueueId())); } @Test