diff --git a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/Running.scala b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/Running.scala index bb983d05a2..8e73e259a1 100644 --- a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/Running.scala +++ b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/Running.scala @@ -523,7 +523,26 @@ private[pekko] object Running { OptionVal.Some( ReplicatedEventMetadata(event.originReplica, event.originSequenceNr, updatedVersion, isConcurrent))) val shouldSnapshotAfterPersist = setup.shouldSnapshot(newState2.state, event.event, newState2.seqNr) - // FIXME validate this is the correct sequence nr from that replica https://github.com/akka/akka/issues/29259 + + // Sequence number validation (resolves https://github.com/akka/akka/issues/29259): + // Callers are responsible for filtering duplicates and handling gaps: + // - onPublishedEvent: rejects duplicates and gaps, only exact-match seqNr passes through + // - onReplicatedEvent: filters duplicates via alreadySeen(), but gaps may pass through + // if the replication source delivers events out of order + // We add a defensive warning for unexpected sequence numbers to aid diagnosis. + // The event is still persisted for backward compatibility; rejecting it could stall replication. + val expectedSeqNr = newState2.seenPerReplica.getOrElse(event.originReplica, 0L) + 1 + if (event.originSequenceNr != expectedSeqNr) { + setup.internalLogger.warnN( + "Unexpected replication sequence number [{}] from replica [{}], expected [{}]. " + + "This may indicate event loss or out-of-order delivery in the replication stream. " + + "Advancing seenPerReplica to [{}]; earlier events from this replica may be skipped.", + event.originSequenceNr, + event.originReplica, + expectedSeqNr, + event.originSequenceNr) + } + val updatedSeen = newState2.seenPerReplica.updated(event.originReplica, event.originSequenceNr) persistingEvents( newState2.copy(seenPerReplica = updatedSeen, version = updatedVersion),