From 756d852384e16c18d9d5ab19924ff524d3ec8138 Mon Sep 17 00:00:00 2001 From: FrenchGithubUser Date: Wed, 4 Mar 2026 12:29:29 +0100 Subject: [PATCH 1/4] test: events sent during remote join handshake should not be lost --- tests/federation_room_join_test.go | 126 +++++++++++++++++++++++++++++ 1 file changed, 126 insertions(+) diff --git a/tests/federation_room_join_test.go b/tests/federation_room_join_test.go index 03b12ee0..544bbef2 100644 --- a/tests/federation_room_join_test.go +++ b/tests/federation_room_join_test.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "io" "net/http" "net/url" "strings" @@ -574,6 +575,131 @@ func TestSendJoinPartialStateResponse(t *testing.T) { must.HaveInOrder(t, sendJoinResp.ServersInRoom, []string{"hs1"}) } +// This test verifies that events sent into a room between a /make_join and +// /send_join are not lost to the joining server. When an event is created +// during the join handshake, the join event's prev_events (set at make_join +// time) won't reference it, creating two forward extremities. The server +// handling the join should ensure the joining server can discover the missed +// event, for example by sending a follow-up event that references both +// extremities, prompting the joining server to backfill. +// +// See https://github.com/element-hq/synapse/pull/19390 +func TestEventBetweenMakeJoinAndSendJoinIsNotLost(t *testing.T) { + deployment := complement.Deploy(t, 1) + defer deployment.Destroy(t) + + alice := deployment.Register(t, "hs1", helpers.RegistrationOpts{}) + + // We track the message event ID sent between make_join and send_join. + // After send_join, we wait for hs1 to send us either: + // - the message event itself, or + // - any event whose prev_events reference the message (e.g. a dummy event) + var messageEventID string + messageDiscoverableWaiter := helpers.NewWaiter() + + srv := federation.NewServer(t, deployment, + federation.HandleKeyRequests(), + ) + srv.UnexpectedRequestsAreErrors = false + + // Custom /send handler: the Complement server won't be in the room until + // send_join completes, so we can't use HandleTransactionRequests (which + // requires the room in srv.rooms). Instead we parse the raw transaction. + srv.Mux().Handle("/_matrix/federation/v1/send/{transactionID}", http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + body, _ := io.ReadAll(req.Body) + txn := gjson.ParseBytes(body) + txn.Get("pdus").ForEach(func(_, pdu gjson.Result) bool { + eventID := pdu.Get("event_id").String() + eventType := pdu.Get("type").String() + t.Logf("Received PDU via /send: type=%s id=%s", eventType, eventID) + + if messageEventID == "" { + return true + } + + // Check if this IS the message event (server pushed it directly). + if eventID == messageEventID { + messageDiscoverableWaiter.Finish() + return true + } + + // Check if this event's prev_events reference the message + // (e.g. a dummy event tying the forward extremities together). + pdu.Get("prev_events").ForEach(func(_, prevEvent gjson.Result) bool { + if prevEvent.String() == messageEventID { + messageDiscoverableWaiter.Finish() + return false + } + return true + }) + + return true + }) + w.WriteHeader(200) + w.Write([]byte(`{"pdus":{}}`)) + })).Methods("PUT") + + cancel := srv.Listen() + defer cancel() + + // Alice creates a room on hs1. + roomID := alice.MustCreateRoom(t, map[string]interface{}{ + "preset": "public_chat", + }) + + charlie := srv.UserID("charlie") + origin := srv.ServerName() + fedClient := srv.FederationClient(deployment) + + // Step 1: make_join, hs1 returns a join event template whose prev_events + // reflect the current room DAG tips. + makeJoinResp, err := fedClient.MakeJoin( + context.Background(), origin, + deployment.GetFullyQualifiedHomeserverName(t, "hs1"), + roomID, charlie, + ) + must.NotError(t, "MakeJoin", err) + + // Step 2: Alice sends a message on hs1. This advances the DAG past the + // point captured by make_join's prev_events. The Complement server is not + // yet in the room, so it won't receive this event via normal federation. + messageEventID = alice.SendEventSynced(t, roomID, b.Event{ + Type: "m.room.message", + Content: map[string]interface{}{ + "msgtype": "m.text", + "body": "Message sent between make_join and send_join", + }, + }) + t.Logf("Alice sent message %s between make_join and send_join", messageEventID) + + // Step 3: Build and sign the join event, then send_join. + // The join event's prev_events are from step 1 (before the message), + // so persisting it on hs1 creates two forward extremities: the message + // and the join. + verImpl, err := gomatrixserverlib.GetRoomVersion(makeJoinResp.RoomVersion) + must.NotError(t, "GetRoomVersion", err) + eb := verImpl.NewEventBuilderFromProtoEvent(&makeJoinResp.JoinEvent) + joinEvent, err := eb.Build(time.Now(), srv.ServerName(), srv.KeyID, srv.Priv) + must.NotError(t, "Build join event", err) + + _, err = fedClient.SendJoin( + context.Background(), origin, + deployment.GetFullyQualifiedHomeserverName(t, "hs1"), + joinEvent, + ) + must.NotError(t, "SendJoin", err) + + // Step 4: hs1 should make the missed message discoverable to the joining + // server. We accept either receiving the message event directly, or + // receiving any event whose prev_events reference it (allowing the + // joining server to backfill). + messageDiscoverableWaiter.Waitf(t, 5*time.Second, + "Timed out waiting for message event %s to become discoverable — "+ + "the event sent between make_join and send_join was lost to the "+ + "joining server", messageEventID, + ) +} + // given an event JSON, return the type and state_key, joined with a "|" func typeAndStateKeyForEvent(result gjson.Result) string { return strings.Join([]string{result.Map()["type"].Str, result.Map()["state_key"].Str}, "|") From 7e3dd34980f7931a61991d36a0795be74c7af21a Mon Sep 17 00:00:00 2001 From: FrenchGithubUser Date: Tue, 24 Mar 2026 12:08:19 +0100 Subject: [PATCH 2/4] apply suggestions --- tests/federation_room_join_test.go | 32 ++++++++++++++++++++++++------ 1 file changed, 26 insertions(+), 6 deletions(-) diff --git a/tests/federation_room_join_test.go b/tests/federation_room_join_test.go index 544bbef2..40eea08e 100644 --- a/tests/federation_room_join_test.go +++ b/tests/federation_room_join_test.go @@ -600,19 +600,32 @@ func TestEventBetweenMakeJoinAndSendJoinIsNotLost(t *testing.T) { srv := federation.NewServer(t, deployment, federation.HandleKeyRequests(), ) + // After send_join, hs1 will start sending us federation transactions via + // /_matrix/federation/v1/send/{txnID}. Since we handle /send manually + // below, any other requests (e.g. key fetches) that arrive unexpectedly + // should be tolerated rather than treated as test failures. srv.UnexpectedRequestsAreErrors = false - // Custom /send handler: the Complement server won't be in the room until - // send_join completes, so we can't use HandleTransactionRequests (which - // requires the room in srv.rooms). Instead we parse the raw transaction. + // Custom /send handler: hs1 will push new room events to us via federation + // transactions once we've joined. We use a raw handler because the + // Complement server is not fully in the room until send_join completes, so + // we can't use HandleTransactionRequests (which requires the room in + // srv.rooms). Instead we parse the raw transaction body ourselves. srv.Mux().Handle("/_matrix/federation/v1/send/{transactionID}", http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { - body, _ := io.ReadAll(req.Body) + body, err := io.ReadAll(req.Body) + if err != nil { + t.Fatalf("failed to read request body in /send handler: %v", err) + return + } txn := gjson.ParseBytes(body) txn.Get("pdus").ForEach(func(_, pdu gjson.Result) bool { eventID := pdu.Get("event_id").String() eventType := pdu.Get("type").String() t.Logf("Received PDU via /send: type=%s id=%s", eventType, eventID) + // messageEventID is set after make_join but before send_join. + // Transactions can arrive before that window, so skip PDUs that + // arrive before we know which event to look for. if messageEventID == "" { return true } @@ -623,8 +636,12 @@ func TestEventBetweenMakeJoinAndSendJoinIsNotLost(t *testing.T) { return true } - // Check if this event's prev_events reference the message - // (e.g. a dummy event tying the forward extremities together). + // Check if this event's prev_events directly reference the message + // (e.g. a dummy event tying the two forward extremities together). + // If so, the joining server can backfill from that event and will + // discover the message. We only check one level of prev_events: + // if the reference is deeper in the DAG the joining server can + // still reach the message through backfill. pdu.Get("prev_events").ForEach(func(_, prevEvent gjson.Result) bool { if prevEvent.String() == messageEventID { messageDiscoverableWaiter.Finish() @@ -636,6 +653,9 @@ func TestEventBetweenMakeJoinAndSendJoinIsNotLost(t *testing.T) { return true }) w.WriteHeader(200) + // Respond with an empty PDU error map, which is the federation /send + // success response format: each key would be a PDU ID whose processing + // failed; an empty object means all PDUs were accepted. w.Write([]byte(`{"pdus":{}}`)) })).Methods("PUT") From 1194d18af953b323bbcbb960aee6a8b211871d9c Mon Sep 17 00:00:00 2001 From: FrenchGithubUser Date: Fri, 24 Apr 2026 14:30:00 +0200 Subject: [PATCH 3/4] apply more suggestions --- tests/federation_room_join_test.go | 39 ++++++++++++++++-------------- 1 file changed, 21 insertions(+), 18 deletions(-) diff --git a/tests/federation_room_join_test.go b/tests/federation_room_join_test.go index 40eea08e..c4733124 100644 --- a/tests/federation_room_join_test.go +++ b/tests/federation_room_join_test.go @@ -8,6 +8,7 @@ import ( "net/http" "net/url" "strings" + "sync/atomic" "testing" "time" @@ -594,7 +595,9 @@ func TestEventBetweenMakeJoinAndSendJoinIsNotLost(t *testing.T) { // After send_join, we wait for hs1 to send us either: // - the message event itself, or // - any event whose prev_events reference the message (e.g. a dummy event) - var messageEventID string + // atomic.Value is used because messageEventID is written on the main goroutine + // and read on the HTTP handler goroutine, with no other synchronization. + var messageEventID atomic.Value messageDiscoverableWaiter := helpers.NewWaiter() srv := federation.NewServer(t, deployment, @@ -613,10 +616,7 @@ func TestEventBetweenMakeJoinAndSendJoinIsNotLost(t *testing.T) { // srv.rooms). Instead we parse the raw transaction body ourselves. srv.Mux().Handle("/_matrix/federation/v1/send/{transactionID}", http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { body, err := io.ReadAll(req.Body) - if err != nil { - t.Fatalf("failed to read request body in /send handler: %v", err) - return - } + must.NotError(t, "failed to read request body in /send handler: %v", err) txn := gjson.ParseBytes(body) txn.Get("pdus").ForEach(func(_, pdu gjson.Result) bool { eventID := pdu.Get("event_id").String() @@ -626,24 +626,27 @@ func TestEventBetweenMakeJoinAndSendJoinIsNotLost(t *testing.T) { // messageEventID is set after make_join but before send_join. // Transactions can arrive before that window, so skip PDUs that // arrive before we know which event to look for. - if messageEventID == "" { + msgID, _ := messageEventID.Load().(string) + if msgID == "" { return true } // Check if this IS the message event (server pushed it directly). - if eventID == messageEventID { + if eventID == msgID { messageDiscoverableWaiter.Finish() return true } - // Check if this event's prev_events directly reference the message - // (e.g. a dummy event tying the two forward extremities together). - // If so, the joining server can backfill from that event and will - // discover the message. We only check one level of prev_events: - // if the reference is deeper in the DAG the joining server can - // still reach the message through backfill. + // Check if this event's prev_events directly reference the message (e.g. a dummy + // event tying the two forward extremities together). If so, the joining server + // can backfill from that event and will discover the message. + // + // XXX: We only check one level of prev_events: if the reference is deeper in the + // DAG, it's valid and the joining server can still reach the message through + // backfill but our checks don't account for that yet (feel free to edit this + // assertion if you run into this) pdu.Get("prev_events").ForEach(func(_, prevEvent gjson.Result) bool { - if prevEvent.String() == messageEventID { + if prevEvent.String() == msgID { messageDiscoverableWaiter.Finish() return false } @@ -683,14 +686,14 @@ func TestEventBetweenMakeJoinAndSendJoinIsNotLost(t *testing.T) { // Step 2: Alice sends a message on hs1. This advances the DAG past the // point captured by make_join's prev_events. The Complement server is not // yet in the room, so it won't receive this event via normal federation. - messageEventID = alice.SendEventSynced(t, roomID, b.Event{ + messageEventID.Store(alice.SendEventSynced(t, roomID, b.Event{ Type: "m.room.message", Content: map[string]interface{}{ "msgtype": "m.text", "body": "Message sent between make_join and send_join", }, - }) - t.Logf("Alice sent message %s between make_join and send_join", messageEventID) + })) + t.Logf("Alice sent message %s between make_join and send_join", messageEventID.Load()) // Step 3: Build and sign the join event, then send_join. // The join event's prev_events are from step 1 (before the message), @@ -716,7 +719,7 @@ func TestEventBetweenMakeJoinAndSendJoinIsNotLost(t *testing.T) { messageDiscoverableWaiter.Waitf(t, 5*time.Second, "Timed out waiting for message event %s to become discoverable — "+ "the event sent between make_join and send_join was lost to the "+ - "joining server", messageEventID, + "joining server", messageEventID.Load(), ) } From 34636d5a3ee3870a5a296b126cd45969f744577c Mon Sep 17 00:00:00 2001 From: FrenchGithubUser Date: Thu, 30 Apr 2026 11:02:21 +0200 Subject: [PATCH 4/4] update comment on why we use `atomic.Value` for `messageEventID` Co-authored-by: Eric Eastwood --- tests/federation_room_join_test.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tests/federation_room_join_test.go b/tests/federation_room_join_test.go index c4733124..5288adfe 100644 --- a/tests/federation_room_join_test.go +++ b/tests/federation_room_join_test.go @@ -595,8 +595,10 @@ func TestEventBetweenMakeJoinAndSendJoinIsNotLost(t *testing.T) { // After send_join, we wait for hs1 to send us either: // - the message event itself, or // - any event whose prev_events reference the message (e.g. a dummy event) - // atomic.Value is used because messageEventID is written on the main goroutine - // and read on the HTTP handler goroutine, with no other synchronization. + // + // atomic.Value is used because messageEventID is written on the main goroutine and + // read on the HTTP handler goroutine, and needs synchronization (without + // synchronization, writes are not guaranteed to be observed by other goroutines) var messageEventID atomic.Value messageDiscoverableWaiter := helpers.NewWaiter()