Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion src/workerd/io/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,6 @@ wd_cc_library(
":io-gate",
":trace",
"//src/workerd/jsg:exception",
"//src/workerd/util:autogate",
"//src/workerd/util:duration-exceeded-logger",
"//src/workerd/util:sqlite",
"@capnp-cpp//src/capnp:capnp-rpc",
Expand Down
172 changes: 172 additions & 0 deletions src/workerd/io/actor-cache-test.c++
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,16 @@ struct ActorCacheTest: public ActorCacheConvenienceWrappers {
gateBrokenPromise(options.monitorOutputGate ? eagerlyReportExceptions(gate.onBroken())
: kj::Promise<void>(kj::READY_NOW)) {}

// Simulates `count` counted alarm handler failures for `alarmTime`, leaving the cache
// in KnownAlarmTime{CLEAN, alarmTime} as AlarmManager would after each retry.
void simulateCountedAlarmRetries(kj::Date alarmTime, int count = 6) {
for (auto i = 0; i < count; i++) {
auto armResult = cache.armAlarmHandler(alarmTime, nullptr, kj::UNIX_EPOCH);
KJ_ASSERT(armResult.is<ActorCache::RunAlarmHandler>());
cache.cancelDeferredAlarmDeletion();
}
}

~ActorCacheTest() noexcept(false) {
// Make sure if the output gate has been broken, the exception was reported. This is important
// to report errors thrown inside flush(), since those won't otherwise propagate into the test
Expand Down Expand Up @@ -5773,5 +5783,167 @@ KJ_TEST("ActorCache can shutdown") {
});
}

KJ_TEST("ActorCache alarm cleared by abandonAlarm after max counted retry failures") {
// After the alarm scheduler calls abandonAlarm(), the cache correctly forgets the alarm.

ActorCacheTest test;
auto& ws = test.ws;
auto& mockStorage = test.mockStorage;

auto oneMs = 1 * kj::MILLISECONDS + kj::UNIX_EPOCH;

test.setAlarm(oneMs);
mockStorage->expectCall("setAlarm", ws)
.withParams(CAPNP(scheduledTimeMs = 1))
.thenReturn(CAPNP());

// Simulate ALARM_RETRY_MAX_TRIES (= 6) counted handler failures.
// cancelDeferredAlarmDeletion() preserves KnownAlarmTime{CLEAN, oneMs} on each failure
// (alarm still set from cache perspective -- correct for retries 1-5).
test.simulateCountedAlarmRetries(oneMs);

// The alarm scheduler has decided to give up. It calls abandonAlarm() on the actor,
// which clears KnownAlarmTime{CLEAN, oneMs} -> KnownAlarmTime{CLEAN, null}.
auto result = test.cache.abandonAlarm(oneMs).wait(ws);

// Returns kj::none: alarm was cleared, AlarmManager should not re-register.
KJ_ASSERT(result == kj::none);

// getAlarm() now returns null from cache (no storage read).
auto time = expectCached(test.getAlarm());
KJ_ASSERT(time == kj::none);
}

KJ_TEST("ActorCache alarm preserved after ALARM_RETRY_MAX_TRIES uncounted (internal) failures") {
// When all ALARM_RETRY_MAX_TRIES failures are uncounted (retryCountsAgainstLimit=false,
// i.e. infrastructure errors), the alarm scheduler's countedRetry never reaches the limit and
// abandonAlarm is NEVER called. The alarm must remain set throughout so that the scheduler
// can keep retrying indefinitely until the infrastructure issue resolves.

ActorCacheTest test;
auto& ws = test.ws;
auto& mockStorage = test.mockStorage;

auto oneMs = 1 * kj::MILLISECONDS + kj::UNIX_EPOCH;
auto testCurrentTime = kj::UNIX_EPOCH;

test.setAlarm(oneMs);
mockStorage->expectCall("setAlarm", ws)
.withParams(CAPNP(scheduledTimeMs = 1))
.thenReturn(CAPNP());

// Simulate uncounted failures well past ALARM_RETRY_MAX_TRIES (= 6).
// countedRetry stays at 0; AlarmManager never gives up; abandonAlarm is never called.
// We've seen alarms fail hundreds of times due to infrastructure errors in production,
// so we check both at the boundary (6) and well beyond it (100).
for (auto i = 0; i < 100; i++) {
auto armResult = test.cache.armAlarmHandler(oneMs, nullptr, testCurrentTime);
KJ_ASSERT(armResult.is<ActorCache::RunAlarmHandler>());
test.cache.cancelDeferredAlarmDeletion();

// Check at the ALARM_RETRY_MAX_TRIES boundary and at the end.
if (i == 5 || i == 99) {
auto time = expectCached(test.getAlarm());
KJ_ASSERT(time == oneMs);
}
}
}

KJ_TEST("ActorCache abandonAlarm is a no-op when a newer alarm has replaced the abandoned one") {
// If the user sets a new alarm between the last retry failure and the abandonAlarm() call,
// and that new alarm has already flushed to CLEAN, abandonAlarm() must compare the time
// and leave the new alarm untouched.

ActorCacheTest test;
auto& ws = test.ws;
auto& mockStorage = test.mockStorage;

auto oneMs = 1 * kj::MILLISECONDS + kj::UNIX_EPOCH;
auto twoMs = 2 * kj::MILLISECONDS + kj::UNIX_EPOCH;

// Set the original alarm and flush it to CLEAN.
test.setAlarm(oneMs);
mockStorage->expectCall("setAlarm", ws)
.withParams(CAPNP(scheduledTimeMs = 1))
.thenReturn(CAPNP());

// Simulate ALARM_RETRY_MAX_TRIES (= 6) counted failures.
test.simulateCountedAlarmRetries(oneMs);

// Race: user sets a new alarm (twoMs) between the last failure and abandonAlarm().
// It flushes to CLEAN before abandonAlarm() arrives, leaving KnownAlarmTime{CLEAN, twoMs}.
test.setAlarm(twoMs);
mockStorage->expectCall("setAlarm", ws)
.withParams(CAPNP(scheduledTimeMs = 2))
.thenReturn(CAPNP());
// Advance the event loop to process the storage response and complete the FLUSHING→CLEAN
// transition. Without this poll, the state is still FLUSHING when abandonAlarm runs, and
// the existing status check would protect it by accident, hiding the time-check regression.
ws.poll();

// abandonAlarm() for the original oneMs alarm must be a no-op: storedTime (twoMs) !=
// scheduledTime (oneMs), so the time check prevents clearing the new alarm.
// Returns twoMs so AlarmManager can re-register the actor's real alarm.
auto result = test.cache.abandonAlarm(oneMs).wait(ws);

KJ_ASSERT(KJ_ASSERT_NONNULL(result) == twoMs);

// getAlarm() must still return twoMs -- the new alarm was NOT incorrectly cleared.
auto time = expectCached(test.getAlarm());
KJ_ASSERT(time == twoMs);
}

KJ_TEST("ActorCache abandonAlarm returns kj::none when no alarm is stored") {
ActorCacheTest test;
auto& ws = test.ws;

// No alarm ever set. abandonAlarm should return kj::none.
auto result = test.cache.abandonAlarm(1 * kj::MILLISECONDS + kj::UNIX_EPOCH).wait(ws);
KJ_ASSERT(result == kj::none);
}

KJ_TEST("ActorCache abandonAlarm returns kj::none when alarm is DIRTY, not the uncommitted time") {
// If the user set a new alarm T2 that hasn't flushed to storage yet (DIRTY state),
// abandonAlarm must NOT return T2 to AlarmManager for re-registration.
// Returning an uncommitted time would let AlarmManager fire an alarm that was never
// acknowledged by CRDB, potentially racing with the flush. Returns kj::none instead.

ActorCacheTest test;
auto& ws = test.ws;
auto& mockStorage = test.mockStorage;

auto oneMs = 1 * kj::MILLISECONDS + kj::UNIX_EPOCH;
auto twoMs = 2 * kj::MILLISECONDS + kj::UNIX_EPOCH;

// Commit T0 to CLEAN.
test.setAlarm(oneMs);
mockStorage->expectCall("setAlarm", ws)
.withParams(CAPNP(scheduledTimeMs = 1))
.thenReturn(CAPNP());

// Simulate max counted retries for T0.
test.simulateCountedAlarmRetries(oneMs);

// User sets T2. State becomes DIRTY (flush task queued but not yet delivered to storage).
test.setAlarm(twoMs);

// Set up expectation for the T2 flush before driving the event loop.
mockStorage->expectCall("setAlarm", ws)
.withParams(CAPNP(scheduledTimeMs = 2))
.thenReturn(CAPNP());

// abandonAlarm(T0) arrives while T2 is DIRTY. The CLEAN status guard prevents returning
// the uncommitted T2 time -- AlarmManager must not re-register based on it.
auto result = test.cache.abandonAlarm(oneMs).wait(ws);
KJ_ASSERT(result == kj::none);

// Let the T2 flush complete normally.
ws.poll();

// T2 is still in cache -- the DIRTY alarm was preserved, not cleared.
auto time = expectCached(test.getAlarm());
KJ_ASSERT(time == twoMs);
}

} // namespace
} // namespace workerd
18 changes: 18 additions & 0 deletions src/workerd/io/actor-cache.c++
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,24 @@ void ActorCache::cancelDeferredAlarmDeletion() {
}
}

kj::Promise<kj::Maybe<kj::Date>> ActorCache::abandonAlarm(kj::Date scheduledTime) {
KJ_IF_SOME(t, currentAlarmTime.tryGet<KnownAlarmTime>()) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I liked the comment that was here previously, explaining the cases in which we were protecting against deleting the alarm. Was there a reason to remove it?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

KJ_IF_SOME(storedTime, t.time) {
if (t.status == KnownAlarmTime::Status::CLEAN) {
if (storedTime == scheduledTime) {
currentAlarmTime = KnownAlarmTime{
.status = KnownAlarmTime::Status::CLEAN, .time = kj::none, .noCache = t.noCache};
return kj::Maybe<kj::Date>(kj::none);
} else {
// The user set a different alarm. Return it so AlarmManager can re-register.
return kj::Maybe<kj::Date>(storedTime);
}
}
}
}
return kj::Maybe<kj::Date>(kj::none);
}

kj::Maybe<kj::Promise<void>> ActorCache::getBackpressure() {
if (dirtyList.sizeInBytes() > lru.options.dirtyListByteLimit && !lru.options.neverFlush) {
// Wait for dirty entries to be flushed.
Expand Down
9 changes: 9 additions & 0 deletions src/workerd/io/actor-cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,14 @@ class ActorCacheInterface: public ActorCacheOps {

virtual void cancelDeferredAlarmDeletion() = 0;

// Called by AlarmManager when it has given up retrying an alarm after too many counted failures.
// Implementations should clear the alarm from their local state so getAlarm() reflects the
// deletion. Returns the stored alarm time if it differs from scheduledTime (the user set a new
// alarm), or kj::none if the alarm was cleared or no alarm was stored.
virtual kj::Promise<kj::Maybe<kj::Date>> abandonAlarm(kj::Date scheduledTime) {
return kj::Maybe<kj::Date>(kj::none);
}

virtual kj::Maybe<kj::Promise<void>> onNoPendingFlush(SpanParent parentSpan) = 0;

// Implements the respective PITR API calls. The default implementations throw JSG errors saying
Expand Down Expand Up @@ -380,6 +388,7 @@ class ActorCache final: public ActorCacheInterface {
bool noCache = false,
kj::StringPtr actorId = "") override;
void cancelDeferredAlarmDeletion() override;
kj::Promise<kj::Maybe<kj::Date>> abandonAlarm(kj::Date scheduledTime) override;
kj::Maybe<kj::Promise<void>> onNoPendingFlush(SpanParent parentSpan) override;
// See ActorCacheInterface

Expand Down
Loading
Loading