Only one retention cycle in progress at a time #2797
He-Pin wants to merge 3 commits into apache:main
Pull request overview
This PR updates the typed persistence retention flow to avoid overlapping retention cycles (snapshot/event deletion), which could otherwise interfere with each other.
Changes:
- Track retention-cycle progress in `BehaviorSetup` and add a guard in `Running` to avoid starting a new retention cycle while a previous one is still in progress.
- Simplify snapshot deletion to delete snapshots up to a max sequence number (dropping the previous lower-bound window logic).
- Update retention-related tests to match the adjusted deletion criteria and retention behavior.
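
The guard described in the list above can be pictured as a single flag that is consulted before a cycle starts. This is a minimal sketch rather than the PR's code: `RetentionGuard`, `tryStart`, and `finish` are hypothetical names, while the real state lives in `BehaviorSetup` and is advanced by asynchronous journal and snapshot-store responses.

```scala
// Hypothetical stand-in for the retention-progress tracking added to BehaviorSetup.
final class RetentionGuard {
  private var inProgress = false

  def isInProgress: Boolean = inProgress

  /** Marks a cycle as started and returns true only when no cycle is running. */
  def tryStart(): Boolean =
    if (inProgress) false
    else {
      inProgress = true
      true
    }

  /** Called once the last step of the running cycle has completed. */
  def finish(): Unit = inProgress = false
}

val guard = new RetentionGuard
val first = guard.tryStart()  // no cycle running yet, so this one starts
val second = guard.tryStart() // overlaps with the first cycle, so it is rejected
guard.finish()
val third = guard.tryStart()  // the previous cycle has finished
```

The sketch uses a plain `var` without synchronization on the assumption that, as in an actor, the flag is only touched from one thread at a time.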
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/Running.scala | Adds the “only one retention at a time” guard and wires retention progress tracking into snapshot/event/snapshot deletion flow. |
| persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/BehaviorSetup.scala | Introduces retentionInProgress state and helper methods to track retention lifecycle across async steps. |
| persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/ExternalInteractions.scala | Changes snapshot deletion to always use minSequenceNr = 0L and updates logging/signature accordingly. |
| persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/RetentionCriteriaImpl.scala | Removes now-unused deleteLowerSequenceNr logic from internal retention criteria implementation. |
| persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/EventSourcedBehaviorImpl.scala | Updates BehaviorSetup construction to pass the new retentionInProgress parameter. |
| persistence-typed/src/test/scala/org/apache/pekko/persistence/typed/internal/RetentionCriteriaSpec.scala | Adjusts expectations to match removal of lower-bound deletion logic. |
| persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedBehaviorWatchSpec.scala | Updates test BehaviorSetup construction for the new parameter. |
| persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedBehaviorRetentionSpec.scala | Updates assertions to match snapshot deletion criteria changes (min sequence no longer asserted). |

        18 -> (6 -> 12),
        20 -> (8 -> 14))
    val expected =
      List(1 -> 0, 3 -> 0, 4 -> 0, 6 -> 0, 7 -> 1, 9 -> 3, 10 -> 4, 12 -> 6, 13 -> 7, 15 -> 9, 18 -> 12, 20 -> 14)

This expected list is now on a single very long line that likely exceeds the repo’s scalafmt maxColumn = 120 (see .scalafmt.conf). Please reformat (multi-line) to avoid formatting/lint failures and keep the test readable.

    - List(1 -> 0, 3 -> 0, 4 -> 0, 6 -> 0, 7 -> 1, 9 -> 3, 10 -> 4, 12 -> 6, 13 -> 7, 15 -> 9, 18 -> 12, 20 -> 14)
    + List(
    +   1 -> 0,
    +   3 -> 0,
    +   4 -> 0,
    +   6 -> 0,
    +   7 -> 1,
    +   9 -> 3,
    +   10 -> 4,
    +   12 -> 6,
    +   13 -> 7,
    +   15 -> 9,
    +   18 -> 12,
    +   20 -> 14)

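
The expected pairs in this list are consistent with an upper bound of `max(0, lastSequenceNr - snapshotEveryNEvents * keepNSnapshots)` and a window of 6. The sketch below checks that reading against the values from the test. The formula, the function signature, and the parameter values `3` and `2` are assumptions inferred from the numbers in this thread, not taken from `RetentionCriteriaImpl` itself.

```scala
// Assumed formula, inferred from the expected values quoted in the review thread.
def deleteUpperSequenceNr(lastSequenceNr: Long, snapshotEveryNEvents: Int, keepNSnapshots: Int): Long =
  math.max(0L, lastSequenceNr - snapshotEveryNEvents.toLong * keepNSnapshots)

// lastSequenceNr -> expected upper bound, copied from the test's expected list.
val expected: List[(Long, Long)] =
  List(
    1L -> 0L,
    3L -> 0L,
    4L -> 0L,
    6L -> 0L,
    7L -> 1L,
    9L -> 3L,
    10L -> 4L,
    12L -> 6L,
    13L -> 7L,
    15L -> 9L,
    18L -> 12L,
    20L -> 14L)

// Recompute every upper bound with the assumed parameters (3 events per snapshot, keep 2).
val computed: List[(Long, Long)] =
  expected.map { case (last, _) => last -> deleteUpperSequenceNr(last, 3, 2) }
```

Any pair of parameters whose product is 6 would reproduce the same list, so the values above are only one possible choice.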

    if (shouldSnapshotAfterPersist == NoSnapshot || state.state == null || skipRetention()) {
      val newState = applySideEffects(sideEffects, state)

      onWriteDone(setup.context, p)

      tryUnstashOne(newState)
    } else {

When a retention cycle is already in progress, this branch skips the entire snapshot+retention step (because skipRetention() is part of the condition), rather than actually deferring the retention request until completion as described in the PR description. This can reduce snapshot frequency (since the snapshot itself is skipped) and may increase recovery time if the actor stops before the next snapshot cycle. Consider still saving the snapshot but deferring only the delete-events/delete-snapshots part (or update the PR description/logging to reflect that retention is dropped rather than deferred).
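
The difference between the two options in this comment can be made concrete with a small model. It is only a sketch: `Action`, `skipAll`, and `snapshotOnly` are hypothetical names, and the real decision in `Running` also depends on `shouldSnapshotAfterPersist` and the current state.

```scala
sealed trait Action
case object SaveSnapshot extends Action
case object DeleteEvents extends Action
case object DeleteSnapshots extends Action

// Behaviour the comment describes: while a cycle is running, the whole step is skipped.
def skipAll(retentionInProgress: Boolean): List[Action] =
  if (retentionInProgress) Nil
  else List(SaveSnapshot, DeleteEvents, DeleteSnapshots)

// Alternative the comment suggests: still save the snapshot, hold back only the deletions.
def snapshotOnly(retentionInProgress: Boolean): List[Action] =
  if (retentionInProgress) List(SaveSnapshot)
  else List(SaveSnapshot, DeleteEvents, DeleteSnapshots)
```

Both variants behave identically when no cycle is running; they differ only in whether a snapshot is written during an in-progress cycle.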

    // only one retention process at a time
    val inProgress = shouldSnapshotAfterPersist == SnapshotWithRetention && setup.isRetentionInProgress()
    if (inProgress)
      setup.internalLogger.info(

Skipping retention ... is logged at INFO every time a snapshot-with-retention is attempted while a prior retention is still running. Under load this could generate a lot of log volume/noise. Consider lowering to DEBUG (or otherwise rate-limiting) since it's an internal, potentially high-frequency condition.

    - setup.internalLogger.info(
    + setup.internalLogger.debug(


      protected def internalDeleteSnapshots(toSequenceNr: Long): Unit = {
        if (toSequenceNr > 0) {
    -     val snapshotCriteria = SnapshotSelectionCriteria(minSequenceNr = fromSequenceNr, maxSequenceNr = toSequenceNr)
    -     setup.internalLogger.debug2("Deleting snapshots from sequenceNr [{}] to [{}]", fromSequenceNr, toSequenceNr)
    +     val snapshotCriteria = SnapshotSelectionCriteria(minSequenceNr = 0L, maxSequenceNr = toSequenceNr)
    +     setup.internalLogger.debug("Deleting snapshots to sequenceNr [{}]", toSequenceNr)
          setup.snapshotStore
            .tell(SnapshotProtocol.DeleteSnapshots(setup.persistenceId.id, snapshotCriteria), setup.selfClassic)
        }

internalDeleteSnapshots now always deletes with minSequenceNr = 0L (i.e., from the beginning) instead of using a bounded lower sequenceNr window. For snapshot stores where range deletion performance depends on the size of the range, repeatedly deleting from 0..N can be significantly more expensive than deleting in chunks. If the intent is purely to prevent overlapping retention cycles, consider restoring a moving lower bound (or add a short rationale here explaining why deleting from 0 is required/acceptable for Pekko’s supported snapshot stores).
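
The trade-off raised here has a second side: a moving lower bound can miss snapshots that fall below the window after several retention cycles were skipped. The following sketch illustrates that case with a simplified model. `SeqNrRange` is a hypothetical stand-in for `SnapshotSelectionCriteria`, and the stored sequence numbers and bounds are made up for illustration.

```scala
// Simplified model of a deletion range; not Pekko's SnapshotSelectionCriteria.
final case class SeqNrRange(minSequenceNr: Long, maxSequenceNr: Long) {
  def matches(sequenceNr: Long): Boolean =
    sequenceNr >= minSequenceNr && sequenceNr <= maxSequenceNr
}

// Snapshots still in the store because earlier retention cycles were skipped (made-up numbers).
val stored = Set(9L, 12L, 24L)
val upper = 18L

val windowed = SeqNrRange(minSequenceNr = 12L, maxSequenceNr = upper)
val fromZero = SeqNrRange(minSequenceNr = 0L, maxSequenceNr = upper)

// With the window, the snapshot at 9 sits below the lower bound and is never selected.
val leftAfterWindowed = stored.filterNot(windowed.matches)
// Deleting from 0 selects every snapshot up to the upper bound.
val leftAfterFromZero = stored.filterNot(fromZero.matches)
```

Whether the wider range costs more depends on the snapshot store, which is the concern the comment raises.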
Track retention lifecycle steps with mutable retentionInProgress state in BehaviorSetup.

Key changes:
- Add retentionInProgress flag and 6 progress tracking methods with detailed debug logging to BehaviorSetup.
- Skip new retention cycle when previous one has not completed yet, logging at INFO level. Next retention will cover skipped retention.
- Simplify internalDeleteSnapshots to always use minSequenceNr=0, preventing leftover snapshots when retention is skipped.
- Remove now-unnecessary deleteLowerSequenceNr from SnapshotCountRetentionCriteriaImpl.
- Fix upstream logging placeholder mismatch bug in retentionProgressDeleteEventsEnded (2 placeholders, 1 argument).

The retention process for SnapshotCountRetentionCriteria:
1. Save snapshot when shouldSnapshotAfterPersist returns SnapshotWithRetention.
2. Delete events (when deleteEventsOnSnapshot=true), in background.
3. Delete snapshots (when isOnlyOneSnapshot=false), in background.

Upstream: akka/akka-core@57b750a3dc
Cherry-picked from akka/akka-core v2.8.0, which is now Apache licensed.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
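
The three-step retention process in this commit message can be sketched as a function from settings to the steps that run. This is an illustration under assumptions: `RetentionSettings` and `retentionSteps` are hypothetical, and only the two flag names are taken from the commit message.

```scala
// Hypothetical settings holder; the flag names mirror those in the commit message.
final case class RetentionSettings(deleteEventsOnSnapshot: Boolean, isOnlyOneSnapshot: Boolean)

// Lists the steps of one retention cycle, in order, for the given settings.
def retentionSteps(settings: RetentionSettings): List[String] =
  List(
    Some("save snapshot"),
    if (settings.deleteEventsOnSnapshot) Some("delete events") else None,
    if (!settings.isOnlyOneSnapshot) Some("delete snapshots") else None
  ).flatten

val fullCycle = retentionSteps(RetentionSettings(deleteEventsOnSnapshot = true, isOnlyOneSnapshot = false))
val snapshotOnlyCycle = retentionSteps(RetentionSettings(deleteEventsOnSnapshot = false, isOnlyOneSnapshot = true))
```

A cycle counts as finished only after its last listed step completes, which is why the progress tracking has a start and an end method for each of the three steps.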
…ationale comments

- Reformat RetentionCriteriaSpec expected list to multi-line for readability
- Change 'Skipping retention' log level from INFO to DEBUG to avoid log noise
- Add design rationale comment explaining why snapshot+retention are skipped together (prevents orphaned snapshots that would never be cleaned up)
- Add Scaladoc explaining why minSequenceNr=0L is used (simplifies logic, safe for built-in snapshot stores)

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
compilation issues
Motivation
Port the upstream "only one retention cycle in progress at a time" guard from akka/akka-core commit `57b750a3dc` (which is now Apache licensed) to Pekko's event-sourced persistence.

When `SnapshotCountRetentionCriteria` is configured, a full retention cycle (snapshot → delete events → delete snapshots) can take significant time. If events are persisted faster than retention completes, overlapping cycles can race against each other, causing non-deterministic ordering of snapshot/event deletions and test flakiness.

Modification
Core change, `BehaviorSetup`:
- Add `retentionInProgress` flag with lifecycle tracking methods (`retentionProgressSaveSnapshotStarted`, `retentionProgressSaveSnapshotEnded`, `retentionProgressDeleteEventsStarted`, `retentionProgressDeleteEventsEnded`, `retentionProgressDeleteSnapshotsStarted`, `retentionProgressDeleteSnapshotsEnded`)

`Running.scala`:
- When `SnapshotWithRetention` triggers, check `setup.isRetentionInProgress()`

`ExternalInteractions.scala`:
- `internalDeleteSnapshots`: always delete from `minSequenceNr = 0L` instead of tracking a windowed lower bound. This simplifies the logic and is safe because Pekko's built-in snapshot stores handle range deletions efficiently.
- Remove the `fromSequenceNr` parameter

`RetentionCriteriaImpl.scala`:
- Remove the `deleteLowerSequenceNr` method (no longer needed)

Tests:
- `EventSourcedBehaviorRetentionSpec`: update all `expectDeleteSnapshotCompleted` calls to single-param form
- `RetentionCriteriaSpec`: remove `deleteLowerSequenceNr` assertions, reformat long line to multi-line for readability

Result

References