Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -114,13 +114,12 @@ object EventSourcedBehaviorRetentionSpec extends Matchers {
completed
}

def expectDeleteSnapshotCompleted(maxSequenceNr: Long, minSequenceNr: Long): DeleteSnapshotsCompleted = {
def expectDeleteSnapshotCompleted(maxSequenceNr: Long): DeleteSnapshotsCompleted = {
val wrapped = probe.expectMessageType[WrappedSignal]
wrapped.signal shouldBe a[DeleteSnapshotsCompleted]
val signal = wrapped.signal.asInstanceOf[DeleteSnapshotsCompleted]
signal.target should ===(
DeletionTarget.Criteria(
SnapshotSelectionCriteria.latest.withMaxSequenceNr(maxSequenceNr).withMinSequenceNr(minSequenceNr)))
DeletionTarget.Criteria(SnapshotSelectionCriteria.latest.withMaxSequenceNr(maxSequenceNr)))
signal
}
}
Expand Down Expand Up @@ -284,25 +283,25 @@ class EventSourcedBehaviorRetentionSpec
snapshotSignalProbe.expectSnapshotCompleted(3)
snapshotSignalProbe.expectSnapshotCompleted(6)
snapshotSignalProbe.expectSnapshotCompleted(9)
snapshotSignalProbe.expectDeleteSnapshotCompleted(3, 0)
snapshotSignalProbe.expectDeleteSnapshotCompleted(3)

(1 to 3).foreach(_ => persistentActor ! Increment)
persistentActor ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(13, (0 until 13).toVector))
snapshotSignalProbe.expectSnapshotCompleted(12)
snapshotSignalProbe.expectDeleteSnapshotCompleted(6, 0)
snapshotSignalProbe.expectDeleteSnapshotCompleted(6)

(1 to 3).foreach(_ => persistentActor ! Increment)
persistentActor ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(16, (0 until 16).toVector))
snapshotSignalProbe.expectSnapshotCompleted(15)
snapshotSignalProbe.expectDeleteSnapshotCompleted(9, 3)
snapshotSignalProbe.expectDeleteSnapshotCompleted(9)

(1 to 4).foreach(_ => persistentActor ! Increment)
persistentActor ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(20, (0 until 20).toVector))
snapshotSignalProbe.expectSnapshotCompleted(18)
snapshotSignalProbe.expectDeleteSnapshotCompleted(12, 6)
snapshotSignalProbe.expectDeleteSnapshotCompleted(12)

snapshotSignalProbe.expectNoMessage()
}
Expand Down Expand Up @@ -331,31 +330,31 @@ class EventSourcedBehaviorRetentionSpec
// The reason for -1 is that a snapshot at the exact toSequenceNr is still useful and the events
// after that can be replayed after that snapshot, but replaying the events after toSequenceNr without
// starting at the snapshot at toSequenceNr would be invalid.
snapshotSignalProbe.expectDeleteSnapshotCompleted(2, 0)
snapshotSignalProbe.expectDeleteSnapshotCompleted(2)

// one at a time since snapshotting+event-deletion switches to running state before deleting snapshot so ordering
// if sending many commands in one go is not deterministic
persistentActor ! Increment // 11
persistentActor ! Increment // 12
snapshotSignalProbe.expectSnapshotCompleted(12)
eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 6
snapshotSignalProbe.expectDeleteSnapshotCompleted(5, 0)
snapshotSignalProbe.expectDeleteSnapshotCompleted(5)

persistentActor ! Increment // 13
persistentActor ! Increment // 14
persistentActor ! Increment // 11
persistentActor ! Increment // 15
snapshotSignalProbe.expectSnapshotCompleted(15)
eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 9
snapshotSignalProbe.expectDeleteSnapshotCompleted(8, 2)
snapshotSignalProbe.expectDeleteSnapshotCompleted(8)

persistentActor ! Increment // 16
persistentActor ! Increment // 17
persistentActor ! Increment // 18
snapshotSignalProbe.expectSnapshotCompleted(18)

eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 12
snapshotSignalProbe.expectDeleteSnapshotCompleted(11, 5)
snapshotSignalProbe.expectDeleteSnapshotCompleted(11)

eventProbe.expectNoMessage()
snapshotSignalProbe.expectNoMessage()
Expand All @@ -381,7 +380,7 @@ class EventSourcedBehaviorRetentionSpec
(4 to 10).foreach(_ => persistentActor ! Increment)
snapshotSignalProbe.expectSnapshotCompleted(5)
snapshotSignalProbe.expectSnapshotCompleted(10)
snapshotSignalProbe.expectDeleteSnapshotCompleted(5, 0)
snapshotSignalProbe.expectDeleteSnapshotCompleted(5)

(11 to 13).foreach(_ => persistentActor ! Increment)
snapshotSignalProbe.expectSnapshotCompleted(13)
Expand All @@ -395,7 +394,7 @@ class EventSourcedBehaviorRetentionSpec
persistentActor ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(16, (0 until 16).toVector))
snapshotSignalProbe.expectSnapshotCompleted(15)
snapshotSignalProbe.expectDeleteSnapshotCompleted(10, 5)
snapshotSignalProbe.expectDeleteSnapshotCompleted(10)
eventProbe.within(3.seconds) {
eventProbe.expectNoMessage()
snapshotSignalProbe.expectNoMessage()
Expand Down Expand Up @@ -438,18 +437,18 @@ class EventSourcedBehaviorRetentionSpec
snapshotSignalProbe.expectSnapshotCompleted(8) // every-2 through criteria
// triggers delete up to snapshot no 2
eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 2
snapshotSignalProbe.expectDeleteSnapshotCompleted(1, 0) // then delete oldest snapshot
snapshotSignalProbe.expectDeleteSnapshotCompleted(1) // then delete oldest snapshot

persistentActor ! Increment // 9
persistentActor ! Increment // 10
snapshotSignalProbe.expectSnapshotCompleted(10) // every-2 through criteria
snapshotSignalProbe.expectDeleteSnapshotCompleted(3, 0)
snapshotSignalProbe.expectDeleteSnapshotCompleted(3)
eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 4

persistentActor ! Increment // 11
persistentActor ! Increment // 12
snapshotSignalProbe.expectSnapshotCompleted(12) // every-2 through criteria
snapshotSignalProbe.expectDeleteSnapshotCompleted(5, 0)
snapshotSignalProbe.expectDeleteSnapshotCompleted(5)
eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 6

persistentActor ! Increment // 13
Expand All @@ -463,13 +462,13 @@ class EventSourcedBehaviorRetentionSpec
persistentActor ! Increment // 14
snapshotSignalProbe.expectSnapshotCompleted(14) // every-2 through criteria
eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 8
snapshotSignalProbe.expectDeleteSnapshotCompleted(7, 1)
snapshotSignalProbe.expectDeleteSnapshotCompleted(7)

persistentActor ! Increment // 15
persistentActor ! Increment // 16
snapshotSignalProbe.expectSnapshotCompleted(16) // every-2 through criteria
eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 10
snapshotSignalProbe.expectDeleteSnapshotCompleted(9, 3)
snapshotSignalProbe.expectDeleteSnapshotCompleted(9)

eventProbe.within(3.seconds) {
eventProbe.expectNoMessage()
Expand Down Expand Up @@ -501,31 +500,31 @@ class EventSourcedBehaviorRetentionSpec
snapshotSignalProbe.expectSnapshotCompleted(2)
snapshotSignalProbe.expectSnapshotCompleted(3)
snapshotSignalProbe.expectSnapshotCompleted(4)
deleteSnapshotSignalProbe.expectDeleteSnapshotCompleted(1, 0)
deleteSnapshotSignalProbe.expectDeleteSnapshotCompleted(1)

persistentActor ! Increment
snapshotSignalProbe.expectSnapshotCompleted(5)
deleteSnapshotSignalProbe.expectDeleteSnapshotCompleted(2, 0)
deleteSnapshotSignalProbe.expectDeleteSnapshotCompleted(2)

persistentActor ! Increment
snapshotSignalProbe.expectSnapshotCompleted(6)
deleteSnapshotSignalProbe.expectDeleteSnapshotCompleted(3, 0)
deleteSnapshotSignalProbe.expectDeleteSnapshotCompleted(3)

persistentActor ! Increment
snapshotSignalProbe.expectSnapshotCompleted(7)
deleteSnapshotSignalProbe.expectDeleteSnapshotCompleted(4, 1)
deleteSnapshotSignalProbe.expectDeleteSnapshotCompleted(4)

persistentActor ! Increment
snapshotSignalProbe.expectSnapshotCompleted(8)
deleteSnapshotSignalProbe.expectDeleteSnapshotCompleted(5, 2)
deleteSnapshotSignalProbe.expectDeleteSnapshotCompleted(5)

persistentActor ! Increment
snapshotSignalProbe.expectSnapshotCompleted(9)
deleteSnapshotSignalProbe.expectDeleteSnapshotCompleted(6, 3)
deleteSnapshotSignalProbe.expectDeleteSnapshotCompleted(6)

persistentActor ! Increment
snapshotSignalProbe.expectSnapshotCompleted(10)
deleteSnapshotSignalProbe.expectDeleteSnapshotCompleted(7, 4)
deleteSnapshotSignalProbe.expectDeleteSnapshotCompleted(7)

persistentActor ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(10, (0 until 10).toVector))
Expand Down Expand Up @@ -554,32 +553,32 @@ class EventSourcedBehaviorRetentionSpec
persistentActor ! Increment // 5
snapshotSignalProbe.expectSnapshotCompleted(5)
eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 2
snapshotSignalProbe.expectDeleteSnapshotCompleted(1, 0)
snapshotSignalProbe.expectDeleteSnapshotCompleted(1)

persistentActor ! Increment // 6
snapshotSignalProbe.expectSnapshotCompleted(6)
eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 3
snapshotSignalProbe.expectDeleteSnapshotCompleted(2, 0)
snapshotSignalProbe.expectDeleteSnapshotCompleted(2)

persistentActor ! Increment // 7
snapshotSignalProbe.expectSnapshotCompleted(7)
eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 4
snapshotSignalProbe.expectDeleteSnapshotCompleted(3, 0)
snapshotSignalProbe.expectDeleteSnapshotCompleted(3)

persistentActor ! Increment // 8
snapshotSignalProbe.expectSnapshotCompleted(8)
eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 5
snapshotSignalProbe.expectDeleteSnapshotCompleted(4, 1)
snapshotSignalProbe.expectDeleteSnapshotCompleted(4)

persistentActor ! Increment // 9
snapshotSignalProbe.expectSnapshotCompleted(9)
eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 6
snapshotSignalProbe.expectDeleteSnapshotCompleted(5, 2)
snapshotSignalProbe.expectDeleteSnapshotCompleted(5)

persistentActor ! Increment // 10
snapshotSignalProbe.expectSnapshotCompleted(10)
eventProbe.expectMessageType[Success[DeleteEventsCompleted]].value.toSequenceNr shouldEqual 7
snapshotSignalProbe.expectDeleteSnapshotCompleted(6, 3)
snapshotSignalProbe.expectDeleteSnapshotCompleted(6)
}

"snapshot on recovery if expected snapshot is missing" in {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ class EventSourcedBehaviorWatchSpec
stashState = new StashState(context.asInstanceOf[ActorContext[InternalProtocol]], settings),
replication = None,
publishEvents = false,
internalLoggerFactory = () => logger)
internalLoggerFactory = () => logger,
retentionInProgress = false)

"A typed persistent parent actor watching a child" must {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import pekko.actor.{ ActorRef => ClassicActorRef }
import pekko.actor.Cancellable
import pekko.actor.typed.Signal
import pekko.actor.typed.scaladsl.ActorContext
import pekko.actor.typed.scaladsl.LoggerOps
import pekko.annotation.InternalApi
import pekko.persistence._
import pekko.persistence.typed.EventAdapter
Expand Down Expand Up @@ -71,7 +72,8 @@ private[pekko] final class BehaviorSetup[C, E, S](
val stashState: StashState,
val replication: Option[ReplicationSetup],
val publishEvents: Boolean,
private val internalLoggerFactory: () => Logger) {
private val internalLoggerFactory: () => Logger,
private var retentionInProgress: Boolean) {

import BehaviorSetup._
import InternalProtocol.RecoveryTickEvent
Expand Down Expand Up @@ -197,6 +199,110 @@ private[pekko] final class BehaviorSetup[C, E, S](
}
}

// The retention process for SnapshotCountRetentionCriteria looks like this:
// 1. Save snapshot after persisting events when shouldSnapshotAfterPersist returned SnapshotWithRetention.
// 2. Delete events (when deleteEventsOnSnapshot=true), runs in background.
// 3. Delete snapshots (when isOnlyOneSnapshot=false), runs in background.

def isRetentionInProgress(): Boolean =
retentionInProgress

def retentionProgressSaveSnapshotStarted(sequenceNr: Long): Unit = {
retention match {
case SnapshotCountRetentionCriteriaImpl(_, _, _) =>
internalLogger.debug("Starting retention at seqNr [{}], saving snapshot.", sequenceNr)
retentionInProgress = true
case _ =>
}
}

def retentionProgressSaveSnapshotEnded(sequenceNr: Long, success: Boolean): Unit = {
retention match {
case SnapshotCountRetentionCriteriaImpl(_, _, deleteEvents) if retentionInProgress =>
if (!success) {
internalLogger.debug("Retention at seqNr [{}] is completed, saving snapshot failed.", sequenceNr)
retentionInProgress = false
} else if (deleteEvents) {
internalLogger.debug("Retention at seqNr [{}], saving snapshot was successful.", sequenceNr)
} else if (isOnlyOneSnapshot) {
// no delete of events and no delete of snapshots => done
internalLogger.debug("Retention at seqNr [{}] is completed, saving snapshot was successful.", sequenceNr)
retentionInProgress = false
} else {
internalLogger.debug("Retention at seqNr [{}], saving snapshot was successful.", sequenceNr)
}
case _ =>
}
}

def retentionProgressDeleteEventsStarted(sequenceNr: Long, deleteToSequenceNr: Long): Unit = {
retention match {
case SnapshotCountRetentionCriteriaImpl(_, _, true) if retentionInProgress =>
if (deleteToSequenceNr > 0) {
internalLogger.debug2(
"Retention at seqNr [{}], deleting events to seqNr [{}].",
sequenceNr,
deleteToSequenceNr)
} else {
internalLogger.debug("Retention is completed, no events to delete.")
retentionInProgress = false
}
case _ =>
}
}

def retentionProgressDeleteEventsEnded(deleteToSequenceNr: Long, success: Boolean): Unit = {
retention match {
case SnapshotCountRetentionCriteriaImpl(_, _, true) if retentionInProgress =>
if (!success) {
internalLogger.debug(
"Retention is completed, deleting events to seqNr [{}] failed.",
deleteToSequenceNr)
retentionInProgress = false
} else if (isOnlyOneSnapshot) {
// no delete of snapshots => done
internalLogger.debug(
"Retention is completed, deleting events to seqNr [{}] was successful.",
deleteToSequenceNr)
retentionInProgress = false
} else {
internalLogger.debug("Retention, deleting events to seqNr [{}] was successful.", deleteToSequenceNr)
}
case _ =>
}
}

def retentionProgressDeleteSnapshotsStarted(deleteToSequenceNr: Long): Unit = {
retention match {
case SnapshotCountRetentionCriteriaImpl(_, _, _) if retentionInProgress =>
if (deleteToSequenceNr > 0) {
internalLogger.debug("Retention, deleting snapshots to seqNr [{}].", deleteToSequenceNr)
} else {
internalLogger.debug("Retention is completed, no snapshots to delete.")
retentionInProgress = false
}
case _ =>
}
}

def retentionProgressDeleteSnapshotsEnded(deleteToSequenceNr: Long, success: Boolean): Unit = {
retention match {
case SnapshotCountRetentionCriteriaImpl(_, _, _) if retentionInProgress =>
if (success) {
// delete snapshot is last step => done
internalLogger.debug(
"Retention is completed, deleting snapshots to seqNr [{}] was successful.",
deleteToSequenceNr)
retentionInProgress = false
} else {
internalLogger.debug("Retention is completed, deleting snapshots to seqNr [{}] failed.", deleteToSequenceNr)
retentionInProgress = false
}

case _ =>
}
}

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,8 @@ private[pekko] final case class EventSourcedBehaviorImpl[Command, Event, State](
stashState = stashState,
replication = replication,
publishEvents = publishEvents,
internalLoggerFactory = () => internalLogger())
internalLoggerFactory = () => internalLogger(),
retentionInProgress = false)

// needs to accept Any since we also can get messages from the journal
// not part of the user facing Command protocol
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,11 +223,15 @@ private[pekko] trait SnapshotInteractions[C, E, S] {
}
}

/** Deletes the snapshots up to and including the `sequenceNr`. */
protected def internalDeleteSnapshots(fromSequenceNr: Long, toSequenceNr: Long): Unit = {
/**
* Deletes the snapshots up to and including the `sequenceNr`.
* Uses `minSequenceNr = 0L` to always delete from the beginning, which simplifies
* the retention bookkeeping by removing the need to track a separate lower bound.
*/
protected def internalDeleteSnapshots(toSequenceNr: Long): Unit = {
if (toSequenceNr > 0) {
val snapshotCriteria = SnapshotSelectionCriteria(minSequenceNr = fromSequenceNr, maxSequenceNr = toSequenceNr)
setup.internalLogger.debug2("Deleting snapshots from sequenceNr [{}] to [{}]", fromSequenceNr, toSequenceNr)
val snapshotCriteria = SnapshotSelectionCriteria(minSequenceNr = 0L, maxSequenceNr = toSequenceNr)
setup.internalLogger.debug("Deleting snapshots to sequenceNr [{}]", toSequenceNr)
setup.snapshotStore
.tell(SnapshotProtocol.DeleteSnapshots(setup.persistenceId.id, snapshotCriteria), setup.selfClassic)
}
Expand Down
Loading
Loading