Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,26 @@ private[pekko] object Running {
OptionVal.Some(
ReplicatedEventMetadata(event.originReplica, event.originSequenceNr, updatedVersion, isConcurrent)))
val shouldSnapshotAfterPersist = setup.shouldSnapshot(newState2.state, event.event, newState2.seqNr)
// FIXME validate this is the correct sequence nr from that replica https://github.com/akka/akka/issues/29259

// Sequence number validation (resolves https://github.com/akka/akka/issues/29259):
// Callers are responsible for filtering duplicates and handling gaps:
// - onPublishedEvent: rejects duplicates and gaps, only exact-match seqNr passes through
// - onReplicatedEvent: filters duplicates via alreadySeen(), but gaps may pass through
// if the replication source delivers events out of order
// We add a defensive warning for unexpected sequence numbers to aid diagnosis.
// The event is still persisted for backward compatibility; rejecting it could stall replication.
val expectedSeqNr = newState2.seenPerReplica.getOrElse(event.originReplica, 0L) + 1
if (event.originSequenceNr != expectedSeqNr) {
setup.internalLogger.warnN(
"Unexpected replication sequence number [{}] from replica [{}], expected [{}]. " +
"This may indicate event loss or out-of-order delivery in the replication stream. " +
"Advancing seenPerReplica to [{}]; earlier events from this replica may be skipped.",
event.originSequenceNr,
event.originReplica,
expectedSeqNr,
event.originSequenceNr)
}

val updatedSeen = newState2.seenPerReplica.updated(event.originReplica, event.originSequenceNr)
persistingEvents(
newState2.copy(seenPerReplica = updatedSeen, version = updatedVersion),
Expand Down
Loading