diff --git a/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedBehaviorRetentionSpec.scala b/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedBehaviorRetentionSpec.scala index 9972cd77c92..c4d5945cd1e 100644 --- a/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedBehaviorRetentionSpec.scala +++ b/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedBehaviorRetentionSpec.scala @@ -114,13 +114,12 @@ object EventSourcedBehaviorRetentionSpec extends Matchers { completed } - def expectDeleteSnapshotCompleted(maxSequenceNr: Long, minSequenceNr: Long): DeleteSnapshotsCompleted = { + def expectDeleteSnapshotCompleted(maxSequenceNr: Long): DeleteSnapshotsCompleted = { val wrapped = probe.expectMessageType[WrappedSignal] wrapped.signal shouldBe a[DeleteSnapshotsCompleted] val signal = wrapped.signal.asInstanceOf[DeleteSnapshotsCompleted] signal.target should ===( - DeletionTarget.Criteria( - SnapshotSelectionCriteria.latest.withMaxSequenceNr(maxSequenceNr).withMinSequenceNr(minSequenceNr))) + DeletionTarget.Criteria(SnapshotSelectionCriteria.latest.withMaxSequenceNr(maxSequenceNr))) signal } } @@ -284,25 +283,25 @@ class EventSourcedBehaviorRetentionSpec snapshotSignalProbe.expectSnapshotCompleted(3) snapshotSignalProbe.expectSnapshotCompleted(6) snapshotSignalProbe.expectSnapshotCompleted(9) - snapshotSignalProbe.expectDeleteSnapshotCompleted(3, 0) + snapshotSignalProbe.expectDeleteSnapshotCompleted(3) (1 to 3).foreach(_ => persistentActor ! Increment) persistentActor ! GetValue(replyProbe.ref) replyProbe.expectMessage(State(13, (0 until 13).toVector)) snapshotSignalProbe.expectSnapshotCompleted(12) - snapshotSignalProbe.expectDeleteSnapshotCompleted(6, 0) + snapshotSignalProbe.expectDeleteSnapshotCompleted(6) (1 to 3).foreach(_ => persistentActor ! Increment) persistentActor ! GetValue(replyProbe.ref) replyProbe.expectMessage(State(16, (0 until 16).toVector)) snapshotSignalProbe.expectSnapshotCompleted(15) - snapshotSignalProbe.expectDeleteSnapshotCompleted(9, 3) + snapshotSignalProbe.expectDeleteSnapshotCompleted(9) (1 to 4).foreach(_ => persistentActor ! Increment) persistentActor ! GetValue(replyProbe.ref) replyProbe.expectMessage(State(20, (0 until 20).toVector)) snapshotSignalProbe.expectSnapshotCompleted(18) - snapshotSignalProbe.expectDeleteSnapshotCompleted(12, 6) + snapshotSignalProbe.expectDeleteSnapshotCompleted(12) snapshotSignalProbe.expectNoMessage() } @@ -331,7 +330,7 @@ class EventSourcedBehaviorRetentionSpec // The reason for -1 is that a snapshot at the exact toSequenceNr is still useful and the events // after that can be replayed after that snapshot, but replaying the events after toSequenceNr without // starting at the snapshot at toSequenceNr would be invalid. - snapshotSignalProbe.expectDeleteSnapshotCompleted(2, 0) + snapshotSignalProbe.expectDeleteSnapshotCompleted(2) // one at a time since snapshotting+event-deletion switches to running state before deleting snapshot so ordering // if sending many commands in one go is not deterministic @@ -339,7 +338,7 @@ class EventSourcedBehaviorRetentionSpec persistentActor ! Increment // 12 snapshotSignalProbe.expectSnapshotCompleted(12) eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 6 - snapshotSignalProbe.expectDeleteSnapshotCompleted(5, 0) + snapshotSignalProbe.expectDeleteSnapshotCompleted(5) persistentActor ! Increment // 13 persistentActor ! Increment // 14 @@ -347,7 +346,7 @@ class EventSourcedBehaviorRetentionSpec persistentActor ! Increment // 15 snapshotSignalProbe.expectSnapshotCompleted(15) eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 9 - snapshotSignalProbe.expectDeleteSnapshotCompleted(8, 2) + snapshotSignalProbe.expectDeleteSnapshotCompleted(8) persistentActor ! Increment // 16 persistentActor ! Increment // 17 @@ -355,7 +354,7 @@ class EventSourcedBehaviorRetentionSpec snapshotSignalProbe.expectSnapshotCompleted(18) eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 12 - snapshotSignalProbe.expectDeleteSnapshotCompleted(11, 5) + snapshotSignalProbe.expectDeleteSnapshotCompleted(11) eventProbe.expectNoMessage() snapshotSignalProbe.expectNoMessage() @@ -381,7 +380,7 @@ class EventSourcedBehaviorRetentionSpec (4 to 10).foreach(_ => persistentActor ! Increment) snapshotSignalProbe.expectSnapshotCompleted(5) snapshotSignalProbe.expectSnapshotCompleted(10) - snapshotSignalProbe.expectDeleteSnapshotCompleted(5, 0) + snapshotSignalProbe.expectDeleteSnapshotCompleted(5) (11 to 13).foreach(_ => persistentActor ! Increment) snapshotSignalProbe.expectSnapshotCompleted(13) @@ -395,7 +394,7 @@ class EventSourcedBehaviorRetentionSpec persistentActor ! GetValue(replyProbe.ref) replyProbe.expectMessage(State(16, (0 until 16).toVector)) snapshotSignalProbe.expectSnapshotCompleted(15) - snapshotSignalProbe.expectDeleteSnapshotCompleted(10, 5) + snapshotSignalProbe.expectDeleteSnapshotCompleted(10) eventProbe.within(3.seconds) { eventProbe.expectNoMessage() snapshotSignalProbe.expectNoMessage() @@ -438,18 +437,18 @@ class EventSourcedBehaviorRetentionSpec snapshotSignalProbe.expectSnapshotCompleted(8) // every-2 through criteria // triggers delete up to snapshot no 2 eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 2 - snapshotSignalProbe.expectDeleteSnapshotCompleted(1, 0) // then delete oldest snapshot + snapshotSignalProbe.expectDeleteSnapshotCompleted(1) // then delete oldest snapshot persistentActor ! Increment // 9 persistentActor ! Increment // 10 snapshotSignalProbe.expectSnapshotCompleted(10) // every-2 through criteria - snapshotSignalProbe.expectDeleteSnapshotCompleted(3, 0) + snapshotSignalProbe.expectDeleteSnapshotCompleted(3) eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 4 persistentActor ! Increment // 11 persistentActor ! Increment // 12 snapshotSignalProbe.expectSnapshotCompleted(12) // every-2 through criteria - snapshotSignalProbe.expectDeleteSnapshotCompleted(5, 0) + snapshotSignalProbe.expectDeleteSnapshotCompleted(5) eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 6 persistentActor ! Increment // 13 @@ -463,13 +462,13 @@ class EventSourcedBehaviorRetentionSpec persistentActor ! Increment // 14 snapshotSignalProbe.expectSnapshotCompleted(14) // every-2 through criteria eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 8 - snapshotSignalProbe.expectDeleteSnapshotCompleted(7, 1) + snapshotSignalProbe.expectDeleteSnapshotCompleted(7) persistentActor ! Increment // 15 persistentActor ! Increment // 16 snapshotSignalProbe.expectSnapshotCompleted(16) // every-2 through criteria eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 10 - snapshotSignalProbe.expectDeleteSnapshotCompleted(9, 3) + snapshotSignalProbe.expectDeleteSnapshotCompleted(9) eventProbe.within(3.seconds) { eventProbe.expectNoMessage() @@ -501,31 +500,31 @@ class EventSourcedBehaviorRetentionSpec snapshotSignalProbe.expectSnapshotCompleted(2) snapshotSignalProbe.expectSnapshotCompleted(3) snapshotSignalProbe.expectSnapshotCompleted(4) - deleteSnapshotSignalProbe.expectDeleteSnapshotCompleted(1, 0) + deleteSnapshotSignalProbe.expectDeleteSnapshotCompleted(1) persistentActor ! Increment snapshotSignalProbe.expectSnapshotCompleted(5) - deleteSnapshotSignalProbe.expectDeleteSnapshotCompleted(2, 0) + deleteSnapshotSignalProbe.expectDeleteSnapshotCompleted(2) persistentActor ! Increment snapshotSignalProbe.expectSnapshotCompleted(6) - deleteSnapshotSignalProbe.expectDeleteSnapshotCompleted(3, 0) + deleteSnapshotSignalProbe.expectDeleteSnapshotCompleted(3) persistentActor ! Increment snapshotSignalProbe.expectSnapshotCompleted(7) - deleteSnapshotSignalProbe.expectDeleteSnapshotCompleted(4, 1) + deleteSnapshotSignalProbe.expectDeleteSnapshotCompleted(4) persistentActor ! Increment snapshotSignalProbe.expectSnapshotCompleted(8) - deleteSnapshotSignalProbe.expectDeleteSnapshotCompleted(5, 2) + deleteSnapshotSignalProbe.expectDeleteSnapshotCompleted(5) persistentActor ! Increment snapshotSignalProbe.expectSnapshotCompleted(9) - deleteSnapshotSignalProbe.expectDeleteSnapshotCompleted(6, 3) + deleteSnapshotSignalProbe.expectDeleteSnapshotCompleted(6) persistentActor ! Increment snapshotSignalProbe.expectSnapshotCompleted(10) - deleteSnapshotSignalProbe.expectDeleteSnapshotCompleted(7, 4) + deleteSnapshotSignalProbe.expectDeleteSnapshotCompleted(7) persistentActor ! GetValue(replyProbe.ref) replyProbe.expectMessage(State(10, (0 until 10).toVector)) @@ -554,32 +553,32 @@ class EventSourcedBehaviorRetentionSpec persistentActor ! Increment // 5 snapshotSignalProbe.expectSnapshotCompleted(5) eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 2 - snapshotSignalProbe.expectDeleteSnapshotCompleted(1, 0) + snapshotSignalProbe.expectDeleteSnapshotCompleted(1) persistentActor ! Increment // 6 snapshotSignalProbe.expectSnapshotCompleted(6) eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 3 - snapshotSignalProbe.expectDeleteSnapshotCompleted(2, 0) + snapshotSignalProbe.expectDeleteSnapshotCompleted(2) persistentActor ! Increment // 7 snapshotSignalProbe.expectSnapshotCompleted(7) eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 4 - snapshotSignalProbe.expectDeleteSnapshotCompleted(3, 0) + snapshotSignalProbe.expectDeleteSnapshotCompleted(3) persistentActor ! Increment // 8 snapshotSignalProbe.expectSnapshotCompleted(8) eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 5 - snapshotSignalProbe.expectDeleteSnapshotCompleted(4, 1) + snapshotSignalProbe.expectDeleteSnapshotCompleted(4) persistentActor ! Increment // 9 snapshotSignalProbe.expectSnapshotCompleted(9) eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 6 - snapshotSignalProbe.expectDeleteSnapshotCompleted(5, 2) + snapshotSignalProbe.expectDeleteSnapshotCompleted(5) persistentActor ! Increment // 10 snapshotSignalProbe.expectSnapshotCompleted(10) eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 7 - snapshotSignalProbe.expectDeleteSnapshotCompleted(6, 3) + snapshotSignalProbe.expectDeleteSnapshotCompleted(6) } "snapshot on recovery if expected snapshot is missing" in { diff --git a/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedBehaviorWatchSpec.scala b/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedBehaviorWatchSpec.scala index 6124e83264a..ded56b0bf56 100644 --- a/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedBehaviorWatchSpec.scala +++ b/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedBehaviorWatchSpec.scala @@ -87,7 +87,8 @@ class EventSourcedBehaviorWatchSpec stashState = new StashState(context.asInstanceOf[ActorContext[InternalProtocol]], settings), replication = None, publishEvents = false, - internalLoggerFactory = () => logger) + internalLoggerFactory = () => logger, + retentionInProgress = false) "A typed persistent parent actor watching a child" must { diff --git a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/BehaviorSetup.scala b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/BehaviorSetup.scala index 6e80c2b75eb..38b94166a03 100644 --- a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/BehaviorSetup.scala +++ b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/BehaviorSetup.scala @@ -22,6 +22,7 @@ import pekko.actor.{ ActorRef => ClassicActorRef } import pekko.actor.Cancellable import pekko.actor.typed.Signal import pekko.actor.typed.scaladsl.ActorContext +import pekko.actor.typed.scaladsl.LoggerOps import pekko.annotation.InternalApi import pekko.persistence._ import pekko.persistence.typed.EventAdapter @@ -71,7 +72,8 @@ private[pekko] final class BehaviorSetup[C, E, S]( val stashState: StashState, val replication: Option[ReplicationSetup], val publishEvents: Boolean, - private val internalLoggerFactory: () => Logger) { + private val internalLoggerFactory: () => Logger, + private var retentionInProgress: Boolean) { import BehaviorSetup._ import InternalProtocol.RecoveryTickEvent @@ -197,6 +199,110 @@ private[pekko] final class BehaviorSetup[C, E, S]( } } + // The retention process for SnapshotCountRetentionCriteria looks like this: + // 1. Save snapshot after persisting events when shouldSnapshotAfterPersist returned SnapshotWithRetention. + // 2. Delete events (when deleteEventsOnSnapshot=true), runs in background. + // 3. Delete snapshots (when isOnlyOneSnapshot=false), runs in background. + + def isRetentionInProgress(): Boolean = + retentionInProgress + + def retentionProgressSaveSnapshotStarted(sequenceNr: Long): Unit = { + retention match { + case SnapshotCountRetentionCriteriaImpl(_, _, _) => + internalLogger.debug("Starting retention at seqNr [{}], saving snapshot.", sequenceNr) + retentionInProgress = true + case _ => + } + } + + def retentionProgressSaveSnapshotEnded(sequenceNr: Long, success: Boolean): Unit = { + retention match { + case SnapshotCountRetentionCriteriaImpl(_, _, deleteEvents) if retentionInProgress => + if (!success) { + internalLogger.debug("Retention at seqNr [{}] is completed, saving snapshot failed.", sequenceNr) + retentionInProgress = false + } else if (deleteEvents) { + internalLogger.debug("Retention at seqNr [{}], saving snapshot was successful.", sequenceNr) + } else if (isOnlyOneSnapshot) { + // no delete of events and no delete of snapshots => done + internalLogger.debug("Retention at seqNr [{}] is completed, saving snapshot was successful.", sequenceNr) + retentionInProgress = false + } else { + internalLogger.debug("Retention at seqNr [{}], saving snapshot was successful.", sequenceNr) + } + case _ => + } + } + + def retentionProgressDeleteEventsStarted(sequenceNr: Long, deleteToSequenceNr: Long): Unit = { + retention match { + case SnapshotCountRetentionCriteriaImpl(_, _, true) if retentionInProgress => + if (deleteToSequenceNr > 0) { + internalLogger.debug2( + "Retention at seqNr [{}], deleting events to seqNr [{}].", + sequenceNr, + deleteToSequenceNr) + } else { + internalLogger.debug("Retention is completed, no events to delete.") + retentionInProgress = false + } + case _ => + } + } + + def retentionProgressDeleteEventsEnded(deleteToSequenceNr: Long, success: Boolean): Unit = { + retention match { + case SnapshotCountRetentionCriteriaImpl(_, _, true) if retentionInProgress => + if (!success) { + internalLogger.debug( + "Retention is completed, deleting events to seqNr [{}] failed.", + deleteToSequenceNr) + retentionInProgress = false + } else if (isOnlyOneSnapshot) { + // no delete of snapshots => done + internalLogger.debug( + "Retention is completed, deleting events to seqNr [{}] was successful.", + deleteToSequenceNr) + retentionInProgress = false + } else { + internalLogger.debug("Retention, deleting events to seqNr [{}] was successful.", deleteToSequenceNr) + } + case _ => + } + } + + def retentionProgressDeleteSnapshotsStarted(deleteToSequenceNr: Long): Unit = { + retention match { + case SnapshotCountRetentionCriteriaImpl(_, _, _) if retentionInProgress => + if (deleteToSequenceNr > 0) { + internalLogger.debug("Retention, deleting snapshots to seqNr [{}].", deleteToSequenceNr) + } else { + internalLogger.debug("Retention is completed, no snapshots to delete.") + retentionInProgress = false + } + case _ => + } + } + + def retentionProgressDeleteSnapshotsEnded(deleteToSequenceNr: Long, success: Boolean): Unit = { + retention match { + case SnapshotCountRetentionCriteriaImpl(_, _, _) if retentionInProgress => + if (success) { + // delete snapshot is last step => done + internalLogger.debug( + "Retention is completed, deleting snapshots to seqNr [{}] was successful.", + deleteToSequenceNr) + retentionInProgress = false + } else { + internalLogger.debug("Retention is completed, deleting snapshots to seqNr [{}] failed.", deleteToSequenceNr) + retentionInProgress = false + } + + case _ => + } + } + } /** diff --git a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/EventSourcedBehaviorImpl.scala b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/EventSourcedBehaviorImpl.scala index 0aa0df8bed7..75ebbb1b436 100644 --- a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/EventSourcedBehaviorImpl.scala +++ b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/EventSourcedBehaviorImpl.scala @@ -209,7 +209,8 @@ private[pekko] final case class EventSourcedBehaviorImpl[Command, Event, State]( stashState = stashState, replication = replication, publishEvents = publishEvents, - internalLoggerFactory = () => internalLogger()) + internalLoggerFactory = () => internalLogger(), + retentionInProgress = false) // needs to accept Any since we also can get messages from the journal // not part of the user facing Command protocol diff --git a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/ExternalInteractions.scala b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/ExternalInteractions.scala index 417af3ddadc..f56411ad5e7 100644 --- a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/ExternalInteractions.scala +++ b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/ExternalInteractions.scala @@ -223,11 +223,15 @@ private[pekko] trait SnapshotInteractions[C, E, S] { } } - /** Deletes the snapshots up to and including the `sequenceNr`. */ - protected def internalDeleteSnapshots(fromSequenceNr: Long, toSequenceNr: Long): Unit = { + /** + * Deletes the snapshots up to and including the `sequenceNr`. + * Uses `minSequenceNr = 0L` to always delete from the beginning, which simplifies + * the retention bookkeeping by removing the need to track a separate lower bound. + */ + protected def internalDeleteSnapshots(toSequenceNr: Long): Unit = { if (toSequenceNr > 0) { - val snapshotCriteria = SnapshotSelectionCriteria(minSequenceNr = fromSequenceNr, maxSequenceNr = toSequenceNr) - setup.internalLogger.debug2("Deleting snapshots from sequenceNr [{}] to [{}]", fromSequenceNr, toSequenceNr) + val snapshotCriteria = SnapshotSelectionCriteria(minSequenceNr = 0L, maxSequenceNr = toSequenceNr) + setup.internalLogger.debug("Deleting snapshots to sequenceNr [{}]", toSequenceNr) setup.snapshotStore .tell(SnapshotProtocol.DeleteSnapshots(setup.persistenceId.id, snapshotCriteria), setup.selfClassic) } diff --git a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/RetentionCriteriaImpl.scala b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/RetentionCriteriaImpl.scala index 0bc9a57b10e..a07935e4328 100644 --- a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/RetentionCriteriaImpl.scala +++ b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/RetentionCriteriaImpl.scala @@ -44,16 +44,6 @@ import pekko.persistence.typed.scaladsl math.max(0, lastSequenceNr - (keepNSnapshots.toLong * snapshotEveryNEvents)) } - /** - * Should only be used when `BehaviorSetup.isOnlyOneSnapshot` is false. - */ - def deleteLowerSequenceNr(upperSequenceNr: Long): Long = { - // We could use 0 as fromSequenceNr to delete all older snapshots, but that might be inefficient for - // large ranges depending on how it's implemented in the snapshot plugin. Therefore we use the - // same window as defined for how much to keep in the retention criteria - math.max(0, upperSequenceNr - (keepNSnapshots.toLong * snapshotEveryNEvents)) - } - override def withDeleteEventsOnSnapshot: SnapshotCountRetentionCriteriaImpl = copy(deleteEventsOnSnapshot = true) diff --git a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/Running.scala b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/Running.scala index bb983d05a29..a43eb278b16 100644 --- a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/Running.scala +++ b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/Running.scala @@ -813,13 +813,27 @@ private[pekko] object Running { this } else { visibleState = state - if (shouldSnapshotAfterPersist == NoSnapshot || state.state == null) { + def skipRetention(): Boolean = { + // Only one retention process (snapshot + optional event/snapshot deletion) at a time. + // When retention is already in progress, both the snapshot and the subsequent + // deletion steps are skipped together. This keeps retention state simple and avoids + // transiently exceeding the intended snapshot count. The next retention cycle at a + // higher seqNr will cover the range of the skipped one, so no data is lost. + val inProgress = shouldSnapshotAfterPersist == SnapshotWithRetention && setup.isRetentionInProgress() + if (inProgress) + setup.internalLogger.debug( + "Skipping retention at seqNr [{}] because previous retention has not completed yet. " + + "Next retention will cover skipped retention.", + state.seqNr) + inProgress + } + if (shouldSnapshotAfterPersist == NoSnapshot || state.state == null || skipRetention()) { val newState = applySideEffects(sideEffects, state) - onWriteDone(setup.context, p) - tryUnstashOne(newState) } else { + if (shouldSnapshotAfterPersist == SnapshotWithRetention) + setup.retentionProgressSaveSnapshotStarted(state.seqNr) internalSaveSnapshot(state) new StoringSnapshot(state, sideEffects, shouldSnapshotAfterPersist) } @@ -910,18 +924,22 @@ private[pekko] object Running { setup.retention match { case DisabledRetentionCriteria => // no further actions case s @ SnapshotCountRetentionCriteriaImpl(_, _, true) => + setup.retentionProgressSaveSnapshotEnded(state.seqNr, success = true) // deleteEventsOnSnapshot == true, deletion of old events val deleteEventsToSeqNr = { if (setup.isOnlyOneSnapshot) meta.sequenceNr // delete all events up to the snapshot else s.deleteUpperSequenceNr(meta.sequenceNr) // keepNSnapshots batches of events } // snapshot deletion then happens on event deletion success in Running.onDeleteEventsJournalResponse + setup.retentionProgressDeleteEventsStarted(state.seqNr, deleteEventsToSeqNr) internalDeleteEvents(meta.sequenceNr, deleteEventsToSeqNr) case s @ SnapshotCountRetentionCriteriaImpl(_, _, false) => + setup.retentionProgressSaveSnapshotEnded(state.seqNr, success = true) // deleteEventsOnSnapshot == false, deletion of old snapshots if (!setup.isOnlyOneSnapshot) { val deleteSnapshotsToSeqNr = s.deleteUpperSequenceNr(meta.sequenceNr) - internalDeleteSnapshots(s.deleteLowerSequenceNr(deleteSnapshotsToSeqNr), deleteSnapshotsToSeqNr) + setup.retentionProgressDeleteSnapshotsStarted(deleteSnapshotsToSeqNr) + internalDeleteSnapshots(deleteSnapshotsToSeqNr) } case unexpected => throw new IllegalStateException(s"Unexpected retention criteria: $unexpected") } @@ -930,6 +948,8 @@ private[pekko] object Running { Some(SnapshotCompleted(SnapshotMetadata.fromClassic(meta))) case SaveSnapshotFailure(meta, error) => + if (snapshotReason == SnapshotWithRetention) + setup.retentionProgressSaveSnapshotEnded(state.seqNr, success = false) setup.internalLogger.warn2("Failed to save snapshot given metadata [{}] due to: {}", meta, error.getMessage) Some(SnapshotFailed(SnapshotMetadata.fromClassic(meta), error)) @@ -1029,20 +1049,23 @@ private[pekko] object Running { val signal = response match { case DeleteMessagesSuccess(toSequenceNr) => setup.internalLogger.debug("Persistent events to sequenceNr [{}] deleted successfully.", toSequenceNr) + setup.retentionProgressDeleteEventsEnded(toSequenceNr, success = true) setup.retention match { case DisabledRetentionCriteria => // no further actions - case s: SnapshotCountRetentionCriteriaImpl => + case _: SnapshotCountRetentionCriteriaImpl => if (!setup.isOnlyOneSnapshot) { // The reason for -1 is that a snapshot at the exact toSequenceNr is still useful and the events // after that can be replayed after that snapshot, but replaying the events after toSequenceNr without // starting at the snapshot at toSequenceNr would be invalid. val deleteSnapshotsToSeqNr = toSequenceNr - 1 - internalDeleteSnapshots(s.deleteLowerSequenceNr(deleteSnapshotsToSeqNr), deleteSnapshotsToSeqNr) + setup.retentionProgressDeleteSnapshotsStarted(deleteSnapshotsToSeqNr) + internalDeleteSnapshots(deleteSnapshotsToSeqNr) } case unexpected => throw new IllegalStateException(s"Unexpected retention criteria: $unexpected") } Some(DeleteEventsCompleted(toSequenceNr)) case DeleteMessagesFailure(e, toSequenceNr) => + setup.retentionProgressDeleteEventsEnded(toSequenceNr, success = false) Some(DeleteEventsFailed(toSequenceNr, e)) case _ => None @@ -1063,8 +1086,10 @@ private[pekko] object Running { def onDeleteSnapshotResponse(response: SnapshotProtocol.Response, state: S): Behavior[InternalProtocol] = { val signal = response match { case DeleteSnapshotsSuccess(criteria) => + setup.retentionProgressDeleteSnapshotsEnded(criteria.maxSequenceNr, success = true) Some(DeleteSnapshotsCompleted(DeletionTarget.Criteria(SnapshotSelectionCriteria.fromClassic(criteria)))) case DeleteSnapshotsFailure(criteria, error) => + setup.retentionProgressDeleteSnapshotsEnded(criteria.maxSequenceNr, success = false) Some(DeleteSnapshotsFailed(DeletionTarget.Criteria(SnapshotSelectionCriteria.fromClassic(criteria)), error)) case DeleteSnapshotSuccess(meta) => Some(DeleteSnapshotsCompleted(DeletionTarget.Individual(SnapshotMetadata.fromClassic(meta)))) diff --git a/persistence-typed/src/test/scala/org/apache/pekko/persistence/typed/internal/RetentionCriteriaSpec.scala b/persistence-typed/src/test/scala/org/apache/pekko/persistence/typed/internal/RetentionCriteriaSpec.scala index d8a903ab4e1..93207a459df 100644 --- a/persistence-typed/src/test/scala/org/apache/pekko/persistence/typed/internal/RetentionCriteriaSpec.scala +++ b/persistence-typed/src/test/scala/org/apache/pekko/persistence/typed/internal/RetentionCriteriaSpec.scala @@ -39,23 +39,22 @@ class RetentionCriteriaSpec extends TestSuite with Matchers with AnyWordSpecLike "have valid sequenceNr range based on keepNSnapshots" in { val criteria = RetentionCriteria.snapshotEvery(3, 2).asInstanceOf[SnapshotCountRetentionCriteriaImpl] val expected = List( - 1 -> (0 -> 0), - 3 -> (0 -> 0), - 4 -> (0 -> 0), - 6 -> (0 -> 0), - 7 -> (0 -> 1), - 9 -> (0 -> 3), - 10 -> (0 -> 4), - 12 -> (0 -> 6), - 13 -> (1 -> 7), - 15 -> (3 -> 9), - 18 -> (6 -> 12), - 20 -> (8 -> 14)) + 1 -> 0, + 3 -> 0, + 4 -> 0, + 6 -> 0, + 7 -> 1, + 9 -> 3, + 10 -> 4, + 12 -> 6, + 13 -> 7, + 15 -> 9, + 18 -> 12, + 20 -> 14) expected.foreach { - case (seqNr, (lower, upper)) => + case (seqNr, upper) => withClue(s"seqNr=$seqNr:") { criteria.deleteUpperSequenceNr(seqNr) should ===(upper) - criteria.deleteLowerSequenceNr(upper) should ===(lower) } } }