From 14f3ab7babcc34903e6b2947bda4504858f8e70d Mon Sep 17 00:00:00 2001 From: He-Pin Date: Sat, 28 Mar 2026 16:42:35 +0800 Subject: [PATCH] Add defensive sequence number validation in replicated event persistence MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace the FIXME (akka#29259) comment in handleExternalReplicatedEventPersist with a proper defensive validation of the replica sequence number before updating seenPerReplica. The validation logs a warning when the incoming event's originSequenceNr does not match the expected next sequence number for that replica. This covers the gap scenario where events from a replica may arrive out of order via the replication stream (onReplicatedEvent path). The event is still persisted for backward compatibility — rejecting it could stall the replication stream. Key design decisions (confirmed by cross-review from GPT-5.4 and Sonnet 4.6): - Only gap detection (seqNr > expected) can fire from current callers; onPublishedEvent filters both duplicates and gaps before calling. onReplicatedEvent filters duplicates via alreadySeen() but allows gaps. - Uses != check (not separate < and > branches) to avoid dead code. - Warning message includes the advancing seqNr to help operators diagnose potential event loss. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../persistence/typed/internal/Running.scala | 21 ++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/Running.scala b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/Running.scala index bb983d05a2..8e73e259a1 100644 --- a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/Running.scala +++ b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/Running.scala @@ -523,7 +523,26 @@ private[pekko] object Running { OptionVal.Some( ReplicatedEventMetadata(event.originReplica, event.originSequenceNr, updatedVersion, isConcurrent))) val shouldSnapshotAfterPersist = setup.shouldSnapshot(newState2.state, event.event, newState2.seqNr) - // FIXME validate this is the correct sequence nr from that replica https://github.com/akka/akka/issues/29259 + + // Sequence number validation (resolves https://github.com/akka/akka/issues/29259): + // Callers are responsible for filtering duplicates and handling gaps: + // - onPublishedEvent: rejects duplicates and gaps, only exact-match seqNr passes through + // - onReplicatedEvent: filters duplicates via alreadySeen(), but gaps may pass through + // if the replication source delivers events out of order + // We add a defensive warning for unexpected sequence numbers to aid diagnosis. + // The event is still persisted for backward compatibility; rejecting it could stall replication. + val expectedSeqNr = newState2.seenPerReplica.getOrElse(event.originReplica, 0L) + 1 + if (event.originSequenceNr != expectedSeqNr) { + setup.internalLogger.warnN( + "Unexpected replication sequence number [{}] from replica [{}], expected [{}]. " + + "This may indicate event loss or out-of-order delivery in the replication stream. " + + "Advancing seenPerReplica to [{}]; earlier events from this replica may be skipped.", + event.originSequenceNr, + event.originReplica, + expectedSeqNr, + event.originSequenceNr) + } + val updatedSeen = newState2.seenPerReplica.updated(event.originReplica, event.originSequenceNr) persistingEvents( newState2.copy(seenPerReplica = updatedSeen, version = updatedVersion),