Skip to content

Commit fcde485

Browse files
Fix hasMessageAvailable will return true after seeking to a timestamp newer than the last message (#556)
1 parent 64fbda0 commit fcde485

2 files changed

Lines changed: 41 additions & 18 deletions

File tree

lib/ConsumerImpl.cc

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1642,26 +1642,31 @@ void ConsumerImpl::hasMessageAvailableAsync(const HasMessageAvailableCallback& c
16421642
callback(result, {});
16431643
return;
16441644
}
1645-
auto handleResponse = [self, response, callback] {
1645+
bool lastSeekIsByTimestamp = false;
1646+
{
1647+
LockGuard lock{self->mutex_};
1648+
if (self->lastSeekArg_.has_value() &&
1649+
std::holds_alternative<SeekTimestampType>(self->lastSeekArg_.value())) {
1650+
lastSeekIsByTimestamp = true;
1651+
}
1652+
}
1653+
auto handleResponse = [self, lastSeekIsByTimestamp, response, callback] {
16461654
if (response.hasMarkDeletePosition() && response.getLastMessageId().entryId() >= 0) {
16471655
// We only care about comparing ledger ids and entry ids as mark delete position
16481656
// doesn't have other ids such as batch index
16491657
auto compareResult = compareLedgerAndEntryId(response.getMarkDeletePosition(),
16501658
response.getLastMessageId());
1651-
callback(ResultOk, self->config_.isStartMessageIdInclusive() ? compareResult <= 0
1652-
: compareResult < 0);
1659+
// When the consumer has sought by timestamp, broker will ignore the
1660+
// startMessageIdInclusive config, so the compare should still be exclusive
1661+
if (lastSeekIsByTimestamp || !self->config_.isStartMessageIdInclusive()) {
1662+
callback(ResultOk, compareResult < 0);
1663+
} else {
1664+
callback(ResultOk, compareResult <= 0);
1665+
}
16531666
} else {
16541667
callback(ResultOk, false);
16551668
}
16561669
};
1657-
bool lastSeekIsByTimestamp = false;
1658-
{
1659-
LockGuard lock{self->mutex_};
1660-
if (self->lastSeekArg_.has_value() &&
1661-
std::holds_alternative<SeekTimestampType>(self->lastSeekArg_.value())) {
1662-
lastSeekIsByTimestamp = true;
1663-
}
1664-
}
16651670
if (self->config_.isStartMessageIdInclusive() && !lastSeekIsByTimestamp) {
16661671
self->seekAsync(response.getLastMessageId(), [callback, handleResponse](Result result) {
16671672
if (result != ResultOk) {

tests/ReaderTest.cc

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,10 @@
2424

2525
#include <atomic>
2626
#include <chrono>
27+
#include <cstdint>
2728
#include <functional>
2829
#include <future>
30+
#include <limits>
2931
#include <set>
3032
#include <string>
3133
#include <thread>
@@ -865,13 +867,7 @@ TEST_P(ReaderSeekTest, testHasMessageAvailableAfterSeekToEnd) {
865867
}
866868

867869
ASSERT_EQ(ResultOk, reader.seek(MessageId::latest()));
868-
// After seek-to-end the broker may close the consumer and trigger reconnect; allow a short
869-
// delay for hasMessageAvailable to become false (avoids flakiness when reconnect completes).
870-
for (int i = 0; i < 50; i++) {
871-
ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));
872-
if (!hasMessageAvailable) break;
873-
std::this_thread::sleep_for(std::chrono::milliseconds(100));
874-
}
870+
ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));
875871
ASSERT_FALSE(hasMessageAvailable);
876872

877873
producer.send(MessageBuilder().setContent("msg-2").build());
@@ -983,6 +979,28 @@ TEST_F(ReaderSeekTest, testSeekInclusiveChunkMessage) {
983979
assertStartMessageId(false, secondMsgId);
984980
}
985981

982+
TEST_P(ReaderSeekTest, testSeekToEndByTimestamp) {
983+
auto topic = "test-seek-to-end-by-timestamp-" + std::to_string(time(nullptr));
984+
Producer producer;
985+
ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
986+
987+
ReaderConfiguration readerConf;
988+
readerConf.setStartMessageIdInclusive(GetParam());
989+
990+
Reader reader;
991+
ASSERT_EQ(ResultOk, client.createReader(topic, MessageId::earliest(), readerConf, reader));
992+
993+
ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent("msg").build()));
994+
// Server side (Java) uses signal 64 bits integers to represent the timestamp, so use max int64_t here to
995+
// seek to the end of topic.
996+
auto now = std::numeric_limits<int64_t>::max();
997+
ASSERT_EQ(ResultOk, reader.seek(now));
998+
999+
bool hasMessageAvailable;
1000+
ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));
1001+
ASSERT_FALSE(hasMessageAvailable);
1002+
}
1003+
9861004
// Regression test for segfault when Reader is used with messageListenerThreads=0.
9871005
// Verifies ExecutorServiceProvider(0) does not cause undefined behavior and
9881006
// ConsumerImpl::messageReceived does not dereference null listenerExecutor_.

0 commit comments

Comments
 (0)